https://issues.apache.org/jira/browse/AMQ-4485 - fix test regression with browse test - AMQ4595Test - reduce replay window when sync and asnyc cursor updates flip message order - concurrentStoreAndDispatch=true - https://issues.apache.org/jira/browse/AMQ-5266 - increse default audit depth to match async jobs for concurrent store

This commit is contained in:
gtully 2014-10-09 13:47:31 +01:00
parent a56996dd6c
commit 97c127d2d4
6 changed files with 92 additions and 37 deletions

View File

@ -57,7 +57,7 @@ public abstract class BaseDestination implements Destination {
public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
public static final int MAX_PRODUCERS_TO_AUDIT = 64;
public static final int MAX_AUDIT_DEPTH = 2048;
public static final int MAX_AUDIT_DEPTH = 10000;
protected final ActiveMQDestination destination;
protected final Broker broker;

View File

@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.cursors;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.concurrent.Future;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
@ -41,7 +42,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
private boolean storeHasMessages = false;
protected int size;
private LinkedList<MessageId> pendingCachedIds = new LinkedList<>();
MessageId lastCachedId = null;
private static int SYNC_ADD = 0;
private static int ASYNC_ADD = 1;
final MessageId[] lastCachedIds = new MessageId[2];
protected boolean hadSpace = false;
protected AbstractStoreCursor(Destination destination) {
@ -203,12 +206,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
if (isCacheEnabled()) {
if (recoverMessage(node.getMessage(),true)) {
if (node.getMessageId().getFutureOrSequenceLong() instanceof Future) {
pruneLastCached();
pendingCachedIds.add(node.getMessageId());
} else {
setLastCachedId(node.getMessageId());
}
trackLastCached(node);
} else {
dealWithDuplicates();
return false;
@ -219,24 +217,78 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
if (disableCache && isCacheEnabled()) {
LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
setCacheEnabled(false);
// sync with store on disabling the cache
if (!pendingCachedIds.isEmpty() || lastCachedId != null) {
LOG.trace("{} - disabling cache. current Id: {} seq: {}, batchList size: {}",
new Object[]{this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong(), batchList.size()});
pruneLastCached();
if (lastCachedId != null) {
setBatch(lastCachedId);
lastCachedId = null;
pendingCachedIds.clear();
}
}
syncWithStore();
}
this.storeHasMessages = true;
size++;
return true;
}
private void syncWithStore() throws Exception {
if (lastCachedIds[SYNC_ADD] == null) {
// only async adds, lets wait on the potential last add and reset from there
for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) {
MessageId lastStored = it.previous();
Object futureOrLong = lastStored.getFutureOrSequenceLong();
if (futureOrLong instanceof Future) {
Future future = (Future) futureOrLong;
if (future.isCancelled()) {
continue;
} else {
try {
future.get();
setLastCachedId(ASYNC_ADD, lastStored);
} catch (Exception ignored) {}
}
}
}
if (lastCachedIds[ASYNC_ADD] != null) {
setBatch(lastCachedIds[ASYNC_ADD]);
}
} else {
// mix of async and sync - async can exceed sync only if next in sequence
for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) {
MessageId candidate = it.next();
final Object futureOrLong = candidate.getFutureOrSequenceLong();
if (futureOrLong instanceof Future) {
Future future = (Future) futureOrLong;
if (future.isCancelled()) {
it.remove();
} else {
try {
future.get();
long next = 1 + (Long)lastCachedIds[SYNC_ADD].getFutureOrSequenceLong();
if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), next) == 0) {
setLastCachedId(SYNC_ADD, candidate);
} else {
// out of sequence, revert to sync state
LOG.trace("{} cursor order out of sync at seq {}, audit must suppress potential replay of {} messages from the store", this, next, pendingCachedIds.size());
break;
}
} catch (Exception ignored) {}
}
}
}
if (lastCachedIds[SYNC_ADD] != null) {
setBatch(lastCachedIds[SYNC_ADD]);
}
}
// cleanup
lastCachedIds[SYNC_ADD] = lastCachedIds[ASYNC_ADD] = null;
pendingCachedIds.clear();
}
private void trackLastCached(MessageReference node) {
if (node.getMessageId().getFutureOrSequenceLong() instanceof Future) {
pruneLastCached();
pendingCachedIds.add(node.getMessageId());
} else {
setLastCachedId(SYNC_ADD, node.getMessageId());
}
}
private void pruneLastCached() {
for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) {
@ -247,21 +299,22 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
if (future.isCancelled()) {
it.remove();
} else {
// we don't want to wait for work to complete
break;
}
} else {
// store complete - track via lastCachedId
setLastCachedId(candidate);
// complete
setLastCachedId(ASYNC_ADD, candidate);
it.remove();
}
}
}
private void setLastCachedId(MessageId candidate) {
if (lastCachedId == null || lastCachedId.getFutureOrSequenceLong() == null) { // possibly null for topics
lastCachedId = candidate;
} else if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), ((Long) lastCachedId.getFutureOrSequenceLong())) > 0) {
lastCachedId = candidate;
private void setLastCachedId(final int index, MessageId candidate) {
if (lastCachedIds[index] == null || lastCachedIds[index].getFutureOrSequenceLong() == null) { // possibly null for topics
lastCachedIds[index] = candidate;
} else if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), ((Long) lastCachedIds[index].getFutureOrSequenceLong())) > 0) {
lastCachedIds[index] = candidate;
}
}
@ -351,7 +404,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
public String toString() {
return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
+ ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled()
+ ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size() + ",lastCachedId:" + lastCachedId + ",lastCachedId-seq:" + (lastCachedId != null ? lastCachedId.getFutureOrSequenceLong() : "null");
+ ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size()
+ ",lastSyncCachedId:" + lastCachedIds[SYNC_ADD] + ",lastSyncCachedId-seq:" + (lastCachedIds[SYNC_ADD] != null ? lastCachedIds[SYNC_ADD].getFutureOrSequenceLong() : "null")
+ ",lastAsyncCachedId:" + lastCachedIds[ASYNC_ADD] + ",lastAsyncCachedId-seq:" + (lastCachedIds[ASYNC_ADD] != null ? lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong() : "null");
}
protected abstract void doFillBatch() throws Exception;

