我们都知道,在Android中默认只有一个UI线程,并且还不是线程安全的,当我们要做一些耗时的操作时,就有必要另开一个线程来做这些事以此来避免ANR。不过这其中会遇到一个问题,在异步线程中做完一个task,要把信息反馈给UI线程才能去操作view。
一般我们是怎么做的呢?handler,传递一个UI线程的handler给这个task,完成后用这个handler去sendMessage也好post也好,那就可以到主线程里去处理了。不过handler的劣势也很明显:

  1. message中的Data只能通过bundle传递,这就也就是必须是可序列化的数据,这有一定的限制,而且效率上也会打折扣

  2. handler生命周期跟主线程的Looper是一样的,比Activity、Service等系统组建的生命周期要长的多,不注意使用就会造成activity等内存泄漏,即使是使用static来声明这个handler,那也会长期占用内存直到UI线程退出,如果每个activity都要使用handler,那看起来确实不那么优雅

  3. 如果是两个非UI线程之间的交互,那接受线程就必须创建自己的Loop,或者直接使用HandlerThread

有没有稍微便捷一点的方法呢?EventBus就给我们提供了这么一种工具,这个项目在github上收获了大量的star,那我们本着学习的心态,看看人家是怎么处理这样的case的

还是要简单的从如何使用开始讲起:

 1 public class TestEvent {
 2 
 3     public final String msg;
 4 
 5     public TestEvent(String msg) {
 6         this.msg = msg;
 7     }
 8 }
 9 
10 public class MyActivity extends Activity {
11 
12  	@Override
13 	public void onCreate(Bundle savedInstanceState) {
14 	    super.onCreate(savedInstanceState);
15 	    EventBus.getDefault().register(this);
16 	}
17 
18  	@Override
19 	public void onDestroy() {
20 	    EventBus.getDefault().unregister(this);
21 	    super.onDestroy();
22 	}
23 
24 	@Subscribe(threadMode = ThreadMode.MAIN)
25 	public void onEventMainThread(TestEvent event) {
26 	    System.out.println(event.msg)    
27 	}
28 }
29 
30 // somewhere in another thread
31 
32 EventBus.getDefault().post(new TestEvent("this is a test"));

用法很简单,先自定义一个Event,然后在你要接收这个类型event的activity里用注解写一个onEvent,在别的线程中post一个此类型的event,就可以在activity中收到这个event了,当然不能忘记在activity的onCreate/onDestory里进行register/unregister操作

最大的疑惑可能就是注解里的threadMode了,先来解释一下,一共有四种mode: - POSTING:在哪个线程中post就在哪个线程中调用onEvent,没有线程的切换

  • MAIN:onEvent一定会在UI线程中调用

  • BACKGROUND:如果post thread不是UI线程,那就直接在这个线程里调用onEvent,如果在UI线程里post,那就会在后台开一个线程,一个接着一个的调用onEventBackground

  • ASYNC:onEvent的调用会独立于post线程和主线程,也就是会另开一个线程池并发、异步的处理这些event

那我们就从register开始说起吧

 1 /**
 2      * Registers the given subscriber to receive events. Subscribers must call {@link #unregister(Object)} once they
 3      * are no longer interested in receiving events.
 4      * <p/>
 5      * Subscribers have event handling methods that must be annotated by {@link Subscribe}.
 6      * The {@link Subscribe} annotation also allows configuration like {@link
 7      * ThreadMode} and priority.
 8      */
 9     public void register(Object subscriber) {
10         Class<?> subscriberClass = subscriber.getClass();
11         List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
12         synchronized (this) {
13             for (SubscriberMethod subscriberMethod : subscriberMethods) {
14                 subscribe(subscriber, subscriberMethod);
15             }
16         }
17     }
  • 首先获得subscriber的class

  • 根据这个class找到一个订阅方法列表

  • 然后将这些订阅方法和subscriber一起订阅起来

一头雾水,那就看看findSubscriberMethods做了什么:

 1 private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
 2 
 3     List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
 4         List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
 5         if (subscriberMethods != null) {
 6             return subscriberMethods;
 7         }
 8 
 9         if (ignoreGeneratedIndex) {
10             subscriberMethods = findUsingReflection(subscriberClass);
11         } else {
12             subscriberMethods = findUsingInfo(subscriberClass);
13         }
14         if (subscriberMethods.isEmpty()) {
15             throw new EventBusException("Subscriber " + subscriberClass
16                     + " and its super classes have no public methods with the @Subscribe annotation");
17         } else {
18             METHOD_CACHE.put(subscriberClass, subscriberMethods);
19             return subscriberMethods;
20         }
21     }

