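Outline
1. Source code of the Seata Server startup entry point
2. Source code for starting the Seata Server network server
3. Core variables of the global transaction interceptor
4. Initialization source code of the global transaction interceptor
5. The AOP interception method of the global transaction interceptor
6. Executing a global transaction through the global transaction template
7. Obtaining the xid, building the global transaction instance, and the propagation levels of global transactions
8. How the global transaction template runs business logic according to the propagation level
9. How the global transaction template begins, commits, and rolls back a transaction
10. Source code of the load-balancing mechanism for a Seata Server cluster
11. Source code for sending requests from the Seata Client to the Seata Server
12. The Client encodes the RpcMessage object into a byte array
13. The Server decodes the byte array into an RpcMessage object
14. How the Server processes the decoded RpcMessage object
15. Source code of the flow by which the Seata Server begins a global transaction
9. How the global transaction template begins, commits, and rolls back a transaction
(1) Begin, commit, and rollback in the transaction template
(2) How the default global transaction and the default transaction manager handle begin, commit, and rollback
(1) Begin, commit, and rollback in the transaction template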
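When the transaction template TransactionalTemplate begins, commits, or rolls back a transaction, it delegates the work to the default global transaction, DefaultGlobalTransaction.
//Global transaction context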
public class GlobalTransactionContext {
private GlobalTransactionContext() {
}
//Try to create a new GlobalTransaction.
//If xid is null, a new global transaction is created
public static GlobalTransaction createNew() {
return new DefaultGlobalTransaction();
}
...
}
//Default global transaction
public class DefaultGlobalTransaction implements GlobalTransaction {
private TransactionManager transactionManager;
private String xid;
private GlobalStatus status;
private GlobalTransactionRole role;
...
//Instantiates a new Default global transaction.
DefaultGlobalTransaction() {
//The role is Launcher, i.e. the initiator of the global transaction
this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
}
//Instantiates a new Default global transaction.
DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {
this.transactionManager = TransactionManagerHolder.get();//Global transaction manager
this.xid = xid;
this.status = status;
this.role = role;
}
...
}
//Global transaction template
public class TransactionalTemplate {
...
//Begin the transaction
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
//Hook invoked before the global transaction begins: triggerBeforeBegin()
triggerBeforeBegin();
//Actually begin the global transaction
tx.begin(txInfo.getTimeOut(), txInfo.getName());
//Hook invoked after the global transaction has begun: triggerAfterBegin()
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);
}
}
private void triggerBeforeBegin() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeBegin();
} catch (Exception e) {
LOGGER.error("Failed execute beforeBegin in hook {}", e.getMessage(), e);
}
}
}
private void triggerAfterBegin() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterBegin();
} catch (Exception e) {
LOGGER.error("Failed execute afterBegin in hook {}", e.getMessage(), e);
}
}
}
...
//Commit the transaction
private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeCommit();
tx.commit();
triggerAfterCommit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure);
}
}
private void triggerBeforeCommit() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeCommit();
} catch (Exception e) {
LOGGER.error("Failed execute beforeCommit in hook {}", e.getMessage(), e);
}
}
}
private void triggerAfterCommit() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterCommit();
} catch (Exception e) {
LOGGER.error("Failed execute afterCommit in hook {}", e.getMessage(), e);
}
}
}
private void triggerAfterCompletion() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterCompletion();
} catch (Exception e) {
LOGGER.error("Failed execute afterCompletion in hook {}", e.getMessage(), e);
}
}
}
...
//Roll back the transaction
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
//roll back
if (txInfo != null && txInfo.rollbackOn(originalException)) {
try {
rollbackTransaction(tx, originalException);
} catch (TransactionException txe) {
//Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.RollbackFailure, originalException);
}
} else {
//not roll back on this exception, so commit
commitTransaction(tx);
}
}
private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
triggerBeforeRollback();
tx.rollback();
triggerAfterRollback();
//3.1 Successfully rolled back
throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus()) ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
}
private void triggerBeforeRollback() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.beforeRollback();
} catch (Exception e) {
LOGGER.error("Failed execute beforeRollback in hook {}", e.getMessage(), e);
}
}
}
private void triggerAfterRollback() {
for (TransactionHook hook : getCurrentHooks()) {
try {
hook.afterRollback();
} catch (Exception e) {
LOGGER.error("Failed execute afterRollback in hook {}", e.getMessage(), e);
}
}
}
...
}
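The begin/commit/rollback flow with its hook callbacks can be condensed into a small sketch. Hook and Tx below are hypothetical stand-ins rather than Seata's TransactionHook and GlobalTransaction, only the two begin hooks are modeled, and the rollbackOn rules are left out (every business exception rolls back and is rethrown). The point it illustrates is the one visible in the listing above: a hook that throws is logged and skipped, so it never prevents the transaction from starting.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;

// Simplified sketch; Hook and Tx are hypothetical stand-ins, not Seata types.
final class HookedTemplateSketch {
    interface Hook {
        void beforeBegin();
        void afterBegin();
    }

    interface Tx {
        void begin();
        void commit();
        void rollback();
    }

    private final List<Hook> hooks;

    HookedTemplateSketch(List<Hook> hooks) {
        this.hooks = hooks;
    }

    <T> T execute(Tx tx, Callable<T> business) throws Exception {
        // A failing hook is logged and skipped; it must not abort the transaction.
        for (Hook hook : hooks) {
            safely(hook::beforeBegin);
        }
        tx.begin();
        for (Hook hook : hooks) {
            safely(hook::afterBegin);
        }
        T result;
        try {
            result = business.call();
        } catch (Exception e) {
            // Simplification: always roll back (no rollbackOn rules), then rethrow.
            tx.rollback();
            throw e;
        }
        tx.commit();
        return result;
    }

    private static void safely(Runnable callback) {
        try {
            callback.run();
        } catch (RuntimeException e) {
            System.err.println("Failed to execute hook: " + e.getMessage());
        }
    }

    // Runs one successful and one failing business call and returns the recorded events.
    static List<String> demo() {
        final List<String> events = new ArrayList<>();
        Hook noisyHook = new Hook() {
            @Override public void beforeBegin() { throw new IllegalStateException("boom"); }
            @Override public void afterBegin() { events.add("afterBegin"); }
        };
        Tx tx = new Tx() {
            @Override public void begin() { events.add("begin"); }
            @Override public void commit() { events.add("commit"); }
            @Override public void rollback() { events.add("rollback"); }
        };
        HookedTemplateSketch template = new HookedTemplateSketch(Collections.singletonList(noisyHook));
        try {
            template.execute(tx, () -> "ok");
            template.execute(tx, () -> { throw new IllegalArgumentException("business failed"); });
        } catch (Exception expected) {
            events.add("rethrown");
        }
        return events;
    }
}
```

Calling HookedTemplateSketch.demo() records a commit for the first call and a rollback for the second, even though beforeBegin() throws both times.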
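(2) How the default global transaction and the default transaction manager handle begin, commit, and rollback
When the default global transaction DefaultGlobalTransaction begins, commits, or rolls back a transaction, the actual work is done by the default transaction manager, DefaultTransactionManager.
For each of these operations, DefaultTransactionManager ultimately calls its syncCall() method to make a synchronous call, that is, it sends a Netty request to the Seata Server through TmNettyRemotingClient.
//Default global transaction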
public class DefaultGlobalTransaction implements GlobalTransaction {
private TransactionManager transactionManager;
private String xid;
private GlobalStatus status;
private GlobalTransactionRole role;
...
@Override
public void begin() throws TransactionException {
begin(DEFAULT_GLOBAL_TX_TIMEOUT);
}
@Override
public void begin(int timeout) throws TransactionException {
begin(timeout, DEFAULT_GLOBAL_TX_NAME);
}
@Override
public void begin(int timeout, String name) throws TransactionException {
if (role != GlobalTransactionRole.Launcher) {
assertXIDNotNull();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNull();
String currentXid = RootContext.getXID();
if (currentXid != null) {
throw new IllegalStateException("Global transaction already exists," + " can't begin a new global transaction, currentXid = " + currentXid);
}
//Begin the global transaction through the transaction manager; on success an xid is returned
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
//Bind the xid to the thread-local storage held by RootContext
RootContext.bind(xid);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction [{}]", xid);
}
}
@Override
public void commit() throws TransactionException {
if (role == GlobalTransactionRole.Participant) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNotNull();
int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
try {
while (retry > 0) {
try {
retry--;
status = transactionManager.commit(xid);
break;
} catch (Throwable ex) {
LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
if (retry == 0) {
throw new TransactionException("Failed to report global commit", ex);
}
}
}
} finally {
if (xid.equals(RootContext.getXID())) {
suspend();
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("[{}] commit status: {}", xid, status);
}
}
@Override
public void rollback() throws TransactionException {
if (role == GlobalTransactionRole.Participant) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNotNull();
int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
try {
while (retry > 0) {
try {
retry--;
status = transactionManager.rollback(xid);
break;
} catch (Throwable ex) {
LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
if (retry == 0) {
throw new TransactionException("Failed to report global rollback", ex);
}
}
}
} finally {
if (xid.equals(RootContext.getXID())) {
suspend();
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("[{}] rollback status: {}", xid, status);
}
}
...
}
public class RootContext {
private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();
public static final String KEY_XID = "TX_XID";
...
//Gets xid.
@Nullable
public static String getXID() {
return (String) CONTEXT_HOLDER.get(KEY_XID);
}
//Bind xid.
public static void bind(@Nonnull String xid) {
if (StringUtils.isBlank(xid)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid is blank, switch to unbind operation!");
}
unbind();
} else {
MDC.put(MDC_KEY_XID, xid);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind {}", xid);
}
CONTEXT_HOLDER.put(KEY_XID, xid);
}
}
...
}
//Default global transaction manager
public class DefaultTransactionManager implements TransactionManager {
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
//Build a GlobalBeginRequest to begin the global transaction
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
//Make a synchronous call
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
return response.getXid();
}
@Override
public GlobalStatus commit(String xid) throws TransactionException {
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setXid(xid);
GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
return response.getGlobalStatus();
}
@Override
public GlobalStatus rollback(String xid) throws TransactionException {
GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
globalRollback.setXid(xid);
GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
return response.getGlobalStatus();
}
...
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
//TmNettyRemotingClient keeps a long-lived Netty connection to the Seata Server
return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
} catch (TimeoutException toe) {
throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
}
}
}
//GlobalBeginRequest is serialized into a byte array according to the Seata protocol and then sent to the Seata Server over Netty
public class GlobalBeginRequest extends AbstractTransactionRequestToTC {
private int timeout = 60000;
private String transactionName;
...
@Override
public short getTypeCode() {
return MessageType.TYPE_GLOBAL_BEGIN;
}
@Override
public AbstractTransactionResponse handle(RpcContext rpcContext) {
return handler.handle(this, rpcContext);
}
...
}
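The retry countdown used by commit() and rollback() above is easy to misread, so here is a minimal sketch of just that loop. The class name, the Supplier-based signature, and the default of 5 are assumptions made for illustration; only the countdown logic mirrors the listing: decrement first, try the call, and give up with an exception once the counter reaches 0.

```java
import java.util.function.Supplier;

// Sketch of the retry countdown only; names and the default value are hypothetical.
final class RetryCountdownSketch {
    static final int DEFAULT_RETRY_COUNT = 5; // hypothetical default

    static <T> T reportWithRetry(int configuredRetryCount, Supplier<T> report) {
        // A non-positive configured value falls back to the default.
        int retry = configuredRetryCount <= 0 ? DEFAULT_RETRY_COUNT : configuredRetryCount;
        while (true) {
            retry--;
            try {
                return report.get();
            } catch (RuntimeException ex) {
                // Out of attempts: surface the last failure to the caller.
                if (retry == 0) {
                    throw new IllegalStateException("Failed to report after all attempts", ex);
                }
            }
        }
    }

    // Counts how many attempts are made when the first N calls fail.
    static int attemptsNeeded(int failuresBeforeSuccess, int configuredRetryCount) {
        int[] attempts = {0};
        reportWithRetry(configuredRetryCount, () -> {
            attempts[0]++;
            if (attempts[0] <= failuresBeforeSuccess) {
                throw new IllegalStateException("TC unreachable");
            }
            return "Committed";
        });
        return attempts[0];
    }
}
```

With two failures and a retry count of 5, the third attempt succeeds; with three failures and a retry count of 3, the loop gives up and throws.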
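10. Source code of the load-balancing mechanism for a Seata Server cluster
(1) Selecting a Seata Server node through load balancing
(2) Load-balancing algorithms provided by Seata
(1) Selecting a Seata Server node through load balancing
When the default transaction manager DefaultTransactionManager begins, commits, or rolls back a transaction, it ultimately calls its syncCall() method to make a synchronous call, that is, it sends a Netty request to the Seata Server through TmNettyRemotingClient.
When syncCall() calls sendSyncRequest() on the TmNettyRemotingClient instance, the method that actually runs is sendSyncRequest() of AbstractNettyRemotingClient, the abstract parent class of TmNettyRemotingClient.
sendSyncRequest() first calls AbstractNettyRemotingClient.loadBalance() to perform load balancing, which in turn calls AbstractNettyRemotingClient.doSelect().
When more than one node is alive, doSelect() obtains a LoadBalance instance through LoadBalanceFactory and the SPI mechanism, and then calls that instance's select() method to do the actual load balancing. With a single alive node it returns that node directly.
Load balancing here simply means picking one of the Seata Server nodes to send the request to.
//Default global transaction manager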
public class DefaultTransactionManager implements TransactionManager {
...
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
//TmNettyRemotingClient keeps a long-lived Netty connection to the Seata Server
return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
} catch (TimeoutException toe) {
throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
}
}
...
}
public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
...
@Override
public Object sendSyncRequest(Object msg) throws TimeoutException {
//Seata Server can be deployed as multiple nodes for high availability, so loadBalance() is called here to pick a node
String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
//Get the RPC request timeout
long timeoutMillis = this.getRpcRequestTimeout();
//Build an RPC message
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
//send batch message
//put message into basketMap, @see MergedSendRunnable
//Batch sending is disabled by default
if (this.isEnableClientBatchSendRequest()) {
...
} else {
//Get the Channel connected to the chosen Seata Server from the connection manager clientChannelManager,
//then send the RpcMessage to the Seata Server over that Channel
Channel channel = clientChannelManager.acquireChannel(serverAddress);
return super.sendSync(channel, rpcMessage, timeoutMillis);
}
}
protected String loadBalance(String transactionServiceGroup, Object msg) {
InetSocketAddress address = null;
try {
@SuppressWarnings("unchecked")
List<InetSocketAddress> inetSocketAddressList = RegistryFactory.getInstance().aliveLookup(transactionServiceGroup);
address = this.doSelect(inetSocketAddressList, msg);
} catch (Exception ex) {
LOGGER.error(ex.getMessage());
}
if (address == null) {
throw new FrameworkException(NoAvailableService);
}
return NetUtil.toStringAddress(address);
}
protected InetSocketAddress doSelect(List<InetSocketAddress> list, Object msg) throws Exception {
if (CollectionUtils.isNotEmpty(list)) {
if (list.size() > 1) {
return LoadBalanceFactory.getInstance().select(list, getXid(msg));
} else {
return list.get(0);
}
}
return null;
}
...
}
public class LoadBalanceFactory {
...
public static LoadBalance getInstance() {
//Obtain the LoadBalance instance through the SPI mechanism
String config = ConfigurationFactory.getInstance().getConfig(LOAD_BALANCE_TYPE, DEFAULT_LOAD_BALANCE);
return EnhancedServiceLoader.load(LoadBalance.class, config);
}
}
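The selection flow of loadBalance() and doSelect() can be sketched without Netty or the registry. In the sketch below the Strategy interface, the strategy names "First" and "XidHash", and the plain HashMap lookup are all assumptions: the map merely stands in for the name-based SPI lookup that LoadBalanceFactory performs. What it keeps from the listing is the branching: no alive node yields null (and loadBalance() then fails), a single node is returned directly, and only multiple nodes go through a strategy.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the selection flow; the strategy registry is a hypothetical stand-in for SPI loading.
final class NodeSelectionSketch {
    interface Strategy {
        String select(List<String> nodes, String xid);
    }

    private static final Map<String, Strategy> STRATEGIES = new HashMap<>();

    static {
        // Two illustrative strategies; neither is one of Seata's built-in algorithms.
        STRATEGIES.put("First", (nodes, xid) -> nodes.get(0));
        STRATEGIES.put("XidHash", (nodes, xid) ->
                nodes.get(Math.floorMod(String.valueOf(xid).hashCode(), nodes.size())));
    }

    static String doSelect(List<String> aliveNodes, String xid, String strategyName) {
        if (aliveNodes == null || aliveNodes.isEmpty()) {
            return null;
        }
        if (aliveNodes.size() == 1) {
            // Nothing to balance: skip the strategy lookup entirely.
            return aliveNodes.get(0);
        }
        Strategy strategy = STRATEGIES.get(strategyName);
        if (strategy == null) {
            throw new IllegalArgumentException("Unknown strategy: " + strategyName);
        }
        return strategy.select(aliveNodes, xid);
    }

    static String loadBalance(List<String> aliveNodes, String xid, String strategyName) {
        String address = doSelect(aliveNodes, xid, strategyName);
        if (address == null) {
            throw new IllegalStateException("No available service");
        }
        return address;
    }
}
```

Because the single-node case returns before the lookup, a misconfigured strategy name only surfaces once a second server node is alive.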
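(2) Load-balancing algorithms provided by Seata
Seata provides four selection algorithms: round-robin, random, least-active, and consistent hash.
I. Round-robin selection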
@LoadLevel(name = ROUND_ROBIN_LOAD_BALANCE)
public class RoundRobinLoadBalance implements LoadBalance {
private final AtomicInteger sequence = new AtomicInteger();
@Override
public <T> T select(List<T> invokers, String xid) {
int length = invokers.size();
//Pick a Seata Server node by round-robin
return invokers.get(getPositiveSequence() % length);
}
private int getPositiveSequence() {
for (;;) {
int current = sequence.get();
int next = current >= Integer.MAX_VALUE ? 0 : current + 1;
if (sequence.compareAndSet(current, next)) {
return current;
}
}
}
}
II. Random selection
@LoadLevel(name = RANDOM_LOAD_BALANCE)
public class RandomLoadBalance implements LoadBalance {
@Override
public <T> T select(List<T> invokers, String xid) {
int length = invokers.size();
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
}
III. Least-active selection
@LoadLevel(name = LEAST_ACTIVE_LOAD_BALANCE)
public class LeastActiveLoadBalance implements LoadBalance {
@Override
public <T> T select(List<T> invokers, String xid) {
int length = invokers.size();
long leastActive = -1;
int leastCount = 0;
int[] leastIndexes = new int[length];
for (int i = 0; i < length; i++) {
long active = RpcStatus.getStatus(invokers.get(i).toString()).getActive();
if (leastActive == -1 || active < leastActive) {
leastActive = active;
leastCount = 1;
leastIndexes[0] = i;
} else if (active == leastActive) {
leastIndexes[leastCount++] = i;
}
}
if (leastCount == 1) {
return invokers.get(leastIndexes[0]);
}
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
}
IV. Consistent-hash selection
@LoadLevel(name = CONSISTENT_HASH_LOAD_BALANCE)
public class ConsistentHashLoadBalance implements LoadBalance {
public static final String LOAD_BALANCE_CONSISTENT_HASH_VISUAL_NODES = LOAD_BALANCE_PREFIX + "visualNodes";
private static final int VIRTUAL_NODES_NUM = ConfigurationFactory.getInstance().getInt(LOAD_BALANCE_CONSISTENT_HASH_VISUAL_NODES, VIRTUAL_NODES_DEFAULT);
@Override
public <T> T select(List<T> invokers, String xid) {
//Pick a node by consistent hashing
return new ConsistentHashSelector<>(invokers, VIRTUAL_NODES_NUM).select(xid);
}
private static final class ConsistentHashSelector<T> {
private final SortedMap<Long, T> virtualInvokers = new TreeMap<>();
private final HashFunction hashFunction = new MD5Hash();
ConsistentHashSelector(List<T> invokers, int virtualNodes) {
for (T invoker : invokers) {
for (int i = 0; i < virtualNodes; i++) {
virtualInvokers.put(hashFunction.hash(invoker.toString() + i), invoker);
}
}
}
public T select(String objectKey) {
SortedMap<Long, T> tailMap = virtualInvokers.tailMap(hashFunction.hash(objectKey));
Long nodeHashVal = tailMap.isEmpty() ? virtualInvokers.firstKey() : tailMap.firstKey();
return virtualInvokers.get(nodeHashVal);
}
}
@SuppressWarnings("lgtm[java/weak-cryptographic-algorithm]")
private static class MD5Hash implements HashFunction {
MessageDigest instance;
public MD5Hash() {
try {
instance = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
@Override
public long hash(String key) {
instance.reset();
instance.update(key.getBytes());
byte[] digest = instance.digest();
long h = 0;
for (int i = 0; i < 4; i++) {
h <<= 8;
h |= ((int) digest[i]) & 0xFF;
}
return h;
}
}
public interface HashFunction {
long hash(String key);
}
}
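The property that makes consistent hashing attractive here is that removing one server only remaps the xids that lived on it. The sketch below rebuilds the ring from the listing above in a self-contained form to check that property. The node addresses, the xid format, and the choice of 10 virtual nodes are made up for the example, and the sketch decodes strings as UTF-8 explicitly, which the quoted code does not.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Self-contained ring sketch; addresses and virtual-node count are illustrative.
final class HashRingSketch {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    HashRingSketch(List<String> nodes, int virtualNodes) {
        // Each node is placed on the ring several times to even out the distribution.
        for (String node : nodes) {
            for (int i = 0; i < virtualNodes; i++) {
                ring.put(hash(node + i), node);
            }
        }
    }

    String select(String xid) {
        // First virtual node at or after the key's hash; wrap around to the start if none.
        SortedMap<Long, String> tail = ring.tailMap(hash(xid));
        return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }

    // First 4 bytes of the MD5 digest, read as an unsigned 32-bit value.
    static long hash(String key) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 4; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // True when every xid that was not on the removed node keeps its node.
    static boolean mappingSurvivesRemoval() {
        List<String> all = Arrays.asList("10.0.0.1:8091", "10.0.0.2:8091", "10.0.0.3:8091");
        String removed = all.get(2);
        HashRingSketch before = new HashRingSketch(all, 10);
        HashRingSketch after = new HashRingSketch(all.subList(0, 2), 10);
        for (int i = 0; i < 200; i++) {
            String xid = "192.168.1.1:8091:" + i;
            String node = before.select(xid);
            if (!node.equals(removed) && !node.equals(after.select(xid))) {
                return false;
            }
        }
        return true;
    }
}
```

Note that the quoted select() constructs a new ConsistentHashSelector on every call, so the ring is rebuilt per request; the sketch builds it once per node list instead.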
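11. Source code for sending requests from the Seata Client to the Seata Server
The Seata Client first obtains, from the connection manager ClientChannelManager, the Channel that is connected to the chosen Seata Server.
It then sends the RpcMessage request over that Netty Channel by calling the Channel's writeAndFlush() method, which sends the message asynchronously.
The Seata Client wraps each outgoing request in a MessageFuture instance and uses it to wait synchronously for the Seata Server's response to that request. MessageFuture implements this synchronous wait with a CompletableFuture.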
public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
...
@Override
public Object sendSyncRequest(Object msg) throws TimeoutException {
//Seata Server can be deployed as multiple nodes for high availability, so loadBalance() is called here to pick a node
String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
//Get the RPC request timeout
long timeoutMillis = this.getRpcRequestTimeout();
//Build an RPC message
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
//send batch message
//put message into basketMap, @see MergedSendRunnable
//Batch sending is disabled by default
if (this.isEnableClientBatchSendRequest()) {
...
} else {
//Get the Channel connected to the chosen Seata Server from the connection manager clientChannelManager,
//then send the RpcMessage to the Seata Server over that Channel
Channel channel = clientChannelManager.acquireChannel(serverAddress);
return super.sendSync(channel, rpcMessage, timeoutMillis);
}
}
...
}
public abstract class AbstractNettyRemoting implements Disposable {
...
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
if (timeoutMillis <= 0) {
throw new FrameworkException("timeout should more than 0ms");
}
if (channel == null) {
LOGGER.warn("sendSync nothing, caused by null channel.");
return null;
}
//Wrap the outgoing request in a MessageFuture and store it in the futures map
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
futures.put(rpcMessage.getId(), messageFuture);
channelWritableCheck(channel, rpcMessage.getBody());
//Get the remote address
String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
doBeforeRpcHooks(remoteAddr, rpcMessage);
//Send the data asynchronously over the Netty Channel and attach a listener to the send result
//If the send fails, the Channel is destroyed
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
if (messageFuture1 != null) {
messageFuture1.setResultMessage(future.cause());
}
destroyChannel(future.channel());
}
});
try {
//Then wait synchronously on the MessageFuture for the Seata Server's response to this request
Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
doAfterRpcHooks(remoteAddr, rpcMessage, result);
return result;
} catch (Exception exx) {
LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody());
if (exx instanceof TimeoutException) {
throw (TimeoutException) exx;
} else {
throw new RuntimeException(exx);
}
}
}
...
}
public class MessageFuture {
private transient CompletableFuture<Object> origin = new CompletableFuture<>();
...
public Object get(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException {
Object result = null;
try {
result = origin.get(timeout, unit);
if (result instanceof TimeoutException) {
throw (TimeoutException)result;
}
} catch (ExecutionException e) {
throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e);
} catch (TimeoutException e) {
throw new TimeoutException(String.format("%s ,cost: %d ms", e.getMessage(), System.currentTimeMillis() - start));
}
if (result instanceof RuntimeException) {
throw (RuntimeException)result;
} else if (result instanceof Throwable) {
throw new RuntimeException((Throwable)result);
}
return result;
}
...
}
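The pattern behind sendSync() and MessageFuture, an asynchronous write paired with a blocking wait keyed by request id, can be sketched with the JDK alone. Everything named below (PendingRequestsSketch, onResponse, the Runnable transport) is hypothetical; the transport simply stands in for channel.writeAndFlush(), and the sketch removes the pending entry in a finally block, which is a simplification of how the real client cleans up.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of request/response correlation; not Seata's MessageFuture.
final class PendingRequestsSketch {
    private final Map<Integer, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();

    // Registers the request id, triggers the (asynchronous) send, then blocks for the response.
    Object sendSync(int requestId, Runnable transport, long timeoutMillis) throws TimeoutException {
        CompletableFuture<Object> future = new CompletableFuture<>();
        futures.put(requestId, future);
        try {
            transport.run();
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            futures.remove(requestId);
        }
    }

    // Called by the I/O thread when a response carrying this id arrives.
    void onResponse(int requestId, Object response) {
        CompletableFuture<Object> future = futures.get(requestId);
        if (future != null) {
            future.complete(response);
        }
    }

    static Object demoResponse() {
        PendingRequestsSketch pending = new PendingRequestsSketch();
        try {
            // The "server" answers from another thread, as a Netty event loop would.
            return pending.sendSync(1,
                    () -> new Thread(() -> pending.onResponse(1, "GlobalBeginResponse")).start(), 2000);
        } catch (TimeoutException e) {
            return "timeout";
        }
    }

    static boolean demoTimesOut() {
        PendingRequestsSketch pending = new PendingRequestsSketch();
        try {
            pending.sendSync(2, () -> { }, 50); // nobody ever answers
            return false;
        } catch (TimeoutException e) {
            return true;
        }
    }
}
```

The future is registered before the send is triggered, so a response that arrives immediately still finds its entry in the map.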
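12. The Client encodes the RpcMessage object into a byte array
When the Seata Client calls the Channel's writeAndFlush() method to send an RpcMessage object to the Seata Server, the object first passes through the ChannelPipeline set up by NettyClientBootstrap. There, ProtocolV1Encoder encodes the RpcMessage object into a byte array.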
public class NettyClientBootstrap implements RemotingBootstrap {
...
@Override
public void start() {
if (this.defaultEventExecutorGroup == null) {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(), new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()), nettyClientConfig.getClientWorkerThreads()));
}
//Build a Bootstrap with the Netty API
//Set the NioEventLoopGroup thread pool; one thread is enough by default
this.bootstrap.group(this.eventLoopGroupWorker)
.channel(nettyClientConfig.getClientChannelClazz())
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
if (nettyClientConfig.enableNative()) {
if (PlatformDependent.isOsx()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("client run on macOS");
}
} else {
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED).option(EpollChannelOption.TCP_QUICKACK, true);
}
}
//Initialize the pipeline, the Netty component that processes network data
bootstrap.handler(
new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
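//IdleStateHandler checks for idle connections
//It records the time whenever data passes through
//If no data passes through for too long, the connection is considered idle and a user-triggered event is fired for ClientHandler to handle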
pipeline.addLast(new IdleStateHandler(
nettyClientConfig.getChannelMaxReadIdleSeconds(),
nettyClientConfig.getChannelMaxWriteIdleSeconds(),
nettyClientConfig.getChannelMaxAllIdleSeconds()
))
.addLast(new ProtocolV1Decoder())//Decoder for the Seata protocol
.addLast(new ProtocolV1Encoder());//Encoder for the Seata protocol
if (channelHandlers != null) {
addChannelPipelineLast(ch, channelHandlers);
}
}
}
);
if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
LOGGER.info("NettyClientBootstrap has started");
}
}
...
}
public class ProtocolV1Encoder extends MessageToByteEncoder {
private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolV1Encoder.class);
@Override
public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
try {
if (msg instanceof RpcMessage) {
RpcMessage rpcMessage = (RpcMessage) msg;
//Full message length
int fullLength = ProtocolConstants.V1_HEAD_LENGTH;
//Head length
int headLength = ProtocolConstants.V1_HEAD_LENGTH;
//Get the message type
byte messageType = rpcMessage.getMessageType();
//Write the magic number first; it marks the start of a message
out.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES);
//Then write the version
out.writeByte(ProtocolConstants.VERSION);
//full Length(4B) and head length(2B) will fix in the end.
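//Move the writer index forward by 6 bytes from its current position,
//leaving those 6 bytes unwritten for now
//They are filled in at the end, once the message has been written and both lengths are known
//The 6 reserved bytes = 4 bytes of full length + 2 bytes of head length
out.writerIndex(out.writerIndex() + 6);
//The full length and head length are not known yet, which is why the 6 bytes are skipped
//Writing continues 6 bytes after the version byte with: message type, codec, compressor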
out.writeByte(messageType);
out.writeByte(rpcMessage.getCodec());
out.writeByte(rpcMessage.getCompressor());
//Then write the 4-byte message ID
out.writeInt(rpcMessage.getId());
//direct write head with zero-copy
//Get the head map
Map<String, String> headMap = rpcMessage.getHeadMap();
if (headMap != null && !headMap.isEmpty()) {
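//Encode the head map: convert the Map into bytes and write them to out; this is where the head map is actually written
//Once it is written, the head length headLength is known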
int headMapBytesLength = HeadMapSerializer.getInstance().encode(headMap, out);
headLength += headMapBytesLength;
fullLength += headMapBytesLength;
}
byte[] bodyBytes = null;
//Serialize the body unless the message is a heartbeat
if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
//heartbeat has no body
//Load the Serializer through SPI according to the codec property of the RpcMessage
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()));
//Serialize the body with the serializer
bodyBytes = serializer.serialize(rpcMessage.getBody());
//Load the Compressor through SPI according to the compressor property of the RpcMessage
Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor());
//Compress the byte array with the compressor
bodyBytes = compressor.compress(bodyBytes);
fullLength += bodyBytes.length;
}
if (bodyBytes != null) {
out.writeBytes(bodyBytes);
}
//fix fullLength and headLength
int writeIndex = out.writerIndex();
//skip magic code(2B) + version(1B)
out.writerIndex(writeIndex - fullLength + 3);
out.writeInt(fullLength);
out.writeShort(headLength);
out.writerIndex(writeIndex);
} else {
throw new UnsupportedOperationException("Not support this class:" + msg.getClass());
}
} catch (Throwable e) {
LOGGER.error("Encode request error!", e);
}
}
}
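The "skip 6 bytes now, fill them in at the end" technique in the encoder is easier to follow on a plain java.nio.ByteBuffer. The sketch below is not Seata code: it omits the head map, serialization, and compression, and the magic bytes (0xda 0xda) and version (1) are assumed values used only for illustration. The 16-byte fixed head follows from the field sizes in the listing: 2 (magic) + 1 (version) + 4 (full length) + 2 (head length) + 1 + 1 + 1 (type, codec, compressor) + 4 (message ID).

```java
import java.nio.ByteBuffer;

// Sketch of the reserve-then-backfill layout; constant values are assumptions.
final class FrameEncodeSketch {
    static final byte[] MAGIC = {(byte) 0xda, (byte) 0xda}; // assumed value
    static final byte VERSION = 1;                          // assumed value
    static final int FIXED_HEAD_LENGTH = 16;                // sum of the fixed field sizes

    static byte[] encode(byte messageType, byte codec, byte compressor, int requestId, byte[] body) {
        ByteBuffer out = ByteBuffer.allocate(FIXED_HEAD_LENGTH + body.length);
        int start = out.position();
        out.put(MAGIC);
        out.put(VERSION);
        // Reserve 6 bytes: full length (4B) + head length (2B), not known yet.
        int lengthsAt = out.position();
        out.position(lengthsAt + 6);
        out.put(messageType).put(codec).put(compressor);
        out.putInt(requestId);
        out.put(body);
        // Back-fill the reserved bytes with absolute writes; the position is untouched.
        int fullLength = out.position() - start;
        out.putInt(lengthsAt, fullLength);
        out.putShort(lengthsAt + 4, (short) FIXED_HEAD_LENGTH);
        return out.array();
    }
}
```

For a 2-byte body the frame is 18 bytes long, with the value 18 stored at offset 3 and the value 16 at offset 7. ByteBuffer's absolute putInt(index, value) plays the role of the writerIndex() save and restore in the Netty version.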
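13. The Server decodes the byte array into an RpcMessage object
When the Seata Server receives bytes from the Seata Client, they first pass through the ChannelPipeline set up by NettyServerBootstrap. There, ProtocolV1Decoder decodes the byte array into an RpcMessage object.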
public class NettyServerBootstrap implements RemotingBootstrap {
...
@Override
public void start() {
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
.channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()))
.localAddress(new InetSocketAddress(getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
.addLast(new ProtocolV1Decoder())
.addLast(new ProtocolV1Encoder());
if (channelHandlers != null) {
addChannelPipelineLast(ch, channelHandlers);
}
}
}
);
try {
this.serverBootstrap.bind(getListenPort()).sync();
XID.setPort(getListenPort());
LOGGER.info("Server started, service listen port: {}", getListenPort());
RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
initialized.set(true);
} catch (SocketException se) {
throw new RuntimeException("Server start failed, the listen port: " + getListenPort(), se);
} catch (Exception exx) {
throw new RuntimeException("Server start failed", exx);
}
}
...
}
public class ProtocolV1Decoder extends LengthFieldBasedFrameDecoder {
private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolV1Decoder.class);
//To handle TCP sticky and split packets, decoding is done one complete frame at a time on top of LengthFieldBasedFrameDecoder
public ProtocolV1Decoder() {
// default is 8M
this(ProtocolConstants.MAX_FRAME_LENGTH);
}
public ProtocolV1Decoder(int maxFrameLength) {
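//The maximum frame length is 8M, so a single message cannot exceed 8M
//The frame starts with a 2-byte magic number and a 1-byte version; the 4-byte fullLength field follows at offset 3
//lengthAdjustment is -7 because fullLength counts the whole frame, including the 7 bytes up to the end of the length field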
super(maxFrameLength, 3, 4, -7, 0);
}
//Once a complete frame has been extracted, decode() converts its bytes into an RpcMessage object
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
Object decoded;
try {
//Call the parent decode() to extract one frame
decoded = super.decode(ctx, in);
if (decoded instanceof ByteBuf) {
ByteBuf frame = (ByteBuf)decoded;
try {
return decodeFrame(frame);
} finally {
frame.release();
}
}
} catch (Exception exx) {
LOGGER.error("Decode frame error, cause: {}", exx.getMessage());
throw new DecodeException(exx);
}
return decoded;
}
public Object decodeFrame(ByteBuf frame) {
//The first two bytes are the magic number
byte b0 = frame.readByte();
byte b1 = frame.readByte();
if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0 || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) {
throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1);
}
//Read the version
byte version = frame.readByte();
int fullLength = frame.readInt();
short headLength = frame.readShort();
byte messageType = frame.readByte();
byte codecType = frame.readByte();
byte compressorType = frame.readByte();
int requestId = frame.readInt();
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setCodec(codecType);
rpcMessage.setId(requestId);
rpcMessage.setCompressor(compressorType);
rpcMessage.setMessageType(messageType);
//direct read head with zero-copy
int headMapLength = headLength - ProtocolConstants.V1_HEAD_LENGTH;
if (headMapLength > 0) {
Map<String, String> map = HeadMapSerializer.getInstance().decode(frame, headMapLength);
rpcMessage.getHeadMap().putAll(map);
}
//read body
if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST) {
rpcMessage.setBody(HeartbeatMessage.PING);
} else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
rpcMessage.setBody(HeartbeatMessage.PONG);
} else {
int bodyLength = fullLength - headLength;
if (bodyLength > 0) {
byte[] bs = new byte[bodyLength];
frame.readBytes(bs);
//First get the Compressor and decompress the body bytes
Compressor compressor = CompressorFactory.getCompressor(compressorType);
bs = compressor.decompress(bs);
//Then deserialize the decompressed bytes according to the codec to get the body object
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()));
rpcMessage.setBody(serializer.deserialize(bs));
}
}
return rpcMessage;
}
}
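The constructor arguments (3, 4, -7, 0) determine how a byte stream is cut into frames. LengthFieldBasedFrameDecoder computes the frame length as the value of the length field plus lengthAdjustment plus the offset of the end of the length field, so with these arguments the frame length is fullLength - 7 + (3 + 4) = fullLength. The sketch below reproduces that arithmetic on a ByteBuffer and splits a stream in which two complete frames are followed by a partial third one. The frame() helper and its constant values (magic bytes, version, zeroed type/codec/compressor) are assumptions for the example.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch of length-field framing; helper values are hypothetical.
final class FrameSplitSketch {
    static final int LENGTH_FIELD_OFFSET = 3;
    static final int LENGTH_FIELD_LENGTH = 4;
    static final int LENGTH_ADJUSTMENT = -7;

    // Same arithmetic the Netty decoder applies to the value read from the length field.
    static int frameLength(int fullLength) {
        int lengthFieldEndOffset = LENGTH_FIELD_OFFSET + LENGTH_FIELD_LENGTH;
        return fullLength + LENGTH_ADJUSTMENT + lengthFieldEndOffset;
    }

    // Cuts complete frames off the front of the stream; incomplete bytes stay in the buffer.
    static List<byte[]> split(ByteBuffer stream) {
        List<byte[]> frames = new ArrayList<>();
        while (stream.remaining() >= LENGTH_FIELD_OFFSET + LENGTH_FIELD_LENGTH) {
            int fullLength = stream.getInt(stream.position() + LENGTH_FIELD_OFFSET);
            int length = frameLength(fullLength);
            if (length < LENGTH_FIELD_OFFSET + LENGTH_FIELD_LENGTH) {
                throw new IllegalArgumentException("Corrupt length field: " + fullLength);
            }
            if (stream.remaining() < length) {
                break; // split packet: wait for more bytes
            }
            byte[] frame = new byte[length];
            stream.get(frame);
            frames.add(frame);
        }
        return frames;
    }

    // Builds a minimal frame: 16-byte fixed head followed by the body.
    static byte[] frame(int requestId, byte[] body) {
        ByteBuffer out = ByteBuffer.allocate(16 + body.length);
        out.put((byte) 0xda).put((byte) 0xda).put((byte) 1); // assumed magic and version
        out.putInt(16 + body.length).putShort((short) 16);
        out.put((byte) 0).put((byte) 0).put((byte) 0);
        out.putInt(requestId).put(body);
        return out.array();
    }

    // Returns {frame count, first frame length, second frame length, leftover bytes}.
    static int[] demo() {
        ByteBuffer stream = ByteBuffer.allocate(18 + 21 + 10);
        stream.put(frame(1, new byte[2]));
        stream.put(frame(2, new byte[5]));
        stream.put(frame(3, new byte[4]), 0, 10); // only the first 10 bytes have arrived
        stream.flip();
        List<byte[]> frames = split(stream);
        return new int[] {frames.size(), frames.get(0).length, frames.get(1).length, stream.remaining()};
    }
}
```

The two back-to-back frames are separated correctly, and the 10 leftover bytes are kept until the rest of the third frame arrives.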
14. How the Server processes the decoded RpcMessage object
After the Seata Server decodes the received bytes into an RpcMessage object, the object is passed to the ServerHandler registered by NettyServerBootstrap, specifically to ServerHandler.channelRead().
channelRead() calls AbstractNettyRemoting.processMessage(), which in turn dispatches to ServerOnRequestProcessor.process() to handle the RpcMessage object.
During ServerOnRequestProcessor.process(), TransactionMessageHandler.onRequest() is called to handle the RpcMessage object.
Because Server.start() sets DefaultCoordinator as the TransactionMessageHandler when it initializes NettyRemotingServer, the call ends up in DefaultCoordinator.onRequest().
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
...
@ChannelHandler.Sharable
class ServerHandler extends ChannelDuplexHandler {
//Channel read.
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof RpcMessage)) {
return;
}
//Call processMessage() to handle the decoded RpcMessage object
processMessage(ctx, (RpcMessage) msg);
}
...
}
...
}
public abstract class AbstractNettyRemoting implements Disposable {
...
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
}
Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
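//Look up a Pair by message type; the Pair consists of a request processor and the thread pool that runs it
//The entries of processorTable are put there by registerProcessor() when NettyRemotingServer is initialized
//so the code below ends up being handled by ServerOnRequestProcessor.process()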
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair != null) {
if (pair.getSecond() != null) {
try {
pair.getSecond().execute(() -> {
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
} finally {
MDC.clear();
}
});
} catch (RejectedExecutionException e) {
LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(), "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
if (allowDumpStack) {
String name = ManagementFactory.getRuntimeMXBean().getName();
String pid = name.split("@")[0];
long idx = System.currentTimeMillis();
try {
String jstackFile = idx + ".log";
LOGGER.info("jstack command will dump to " + jstackFile);
Runtime.getRuntime().exec(String.format("jstack %s > %s", pid, jstackFile));
} catch (IOException exx) {
LOGGER.error(exx.getMessage());
}
allowDumpStack = false;
}
}
} else {
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
}
}
} else {
LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
}
} else {
LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
}
}
...
}
public class NettyRemotingServer extends AbstractNettyRemotingServer {
...
private void registerProcessor() {
//1. registry on request message processor
ServerOnRequestProcessor onRequestProcessor = new ServerOnRequestProcessor(this, getHandler());
ShutdownHook.getInstance().addDisposable(onRequestProcessor);
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
//2. registry on response message processor
ServerOnResponseProcessor onResponseProcessor = new ServerOnResponseProcessor(getHandler(), getFutures());
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);
//3. registry rm message processor
RegRmProcessor regRmProcessor = new RegRmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
//4. registry tm message processor
RegTmProcessor regTmProcessor = new RegTmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
//5. registry heartbeat message processor
ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}
...
}
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
...
@Override
public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
this.processorTable.put(messageType, pair);
}
...
}
public abstract class AbstractNettyRemoting implements Disposable {
...
//This container holds all processors.
protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);
...
}
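The lookup-then-dispatch flow in processMessage() and registerProcessor() can be sketched in isolation. What follows is a minimal sketch, not Seata's implementation: ProcessorTableSketch, Processor and Entry are hypothetical stand-ins, message bodies are plain strings, and the sketch waits for the pooled task so that its result can be returned, whereas the code above calls execute() and does not wait.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-ins; these are not Seata's real classes.
final class ProcessorTableSketch {

    // Plays the role of RemotingProcessor: handles one decoded message body.
    interface Processor {
        String process(String body);
    }

    // Plays the role of Pair<RemotingProcessor, ExecutorService>.
    private static final class Entry {
        final Processor processor;
        final ExecutorService executor; // null means "run on the calling thread"

        Entry(Processor processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> processorTable = new HashMap<>();

    void registerProcessor(int typeCode, Processor processor, ExecutorService executor) {
        processorTable.put(typeCode, new Entry(processor, executor));
    }

    // Returns the processor's result, or null when no processor is registered.
    String dispatch(int typeCode, String body) {
        Entry entry = processorTable.get(typeCode);
        if (entry == null) {
            return null;
        }
        if (entry.executor == null) {
            // No pool registered: process inline on the caller's thread.
            return entry.processor.process(body);
        }
        Callable<String> task = () -> entry.processor.process(body);
        try {
            // Sketch-only choice: wait for the pooled task so the result is observable.
            return entry.executor.submit(task).get(1, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("dispatch failed", e);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        ProcessorTableSketch server = new ProcessorTableSketch();
        server.registerProcessor(1, body -> "begin:" + body, pool); // pooled processor
        server.registerProcessor(2, body -> "pong:" + body, null);  // inline processor
        System.out.println(server.dispatch(1, "tx"));
        System.out.println(server.dispatch(2, "ping"));
        pool.shutdown();
    }
}
```

Registering a null executor mirrors how the TM registration and heartbeat processors are registered above: those messages are processed directly on the thread that calls processMessage().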
public class ServerOnRequestProcessor implements RemotingProcessor, Disposable {
private final TransactionMessageHandler transactionMessageHandler;
...
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
if (ChannelManager.isRegistered(ctx.channel())) {
onRequestMessage(ctx, rpcMessage);
} else {
try {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
}
ctx.disconnect();
ctx.close();
} catch (Exception exx) {
LOGGER.error(exx.getMessage());
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
}
}
}
private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
Object message = rpcMessage.getBody();
//Get the RpcContext associated with this channel
RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
} else {
try {
BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup());
} catch (InterruptedException e) {
LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);
}
}
if (!(message instanceof AbstractMessage)) {
return;
}
//the batch send request message
if (message instanceof MergedWarpMessage) {
...
} else {
//the single send request message
final AbstractMessage msg = (AbstractMessage) message;
//This finally calls DefaultCoordinator's onRequest() method to handle the RpcMessage
AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
}
}
...
}
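//The Server-side component that holds the global transaction processing logic
//It covers: starting a number of background threads, handling global begin, global commit, global rollback, global status reports and branch registration,
//as well as local checks, timeout checks, rollback retries, commit retries, asynchronous commit and Undo Log deletion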
public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
...
@Override
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
if (!(request instanceof AbstractTransactionRequestToTC)) {
throw new IllegalArgumentException();
}
AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
transactionRequest.setTCInboundHandler(this);
return transactionRequest.handle(context);
}
...
}
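DefaultCoordinator.onRequest() does not switch on the request type. It injects itself into the request and lets the request call back the matching handle() overload, which is a double-dispatch pattern. A minimal sketch of that pattern follows; every type in it (Handler, Request, BeginRequest, CommitRequest, Coordinator) is a hypothetical stand-in, and the RpcContext is reduced to a string.

```java
// Hypothetical, simplified stand-ins; these are not Seata's real classes.
final class DoubleDispatchSketch {

    // Plays the role of TCInboundHandler: one overload per request type.
    interface Handler {
        String handle(BeginRequest request, String context);
        String handle(CommitRequest request, String context);
    }

    // Plays the role of AbstractTransactionRequestToTC.
    static abstract class Request {
        protected Handler handler;

        void setHandler(Handler handler) {
            this.handler = handler;
        }

        abstract String handle(String context);
    }

    static final class BeginRequest extends Request {
        @Override
        String handle(String context) {
            // "this" has static type BeginRequest here, so the begin overload is chosen.
            return handler.handle(this, context);
        }
    }

    static final class CommitRequest extends Request {
        @Override
        String handle(String context) {
            return handler.handle(this, context);
        }
    }

    // Plays the role of DefaultCoordinator.
    static final class Coordinator implements Handler {
        String onRequest(Request request, String context) {
            request.setHandler(this);
            return request.handle(context);
        }

        @Override
        public String handle(BeginRequest request, String context) {
            return "begin for " + context;
        }

        @Override
        public String handle(CommitRequest request, String context) {
            return "commit for " + context;
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        System.out.println(coordinator.onRequest(new BeginRequest(), "order-service"));
        System.out.println(coordinator.onRequest(new CommitRequest(), "order-service"));
    }
}
```

The benefit of this shape is that adding a request type means adding one request class and one overload, while onRequest() itself stays unchanged.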
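15. Source code of how Seata Server begins a global transaction
Note: after a global transaction session is created, its xid is put into slf4j's MDC, a per-thread diagnostic context.
//The Server-side component that holds the global transaction processing logic
//It covers: starting a number of background threads, handling global begin, global commit, global rollback, global status reports and branch registration,
//as well as local checks, timeout checks, rollback retries, commit retries, asynchronous commit and Undo Log deletion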
public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
...
@Override
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
if (!(request instanceof AbstractTransactionRequestToTC)) {
throw new IllegalArgumentException();
}
AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
transactionRequest.setTCInboundHandler(this);
return transactionRequest.handle(context);
}
...
}
public class GlobalBeginRequest extends AbstractTransactionRequestToTC {
...
@Override
public AbstractTransactionResponse handle(RpcContext rpcContext) {
return handler.handle(this, rpcContext);
}
...
}
//The type Abstract tc inbound handler.
public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {
...
@Override
public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
GlobalBeginResponse response = new GlobalBeginResponse();
exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
@Override
public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
try {
//Begin the global transaction
doGlobalBegin(request, response, rpcContext);
} catch (StoreException e) {
throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()), e);
}
}
}, request, response);
return response;
}
//Do global begin.
protected abstract void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException;
...
}
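The handle() method above wraps doGlobalBegin() in a callback and passes it to exceptionHandleTemplate(), so that a failure becomes a field on the response instead of an exception escaping to the network layer. The body of exceptionHandleTemplate() is not shown in this article, so the sketch below only illustrates the general template-plus-callback idea under that assumption. Response, Callback and the "Success"/"Failed" strings are hypothetical.

```java
// Hypothetical, simplified stand-ins; this is not Seata's exceptionHandleTemplate().
final class ExceptionTemplateSketch {

    // Plays the role of a response object such as GlobalBeginResponse.
    static final class Response {
        String resultCode = "Unknown";
        String message;
        String xid;
    }

    // Plays the role of the callback passed to the template.
    interface Callback {
        void execute(Response response) throws Exception;
    }

    // Runs the callback and records the outcome on the response.
    static Response handle(Callback callback) {
        Response response = new Response();
        try {
            callback.execute(response);
            response.resultCode = "Success";
        } catch (Exception e) {
            // The failure is reported through the response rather than rethrown.
            response.resultCode = "Failed";
            response.message = e.getMessage();
        }
        return response;
    }

    public static void main(String[] args) {
        Response ok = handle(response -> response.xid = "xid-1");
        System.out.println(ok.resultCode + " " + ok.xid);

        Response failed = handle(response -> {
            throw new IllegalStateException("store unavailable");
        });
        System.out.println(failed.resultCode + " " + failed.message);
    }
}
```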
public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
private final DefaultCore core;
...
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException {
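//This is where the begin-global-transaction logic actually runs
//DefaultCore is called to really open a global transaction, i.e. obtain the xid and set it on the response
response.setXid(core.begin(
rpcContext.getApplicationId(),//application id
rpcContext.getTransactionServiceGroup(),//transaction service group
request.getTransactionName(),//transaction name
request.getTimeout())//timeout
);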
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}", rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
}
}
...
}
public class DefaultCore implements Core {
...
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
//Create a global transaction session
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
//Put the xid into slf4j's MDC, which is a per-thread context
MDC.put(RootContext.MDC_KEY_XID, session.getXid());
//Add a lifecycle listener for the global transaction session
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
//Begin the session
session.begin();
//transaction start event: publish the session-started event
MetricsPublisher.postSessionDoingEvent(session, false);
//Return the xid of the global transaction session
return session.getXid();
}
...
}
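DefaultCore.begin() stores the xid through MDC.put(), and the worker lambda in processMessage() calls MDC.clear() in its finally block. Both follow from MDC being a per-thread context: a value put on one thread is invisible to other threads, and it stays on a pooled thread until it is cleared. The sketch below imitates that behavior with a plain ThreadLocal. It is an illustration of the idea only, not slf4j's implementation, and ThreadContextSketch and the "xid" key are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical ThreadLocal-based imitation of a per-thread diagnostic context.
final class ThreadContextSketch {

    private static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    static void put(String key, String value) {
        CONTEXT.get().put(key, value);
    }

    static String get(String key) {
        return CONTEXT.get().get(key);
    }

    // Drops everything stored for the current thread.
    static void clear() {
        CONTEXT.remove();
    }

    // Reads the key on a fresh thread and returns what that thread sees.
    static String getOnOtherThread(String key) {
        String[] seen = new String[1];
        Thread reader = new Thread(() -> seen[0] = get(key));
        reader.start();
        try {
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        put("xid", "xid-1");
        System.out.println(get("xid"));              // visible on the current thread
        System.out.println(getOnOtherThread("xid")); // not visible on another thread
        clear();
        System.out.println(get("xid"));              // gone after clear()
    }
}
```

Clearing in a finally block matters on pooled threads: without it, the next task that runs on the same thread would still see the previous request's xid.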
public class GlobalSession implements SessionLifecycle, SessionStorable {
...
public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) {
GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout, false);
return session;
}
public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout, boolean lazyLoadBranch) {
//The global transaction id is generated by UUIDGenerator
this.transactionId = UUIDGenerator.generateUUID();
this.status = GlobalStatus.Begin;
this.lazyLoadBranch = lazyLoadBranch;
if (!lazyLoadBranch) {
this.branchSessions = new ArrayList<>();
}
this.applicationId = applicationId;
this.transactionServiceGroup = transactionServiceGroup;
this.transactionName = transactionName;
this.timeout = timeout;
//Build the final xid from the transactionId generated by UUIDGenerator, using the XID utility
this.xid = XID.generateXID(transactionId);
}
...
}
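The GlobalSession constructor derives the xid from a numeric transactionId, but the body of XID.generateXID() is not shown here. The sketch below assumes the xid has the form ip:port:transactionId. That format is an assumption of this sketch, as are the XidSketch class, its AtomicLong counter (a stand-in for UUIDGenerator) and the address and port used in main().

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; the "ip:port:transactionId" layout is an assumption.
final class XidSketch {

    private final String ip;
    private final int port;
    // Stand-in for UUIDGenerator: a simple in-process counter.
    private final AtomicLong nextId = new AtomicLong(1);

    XidSketch(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    long newTransactionId() {
        return nextId.getAndIncrement();
    }

    // Combines the server address with the transaction id.
    String generateXid(long transactionId) {
        return ip + ":" + port + ":" + transactionId;
    }

    // Recovers the transaction id from the part after the last colon.
    static long transactionIdOf(String xid) {
        return Long.parseLong(xid.substring(xid.lastIndexOf(':') + 1));
    }

    public static void main(String[] args) {
        XidSketch server = new XidSketch("10.0.0.1", 8091);
        String xid = server.generateXid(server.newTransactionId());
        System.out.println(xid);
        System.out.println(transactionIdOf(xid));
    }
}
```

Embedding the server address in the xid would let any participant tell which server instance created the transaction, which is one plausible reason for such a layout.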