Back to Javatutorial

Spring事务(四):事务的隔离级别与传播方式的处理02

docs/Spring全家桶/Spring源码分析/Spring事务/Spring事务(四):事务的隔离级别与传播方式的处理02.md

1.0.018.1 KB
Original Source

本文是《事务的隔离级别与传播方式的处理》的第 2 篇，接上文，我们继续。

3.5 获取事务信息

获取事务信息在 TransactionAspectSupport#createTransactionIfNecessary 中进行，这个方法非常重要，前面提到的隔离级别、传播方式都是在这里处理的，该方法如下：

/**
 * Obtains a transaction status for the given attribute (if both an attribute
 * and a transaction manager are supplied) and wraps the outcome in a
 * TransactionInfo.
 *
 * @param tm the transaction manager to ask for a transaction (may be null)
 * @param txAttr the transaction attribute (may be null, in which case no transaction is requested)
 * @param joinpointIdentification identification of the join point; used as the
 *        transaction name when the attribute does not define one
 * @return the TransactionInfo built by prepareTransactionInfo(...)
 */
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
    // If the attribute has no name, wrap it so that getName() returns the join point identification
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            // Ask the manager for the transaction status; depending on the propagation
            // behavior, getTransaction(...) may start a new transaction
            status = tm.getTransaction(txAttr);
        }
    }
    // Package everything gathered above into a TransactionInfo
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

这个方法主要做了两件事：

  1. 获取事务状态
  2. 准备事务信息

我们先来看获取事务状态的流程，方法为 AbstractPlatformTransactionManager#getTransaction：

/**
 * Returns a transaction status for the given definition: delegates to
 * handleExistingTransaction(...) when a transaction already exists, otherwise
 * decides by propagation behavior whether to throw, begin a new transaction,
 * or return a status without an actual transaction.
 *
 * @param definition the transaction definition; null falls back to TransactionDefinition.withDefaults()
 * @return the resulting transaction status
 * @throws TransactionException e.g. on an invalid timeout or an illegal propagation state
 */
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
        throws TransactionException {

    TransactionDefinition def = (definition != null ? 
            definition : TransactionDefinition.withDefaults());

    // Obtain the transaction object
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();

    // A transaction already exists: handle it according to the propagation behavior and return
    if (isExistingTransaction(transaction)) {
        return handleExistingTransaction(def, transaction, debugEnabled);
    }
    // Reaching this point means no existing transaction was found

    // Reject a timeout smaller than TIMEOUT_DEFAULT
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }

    // PROPAGATION_MANDATORY requires an existing transaction; there is none, so throw
    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(...);
    }
    // REQUIRED / REQUIRES_NEW / NESTED without an existing transaction: begin a new one
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // suspend(null): there is no transaction to suspend here; presumably only active
        // synchronizations (if any) are suspended -- suspend(...) is not shown in this excerpt
        SuspendedResourcesHolder suspendedResources = suspend(null);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // Create the status object, flagged as a new transaction
            DefaultTransactionStatus status = newTransactionStatus(
                    def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // Begin the transaction
            doBegin(transaction, def);
            // presumably initializes TransactionSynchronizationManager state -- body not shown here
            prepareSynchronization(status, def);
            return status;
        }
        catch (RuntimeException | Error ex) {
            // Beginning failed: resume whatever was suspended above, then rethrow
            resume(null, suspendedResources);
            throw ex;
        }
    }
    else {
        // Remaining propagation behaviors: no transaction is begun; a status with a
        // null transaction object is returned
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}

这个方法有点长，我们一点点来分析。

1. doGetTransaction(...)：获取事务对象

获取事务对象的方法为 DataSourceTransactionManager#doGetTransaction：

/**
 * Builds the transaction object for the current call: a new
 * DataSourceTransactionObject carrying whatever ConnectionHolder is currently
 * registered in TransactionSynchronizationManager for this manager's DataSource
 * (null if nothing is registered).
 */
protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    // Look up the connection holder, keyed by the DataSource returned from obtainDataSource()
    ConnectionHolder conHolder = 
            (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    // second argument false: presumably marks the holder as not newly created -- TODO confirm
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

  1. 获取数据源
  2. 获取 ConnectionHolder

数据源是在哪里获取的呢？

/**
 * Excerpt of DataSourceTransactionManager showing only how the DataSource is
 * stored and retrieved; the rest of the class is elided.
 */
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
        implements ResourceTransactionManager, InitializingBean {

    // The DataSource this manager works with; null until set
    @Nullable
    private DataSource dataSource;

    /**
     * Constructor that receives the DataSource and stores it via setDataSource(...).
     */
    public DataSourceTransactionManager(DataSource dataSource) {
        this();
        setDataSource(dataSource);
        afterPropertiesSet();
    }

    /**
     * Sets the DataSource. A TransactionAwareDataSourceProxy is unwrapped so that
     * its target DataSource is stored instead of the proxy.
     */
    public void setDataSource(@Nullable DataSource dataSource) {
        if (dataSource instanceof TransactionAwareDataSourceProxy) {
            this.dataSource = ((TransactionAwareDataSourceProxy) dataSource)
                    .getTargetDataSource();
        }
        else {
            this.dataSource = dataSource;
        }
    }

    /**
     * Returns the stored DataSource, or null if none has been set.
     */
    @Nullable
    public DataSource getDataSource() {
        return this.dataSource;
    }

    /**
     * Returns the stored DataSource, failing the "No DataSource set" state
     * assertion if it is null.
     */
    protected DataSource obtainDataSource() {
        DataSource dataSource = getDataSource();
        Assert.state(dataSource != null, "No DataSource set");
        return dataSource;
    }

    ...
}

obtainDataSource() 实际上是调用 getDataSource() 方法，返回的是 dataSource 成员变量，而 dataSource 是在 DataSourceTransactionManager 的构造方法里传入的，因此，得到的结论是：这里获取的数据源就是我们在创建 DataSourceTransactionManager 时传入的：

/**
 * Demo configuration: declares the DataSource bean and the
 * DataSourceTransactionManager bean built on top of it (other beans elided).
 */
@Configuration
public class TxDemo03Config {

    /**
     * DataSource bean: a SimpleDriverDataSource for the local MySQL "test" database.
     * @return the data source
     * @throws Exception declared by the method; presumably for driver creation failures
     */
    @Bean
    public DataSource dataSource() throws Exception {
        Driver driver = new com.mysql.jdbc.Driver();
        String url = "jdbc:mysql://localhost:3306/test";
        String username = "root";
        String password = "123";
        return new SimpleDriverDataSource(driver, url, username, password);
    }

    /**
     * Transaction manager bean.
     * @param dataSource the DataSource handed to the manager's constructor
     * @return a DataSourceTransactionManager for the given DataSource
     */
    @Bean
    public DataSourceTransactionManager transactionManager(DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    ...

}

这里的数据源就是 SimpleDriverDataSource.

ConnectionHolder 的获取，调用的方法为 TransactionSynchronizationManager#getResource，代码如下：

// Per-thread map of resources (key -> value); in the DataSource case discussed
// here the value looked up is the ConnectionHolder
private static final ThreadLocal<Map<Object, Object>> resources =
        new NamedThreadLocal<>("Transactional resources");

/**
 * Returns the resource bound to the current thread for the given key, or null
 * if there is none. (For a DataSource key the caller casts the result to
 * ConnectionHolder.)
 */
public static Object getResource(Object key) {
    // Unwrap the supplied key if necessary to get the actual lookup key
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    // Perform the actual lookup
    Object value = doGetResource(actualKey);
    return value;
}

/**
 * Performs the actual lookup of the resource for the given key in the
 * current thread's resource map.
 */
private static Object doGetResource(Object actualKey) {
    // Read the current thread's map from the ThreadLocal
    Map<Object, Object> map = resources.get();
    if (map == null) {
        return null;
    }
    Object value = map.get(actualKey);
    // A ResourceHolder reporting isVoid() is dropped from the map (and the
    // ThreadLocal is cleared once the map is empty); null is returned instead
    if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
        map.remove(actualKey);
        if (map.isEmpty()) {
            resources.remove();
        }
        value = null;
    }
    return value;
}

从代码来看，TransactionSynchronizationManager 持有一个 ThreadLocal 实例，其中存放了一个 Map，该 Map 的 key 为 `datasource`，`value` 为 ConnectionHolder.

那么 ConnectionHolder 是什么呢？可以简单地将其理解为 Connection(数据库连接) 的包装类，其中最重要的属性就是 Connection 了：

至此，我们就把 doGetTransaction(xxx) 方法分析完了，返回的结果如下：

2. isExistingTransaction(...)：是否存在事务

获取到 DataSourceTransactionObject 对象后，接下来就是判断是否存在事务了，判断方法在 DataSourceTransactionManager#isExistingTransaction，代码如下：

/**
 * Reports whether a transaction already exists: the transaction object must
 * hold a ConnectionHolder, and that holder must report an active transaction.
 */
protected boolean isExistingTransaction(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    return (txObject.hasConnectionHolder() 
            && txObject.getConnectionHolder().isTransactionActive());
}

ConnectionHolder 中有一个成员变量 transactionActive，它表明当前 ConnectionHolder 的事务是否处于激活状态，isExistingTransaction(...) 方法主要是根据它来判断当前是否存在事务的。

3. 处理已存在的事务：handleExistingTransaction(...)

如果当前存在事务，spring 是怎么处理的呢？处理已存在事务的方法为 AbstractPlatformTransactionManager#handleExistingTransaction，代码如下：

/**
 * Builds the transaction status when a transaction already exists, branching
 * on the propagation behavior of the given definition.
 *
 * @param definition the transaction definition being applied
 * @param transaction the existing transaction object
 * @param debugEnabled whether debug logging is enabled
 * @return the transaction status for this call
 * @throws TransactionException e.g. for PROPAGATION_NEVER, or for NESTED when nesting is not allowed
 */
private TransactionStatus handleExistingTransaction(TransactionDefinition definition, 
        Object transaction, boolean debugEnabled) throws TransactionException {
    // PROPAGATION_NEVER: an existing transaction is not allowed, so throw
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }
    // PROPAGATION_NOT_SUPPORTED: suspend the current transaction and continue
    // without one (the status is prepared with a null transaction object)
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        // 1. suspend(): suspend the current transaction
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }

    // PROPAGATION_REQUIRES_NEW: suspend the current transaction, then begin a new one
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        // Suspend the current transaction
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true,
                    newSynchronization, debugEnabled, suspendedResources);
            // 2. doBegin(): begin the new transaction
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException | Error beginEx) {
            // Beginning failed: hand the suspended resources back for resumption, then rethrow
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }

    // PROPAGATION_NESTED: run nested inside the existing transaction.
    // When savepoints are used, a savepoint is created on the existing transaction;
    // presumably a failure in the nested part then only rolls back to that savepoint
    // and leaves earlier work of the outer transaction intact -- rollback code not shown here
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(...);
        }
        // 3. createAndHoldSavepoint(...): create a savepoint on the existing transaction
        if (useSavepointForNestedTransaction()) {
            DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction,
                    false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        }
        else {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, null);
            // Savepoints are not used: begin a new transaction instead
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
    }
    if (isValidateExistingTransaction()) {
        // Validation of the existing transaction (elided)
        ...
    }
    // Remaining propagation behaviors: return a status that reuses the existing
    // transaction object, flagged as not new
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, 
            newSynchronization, debugEnabled, null);
}