这里通过ConcurrentHashMap建立了一个小缓存,先去这里面找,找不到了,我们先看看是如何通过反射的方式来操作的:

 1 private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
 2     FindState findState = prepareFindState();
 3     findState.initForSubscriber(subscriberClass);
 4     while (findState.clazz != null) {
 5         findUsingReflectionInSingleClass(findState);
 6         findState.moveToSuperclass();
 7     }
 8     return getMethodsAndRelease(findState);
 9 }
10 
11 private FindState prepareFindState() {
12     synchronized (FIND_STATE_POOL) {
13         for (int i = 0; i < POOL_SIZE; i++) {
14             FindState state = FIND_STATE_POOL[i];
15             if (state != null) {
16                 FIND_STATE_POOL[i] = null;
17                 return state;
18             }
19         }
20     }
21     return new FindState();
22 }

首先prepareFindState里面,只是从一个FIND_STATE_POOL数组里拿出一个findstate,那还得看看FindState是什么:

 1 static class FindState {
 2     final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
 3     final Map<Class, Object> anyMethodByEventType = new HashMap<>();
 4     final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
 5     final StringBuilder methodKeyBuilder = new StringBuilder(128);
 6 
 7     Class<?> subscriberClass;
 8     Class<?> clazz;
 9     boolean skipSuperClasses;
10     SubscriberInfo subscriberInfo;
11 
12     void initForSubscriber(Class<?> subscriberClass) {
13         this.subscriberClass = clazz = subscriberClass;
14         skipSuperClasses = false;
15         subscriberInfo = null;
16     }

只看到了一大托的List、Map,貌似要是存放一些什么东西,init好像也只是设置了clazz 还是来看看到底反射能帮我们做些什么吧:

 1 private void findUsingReflectionInSingleClass(FindState findState) {
 2     Method[] methods;
 3     try {
 4         // This is faster than getMethods, especially when subscribers are fat classes like Activities
 5         methods = findState.clazz.getDeclaredMethods();
 6     } catch (Throwable th) {
 7         // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
 8         methods = findState.clazz.getMethods();
 9         findState.skipSuperClasses = true;
10     }
11     for (Method method : methods) {
12         int modifiers = method.getModifiers();
13         if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
14             Class<?>[] parameterTypes = method.getParameterTypes();
15             if (parameterTypes.length == 1) {
16                 Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
17                 if (subscribeAnnotation != null) {
18                     Class<?> eventType = parameterTypes[0];
19                     if (findState.checkAdd(method, eventType)) {
20                         ThreadMode threadMode = subscribeAnnotation.threadMode();
21                         findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
22                                 subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
23                     }
24                 }
25             } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
26                 String methodName = method.getDeclaringClass().getName() + "." + method.getName();
27                 throw new EventBusException("@Subscribe method " + methodName +
28                         "must have exactly 1 parameter but has " + parameterTypes.length);
29             }
30         } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
31             String methodName = method.getDeclaringClass().getName() + "." + method.getName();
32             throw new EventBusException(methodName +
33                     " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
34         }
35     }
36 }
  • 先通过反射,找到这个class(也就是我们的Activity)的所有method,遍历它们

  • 13行确定这个method仅仅是public,才能继续处理

  • 14、15行说,这个method必须只有一个类型的参数才可以!

  • 接下来看这个method有没有声明Subscribe注解,没有的话不要!

  • 19行将这个method和event参数的class类型(例子中就是TestEvent.class)交给findState.checkAdd,看看它做了什么:

 1 boolean checkAdd(Method method, Class<?> eventType) {
 2     // 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
 3     // Usually a subscriber doesn't have methods listening to the same event type.
 4     Object existing = anyMethodByEventType.put(eventType, method);
 5     if (existing == null) {
 6         return true;
 7     } else {
 8         if (existing instanceof Method) {
 9             if (!checkAddWithMethodSignature((Method) existing, eventType)) {
10                 // Paranoia check
11                 throw new IllegalStateException();
12             }
13             // Put any non-Method object to "consume" the existing Method
14             anyMethodByEventType.put(eventType, this);
15         }
16         return checkAddWithMethodSignature(method, eventType);
17     }
18 }
19 
20 private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
21     methodKeyBuilder.setLength(0);
22     methodKeyBuilder.append(method.getName());
23     methodKeyBuilder.append('>').append(eventType.getName());
24 
25     String methodKey = methodKeyBuilder.toString();
26     Class<?> methodClass = method.getDeclaringClass();
27     Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
28     if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
29         // Only add if not already found in a sub class
30         return true;
31     } else {
32         // Revert the put, old class is further down the class hierarchy
33         subscriberClassByMethodKey.put(methodKey, methodClassOld);
34         return false;
35     }
36 }

