mirror of https://github.com/apache/activemq.git
Better handle prefetch extension and pull consumers over Topics to avoid the remote not receiving all the messages available based on the credit it has issued.
This commit is contained in:
parent
2537266740
commit
e050519ff6
|
@ -230,7 +230,7 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
assertEquals(totalCount, proxy.getQueueSize());
|
||||
|
||||
// Consume again..check we receive all the messages.
|
||||
Set<Integer> messageNumbers = new HashSet<Integer>();
|
||||
Set<Integer> messageNumbers = new HashSet<>();
|
||||
for (int i = 1; i <= totalCount; i++) {
|
||||
messageNumbers.add(i);
|
||||
}
|
||||
|
@ -644,7 +644,7 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
public void testDurableConsumerAsync() throws Exception {
|
||||
ActiveMQAdmin.enableJMSFrameTracing();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicReference<Message> received = new AtomicReference<Message>();
|
||||
final AtomicReference<Message> received = new AtomicReference<>();
|
||||
String durableClientId = getDestinationName() + "-ClientId";
|
||||
|
||||
connection = createConnection(durableClientId);
|
||||
|
@ -693,7 +693,7 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
message.setText("hello");
|
||||
producer.send(message);
|
||||
|
||||
final AtomicReference<Message> msg = new AtomicReference<Message>();
|
||||
final AtomicReference<Message> msg = new AtomicReference<>();
|
||||
assertTrue(Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
|
@ -712,7 +712,7 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
public void testTopicConsumerAsync() throws Exception {
|
||||
ActiveMQAdmin.enableJMSFrameTracing();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicReference<Message> received = new AtomicReference<Message>();
|
||||
final AtomicReference<Message> received = new AtomicReference<>();
|
||||
|
||||
connection = createConnection();
|
||||
{
|
||||
|
@ -760,7 +760,7 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
message.setText("hello");
|
||||
producer.send(message);
|
||||
|
||||
final AtomicReference<Message> msg = new AtomicReference<Message>();
|
||||
final AtomicReference<Message> msg = new AtomicReference<>();
|
||||
assertTrue(Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
|
@ -782,7 +782,7 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
final ConnectorViewMBean connector = getProxyToConnectionView(getTargetConnectorName());
|
||||
LOG.info("Current number of Connections is: {}", connector.connectionCount());
|
||||
|
||||
ArrayList<Connection> connections = new ArrayList<Connection>();
|
||||
ArrayList<Connection> connections = new ArrayList<>();
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
connections.add(createConnection(null));
|
||||
|
@ -1265,4 +1265,69 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
assertFalse(message.getJMSRedelivered());
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testProduceAndConsumeLargeNumbersOfTopicMessagesClientAck() throws Exception {
|
||||
doTestProduceAndConsumeLargeNumbersOfMessages(true, Session.CLIENT_ACKNOWLEDGE);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testProduceAndConsumeLargeNumbersOfQueueMessagesClientAck() throws Exception {
|
||||
doTestProduceAndConsumeLargeNumbersOfMessages(false, Session.CLIENT_ACKNOWLEDGE);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testProduceAndConsumeLargeNumbersOfTopicMessagesAutoAck() throws Exception {
|
||||
doTestProduceAndConsumeLargeNumbersOfMessages(true, Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testProduceAndConsumeLargeNumbersOfQueueMessagesAutoAck() throws Exception {
|
||||
doTestProduceAndConsumeLargeNumbersOfMessages(false, Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
|
||||
public void doTestProduceAndConsumeLargeNumbersOfMessages(boolean topic, int ackMode) throws Exception {
|
||||
|
||||
final int MSG_COUNT = 1000;
|
||||
final CountDownLatch done = new CountDownLatch(MSG_COUNT);
|
||||
|
||||
JmsConnectionFactory factory = new JmsConnectionFactory(getAmqpURI());
|
||||
factory.setForceSyncSend(true);
|
||||
|
||||
connection = factory.createConnection();
|
||||
connection.start();
|
||||
|
||||
Session session = connection.createSession(false, ackMode);
|
||||
final Destination destination;
|
||||
if (topic) {
|
||||
destination = session.createTopic(getDestinationName());
|
||||
} else {
|
||||
destination = session.createQueue(getDestinationName());
|
||||
}
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
consumer.setMessageListener(new MessageListener() {
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message) {
|
||||
try {
|
||||
message.acknowledge();
|
||||
done.countDown();
|
||||
} catch (JMSException ex) {
|
||||
LOG.info("Caught exception.", ex);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
|
||||
TextMessage textMessage = session.createTextMessage();
|
||||
textMessage.setText("messageText");
|
||||
|
||||
for (int i = 0; i < MSG_COUNT; i++) {
|
||||
producer.send(textMessage);
|
||||
}
|
||||
|
||||
assertTrue("Did not receive all messages: " + MSG_COUNT, done.await(15, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,9 @@ import static org.junit.Assert.assertNull;
|
|||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.broker.jmx.DestinationViewMBean;
|
||||
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||
import org.apache.activemq.broker.jmx.TopicViewMBean;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
|
@ -37,47 +39,93 @@ import org.junit.Test;
|
|||
public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testReceiverCanDrainMessages() throws Exception {
|
||||
public void testReceiverCanDrainMessagesQueue() throws Exception {
|
||||
doTestReceiverCanDrainMessages(false);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testReceiverCanDrainMessagesTopic() throws Exception {
|
||||
doTestReceiverCanDrainMessages(true);
|
||||
}
|
||||
|
||||
private void doTestReceiverCanDrainMessages(boolean topic) throws Exception {
|
||||
final String destinationName;
|
||||
if (topic) {
|
||||
destinationName = "topic://" + getTestName();
|
||||
} else {
|
||||
destinationName = "queue://" + getTestName();
|
||||
}
|
||||
|
||||
int MSG_COUNT = 20;
|
||||
sendMessages(getTestName(), MSG_COUNT, false);
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
AmqpReceiver receiver = session.createReceiver(destinationName);
|
||||
|
||||
QueueViewMBean queueView = getProxyToQueue(getTestName());
|
||||
assertEquals(MSG_COUNT, queueView.getQueueSize());
|
||||
assertEquals(0, queueView.getDispatchCount());
|
||||
sendMessages(getTestName(), MSG_COUNT, topic);
|
||||
|
||||
final DestinationViewMBean destinationView;
|
||||
if (topic) {
|
||||
destinationView = getProxyToTopic(getTestName());
|
||||
} else {
|
||||
destinationView = getProxyToQueue(getTestName());
|
||||
}
|
||||
|
||||
assertEquals(MSG_COUNT, destinationView.getEnqueueCount());
|
||||
assertEquals(0, destinationView.getDispatchCount());
|
||||
|
||||
receiver.drain(MSG_COUNT);
|
||||
for (int i = 0; i < MSG_COUNT; ++i) {
|
||||
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(message);
|
||||
assertNotNull("Failed to read message: " + (i + 1), message);
|
||||
LOG.info("Read message: {}", message.getMessageId());
|
||||
message.accept();
|
||||
}
|
||||
receiver.close();
|
||||
|
||||
assertEquals(0, queueView.getQueueSize());
|
||||
assertEquals(MSG_COUNT, destinationView.getDequeueCount());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testPullWithNoMessageGetDrained() throws Exception {
|
||||
public void testPullWithNoMessageGetDrainedQueue() throws Exception {
|
||||
doTestPullWithNoMessageGetDrained(false);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testPullWithNoMessageGetDrainedTopic() throws Exception {
|
||||
doTestPullWithNoMessageGetDrained(true);
|
||||
}
|
||||
|
||||
private void doTestPullWithNoMessageGetDrained(boolean topic) throws Exception {
|
||||
|
||||
final String destinationName;
|
||||
if (topic) {
|
||||
destinationName = "topic://" + getTestName();
|
||||
} else {
|
||||
destinationName = "queue://" + getTestName();
|
||||
}
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
AmqpReceiver receiver = session.createReceiver(destinationName);
|
||||
|
||||
receiver.flow(10);
|
||||
|
||||
QueueViewMBean queueView = getProxyToQueue(getTestName());
|
||||
assertEquals(0, queueView.getQueueSize());
|
||||
assertEquals(0, queueView.getDispatchCount());
|
||||
final DestinationViewMBean destinationView;
|
||||
if (topic) {
|
||||
destinationView = getProxyToTopic(getTestName());
|
||||
} else {
|
||||
destinationView = getProxyToQueue(getTestName());
|
||||
}
|
||||
|
||||
assertEquals(0, destinationView.getEnqueueCount());
|
||||
assertEquals(0, destinationView.getDispatchCount());
|
||||
|
||||
assertEquals(10, receiver.getReceiver().getRemoteCredit());
|
||||
|
||||
|
@ -89,19 +137,42 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
|
|||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testPullOneFromRemote() throws Exception {
|
||||
int MSG_COUNT = 20;
|
||||
sendMessages(getTestName(), MSG_COUNT, false);
|
||||
public void testPullOneFromRemoteQueue() throws Exception {
|
||||
doTestPullOneFromRemote(false);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testPullOneFromRemoteTopic() throws Exception {
|
||||
doTestPullOneFromRemote(true);
|
||||
}
|
||||
|
||||
private void doTestPullOneFromRemote(boolean topic) throws Exception {
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
final String destinationName;
|
||||
if (topic) {
|
||||
destinationName = "topic://" + getTestName();
|
||||
} else {
|
||||
destinationName = "queue://" + getTestName();
|
||||
}
|
||||
|
||||
QueueViewMBean queueView = getProxyToQueue(getTestName());
|
||||
assertEquals(MSG_COUNT, queueView.getQueueSize());
|
||||
assertEquals(0, queueView.getDispatchCount());
|
||||
AmqpReceiver receiver = session.createReceiver(destinationName);
|
||||
|
||||
int MSG_COUNT = 20;
|
||||
sendMessages(getTestName(), MSG_COUNT, topic);
|
||||
|
||||
final DestinationViewMBean destinationView;
|
||||
if (topic) {
|
||||
destinationView = getProxyToTopic(getTestName());
|
||||
} else {
|
||||
destinationView = getProxyToQueue(getTestName());
|
||||
}
|
||||
|
||||
assertEquals(MSG_COUNT, destinationView.getEnqueueCount());
|
||||
assertEquals(0, destinationView.getDispatchCount());
|
||||
|
||||
assertEquals(0, receiver.getReceiver().getRemoteCredit());
|
||||
|
||||
|
@ -113,25 +184,48 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
|
|||
|
||||
receiver.close();
|
||||
|
||||
assertEquals(MSG_COUNT - 1, queueView.getQueueSize());
|
||||
assertEquals(1, queueView.getDispatchCount());
|
||||
assertEquals(MSG_COUNT - 1, destinationView.getEnqueueCount() - destinationView.getDequeueCount());
|
||||
assertEquals(1, destinationView.getDispatchCount());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testMultipleZeroResultPulls() throws Exception {
|
||||
public void testMultipleZeroResultPullsQueue() throws Exception {
|
||||
doTestMultipleZeroResultPulls(false);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testMultipleZeroResultPullsTopic() throws Exception {
|
||||
doTestMultipleZeroResultPulls(true);
|
||||
}
|
||||
|
||||
private void doTestMultipleZeroResultPulls(boolean topic) throws Exception {
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
final String destinationName;
|
||||
if (topic) {
|
||||
destinationName = "topic://" + getTestName();
|
||||
} else {
|
||||
destinationName = "queue://" + getTestName();
|
||||
}
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver(destinationName);
|
||||
|
||||
receiver.flow(10);
|
||||
|
||||
QueueViewMBean queueView = getProxyToQueue(getTestName());
|
||||
assertEquals(0, queueView.getQueueSize());
|
||||
assertEquals(0, queueView.getDispatchCount());
|
||||
if (topic) {
|
||||
TopicViewMBean topicView = getProxyToTopic(getTestName());
|
||||
assertEquals(0, topicView.getEnqueueCount());
|
||||
assertEquals(0, topicView.getDispatchCount());
|
||||
} else {
|
||||
QueueViewMBean queueView = getProxyToQueue(getTestName());
|
||||
assertEquals(0, queueView.getQueueSize());
|
||||
assertEquals(0, queueView.getDispatchCount());
|
||||
}
|
||||
|
||||
assertEquals(10, receiver.getReceiver().getRemoteCredit());
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
|
@ -29,6 +29,7 @@ import java.util.Map;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.activemq.broker.jmx.DestinationViewMBean;
|
||||
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||
import org.apache.activemq.junit.ActiveMQTestRunner;
|
||||
import org.apache.activemq.junit.Repeat;
|
||||
|
@ -268,19 +269,43 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
|
|||
|
||||
@Test(timeout = 60000)
|
||||
@Repeat(repetitions = 1)
|
||||
public void testPresettledReceiverReadsAllMessagesInNonFlowBatch() throws Exception {
|
||||
public void testPresettledReceiverReadsAllMessagesInNonFlowBatchQueue() throws Exception {
|
||||
doTestPresettledReceiverReadsAllMessagesInNonFlowBatch(false);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
@Repeat(repetitions = 1)
|
||||
public void testPresettledReceiverReadsAllMessagesInNonFlowBatchTopic() throws Exception {
|
||||
doTestPresettledReceiverReadsAllMessagesInNonFlowBatch(true);
|
||||
}
|
||||
|
||||
private void doTestPresettledReceiverReadsAllMessagesInNonFlowBatch(boolean topic) throws Exception {
|
||||
|
||||
final String destinationName;
|
||||
if (topic) {
|
||||
destinationName = "topic://" + getTestName();
|
||||
} else {
|
||||
destinationName = "queue://" + getTestName();
|
||||
}
|
||||
|
||||
final int MSG_COUNT = 100;
|
||||
sendMessages(getTestName(), MSG_COUNT, false);
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), null, false, true);
|
||||
AmqpReceiver receiver = session.createReceiver(destinationName, null, false, true);
|
||||
|
||||
QueueViewMBean queueView = getProxyToQueue(getTestName());
|
||||
assertEquals(MSG_COUNT, queueView.getQueueSize());
|
||||
assertEquals(0, queueView.getDispatchCount());
|
||||
sendMessages(getTestName(), MSG_COUNT, topic);
|
||||
|
||||
final DestinationViewMBean destinationView;
|
||||
if (topic) {
|
||||
destinationView = getProxyToTopic(getTestName());
|
||||
} else {
|
||||
destinationView = getProxyToQueue(getTestName());
|
||||
}
|
||||
assertEquals(MSG_COUNT, destinationView.getEnqueueCount());
|
||||
assertEquals(0, destinationView.getDispatchCount());
|
||||
|
||||
receiver.flow(20);
|
||||
// consume less that flow
|
||||
|
@ -302,7 +327,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
|
|||
|
||||
receiver.close();
|
||||
|
||||
assertEquals(0, queueView.getQueueSize());
|
||||
assertEquals(0, destinationView.getEnqueueCount() - destinationView.getDequeueCount());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
@ -481,7 +506,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
|
|||
}
|
||||
});
|
||||
|
||||
Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>();
|
||||
Map<Symbol, DescribedType> filters = new HashMap<>();
|
||||
filters.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKOWN_FILTER);
|
||||
|
||||
Source source = new Source();
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Topic;
|
||||
|
@ -60,13 +61,30 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class);
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSimpleSendOneReceiveOne() throws Exception {
|
||||
public void testSimpleSendOneReceiveOneToQueue() throws Exception {
|
||||
doTestSimpleSendOneReceiveOne(Queue.class);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSimpleSendOneReceiveOneToTopic() throws Exception {
|
||||
doTestSimpleSendOneReceiveOne(Topic.class);
|
||||
}
|
||||
|
||||
public void doTestSimpleSendOneReceiveOne(Class<?> destType) throws Exception {
|
||||
|
||||
final String address;
|
||||
if (Queue.class.equals(destType)) {
|
||||
address = "queue://" + getTestName();
|
||||
} else {
|
||||
address = "topic://" + getTestName();
|
||||
}
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender("queue://" + getTestName());
|
||||
AmqpSender sender = session.createSender(address);
|
||||
AmqpReceiver receiver = session.createReceiver(address);
|
||||
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
|
||||
|
@ -78,7 +96,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
sender.close();
|
||||
|
||||
LOG.info("Attempting to read message with receiver");
|
||||
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
|
||||
receiver.flow(2);
|
||||
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
|
||||
assertNotNull("Should have read message", received);
|
||||
|
@ -366,7 +383,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
|
||||
private void doTestReceiveMessageAndRefillCreditBeforeAcceptOnTopicAsync(Class<?> destType) throws Exception {
|
||||
final AmqpClient client = createAmqpClient();
|
||||
final LinkedList<Throwable> errors = new LinkedList<Throwable>();
|
||||
final LinkedList<Throwable> errors = new LinkedList<>();
|
||||
final CountDownLatch receiverReady = new CountDownLatch(1);
|
||||
ExecutorService executorService = Executors.newCachedThreadPool();
|
||||
|
||||
|
@ -609,7 +626,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
sender.send(message);
|
||||
}
|
||||
|
||||
List<AmqpMessage> pendingAcks = new ArrayList<AmqpMessage>();
|
||||
List<AmqpMessage> pendingAcks = new ArrayList<>();
|
||||
|
||||
for (int i = 0; i < MSG_COUNT; i++) {
|
||||
receiver.flow(1);
|
||||
|
@ -719,4 +736,84 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
|
|||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSendReceiveLotsOfDurableMessagesOnQueue() throws Exception {
|
||||
doTestSendReceiveLotsOfDurableMessages(Queue.class);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSendReceiveLotsOfDurableMessagesOnTopic() throws Exception {
|
||||
doTestSendReceiveLotsOfDurableMessages(Topic.class);
|
||||
}
|
||||
|
||||
private void doTestSendReceiveLotsOfDurableMessages(Class<?> destType) throws Exception {
|
||||
final int MSG_COUNT = 1000;
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
|
||||
AmqpConnection connection = trackConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
final CountDownLatch done = new CountDownLatch(MSG_COUNT);
|
||||
final AtomicBoolean error = new AtomicBoolean(false);
|
||||
final ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
|
||||
final String address;
|
||||
if (Queue.class.equals(destType)) {
|
||||
address = "queue://" + getTestName();
|
||||
} else {
|
||||
address = "topic://" + getTestName();
|
||||
}
|
||||
|
||||
final AmqpReceiver receiver = session.createReceiver(address);
|
||||
receiver.flow(MSG_COUNT);
|
||||
|
||||
AmqpSender sender = session.createSender(address);
|
||||
|
||||
final DestinationViewMBean destinationView;
|
||||
if (Queue.class.equals(destType)) {
|
||||
destinationView = getProxyToQueue(getTestName());
|
||||
} else {
|
||||
destinationView = getProxyToTopic(getTestName());
|
||||
}
|
||||
|
||||
executor.execute(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
for (int i = 0; i < MSG_COUNT; i++) {
|
||||
try {
|
||||
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
|
||||
received.accept();
|
||||
done.countDown();
|
||||
} catch (Exception ex) {
|
||||
LOG.info("Caught error: {}", ex.getClass().getSimpleName());
|
||||
error.set(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
for (int i = 0; i < MSG_COUNT; i++) {
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setMessageId("msg" + i);
|
||||
sender.send(message);
|
||||
}
|
||||
|
||||
assertTrue("did not read all messages, waiting on: " + done.getCount(), done.await(10, TimeUnit.SECONDS));
|
||||
assertFalse("should not be any errors on receive", error.get());
|
||||
|
||||
assertTrue("Should be no inflight messages: " + destinationView.getInFlightCount(), Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return destinationView.getInFlightCount() == 0;
|
||||
}
|
||||
}));
|
||||
|
||||
sender.close();
|
||||
receiver.close();
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
@ -276,37 +275,61 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
|
||||
super.acknowledge(context, ack);
|
||||
|
||||
// Handle the standard acknowledgment case.
|
||||
if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
|
||||
if (context.isInTransaction()) {
|
||||
context.getTransaction().addSynchronization(new Synchronization() {
|
||||
@Override
|
||||
public void afterCommit() throws Exception {
|
||||
updateStatsOnAck(ack);
|
||||
dispatchMatched();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
updateStatsOnAck(ack);
|
||||
if (ack.isStandardAck()) {
|
||||
updateStatsOnAck(context, ack);
|
||||
} else if (ack.isPoisonAck()) {
|
||||
if (ack.isInTransaction()) {
|
||||
throw new JMSException("Poison ack cannot be transacted: " + ack);
|
||||
}
|
||||
updateStatsOnAck(context, ack);
|
||||
if (getPrefetchSize() != 0) {
|
||||
decrementPrefetchExtension(ack.getMessageCount());
|
||||
}
|
||||
} else if (ack.isIndividualAck()) {
|
||||
updateStatsOnAck(context, ack);
|
||||
if (getPrefetchSize() != 0 && ack.isInTransaction()) {
|
||||
incrementPrefetchExtension(ack.getMessageCount());
|
||||
}
|
||||
updatePrefetch(ack);
|
||||
dispatchMatched();
|
||||
return;
|
||||
} else if (ack.isDeliveredAck()) {
|
||||
// Message was delivered but not acknowledged: update pre-fetch counters.
|
||||
prefetchExtension.addAndGet(ack.getMessageCount());
|
||||
dispatchMatched();
|
||||
return;
|
||||
} else if (ack.isExpiredAck()) {
|
||||
updateStatsOnAck(ack);
|
||||
updatePrefetch(ack);
|
||||
dispatchMatched();
|
||||
return;
|
||||
if (getPrefetchSize() != 0) {
|
||||
incrementPrefetchExtension(ack.getMessageCount());
|
||||
}
|
||||
} else if (ack.isDeliveredAck()) {
|
||||
// Message was delivered but not acknowledged: update pre-fetch counters.
|
||||
if (getPrefetchSize() != 0) {
|
||||
incrementPrefetchExtension(ack.getMessageCount());
|
||||
}
|
||||
} else if (ack.isRedeliveredAck()) {
|
||||
// nothing to do atm
|
||||
// No processing for redelivered needed
|
||||
return;
|
||||
} else {
|
||||
throw new JMSException("Invalid acknowledgment: " + ack);
|
||||
}
|
||||
|
||||
dispatchMatched();
|
||||
}
|
||||
|
||||
private void updateStatsOnAck(final ConnectionContext context, final MessageAck ack) {
|
||||
if (context.isInTransaction()) {
|
||||
context.getTransaction().addSynchronization(new Synchronization() {
|
||||
|
||||
@Override
|
||||
public void beforeEnd() {
|
||||
if (getPrefetchSize() != 0) {
|
||||
decrementPrefetchExtension(ack.getMessageCount());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterCommit() throws Exception {
|
||||
updateStatsOnAck(ack);
|
||||
dispatchMatched();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
updateStatsOnAck(ack);
|
||||
}
|
||||
throw new JMSException("Invalid acknowledgment: " + ack);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -399,20 +422,20 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
}
|
||||
}
|
||||
|
||||
private void updatePrefetch(MessageAck ack) {
|
||||
private void incrementPrefetchExtension(int amount) {
|
||||
while (true) {
|
||||
int currentExtension = prefetchExtension.get();
|
||||
int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
|
||||
int newExtension = Math.max(0, currentExtension + amount);
|
||||
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void decrementPrefetchExtension() {
|
||||
private void decrementPrefetchExtension(int amount) {
|
||||
while (true) {
|
||||
int currentExtension = prefetchExtension.get();
|
||||
int newExtension = Math.max(0, currentExtension - 1);
|
||||
int newExtension = Math.max(0, currentExtension - amount);
|
||||
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
|
||||
break;
|
||||
}
|
||||
|
@ -439,7 +462,7 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
@Override
|
||||
public int getDispatchedQueueSize() {
|
||||
return (int)(getSubscriptionStatistics().getDispatched().getCount() -
|
||||
prefetchExtension.get() - getSubscriptionStatistics().getDequeues().getCount());
|
||||
getSubscriptionStatistics().getDequeues().getCount());
|
||||
}
|
||||
|
||||
public int getMaximumPendingMessages() {
|
||||
|
@ -538,10 +561,7 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
// -------------------------------------------------------------------------
|
||||
@Override
|
||||
public boolean isFull() {
|
||||
if (info.getPrefetchSize() == 0) {
|
||||
return prefetchExtension.get() == 0;
|
||||
}
|
||||
return getDispatchedQueueSize() >= info.getPrefetchSize();
|
||||
return getPrefetchSize() == 0 ? prefetchExtension.get() == 0 : getDispatchedQueueSize() - prefetchExtension.get() >= info.getPrefetchSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -554,7 +574,7 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
*/
|
||||
@Override
|
||||
public boolean isLowWaterMark() {
|
||||
return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
|
||||
return (getDispatchedQueueSize() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -562,7 +582,7 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
*/
|
||||
@Override
|
||||
public boolean isHighWaterMark() {
|
||||
return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
|
||||
return (getDispatchedQueueSize() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -669,10 +689,10 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
}
|
||||
|
||||
if (getPrefetchSize() == 0) {
|
||||
decrementPrefetchExtension();
|
||||
decrementPrefetchExtension(1);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (info.isDispatchAsync()) {
|
||||
if (node != null) {
|
||||
md.setTransmitCallback(new TransmitCallback() {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
|
@ -16,24 +16,42 @@
|
|||
*/
|
||||
package org.apache.activemq.usecases;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.ActiveMQSession;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class TopicSubscriptionZeroPrefetchTest {
|
||||
|
||||
private static final String TOPIC_NAME = "slow.consumer";
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TopicSubscriptionZeroPrefetchTest.class);
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
private Connection connection;
|
||||
private Session session;
|
||||
private ActiveMQTopic destination;
|
||||
|
@ -41,6 +59,10 @@ public class TopicSubscriptionZeroPrefetchTest {
|
|||
private MessageConsumer consumer;
|
||||
private BrokerService brokerService;
|
||||
|
||||
public String getTopicName() {
|
||||
return name.getMethodName();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
||||
|
@ -52,7 +74,7 @@ public class TopicSubscriptionZeroPrefetchTest {
|
|||
connection = activeMQConnectionFactory.createConnection();
|
||||
connection.setClientID("ClientID-1");
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
destination = new ActiveMQTopic(TOPIC_NAME);
|
||||
destination = new ActiveMQTopic(getTopicName());
|
||||
producer = session.createProducer(destination);
|
||||
|
||||
connection.start();
|
||||
|
@ -61,10 +83,10 @@ public class TopicSubscriptionZeroPrefetchTest {
|
|||
/*
|
||||
* test non durable topic subscription with prefetch set to zero
|
||||
*/
|
||||
@Test(timeout=60000)
|
||||
@Test(timeout = 60000)
|
||||
public void testTopicConsumerPrefetchZero() throws Exception {
|
||||
|
||||
ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.retroactive=true&consumer.prefetchSize=0");
|
||||
ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.retroactive=true&consumer.prefetchSize=0");
|
||||
consumer = session.createConsumer(consumerDestination);
|
||||
|
||||
// publish messages
|
||||
|
@ -76,31 +98,153 @@ public class TopicSubscriptionZeroPrefetchTest {
|
|||
Assert.assertNotNull("should have received a message the published message", consumedMessage);
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testTopicConsumerPrefetchZeroClientAckLoop() throws Exception {
|
||||
ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.retroactive=true&consumer.prefetchSize=0");
|
||||
@Test(timeout = 60000)
|
||||
public void testTopicConsumerPrefetchZeroClientAckLoopReceive() throws Exception {
|
||||
ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.retroactive=true&consumer.prefetchSize=0");
|
||||
Session consumerClientAckSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||
consumer = consumerClientAckSession.createConsumer(consumerDestination);
|
||||
|
||||
final int count = 10;
|
||||
for (int i=0;i<count;i++) {
|
||||
Message txtMessage = session.createTextMessage("M:"+ i);
|
||||
for (int i = 0; i < count; i++) {
|
||||
Message txtMessage = session.createTextMessage("M:" + i);
|
||||
producer.send(txtMessage);
|
||||
}
|
||||
|
||||
for (int i=0;i<count;i++) {
|
||||
Message consumedMessage = consumer.receive(2000);
|
||||
Assert.assertNotNull("should have received message[" + i +"]", consumedMessage);
|
||||
for (int i = 0; i < count; i++) {
|
||||
Message consumedMessage = consumer.receive();
|
||||
Assert.assertNotNull("should have received message[" + i + "]", consumedMessage);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testTopicConsumerPrefetchZeroClientAckLoopTimedReceive() throws Exception {
|
||||
ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.retroactive=true&consumer.prefetchSize=0");
|
||||
Session consumerClientAckSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||
consumer = consumerClientAckSession.createConsumer(consumerDestination);
|
||||
|
||||
final int count = 10;
|
||||
for (int i = 0; i < count; i++) {
|
||||
Message txtMessage = session.createTextMessage("M:" + i);
|
||||
producer.send(txtMessage);
|
||||
}
|
||||
|
||||
for (int i = 0; i < count; i++) {
|
||||
Message consumedMessage = consumer.receive(2000);
|
||||
Assert.assertNotNull("should have received message[" + i + "]", consumedMessage);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testTopicConsumerPrefetchZeroClientAckLoopReceiveNoWait() throws Exception {
|
||||
ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.retroactive=true&consumer.prefetchSize=0");
|
||||
Session consumerClientAckSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||
consumer = consumerClientAckSession.createConsumer(consumerDestination);
|
||||
|
||||
final int count = 10;
|
||||
for (int i = 0; i < count; i++) {
|
||||
Message txtMessage = session.createTextMessage("M:" + i);
|
||||
producer.send(txtMessage);
|
||||
}
|
||||
|
||||
for (int i = 0; i < count; i++) {
|
||||
Message consumedMessage = consumer.receiveNoWait();
|
||||
Assert.assertNotNull("should have received message[" + i + "]", consumedMessage);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeAutoAck() throws Exception {
|
||||
doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeClientAck() throws Exception {
|
||||
doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(Session.CLIENT_ACKNOWLEDGE);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeDupsOk() throws Exception {
|
||||
doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(Session.DUPS_OK_ACKNOWLEDGE);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeTransacted() throws Exception {
|
||||
doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(Session.SESSION_TRANSACTED);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeTransactedComitInBatches() throws Exception {
|
||||
doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(Session.SESSION_TRANSACTED);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testTopicConsumerPrefetchZeroConcurrentProduceConsumeIndividual() throws Exception {
|
||||
doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
|
||||
}
|
||||
|
||||
private void doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(int ackMode) throws Exception {
|
||||
doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(ackMode, false);
|
||||
}
|
||||
|
||||
private void doTestTopicConsumerPrefetchZeroConcurrentProduceConsume(int ackMode, boolean commitBatch) throws Exception {
|
||||
|
||||
ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.retroactive=true&consumer.prefetchSize=0");
|
||||
Session consumerSession = connection.createSession(ackMode == Session.SESSION_TRANSACTED, ackMode);
|
||||
consumer = consumerSession.createConsumer(consumerDestination);
|
||||
|
||||
final int MSG_COUNT = 2000;
|
||||
|
||||
final AtomicBoolean error = new AtomicBoolean();
|
||||
final CountDownLatch done = new CountDownLatch(MSG_COUNT);
|
||||
|
||||
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
executor.execute(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
for (int i = 0; i < MSG_COUNT; i++) {
|
||||
Message consumedMessage = consumer.receive();
|
||||
if (consumedMessage != null) {
|
||||
done.countDown();
|
||||
consumedMessage.acknowledge();
|
||||
if (ackMode == Session.SESSION_TRANSACTED && commitBatch && ((i + 1) % 50) == 0) {
|
||||
consumerSession.commit();
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Caught exception during receive: {}", ex);
|
||||
error.set(true);
|
||||
} finally {
|
||||
if (ackMode == Session.SESSION_TRANSACTED) {
|
||||
try {
|
||||
consumerSession.commit();
|
||||
} catch (JMSException e) {
|
||||
LOG.error("Caught exception on commit: {}", e);
|
||||
error.set(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
for (int i = 0; i < MSG_COUNT; i++) {
|
||||
Message txtMessage = session.createTextMessage("M:" + i);
|
||||
producer.send(txtMessage);
|
||||
}
|
||||
|
||||
assertFalse("Should not have gotten any errors", error.get());
|
||||
assertTrue("Should have read all messages", done.await(10, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
/*
|
||||
* test durable topic subscription with prefetch zero
|
||||
*/
|
||||
@Test(timeout=60000)
|
||||
@Test(timeout = 60000)
|
||||
public void testDurableTopicConsumerPrefetchZero() throws Exception {
|
||||
|
||||
ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.prefetchSize=0");
|
||||
ActiveMQTopic consumerDestination = new ActiveMQTopic(getTopicName() + "?consumer.prefetchSize=0");
|
||||
consumer = session.createDurableSubscriber(consumerDestination, "mysub1");
|
||||
|
||||
// publish messages
|
||||
|
|
Loading…
Reference in New Issue