Applied reactive streams
Summary:
You can merge the outputs of several observables into a single stream to simplify subscription logic.
You can use different merge strategies, each with different behavior for combining streams, depending on your needs.
You can interleave streams with merge(), cancel and switch to a new projected observable with switch(), or preserve entire observable sequences in order by using concat().
You can use split operators to combine and flatten a series of nested observable streams.
You can combine and project observables into a source observable using higher-order operators such as mergeMap() and concatMap().
You implemented an auto-suggest search box.
You implemented a live stock ticker with deeply nested streams.
You implemented drag and drop using stream concatenation.
Summary (with examples):
1. You can merge several streams into a single stream to simplify subscription logic.
//merge()
const mouseUp$ = Rx.Observable.fromEvent(document, 'mouseup');
const touchEnd$ = Rx.Observable.fromEvent(document, 'touchend');
Rx.Observable.merge(mouseUp$, touchEnd$)
  .do(event => console.log(event.type))
  .map(event => {
    switch (event.type) {
      case 'touchend':
        return {left: event.changedTouches[0].clientX,
                top: event.changedTouches[0].clientY};
      case 'mouseup':
        return {left: event.clientX,
                top: event.clientY};
    }
  })
  .subscribe(obj =>
    console.log(`Left: ${obj.left}, Top: ${obj.top}`));
2. You can use different merge strategies depending on your needs.
//merge()
const mouseUp$ = Rx.Observable.fromEvent(document, 'mouseup');
const touchEnd$ = Rx.Observable.fromEvent(document, 'touchend');
const conformantMouseUp$ = mouseUp$.map(event => ({
  left: event.clientX,
  top: event.clientY
}));
const conformantTouchEnd$ = touchEnd$.map(event => ({
  left: event.changedTouches[0].clientX,
  top: event.changedTouches[0].clientY,
}));
Rx.Observable.merge(conformantMouseUp$, conformantTouchEnd$)
  .subscribe(obj =>
    console.log(`Left: ${obj.left}, Top: ${obj.top}`));
3. You can use merge() to interleave streams, switch() to cancel and switch to a newly projected observable, and concat() to preserve entire observable sequences in order.
//switch()
var clicks = Rx.Observable.fromEvent(document, 'click');
var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000));
var switched = higherOrder.switch();
switched.subscribe(x => console.log(x));
//Each click switches to a new Observable.interval and drops the previous one
//concat()
var timer = Rx.Observable.interval(1000).take(4);
var sequence = Rx.Observable.range(1, 10);
var result = timer.concat(sequence);
result.subscribe(x => console.log(x));
//Prints 0 1 2 3 at 1 s intervals, then 1 through 10 with no delay
4. You can use split operators to combine and flatten nested observable streams.
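Item 4 has no snippet of its own, so here is a minimal sketch of what "flattening" a nested stream means. It does not use RxJS: `of`, `map` and `mergeAll` below are hypothetical toy stand-ins written for this note, and they are synchronous only. In RxJS 5 the corresponding call would be `higherOrder.mergeAll()`.

```javascript
// Toy observable (an assumption for illustration, NOT the RxJS implementation):
// an observable is a function that pushes its values to a `next` callback.
const of = (...values) => next => values.forEach(v => next(v));

// map: transform each value before passing it on.
const map = (source, fn) => next => source(v => next(fn(v)));

// mergeAll: subscribe to every inner observable as it arrives
// and forward its values to the single outer subscriber.
const mergeAll = higherOrder => next => higherOrder(inner => inner(next));

// An observable whose values are themselves observables (a nested stream).
const nested = map(of('a', 'b'), x => of(x + 0, x + 1));

const flat = [];
mergeAll(nested)(v => flat.push(v));
console.log(flat); // [ 'a0', 'a1', 'b0', 'b1' ]
```

Because the toy sources are synchronous, merging and concatenating give the same order here; the difference between the strategies only shows up with asynchronous inner streams such as interval().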
5. You can use higher-order operators such as mergeMap() and concatMap() to combine and project observables into a source observable.
//mergeMap()
var letters = Rx.Observable.of('a', 'b', 'c');
var result = letters.mergeMap(x =>
  Rx.Observable.interval(1000).map(i => x+i)
);
result.subscribe(x => console.log(x));
//a0 b0 c0 a1 b1 c1...
//concatMap()
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.concatMap(ev => Rx.Observable.interval(1000).take(4));
result.subscribe(x => console.log(x));
//For each click, at 1 s intervals: 0 1 2 3 0 1 2 3...
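6. Implemented an auto-suggest search box.
7. Implemented a live stock ticker with deeply nested streams.
8. Implemented drag and drop using stream concatenation.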
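The drag-and-drop item has no code above, so the following is a rough sketch of the idea only: after each mousedown, follow mousemove until the next mouseup. It runs without a DOM or RxJS; `subject()` is a hypothetical helper invented for this note that lets events be pushed by hand. In RxJS 5 the same shape is commonly written along the lines of `mouseDown$.concatMap(() => mouseMove$.takeUntil(mouseUp$))`, which this sketch only approximates (it does not queue overlapping drags).

```javascript
// Hypothetical minimal "subject": push events manually instead of using the DOM.
function subject() {
  const listeners = new Set();
  return {
    // Copy the set first so listeners may unsubscribe while being notified.
    emit: v => [...listeners].forEach(fn => fn(v)),
    // Returns an unsubscribe function.
    subscribe: fn => { listeners.add(fn); return () => listeners.delete(fn); }
  };
}

const mouseDown = subject();
const mouseMove = subject();
const mouseUp = subject();
const positions = [];

// On each mousedown, start listening to mousemove; stop at the next mouseup.
mouseDown.subscribe(() => {
  const stopMove = mouseMove.subscribe(pos => positions.push(pos));
  const stopUp = mouseUp.subscribe(() => { stopMove(); stopUp(); });
});

mouseMove.emit({left: 1, top: 1});   // ignored: no drag in progress
mouseDown.emit({});
mouseMove.emit({left: 5, top: 7});   // recorded
mouseMove.emit({left: 9, top: 12});  // recorded
mouseUp.emit({});
mouseMove.emit({left: 20, top: 20}); // ignored: drag has ended

console.log(positions); // [ { left: 5, top: 7 }, { left: 9, top: 12 } ]
```

In a real page the recorded positions would be applied to the dragged element's left and top styles rather than pushed into an array.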