可以看到，这个方法就是在处理事务的隔离级别与传播方式的逻辑，相关的代码已经作了注释，这里就不多说了，其中有几个方法需要特别说明：

  1. suspend()：挂起事务
  2. doBegin()：开启新的事务
  3. createAndHoldSavepoint(...)：创建保存点，回滚时只回滚到该保存点

这几个方法我们后面统一分析。

4. AbstractPlatformTransactionManager#getTransaction

我们再回到 AbstractPlatformTransactionManager#getTransaction 方法，看看剩下的流程：

/**
 * Second look at getTransaction(...): the part that runs when no transaction
 * exists yet. The first part of the method is elided.
 */
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
        throws TransactionException {

    // Already analysed above, omitted here
    ...

    // Reaching this point means no existing transaction was found

    // Reject a timeout smaller than TIMEOUT_DEFAULT
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }

    // PROPAGATION_MANDATORY requires an existing transaction; there is none, so throw
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(...);
    }
    // REQUIRED / REQUIRES_NEW / NESTED without an existing transaction: begin a new one
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // suspend(null): there is no transaction to suspend here; presumably only active
        // synchronizations (if any) are suspended -- suspend(...) is not shown in this excerpt
        SuspendedResourcesHolder suspendedResources = suspend(null);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // Create the status object, flagged as a new transaction
            DefaultTransactionStatus status = newTransactionStatus(
                    def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // Begin the transaction
            doBegin(transaction, def);
            // presumably initializes TransactionSynchronizationManager state -- body not shown here
            prepareSynchronization(status, def);
            return status;
        }
        catch (RuntimeException | Error ex) {
            // Beginning failed: resume whatever was suspended above, then rethrow
            resume(null, suspendedResources);
            throw ex;
        }
    }
    else {
        // Remaining propagation behaviors: no transaction is begun; a status with a
        // null transaction object is returned
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}

