test case and logging improvements relating to cyclic/multicast discovery network of size 3 and topic message duplication with nteworkTTL > 1

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@718224 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2008-11-17 11:49:28 +00:00
parent 0bfb28a0bc
commit 6d1f57b137
6 changed files with 270 additions and 29 deletions

View File

@ -1739,7 +1739,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Handles async client internal exceptions.
* A client internal exception is usually one that has been thrown
* by a container runtie component during asynchronous processing of a
* by a container runtime component during asynchronous processing of a
* message that does not affect the connection itself.
* This method notifies the <code>ClientInternalExceptionListener</code> by invoking
* its <code>onException</code> method, if one has been registered with this connection.

View File

@ -199,7 +199,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
inAckRange = true;
}
if (inAckRange) {
// Don't remove the nodes until we are committed.
// Don't remove the nodes until we are committed.
if (!context.isInTransaction()) {
dequeueCounter++;
if (!this.getConsumerInfo().isBrowser()) {
@ -282,9 +282,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch
// counters.
// Acknowledge all dispatched messages up till the message id of
// the
// acknowledgment.
int index = 0;
for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
final MessageReference node = iter.next();

View File

@ -500,7 +500,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
BrokerId[] path = info.getBrokerPath();
if (path != null && path.length >= networkTTL) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring Subscription " + info + " restricted to " + networkTTL + " network hops only");
LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info);
}
return;
}
@ -508,24 +508,24 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
// Ignore this consumer as it's a consumer we locally sent to
// the broker.
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring sub " + info + " already routed through this broker once");
LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", already routed through this broker once : " + info);
}
return;
}
if (!isPermissableDestination(info.getDestination())) {
// ignore if not in the permited or in the excluded list
// ignore if not in the permitted or in the excluded list
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring sub " + info + " destination " + info.getDestination() + " is not permiited");
LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + " is not permiited :" + info);
}
return;
}
if (addConsumerInfo(info)) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Forwarding sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
LOG.debug(configuration.getBrokerName() + " Forwarding sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring sub " + info + " already subscribed to matching destination");
LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info);
}
}
} else if (data.getClass() == DestinationInfo.class) {

View File

@ -79,10 +79,11 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
return;
}
if ( localURI.equals(uri) || (connectionFilter != null && !connectionFilter.connectTo(uri))) {
LOG.debug("not connecting loopback: " + uri);
return;
}
URI connectUri = uri;
LOG.info("Establishing network connection between from " + localURIName + " to " + connectUri);
LOG.info("Establishing network connection from " + localURIName + " to " + connectUri);
Transport remoteTransport;
Transport localTransport;
@ -213,10 +214,13 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
String name = super.getName();
if (name == null) {
name = discoveryAgent.toString();
;
super.setName(name);
}
return name;
}
@Override
public String toString() {
return "DiscoveryNetworkConnector:" + getName() + ":" + getBrokerService();
}
}

View File

