In-Process Event-Driven Processing with Spring

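Preface

Discussions of event-driven design usually focus on distributed setups built on a message queue (MQ). This post looks at how Spring implements event-driven processing inside a single process. Event-driven means that when something happens, an event describing it is broadcast to a queue, and every component interested in that kind of event is called back automatically.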

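In-process event-driven processing

Design pattern used: the observer pattern.

Two ways to implement it inside a process:

  1. the open-source, high-performance Disruptor queue;
  2. Spring's built-in event mechanism.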

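Class hierarchy

Three roles are involved: the event multicaster, the event, and the event listener. The JDK already provides base types for event-driven code. By convention, every custom event type extends EventObject and every custom event listener extends EventListener.

Event base class: java.util.EventObject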

public class EventObject implements java.io.Serializable {
    private static final long serialVersionUID = 5516075349620653480L;
    protected transient Object source;

    public EventObject(Object source) {
        if (source == null)
            throw new IllegalArgumentException("null source");
        this.source = source;
    }

    public Object getSource() {
        return source;
    }
}

Event listener interface: java.util.EventListener

public interface EventListener {
}
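
To make the JDK convention concrete, here is a minimal sketch that uses only these two JDK types. OrderCreatedEvent, OrderCreatedListener and OrderService are hypothetical names invented for illustration; they do not come from the JDK or from Spring.

```java
import java.util.ArrayList;
import java.util.EventListener;
import java.util.EventObject;
import java.util.List;

// Hypothetical names throughout; only EventObject and EventListener come from the JDK.
class JdkEventSketch {

    /** A custom event: extends EventObject and carries one extra field. */
    static class OrderCreatedEvent extends EventObject {
        private static final long serialVersionUID = 1L;
        private final String orderId;

        OrderCreatedEvent(Object source, String orderId) {
            super(source); // EventObject rejects a null source
            this.orderId = orderId;
        }

        String getOrderId() {
            return orderId;
        }
    }

    /** A custom listener: EventListener is only a marker, so the callback is declared here. */
    interface OrderCreatedListener extends EventListener {
        void onOrderCreated(OrderCreatedEvent event);
    }

    /** The subject in observer-pattern terms: keeps the listeners and notifies each in turn. */
    static class OrderService {
        private final List<OrderCreatedListener> listeners = new ArrayList<>();

        void addListener(OrderCreatedListener listener) {
            listeners.add(listener);
        }

        void createOrder(String orderId) {
            OrderCreatedEvent event = new OrderCreatedEvent(this, orderId);
            for (OrderCreatedListener listener : listeners) {
                listener.onOrderCreated(event);
            }
        }
    }

    public static void main(String[] args) {
        OrderService service = new OrderService();
        service.addListener(e -> System.out.println("order created: " + e.getOrderId()));
        service.createOrder("A-1"); // prints "order created: A-1"
    }
}
```

Here the publisher calls its listeners directly. Spring puts a multicaster between the two so that publishers and listeners do not need to know each other.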

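In Spring, event-driven processing is built on three core base types:

  1. ApplicationEvent, the base class of all Spring events;
  2. ApplicationListener, the base interface of all Spring event listeners;
  3. ApplicationEventMulticaster, the multicaster interface responsible for broadcasting events.

Event hierarchy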

// Base class of all Spring events
public abstract class ApplicationEvent extends EventObject {

    /** use serialVersionUID from Spring 1.2 for interoperability */
    private static final long serialVersionUID = 7099057708183571937L;

    /** System time when the event happened */
    private final long timestamp;

    /**
     * Create a new ApplicationEvent.
     * @param source the object on which the event initially occurred (never {@code null})
     */
    public ApplicationEvent(Object source) {
        super(source);
        this.timestamp = System.currentTimeMillis();
    }

    /**
     * Return the system time in milliseconds when the event happened.
     */
    public final long getTimestamp() {
        return this.timestamp;
    }
}


Listener hierarchy

// Base interface of all Spring listeners
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {

    /**
     * Handle an application event.
     * @param event the event to respond to
     */
    void onApplicationEvent(E event);
}

Event multicaster

public interface ApplicationEventMulticaster {

    /**
     * Add a listener to be notified of all events.
     * @param listener the listener to add
     */
    void addApplicationListener(ApplicationListener<?> listener);

    /**
     * Add a listener bean to be notified of all events.
     * @param listenerBeanName the name of the listener bean to add
     */
    void addApplicationListenerBean(String listenerBeanName);

    /**
     * Remove a listener from the notification list.
     * @param listener the listener to remove
     */
    void removeApplicationListener(ApplicationListener<?> listener);

    /**
     * Remove a listener bean from the notification list.
     * @param listenerBeanName the name of the listener bean to remove
     */
    void removeApplicationListenerBean(String listenerBeanName);

    /**
     * Remove all listeners registered with this multicaster.
     * <p>After a remove call, the multicaster will perform no action
     * on event notification until new listeners are being registered.
     */
    void removeAllListeners();

    /**
     * Multicast the given application event to appropriate listeners.
     * <p>Consider using {@link #multicastEvent(ApplicationEvent, ResolvableType)}
     * if possible as it provides a better support for generics-based events.
     * @param event the event to multicast
     */
    void multicastEvent(ApplicationEvent event);

    /**
     * Multicast the given application event to appropriate listeners.
     * <p>If the {@code eventType} is {@code null}, a default type is built
     * based on the {@code event} instance.
     * @param event the event to multicast
     * @param eventType the type of event (can be null)
     * @since 4.2
     */
    void multicastEvent(ApplicationEvent event, ResolvableType eventType);
}

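Test case

Requirement: print a log message once the Spring container has finished initializing. To do that we listen for ContextRefreshedEvent, because Spring publishes this event when the container's refresh completes.

Custom listener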

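import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

/**
 * Registering ApplicationStartup as a bean puts it under the management of the container's
 * listener list. When a ContextRefreshedEvent is published, Spring looks up every listener
 * for that event and calls it back (synchronous and asynchronous callbacks are both supported).
 */
@Component
public class ApplicationStartup implements ApplicationListener<ContextRefreshedEvent> {

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        ApplicationContext applicationContext = contextRefreshedEvent.getApplicationContext();
        // Only react to the root context (the one without a parent). See Spring's
        // parent/child containers: without this check the callback fires more than once.
        if (applicationContext.getParent() != null) {
            return;
        }
        System.out.println("Spring context refresh complete. applicationContext=" + applicationContext);
    }
}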
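
The parent check is easier to follow with a toy model. The sketch below is plain Java, not Spring code: Context and RefreshedEvent are hypothetical stand-ins, and it assumes that a child context forwards every event it publishes to its parent, which is why a listener registered in the root context sees the child's refresh as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model, not Spring code: Context, RefreshedEvent and their members are hypothetical.
class ParentChildSketch {

    /** Stands in for ContextRefreshedEvent: remembers which context was refreshed. */
    static class RefreshedEvent {
        final Context source;

        RefreshedEvent(Context source) {
            this.source = source;
        }
    }

    /** Stands in for an application context with an optional parent. */
    static class Context {
        final String name;
        final Context parent; // null for the root context
        private final List<Consumer<RefreshedEvent>> listeners = new ArrayList<>();

        Context(String name, Context parent) {
            this.name = name;
            this.parent = parent;
        }

        void addListener(Consumer<RefreshedEvent> listener) {
            listeners.add(listener);
        }

        void publish(RefreshedEvent event) {
            for (Consumer<RefreshedEvent> listener : listeners) {
                listener.accept(event);
            }
            if (parent != null) {
                parent.publish(event); // assumption: events are forwarded to the parent
            }
        }

        void refresh() {
            publish(new RefreshedEvent(this));
        }
    }

    public static void main(String[] args) {
        Context root = new Context("root", null);
        Context child = new Context("child", root);

        // Both listeners live in the root context.
        root.addListener(e -> System.out.println("unfiltered: " + e.source.name));
        root.addListener(e -> {
            if (e.source.parent == null) {
                System.out.println("root only: " + e.source.name);
            }
        });

        root.refresh();  // both listeners print
        child.refresh(); // only the unfiltered listener prints
    }
}
```

The unfiltered listener runs twice, once per refreshed context, while the filtered one runs only for the root.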

A brief look at the implementation

The AbstractApplicationContext abstract class:

public abstract class AbstractApplicationContext extends DefaultResourceLoader
        implements ConfigurableApplicationContext, DisposableBean {

    @Override
    public void refresh() throws BeansException, IllegalStateException {
        synchronized (this.startupShutdownMonitor) {
            // Prepare this context for refreshing.
            prepareRefresh();

            // Tell the subclass to refresh the internal bean factory.
            ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();

            // Prepare the bean factory for use in this context.
            prepareBeanFactory(beanFactory);

            try {
                // Allows post-processing of the bean factory in context subclasses.
                postProcessBeanFactory(beanFactory);

                // Invoke factory processors registered as beans in the context.
                invokeBeanFactoryPostProcessors(beanFactory);

                // Register bean processors that intercept bean creation.
                registerBeanPostProcessors(beanFactory);

                // Initialize message source for this context.
                initMessageSource();

                // Initialize event multicaster for this context.
                initApplicationEventMulticaster();

                // Initialize other special beans in specific context subclasses.
                onRefresh();

                // Find every ApplicationListener in the container and register it with the multicaster.
                registerListeners();

                // Instantiate all remaining (non-lazy-init) singletons.
                finishBeanFactoryInitialization(beanFactory);

                // Last step: publish the ContextRefreshedEvent.
                finishRefresh();
            } catch (BeansException ex) {
                // ... some code omitted ...
            } finally {
                // ... some code omitted ...
            }
        }
    }

    // Collect every listener in the Spring container that implements ApplicationListener
    protected void registerListeners() {
        // Register statically specified listeners first.
        for (ApplicationListener<?> listener : getApplicationListeners()) {
            getApplicationEventMulticaster().addApplicationListener(listener);
        }

        // Do not initialize FactoryBeans here: We need to leave all regular beans
        // uninitialized to let post-processors apply to them!
        // Scan the container for the names of beans of type ApplicationListener or a subtype of it
        String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
        for (String listenerBeanName : listenerBeanNames) {
            getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
        }

        // Publish early application events now that we finally have a multicaster...
        Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
        this.earlyApplicationEvents = null;
        if (earlyEventsToProcess != null) {
            for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
                getApplicationEventMulticaster().multicastEvent(earlyEvent);
            }
        }
    }

    // Publish the "Spring context refreshed" event
    protected void finishRefresh() {
        // Initialize lifecycle processor for this context.
        initLifecycleProcessor();

        // Propagate refresh to lifecycle processor first.
        getLifecycleProcessor().onRefresh();

        // Publish the ContextRefreshedEvent to the container
        publishEvent(new ContextRefreshedEvent(this));

        // Participate in LiveBeansView MBean, if active.
        LiveBeansView.registerApplicationContext(this);
    }

}
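
The last part of registerListeners() replays "early" events. The sketch below illustrates that buffering idea in plain Java. It is a simplified stand-in, not Spring's code: the Context class and its methods are hypothetical, and it assumes that events published before the listeners are registered were collected in a buffer such as earlyApplicationEvents.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for the buffering in registerListeners(); all names are hypothetical.
class EarlyEventsSketch {

    static class Context {
        private final List<Consumer<String>> listeners = new ArrayList<>();
        // Non-null only while the listeners have not been registered yet.
        private List<String> earlyEvents = new ArrayList<>();

        void publish(String event) {
            if (earlyEvents != null) {
                earlyEvents.add(event); // too early: nobody to deliver to yet
                return;
            }
            for (Consumer<String> listener : listeners) {
                listener.accept(event);
            }
        }

        void registerListeners(List<Consumer<String>> toRegister) {
            listeners.addAll(toRegister);
            List<String> toReplay = earlyEvents;
            earlyEvents = null; // from now on publish() delivers directly
            if (toReplay != null) {
                for (String event : toReplay) {
                    publish(event);
                }
            }
        }
    }

    public static void main(String[] args) {
        Context context = new Context();
        context.publish("early"); // buffered, nothing is printed yet

        List<Consumer<String>> listeners = new ArrayList<>();
        listeners.add(e -> System.out.println("received: " + e));
        context.registerListeners(listeners); // prints "received: early"

        context.publish("late"); // prints "received: late"
    }
}
```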

SimpleApplicationEventMulticaster is the event multicaster that the Spring container uses by default. Its implementation:

// Spring's event multicaster
public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {
    private Executor taskExecutor;
    private ErrorHandler errorHandler;

    @Override
    public void multicastEvent(ApplicationEvent event) {
        multicastEvent(event, resolveDefaultEventType(event));
    }

    @Override
    public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) {
        // Resolve the event type
        ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
        // Find every listener whose declared event type is this event type or one of its supertypes
        for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
            Executor executor = getTaskExecutor();
            if (executor != null) {
                // Invoke the listener asynchronously
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        invokeListener(listener, event);
                    }
                });
            }
            else {
                // Invoke the listener synchronously
                invokeListener(listener, event);
            }
        }
    }

    private ResolvableType resolveDefaultEventType(ApplicationEvent event) {
        return ResolvableType.forInstance(event);
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    protected void invokeListener(ApplicationListener listener, ApplicationEvent event) {
        ErrorHandler errorHandler = getErrorHandler();
        if (errorHandler != null) {
            try {
                // Run the listener callback
                listener.onApplicationEvent(event);
            } catch (Throwable err) {
                // Handle an exception thrown by the callback
                errorHandler.handleError(err);
            }
        } else {
            try {
                listener.onApplicationEvent(event);
            } catch (ClassCastException ex) {
                // ... some code omitted ...
            }
        }
    }
}
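
The two branches in multicastEvent and invokeListener can be tried out with a small plain-Java model. This is a sketch under stated assumptions rather than Spring's class: Multicaster, its setters and the use of Consumer as the listener type are hypothetical. It shows only the dispatch rule: with no executor configured the listener runs on the publishing thread, otherwise it is handed to the executor, and an optional error handler receives any exception the listener throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Simplified stand-in, not Spring's class: every name here is hypothetical.
class DispatchSketch {

    static class Multicaster<E> {
        private final List<Consumer<E>> listeners = new ArrayList<>();
        private Executor taskExecutor;            // null: call back on the publishing thread
        private Consumer<Throwable> errorHandler; // null: let the exception propagate

        void addListener(Consumer<E> listener) {
            listeners.add(listener);
        }

        void setTaskExecutor(Executor taskExecutor) {
            this.taskExecutor = taskExecutor;
        }

        void setErrorHandler(Consumer<Throwable> errorHandler) {
            this.errorHandler = errorHandler;
        }

        void multicast(E event) {
            for (Consumer<E> listener : listeners) {
                if (taskExecutor != null) {
                    taskExecutor.execute(() -> invoke(listener, event)); // asynchronous callback
                } else {
                    invoke(listener, event); // synchronous callback
                }
            }
        }

        private void invoke(Consumer<E> listener, E event) {
            if (errorHandler == null) {
                listener.accept(event);
                return;
            }
            try {
                listener.accept(event);
            } catch (Throwable err) {
                errorHandler.accept(err); // one failing listener does not stop the others
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Multicaster<String> multicaster = new Multicaster<>();
        multicaster.addListener(e ->
                System.out.println(Thread.currentThread().getName() + " got " + e));

        multicaster.multicast("sync event"); // handled on the calling thread

        ExecutorService pool = Executors.newSingleThreadExecutor();
        multicaster.setTaskExecutor(pool);
        multicaster.multicast("async event"); // handled on a pool thread
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

With a synchronous multicaster the publisher is blocked until every listener returns, so slow listeners delay the publisher. Configuring an executor removes that coupling at the cost of losing ordering and the caller's thread-bound context.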

The AbstractApplicationEventMulticaster abstract class:

public abstract class AbstractApplicationEventMulticaster
        implements ApplicationEventMulticaster, BeanClassLoaderAware, BeanFactoryAware {

    // Cache that maps an event type (with its source type) to the listeners interested in it
    final Map<ListenerCacheKey, ListenerRetriever> retrieverCache =
            new ConcurrentHashMap<ListenerCacheKey, ListenerRetriever>(64);

    // Return the listeners that match the given event
    protected Collection<ApplicationListener<?>> getApplicationListeners(
            ApplicationEvent event, ResolvableType eventType) {
        Object source = event.getSource();
        Class<?> sourceType = (source != null ? source.getClass() : null);
        ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);

        // Try the cache first and return immediately on a hit
        ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
        if (retriever != null) {
            return retriever.getApplicationListeners();
        }

        if (this.beanClassLoader == null ||
                (ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
                        (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
            // Fully synchronized building and caching of a ListenerRetriever
            synchronized (this.retrievalMutex) {
                retriever = this.retrieverCache.get(cacheKey);
                if (retriever != null) {
                    return retriever.getApplicationListeners();
                }
                retriever = new ListenerRetriever(true);
                // Match listeners by comparing the event type each one declares with the published event type
                Collection<ApplicationListener<?>> listeners =
                        retrieveApplicationListeners(eventType, sourceType, retriever);
                this.retrieverCache.put(cacheKey, retriever);
                return listeners;
            }
        }
        else {
            // No ListenerRetriever caching -> no synchronization necessary
            return retrieveApplicationListeners(eventType, sourceType, null);
        }
    }

    // Retrieve the listeners that match the event
    private Collection<ApplicationListener<?>> retrieveApplicationListeners(
            ResolvableType eventType, Class<?> sourceType, ListenerRetriever retriever) {
        LinkedList<ApplicationListener<?>> allListeners = new LinkedList<ApplicationListener<?>>();
        Set<ApplicationListener<?>> listeners;
        Set<String> listenerBeans;
        // ... some code omitted ...
        for (ApplicationListener<?> listener : listeners) {
            // Keep the listener if it supports this event
            if (supportsEvent(listener, eventType, sourceType)) {
                if (retriever != null) {
                    retriever.applicationListeners.add(listener);
                }
                allListeners.add(listener);
            }
        }
        // ... some code omitted ...
        AnnotationAwareOrderComparator.sort(allListeners);
        return allListeners;
    }

    // Decide whether a listener of the given type listens for this event
    protected boolean supportsEvent(Class<?> listenerType, ResolvableType eventType) {
        if (GenericApplicationListener.class.isAssignableFrom(listenerType) ||
                SmartApplicationListener.class.isAssignableFrom(listenerType)) {
            return true;
        }

        // Resolve the event type that listenerType declares it listens for
        ResolvableType declaredEventType = GenericApplicationListenerAdapter.resolveDeclaredEventType(listenerType);
        // True if no event type is declared, or if eventType is the declared type or a subtype of it
        return (declaredEventType == null || declaredEventType.isAssignableFrom(eventType));
    }
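
The direction of the isAssignableFrom check is easy to get backwards, so here is a minimal plain-Java sketch of the same idea: match on the declared event type, then cache the result per published event type. It uses Class instead of ResolvableType and ignores the source type, so it is a simplification and not Spring's implementation. Registry, Registration and the event classes are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in, not Spring's implementation; every name below is hypothetical.
class MatchingSketch {

    static class BaseEvent {}
    static class RefreshedEvent extends BaseEvent {}
    static class ClosedEvent extends BaseEvent {}

    /** A listener name plus the event type it declares interest in. */
    static class Registration {
        final String name;
        final Class<?> declaredEventType;

        Registration(String name, Class<?> declaredEventType) {
            this.name = name;
            this.declaredEventType = declaredEventType;
        }

        boolean supports(Class<?> eventType) {
            // True when eventType is the declared type or one of its subtypes.
            return declaredEventType.isAssignableFrom(eventType);
        }
    }

    /** Registration is assumed to happen on a single thread in this sketch. */
    static class Registry {
        private final List<Registration> registrations = new ArrayList<>();
        private final Map<Class<?>, List<String>> cache = new ConcurrentHashMap<>();

        void register(String name, Class<?> declaredEventType) {
            registrations.add(new Registration(name, declaredEventType));
            cache.clear(); // cached lookups may now be stale
        }

        List<String> listenersFor(Class<?> eventType) {
            // Scan the registrations only on a cache miss.
            return cache.computeIfAbsent(eventType, type -> {
                List<String> matched = new ArrayList<>();
                for (Registration registration : registrations) {
                    if (registration.supports(type)) {
                        matched.add(registration.name);
                    }
                }
                return matched;
            });
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        registry.register("all", BaseEvent.class);
        registry.register("refresh", RefreshedEvent.class);

        System.out.println(registry.listenersFor(RefreshedEvent.class)); // [all, refresh]
        System.out.println(registry.listenersFor(ClosedEvent.class));    // [all]
    }
}
```

A listener declared for the base event type receives every subtype, while a listener declared for a specific subtype receives only that subtype and its own subtypes.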