RxJava简介和入门

RxJava是ReactiveX在Java上的开源的实现。Observable(观察者)和Subscriber(订阅者)是两个主要的类。在RxJava上,一个Observable是一个发出数据流或者事件的类,Subscriber是一个对这些发出的items(数据流或者事件)进行处理(采取行动)的类。一个Observable的标准流发出一个或多个item,然后成功完成或者出错。一个Observable可以有多个Subscribers,并且通过Observable发出的每一个item,该item将会被发送到Subscriber.onNext()方法来进行处理。

什么是ReactiveX?

ReactiveX 是一个专注于异步编程与控制可观察数据(或者事件)流的API。它组合了观察者模式,迭代器模式和函数式编程的优秀思想。 实时数据处理是一件普通的现象,有一个高效、干净和可扩展的方式来处理这些情景是重要的。使用 Observables 和 Operators 来熟练操作它们。ReactiveX 提供一个可组合又灵活的 API 来创建和处理数据流,同时简化了异步编程带来的一些担忧,如:线程创建和并发问题。

什么是RxJava?

RxJava是ReactiveX在Java上的开源的实现。Observable(观察者)和Subscriber(订阅者)是两个主要的类。在RxJava上,一个Observable是一个发出数据流或者事件的类,Subscriber是一个对这些发出的items(数据流或者事件)进行处理(采取行动)的类。一个Observable的标准流发出一个或多个item,然后成功完成或者出错。一个Observable可以有多个Subscribers,并且通过Observable发出的每一个item,该item将会被发送到Subscriber.onNext()方法来进行处理。一旦Observable不再发出items,它将会调用Subscriber.onCompleted()方法,或如果有一个出错的话Observable会调用Subscriber.onError()方法。现在,我们知道了很多关于Observable和Subscriber类,我们可以继续去介绍有关Observables 的创建和订阅。

Observable integerObservable = Observable.create(new Observable.OnSubscribe() {
    @Override
    public void call(Subscriber subscriber) {
        subscriber.onNext(1);
        subscriber.onNext(2);
        subscriber.onNext(3);
        subscriber.onCompleted();
    }
});

这个 Observable 发出了整数 1,2,3 然后结束了。现在我们需要创建一个 Subscriber,那样我们就能让这些发出的流起作用。

Subscriber integerSubscriber = new Subscriber() {
    @Override 
    public void onCompleted() { 
        System.out.println("Complete!"); 
    } 
    
    @Override 
    public void onError(Throwable e) { } 
    
    @Override 
    public void onNext(Integer value) { 
        System.out.println("onNext: " + value); 
    } 
};

我们的 Subscriber 只是简单的把任何发出的 items 打印出来,完成之后通知我们。一旦你有一个 Observable 和一个 Subscriber,可以通过 Observable.subscribe() 方法将他们联系起来。 

integerObservable.subscribe(integerSubscriber); 
// Outputs: 
// onNext: 1 
// onNext: 2 
// onNext: 3 
// Complete!

上面所有这些代码可以简单的通过使用 Observable.just() 方法来创建一个 Observable 去发出这些定义的值,并且我们的 Subscriber 可以改变成匿名的内部类,如下: 

Observable.just(1, 2 ,3).subscribe(new Subscriber() { 
    @Override 
    public void onCompleted() { 
        System.out.println("Complete!"); 
    } 
    
    @Override 
    public void onError(Throwable e) {} 
    
    @Override 
    public void onNext(Integer value) { 
        System.out.println("onNext: " + value); 
    } 
});

RxJava地址:https://github.com/ReactiveX/RxJava

RxAndroid地址:https://github.com/ReactiveX/RxAndroid

实例

通过请求openweathermap 的天气查询接口返回天气数据

1、增加编译依赖

dependencies {
    compile fileTree(dir: 'libs', include: ['*.jar'])
    compile 'com.android.support:appcompat-v7:22.0.0'
    compile 'io.reactivex:rxjava:1.0.9'
    compile 'io.reactivex:rxandroid:0.24.0'
    compile 'com.squareup.retrofit:retrofit:1.9.0'
}

retrofit 是一个 restful 请求客户端。详见:https://square.github.io/retrofit/

2、服务器接口

/**
 * 接口
 * Created by Hal on 15/4/26.
 */
public class ApiManager {
    private static final String ENDPOINT = "https://api.openweathermap.org/data/2.5";
    /**
     * 服务接口
     */
    private interface ApiManagerService {
        @GET("/weather")
        WeatherData getWeather(@Query("q") String place, @Query("units") String units);
    }
    private static final RestAdapter restAdapter = new RestAdapter.Builder()
        .setEndpoint(ENDPOINT).setLogLevel(RestAdapter.LogLevel.FULL).build();
    private static final ApiManagerService apiManager = 
        restAdapter.create(ApiManagerService.class);
    /**
     * 将服务接口返回的数据,封装成{@link rx.Observable}
     * @param city
     * @return
     */
    public static Observable<WeatherData> getWeatherData(final String city) {
        return Observable.create(new Observable.OnSubscribe<WeatherData>() {
            @Override
            public void call(Subscriber<? super WeatherData> subscriber) {
                // 订阅者回调 onNext 和 onCompleted
                subscriber.onNext(apiManager.getWeather(city, "metric"));
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.io());
    }
}

订阅者的回调有三个方法,onNext,onError,onCompleted

3、接口调用

/**
 * 多个 city 请求
 * map,flatMap 对 Observable进行变换
 */
Observable.from(CITIES).flatMap(new Func1<String, Observable<WeatherData>>() {
    @Override
    public Observable<WeatherData> call(String s) {
        return ApiManager.getWeatherData(s);
    }
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<WeatherData>() { // onNext
        @Override
        public void call(WeatherData weatherData) {
            Log.d(LOG_TAG, weatherData.toString());
        }
    }, new Action1<Throwable>() { // onError
        @Override
        public void call(Throwable throwable) {}
    });

/**
 * 单个 city 请求
 */
ApiManager.getWeatherData(CITIES[0]).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<WeatherData>() {
    @Override
    public void call(WeatherData weatherData) {
        Log.d(LOG_TAG, weatherData.toString());
        ((TextView) findViewById(R.id.text)).setText(weatherData.toString());
    }
}, new Action1<Throwable>() {
    @Override
    public void call(Throwable throwable) {
        Log.e(LOG_TAG, throwable.getMessage(), throwable);
    }
});

/**
 * Android View 事件处理
 */
ViewObservable.clicks(findViewById(R.id.text), false).subscribe(new Action1<OnClickEvent>() {
    @Override
    public void call(OnClickEvent onClickEvent) {}
});

subscribeOn(Schedulers.io())与observeOn(AndroidSchedulers.mainThread())分别定义了这两个动作的线程。Android UI 更新需要在主线程。

4、retrofit 支持 rxjava 整合

 /**
  * 服务接口
  */
private interface ApiManagerService {
     @GET("/weather")
     WeatherData getWeather(@Query("q") String place, @Query("units") String units);
     /**
      * retrofit 支持 rxjava 整合
      * 这种方法适用于新接口
      */
     @GET("/weather")
     Observable<WeatherData> getWeatherData(@Query("q") String place, @Query("units") String units);
}
如果在胜利前却步,往往只会拥抱失败;如果在困难时坚持,常常会获得新的成功。
0 不喜欢
说说我的看法 -
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号