好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

RxJava的消息发送和线程切换实现原理

rxjava是一个在java虚拟机上的响应式扩展,通过使用可观察的序列将异步和基于事件的程序组合起来的一个库。

它扩展了观察者模式来支持数据/事件序列,并且添加了操作符,这些操作符允许你声明性地组合序列,同时抽象出要关注的问题:比如低级线程、同步、线程安全和并发数据结构等。

rxjava相信大家都非常了解吧,今天分享一下rxjava的 消息发送 和线程源码的分析。最后并分享一个相关demo,让大家更加熟悉我们天天都在用的框架。

消息订阅发送

首先让我们看看消息订阅发送最基本的代码组成:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

observable observable = observable.create( new observableonsubscribe<string>() {

      @override

      public void subscribe(observableemitter<string> emitter) throws exception {

        emitter.onnext( "jack1" );

        emitter.onnext( "jack2" );

        emitter.onnext( "jack3" );

        emitter.oncomplete();

      }

    });

 

    observer<string> observer = new observer<string>() {

      @override

      public void onsubscribe(disposable d) {

        log.d(tag, "onsubscribe" );

      }

 

      @override

      public void onnext(string s) {

        log.d(tag, "onnext : " + s);

      }

 

      @override

      public void onerror(throwable e) {

        log.d(tag, "onerror : " + e.tostring());

      }

 

      @override

      public void oncomplete() {

        log.d(tag, "oncomplete" );

      }

    };

 

    observable.subscribe(observer);

代码很简单,observable为被观察者,observer为观察者,然后通过observable.subscribe(observer),把观察者和被观察者关联起来。被观察者发送消息(emitter.onnext("内容")),观察者就可以在onnext()方法里回调出来。

我们先来看observable,创建是用observable.create()方法进行创建,源码如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

public static <t> observable<t> create(observableonsubscribe<t> source) {

   objecthelper.requirenonnull(source, "source is null" );

   return rxjavaplugins.onassembly( new observablecreate<t>(source));

}

 

public static <t> t requirenonnull(t object, string message) {

   if (object == null ) {

      throw new nullpointerexception(message);

   }

   return object;

  }

 

public static <t> observable<t> onassembly( @nonnull observable<t> source) {

   function<? super observable, ? extends observable> f = onobservableassembly;

   if (f != null ) {

      return apply(f, source);

   }

   return source;

}

可以看出,create()方法里最主要的还是创建用observableonsubscribe传入创建了一个observablecreate对象并且保存而已。

?

1

2

3

4

5

6

7

8

public final class observablecreate<t> extends observable<t> {

   final observableonsubscribe<t> source;

 

   public observablecreate(observableonsubscribe<t> source) {

     this .source = source;

   }

 

}

接着是创建observer,这比较简单只是单纯创建一个接口对象而已

?

1

2

3

4

5

6

7

8

9

public interface observer<t> {

   void onsubscribe( @nonnull disposable d);

 

   void onnext( @nonnull t t);

 

   void onerror( @nonnull throwable e);

  

   void oncomplete();

}

订阅发送消息

observable.subscribe(observer)的subscribe方法如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

public final void subscribe(observer<? super t> observer) {

   objecthelper.requirenonnull(observer, "observer is null" );

   try {

     observer = rxjavaplugins.onsubscribe( this , observer);

     objecthelper.requirenonnull(observer, "plugin returned null observer" );

     subscribeactual(observer);

   } catch (nullpointerexception e) { // nopmd

     throw e;

   } catch (throwable e) {

     exceptions.throwiffatal(e);

     rxjavaplugins.onerror(e);

     nullpointerexception npe = new nullpointerexception( "actually not, but can't throw other exceptions due to rs" );

     npe.initcause(e);

     throw npe;

   }

}

 

//objecthelper.requirenonnull()方法

public static <t> t requirenonnull(t object, string message) {

   if (object == null ) {

      throw new nullpointerexception(message);

   }

   return object;

}

 

