Rxjava2 总结

Android · luhaoaimama1 · Created at · Last by mm685265 Replied at · 2833 hits
2919 1486395760

本文在 DiyCodeZone的个人博客 首发,关注作者的 DiyCode帐号 或者作者微博可第一时间收到新文章推送。

Rxjava2基础认知

  • 形式正确的有限Observable
    调用观察者的onCompleted正好一次或者它的onError正好一次,而且此后不能再调用观察者的任何其它方法。如果onComplete 或者 onError 走任何一个 都会 主动解除订阅关系;

    • 如果解除订阅关系以后在发射 onError 则会 报错;而发射onComplete则不会。
    • 注意解除订阅关系 还是可以发射 onNext
  • Disposable类:

    • dispose():主动解除订阅
    • isDisposed():查询是否解除订阅 true 代表 已经解除订阅
  • CompositeDisposable类:可以快速解除所有添加的Disposable类
    每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中, 在退出的时候, 调用CompositeDisposable.clear() 即可快速解除.

CompositeDisposable compositeDisposable=new CompositeDisposable();
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onComplete();或者 emitter.onError(new Throwable("O__O "));
            }
        }).subscribe(new Observer<Integer>() {

            private Disposable mDisposable;
            @Override
            public void onSubscribe(Disposable d) {
                <!-- 订阅   -->
                mDisposable = d;
                <!-- 添加到容器中 -->
                compositeDisposable.add(d);
            }

            @Override
            public void onNext(Integer value) {
                <!-- 判断mDisposable.isDisposed() 如果解除了则不需要处理 -->
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });
        <!-- 解除所有订阅者 -->
        compositeDisposable.clear();

<!-- more -->

基础概念

  • Scheduler scheduler

timer() alt+点击timer可查看 关于timer的方法 可以看到时候有这个参数的变体!

  • Callable<U> bufferSupplier:自定义装载的容器
     Observable.range(1, 10)
                //() -> new ArrayList<>() 则是bufferSupplier
                .buffer(2, 1,() -> new ArrayList<>())
                .subscribe(integers -> System.out.println(integers));

创建操作

  • create : 创建一个具有发射能力的Observable
        Observable.create(e -> {
            e.onNext("Love");
            e.onNext("For");
            e.onNext("You!");
            e.onComplete();
        }).subscribe(s -> System.out.println(s));
  • just:只是简单的原样发射,可将数组或Iterable当做单个数据。它接受一至九个参数
Observable.just("Love", "For", "You!")
                .subscribe(s -> System.out.println(s));
  • empty:创建一个不发射任何数据但是正常终止的Observable
  • never:创建一个不发射数据也不终止的Observable
  • error:创建一个不发射数据以一个错误终止的Observable
Observable.empty();
Observable.never();
Observable.error(new Throwable("O__O"))
  • timer 在延迟一段给定的时间后发射一个简单的数字0
  Observable.timer(1000, TimeUnit.MILLISECONDS)
                .subscribe(s -> System.out.println(s));
  • range:
    • start:起始值
    • count:一个是范 围的数据的数目。0不发送 ,负数 异常
        Observable.range(5, 3)
                //输出 5,6,7
                .subscribe(s -> System.out.println(s));
  • intervalRange
    • start,count:同range
    • initialDelay 发送第一个值的延迟时间
    • period 每两个发射物的间隔时间
    • unit,scheduler 额你懂的
Observable.intervalRange(5, 100, 3000, 100,
                TimeUnit.MILLISECONDS, Schedulers.io())
                .subscribe(s -> System.out.println(s));
  • interval:相当于intervalRange的start=0; > period 这个值一旦设定后是不可变化的
   //period 以后的美每次间隔 这个值一旦设定后是不可变化的  所以 count方法无效的!
   int[] s = new int[]{0};
    Observable.interval(3000, 100 + count(s), TimeUnit.MILLISECONDS, Schedulers.io())
            .subscribe(s2 -> System.out.println(s2));

    private int count(int[] s) {
            int result = s[0] * 1000;
            s[0] = s[0] + 1;
            return result;
        }
  • defer 直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable
    Observable.defer(() -> Observable.just("Love", "For", "You!"))
                  .subscribe(s -> System.out.println(s));
  • from系列

    • fromArray
     Integer[] items = {0, 1, 2, 3, 4, 5};
            Observable.fromArray(items).subscribe(
                    integer -> System.out.println(integer));
    • fromCallable
      Observable.fromCallable(() -> Arrays.asList("hello", "gaga"))
                      .subscribe(strings -> System.out.println(strings))
    • fromIterable
        Observable.fromIterable(Arrays.<String>asList("one", "two", "three"))
                     .subscribe(integer -> System.out.println(integer));
    • fromFuture
       Observable.fromFuture(Observable.just(1).toFuture())
                      .doOnComplete(() -> System.out.println("complete"))
                      .subscribe();

过滤操作

  • elementAt:只发射第N项数据
    <!-- 无默认值版本 -->
     Observable.just(1,2)
                .elementAt(0)
                .subscribe(o -> System.out.print(o ));//结果:1

    <!-- 带默认值的变体版本 -->
    Observable.range(0, 10)
    //        如果索引值大于数据 项数,它会发射一个默认值(通过额外的参数指定),而不是抛出异常。
    //    但是如果你传递一 个负数索引值,它仍然会抛出一个 IndexOutOfBoundsException 异常。
                    .elementAt(100, -100)
                    .subscribe(o -> System.out.print(o + "\t"));
  • IgnoreElements:如果你不关心一个Observable发射的数据,但是希望在它完成时或遇到错误终止时收到通知
 Observable.range(0, 10)
                .ignoreElements()
                .subscribe(() -> System.out.println("complete")
                        , throwable -> System.out.println("throwable"));
  • take系列

    • 变体 count系列:只发射前面的N项数据
     Observable.range(0,10)
                    .take(3)
                    .subscribe(o -> System.out.print(o + "\t"))
    • 变体 time系列: 发射Observable开始的那段时间发射 的数据
           Observable.range(0,10)
                     .take(100, TimeUnit.MILLISECONDS)
                     .subscribe(o -> System.out.print(o + "\t"));
  • takeLast

    • 变体 count系列:只发射后面的N项数据
     Observable.range(0,10)
                         .takeLast(3)
                         .subscribe(o -> System.out.print(o + "\t"));
    • 变体 time系列: 发射在原始Observable的生命周 期内最后一段时间内发射的数据
     Observable.range(0,10)
                    .takeLast(100, TimeUnit.MILLISECONDS)
                    .subscribe(o -> System.out.print(o + "\t"));
  • takeUntil:发送complete的结束条件 当然发送结束之前也会包括这个值

            Observable.just(2,3,4,5)
                    //发送complete的结束条件 当然发送结束之前也会包括这个值
                    .takeUntil(integer ->  integer>3)
                    .subscribe(o -> System.out.print(o + "\t"));//2,3,4
  • takeWhile:当不满足这个条件 会发送结束 不会包括这个值
           Observable.just(2,3,4,5)
                   //当不满足这个条件 会发送结束 不会包括这个值
                   .takeWhile(integer ->integer<=4 )
                   .subscribe(o -> System.out.print(o + "\t"));//2,3,4
  • skip系列

    • 变体 count系列:丢弃Observable发射的前N项数据
    Observable.range(0,5)
                   .skip(3)
                   .subscribe(o -> System.out.print(o + "\t"));
    • 变体 time系列:丢弃原始Observable开始的那段时间发 射的数据
    Observable.range(0,5)
                   .skip(3)
                   .subscribe(o -> System.out.print(o + "\t"));
  • skipLast

    • 变体 count系列:丢弃Observable发射的前N项数据
    Observable.range(0,5)
                   .skipLast(3)
                   .subscribe(o -> System.out.print(o + "\t"));
    • 变体 time系列:丢弃在原始Observable的生命周 期内最后一段时间内发射的数据
    Observable.range(0,10)
                 .skipLast(100, TimeUnit.MILLISECONDS)
                 .subscribe(o -> System.out.print(o + "\t"));
  • distinct:去重

    • keySelector:这个函数根据原始Observable发射的数据项产生一个 Key,然后,比较这些Key而不是数据本身,来判定两个数据是否是不同的
          Observable.just(1, 2, 1, 2, 3)
                    //这个函数根据原始Observable发射的数据项产生一个 Key,
                    // 然后,比较这些Key而不是数据本身,来判定两个数据是否是不同的
                    .distinct(integer -> Math.random())
                    .subscribe(o -> System.out.print(o + "\t"));
        日志:
        原因 key不同 所以当做数据不同处理
        1   2   1   2   3
    • 无参版本 就是内部实现了的keySelector通过生成的key就是value本身
       Observable.just(1, 2, 1, 2, 3)
                    .distinct()
                    .subscribe(o -> System.out.print(o + "\t"));
        日志:
        1   2   3
  • distinctUntilChanged(相邻去重):它只判定一个数据和它的直接前驱是 否是不同的。

    其他概念与distinct一样

  • throttleWithTimeout/debounce:

操作符会过滤掉发射速率过快的数据项
throttleWithTimeout/debounce: 含义相同
如果发送数据后 指定时间段内没有新数据的话 。则发送这条
如果有新数据 则以这个新数据作为将要发送的数据项,并且重置这个时间段,重新计时。

    Observable.create(e -> {
            e.onNext("onNext 0");
            Thread.sleep(100);
            e.onNext("onNext 1");
            Thread.sleep(230);
            e.onNext("onNext 2");
            Thread.sleep(300);
            e.onNext("onNext 3");
            Thread.sleep(400);
            e.onNext("onNext 4");
            Thread.sleep(500);
            e.onNext("onNext 5");
            e.onNext("onNext 6");
        })
                .debounce(330, TimeUnit.MILLISECONDS)
//                .throttleWithTimeout(330, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(o -> System.out.println(o));//结果 3 4 6
  • filter:只发射通过了谓词测试的数据项
        Observable.range(0, 10)
                //过滤掉false的元素
                .filter(integer -> integer % 2 == 0)
                .subscribe(o -> System.out.print(o + "\t"));
  • ofType:ofType 是 filter 操作符的一个特殊形式。它过滤一个Observable只返回指定类型的数据
   Observable.just(0, "what?", 1, "String", 3)
                //ofType 是 filter 操作符的一个特殊形式。它过滤一个Observable只返回指定类型的数据。
                .ofType(String.class)
                .subscribe(o -> System.out.print(o + "\t"));
  • first:只发射第一项(或者满足某个条件的第一项)数 感觉和take(1) elementAt(0)差不多
      Observable.range(0, 10)
                //如果元数据没有发送  则有发送默认值
                .first(-1)
                .subscribe(o -> System.out.print(o + "\t"));
  • last:只发射最后一项(或者满足某个条件的最后一项)数据 感觉和takeLast(1)差不多
       Observable.empty()
                     //如果元数据没有发送  则有发送默认值
                     .last(-1)
                     .subscribe(o -> System.out.print(o + "\t"));
  • sample/throttleLast: 周期采样后 发送最后的数据
  • throttleFirst:周期采样 的第一条数据 发送 > 注意: 如果是已经被发送过的 则不会继续发送
 Observable.create(e -> {
            e.onNext("onNext 0");
            Thread.sleep(100);
            e.onNext("onNext 1");
            Thread.sleep(50);
            e.onNext("onNext 2");
            Thread.sleep(70);
            e.onNext("onNext 3");
            Thread.sleep(200);
            e.onNext("onNext 4");
            e.onNext("onNext 5");
        })
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                <!--  结果 : onNext 2 onNext 3    onNext 5    -->
                .sample(200, TimeUnit.MILLISECONDS,Schedulers.newThread())
                <!--  结果 : onNext 2 onNext 3    onNext 5    -->
//                .throttleLast(200, TimeUnit.MILLISECONDS,Schedulers.newThread())
                <!--  结果 : onNext 0 onNext 3    onNext 4    -->
//                .throttleFirst(200, TimeUnit.MILLISECONDS,Schedulers.newThread())
                .subscribe(o -> System.out.print(o + "\t"));

辅助操作

  • repeat:不是创建一个Observable,而是重复发射原始,Observable的数据序列,这个序列或者是无限的,或者通过 repeat(n) 指定重复次数
      Observable.just("Love", "For", "You!")
                .repeat(3)//重复三次
                .subscribe(s -> System.out.println(s));
  • repeatUntil:getAsBoolean 如果返回 true则不repeat false则repeat.主要用于动态控制
Observable.just("Love", "For", "You!")
                .repeatUntil(new BooleanSupplier() {
                    @Override
                    public boolean getAsBoolean() throws Exception {
                        System.out.println("getAsBoolean");
                        count++;
                        if (count == 3)
                            return true;
                        else
                            return false;
                    }
                }).subscribe(s -> System.out.println(s));
  • delay:延迟一段指定的时间再发射来自Observable的发射物 > 注意: > delay 不会平移 onError 通知,它会立即将这个通知传递给订阅者,同时丢弃任何待 发射的 onNext 通知。 > 然而它会平移一个 onCompleted 通知
      Observable.range(0, 3)
                .delay(1400, TimeUnit.MILLISECONDS)
                .subscribe(o -> System.out.println("===>" + o + "\t"));
  • delaySubscription:让你你可以延迟订阅原始Observable
Observable.just(1)
                .delaySubscription(2000, TimeUnit.MILLISECONDS)
                .subscribe(o -> System.out.println("===>" + o + "\t")
                        , throwable -> System.out.println("===>throwable")
                        , () -> System.out.println("===>complete")
                        , disposable -> System.out.println("===>订阅"));
  • do系列

    • doOnEach:注册一个回调,它产生的Observable每发射一项数据就会调用它一次
      Observable.range(0, 3)
                    .doOnEach(integerNotification -> System.out.println(integerNotification.getValue()))
                    .subscribe(o -> System.out.print("===>" + o + "\t"));
        日志:
        doOnEach:
        doOnEach:0===>0
        doOnEach:1===>1
        doOnEach:2===>2
        doOnEach:null
    • doOnNext:注类似doOnEach 不是接受一个 Notification 参数,而是接受发射的数据项。
         Observable.range(0, 3)
                 .doOnNext(integer -> {
                     if (integer == 2)
                         throw new Error("O__O");
                     System.out.print(integer);
                 })
                 .subscribe(o -> System.out.print("===>" + o + "\t")
                         , throwable -> System.out.print("===>throwable")
                         , () -> System.out.print("===>complete"));
        日志:
        0===>0  1===>1  ===>throwable
    • doOnSubscribe:注册一个动作,在观察者订阅时使用
    Observable.range(0, 3)
                  .doOnSubscribe(disposable -> System.out.print("开始订阅"))
                  .subscribe(o -> System.out.print("===>" + o + "\t"));
        日志:
         开始订阅===>0  ===>1   ===>2
    • doOnComplete:注册一个动作,在观察者OnComplete时使用
     Observable.range(0, 3)
                    .doOnComplete(() -> System.out.print("doOnComplete"))
                    .subscribe(o -> System.out.print("===>" + o + "\t"));
        日志:
         ===>0  ===>1   ===>2   doOnComplete
    • doOnError:注册一个动作,在观察者doOnError时使用
      Observable.error(new Throwable("?"))
                     .doOnError(throwable -> System.out.print("throwable"))
                     .subscribe(o -> System.out.print("===>" + o + "\t"));
        日志:
        异常信息....
        throwable
    • doOnTerminate:注册一个动作,Observable终止之前会被调用,无论是正 常还是异常终止。
         Observable.range(0, 3)
                 .doOnTerminate(() -> System.out.print("\t doOnTerminate"))
                 .subscribe(o -> System.out.print("===>" + o + "\t"));
        日志:
        ===>0   ===>1   ===>2        doOnTerminate
    • doFinally:注册一个动作,当它产生的Observable终止之后会被调用,无论是正常还 是异常终止。在doOnTerminate之后执行
       Observable.range(0, 3)
                       .doFinally(() -> System.out.print("\t doFinally"))
                       .doOnTerminate(() -> System.out.print("\t  doOnTerminate"))
                       .subscribe(o -> System.out.print("===>" + o + "\t"));
        日志:
        ===>0   ===>1   ===>2         doOnTerminate  doFinally
    • doOnDispose:注册一个动作,当【观察者取消】订阅它生成的Observable它就会被调

    注意:貌似需要在 为出现complete和error的时候 dispose才会触发 ~

        Disposable ab = Observable.interval(1, TimeUnit.SECONDS)
                      .take(3)
                      .doOnDispose(() -> System.out.println("解除订阅"))
                      .subscribe(o -> System.out.print("===>" + o + "\t"));
              ab.dispose();
        日志:
        解除订阅
  • materialize:将数据项和事件通知都当做数据项发射

  • dematerialize:materialize相反

Observable.range(0, 3)
                //将Observable转换成一个通知列表。
                .materialize()
                //与上面的作用相反,将通知逆转回一个Observable
                .dematerialize()
                .subscribe(o -> System.out.print("===>" + o + "\t"));
  • observeOn:指定一个观察者在哪个调度器上观察这个Observable
  • subscribeOn:指定Observable自身在哪个调度器上执行

注意 遇到错误 会立即处理而不是等待下游还没观察的数据
既onError 通知会跳到(并吞掉)原始Observable发射的数据项前面

  Observable.range(0, 3)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(o -> System.out.print("===>" + o + "\t"));
  • subscribe:操作来自Observable的发射物和通知
Javadoc: subscribe()
Javadoc: subscribe(onNext)
Javadoc: subscribe(onNext,onError)
Javadoc: subscribe(onNext,onError,onComplete)
Javadoc: subscribe(onNext,onError,onComplete,onSubscribe)
Javadoc: subscribe(Observer)
Javadoc: subscribe(Subscriber)
  • foreach:forEach 方法是简化版的 subscribe ,你同样可以传递一到三个函数给它,解释和传递给 subscribe 时一样

不同的是,你无法使用 forEach 返回的对象取消订阅。也没办法传递一个可以用于取消订阅 的参数

        Observable.range(0, 3)
                //subscribe的简化版本  没啥用
                .forEach(o -> System.out.println("===>" + o + "\t"));
  • serialize:保证上游下游同一线程 ,防止不同线程下 onError 通知会跳到(并吞掉)原始Observable发射的数据项前面的错误行为
     Observable.range(0, 3)
                .serialize()
                .subscribe(o -> System.out.print("===>" + o + "\t"));
  • Timestamp:它将一个发射T类型数据的Observable转换为一个发射类型 为Timestamped<T> 的数据的Observable,每一项都包含数据的原始发射时间
Observable.interval(100, TimeUnit.MILLISECONDS)
               .take(3)
                .timestamp()
                .subscribe(o -> System.out.println("===>" + o + "\t")
                        , throwable -> System.out.println("===> throwable")
                        , () -> System.out.println("===> complete")
                        , disposable -> System.out.println("===> 订阅"));
    日志:
    ===> 订阅
    ===>Timed[time=1501224256554, unit=MILLISECONDS, value=0]
    ===>Timed[time=1501224256651, unit=MILLISECONDS, value=1]
    ===>Timed[time=1501224256751, unit=MILLISECONDS, value=2]
    ===> complete
  • timeInterval:一个发射数据的Observable转换为发射那些数据发射时间间隔的Observable
        Observable.interval(100, TimeUnit.MILLISECONDS)
                .take(3)
//         把发送的数据 转化为  相邻发送数据的时间间隔实体
                .timeInterval()
//                .timeInterval(Schedulers.newThread())
                .subscribe(o -> System.out.println("===>" + o + "\t")
                        , throwable -> System.out.println("===>throwable")
                        , () -> System.out.println("===>complete")
                        , disposable -> System.out.println("===>订阅"));
    日志:
    ===>订阅
    ===>Timed[time=113, unit=MILLISECONDS, value=0]
    ===>Timed[time=102, unit=MILLISECONDS, value=1]
    ===>Timed[time=97, unit=MILLISECONDS, value=2]
    ===>complete
  • timeout

    • 变体:过了一个指定的时长仍没有发射数据(不是仅仅考虑第一个),它会发一个错误
     Observable.interval(100, TimeUnit.MILLISECONDS)
    //        过了一个指定的时长仍没有发射数据(不是仅仅考虑第一个),它会发一个错误
                    .timeout(50, TimeUnit.MILLISECONDS)
                    .subscribe(o -> System.out.println("===>" + o + "\t")
                            , throwable -> System.out.println("===>timeout throwable")
                            , () -> System.out.println("===>timeout complete")
                            , disposable -> System.out.println("===>timeout 订阅"));
    timeout:
    ===>timeout 订阅
    ===>timeout throwable
    • 变体 备用Observable:过了一个指定的时长仍没有发射数据(不是仅仅考虑第一个),它会发一个错误
        Observable<Integer> other;
        Observable.empty()
                // 过了一个指定的时长仍没有发射数据(不是仅仅考虑第一个),他会用备用Observable 发送数据,本身的会发送一个compelte
                .timeout(50, TimeUnit.MILLISECONDS, other = Observable.just(2, 3, 4))
                .subscribe(o -> System.out.println("===>" + o + "\t")
                        , throwable -> System.out.println("===>timeout2 throwable")
                        , () -> System.out.println("===>timeout2 complete")
                        , disposable -> System.out.println("===>timeout2 订阅"));
        other.subscribe(o -> System.out.println("k ===>" + o + "\t"));
       timeout2:
       ===>timeout2 订阅
       ===>timeout2 complete
       k ===>2
       k ===>3
       k ===>4

变换操作

  • map:对Observable发射的每一项数据应用一个函数,执行变换操作,就是方形过渡到圆形
  Observable.just(1,2)
                .map(integer -> "This is result " + integer)
                .subscribe(s -> System.out.println(s));
  • flatMap: 将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable
    • mapper:根据发射数据映射成Observable
    • combiner: 用来合并 的

注意:FlatMap 对这些Observables发射的数据做的是合并( merge )操作,因此它们可能是交 错的。

      Observable.just(1, 2, 3)
                .flatMap(integer -> Observable.range(integer * 10, 2)
                        , (a, b) -> {
                            //a : 原始数据的 just(1,2,3) 中的值
                            //b : 代表 flatMap后合并发送的数据的值
                            System.out.print("\n a:" + a + "\t b:" + b);
                            //return flatMap发送的值 ,经过处理后 而发送的值
                            return a + b;
                        })
                .subscribe(s -> System.out.print("\t"+s));
 日志:
  <!-- 这里有顺序是因为没有在其他线程执行 -->
 a:1     b:10   11
 a:1     b:11   12
 a:2     b:20   22
 a:2     b:21   23
 a:3     b:30   33
 a:3     b:31   34
  • concatMap:类似FlatMap但是保证顺序 因为没有合并操作!
    Observable.just(1, 2, 3)
                .concatMap(integer -> Observable.range(integer * 10, 2))
                .subscribe(s -> System.out.print("\t"+s));
  • cast:在发射之前强制将Observable发射的所有数据转换为指定类型
 Observable.just(1, 2, "string")
                .cast(Integer.class)//订阅之后才能发横强转
                .subscribe(integer -> System.out.println(integer)
                        , throwable -> System.out.println(throwable.getMessage()));
  • groupBy:通过keySelector的apply的值当做key 进行分组,发射GroupedObservable(有getKey()方法)的group 通过group继续订阅取得其组内的值;
    • keySelector:通过这个的返回值 当做key进行分组
    • valueSelector:value转换
  Observable.range(0, 10)
                .groupBy(integer -> integer % 2, integer -> "(" + integer + ")")
                .subscribe(group -> {
                    group.subscribe(integer -> System.out.println(
                            "key:" + group.getKey() + "==>value:" + integer));
                });
日志:
key:0==>value:(0)
key:1==>value:(1)
key:0==>value:(2)
key:1==>value:(3)
key:0==>value:(4)
key:1==>value:(5)
key:0==>value:(6)
key:1==>value:(7)
key:0==>value:(8)
key:1==>value:(9)
  • window: 依照此范例 每三秒收集,Observable在此时间内发送的值。组装成Observable发送出去。
Observable.interval(1, TimeUnit.SECONDS).take(7)
                //返回值  Observable<Observable<T>> 即代表 发送Observable<T>
                .window(3, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(integerObservable -> {
                    System.out.println(integerObservable);
                    integerObservable.subscribe(integer -> System.out.println(integerObservable+"===>"+integer));
                });
日志:
为什么不是 345一起? 因为会有太细微的时间差。例如5如果在多线程切换的时候是超过3秒的1毫秒则就尴尬了~
io.reactivex.subjects.UnicastSubject@531c3d1c
io.reactivex.subjects.UnicastSubject@531c3d1c===>0
io.reactivex.subjects.UnicastSubject@531c3d1c===>1
io.reactivex.subjects.UnicastSubject@531c3d1c===>2
io.reactivex.subjects.UnicastSubject@2ea0f969
io.reactivex.subjects.UnicastSubject@2ea0f969===>3
io.reactivex.subjects.UnicastSubject@2ea0f969===>4
io.reactivex.subjects.UnicastSubject@2d30de03
io.reactivex.subjects.UnicastSubject@2d30de03===>5
io.reactivex.subjects.UnicastSubject@2d30de03===>6
  • scan:连续地对数据序列的每一项应用一个函数,然后连续发射结果

感觉就是发送一个有 累加(函数) 过程序列
* initialValue(可选) 其实就是放到 原始数据之前发射。
* a 原始数据的中的值
* b 则是最后应用scan函数后发送的值

 Observable.just(1, 4, 2)
                //7是用来 对于第一次的 a的值
                .scan(7, (a, b) -> {
                    //b 原始数据的 just(1,4,2) 中的值
                    //a 则是最后应用scan 发送的值
                    System.out.format("a:%d * b:%d\n", a, b);
                    return a * b;
                })
                .subscribe(integer -> System.out.println("===>:"+integer));
日志:
===>:7
a:7 * b:1
===>:7
a:7 * b:4
===>:28
a:28 * b:2
===>:56
  • buffer系列

    • 变体 count系列
     * 范例:发射[1-10]
     * buffer count 2 skip 1,结果 [1,2]  [2,3] [3,4] 3=2*1+1
     * buffer count 2 skip 2,结果 [1,2]  [3,4] [5,6] 5=2*2+1
     * buffer count 2 skip 3,结果 [1,2]  [4,5] [7,8] 7=2*3+1;

    * count:缓存的数量
    * skip:每个缓存创建的间隔数量
    > 则代表 每次初始偏移量 每次真正的起始值=fistValue+skip*skipCount;
    > 注意skip不能小于0
    > 可以小于count这样就会导致每个发送的list之间的值会有重复
    > 可以大于count这样就会导致每个发送的list之间的值和原有的值之间会有遗漏
    > 可以等于count就你懂的了

    * bufferSupplier:自定义缓存装载的容器;

            Observable.range(1, 10)
                    .buffer(2, 1,() -> new ArrayList<>())//有默认的装载器
                    <!-- 其他方法 -->
                    <!-- .buffer(2)//skip 默认和count一样 -->
                    <!--  .buffer(2, () -> new ArrayList<>())-->
                    .subscribe(integers -> System.out.println(integers));
    
             解析:每发射1个。创建一个发射物list buffer,每个buffer缓存2个,收集的存入list后发送。
    • 变体 time系列
      • timespan:缓存的时间
      • timeskip:每个缓存创建的间隔时间 同skip 可以小于大于等于timespan
       Observable.interval(500, TimeUnit.MILLISECONDS).take(7)
                     .buffer(3, 2, TimeUnit.SECONDS, Schedulers.single(),
                             Functions.createArrayList(16))
                     .subscribe(integers -> System.out.println(integers));
    
     解析:每两秒创建一个发射物list buffer,每个buffer缓存三秒 收集的存入list后发送。
     日志:
       [0, 1, 2, 3, 4]
       [4, 5, 6]
    • 变体 自定义buffer创建和收集时间
      • bufferOpenings:每当 bufferOpenings 发射了一个数据时,它就 创建一个新的 List,开始装入之后的发射数据
      • closingSelector:每当 closingSelector 发射了一个数据时,就结束装填数据 发射List。
    <!-- 范例和time系列的就一样了 -->
 Consumer<Long> longConsumer = aLong -> System.out.println("开始创建 bufferSupplier");
        Consumer<Long> longConsumer2 = aLong -> System.out.println("结束收集");
        Observable.interval(500, TimeUnit.MILLISECONDS).take(7)
//                .doOnNext(aLong -> System.out.println("原始发射物:" + aLong))
                .buffer(Observable.interval(2, TimeUnit.SECONDS)
                                .startWith(-1L)//为了刚开始就发射一次
                                .take(2)//多余的我就不创建了
                                .doOnNext(longConsumer)
                        , aLong -> Observable.timer(3, TimeUnit.SECONDS)
                                .doOnNext(longConsumer2)
                        , () -> new ArrayList<>())
                .subscribe(integers -> System.out.println("buffer发射物" + integers));

日志:
openings:
开始创建 bufferSupplier
开始创建 bufferSupplier
结束收集
buffer发射物[0, 1, 2, 3, 4]
buffer发射物[4, 5, 6]

* 变体 仅仅bufer创建时间

* boundarySupplier 因为发送一个值代表上个缓存的发送 和这个缓存的创建

> 这个缓存是连续的, 因为发送一个值代表上个缓存的发送 和这个缓存的创建
> 有发射物的时候 没缓存就创建了 就是 默认第一个发射物的时候由内部创建
> 注意 如果不发送事件缓存 存满了 会自动发送出去的

    Observable.interval(500, TimeUnit.MILLISECONDS).take(7)
                    .buffer(() -> Observable.timer(2, TimeUnit.SECONDS)
                                    .doOnNext(aLong -> System.out.println("开始创建 bufferSupplier"))
                            , () -> new ArrayList<Object>())
                    .subscribe(integers -> System.out.println(integers));
    日志:
    开始创建 bufferSupplier
    [0, 1, 2]
    [3, 4, 5, 6]

合并操作符

  • zip(静态方法):只有当原始的Observable中的每一个都发射了 一条数据时 zip 才发射数据。接受一到九个参数
 Observable<Long> observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
                 .take(3)
                 .subscribeOn(Schedulers.newThread());
         Observable<Long> observable2 = Observable.interval(200, TimeUnit.MILLISECONDS)
                 .take(4)
                 .subscribeOn(Schedulers.newThread());
         Observable.zip(observable1, observable2, (aLong, aLong2) -> {
             System.out.print("aLong:" + aLong + "\t aLong2:" + aLong2+"\t");
             return aLong + aLong2;
         }).subscribe(o -> System.out.println("===>" + o + "\t"));

 日志:
 aLong:0     aLong2:0===>0
 aLong:1     aLong2:1===>2
 aLong:2     aLong2:2===>4
  • zipWith:zip的非静态写法,总是接受两个参数,第一个参数是一个Observable或者一个Iterable。
         observable1.zipWith( observable2, (aLong, aLong2) -> {
             System.out.print("aLong:" + aLong + "\t aLong2:" + aLong2+"\t");
             return aLong + aLong2;
         }).subscribe(o -> System.out.println("===>" + o + "\t"));
  • merge(静态方法):根据时间线 合并多个observer
  Observable<Long> ob1 = Observable.interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .subscribeOn(Schedulers.newThread());

        Observable<Long> ob2 = Observable.interval(50, TimeUnit.MILLISECONDS)
                .take(3)
                .map(aLong -> aLong + 10)
                .subscribeOn(Schedulers.newThread());
        Observable.merge(ob1, ob2)
                .subscribe(o -> System.out.print(o + "\t"));
日志结果:可以见出是根据时间线合并
10  10  0   0   11  11  12  12  1   1   2   2
  • mergeWith:merge非静态写法
 ob1.mergeWith(ob2)
                .subscribe(o -> System.out.print( o + "\t"));
  • combineLatest(静态方法):使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值,它接受二到九个Observable作为参数 或者单 个Observables列表作为参数
  Observable<Long> observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
                .take(4)
                .subscribeOn(Schedulers.newThread());
        Observable<Long> observable2 = Observable.interval(200, TimeUnit.MILLISECONDS)
                .take(5)
                .subscribeOn(Schedulers.newThread());
        Observable.combineLatest(observable1, observable2, (aLong, aLong2) -> {
            System.out.print("aLong:" + aLong + "\t aLong2:" + aLong2+"\t");
            return aLong + aLong2;
        }).subscribe(o -> System.out.println("===>" + o + "\t"));
    日志:
    aLong:1  aLong2:0   ===>1
    aLong:2  aLong2:0   ===>2
    aLong:3  aLong2:0   ===>3
    aLong:3  aLong2:1   ===>4
    aLong:3  aLong2:2   ===>5
    aLong:3  aLong2:3   ===>6
    aLong:3  aLong2:4   ===>7
  • withLatestFrom:类似zip ,但是只在单个原始Observable发射了一条数据时才发射数据,而不是两个都发

但是注意 如果没有合并元素 既辅助Observable一次都没发射的时候 是不发射数据的

       Observable<Long> observable2 = Observable.interval(150, TimeUnit.MILLISECONDS)
                .take(4)
                .subscribeOn(Schedulers.newThread());
        Observable.interval(100, TimeUnit.MILLISECONDS)
                .take(3)
                .subscribeOn(Schedulers.newThread())
                .withLatestFrom(observable2, (aLong, aLong2) -> {
                    System.out.print("aLong:" + aLong + "\t aLong2:" + aLong2 + "\t");
                    return aLong + aLong2;
                })
                .subscribe(o -> System.out.println("===>" + o + "\t"));

日志:
明明原始take是3为啥不是三条log呢 因为原始的发送0的时候 ,辅助Observable还没发送过数据
aLong:1  aLong2:0   ===>1
aLong:2  aLong2:1   ===>3
  • switchMap:和flatMap类似,不同的是当原始Observable发射一个新的数据(Observable)时,它将取消订阅前一个Observable
        Observable.interval(500, TimeUnit.MILLISECONDS)
                .take(3)
                .doOnNext(aLong -> System.out.println())
                .switchMap(aLong -> Observable.intervalRange(aLong * 10, 3,
                        0, 300, TimeUnit.MILLISECONDS)
                        .subscribeOn(Schedulers.newThread()))
                .subscribe(aLong -> System.out.print(aLong+"\t"));
解析:因为发送2的时候 intervalRange发送第三条数据的时候已经是600ms既 500ms的时候原始数据发送了。导致取消订阅前一个Observable
所以 2 ,12没有发送 但是最后的22发送了 因为原始数据没有新发送的了

//        日志结果
//        0 1
//        10    11
//        20    21  22
//        而不是
//        0     1   2
//        10    11  12
//        20    21  22
  • startWith:是concat()的对应部分,在Observable开始发射他们的数据之前,startWith()通过传递一个参数来先发射一个数据序列

   Observable.just("old")
                  <!-- 简化版本 T item  -->
                  .startWith("Start")
                  <!--  多次应用探究 -->
                  .startWith("Start2")
                  <!--  observer -->
                  .startWith(Observable.just("Other Observable"))
                   <!--  Iterable -->
                  .startWith(Arrays.asList("from Iterable"))
                   <!--  T... -->
                  .startWithArray("from Array", "from Array2")
                  .subscribe(s -> System.out.println(s));
日志:
from Array
from Array2
from Iterable
Other Observable
Start2
Start
old
  • join:任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了。一条数据,就结合两个Observable发射的数据

     <!-- 此demo 好使但是未让能理解透彻  仅仅想测试能结果的任用  想明白的话 此demo无效 -->
     Observable.intervalRange(10, 4, 0, 300, TimeUnit.MILLISECONDS)
                .join(Observable.interval(100, TimeUnit.MILLISECONDS)
                                .take(7)
                        , aLong -> {
                            System.out.println("开始收集:"+aLong);
                            return Observable.just(aLong);
                        }
                        , aLong -> Observable.timer(200, TimeUnit.MILLISECONDS)
                        , (aLong, aLong2) -> {
                            System.out.print("aLong:" + aLong + "\t aLong2:" + aLong2 + "\t");
                            return aLong + aLong2;
                        }
                )
                .subscribe(aLong -> System.out.println(aLong));

条件操作

  • all:判定是否Observable发射的所有数据都满足某个条件
 Observable.just(2, 3, 4)
                .all(integer -> integer > 3)
                .subscribe((aBoolean, throwable) -> System.out.println(aBoolean));
日志:false
  • amb:给定多个Observable,只让第一个发射数据的Observable发射全部数据

    • ambArray(静态方法):根据测试结果这个静态方法发射的最后一个
          Observable.ambArray(
                    Observable.intervalRange(0, 3, 200, 100, TimeUnit.MILLISECONDS)
                    , Observable.intervalRange(10, 3, 300, 100, TimeUnit.MILLISECONDS)
                    , Observable.intervalRange(20, 3, 100, 100, TimeUnit.MILLISECONDS)
            )
                    .doOnComplete(() -> System.out.println("Complete"))
                    .subscribe(aLong -> System.out.println(aLong));
        日志:
        20  21  22  Complete
    • ambWith:这个发射原始的
     Observable.intervalRange(0, 3, 200, 100, TimeUnit.MILLISECONDS)
                    .ambWith(Observable.intervalRange(10, 3, 300, 100, TimeUnit.MILLISECONDS))
                    .doOnComplete(() -> System.out.println("Complete"))
                    .subscribe(aLong -> System.out.println(aLong));
    日志:
    0   1   2   Complete
  • contains:判定一个Observable是否发射一个特定的值

Observable.just(2, 3, 4)
                .contains(2)
                .subscribe((aBoolean, throwable) -> System.out.println(aBoolean));
  • switchIfEmpty:如果原始Observable正常终止后仍然没有发射任何数据,就使用备用的Observable
        Observable.empty()
                .switchIfEmpty(Observable.just(2, 3, 4))
                .subscribe(o -> System.out.println("===>" + o + "\t")); //2,3,4
  • defaultIfEmpty:发射来自原始Observable的值,如果原始Observable没有发射任何值,就发射一个默认值,内部调用的switchIfEmpty。
     Observable.empty()
                .defaultIfEmpty(1)
                .subscribe(o -> System.out.println("===>" + o + "\t")); //1
  • sequenceEqual:判定两个Observables是否发射相同的数据序列。(数据,发射顺序,终止状态)
Observable.sequenceEqual(
                Observable.just(2, 3, 4)
                , Observable.just(2, 3, 4))
                .subscribe((aBoolean, throwable) -> System.out.println(aBoolean));


<!-- 它还有一个版本接受第三个参数,可以传递一个函数用于比较两个数据项是否相同。 -->
Observable.sequenceEqual(
        Observable.just(2, 3, 4)
        , Observable.just(2, 3, 4)
        , (integer, integer2) -> integer + 1 == integer2)
        .subscribe((aBoolean, throwable) -> System.out.println(aBoolean));
  • skipUntil:丢弃原始Observable发射的数据,直到第二个Observable发射了一项数据
        Observable.intervalRange(30, 20, 500, 100, TimeUnit.MILLISECONDS)
                .skipUntil(Observable.timer(1000, TimeUnit.MILLISECONDS))
                .doOnNext(integer -> System.out.println(integer))
                //此时用这个主要是 测试环境 有执行时间 所以用阻塞比较好
                .blockingSubscribe();
  • skipWhile:丢弃Observable发射的数据,直到一个指定的条件不成立
 Observable.just(1,2,3,4)
                //从2开始 因为2条件不成立
                .skipWhile(aLong -> aLong==1)
                .doOnNext(integer -> System.out.println(integer))
                //此时用这个主要是 测试环境 有执行时间 所以用阻塞比较好
                .blockingSubscribe();
  • takeUntil:当第二个Observable发射了一项数据或者终止时,丢弃原始Observable发射的任何数据
    <!-- 条件变体 -->
    Observable.just(2,3,4,5)
                 .takeUntil(integer ->  integer<=4)
                 .subscribe(o -> System.out.print(o + "\t"));//2,3,4
    <!-- Observable变体 -->
    Observable.intervalRange(30, 20, 500, 100, TimeUnit.MILLISECONDS)
                 .takeUntil(Observable.timer(1000, TimeUnit.MILLISECONDS))
                 .doOnNext(integer -> System.out.println(integer))
                 .doOnComplete(() -> System.out.println("Complete"))
                 //此时用这个主要是 测试环境 有执行时间 所以用阻塞比较好
                 .blockingSubscribe();
  • takeWhile:发射Observable发射的数据,直到一个指定的条件不成立
        Observable.just(2,3,4,5)
                .takeWhile(integer ->integer<=4 )
                .subscribe(o -> System.out.print(o + "\t"));//2,3

错误处理

  • onErrorReturn:让Observable遇到错误时发射一个特殊的项并且正常终止
<!-- 遇到错误处理范例 -->
Observable.error(new Throwable("我擦 空啊"))
            .onErrorReturnItem("hei")
            .subscribe(o -> System.out.println("===>" + o + "\t")
                    , throwable -> System.out.println("===>throwable")
                    , () -> System.out.println("===>complete"));
日志:
===>hei
===>complete

<!--  遇到错误不处理范例 -->
  Observable.error(new Throwable("我擦 空啊"))
                .onErrorReturn(throwable -> {
                    System.out.println("错误信息:" + throwable.getMessage());
                    return throwable;
                })
                .subscribe(o -> System.out.println("===>" + o + "\t")
                        , throwable -> System.out.println("===>throwable")
                        , () -> System.out.println("===>complete"));
日志:
错误信息:我擦 空啊
===>java.lang.Throwable: 我擦 空啊
===>complete
  • resumeNext:让Observable在遇到错误时开始发射第二个Observable的数据序列

    • onErrorResumeNext:可以处理所有的错误
      Observable.error(new Throwable("我擦 空啊"))
                    .onErrorResumeNext(throwable -> {
                        System.out.println("错误信息:" + throwable.getMessage());
                        return Observable.range(0, 3);
                    })
               .subscribe(o -> System.out.print("===>" + o + "\t")
                                 , throwable -> System.out.print("===>throwable"+ "\t")
                                 , () -> System.out.print("===>complete"+ "\t"));
        日志:
        错误信息:我擦 空啊
        ===>0   ===>1   ===>2   ===>complete
    • onExceptionResumeNext:只能处理异常。

    Throwable 不是一个 Exception ,它会将错误传递给观察者的 onError 方法,不会使用备用 的Observable。

      <!-- Throwable不能处理范例 -->
      Observable.error(new Throwable("我擦 空啊"))
                    .onExceptionResumeNext(observer -> Observable.range(0, 3))
                    .subscribe(o -> System.out.println("===>" + o + "\t")
                            , throwable -> System.out.println("===>throwable")
                            , () -> System.out.println("===>complete"));
        日志:
        ===>throwable
        <!-- 正确演示范例 无效ing 求解答~ todo -->
  • retry:如果原始Observable遇到错误,重新订阅它期望它能正常终止

    • 变体count 重复次数
           Observable.create(e -> {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Throwable("hehe"));
            })
                    .retry(2)
                    .subscribe(o -> System.out.print("===>" + o + "\t")
                            , throwable -> System.out.print("===>throwable\t")
                            , () -> System.out.print("===>complete\t"));
            日志:
            ===>1   ===>2   ===>1   ===>2   ===>1   ===>2   ===>throwable
    • 变体Predicate 条件判定 如果返回 true retry,false 放弃 retry
           Observable.create(e -> {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Throwable("hehe"));
            })
                    .retry(throwable -> throwable.getMessage().equals("hehe1"))
                    .subscribe(o -> System.out.print("===>" + o + "\t")
                            , throwable -> System.out.print("===>throwable\t")
                            , () -> System.out.print("===>complete\t"));
            日志:
           ===>1    ===>2   ===>throwable
  • retryWhen: 需要一个Observable 通过判断 throwableObservable,Observable发射一个数据 就重新订阅,发射的是 onError 通知,它就将这个通知传递给观察者然后终止。

  <!-- 正常范例 -->
  Observable.just(1, "2", 3)
                .cast(Integer.class)
                <!-- 结果:1,1,complete 原因这个Observable发了一次数据 -->
                .retryWhen(throwableObservable -> Observable.timer(1, TimeUnit.SECONDS))
                <!-- 结果:1,1,1,1,complete 原因这个Observable发了三次数据 -->
                .retryWhen(throwableObservable -> Observable.interval(1, TimeUnit.SECONDS)
                    .take(3))
                .subscribe(o -> System.out.println("retryWhen 1===>" + o + "\t")
                        , throwable -> System.out.println("retryWhen 1===>throwable")
                        , () -> System.out.println("retryWhen 1===>complete"));


    <!-- 通过判断throwable 进行处理范例 -->
    Observable.just(1, "2", 3)
                .cast(Integer.class)
                .retryWhen(throwableObservable -> {
                    return throwableObservable.switchMap(throwable -> {
                        if (throwable instanceof IllegalArgumentException)
                            return Observable.just(throwable);
                            <!-- 这种方式OK -->
//                        else{
//                            PublishSubject<Object> pb = PublishSubject.create();
//                            pb .onError(throwable);
//                            return pb;
//                        }
                        else
                            //方法泛型
                            return Observable.<Object>error(throwable);
                          <!-- 这种方式也OK -->
//                        return Observable.just(1).cast(String.class);
                    });
                })
                .subscribe(o -> System.out.println("retryWhen 2===>" + o + "\t")
                        , throwable -> System.out.println("retryWhen 2===>throwable")
                        , () -> System.out.println("retryWhen 2===>complete"));
