https://issues.apache.org/jira/browse/AMQ-3576 - implementation for network connectors, the store needs to be queried b/c messages from multiple producers are multiplexed by a network connector bridge. Additional boolean auditNetworkProducers on TransportConnector can disable the check if duplicates can be dealt with by the application layer. Additional test included.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1197203 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-11-03 16:23:29 +00:00
parent 2711ad1f3f
commit 0018f4a3ad
4 changed files with 123 additions and 12 deletions

View File

@ -19,10 +19,12 @@ package org.apache.activemq.broker;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Region; import org.apache.activemq.broker.region.Region;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.state.ProducerState; import org.apache.activemq.state.ProducerState;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
/** /**
@ -40,7 +42,9 @@ public class ProducerBrokerExchange {
private boolean mutable = true; private boolean mutable = true;
private AtomicLong lastSendSequenceNumber = new AtomicLong(-1); private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
private boolean auditProducerSequenceIds; private boolean auditProducerSequenceIds;
private boolean isNetworkProducer;
private BrokerService brokerService;
public ProducerBrokerExchange() { public ProducerBrokerExchange() {
} }
@ -132,23 +136,43 @@ public class ProducerBrokerExchange {
*/ */
public boolean canDispatch(Message messageSend) { public boolean canDispatch(Message messageSend) {
boolean canDispatch = true; boolean canDispatch = true;
if (auditProducerSequenceIds) { if (auditProducerSequenceIds && messageSend.isPersistent()) {
if (messageSend.getMessageId().getProducerSequenceId() <= lastSendSequenceNumber.get()) { final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId();
if (isNetworkProducer) {
// messages are multiplexed on this producer so we need to query the persistenceAdapter
long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
if (producerSequenceId <= lastStoredForMessageProducer) {
canDispatch = false;
LOG.debug("suppressing duplicate message send from network producer [" + messageSend.getMessageId() + "] with producerSequenceId ["
+ producerSequenceId + "] less than last stored: " + lastStoredForMessageProducer);
}
} else if (producerSequenceId <= lastSendSequenceNumber.get()) {
canDispatch = false; canDispatch = false;
LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId() + "] with producerSequenceId [" LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId() + "] with producerSequenceId ["
+ messageSend.getMessageId().getProducerSequenceId() + "] less than last stored: " + lastSendSequenceNumber); + producerSequenceId + "] less than last stored: " + lastSendSequenceNumber);
} } else {
if (canDispatch) {
// track current so we can suppress duplicates later in the stream // track current so we can suppress duplicates later in the stream
lastSendSequenceNumber.set(messageSend.getMessageId().getProducerSequenceId()); lastSendSequenceNumber.set(producerSequenceId);
} }
} }
return canDispatch; return canDispatch;
} }
private long getStoredSequenceIdForMessage(MessageId messageId) {
try {
return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
} catch (IOException ignored) {
LOG.debug("Failed to determine last producer sequence id for: " +messageId, ignored);
}
return -1;
}
public void setLastStoredSequenceId(long l) { public void setLastStoredSequenceId(long l) {
auditProducerSequenceIds = true; auditProducerSequenceIds = true;
if (connectionContext.isNetworkConnection()) {
brokerService = connectionContext.getBroker().getBrokerService();
isNetworkProducer = true;
}
lastSendSequenceNumber.set(l); lastSendSequenceNumber.set(l);
LOG.debug("last stored sequence id set: " + l); LOG.debug("last stored sequence id set: " + l);
} }

View File

