EventBus源代码解析:2、消息的发布与处理

在上一篇文章《EventBus源代码解析:1、初始化与订阅者注册》当中,我们主要分析了两个事情:

  • EventBus初始化
  • 订阅者的注册

我们通过分析,EventBus在初始化的时候,初始化了几个集合,分别用来根据EventType和Event Handler所在的类索引对应的Handler方法;并且也同时初始化了用于不同ThreadMode的Poster。订阅者在注册的时候,EventBus会解析要注册的类,分析其所有的方法,从中找出Event的Handler方法(即public修饰的以onEvent开头),然后根据EventType保存到相应的List中。

但我们一直没有分析到一个分析,那就是,EventBus到底是如何去Post消息的呢?接下来我们就去分析这个问题。按照我们的老套路,还是从最常用的代码入手:

MessageEvent event = new MessageEvent(System.currentTimeMillis(), "Message Sequence " + mSequence.getAndIncrement());
EventBus.getDefault().post(event);

这个代码主要分为两步,第一步是构建了一个需要处理的Event即MessageEvent,根据我们之前的分析,在我们调用register()方法的时候,EventBus会解析并将MessageBus作为key保存在一个HashMap中。通过调用EventBus.getDefault().post(event)方法,EventBus会自动调用我们的onEvent方法,我们这里的实现如下:

// Called in Android UI's main thread
public void onEventMainThread(MessageEvent event) {
    mMessages.add("onEventMainThread Receive : " + event);
    mAdapter.notifyDataSetChanged();
}

 public void onEvent(MessageEvent event) {
    Log.i(TAG, "Thread name : " + Thread.currentThread().getName());
    mHandler.obtainMessage(MESSAGE_WHAT_MESSAGEEVENT, event).sendToTarget();
}

那这一个过程当中,EventBus都做了哪些事情呢?我们先来看看post()方法的源代码:

post(Object event)