//rxjavaplugins.onsubscribe()方法

public static <t> observer<? super t> onsubscribe( @nonnull observable<t> source, @nonnull observer<? super t> observer) {

   bifunction<? super observable, ? super observer, ? extends observer> f = onobservablesubscribe;

   if (f != null ) {

     return apply(f, source, observer);

   }

   return observer;

}

从上面源码可以看出requirenonnull()只是做非空判断而已,而rxjavaplugins.onsubscribe()也只是返回最终的观察者而已。所以关键代码是抽象方法subscribeactual(observer);那么subscribeactual对应哪个代码段呢?

还记得observable.create()创建的observablecreate类吗,这就是subscribeactual()具体实现类,源码如下:

?

1

2

3

4

5

6

7

8

9

10

protected void subscribeactual(observer<? super t> observer) {

   createemitter<t> parent = new createemitter<t>(observer);

   observer.onsubscribe(parent);

   try {

     source.subscribe(parent);

   } catch (throwable ex) {

     exceptions.throwiffatal(ex);

     parent.onerror(ex);

   }

}

从上面的代码可以看出,首先创建了一个createemitter对象并传入observer,然后回到observer的onsubscribe()方法,而source就是我们之前创建observablecreate传入的observableonsubscribe对象。

?

1

2

3

4

class createemitter<t> extends atomicreference<disposable>

   implements observableemitter<t>, disposable {

 

  }

而createemitter又继承observableemitter接口,又回调observableonsubscribe的subscribe方法,对应着我们的:

?

1

2

3

4

5

6

7

8

9

observable observable = observable.create( new observableonsubscribe<string>() {

    @override

    public void subscribe(observableemitter<string> emitter) throws exception {

       emitter.onnext( "jack1" );

       emitter.onnext( "jack2" );

       emitter.onnext( "jack3" );

       emitter.oncomplete();

    }

});

当它发送消息既调用emitter.onnext()方法时,既调用了createemitter的onnext()方法:

?

1

2

3

4

5

6

7

8

9

public void onnext(t t) {

   if (t == null ) {

     onerror( new nullpointerexception( "onnext called with null. null values are generally not allowed in 2.x operators and sources." ));

     return ;

   }

   if (!isdisposed()) {

     observer.onnext(t);

   }

}

可以看到最终又回调了观察者的onnext()方法,把被观察者的数据传输给了观察者。有人会问

isdisposed()是什么意思,是判断要不要终止传递的,我们看emitter.oncomplete()源码:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

public void oncomplete() {

   if (!isdisposed()) {

     try {

       observer.oncomplete();

     } finally {

       dispose();

     }

   }

}

 

public static boolean dispose(atomicreference<disposable> field) {

     disposable current = field.get();

     disposable d = disposed;

     if (current != d) {

       current = field.getandset(d);

       if (current != d) {

         if (current != null ) {

           current.dispose();

         }

         return true ;

       }

     }

     return false ;

  }

 

public static boolean isdisposed(disposable d) {

     return d == disposed;

}

dispose()方法是终止消息传递,也就付了个disposed常量,而isdisposed()方法就是判断这个常量而已。这就是整个消息订阅发送的过程,用的是观察者模式。

线程切换

在上面模板代码的基础上,线程切换只是改变了如下代码:

?

1

2

3

observable.subscribeon(schedulers.io())

      .observeon(androidschedulers.mainthread())

      .subscribe(observer);

下面我们对线程切换的源码进行一下分析,分为两部分:subscribeon()和observeon()

subscribeon()

首先是subscribeon()源码如下:

?

1

2

3

4

public final observable<t> subscribeon(scheduler scheduler) {

   objecthelper.requirenonnull(scheduler, "scheduler is null" );

   return rxjavaplugins.onassembly( new observablesubscribeon<t>( this , scheduler));

}

我们传进去了一个scheduler类,scheduler是一个调度类,能够延时或周期性地去执行一个任务。

scheduler有如下类型:

 

