From 3560415bcbcebfa853a941c4ac9fcf707ed2beaf Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 1 Apr 2016 17:24:41 -0400 Subject: [PATCH] ARTEMIS-463 Using OperationContext for async support --- .../protocol/openwire/OpenWireConnection.java | 67 ++++++++++++++----- 1 file changed, 52 insertions(+), 15 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index f9e8838b79..3ccb98d3b4 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.QueueBinding; @@ -224,15 +225,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se super.bufferReceived(connectionID, buffer); try { - // TODO-NOW: set OperationContext - Command command = (Command) wireFormat.unmarshal(buffer); boolean responseRequired = command.isResponseRequired(); int commandId = command.getCommandId(); - // TODO-NOW: the server should send packets to the client based on the requested times - // need to look at what Andy did on AMQP + // TODO: the server should send packets to the client based on the requested times // the connection handles pings, negotiations directly. // and delegate all other commands to manager. @@ -285,7 +283,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } - // TODO-NOW: response through operation-context + // TODO: response through operation-context if (response != null && !protocolManager.isStopping()) { response.setCorrelationId(commandId); @@ -1076,6 +1074,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public Response processBeginTransaction(TransactionInfo info) throws Exception { final TransactionId txID = info.getTransactionId(); + setOperationContext(null); try { internalSession.resetTX(null); if (txID.isXATransaction()) { @@ -1095,6 +1094,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } finally { internalSession.resetTX(null); + clearOpeartionContext(); } return null; } @@ -1111,7 +1111,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se AMQSession session = (AMQSession) tx.getProtocolData(); - tx.commit(onePhase); + setOperationContext(session); + try { + tx.commit(onePhase); + } + finally { + clearOpeartionContext(); + } return null; } @@ -1125,18 +1131,24 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public Response processForgetTransaction(TransactionInfo info) throws Exception { TransactionId txID = info.getTransactionId(); - if (txID.isXATransaction()) { - try { - Xid xid = OpenWireUtil.toXID(info.getTransactionId()); - internalSession.xaForget(xid); + setOperationContext(null); + try { + if (txID.isXATransaction()) { + try { + Xid xid = OpenWireUtil.toXID(info.getTransactionId()); + internalSession.xaForget(xid); + } + catch (Exception e) { + e.printStackTrace(); + throw e; + } } - catch (Exception e) { - e.printStackTrace(); - throw e; + else { + txMap.remove(txID); } } - else { - txMap.remove(txID); + finally { + clearOpeartionContext(); } return null; @@ -1146,6 +1158,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public Response processPrepareTransaction(TransactionInfo info) throws Exception { TransactionId txID = info.getTransactionId(); + setOperationContext(null); try { if (txID.isXATransaction()) { try { @@ -1164,6 +1177,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } finally { internalSession.resetTX(null); + clearOpeartionContext(); } return new IntegerResponse(XAResource.XA_RDONLY); @@ -1173,6 +1187,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public Response processEndTransaction(TransactionInfo info) throws Exception { TransactionId txID = info.getTransactionId(); + setOperationContext(null); if (txID.isXATransaction()) { try { Transaction tx = lookupTX(txID, null); @@ -1192,6 +1207,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } else { txMap.remove(info); + clearOpeartionContext(); } return null; @@ -1255,14 +1271,17 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se Transaction tx = lookupTX(messageSend.getTransactionId(), session); + setOperationContext(session); session.getCoreSession().resetTX(tx); try { session.send(producerInfo, messageSend, sendProducerAck); } finally { session.getCoreSession().resetTX(null); + clearOpeartionContext(); } + return null; } @@ -1270,6 +1289,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public Response processMessageAck(MessageAck ack) throws Exception { AMQSession session = getSession(ack.getConsumerId().getParentId()); Transaction tx = lookupTX(ack.getTransactionId(), session); + setOperationContext(session); session.getCoreSession().resetTX(tx); try { @@ -1278,6 +1298,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } finally { session.getCoreSession().resetTX(null); + clearOpeartionContext(); } return null; } @@ -1354,6 +1375,22 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } + private void setOperationContext(AMQSession session) { + OperationContext ctx; + if (session == null) { + ctx = this.internalSession.getSessionContext(); + } + else { + ctx = session.getCoreSession().getSessionContext(); + } + server.getStorageManager().setContext(ctx); + } + + + private void clearOpeartionContext() { + server.getStorageManager().clearContext(); + } + private Transaction lookupTX(TransactionId txID, AMQSession session) throws IllegalStateException { if (txID == null) { return null;