mirror of https://github.com/apache/activemq.git
Fixing Queue destination statistics in dropMessage by adding sync in between the check for dropped and actually dropping the message plus fixing dequeue stats so messages aren't counted twice
This commit is contained in:
parent
ab434ee776
commit
1241e4120a
|
@ -24,8 +24,8 @@ import org.apache.activemq.command.MessageId;
|
||||||
* Keeps track of a message that is flowing through the Broker. This object may
|
* Keeps track of a message that is flowing through the Broker. This object may
|
||||||
* hold a hard reference to the message or only hold the id of the message if
|
* hold a hard reference to the message or only hold the id of the message if
|
||||||
* the message has been persisted on in a MessageStore.
|
* the message has been persisted on in a MessageStore.
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public class IndirectMessageReference implements QueueMessageReference {
|
public class IndirectMessageReference implements QueueMessageReference {
|
||||||
|
|
||||||
|
@ -38,7 +38,7 @@ public class IndirectMessageReference implements QueueMessageReference {
|
||||||
/** Direct reference to the message */
|
/** Direct reference to the message */
|
||||||
private final Message message;
|
private final Message message;
|
||||||
private final MessageId messageId;
|
private final MessageId messageId;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param message
|
* @param message
|
||||||
*/
|
*/
|
||||||
|
@ -50,44 +50,70 @@ public class IndirectMessageReference implements QueueMessageReference {
|
||||||
message.getGroupSequence();
|
message.getGroupSequence();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public Message getMessageHardRef() {
|
public Message getMessageHardRef() {
|
||||||
return message;
|
return message;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int getReferenceCount() {
|
public int getReferenceCount() {
|
||||||
return message.getReferenceCount();
|
return message.getReferenceCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int incrementReferenceCount() {
|
public int incrementReferenceCount() {
|
||||||
return message.incrementReferenceCount();
|
return message.incrementReferenceCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int decrementReferenceCount() {
|
public int decrementReferenceCount() {
|
||||||
return message.decrementReferenceCount();
|
return message.decrementReferenceCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public Message getMessage() {
|
public Message getMessage() {
|
||||||
return message;
|
return message;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "Message " + message.getMessageId() + " dropped=" + dropped + " acked=" + acked + " locked=" + (lockOwner != null);
|
return "Message " + message.getMessageId() + " dropped=" + dropped + " acked=" + acked + " locked=" + (lockOwner != null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void incrementRedeliveryCounter() {
|
public void incrementRedeliveryCounter() {
|
||||||
message.incrementRedeliveryCounter();
|
message.incrementRedeliveryCounter();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public synchronized boolean isDropped() {
|
public synchronized boolean isDropped() {
|
||||||
return dropped;
|
return dropped;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public synchronized void drop() {
|
public synchronized void drop() {
|
||||||
dropped = true;
|
dropped = true;
|
||||||
lockOwner = null;
|
lockOwner = null;
|
||||||
message.decrementReferenceCount();
|
message.decrementReferenceCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the message has already been dropped before
|
||||||
|
* dropping. Return true if dropped, else false.
|
||||||
|
* This method exists so that this can be done atomically
|
||||||
|
* under the intrinisic lock
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized boolean dropIfLive() {
|
||||||
|
if (isDropped()) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
drop();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean lock(LockOwner subscription) {
|
public boolean lock(LockOwner subscription) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (dropped || lockOwner != null) {
|
if (dropped || lockOwner != null) {
|
||||||
|
@ -98,28 +124,34 @@ public class IndirectMessageReference implements QueueMessageReference {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public synchronized boolean unlock() {
|
public synchronized boolean unlock() {
|
||||||
boolean result = lockOwner != null;
|
boolean result = lockOwner != null;
|
||||||
lockOwner = null;
|
lockOwner = null;
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public synchronized LockOwner getLockOwner() {
|
public synchronized LockOwner getLockOwner() {
|
||||||
return lockOwner;
|
return lockOwner;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int getRedeliveryCounter() {
|
public int getRedeliveryCounter() {
|
||||||
return message.getRedeliveryCounter();
|
return message.getRedeliveryCounter();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public MessageId getMessageId() {
|
public MessageId getMessageId() {
|
||||||
return messageId;
|
return messageId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public Message.MessageDestination getRegionDestination() {
|
public Message.MessageDestination getRegionDestination() {
|
||||||
return message.getRegionDestination();
|
return message.getRegionDestination();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isPersistent() {
|
public boolean isPersistent() {
|
||||||
return message.isPersistent();
|
return message.isPersistent();
|
||||||
}
|
}
|
||||||
|
@ -128,38 +160,47 @@ public class IndirectMessageReference implements QueueMessageReference {
|
||||||
return lockOwner != null;
|
return lockOwner != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public synchronized boolean isAcked() {
|
public synchronized boolean isAcked() {
|
||||||
return acked;
|
return acked;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public synchronized void setAcked(boolean b) {
|
public synchronized void setAcked(boolean b) {
|
||||||
acked = b;
|
acked = b;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public String getGroupID() {
|
public String getGroupID() {
|
||||||
return message.getGroupID();
|
return message.getGroupID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int getGroupSequence() {
|
public int getGroupSequence() {
|
||||||
return message.getGroupSequence();
|
return message.getGroupSequence();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public ConsumerId getTargetConsumerId() {
|
public ConsumerId getTargetConsumerId() {
|
||||||
return message.getTargetConsumerId();
|
return message.getTargetConsumerId();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public long getExpiration() {
|
public long getExpiration() {
|
||||||
return message.getExpiration();
|
return message.getExpiration();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isExpired() {
|
public boolean isExpired() {
|
||||||
return message.isExpired();
|
return message.isExpired();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public synchronized int getSize() {
|
public synchronized int getSize() {
|
||||||
return message.getSize();
|
return message.getSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isAdvisory() {
|
public boolean isAdvisory() {
|
||||||
return message.isAdvisory();
|
return message.isAdvisory();
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,98 +29,127 @@ public final class NullMessageReference implements QueueMessageReference {
|
||||||
private final ActiveMQMessage message = new ActiveMQMessage();
|
private final ActiveMQMessage message = new ActiveMQMessage();
|
||||||
private volatile int references;
|
private volatile int references;
|
||||||
|
|
||||||
|
@Override
|
||||||
public void drop() {
|
public void drop() {
|
||||||
throw new RuntimeException("not implemented");
|
throw new RuntimeException("not implemented");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized boolean dropIfLive() {
|
||||||
|
throw new RuntimeException("not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public LockOwner getLockOwner() {
|
public LockOwner getLockOwner() {
|
||||||
throw new RuntimeException("not implemented");
|
throw new RuntimeException("not implemented");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isAcked() {
|
public boolean isAcked() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isDropped() {
|
public boolean isDropped() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean lock(LockOwner subscription) {
|
public boolean lock(LockOwner subscription) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void setAcked(boolean b) {
|
public void setAcked(boolean b) {
|
||||||
throw new RuntimeException("not implemented");
|
throw new RuntimeException("not implemented");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean unlock() {
|
public boolean unlock() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int decrementReferenceCount() {
|
public int decrementReferenceCount() {
|
||||||
return --references;
|
return --references;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public long getExpiration() {
|
public long getExpiration() {
|
||||||
throw new RuntimeException("not implemented");
|
throw new RuntimeException("not implemented");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public String getGroupID() {
|
public String getGroupID() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int getGroupSequence() {
|
public int getGroupSequence() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public Message getMessage() {
|
public Message getMessage() {
|
||||||
return message;
|
return message;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public Message getMessageHardRef() {
|
public Message getMessageHardRef() {
|
||||||
throw new RuntimeException("not implemented");
|
throw new RuntimeException("not implemented");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public MessageId getMessageId() {
|
public MessageId getMessageId() {
|
||||||
return message.getMessageId();
|
return message.getMessageId();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int getRedeliveryCounter() {
|
public int getRedeliveryCounter() {
|
||||||
throw new RuntimeException("not implemented");
|
throw new RuntimeException("not implemented");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int getReferenceCount() {
|
public int getReferenceCount() {
|
||||||
return references;
|
return references;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public Destination getRegionDestination() {
|
public Destination getRegionDestination() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int getSize() {
|
public int getSize() {
|
||||||
throw new RuntimeException("not implemented");
|
throw new RuntimeException("not implemented");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public ConsumerId getTargetConsumerId() {
|
public ConsumerId getTargetConsumerId() {
|
||||||
throw new RuntimeException("not implemented");
|
throw new RuntimeException("not implemented");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void incrementRedeliveryCounter() {
|
public void incrementRedeliveryCounter() {
|
||||||
throw new RuntimeException("not implemented");
|
throw new RuntimeException("not implemented");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int incrementReferenceCount() {
|
public int incrementReferenceCount() {
|
||||||
return ++references;
|
return ++references;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isExpired() {
|
public boolean isExpired() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isPersistent() {
|
public boolean isPersistent() {
|
||||||
throw new RuntimeException("not implemented");
|
throw new RuntimeException("not implemented");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isAdvisory() {
|
public boolean isAdvisory() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1748,7 +1748,6 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
// This sends the ack the the journal..
|
// This sends the ack the the journal..
|
||||||
if (!ack.isInTransaction()) {
|
if (!ack.isInTransaction()) {
|
||||||
acknowledge(context, sub, ack, reference);
|
acknowledge(context, sub, ack, reference);
|
||||||
getDestinationStatistics().getDequeues().increment();
|
|
||||||
dropMessage(reference);
|
dropMessage(reference);
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
|
@ -1758,7 +1757,6 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterCommit() throws Exception {
|
public void afterCommit() throws Exception {
|
||||||
getDestinationStatistics().getDequeues().increment();
|
|
||||||
dropMessage(reference);
|
dropMessage(reference);
|
||||||
wakeup();
|
wakeup();
|
||||||
}
|
}
|
||||||
|
@ -1788,9 +1786,10 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
}
|
}
|
||||||
|
|
||||||
private void dropMessage(QueueMessageReference reference) {
|
private void dropMessage(QueueMessageReference reference) {
|
||||||
if (!reference.isDropped()) {
|
//use dropIfLive so we only process the statistics at most one time
|
||||||
reference.drop();
|
if (reference.dropIfLive()) {
|
||||||
destinationStatistics.getMessages().decrement();
|
getDestinationStatistics().getDequeues().increment();
|
||||||
|
getDestinationStatistics().getMessages().decrement();
|
||||||
pagedInMessagesLock.writeLock().lock();
|
pagedInMessagesLock.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
pagedInMessages.remove(reference);
|
pagedInMessages.remove(reference);
|
||||||
|
|
|
@ -18,25 +18,28 @@ package org.apache.activemq.broker.region;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Queue specific MessageReference.
|
* Queue specific MessageReference.
|
||||||
*
|
*
|
||||||
* @author fateev@amazon.com
|
* @author fateev@amazon.com
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public interface QueueMessageReference extends MessageReference {
|
public interface QueueMessageReference extends MessageReference {
|
||||||
|
|
||||||
QueueMessageReference NULL_MESSAGE = new NullMessageReference();
|
QueueMessageReference NULL_MESSAGE = new NullMessageReference();
|
||||||
|
|
||||||
boolean isAcked();
|
boolean isAcked();
|
||||||
|
|
||||||
void setAcked(boolean b);
|
void setAcked(boolean b);
|
||||||
|
|
||||||
void drop();
|
void drop();
|
||||||
|
|
||||||
|
boolean dropIfLive();
|
||||||
|
|
||||||
|
@Override
|
||||||
boolean isDropped();
|
boolean isDropped();
|
||||||
|
|
||||||
boolean lock(LockOwner subscription);
|
boolean lock(LockOwner subscription);
|
||||||
|
|
||||||
boolean unlock();
|
boolean unlock();
|
||||||
|
|
||||||
LockOwner getLockOwner();
|
LockOwner getLockOwner();
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,178 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.bugs;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
|
import org.apache.activemq.broker.region.Queue;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
|
||||||
|
public class AMQ6293Test {
|
||||||
|
|
||||||
|
static final Logger LOG = LoggerFactory.getLogger(AMQ6293Test.class);
|
||||||
|
|
||||||
|
private BrokerService brokerService;
|
||||||
|
private String connectionUri;
|
||||||
|
private ExecutorService service = Executors.newFixedThreadPool(6);
|
||||||
|
private final ActiveMQQueue queue = new ActiveMQQueue("test");
|
||||||
|
private final int numMessages = 10000;
|
||||||
|
private Connection connection;
|
||||||
|
private Session session;
|
||||||
|
private final AtomicBoolean isException = new AtomicBoolean();
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void before() throws Exception {
|
||||||
|
brokerService = new BrokerService();
|
||||||
|
TransportConnector connector = brokerService.addConnector("tcp://localhost:0");
|
||||||
|
connectionUri = connector.getPublishableConnectString();
|
||||||
|
brokerService.setPersistent(true);
|
||||||
|
brokerService.setDataDirectoryFile(dataFileDir.getRoot());
|
||||||
|
|
||||||
|
PolicyMap policyMap = new PolicyMap();
|
||||||
|
PolicyEntry entry = new PolicyEntry();
|
||||||
|
policyMap.setDefaultEntry(entry);
|
||||||
|
brokerService.setDestinationPolicy(policyMap);
|
||||||
|
entry.setQueuePrefetch(100);
|
||||||
|
|
||||||
|
brokerService.start();
|
||||||
|
brokerService.waitUntilStarted();
|
||||||
|
|
||||||
|
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
|
||||||
|
Connection connection = null;
|
||||||
|
connection = factory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void after() throws Exception {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.stop();
|
||||||
|
}
|
||||||
|
if (brokerService != null) {
|
||||||
|
brokerService.stop();
|
||||||
|
brokerService.waitUntilStopped();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=90000)
|
||||||
|
public void testDestinationStatisticsOnPurge() throws Exception {
|
||||||
|
//send messages to the store
|
||||||
|
sendTestMessages(numMessages);
|
||||||
|
|
||||||
|
//Start up 5 consumers
|
||||||
|
final Queue regionQueue = (Queue) brokerService.getRegionBroker().getDestinationMap().get(queue);
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
service.submit(new TestConsumer(session.createConsumer(queue)));
|
||||||
|
}
|
||||||
|
|
||||||
|
//Start a purge task at the same time as the consumers
|
||||||
|
for (int i = 0; i < 1; i++) {
|
||||||
|
service.submit(new Runnable() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
regionQueue.purge();
|
||||||
|
} catch (Exception e) {
|
||||||
|
isException.set(true);
|
||||||
|
LOG.warn(e.getMessage(), e);
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
service.shutdown();
|
||||||
|
assertTrue("Took too long to shutdown service", service.awaitTermination(1, TimeUnit.MINUTES));
|
||||||
|
assertFalse("Exception encountered", isException.get());
|
||||||
|
|
||||||
|
//Verify dequeue and message counts
|
||||||
|
assertEquals(0, regionQueue.getDestinationStatistics().getMessages().getCount());
|
||||||
|
assertEquals(numMessages, regionQueue.getDestinationStatistics().getDequeues().getCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendTestMessages(int numMessages) throws JMSException {
|
||||||
|
MessageProducer producer = session.createProducer(queue);
|
||||||
|
|
||||||
|
final TextMessage textMessage = session.createTextMessage();
|
||||||
|
textMessage.setText("Message");
|
||||||
|
for (int i = 1; i <= numMessages; i++) {
|
||||||
|
producer.send(textMessage);
|
||||||
|
if (i % 1000 == 0) {
|
||||||
|
LOG.info("Sent {} messages", i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class TestConsumer implements Runnable {
|
||||||
|
private final MessageConsumer consumer;
|
||||||
|
|
||||||
|
public TestConsumer(final MessageConsumer consumer) throws JMSException {
|
||||||
|
this.consumer = consumer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
int i = 0;
|
||||||
|
while (consumer.receive(1000) != null) {
|
||||||
|
i++;
|
||||||
|
if (i % 1000 == 0) {
|
||||||
|
LOG.info("Received {} messages", i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
isException.set(true);
|
||||||
|
LOG.warn(e.getMessage(), e);
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
Loading…
Reference in New Issue