先看checkAddWithMethodSignature:

首先把method的name和eventType的name拼接成字符串key
找到method的class,看看subscriberClassByMethodKey中有没有key和class的pair
如果有,就替换为原来的的class,返回false
如果没有或者原来class是method的class的父类(也就是子类把父类替换掉)则返回true

再看checkAdd

  1. 检查是否之前已经存过对应的eventType与method的pair,如过没存过,返回true,add成功

  2. 如果之前存过,并且以前的那个也是一个Method,则进入checkAddWithMethodSignature,如果签名不匹配,则会抛出异常,反之会设置一个无关紧要的object,这样下次检查的时候就不用这么费事了

  3. 最后再检查一下当前method和evenType的签名是否正确

回到findUsingReflectionInSingleClass

  • 21行new一个SubscriberMethod,也就是保存一些信息,放入subscriberMethods的一个List中就结束了

回到findUsingReflection,不断的去找当前clazz的父类,直到系统类,最后会释放这个findState,把它还给FIND_STATE_POOL

这样,我们就得到了一个SubscriberMethod的列表,每一个SubscriberMethod里面都维护了method、threadMode、eventType、priority、sticky参数 回到register里,有了这个列表,我们就可以对每一个SubscriberMethod调用subscribe了:

 1 // Must be called in synchronized block
 2     private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
 3         Class<?> eventType = subscriberMethod.eventType;
 4         Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
 5         CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
 6         if (subscriptions == null) {
 7             subscriptions = new CopyOnWriteArrayList<>();
 8             subscriptionsByEventType.put(eventType, subscriptions);
 9         } else {
10             if (subscriptions.contains(newSubscription)) {
11                 throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
12                         + eventType);
13             }
14         }
15 
16         int size = subscriptions.size();
17         for (int i = 0; i <= size; i++) {
18             if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
19                 subscriptions.add(i, newSubscription);
20                 break;
21             }
22         }
23 
24         List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
25         if (subscribedEvents == null) {
26             subscribedEvents = new ArrayList<>();
27             typesBySubscriber.put(subscriber, subscribedEvents);
28         }
29         subscribedEvents.add(eventType);
30 
31         if (subscriberMethod.sticky) {
32             if (eventInheritance) {
33                 // Existing sticky events of all subclasses of eventType have to be considered.
34                 // Note: Iterating over all events may be inefficient with lots of sticky events,
35                 // thus data structure should be changed to allow a more efficient lookup
36                 // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
37                 Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
38                 for (Map.Entry<Class<?>, Object> entry : entries) {
39                     Class<?> candidateEventType = entry.getKey();
40                     if (eventType.isAssignableFrom(candidateEventType)) {
41                         Object stickyEvent = entry.getValue();
42                         checkPostStickyEventToSubscription(newSubscription, stickyEvent);
43                     }
44                 }
45             } else {
46                 Object stickyEvent = stickyEvents.get(eventType);
47                 checkPostStickyEventToSubscription(newSubscription, stickyEvent);
48             }
49         }
50     }
  • 首先获取eventType类型,并把这个对象和subscriberMethod拼成一个Subscription,注意到没有,我们有了对象,有了方法,以后就可以通过反射去调用这个方法了!

  • 通过这个eventType从hashMap里获得一个Subscription的列表,把新建的Subscription加进去

  • 再把这个eventType加到这个subscriber所订阅的eventType集合里去

  • 最后会保存一些sticky的event,保证它们是可以被被动查询到的

就这样结束了,所有的Subscription都保存在subscriptionsByEventType里,通过eventType进行分类,然后对应的value是一个列表,存放订阅这个eventType的所有对象以及方法和threadMode

