Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


Why does mergeMap in RxJS remember history when it's not a ReplaySubject?

There are two buttons on screen, with their clicks plumbed in like this:

  b1c: number;
  b1s: Subject<number>;
  o1: Observable<number>;
  b2c: number;
  b2s: Subject<number>;
  o2: Observable<number>;


  rxjs(): void {
    this.b1c = 0;
    this.b1s = new Subject<number>();

    this.b2c = 0;
    this.b2s = new Subject<number>();

    this.b1s.pipe(
      tap(z => console.log('do', z)),
      mergeMap(z1 => this.b2s.pipe(
        map(z2 => `${z1} / ${z2}`)
      ))
    ).subscribe(z => console.log(z));

    // When you click button 1 nothing happens.
    // When you click button 2 you get output i/j for all values of i from 0 to current, and the current value of j.
    // This is incorrect because this.b1s is not a ReplaySubject and therefore should not remember the previous values.
  }

  buttonClick(buttonNumber: number) {
    if (buttonNumber === 1) {
      this.b1s.next(this.b1c);
      this.b1c++;
    }
    else {
      this.b2s.next(this.b2c);
      this.b2c++;
    }
  }

Can you explain this behaviour? (Button clicks: 1, 1, 1, 1, 2, 1, 1, 1, 2.)

It should be only

5 / 1
6 / 1
7 / 1

because 0 to 4 have already been consumed; there is no reason why they should be remembered.

In fact, maybe there should be no output at all, because at no time do the two observables fire simultaneously -- you can't click two buttons at once.

How do you explain this behaviour, and how is it possible to deduce this from the documentation?

Furthermore, I don't understand why there are three 30s and three 50s and why they're mixed up. There should be six outputs because there are six events. https://rxjs-dev.firebaseapp.com/api/operators/flatMap And we still have this business about it remembering the last value from the other source.

And what on earth is

10*i--10*i--10*i-|

supposed to mean?

Question from: https://stackoverflow.com/questions/65899033/why-does-mergemap-in-rxjs-remember-history-when-its-not-a-replaysubject


1 Reply

What you perceive as 'remembering history' is just a set of active subscriptions.

obs1.pipe(mergeMap(v => obs2)) maps each value from the source (obs1) to a stream (obs2) and subscribes to it. So you end up with multiple obs2 streams having an active subscription at the same time. These obs2 streams are merged into one output stream and run until they complete, error, or you unsubscribe.
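
To make the description above concrete, here is a minimal sketch that does not use RxJS at all. ToySubject and the nested subscribe call are hypothetical stand-ins written for illustration, not the library's implementation; the assumption is only that a Subject keeps a list of listeners and that mergeMap adds one inner listener per outer value. No past value is stored anywhere, yet every earlier outer value shows up in the output because its listener is still registered.

```typescript
type Listener<T> = (value: T) => void;

// Toy stand-in for a Subject (NOT the RxJS class): it stores listeners, never values.
class ToySubject<T> {
  private listeners: Listener<T>[] = [];

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }

  next(value: T): void {
    // Iterate over a copy so listeners added during delivery do not get this value.
    for (const listener of [...this.listeners]) {
      listener(value);
    }
  }

  get listenerCount(): number {
    return this.listeners.length;
  }
}

const outerClicks = new ToySubject<number>();
const innerClicks = new ToySubject<number>();
const emitted: string[] = [];

// Toy mergeMap: every outer value opens one more subscription to innerClicks,
// and that subscription closes over its own outer value `a`.
outerClicks.subscribe(a =>
  innerClicks.subscribe(b => emitted.push(`${a} / ${b}`))
);

outerClicks.next(0);
outerClicks.next(1);
outerClicks.next(2);

// Three outer values have created three live inner listeners and no output yet.
const listenersBeforeInnerEmits = innerClicks.listenerCount;

// One inner value is delivered to all three listeners.
innerClicks.next(0);
```

After the last line, emitted holds one entry per outer value, which is the 'history' from the question: it lives in the closures of the inner listeners, not in the subject.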

In your case obs2 = b2s is a Subject, which is a hot observable that doesn't terminate on its own. The final observable behaves like this:

this.b1s.pipe(
  mergeMap(z1 => this.b2s.pipe(
    map(z2 => `${z1} / ${z2}`)
  ))
).subscribe(z => console.log(z));


b1s:    --0--1--2--3--4-------5--6--7--------
b2s-7:    │  │  │  │  │       │  │  └--7/1---
b2s-6:    │  │  │  │  │       │  └-----6/1---
b2s-5:    │  │  │  │  │       └--------5/1---
b2s-4:    │  │  │  │  └--4/0-----------4/1---  
b2s-3:    │  │  │  └-----3/0-----------3/1---  
b2s-2:    │  │  └--------2/0-----------2/1---  
b2s-1:    │  └-----------1/0-----------1/1---  
b2s-0:    └--------------0/0-----------0/1---

output: -----------------0/0-----------0/1---
                         1/0           1/1
                         2/0           2/1 
                         3/0           3/1
                         4/0           4/1
                                       5/1
                                       6/1
                                       7/1

The subscriptions b2s-0 through b2s-4 are still active when b2s emits 1.
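
The diagram can be checked with a short standalone script. This is a sketch under a few assumptions: the names b1s and b2s mirror the question, the button clicks are replaced by direct next() calls in the order shown in the diagram, and the operators are imported from 'rxjs/operators' as in RxJS 6 (the path also exists in RxJS 7).

```typescript
import { Subject } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

const b1s = new Subject<number>();
const b2s = new Subject<number>();
const output: string[] = [];

b1s.pipe(
  mergeMap(z1 => b2s.pipe(
    map(z2 => `${z1} / ${z2}`)
  ))
).subscribe(z => output.push(z));

// Button 1 five times: five inner subscriptions to b2s are opened, nothing is emitted.
[0, 1, 2, 3, 4].forEach(i => b1s.next(i));
const countAfterFirstClicks = output.length;

// Button 2 once: all five inner subscriptions receive 0.
b2s.next(0);
const countAfterFirstB2 = output.length;

// Button 1 three more times, then button 2 again:
// all eight inner subscriptions receive 1.
[5, 6, 7].forEach(i => b1s.next(i));
b2s.next(1);
```

The second click on button 2 adds eight lines, one per inner subscription that is still open, which matches the right-hand column of the diagram.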

If you don't want the inner streams to run indefinitely, you have to terminate them somehow. You could use take(1) if you only want each of them to emit one value.

this.b1s.pipe(
  mergeMap(z1 => this.b2s.pipe(
    take(1), // <-- only emit 1 value and then complete
    map(z2 => `${z1} / ${z2}`)
  ))
).subscribe(z => console.log(z));


b1s:    --0--1--2--3--4-------5--6--7--------
b2s-7:    │  │  │  │  │       │  │  └--7/1|
b2s-6:    │  │  │  │  │       │  └-----6/1| 
b2s-5:    │  │  │  │  │       └--------5/1|
b2s-4:    │  │  │  │  └--4/0|  
b2s-3:    │  │  │  └-----3/0|  
b2s-2:    │  │  └--------2/0|  
b2s-1:    │  └-----------1/0|  
b2s-0:    └--------------0/0|  

output: -----------------0/0-----------5/1---       
                         1/0           6/1 
                         2/0           7/1 
                         3/0           
                         4/0           
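
The take(1) variant can be verified the same way. Again a sketch, with the clicks simulated by next() calls in the order of the diagram and the variable names taken from the question; the array `output` is introduced here only to capture the result.

```typescript
import { Subject } from 'rxjs';
import { map, mergeMap, take } from 'rxjs/operators';

const b1s = new Subject<number>();
const b2s = new Subject<number>();
const output: string[] = [];

b1s.pipe(
  mergeMap(z1 => b2s.pipe(
    take(1), // each inner subscription completes after its first value
    map(z2 => `${z1} / ${z2}`)
  ))
).subscribe(z => output.push(z));

// Five clicks on button 1, then one on button 2:
// the five inner subscriptions each emit once and complete.
[0, 1, 2, 3, 4].forEach(i => b1s.next(i));
b2s.next(0);

// Three more clicks on button 1, then button 2 again:
// only the three new inner subscriptions are still open.
[5, 6, 7].forEach(i => b1s.next(i));
b2s.next(1);
```

Here the second click on button 2 produces only 5 / 1, 6 / 1 and 7 / 1, which is the output the question expected.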

In the documentation, obs2 = 10----10----10-| is a cold observable that completes after 3 emissions. It also generates the same 3 emissions for every subscriber (unlike your Subject).

obs1:   --1-----------------------------3--------------5----------------------------|
obs2-3:   │                             │              └10*5------10*5------10*5-|
obs2-2:   │                             └10*3------10*3------10*3-| 
obs2-1:   └10*1------10*1------10*1-|

output: ---10*1------10*1------10*1------10*3------10*3-10*5-10*3-10*5------10*5----|
  =     --- 10 ------ 10 ------ 10 ------ 30 ------ 30 - 50 - 30 - 50 ------ 50 ----|
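
The count of three 10s, three 30s and three 50s can be reproduced with a cold inner observable. The sketch below makes one simplifying assumption: it uses the synchronous of() in place of the timed inner stream from the documentation, so each inner stream finishes before the next outer value arrives and the 30s and 50s do not interleave as they do in the diagram. The interleaving there comes only from the inner streams overlapping in time.

```typescript
import { of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const values: number[] = [];

of(1, 3, 5).pipe(
  // Cold inner observable: every subscription gets its own three emissions
  // and then completes, so nothing stays subscribed afterwards.
  mergeMap(i => of(10 * i, 10 * i, 10 * i))
).subscribe(v => values.push(v));

// values is now [10, 10, 10, 30, 30, 30, 50, 50, 50]
```

Three outer values times three inner emissions gives nine outputs, not six: the output counts inner emissions, and each outer value only starts a new inner stream.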
