TakeWhile trap

Updated: Jun 16




First things first: RxJS takeWhile has no trap whatsoever. However, I ran into a small bug while using it that took me some time to understand, and I figured it would be an interesting case to share. When it comes to subscriptions that should end after some action or event (a component being destroyed, some configuration finishing, etc.), I have a rule of thumb:


takeWhile((value) => shouldThisSubscriptionBeAlive(value))

This way I make sure I don’t leave unnecessary subscribers “alive”. Nevertheless, it should be used carefully to avoid scenarios like the one I came across. 😬

I will use an analogy and a simple scenario to reduce the problem to its essentials.


Our scenario


Colorful toy trucks parked together in rows — Picture under the Creative Commons License (source: Flickr)



The analogy is a water source that loads tanker trucks, which then distribute the water to a nearby village (forgive my imagination, but I recently became a dad).

  • Each truck can be loaded with 10 units of water;

  • the source takes between 0 and 300 milliseconds to deliver 1 unit of water;

  • each truck needs between 0 and 300 milliseconds to be prepared to start loading;

  • only one truck can be loaded at a time.

The solution (conceptually identical to the one we had in our project) is in the playground below. Don’t be scared, I’ll explain it next :)


Playground: https://stackblitz.com/edit/take-while-trap


Algorithm walkthrough

The water source is represented by a Subject that emits 1 unit of water every T milliseconds. The cadence is mimicked by a timer whose period T is picked at random between 0 and 300 milliseconds.



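import { Subject, timer } from 'rxjs';

const waterSource = new Subject();
// Emits 0, 1, 2, ... with a fixed period chosen once, at random, between 0 and 300 ms.
const rollingWater = timer(0, randomBetweenZeroAnd(300));

(...)

// start rolling water :)
rollingWater.subscribe(val => waterSource.next({ amount: 1, time: val }));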
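The listing relies on a randomBetweenZeroAnd helper that is not shown in this excerpt. A minimal sketch of what it might look like, assuming it returns a whole number of milliseconds between 0 and the given maximum (the playground's actual implementation may differ):

```javascript
// Hypothetical helper (an assumption, not taken from the playground):
// returns a random integer between 0 and max, both inclusive.
function randomBetweenZeroAnd(max) {
  return Math.floor(Math.random() * (max + 1));
}

// Example: a random delay for preparing a truck.
const delayInMs = randomBetweenZeroAnd(300);
console.log(`Preparing the truck will take ${delayInMs} ms`);
```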

After that I created two functions: prepareTruck() and setTruckLoadingWater(). The former mimics the truck being prepared to start loading.


function prepareTruck(truck) {  
    return new Promise(function(resolve, reject) {       
        setTimeout(function() {      
           console.log(`Truck ${truck.id} is ready`);
           resolve();    
        }, randomBetweenZeroAnd(300));  
    });
}

The latter is where the magic happens. It prepares the truck, loads it up and, when the truck is fully loaded, it “grabs” another truck to start loading it up.


async function setTruckLoadingWater() {    

    const truck = {loaded: 0, capacity: 10, id: trucksCounter++};
    await prepareTruck(truck);
    thereIsATruckLoadingWater = true;  
    console.log(`Truck ${truck.id} starts loading`);     

    waterSource    
        .pipe(takeWhile(() => thereIsATruckLoadingWater))    
        .subscribe((waterAmount) => {      
            ++truck.loaded;
            if(truck.loaded === truck.capacity) {
                thereIsATruckLoadingWater = false;
                setTruckLoadingWater();
                console.log(`Truck ${truck.id} is fully loaded`);
            } else console.log(`Truck ${truck.id}, has ${truck.loaded}/${truck.capacity}`);  
        });
}

The problem

Everything seemed fine until we got this output 🤔


Output captured in https://stackblitz.com/edit/take-while-trap



What the hell!? Trucks keep being loaded even after hitting maximum capacity? So, what is the problem here?

Let’s (try to) make it simple: our takeWhile predicate does not account for the “reactive” nature of the subscription. We were assuming that the subscription ends at the very moment the condition behind the predicate becomes falsy, and that is (quite) wrong. The takeWhile documentation states:


“takeWhile subscribes and begins mirroring the source Observable. Each value emitted on the source is given to the predicate function which returns a boolean, representing a condition to be satisfied by the source values. The output Observable emits the source values until such time as the predicate returns false, at which point takeWhile stops mirroring the source Observable and completes the output Observable.”

Basically, the takeWhile predicate is evaluated only when the source Observable emits, and that is exactly our problem here. If the water source emits nothing between the moment a truck is fully loaded and the moment the next truck is ready to load, the predicate is never evaluated while thereIsATruckLoadingWater is falsy, so the first truck is never unsubscribed. Once the next truck sets the flag back to true, both subscriptions see a truthy predicate on every emission and both trucks keep loading.
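To make this concrete without timers, here is a deterministic replay. It is only a sketch: MiniSubject and subscribeWhile are hypothetical, hand-rolled stand-ins for an RxJS Subject and for source.pipe(takeWhile(predicate)).subscribe(next), written so the snippet runs without dependencies. They mimic just the one behavior that matters here, namely that the predicate runs only when the source emits.

```javascript
// Hypothetical stand-in for an RxJS Subject, just enough for this replay.
class MiniSubject {
  constructor() { this.observers = []; }
  next(value) {
    // Iterate over a copy so an observer can unsubscribe while we emit.
    for (const observer of [...this.observers]) observer(value);
  }
  subscribe(observer) {
    this.observers.push(observer);
    return () => { this.observers = this.observers.filter(o => o !== observer); };
  }
}

// Stand-in for source.pipe(takeWhile(predicate)).subscribe(next):
// the predicate is only ever called from inside an emission.
function subscribeWhile(source, predicate, next) {
  const unsubscribe = source.subscribe(value => {
    if (predicate(value)) next(value);
    else unsubscribe();
  });
}

const water = new MiniSubject();
let thereIsATruckLoadingWater = true; // the shared flag from the article
let predicateCalls = 0;
const truck1 = { id: 1, loaded: 9, capacity: 10 }; // one unit away from full

subscribeWhile(
  water,
  () => { predicateCalls++; return thereIsATruckLoadingWater; },
  () => { truck1.loaded++; }
);

water.next(1);                      // truck 1 reaches 10/10
thereIsATruckLoadingWater = false;  // truck 1 is done...
// ...but the source emits nothing here, so the predicate never sees `false`.
thereIsATruckLoadingWater = true;   // truck 2 is ready and flips the flag back
water.next(1);                      // truck 1's predicate sees `true` again

console.log(predicateCalls); // 2
console.log(truck1.loaded);  // 11
```

The predicate ran exactly twice, once per emission, and never during the window in which the flag was false, which is why truck 1 ends up at 11/10.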


Let’s check the following table as an example of an arbitrary sequence.



Events sequence


As you can see in the example above, the water source does not emit between the event where truck #1 is fully loaded and the event where truck #2 starts loading. The takeWhile predicate is therefore never evaluated while the flag is falsy, and truck #1’s subscription stays alive.



Solution

This problem is solved by just giving each subscription its own predicate (and not a shared one):

  • replace the takeWhile line in setTruckLoadingWater() (line 37 in the original listing) with

.pipe(takeWhile(() => truck.capacity !== truck.loaded))

  • and remove the thereIsATruckLoadingWater variable.
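The effect of a per-truck predicate can be checked with another deterministic replay. As before, this is a sketch under assumptions: MiniSubject, subscribeWhile and loadTruck are hypothetical stand-ins rather than RxJS code, and the capacity is lowered to 2 to keep the sequence short.

```javascript
// Hypothetical stand-in for an RxJS Subject, just enough for this replay.
class MiniSubject {
  constructor() { this.observers = []; }
  next(value) {
    // Iterate over a copy so an observer can unsubscribe while we emit.
    for (const observer of [...this.observers]) observer(value);
  }
  subscribe(observer) {
    this.observers.push(observer);
    return () => { this.observers = this.observers.filter(o => o !== observer); };
  }
}

// Stand-in for source.pipe(takeWhile(predicate)).subscribe(next).
function subscribeWhile(source, predicate, next) {
  const unsubscribe = source.subscribe(value => {
    if (predicate(value)) next(value);
    else unsubscribe();
  });
}

// Each truck gets its own predicate, based only on its own state.
function loadTruck(source, truck) {
  subscribeWhile(
    source,
    () => truck.loaded !== truck.capacity,
    () => { truck.loaded++; }
  );
}

const water = new MiniSubject();
const truck1 = { id: 1, loaded: 0, capacity: 2 }; // capacity 2 keeps the replay short
const truck2 = { id: 2, loaded: 0, capacity: 2 };

loadTruck(water, truck1);
water.next(1);
water.next(1);            // truck 1 is now full (2/2)
loadTruck(water, truck2); // truck 2 starts before the source emits again
water.next(1);            // truck 1's predicate is false, so it unsubscribes; truck 2 loads

console.log(truck1.loaded);          // 2
console.log(truck2.loaded);          // 1
console.log(water.observers.length); // 1
```

Truck 1 still stays subscribed until the next emission, but because its predicate depends only on its own load, that emission ends the subscription instead of overfilling the truck.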


Conclusion


RxJS is a powerful library that aims to democratize, diversify and simplify the use of the Observer pattern.

With a tool like this we can solve a myriad of problems in a non-invasive way, for instance passing information between different parts of the application without compromising the code structure (Angular 2.x developers, mainly, know what I mean). Nonetheless, as we have all heard before, “With great power comes great responsibility” 🕷, and therefore we should use it carefully, understanding its asynchronous and reactive nature, in order to avoid (or understand) problems like this.



BTW, has this ever happened to you? :)