@ -176,14 +176,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
private long lastAdvertizeTime;
private AtomicBoolean started = new AtomicBoolean(false);
private boolean reportAdvertizeFailed = true;
private final Executor executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runable) {
Thread t = new Thread(runable, "Multicast Discovery Agent Notifier");
t.setDaemon(true);
return t;
}
});
private Executor executor = null;
/**
* Set the discovery listener
@ -304,7 +297,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
mcast.joinGroup(inetAddress);
mcast.setSoTimeout((int)keepAliveInterval);
runner = new Thread(this);
runner.setName("MulticastDiscovery: " + selfService);
runner.setName(this.toString() + ":" + runner.getName());
runner.setDaemon(true);
runner.start();
doAdvertizeSelf();
@ -409,11 +402,9 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
RemoteBrokerData data = brokersByService.get(service);
if (data == null) {
data = new RemoteBrokerData(brokerName, service);
brokersByService.put(service, data);
brokersByService.put(service, data);
fireServiceAddEvent(data);
doAdvertizeSelf();
} else {
data.updateHeartBeat();
if (data.doRecovery()) {
@ -433,7 +424,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
}
private void doExpireOldServices() {
long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH);
long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH);
for (Iterator<RemoteBrokerData> i = brokersByService.values().iterator(); i.hasNext();) {
RemoteBrokerData data = i.next();
if (data.getLastHeartBeat() < expireTime) {
@ -467,7 +458,7 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
// Have the listener process the event async so that
// he does not block this thread since we are doing time sensitive
// processing of events.
executor.execute(new Runnable() {
getExecutor().execute(new Runnable() {
public void run() {
DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
if (discoveryListener != null) {
@ -482,11 +473,11 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
if (discoveryListener != null) {
final DiscoveryEvent event = new DiscoveryEvent(data.service);
event.setBrokerName(data.brokerName);
// Have the listener process the event async so that
// he does not block this thread since we are doing time sensitive
// processing of events.
executor.execute(new Runnable() {
getExecutor().execute(new Runnable() {
public void run() {
DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
if (discoveryListener != null) {
@ -497,6 +488,20 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
}
}
private Executor getExecutor() {
if (executor == null) {
final String threadName = "Notifier-" + this.toString();
executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runable) {
Thread t = new Thread(runable, threadName);
t.setDaemon(true);
return t;
}
});
}
return executor;
}
public long getBackOffMultiplier() {
return backOffMultiplier;
}
@ -540,4 +545,10 @@ public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
public void setGroup(String group) {
this.group = group;
}
@Override
public String toString() {
return "MulticastDiscoveryAgent-"
+ (selfService != null ? "advertise:" + selfService : "listener:" + this.discoveryListener);
}
}

View File

@ -0,0 +1,229 @@
package org.apache.activemq.usecases;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.network.NetworkConnector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import junit.framework.TestCase;
public class NoDuplicateOnTopicNetworkTest extends TestCase {
private static final Log LOG = LogFactory
.getLog(NoDuplicateOnTopicNetworkTest.class);
private static final String MULTICAST_DEFAULT = "multicast://default";
private static final String BROKER_1 = "tcp://localhost:61626";
private static final String BROKER_2 = "tcp://localhost:61636";
private static final String BROKER_3 = "tcp://localhost:61646";
private BrokerService broker1;
private BrokerService broker2;
private BrokerService broker3;
private boolean dynamicOnly = false;
// no duplicates in cyclic network if networkTTL <=1
// when > 1, subscriptions perculate around resulting in duplicates as there is no
// memory of the original subscription.
// solution for 6.0 using org.apache.activemq.command.ConsumerInfo.getNetworkConsumerIds()
private int ttl = 1;
@Override
protected void setUp() throws Exception {
super.setUp();
broker3 = createAndStartBroker("broker3", BROKER_3);
Thread.sleep(3000);
broker2 = createAndStartBroker("broker2", BROKER_2);
Thread.sleep(3000);
broker1 = createAndStartBroker("broker1", BROKER_1);
Thread.sleep(1000);
}
private BrokerService createAndStartBroker(String name, String addr)
throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName(name);
broker.addConnector(addr).setDiscoveryUri(new URI(MULTICAST_DEFAULT));
broker.setUseJmx(false);
NetworkConnector networkConnector = broker
.addNetworkConnector(MULTICAST_DEFAULT);
networkConnector.setDecreaseNetworkConsumerPriority(true);
networkConnector.setDynamicOnly(dynamicOnly);
networkConnector.setNetworkTTL(ttl);
broker.start();
return broker;
}
@Override
protected void tearDown() throws Exception {
broker1.stop();
broker2.stop();
broker3.stop();
super.tearDown();
}
public void testProducerConsumerTopic() throws Exception {
final String topicName = "broadcast";
Thread producerThread = new Thread(new Runnable() {
public void run() {
TopicWithDuplicateMessages producer = new TopicWithDuplicateMessages();
producer.setBrokerURL(BROKER_1);
producer.setTopicName(topicName);
try {
producer.produce();
} catch (JMSException e) {
fail("Unexpected " + e);
}
}
});
final TopicWithDuplicateMessages consumer = new TopicWithDuplicateMessages();
Thread consumerThread = new Thread(new Runnable() {
public void run() {
consumer.setBrokerURL(BROKER_2);
consumer.setTopicName(topicName);
try {
consumer.consumer();
consumer.getLatch().await(60, TimeUnit.SECONDS);
} catch (Exception e) {
fail("Unexpected " + e);
}
}
});
consumerThread.start();
Thread.sleep(1000);
producerThread.start();
producerThread.join();
consumerThread.join();
Map<String, String> map = new HashMap<String, String>();
for (String msg : consumer.getMessageStrings()) {
assertTrue("is not a duplicate: " + msg, !map.containsKey(msg));
map.put(msg, msg);
}
assertEquals("got all required messages: " + map.size(), consumer
.getNumMessages(), map.size());
}
class TopicWithDuplicateMessages {
private String brokerURL;
private String topicName;
private Connection connection;
private Session session;
private Topic topic;
private MessageProducer producer;
private MessageConsumer consumer;
private List<String> receivedStrings = new ArrayList<String>();
private int numMessages = 10;
private CountDownLatch recievedLatch = new CountDownLatch(numMessages);
public CountDownLatch getLatch() {
return recievedLatch;
}
public List<String> getMessageStrings() {
return receivedStrings;
}
public String getBrokerURL() {
return brokerURL;
}
public void setBrokerURL(String brokerURL) {
this.brokerURL = brokerURL;
}
public String getTopicName() {
return topicName;
}
public void setTopicName(String topicName) {
this.topicName = topicName;
}
private void createConnection() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
brokerURL);
connection = factory.createConnection();
}
private void createTopic() throws JMSException {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic(topicName);
}
private void createProducer() throws JMSException {
producer = session.createProducer(topic);
}
private void createConsumer() throws JMSException {
consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message arg0) {
TextMessage msg = (TextMessage) arg0;
try {
LOG.debug("Received message [" + msg.getText() + "]");
receivedStrings.add(msg.getText());
recievedLatch.countDown();
} catch (JMSException e) {
fail("Unexpected :" + e);
}
}
});
}
private void publish() throws JMSException {
for (int i = 0; i < numMessages; i++) {
TextMessage textMessage = session.createTextMessage();
String message = "message: " + i;
LOG.debug("Sending message[" + message + "]");
textMessage.setText(message);
producer.send(textMessage);
}
}
public void produce() throws JMSException {
createConnection();
createTopic();
createProducer();
connection.start();
publish();
}
public void consumer() throws JMSException {
createConnection();
createTopic();
createConsumer();
connection.start();
}
public int getNumMessages() {
return numMessages;
}
}
}