日志:
retryWhen 2===>1
retryWhen 2===>throwable

阻塞操作

  • toList
  Observable.just(1, 2, 3)
                .toList().blockingGet()
                .forEach(aLong -> System.out.println(aLong));
  • toSortList
     Observable.just(5, 2, 3)
                .toSortedList()
                .blockingGet()
                .forEach(integer -> System.out.println(integer))
  • toMap
     Map<String, Integer> map = Observable.just(5, 2, 3)
//                .toMap(integer -> integer + "_")
                //key 就是5_,value就是5+10   mapSupplier map提供者
                .toMap(integer -> integer + "_"
                        , integer -> integer + 10
                        , () -> new HashMap<>())
                .blockingGet();
  • toFuture > 这个操作符将Observable转换为一个返 回单个数据项的 Future 带有返回值的任务 > 如果原始Observable发射多个数据项, Future 会收到1个 IllegalArgumentException > 如果原始Observable没有发射任何数据, Future 会收到一 个 NoSuchElementException > 如果你想将发射多个数据项的Observable转换为 Future ,可以这样 用: myObservable.toList().toFuture()
 Observable.just(1, 2, 3)
                    .toList()//转换成Single<List<T>> 这样就变成一个数据了
                    .toFuture()
                    .get()
                    .forEach(integer -> System.out.println(integer));
  • blockingSubscribe
  Observable.just(1, 2, 3)
                .blockingSubscribe(integer -> System.out.println(integer));
  • blockingForEach:对BlockingObservable发射的每一项数据调用一个方法,会阻塞直到Observable完成。

        Observable.interval(100, TimeUnit.MILLISECONDS)
                .doOnNext(aLong -> {
                    if (aLong == 10)
                        throw new RuntimeException();
                }).onErrorReturnItem(-1L)
                .blockingForEach(aLong -> System.out.println(aLong));
  • blockingIterable
   Observable.just(1, 2, 3)
                .blockingIterable()
