Back to Javatutorial

Spring事务(五):事务的隔离级别与传播方式的处理03

docs/Spring全家桶/Spring源码分析/Spring事务/Spring事务(五):事务的隔离级别与传播方式的处理03.md

1.0.013.6 KB
Original Source

ǡĸ뼶봫ʽĴĵ 3 ƪģǼ

ᵽĿdoBegin(...)``suspend(...)봴㣨createAndHoldSavepoint(...)IJĽЩʵ֡

1. doBegin(...)µ

ķΪ DataSourceTransactionManager#doBegin£

protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        // 1\. ȡݿ
        if (!txObject.hasConnectionHolder() ||
                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            // getConnection(): ȡݿӣobtainDataSource()ȡԴ
            Connection newCon = obtainDataSource().getConnection();
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }
        // ォ synchronizedWithTransaction Ϊtrue
        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();

        // 2\. ĸ뼶

        Integer previousIsolationLevel 
                = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        // ֻ
        txObject.setReadOnly(definition.isReadOnly());

        // 3\. 
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            // رԶύҲǿ
            con.setAutoCommit(false);
        }
        // 4\. ֻ
        prepareTransactionalConnection(con, definition);
        // ļ
        txObject.getConnectionHolder().setTransactionActive(true);
        // 5\. ijʱʱ
        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        // 6\. Դӵǰ߳
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(
                    obtainDataSource(), txObject.getConnectionHolder());
            }
        }
    } catch (Throwable ex) {
        // 쳣رո
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, obtainDataSource());
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException(...);
    }
}

ϴ뻹ͦģעҲȷˣԹؼһ

1.1 ȡݿ

ݿӵĻȡǺܼ򵥵ģ£

Connection newCon = obtainDataSource().getConnection();

ʵǵ javax.sql.DataSource#getConnection()

1.2 ĸ뼶

@Transactional Уǿʹ isolation ָĸ뼶

public @interface Transactional {
    /**
     * ָĸ뼶
     */
    Isolation isolation() default Isolation.DEFAULT;
    ...
}

ָʹĬϵĸ뼶Ҳʹݿõġ

spring 뼶ķΪ DataSourceUtils#prepareConnectionForTransaction£

public static Integer prepareConnectionForTransaction(Connection con, 
        @Nullable TransactionDefinition definition) throws SQLException {
    Assert.notNull(con, "No Connection specified");
    if (definition != null && definition.isReadOnly()) {
        try {
            // Ϊֻģʽ
            con.setReadOnly(true);
        }
        catch (SQLException | RuntimeException ex) {
            ...
        }
    }

    Integer previousIsolationLevel = null;
    if (definition != null && definition.getIsolationLevel() 
            != TransactionDefinition.ISOLATION_DEFAULT) {
        int currentIsolation = con.getTransactionIsolation();
        if (currentIsolation != definition.getIsolationLevel()) {
            // õ֮ǰĸ뼶£ҪΪԭĸ뼶
            previousIsolationLevel = currentIsolation;
            // ݿĸ뼶𣬵õǣ
            // java.sql.Connection.setTransactionIsolation
            con.setTransactionIsolation(definition.getIsolationLevel());
        }
    }
    return previousIsolationLevel;
}

ã

  1. Ϊֻģʽõ java.sql.Connection#setReadOnly
  2. ø뼶𣺵õ java.sql.Connection.setTransactionIsolation

ֻģʽҲǿ @Transactional õģ

public @interface Transactional {
    /**
     * ֻ
     */
    boolean readOnly() default false;
    ...
}

1.3

ĵʱˣǰ̵ô࣬ΪһIJ񡣿Ĵ£

if (con.getAutoCommit()) {
    txObject.setMustRestoreAutoCommit(true);
    // رԶύҲǿ
    con.setAutoCommit(false);
}

ΪжԶύǷˣͽΪ falseõҲ java.sql ķ

  • ȡԶύ״̬java.sql.Connection#getAutoCommit
  • Զύ״̬java.sql.Connection#setAutoCommit

1.4 ֻ

ǰ 1.2 ĸ뼶Уͨ java.sql.Connection#setReadOnly ΪֻˣﻹһãΪ DataSourceTransactionManager#prepareTransactionalConnection