View File

@ -94,7 +94,7 @@ class QueueStorePrefetch extends AbstractStoreCursor {
@Override
protected void setBatch(MessageId messageId) throws Exception {
LOG.trace("{} setBatch {} loc: {}", this, messageId, messageId.getEntryLocator());
LOG.trace("{} setBatch {} seq: {}, loc: {}", this, messageId, messageId.getFutureOrSequenceLong(), messageId.getEntryLocator());
store.setBatch(messageId);
batchResetNeeded = false;
}

View File

@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@ -86,7 +87,7 @@ import org.slf4j.LoggerFactory;
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
private static final int MAX_ASYNC_JOBS = 10000;
private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH;
public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(

View File

@ -101,7 +101,6 @@ public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
addNetworkConnector(broker);
}
broker.setSchedulePeriodForDestinationPurge(0);
//broker.getSystemUsage().setSendFailIfNoSpace(true);
broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024l);
@ -406,7 +405,6 @@ public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
int id = numMessages - val - 1;
ActiveMQQueue compositeQ = new ActiveMQQueue("IN");
//LOG.info("Send to: " + ((ActiveMQConnection) queueConnection).getBrokerName() + ", " + val + ", dest:" + compositeQ);
Message textMessage = queueSession.createTextMessage(((ActiveMQConnection) queueConnection).getBrokerName() + "->" + id + " payload:" + payload);
textMessage.setIntProperty("NUM", id);
producer.send(compositeQ, textMessage);

View File

@ -16,12 +16,9 @@
*/
package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals;
import java.net.URI;
import java.util.Date;
import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
@ -29,13 +26,10 @@ import javax.jms.MessageProducer;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Before;
@ -43,6 +37,9 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
public class AMQ4595Test {
private static final Logger LOG = LoggerFactory.getLogger(AMQ4595Test.class);
@ -112,6 +109,8 @@ public class AMQ4595Test {
}
producerConnection.close();
LOG.info("Mem usage after producer done: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() + "%");
// Browse the queue.
Connection connection = factory.createConnection();
connection.start();
@ -131,6 +130,8 @@ public class AMQ4595Test {
session.close();
connection.close();
LOG.info("Mem usage after browser closed: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() + "%");
// The number of messages browsed should be equal to the number of messages sent.
assertEquals(messageToSend, browsed);