RxJS Subject in depth explained within the Angular framework

1

Angular is a framework that highly relies on the use of RxJS Observables. We use Observables to push and receive data between a publisher (the Observable) and a subscriber (the Observer). You can think of it like a newspaper that you subscribe to. As long as you have an active subscription, the mailman will pass your house and throw the newspaper in your mailbox. Unfortunately in software nothing is ever that simple. There are lots of different use cases and requirement for different applications. With a normal observable every subscriber receives it’s own execution of the observable. This means that the mailman needs to pass each house and every subscriber will receive the newspapers one house after another. But what if you need every subscriber to receive the values simultaneous? In that case we can use the RxJS Subject.

The RxJS subject is a special kind of Observable that allows you to multicast values to subscribers. That means that every subscriber of the Subject will receive the new values at the same time. When the Subject sends out more then one value, each subscriber will first receive the first value and then everyone will receive the next value and so on.

import { Component, OnInit } from '@angular/core';
import { Subject, Observable } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.scss']
})
export class AppComponent implements OnInit {
  // Subject that can emit string values
  subjectA$ = new Subject<String>();
	
  // Observable emitting 2 string values
  observalueA$ = new Observable<String>(subscriber => {
    subscriber.next('First value observable');
    subscriber.next('Second value observable');
  });

  constructor() {  }

  ngOnInit(): void {
	// First subsrciption to the subject
    this.subjectA$.subscribe(value => {
        console.log(value);
    });

	// Second subsrciption to the subject
    this.subjectA$.subscribe(value => {
        console.log(value);
    });

	// First subsrciption to the Observable
    this.observalueA$.subscribe(val => {
        console.log(val);
    });

	// Second subscription to the Observable
    this.observalueA$.subscribe(val => {
        console.log(val);
    });

	// Emitting 2 values with the subject
    this.subjectA$.next('First value subject');
    this.subjectA$.next('Second value subject');
  }
}

In the example above we create a RxJS Subject and a normal RxJS Observable and log the values to the console. We strong typed the Observable and the Subject to only accept String values. You can change, remove or add any to the type parameter if you need more flexibility here. Al property names also end with a $, this is a best practice when naming Observables. If you now check the console in the browser you see the following output:

First value observable
Second value observable
First value observable
Second value observable

First value subject
First value subject
Second value subject
Second value subject

The observable will send all .next() defined values to each subscriber before moving on to the next subscriber. The values of the RxJS Subject are received different. When a new value is emitted with the .next() method, each subscriber of the Subject will receive the values at the same time. This is why the console results of the subject are 2 times first value and then 2 times second value. Another difference is that we can call the .next() method on the property holding our RxJS Subject, this is not possible with the Observable.

Calling methods on RxJS Subjects

The RxJS subject exposes 3 a few methods that you can call. The most important methods are:

  • .next() sends new values to subscribers
  • .error() sends errors to subscribers and closes the observable stream
  • .complete() closes the observable stream and calls a callback function
  • .unsubscribe() unsubscribes from the observable stream

The ,next method is already demonstrated in the previous example, so lets move on the the error method. To receive the errors we throw with the .error() method we need to change the subscribers a bit. We add a callback function that receives the error from the .error(). Calling this method also closes the observable stream. Meaning that subscribers do not receive the values send after the .error() is called.

import { Component, OnInit } from '@angular/core';
import { Subject, Observable } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.scss']
})
export class AppComponent implements OnInit {
  subjectA$ = new Subject<String>();

  constructor() {  }

  ngOnInit(): void {
    this.subjectA$.subscribe(value => {
        console.log(value);
    },
    error => console.log(error));

    this.subjectA$.subscribe(value => {
        console.log(value);
    },
    error => console.log(error));

    if (true) {
      this.subjectA$.next('First value subject');
    } else {
      this.subjectA$.error('Something went wrong');
    }

    if (false) {
      this.subjectA$.next('Second value subject');
    } else {
      this.subjectA$.error('Something went wrong');
    }

    this.subjectA$.next('Third value subject');


  }
}

The example above will log the following values to the console:

First value subject
First value subject
Something went wrong
Something went wrong

The error closes the observable stream and we never receive the values send after that. Beside the error we can also call the complete() method. The complete method of the RxJS subject also closes the observable stream. In the subscription we can also add a callback function to execute when the complete method is called on the RxJS Subject.

import { Component, OnInit } from '@angular/core';
import { Subject, Observable } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.scss']
})
export class AppComponent implements OnInit {
  subjectA$ = new Subject<String>();


  constructor() {  }

  ngOnInit(): void {
    this.subjectA$.subscribe(value => {
        console.log(value);
    },
    error => console.log(error),
    () => { console.log('Subscription completed'); });

    this.subjectA$.subscribe(value => {
        console.log(value);
    },
    error => console.log(error),
    () => { console.log('Subscription completed'); });

    if (true) {
      this.subjectA$.next('First value subject');
    } else {
      this.subjectA$.error('Something went wrong');
    }

    if (true) {
      this.subjectA$.next('Second value subject');
    } else {
      this.subjectA$.error('Something went wrong');
    }

    this.subjectA$.complete();
    this.subjectA$.next('Third value subject');


  }
}

