首先在 gradle 文件中添加依賴
implementation 'io.reactivex.rxjava2:rxjava:2.1.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
創建被觀察者
Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
});
創建觀察者
Observer observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "======================onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "======================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "======================onError");
}
@Override
public void onComplete() {
Log.d(TAG, "======================onComplete");
}
};
訂閱
observable.subscribe(observer);
總結
被被觀察者就是數據的產生者,觀察者是數據的使用者,而訂閱,則是把產生者與使用者連接在一起,以便處理數據。
一個常見的場景是,網絡層源源不斷的接受數據,然后向上層發射數據包。而上層則依次處理一個一個的數據包。