类型 使用方式 含义 使用场景
ioscheduler schedulers.io() io操作线程 读写sd卡文件,查询数据库,访问网络等io密集型操作
newthreadscheduler schedulers.newthread() 创建新线程 耗时操作等
singlescheduler schedulers.single() 单例线程 只需一个单例线程时
computationscheduler schedulers测试数据putation() cpu计算操作线程 图片压缩取样、xml,json解析等cpu密集型计算
trampolinescheduler schedulers.trampoline() 当前线程 需要在当前线程立即执行任务时
handlerscheduler androidschedulers.mainthread() android主线程 更新ui等

 

接着就没什么了,只是返回一个observablesubscribeon对象而已。

observeon()

首先看源码如下:

?

1

2

3

4

5

6

7

8

9

public final observable<t> observeon(scheduler scheduler) {

   return observeon(scheduler, false , buffersize());

}

 

public final observable<t> observeon(scheduler scheduler, boolean delayerror, int buffersize) {

   objecthelper.requirenonnull(scheduler, "scheduler is null" );

   objecthelper.verifypositive(buffersize, "buffersize" );

   return rxjavaplugins.onassembly( new observableobserveon<t>( this , scheduler, delayerror, buffersize));

}

这里也是没什么,只是最终返回一个observableobserveon对象而已。

接着还是像原来那样调用subscribe()方法进行订阅,看起来好像整体变化不大,就是封装了一些对象而已,不过着恰恰是rxjava源码的精华,当他再次调用subscribeactual()方法时,已经不是之前的observablecreate()里subscribeactual方法了,而是最先调用observableobserveon的subscribeactual()方法,对应源码如下:

?

1

2

3

4

5

6

7

8

protected void subscribeactual(observer<? super t> observer) {

   if (scheduler instanceof trampolinescheduler) {

     source.subscribe(observer);

   } else {

     scheduler.worker w = scheduler.createworker();

     source.subscribe( new observeonobserver<t>(observer, w, delayerror, buffersize));

   }

}

在这里有两点要讲,一点是observeonobserver是执行观察者的线程,后面还会详解,然后就是source.subscribe,这个source.subscribe调的是observablesubscribeon的subscribe方法,而subscribe方法因为继承的也是observable,是observable里的方法,所以和上面的observablecreate一样的方法,所以会调用observablesubscribeon里的subscribeactual()方法,对应的代码如下:

?

1

2

3

4

5

public void subscribeactual( final observer<? super t> s) {

   final subscribeonobserver<t> parent = new subscribeonobserver<t>(s);

   s.onsubscribe(parent);

   parent.setdisposable(scheduler.scheduledirect( new subscribetask(parent)));

}

上面代码中,首先把observeonobserver返回给来的用subscribeonobserver[包装]起来,然后在回调observer的onsubscribe(),就是对应模板代码的onsubscribe()方法。

接着看subscribetask类的源码:

?

1

2

3

4

5

6

7

8

9

10

final class subscribetask implements runnable {

   private final subscribeonobserver<t> parent;

   subscribetask(subscribeonobserver<t> parent) {

     this .parent = parent;

   }

   @override

   public void run() {

     source.subscribe(parent);

   }

}

其中的source.subscribe(parent),就是我们执行子线程的回调方法,对应我们模板代码里的被观察者的subscribe()方法。它放在run()方法里,并且继承runnable,说明这个类主要是线程运行。接着看scheduler.scheduledirect()方法对应的源码如下:

?

1

2

3

4

5

6

7

8

9

10

11

public disposable scheduledirect( @nonnull runnable run) {

   return scheduledirect(run, 0l, timeunit.nanoseconds);

}

 

public disposable scheduledirect( @nonnull runnable run, long delay, @nonnull timeunit unit) {

   final worker w = createworker();

   final runnable decoratedrun = rxjavaplugins.onschedule(run);

   disposetask task = new disposetask(decoratedrun, w);

   w.schedule(task, delay, unit);

   return task;

}

