spring事务

实现思路

对于一个标注了@Transaction的方法,生成aop代理,代理内部逻辑是先开启一个事务,再执行原方法的逻辑,如果入到异常,就把事务回滚。

需要实现以下步骤:
1. 如何和spring结合,对特定类实现代理?
1. 代理类中需要实现什么逻辑?
1. 怎么实现事务传播特性?超时?

spring aop代理实现原理

bean扩展

要明确的是肯定对bean动了手脚,那就要从spring里bean的扩展点入手。

spring提供了许多扩展点,对Bean进行修改的扩展应该使用——BeanPostProcessor,它有两个方法

//在afterPropertiesSet 和 init-method之前,此时它属性的{...}值已经被注入
//一般用来注入属性
Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException;

//在afterPropertiesSet 和 init-method之后,此时它属性的{...}值已经被注入
//一般用来做代理
Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException;

有了上述方法,可以推断出bean的创建流程:
bean实例化 -> 自动注入已执行 -> postProcessBeforeInitialization -> init-method -> postProcessAfterInitialization

spring aop并没有直接通过BeanPostProcessor实现代理逻辑,而是间接的
AbstractAutoProxyCreator -> SmartInstantiationAwareBeanPostProcessor -> InstantiationAwareBeanPostProcessor -> BeanPostProcessor
所以我们只能从顶层一层一层往下看每一个接口多了什么特性。


第二层是InstantiationAwareBeanPostProcessor,它是为了对bean实例化这步进行细分而定义的,主要是为了一些特殊场景,比如想不执行创建代理类的逻辑,一般是内部框架做的,非必要不提倡实现该接口。方法如下

//在对象创建前执行
Object postProcessBeforeInstantiation(Class<?> beanClass, String beanName) throws BeansException;

//在对象创建后,属性和自动注入之前执行,你可以在这实现自己的字段注入
//返回truefalse决定是否需要注入属性
boolean postProcessAfterInstantiation(Object bean, String beanName) throws BeansException;

//在键值对被注入前执行,通常用来进行required的检测
PropertyValues postProcessPropertyValues(
            PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeansException;

可以推断出bean的创建流程:
postProcessBeforeInstantiation -> bean实例化 -> postProcessAfterInstantiation -> postProcessPropertyValues -> 自动注入已执行 -> postProcessBeforeInitialization -> init-method -> postProcessAfterInitialization


第三层是SmartInstantiationAwareBeanPostProcessor,主要加了几个对实例化出来的bean到底是什么类型的预测方法,也是内部框架使用的。

//预测postProcessBeforeInstantiation最终返回什么类型,此时应未被创建出来
Class<?> predictBeanType(Class<?> beanClass, String beanName) throws BeansException;

//判断应该用哪个构造函数去实例化bean
Constructor<?>[] determineCandidateConstructors(Class<?> beanClass, String beanName) throws BeansException;

//用来获得早期bean引用,主要解决循环依赖问题
Object getEarlyBeanReference(Object bean, String beanName) throws BeansException;

AbstractAutoProxyCreator对postProcessBeforeInstantiation和postProcessAfterInitialization都进行了实现,这两方法时机不同,在前者实现是为了拥有TargetSource时省去初始化bean的开销,内部核心代码还是一样的。

如果缓存找不到就——
//获得该对象的方法拦截器链,这个链是由一系列aop的advice转变而来的
Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);

