网站首页 > 编程文章 正文
大纲
1.Seata的Resource资源接口源码
2.Seata数据源连接池代理的实现源码
3.Client向Server发起注册RM的源码
4.Client向Server注册RM时的交互源码
5.数据源连接代理与SQL句柄代理的初始化源码
6.Seata基于SQL句柄代理执行SQL的源码
7.执行SQL语句前取消自动提交事务的源码
8.执行SQL语句前后构建数据镜像的源码
9.构建全局锁的key和UndoLog数据的源码
10.Seata Client发起分支事务注册的源码
11.Seata Server处理分支事务注册请求的源码
12.将UndoLog写入到数据库与提交事务的源码
13.通过全局锁重试策略组件执行事务的提交
14.注册分支事务时获取全局锁的入口源码
15.Seata Server获取全局锁的具体逻辑源码
16.全局锁和分支事务及本地事务总结
17.提交全局事务以及提交各分支事务的源码
18.全局事务回滚的过程源码
16.全局锁和分支事务及本地事务总结
获取到全局锁,才能注册分支事务成功,否则LockRetryPolicy重试。获取到全局锁,才能提交本地事务成功,否则LockRetryPolicy重试。
全局锁没有被其他事务(xid)获取,则当前事务(xid)才能获取全局锁成功。获取全局锁,会将当前分支事务申请全局锁的记录写入到数据库中。
17.提交全局事务以及提交各分支事务的源码
(1)Seata Client发起提交全局事务的请求
(2)Server向Client发送提交分支事务的请求
(3)Seata Client处理提交分支事务的请求
(4)全局事务的提交主要就是让各个分支事务把本地的UndoLog删除
(1)Seata Client发起提交全局事务的请求
-> TransactionalTemplate.execute()发起全局事务的提交
-> TransactionalTemplate.commitTransaction()
-> DefaultGlobalTransaction.commit()
-> DefaultTransactionManager.commit()
-> DefaultTransactionManager.syncCall()
-> TmNettyRemotingClient.sendSyncRequest()
把全局事务提交请求GlobalCommitRequest发送给Seata Server进行处理
//Template of executing business logic with a global transaction. 全局事务执行模版
public class TransactionalTemplate {
private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalTemplate.class);
public Object execute(TransactionalExecutor business) throws Throwable {
//1.Get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
//1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
//根据线程本地变量副本,获取当前线程本地变量副本里是否存在xid,如果存在则创建一个全局事务
//刚开始在开启一个全局事务的时候,是没有全局事务的
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
//1.2 Handle the transaction propagation.
//从全局事务配置里,可以获取到全局事务的传播级别,默认是REQUIRED
//也就是如果存在一个全局事务,就直接执行业务;如果不存在一个全局事务,就开启一个新的全局事务;
Propagation propagation = txInfo.getPropagation();
//不同的全局事务传播级别,会采取不同的处理方式
//比如挂起当前事务 + 开启新的事务,或者是直接不使用事务执行业务,挂起其实就是解绑当前线程的xid
//可以通过@GlobalTransactional注解,定制业务方法的全局事务,比如指定业务方法全局事务的传播级别
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) {
...
}
//1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
if (tx == null) {
tx = GlobalTransactionContext.createNew();
}
//set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
//2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
//else do nothing. Of course, the hooks will still be triggered.
//开启一个全局事务
beginTransaction(txInfo, tx);
Object rs;
try {
//Do Your Business
//执行业务方法,把全局事务xid通过Dubbo RPC传递下去,开启并提交一个一个分支事务
rs = business.execute();
} catch (Throwable ex) {
//3. The needed business exception to rollback.
//发生异常时需要完成的事务
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
//4. everything is fine, commit.
//如果一切执行正常就会在这里提交全局事务
commitTransaction(tx);
return rs;
} finally {
//5. clear
//执行一些全局事务完成后的回调,比如清理等工作
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
}
} finally {
//If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null) {
//如果之前挂起了一个全局事务,此时可以恢复这个全局事务
tx.resume(suspendedResourcesHolder);
}
}
}
//提交事务
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeCommit();
tx.commit();
triggerAfterCommit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure);
}
}
...
}
//The type Default global transaction. 默认的全局事务
public class DefaultGlobalTransaction implements GlobalTransaction {
private TransactionManager transactionManager;
DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {
this.transactionManager = TransactionManagerHolder.get();//全局事务管理者
this.xid = xid;
this.status = status;
this.role = role;
}
...
@Override
public void commit() throws TransactionException {
if (role == GlobalTransactionRole.Participant) {
//Participant has no responsibility of committing
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNotNull();
int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
try {
while (retry > 0) {
try {
retry--;
status = transactionManager.commit(xid);
break;
} catch (Throwable ex) {
LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
if (retry == 0) {
throw new TransactionException("Failed to report global commit", ex);
}
}
}
} finally {
if (xid.equals(RootContext.getXID())) {
suspend();
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("[{}] commit status: {}", xid, status);
}
}
...
}
public class TransactionManagerHolder {
...
private TransactionManagerHolder() {
}
private static class SingletonHolder {
private static TransactionManager INSTANCE = null;
static {
try {
INSTANCE = EnhancedServiceLoader.load(TransactionManager.class);
LOGGER.info("TransactionManager Singleton {}", INSTANCE);
} catch (Throwable anyEx) {
LOGGER.error("Failed to load TransactionManager Singleton! ", anyEx);
}
}
}
//Get transaction manager.
public static TransactionManager get() {
if (SingletonHolder.INSTANCE == null) {
throw new ShouldNeverHappenException("TransactionManager is NOT ready!");
}
return SingletonHolder.INSTANCE;
}
...
}
public class DefaultTransactionManager implements TransactionManager {
...
@Override
public GlobalStatus commit(String xid) throws TransactionException {
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setXid(xid);
GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
return response.getGlobalStatus();
}
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
//TMNettyRemotingClient会和Seata Server基于Netty建立长连接
return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
} catch (TimeoutException toe) {
throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
}
}
...
}
(2)Server向Client发送提交分支事务的请求
ServerHandler的channelRead()方法会将收到的请求进行层层传递:首先交给DefaultCoordinator的onRequest()方法来进行处理,然后交给GlobalCommitRequest的handle()方法来进行处理,接着交给AbstractTCInboundHandler的handle()方法来进行处理,最后交给DefaultCoordinator的doGlobalCommit()方法来进行处理,也就是调用DefaultCore的commit()方法来提交全局事务。
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
...
@ChannelHandler.Sharable
class ServerHandler extends ChannelDuplexHandler {
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof RpcMessage)) {
return;
}
//接下来调用processMessage()方法对解码完毕的RpcMessage对象进行处理
processMessage(ctx, (RpcMessage) msg);
}
...
}
...
}
public abstract class AbstractNettyRemoting implements Disposable {
...
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
}
Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
//根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的
//processorTable里的内容,是NettyRemotingServer在初始化时,通过调用registerProcessor()方法put进去的
//所以下面的代码实际上会由ServerOnRequestProcessor的process()方法进行处理
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair != null) {
if (pair.getSecond() != null) {
try {
pair.getSecond().execute(() -> {
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
} finally {
MDC.clear();
}
});
} catch (RejectedExecutionException e) {
...
}
} else {
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
}
}
} else {
LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
}
} else {
LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
}
}
...
}
public class ServerOnRequestProcessor implements RemotingProcessor, Disposable {
private final RemotingServer remotingServer;
...
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
if (ChannelManager.isRegistered(ctx.channel())) {
onRequestMessage(ctx, rpcMessage);
} else {
try {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
}
ctx.disconnect();
ctx.close();
} catch (Exception exx) {
LOGGER.error(exx.getMessage());
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
}
}
}
private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
Object message = rpcMessage.getBody();
//RpcContext线程本地变量副本
RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
} else {
try {
BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup());
} catch (InterruptedException e) {
LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);
}
}
if (!(message instanceof AbstractMessage)) {
return;
}
//the batch send request message
if (message instanceof MergedWarpMessage) {
...
} else {
//the single send request message
final AbstractMessage msg = (AbstractMessage) message;
//最终调用到DefaultCoordinator的onRequest()方法来处理RpcMessage
AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
//返回响应给客户端
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
}
}
...
}
public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
...
@Override
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
if (!(request instanceof AbstractTransactionRequestToTC)) {
throw new IllegalArgumentException();
}
//传入的request其实就是客户端发送请求时的GlobalCommitRequest
AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
transactionRequest.setTCInboundHandler(this);
return transactionRequest.handle(context);
}
...
}
public class GlobalCommitRequest extends AbstractGlobalEndRequest {
@Override
public short getTypeCode() {
return MessageType.TYPE_GLOBAL_COMMIT;
}
@Override
public AbstractTransactionResponse handle(RpcContext rpcContext) {
return handler.handle(this, rpcContext);
}
}
public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {
...
@Override
public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
GlobalCommitResponse response = new GlobalCommitResponse();
response.setGlobalStatus(GlobalStatus.Committing);
exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
@Override
public void execute(GlobalCommitRequest request, GlobalCommitResponse response) throws TransactionException {
try {
doGlobalCommit(request, response, rpcContext);
} catch (StoreException e) {
throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("global commit request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
}
}
@Override
public void onTransactionException(GlobalCommitRequest request, GlobalCommitResponse response, TransactionException tex) {
super.onTransactionException(request, response, tex);
checkTransactionStatus(request, response);
}
@Override
public void onException(GlobalCommitRequest request, GlobalCommitResponse response, Exception rex) {
super.onException(request, response, rex);
checkTransactionStatus(request, response);
}
}, request, response);
return response;
}
protected abstract void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext) throws TransactionException;
...
}
public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
private final DefaultCore core;
...
@Override
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext) throws TransactionException {
MDC.put(RootContext.MDC_KEY_XID, request.getXid());
//调用DefaultCore.commit()方法提交全局事务
response.setGlobalStatus(core.commit(request.getXid()));
}
...
}
DefaultCore的commit()方法会调用DefaultCore的doGlobalCommit()方法,而doGlobalCommit()方法会获取全局事务的所有分支事务并进行遍历,然后把提交分支事务的请求BranchCommitRequest发送到Seata Client中。
public class DefaultCore implements Core {
...
@Override
public GlobalStatus commit(String xid) throws TransactionException {
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
if (globalSession == null) {
return GlobalStatus.Finished;
}
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
//just lock changeStatus
boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
if (globalSession.getStatus() == GlobalStatus.Begin) {
//Highlight: Firstly, close the session, then no more branch can be registered.
globalSession.closeAndClean();
if (globalSession.canBeCommittedAsync()) {
globalSession.asyncCommit();
MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
return false;
} else {
globalSession.changeGlobalStatus(GlobalStatus.Committing);
return true;
}
}
return false;
});
if (shouldCommit) {
boolean success = doGlobalCommit(globalSession, false);
//If successful and all remaining branches can be committed asynchronously, do async commit.
if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
globalSession.asyncCommit();
return GlobalStatus.Committed;
} else {
return globalSession.getStatus();
}
} else {
return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
}
}
@Override
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
boolean success = true;
//start committing event
MetricsPublisher.postSessionDoingEvent(globalSession, retrying);
if (globalSession.isSaga()) {
success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
} else {
//获取到全局事务的所有分支事务,并进行遍历提交
Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
//if not retrying, skip the canBeCommittedAsync branches
if (!retrying && branchSession.canBeCommittedAsync()) {
return CONTINUE;
}
BranchStatus currentStatus = branchSession.getStatus();
if (currentStatus == BranchStatus.PhaseOne_Failed) {
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
return CONTINUE;
}
try {
//发送请求给Seata Client提交分支事务
BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
if (isXaerNotaTimeout(globalSession,branchStatus)) {
LOGGER.info("Commit branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
branchStatus = BranchStatus.PhaseTwo_Committed;
}
switch (branchStatus) {
case PhaseTwo_Committed:
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
return CONTINUE;
case PhaseTwo_CommitFailed_Unretryable:
//not at branch
SessionHelper.endCommitFailed(globalSession, retrying);
LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
return false;
default:
if (!retrying) {
globalSession.queueToRetryCommit();
return false;
}
if (globalSession.canBeCommittedAsync()) {
LOGGER.error("Committing branch transaction[{}], status:{} and will retry later", branchSession.getBranchId(), branchStatus);
return CONTINUE;
} else {
LOGGER.error("Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
return false;
}
}
} catch (Exception ex) {
StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}", new String[] {branchSession.toString()});
if (!retrying) {
globalSession.queueToRetryCommit();
throw new TransactionException(ex);
}
}
return CONTINUE;
});
//Return if the result is not null
if (result != null) {
return result;
}
//If has branch and not all remaining branches can be committed asynchronously,
//do print log and return false
if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
return false;
}
if (!retrying) {
//contains not AT branch
globalSession.setStatus(GlobalStatus.Committed);
}
}
//if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is
//executed to improve concurrency performance, and the global transaction ends..
if (success && globalSession.getBranchSessions().isEmpty()) {
SessionHelper.endCommitted(globalSession, retrying);
LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
}
return success;
}
...
}
public abstract class AbstractCore implements Core {
protected RemotingServer remotingServer;
...
@Override
public BranchStatus branchCommit(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
try {
BranchCommitRequest request = new BranchCommitRequest();
request.setXid(branchSession.getXid());
request.setBranchId(branchSession.getBranchId());
request.setResourceId(branchSession.getResourceId());
request.setApplicationData(branchSession.getApplicationData());
request.setBranchType(branchSession.getBranchType());
return branchCommitSend(request, globalSession, branchSession);
} catch (IOException | TimeoutException e) {
throw new BranchTransactionException(FailedToSendBranchCommitRequest, String.format("Send branch commit failed, xid = %s branchId = %s", branchSession.getXid(), branchSession.getBranchId()), e);
}
}
protected BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSession globalSession, BranchSession branchSession) throws IOException, TimeoutException {
BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest(branchSession.getResourceId(), branchSession.getClientId(), request);
return response.getBranchStatus();
}
...
}
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
...
@Override
public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutException {
if (channel == null) {
throw new RuntimeException("client is not connected");
}
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
}
...
}
public abstract class AbstractNettyRemoting implements Disposable {
...
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
if (timeoutMillis <= 0) {
throw new FrameworkException("timeout should more than 0ms");
}
if (channel == null) {
LOGGER.warn("sendSync nothing, caused by null channel.");
return null;
}
//把发送出去的请求封装到MessageFuture中,然后存放到futures这个Map里
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
futures.put(rpcMessage.getId(), messageFuture);
channelWritableCheck(channel, rpcMessage.getBody());
//获取远程地址
String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
doBeforeRpcHooks(remoteAddr, rpcMessage);
//异步化发送数据,同时对发送结果添加监听器
//如果发送失败,则会对网络连接Channel进行销毁处理
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
if (messageFuture1 != null) {
messageFuture1.setResultMessage(future.cause());
}
destroyChannel(future.channel());
}
});
try {
//然后通过请求响应组件MessageFuture同步等待Seata Server返回该请求的响应
Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
doAfterRpcHooks(remoteAddr, rpcMessage, result);
return result;
} catch (Exception exx) {
LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody());
if (exx instanceof TimeoutException) {
throw (TimeoutException) exx;
} else {
throw new RuntimeException(exx);
}
}
}
...
}
(3)Seata Client处理提交分支事务的请求
ClientHandler的channelRead()方法收到提交分支事务的请求后,会由RmBranchCommitProcessor的handleBranchCommit()方法进行处理。
-> AbstractRMHandler.onRequest()
-> BranchCommitRequest.handle()
-> AbstractRMHandler.handle()
-> AbstractRMHandler.doBranchCommit()
-> DataSourceManager.branchCommit()
-> AsyncWorker.branchCommit()异步化提交分支事务
public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
...
@Sharable
class ClientHandler extends ChannelDuplexHandler {
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof RpcMessage)) {
return;
}
processMessage(ctx, (RpcMessage) msg);
}
...
}
...
}
public abstract class AbstractNettyRemoting implements Disposable {
...
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
}
Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
//根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair != null) {
if (pair.getSecond() != null) {
try {
pair.getSecond().execute(() -> {
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
} finally {
MDC.clear();
}
});
} catch (RejectedExecutionException e) {
...
}
} else {
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
}
}
} else {
LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
}
} else {
LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
}
}
...
}
public class RmBranchCommitProcessor implements RemotingProcessor {
...
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
Object msg = rpcMessage.getBody();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("rm client handle branch commit process:" + msg);
}
handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);
}
private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {
BranchCommitResponse resultMessage;
resultMessage = (BranchCommitResponse) handler.onRequest(branchCommitRequest, null);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("branch commit result:" + resultMessage);
}
try {
this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
} catch (Throwable throwable) {
LOGGER.error("branch commit error: {}", throwable.getMessage(), throwable);
}
}
...
}
public abstract class AbstractRMHandler extends AbstractExceptionHandler implements RMInboundHandler, TransactionMessageHandler {
...
@Override
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
if (!(request instanceof AbstractTransactionRequestToRM)) {
throw new IllegalArgumentException();
}
AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
transactionRequest.setRMInboundMessageHandler(this);
return transactionRequest.handle(context);
}
...
}
public class BranchCommitRequest extends AbstractBranchEndRequest {
@Override
public short getTypeCode() {
return MessageType.TYPE_BRANCH_COMMIT;
}
@Override
public AbstractTransactionResponse handle(RpcContext rpcContext) {
return handler.handle(this);
}
}
public abstract class AbstractRMHandler extends AbstractExceptionHandler implements RMInboundHandler, TransactionMessageHandler {
@Override
public BranchCommitResponse handle(BranchCommitRequest request) {
BranchCommitResponse response = new BranchCommitResponse();
exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
@Override
public void execute(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
doBranchCommit(request, response);
}
}, request, response);
return response;
}
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
}
BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId, applicationData);
response.setXid(xid);
response.setBranchId(branchId);
response.setBranchStatus(status);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Branch commit result: " + status);
}
}
...
}
//The type Data source manager. DataSourceManager是AT模式下的资源管理器
public class DataSourceManager extends AbstractResourceManager {
//异步化worker
private final AsyncWorker asyncWorker = new AsyncWorker(this);
...
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
//通过asyncWorker,异步化提交分支事务
return asyncWorker.branchCommit(xid, branchId, resourceId);
}
...
}
(4)全局事务的提交主要就是让各个分支事务把本地的UndoLog删除
public class AsyncWorker {
...
public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
Phase2Context context = new Phase2Context(xid, branchId, resourceId);
addToCommitQueue(context);
return BranchStatus.PhaseTwo_Committed;
}
private void addToCommitQueue(Phase2Context context) {
if (commitQueue.offer(context)) {
return;
}
CompletableFuture.runAsync(this::doBranchCommitSafely, scheduledExecutor).thenRun(() -> addToCommitQueue(context));
}
void doBranchCommitSafely() {
try {
doBranchCommit();
} catch (Throwable e) {
LOGGER.error("Exception occur when doing branch commit", e);
}
}
private void doBranchCommit() {
if (commitQueue.isEmpty()) {
return;
}
//transfer all context currently received to this list
List<Phase2Context> allContexts = new LinkedList<>();
commitQueue.drainTo(allContexts);
//group context by their resourceId
Map<String, List<Phase2Context>> groupedContexts = groupedByResourceId(allContexts);
groupedContexts.forEach(this::dealWithGroupedContexts);
}
private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {
DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId);
if (dataSourceProxy == null) {
LOGGER.warn("failed to find resource for {} and requeue", resourceId);
addAllToCommitQueue(contexts);
return;
}
Connection conn = null;
try {
conn = dataSourceProxy.getPlainConnection();
UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());
//split contexts into several lists, with each list contain no more element than limit size
List<List<Phase2Context>> splitByLimit = Lists.partition(contexts, UNDOLOG_DELETE_LIMIT_SIZE);
//全局事务的提交,就是让各个分支事务把本地的undo logs删除掉即可
for (List<Phase2Context> partition : splitByLimit) {
deleteUndoLog(conn, undoLogManager, partition);
}
} catch (SQLException sqlExx) {
addAllToCommitQueue(contexts);
LOGGER.error("failed to get connection for async committing on {} and requeue", resourceId, sqlExx);
} finally {
IOUtil.close(conn);
}
}
...
}
18.全局事务回滚的过程源码
全局事务的回滚流程和提交流程几乎一样:
一.Seata Client发起全局事务回滚请求
二.Server向Client发送分支事务回滚请求
三.Seata Client处理分支事务回滚的请求
猜你喜欢
- 2025-06-12 从零开始搭建AI网站(6):如何使用响应式编程
- 2025-06-12 Windows下Ollama安装目录迁移到D盘
- 2025-06-12 RocketMQ的偏移量更新原理(rocketmq迁移)
- 2025-06-12 阿里p8手把手教学,如何写出简洁又规范的单元测试?
- 2025-06-12 Seata源码—7.Seata TCC模式的事务处理
- 2025-06-12 Seata源码—4.全局事务拦截与开启事务处理二
- 2025-06-12 Seata源码—3.全局事务注解扫描器的初始化一
- 2025-06-12 Seata源码—6.Seata AT模式的数据源代理二
- 2025-06-12 写出更优雅的代码:搞懂 Python 协议与抽象基类的核心区别
- 2025-06-12 Seata源码—6.Seata AT模式的数据源代理一
你 发表评论:
欢迎- 06-13边缘计算网关如何实现系统高效运维?
- 06-13虚拟专用网络VPN连接配置
- 06-13WIN10/11下配置VPN(解决L2TP/IPsec无法连接的问题)
- 06-13公共网络上的私有安全通道——VPN
- 06-13远程办公如何访问公司内网办公系统和内部资源?
- 06-13VBRAS场景测试方法—如何高效验证网络设备的性能与稳定性
- 06-13Win10系统如何使用VPN远程办公
- 06-13iPhone轻松实现远程访问公司局域网电脑上的共享文件
- 最近发表
- 标签列表
-
- spire.doc (70)
- instanceclient (62)
- system.data.oracleclient (61)
- 按键小精灵源码提取 (66)
- pyqt5designer教程 (65)
- 联想刷bios工具 (66)
- c#源码 (64)
- graphics.h头文件 (62)
- mysqldump下载 (66)
- sqljdbc4.jar下载 (56)
- libmp3lame (60)
- maven3.3.9 (63)
- 二调符号库 (57)
- git.exe下载 (68)
- diskgenius_winpe (72)
- pythoncrc16 (57)
- solidworks宏文件下载 (59)
- qt帮助文档中文版 (73)
- satacontroller (66)
- hgcad (64)
- bootimg.exe (69)
- android-gif-drawable (62)
- axure9元件库免费下载 (57)
- libmysqlclient.so.18 (58)
- springbootdemo (64)
本文暂时没有评论,来添加一个吧(●'◡'●)