RxJs: Beginning

RxJs is a vast topic that could fill a complete article on its own. I have read many articles that cover a few important topics with some code snippets, but I couldn’t find one that gives a complete yet brief overview. So, here we go with something different.

The term RxJs stands for Reactive Extensions for JavaScript. It allows us to work with asynchronous data streams. It provides one core type, the Observable, along with three satellite types: Observer, Scheduler, and Subject.

Observables and Observers in RxJs

We use Observables to perform asynchronous operations and handle asynchronous data. In Angular, we can manage asynchronous operations using either Promises or Observables. Now, what are asynchronous operations and asynchronous data? We already know that JavaScript is a single-threaded programming language, meaning the code is executed one statement at a time, in order.

Only once one statement completes does the next statement in the program get executed. Now suppose a task takes a long time to execute, for example, an HTTP request to the server. If that request ran synchronously, the next statement after it would have to wait; it would only get executed when the HTTP request completes.

So, we can say that synchronous code is blocking in nature, and this is where asynchronous programming comes into the picture. Asynchronous code lets a long-running task proceed in the background without stopping the execution of the code in the main thread. So, asynchronous code is non-blocking. That means we can make HTTP requests asynchronously.


When a request is made asynchronously, it runs in the background, and the code after that HTTP request gets executed immediately in the main thread. So, the HTTP request does not block the next line of code. Using asynchronous programming, we can perform long network requests without blocking the main thread.
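The non-blocking behaviour described above can be sketched in a few lines. This is only an illustration: `fakeHttpRequest` is a hypothetical stand-in for a real HTTP call, and it uses `setTimeout` so that the "response" arrives after the current synchronous code has finished.

```typescript
// Records the order in which things happen.
const log: string[] = [];

// Hypothetical stand-in for an HTTP request (an assumption for illustration):
// the callback fires later, after the current synchronous code has finished.
function fakeHttpRequest(onResponse: (body: string) => void): void {
  setTimeout(() => onResponse('response body'), 0);
}

log.push('request sent');
fakeHttpRequest((body) => {
  log.push(`response received: ${body}`);
  console.log(log);
});
// This line does not wait for the response.
log.push('next statement runs without waiting');
```

By the time the callback fires, `log` already contains the "next statement" entry, which shows that the request did not block the code that follows it.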

We can do this in two ways:
1. By using Promises
2. By using Observables

Now, the difference between Promises and Observables

Let’s say we are creating an application that needs data from the server, e.g., requesting a list of users from the server. In that case, we will send a request to the server from our application. Now, the server will get data from the database or web API.

Suppose the data we are requesting is huge. In that case, the server will take some time to gather it. Once the data is available, the server creates a response and sends that data with the response to the client. So, here the server gathers all the data, and only once it is ready does it send everything to the client. This is how a “Promise” works.

A Promise promises us some data in the future and delivers it exactly once, when the complete data is ready. This is how a Promise deals with asynchronous data.

How does an Observable deal with asynchronous data?

Let’s say we are making an HTTP request to the server to get all the users from our database, and the server collects the data from the database or the Web API. The Observable does not have to wait for the complete data to be available.

An Observable streams the data, so it can send the data in packets: when some of the data is available, it sends that part, then it collects the rest of the data and sends that as well.

It sends the data in chunks. It does not wait for all the data to be available and then send it at once; it streams the data. That is the difference between a Promise and an Observable!

RxJs has two main players: the Observable, which is the stream of data, and the Observer, which uses that data. For the observer to receive the data emitted by an observable, the observer has to subscribe to that observable. So, we can also say that the observer is the subscriber of the observable.
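A minimal sketch of this contrast, assuming a made-up `users` array in place of a real server response: the Promise resolves once with the whole list, while the Observable delivers the same list in two chunks. The chunks are emitted back to back here for simplicity; a real stream would typically spread them out over time.

```typescript
import { Observable } from 'rxjs';

// Hypothetical data standing in for a server response.
const users = ['Asha', 'Ben', 'Chen', 'Dina'];

// Promise: resolves exactly once, with the complete data.
const allAtOnce: Promise<string[]> = new Promise((resolve) => resolve(users));

// Observable: may emit several times, here one chunk per next() call.
const inChunks = new Observable<string[]>((subscriber) => {
  subscriber.next(users.slice(0, 2)); // first chunk
  subscriber.next(users.slice(2));    // remaining chunk
  subscriber.complete();
});

const received: string[][] = [];
// Nothing is emitted until someone subscribes.
inChunks.subscribe((chunk) => received.push(chunk));

allAtOnce.then((all) => console.log('promise resolved once with', all.length, 'users'));
console.log('observable emitted', received.length, 'chunks');
```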

import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.scss']
})
export class AppComponent implements OnInit {
  title = 'rxjs';

  myObservable = new Observable((observer) => {
    console.log('observable');
    // To emit data to the observer, we call next()
    observer.next("1");
    observer.next("2");
    observer.next("3");
    observer.next("4");
  });

  // The observable will only emit the data if it has a subscriber,
  // so let's create a subscriber for myObservable

  ngOnInit() {
    // subscribe() takes up to 3 optional callbacks: next, error, and complete.
    // The first one is the next callback; it gets executed every time the
    // observable emits a value with next(), so here it will be called 4 times.
    this.myObservable.subscribe((val) => {
      console.log(val, "::val");
    });
  }
}

Running this logs the values 1 to 4 one by one, in the order they were emitted. This is how the observable works!
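The three callbacks that subscribe() accepts can also be passed together as a single observer object. The sketch below is a standalone illustration outside Angular; the names `numbers$` and `events` are made up for the example. It also shows that nothing is delivered once the observable has completed.

```typescript
import { Observable } from 'rxjs';

// Records what the observer sees, in order.
const events: string[] = [];

const numbers$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
  subscriber.next(3); // ignored: nothing is delivered after complete()
});

numbers$.subscribe({
  next: (value) => events.push(`next: ${value}`),   // called for every emitted value
  error: (err) => events.push(`error: ${err}`),     // called if the observable fails
  complete: () => events.push('complete'),          // called once, when the stream ends
});

console.log(events); // [ 'next: 1', 'next: 2', 'complete' ]
```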


Operators in RxJs:

We use operators a lot when dealing with RxJs, so it’s important to understand what operators are and how we can use them on an Observable. So, operators in RxJs are simply functions that take an Observable as an input, transform it into a new Observable, and then return it.

We use operators to manipulate the observable data stream. Let’s have a look at some operators of RxJs:
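To make the "function that takes an Observable and returns a new Observable" idea concrete, here is a hand-written operator. `multiplyBy` is a hypothetical name invented for this sketch; in real code the built-in map operator would do the same job.

```typescript
import { Observable, from } from 'rxjs';

// Hypothetical operator for illustration: takes a source Observable and
// returns a NEW Observable that emits every source value times `factor`.
function multiplyBy(factor: number) {
  return (source: Observable<number>): Observable<number> =>
    new Observable<number>((subscriber) => {
      const subscription = source.subscribe({
        next: (value) => subscriber.next(value * factor),
        error: (err) => subscriber.error(err),
        complete: () => subscriber.complete(),
      });
      // Teardown: stop listening to the source when the consumer unsubscribes.
      return () => subscription.unsubscribe();
    });
}

const results: number[] = [];
from([1, 2, 4, 6, 8])
  .pipe(multiplyBy(5))
  .subscribe((value) => results.push(value));

console.log(results); // [ 5, 10, 20, 30, 40 ]
```

The source observable is left untouched; pipe() simply hands it to the operator and gives back whatever the operator returns.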

map() in RxJs:

The map operator transforms whatever data comes from the source, for example from the server. It is a transformation operator: it transforms the items emitted by an observable by applying a function to each item.

In the code snippet that follows, we create an observable using the ‘from’ function and pass it an array containing 5 values. The observable that ‘from’ creates will emit those 5 values one by one, i.e., 1, 2, 4, 6, 8. After emitting all the data, it will also emit a complete signal, because an observable created with ‘from’ completes once the array is exhausted.

Now, using an operator, we can transform the data emitted by this observable and return a new observable with the transformed data. This observable emits 1, 2, 4, 6, 8, and we want to transform those values.

Let’s say we want to multiply each of the values emitted by this observable by 5 and return the transformed data. So, the new observable will emit 5, 10, 20, 30, and 40. Such things can be achieved by using operators on this observable. Here, we use the map operator.

The map operator is a function that takes a callback function as its argument. This callback receives each value that the source observable, i.e., myObservable, emits. The map operator then returns a new observable, and that new observable emits the transformed data from the source observable.


We assign the result to a new property, i.e., transformedObs. This transformedObs will emit the transformed data, i.e., 5, 10, 20, 30, 40. We then subscribe to transformedObs instead of myObservable, so we get the transformed data.

import { Component, OnInit } from '@angular/core';
import { from } from 'rxjs';
import { map } from 'rxjs/operators';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.scss']
})
export class AppComponent implements OnInit {
  title = 'rxjs';

  arr1 = [1, 2, 4, 6, 8];
  arr2 = ['R', 'A', 'S'];

  // Create an observable by passing an array to from()
  myObservable = from(this.arr1); // 1, 2, 4, 6, 8 => 5, 10, 20, 30, 40

  // map() returns a new observable that emits each source value times 5
  transformedObs = this.myObservable.pipe(map((val) => {
    return val * 5;
  }));

  ngOnInit() {
    // Pass the callbacks as an observer object; separate callback
    // arguments are deprecated in recent RxJs versions
    this.transformedObs.subscribe({
      next: (val) => {
        console.log(val, "::val");
      },
      error: (error) => {
        alert(error.message);
      },
      complete: () => {
        alert('Observable has completed emitting values');
      }
    });
  }
}

Here’s the output: the console logs 5, 10, 20, 30, and 40, and then the completion alert appears.

filter() in RxJs:

A filtering operator filters the data emitted by the source observable according to a specified condition. Let’s say that, from the transformed data above, we only want to emit the values that are >= 30.

So, here we only want to emit 30 and 40. For that, we can use the filter operator.

import { Component, OnInit } from '@angular/core';
import { from } from 'rxjs';
import { map, filter } from 'rxjs/operators';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.scss']
})
export class AppComponent implements OnInit {
  title = 'rxjs';

  arr1 = [1, 2, 4, 6, 8];
  arr2 = ['R', 'A', 'S'];

  // Create an observable by passing an array to from()
  myObservable = from(this.arr1); // 1, 2, 4, 6, 8 => 5, 10, 20, 30, 40
  transformedObs = this.myObservable.pipe(map((val) => {
    return val * 5;
  }));

  // The source observable here, i.e. transformedObs, emits 5, 10, 20, 30, 40;
  // filter() lets through only the values for which the callback returns true
  filterObs = this.transformedObs.pipe(filter((val) => {
    return val >= 30;
  }));

  ngOnInit() {
    // Pass the callbacks as an observer object; separate callback
    // arguments are deprecated in recent RxJs versions
    this.filterObs.subscribe({
      next: (val) => {
        console.log(val, "::val");
      },
      error: (error) => {
        alert(error.message);
      },
      complete: () => {
        alert('Observable has completed emitting values');
      }
    });
  }
}

Here’s the output: the console logs only 30 and 40.

The output satisfies the condition we gave: the filter operator returns a new observable that emits only the filtered data!
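Operators can also be chained inside a single pipe() call instead of being stored in intermediate properties. A minimal standalone sketch, using the same values as above; the `outcome` object is just a made-up holder for what the observer receives.

```typescript
import { from } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Made-up holder for what the observer receives.
const outcome = { values: [] as number[], completed: false };

from([1, 2, 4, 6, 8])
  .pipe(
    map((value) => value * 5),       // 5, 10, 20, 30, 40
    filter((value) => value >= 30),  // 30, 40
  )
  .subscribe({
    next: (value) => outcome.values.push(value),
    complete: () => { outcome.completed = true; },
  });

console.log(outcome); // { values: [ 30, 40 ], completed: true }
```

The operators run in the order they are listed, so each value is multiplied first and only then tested against the condition.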

forkJoin() in RxJs:

forkJoin works similarly to Promise.all in JavaScript. Let’s take an example:
Let’s say there are two observables. forkJoin waits until both observables have completed and then takes the last value emitted by the first and by the second, respectively. So it combines the final values and emits them together as a single result.

We can pass an array (or an object) of observables as input, and it will emit a value once all of the observables, which run in parallel, have completed.
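The "last value of each" behaviour is easy to check with observables that emit synchronously. In this sketch, `first$` and `second$` are made-up sources built with `of`, not real API calls.

```typescript
import { forkJoin, of } from 'rxjs';

// Made-up sources: each emits a few values and then completes.
const first$ = of(1, 2, 3);
const second$ = of('a', 'b');

const combined: Array<{ first: number; second: string }> = [];

// forkJoin waits for every source to complete, then emits once
// with the LAST value of each source.
forkJoin({ first: first$, second: second$ }).subscribe((result) => {
  combined.push(result);
});

console.log(combined); // [ { first: 3, second: 'b' } ]
```

Only one combined value is emitted, no matter how many values each source produced along the way.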

Usage of forkJoin():

  • To call multiple remote APIs in parallel, e.g., if we have to call 3 APIs simultaneously.
  • To call mixed observables, e.g., if we have to get the result of an API and transformed object data.

import { Component, OnInit } from '@angular/core';
import { forkJoin } from 'rxjs';
import { DataService } from '../data.service';

@Component({
  selector: 'app-comp1',
  templateUrl: './comp1.component.html',
  styleUrls: ['./comp1.component.scss']
})
export class Comp1Component implements OnInit {

  constructor(private dataService: DataService) { }

  ngOnInit(): void {
  }

  onClick() {
    // Fire 3 requests in parallel; forkJoin emits once all of them complete
    forkJoin({
      getUserById: this.dataService.getUserById(1),       // user detail for id 1
      getProductById: this.dataService.getProductById(1), // product detail for id 1
      getCartById: this.dataService.getCartById(1)        // cart detail for id 1
    })
      .subscribe(({ getUserById, getProductById, getCartById }) => {
        console.log('userDetails', getUserById);
        console.log('productDetails', getProductById);
        console.log('cartDetails', getCartById);
      });
  }
}

Here, we can see that the 3 APIs have been called in parallel.


Conclusion

RxJs provides many more operators that we can dive deeper into. Till then, enjoy reading this!
