实现思路
对于一个标注了@Transaction的方法,生成aop代理,代理内部逻辑是先开启一个事务,再执行原方法的逻辑,如果入到异常,就把事务回滚。
需要实现以下步骤:
1. 如何和spring结合,对特定类实现代理?
1. 代理类中需要实现什么逻辑?
1. 怎么实现事务传播特性?超时?
spring aop代理实现原理
bean扩展
要明确的是肯定对bean动了手脚,那就要从spring里bean的扩展点入手。
spring提供了许多扩展点,对Bean进行修改的扩展应该使用——BeanPostProcessor,它有两个方法
//在afterPropertiesSet 和 init-method之前,此时它属性的{...}值已经被注入
//一般用来注入属性
Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException;
//在afterPropertiesSet 和 init-method之后,此时它属性的{...}值已经被注入
//一般用来做代理
Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException;
有了上述方法,可以推断出bean的创建流程:
bean实例化 -> 自动注入已执行 -> postProcessBeforeInitialization -> init-method -> postProcessAfterInitialization
spring aop并没有直接通过BeanPostProcessor实现代理逻辑,而是间接的
AbstractAutoProxyCreator -> SmartInstantiationAwareBeanPostProcessor -> InstantiationAwareBeanPostProcessor -> BeanPostProcessor
。
所以我们只能从顶层一层一层往下看每一个接口多了什么特性。
第二层是InstantiationAwareBeanPostProcessor,它是为了对bean实例化这步进行细分而定义的,主要是为了一些特殊场景,比如想不执行创建代理类的逻辑,一般是内部框架做的,非必要不提倡实现该接口。方法如下
//在对象创建前执行
Object postProcessBeforeInstantiation(Class<?> beanClass, String beanName) throws BeansException;
//在对象创建后,属性和自动注入之前执行,你可以在这实现自己的字段注入
//返回truefalse决定是否需要注入属性
boolean postProcessAfterInstantiation(Object bean, String beanName) throws BeansException;
//在键值对被注入前执行,通常用来进行required的检测
PropertyValues postProcessPropertyValues(
PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeansException;
可以推断出bean的创建流程:
postProcessBeforeInstantiation -> bean实例化 -> postProcessAfterInstantiation -> postProcessPropertyValues -> 自动注入已执行 -> postProcessBeforeInitialization -> init-method -> postProcessAfterInitialization
第三层是SmartInstantiationAwareBeanPostProcessor,主要加了几个对实例化出来的bean到底是什么类型的预测方法,也是内部框架使用的。
//预测postProcessBeforeInstantiation最终返回什么类型,此时应未被创建出来
Class<?> predictBeanType(Class<?> beanClass, String beanName) throws BeansException;
//判断应该用哪个构造函数去实例化bean
Constructor<?>[] determineCandidateConstructors(Class<?> beanClass, String beanName) throws BeansException;
//用来获得早期bean引用,主要解决循环依赖问题
Object getEarlyBeanReference(Object bean, String beanName) throws BeansException;
AbstractAutoProxyCreator对postProcessBeforeInstantiation和postProcessAfterInitialization都进行了实现,这两方法时机不同,在前者实现是为了拥有TargetSource时省去初始化bean的开销,内部核心代码还是一样的。
如果缓存找不到就——
//获得该对象的方法拦截器链,这个链是由一系列aop的advice转变而来的
Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
//使用ProxyFactory创建代理(如果是isProxyTargetClass使用ObjenesisCglibAopProxy,如果目标类是接口或是Proxy子类,用JdkDynamicAopProxy)
//在Cglib代理中
//会将拦截器链specificInterceptors和commonInterceptors组合并包装生成Advisor[],
//最终在生成Callback[]传给cglib
Object proxy = createProxy(bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
this.proxyTypes.put(cacheKey, proxy.getClass());
事务逻辑如何变成Advisor[]的
首先看
<tx:annotation-driven transaction-manager="transactionManager" />
其配置
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd
在spring.schemas找到xml配置http\://www.springframework.org/schema/tx/spring-tx.xsd=org/springframework/transaction/config/spring-tx-4.3.xsd
在spring.handlers找到处理器配置http\://www.springframework.org/schema/tx=org.springframework.transaction.config.TxNamespaceHandler
TxNamespaceHandler会直接或间接注册一些AnnotationParser,包括jta的Transactional和spring自己的Transactional处理器(JtaTransactionAnnotationParser),最终是为了生成MethodInterceptor;
事务对底层实现
MethodInterceptor的方法调用了TransactionAspectSupport的invokeWithinTransaction方法,关键步骤——
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);//判断是否有必要创建事务
Object retVal = null;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
retVal = invocation.proceedWithInvocation();//执行拦截器链,最终会执行到目标方法
}
catch (Throwable ex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex);//当抛出异常后,完成这个事务,提交或者回滚,并抛出这个异常
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
commitTransactionAfterReturning(txInfo);//提交事务
return retVal;
- determineTransactionManager
- createTransactionIfNecessary(!)
- proceedWithInvocation
- completeTransactionAfterThrowing
- commitTransactionAfterReturning
createTransactionIfNecessary核心代码
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
status = tm.getTransaction(txAttr);//核心方法,具体看AbstractPlatformTransactionManager实现
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
- getTransaction(!)
- prepareTransactionInfo(绑定到threadlocal)
其中AbstractPlatformTransactionManager的getTransaction实现逻辑为
Object transaction = doGetTransaction();
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException ex) {
resume(null, suspendedResources);
throw ex;
}
catch (Error err) {
resume(null, suspendedResources);
throw err;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
以下流程来自于DatasourceTransactionManager源码
1. doGetTransaction 获得当前的事务,底层threadLocal.get(key)->ConnectionHolder
key的话是一个dataSource,也就是在当前线程中,获取特定数据源对应的ConnectionHolder。
事务传播指的就是在同一个线程中先后处理标了@Transaction的方法时怎么处理事务冲突问题,所以用threadLocal实现。
接下来是一个分支,通过判断isExistingTransaction(transaction),其实就是判断ConnnectionHolder是否不为空且isTransactionActive
若存在:handleExistingTransaction
1. 若为PROPAGATION_NEVER,抛异常
1. 若为PROPAGATION_NOT_SUPPORTED,挂起,
1. 若为PROPAGATION_REQUIRES_NEW,挂起,try里面new个新的begin,catch里resume老的
1. 若为PROPAGATION_NESTED,若支持savePoint机制,会创建savePoint,否则用嵌套的begin&commit实现
若不存在:
1. 若为PROPAGATION_MANDATORY,抛异常
1. 若为PROPAGATION_REQUIRED or REQUIRES_NEW or NESTED,挂起一个null,try里面new个新的begin,catch里resume老的
注意:
其中begin会创建一个connection,用connectionHolder包装,然后绑定到threadLocal;
suspend会将当前threadlocal中map的key删除,然后返回该对象;
resume与suspend对应,将挂起的connectionHolder重新放入threadlocal的map的key里。