ReactiveX 是一个专注于异步编程与控制可观察数据(或者事件)流的API。它组合了观察者模式,迭代器模式和函数式编程的优秀思想。 实时数据处理是一件普通的现象,有一个高效、干净和可扩展的方式来处理这些情景是重要的。使用 Observables 和 Operators 来熟练操作它们。ReactiveX 提供一个可组合又灵活的 API 来创建和处理数据流,同时简化了异步编程带来的一些担忧,如:线程创建和并发问题。
RxJava是ReactiveX在Java上的开源的实现。Observable(观察者)和Subscriber(订阅者)是两个主要的类。在RxJava上,一个Observable是一个发出数据流或者事件的类,Subscriber是一个对这些发出的items(数据流或者事件)进行处理(采取行动)的类。一个Observable的标准流发出一个或多个item,然后成功完成或者出错。一个Observable可以有多个Subscribers,并且通过Observable发出的每一个item,该item将会被发送到Subscriber.onNext()方法来进行处理。一旦Observable不再发出items,它将会调用Subscriber.onCompleted()方法,或如果有一个出错的话Observable会调用Subscriber.onError()方法。现在,我们知道了很多关于Observable和Subscriber类,我们可以继续去介绍有关Observables 的创建和订阅。
Observable integerObservable = Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); subscriber.onCompleted(); } });
这个 Observable 发出了整数 1,2,3 然后结束了。现在我们需要创建一个 Subscriber,那样我们就能让这些发出的流起作用。
Subscriber integerSubscriber = new Subscriber() { @Override public void onCompleted() { System.out.println("Complete!"); } @Override public void onError(Throwable e) { } @Override public void onNext(Integer value) { System.out.println("onNext: " + value); } };
我们的 Subscriber 只是简单的把任何发出的 items 打印出来,完成之后通知我们。一旦你有一个 Observable 和一个 Subscriber,可以通过 Observable.subscribe() 方法将他们联系起来。
integerObservable.subscribe(integerSubscriber); // Outputs: // onNext: 1 // onNext: 2 // onNext: 3 // Complete!
上面所有这些代码可以简单的通过使用 Observable.just() 方法来创建一个 Observable 去发出这些定义的值,并且我们的 Subscriber 可以改变成匿名的内部类,如下:
Observable.just(1, 2 ,3).subscribe(new Subscriber() { @Override public void onCompleted() { System.out.println("Complete!"); } @Override public void onError(Throwable e) {} @Override public void onNext(Integer value) { System.out.println("onNext: " + value); } });
RxJava地址:https://github.com/ReactiveX/RxJava
RxAndroid地址:https://github.com/ReactiveX/RxAndroid
通过请求openweathermap 的天气查询接口返回天气数据
1、增加编译依赖
dependencies { compile fileTree(dir: 'libs', include: ['*.jar']) compile 'com.android.support:appcompat-v7:22.0.0' compile 'io.reactivex:rxjava:1.0.9' compile 'io.reactivex:rxandroid:0.24.0' compile 'com.squareup.retrofit:retrofit:1.9.0' }
retrofit 是一个 restful 请求客户端。详见:https://square.github.io/retrofit/
2、服务器接口
/** * 接口 * Created by Hal on 15/4/26. */ public class ApiManager { private static final String ENDPOINT = "https://api.openweathermap.org/data/2.5"; /** * 服务接口 */ private interface ApiManagerService { @GET("/weather") WeatherData getWeather(@Query("q") String place, @Query("units") String units); } private static final RestAdapter restAdapter = new RestAdapter.Builder() .setEndpoint(ENDPOINT).setLogLevel(RestAdapter.LogLevel.FULL).build(); private static final ApiManagerService apiManager = restAdapter.create(ApiManagerService.class); /** * 将服务接口返回的数据,封装成{@link rx.Observable} * @param city * @return */ public static Observable<WeatherData> getWeatherData(final String city) { return Observable.create(new Observable.OnSubscribe<WeatherData>() { @Override public void call(Subscriber<? super WeatherData> subscriber) { // 订阅者回调 onNext 和 onCompleted subscriber.onNext(apiManager.getWeather(city, "metric")); subscriber.onCompleted(); } }).subscribeOn(Schedulers.io()); } }
订阅者的回调有三个方法,onNext,onError,onCompleted
3、接口调用
/** * 多个 city 请求 * map,flatMap 对 Observable进行变换 */ Observable.from(CITIES).flatMap(new Func1<String, Observable<WeatherData>>() { @Override public Observable<WeatherData> call(String s) { return ApiManager.getWeatherData(s); } }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<WeatherData>() { // onNext @Override public void call(WeatherData weatherData) { Log.d(LOG_TAG, weatherData.toString()); } }, new Action1<Throwable>() { // onError @Override public void call(Throwable throwable) {} }); /** * 单个 city 请求 */ ApiManager.getWeatherData(CITIES[0]).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<WeatherData>() { @Override public void call(WeatherData weatherData) { Log.d(LOG_TAG, weatherData.toString()); ((TextView) findViewById(R.id.text)).setText(weatherData.toString()); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { Log.e(LOG_TAG, throwable.getMessage(), throwable); } }); /** * Android View 事件处理 */ ViewObservable.clicks(findViewById(R.id.text), false).subscribe(new Action1<OnClickEvent>() { @Override public void call(OnClickEvent onClickEvent) {} });
subscribeOn(Schedulers.io())与observeOn(AndroidSchedulers.mainThread())分别定义了这两个动作的线程。Android UI 更新需要在主线程。
4、retrofit 支持 rxjava 整合
/** * 服务接口 */ private interface ApiManagerService { @GET("/weather") WeatherData getWeather(@Query("q") String place, @Query("units") String units); /** * retrofit 支持 rxjava 整合 * 这种方法适用于新接口 */ @GET("/weather") Observable<WeatherData> getWeatherData(@Query("q") String place, @Query("units") String units); }