响应式编程

什么是响应式编程

reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change.
—- wikipedia

反应式编程是与数据流传播变化有关的编程模式。目前市面比较流行的库有 backbone.jsrxjs

喵喵喵?听着似乎与泛滥的 MV* 并无区别。

不明所以

André Staltz 浏览了很多关于 响应式编程 的书籍与和实践之后,给出了以下两个特征。

  1. anything can be a stream。
  2. 丰富的处理流函数

anything can be a stream

asynchronous event stream 在通用场景中是可以通过回调处理副作用的事件流,比如 onClick 等 dom 事件,类似发布订阅自定义事件。
响应式编程中可以发布 数据, 用户输入, 属性,任何操作和数据都可以是 event stream.

1
2
3
4
5
6
7
8
9
10
11
const observable = Rx.observable.create(observer => {
console.log('start executing');
observer.next('hello world')
});

observable.subscribe(value => {
console.log(value)
});
// 输出:
// start executing
// hello world

stream 是啥?

Stream 是一组执行中的事件,可以分发三种内容:value / error / “completed” 标志。分发的内容可以被监听 (subscribe) ,类似发布订阅的模式。

demo: stream 状态

click button 为例:

Click event stream

ASCII 展示:

1
2
3
4
5
6
--a---b-c---d---X---|->

a, b, c, d are emitted values
X is an error
| is the 'completed' signal
---> is the timeline
demo: stream 监听

通过 subscribe 监听事件。

1
2
3
const observable = Rx.observable.fromEvent(button, 'click')
.map(event => event.clientX);
observable.subscribe(value => {console.log(value)});

丰富的处理流函数

  • merge: 合并多个 stream
  • buffer:缓存 stream,直到 stream emit
  • filter : 类似数组的 filter,返回一组新 stream
  • map : 类似数组的 map,返回一组新 stream

假设我们希望得到 “double click” 流,并且允许短时的多次点击也等同于 “double click”。

1
2
3
4
var singleClickStream = clickStream
.buffer(function() { return clickStream.throttle(250); })
.map(function(list) { return list.length; })
.filter(function(x) { return x === 1; });

Multiple clicks stream

why use it?

与 redux,mobx 相比

  1. 便于处理多个异步事件,相比 Promise 可以有多个返回值,以及可以进行更复杂的处理。
  2. 事件(异步、同步)处理标准化。

对应中后台的场景:

  1. 控制多个异步行为,尤其是异步行为存在依赖关系
  2. dashboard 在不同情况下需要更新不同的组件

rxjs

rxjs 是最流行的响应式库之一,我们以之为例来展示响应式编程是如何应用的。

  • Observable可观察对象:可观察对象是数据流的源头,可以来自事件,网络,也可以自定义数据流。
  • Observer观察者:通过订阅可观察对象,即可获得观察者,观察者拦截处理数据流,可以视为数据流的终点
  • Operator操作符:操作符主要用于数据流的转换操作
  • Subject主题:集Observable和Observer的特点于一身,还可以用于广播事件流
  • Sheduler调度者:控制事件流的并发

show me some code!

下图是一个常见的搜索框,可以选择排序方式:热度(popularity) / 日期(date) 。如果搜索字段发生变更,或者排序方式变化则重新发起请求。

rxjs

完整代码

react + 非 rxjs

我们按照 Contianer / Present 组件的方式构建代码。Contianer 组件把搜索相关的字段(query / subject)都放在 state 中保存,同时暴露了修改 state 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
const SmartWrapper = Component => {
return class extends React.Component {

this.state = {
query: "react",
subject: SUBJECT.POPULARITY,
stories: []
};

onSelectSubject = subject => {
this.setState({ subject });
};
onChangeQuery = query => {
if (query) {
this.setState({ query });
}
};

// debounce 操作
debouncdFetch = debounce(({ subject, query }) => {
return mockData(`http://hn.algolia.com/api/v1/${subject}?query=${query}`)
.then(result => result.data.hits)
.then(stories => this.setState({ stories }));
}, 1000);

componentDidUpdate(prevProps, prevState) {
const { query: preQuery, subject: preSubject } = prevState;
const { query, subject } = this.state;
if (preQuery !== query || preSubject !== subject) {
this.debouncdFetch({ query, subject });
}
}

render() {
const triggers = {
onSelectSubject: this.onSelectSubject,
onChangeQuery: this.onChangeQuery
};
return <Component {...this.props} {...this.state} {...triggers} />;
}
};
};

