mirror of https://github.com/apache/activemq.git
Fix and test for: https://issues.apache.org/jira/browse/AMQ-4475
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1469013 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
893a1ce88c
commit
7e53814928
|
@ -140,7 +140,7 @@ public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
|
||||||
Destination regionDestination = (Destination) message.getRegionDestination();
|
Destination regionDestination = (Destination) message.getRegionDestination();
|
||||||
if (message != null && regionDestination != null) {
|
if (message != null && regionDestination != null) {
|
||||||
deadLetterStrategy = regionDestination.getDeadLetterStrategy();
|
deadLetterStrategy = regionDestination.getDeadLetterStrategy();
|
||||||
if (deadLetterStrategy != null) {
|
if (deadLetterStrategy != null && message.getOriginalDestination() != null) {
|
||||||
// Cheap copy, since we only need two fields
|
// Cheap copy, since we only need two fields
|
||||||
tmp = new ActiveMQMessage();
|
tmp = new ActiveMQMessage();
|
||||||
tmp.setDestination(message.getOriginalDestination());
|
tmp.setDestination(message.getOriginalDestination());
|
||||||
|
|
|
@ -0,0 +1,328 @@
|
||||||
|
package org.apache.activemq.bugs;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerPlugin;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
||||||
|
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
|
import org.apache.activemq.broker.util.TimeStampingBrokerPlugin;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class AMQ4475Test {
|
||||||
|
|
||||||
|
private final Log LOG = LogFactory.getLog(AMQ4475Test.class);
|
||||||
|
|
||||||
|
private final int NUM_MSGS = 1000;
|
||||||
|
private final int MAX_THREADS = 20;
|
||||||
|
|
||||||
|
private BrokerService broker;
|
||||||
|
private String connectionUri;
|
||||||
|
|
||||||
|
private final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS);
|
||||||
|
private final ActiveMQQueue original = new ActiveMQQueue("jms/AQueue");
|
||||||
|
private final ActiveMQQueue rerouted = new ActiveMQQueue("jms/AQueue_proxy");
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
TimeStampingBrokerPlugin tsbp = new TimeStampingBrokerPlugin();
|
||||||
|
tsbp.setZeroExpirationOverride(432000000);
|
||||||
|
tsbp.setTtlCeiling(432000000);
|
||||||
|
tsbp.setFutureOnly(true);
|
||||||
|
|
||||||
|
broker = new BrokerService();
|
||||||
|
broker.setPersistent(false);
|
||||||
|
broker.setUseJmx(true);
|
||||||
|
broker.setPlugins(new BrokerPlugin[] {tsbp});
|
||||||
|
connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
|
||||||
|
|
||||||
|
// Configure Dead Letter Strategy
|
||||||
|
DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
|
||||||
|
strategy.setProcessExpired(true);
|
||||||
|
((IndividualDeadLetterStrategy)strategy).setUseQueueForQueueMessages(true);
|
||||||
|
((IndividualDeadLetterStrategy)strategy).setQueuePrefix("DLQ.");
|
||||||
|
strategy.setProcessNonPersistent(true);
|
||||||
|
|
||||||
|
// Add policy and individual DLQ strategy
|
||||||
|
PolicyEntry policy = new PolicyEntry();
|
||||||
|
policy.setTimeBeforeDispatchStarts(3000);
|
||||||
|
policy.setDeadLetterStrategy(strategy);
|
||||||
|
|
||||||
|
PolicyMap pMap = new PolicyMap();
|
||||||
|
pMap.setDefaultEntry(policy);
|
||||||
|
|
||||||
|
broker.setDestinationPolicy(pMap);
|
||||||
|
broker.start();
|
||||||
|
broker.waitUntilStarted();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void after() throws Exception {
|
||||||
|
if (broker != null) {
|
||||||
|
broker.stop();
|
||||||
|
broker.waitUntilStopped();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIndividualDeadLetterAndTimeStampPlugin() {
|
||||||
|
LOG.info("Starting test ..");
|
||||||
|
|
||||||
|
long startTime = System.nanoTime();
|
||||||
|
|
||||||
|
// Produce to network
|
||||||
|
List<Future<ProducerTask>> tasks = new ArrayList<Future<ProducerTask>>();
|
||||||
|
|
||||||
|
for (int index = 0; index < 1; index++) {
|
||||||
|
ProducerTask p = new ProducerTask(connectionUri, original, NUM_MSGS);
|
||||||
|
Future<ProducerTask> future = executor.submit(p, p);
|
||||||
|
tasks.add(future);
|
||||||
|
}
|
||||||
|
|
||||||
|
ForwardingConsumerThread f1 = new ForwardingConsumerThread(original, rerouted, NUM_MSGS);
|
||||||
|
f1.start();
|
||||||
|
ConsumerThread c1 = new ConsumerThread(connectionUri, rerouted, NUM_MSGS);
|
||||||
|
c1.start();
|
||||||
|
|
||||||
|
LOG.info("Waiting on consumers and producers to exit");
|
||||||
|
|
||||||
|
try {
|
||||||
|
for (Future<ProducerTask> future : tasks) {
|
||||||
|
ProducerTask e = future.get();
|
||||||
|
LOG.info("[Completed] " + e.dest.getPhysicalName());
|
||||||
|
}
|
||||||
|
executor.shutdown();
|
||||||
|
LOG.info("Producing threads complete, waiting on ACKs");
|
||||||
|
f1.join(TimeUnit.MINUTES.toMillis(2));
|
||||||
|
c1.join(TimeUnit.MINUTES.toMillis(2));
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
LOG.warn("Caught unexpected exception: {}", e);
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
LOG.warn("Caught unexpected exception: {}", ie);
|
||||||
|
throw new RuntimeException(ie);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertFalse(f1.isFailed());
|
||||||
|
assertFalse(c1.isFailed());
|
||||||
|
|
||||||
|
long estimatedTime = System.nanoTime() - startTime;
|
||||||
|
|
||||||
|
LOG.info("Testcase duration (seconds): " + estimatedTime / 1000000000.0);
|
||||||
|
LOG.info("Consumers and producers exited, all msgs received as expected");
|
||||||
|
}
|
||||||
|
|
||||||
|
public class ProducerTask implements Runnable {
|
||||||
|
private final String uri;
|
||||||
|
private final ActiveMQQueue dest;
|
||||||
|
private final int count;
|
||||||
|
|
||||||
|
public ProducerTask(String uri, ActiveMQQueue dest, int count) {
|
||||||
|
this.uri = uri;
|
||||||
|
this.dest = dest;
|
||||||
|
this.count = count;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
|
||||||
|
Connection connection = null;
|
||||||
|
try {
|
||||||
|
String destName = "";
|
||||||
|
|
||||||
|
try {
|
||||||
|
destName = dest.getQueueName();
|
||||||
|
} catch (JMSException e) {
|
||||||
|
LOG.warn("Caught unexpected exception: {}", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uri);
|
||||||
|
|
||||||
|
connection = connectionFactory.createConnection();
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageProducer producer = session.createProducer(dest);
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
|
||||||
|
String msg = "Test Message";
|
||||||
|
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
producer.send(session.createTextMessage(msg + dest.getQueueName() + " " + i));
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("[" + destName + "] Sent " + count + " msgs");
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Caught unexpected exception: {}", e);
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
connection.close();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
LOG.warn("Caught unexpected exception: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public class ForwardingConsumerThread extends Thread {
|
||||||
|
|
||||||
|
private final ActiveMQQueue original;
|
||||||
|
private final ActiveMQQueue forward;
|
||||||
|
private int blockSize = 0;
|
||||||
|
private final int PARALLEL = 1;
|
||||||
|
private boolean failed;
|
||||||
|
|
||||||
|
public ForwardingConsumerThread(ActiveMQQueue original, ActiveMQQueue forward, int total) {
|
||||||
|
this.original = original;
|
||||||
|
this.forward = forward;
|
||||||
|
this.blockSize = total / PARALLEL;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isFailed() {
|
||||||
|
return failed;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
Connection connection = null;
|
||||||
|
try {
|
||||||
|
|
||||||
|
for (int index = 0; index < PARALLEL; index++) {
|
||||||
|
|
||||||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
|
||||||
|
|
||||||
|
connection = factory.createConnection();
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageConsumer consumer = session.createConsumer(original);
|
||||||
|
MessageProducer producer = session.createProducer(forward);
|
||||||
|
connection.start();
|
||||||
|
int count = 0;
|
||||||
|
|
||||||
|
while (count < blockSize) {
|
||||||
|
|
||||||
|
Message msg1 = consumer.receive(10000);
|
||||||
|
if (msg1 != null) {
|
||||||
|
if (msg1 instanceof ActiveMQTextMessage) {
|
||||||
|
if (count % 100 == 0) {
|
||||||
|
LOG.info("Consuming -> " + ((ActiveMQTextMessage) msg1).getDestination() + " count=" + count);
|
||||||
|
}
|
||||||
|
|
||||||
|
producer.send(msg1);
|
||||||
|
|
||||||
|
count++;
|
||||||
|
} else {
|
||||||
|
LOG.info("Skipping unknown msg type " + msg1);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("[" + original.getQueueName() + "] completed segment (" + index + " of " + blockSize + ")");
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Caught unexpected exception: {}", e);
|
||||||
|
} finally {
|
||||||
|
LOG.debug(getName() + ": is stopping");
|
||||||
|
try {
|
||||||
|
connection.close();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public class ConsumerThread extends Thread {
|
||||||
|
|
||||||
|
private final String uri;
|
||||||
|
private final ActiveMQQueue dest;
|
||||||
|
private int blockSize = 0;
|
||||||
|
private final int PARALLEL = 1;
|
||||||
|
private boolean failed;
|
||||||
|
|
||||||
|
public ConsumerThread(String uri, ActiveMQQueue dest, int total) {
|
||||||
|
this.uri = uri;
|
||||||
|
this.dest = dest;
|
||||||
|
this.blockSize = total / PARALLEL;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isFailed() {
|
||||||
|
return failed;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
Connection connection = null;
|
||||||
|
try {
|
||||||
|
|
||||||
|
for (int index = 0; index < PARALLEL; index++) {
|
||||||
|
|
||||||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
|
||||||
|
|
||||||
|
connection = factory.createConnection();
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageConsumer consumer = session.createConsumer(dest);
|
||||||
|
connection.start();
|
||||||
|
int count = 0;
|
||||||
|
|
||||||
|
while (count < blockSize) {
|
||||||
|
|
||||||
|
Object msg1 = consumer.receive(10000);
|
||||||
|
if (msg1 != null) {
|
||||||
|
if (msg1 instanceof ActiveMQTextMessage) {
|
||||||
|
if (count % 100 == 0) {
|
||||||
|
LOG.info("Consuming -> " + ((ActiveMQTextMessage) msg1).getDestination() + " count=" + count);
|
||||||
|
}
|
||||||
|
|
||||||
|
count++;
|
||||||
|
} else {
|
||||||
|
LOG.info("Skipping unknown msg type " + msg1);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
failed = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("[" + dest.getQueueName() + "] completed segment (" + index + " of " + blockSize + ")");
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Caught unexpected exception: {}", e);
|
||||||
|
} finally {
|
||||||
|
LOG.debug(getName() + ": is stopping");
|
||||||
|
try {
|
||||||
|
connection.close();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue