Merging AMQ-6577

This closes #224
This commit is contained in:
Christopher L. Shannon (cshannon) 2017-01-30 11:16:15 -05:00
commit b51e0bf40c
6 changed files with 183 additions and 99 deletions

View File

@ -52,6 +52,7 @@ public abstract class AbstractSubscription implements Subscription {
protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
protected final AtomicInteger prefetchExtension = new AtomicInteger(0);
private boolean usePrefetchExtension = true;
private BooleanExpression selectorExpression;
private ObjectName objectName;
private int cursorMemoryHighWaterMark = 70;
@ -185,6 +186,14 @@ public abstract class AbstractSubscription implements Subscription {
return info.getPrefetchSize();
}
public boolean isUsePrefetchExtension() {
return usePrefetchExtension;
}
public void setUsePrefetchExtension(boolean usePrefetchExtension) {
this.usePrefetchExtension = usePrefetchExtension;
}
public void setPrefetchSize(int newSize) {
info.setPrefetchSize(newSize);
}

View File

@ -56,7 +56,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
protected PendingMessageCursor pending;
protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
protected boolean usePrefetchExtension = true;
private int maxProducersToAudit=32;
private int maxAuditDepth=2048;
protected final SystemUsage usageManager;
@ -263,7 +262,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
registerRemoveSync(context, node);
}
if (usePrefetchExtension && getPrefetchSize() != 0 && ack.isInTransaction()) {
if (isUsePrefetchExtension() && getPrefetchSize() != 0 && ack.isInTransaction()) {
// allow transaction batch to exceed prefetch
while (true) {
int currentExtension = prefetchExtension.get();
@ -288,7 +287,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
final MessageReference node = iter.next();
Destination nodeDest = (Destination) node.getRegionDestination();
if (ack.getLastMessageId().equals(node.getMessageId())) {
if (usePrefetchExtension && getPrefetchSize() != 0) {
if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
// allow batch to exceed prefetch
while (true) {
int currentExtension = prefetchExtension.get();
@ -328,7 +327,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
nodeDest.getDestinationStatistics().getInflight().decrement();
if (ack.getLastMessageId().equals(messageId)) {
if (usePrefetchExtension && getPrefetchSize() != 0) {
if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
// allow batch to exceed prefetch
while (true) {
int currentExtension = prefetchExtension.get();
@ -444,7 +443,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
@Override
public void beforeEnd() {
if (usePrefetchExtension && getPrefetchSize() != 0) {
if (isUsePrefetchExtension() && getPrefetchSize() != 0) {
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - 1);
@ -892,14 +891,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
}
public boolean isUsePrefetchExtension() {
return usePrefetchExtension;
}
public void setUsePrefetchExtension(boolean usePrefetchExtension) {
this.usePrefetchExtension = usePrefetchExtension;
}
@Override
public void setPrefetchSize(int prefetchSize) {
this.info.setPrefetchSize(prefetchSize);

View File

@ -422,6 +422,9 @@ public class TopicSubscription extends AbstractSubscription {
}
private void incrementPrefetchExtension(int amount) {
if (!isUsePrefetchExtension()) {
return;
}
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension + amount);
@ -748,7 +751,8 @@ public class TopicSubscription extends AbstractSubscription {
@Override
public String toString() {
return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
+ getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded() + ", prefetchExtension=" + prefetchExtension.get();
+ getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded() + ", prefetchExtension=" + prefetchExtension.get()
+ ", usePrefetchExtension=" + isUsePrefetchExtension();
}
@Override
@ -781,4 +785,5 @@ public class TopicSubscription extends AbstractSubscription {
LOG.trace("Caught exception on dispatch after prefetch size change.");
}
}
}

View File

@ -313,6 +313,7 @@ public class PolicyEntry extends DestinationMapEntry {
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
configurePrefetch(subscription);
subscription.setUsePrefetchExtension(isUsePrefetchExtension());
subscription.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
if (pendingMessageLimitStrategy != null) {
int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);

View File

@ -18,7 +18,7 @@ package org.apache.activemq;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.List;
import java.util.Map;
import javax.jms.Connection;
@ -28,13 +28,13 @@ import javax.jms.Message;
import javax.jms.TextMessage;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.net.ServerSocketFactory;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
@ -157,6 +157,15 @@ public abstract class TestSupport extends CombinationTestSupport {
return result;
}
public static List<Subscription> getDestinationConsumers(BrokerService broker, ActiveMQDestination destination) {
List<Subscription> result = null;
org.apache.activemq.broker.region.Destination dest = getDestination(broker, destination);
if (dest != null) {
result = dest.getConsumers();
}
return result;
}
public static org.apache.activemq.broker.region.Destination getDestination(BrokerService target, ActiveMQDestination destination) {
org.apache.activemq.broker.region.Destination result = null;
for (org.apache.activemq.broker.region.Destination dest : getDestinationMap(target, destination).values()) {

View File

@ -22,11 +22,14 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
@ -34,9 +37,11 @@ import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.activemq.TestSupport.getDestination;
import static org.apache.activemq.TestSupport.getDestinationConsumers;
import static org.apache.activemq.TestSupport.getDestinationStatistics;
public class ExpiredMessagesTest extends CombinationTestSupport {
@ -48,11 +53,12 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
Session session;
MessageProducer producer;
MessageConsumer consumer;
public ActiveMQDestination destination = new ActiveMQQueue("test");
public ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
public boolean useTextMessage = true;
public boolean useVMCursor = true;
protected String brokerUri;
private ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
private boolean useTextMessage = true;
private boolean useVMCursor = true;
private boolean deleteAllMessages = true;
private boolean usePrefetchExtension = true;
private String brokerUri;
public static Test suite() {
return suite(ExpiredMessagesTest.class);
@ -64,20 +70,153 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
@Override
protected void setUp() throws Exception {
final boolean deleteAllMessages = true;
broker = createBroker(deleteAllMessages, 100);
brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
@Override
protected void tearDown() throws Exception {
if (null != producer) {
producer.close();
}
if (null != consumer) {
consumer.close();
}
session.close();
connection.stop();
broker.stop();
broker.waitUntilStopped();
}
public void testExpiredMessages() throws Exception {
final ActiveMQDestination destination = new ActiveMQQueue("test");
final int numMessagesToSend = 10000;
buildBroker(destination);
final DestinationStatistics view = verifyMessageExpirationOnDestination(destination, numMessagesToSend);
verifyDestinationDlq(destination, numMessagesToSend, view);
}
public void testExpiredMessages_onTopic_withPrefetchExtension() throws Exception {
final ActiveMQDestination destination = new ActiveMQTopic("test");
final int numMessagesToSend = 10000;
usePrefetchExtension = true;
buildBroker(destination);
verifyMessageExpirationOnDestination(destination, numMessagesToSend);
// We don't check the DLQ because non-persistent messages on topics are discarded instead.
final List<Subscription> subscriptions = getDestinationConsumers(broker, destination);
assertTrue("prefetch extension was not incremented",
subscriptions.stream().
filter(s -> s instanceof TopicSubscription).
mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).
allMatch(e -> e > 0));
}
public void testExpiredMessages_onTopic_withoutPrefetchExtension() throws Exception {
final ActiveMQDestination destination = new ActiveMQTopic("test");
final int numMessagesToSend = 10000;
usePrefetchExtension = false;
buildBroker(destination);
verifyMessageExpirationOnDestination(destination, numMessagesToSend);
// We don't check the DLQ because non-persistent messages on topics are discarded instead.
final List<Subscription> subscriptions = getDestinationConsumers(broker, destination);
assertTrue("prefetch extension was incremented",
subscriptions.stream().
filter(s -> s instanceof TopicSubscription).
mapToInt(s -> ((TopicSubscription)s).getPrefetchExtension().get()).
allMatch(e -> e == 0));
}
private void buildBroker(ActiveMQDestination destination) throws Exception {
broker = createBroker(deleteAllMessages, usePrefetchExtension, 100, destination);
brokerUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
public void testRecoverExpiredMessages() throws Exception {
final ActiveMQDestination destination = new ActiveMQQueue("test");
buildBroker(destination);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
"failover://"+brokerUri);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(destination);
producer.setTimeToLive(2000);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Thread producingThread = new Thread("Producing Thread") {
@Override
public void run() {
try {
int i = 0;
while (i++ < 1000) {
Message message = useTextMessage ? session
.createTextMessage("test") : session
.createObjectMessage("test");
producer.send(message);
}
} catch (Throwable ex) {
ex.printStackTrace();
}
}
};
producingThread.start();
producingThread.join();
DestinationStatistics view = getDestinationStatistics(broker, destination);
LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: "
+ view.getEnqueues().getCount() + ", dequeues: "
+ view.getDequeues().getCount() + ", dispatched: "
+ view.getDispatched().getCount() + ", inflight: "
+ view.getInflight().getCount() + ", expiries: "
+ view.getExpired().getCount());
LOG.info("stopping broker");
broker.stop();
broker.waitUntilStopped();
Thread.sleep(5000);
LOG.info("recovering broker");
final boolean deleteAllMessages = false;
final boolean usePrefetchExtension = true;
broker = createBroker(deleteAllMessages, usePrefetchExtension, 5000, destination);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
DestinationStatistics view = getDestinationStatistics(broker, destination);
LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: "
+ view.getEnqueues().getCount() + ", dequeues: "
+ view.getDequeues().getCount() + ", dispatched: "
+ view.getDispatched().getCount() + ", inflight: "
+ view.getInflight().getCount() + ", expiries: "
+ view.getExpired().getCount());
return view.getMessages().getCount() == 0;
}
});
view = getDestinationStatistics(broker, destination);
assertEquals("Expect empty queue, QueueSize: ", 0, view.getMessages().getCount());
assertEquals("all dequeues were expired", view.getDequeues().getCount(), view.getExpired().getCount());
}
private DestinationStatistics verifyMessageExpirationOnDestination(ActiveMQDestination destination, final int numMessagesToSend) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -100,7 +239,6 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
Thread.sleep(100);
end = System.currentTimeMillis();
}
consumer.close();
} catch (Throwable ex) {
ex.printStackTrace();
}
@ -109,7 +247,6 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
consumerThread.start();
final int numMessagesToSend = 10000;
Thread producingThread = new Thread("Producing Thread") {
@Override
public void run() {
@ -118,7 +255,6 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
while (i++ < numMessagesToSend) {
producer.send(session.createTextMessage("test"));
}
producer.close();
} catch (Throwable ex) {
ex.printStackTrace();
}
@ -129,7 +265,6 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
consumerThread.join();
producingThread.join();
session.close();
final DestinationStatistics view = getDestinationStatistics(broker, destination);
@ -171,7 +306,10 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
return view.getMessages().getCount() == 0;
}
}));
return view;
}
private void verifyDestinationDlq(ActiveMQDestination destination, int numMessagesToSend, DestinationStatistics view) throws Exception {
final long expiredBeforeEnqueue = numMessagesToSend - view.getEnqueues().getCount();
final long totalExpiredCount = view.getExpired().getCount() + expiredBeforeEnqueue;
@ -225,77 +363,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE, Boolean.FALSE});
}
public void testRecoverExpiredMessages() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
"failover://"+brokerUri);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(destination);
producer.setTimeToLive(2000);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Thread producingThread = new Thread("Producing Thread") {
@Override
public void run() {
try {
int i = 0;
while (i++ < 1000) {
Message message = useTextMessage ? session
.createTextMessage("test") : session
.createObjectMessage("test");
producer.send(message);
}
producer.close();
} catch (Throwable ex) {
ex.printStackTrace();
}
}
};
producingThread.start();
producingThread.join();
DestinationStatistics view = getDestinationStatistics(broker, destination);
LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: "
+ view.getEnqueues().getCount() + ", dequeues: "
+ view.getDequeues().getCount() + ", dispatched: "
+ view.getDispatched().getCount() + ", inflight: "
+ view.getInflight().getCount() + ", expiries: "
+ view.getExpired().getCount());
LOG.info("stopping broker");
broker.stop();
broker.waitUntilStopped();
Thread.sleep(5000);
LOG.info("recovering broker");
final boolean deleteAllMessages = false;
broker = createBroker(deleteAllMessages, 5000);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
DestinationStatistics view = getDestinationStatistics(broker, destination);
LOG.info("Stats: size: " + view.getMessages().getCount() + ", enqueues: "
+ view.getEnqueues().getCount() + ", dequeues: "
+ view.getDequeues().getCount() + ", dispatched: "
+ view.getDispatched().getCount() + ", inflight: "
+ view.getInflight().getCount() + ", expiries: "
+ view.getExpired().getCount());
return view.getMessages().getCount() == 0;
}
});
view = getDestinationStatistics(broker, destination);
assertEquals("Expect empty queue, QueueSize: ", 0, view.getMessages().getCount());
assertEquals("all dequeues were expired", view.getDequeues().getCount(), view.getExpired().getCount());
}
private BrokerService createBroker(boolean deleteAllMessages, long expireMessagesPeriod) throws Exception {
private BrokerService createBroker(boolean deleteAllMessages, boolean usePrefetchExtension, long expireMessagesPeriod, ActiveMQDestination destination) throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName("localhost");
broker.setDestinations(new ActiveMQDestination[]{destination});
@ -307,6 +375,7 @@ public class ExpiredMessagesTest extends CombinationTestSupport {
}
defaultPolicy.setExpireMessagesPeriod(expireMessagesPeriod);
defaultPolicy.setMaxExpirePageSize(1200);
defaultPolicy.setUsePrefetchExtension(usePrefetchExtension);
PolicyMap policyMap = new PolicyMap();
policyMap.setDefaultEntry(defaultPolicy);
broker.setDestinationPolicy(policyMap);