docs/Spring全家桶/Spring源码分析/Spring事务/Spring事务(四):事务的隔离级别与传播方式的处理02.md
本文是《事务的隔离级别与传播方式的处理》的第 2 篇，接上文，我们继续。
Ϣ TransactionAspectSupport#createTransactionIfNecessary лȡdzҪǰܸ뼶𡢴ʽﴦ÷£
/**
 * Creates the TransactionInfo for the given join point, asking the transaction manager
 * for a TransactionStatus when both a transaction attribute and a manager are present.
 */
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// No name specified on the attribute: wrap it so getName() returns the join point identification.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// Obtain the transaction status from the manager.
status = tm.getTransaction(txAttr);
}
}
// Wrap the manager, attribute, join point identification and status into a TransactionInfo.
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
Ҫ
»ȡ״̬̣Ϊ AbstractPlatformTransactionManager#getTransaction
/**
 * Returns a TransactionStatus for the given definition: delegates to
 * handleExistingTransaction(...) when a transaction already exists, otherwise decides
 * by propagation behavior whether to throw, begin a new transaction, or proceed without one.
 */
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Fall back to the default definition when none is supplied.
TransactionDefinition def = (definition != null ?
definition : TransactionDefinition.withDefaults());
// Obtain the transaction object from the concrete transaction manager.
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// Check whether a transaction already exists.
if (isExistingTransaction(transaction)) {
return handleExistingTransaction(def, transaction, debugEnabled);
}
// Reaching this point means no transaction currently exists.
// Validate the configured timeout.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// PROPAGATION_MANDATORY requires an existing transaction; none was found, so throw.
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(...);
}
// REQUIRED / REQUIRES_NEW / NESTED: no transaction exists, so begin a new one.
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// suspend(...) is passed null because there is no existing transaction to suspend.
SuspendedResourcesHolder suspendedResources = suspend(null);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// Create the status object, flagged as a new transaction.
DefaultTransactionStatus status = newTransactionStatus(
def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// Begin the transaction.
doBegin(transaction, def);
// Prepare transaction synchronization for the new status
// (presumably via TransactionSynchronizationManager -- not shown here).
prepareSynchronization(status, def);
return status;
}
catch (RuntimeException | Error ex) {
// Beginning failed: resume whatever was suspended, then rethrow.
resume(null, suspendedResources);
throw ex;
}
}
else {
// Remaining propagation behaviors: return a status that carries no transaction object.
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
е㳤
doGetTransaction(...)：获取事务对象。获取事务对象的方法为 DataSourceTransactionManager#doGetTransaction：
/**
 * Builds a DataSourceTransactionObject and attaches the ConnectionHolder (possibly null)
 * that TransactionSynchronizationManager returns for this manager's DataSource.
 */
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// Look up the ConnectionHolder registered under the DataSource returned by obtainDataSource().
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
// The second argument is false -- NOTE(review): presumably marks the holder as not newly created; confirm.
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
ConnectionHolderԴλȡģ
/**
 * Excerpt of DataSourceTransactionManager showing only how its DataSource
 * is supplied, stored and retrieved.
 */
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {
// The DataSource this manager works with; may be null until set.
@Nullable
private DataSource dataSource;
/**
 * Creates the transaction manager for the given DataSource.
 */
public DataSourceTransactionManager(DataSource dataSource) {
this();
setDataSource(dataSource);
afterPropertiesSet();
}
/**
 * Sets the DataSource. A TransactionAwareDataSourceProxy is unwrapped so that
 * its target DataSource is stored instead of the proxy.
 */
public void setDataSource(@Nullable DataSource dataSource) {
if (dataSource instanceof TransactionAwareDataSourceProxy) {
this.dataSource = ((TransactionAwareDataSourceProxy) dataSource)
.getTargetDataSource();
}
else {
this.dataSource = dataSource;
}
}
/**
 * Returns the stored DataSource, or null if none has been set.
 */
@Nullable
public DataSource getDataSource() {
return this.dataSource;
}
/**
 * Returns the stored DataSource, failing with "No DataSource set" when it is null.
 */
protected DataSource obtainDataSource() {
DataSource dataSource = getDataSource();
Assert.state(dataSource != null, "No DataSource set");
return dataSource;
}
...
}
obtainDataSource() 其实就是调用 getDataSource() 方法，返回的是 dataSource 成员变量，而 dataSource 是在 DataSourceTransactionManager 的构造方法里传入的。因此，得到的结论是：这里获取到的数据源就是我们在创建 DataSourceTransactionManager 时传入的：
/**
 * Demo configuration that declares the DataSource and the transaction manager beans.
 */
@Configuration
public class TxDemo03Config {
/**
 * Data source bean.
 * @return a SimpleDriverDataSource built from the driver, URL and credentials below
 * @throws Exception declared by the method; NOTE(review): presumably for driver instantiation -- confirm
 */
@Bean
public DataSource dataSource() throws Exception {
Driver driver = new com.mysql.jdbc.Driver();
// NOTE(review): connection settings are hard-coded demo values.
String url = "jdbc:mysql://localhost:3306/test";
String username = "root";
String password = "123";
return new SimpleDriverDataSource(driver, url, username, password);
}
/**
 * Transaction manager bean.
 * @param dataSource the DataSource handed to the DataSourceTransactionManager constructor
 * @return a DataSourceTransactionManager created for the given DataSource
 */
@Bean
public DataSourceTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
...
}
Դ SimpleDriverDataSource.
ConnectionHolder 的获取方法为 TransactionSynchronizationManager#getResource，代码如下：
// ThreadLocal holding a per-thread Map of resources (e.g. a ConnectionHolder), keyed by resource key.
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
/**
 * Returns the resource bound to the current thread for the given key
 * (for example a ConnectionHolder), or null if there is none.
 */
public static Object getResource(Object key) {
// Unwrap the key if necessary to obtain the actual lookup key.
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
// Look up the value stored for the actual key.
Object value = doGetResource(actualKey);
return value;
}
/**
 * Performs the actual lookup in the current thread's resource map.
 * Returns null when the thread has no map, no entry for the key, or the entry
 * is a ResourceHolder that reports isVoid().
 */
private static Object doGetResource(Object actualKey) {
// Read the current thread's resource map from the ThreadLocal.
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
// A void ResourceHolder is removed from the map and treated as absent.
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Drop the ThreadLocal entry entirely once the map is empty.
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
从代码来看，TransactionSynchronizationManager 持有一个 ThreadLocal 实例，其中存放了一个 Map，该 Map 的 key 为 datasource，value 为 ConnectionHolder。
ô ConnectionHolder ʲôأԼؽΪ Connection(ݿ) İװ࣬ҪԾ Connection ˣ
ˣͰ doGetTransaction(xxx) ˣصĽ
isExistingTransaction(...)：是否存在事务。获取到 DataSourceTransactionObject 后，接下来就可以判断是否存在事务了，判断方法在 DataSourceTransactionManager#isExistingTransaction，代码如下：
/**
 * Reports whether a transaction already exists: the transaction object must have
 * a ConnectionHolder and that holder must report isTransactionActive().
 */
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
return (txObject.hasConnectionHolder()
&& txObject.getConnectionHolder().isTransactionActive());
}
ConnectionHolder 中有一个成员变量 transactionActive，它表明当前 ConnectionHolder 是否处于激活状态，isExistingTransaction(...) 主要是根据它来判断当前是否存在事务的。
handleExistingTransaction(...)：处理已存在的事务。如果当前已存在事务，spring 是怎么处理的呢？处理已存在事务的方法为 AbstractPlatformTransactionManager#handleExistingTransaction，代码如下：
/**
 * Builds the TransactionStatus when a transaction already exists, branching on the
 * propagation behavior of the given definition.
 */
private TransactionStatus handleExistingTransaction(TransactionDefinition definition,
Object transaction, boolean debugEnabled) throws TransactionException {
// PROPAGATION_NEVER: an existing transaction is not allowed, so throw.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// PROPAGATION_NOT_SUPPORTED: suspend the current transaction and return a status without one.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
// 1. suspend(): suspend the existing transaction
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// PROPAGATION_REQUIRES_NEW: suspend the current transaction, then begin a new one.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
// Suspend the existing transaction.
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true,
newSynchronization, debugEnabled, suspendedResources);
// 2. doBegin(): begin the new transaction
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error beginEx) {
// Beginning failed: hand the suspended resources to resumeAfterBeginException, then rethrow.
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// PROPAGATION_NESTED: run inside a nested transaction.
// When savepoints are used, a savepoint is created on the existing transaction
// instead of beginning a new one.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(...);
}
// 3. createAndHoldSavepoint(...): create a savepoint within the existing transaction
if (useSavepointForNestedTransaction()) {
DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction,
false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
// Savepoints are not used for nesting: begin a new transaction instead.
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
if (isValidateExistingTransaction()) {
// Validation of the existing transaction (omitted in this excerpt).
...
}
// Remaining propagation behaviors: reuse the existing transaction (newTransaction = false).
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false,
newSynchronization, debugEnabled, null);
}
Կʹĸ뼶صĴѾעͣͲ˵ˣмҪر
suspend()doBegin()µcreateAndHoldSavepoint(...)㣬عʱֻعñ⼸ͳһ
AbstractPlatformTransactionManager#getTransactionٻص AbstractPlatformTransactionManager#getTransaction ʣµ̣
/**
 * Second excerpt of getTransaction(...): only the part reached when no transaction
 * exists yet. The lines that produce {@code def}, {@code transaction} and
 * {@code debugEnabled} are elided.
 */
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Already covered earlier; omitted here.
...
// Reaching this point means no transaction currently exists.
// Validate the configured timeout.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// PROPAGATION_MANDATORY requires an existing transaction; none was found, so throw.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(...);
}
// REQUIRED / REQUIRES_NEW / NESTED: no transaction exists, so begin a new one.
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// suspend(...) is passed null because there is no existing transaction to suspend.
SuspendedResourcesHolder suspendedResources = suspend(null);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// Create the status object, flagged as a new transaction.
DefaultTransactionStatus status = newTransactionStatus(
def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// Begin the transaction.
doBegin(transaction, def);
// Prepare transaction synchronization for the new status
// (presumably via TransactionSynchronizationManager -- not shown here).
prepareSynchronization(status, def);
return status;
}
catch (RuntimeException | Error ex) {
// Beginning failed: resume whatever was suspended, then rethrow.
resume(null, suspendedResources);
throw ex;
}
}
else {
// Remaining propagation behaviors: return a status that carries no transaction object.
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
handleExistingTransaction(...) 处理的是"存在事务时，这些传播方式要怎么处理"，而 getTransaction(...) 下半部分处理的是"不存在事务时，这些传播方式要怎么处理"。可以看到，这里依然有 suspend(...)、doBegin(...) 等方法，这些我们一会儿也统一分析。
prepareTransactionStatus(...)：handleExistingTransaction(...) 与 getTransaction(...) 在处理返回结果时，都使用了 prepareTransactionStatus(...) 方法：
// In `handleExistingTransaction(...)`: the existing transaction object is passed, newTransaction = false
return prepareTransactionStatus(definition, transaction, false,
newSynchronization, debugEnabled, null);
// In `getTransaction(...)`: no transaction object is passed (null), newTransaction = true
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
ɶ AbstractPlatformTransactionManager#prepareTransactionStatus
/**
 * Creates a DefaultTransactionStatus for the given arguments and prepares
 * synchronization for it before returning it.
 */
protected final DefaultTransactionStatus prepareTransactionStatus(
TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
// Create the DefaultTransactionStatus instance.
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
// Prepare transaction synchronization for the status.
prepareSynchronization(status, definition);
return status;
}
/**
 * Creates a TransactionStatus instance from the given arguments.
 */
protected DefaultTransactionStatus newTransactionStatus(
TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
// Only report new synchronization when it was requested AND none is active on this thread yet.
boolean actualNewSynchronization = newSynchronization &&
!TransactionSynchronizationManager.isSynchronizationActive();
// Invoke the DefaultTransactionStatus constructor.
return new DefaultTransactionStatus(
transaction, newTransaction, actualNewSynchronization,
definition.isReadOnly(), debug, suspendedResources);
}
ҪΪ˴ DefaultTransactionStatus һн
TransactionAspectSupport#prepareTransactionInfo：让我们再回到 TransactionAspectSupport#createTransactionIfNecessary 方法：
/**
 * Second excerpt of createTransactionIfNecessary(...), with the leading part elided;
 * it shows the TransactionStatus being obtained and passed to prepareTransactionInfo(...).
 */
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
...
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// Obtain the transaction status from the manager.
status = tm.getTransaction(txAttr);
}
}
// Wrap the manager, attribute, join point identification and status into a TransactionInfo.
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
ǰôֻ࣬ǵõ TransactionStatusٽϢķ prepareTransactionInfo(...)
/**
 * Creates a TransactionInfo from the given arguments, records the status on it when
 * a transaction attribute is present, and binds it to the current thread.
 */
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
// Store the status on the TransactionInfo.
txInfo.newTransactionStatus(status);
}
// Log output omitted in this excerpt.
...
// Bind the TransactionInfo to the current thread.
txInfo.bindToThread();
return txInfo;
}
ţͬ prepareTransactionStatus(...) ƣҲǴһ TransactionInfo ҽ TransactionInfo 뵱ǰ̰߳Ĵ£
/**
 * Excerpt of TransactionAspectSupport showing how a TransactionInfo is bound to,
 * and later restored on, the current thread.
 */
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
// Holds the TransactionInfo currently in use by this thread.
private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
new NamedThreadLocal<>("Current aspect-driven transaction");
// Resets the thread-bound TransactionInfo to the previous one, if txInfo is non-null.
protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) {
if (txInfo != null) {
txInfo.restoreThreadLocalStatus();
}
}
/**
 * TransactionInfo: holder for transaction information.
 */