This will result in the following values in the console:

First value subject
First value subject
Second value subject
Second value subject
Subscription completed
Subscription completed

We close the observable stream before we emit the last value to the subscribers. Because of that the subscribers never receive the last value we send to them. We do receive the second value because the if statement passes this time. After that we call the complete method, triggering the callback functions and logging: ‘Subscription completed’.

 

Unsubscribing from an RxJS Subject

As with normal Observables we need to unsubscribe our Subject subscriptions to prevent memory leaks. If we leave our subscriptions open we will create new subscription when we get back to the same page. We also will receive values when we call the subject from other places within our application. This will lead to excessive function calls, unwanted behavior and memory leakage.

The most common way to to unsubscribe observables in Angular applications is to call the unsubscribe method in the ngOnDestroy lifecycle hook. But we can also call the complete method like we saw in the example above. The .complete() method also ends the subscription and prevents memory leaks. But because the .unsubscribe() is the most commonly used and recommended way I will explain a bit more about it.

We can unsubscribe all subscriptions from one Subject at once, by calling the unsubscribe on the Subject instance.

import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subject, Observable } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.scss']
})
export class AppComponent implements OnInit, OnDestroy {
  subjectA$ = new Subject<String>();

  constructor() {  }

  ngOnInit(): void {
    const subscriptionOne$ = this.subjectA$.subscribe(value => {
        console.log(value);
    });

    const subscriptionTwo$ = this.subjectA$.subscribe(value => {
        console.log(value);
    });

    this.subjectA$.next('First value subject');
    this.subjectA$.next('Second value subject');
  }

  ngOnDestroy(): void {
    this.subjectA$.unsubscribe();
  }
}

This will unsubscribe all subscriptions at once. In some cases you might want to unsubscribe some subscriptions early. This is possible by saving the subscription in a variable and then unsubscribing that individual subscription.

import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subject, Observable } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.scss']
})
export class AppComponent implements OnInit, OnDestroy {
  subjectA$ = new Subject<String>();

  constructor() {  }

  ngOnInit(): void {
    const subscriptionOne$ = this.subjectA$.subscribe(value => {
        console.log(value);
    });

    const subscriptionTwo$ = this.subjectA$.subscribe(value => {
        console.log(value);
    });

    this.subjectA$.next('First value subject');
    this.subjectA$.next('Second value subject');

    subscriptionTwo$.unsubscribe();

    this.subjectA$.next('Third value subject');
  }

  ngOnDestroy(): void {
    this.subjectA.unsubscribe();
  }
}

The example above will output these values to the console:

First value subject
First value subject
Second value subject
Second value subject
Third value subject

The third value is only logged once, because one subscription has been unsubscribed before the value is emitted. In a lot of cases you will also subscribe to multiple Subjects. If you need to unsubscribe to all of them, it will become easier to miss one and create a memory leak. But there is a trick to unsubscribe multiple subscription to different Subjects. To do this we need a property that is an instance of the Subscription class.

We can use this property to add subscriptions from multiple Subjects. In the ngOnDestroy we then use this property to unsubscribe all subscriptions at once.

import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subject, Observable, Subscription } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.scss']
})
export class AppComponent implements OnInit, OnDestroy {
  subjectA$ = new Subject<String>();
  subjectB$ = new Subject<String>();
  subjectC$ = new Subject<String>();

  subscriptions$ = new Subscription();

  constructor() {  }

  ngOnInit(): void {
    const subscriptionOne$ = this.subscriptions$.add(this.subjectA$.subscribe(value => {
        console.log(value);
    }));

    this.subscriptions$.add(this.subjectB$.subscribe(val => {
      console.log(val);
    }));

    this.subscriptions$.add(this.subjectC$.subscribe(val => {
      console.log(val);
    }));


    this.subjectA$.next('A value subject');
    this.subjectB$.next('B value subject');
    this.subjectC$.next('C value subject');
  }

  ngOnDestroy(): void {
    this.subscriptions$.unsubscribe();
  }
}

 

Key points about RxJS Subject

  • RxJS Subject is a special type of Observable that multi-casts values
  • You subscribe to the RxJS Subject to receive it’s values
  • A Subject can have multiple subscribers
  • You use the .next() method to send new values to subscribers
  • The Subscribers can receive values, errors and execute a completion callback function
  • You need to end the data stream to prevent memory leaks with the unsubscribe or complete method
  • You can save subscriptions in variables to handle them individually

If you have any questions feel free to ask them in the comments, I will get back to you as soon as possible. If this article was helpful to you, it helps a lot to share the post or leave a comments, so I appreciate it!!

Happy coding =)

Share.

1 Comment

Leave A Reply