[AMQ-6667] gate cursor cache enablement on a single pending send and tidy up setbatch to always check outstanding async future list. Fix and test

This commit is contained in:
gtully 2017-05-03 11:36:06 +01:00
parent 57795bafce
commit a0ba0bf4c6
5 changed files with 430 additions and 38 deletions

View File

@ -38,6 +38,7 @@ import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed; import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -114,6 +115,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
// Messages that are paged in but have not yet been targeted at a subscription // Messages that are paged in but have not yet been targeted at a subscription
private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
protected QueueDispatchPendingList dispatchPendingList = new QueueDispatchPendingList(); protected QueueDispatchPendingList dispatchPendingList = new QueueDispatchPendingList();
private AtomicInteger pendingSends = new AtomicInteger(0);
private MessageGroupMap messageGroupOwners; private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private MessageGroupMapFactory messageGroupMapFactory = new CachedMessageGroupMapFactory(); private MessageGroupMapFactory messageGroupMapFactory = new CachedMessageGroupMapFactory();
@ -149,7 +151,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
private final Object iteratingMutex = new Object(); private final Object iteratingMutex = new Object();
// gate on enabling cursor cache to ensure no outstanding sync
// send before async sends resume
public boolean singlePendingSend() {
return pendingSends.get() <= 1;
}
class TimeoutMessage implements Delayed { class TimeoutMessage implements Delayed {
@ -825,6 +831,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
ListenableFuture<Object> result = null; ListenableFuture<Object> result = null;
producerExchange.incrementSend(); producerExchange.incrementSend();
pendingSends.incrementAndGet();
do { do {
checkUsage(context, producerExchange, message); checkUsage(context, producerExchange, message);
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
@ -845,6 +852,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
// we may have a store in inconsistent state, so reset the cursor // we may have a store in inconsistent state, so reset the cursor
// before restarting normal broker operations // before restarting normal broker operations
resetNeeded = true; resetNeeded = true;
pendingSends.decrementAndGet();
throw e; throw e;
} }
} }
@ -1837,6 +1845,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
} }
final void messageSent(final ConnectionContext context, final Message msg) throws Exception { final void messageSent(final ConnectionContext context, final Message msg) throws Exception {
pendingSends.decrementAndGet();
destinationStatistics.getEnqueues().increment(); destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment(); destinationStatistics.getMessages().increment();
destinationStatistics.getMessageSize().addSize(msg.getSize()); destinationStatistics.getMessageSize().addSize(msg.getSize());
@ -1983,7 +1992,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
// store should have trapped duplicate in it's index, or cursor audit trapped insert // store should have trapped duplicate in it's index, or cursor audit trapped insert
// or producerBrokerExchange suppressed send. // or producerBrokerExchange suppressed send.
// note: jdbc store will not trap unacked messages as a duplicate b/c it gives each message a unique sequence id // note: jdbc store will not trap unacked messages as a duplicate b/c it gives each message a unique sequence id
LOG.warn("{}, duplicate message {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessage()); LOG.warn("{}, duplicate message {} - {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessageId(), ref.getMessage().getMessageId().getFutureOrSequenceLong());
if (store != null) { if (store != null) {
ConnectionContext connectionContext = createConnectionContext(); ConnectionContext connectionContext = createConnectionContext();
dropMessage(ref); dropMessage(ref);

View File

@ -232,12 +232,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
public synchronized boolean tryAddMessageLast(MessageReference node, long wait) throws Exception { public synchronized boolean tryAddMessageLast(MessageReference node, long wait) throws Exception {
boolean disableCache = false; boolean disableCache = false;
if (hasSpace()) { if (hasSpace()) {
if (!isCacheEnabled() && size==0 && isStarted() && useCache) {
if (LOG.isTraceEnabled()) {
LOG.trace("{} - enabling cache for empty store {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
}
setCacheEnabled(true);
}
if (isCacheEnabled()) { if (isCacheEnabled()) {
if (recoverMessage(node.getMessage(),true)) { if (recoverMessage(node.getMessage(),true)) {
trackLastCached(node); trackLastCached(node);
@ -261,41 +255,68 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
return true; return true;
} }
@Override
public synchronized boolean isCacheEnabled() {
return super.isCacheEnabled() || enableCacheNow();
}
protected boolean enableCacheNow() {
boolean result = false;
if (canEnableCash()) {
setCacheEnabled(true);
result = true;
if (LOG.isTraceEnabled()) {
LOG.trace("{} enabling cache on empty store", this);
}
}
return result;
}
protected boolean canEnableCash() {
return useCache && size==0 && hasSpace() && isStarted();
}
private void syncWithStore(Message currentAdd) throws Exception { private void syncWithStore(Message currentAdd) throws Exception {
pruneLastCached(); pruneLastCached();
if (lastCachedIds[SYNC_ADD] == null) { for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) {
// possibly only async adds, lets wait on the potential last add and reset from there MessageId lastPending = it.previous();
for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) { Object futureOrLong = lastPending.getFutureOrSequenceLong();
MessageId lastPending = it.previous(); if (futureOrLong instanceof Future) {
Object futureOrLong = lastPending.getFutureOrSequenceLong(); Future future = (Future) futureOrLong;
if (futureOrLong instanceof Future) { if (future.isCancelled()) {
Future future = (Future) futureOrLong; continue;
if (future.isCancelled()) { }
continue; try {
} future.get(5, TimeUnit.SECONDS);
try {
future.get(5, TimeUnit.SECONDS);
setLastCachedId(ASYNC_ADD, lastPending);
} catch (CancellationException ok) {
continue;
} catch (TimeoutException potentialDeadlock) {
LOG.debug("{} timed out waiting for async add", this, potentialDeadlock);
} catch (Exception worstCaseWeReplay) {
LOG.debug("{} exception waiting for async add", this, worstCaseWeReplay);
}
} else {
setLastCachedId(ASYNC_ADD, lastPending); setLastCachedId(ASYNC_ADD, lastPending);
} catch (CancellationException ok) {
continue;
} catch (TimeoutException potentialDeadlock) {
LOG.debug("{} timed out waiting for async add", this, potentialDeadlock);
} catch (Exception worstCaseWeReplay) {
LOG.debug("{} exception waiting for async add", this, worstCaseWeReplay);
} }
break; } else {
setLastCachedId(ASYNC_ADD, lastPending);
} }
if (lastCachedIds[ASYNC_ADD] != null) { break;
// ensure we don't skip current possibly sync add b/c we waited on the future }
if (isAsync(currentAdd) || Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()), ((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) > 0) {
setBatch(lastCachedIds[ASYNC_ADD]); MessageId candidate = lastCachedIds[ASYNC_ADD];
if (candidate != null) {
// ensure we don't skip current possibly sync add b/c we waited on the future
if (!isAsync(currentAdd) && Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()), ((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) < 0) {
if (LOG.isTraceEnabled()) {
LOG.trace("no set batch from async:" + candidate.getFutureOrSequenceLong() + " >= than current: " + currentAdd.getMessageId().getFutureOrSequenceLong() + ", " + this);
} }
candidate = null;
} }
} else { }
setBatch(lastCachedIds[SYNC_ADD]); if (candidate == null) {
candidate = lastCachedIds[SYNC_ADD];
}
if (candidate != null) {
setBatch(candidate);
} }
// cleanup // cleanup
lastCachedIds[SYNC_ADD] = lastCachedIds[ASYNC_ADD] = null; lastCachedIds[SYNC_ADD] = lastCachedIds[ASYNC_ADD] = null;
@ -355,6 +376,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
} else if (candidateOrSequenceLong != null && } else if (candidateOrSequenceLong != null &&
Long.compare(((Long) candidateOrSequenceLong), ((Long) lastCacheFutureOrSequenceLong)) > 0) { Long.compare(((Long) candidateOrSequenceLong), ((Long) lastCacheFutureOrSequenceLong)) > 0) {
lastCachedIds[index] = candidate; lastCachedIds[index] = candidate;
} if (LOG.isTraceEnabled()) {
LOG.trace("no set last cached[" + index + "] current:" + lastCacheFutureOrSequenceLong + " <= than candidate: " + candidateOrSequenceLong+ ", " + this);
} }
} }
} }

View File

@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
class QueueStorePrefetch extends AbstractStoreCursor { class QueueStorePrefetch extends AbstractStoreCursor {
private static final Logger LOG = LoggerFactory.getLogger(QueueStorePrefetch.class); private static final Logger LOG = LoggerFactory.getLogger(QueueStorePrefetch.class);
private final MessageStore store; private final MessageStore store;
private final Queue queue;
private final Broker broker; private final Broker broker;
/** /**
@ -46,6 +47,7 @@ class QueueStorePrefetch extends AbstractStoreCursor {
*/ */
public QueueStorePrefetch(Queue queue, Broker broker) { public QueueStorePrefetch(Queue queue, Broker broker) {
super(queue); super(queue);
this.queue = queue;
this.store = queue.getMessageStore(); this.store = queue.getMessageStore();
this.broker = broker; this.broker = broker;
@ -87,6 +89,11 @@ class QueueStorePrefetch extends AbstractStoreCursor {
} }
} }
@Override
protected boolean canEnableCash() {
return super.canEnableCash() && queue.singlePendingSend();
}
@Override @Override
protected synchronized boolean isStoreEmpty() { protected synchronized boolean isStoreEmpty() {
try { try {

View File

@ -184,7 +184,7 @@ public class StoreQueueCursorOrderTest {
msg = getMessage(1); msg = getMessage(1);
messages[0] = msg; messages[0] = msg;
msg.setMemoryUsage(systemUsage.getMemoryUsage()); msg.setMemoryUsage(systemUsage.getMemoryUsage());
msg.getMessageId().setFutureOrSequenceLong(1l); msg.getMessageId().setFutureOrSequenceLong(0l);
underTest.addMessageLast(msg); underTest.addMessageLast(msg);
@ -354,7 +354,7 @@ public class StoreQueueCursorOrderTest {
msg = getMessage(3); msg = getMessage(3);
messages[2] = msg; messages[2] = msg;
msg.setMemoryUsage(systemUsage.getMemoryUsage()); msg.setMemoryUsage(systemUsage.getMemoryUsage());
msg.getMessageId().setFutureOrSequenceLong(3l); msg.getMessageId().setFutureOrSequenceLong(2l);
underTest.addMessageLast(msg); underTest.addMessageLast(msg);
@ -375,6 +375,14 @@ public class StoreQueueCursorOrderTest {
} }
underTest.release(); underTest.release();
assertEquals(count, dequeueCount); assertEquals(count, dequeueCount);
msg = getMessage(4);
msg.setMemoryUsage(systemUsage.getMemoryUsage());
msg.getMessageId().setFutureOrSequenceLong(4l);
underTest.addMessageLast(msg);
assertTrue("cache enabled on empty store", underTest.isCacheEnabled());
} }
@Test @Test

View File

@ -0,0 +1,345 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.StoreUsage;
import org.apache.activemq.usage.SystemUsage;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
public class DuplicateFromStoreTest {
static Logger LOG = LoggerFactory.getLogger(DuplicateFromStoreTest.class);
String activemqURL;
BrokerService broker;
protected final static String DESTNAME = "TEST";
protected final static int NUM_PRODUCERS = 100;
protected final static int NUM_CONSUMERS = 20;
protected final static int NUM_MSGS = 40000;
protected final static int CONSUMER_SLEEP = 0;
protected final static int PRODUCER_SLEEP = 10;
public static CountDownLatch producersFinished = new CountDownLatch(NUM_PRODUCERS);
public static CountDownLatch consumersFinished = new CountDownLatch(NUM_CONSUMERS );
public AtomicInteger totalMessagesToSend = new AtomicInteger(NUM_MSGS);
public AtomicInteger totalReceived = new AtomicInteger(0);
public int messageSize = 16*1000;
@Before
public void startBroker() throws Exception {
broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(true);
broker.addConnector("tcp://0.0.0.0:0");
// Create <policyEntry>
PolicyEntry policy = new PolicyEntry();
ActiveMQDestination dest = new ActiveMQQueue(">");
policy.setDestination(dest);
policy.setMemoryLimit(10 * 1024 * 1024); // 10 MB
policy.setExpireMessagesPeriod(0);
policy.setEnableAudit(false); // allow any duplicates from the store to bubble up to the q impl
PolicyMap policies = new PolicyMap();
policies.put(dest, policy);
broker.setDestinationPolicy(policies);
// configure <systemUsage>
MemoryUsage memoryUsage = new MemoryUsage();
memoryUsage.setPercentOfJvmHeap(70);
StoreUsage storeUsage = new StoreUsage();
storeUsage.setLimit(8 * 1024 * 1024 * 1024); // 8 gb
SystemUsage memoryManager = new SystemUsage();
memoryManager.setMemoryUsage(memoryUsage);
memoryManager.setStoreUsage(storeUsage);
broker.setSystemUsage(memoryManager);
// configure KahaDB persistence
PersistenceAdapter kahadb = new KahaDBStore();
((KahaDBStore) kahadb).setConcurrentStoreAndDispatchQueues(true);
broker.setPersistenceAdapter(kahadb);
// start broker
broker.start();
broker.waitUntilStarted();
activemqURL = broker.getTransportConnectorByScheme("tcp").getPublishableConnectString();
}
@After
public void stopBroker() throws Exception {
if (broker != null) {
broker.stop();
}
}
@Test
public void testDuplicateMessage() throws Exception {
LOG.info("Testing for duplicate messages.");
//create producer and consumer threads
ExecutorService producers = Executors.newFixedThreadPool(NUM_PRODUCERS);
ExecutorService consumers = Executors.newFixedThreadPool(NUM_CONSUMERS);
createOpenwireClients(producers, consumers);
LOG.info("All producers and consumers got started. Awaiting their termination");
producersFinished.await(100, TimeUnit.MINUTES);
LOG.info("All producers have terminated.");
consumersFinished.await(100, TimeUnit.MINUTES);
LOG.info("All consumers have terminated.");
producers.shutdownNow();
consumers.shutdownNow();
assertEquals("no messages pending, i.e. dlq empty", 0l, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
// validate cache can be enabled if disabled
}
protected void createOpenwireClients(ExecutorService producers, ExecutorService consumers) {
for (int i = 0; i < NUM_CONSUMERS; i++) {
LOG.trace("Creating consumer for destination " + DESTNAME);
Consumer consumer = new Consumer(DESTNAME, false);
consumers.submit(consumer);
// wait for consumer to signal it has fully initialized
synchronized(consumer.init) {
try {
consumer.init.wait();
} catch (InterruptedException e) {
LOG.error(e.toString(), e);
}
}
}
for (int i = 0; i < NUM_PRODUCERS; i++) {
LOG.trace("Creating producer for destination " + DESTNAME );
Producer producer = new Producer(DESTNAME, false, 0);
producers.submit(producer);
}
}
class Producer implements Runnable {
Logger log = LOG;
protected String destName = "TEST";
protected boolean isTopicDest = false;
public Producer(String dest, boolean isTopic, int ttl) {
this.destName = dest;
this.isTopicDest = isTopic;
}
/**
* Connect to broker and constantly send messages
*/
public void run() {
Connection connection = null;
Session session = null;
MessageProducer producer = null;
try {
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(activemqURL);
connection = amq.createConnection();
connection.setExceptionListener(new javax.jms.ExceptionListener() {
public void onException(javax.jms.JMSException e) {
e.printStackTrace();
}
});
connection.start();
// Create a Session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination;
if (isTopicDest) {
// Create the destination (Topic or Queue)
destination = session.createTopic(destName);
} else {
destination = session.createQueue(destName);
}
// Create a MessageProducer from the Session to the Topic or Queue
producer = session.createProducer(destination);
// Create message
long counter = 0;
//enlarge msg to 16 kb
int msgSize = 16 * 1024;
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.setLength(msgSize + 15);
stringBuilder.append("Message: ");
stringBuilder.append(counter);
for (int j = 0; j < (msgSize / 10); j++) {
stringBuilder.append("XXXXXXXXXX");
}
String text = stringBuilder.toString();
TextMessage message = session.createTextMessage(text);
// send message
while (totalMessagesToSend.decrementAndGet() >= 0) {
producer.send(message);
log.debug("Sent message: " + counter);
counter++;
if ((counter % 10000) == 0)
log.info("sent " + counter + " messages");
Thread.sleep(PRODUCER_SLEEP);
}
} catch (Exception ex) {
log.error(ex.getMessage());
return;
} finally {
try {
if (connection != null) {
connection.close();
}
} catch (Exception ignored) {
} finally {
producersFinished.countDown();
}
}
log.debug("Closing producer for " + destName);
}
}
class Consumer implements Runnable {
public Object init = new Object();
protected String queueName = "TEST";
boolean isTopic = false;
Logger log = LOG;
public Consumer(String destName, boolean topic) {
this.isTopic = topic;
this.queueName = destName;
}
/**
* connect to broker and receive messages
*/
public void run() {
Connection connection = null;
Session session = null;
MessageConsumer consumer = null;
try {
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(activemqURL);
connection = amq.createConnection();
connection.setExceptionListener(new javax.jms.ExceptionListener() {
public void onException(javax.jms.JMSException e) {
e.printStackTrace();
}
});
connection.start();
// Create a Session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = null;
if (isTopic)
destination = session.createTopic(queueName);
else
destination = session.createQueue(queueName);
//Create a MessageConsumer from the Session to the Topic or Queue
consumer = session.createConsumer(destination);
synchronized (init) {
init.notifyAll();
}
// Wait for a message
long counter = 0;
while (totalReceived.get() < NUM_MSGS) {
Message message2 = consumer.receive(5000);
if (message2 instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message2;
String text = textMessage.getText();
log.debug("Received: " + text.substring(0, 50));
} else {
if (totalReceived.get() < NUM_MSGS) {
log.error("Received message of unsupported type. Expecting TextMessage. " + message2);
}
break;
}
if (message2 != null) {
counter++;
totalReceived.incrementAndGet();
if ((counter % 10000) == 0)
log.info("received " + counter + " messages");
Thread.sleep(CONSUMER_SLEEP);
}
}
} catch (Exception e) {
log.error("Error in Consumer: " + e.getMessage());
return;
} finally {
try {
if (connection != null) {
connection.close();
}
} catch (Exception ignored) {
} finally {
consumersFinished.countDown();
}
}
}
}
}