在这里,createworker()也是一个抽象方法,调用的是我们的调度类对应的schedulers类里面的方法,这里是ioscheduler类,

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

public final class ioscheduler extends scheduler{

 

   final atomicreference<cachedworkerpool> pool;

 

   //省略....

 

   public worker createworker() {

     return new eventloopworker(pool.get());

   }

 

   static final class eventloopworker extends scheduler.worker {

     private final compositedisposable tasks;

     private final cachedworkerpool pool;

     private final threadworker threadworker;

 

     final atomicboolean once = new atomicboolean();

 

     eventloopworker(cachedworkerpool pool) {

       this .pool = pool;

       this .tasks = new compositedisposable();

       this .threadworker = pool.get();

     }

 

     //省略....

 

     @nonnull

     @override

     public disposable schedule( @nonnull runnable action, long delaytime, @nonnull timeunit unit) {

       if (tasks.isdisposed()) {

         // don't schedule, we are unsubscribed

         return emptydisposable.instance;

       }

       return threadworker.scheduleactual(action, delaytime, unit, tasks);

     }

   }

 

}

 

  static final class cachedworkerpool implements runnable {

 

   //省略....

 

   threadworker get() {

     if (allworkers.isdisposed()) {

       return shutdown_thread_worker;

     }

     while (!expiringworkerqueue.isempty()) {

       threadworker threadworker = expiringworkerqueue.poll();

       if (threadworker != null ) {

         return threadworker;

       }

     }

 

     threadworker w = new threadworker(threadfactory);

     allworkers.add(w);

     return w;

    }

    //省略....

}

这就是ioscheduler的createworker()的方法,其实最主要的意思就是获取线程池,以便于生成子线程,让subscribetask()可以运行。然后直接调用 w.schedule(task, delay, unit)方法让它在线程池里执行。上面中那threadworker的源码如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

static final class threadworker extends newthreadworker {

   private long expirationtime;

   threadworker(threadfactory threadfactory) {

     super (threadfactory);

     this .expirationtime = 0l;

   }

 

   //省略代码....

  }

 

public class newthreadworker extends scheduler.worker implements disposable {

   private final scheduledexecutorservice executor;

 

   public newthreadworker(threadfactory threadfactory) {

     executor = schedulerpoolfactory.create(threadfactory);

   }

 

   public scheduledrunnable scheduleactual( final runnable run, long delaytime, @nonnull timeunit unit, @nullable disposablecontainer parent) {

     runnable decoratedrun = rxjavaplugins.onschedule(run);

 

     scheduledrunnable sr = new scheduledrunnable(decoratedrun, parent);

 

     if (parent != null ) {

       if (!parent.add(sr)) {

         return sr;

       }

     }

 

     future<?> f;

     try {

       if (delaytime <= 0 ) {

         f = executor.submit((callable<object>)sr);

       } else {

         f = executor.schedule((callable<object>)sr, delaytime, unit);

       }

       sr.setfuture(f);

     } catch (rejectedexecutionexception ex) {

       if (parent != null ) {

         parent.remove(sr);

       }

       rxjavaplugins.onerror(ex);

     }

 

     return sr;

   }

}

可以看到,这就调了原始的javaapi来进行线程池操作。

然后最后一环在子线程调用source.subscribe(parent)方法,然后回调刚开始创建的observablecreate的subscribeactual(),既:

?

1

2

3

4

5

6

7

8

9

10

protected void subscribeactual(observer<? super t> observer) {

     createemitter<t> parent = new createemitter<t>(observer);

     observer.onsubscribe(parent);

     try {

       source.subscribe(parent);

     } catch (throwable ex) {

       exceptions.throwiffatal(ex);

       parent.onerror(ex);

     }

}

进行消息的订阅绑定。

当我们在调用 emitter.onnext(内容)时,是在io线程里的,那回调的onnext()又是什么时候切换的?那就是前面为了整个流程流畅性没讲的在observeon()里的observeonobserver是执行观察者的线程的过程。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