handleExistingTransaction(...) 的功能是“存在事务时，这些事务要怎么处理”，getTransaction(...) 下半部分的功能是“不存在事务时，这些事务要怎么处理”。可以看到，这里依然出现了 `suspend(...)`、`doBegin(...)` 等方法，这些我们一会儿也统一分析。

5. 准备返回结果：prepareTransactionStatus(...)

handleExistingTransaction(...) 与 getTransaction(...) 方法在处理返回结果时，都使用了 prepareTransactionStatus(...) 方法：

// Final return of `handleExistingTransaction(...)`: the existing transaction
// object is passed through and newTransaction is false
return prepareTransactionStatus(definition, transaction, false, 
            newSynchronization, debugEnabled, null);

// Final else-branch return of `getTransaction(...)`: no transaction object
// (null) and newTransaction is true
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);

我们来看看这个方法做了啥，进入 AbstractPlatformTransactionManager#prepareTransactionStatus：

/**
 * Creates a DefaultTransactionStatus from the given arguments via
 * newTransactionStatus(...), runs prepareSynchronization(...) on it, and
 * returns it.
 */
protected final DefaultTransactionStatus prepareTransactionStatus(
        TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
        boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {

    // Create the DefaultTransactionStatus object
    DefaultTransactionStatus status = newTransactionStatus(
            definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
    // Prepare synchronization for this status
    prepareSynchronization(status, definition);
    return status;
}

/**
 * Creates a TransactionStatus instance (a DefaultTransactionStatus) from the
 * given arguments.
 */
protected DefaultTransactionStatus newTransactionStatus(
        TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
        boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {

    // New synchronization is only honoured when requested AND no synchronization
    // is currently active
    boolean actualNewSynchronization = newSynchronization &&
            !TransactionSynchronizationManager.isSynchronizationActive();
    // Invoke the DefaultTransactionStatus constructor
    return new DefaultTransactionStatus(
            transaction, newTransaction, actualNewSynchronization,
            definition.isReadOnly(), debug, suspendedResources);
}

这两个方法主要是为了创建 DefaultTransactionStatus 对象，这里就不一一分析了。

6. 准备事务信息：TransactionAspectSupport#prepareTransactionInfo

让我们回到 TransactionAspectSupport#createTransactionIfNecessary 方法：

/**
 * Second look at createTransactionIfNecessary(...): the naming part is elided;
 * what remains obtains the transaction status and wraps it in a TransactionInfo.
 */
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
    ...
    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            // Ask the manager for the transaction status; depending on the propagation
            // behavior, getTransaction(...) may start a new transaction
            status = tm.getTransaction(txAttr);
        }
    }
    // Package everything gathered above into a TransactionInfo
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

前面分析了那么多，只是得到了 TransactionStatus，我们再接着看准备事务信息的方法 prepareTransactionInfo(...)：

/**
 * Creates a TransactionInfo for the given manager, attribute and join point,
 * attaches the transaction status when an attribute is present, and binds the
 * info to the current thread.
 *
 * @param tm the transaction manager (may be null)
 * @param txAttr the transaction attribute (may be null)
 * @param joinpointIdentification identification of the join point
 * @param status the transaction status obtained earlier (may be null)
 * @return the TransactionInfo, already bound to the current thread
 */
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, String joinpointIdentification,
        @Nullable TransactionStatus status) {
    TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);

    if (txAttr != null) {
        // Only record the status when a transaction attribute applies
        txInfo.newTransactionStatus(status);
    }

    // Log output omitted
    ...

    // Bind the info to the current thread
    txInfo.bindToThread();
    return txInfo;
}