//使用ProxyFactory创建代理(如果是isProxyTargetClass使用ObjenesisCglibAopProxy,如果目标类是接口或是Proxy子类,用JdkDynamicAopProxy)
//在Cglib代理中
//会将拦截器链specificInterceptors和commonInterceptors组合并包装生成Advisor[],
//最终在生成Callback[]传给cglib
Object proxy = createProxy(bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
this.proxyTypes.put(cacheKey, proxy.getClass());

事务逻辑如何变成Advisor[]的

首先看
<tx:annotation-driven transaction-manager="transactionManager" />
其配置

http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd

在spring.schemas找到xml配置http\://www.springframework.org/schema/tx/spring-tx.xsd=org/springframework/transaction/config/spring-tx-4.3.xsd
在spring.handlers找到处理器配置http\://www.springframework.org/schema/tx=org.springframework.transaction.config.TxNamespaceHandler

TxNamespaceHandler会直接或间接注册一些AnnotationParser,包括jta的Transactional和spring自己的Transactional处理器(JtaTransactionAnnotationParser),最终是为了生成MethodInterceptor;

事务对底层实现

MethodInterceptor的方法调用了TransactionAspectSupport的invokeWithinTransaction方法,关键步骤——

            final PlatformTransactionManager tm = determineTransactionManager(txAttr);
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);//判断是否有必要创建事务
            Object retVal = null;
            try {
                // This is an around advice: Invoke the next interceptor in the chain.
                // This will normally result in a target object being invoked.
                retVal = invocation.proceedWithInvocation();//执行拦截器链,最终会执行到目标方法
            }
            catch (Throwable ex) {
                // target invocation exception
                completeTransactionAfterThrowing(txInfo, ex);//当抛出异常后,完成这个事务,提交或者回滚,并抛出这个异常
                throw ex;
            }
            finally {
                cleanupTransactionInfo(txInfo);
            }
            commitTransactionAfterReturning(txInfo);//提交事务
            return retVal;
  1. determineTransactionManager
  2. createTransactionIfNecessary(!)
  3. proceedWithInvocation
  4. completeTransactionAfterThrowing
  5. commitTransactionAfterReturning

createTransactionIfNecessary核心代码

        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) {
                status = tm.getTransaction(txAttr);//核心方法,具体看AbstractPlatformTransactionManager实现
            }
            else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                            "] because no transaction manager has been configured");
                }
            }
        }
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
  1. getTransaction(!)
  2. prepareTransactionInfo(绑定到threadlocal)

其中AbstractPlatformTransactionManager的getTransaction实现逻辑为

            Object transaction = doGetTransaction();


        if (isExistingTransaction(transaction)) {
            // Existing transaction found -> check propagation behavior to find out how to behave.
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }

        // Check definition settings for new transaction.
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }

        // No existing transaction found -> check propagation behavior to find out how to proceed.
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
            }
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException ex) {
                resume(null, suspendedResources);
                throw ex;
            }
            catch (Error err) {
                resume(null, suspendedResources);
                throw err;
            }
        }
        else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + definition);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
        }
    }

以下流程来自于DatasourceTransactionManager源码
1. doGetTransaction 获得当前的事务,底层threadLocal.get(key)->ConnectionHolder
key的话是一个dataSource,也就是在当前线程中,获取特定数据源对应的ConnectionHolder。
事务传播指的就是在同一个线程中先后处理标了@Transaction的方法时怎么处理事务冲突问题,所以用threadLocal实现。

接下来是一个分支,通过判断isExistingTransaction(transaction),其实就是判断ConnnectionHolder是否不为空且isTransactionActive
若存在:handleExistingTransaction
1. 若为PROPAGATION_NEVER,抛异常
1. 若为PROPAGATION_NOT_SUPPORTED,挂起,
1. 若为PROPAGATION_REQUIRES_NEW,挂起,try里面new个新的begin,catch里resume老的
1. 若为PROPAGATION_NESTED,若支持savePoint机制,会创建savePoint,否则用嵌套的begin&commit实现

若不存在:
1. 若为PROPAGATION_MANDATORY,抛异常
1. 若为PROPAGATION_REQUIRED or REQUIRES_NEW or NESTED,挂起一个null,try里面new个新的begin,catch里resume老的

注意:
其中begin会创建一个connection,用connectionHolder包装,然后绑定到threadLocal;
suspend会将当前threadlocal中map的key删除,然后返回该对象;
resume与suspend对应,将挂起的connectionHolder重新放入threadlocal的map的key里。

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注