class observeonobserver<t> extends basicintqueuedisposable<t>

   implements observer<t>, runnable {

 

     //省略代码....

 

     observeonobserver(observer<? super t> actual, scheduler.worker worker, boolean delayerror, int buffersize) {

       this .actual = actual;

       this .worker = worker;

       this .delayerror = delayerror;

       this .buffersize = buffersize;

     }

 

     @override

     public void onsubscribe(disposable s) {

       if (disposablehelper.validate( this .s, s)) {

         this .s = s;

         if (s instanceof queuedisposable) {

           @suppresswarnings ( "unchecked" )

           queuedisposable<t> qd = (queuedisposable<t>) s;

           int m = qd.requestfusion(queuedisposable.any | queuedisposable.boundary);

           if (m == queuedisposable.sync) {

             sourcemode = m;

             queue = qd;

             done = true ;

             actual.onsubscribe( this );

             schedule();

             return ;

           }

           if (m == queuedisposable.async) {

             sourcemode = m;

             queue = qd;

             actual.onsubscribe( this );

             return ;

           }

         }

         queue = new spsclinkedarrayqueue<t>(buffersize);

         actual.onsubscribe( this );

       }

     }

 

     @override

     public void onnext(t t) {

       if (done) {

         return ;

       }

       if (sourcemode != queuedisposable.async) {

         queue.offer(t);

       }

       schedule();

     } 

 

     void schedule() {

       if (getandincrement() == 0 ) {

         worker.schedule( this );

       }

     }

     //省略代码....

   }

当调用emitter.onnext(内容)方法,会调用上面的onnext()方法,然后在这个方法里会把数据压入一个队列,然后执行worker.schedule(this)方法,work是什么呢,还记得androidschedulers.mainthread()吗,这个对应这个handlerscheduler这个类,所以createworker()对应着:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

private static final class mainholder {

     static final scheduler default = new handlerscheduler( new handler(looper.getmainlooper()));

}

 

 

public worker createworker() {

   return new handlerworker(handler);

}

 

private static final class handlerworker extends worker {

     private final handler handler;

     private volatile boolean disposed;

 

     handlerworker(handler handler) {

       this .handler = handler;

     }

 

     @override

     public disposable schedule(runnable run, long delay, timeunit unit) {

       if (run == null ) throw new nullpointerexception( "run == null" );

       if (unit == null ) throw new nullpointerexception( "unit == null" );

       if (disposed) {

         return disposables.disposed();

       }

       run = rxjavaplugins.onschedule(run);

       scheduledrunnable scheduled = new scheduledrunnable(handler, run);

       message message = message.obtain(handler, scheduled);

       message.obj = this ; // used as token for batch disposal of this worker's runnables.

       handler.sendmessagedelayed(message, unit.tomillis(delay));

       if (disposed) {

         handler.removecallbacks(scheduled);

         return disposables.disposed();

       }

       return scheduled;

     }

}

在next()方法里,运用android自带的handler消息机制,通过把方法包裹在message里,同通过handler.sendmessagedelayed()发送消息,就会在ui线程里回调next()方法,从而实现从子线程切换到android主线程的操作。我们在主线程拿到数据就可以进行各种在主线程的操作了。

总结一下:

observablecreate 一> observablesubscribeon 一> observableobserveon为初始化顺序

当调用observable.subscribe(observer)时的执行顺序
observableobserveon 一> observablesubscribeon 一> observablecreate

当发送消息的执行顺序
observablecreate 一> observablesubscribeon 一> observableobserveon

以上就是消息订阅和线程切换的源码的所有讲解了。

为了让你们理解更清楚,我仿照rxjava写了大概的消息订阅和线程切换的最基本代码和基本功能,以帮助你们理解

https://github测试数据/jack921/rxjava2demo

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。

原文链接:https://HdhCmsTestjianshu测试数据/p/264b68fd96fa

查看更多关于RxJava的消息发送和线程切换实现原理的详细内容...

  阅读:15次