@ -1313,10 +1313,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
result = new ProducerBrokerExchange(); result = new ProducerBrokerExchange();
TransportConnectionState state = lookupConnectionState(id); TransportConnectionState state = lookupConnectionState(id);
context = state.getContext(); context = state.getContext();
if (context.isReconnect() && !context.isNetworkConnection()) { result.setConnectionContext(context);
if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) {
result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id)); result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
} }
result.setConnectionContext(context);
SessionState ss = state.getSessionState(id.getParentId()); SessionState ss = state.getSessionState(id.getParentId());
if (ss != null) { if (ss != null) {
result.setProducerState(ss.getProducerState(id)); result.setProducerState(ss.getProducerState(id));

View File

@ -71,6 +71,7 @@ public class TransportConnector implements Connector, BrokerServiceAware {
private boolean rebalanceClusterClients; private boolean rebalanceClusterClients;
private boolean updateClusterClientsOnRemove = false; private boolean updateClusterClientsOnRemove = false;
private String updateClusterFilter; private String updateClusterFilter;
private boolean auditNetworkProducers = true;
public TransportConnector() { public TransportConnector() {
} }
@ -557,4 +558,12 @@ public class TransportConnector implements Connector, BrokerServiceAware {
public int connectionCount() { public int connectionCount() {
return connections.size(); return connections.size();
} }
public boolean isAuditNetworkProducers() {
return auditNetworkProducers;
}
public void setAuditNetworkProducers(boolean auditNetworkProducers) {
this.auditNetworkProducers = auditNetworkProducers;
}
} }

View File

@ -19,6 +19,10 @@ package org.apache.activemq.usecases;
import java.net.URI; import java.net.URI;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.Destination; import javax.jms.Destination;
@ -29,11 +33,15 @@ import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.JmsMultipleBrokersTestSupport; import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory; import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -357,6 +365,76 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
assertEquals("Client for " + broker2 + " should have received 50% of the messages.", (int)(MESSAGE_COUNT * 0.50), msgsClient2); assertEquals("Client for " + broker2 + " should have received 50% of the messages.", (int)(MESSAGE_COUNT * 0.50), msgsClient2);
} }
public void testDuplicateSend() throws Exception {
broker1 = "BrokerA";
broker2 = "BrokerB";
bridgeBrokers(broker1, broker2);
final AtomicBoolean first = new AtomicBoolean();
final CountDownLatch gotMessageLatch = new CountDownLatch(1);
BrokerService brokerService = brokers.get(broker2).broker;
brokerService.setPersistent(true);
brokerService.setDeleteAllMessagesOnStartup(true);
brokerService.setPlugins(new BrokerPlugin[]{
new BrokerPluginSupport() {
@Override
public void send(final ProducerBrokerExchange producerExchange,
org.apache.activemq.command.Message messageSend)
throws Exception {
super.send(producerExchange, messageSend);
if (first.compareAndSet(false, true)) {
producerExchange.getConnectionContext().setDontSendReponse(true);
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
try {
LOG.info("Waiting for recepit");
assertTrue("message received on time", gotMessageLatch.await(60, TimeUnit.SECONDS));
LOG.info("Stopping connection post send and receive and multiple producers");
producerExchange.getConnectionContext().getConnection().stop();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
}
});
// Run brokers
startAllBrokers();
// Create queue
Destination dest = createDestination("TEST.FOO", false);
MessageConsumer client2 = createConsumer(broker2, dest);
sendMessages("BrokerA", dest, 1);
assertEquals("Client got message", 1, receiveExactMessages(client2, 1));
client2.close();
gotMessageLatch.countDown();
// message still pending on broker1
assertEquals("messages message still there", 1, brokers.get(broker1).broker.getAdminView().getTotalMessageCount());
client2 = createConsumer(broker2, dest);
LOG.info("Let the second client receive the rest of the messages");
assertEquals("no duplicate message", 0, receiveAllMessages(client2));
assertEquals("no duplicate message", 0, receiveAllMessages(client2));
assertEquals("no messages enqueued", 0, brokers.get(broker2).broker.getAdminView().getTotalMessageCount());
assertTrue("no messages enqueued on origin", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == brokers.get(broker1).broker.getAdminView().getTotalMessageCount();
}
}));
}
protected int receiveExactMessages(MessageConsumer consumer, int msgCount) throws Exception { protected int receiveExactMessages(MessageConsumer consumer, int msgCount) throws Exception {
Message msg; Message msg;
int i; int i;
@ -410,8 +488,8 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
public void setUp() throws Exception { public void setUp() throws Exception {
super.setAutoFail(true); super.setAutoFail(true);
super.setUp(); super.setUp();
createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false")); createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=true"));
createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false")); createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=true"));
// Configure broker connection factory // Configure broker connection factory
ActiveMQConnectionFactory factoryA; ActiveMQConnectionFactory factoryA;