//                .blockingIterable(5);
                .forEach(aLong -> System.out.println("aLong:" + aLong));
  • blockingFirst
    Observable.empty()
       // .blockingFirst();
       //带默认值版本
        .blockingFirst(-1));
  • blockingLast:
 Observable.just(1,2,3)
       // .blockingLast();
       //带默认值版本
        .blockingLast(-1));
  • blockingMostRecent:返回一个总是返回Observable最近发射的数据的Iterable,类似于while的感觉
     Iterable<Long> c = Observable.interval(100, TimeUnit.MILLISECONDS)
                .doOnNext(aLong -> {
                    if (aLong == 10)
                        throw new RuntimeException();
                }).onErrorReturnItem(-1L)
                .blockingMostRecent(-3L);
    for (Long aLong : c) {
                System.out.println("aLong:" + aLong);
            }
    日志很长 可以自己一试变知
  • blockingSingle:

终止时只发射了一个值,返回那个值
empty 无默认值 报错, 默认值的话显示默认值
多个值的话 有无默认值都报错

        System.out.println("emit 1 value:" + Observable.just(1).blockingSingle());
        System.out.println("default empty single:" + Observable.empty().blockingSingle(-1));
        System.out.println("default emit 1 value:" + Observable.just(1).blockingSingle(-1));
        try {
            System.out.println("empty single:" + Observable.empty().blockingSingle());
            System.out.println("emit many value:" + Observable.just(1, 2).blockingSingle());
            System.out.println("default emit many value:" + Observable.just(1, 2)
                    .blockingSingle(-1));
        } catch (Exception e) {
            e.printStackTrace();
        }
        日志:
        emit 1 value:1
        default empty single:-1
        default emit 1 value:1
        java.util.NoSuchElementException