接下来看看post:

 1 /** Posts the given event to the event bus. */
 2 public void post(Object event) {
 3     PostingThreadState postingState = currentPostingThreadState.get();
 4     List<Object> eventQueue = postingState.eventQueue;
 5     eventQueue.add(event);
 6 
 7     if (!postingState.isPosting) {
 8         postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
 9         postingState.isPosting = true;
10         if (postingState.canceled) {
11             throw new EventBusException("Internal error. Abort state was not reset");
12         }
13         try {
14             while (!eventQueue.isEmpty()) {
15                 postSingleEvent(eventQueue.remove(0), postingState);
16             }
17         } finally {
18             postingState.isPosting = false;
19             postingState.isMainThread = false;
20         }
21     }
22 }

在这其中会将这个event加入一个队列,如果队列还没有开始post,则会最终调用postSingleEventForEventType:

 1 private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
 2         CopyOnWriteArrayList<Subscription> subscriptions;
 3         synchronized (this) {
 4             subscriptions = subscriptionsByEventType.get(eventClass);
 5         }
 6         if (subscriptions != null && !subscriptions.isEmpty()) {
 7             for (Subscription subscription : subscriptions) {
 8                 postingState.event = event;
 9                 postingState.subscription = subscription;
10                 boolean aborted = false;
11                 try {
12                     postToSubscription(subscription, event, postingState.isMainThread);
13                     aborted = postingState.canceled;
14                 } finally {
15                     postingState.event = null;
16                     postingState.subscription = null;
17                     postingState.canceled = false;
18                 }
19                 if (aborted) {
20                     break;
21                 }
22             }
23             return true;
24         }
25         return false;
26     }

它会根据event的类型,从subscriptionsByEventType里找到之前提过的subscription队列,然后一个一个执行postToSubscription:

 1 private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
 2         switch (subscription.subscriberMethod.threadMode) {
 3             case POSTING:
 4                 invokeSubscriber(subscription, event);
 5                 break;
 6             case MAIN:
 7                 if (isMainThread) {
 8                     invokeSubscriber(subscription, event);
 9                 } else {
10                     mainThreadPoster.enqueue(subscription, event);
11                 }
12                 break;
13             case BACKGROUND:
14                 if (isMainThread) {
15                     backgroundPoster.enqueue(subscription, event);
16                 } else {
17                     invokeSubscriber(subscription, event);
18                 }
19                 break;
20             case ASYNC:
21                 asyncPoster.enqueue(subscription, event);
22                 break;
23             default:
24                 throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
25         }
26     }

如果是POSTING,或者MAIN模式下已经在UI线程了又或者是在BACKGROUND模式但不在UI线程,直接反射调用:

1 void invokeSubscriber(Subscription subscription, Object event) {
2         try {
3             subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
4         } catch (InvocationTargetException e) {
5             handleSubscriberException(subscription, event, e.getCause());
6         } catch (IllegalAccessException e) {
7             throw new IllegalStateException("Unexpected exception", e);
8         }
9     }
  • 如果是MAIN模式,会通过mainThreadPoster,本质是一个handler,把这个subscription加入它的一个等待队列中,然后发送msg,在handleMessage里从队列里取出这个subscription反射执行

  • 如果是BACKGROUND模式,会投递到backgroundPoster里,也加入一个队列,通过eventBus内部的线程池执行一个runnable,这个runnable循环从这个队列里取出subscription并执行,但是要注意的是,这个backgroundPoster有一个private volatile boolean executorRunning变量,保证串行的执行这些subscription

  • 如果是ASYNC模式,投递到asyncPoster,也有是eventBus的内部线程池执行的,它是可以并行的

总结

  • 利用反射的方法来实现事件总线,觉得很巧妙,感慨反射的强大,虽然反射是运行时进行的,由于不能预加载class以及jit优化,效率肯定是比不上已经编译好的字节码的,但这种动态语言的思想还是很重要的

  • 这次又看到了XXX_POOL,这种什么什么池的概念在大规模处理数据的时候还是很有用的,减少了gc和重新new的开销

  • 还有一些细节并没有追踪完整,但我觉得目前要做的还是理解它大致框架和思想是第一位的,代码这种东西都是熟能生巧的