public void post(Object event) {
  
    // 获取当前线程(调用post方法的线程)中的一个PostingThreadState实例
    PostingThreadState postingState = currentPostingThreadState.get();
    // 获取当前线程(调用post方法的线程)中的EventQueue
    List<Object> eventQueue = postingState.eventQueue;
    // 将Event添加到队列当中
    eventQueue.add(event);

    // 如果当前线程(调用post方法的线程)没有在发布Event
    if (!postingState.isPosting) {
        // 判断调用者是否工作在主线程上
        postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
        // 设置标志,正在发布Event
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            // 依次发送,直至eventQueue为空为止
            while (!eventQueue.isEmpty()) {
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}

我们依次来分析每行代码的作用:

  1. PostingThreadState postingState = currentPostingThreadState.get();
    1. 这一行代码中,有一个PostingThreadState的定义,我们首先要搞明白PostingThreadState到底是怎么回事?
      /** For ThreadLocal, much faster to set (and get multiple values). */
      final static class PostingThreadState {
          // eventQueue
          final List<Object> eventQueue = new ArrayList<Object>();
          // 标志:正在发布?
          boolean isPosting;
          // 标志:是主线程?
          boolean isMainThread;
          // 订阅者
          Subscription subscription;
          // Event
          Object event;
          // 已经取消
          boolean canceled;
      }

      我们可以看到,原来这是一个静态不可变类,按照作者的描述,该类的作用用来提高性能,用于ThreadLocal,可以更快的去设置获取读取多个值。等下ThreadLocal在哪里呢?我们继续看,

      private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
          @Override
          protected PostingThreadState initialValue() {
              return new PostingThreadState();
          }
      };

      这是currentPostingThreadState的定义,我们可以看到,currentPostingThreadState定义为一个ThreadLocal对象,其内容是PostingThreadState对象,可以看到其中的initialValue()方法返回了一个新的PostingThreadState()对象,这是什么意思呢?换句话每当从一个线程调用currentPostingThreadState.get()方法的时候,系统会检查当前线程是否有一份PostingThreadState实例,如果没有则新建一个,再换句话说,每一个线程中都有其独一无二的一个PostingThreadState实例。那这个用来做什么呢?我们继续分析。

  2. List<Object> eventQueue = postingState.eventQueue;
    1. 这一行代码就比较有意思,post方法从postingState中获取了一个eventQueue,我们再回到刚刚去看一下PostingThreadState中关于eventQueue的定义
      final List<Object> eventQueue = new ArrayList<Object>();

      可以看到,每一个新的PostingThreadState对象中都有自己的一个eventQueue对象,并单独指向一个ArrayList,什么意思呢?我们可以明白,每个线程中都有自己的一份PostingThreadState拷贝,那么换言之,每一个线程中,同样有这样一个自己专属的eventQueue。也就是说代码List<Object> eventQueue = postingState.eventQueue;其实是获取了当前线程中的对应的eventQueue。(注意哦,EventBus中所有线程中消息的发送都是可以通过这个post方法实现的)

  3. eventQueue.add(event);
    1. 这段代码就相当简单了,将event添加到自己所在线程的eventQueue当中。
  4. 继续往下看,发现是一个判断
    if (!postingState.isPosting)

    那么我们就根据分支来分析:

    1. ture:也就是说postingState.isPosting=false;也就是说,当前的线程没有在发布event,则进入以下的流程:
      postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
      postingState.isPosting = true;
      if (postingState.canceled) {
          throw new EventBusException("Internal error. Abort state was not reset");
      }
      try {
          while (!eventQueue.isEmpty()) {
              postSingleEvent(eventQueue.remove(0), postingState);
          }
      } finally {
          postingState.isPosting = false;
          postingState.isMainThread = false;
      }

      我们来分析下,这里面都做了什么的工作呢?

      1. 首先会先判断当前的线程是否是在UI线程上,为什么要判断呢?什么这个还要问吗?UI线程不能做太多事情当然要小心处理啊!!!所以通过代码
        Looper.getMainLooper() == Looper.myLooper()

        来了解。

      2. 然后呢?判断当前的线程是否已经canceled,也就是当前的线程是否已经unregistered了,这个容易理解,不去细细分析。
      3. 然后进入一个while循环
        while (!eventQueue.isEmpty()) {
            postSingleEvent(eventQueue.remove(0), postingState);
        }

        这个代码也很明了啊,就是不断的通过postSingleEvent发送Event直至队列尾空为止。

    2. false:哦,这里应该写另外分支要做的事情,额,如果当前的线程正在发送,那么就不去做任何事情了,等待上一次的while循环处理就好。

所以,通过上面的分析,我们发现最终代码还是进入

postSingleEvent(eventQueue.remove(0), postingState);

对所有的消息进行发送处理,那我们继续分析一下这一个方法就好了。

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error

首先看源代码:

// 发送单个的Event
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    // 获取event的类
    Class<?> eventClass = event.getClass();
    // 标记,用来表示是否已经找到event对应的订阅者
    boolean subscriptionFound = false;
    // 判断event是否开启继承?
    if (eventInheritance) {
        // 查找Event对应的所有的event类型(包括父类和接口)。
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            // 获取其中的一个Event类型(Event对应的类或者其父类或者其实现的接口的一种)
            Class<?> clazz = eventTypes.get(h);
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        // 如果没有开启Event继承,则直接根据Event的类型,在指定的线程中发送Event即可。
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            Log.d(TAG, "No subscribers registered for event " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}

其实我们大体上一看就知道,这段代码也并没有执行具体的消息发送,但做了很必要的预处理工作,那都有哪些工作呢?我已经在程序里面加了很多注释了,我们可以很容易的发现,其实关键的代码在里面的那个if..else…分支语句里面,我们来依次来看看。

if (eventInheritance) {
    // 查找Event对应的所有的event类型(包括父类和接口)。
    List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
    int countTypes = eventTypes.size();
    for (int h = 0; h < countTypes; h++) {
        // 获取其中的一个Event类型(Event对应的类或者其父类或者其实现的接口的一种)
        Class<?> clazz = eventTypes.get(h);
        subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
    }
} else {
    // 如果没有开启Event继承,则直接根据Event的类型,在指定的线程中发送Event即可。
    subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}

两个分支有什么不同呢?eventInheritance表示用户是否开启Event继承,如果不开启,则通过

postSingleEventForEventType(event, postingState, eventClass);

方法发送Event,如果开启,则首先通过

lookupAllEventTypes(eventClass);

查找event类所有的Event类型,然后依次通过代码

postSingleEventForEventType(event, postingState, eventClass);

进行消息处理,所以,我们分两步来,首先来看看lookupAllEventTypes的代码。

private List<Class<?>> lookupAllEventTypes(Class<?> eventClass)

private List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
    synchronized (eventTypesCache) {
        List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
        if (eventTypes == null) {
            eventTypes = new ArrayList<Class<?>>();
            Class<?> clazz = eventClass;
            while (clazz != null) {
                eventTypes.add(clazz);
                addInterfaces(eventTypes, clazz.getInterfaces());
                clazz = clazz.getSuperclass();
            }
            eventTypesCache.put(eventClass, eventTypes);
        }
        return eventTypes;
    }
}

其实这个代码并不负责,简单的理解就是根据Event类查找其父类,然后添加到eventTypesCache里面。那另外一个函数干什么的呢?其实真正的工作都在postSingleEventForEventType(event, postingState, clazz);里面,我们去看看。

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass)

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        // 根据Event类型获取其对应的订阅者。
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    // 如果存在订阅者
    if (subscriptions != null && !subscriptions.isEmpty()) {
        // 依次发送给对应的订阅者
        for (Subscription subscription : subscriptions) {
            // 设置post()方法调用线程中对应的postingState
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted = false;
            try {
                // 将event发送给对应的订阅者。
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}

什么?我们再一看,好吧,这个代码也没有去真正的发送消息,那我们看看这个方法到底做了什么工作呢?

  1. subscriptions = subscriptionsByEventType.get(eventClass);首先通过这行代码,从subscriptionsByEventType中,根据Event的类,获取所有的对应的subscriptions,需要注意的是,此段代码使用EventBus的实例进行同步,实际上是同步的订阅者的List
  2. 如果subscriptions==null 或者 subscriptions.size()==0,即如果不存在对应的subscriptions,那么则返回即可。
  3. 将Event依次发送给每一个Subscription
    1. 设置postingState.event = event,设置postingState.subscription = subscription;
    2. 调用方法postToSubscription(subscription, event, postingState.isMainThread);发送消息
    3. 恢复postingState默认状态为null
    4. 重复步骤3

可以看到,真正发送消息的工作还没有看到,在postToSubscription方法当中,好吧,那我们继续来学习这个方法都做了什么?

postToSubscription

源代码如下:

// 将event发送给对应的调用者
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case PostThread:
            // 直接调用
            invokeSubscriber(subscription, event);
            break;
        case MainThread:
            if (isMainThread) {
                // 如果post的发送线程是UI线程,那么则直接调用对应的方法即可
                invokeSubscriber(subscription, event);
            } else {
                // 否则则发送到main线程中对应的Handler中
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case BackgroundThread:
            // 背景线程
            if (isMainThread) {
                // 如果当前工作在主线程,则直接压入背景Poster的队列
                backgroundPoster.enqueue(subscription, event);
            } else {
                // 反之则直接调用
                invokeSubscriber(subscription, event);
            }
            break;
        case Async:
            //直接压入异步Poster的队列
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

可以看到,这里是最终的调用订阅者Event处理方法的地方,针对ThreadMode的不同,postToSubscription方法采取了不同的策略

  • PostThread:这个是直接在当前线程上调用处理的方法,所以直接通过invokeSubscriber(subscription, event);调用对应的方法
  • MainThread:
    • 当前在主线程上:同PostThread一样,直接通过invokeSubscriber(subscription, event);反射调用对应的方法
    • 不在主线程上,则通过mainThreadPoster.enqueue(subscription, event);将Event压入队列等待处理
  • BackgroundThread:
    • 当前在主线程上:通过backgroundPoster.enqueue(subscription, event);将Event压入队列,等待处理
    • 当前不在主线程上:通过invokeSubscriber(subscription, event);反射调用对应的方法
  • Async:直接将Event压入队列asyncPoster.enqueue(subscription, event);

好了,到这里,我们基本上明白了Event在EventBus中数据是怎么传递的了,但每一个ThreadMode不同的处理方法我们还没有看,到底是怎么样的呢?

invokeSubscriber(Subscription subscription, Object event) 源代码

    try {
        // 使用反射机制,调用对应的事件处理函数。
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

这个地方的代码比较简单了,其实就是将反射的调用包装了一下,不需要多说。

我们先来看一下MainThread的时候,其Poster的处理办法:

HandlerPoster

/*
 * Copyright (C) 2012 Markus Junginger, greenrobot (http://greenrobot.de)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package de.greenrobot.event;

import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import android.os.SystemClock;

/**
 * 主线程Poster,本质上为一个Handler 
 *
 */
final class HandlerPoster extends Handler {

    // 维护一个PendingPostQueue的队列
    private final PendingPostQueue queue;
    // 不太懂
    private final int maxMillisInsideHandleMessage;
    // EventBus对象
    private final EventBus eventBus;
    // 标记本Handler是否空闲:true:忙,false:空闲
    private boolean handlerActive;

    HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    // 入队列
    void enqueue(Subscription subscription, Object event) {
        // 获取一个PendingPost
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            // 将pendingPost放入queue队列当中
            queue.enqueue(pendingPost);
            // 发送消息
            if (!handlerActive) {
                handlerActive = true;
                // 通过Handler中的MessageQueue,将通知工作在某个线程(可能是main Thread,post thread,background thread,asnyc thread)处理消息
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    // 消息的处理,需要注意的是,该函数段是工作在Looper对应的线程之上的。
    // 有个问题,如果event很快处理完成,那么这个时候是不需要rescheduled的,那么如果在该event处理过程当中,已经放入其他的消息,那么这个消息会在什么时候得到处理呢?
    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            // 记录开始时间
            long started = SystemClock.uptimeMillis();
            while (true) {
                // 从等待处理的队列当中获取一个PendingPost
                PendingPost pendingPost = queue.poll();
                // 判断获取到的pendingPost是否为null,如果null则是没有需要处理的event
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        // 再次处理,需要注意的是,该方法是同步的,跟谁同步的呢?是跟enqueue方法中的代码块同步,做什么用呢?
                        // 我理解的是,此处的代码主要是用于避免一种现象的发生,就是上面已经给Handler发送消息,但并未处理的时候。---> 但貌似又不是
                        // 这次是对的:就是等待前面的enqueue函数执行完成,以便于从中获取event进行处理,如果此时仍然为空,说明队列是空的,标记handlerActive为空,
                        // 这样的话,下次enqueue的时候,就可以直接通过sendMessage通知Handler立刻进行处理。
                        pendingPost = queue.poll();
                        // 如果再次从中获取数据,但为空,则说明handler不是Activie的了。
                        if (pendingPost == null) {
                            // 标记handler已经空闲
                            handlerActive = false;
                            return;
                        }
                    }
                }
                // eventBus调用订阅者的对应的方法
                eventBus.invokeSubscriber(pendingPost);
                // 工作做完,统计消耗时间
                long timeInMethod = SystemClock.uptimeMillis() - started;
                // 超时
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    // 立刻尝试处理下一个消息
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    // 设置标记
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            // 如果已经rescheduled,那么说明此时该handler已经在忙,否则则说明handler已经空闲。
            handlerActive = rescheduled;
        }
    }
}

上面是我添加过注释的源代码,我们可以发现以下的特点:

  1. 主线程的Poster本质上是一个Handler,因此关键的一点就是,看Handler到底工作在哪个Looper上,通过EventBus的默认初始化代码
    mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);

    可以发现,其实主线程上的Poster就是一个工作在主线程上的Handler,那么剩下的就比较简单了。

  2. enqueue,入队列,其实就做了以下的事情:
    1. 同步保护,防止从多个线程同时发送消息的时候出现错误
    2. queue.enqueue(pendingPost);将需要处理的PendingPost压入队列
    3. 通过sendMessage(obtainMessage())将消息发送给Handler进行处理
  3. handleMessage(Message msg):消息处理的方法
    1. pendingPost = queue.poll();获取数据,如果为null,那么意味着没有数据可以处理,标记当前活动状态为false,那么下一次enqueue入列的时候,就可以直接通知handler进行数据处理
    2. 获取成功,则通过eventBus.invokeSubscriber(pendingPost);调用相应的方法进行处理
    3. 如果此次消息处理超时,则直接通过sendMessage(obtainMessage())进行下一次消息处理

BackgroundPoster

源代码如下:

/*
 * Copyright (C) 2012 Markus Junginger, greenrobot (http://greenrobot.de)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package de.greenrobot.event;

import android.util.Log;

/**
 * Posts events in background.
 * 在后台线程当中处理events
 * @author Markus
 */
final class BackgroundPoster implements Runnable {

    // 一个保存有PendingPost的队列
    private final PendingPostQueue queue;
    // 保持对EventBus的引用
    private final EventBus eventBus;

    // 看现在的BackgroundPoster是否正在处理event
    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        // 根据subscription和event构建PendingPost
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            // 将构建好的PendingPost加入到队列。
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                // 如果当前队列空闲,则设置其为忙,并通过EventBus的线程池执行该线程
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            try {
                while (true) {
                    // 阻塞方法,从PendignPostQueue中获取一个PendingPost
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            // 原理同我们之前分析的mainPoster一样的,都是防止在加入的时候尝试取PendignPost而取不到,
                            // 代码到这里的时候,则保证如果要加入队列,工作已经完成的。
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    // 调用对应的订阅者方法
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            // 此时说明没有工作可做,因此释放该线程完成工作,设置标记为false。
            executorRunning = false;
        }
    }

}

通过比较和Poster代码,发现其实现的原理不一样,但大体的机制基本一直:所有的Event都是缓存在PendingPostQueue当中,当enqueue的时候入队列,然后不同ThreadMode的Poster会以不同的方式处理相应的Event。

BackgroundPoster的Event的入列方式:

  1. 将PendingPost加入到队列当中
  2. 判断如果当前的executorRunning==false,即当前BackgroundPoster没有线程在处理
    1. 设置标记executorRunning=true
    2. 将该线程提交给EventBus默认的ExecutorService进行处理

BackgroundPoster的Event的处理方式的几个特点:

  1. 当开启一个BackgroundPoster之后,会一直处理所有的PendignPost直至所有的全部处理完成。
  2. 当使用queue.poll(1000)获取PendignPost,仍然没有取回之后,会进入同步保护块(避免此时有新的PendignPost加入队列,但该线程看不到),再次尝试,如果依然没有PendingPost,说明此时没有Event通过BackgroundPoster进行处理,线程可以安全退出。

此时,我们再来回顾使用EventBus中,关于BackgroundPoster的几个说明:

  1. BackgroundPoster只会顺序对Event进行处理,因此不适合并发的情况。

对比看完了BackgroundPoster,还需要继续学习一下AsyncPoster的使用

AsyncPoster

class AsyncPoster implements Runnable {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        // 与mainPoster和backgroundPoster相比,直接将PendingPost执行
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        // 获取需要处理的PendingPost
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        // 调用相应的订阅者方法
        eventBus.invokeSubscriber(pendingPost);
    }

}

一对比,发现跟BackgroundPoster有两点有区别:

  • 入队列的时候,会直接将线程提交给ExecutorService()进行处理,不需要检查当前是否有AsyncPoster任务在执行
  • 每一个AsyncPoster任务只负责一个PendignPostQueue的处理。

 

 

About: happyhls


2 thoughts on “EventBus源代码解析:2、消息的发布与处理”

发表评论

电子邮件地址不会被公开。 必填项已用*标注