EventBus 原理解析
我们都知道,在Android中默认只有一个UI线程,并且还不是线程安全的,当我们要做一些耗时的操作时,就有必要另开一个线程来做这些事以此来避免ANR。不过这其中会遇到一个问题,在异步线程中做完一个task,要把信息反馈给UI线程才能去操作view。
一般我们是怎么做的呢?handler,传递一个UI线程的handler给这个task,完成后用这个handler去sendMessage也好post也好,那就可以到主线程里去处理了。不过handler的劣势也很明显:
-
message中的Data只能通过bundle传递,这就也就是必须是可序列化的数据,这有一定的限制,而且效率上也会打折扣
-
handler生命周期跟主线程的Looper是一样的,比Activity、Service等系统组建的生命周期要长的多,不注意使用就会造成activity等内存泄漏,即使是使用static来声明这个handler,那也会长期占用内存直到UI线程退出,如果每个activity都要使用handler,那看起来确实不那么优雅
-
如果是两个非UI线程之间的交互,那接受线程就必须创建自己的Loop,或者直接使用HandlerThread
有没有稍微便捷一点的方法呢?EventBus就给我们提供了这么一种工具,这个项目在github上收获了大量的star,那我们本着学习的心态,看看人家是怎么处理这样的case的
还是要简单的从如何使用开始讲起:
1 public class TestEvent {
2
3 public final String msg;
4
5 public TestEvent(String msg) {
6 this.msg = msg;
7 }
8 }
9
10 public class MyActivity extends Activity {
11
12 @Override
13 public void onCreate(Bundle savedInstanceState) {
14 super.onCreate(savedInstanceState);
15 EventBus.getDefault().register(this);
16 }
17
18 @Override
19 public void onDestroy() {
20 EventBus.getDefault().unregister(this);
21 super.onDestroy();
22 }
23
24 @Subscribe(threadMode = ThreadMode.MAIN)
25 public void onEventMainThread(TestEvent event) {
26 System.out.println(event.msg)
27 }
28 }
29
30 // somewhere in another thread
31
32 EventBus.getDefault().post(new TestEvent("this is a test"));
用法很简单,先自定义一个Event,然后在你要接收这个类型event的activity里用注解写一个onEvent,在别的线程中post一个此类型的event,就可以在activity中收到这个event了,当然不能忘记在activity的onCreate/onDestory里进行register/unregister操作
最大的疑惑可能就是注解里的threadMode了,先来解释一下,一共有四种mode: - POSTING:在哪个线程中post就在哪个线程中调用onEvent,没有线程的切换
-
MAIN:onEvent一定会在UI线程中调用
-
BACKGROUND:如果post thread不是UI线程,那就直接在这个线程里调用onEvent,如果在UI线程里post,那就会在后台开一个线程,一个接着一个的调用onEventBackground
-
ASYNC:onEvent的调用会独立于post线程和主线程,也就是会另开一个线程池并发、异步的处理这些event
那我们就从register开始说起吧
1 /**
2 * Registers the given subscriber to receive events. Subscribers must call {@link #unregister(Object)} once they
3 * are no longer interested in receiving events.
4 * <p/>
5 * Subscribers have event handling methods that must be annotated by {@link Subscribe}.
6 * The {@link Subscribe} annotation also allows configuration like {@link
7 * ThreadMode} and priority.
8 */
9 public void register(Object subscriber) {
10 Class<?> subscriberClass = subscriber.getClass();
11 List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
12 synchronized (this) {
13 for (SubscriberMethod subscriberMethod : subscriberMethods) {
14 subscribe(subscriber, subscriberMethod);
15 }
16 }
17 }
-
首先获得subscriber的class
-
根据这个class找到一个订阅方法列表
-
然后将这些订阅方法和subscriber一起订阅起来
一头雾水,那就看看findSubscriberMethods做了什么:
1 private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
2
3 List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
4 List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
5 if (subscriberMethods != null) {
6 return subscriberMethods;
7 }
8
9 if (ignoreGeneratedIndex) {
10 subscriberMethods = findUsingReflection(subscriberClass);
11 } else {
12 subscriberMethods = findUsingInfo(subscriberClass);
13 }
14 if (subscriberMethods.isEmpty()) {
15 throw new EventBusException("Subscriber " + subscriberClass
16 + " and its super classes have no public methods with the @Subscribe annotation");
17 } else {
18 METHOD_CACHE.put(subscriberClass, subscriberMethods);
19 return subscriberMethods;
20 }
21 }
这里通过ConcurrentHashMap建立了一个小缓存,先去这里面找,找不到了,我们先看看是如何通过反射的方式来操作的:
1 private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
2 FindState findState = prepareFindState();
3 findState.initForSubscriber(subscriberClass);
4 while (findState.clazz != null) {
5 findUsingReflectionInSingleClass(findState);
6 findState.moveToSuperclass();
7 }
8 return getMethodsAndRelease(findState);
9 }
10
11 private FindState prepareFindState() {
12 synchronized (FIND_STATE_POOL) {
13 for (int i = 0; i < POOL_SIZE; i++) {
14 FindState state = FIND_STATE_POOL[i];
15 if (state != null) {
16 FIND_STATE_POOL[i] = null;
17 return state;
18 }
19 }
20 }
21 return new FindState();
22 }
首先prepareFindState里面,只是从一个FIND_STATE_POOL数组里拿出一个findstate,那还得看看FindState是什么:
1 static class FindState {
2 final List<SubscriberMethod> subscriberMethods = new ArrayList<>();
3 final Map<Class, Object> anyMethodByEventType = new HashMap<>();
4 final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();
5 final StringBuilder methodKeyBuilder = new StringBuilder(128);
6
7 Class<?> subscriberClass;
8 Class<?> clazz;
9 boolean skipSuperClasses;
10 SubscriberInfo subscriberInfo;
11
12 void initForSubscriber(Class<?> subscriberClass) {
13 this.subscriberClass = clazz = subscriberClass;
14 skipSuperClasses = false;
15 subscriberInfo = null;
16 }
只看到了一大托的List、Map,貌似要是存放一些什么东西,init好像也只是设置了clazz 还是来看看到底反射能帮我们做些什么吧:
1 private void findUsingReflectionInSingleClass(FindState findState) {
2 Method[] methods;
3 try {
4 // This is faster than getMethods, especially when subscribers are fat classes like Activities
5 methods = findState.clazz.getDeclaredMethods();
6 } catch (Throwable th) {
7 // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
8 methods = findState.clazz.getMethods();
9 findState.skipSuperClasses = true;
10 }
11 for (Method method : methods) {
12 int modifiers = method.getModifiers();
13 if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
14 Class<?>[] parameterTypes = method.getParameterTypes();
15 if (parameterTypes.length == 1) {
16 Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
17 if (subscribeAnnotation != null) {
18 Class<?> eventType = parameterTypes[0];
19 if (findState.checkAdd(method, eventType)) {
20 ThreadMode threadMode = subscribeAnnotation.threadMode();
21 findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
22 subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
23 }
24 }
25 } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
26 String methodName = method.getDeclaringClass().getName() + "." + method.getName();
27 throw new EventBusException("@Subscribe method " + methodName +
28 "must have exactly 1 parameter but has " + parameterTypes.length);
29 }
30 } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
31 String methodName = method.getDeclaringClass().getName() + "." + method.getName();
32 throw new EventBusException(methodName +
33 " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
34 }
35 }
36 }
-
先通过反射,找到这个class(也就是我们的Activity)的所有method,遍历它们
-
13行确定这个method仅仅是public,才能继续处理
-
14、15行说,这个method必须只有一个类型的参数才可以!
-
接下来看这个method有没有声明Subscribe注解,没有的话不要!
-
19行将这个method和event参数的class类型(例子中就是TestEvent.class)交给findState.checkAdd,看看它做了什么:
1 boolean checkAdd(Method method, Class<?> eventType) {
2 // 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
3 // Usually a subscriber doesn't have methods listening to the same event type.
4 Object existing = anyMethodByEventType.put(eventType, method);
5 if (existing == null) {
6 return true;
7 } else {
8 if (existing instanceof Method) {
9 if (!checkAddWithMethodSignature((Method) existing, eventType)) {
10 // Paranoia check
11 throw new IllegalStateException();
12 }
13 // Put any non-Method object to "consume" the existing Method
14 anyMethodByEventType.put(eventType, this);
15 }
16 return checkAddWithMethodSignature(method, eventType);
17 }
18 }
19
20 private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
21 methodKeyBuilder.setLength(0);
22 methodKeyBuilder.append(method.getName());
23 methodKeyBuilder.append('>').append(eventType.getName());
24
25 String methodKey = methodKeyBuilder.toString();
26 Class<?> methodClass = method.getDeclaringClass();
27 Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
28 if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
29 // Only add if not already found in a sub class
30 return true;
31 } else {
32 // Revert the put, old class is further down the class hierarchy
33 subscriberClassByMethodKey.put(methodKey, methodClassOld);
34 return false;
35 }
36 }
先看checkAddWithMethodSignature:
首先把method的name和eventType的name拼接成字符串key
找到method的class,看看subscriberClassByMethodKey中有没有key和class的pair
如果有,就替换为原来的的class,返回false
如果没有或者原来class是method的class的父类(也就是子类把父类替换掉)则返回true
再看checkAdd
-
检查是否之前已经存过对应的eventType与method的pair,如过没存过,返回true,add成功
-
如果之前存过,并且以前的那个也是一个Method,则进入checkAddWithMethodSignature,如果签名不匹配,则会抛出异常,反之会设置一个无关紧要的object,这样下次检查的时候就不用这么费事了
-
最后再检查一下当前method和evenType的签名是否正确
回到findUsingReflectionInSingleClass
- 21行new一个SubscriberMethod,也就是保存一些信息,放入subscriberMethods的一个List中就结束了
回到findUsingReflection,不断的去找当前clazz的父类,直到系统类,最后会释放这个findState,把它还给FIND_STATE_POOL
这样,我们就得到了一个SubscriberMethod的列表,每一个SubscriberMethod里面都维护了method、threadMode、eventType、priority、sticky参数 回到register里,有了这个列表,我们就可以对每一个SubscriberMethod调用subscribe了:
1 // Must be called in synchronized block
2 private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
3 Class<?> eventType = subscriberMethod.eventType;
4 Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
5 CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
6 if (subscriptions == null) {
7 subscriptions = new CopyOnWriteArrayList<>();
8 subscriptionsByEventType.put(eventType, subscriptions);
9 } else {
10 if (subscriptions.contains(newSubscription)) {
11 throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
12 + eventType);
13 }
14 }
15
16 int size = subscriptions.size();
17 for (int i = 0; i <= size; i++) {
18 if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
19 subscriptions.add(i, newSubscription);
20 break;
21 }
22 }
23
24 List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
25 if (subscribedEvents == null) {
26 subscribedEvents = new ArrayList<>();
27 typesBySubscriber.put(subscriber, subscribedEvents);
28 }
29 subscribedEvents.add(eventType);
30
31 if (subscriberMethod.sticky) {
32 if (eventInheritance) {
33 // Existing sticky events of all subclasses of eventType have to be considered.
34 // Note: Iterating over all events may be inefficient with lots of sticky events,
35 // thus data structure should be changed to allow a more efficient lookup
36 // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
37 Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
38 for (Map.Entry<Class<?>, Object> entry : entries) {
39 Class<?> candidateEventType = entry.getKey();
40 if (eventType.isAssignableFrom(candidateEventType)) {
41 Object stickyEvent = entry.getValue();
42 checkPostStickyEventToSubscription(newSubscription, stickyEvent);
43 }
44 }
45 } else {
46 Object stickyEvent = stickyEvents.get(eventType);
47 checkPostStickyEventToSubscription(newSubscription, stickyEvent);
48 }
49 }
50 }
-
首先获取eventType类型,并把这个对象和subscriberMethod拼成一个Subscription,注意到没有,我们有了对象,有了方法,以后就可以通过反射去调用这个方法了!
-
通过这个eventType从hashMap里获得一个Subscription的列表,把新建的Subscription加进去
-
再把这个eventType加到这个subscriber所订阅的eventType集合里去
-
最后会保存一些sticky的event,保证它们是可以被被动查询到的
就这样结束了,所有的Subscription都保存在subscriptionsByEventType里,通过eventType进行分类,然后对应的value是一个列表,存放订阅这个eventType的所有对象以及方法和threadMode
接下来看看post:
1 /** Posts the given event to the event bus. */
2 public void post(Object event) {
3 PostingThreadState postingState = currentPostingThreadState.get();
4 List<Object> eventQueue = postingState.eventQueue;
5 eventQueue.add(event);
6
7 if (!postingState.isPosting) {
8 postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
9 postingState.isPosting = true;
10 if (postingState.canceled) {
11 throw new EventBusException("Internal error. Abort state was not reset");
12 }
13 try {
14 while (!eventQueue.isEmpty()) {
15 postSingleEvent(eventQueue.remove(0), postingState);
16 }
17 } finally {
18 postingState.isPosting = false;
19 postingState.isMainThread = false;
20 }
21 }
22 }
在这其中会将这个event加入一个队列,如果队列还没有开始post,则会最终调用postSingleEventForEventType:
1 private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
2 CopyOnWriteArrayList<Subscription> subscriptions;
3 synchronized (this) {
4 subscriptions = subscriptionsByEventType.get(eventClass);
5 }
6 if (subscriptions != null && !subscriptions.isEmpty()) {
7 for (Subscription subscription : subscriptions) {
8 postingState.event = event;
9 postingState.subscription = subscription;
10 boolean aborted = false;
11 try {
12 postToSubscription(subscription, event, postingState.isMainThread);
13 aborted = postingState.canceled;
14 } finally {
15 postingState.event = null;
16 postingState.subscription = null;
17 postingState.canceled = false;
18 }
19 if (aborted) {
20 break;
21 }
22 }
23 return true;
24 }
25 return false;
26 }
它会根据event的类型,从subscriptionsByEventType里找到之前提过的subscription队列,然后一个一个执行postToSubscription:
1 private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
2 switch (subscription.subscriberMethod.threadMode) {
3 case POSTING:
4 invokeSubscriber(subscription, event);
5 break;
6 case MAIN:
7 if (isMainThread) {
8 invokeSubscriber(subscription, event);
9 } else {
10 mainThreadPoster.enqueue(subscription, event);
11 }
12 break;
13 case BACKGROUND:
14 if (isMainThread) {
15 backgroundPoster.enqueue(subscription, event);
16 } else {
17 invokeSubscriber(subscription, event);
18 }
19 break;
20 case ASYNC:
21 asyncPoster.enqueue(subscription, event);
22 break;
23 default:
24 throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
25 }
26 }
如果是POSTING,或者MAIN模式下已经在UI线程了又或者是在BACKGROUND模式但不在UI线程,直接反射调用:
1 void invokeSubscriber(Subscription subscription, Object event) {
2 try {
3 subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
4 } catch (InvocationTargetException e) {
5 handleSubscriberException(subscription, event, e.getCause());
6 } catch (IllegalAccessException e) {
7 throw new IllegalStateException("Unexpected exception", e);
8 }
9 }
-
如果是MAIN模式,会通过mainThreadPoster,本质是一个handler,把这个subscription加入它的一个等待队列中,然后发送msg,在handleMessage里从队列里取出这个subscription反射执行
-
如果是BACKGROUND模式,会投递到backgroundPoster里,也加入一个队列,通过eventBus内部的线程池执行一个runnable,这个runnable循环从这个队列里取出subscription并执行,但是要注意的是,这个backgroundPoster有一个private volatile boolean executorRunning变量,保证串行的执行这些subscription
-
如果是ASYNC模式,投递到asyncPoster,也有是eventBus的内部线程池执行的,它是可以并行的
总结
-
利用反射的方法来实现事件总线,觉得很巧妙,感慨反射的强大,虽然反射是运行时进行的,由于不能预加载class以及jit优化,效率肯定是比不上已经编译好的字节码的,但这种动态语言的思想还是很重要的
-
这次又看到了XXX_POOL,这种什么什么池的概念在大规模处理数据的时候还是很有用的,减少了gc和重新new的开销
-
还有一些细节并没有追踪完整,但我觉得目前要做的还是理解它大致框架和思想是第一位的,代码这种东西都是熟能生巧的