From 3b381e7c15e62d0162921f0bc26314f853ea68c1 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Tue, 11 Oct 2011 19:39:45 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3481 Further refine this fix to address some test failures. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1182049 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/stomp/ProtocolConverter.java | 41 +++++++++++-------- .../transport/stomp/StompSubscription.java | 2 +- .../activemq/transport/stomp/StompTest.java | 1 - 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index 4b6255a9b1..3d2869518f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -166,7 +166,16 @@ public class ProtocolConverter { command.setResponseRequired(true); resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler); } - stompTransport.sendToActiveMQ(command); + stompTransport.asyncSendToActiveMQ(command); + } + + protected void asyncSendToActiveMQ(Command command, ResponseHandler handler) { + command.setCommandId(generateCommandId()); + if (handler != null) { + command.setResponseRequired(true); + resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler); + } + stompTransport.asyncSendToActiveMQ(command); } protected void sendToStomp(StompFrame command) throws IOException { @@ -292,7 +301,7 @@ public class ProtocolConverter { } message.onSend(); - sendToActiveMQ(message, createResponseHandler(command)); + asyncSendToActiveMQ(message, createResponseHandler(command)); } protected void onStompNack(StompFrame command) throws ProtocolException { @@ -329,7 +338,7 @@ public class ProtocolConverter { if (sub != null) { MessageAck ack = sub.onStompMessageNack(messageId, activemqTx); if (ack != null) { - sendToActiveMQ(ack, createResponseHandler(command)); + asyncSendToActiveMQ(ack, createResponseHandler(command)); } else { throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]"); } @@ -368,7 +377,7 @@ public class ProtocolConverter { if (sub != null) { MessageAck ack = sub.onStompMessageAck(messageId, activemqTx); if (ack != null) { - sendToActiveMQ(ack, createResponseHandler(command)); + asyncSendToActiveMQ(ack, createResponseHandler(command)); acked = true; } } @@ -382,7 +391,7 @@ public class ProtocolConverter { for (StompSubscription sub : subscriptionsByConsumerId.values()) { MessageAck ack = sub.onStompMessageAck(messageId, activemqTx); if (ack != null) { - sendToActiveMQ(ack, createResponseHandler(command)); + asyncSendToActiveMQ(ack, createResponseHandler(command)); acked = true; break; } @@ -417,7 +426,7 @@ public class ProtocolConverter { tx.setTransactionId(activemqTx); tx.setType(TransactionInfo.BEGIN); - sendToActiveMQ(tx, createResponseHandler(command)); + asyncSendToActiveMQ(tx, createResponseHandler(command)); } protected void onStompCommit(StompFrame command) throws ProtocolException { @@ -444,7 +453,7 @@ public class ProtocolConverter { tx.setTransactionId(activemqTx); tx.setType(TransactionInfo.COMMIT_ONE_PHASE); - sendToActiveMQ(tx, createResponseHandler(command)); + asyncSendToActiveMQ(tx, createResponseHandler(command)); } protected void onStompAbort(StompFrame command) throws ProtocolException { @@ -473,7 +482,7 @@ public class ProtocolConverter { tx.setTransactionId(activemqTx); tx.setType(TransactionInfo.ROLLBACK); - sendToActiveMQ(tx, createResponseHandler(command)); + asyncSendToActiveMQ(tx, createResponseHandler(command)); } protected void onStompSubscribe(StompFrame command) throws ProtocolException { @@ -541,7 +550,7 @@ public class ProtocolConverter { // dispatch can beat the receipt so send it early sendReceipt(command); - sendToActiveMQ(consumerInfo, null); + asyncSendToActiveMQ(consumerInfo, null); } protected void onStompUnsubscribe(StompFrame command) throws ProtocolException { @@ -570,7 +579,7 @@ public class ProtocolConverter { info.setClientId(durable); info.setSubscriptionName(durable); info.setConnectionId(connectionId); - sendToActiveMQ(info, createResponseHandler(command)); + asyncSendToActiveMQ(info, createResponseHandler(command)); return; } @@ -578,7 +587,7 @@ public class ProtocolConverter { StompSubscription sub = this.subscriptions.remove(subscriptionId); if (sub != null) { - sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); + asyncSendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); return; } @@ -589,7 +598,7 @@ public class ProtocolConverter { for (Iterator iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { StompSubscription sub = iter.next(); if (destination != null && destination.equals(sub.getDestination())) { - sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); + asyncSendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); iter.remove(); return; } @@ -712,8 +721,8 @@ public class ProtocolConverter { protected void onStompDisconnect(StompFrame command) throws ProtocolException { checkConnected(); - sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command)); - sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command)); + asyncSendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command)); + asyncSendToActiveMQ(new ShutdownInfo(), createResponseHandler(command)); connected.set(false); } @@ -778,7 +787,7 @@ public class ProtocolConverter { ActiveMQDestination rc = tempDestinations.get(name); if( rc == null ) { rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId()); - sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); + asyncSendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); tempDestinations.put(name, rc); } return rc; @@ -788,7 +797,7 @@ public class ProtocolConverter { ActiveMQDestination rc = tempDestinations.get(name); if( rc == null ) { rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId()); - sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); + asyncSendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); tempDestinations.put(name, rc); tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java index 3c0adada22..d2a7905767 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java @@ -115,7 +115,7 @@ public class StompSubscription { if (!unconsumedMessage.isEmpty()) { MessageAck ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size()); - protocolConverter.getStompTransport().sendToActiveMQ(ack); + protocolConverter.getStompTransport().asyncSendToActiveMQ(ack); unconsumedMessage.clear(); } } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java index de2779a779..7bd963d242 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java @@ -1588,7 +1588,6 @@ public class StompTest extends CombinationTestSupport { stompConnection.connect("system", "manager"); HashMap headers = new HashMap(); - long timestamp = System.currentTimeMillis(); headers.put(Stomp.Headers.Send.REPLY_TO, "JustAString"); headers.put(Stomp.Headers.Send.PERSISTENT, "true");