组合操作

  • compose:有多个 Observable ,并且他们都需要应用一组相同的 变换
<!--  用一个工具类去写 这样符合单一职责 -->
//composes 工具类
public class RxComposes {

    public static <T> ObservableTransformer<T, T> applyObservableAsync() {
        return upstream -> upstream.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());
    }

}


  Observable.empty()
                .compose(RxComposes.applyObservableAsync())
                .subscribe(integer -> System.out.println("ob3:" + integer));
  • ConnectableObservable:可连接的Observable在 被订阅时并不开始发射数据,只有在它的 connect() 被调用时才开始用这种方法,
    你可以 等所有的潜在订阅者都订阅了这个Observable之后才开始发射数据。即使没有任何订阅者订阅它,你也可以使用 connect 让他发射

    • replay(Observable的方法): 每次订阅 都对单个订阅的重复播放一边
      • bufferSize:对源发射队列的缓存数量, 从而对新订阅的进行发射;

    Observable的方法 返回是ConnectableObservable
    切记要让ConnectableObservable具有重播的能力,必须Obserable的时候调用replay,而不是ConnectableObservable 的时候调用replay

    //this  is  OK,too!
         ConnectableObservable<Integer> co = Observable.just(1, 2, 3)
                 //类似 publish直接转成 ConnectableObservable  切记要重复播放的话必须Obserable的时候调用replay
                 //而不是ConnectableObservable 的时候调用replay 所以 .publish().replay()则无效
                 .replay(3);//重复播放的 是1  2  3
    //           .replay(2);//重复播放的 是 2  3
    
         co.doOnSubscribe(disposable -> System.out.print("订阅1:"))
                 .doFinally(() -> System.out.println())
                 .subscribe(integer -> System.out.print(integer + "\t"));
         co.connect();//此时开始发射数据 不同与 refCount 只发送一次
    
         co.doOnSubscribe(disposable -> System.out.print("订阅2:"))
                 .doFinally(() -> System.out.println())
                 .subscribe(integer -> System.out.print(integer + "\t"));
    
         co.doOnSubscribe(disposable -> System.out.print("订阅3:"))
                 .doFinally(() -> System.out.println())
                 .subscribe(integer -> System.out.print(integer + "\t"));
    
    replay(3)日志:只能缓存原始队列的两个【1,2,3】
    订阅1:1 2   3
    订阅2:1 2   3
    订阅3:1 2   3
    
    replay(2)日志:只能缓存原始队列的两个【2,3】
    订阅1:1 2   3
    订阅2:  2   3
    订阅3:  2   3
    • publish(Observable的方法):将普通的Observable转换为可连接的Observable
     ConnectableObservable<Integer> co = Observable.just(1, 2, 3)
                    .publish();
    
            co.subscribe(integer -> System.out.println("订阅1:" + integer));
            co.subscribe(integer -> System.out.println("订阅2:" + integer));
            co.subscribe(integer -> System.out.println("订阅3:" + integer));
            co.connect();//此时开始发射数据
    • refCount(ConnectableObservable的方法): 操作符把从一个可连接的Observable连接和断开的过程自动化了, 就像reply的感觉式样 每次订阅 都对单个订阅的重复播放一边
     Observable<Integer> co = Observable.just(1, 2, 3)
                    .publish()
                    //类似于reply  跟时间线有关  订阅开始就开始发送
                    .refCount();
    
            co.doOnSubscribe(disposable -> System.out.print("订阅1:"))
                    .doFinally(() -> System.out.println())
                    .subscribe(integer -> System.out.print(integer + "\t"));
            co.doOnSubscribe(disposable -> System.out.print("订阅2:"))
                    .doFinally(() -> System.out.println())
                    .subscribe(integer -> System.out.print(integer + "\t"));
    
            Observable.timer(300, TimeUnit.MILLISECONDS)
                    .doOnComplete(() -> {
                        co.doOnSubscribe(disposable -> System.out.print("订阅3:"))
                                .doFinally(() -> System.out.println())
                                .subscribe(integer -> System.out.print(integer + "\t"));
                    }).blockingSubscribe();
    日志:
    订阅1:1 2   3
    订阅2:1 2   3
    订阅3:1 2   3

