`
后来我们都老了
  • 浏览: 33781 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Rxjava学习

    博客分类:
  • java
阅读更多

1 基本概念

1.1 Rx概念

一个在Java VM上使用可观测的序列来组成异步的、基于事件的程序的库,

其实 RxJava 的本质就是一个可以实现异步操作的库

1.2 Rx优势

同样是做异步,为什么人们用它,而不用现成的 Async / Future / XXX / ... 一个词:简洁! 异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 随着程序逻辑变得越来越复杂,它依然能够保持简洁,对日后代码维护省去不少力气。k

1.3 Rx结构

响应式编程的主要组成部分是observable, operator和susbscriber,网上大多数文章都是介绍说有两部分,我这里把operator操作符也加进去了,这样对结构的整体性会有更全面的认识)。 一般响应式编程的信息流如下所示:

Observable > Operator 1 > Operator 2 > Operator 3 > Subscriber

也就是说,observable是事件的生产者,subscriber是事件最终的消费者。 因为subscriber通常在主线程中执行,因此设计上要求其代码尽可能简单,只对事件进行响应,而修改事件的工作全部由operator执行。 如果我们不需要修改事件,就不需要在observable和subscriber中插入operator。这时的Rx结构如下:

Obsevable > Subscriber

这里看起来跟设计模式中的观察者模式很像,他们最重要的区别之一在于在没有subscriber(观察者模式中的observer)之前,observable(被观察者)不会产生事件。

1.4 最简单的模式

 

Observable observable = Observable.create(new Observable.OnSubscribe<Student>() {
            @Override
            public void call(Subscriber<? super Student> subscriber) {
                for (int i = 0; i < students.size(); i++) {
                    subscriber.onNext(students.get(i));
                }
                subscriber.onCompleted();
            }
        });
        //观察者1
        Subscriber<Student> subscriber = new Subscriber<Student>() {
            @Override
            public void onCompleted() {
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(Student student) {
                System.out.println(student.getAge());
            }
        };
        observable.subscribe(subscriber);

 

 

如果我们不关心subscriber是否结束(onComplete())或者发生错误(onError()),subscriber的代码可以简化为

 

 

Observable.just(student1,student2,student3,student4).subscribe(new Action1<Student>() {
    @Override
    public void call(Student student) {
        log(student.getAge());
    }
});

 

 

Observable.from(students).subscribe(new Action1<Stußßdent>() {
    @Override
    public void call(Student student) {
        log(student.getAge());
    }
});

 

 

很明显跟subscriber比起来,action相当于只有onNext()方法,因此,这种方式叫做不完整定义。

1.5 加入operator

1.5.1 map操作符

很多时候,我们需要针对处理过的事件做出响应,而不仅仅是Observable产生的原始事件。 意淫一下,加入我还要再输入每个student的自我介绍怎么办?传统方法for循环? 可以,但rxjava可以这么做

 

Observable.from(students)
                .map(new Func1<Student, Student>() {
                    @Override
                    public Student call(Student student) {
                        log(student.getIntroduce());
                        return student;
                    }
                })
                .subscribe(new Action1<Student>() {
                    @Override
                    public void call(Student student) {
                        log(student.getAge());
                    }
                });

 

 

1.5.2 flatMap操作符

坑爹,繁琐的需求又来了,每个student可以选择不同的课程,1对多... 怎么操作这些课程呢?传统方式嵌套for循环? No,rxjava可以这么做

 

Observable.from(students)
            .flatMap(new Func1<Student, Observable<Course>>() {
                @Override
                public Observable<Course> call(Student student) {
                    log("name:" + student.getName());
                    return Observable.from(student.getCourses());
                }
            })
            .map(new Func1<Course, Course>() {

                @Override
                public Course call(Course course) {
                    log("course:" + course.getName());
                    return course;
                }
            })
            .subscribe(new Subscriber<Course>() {
                @Override
                public void onCompleted() {
                    log("onCompleted");
                }

                @Override
                public void onError(Throwable throwable) {
                    log("onError");
                }

                @Override
                public void onNext(Course course) {
                    log(" teacher:" + course.getTeacherName());
                }
            });

 

 

1.5.3 filter操作符

如果我想在所有的学生中找出19岁以上怎么办?filter!

 

Observable.from(studentList)
            //获取19岁以上的
            .filter(new Func1<Student, Boolean>() {
                @Override
                public Boolean call(Student student) {
                    return student.getAge() >= 19;
                }
            })
            .flatMap(new Func1<Student, Observable<Course>>() {
                @Override
                public Observable<Course> call(Student student) {
                    System.out.println("student:" + student.getName() + " age:" + student.getAge());
                    return Observable.from(student.getCourses());
                }
            })
            .map(new Func1<Course, Course>() {

                @Override
                public Course call(Course course) {
                    System.out.print(" course:" + course.getName());
                    return course;
                }
            })
            .subscribe(new Subscriber<Course>() {
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted");
                }

                @Override
                public void onError(Throwable throwable) {
                    System.out.println("onError");
                }

                @Override
                public void onNext(Course course) {
                    System.out.println(" teacher:" + course.getTeacherName());
                }
            });

 

 

1.5.4 take操作符

如果我不想要这么多学生我只想要一个怎么办呢?take!

 

Observable.from(studentList)
            .filter(new Func1<Student, Boolean>() {
                @Override
                public Boolean call(Student student) {
                    return student.getAge() >= 19;
                }
            })
            .take(1)
            .flatMap(new Func1<Student, Observable<Course>>() {
                @Override
                public Observable<Course> call(Student student) {
                    System.out.println("student:" + student.getName() + " age:" + student.getAge());
                    return Observable.from(student.getCourses());
                }
            })
            .map(new Func1<Course, Course>() {

                @Override
                public Course call(Course course) {
                    System.out.print(" course:" + course.getName());
                    return course;
                }
            })
            .subscribe(new Subscriber<Course>() {
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted");
                }

                @Override
                public void onError(Throwable throwable) {
                    System.out.println("onError");
                }

                @Override
                public void onNext(Course course) {
                    System.out.println(" teacher:" + course.getTeacherName());
                }
            });

 

rxjava提供大量的操作,这里只介绍几个最基本的,有兴趣的同学可以会下自行学习.

1.6 取消订阅

实际上执行Observable.subscribe()时,它会返回一个Subscrition,它代表了Observable和Subscriber之间的关系。你可以通过Subscrition解除Observable和Subscriber之间的订阅关系,并立即停止执行整个订阅链。

subscription.unsubscribe();

2 调度器Schedulers

2.1 基本概念

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。

2.2 调度类别

Schedulers类别有很多种,下面介绍几个常用的,Android专用的在此不做介绍,有兴趣的同学请线下交流。

Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。

Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

Schedulers.io(): 行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

2.3 调度器的使用

subscribeOn():指定 subscribe() 所发生的线程,或者叫做事件产生的线程

Observable observable = Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        for (int i = 0; i < students.size(); i++) {
            LogUtil.log(Thread.currentThread().getName());
            subscriber.onNext(students.get(i));
        }
        subscriber.onCompleted();
    }
});
Subscriber<Student> subscriber = new Subscriber<Student>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(Student student) {
        LogUtil.log(Thread.currentThread().getName() + ":" + student.getAge());
    }
};
LogUtil.log(Thread.currentThread().getName());
observable
        .subscribeOn(Schedulers.newThread())
        .subscribe(subscriber);
TimeUnit.SECONDS.sleep(1);
输出:
main
RxNewThreadScheduler-1
RxNewThreadScheduler-1:18
RxNewThreadScheduler-1
RxNewThreadScheduler-1:19
RxNewThreadScheduler-1
RxNewThreadScheduler-1:20


 
 从上图可以看出,线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;
observeOn():指定 Subscriber 所运行在的线程。或者叫做事件消费的线程
 
Observable observable = Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        for (int i = 0; i < students.size(); i++) {
            LogUtil.log(Thread.currentThread().getName());
            subscriber.onNext(students.get(i));
        }
        subscriber.onCompleted();
    }
});
Subscriber<Student> subscriber = new Subscriber<Student>() {
    @Override
    public void onCompleted() {  
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(Student student) {
        LogUtil.log(Thread.currentThread().getName() + ":" + student.getAge());
    }
};
LogUtil.log(Thread.currentThread().getName());
observable
        .observeOn(Schedulers.newThread())
        .subscribe(subscriber);
TimeUnit.SECONDS.sleep(1);
 
输出:
main
main
main
main
RxNewThreadScheduler-1:18
RxNewThreadScheduler-1:19
RxNewThreadScheduler-1:20

从上图可以看出,线程切换发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程
3 应用场景
3.1 缓存降级
3.2 异步接口
可以实现异步接口调用的方法有很多,比如@Async/Future/CountdownLatch... 
  • 大小: 65.4 KB
  • 大小: 101.6 KB
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics