Back to Javatutorial

SpringCloudSeata源码分析

docs/Spring全家桶/SpringCloudAlibaba源码分析/SpringCloudSeata源码分析.md

1.0.044.3 KB
Original Source

| һߣJava Դ |ͷ

ѧϰĿ

  • Seata ATģʽԴ 1 ATģʽ 1.1 ˼άƵ ѾATģʽĴԭԴУͨREADMEҲܿATģʽʹãDZĽӵײԴȥATģʽԭڷԭ֮ǰͼһĹ˼·ģʽ

ȿ˼άƵͼ1.2 ʼƵ 1.3 ִƵ 2 Դ 2.1 SeataAutoConfiguration seataԴоҪseataҵSQLundo_logݣһ׶ɺύȫһ׶ҵʧܺͨundo_logع񣬽񲹳

seataҲspringʹõģSpringBootseataҲһЩԶseataԶdzֱӣͽSeataAutoConfigurationǴ

@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")@ConditionalOnProperty(prefix = StarterConstants.SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)@Configuration@EnableConfigurationProperties({SeataProperties.class})public class SeataAutoConfiguration {    }

ȣ@ComponentScanɨһpropertiesһSeataPropertiesBean

@ConditionalOnPropertyЧΪseata.enabled=trueĬֵtrueԿԿطֲʽܣclient˵file.confã

@ConfigurationSeataAutoConfigurationΪspringࡣ

@EnableConfigurationPropertiesðתһSeataPropertiesBeanʹá

ĶSeataAutoConfigurationڲ

@Bean@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})@ConditionalOnMissingBean(GlobalTransactionScanner.class)public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {    if (LOGGER.isInfoEnabled()) {        LOGGER.info("Automatically configure Seata");    }    return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);}

ԶõĺĵһBeanGlobalTransactionScanner

ǿBeandzļ򵥣췽ֻҪһapplicationIdtxServiceGroup

applicationId: spring.application.name=㶨ĵǰӦõ֣磺userService

txServiceGroup: applicationId -seata-service-groupģ磺 userService-seata-service-group汾ϵ͵ĻʱܻseatafescarĬfescarΪ׺

newһGlobalTransactionScannerSeataAutoConfigurationԶþͽˡSeataAutoConfigurationֻһá

2.2 GlobalTransactionScanner ȻĵGlobalTransactionScanner࣬ǼעʵͿԲ²⵽һãɨ@GlobalTransactionalע⣬ԴǿĹܡ

Ҫ˽࣬òĶһUMLͼԿGlobalTransactionScannerҪ4ֵùע

1ApplicationContextAwareʾõspring

2InitializingBeanӿڣ˳ʼʱһЩ

3AbstractAutoProxyCreatorʾspringеBeanǿҲǿIJ²⡣

4DisposableӿڣspringٵʱһЩ

΢עһ4ִ˳

ApplicationContextAware -> InitializingBean -> AbstractAutoProxyCreator -> DisposableBean

2.3 InitializingBean

@Overridepublic void afterPropertiesSet() {    if (disableGlobalTransaction) {        if (LOGGER.isInfoEnabled()) {            LOGGER.info("Global transaction is disabled.");        }        return;    }    initClient();}

ʼSeataClient˵ĶClientҪTransactionManagerResourceManagerΪ˼򻯰ɣûаinitClient´GlobalTransactionScannerһࡣ

initClient