Subjects

Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当 了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个 Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射 新的数据。

对我来说为什么用subjects呢?所有Subject都可以直接发射,不需要 发射器的引用 和 Observable.create()不同

  • AsyncSubject:简单的说使用AsyncSubject无论输入多少参数,永远只输出最后一个参数。 > 但是如果因为发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知。
    AsyncSubject<Integer> source = AsyncSubject.create();

        source.subscribe(o -> System.out.println("1:"+o)); // it will emit only 4 and onComplete

        source.onNext(1);
        source.onNext(2);
        source.onNext(3);

         <!-- it will emit 4 and onComplete for second observer also. -->
        source.subscribe(o -> System.out.println("2:"+o));

        source.onNext(4);
        source.onComplete();
        日志:
        1:4
        2:4
  • BehaviorSubject:会发送离订阅最近的上一个值,没有上一个值的时候会发送默认值。

如果原始的Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何 数据,只是简单的向前传递这个错误通知。

 BehaviorSubject<Integer> source = BehaviorSubject.create();
        //默认值版本
//        BehaviorSubject<Integer> source = BehaviorSubject.createDefault(-1);

        source.subscribe(o -> System.out.println("1:"+o)); // it will get 1, 2, 3, 4 and onComplete

        source.onNext(1);
        source.onNext(2);
        source.onNext(3);

        <!-- it will emit 3(last emitted), 4 and onComplete for second observer also. -->
        source.subscribe(o -> System.out.println("2:"+o));

        source.onNext(4);
        source.onComplete();
        日志:
        1:1
        1:2
        1:3
        2:3
        1:4
        2:4
  • publishSubject(subject里最常用的):可以说是最正常的Subject,从那里订阅就从那里开始发送数据。 > 如果原始的Observable因为发生了一个错误而终止,PublishSubject将不会发射任何数据,只 是简单的向前传递这个错误通知。