这个方法比较简单，同 prepareTransactionStatus(...) 类似，也是创建了一个 TransactionInfo 对象，并且将 TransactionInfo 与当前线程绑定，绑定的代码如下：

/**
 * Excerpt of TransactionAspectSupport showing only how a TransactionInfo is
 * bound to, and later restored on, the current thread; the rest is elided.
 */
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {

    // Holds the TransactionInfo currently in use by this thread
    private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
            new NamedThreadLocal<>("Current aspect-driven transaction");

    // Puts the previous TransactionInfo back into the thread-local holder
    protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) {
        if (txInfo != null) {
            txInfo.restoreThreadLocalStatus();
        }
    }

    /**
     * TransactionInfo: holder for transaction information.
     */
    protected static final class TransactionInfo {

        // Status of the current transaction
        @Nullable
        private TransactionStatus transactionStatus;

        // The TransactionInfo that was bound to the thread before this one
        // (presumably that of the outer/suspended transaction)
        @Nullable
        private TransactionInfo oldTransactionInfo;

        /**
         * Binds this transaction info to the current thread.
         */
        private void bindToThread() {
            // Remember the previously bound info
            this.oldTransactionInfo = transactionInfoHolder.get();
            // Make this info the current one
            transactionInfoHolder.set(this);
        }

        /**
         * Re-binds the previously bound transaction info to the current thread;
         * invoked from cleanupTransactionInfo(...).
         */
        private void restoreThreadLocalStatus() {
            // Restore the previous info
            transactionInfoHolder.set(this.oldTransactionInfo);
        }

        ...
    }

}

TransactionAspectSupport 中有一个 ThreadLocal，用来存放当前的 TransactionInfo 对象。进行线程绑定时，先拿到旧的事务信息，保存在 TransactionInfo 的成员变量 oldTransactionInfo 中，然后将新的 TransactionInfo 放入 ThreadLocal 中；当事务执行完成后，会从 TransactionInfo 的成员变量 oldTransactionInfo 中拿到旧的事务信息，再将旧的事务信息放入 ThreadLocal 中，这样就完成了 "旧 - 新 - 旧" 事务信息的切换.

最后一步得到的 TransactionInfo 如下：

关于 TransactionInfo 的结构，这里作一些说明：

  • TransactionAspectSupport.TransactionInfo：TransactionAspectSupport 的一个内部类，封装了事务的一些信息
  • transactionManager: 事务管理器，这里用到的是 DataSourceTransactionManager
  • transactionAttribute: 事务属性，其值来自 @Transactional 注解的属性值
  • joinpointIdentification: 方法的全限定名，格式为"包名.类名.方法名"
  • transactionStatus: 从名称上看是记录事务的状态，实际上它不仅仅记录事务状态，重要的属性如下：
    • complete: 事务的完成状态
    • connectionHolder: 当前持有的数据库连接
    • suspendedResources: 挂起的数据库连接，需要恢复挂起的事务时，能够通过该引用得到挂起的数据库连接
  • oldTransactionInfo: 上一个事务（也就是挂起的事务）的信息，执行完当前事务后，会恢复到上一个事务的执行

这一小节就先写到这了，本文明确了事务信息的获取流程，`suspend(...)`、`doBegin(...)` 等方法的具体分析留到下一篇再展开。


本文原文链接：https://my.oschina.net/funcy/blog/4947799 ，限于作者个人水平，文中难免有错误之处，欢迎指正！原创不易，商业转载请联系作者获得授权，非商业转载请注明出处。