Applied reactive streams

Summary:

  • You can merge the outputs of several observables into a single stream to simplify subscription logic.

  • You can use different merge strategies that contain different behavior for com- bining streams, depending on your needs.

  • You can interleave streams with merge(), cancel and switch to a new projected observable with switch(), or preserve entire observable sequences in order by using concat().

  • You can use split operators to combine and flatten a series of nested observable streams.

  • You can combine and project observables into a source observable using the higher-order operators such as mergeMap() and concatMap().

  • You implemented an auto-suggest search box.

  • You implemented a live stock ticker with deeply nested streams.

  • You implemented drag and drop using stream concatenation.

小节:

1.可以将多个流合并为单个流,以简化订阅逻辑

//merge()
const mouseUp$ = Rx.Observable.fromEvent(document, 'mouseup');
const touchEnd$ = Rx.Observable.fromEvent(document, 'touchend');

  Rx.Observable.merge(mouseUp$, touchEnd$)
  .do(event => console.log(event.type))
  .map(event => {
      switch(event.type) {
         case 'touchend':
            return {left: event.changedTouches[0].clientX,
                    top: event.changedTouches[0].clientY};
         case 'mouseup':
            return {left: event.clientX,
                    top:  event.clientY};
      }
  })
  .subscribe(obj =>
      console.log(`Left: ${obj.left}, Top: ${obj.top}`));

2.可以根据不同的需求,使用不同的合并策略

//merge()
const mouseUp$ = Rx.Observable.fromEvent(document, 'mouseup');
const touchEnd$ = Rx.Observable.fromEvent(document, 'touchend');

const conformantMouseUp$ = mouseUp$.map(event => ({
  left: event.clientX,
  top: event.clientY
}));

const conformantTouchEnd$ = touchEnd$.map(event => ({
  left: event.changedTouches[0].clientX,
  top: event.changedTouches[0].clientY,
}));

Rx.Observable.merge(conformantMouseUp$, conformantTouchEnd$)
   .subscribe(obj =>
      console.log(`Left: ${obj.left}, Top: ${obj.top}`));

3.可以使用merge(),插入流;使用switch(),选择一个新的映射observable;使用concat(),按顺序保留整个的observable

//switch()
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000));
var switched = higherOrder.switch();
switched.subscribe(x => console.log(x));

//每次点击重新返回,一个新的Observable.interval
//concat()
var timer = Rx.Observable.interval(1000).take(4);
var sequence = Rx.Observable.range(1, 10);
var result = timer.concat(sequence);
result.subscribe(x => console.log(x));

//间隔1s,打印0 1 2 3 ,无间隔的,打印1-10

4.可以使用分割opetators,组合压缩嵌套的ovservable流

5.使用mergeMap(),concatMap(),可以组合到一个源observable

//mergeMap()
var letters = Rx.Observable.of('a', 'b', 'c');
var result = letters.mergeMap(x =>
Rx.Observable.interval(1000).map(i => x+i)
);
result.subscribe(x => console.log(x));

//a0 b0 c0 a1 b1 c1...
//concatMap()
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.concatMap(ev => Rx.Observable.interval(1000).take(4));
result.subscribe(x => console.log(x));

//间隔1s:0 1 2 3 0 1 2 3...

6.实现一个auto-suggest search box

7.实现一个深层嵌套的实时股票

8.使用流的关联,实现拖拽

results matching ""

    No results matching ""