ARTEMIS-463 Using OperationContext for async support

This commit is contained in:
Clebert Suconic 2016-04-01 17:24:41 -04:00 committed by jbertram
parent 3aedf27386
commit 3560415bcb
1 changed files with 52 additions and 15 deletions

View File

@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.QueueBinding;
@ -224,15 +225,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
super.bufferReceived(connectionID, buffer); super.bufferReceived(connectionID, buffer);
try { try {
// TODO-NOW: set OperationContext
Command command = (Command) wireFormat.unmarshal(buffer); Command command = (Command) wireFormat.unmarshal(buffer);
boolean responseRequired = command.isResponseRequired(); boolean responseRequired = command.isResponseRequired();
int commandId = command.getCommandId(); int commandId = command.getCommandId();
// TODO-NOW: the server should send packets to the client based on the requested times // TODO: the server should send packets to the client based on the requested times
// need to look at what Andy did on AMQP
// the connection handles pings, negotiations directly. // the connection handles pings, negotiations directly.
// and delegate all other commands to manager. // and delegate all other commands to manager.
@ -285,7 +283,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
} }
} }
// TODO-NOW: response through operation-context // TODO: response through operation-context
if (response != null && !protocolManager.isStopping()) { if (response != null && !protocolManager.isStopping()) {
response.setCorrelationId(commandId); response.setCorrelationId(commandId);
@ -1076,6 +1074,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processBeginTransaction(TransactionInfo info) throws Exception { public Response processBeginTransaction(TransactionInfo info) throws Exception {
final TransactionId txID = info.getTransactionId(); final TransactionId txID = info.getTransactionId();
setOperationContext(null);
try { try {
internalSession.resetTX(null); internalSession.resetTX(null);
if (txID.isXATransaction()) { if (txID.isXATransaction()) {
@ -1095,6 +1094,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
} }
finally { finally {
internalSession.resetTX(null); internalSession.resetTX(null);
clearOpeartionContext();
} }
return null; return null;
} }
@ -1111,7 +1111,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
AMQSession session = (AMQSession) tx.getProtocolData(); AMQSession session = (AMQSession) tx.getProtocolData();
tx.commit(onePhase); setOperationContext(session);
try {
tx.commit(onePhase);
}
finally {
clearOpeartionContext();
}
return null; return null;
} }
@ -1125,18 +1131,24 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processForgetTransaction(TransactionInfo info) throws Exception { public Response processForgetTransaction(TransactionInfo info) throws Exception {
TransactionId txID = info.getTransactionId(); TransactionId txID = info.getTransactionId();
if (txID.isXATransaction()) { setOperationContext(null);
try { try {
Xid xid = OpenWireUtil.toXID(info.getTransactionId()); if (txID.isXATransaction()) {
internalSession.xaForget(xid); try {
Xid xid = OpenWireUtil.toXID(info.getTransactionId());
internalSession.xaForget(xid);
}
catch (Exception e) {
e.printStackTrace();
throw e;
}
} }
catch (Exception e) { else {
e.printStackTrace(); txMap.remove(txID);
throw e;
} }
} }
else { finally {
txMap.remove(txID); clearOpeartionContext();
} }
return null; return null;
@ -1146,6 +1158,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processPrepareTransaction(TransactionInfo info) throws Exception { public Response processPrepareTransaction(TransactionInfo info) throws Exception {
TransactionId txID = info.getTransactionId(); TransactionId txID = info.getTransactionId();
setOperationContext(null);
try { try {
if (txID.isXATransaction()) { if (txID.isXATransaction()) {
try { try {
@ -1164,6 +1177,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
} }
finally { finally {
internalSession.resetTX(null); internalSession.resetTX(null);
clearOpeartionContext();
} }
return new IntegerResponse(XAResource.XA_RDONLY); return new IntegerResponse(XAResource.XA_RDONLY);
@ -1173,6 +1187,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processEndTransaction(TransactionInfo info) throws Exception { public Response processEndTransaction(TransactionInfo info) throws Exception {
TransactionId txID = info.getTransactionId(); TransactionId txID = info.getTransactionId();
setOperationContext(null);
if (txID.isXATransaction()) { if (txID.isXATransaction()) {
try { try {
Transaction tx = lookupTX(txID, null); Transaction tx = lookupTX(txID, null);
@ -1192,6 +1207,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
} }
else { else {
txMap.remove(info); txMap.remove(info);
clearOpeartionContext();
} }
return null; return null;
@ -1255,14 +1271,17 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
Transaction tx = lookupTX(messageSend.getTransactionId(), session); Transaction tx = lookupTX(messageSend.getTransactionId(), session);
setOperationContext(session);
session.getCoreSession().resetTX(tx); session.getCoreSession().resetTX(tx);
try { try {
session.send(producerInfo, messageSend, sendProducerAck); session.send(producerInfo, messageSend, sendProducerAck);
} }
finally { finally {
session.getCoreSession().resetTX(null); session.getCoreSession().resetTX(null);
clearOpeartionContext();
} }
return null; return null;
} }
@ -1270,6 +1289,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processMessageAck(MessageAck ack) throws Exception { public Response processMessageAck(MessageAck ack) throws Exception {
AMQSession session = getSession(ack.getConsumerId().getParentId()); AMQSession session = getSession(ack.getConsumerId().getParentId());
Transaction tx = lookupTX(ack.getTransactionId(), session); Transaction tx = lookupTX(ack.getTransactionId(), session);
setOperationContext(session);
session.getCoreSession().resetTX(tx); session.getCoreSession().resetTX(tx);
try { try {
@ -1278,6 +1298,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
} }
finally { finally {
session.getCoreSession().resetTX(null); session.getCoreSession().resetTX(null);
clearOpeartionContext();
} }
return null; return null;
} }
@ -1354,6 +1375,22 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
} }
private void setOperationContext(AMQSession session) {
OperationContext ctx;
if (session == null) {
ctx = this.internalSession.getSessionContext();
}
else {
ctx = session.getCoreSession().getSessionContext();
}
server.getStorageManager().setContext(ctx);
}
private void clearOpeartionContext() {
server.getStorageManager().clearContext();
}
private Transaction lookupTX(TransactionId txID, AMQSession session) throws IllegalStateException { private Transaction lookupTX(TransactionId txID, AMQSession session) throws IllegalStateException {
if (txID == null) { if (txID == null) {
return null; return null;