EventBus
1. 什么是EventBus
总线(Bus)一般指计算机各种功能部件之间传送信息的公共通信干线,而EventBus则是事件源(publisher)向订阅方(subscriber)发送订阅事件的总线,它解耦了观察者模式中订阅方和事件源之间的强依赖关系。
2. guava EventBus的构成

3. SubscriberRegistry
18版本:
/** * Registers all subscriber methods on {@code object} to receive events. * Subscriber methods are selected and classified using this EventBus's * {@link SubscriberFindingStrategy}; the default strategy is the * {@link AnnotatedSubscriberFinder}. * * @param object object whose subscriber methods should be registered. */ public void register(Object object) { Multimap<Class<?>, EventSubscriber> methodsInListener = finder.findAllSubscribers(object); subscribersByTypeLock.writeLock().lock(); try { subscribersByType.putAll(methodsInListener); } finally { subscribersByTypeLock.writeLock().unlock(); } }
19版本:
/** * Registers all subscriber methods on the given listener object. */ void register(Object listener) { Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { Class<?> eventType = entry.getKey(); Collection<Subscriber> eventMethodsInListener = entry.getValue(); CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) { CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>(); eventSubscribers = MoreObjects.firstNonNull( subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } }
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) { Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes(); Map<MethodIdentifier, Method> identifiers = Maps.newHashMap(); for (Class<?> supertype : supertypes) { for (Method method : supertype.getDeclaredMethods()) { if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) { // TODO(cgdecker): Should check for a generic parameter type and error out Class<?>[] parameterTypes = method.getParameterTypes(); checkArgument(parameterTypes.length == 1, "Method %s has @Subscribe annotation but has %s parameters." + "Subscriber methods must have exactly 1 parameter.", method, parameterTypes.length); MethodIdentifier ident = new MethodIdentifier(method); if (!identifiers.containsKey(ident)) { identifiers.put(ident, method); } } } } return ImmutableList.copyOf(identifiers.values()); }
在构建Subscriber的时候根据方法是否有@AllowConcurrentEvents,分为同步和并发执行method。
4. Dispatcher
Dispatcher发布事件的时候有三种模式: 1. ImmediateDispatcher,来了一个事件则通知对这个事件感兴趣的订阅者。 2. LegacyAsyncDispatcher,会有一个全局的队列ConcurrentLinkedQueue<EventWithSubscriber> queue保存EventWithSubscriber(事件和subscriber),如果被不同的线程poll 不能保证在queue队列中的event是有序发布的。 3. PerThreadQueuedDispatcher,在同一个线程post的Event,执行的顺序是有序的。用ThreadLocal<Queue<Event>> queue来实现每个线程post的Event是有序的,在把事件添加到queue后会有一个ThreadLocal<Boolean> dispatching来判断当前线程是否正在分发,如果正在分发,则这次添加的event不会马上进行分发而是等到dispatching的值为false才进行。这样做的原因是为了防止同一个事件被重复分发。我的理解是这样的:如果没有dispatching这个状态变量,在ThreadA中EventA发布了,ListenerA收到了,ListenerA进行处理,在处理的过程中如果又触发了EventA的发布,如果该线程不结束则会陷入到一种循环中去。
5.EventBus的使用
一般的应用的场景就是在用观察者模式的地方就可以用EventBus进行替代。结合Spring的使用过程如下:
5.1定义Listener
5.2向EventBus注册Listener
5.3发送事件