前言
通常我们都是在讨论基于MQ的分布式事件驱动,今天我们看下Spring是如何实现进程内的事件驱动,事件驱动是指某个事情的发生,然后向队列中广播该事件,其他关心此类事件的,则自动进行回调处理。
进程内事件驱动
运用到的设计模式:观察者模式。
- 基于disruptor开源高性能队列;
- 基于spring自带的事件驱动;
类继承体系
主要涉及到事件广播器,事件,事件监听器。在JDK中,对于事件驱动,已经为我们提供一些基础的规范类,规范的写法是自定义的事件类型都需要继承自EventObject,自定义的事件监听器都需要继承自EventListener。
事件基类:java.util.EventObjectpublic class EventObject implements java.io.Serializable {
private static final long serialVersionUID = 5516075349620653480L;
protected transient Object source;
public EventObject(Object source) {
if (source == null)
throw new IllegalArgumentException("null source");
this.source = source;
}
public Object getSource() {
return source;
}
}
事件监听器接口:java.util.EventListenerpublic interface EventListener {
}
在spring中,完成事件驱动的最核心的三个基类是:
- ApplicationEvent 所有spring事件的基类;
- ApplicationListener 所有基于spring监听器的基类;
- ApplicationEventMulticaster 事件广播器接口,负责广播事件;
事件继承体系
// spring 所有事件的基类 |
监听器继承体系
// spring所有的监听器基类 |
事件广播器
public interface ApplicationEventMulticaster { |
测试用例
要求:在spring容器初始化完成之后,打印初始化完成日志。既然要在spring容器初始化完成之后,打印日志;那么我们需要监听ContextRefreshedEvent事件,因为此事件是在spring容器refresh完成时调用。
自定义监听器
/** |
实现原理浅析
AbstractApplicationContext 抽象类:public abstract class AbstractApplicationContext extends DefaultResourceLoader
implements ConfigurableApplicationContext, DisposableBean {
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
// Prepare this context for refreshing.
prepareRefresh();
// Tell the subclass to refresh the internal bean factory.
ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
// Prepare the bean factory for use in this context.
prepareBeanFactory(beanFactory);
try {
// Allows post-processing of the bean factory in context subclasses.
postProcessBeanFactory(beanFactory);
// Invoke factory processors registered as beans in the context.
invokeBeanFactoryPostProcessors(beanFactory);
// Register bean processors that intercept bean creation.
registerBeanPostProcessors(beanFactory);
// Initialize message source for this context.
initMessageSource();
// Initialize event multicaster for this context.
initApplicationEventMulticaster();
// Initialize other special beans in specific context subclasses.
onRefresh();
// 遍历获取spring容器中继承自ApplicationListener的所有监听器
registerListeners();
// Instantiate all remaining (non-lazy-init) singletons.
finishBeanFactoryInitialization(beanFactory);
// 最后一步: 发布ContextRefreshedEvent事件
finishRefresh();
} catch (BeansException ex) {
..... 省略部分代码 .....
} finally{
..... 省略部分代码 .....
}
}
}
// 获取spring容器中继承自ApplicationListener的所有监听器
protected void registerListeners() {
// Register statically specified listeners first.
for (ApplicationListener<?> listener : getApplicationListeners()) {
getApplicationEventMulticaster().addApplicationListener(listener);
}
// Do not initialize FactoryBeans here: We need to leave all regular beans
// uninitialized to let post-processors apply to them!
// 扫描容器,获取ApplicationListener及其子类型的Bean监听器名称
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}
// Publish early application events now that we finally have a multicaster...
Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null;
if (earlyEventsToProcess != null) {
for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
getApplicationEventMulticaster().multicastEvent(earlyEvent);
}
}
}
// 发布spring上下文刷新完成事件
protected void finishRefresh() {
// Initialize lifecycle processor for this context.
initLifecycleProcessor();
// Propagate refresh to lifecycle processor first.
getLifecycleProcessor().onRefresh();
// 向容器中发布ContextRefreshedEvent事件
publishEvent(new ContextRefreshedEvent(this));
// Participate in LiveBeansView MBean, if active.
LiveBeansView.registerApplicationContext(this);
}
}
```
SimpleApplicationEventMulticaster 是spring容器默认使用的事件广播器,具体实现如下:
```java
// spring事件广播器
public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {
private Executor taskExecutor;
private ErrorHandler errorHandler;
public void multicastEvent(ApplicationEvent event) {
multicastEvent(event, resolveDefaultEventType(event));
}
public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) {
// 解析事件类型
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
// 根据事件类型,匹配到所有监听该事件及其子事件类型的监听器
for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
Executor executor = getTaskExecutor();
if (executor != null) {
// 异步回调监听器
executor.execute(new Runnable() {
public void run() {
invokeListener(listener, event);
}
});
}
else {
// 同步回调监听器
invokeListener(listener, event);
}
}
}
private ResolvableType resolveDefaultEventType(ApplicationEvent event) {
return ResolvableType.forInstance(event);
}
"unchecked", "rawtypes"}) ({
protected void invokeListener(ApplicationListener listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
try {
// 执行回调监听器
listener.onApplicationEvent(event);
} catch (Throwable err) {
// 回调报错后的异常处理
errorHandler.handleError(err);
}
} else {
try {
listener.onApplicationEvent(event);
} catch (ClassCastException ex) {
...... 省略部分代码 ......
}
}
}
}
AbstractApplicationEventMulticaster 事件广播器抽象类:public abstract class AbstractApplicationEventMulticaster
implements ApplicationEventMulticaster, BeanClassLoaderAware, BeanFactoryAware {
// 事件类型和监听该事件的监听器集合映射关系缓存
final Map<ListenerCacheKey, ListenerRetriever> retrieverCache =
new ConcurrentHashMap<ListenerCacheKey, ListenerRetriever>(64);
// 获取发生事件的监听器列表
protected Collection<ApplicationListener<?>> getApplicationListeners(
ApplicationEvent event, ResolvableType eventType) {
Object source = event.getSource();
Class<?> sourceType = (source != null ? source.getClass() : null);
ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
// 优先从缓存取,如果取到则直接返回
ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
if (retriever != null) {
return retriever.getApplicationListeners();
}
if (this.beanClassLoader == null ||
(ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
(sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
// Fully synchronized building and caching of a ListenerRetriever
synchronized (this.retrievalMutex) {
retriever = this.retrieverCache.get(cacheKey);
if (retriever != null) {
return retriever.getApplicationListeners();
}
retriever = new ListenerRetriever(true);
// 根据监听的事件类型及触发的事件类型匹配监听器集合
Collection<ApplicationListener<?>> listeners =
retrieveApplicationListeners(eventType, sourceType, retriever);
this.retrieverCache.put(cacheKey, retriever);
return listeners;
}
}
else {
// No ListenerRetriever caching -> no synchronization necessary
return retrieveApplicationListeners(eventType, sourceType, null);
}
}
// 检索符合条件的监听器列表
private Collection<ApplicationListener<?>> retrieveApplicationListeners(
ResolvableType eventType, Class<?> sourceType, ListenerRetriever retriever) {
LinkedList<ApplicationListener<?>> allListeners = new LinkedList<ApplicationListener<?>>();
Set<ApplicationListener<?>> listeners;
Set<String> listenerBeans;
............... 省略部分代码 .................
for (ApplicationListener<?> listener : listeners) {
// 符合条件则放入集合中
if (supportsEvent(listener, eventType, sourceType)) {
if (retriever != null) {
retriever.applicationListeners.add(listener);
}
allListeners.add(listener);
}
}
............... 省略部分代码 .................
AnnotationAwareOrderComparator.sort(allListeners);
return allListeners;
}
// 判断监听器是否监听此事件
protected boolean supportsEvent(Class<?> listenerType, ResolvableType eventType) {
if (GenericApplicationListener.class.isAssignableFrom(listenerType) ||
SmartApplicationListener.class.isAssignableFrom(listenerType)) {
return true;
}
// 解析监听器listenerType声明监听的事件类型
ResolvableType declaredEventType = GenericApplicationListenerAdapter.resolveDeclaredEventType(listenerType);
// 如果声明的事件类型是eventType及其子类则返回true
return (declaredEventType == null || declaredEventType.isAssignableFrom(eventType));
}