protected void prepareTransactionalConnection(Connection con, TransactionDefinition definition)
        throws SQLException {
    if (isEnforceReadOnly() && definition.isReadOnly()) {
        try (Statement stmt = con.createStatement()) {
            // ֻҪsql
            stmt.executeUpdate("SET TRANSACTION READ ONLY");
        }
    }
}

һִͨ sql SET TRANSACTION READ ONLY Ϊֻ

1.5 ijʱʱ

@Transactional עУǿʹ timeout ָijʱʱ䣺

public @interface Transactional {
    /**
     * óʱʱ
     */
    int timeout() default TransactionDefinition.TIMEOUT_DEFAULT;
    ...
}

õijʱʱõǽ ResourceHolderSupport#setTimeoutInSeconds

public abstract class ResourceHolderSupport implements ResourceHolder {
    /**
     * ֹʱ
     */
    private Date deadline;

    /**
     * ʱ䣬λ
     */
    public void setTimeoutInSeconds(int seconds) {
        setTimeoutInMillis(seconds * 1000L);
    }

    /**
     * ʱ䣬λ
     * תΪ ֹʱ
     */
    public void setTimeoutInMillis(long millis) {
        this.deadline = new Date(System.currentTimeMillis() + millis);
    }

    /**
     * ȡֹʱ
     */
    @Nullable
    public Date getDeadline() {
        return this.deadline;
    }

    /**
     * ȡʣʱ䣬λ
     */
    public int getTimeToLiveInSeconds() {
        double diff = ((double) getTimeToLiveInMillis()) / 1000;
        int secs = (int) Math.ceil(diff);
        checkTransactionTimeout(secs <= 0);
        return secs;
    }

    /**
     * ȡʣʱ䣬λ
     */
    public long getTimeToLiveInMillis() throws TransactionTimedOutException{
        if (this.deadline == null) {
            throw new IllegalStateException("No timeout specified for this resource holder");
        }
        long timeToLive = this.deadline.getTime() - System.currentTimeMillis();
        checkTransactionTimeout(timeToLive <= 0);
        return timeToLive;
    }

    ...
}

ResourceHolderSupport άһԱ deadlineֹʱ䣩ijʱʱնתΪ deadline

ȡʣʱʱҲ deadline õصʣʱ֡

txObject.getConnectionHolder().setTimeoutInSeconds(timeout) ֻǽʱʱõ ConnectionHolder ijԱУConnectionHolder ResourceHolderSupport ࣩƺݿûɶϵݿôʱأ

ò˵ʱĿеңͨҵģʱʱ DataSourceUtils#applyTimeout УпνǾǧɽˮ

лԹܣû֪Ҫòҵʱãʹõ jdbcTemplate orm £óʱʱӦûͬ

ǿ DataSourceUtils#applyTimeout ôóʱʱģ

public static void applyTimeout(Statement stmt, @Nullable DataSource dataSource, int timeout) 
        throws SQLException {
    Assert.notNull(stmt, "No Statement specified");
    ConnectionHolder holder = null;
    if (dataSource != null) {
        holder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
    }
    if (holder != null && holder.hasTimeout()) {
        // ǻȡʣijʱʱ䣬 ConnectionHolder.dateline õ
        stmt.setQueryTimeout(holder.getTimeToLiveInSeconds());
    }
    else if (timeout >= 0) {
        // jdbcTemplate Ҳòѯʱʱ
        stmt.setQueryTimeout(timeout);
    }
}

յõ java.sql.Statement#setQueryTimeout óʱʱġ

1.6 Դӵǰ߳

鴦ɺ󣬽ǰԴˣΪ TransactionSynchronizationManager#bindResource:

/**
 * resources ŵǰ߳еԴ
 * дŵΪһ MapMap  key ΪԴvalue ΪԴӦ
 */
private static final ThreadLocal<Map<Object, Object>> resources =
        new NamedThreadLocal<>("Transactional resources");

/**
 * 󶨲
 */
public static void bindResource(Object key, Object value) throws IllegalStateException {
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    Assert.notNull(value, "Value must not be null");
    Map<Object, Object> map = resources.get();
    if (map == null) {
        map = new HashMap<>();
        resources.set(map);
    }
    // ԴӴŵmap
    Object oldValue = map.put(actualKey, value);
    if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
        oldValue = null;
    }
    if (oldValue != null) {
        throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
                actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
    }

һIJDZȽϼ򵥵ģǽԴӷŽ resources УӶ뵱ǰ̵߳İ󶨲

