https://issues.apache.org/jira/browse/AMQ-3716: NetworkBridge with conduitSubscriptions=true will leak consumer info in org.apache.activemq.network.DemandForwardingBridgeSupport#subscriptionMapByRemoteId map. fix with test

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1244116 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-02-14 16:37:29 +00:00
parent 456005bf13
commit 2a87ad5b3e
3 changed files with 36 additions and 7 deletions

View File

@ -611,6 +611,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
}
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
// continue removal in separate thread to free up this thread for outstanding responses
asyncTaskRunner.execute(new Runnable() {

View File

@ -110,6 +110,13 @@ public abstract class JmsSendReceiveTestSupport extends org.apache.activemq.Test
Thread.sleep(1000);
messages.clear();
sendMessages();
assertMessagesAreReceived();
LOG.info("" + data.length + " messages(s) received, closing down connections");
}
protected void sendMessages() throws Exception {
for (int i = 0; i < data.length; i++) {
Message message = createMessage(i);
configureMessage(message);
@ -118,11 +125,8 @@ public abstract class JmsSendReceiveTestSupport extends org.apache.activemq.Test
}
sendMessage(i, message);
}
assertMessagesAreReceived();
LOG.info("" + data.length + " messages(s) received, closing down connections");
}
protected void sendMessage(int index, Message message) throws Exception {
producer.send(producerDestination, message);
}

View File

@ -16,20 +16,44 @@
*/
package org.apache.activemq.usecases;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.network.DemandForwardingBridgeSupport;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
public class TwoBrokerQueueSendReceiveTest extends TwoBrokerTopicSendReceiveTest {
protected ActiveMQConnectionFactory sendFactory;
protected ActiveMQConnectionFactory receiveFactory;
private static final Logger LOG = LoggerFactory.getLogger(TwoBrokerQueueSendReceiveTest.class);
protected void setUp() throws Exception {
topic = false;
super.setUp();
}
public void testReceiveOnXConsumersNoLeak() throws Exception {
consumer.close();
sendMessages();
for (int i=0; i<data.length; i++) {
consumer = createConsumer();
onMessage(consumer.receive(10000));
consumer.close();
}
this.assertMessagesAreReceived();
BrokerService broker = (BrokerService) brokers.get("receiver");
final DemandForwardingBridgeSupport bridge = (DemandForwardingBridgeSupport) broker.getNetworkConnectors().get(0).activeBridges().toArray()[0];
assertTrue("No extra, size:" + bridge.getLocalSubscriptionMap().size(), Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("local subs map size = " + bridge.getLocalSubscriptionMap().size());
return 1 == bridge.getLocalSubscriptionMap().size();
}
}));
}
}