PublishSubject bs = PublishSubject.create();

        bs.subscribe(o -> System.out.println("1:"+o));
        bs.onNext(1);
        bs.onNext(2);
        bs.subscribe(o -> System.out.println("2:"+o));
        bs.onNext(3);
        bs.onComplete();
        bs.subscribe(o -> System.out.println("3:"+o));
        日志:
        1:1
        1:2
        1:3
        2:3
  • replaySubject: 无论何时订阅,都会将所有历史订阅内容全部发出。
        ReplaySubject bs = ReplaySubject.create();

        bs.subscribe(o -> System.out.println("1:"+o));
// 无论何时订阅都会收到1,2,3
        bs.onNext(1);
        bs.onNext(2);
        bs.onNext(3);
        bs.onComplete();

        bs.subscribe(o -> System.out.println("2:"+o));
        日志:
        1:1
        1:2
        1:3
        2:1
        2:2
        2:3

Single与Completable

参考:http://developer.51cto.com/art/201703/535298.htm

使用场景:其实这个网络请求并不是一个连续事件流,你只会发起一次 Get 请求返回数据并且只收到一个事件。我们都知道这种情况下 onComplete 会紧跟着 onNext 被调用,那为什么不把它们合二为一呢?
* Single:它总是只发射一个值,或者一个错误通知,而不是发射 一系列的值。因此,不同于Observable需要三个方法onNext, onError, onCompleted,订阅Single只需要两 个方法:

Single只会调用这两个方法中的一个,而且只会调用一次,调用了任何一个方法之后,订阅关 系终止。

* onSuccess - Single发射单个的值到这个方法
* onError - 如果无法发射需要的值,Single发射一个Throwable对象到这个方法

<!--  retrofit 范例-->
 public interface APIClient {

     @GET("my/api/path")
     Single<MyData> getMyData();
 }


 apiClient.getMyData()
     .subscribe(new Consumer<MyData>() {
         @Override
         public void accept(MyData myData) throws Exception {
             // handle data fetched successfully and API call completed
         }
     }, new Consumer<Throwable>() {
         @Override
         public void accept(Throwable throwable) throws Exception{
             // handle error event
         }
     });


     <!-- 单独使用范例: -->
        Single.just("Amit")
            .subscribe(s -> System.out.println(s)
                    , throwable -> System.out.println("异常"));

使用场景:通过 PUT 请求更新数据 我只关心 onComplete 事件。使用 Completable 时我们忽略 onNext 事件,只处理 onComplete 和 onError 事件
* Completable:本质上来说和 Observable 与 Single 不一样,因为它不发射数据。

<!--  retrofit 范例-->
public interface APIClient {

    @PUT("my/api/updatepath")
    Completable updateMyData(@Body MyData data);
}