private void initClient() {    //init TM    TMClient.init(applicationId, txServiceGroup);       //init RM    RMClient.init(applicationId, txServiceGroup);      registerSpringShutdownHook();}

initClient߼ӣTMClient.initʼTransactionManagerRPCͻˣRMClient.initʼResourceManagerRPCͻˡseataRPCnettyʵ֣seataװһʹáעһSpringShutdownHookӺ

2.3.1 TMClientʼ

@Overridepublic void init() {    timerExecutor.scheduleAtFixedRate(new Runnable() {        @Override        public void run() {            clientChannelManager.reconnect(getTransactionServiceGroup());        }    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);...}

һʱϽ clientChannelManager.reconnect

void reconnect(String transactionServiceGroup) {    List<String> availList = null;    try {        availList = getAvailServerList(transactionServiceGroup);    } catch (Exception e) {       ...    }   ...    for (String serverAddress : availList) {        try {            acquireChannel(serverAddress);        } catch (Exception e) {            ...        }    }}

transactionServiceGroupȡseata-serveripַбȻ

private List<String> getAvailServerList(String transactionServiceGroup) throws Exception {    List<InetSocketAddress> availInetSocketAddressList = RegistryFactory.getInstance()        .lookup(transactionServiceGroup);    if (CollectionUtils.isEmpty(availInetSocketAddressList)) {        return Collections.emptyList();    }     return availInetSocketAddressList.stream()        .map(NetUtil::toStringAddress)        .collect(Collectors.toList());}

RegistryFactory.getInstance().lookup(transactionServiceGroup);ǶԲͬעģĬϿNacosʽʵȸҵserverȺƣdefaultȻݼȺҵserverӦip˿ڵַ

@Overridepublic List<InetSocketAddress> lookup(String key) throws Exception {    //default    String clusterName = getServiceGroup(key);    if (clusterName == null) {        return null;    }    if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) {        synchronized (LOCK_OBJ) {            if (!LISTENER_SERVICE_MAP.containsKey(clusterName)) {                List<String> clusters = new ArrayList<>();                clusters.add(clusterName);                List<Instance> firstAllInstances = getNamingInstance().getAllInstances(getServiceName(), getServiceGroup(), clusters);                if (null != firstAllInstances) {                    List<InetSocketAddress> newAddressList = firstAllInstances.stream()                        .filter(instance -> instance.isEnabled() && instance.isHealthy())                        .map(instance -> new InetSocketAddress(instance.getIp(), instance.getPort()))                        .collect(Collectors.toList());                    CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);                }                subscribe(clusterName, event -> {                    List<Instance> instances = ((NamingEvent) event).getInstances();                    if (null == instances && null != CLUSTER_ADDRESS_MAP.get(clusterName)) {                        CLUSTER_ADDRESS_MAP.remove(clusterName);                    } else if (!CollectionUtils.isEmpty(instances)) {                        List<InetSocketAddress> newAddressList = instances.stream()                            .filter(instance -> instance.isEnabled() && instance.isHealthy())                            .map(instance -> new InetSocketAddress(instance.getIp(), instance.getPort()))                            .collect(Collectors.toList());                        CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);                    }                });            }        }    }    return CLUSTER_ADDRESS_MAP.get(clusterName);}

Seata-serverIPַѻȡ,ȻacquireChannel

Channel acquireChannel(String serverAddress) {    Channel channelToServer = channels.get(serverAddress);    if (channelToServer != null) {        channelToServer = getExistAliveChannel(channelToServer, serverAddress);        if (channelToServer != null) {            return channelToServer;        }    }...    channelLocks.putIfAbsent(serverAddress, new Object());    synchronized (channelLocks.get(serverAddress)) {        return doConnect(serverAddress);    }}

󽫻ȡseata-serverIPַŵNettyзװTmClientͳʼ

TmClientʼܽ᣺

  • ʱԽһseata-server
  • ʱȴnacosãиݷ(service_group)ҵȺ(cluster_name)
  • ٸݼȺҵȺip˿б
  • ipбѡһnetty 2.3.2 RMClientʼ
public static void init(String applicationId, String transactionServiceGroup) {    // ȡ    RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);    // ResourceManagerĵ    rmRpcClient.setResourceManager(DefaultResourceManager.get());    // ӼServer˵Ϣ    rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()));    // ʼRPC    rmRpcClient.init();}

TMClientȣRMClientһServerϢĻơҲ˵TMְServerͨţ磺ȫbegincommitrollbackȡ

RMԴ⣬ΪȫcommitrollbackȵϢͣӶԱԴز

ԴresourceManagerϢصڽTCڶ׶ηύ߻عSeataжResourceManagerAbstractRMHandlerSPI䣬ResouceManagerΪ

public class DefaultResourceManager implements ResourceManager {    protected void initResourceManagers() {        //init all resource managers        List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);        if (CollectionUtils.isNotEmpty(allResourceManagers)) {            for (ResourceManager rm : allResourceManagers) {                resourceManagers.put(rm.getBranchType(), rm);            }        }    }}

ԿʼDefaultResouceManagerʱʹClassLoaderȥضӦJarµʵ֣ĬATģʽʹõʵݿ⣬Ҳrm-datasourceµʵ֣ʵ·Ҫλ/resources/META-INF/չӿȫ·ȥңͻҵӦʵ ResourceManagerӦʵȫ· io.seata.rm.datasource.DataSourceManagerָύͻعķDefaultRMHandlerӦʵȫ·io.seata.rm.RMHandlerATǸserverϢӦύ߻عĻصࡣ

RMClinetinit()TMClientһ

2.3.3 ܽ

  • Springʱʼ2ͻTmClientRmClient
  • TmClientseata-serverͨNettyӲϢ
  • RmClientseata-serverͨNettyӣն׶ύعϢڻص(RmHandler)

| һߣJava Դ |ͷ

2.4 AbstractAutoProxyCreator GlobalTransactionScannerʼTMRMԺٹעһAbstractAutoProxyCreatorԶ

Զɶأ˵springеBeanǿʲôܣ

GlobalTransactionScannerҪչAbstractAutoProxyCreatorwrapIfNecessary

ǿǰжϴʾǷBeanҪǿǿĻ

2.4.1 wrapIfNecessary

@Overrideprotected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {    if (disableGlobalTransaction) {        return bean;    }    try {        synchronized (PROXYED_SET) {            // ͬBean            if (PROXYED_SET.contains(beanName)) {                return bean;            }             interceptor = null;            // жǷTCCģʽ            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {                // TCCʵֵ                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));            } else {                Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);                Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);                 // жǷ@GlobalTransactional@GlobalLockע                if (!existsAnnotation(new Class[]{serviceInterface})                    && !existsAnnotation(interfacesIfJdk)) {                    return bean;                }                 if (interceptor == null) {                    // TCC                    if (globalTransactionalInterceptor == null) {                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);                        ConfigurationCache.addConfigListener(                            ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,                            (ConfigurationChangeListener)globalTransactionalInterceptor);                    }                    interceptor = globalTransactionalInterceptor;                }            }            // жϵǰBeanǷѾspringĴ            if (!AopUtils.isAopProxy(bean)) {                // ǣôһspringĴ̼                bean = super.wrapIfNecessary(bean, beanName, cacheKey);            } else {                // һspringĴ࣬ôȡѾڵϣȻӵüϵ                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));                for (Advisor avr : advisor) {                    advised.addAdvisor(0, avr);                }            }             PROXYED_SET.add(beanName);            return bean;        }    } catch (Exception exx) {}}

wrapIfNecessaryϳǷֲ迴

1isTccAutoProxyжǷtccģʽĻѡTccActionInterceptortccģʽѡ GlobalTransactionalInterceptorĬϲ

2existAnnotationжϵǰBeanǷ߽ӿڵķ@GlobalTransactional@GlobalLockע⣬ûֱӷ

3isAopProxyжϵǰBeanǷѾspringĴˣJDK̬CglibͨBeanԭеɴ߼ɣѾǴ࣬ôҪͨȡڵҲAdvisorֱӵüϵС

wrapIfNecessaryķӣԴǺϤϸڵЩ

2.4.1.1 ATһ׶οȫ ҪȫĽӿϣ@GlobalTransactionalע⣬עһӦ GlobalTransactionalInterceptorinvokeط

@Overridepublic Object invoke(final MethodInvocation methodInvocation) throws Throwable {    Class<?> targetClass =        methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);        //ȡϵȫע        final GlobalTransactional globalTransactionalAnnotation =            getAnnotation(method, targetClass, GlobalTransactional.class);        //ȡϵȫע        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);        boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);        if (!localDisable) {            //ȫע⣬handleGlobalTransactionȫ            if (globalTransactionalAnnotation != null) {                return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);            //ȫע⣬handleGlobalLockȫ            } else if (globalLockAnnotation != null) {                return handleGlobalLock(methodInvocation);            }        }    }    //ɶûУִͨУ    return methodInvocation.proceed();}

handleGlobalTransactionе transactionalTemplate.execute

// 2\. ȫbeginTransactionbeginTransaction(txInfo, tx); Object rs = null;try {     // ִҵ񷽷business.execute()    rs = business.execute(); } catch (Throwable ex) {     // 3.쳣ִcompleteTransactionAfterThrowingع    completeTransactionAfterThrowing(txInfo, tx, ex);    throw ex;} // 4\. û쳣ύcommitTransactioncommitTransaction(tx);

ȫյ io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String)

@Overridepublic void begin(int timeout, String name) throws TransactionException {    //˴Ľɫжйؼ//ǰȫķߣLauncherDzߣParticipant//ڷֲʽϵͳҲGlobalTransactionalע//ôĽɫParticipantԺbegin˳    //жǷߣLauncherDzߣParticipantǸݵǰǷѴXIDж    //ûXIDľLauncherѾXIDľParticipant    if (role != GlobalTransactionRole.Launcher) {        assertXIDNotNull();        if (LOGGER.isDebugEnabled()) {            LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);        }        return;    }    assertXIDNull();    if (RootContext.getXID() != null) {        throw new IllegalStateException();    }    xid = transactionManager.begin(null, null, name, timeout);    status = GlobalStatus.Begin;    RootContext.bind(xid);    if (LOGGER.isInfoEnabled()) {        LOGGER.info("Begin new global transaction [{}]", xid);    } }

seata-serverȡȫXID

@Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout)    throws TransactionException {    GlobalBeginRequest request = new GlobalBeginRequest();    request.setTransactionName(name);    request.setTimeout(timeout);    //    GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);    if (response.getResultCode() == ResultCode.Failed) {        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());    }    return response.getXid();}
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {    try {        //TMClientװNetty        return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);    } catch (TimeoutException toe) {        throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);    }}

XIDRootContextУɴ˿ԿȫTMģTMȫseata-serverseata-serverܵseata룩

@Overrideprotected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)    throws TransactionException {    //begin    response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),                               request.getTransactionName(), request.getTimeout()));    if (LOGGER.isInfoEnabled()) {        LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",                    rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());    }}

io.seata.server.coordinator.DefaultCoordinator#doGlobalBeginܿͻ˿ȫ󣬵io.seata.server.coordinator.DefaultCore#beginȫ

@Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout)    throws TransactionException {    GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,                                                              timeout);    MDC.put(RootContext.MDC_KEY_XID, session.getXid());    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());//Ự    session.begin();     // transaction start event    eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,                                             session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));     return session.getXid();}

ͨǰỰ

@Overridepublic void begin() throws TransactionException {    this.status = GlobalStatus.Begin;    this.beginTime = System.currentTimeMillis();    this.active = true;    for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {        lifecycleListener.onBegin(this);    }}

io.seata.server.session.AbstractSessionManager#onBeginֵio.seata.server.storage.db.session.DataBaseSessionManager#addGlobalSession

@Overridepublic void addGlobalSession(GlobalSession session) throws TransactionException {    if (StringUtils.isBlank(taskName)) {        //        boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);        if (!ret) {            throw new StoreException("addGlobalSession failed.");        }    } else {        boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);        if (!ret) {            throw new StoreException("addGlobalSession failed.");        }    }}

ݿд

@Overridepublic boolean writeSession(LogOperation logOperation, SessionStorable session) {    if (LogOperation.GLOBAL_ADD.equals(logOperation)) {        return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));    } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {        return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));    } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {        return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));    } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {        return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));    } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {        return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));    } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {        return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));    } else {        throw new StoreException("Unknown LogOperation:" + logOperation.name());    }}

seataglobal_tabݣȫѿ

2.4.1.2 ATһ׶ִҵSQL ȫѿҪִҵSQLundo_logݣȫسɹջִҵ񷽷ģSeataԴ˴sqlundo_logԴִеģSeataDataSourceConnectionStatementĴװ

/*** datasource滻ԭĵdatasource*/@Primary@Bean("dataSource")public DataSourceProxy dataSourceProxy(DataSource druidDataSource){    return new DataSourceProxy(druidDataSource);}

ĿʹõԴseataDataSourceProxyնSqlнStatementProxy

@Overridepublic boolean execute(String sql) throws SQLException {    this.targetSQL = sql;    return ExecuteTemplate.execute(this, (statement, args) -> statement.execute((String) args[0]), sql);}
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,                                                     StatementProxy<S> statementProxy,                                                     StatementCallback<T, S> statementCallback,                                                     Object... args) throws SQLException {         if (!RootContext.requireGlobalLock() && !StringUtils.equals(BranchType.AT.name(), RootContext.getBranchType())) {            //ȫֱִУ            return statementCallback.execute(statementProxy.getTargetStatement(), args);        }         String dbType = statementProxy.getConnectionProxy().getDbType();        if (CollectionUtils.isEmpty(sqlRecognizers)) {            sqlRecognizers = SQLVisitorFactory.get(                    statementProxy.getTargetSQL(),                    dbType);        }        Executor<T> executor;        if (CollectionUtils.isEmpty(sqlRecognizers)) {            executor = new PlainExecutor<>(statementProxy, statementCallback);        } else {            if (sqlRecognizers.size() == 1) {                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);                //ͬSQLͣͬ                switch (sqlRecognizer.getSQLType()) {                    case INSERT:                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,                                new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},                                new Object[]{statementProxy, statementCallback, sqlRecognizer});                        break;                    case UPDATE:                        executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);                        break;                    case DELETE:                        executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);                        break;                    case SELECT_FOR_UPDATE:                        executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);                        break;                    default:                        executor = new PlainExecutor<>(statementProxy, statementCallback);                        break;                }            } else {                executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);            }        }        T rs;        try {            //ִSQL            rs = executor.execute(args);        } catch (Throwable ex) {            if (!(ex instanceof SQLException)) {                // Turn other exception into SQLException                ex = new SQLException(ex);            }            throw (SQLException) ex;        }        return rs;    }
  • жǷȫûУߴsql
  • SQLVisitorFactoryĿsqlн
  • ضsql(INSERT,UPDATE,DELETE,SELECT_FOR_UPDATE)Ƚ
  • ִsqlؽ ͬ͵SQLһinsertΪ

insertʹõInsertExecutor.executeʵջʹ io.seata.rm.datasource.exec.BaseTransactionalExecutor#execute

@Overridepublic T execute(Object... args) throws Throwable {    if (RootContext.inGlobalTransaction()) {        String xid = RootContext.getXID();        statementProxy.getConnectionProxy().bind(xid);    }     statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());    return doExecute(args);}

еxid󶨵statementProxyУdoExecuteAbstractDMLBaseExecutorеdoExecute

@Overridepublic T doExecute(Object... args) throws Throwable {    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();    if (connectionProxy.getAutoCommit()) {        return executeAutoCommitTrue(args);    } else {        return executeAutoCommitFalse(args);    }}

е executeAutoCommitTrue/executeAutoCommitFalse

protected T executeAutoCommitTrue(Object[] args) throws Throwable {    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();    try {        connectionProxy.setAutoCommit(false);        return new LockRetryPolicy(connectionProxy).execute(() -> {            T result = executeAutoCommitFalse(args);            connectionProxy.commit();            return result;        });    } catch (Exception e) {        ...    } finally {        connectionProxy.getContext().reset();        connectionProxy.setAutoCommit(true);    }}

ϸ֣նǵexecuteAutoCommitFalse

protected T executeAutoCommitFalse(Object[] args) throws Exception {    //getTableMeta    if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && getTableMeta().getPrimaryKeyOnlyName().size() > 1)    {        throw new NotSupportYetException("multi pk only support mysql!");    }    //ȡbeforeImage    TableRecords beforeImage = beforeImage();    //ִҵsql    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);    //ȡafterImage    TableRecords afterImage = afterImage(beforeImage);    //image    prepareUndoLog(beforeImage, afterImage);    return result;}

ȡbeforeImage

//tableMetaСprotected TableMeta getTableMeta(String tableName) {    if (tableMeta != null) {        return tableMeta;    }    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();    tableMeta = TableMetaCacheFactory.getTableMetaCache(connectionProxy.getDbType())        .getTableMeta(connectionProxy.getTargetConnection(), tableName, connectionProxy.getDataSourceProxy().getResourceId());    return tableMeta;}

ִҵsqlʹ com.alibaba.druid.pool.DruidPooledPreparedStatement#executeִ

ȡafterImageύʱundo_log־

protected T executeAutoCommitTrue(Object[] args) throws Throwable {    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();    try {        connectionProxy.setAutoCommit(false);        return new LockRetryPolicy(connectionProxy).execute(() -> {            T result = executeAutoCommitFalse(args);            //            connectionProxy.commit();            return result;        });    } catch (Exception e) {        ...    } finally {        connectionProxy.getContext().reset();        connectionProxy.setAutoCommit(true);    }}
public void commit() throws SQLException {    try {        LOCK_RETRY_POLICY.execute(() -> {            //            doCommit();            return null;        });    } catch (SQLException e) {        throw e;    } catch (Exception e) {        throw new SQLException(e);    }}
private void doCommit() throws SQLException {    if (context.inGlobalTransaction()) {        //        processGlobalTransactionCommit();    } else if (context.isGlobalLockRequire()) {        processLocalCommitWithGlobalLocks();    } else {        targetConnection.commit();    }}
private void processGlobalTransactionCommit() throws SQLException {    try {        //seata-serverע֧Ϣ        register();    } catch (TransactionException e) {        recognizeLockKeyConflictException(e, context.buildLockKeys());    }    try {        //ύ֮ǰundo_log,flushUndoLogs        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);        targetConnection.commit();    } catch (Throwable ex) {       ...    }    if (IS_REPORT_SUCCESS_ENABLE) {        report(true);    }    context.reset();}
public void flushUndoLogs(ConnectionProxy cp) throws SQLException {    ConnectionContext connectionContext = cp.getContext();    if (!connectionContext.hasUndoLog()) {        return;    }     String xid = connectionContext.getXid();    long branchId = connectionContext.getBranchId();     ...//÷undo_log    insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName()), undoLogContent,                            cp.getTargetConnection());}

ڸ÷ע֧ύseata-serverע֧Ϣseata-serverյseataԴ룩

io.seata.server.coordinator.DefaultCoordinator#doBranchRegister

public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,                           String applicationData, String lockKeys) throws TransactionException {    GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);    return SessionHolder.lockAndExecute(globalSession, () -> {        ...        try {            //ע            globalSession.addBranch(branchSession);        } catch (RuntimeException ex) {            ...        }        ...        return branchSession.getBranchId();    });}
@Overridepublic void addBranch(BranchSession branchSession) throws TransactionException {    for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {        //onAddBranchѡAbstractSessionManager        lifecycleListener.onAddBranch(this, branchSession);    }    branchSession.setStatus(BranchStatus.Registered);    add(branchSession);}

io.seata.server.storage.db.session.DataBaseSessionManager#addBranchSession

@Overridepublic void onAddBranch(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {    //룬ѡDataBaseSessionManager    addBranchSession(globalSession, branchSession);}
@Overridepublic void addBranchSession(GlobalSession globalSession, BranchSession session) throws TransactionException {    if (StringUtils.isNotBlank(taskName)) {        return;    }    //    boolean ret = transactionStoreManager.writeSession(LogOperation.BRANCH_ADD, session);    if (!ret) {        throw new StoreException("addBranchSession failed.");    }}
@Overridepublic boolean writeSession(LogOperation logOperation, SessionStorable session) {    if (LogOperation.GLOBAL_ADD.equals(logOperation)) {        return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));    } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {        return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));    } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {        return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));    } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {        return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));    } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {        return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));    } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {        return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));    } else {        throw new StoreException("Unknown LogOperation:" + logOperation.name());    }}
@Overridepublic boolean insertBranchTransactionDO(BranchTransactionDO branchTransactionDO) {    String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertBranchTransactionSQL(branchTable);    Connection conn = null;    PreparedStatement ps = null;    try {        int index = 1;        conn = logStoreDataSource.getConnection();        conn.setAutoCommit(true);        ps = conn.prepareStatement(sql);        ps.setString(index++, branchTransactionDO.getXid());        ps.setLong(index++, branchTransactionDO.getTransactionId());        ps.setLong(index++, branchTransactionDO.getBranchId());        ps.setString(index++, branchTransactionDO.getResourceGroupId());        ps.setString(index++, branchTransactionDO.getResourceId());        ps.setString(index++, branchTransactionDO.getBranchType());        ps.setInt(index++, branchTransactionDO.getStatus());        ps.setString(index++, branchTransactionDO.getClientId());        ps.setString(index++, branchTransactionDO.getApplicationData());        return ps.executeUpdate() > 0;    } catch (SQLException e) {        throw new StoreException(e);    } finally {        IOUtil.close(ps, conn);    }}

Seata-serverӷ֧Ϣɣһ׶νҵݣundo_log֧ϢѾдݿ

2.4.1.3 AT׶ύ صhandleGlobalTransactionУ transactionalTemplate.execute

// 2\. ȫbeginTransactionbeginTransaction(txInfo, tx); Object rs = null;try {     // ִҵ񷽷business.execute()    rs = business.execute(); } catch (Throwable ex) {     //һ׶    //Ƕ׶    // 3.쳣ִcompleteTransactionAfterThrowingع    completeTransactionAfterThrowing(txInfo, tx, ex);    throw ex;} // 4\. û쳣ύcommitTransactioncommitTransaction(tx);

׶ύ

commitTransaction(tx);

private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {    try {        triggerBeforeCommit();        //        tx.commit();        triggerAfterCommit();    } catch (TransactionException txe) {        // 4.1 Failed to commit        throw new TransactionalExecutor.ExecutionException(tx, txe,                                                           TransactionalExecutor.Code.CommitFailure);    }}

@Overridepublic GlobalStatus commit(String xid) throws TransactionException {    GlobalCommitRequest globalCommit = new GlobalCommitRequest();    globalCommit.setXid(xid);    //syncCall    GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);    return response.getGlobalStatus();}
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {    try {        return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);    } catch (TimeoutException toe) {        throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);    }}

ͨTMseata-serverSeata-serverյȫύseataԴ룩

DefaultCoordinator

@Overrideprotected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)    throws TransactionException {    MDC.put(RootContext.MDC_KEY_XID, request.getXid());    //commit    response.setGlobalStatus(core.commit(request.getXid()));}

Seata-serverյͻȫύȻصͻˣɾundo_logseataɾ֧ȫ

֮ǰ˵RMClientڳʼʱԴresourceManagerϢصڽTCڶ׶ηύ߻عSeata-serverɾ֧ݼȫ

@Overridepublic void removeBranch(BranchSession branchSession) throws TransactionException {    // do not unlock if global status in (Committing, CommitRetrying, AsyncCommitting),    // because it's already unlocked in 'DefaultCore.commit()'    if (status != Committing && status != CommitRetrying && status != AsyncCommitting) {        if (!branchSession.unlock()) {            throw new TransactionException("Unlock branch lock failed, xid = " + this.xid + ", branchId = " + branchSession.getBranchId());        }    }    for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {        //        lifecycleListener.onRemoveBranch(this, branchSession);    }    remove(branchSession);}
private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {    if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {        if (LogOperation.GLOBAL_ADD.equals(logOperation)) {            throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,                                                 "Fail to store global session");        } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {            throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,                                                 "Fail to update global session");        } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {            throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,                                                 "Fail to remove global session");        } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {            throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,                                                 "Fail to store branch session");        } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {            throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,                                                 "Fail to update branch session");        } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {            throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,                                                 "Fail to remove branch session");        } else {            throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,                                                 "Unknown LogOperation:" + logOperation.name());        }    }}
public static void endCommitted(GlobalSession globalSession) throws TransactionException {    globalSession.changeStatus(GlobalStatus.Committed);    //ɾȫ    globalSession.end();}

ͻɾundo_log

ڽύ

protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)    throws TransactionException {    String xid = request.getXid();    long branchId = request.getBranchId();    String resourceId = request.getResourceId();    String applicationData = request.getApplicationData();    if (LOGGER.isInfoEnabled()) {        LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);    }    //    BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,                                                            applicationData);    response.setXid(xid);    response.setBranchId(branchId);    response.setBranchStatus(status);    if (LOGGER.isInfoEnabled()) {        LOGGER.info("Branch commit result: " + status);    } }

getResourceManagerȡľRMClientʼʱõԴDataSourceManager

public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,                                 String applicationData) throws TransactionException {    return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);}
@Overridepublic BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,                                 String applicationData) throws TransactionException {    if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {        LOGGER.warn("Async commit buffer is FULL. Rejected branch [{}/{}] will be handled by housekeeping later.", branchId, xid);    }    return BranchStatus.PhaseTwo_Committed;}

ֻһASYNC_COMMIT_BUFFERListһ׶ύcontextύAsyncWorkerinit()

public synchronized void init() {    LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);    ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true));    timerExecutor.scheduleAtFixedRate(() -> {        try {//            doBranchCommits();         } catch (Throwable e) {            LOGGER.info("Failed at async committing ... {}", e.getMessage());         }    }, 10, 1000 * 1, TimeUnit.MILLISECONDS);}

ɾUndo_log

׶λع

׶λعseata-server˴׶ύƣʡ

protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response,                                RpcContext rpcContext) throws TransactionException {    MDC.put(RootContext.MDC_KEY_XID, request.getXid());    //ȫֻعsea    response.setGlobalStatus(core.rollback(request.getXid()));}

Ҫعͻν񲹳

@Overridepublic BranchRollbackResponse handle(BranchRollbackRequest request) {    BranchRollbackResponse response = new BranchRollbackResponse();    exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {        @Override        public void execute(BranchRollbackRequest request, BranchRollbackResponse response)            throws TransactionException {            //            doBranchRollback(request, response);        }    }, request, response);    return response;}
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,                                   String applicationData) throws TransactionException {    DataSourceProxy dataSourceProxy = get(resourceId);    if (dataSourceProxy == null) {        throw new ShouldNeverHappenException();    }    try {        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);    } catch (TransactionException te) {        StackTraceLogger.info(LOGGER, te,                              "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",                              new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;        } else {            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;        }    }    return BranchStatus.PhaseTwo_Rollbacked; }

ջعõUndoLogManager.undo(dataSourceProxy, xid, branchId);жundologǷڣɾӦundologһύseataATģʽԴϡ

ο

https://lijunyi.xyz/docs/SpringCloud/SpringCloud.html#_2-2-x-%E5%88%86%E6%94%AF https://mp.weixin.qq.com/s/2jeovmj77O9Ux96v3A0NtA https://juejin.cn/post/6931922457741770760 https://github.com/D2C-Cai/herring http://c.biancheng.net/springcloud https://github.com/macrozheng/springcloud-learning