mirror of https://github.com/apache/activemq.git
AMQ-6577: honour usePrefetchExtension in TopicSubscription.
AMQ-6577: move usePrefetchExtension flag to AbstractSubscription to promote reuse.
This commit is contained in:
parent
2e953d96a3
commit
687badb4fd
|
@ -52,6 +52,7 @@ public abstract class AbstractSubscription implements Subscription {
|
||||||
protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
|
protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
|
||||||
protected final AtomicInteger prefetchExtension = new AtomicInteger(0);
|
protected final AtomicInteger prefetchExtension = new AtomicInteger(0);
|
||||||
|
|
||||||
|
private boolean usePrefetchExtension = true;
|
||||||
private BooleanExpression selectorExpression;
|
private BooleanExpression selectorExpression;
|
||||||
private ObjectName objectName;
|
private ObjectName objectName;
|
||||||
private int cursorMemoryHighWaterMark = 70;
|
private int cursorMemoryHighWaterMark = 70;
|
||||||
|
@ -185,6 +186,14 @@ public abstract class AbstractSubscription implements Subscription {
|
||||||
return info.getPrefetchSize();
|
return info.getPrefetchSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isUsePrefetchExtension() {
|
||||||
|
return usePrefetchExtension;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUsePrefetchExtension(boolean usePrefetchExtension) {
|
||||||
|
this.usePrefetchExtension = usePrefetchExtension;
|
||||||
|
}
|
||||||
|
|
||||||
public void setPrefetchSize(int newSize) {
|
public void setPrefetchSize(int newSize) {
|
||||||
info.setPrefetchSize(newSize);
|
info.setPrefetchSize(newSize);
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,7 +56,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
||||||
|
|
||||||
protected PendingMessageCursor pending;
|
protected PendingMessageCursor pending;
|
||||||
protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
|
protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
|
||||||
protected boolean usePrefetchExtension = true;
|
|
||||||
private int maxProducersToAudit=32;
|
private int maxProducersToAudit=32;
|
||||||
private int maxAuditDepth=2048;
|
private int maxAuditDepth=2048;
|
||||||
protected final SystemUsage usageManager;
|
protected final SystemUsage usageManager;
|
||||||
|
@ -263,7 +262,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
||||||
registerRemoveSync(context, node);
|
registerRemoveSync(context, node);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (usePrefetchExtension && getPrefetchSize() != 0 && ack.isInTransaction()) {
|
if (isUsePrefetchExtension() && getPrefetchSize() != 0 && ack.isInTransaction()) {
|
||||||
// allow transaction batch to exceed prefetch
|
// allow transaction batch to exceed prefetch
|
||||||
while (true) {
|
while (true) {
|
||||||
int currentExtension = prefetchExtension.get();
|
int currentExtension = prefetchExtension.get();
|
||||||
|
@ -288,7 +287,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
||||||
final MessageReference node = iter.next();
|
final MessageReference node = iter.next();
|
||||||
Destination nodeDest = (Destination) node.getRegionDestination();
|
Destination nodeDest = (Destination) node.getRegionDestination();
|
||||||
if (ack.getLastMessageId().equals(node.getMessageId())) {
|
if (ack.getLastMessageId().equals(node.getMessageId())) {
|
||||||
if (usePrefetchExtension && getPrefetchSize() != 0) {
|
if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
|
||||||
// allow batch to exceed prefetch
|
// allow batch to exceed prefetch
|
||||||
while (true) {
|
while (true) {
|
||||||
int currentExtension = prefetchExtension.get();
|
int currentExtension = prefetchExtension.get();
|
||||||
|
@ -328,7 +327,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
||||||
nodeDest.getDestinationStatistics().getInflight().decrement();
|
nodeDest.getDestinationStatistics().getInflight().decrement();
|
||||||
|
|
||||||
if (ack.getLastMessageId().equals(messageId)) {
|
if (ack.getLastMessageId().equals(messageId)) {
|
||||||
if (usePrefetchExtension && getPrefetchSize() != 0) {
|
if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
|
||||||
// allow batch to exceed prefetch
|
// allow batch to exceed prefetch
|
||||||
while (true) {
|
while (true) {
|
||||||
int currentExtension = prefetchExtension.get();
|
int currentExtension = prefetchExtension.get();
|
||||||
|
@ -444,7 +443,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void beforeEnd() {
|
public void beforeEnd() {
|
||||||
if (usePrefetchExtension && getPrefetchSize() != 0) {
|
if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
|
||||||
while (true) {
|
while (true) {
|
||||||
int currentExtension = prefetchExtension.get();
|
int currentExtension = prefetchExtension.get();
|
||||||
int newExtension = Math.max(0, currentExtension - 1);
|
int newExtension = Math.max(0, currentExtension - 1);
|
||||||
|
@ -892,14 +891,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isUsePrefetchExtension() {
|
|
||||||
return usePrefetchExtension;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setUsePrefetchExtension(boolean usePrefetchExtension) {
|
|
||||||
this.usePrefetchExtension = usePrefetchExtension;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setPrefetchSize(int prefetchSize) {
|
public void setPrefetchSize(int prefetchSize) {
|
||||||
this.info.setPrefetchSize(prefetchSize);
|
this.info.setPrefetchSize(prefetchSize);
|
||||||
|
|
|
@ -422,6 +422,9 @@ public class TopicSubscription extends AbstractSubscription {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void incrementPrefetchExtension(int amount) {
|
private void incrementPrefetchExtension(int amount) {
|
||||||
|
if (!isUsePrefetchExtension()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
while (true) {
|
while (true) {
|
||||||
int currentExtension = prefetchExtension.get();
|
int currentExtension = prefetchExtension.get();
|
||||||
int newExtension = Math.max(0, currentExtension + amount);
|
int newExtension = Math.max(0, currentExtension + amount);
|
||||||
|
@ -748,7 +751,8 @@ public class TopicSubscription extends AbstractSubscription {
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
|
return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
|
||||||
+ getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded() + ", prefetchExtension=" + prefetchExtension.get();
|
+ getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded() + ", prefetchExtension=" + prefetchExtension.get()
|
||||||
|
+ ", usePrefetchExtension=" + isUsePrefetchExtension();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -781,4 +785,5 @@ public class TopicSubscription extends AbstractSubscription {
|
||||||
LOG.trace("Caught exception on dispatch after prefetch size change.");
|
LOG.trace("Caught exception on dispatch after prefetch size change.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -313,6 +313,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
||||||
|
|
||||||
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
|
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
|
||||||
configurePrefetch(subscription);
|
configurePrefetch(subscription);
|
||||||
|
subscription.setUsePrefetchExtension(isUsePrefetchExtension());
|
||||||
subscription.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
|
subscription.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
|
||||||
if (pendingMessageLimitStrategy != null) {
|
if (pendingMessageLimitStrategy != null) {
|
||||||
int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
|
int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
|
||||||
|
|
|
@ -18,7 +18,7 @@ package org.apache.activemq;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.ServerSocket;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
|
@ -28,13 +28,13 @@ import javax.jms.Message;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
import javax.management.MalformedObjectNameException;
|
import javax.management.MalformedObjectNameException;
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
import javax.net.ServerSocketFactory;
|
|
||||||
|
|
||||||
import org.apache.activemq.broker.BrokerRegistry;
|
import org.apache.activemq.broker.BrokerRegistry;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||||
import org.apache.activemq.broker.region.DestinationStatistics;
|
import org.apache.activemq.broker.region.DestinationStatistics;
|
||||||
import org.apache.activemq.broker.region.RegionBroker;
|
import org.apache.activemq.broker.region.RegionBroker;
|
||||||
|
import org.apache.activemq.broker.region.Subscription;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQMessage;
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
@ -157,6 +157,15 @@ public abstract class TestSupport extends CombinationTestSupport {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static List<Subscription> getDestinationConsumers(BrokerService broker, ActiveMQDestination destination) {
|
||||||
|
List<Subscription> result = null;
|
||||||
|
org.apache.activemq.broker.region.Destination dest = getDestination(broker, destination);
|
||||||
|
if (dest != null) {
|
||||||
|
result = dest.getConsumers();
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
public static org.apache.activemq.broker.region.Destination getDestination(BrokerService target, ActiveMQDestination destination) {
|
public static org.apache.activemq.broker.region.Destination getDestination(BrokerService target, ActiveMQDestination destination) {
|
||||||
org.apache.activemq.broker.region.Destination result = null;
|
org.apache.activemq.broker.region.Destination result = null;
|
||||||
for (org.apache.activemq.broker.region.Destination dest : getDestinationMap(target, destination).values()) {
|
for (org.apache.activemq.broker.region.Destination dest : getDestinationMap(target, destination).values()) {
|
||||||
|
|
|
@ -22,11 +22,14 @@ import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.CombinationTestSupport;
|
import org.apache.activemq.CombinationTestSupport;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.region.DestinationStatistics;
|
import org.apache.activemq.broker.region.DestinationStatistics;
|
||||||
|
import org.apache.activemq.broker.region.Subscription;
|
||||||
|
import org.apache.activemq.broker.region.TopicSubscription;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
|
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
|
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
|
||||||
import org.apache.activemq.util.Wait;
|
import org.apache.activemq.util.Wait;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -34,9 +37,11 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import javax.jms.*;
|
import javax.jms.*;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import static org.apache.activemq.TestSupport.getDestination;
|
import static org.apache.activemq.TestSupport.getDestination;
|
||||||
|
import static org.apache.activemq.TestSupport.getDestinationConsumers;
|
||||||
import static org.apache.activemq.TestSupport.getDestinationStatistics;
|
import static org.apache.activemq.TestSupport.getDestinationStatistics;
|
||||||
|
|
||||||
public class ExpiredMessagesTest extends CombinationTestSupport {
|
public class ExpiredMessagesTest extends CombinationTestSupport {
|
||||||
|
@ -48,11 +53,12 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
||||||
Session session;
|
Session session;
|
||||||
MessageProducer producer;
|
MessageProducer producer;
|
||||||
MessageConsumer consumer;
|
MessageConsumer consumer;
|
||||||
public ActiveMQDestination destination = new ActiveMQQueue("test");
|
private ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
|
||||||
public ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
|
private boolean useTextMessage = true;
|
||||||
public boolean useTextMessage = true;
|
private boolean useVMCursor = true;
|
||||||
public boolean useVMCursor = true;
|
private boolean deleteAllMessages = true;
|
||||||
protected String brokerUri;
|
private boolean usePrefetchExtension = true;
|
||||||
|
private String brokerUri;
|
||||||
|
|
||||||
public static Test suite() {
|
public static Test suite() {
|
||||||
return suite(ExpiredMessagesTest.class);
|
return suite(ExpiredMessagesTest.class);
|
||||||
|
@ -64,20 +70,153 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void setUp() throws Exception {
|
protected void setUp() throws Exception {
|
||||||
final boolean deleteAllMessages = true;
|
|
||||||
broker = createBroker(deleteAllMessages, 100);
|
|
||||||
brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void tearDown() throws Exception {
|
protected void tearDown() throws Exception {
|
||||||
|
if (null != producer) {
|
||||||
|
producer.close();
|
||||||
|
}
|
||||||
|
if (null != consumer) {
|
||||||
|
consumer.close();
|
||||||
|
}
|
||||||
|
session.close();
|
||||||
connection.stop();
|
connection.stop();
|
||||||
broker.stop();
|
broker.stop();
|
||||||
broker.waitUntilStopped();
|
broker.waitUntilStopped();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testExpiredMessages() throws Exception {
|
public void testExpiredMessages() throws Exception {
|
||||||
|
final ActiveMQDestination destination = new ActiveMQQueue("test");
|
||||||
|
final int numMessagesToSend = 10000;
|
||||||
|
|
||||||
|
buildBroker(destination);
|
||||||
|
|
||||||
|
final DestinationStatistics view = verifyMessageExpirationOnDestination(destination, numMessagesToSend);
|
||||||
|
|
||||||
|
verifyDestinationDlq(destination, numMessagesToSend, view);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testExpiredMessages_onTopic_withPrefetchExtension() throws Exception {
|
||||||
|
final ActiveMQDestination destination = new ActiveMQTopic("test");
|
||||||
|
final int numMessagesToSend = 10000;
|
||||||
|
|
||||||
|
usePrefetchExtension = true;
|
||||||
|
|
||||||
|
buildBroker(destination);
|
||||||
|
|
||||||
|
verifyMessageExpirationOnDestination(destination, numMessagesToSend);
|
||||||
|
// We don't check the DLQ because non-persistent messages on topics are discarded instead.
|
||||||
|
|
||||||
|
final List<Subscription> subscriptions = getDestinationConsumers(broker, destination);
|
||||||
|
|
||||||
|
assertTrue("prefetch extension was not incremented",
|
||||||
|
subscriptions.stream().
|
||||||
|
filter(s -> s instanceof TopicSubscription).
|
||||||
|
mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).
|
||||||
|
allMatch(e -> e > 0));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testExpiredMessages_onTopic_withoutPrefetchExtension() throws Exception {
|
||||||
|
final ActiveMQDestination destination = new ActiveMQTopic("test");
|
||||||
|
final int numMessagesToSend = 10000;
|
||||||
|
|
||||||
|
usePrefetchExtension = false;
|
||||||
|
|
||||||
|
buildBroker(destination);
|
||||||
|
|
||||||
|
verifyMessageExpirationOnDestination(destination, numMessagesToSend);
|
||||||
|
// We don't check the DLQ because non-persistent messages on topics are discarded instead.
|
||||||
|
|
||||||
|
final List<Subscription> subscriptions = getDestinationConsumers(broker, destination);
|
||||||
|
|
||||||
|
assertTrue("prefetch extension was incremented",
|
||||||
|
subscriptions.stream().
|
||||||
|
filter(s -> s instanceof TopicSubscription).
|
||||||
|
mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).
|
||||||
|
allMatch(e -> e == 0));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void buildBroker(ActiveMQDestination destination) throws Exception {
|
||||||
|
broker = createBroker(deleteAllMessages, usePrefetchExtension, 100, destination);
|
||||||
|
brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRecoverExpiredMessages() throws Exception {
|
||||||
|
final ActiveMQDestination destination = new ActiveMQQueue("test");
|
||||||
|
|
||||||
|
buildBroker(destination);
|
||||||
|
|
||||||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
|
||||||
|
"failover://"+brokerUri);
|
||||||
|
connection = factory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
producer = session.createProducer(destination);
|
||||||
|
producer.setTimeToLive(2000);
|
||||||
|
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
|
||||||
|
Thread producingThread = new Thread("Producing Thread") {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
int i = 0;
|
||||||
|
while (i++ < 1000) {
|
||||||
|
Message message = useTextMessage ? session
|
||||||
|
.createTextMessage("test") : session
|
||||||
|
.createObjectMessage("test");
|
||||||
|
producer.send(message);
|
||||||
|
}
|
||||||
|
} catch (Throwable ex) {
|
||||||
|
ex.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
producingThread.start();
|
||||||
|
producingThread.join();
|
||||||
|
|
||||||
|
DestinationStatistics view = getDestinationStatistics(broker, destination);
|
||||||
|
LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: "
|
||||||
|
+ view.getEnqueues().getCount() + ", dequeues: "
|
||||||
|
+ view.getDequeues().getCount() + ", dispatched: "
|
||||||
|
+ view.getDispatched().getCount() + ", inflight: "
|
||||||
|
+ view.getInflight().getCount() + ", expiries: "
|
||||||
|
+ view.getExpired().getCount());
|
||||||
|
|
||||||
|
LOG.info("stopping broker");
|
||||||
|
broker.stop();
|
||||||
|
broker.waitUntilStopped();
|
||||||
|
|
||||||
|
Thread.sleep(5000);
|
||||||
|
|
||||||
|
LOG.info("recovering broker");
|
||||||
|
final boolean deleteAllMessages = false;
|
||||||
|
final boolean usePrefetchExtension = true;
|
||||||
|
broker = createBroker(deleteAllMessages, usePrefetchExtension, 5000, destination);
|
||||||
|
|
||||||
|
Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
DestinationStatistics view = getDestinationStatistics(broker, destination);
|
||||||
|
LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: "
|
||||||
|
+ view.getEnqueues().getCount() + ", dequeues: "
|
||||||
|
+ view.getDequeues().getCount() + ", dispatched: "
|
||||||
|
+ view.getDispatched().getCount() + ", inflight: "
|
||||||
|
+ view.getInflight().getCount() + ", expiries: "
|
||||||
|
+ view.getExpired().getCount());
|
||||||
|
|
||||||
|
return view.getMessages().getCount() == 0;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
view = getDestinationStatistics(broker, destination);
|
||||||
|
assertEquals("Expect empty queue, QueueSize: ", 0, view.getMessages().getCount());
|
||||||
|
assertEquals("all dequeues were expired", view.getDequeues().getCount(), view.getExpired().getCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
private DestinationStatistics verifyMessageExpirationOnDestination(ActiveMQDestination destination, final int numMessagesToSend) throws Exception {
|
||||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
|
||||||
connection = factory.createConnection();
|
connection = factory.createConnection();
|
||||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
@ -100,7 +239,6 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
end = System.currentTimeMillis();
|
end = System.currentTimeMillis();
|
||||||
}
|
}
|
||||||
consumer.close();
|
|
||||||
} catch (Throwable ex) {
|
} catch (Throwable ex) {
|
||||||
ex.printStackTrace();
|
ex.printStackTrace();
|
||||||
}
|
}
|
||||||
|
@ -109,7 +247,6 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
||||||
|
|
||||||
consumerThread.start();
|
consumerThread.start();
|
||||||
|
|
||||||
final int numMessagesToSend = 10000;
|
|
||||||
Thread producingThread = new Thread("Producing Thread") {
|
Thread producingThread = new Thread("Producing Thread") {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -118,7 +255,6 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
||||||
while (i++ < numMessagesToSend) {
|
while (i++ < numMessagesToSend) {
|
||||||
producer.send(session.createTextMessage("test"));
|
producer.send(session.createTextMessage("test"));
|
||||||
}
|
}
|
||||||
producer.close();
|
|
||||||
} catch (Throwable ex) {
|
} catch (Throwable ex) {
|
||||||
ex.printStackTrace();
|
ex.printStackTrace();
|
||||||
}
|
}
|
||||||
|
@ -129,7 +265,6 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
||||||
|
|
||||||
consumerThread.join();
|
consumerThread.join();
|
||||||
producingThread.join();
|
producingThread.join();
|
||||||
session.close();
|
|
||||||
|
|
||||||
final DestinationStatistics view = getDestinationStatistics(broker, destination);
|
final DestinationStatistics view = getDestinationStatistics(broker, destination);
|
||||||
|
|
||||||
|
@ -171,7 +306,10 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
||||||
return view.getMessages().getCount() == 0;
|
return view.getMessages().getCount() == 0;
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
return view;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyDestinationDlq(ActiveMQDestination destination, int numMessagesToSend, DestinationStatistics view) throws Exception {
|
||||||
final long expiredBeforeEnqueue = numMessagesToSend - view.getEnqueues().getCount();
|
final long expiredBeforeEnqueue = numMessagesToSend - view.getEnqueues().getCount();
|
||||||
final long totalExpiredCount = view.getExpired().getCount() + expiredBeforeEnqueue;
|
final long totalExpiredCount = view.getExpired().getCount() + expiredBeforeEnqueue;
|
||||||
|
|
||||||
|
@ -225,77 +363,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
||||||
addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE, Boolean.FALSE});
|
addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE, Boolean.FALSE});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRecoverExpiredMessages() throws Exception {
|
private BrokerService createBroker(boolean deleteAllMessages, boolean usePrefetchExtension, long expireMessagesPeriod, ActiveMQDestination destination) throws Exception {
|
||||||
|
|
||||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
|
|
||||||
"failover://"+brokerUri);
|
|
||||||
connection = factory.createConnection();
|
|
||||||
connection.start();
|
|
||||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
|
||||||
producer = session.createProducer(destination);
|
|
||||||
producer.setTimeToLive(2000);
|
|
||||||
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
|
||||||
|
|
||||||
Thread producingThread = new Thread("Producing Thread") {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
int i = 0;
|
|
||||||
while (i++ < 1000) {
|
|
||||||
Message message = useTextMessage ? session
|
|
||||||
.createTextMessage("test") : session
|
|
||||||
.createObjectMessage("test");
|
|
||||||
producer.send(message);
|
|
||||||
}
|
|
||||||
producer.close();
|
|
||||||
} catch (Throwable ex) {
|
|
||||||
ex.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
producingThread.start();
|
|
||||||
producingThread.join();
|
|
||||||
|
|
||||||
DestinationStatistics view = getDestinationStatistics(broker, destination);
|
|
||||||
LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: "
|
|
||||||
+ view.getEnqueues().getCount() + ", dequeues: "
|
|
||||||
+ view.getDequeues().getCount() + ", dispatched: "
|
|
||||||
+ view.getDispatched().getCount() + ", inflight: "
|
|
||||||
+ view.getInflight().getCount() + ", expiries: "
|
|
||||||
+ view.getExpired().getCount());
|
|
||||||
|
|
||||||
LOG.info("stopping broker");
|
|
||||||
broker.stop();
|
|
||||||
broker.waitUntilStopped();
|
|
||||||
|
|
||||||
Thread.sleep(5000);
|
|
||||||
|
|
||||||
LOG.info("recovering broker");
|
|
||||||
final boolean deleteAllMessages = false;
|
|
||||||
broker = createBroker(deleteAllMessages, 5000);
|
|
||||||
|
|
||||||
Wait.waitFor(new Wait.Condition() {
|
|
||||||
@Override
|
|
||||||
public boolean isSatisified() throws Exception {
|
|
||||||
DestinationStatistics view = getDestinationStatistics(broker, destination);
|
|
||||||
LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: "
|
|
||||||
+ view.getEnqueues().getCount() + ", dequeues: "
|
|
||||||
+ view.getDequeues().getCount() + ", dispatched: "
|
|
||||||
+ view.getDispatched().getCount() + ", inflight: "
|
|
||||||
+ view.getInflight().getCount() + ", expiries: "
|
|
||||||
+ view.getExpired().getCount());
|
|
||||||
|
|
||||||
return view.getMessages().getCount() == 0;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
view = getDestinationStatistics(broker, destination);
|
|
||||||
assertEquals("Expect empty queue, QueueSize: ", 0, view.getMessages().getCount());
|
|
||||||
assertEquals("all dequeues were expired", view.getDequeues().getCount(), view.getExpired().getCount());
|
|
||||||
}
|
|
||||||
|
|
||||||
private BrokerService createBroker(boolean deleteAllMessages, long expireMessagesPeriod) throws Exception {
|
|
||||||
BrokerService broker = new BrokerService();
|
BrokerService broker = new BrokerService();
|
||||||
broker.setBrokerName("localhost");
|
broker.setBrokerName("localhost");
|
||||||
broker.setDestinations(new ActiveMQDestination[]{destination});
|
broker.setDestinations(new ActiveMQDestination[]{destination});
|
||||||
|
@ -307,6 +375,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
|
||||||
}
|
}
|
||||||
defaultPolicy.setExpireMessagesPeriod(expireMessagesPeriod);
|
defaultPolicy.setExpireMessagesPeriod(expireMessagesPeriod);
|
||||||
defaultPolicy.setMaxExpirePageSize(1200);
|
defaultPolicy.setMaxExpirePageSize(1200);
|
||||||
|
defaultPolicy.setUsePrefetchExtension(usePrefetchExtension);
|
||||||
PolicyMap policyMap = new PolicyMap();
|
PolicyMap policyMap = new PolicyMap();
|
||||||
policyMap.setDefaultEntry(defaultPolicy);
|
policyMap.setDefaultEntry(defaultPolicy);
|
||||||
broker.setDestinationPolicy(policyMap);
|
broker.setDestinationPolicy(policyMap);
|
||||||
|
|
Loading…
Reference in New Issue