git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1440415 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2013-01-30 13:49:55 +00:00
parent 94f1f27c35
commit 30a9fe104d
3 changed files with 91 additions and 18 deletions

View File

@ -156,7 +156,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
public synchronized void destroy() throws Exception {
stop();
for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
Message node = (Message) i.next();
MessageReference node = i.next();
node.decrementReferenceCount();
}
memoryList.clear();

View File

@ -100,30 +100,28 @@ public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte
@Override
protected boolean matchesForwardingFilter(Message message, final MessageEvaluationContext mec) {
boolean match = true;
if (mec.getDestination().isQueue()) {
if (contains(message.getBrokerPath(), networkBrokerId)) {
// potential replay back to origin
match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
if (mec.getDestination().isQueue() && contains(message.getBrokerPath(), networkBrokerId)) {
// potential replay back to origin
match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
if (match && LOG.isTraceEnabled()) {
LOG.trace("Replaying [" + message.getMessageId() + "] for [" + message.getDestination()
+ "] back to origin in the absence of a local consumer");
}
}
if (match && rateLimitExceeded()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Throttled network consumer rejecting [" + message.getMessageId() + "] for [" + message.getDestination() + " " + matchCount
+ ">" + rateLimit + "/" + rateDuration);
}
match = false;
if (match && LOG.isTraceEnabled()) {
LOG.trace("Replaying [" + message.getMessageId() + "] for [" + message.getDestination()
+ "] back to origin in the absence of a local consumer");
}
} else {
// use existing logic for topics
// use existing filter logic for topics and non replays
match = super.matchesForwardingFilter(message, mec);
}
if (match && rateLimitExceeded()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Throttled network consumer rejecting [" + message.getMessageId() + "] for [" + message.getDestination() + " " + matchCount
+ ">" + rateLimit + "/" + rateDuration);
}
match = false;
}
return match;
}

View File

@ -83,6 +83,7 @@ public class BrokerNetworkWithStuckMessagesTest {
private BrokerService localBroker;
private BrokerService remoteBroker;
private BrokerService secondRemoteBroker;
private DemandForwardingBridge bridge;
protected Map<String, BrokerService> brokers = new HashMap<String, BrokerService>();
@ -90,6 +91,7 @@ public class BrokerNetworkWithStuckMessagesTest {
protected TransportConnector connector;
protected TransportConnector remoteConnector;
protected TransportConnector secondRemoteConnector;
protected long idGenerator;
protected int msgIdGenerator;
@ -135,6 +137,22 @@ public class BrokerNetworkWithStuckMessagesTest {
bridge.setBrokerService(localBroker);
bridge.start();
// introduce a second broker/bridge on remote that should not get any messages because of networkTtl=1
// local <-> remote <-> secondRemote
createSecondRemoteBroker();
config = new NetworkBridgeConfiguration();
config.setBrokerName("remote");
config.setDuplex(true);
localTransport = createRemoteTransport();
remoteTransport = createSecondRemoteTransport();
// Create a network bridge between the two brokers
bridge = new DemandForwardingBridge(config, localTransport, remoteTransport);
bridge.setBrokerService(remoteBroker);
bridge.start();
waitForBridgeFormation();
}
@ -156,6 +174,7 @@ public class BrokerNetworkWithStuckMessagesTest {
bridge.stop();
localBroker.stop();
remoteBroker.stop();
secondRemoteBroker.stop();
}
@Test(timeout=120000)
@ -217,6 +236,24 @@ public class BrokerNetworkWithStuckMessagesTest {
messages = browseQueueWithJmx(localBroker);
assertEquals(0, messages.length);
// try and pull the messages from remote, should be denied b/c on networkTtl
LOG.info("creating demand on second remote...");
StubConnection connection3 = createSecondRemoteConnection();
ConnectionInfo connectionInfo3 = createConnectionInfo();
SessionInfo sessionInfo3 = createSessionInfo(connectionInfo3);
connection3.send(connectionInfo3);
connection3.send(sessionInfo3);
ActiveMQDestination destinationInfo3 =
createDestinationInfo(connection3, connectionInfo3, ActiveMQDestination.QUEUE_TYPE);
final ConsumerInfo consumerInfoS3 = createConsumerInfo(sessionInfo3, destinationInfo3);
connection3.send(consumerInfoS3);
Message messageExceedingTtl = receiveMessage(connection3, 5000);
if (messageExceedingTtl != null) {
LOG.error("got message on Second remote: " + messageExceedingTtl);
connection3.send(createAck(consumerInfoS3, messageExceedingTtl, 1, MessageAck.INDIVIDUAL_ACK_TYPE));
}
LOG.info("Closing consumer on remote");
// Close the consumer on the remote broker
connection2.send(consumerInfo2.createRemoveCommand());
@ -297,6 +334,7 @@ public class BrokerNetworkWithStuckMessagesTest {
connection1.stop();
connection2.stop();
connection3.stop();
}
protected BrokerService createBroker() throws Exception {
@ -348,6 +386,23 @@ public class BrokerNetworkWithStuckMessagesTest {
return remoteBroker;
}
protected BrokerService createSecondRemoteBroker() throws Exception {
secondRemoteBroker = new BrokerService();
secondRemoteBroker.setBrokerName("secondRemotehost");
secondRemoteBroker.setUseJmx(false);
secondRemoteBroker.setPersistenceAdapter(null);
secondRemoteBroker.setPersistent(false);
secondRemoteConnector = createSecondRemoteConnector();
secondRemoteBroker.addConnector(secondRemoteConnector);
configureBroker(secondRemoteBroker);
secondRemoteBroker.start();
secondRemoteBroker.waitUntilStarted();
brokers.put(secondRemoteBroker.getBrokerName(), secondRemoteBroker);
return secondRemoteBroker;
}
protected Transport createTransport() throws Exception {
Transport transport = TransportFactory.connect(connector.getServer().getConnectURI());
return transport;
@ -358,6 +413,11 @@ public class BrokerNetworkWithStuckMessagesTest {
return transport;
}
protected Transport createSecondRemoteTransport() throws Exception {
Transport transport = TransportFactory.connect(secondRemoteConnector.getServer().getConnectURI());
return transport;
}
protected TransportConnector createConnector() throws Exception, IOException, URISyntaxException {
return new TransportConnector(TransportFactory.bind(new URI(getLocalURI())));
}
@ -366,10 +426,18 @@ public class BrokerNetworkWithStuckMessagesTest {
return new TransportConnector(TransportFactory.bind(new URI(getRemoteURI())));
}
protected TransportConnector createSecondRemoteConnector() throws Exception, IOException, URISyntaxException {
return new TransportConnector(TransportFactory.bind(new URI(getSecondRemoteURI())));
}
protected String getRemoteURI() {
return "vm://remotehost";
}
protected String getSecondRemoteURI() {
return "vm://secondRemotehost";
}
protected String getLocalURI() {
return "vm://localhost";
}
@ -388,6 +456,13 @@ public class BrokerNetworkWithStuckMessagesTest {
return connection;
}
protected StubConnection createSecondRemoteConnection() throws Exception {
Transport transport = TransportFactory.connect(secondRemoteConnector.getServer().getConnectURI());
StubConnection connection = new StubConnection(transport);
connections.add(connection);
return connection;
}
@SuppressWarnings({ "unchecked", "unused" })
private Object[] browseQueueWithJms(BrokerService broker) throws Exception {
Object[] messages = null;