export default SmartWrapper(App);

react + rxjs

Observable 观察内容

rxjs Observable 实质是 stream,我们可以把数据当做一种 stream .

BehaviorSubject 是一种 Oberservale, 即当前值就是观察项。如果值发生变化则会通知订阅者。

1
2
3
import {BehaviorSubject} from 'rxjs' 
const query$ = new BehaviorSubject("react"); // 搜索内容
const subject$ = new BehaviorSubject(SUBJECT.POPULARITY); // 排序方式

功能上 query$subject$ 发生变化会触发 fetch 行为。我们也可以把query$subject$ 的变化合成一个 Observable, Observable 的返回异步行为(fetch stories)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 因为
// 1. 输入时不搜索(debounce)
// 2. text为空时不搜索
const queryForFetch$ = query$.pipe(
debounce(() => timer(1000)),
filter(query => query !== "")
);

const fetch$ = combineLatest(subject$, queryForFetch$).pipe(
flatMap(([subject, query]) =>
mockData(`http://hn.algolia.com/api/v1/${subject}?query=${query}`)
),
map(result => result.data.hits)
);
Observable 变化函数 trigger

Observable.next 可以发送值给观察者。

1
2
const onSelectSubject = subject => subject$.next(subject),
const onChangeQuery = value => query$.next(value)
react 订阅 Observable

rxjs subscribe 可以订阅 Observable 的变化,进行副作用操作。

与非 rxjs 的实现相同,我们会把 Observable 都在 state 中储存。如果 Observable 发生变化需要触发 setState, 通知 react 发生更新。因为这部分行为具有一定的通用性,所以我们把它放在 HOC 中处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// withObservableStream.js

export default (observable, triggers, initialState) => Component => {
return class extends React.Component {
constructor(props) {
super(props);

this.state = {
...initialState,
};
}

componentDidMount() {
this.subscription = observable.subscribe(newState =>
this.setState({ ...newState }),
);
}

componentWillUnmount() {
this.subscription.unsubscribe();
}

render() {
return (
<Component {...this.props} {...this.state} {...triggers} />
);
}
};
};
最终代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
const query$ = new BehaviorSubject("react");
const subject$ = new BehaviorSubject(SUBJECT.POPULARITY);

// 1. 输入时不搜索(debounce)
// 2. text为空时不搜索
const queryForFetch$ = query$.pipe(
debounce(() => timer(1000)),
filter(query => query !== "")
);

// combineLatest:合并两个流为新流,并取两个流最近值
const fetch$ = combineLatest(subject$, queryForFetch$).pipe(
flatMap(([subject, query]) =>
mockData(`http://hn.algolia.com/api/v1/${subject}?query=${query}`)
),
map(result => result.data.hits)
);

export default withObservableStream(
combineLatest(subject$, query$, fetch$, (subject, query, stories) => ({
subject,
query,
stories
})),
{
onSelectSubject: subject => subject$.next(subject),
onChangeQuery: value => query$.next(value)
},
{
query: "react",
subject: SUBJECT.POPULARITY,
stories: []
}
)(App);
总结

这两段代码相比 rxjs 版的业务逻辑更加精简,清晰,可以很好地与 UI 分离。但是 rxjs 复杂度更高,理解与学习成本高。
rxjs 可以与 state 一起管理数据,除此之外还可以与 redux 一起配合(redux-observable)在 Redux 中使用到 RxJS 所提供的函数响应式编程(FRP)的能力。

后续

Rxjs 有以下两个优点
1、 Observale 的形态是流,扩展方便。
2、异步事件标准化

因为这两个优点,rxjs 十分适用于需要一定扩展度的公共组件。会尝试使用 rxjs 在 papaya-ui 中,解决固定场景的数据管理问题,比如 Dashboard 或者 search box + table 的场景。

资料

https://gist.github.com/staltz/868e7e9bc2a7b8c1f754
https://zhuanlan.zhihu.com/p/31623736
https://www.robinwieruch.de/react-rxjs-state-management-tutorial
http://www.alloyteam.com/2016/12/learn-rxjs/
https://juejin.im/post/5b798501f265da43473130a1

如何优雅使用 hooks

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×