本文共 7757 字,大约阅读时间需要 25 分钟。
这篇文章是用来讲解清楚TC(Transaction Coordinator:事务协调器)在处理TM发送过来的begin操作(事务开启操作)。
核心逻辑包括GlobalSession对象的生成、GlobalSession的持久化以及XID生成。
public class DefaultCore implements Core { @Override public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { // 创建全局GlobalSession对象 GlobalSession session = GlobalSession.createGlobalSession( applicationId, transactionServiceGroup, name, timeout); // 全局GlobalSession对象添加生命周期监听器SessionHolder.getRootSessionManager() session.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); // 启动全局Session对象GlobalSession session.begin(); // 返回新生成的XID返回 return XID.generateXID(session.getTransactionId()); }}
说明:
public class GlobalSession implements SessionLifecycle, SessionStorable { // 生命周期监听器的容器 private ArrayListlifecycleListeners = new ArrayList<>(); public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) { GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout); return session; } public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout) { // 生成transactionId对象。 this.transactionId = UUIDGenerator.generateUUID(); this.status = GlobalStatus.Begin; this.applicationId = applicationId; this.transactionServiceGroup = transactionServiceGroup; this.transactionName = transactionName; this.timeout = timeout; } // 添加生命周期监听器 public void addSessionLifecycleListener( SessionLifecycleListener sessionLifecycleListener) { lifecycleListeners.add(sessionLifecycleListener); } public void begin() throws TransactionException { this.status = GlobalStatus.Begin; this.beginTime = System.currentTimeMillis(); this.active = true; for (SessionLifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onBegin(this); } }}// 生成TransactionId的类和方法public class UUIDGenerator { private static AtomicLong UUID = new AtomicLong(1000); private static int UUID_INTERNAL = 200000000; public static long generateUUID() { long id = UUID.incrementAndGet(); if (id > 2000000000) { synchronized (UUID) { if (UUID.get() >= id) { id -= 2000000000; UUID.set(id); } } } return id; }}
说明:
public class SessionHolder { private static final String ROOT_SESSION_MANAGER_NAME = "root.data"; private static final String ASYNC_COMMITTING_SESSION_MANAGER_NAME = "async.commit.data"; private static final String RETRY_COMMITTING_SESSION_MANAGER_NAME = "retry.commit.data"; private static final String RETRY_ROLLBACKING_SESSION_MANAGER_NAME = "retry.rollback.data"; private static SessionManager ROOT_SESSION_MANAGER; private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER; private static SessionManager RETRY_COMMITTING_SESSION_MANAGER; private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER; public static void init(String sessionStorePath) throws IOException { if (sessionStorePath == null) { ROOT_SESSION_MANAGER = new DefaultSessionManager(ROOT_SESSION_MANAGER_NAME); ASYNC_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(ASYNC_COMMITTING_SESSION_MANAGER_NAME); RETRY_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(RETRY_COMMITTING_SESSION_MANAGER_NAME); RETRY_ROLLBACKING_SESSION_MANAGER = new DefaultSessionManager(RETRY_ROLLBACKING_SESSION_MANAGER_NAME); } else { if (!sessionStorePath.endsWith("/")) { sessionStorePath = sessionStorePath + "/"; } ROOT_SESSION_MANAGER = new FileBasedSessionManager(ROOT_SESSION_MANAGER_NAME, sessionStorePath); ASYNC_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(ASYNC_COMMITTING_SESSION_MANAGER_NAME); RETRY_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(RETRY_COMMITTING_SESSION_MANAGER_NAME); RETRY_ROLLBACKING_SESSION_MANAGER = new DefaultSessionManager(RETRY_ROLLBACKING_SESSION_MANAGER_NAME); } } public static final SessionManager getRootSessionManager() { if (ROOT_SESSION_MANAGER == null) { throw new ShouldNeverHappenException("SessionManager is NOT init!"); } return ROOT_SESSION_MANAGER; }}
说明:
public class DefaultSessionManager extends AbstractSessionManager { public DefaultSessionManager(String name) { super(name); transactionStoreManager = new TransactionStoreManager() { @Override public boolean writeSession(LogOperation logOperation, SessionStorable session) { return false; } @Override public void shutdown() { } @Override public ListreadWriteStoreFromFile(int readSize, boolean isHistory) { return null; } @Override public boolean hasRemaining(boolean isHistory) { return false; } }; }}public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener { protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractSessionManager.class); protected Map sessionMap = new ConcurrentHashMap<>(); protected TransactionStoreManager transactionStoreManager; protected String name; public AbstractSessionManager(String name) { this.name = name; } @Override public void addGlobalSession(GlobalSession session) throws TransactionException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_ADD); } transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session); sessionMap.put(session.getTransactionId(), session); } @Override public void onBegin(GlobalSession globalSession) throws TransactionException { addGlobalSession(globalSession); }}
说明:
转载地址:http://trgix.baihongyu.com/