docs/Spring全家桶/SpringCloudAlibaba源码分析/SpringCloudSeata源码分析.md
| һߣJava Դ |ͷ
ѧϰĿ
ȿ˼άƵͼ1.2 ʼƵ 1.3 ִƵ 2 Դ 2.1 SeataAutoConfiguration seataԴоҪseataҵSQLundo_logݣһɺύȫһҵʧܺͨundo_logع
seataҲspringʹõģSpringBootseataҲһЩԶseataԶdzֱӣͽSeataAutoConfigurationǴ
@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)@Configuration@EnableConfigurationProperties({SeataProperties.class})public class SeataAutoConfiguration { }
ȣ@ComponentScanɨһpropertiesһSeataPropertiesBean
@ConditionalOnPropertyЧΪseata.enabled=trueĬֵtrueԿԿطֲʽܣclient˵file.confã
@ConfigurationSeataAutoConfigurationΪspringࡣ
@EnableConfigurationPropertiesðתһSeataPropertiesBeanʹá
ĶSeataAutoConfigurationڲ
@Bean@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})@ConditionalOnMissingBean(GlobalTransactionScanner.class)public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Automatically configure Seata"); } return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);}
ԶõĺĵһBeanGlobalTransactionScanner
ǿBeandzļ췽ֻҪһapplicationIdtxServiceGroup
applicationId: spring.application.name=㶨ĵǰӦõ֣磺userService
txServiceGroup: applicationId -seata-service-groupģ磺 userService-seata-service-group汾ϵ͵ĻʱܻseatafescarĬfescarΪ
newһGlobalTransactionScannerSeataAutoConfigurationԶþͽˡSeataAutoConfigurationֻһá
2.2 GlobalTransactionScanner ȻĵGlobalTransactionScanner࣬ǼעʵͿԲ²һãɨ@GlobalTransactionalע⣬ԴǿĹܡ
Ҫ˽࣬òĶһUMLͼԿGlobalTransactionScannerҪ4ֵùע
1ApplicationContextAwareʾõspring
2InitializingBeanӿڣ˳ʼʱһЩ
3AbstractAutoProxyCreatorʾspringеBeanǿҲǿIJ²⡣
4DisposableӿڣspringٵʱһЩ
עһ4ִ˳
ApplicationContextAware -> InitializingBean -> AbstractAutoProxyCreator -> DisposableBean
2.3 InitializingBean
@Overridepublic void afterPropertiesSet() { if (disableGlobalTransaction) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Global transaction is disabled."); } return; } initClient();}
ʼSeataClient˵ĶClientҪTransactionManagerResourceManagerΪ˼ɣûаinitClient´GlobalTransactionScannerһࡣ
initClient
private void initClient() { //init TM TMClient.init(applicationId, txServiceGroup); //init RM RMClient.init(applicationId, txServiceGroup); registerSpringShutdownHook();}
initClientӣTMClient.initʼTransactionManagerRPCͻˣRMClient.initʼResourceManagerRPCͻˡseataRPCnettyʵ֣seataװһʹáעһSpringShutdownHookӺ
2.3.1 TMClientʼ
@Overridepublic void init() { timerExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { clientChannelManager.reconnect(getTransactionServiceGroup()); } }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);...}
һʱϽ clientChannelManager.reconnect
void reconnect(String transactionServiceGroup) { List<String> availList = null; try { availList = getAvailServerList(transactionServiceGroup); } catch (Exception e) { ... } ... for (String serverAddress : availList) { try { acquireChannel(serverAddress); } catch (Exception e) { ... } }}
transactionServiceGroupȡseata-serveripַбȻ
private List<String> getAvailServerList(String transactionServiceGroup) throws Exception { List<InetSocketAddress> availInetSocketAddressList = RegistryFactory.getInstance() .lookup(transactionServiceGroup); if (CollectionUtils.isEmpty(availInetSocketAddressList)) { return Collections.emptyList(); } return availInetSocketAddressList.stream() .map(NetUtil::toStringAddress) .collect(Collectors.toList());}
RegistryFactory.getInstance().lookup(transactionServiceGroup);ǶԲͬעģĬϿNacosʽʵȸҵserverȺƣdefaultȻݼȺҵserverӦip˿ڵַ
@Overridepublic List<InetSocketAddress> lookup(String key) throws Exception { //default String clusterName = getServiceGroup(key); if (clusterName == null) { return null; } if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) { synchronized (LOCK_OBJ) { if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) { List<String> clusters = new ArrayList<>(); clusters.add(clusterName); List<Instance> firstAllInstances = getNamingInstance().getAllInstances(getServiceName(), getServiceGroup(), clusters); if (null != firstAllInstances) { List<InetSocketAddress> newAddressList = firstAllInstances.stream() .filter(instance -> instance.isEnabled() && instance.isHealthy()) .map(instance -> new InetSocketAddress(instance.getIp(), instance.getPort())) .collect(Collectors.toList()); CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); } subscribe(clusterName, event -> { List<Instance> instances = ((NamingEvent) event).getInstances(); if (null == instances && null != CLUSTER_ADDRESS_MAP.get(clusterName)) { CLUSTER_ADDRESS_MAP.remove(clusterName); } else if (!CollectionUtils.isEmpty(instances)) { List<InetSocketAddress> newAddressList = instances.stream() .filter(instance -> instance.isEnabled() && instance.isHealthy()) .map(instance -> new InetSocketAddress(instance.getIp(), instance.getPort())) .collect(Collectors.toList()); CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); } }); } } } return CLUSTER_ADDRESS_MAP.get(clusterName);}
Seata-serverIPַѻȡ,ȻacquireChannel
Channel acquireChannel(String serverAddress) { Channel channelToServer = channels.get(serverAddress); if (channelToServer != null) { channelToServer = getExistAliveChannel(channelToServer, serverAddress); if (channelToServer != null) { return channelToServer; } }... channelLocks.putIfAbsent(serverAddress, new Object()); synchronized (channelLocks.get(serverAddress)) { return doConnect(serverAddress); }}
ȡseata-serverIPַŵNettyзװTmClientͳʼ
TmClientʼܽ
public static void init(String applicationId, String transactionServiceGroup) { // ȡ RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup); // ResourceManagerĵ rmRpcClient.setResourceManager(DefaultResourceManager.get()); // ӼServer˵Ϣ rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get())); // ʼRPC rmRpcClient.init();}
TMClientȣRMClientһServerϢĻơҲ˵TMְServerͨţ磺ȫbegincommitrollbackȡ
RMԴ⣬ΪȫcommitrollbackȵϢͣӶԱԴز
ԴresourceManagerϢصڽTCڶηύعSeataжResourceManagerAbstractRMHandlerSPI䣬ResouceManagerΪ
public class DefaultResourceManager implements ResourceManager { protected void initResourceManagers() { //init all resource managers List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class); if (CollectionUtils.isNotEmpty(allResourceManagers)) { for (ResourceManager rm : allResourceManagers) { resourceManagers.put(rm.getBranchType(), rm); } } }}
ԿʼDefaultResouceManagerʱʹClassLoaderȥضӦJarµʵ֣ĬATģʽʹõʵݿ⣬Ҳrm-datasourceµʵ֣ʵ·Ҫλ/resources/META-INF/չӿȫ·ȥңͻҵӦʵ ResourceManagerӦʵȫ· io.seata.rm.datasource.DataSourceManagerָύͻعķDefaultRMHandlerӦʵȫ·io.seata.rm.RMHandlerATǸserverϢӦύعĻصࡣ
RMClinetinit()TMClientһ
2.3.3 ܽ
| һߣJava Դ |ͷ
2.4 AbstractAutoProxyCreator GlobalTransactionScannerʼTMRMԺٹעһAbstractAutoProxyCreatorԶ
Զɶأ˵springеBeanǿʲôܣ
GlobalTransactionScannerҪչAbstractAutoProxyCreatorwrapIfNecessary
ǿǰжϴʾǷBeanҪǿǿĻ
2.4.1 wrapIfNecessary
@Overrideprotected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) { if (disableGlobalTransaction) { return bean; } try { synchronized (PROXYED_SET) { // ͬBean if (PROXYED_SET.contains(beanName)) { return bean; } interceptor = null; // жǷTCCģʽ if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { // TCCʵֵ interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName)); } else { Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean); Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean); // жǷ@GlobalTransactional@GlobalLockע if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) { return bean; } if (interceptor == null) { // TCC if (globalTransactionalInterceptor == null) { globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook); ConfigurationCache.addConfigListener( ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor); } interceptor = globalTransactionalInterceptor; } } // жϵǰBeanǷѾspringĴ if (!AopUtils.isAopProxy(bean)) { // ǣôһspringĴ̼ bean = super.wrapIfNecessary(bean, beanName, cacheKey); } else { // һspringĴ࣬ôȡѾڵϣȻӵüϵ AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean); Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null)); for (Advisor avr : advisor) { advised.addAdvisor(0, avr); } } PROXYED_SET.add(beanName); return bean; } } catch (Exception exx) {}}
wrapIfNecessaryϳǷֲ迴
1isTccAutoProxyжǷtccģʽĻѡTccActionInterceptortccģʽѡ GlobalTransactionalInterceptorĬϲ
2existAnnotationжϵǰBeanǷ߽ӿڵķ@GlobalTransactional@GlobalLockע⣬ûֱӷ
3isAopProxyжϵǰBeanǷѾspringĴˣJDK̬CglibͨBeanԭеɴɣѾǴ࣬ôҪͨȡڵҲAdvisorֱӵüϵС
wrapIfNecessaryķӣԴǺϤϸڵЩ
2.4.1.1 ATһοȫ ҪȫĽӿϣ@GlobalTransactionalע⣬עһӦ GlobalTransactionalInterceptorinvokeط
@Overridepublic Object invoke(final MethodInvocation methodInvocation) throws Throwable { Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null; Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass); if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) { final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod); //ȡϵȫע final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class); //ȡϵȫע final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class); boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes); if (!localDisable) { //ȫע⣬handleGlobalTransactionȫ if (globalTransactionalAnnotation != null) { return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation); //ȫע⣬handleGlobalLockȫ } else if (globalLockAnnotation != null) { return handleGlobalLock(methodInvocation); } } } //ɶûУִͨУ return methodInvocation.proceed();}
handleGlobalTransactionе transactionalTemplate.execute
// 2\. ȫbeginTransactionbeginTransaction(txInfo, tx); Object rs = null;try { // ִҵbusiness.execute() rs = business.execute(); } catch (Throwable ex) { // 3.쳣ִcompleteTransactionAfterThrowingع completeTransactionAfterThrowing(txInfo, tx, ex); throw ex;} // 4\. û쳣ύcommitTransactioncommitTransaction(tx);
ȫյ io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String)
@Overridepublic void begin(int timeout, String name) throws TransactionException { //˴Ľɫжйؼ//ǰȫķߣLauncherDzߣParticipant//ڷֲʽϵͳҲGlobalTransactionalע//ôĽɫParticipantԺbegin˳ //жǷߣLauncherDzߣParticipantǸݵǰǷѴXIDж //ûXIDľLauncherѾXIDľParticipant if (role != GlobalTransactionRole.Launcher) { assertXIDNotNull(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid); } return; } assertXIDNull(); if (RootContext.getXID() != null) { throw new IllegalStateException(); } xid = transactionManager.begin(null, null, name, timeout); status = GlobalStatus.Begin; RootContext.bind(xid); if (LOGGER.isInfoEnabled()) { LOGGER.info("Begin new global transaction [{}]", xid); } }
seata-serverȡȫXID
@Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { GlobalBeginRequest request = new GlobalBeginRequest(); request.setTransactionName(name); request.setTimeout(timeout); // GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request); if (response.getResultCode() == ResultCode.Failed) { throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg()); } return response.getXid();}
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException { try { //TMClientװNetty return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request); } catch (TimeoutException toe) { throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe); }}
XIDRootContextУɴ˿ԿȫTMģTMȫseata-serverseata-serverܵseata룩
@Overrideprotected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException { //begin response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout())); if (LOGGER.isInfoEnabled()) { LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}", rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid()); }}
io.seata.server.coordinator.DefaultCoordinator#doGlobalBeginܿͻ˿ȫio.seata.server.coordinator.DefaultCore#beginȫ
@Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout); MDC.put(RootContext.MDC_KEY_XID, session.getXid()); session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());//Ự session.begin(); // transaction start event eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC, session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus())); return session.getXid();}
ͨǰỰ
@Overridepublic void begin() throws TransactionException { this.status = GlobalStatus.Begin; this.beginTime = System.currentTimeMillis(); this.active = true; for (SessionLifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onBegin(this); }}
io.seata.server.session.AbstractSessionManager#onBeginֵio.seata.server.storage.db.session.DataBaseSessionManager#addGlobalSession
@Overridepublic void addGlobalSession(GlobalSession session) throws TransactionException { if (StringUtils.isBlank(taskName)) { // boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session); if (!ret) { throw new StoreException("addGlobalSession failed."); } } else { boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session); if (!ret) { throw new StoreException("addGlobalSession failed."); } }}
ݿд
@Overridepublic boolean writeSession(LogOperation logOperation, SessionStorable session) { if (LogOperation.GLOBAL_ADD.equals(logOperation)) { return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session)); } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) { return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session)); } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) { return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session)); } else if (LogOperation.BRANCH_ADD.equals(logOperation)) { return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session)); } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) { return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session)); } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) { return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session)); } else { throw new StoreException("Unknown LogOperation:" + logOperation.name()); }}
seataglobal_tabݣȫѿ
2.4.1.2 ATһִҵSQL ȫѿҪִҵSQLundo_logݣȫسɹջִҵģSeataԴ˴sqlundo_logԴִеģSeataDataSourceConnectionStatementĴװ
/*** datasource滻ԭĵdatasource*/@Primary@Bean("dataSource")public DataSourceProxy dataSourceProxy(DataSource druidDataSource){ return new DataSourceProxy(druidDataSource);}
ĿʹõԴseataDataSourceProxyնSqlнStatementProxy
@Overridepublic boolean execute(String sql) throws SQLException { this.targetSQL = sql; return ExecuteTemplate.execute(this, (statement, args) -> statement.execute((String) args[0]), sql);}
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException { if (!RootContext.requireGlobalLock() && !StringUtils.equals(BranchType.AT.name(), RootContext.getBranchType())) { //ȫֱִУ return statementCallback.execute(statementProxy.getTargetStatement(), args); } String dbType = statementProxy.getConnectionProxy().getDbType(); if (CollectionUtils.isEmpty(sqlRecognizers)) { sqlRecognizers = SQLVisitorFactory.get( statementProxy.getTargetSQL(), dbType); } Executor<T> executor; if (CollectionUtils.isEmpty(sqlRecognizers)) { executor = new PlainExecutor<>(statementProxy, statementCallback); } else { if (sqlRecognizers.size() == 1) { SQLRecognizer sqlRecognizer = sqlRecognizers.get(0); //ͬSQLͣͬ switch (sqlRecognizer.getSQLType()) { case INSERT: executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object[]{statementProxy, statementCallback, sqlRecognizer}); break; case UPDATE: executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; case DELETE: executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; case SELECT_FOR_UPDATE: executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; default: executor = new PlainExecutor<>(statementProxy, statementCallback); break; } } else { executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers); } } T rs; try { //ִSQL rs = executor.execute(args); } catch (Throwable ex) { if (!(ex instanceof SQLException)) { // Turn other exception into SQLException ex = new SQLException(ex); } throw (SQLException) ex; } return rs; }
insertʹõInsertExecutor.executeʵջʹ io.seata.rm.datasource.exec.BaseTransactionalExecutor#execute
@Overridepublic T execute(Object... args) throws Throwable { if (RootContext.inGlobalTransaction()) { String xid = RootContext.getXID(); statementProxy.getConnectionProxy().bind(xid); } statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock()); return doExecute(args);}
еxidstatementProxyУdoExecuteAbstractDMLBaseExecutorеdoExecute
@Overridepublic T doExecute(Object... args) throws Throwable { AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); if (connectionProxy.getAutoCommit()) { return executeAutoCommitTrue(args); } else { return executeAutoCommitFalse(args); }}
е executeAutoCommitTrue/executeAutoCommitFalse
protected T executeAutoCommitTrue(Object[] args) throws Throwable { ConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); try { connectionProxy.setAutoCommit(false); return new LockRetryPolicy(connectionProxy).execute(() -> { T result = executeAutoCommitFalse(args); connectionProxy.commit(); return result; }); } catch (Exception e) { ... } finally { connectionProxy.getContext().reset(); connectionProxy.setAutoCommit(true); }}
ϸ֣նǵexecuteAutoCommitFalse
protected T executeAutoCommitFalse(Object[] args) throws Exception { //getTableMeta if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && getTableMeta().getPrimaryKeyOnlyName().size() > 1) { throw new NotSupportYetException("multi pk only support mysql!"); } //ȡbeforeImage TableRecords beforeImage = beforeImage(); //ִҵsql T result = statementCallback.execute(statementProxy.getTargetStatement(), args); //ȡafterImage TableRecords afterImage = afterImage(beforeImage); //image prepareUndoLog(beforeImage, afterImage); return result;}
ȡbeforeImage
//tableMetaСprotected TableMeta getTableMeta(String tableName) { if (tableMeta != null) { return tableMeta; } ConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); tableMeta = TableMetaCacheFactory.getTableMetaCache(connectionProxy.getDbType()) .getTableMeta(connectionProxy.getTargetConnection(), tableName, connectionProxy.getDataSourceProxy().getResourceId()); return tableMeta;}
ִҵsqlʹ com.alibaba.druid.pool.DruidPooledPreparedStatement#executeִ
ȡafterImageύʱundo_log־
protected T executeAutoCommitTrue(Object[] args) throws Throwable { ConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); try { connectionProxy.setAutoCommit(false); return new LockRetryPolicy(connectionProxy).execute(() -> { T result = executeAutoCommitFalse(args); // connectionProxy.commit(); return result; }); } catch (Exception e) { ... } finally { connectionProxy.getContext().reset(); connectionProxy.setAutoCommit(true); }}
public void commit() throws SQLException { try { LOCK_RETRY_POLICY.execute(() -> { // doCommit(); return null; }); } catch (SQLException e) { throw e; } catch (Exception e) { throw new SQLException(e); }}
private void doCommit() throws SQLException { if (context.inGlobalTransaction()) { // processGlobalTransactionCommit(); } else if (context.isGlobalLockRequire()) { processLocalCommitWithGlobalLocks(); } else { targetConnection.commit(); }}
private void processGlobalTransactionCommit() throws SQLException { try { //seata-serverע֧Ϣ register(); } catch (TransactionException e) { recognizeLockKeyConflictException(e, context.buildLockKeys()); } try { //ύ֮ǰundo_log,flushUndoLogs UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this); targetConnection.commit(); } catch (Throwable ex) { ... } if (IS_REPORT_SUCCESS_ENABLE) { report(true); } context.reset();}
public void flushUndoLogs(ConnectionProxy cp) throws SQLException { ConnectionContext connectionContext = cp.getContext(); if (!connectionContext.hasUndoLog()) { return; } String xid = connectionContext.getXid(); long branchId = connectionContext.getBranchId(); ...//÷undo_log insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName()), undoLogContent, cp.getTargetConnection());}
ڸ÷ע֧ύseata-serverע֧Ϣseata-serverյseataԴ룩
io.seata.server.coordinator.DefaultCoordinator#doBranchRegister
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException { GlobalSession globalSession = assertGlobalSessionNotNull(xid, false); return SessionHolder.lockAndExecute(globalSession, () -> { ... try { //ע globalSession.addBranch(branchSession); } catch (RuntimeException ex) { ... } ... return branchSession.getBranchId(); });}
@Overridepublic void addBranch(BranchSession branchSession) throws TransactionException { for (SessionLifecycleListener lifecycleListener : lifecycleListeners) { //onAddBranchѡAbstractSessionManager lifecycleListener.onAddBranch(this, branchSession); } branchSession.setStatus(BranchStatus.Registered); add(branchSession);}
io.seata.server.storage.db.session.DataBaseSessionManager#addBranchSession
@Overridepublic void onAddBranch(GlobalSession globalSession, BranchSession branchSession) throws TransactionException { //룬ѡDataBaseSessionManager addBranchSession(globalSession, branchSession);}
@Overridepublic void addBranchSession(GlobalSession globalSession, BranchSession session) throws TransactionException { if (StringUtils.isNotBlank(taskName)) { return; } // boolean ret = transactionStoreManager.writeSession(LogOperation.BRANCH_ADD, session); if (!ret) { throw new StoreException("addBranchSession failed."); }}
@Overridepublic boolean writeSession(LogOperation logOperation, SessionStorable session) { if (LogOperation.GLOBAL_ADD.equals(logOperation)) { return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session)); } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) { return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session)); } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) { return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session)); } else if (LogOperation.BRANCH_ADD.equals(logOperation)) { return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session)); } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) { return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session)); } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) { return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session)); } else { throw new StoreException("Unknown LogOperation:" + logOperation.name()); }}
@Overridepublic boolean insertBranchTransactionDO(BranchTransactionDO branchTransactionDO) { String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertBranchTransactionSQL(branchTable); Connection conn = null; PreparedStatement ps = null; try { int index = 1; conn = logStoreDataSource.getConnection(); conn.setAutoCommit(true); ps = conn.prepareStatement(sql); ps.setString(index++, branchTransactionDO.getXid()); ps.setLong(index++, branchTransactionDO.getTransactionId()); ps.setLong(index++, branchTransactionDO.getBranchId()); ps.setString(index++, branchTransactionDO.getResourceGroupId()); ps.setString(index++, branchTransactionDO.getResourceId()); ps.setString(index++, branchTransactionDO.getBranchType()); ps.setInt(index++, branchTransactionDO.getStatus()); ps.setString(index++, branchTransactionDO.getClientId()); ps.setString(index++, branchTransactionDO.getApplicationData()); return ps.executeUpdate() > 0; } catch (SQLException e) { throw new StoreException(e); } finally { IOUtil.close(ps, conn); }}
Seata-serverӷ֧Ϣɣһνҵݣundo_log֧ϢѾдݿ
2.4.1.3 ATύ صhandleGlobalTransactionУ transactionalTemplate.execute
// 2\. ȫbeginTransactionbeginTransaction(txInfo, tx); Object rs = null;try { // ִҵbusiness.execute() rs = business.execute(); } catch (Throwable ex) { //һ //Ƕ // 3.쳣ִcompleteTransactionAfterThrowingع completeTransactionAfterThrowing(txInfo, tx, ex); throw ex;} // 4\. û쳣ύcommitTransactioncommitTransaction(tx);
ύ
commitTransaction(tx);
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException { try { triggerBeforeCommit(); // tx.commit(); triggerAfterCommit(); } catch (TransactionException txe) { // 4.1 Failed to commit throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure); }}
@Overridepublic GlobalStatus commit(String xid) throws TransactionException { GlobalCommitRequest globalCommit = new GlobalCommitRequest(); globalCommit.setXid(xid); //syncCall GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit); return response.getGlobalStatus();}
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException { try { return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request); } catch (TimeoutException toe) { throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe); }}
ͨTMseata-serverSeata-serverյȫύseataԴ룩
DefaultCoordinator
@Overrideprotected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext) throws TransactionException { MDC.put(RootContext.MDC_KEY_XID, request.getXid()); //commit response.setGlobalStatus(core.commit(request.getXid()));}
Seata-serverյͻȫύȻصͻˣɾundo_logseataɾ֧ȫ
֮ǰ˵RMClientڳʼʱԴresourceManagerϢصڽTCڶηύعSeata-serverɾ֧ݼȫ
@Overridepublic void removeBranch(BranchSession branchSession) throws TransactionException { // do not unlock if global status in (Committing, CommitRetrying, AsyncCommitting), // because it's already unlocked in 'DefaultCore.commit()' if (status != Committing && status != CommitRetrying && status != AsyncCommitting) { if (!branchSession.unlock()) { throw new TransactionException("Unlock branch lock failed, xid = " + this.xid + ", branchId = " + branchSession.getBranchId()); } } for (SessionLifecycleListener lifecycleListener : lifecycleListeners) { // lifecycleListener.onRemoveBranch(this, branchSession); } remove(branchSession);}
private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException { if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) { if (LogOperation.GLOBAL_ADD.equals(logOperation)) { throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to store global session"); } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) { throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to update global session"); } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) { throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to remove global session"); } else if (LogOperation.BRANCH_ADD.equals(logOperation)) { throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to store branch session"); } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) { throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to update branch session"); } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) { throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to remove branch session"); } else { throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Unknown LogOperation:" + logOperation.name()); } }}
public static void endCommitted(GlobalSession globalSession) throws TransactionException { globalSession.changeStatus(GlobalStatus.Committed); //ɾȫ globalSession.end();}
ͻɾundo_log
ڽύ
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException { String xid = request.getXid(); long branchId = request.getBranchId(); String resourceId = request.getResourceId(); String applicationData = request.getApplicationData(); if (LOGGER.isInfoEnabled()) { LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData); } // BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId, applicationData); response.setXid(xid); response.setBranchId(branchId); response.setBranchStatus(status); if (LOGGER.isInfoEnabled()) { LOGGER.info("Branch commit result: " + status); } }
getResourceManagerȡľRMClientʼʱõԴDataSourceManager
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);}
@Overridepublic BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) { LOGGER.warn("Async commit buffer is FULL. Rejected branch [{}/{}] will be handled by housekeeping later.", branchId, xid); } return BranchStatus.PhaseTwo_Committed;}
ֻһASYNC_COMMIT_BUFFERListһύcontextύAsyncWorkerinit()
public synchronized void init() { LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT); ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true)); timerExecutor.scheduleAtFixedRate(() -> { try {// doBranchCommits(); } catch (Throwable e) { LOGGER.info("Failed at async committing ... {}", e.getMessage()); } }, 10, 1000 * 1, TimeUnit.MILLISECONDS);}
ɾUndo_log
λع
λعseata-server˴ύƣʡ
protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response, RpcContext rpcContext) throws TransactionException { MDC.put(RootContext.MDC_KEY_XID, request.getXid()); //ȫֻعsea response.setGlobalStatus(core.rollback(request.getXid()));}
Ҫعͻν
@Overridepublic BranchRollbackResponse handle(BranchRollbackRequest request) { BranchRollbackResponse response = new BranchRollbackResponse(); exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() { @Override public void execute(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException { // doBranchRollback(request, response); } }, request, response); return response;}
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { DataSourceProxy dataSourceProxy = get(resourceId); if (dataSourceProxy == null) { throw new ShouldNeverHappenException(); } try { UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId); } catch (TransactionException te) { StackTraceLogger.info(LOGGER, te, "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]", new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()}); if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) { return BranchStatus.PhaseTwo_RollbackFailed_Unretryable; } else { return BranchStatus.PhaseTwo_RollbackFailed_Retryable; } } return BranchStatus.PhaseTwo_Rollbacked; }
ջعõUndoLogManager.undo(dataSourceProxy, xid, branchId);жundologǷڣɾӦundologһύseataATģʽԴϡ
https://lijunyi.xyz/docs/SpringCloud/SpringCloud.html#_2-2-x-%E5%88%86%E6%94%AF https://mp.weixin.qq.com/s/2jeovmj77O9Ux96v3A0NtA https://juejin.cn/post/6931922457741770760 https://github.com/D2C-Cai/herring http://c.biancheng.net/springcloud https://github.com/macrozheng/springcloud-learning