protected static final class TransactionInfo {
// Status of the current transaction.
@Nullable
private TransactionStatus transactionStatus;
// The TransactionInfo that was bound to the thread before this one.
@Nullable
private TransactionInfo oldTransactionInfo;
/**
 * Binds this TransactionInfo to the current thread.
 */
private void bindToThread() {
// Remember the previously bound TransactionInfo.
this.oldTransactionInfo = transactionInfoHolder.get();
// Install this instance as the thread's current TransactionInfo.
transactionInfoHolder.set(this);
}
/**
 * Re-binds the previous TransactionInfo to the current thread.
 */
private void restoreThreadLocalStatus() {
// Put the previous TransactionInfo back into the ThreadLocal.
transactionInfoHolder.set(this.oldTransactionInfo);
}
...
}
}
TransactionAspectSupport 中有一个 ThreadLocal，用来存放当前的 TransactionInfo 对象。进行线程绑定时，会先拿到旧的事务信息，保存在 TransactionInfo 的成员变量 oldTransactionInfo 中，然后将新的 TransactionInfo 放入 ThreadLocal 中；当事务执行完成后，会从 TransactionInfo 的成员变量 oldTransactionInfo 中拿到旧的事务信息，再将旧的事务信息放入 ThreadLocal 中，这样就完成了 "旧 - 新 - 旧" 的切换。
һõ TransactionInfo £
TransactionInfo ĽṹһЩ˵
TransactionAspectSupport.TransactionInfo TransactionAspectSupport һڲ࣬װһЩϢtransactionManager: õ DataSourceTransactionManagertransactionAttribute: ֵ @Transactional עֵjoinpointIdentification: ȫʽΪ"͡"transactionStatus: ϿǼ¼״̬ʵ¼״̬ش£
complete: ״̬connectionHolder: ǰеݿsuspendedResources: ݿӣҪָʱܹöõݿoldTransactionInfo: һҲǹϢִ굱ǰָһִһСдôˣľȷˣsuspend(...)``doBegin(...) ȷƪٷɡ
本文原文链接：https://my.oschina.net/funcy/blog/4947799 ，限于作者个人水平，文中难免有错误之处，欢迎指正！原创不易，商业转载请联系作者获得授权，非商业转载请注明出处。