apiClient.updateMyData(myUpdatedData)
    .subscribe(new Action() {
        @Override
        public void run() throws Exception {
            // handle completion
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception{
            // handle error
        }
    });


    <!-- 单独使用范例: -->
     Completable.timer(1000, TimeUnit.MILLISECONDS)
                    .subscribe(() -> System.out.println("成功")
                            , throwable -> System.out.println("异常"));
  • andThen( Completable中的方法最常用):在这个操作符中你可以传任何Observable、Single、Flowable、Maybe或者其他Completable,它们会在原来的 Completable 结束后执行

    apiClient.updateMyData(myUpdatedData)
        .andThen(performOtherOperation()) // a Single<OtherResult>
        .subscribe(new Consumer<OtherResult>() {
            @Override
            public void accept(OtherResult result) throws Exception {
                // handle otherResult
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception{
                // handle error
            }
        });

自定义操作符

  • lift 原理图

@Test
    public void lift(){
            Observable.just(1,2)
                    //也是代理模式  observer是真正订阅
                    .lift(observer -> new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {

                        }

                        @Override
                        public void onNext(Integer integer) {
                            observer.onNext(integer+"?");
                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        @Override
                        public void onComplete() {

                        }
                    })
                    .subscribe(o -> System.out.println(o));
    }

    日志:
    1?
    2?

实用技巧

flatMap 与 zip 配合的实用范例:

 Observable.fromArray(new File("/Users/fuzhipeng/Documents"))
                .flatMap(file -> Observable.fromArray(file.listFiles()))
                //比较经典的 就是Observable.just(file) 把 file一个元素转成 observer从而进行zip合并的难题解决了
                .flatMap(file ->
                        Observable.zip(Observable.just(file)
                                , Observable.timer(1, TimeUnit.SECONDS)
                                , (file1, aLong) -> file1))
                .filter(file -> file.getName().endsWith(".png"))
                .take(5)
                .map(file -> file.getName())
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .subscribe(s -> System.out.println(s));
        while (true) {
        }

map的实用范例:

 //有些服务几口设计,返回数据外层会包裹一些额外信息,可以使用map()吧外层格式剥掉
        Observable.just(1)
                .map(integer -> new Integer[]{1, 2, 3})
                .subscribe(integers -> System.out.println(integers));

方法泛型的实用范例:

Observable.just(1, "2", 3)
                .cast(Integer.class)
                .retryWhen(throwableObservable -> {
                    return throwableObservable.switchMap(throwable -> {
                        if (throwable instanceof IllegalArgumentException)
                            return Observable.just(throwable);
                        //todo  方法泛型 如果我不写<Object> 则会报错
                        return Observable.<Object>error(throwable);
                        //这个报错!!!
//                        return Observable.error(throwable);
                    });
                })
                .subscribe(o -> System.out.println("===>" + o + "\t")
                        , throwable -> System.out.println("===>throwable")
                        , () -> System.out.println("===>complete"));

BehaviorSubject的使用技巧:

cache BehaviorSubject 是桥梁 并且有 发送最近的缓存特性!

BehaviorSubject<Object> cache = BehaviorSubject.create();
        Observable.timer(1,TimeUnit.SECONDS)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(cache);

        //可以想象成上面是方法  这里是方法被调用
        cache.subscribe(o -> System.out.println(o));//结果0

Observable 发射元素的封装范例:

//创建一个Observable 可以直接发送的 原因 获取rx内部方法需要final很恶心 所以...
        RxEmitter<Integer> emitter = new RxEmitter();
        Observable.create(emitter)
                .subscribe(integer -> System.out.println(integer));
        emitter.onNext(1);
        emitter.onNext(2);


public class RxEmitter<T> implements ObservableOnSubscribe<T>, ObservableEmitter<T> {

    ObservableEmitter<T> e;

    @Override
    public void subscribe(ObservableEmitter<T> e) throws Exception {
        this.e = e;
    }

    @Override
    public void onNext(T value) {
        e.onNext(value);
    }

    @Override
    public void onError(Throwable error) {
        e.onError(error);
    }

    @Override
    public void onComplete() {
        e.onComplete();
    }

    @Override
    public void setDisposable(Disposable d) {
        e.setDisposable(d);
    }

    @Override
    public void setCancellable(Cancellable c) {
        e.setCancellable(c);
    }

    @Override
    public boolean isDisposed() {
        return e.isDisposed();
    }

    @Override
    public ObservableEmitter<T> serialize() {
        return e.serialize();
    }

    @Override
    public boolean tryOnError(Throwable t) {
        return e.tryOnError(t);
    }
}

Reference&Thanks:

https://www.gitbook.com/book/mcxiaoke/rxdocs/details

基本上所有的都参考此文档 很神!

http://blog.csdn.net/maplejaw_/article/details/52396175

http://developer.51cto.com/art/201703/535298.htm

http://gank.io/post/560e15be2dca930e00da1083

https://github.com/amitshekhariitbhu/RxJava2-Android-Samples

http://www.jianshu.com/u/c50b715ccaeb

共收到 6 条回复
96

bhhhh

96

bhhh2

96
mm0089 · #5 ·

臺灣約妹找妓女做愛打炮賴mm0089 堅持讓你花得值得 讓你一次就滿意
薇薇外送茶優質正妹網站:http://www.kiss69lg.com/forum.php?forumlist=1&mobile=2
堅持讓你花得值得 讓你一次就滿意
保證咩咩 熟練多變的技巧 讓你三度回味
堅持誠信經營 絕不會說的天花亂墬 浪費彼此寶貴的時間
精心安排 保證約好的時間 20分鐘內火速到達
絕無強迫消費‧購買點數‧匯款‧ATM轉帳‧都是見到本人 滿意在消費
優咩咩都是經過篩選 絕無地雷 請放心光臨體驗享受
㊣平價錢又安心 熟客 新客都享有優惠
薇薇茶坊營業時間:中午12點-凌晨四點

外送地區:臺北新北林口龜山新竹臺中彰化南投高雄臺南
妹妹類型:學生 OL 巨乳 蘿莉 人妻 技術茶 空姐小模 AV女優 婚紗助理等
服務內容:全套服務
約妹流程:賴上先預約然後準時給房號在房間等妹到
約會地點:你自己選擇的旅館or熟客可送住家
消費方式:看到妹喜歡在當場現金交易 不喜歡可換三次

北部:一節6000內立減500送1000優惠券
一節7000 -8000第二節半價 買三節送一節
一節9000-10000 立減1000-2000送2次半價
一節11000-15000 立減3000-5000送3次半價
一節15000-30000 立減5000-8000 送一年半價

中南部:一節4000-5000 二節半價
一節6000-7000 買兩節送一節
一節7000-10000 買兩節送兩節
一節11000-15000 立減3000-5000送2次半價
一節15000-30000立減5000-8000送一年半價
約小姐部落格看照網址:https://www.cssanyu.org/bbs2/forum.php?mod=viewthread&tid=333726&extra=
約小姐部落格看照https://www.photostore.me/mm0089/?list=images&sort=date_desc&page=4&seek=ursjV
雙北桃園林口龜山新竹看照約妹網址:http://www.kiss69lg.com/forum.php?mod=forumdisplay&fid=143
臺中彰化南投看照約妹網址:http://www.kiss69lg.com/forum.php?mod=forumdisplay&fid=145
高雄臺南看照約妹網址:http://www.kiss69lg.com/forum.php?mod=forumdisplay&fid=147
安全旅館便宜經濟實惠旅館推薦:http://www.kiss69lg.com/forum.php?mod=forumdisplay&fid=182
色情圖片露點照片網址:http://www.kiss69lg.com/forum.php?mod=forumdisplay&fid=177
成人小說網址http://www.kiss69lg.com/forum.php?mod=forumdisplay&fid=163
台中台北外送茶莊、茶坊推薦|優質台北、高雄外約茶妹任你選南投外送茶/全套叫小姐彰化外送茶
草屯外送茶大台中外送茶,外約美女,薇薇外送茶賴mm0089外約服務網,台中一夜情 .台灣出差旅館叫小姐line:mm0089
台北外送茶/台中外送茶/高雄外送茶/台南外送茶/新竹外送茶/彰化外送茶/南投外送茶/薇薇外送茶賴mm0089大台灣台北台中高雄台南新竹地區喝茶服務外送茶薇薇外送茶賴mm0089/鼓山區看照約妹薇薇外送茶賴mm0089前鎮區美腿茶/三民區火辣茶/新興區惹火嫵媚茶/左營區MT外送... 外送/台北清涼茶莊/更多優質妹妹/台北外送茶莊/高雄外送茶/新竹外送茶/洗澡/愛愛薇薇外送茶賴mm0089/口交旅館酒店外送小姐/上門服務/薇薇外送茶賴mm0089茶魚分享/大台北外送茶坊/大台中外送茶坊/高雄外送茶/援交妹網站台灣叫小姐俱樂部,薇薇外送茶賴mm0089台北叫小姐,薇薇外送茶賴mm0089西門町外送服務 板橋外送茶高雄約妹外約茶莊薇薇外送茶賴mm0089/夜市附近叫小姐,85大樓叫小姐 高雄叫小姐按摩外送茶西?町找小姐薇薇外送茶賴mm0089林森北找茶喝/台北?正妹/台?援交找薇薇外送茶賴mm0089台北叫小姐台中旅館叫小姐高雄旅遊找妹兼職美女茶外送看照約妹好茶外送到家西?町找小姐|林森北找茶喝/台北?正妹/台?援交找薇薇外送茶賴mm0089
#台灣汽車旅館找小姐 #商旅找小姐薇薇外送茶賴mm0089 #星級酒店找小姐
#旅館找小姐 #商旅找小姐薇薇外送茶賴mm0089 #星級酒店找小姐 #旅館找小姐
#飯店找小姐 #住家找小姐薇薇外送茶賴mm0089 #賓館找小姐 #台灣出差旅遊外約 #台灣出差旅遊找小姐
#酒店外約叫茶 #台灣汽車旅館外約叫茶 薇薇外送茶賴mm0089#商旅外約叫茶 #星級酒店外約叫茶 #旅館外約叫茶 #飯店外約叫茶
#住家外約叫茶 #賓館外約叫茶 #台灣出差旅遊外約叫茶 薇薇外送茶賴mm0089#台灣出差旅遊外約叫茶 +薇薇外送茶賴mm0089#
台中外送茶 #台北外送茶 #新竹外送茶薇薇外送茶賴mm0089 #高雄外送茶 #台南外送茶 #彰化外送茶 #南投外送茶 #台中外約
#台北外約 #高雄外約薇薇外送茶賴mm0089 #新竹外約 #台南外約 #彰化外約 #南投外約 #台灣外送茶 #外送茶 #情愛全台外送茶看照約妹叫小姐
#外送茶不戴套 +薇薇外送茶賴mm0089#板橋外約 #三重外約 #永和外約 #中和外約 #汐止外約 #新莊外約 #土城外約
#新店外約 #蘆洲外約 #五股外約 #泰山外約 #淡水外約薇薇外送茶賴mm0089 #八里外約 #林口外約 #龜山外約 #台中外約 #高雄外約
#台北外約薇薇外送茶賴mm0089 #本土外約 #外約台妹 #中正外約 #大同外約 #松山外約+薇薇外送茶賴mm0089 #板橋外送茶 #板橋外約
#大安外約 #萬華外約 #信義外約薇薇外送茶賴mm0089 #士林外約 #北投外約 #內湖外約 #南港外約 #文山外約 #新竹外約 #台南外約
#西屯外約 #南屯外約 薇薇外送茶賴mm0089#北屯外約 #逢甲外約 #大里外約 #大雅外約 #七其外約 #東海外約 #烏日外約 #太平外約
#豐原外約薇薇外送茶賴mm0089 #沙鹿外約 薇薇外送茶賴mm0089#逢甲茶莊 #逢甲全套
#薇薇外送茶賴mm0089逢甲外約 #逢甲外送茶 #逢甲叫小姐 #逢甲打砲 屏東汽車旅館叫小姐.薇薇外送茶賴mm0089屏東找茶,屏東外送舒壓按摩.屏東護膚全套外約.屏東找妹薇薇外送茶賴mm0089.屏東找小姐.屏東汽車旅館叫妹妹服務薇薇外送茶賴mm0089.屏東叫小姐.妹妹服務找歡樂.薇薇外送茶賴mm0089屏東優質外送茶莊.屏東出差旅遊約妹 .薇薇外送茶賴mm0089屏東正妹論壇.屏東約情人.薇薇外送茶賴mm0089屏東找女人兼職妹.推薦屏東茶莊##屏東外約妹妹價位#屏東找小姐.薇薇外送茶賴mm0089屏東全套外送.屏東辣妹薇薇外送茶賴mm0089.屏東找學生妹.人妻
#逢甲茶訊 #逢甲援交 #逢甲找女人薇薇外送茶賴mm0089 #逢甲魚訊 #逢甲炮神器 #逢甲紓壓 #逢甲性愛服務 #逢甲鐘點情人 #
太平全套 #薇薇外送茶賴mm0089大里全套 #沙鹿全套 #豐原全套 #大雅全套 #烏日全套薇薇外送茶賴mm0089 #台中車站應召 #台中南屯約妹
#台中西屯叫小姐 #台中逢甲外送 #台中勤美約妹薇薇外送茶賴mm0089 #台中車站叫小姐 #台中北屯應召 #台中南屯叫雞 #台中車站叫妹
#台中北區叫小姐薇薇外送茶賴mm0089 #台中應召 #台中車站叫雞 #台中車站茶莊 #台中西屯外送薇薇外送茶賴mm0089 #台中逢甲約砲 #台中北屯叫妹 #台中市區應召桃園半套店薇薇外送茶賴mm0089 ,#桃園半套價錢 #桃園找援,#桃園聯天室找援,#桃園西門找援桃園半套店薇薇外送茶賴mm0089 ,#桃園半套價錢 台中一夜情,台中??,台中全套基隆叫小姐電話 #基隆叫小姐 #基隆飯店叫小姐 #基隆旅館叫小姐薇薇外送茶賴mm0089 #基隆找女人 #基隆找妹 #基隆出差叫小姐 #基隆打炮薇薇外送茶賴mm0089 #基隆看照約妹#桃園茶莊心得,#桃園茶莊ptt,#桃園桑拿薇薇外送茶賴mm0089 ,#桃園桑拿浴,#桃園桑拿網
#桃園桑拿論壇薇薇外送茶賴mm0089 ,#桃園桑拿澳門,#桃園桑拿168,#桃園半套店薇薇外送茶賴mm0089 ,#桃園半套價錢
台北外送茶/高雄鐘點情人外約,台北旅館叫小姐,台北鐘點情人,台灣一夜情,高雄一夜情,台中一夜情,台北一夜情,台北美女外約,台中美女外約,高雄美女外約,高雄茶莊,台中茶莊,台北茶莊,台北叫小姐,台中叫小姐,高雄叫小姐,高雄外約/外約/援交妹,吃魚喝茶論壇,大家來找茶,PLUS,伊利,微克成人網/女優GoGoGo/淘A片/一刀未剪/免費成人影音薇薇外送茶賴mm0089 /交換網站/交友網站/性愛成人網/一葉晴成人貼片/成人極品情色站/癡漢線上免費A片/台灣明星淫片流出露比線上免費A片/伊莉成人論壇/◆免費線上A片◆/寶貝一夜情聊天室/免費A片頻道/無名成人網/成人網/台北情色聯盟/天天幹貼圖/洪爺色情網/台灣噴精成人網/十八小妹自拍美少女自拍貼圖老婆自拍貼圖/色色女孩情色總站/A圖情色交流/666人氣貼圖/插插穴排行/69Kiss電影排行/薇薇外送茶賴mm0089 彩虹頻道/后宮電影院/中文搜性網/酷站排行入口/上我人妻/成人龍虎豹/干爹情色排行/十七歲少女/台灣1歲/104寫真銀行/插插穴排行/熱酷美眉網/台灣性樂園/只有貼圖/波波美女網/交換連結eyny,玩美情人,男人幫,高雄女外約,高雄外送,高雄賓館叫小姐,高雄飯店叫小姐,高雄鐘點情人外約,台北旅館叫小姐,彰化外送茶,台灣兼職美女外送,薇薇外送茶賴mm0089 台北兼職美女外約,台中外送茶坊,高雄外送茶坊,台北外送茶坊陸妹價格,檳榔西施清涼秀,台功援學生兼差 msn,援交妹,24h 台北私兼,CLUB,台南指壓 3k,高雄酒店經紀,台中學生兼差msn,台北 夜店 舞廳 酒吧 制服便服,中年夫妻聯誼,高雄媛交,台南茶妹,台北指油壓留言板,情趣精品,大台南一夜情人外約\俱樂部,台北賓館叫小姐,高雄原味貼身衣物買賣,夢時代購物中心,台北推拿中醫,0204一夜崤◆隆f聊天室,熟女圖,討論區,台南喝茶的店哪好,網路購物,台中理容按摩,台南陪唱,台北下\午茶 blog,高雄 砲友,台中好茶討論區,高雄茶店news,高雄按摩個人工作室,旅遊,台南24h台南24h餐廳,台北茶訊交流msn,卡債,台中旅館外叫服務,台中 spa油壓男按摩小姐服務/漁會玩美情人遊戲成人論壇 台北吃魚喝茶留言板外送/台北一夜情重點情/台北旅館飯店找服務叫小姐/找女人全套服務加按摩指油壓成人夜遊魚訊交流論壇區/台北應召站/伊莉喝茶/第一手論壇/外約愛愛/外約電話/外約高檔茶到府服務/伊莉plus28/成人性愛慾茶園 性交易/正妹外送服務/找茶論壇薇薇外送茶賴mm0089 /找茶討論區/台灣外送GTO/台灣樂緣外送茶/兩性論妹板橋外送酒店 北投泡溫泉三溫暖趙小姐/援交妹網站論壇/FB交友網站/UT天室交友一夜情炮友/台北喝茶買三送一接多買多送純情動感兼職妹/華僑台北旅遊出差消伴遊找女人茶/薇薇外送茶賴mm0089 極品俱樂部嚴選絕色經典,第一手娛樂論壇/卡提諾/玩美情人/吃魚喝茶網 薇薇外送茶賴mm0089 伊漁網/Plus論壇/台灣樂緣/小女人論壇/台灣論壇/微風論壇/伊莉論壇/禁地論壇/維克斯論壇/捷克論/男人幫論壇/大眾論壇/竹北旅館飯店找女人按摩舒壓叫小姐3p服務 愛情公寓論壇 交友/ 愛情/戀愛/貓都論壇/賽斯論壇/104論壇/九州娛樂論壇/櫻雪論壇/2B級/台灣、送茶坊台北外送茶坊,台中外送茶,高雄外送茶,美女外約服務/台中/高雄/新竹/彰化/莊極品俱樂部嚴選絕色經典,成人性愛慾茶園,找茶討論區,催情藥,唯美貼圖,成人論壇,網絡報稅,線上遊戲,高檔平價好茶,淫照聊天是尋夢園美女外送,情趣用品八大行業指油壓全套,暑假打工,網站設計,中國合夥人,鋼鐵俠3,HTC,蝴蝶機,變裝遊戲,茶,motel,hotel,性感絲襪,A片下載,AV女優,第一手論壇,貓都,卡提諾,喝茶,完美情人,BJ論壇,小女人論壇,卡提諾論壇,台灣論壇三溫暖中陪酒ktv,台北全套護膚個人工作室,尋找台南援交auty 美容美體 SPA沙龍,台北外約茶棧,台中越南餐廳,台南應召站,台北外送 3k,高雄下午茶外,台中三溫暖全套,台南全套油壓泰國,台南半套店1600元,高雄推拿指壓,小姐,台北夜生活 pub,台北單身聯誼,兼差,台北交換伴侶,台北一夜情,高雄聊天網,高雄美女兼職,台南車站美食,8000mile,台北一夜情緣俱樂部,高雄應徵\酒店酒店上班,高雄成人視訊聊天室,台南兼職找利菁,高雄24h到府指油壓,女兼職,酒店兼職\,壽山,台南茶訊茶資薇薇外送茶賴mm0089 ,台南陪酒小姐,高雄絲襪美腿高跟鞋,夫妻聯誼部落格,薇薇外送茶賴mm0089 台南小野貓檳榔西施外送時被下藥拍照,自拍女老師,找台中援妹地點,台北大陸妹價格,檳榔西施清涼秀,台功援學生兼差 msn,援交妹,24h 台北私兼,CLUB,台南指壓 3k,高雄酒店經紀,台中學生兼差msn,台北 夜店 舞廳 酒吧 制服便服,中年夫妻聯誼,高雄媛交,台南茶妹,台北指油壓留言板,情趣精品,大台南一夜情人外約\俱樂部,台北賓館叫小姐,高雄原味貼身衣物買賣,夢時代購物中心,台北推拿中醫,0204一夜崤◆隆f聊天室,熟女圖,討論區,台南喝茶的店哪好,網福祿猴林千又 吳宗憲 2015 黑豹旗 王大陸 徐太宇 登革熱 波多野結衣 金鐘獎 蔡英文 洪秀柱 蛇精男 靈異 鬼故事 柯文哲 柯P 大家來說鬼 綜藝玩很大 時尚脈動 賈靜雯 陳佩琪 氣象 反課綱 2015星光大賞 宅男女神 愛爾麗 泛舟哥 張吉吟 ET看電影 八仙 塵爆 楊子晴 范冰冰 安心亞 陳泱瑾 Grace ISIS 林書豪

96
mm685265 · #6 ·

臺灣找小姐+賴:211861 大奶大粉嫩可看照.洗澡愛愛按摩口交全套服務
好吃的東西當然要一起分享~ 小弟昨天趁休假找茶姊約了個正妹 果然幫我安排的素質很讚 妹妹叫童童 目前還是個大學生 22歲 身材臉蛋都是我的菜 身高160 甜美可愛 美腿哦 罩杯Dcup 真材實料 吸起來彈性十足 妹妹雖然年紀小 但是性欲很強 喜歡在床上纏著我的腰扭屁股 真的是視覺上肉體上的100分滿足!!全程真的很主動 很會挑逗 皮膚也很棒很白嫩 全身鮑魚都可以隨意摸哦 全身都很敏感 妹妹也很緊 還會夾我的小弟 插起來水水很多 很有感覺 超級讚!!! 真的是個很淫蕩的小女生 讓你有回味無窮的感覺 喜歡的可以嘗試看看 加賴:211861 找童童可看照片 她家還要其他的姊妹 類型很多 加賴說是阿傑介紹 有好康喔!!!

需要 Sign In 后方可回复, 如果你还没有账号请点击这里 Sign Up