Further refine this fix to address some test failures. 

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1182049 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-10-11 19:39:45 +00:00
parent 39ca3de50e
commit 3b381e7c15
3 changed files with 26 additions and 18 deletions

View File

@ -166,7 +166,16 @@ public class ProtocolConverter {
command.setResponseRequired(true); command.setResponseRequired(true);
resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler); resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
} }
stompTransport.sendToActiveMQ(command); stompTransport.asyncSendToActiveMQ(command);
}
protected void asyncSendToActiveMQ(Command command, ResponseHandler handler) {
command.setCommandId(generateCommandId());
if (handler != null) {
command.setResponseRequired(true);
resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
}
stompTransport.asyncSendToActiveMQ(command);
} }
protected void sendToStomp(StompFrame command) throws IOException { protected void sendToStomp(StompFrame command) throws IOException {
@ -292,7 +301,7 @@ public class ProtocolConverter {
} }
message.onSend(); message.onSend();
sendToActiveMQ(message, createResponseHandler(command)); asyncSendToActiveMQ(message, createResponseHandler(command));
} }
protected void onStompNack(StompFrame command) throws ProtocolException { protected void onStompNack(StompFrame command) throws ProtocolException {
@ -329,7 +338,7 @@ public class ProtocolConverter {
if (sub != null) { if (sub != null) {
MessageAck ack = sub.onStompMessageNack(messageId, activemqTx); MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
if (ack != null) { if (ack != null) {
sendToActiveMQ(ack, createResponseHandler(command)); asyncSendToActiveMQ(ack, createResponseHandler(command));
} else { } else {
throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]"); throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
} }
@ -368,7 +377,7 @@ public class ProtocolConverter {
if (sub != null) { if (sub != null) {
MessageAck ack = sub.onStompMessageAck(messageId, activemqTx); MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
if (ack != null) { if (ack != null) {
sendToActiveMQ(ack, createResponseHandler(command)); asyncSendToActiveMQ(ack, createResponseHandler(command));
acked = true; acked = true;
} }
} }
@ -382,7 +391,7 @@ public class ProtocolConverter {
for (StompSubscription sub : subscriptionsByConsumerId.values()) { for (StompSubscription sub : subscriptionsByConsumerId.values()) {
MessageAck ack = sub.onStompMessageAck(messageId, activemqTx); MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
if (ack != null) { if (ack != null) {
sendToActiveMQ(ack, createResponseHandler(command)); asyncSendToActiveMQ(ack, createResponseHandler(command));
acked = true; acked = true;
break; break;
} }
@ -417,7 +426,7 @@ public class ProtocolConverter {
tx.setTransactionId(activemqTx); tx.setTransactionId(activemqTx);
tx.setType(TransactionInfo.BEGIN); tx.setType(TransactionInfo.BEGIN);
sendToActiveMQ(tx, createResponseHandler(command)); asyncSendToActiveMQ(tx, createResponseHandler(command));
} }
protected void onStompCommit(StompFrame command) throws ProtocolException { protected void onStompCommit(StompFrame command) throws ProtocolException {
@ -444,7 +453,7 @@ public class ProtocolConverter {
tx.setTransactionId(activemqTx); tx.setTransactionId(activemqTx);
tx.setType(TransactionInfo.COMMIT_ONE_PHASE); tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
sendToActiveMQ(tx, createResponseHandler(command)); asyncSendToActiveMQ(tx, createResponseHandler(command));
} }
protected void onStompAbort(StompFrame command) throws ProtocolException { protected void onStompAbort(StompFrame command) throws ProtocolException {
@ -473,7 +482,7 @@ public class ProtocolConverter {
tx.setTransactionId(activemqTx); tx.setTransactionId(activemqTx);
tx.setType(TransactionInfo.ROLLBACK); tx.setType(TransactionInfo.ROLLBACK);
sendToActiveMQ(tx, createResponseHandler(command)); asyncSendToActiveMQ(tx, createResponseHandler(command));
} }
protected void onStompSubscribe(StompFrame command) throws ProtocolException { protected void onStompSubscribe(StompFrame command) throws ProtocolException {
@ -541,7 +550,7 @@ public class ProtocolConverter {
// dispatch can beat the receipt so send it early // dispatch can beat the receipt so send it early
sendReceipt(command); sendReceipt(command);
sendToActiveMQ(consumerInfo, null); asyncSendToActiveMQ(consumerInfo, null);
} }
protected void onStompUnsubscribe(StompFrame command) throws ProtocolException { protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
@ -570,7 +579,7 @@ public class ProtocolConverter {
info.setClientId(durable); info.setClientId(durable);
info.setSubscriptionName(durable); info.setSubscriptionName(durable);
info.setConnectionId(connectionId); info.setConnectionId(connectionId);
sendToActiveMQ(info, createResponseHandler(command)); asyncSendToActiveMQ(info, createResponseHandler(command));
return; return;
} }
@ -578,7 +587,7 @@ public class ProtocolConverter {
StompSubscription sub = this.subscriptions.remove(subscriptionId); StompSubscription sub = this.subscriptions.remove(subscriptionId);
if (sub != null) { if (sub != null) {
sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); asyncSendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
return; return;
} }
@ -589,7 +598,7 @@ public class ProtocolConverter {
for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) { for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
StompSubscription sub = iter.next(); StompSubscription sub = iter.next();
if (destination != null && destination.equals(sub.getDestination())) { if (destination != null && destination.equals(sub.getDestination())) {
sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command)); asyncSendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
iter.remove(); iter.remove();
return; return;
} }
@ -712,8 +721,8 @@ public class ProtocolConverter {
protected void onStompDisconnect(StompFrame command) throws ProtocolException { protected void onStompDisconnect(StompFrame command) throws ProtocolException {
checkConnected(); checkConnected();
sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command)); asyncSendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command)); asyncSendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
connected.set(false); connected.set(false);
} }
@ -778,7 +787,7 @@ public class ProtocolConverter {
ActiveMQDestination rc = tempDestinations.get(name); ActiveMQDestination rc = tempDestinations.get(name);
if( rc == null ) { if( rc == null ) {
rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId()); rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); asyncSendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
tempDestinations.put(name, rc); tempDestinations.put(name, rc);
} }
return rc; return rc;
@ -788,7 +797,7 @@ public class ProtocolConverter {
ActiveMQDestination rc = tempDestinations.get(name); ActiveMQDestination rc = tempDestinations.get(name);
if( rc == null ) { if( rc == null ) {
rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId()); rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); asyncSendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
tempDestinations.put(name, rc); tempDestinations.put(name, rc);
tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name); tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
} }

View File

@ -115,7 +115,7 @@ public class StompSubscription {
if (!unconsumedMessage.isEmpty()) { if (!unconsumedMessage.isEmpty()) {
MessageAck ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size()); MessageAck ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
protocolConverter.getStompTransport().sendToActiveMQ(ack); protocolConverter.getStompTransport().asyncSendToActiveMQ(ack);
unconsumedMessage.clear(); unconsumedMessage.clear();
} }
} }

View File

@ -1588,7 +1588,6 @@ public class StompTest extends CombinationTestSupport {
stompConnection.connect("system", "manager"); stompConnection.connect("system", "manager");
HashMap<String, String> headers = new HashMap<String, String>(); HashMap<String, String> headers = new HashMap<String, String>();
long timestamp = System.currentTimeMillis();
headers.put(Stomp.Headers.Send.REPLY_TO, "JustAString"); headers.put(Stomp.Headers.Send.REPLY_TO, "JustAString");
headers.put(Stomp.Headers.Send.PERSISTENT, "true"); headers.put(Stomp.Headers.Send.PERSISTENT, "true");