diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index 24c84428ea..3e50a08665 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -1739,7 +1739,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon /** * Handles async client internal exceptions. * A client internal exception is usually one that has been thrown - * by a container runtie component during asynchronous processing of a + * by a container runtime component during asynchronous processing of a * message that does not affect the connection itself. * This method notifies the ClientInternalExceptionListener by invoking * its onException method, if one has been registered with this connection. diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 25764d0f28..7b390ebb0c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -199,7 +199,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { inAckRange = true; } if (inAckRange) { - // Don't remove the nodes until we are committed. + // Don't remove the nodes until we are committed. if (!context.isInTransaction()) { dequeueCounter++; if (!this.getConsumerInfo().isBrowser()) { @@ -282,9 +282,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription { }else if (ack.isDeliveredAck()) { // Message was delivered but not acknowledged: update pre-fetch // counters. - // Acknowledge all dispatched messages up till the message id of - // the - // acknowledgment. int index = 0; for (Iterator iter = dispatched.iterator(); iter.hasNext(); index++) { final MessageReference node = iter.next(); diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 90243a77ce..2c39903b89 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -500,7 +500,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { BrokerId[] path = info.getBrokerPath(); if (path != null && path.length >= networkTTL) { if (LOG.isDebugEnabled()) { - LOG.debug(configuration.getBrokerName() + " Ignoring Subscription " + info + " restricted to " + networkTTL + " network hops only"); + LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info); } return; } @@ -508,24 +508,24 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { // Ignore this consumer as it's a consumer we locally sent to // the broker. if (LOG.isDebugEnabled()) { - LOG.debug(configuration.getBrokerName() + " Ignoring sub " + info + " already routed through this broker once"); + LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", already routed through this broker once : " + info); } return; } if (!isPermissableDestination(info.getDestination())) { - // ignore if not in the permited or in the excluded list + // ignore if not in the permitted or in the excluded list if (LOG.isDebugEnabled()) { - LOG.debug(configuration.getBrokerName() + " Ignoring sub " + info + " destination " + info.getDestination() + " is not permiited"); + LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + " is not permiited :" + info); } return; } if (addConsumerInfo(info)) { if (LOG.isDebugEnabled()) { - LOG.debug(configuration.getBrokerName() + " Forwarding sub on " + localBroker + " from " + remoteBrokerName + " : " + info); + LOG.debug(configuration.getBrokerName() + " Forwarding sub on " + localBroker + " from " + remoteBrokerName + " : " + info); } } else { if (LOG.isDebugEnabled()) { - LOG.debug(configuration.getBrokerName() + " Ignoring sub " + info + " already subscribed to matching destination"); + LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info); } } } else if (data.getClass() == DestinationInfo.class) { diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java index 3f15ac9955..e195d8c760 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java @@ -79,10 +79,11 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco return; } if ( localURI.equals(uri) || (connectionFilter != null && !connectionFilter.connectTo(uri))) { + LOG.debug("not connecting loopback: " + uri); return; } URI connectUri = uri; - LOG.info("Establishing network connection between from " + localURIName + " to " + connectUri); + LOG.info("Establishing network connection from " + localURIName + " to " + connectUri); Transport remoteTransport; Transport localTransport; @@ -213,10 +214,13 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco String name = super.getName(); if (name == null) { name = discoveryAgent.toString(); - ; super.setName(name); } return name; } + @Override + public String toString() { + return "DiscoveryNetworkConnector:" + getName() + ":" + getBrokerService(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java index b22ccbd2d6..4a3813933d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java @@ -176,14 +176,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { private long lastAdvertizeTime; private AtomicBoolean started = new AtomicBoolean(false); private boolean reportAdvertizeFailed = true; - - private final Executor executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { - public Thread newThread(Runnable runable) { - Thread t = new Thread(runable, "Multicast Discovery Agent Notifier"); - t.setDaemon(true); - return t; - } - }); + private Executor executor = null; /** * Set the discovery listener @@ -304,7 +297,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { mcast.joinGroup(inetAddress); mcast.setSoTimeout((int)keepAliveInterval); runner = new Thread(this); - runner.setName("MulticastDiscovery: " + selfService); + runner.setName(this.toString() + ":" + runner.getName()); runner.setDaemon(true); runner.start(); doAdvertizeSelf(); @@ -409,11 +402,9 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { RemoteBrokerData data = brokersByService.get(service); if (data == null) { data = new RemoteBrokerData(brokerName, service); - brokersByService.put(service, data); - + brokersByService.put(service, data); fireServiceAddEvent(data); doAdvertizeSelf(); - } else { data.updateHeartBeat(); if (data.doRecovery()) { @@ -433,7 +424,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { } private void doExpireOldServices() { - long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH); + long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH); for (Iterator i = brokersByService.values().iterator(); i.hasNext();) { RemoteBrokerData data = i.next(); if (data.getLastHeartBeat() < expireTime) { @@ -467,7 +458,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { // Have the listener process the event async so that // he does not block this thread since we are doing time sensitive // processing of events. - executor.execute(new Runnable() { + getExecutor().execute(new Runnable() { public void run() { DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener; if (discoveryListener != null) { @@ -482,11 +473,11 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { if (discoveryListener != null) { final DiscoveryEvent event = new DiscoveryEvent(data.service); event.setBrokerName(data.brokerName); - + // Have the listener process the event async so that // he does not block this thread since we are doing time sensitive // processing of events. - executor.execute(new Runnable() { + getExecutor().execute(new Runnable() { public void run() { DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener; if (discoveryListener != null) { @@ -497,6 +488,20 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { } } + private Executor getExecutor() { + if (executor == null) { + final String threadName = "Notifier-" + this.toString(); + executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { + public Thread newThread(Runnable runable) { + Thread t = new Thread(runable, threadName); + t.setDaemon(true); + return t; + } + }); + } + return executor; + } + public long getBackOffMultiplier() { return backOffMultiplier; } @@ -540,4 +545,10 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable { public void setGroup(String group) { this.group = group; } + + @Override + public String toString() { + return "MulticastDiscoveryAgent-" + + (selfService != null ? "advertise:" + selfService : "listener:" + this.discoveryListener); + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java new file mode 100644 index 0000000000..45ee05bcfb --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java @@ -0,0 +1,229 @@ +package org.apache.activemq.usecases; + +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.network.NetworkConnector; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import junit.framework.TestCase; + +public class NoDuplicateOnTopicNetworkTest extends TestCase { + private static final Log LOG = LogFactory + .getLog(NoDuplicateOnTopicNetworkTest.class); + + private static final String MULTICAST_DEFAULT = "multicast://default"; + private static final String BROKER_1 = "tcp://localhost:61626"; + private static final String BROKER_2 = "tcp://localhost:61636"; + private static final String BROKER_3 = "tcp://localhost:61646"; + private BrokerService broker1; + private BrokerService broker2; + private BrokerService broker3; + + private boolean dynamicOnly = false; + // no duplicates in cyclic network if networkTTL <=1 + // when > 1, subscriptions perculate around resulting in duplicates as there is no + // memory of the original subscription. + // solution for 6.0 using org.apache.activemq.command.ConsumerInfo.getNetworkConsumerIds() + private int ttl = 1; + + @Override + protected void setUp() throws Exception { + super.setUp(); + + broker3 = createAndStartBroker("broker3", BROKER_3); + Thread.sleep(3000); + broker2 = createAndStartBroker("broker2", BROKER_2); + Thread.sleep(3000); + broker1 = createAndStartBroker("broker1", BROKER_1); + Thread.sleep(1000); + } + + private BrokerService createAndStartBroker(String name, String addr) + throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName(name); + broker.addConnector(addr).setDiscoveryUri(new URI(MULTICAST_DEFAULT)); + broker.setUseJmx(false); + + NetworkConnector networkConnector = broker + .addNetworkConnector(MULTICAST_DEFAULT); + networkConnector.setDecreaseNetworkConsumerPriority(true); + networkConnector.setDynamicOnly(dynamicOnly); + networkConnector.setNetworkTTL(ttl); + + broker.start(); + + return broker; + } + + @Override + protected void tearDown() throws Exception { + broker1.stop(); + broker2.stop(); + broker3.stop(); + super.tearDown(); + } + + public void testProducerConsumerTopic() throws Exception { + final String topicName = "broadcast"; + Thread producerThread = new Thread(new Runnable() { + public void run() { + TopicWithDuplicateMessages producer = new TopicWithDuplicateMessages(); + producer.setBrokerURL(BROKER_1); + producer.setTopicName(topicName); + try { + producer.produce(); + } catch (JMSException e) { + fail("Unexpected " + e); + } + } + }); + + final TopicWithDuplicateMessages consumer = new TopicWithDuplicateMessages(); + Thread consumerThread = new Thread(new Runnable() { + public void run() { + consumer.setBrokerURL(BROKER_2); + consumer.setTopicName(topicName); + try { + consumer.consumer(); + consumer.getLatch().await(60, TimeUnit.SECONDS); + } catch (Exception e) { + fail("Unexpected " + e); + } + } + }); + + consumerThread.start(); + Thread.sleep(1000); + producerThread.start(); + producerThread.join(); + consumerThread.join(); + + Map map = new HashMap(); + for (String msg : consumer.getMessageStrings()) { + assertTrue("is not a duplicate: " + msg, !map.containsKey(msg)); + map.put(msg, msg); + } + assertEquals("got all required messages: " + map.size(), consumer + .getNumMessages(), map.size()); + } + + class TopicWithDuplicateMessages { + private String brokerURL; + private String topicName; + private Connection connection; + private Session session; + private Topic topic; + private MessageProducer producer; + private MessageConsumer consumer; + + private List receivedStrings = new ArrayList(); + private int numMessages = 10; + private CountDownLatch recievedLatch = new CountDownLatch(numMessages); + + public CountDownLatch getLatch() { + return recievedLatch; + } + + public List getMessageStrings() { + return receivedStrings; + } + + public String getBrokerURL() { + return brokerURL; + } + + public void setBrokerURL(String brokerURL) { + this.brokerURL = brokerURL; + } + + public String getTopicName() { + return topicName; + } + + public void setTopicName(String topicName) { + this.topicName = topicName; + } + + private void createConnection() throws JMSException { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + brokerURL); + connection = factory.createConnection(); + } + + private void createTopic() throws JMSException { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + topic = session.createTopic(topicName); + } + + private void createProducer() throws JMSException { + producer = session.createProducer(topic); + } + + private void createConsumer() throws JMSException { + consumer = session.createConsumer(topic); + consumer.setMessageListener(new MessageListener() { + + public void onMessage(Message arg0) { + TextMessage msg = (TextMessage) arg0; + try { + LOG.debug("Received message [" + msg.getText() + "]"); + receivedStrings.add(msg.getText()); + recievedLatch.countDown(); + } catch (JMSException e) { + fail("Unexpected :" + e); + } + } + + }); + } + + private void publish() throws JMSException { + for (int i = 0; i < numMessages; i++) { + TextMessage textMessage = session.createTextMessage(); + String message = "message: " + i; + LOG.debug("Sending message[" + message + "]"); + textMessage.setText(message); + producer.send(textMessage); + } + } + + public void produce() throws JMSException { + createConnection(); + createTopic(); + createProducer(); + connection.start(); + publish(); + } + + public void consumer() throws JMSException { + createConnection(); + createTopic(); + createConsumer(); + connection.start(); + } + + public int getNumMessages() { + return numMessages; + } + } +}