2. suspend(...)

IJΪ AbstractPlatformTransactionManager#suspend£

protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) 
        throws TransactionException {
    // ͬȹͬ
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
        try {
            Object suspendedResources = null;
            if (transaction != null) {
                // 
                suspendedResources = doSuspend(transaction);
            }
            // 
            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            TransactionSynchronizationManager.setCurrentTransactionName(null);
            // ֻ״̬
            boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
            // ø뼶
            Integer isolationLevel = TransactionSynchronizationManager
                    .getCurrentTransactionIsolationLevel();
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
            // 񼤻״̬
            boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            TransactionSynchronizationManager.setActualTransactionActive(false);
            // ع
            return new SuspendedResourcesHolder(
                    suspendedResources, suspendedSynchronizations, name, readOnly, 
                    isolationLevel, wasActive);
        }
        catch (RuntimeException | Error ex) {
            doResumeSynchronization(suspendedSynchronizations);
            throw ex;
        }
    }
    else if (transaction != null) {
        Object suspendedResources = doSuspend(transaction);
        return new SuspendedResourcesHolder(suspendedResources);
    }
    else {
        return null;
    }
}

suspend(...) ҪľǹIJˣҲ doSuspend(transaction)÷ λ `` Уֱӿ룺

protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    txObject.setConnectionHolder(null);
    // 
    return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

TransactionSynchronizationManager.unbindResource

/**
 * 󶨲
 */
public static Object unbindResource(Object key) throws IllegalStateException {
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    // 
    Object value = doUnbindResource(actualKey);
    if (value == null) {
        throw new IllegalStateException(...);
    }
    return value;
}

/**
 * 󶨲
 */
private static Object doUnbindResource(Object actualKey) {
    Map<Object, Object> map = resources.get();
    if (map == null) {
        return null;
    }
    // ƳԴ
    Object value = map.remove(actualKey);
    if (map.isEmpty()) {
        resources.remove();
    }
    if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
        value = null;
    }
    return value;
}

ڿʱǽԴӰ󶨵ǰ̣߳ʱ ǽԴӽ뵱ǰ̵߳İ󶨹ϵ

3. createAndHoldSavepoint(...)

Ĵ AbstractTransactionStatus#createAndHoldSavepoint д£

    // 
    private Object savepoint;

    // 
    public void createAndHoldSavepoint() throws TransactionException {
        setSavepoint(getSavepointManager().createSavepoint());
    }

    protected void setSavepoint(@Nullable Object savepoint) {
        this.savepoint = savepoint;
    }

ţֻ``ı棨ҲǸֵ AbstractTransactionStatus ijԱҪ˽ⱣĴÿ getSavepointManager().createSavepoint()JdbcTransactionObjectSupport#createSavepoint

public Object createSavepoint() throws TransactionException {
    ConnectionHolder conHolder = getConnectionHolderForSavepoint();
    try {
        if (!conHolder.supportsSavepoints()) {
            throw new NestedTransactionNotSupportedException(...);
        }
        if (conHolder.isRollbackOnly()) {
            throw new CannotCreateTransactionException(...);
        }
        // 
        return conHolder.createSavepoint();
    }
    catch (SQLException ex) {
        throw new CannotCreateTransactionException("Could not create JDBC savepoint", ex);
    }
}

õ ConnectionHolder#createSavepoint ԭ ConnectionHolder дİ

// ǰ׺
public static final String SAVEPOINT_NAME_PREFIX = "SAVEPOINT_";

// 
private int savepointCounter = 0;

public Savepoint createSavepoint() throws SQLException {
    this.savepointCounter++;
    // ﴴ㣬õ java.sql.Connection#setSavepoint(java.lang.String) 
    return getConnection().setSavepoint(SAVEPOINT_NAME_PREFIX + this.savepointCounter);
}

ǰ׺Ϊ SAVEPOINT_ÿһ㣬savepointCounter ļͼ 1ձΪ SAVEPOINT_1``SAVEPOINT_2...

յõķ java.sql.Connection#setSavepoint(java.lang.String)Ȼ jdk ṩķǻᷢĴ󲿷ֲ spring jdk ķװ

ˣĵķȵˣύعع㡢ָȣƪ¼


ԭӣhttps://my.oschina.net/funcy/blog/4947826 ߸ˮƽд֮ӭָԭףҵתϵ߻Ȩҵתע