https://issues.apache.org/jira/browse/AMQ-3331 - fix regression in BrokerNetworkWithStuckMessagesTest - vm connector exposed some turnips in there w.r.t the response correlator

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1442613 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2013-02-05 14:48:31 +00:00
parent a57108cfa6
commit 88d85ae552
3 changed files with 26 additions and 2 deletions

View File

@ -1327,7 +1327,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
setDuplexNetworkConnectorId(duplexNetworkConnectorId); setDuplexNetworkConnectorId(duplexNetworkConnectorId);
} }
Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker); Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker);
Transport remoteBridgeTransport = new ResponseCorrelator(transport); Transport remoteBridgeTransport = transport;
if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
// the vm transport case is already wrapped
remoteBridgeTransport = new ResponseCorrelator(remoteBridgeTransport);
}
String duplexName = localTransport.toString(); String duplexName = localTransport.toString();
if (duplexName.contains("#")) { if (duplexName.contains("#")) {
duplexName = duplexName.substring(duplexName.lastIndexOf("#")); duplexName = duplexName.substring(duplexName.lastIndexOf("#"));

View File

@ -603,7 +603,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} else { } else {
if (isDuplex()) { if (isDuplex()) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " duplex command type: " + command.getCommandId()); LOG.trace(configuration.getBrokerName() + " duplex command type: " + command.getDataStructureType());
} }
if (command.isMessage()) { if (command.isMessage()) {
final ActiveMQMessage message = (ActiveMQMessage) command; final ActiveMQMessage message = (ActiveMQMessage) command;
@ -976,6 +976,16 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
+ message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message); + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
} }
if (isDuplex() && AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
try {
// never request b/c they are eventually acked async
remoteBroker.oneway(message);
} finally {
sub.decrementOutstandingResponses();
}
return;
}
if (message.isPersistent() || configuration.isAlwaysSyncSend()) { if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
// The message was not sent using async send, so we should only // The message was not sent using async send, so we should only

View File

@ -19,6 +19,7 @@ package org.apache.activemq.network;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -220,6 +221,7 @@ public class BrokerNetworkWithStuckMessagesTest {
for (int i = 0; i < receiveNumMessages; ++i) { for (int i = 0; i < receiveNumMessages; ++i) {
Message message1 = receiveMessage(connection2, 20000); Message message1 = receiveMessage(connection2, 20000);
assertNotNull(message1); assertNotNull(message1);
LOG.info("on remote, got: " + message1.getMessageId());
connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE)); connection2.send(createAck(consumerInfo2, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE));
} }
@ -261,6 +263,13 @@ public class BrokerNetworkWithStuckMessagesTest {
connection2.send(connectionInfo2.createRemoveCommand()); connection2.send(connectionInfo2.createRemoveCommand());
// There should now be 5 messages stuck on the remote broker // There should now be 5 messages stuck on the remote broker
assertTrue("correct stuck message count", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
Object[] result = browseQueueWithJmx(remoteBroker);
return 5 == result.length;
}
}));
messages = browseQueueWithJmx(remoteBroker); messages = browseQueueWithJmx(remoteBroker);
assertEquals(5, messages.length); assertEquals(5, messages.length);
@ -303,6 +312,7 @@ public class BrokerNetworkWithStuckMessagesTest {
int counter = 1; int counter = 1;
for (; counter < receiveNumMessages; counter++) { for (; counter < receiveNumMessages; counter++) {
message1 = receiveMessage(connection1); message1 = receiveMessage(connection1);
LOG.info("local consume of: " + (message1 != null ? message1.getMessageId() : " null"));
connection1.send(createAck(consumerInfo1, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE)); connection1.send(createAck(consumerInfo1, message1, 1, MessageAck.INDIVIDUAL_ACK_TYPE));
} }
// Ensure that 5 messages were received // Ensure that 5 messages were received