mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3188 - Full table scan for durable subs in jdbc store when priority enabled; very slow with large message backlog
added more state to the topic message store such that it can ask the db for a single priority at a time which is indexed. This avoids a full table scan. send rate with active durable subs vs inactive durable subs is now in the region of 6x from 40x. validation test included. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1073453 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c6ede162c3
commit
cb967831d6
|
@ -131,7 +131,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
|
|||
topic.activate(context, this);
|
||||
}
|
||||
}
|
||||
synchronized (pending) {
|
||||
synchronized (pendingLock) {
|
||||
pending.setSystemUsage(memoryManager);
|
||||
pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
|
||||
pending.setMaxAuditDepth(getMaxAuditDepth());
|
||||
|
|
|
@ -64,7 +64,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
private int maxProducersToAudit=32;
|
||||
private int maxAuditDepth=2048;
|
||||
protected final SystemUsage usageManager;
|
||||
private final Object pendingLock = new Object();
|
||||
protected final Object pendingLock = new Object();
|
||||
private final Object dispatchLock = new Object();
|
||||
private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
|
||||
|
||||
|
|
|
@ -45,7 +45,7 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
|
|||
protected boolean enableAudit=true;
|
||||
protected ActiveMQMessageAudit audit;
|
||||
protected boolean useCache=true;
|
||||
protected boolean cacheEnabled=true;
|
||||
private boolean cacheEnabled=true;
|
||||
private boolean started=false;
|
||||
protected MessageReference last = null;
|
||||
protected final boolean prioritizedMessages;
|
||||
|
@ -329,7 +329,11 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
|
|||
|
||||
}
|
||||
|
||||
public boolean isCacheEnabled() {
|
||||
public synchronized boolean isCacheEnabled() {
|
||||
return cacheEnabled;
|
||||
}
|
||||
|
||||
public synchronized void setCacheEnabled(boolean val) {
|
||||
cacheEnabled = val;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
this.regionDestination=destination;
|
||||
if (this.prioritizedMessages) {
|
||||
this.batchList= new PrioritizedPendingList();
|
||||
}else {
|
||||
} else {
|
||||
this.batchList = new OrderedPendingList();
|
||||
}
|
||||
}
|
||||
|
@ -58,7 +58,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
resetBatch();
|
||||
this.size = getStoreSize();
|
||||
this.storeHasMessages=this.size > 0;
|
||||
cacheEnabled = !this.storeHasMessages&&useCache;
|
||||
setCacheEnabled(!this.storeHasMessages&&useCache);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -95,8 +95,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
* it will be a duplicate - but should be ignored
|
||||
*/
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName()
|
||||
+ " cursor got duplicate: " + message.getMessageId() + ", " + message.getPriority());
|
||||
LOG.trace(this + " - cursor got duplicate: " + message.getMessageId() + ", " + message.getPriority());
|
||||
}
|
||||
}
|
||||
return recovered;
|
||||
|
@ -108,7 +107,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
try {
|
||||
fillBatch();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to fill batch", e);
|
||||
LOG.error(this + " - Failed to fill batch", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
@ -145,7 +144,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
try {
|
||||
fillBatch();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to fill batch", e);
|
||||
LOG.error(this + " - Failed to fill batch", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
@ -169,24 +168,22 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
|
||||
public final synchronized void addMessageLast(MessageReference node) throws Exception {
|
||||
if (hasSpace()) {
|
||||
if (!cacheEnabled && size==0 && isStarted() && useCache) {
|
||||
if (!isCacheEnabled() && size==0 && isStarted() && useCache) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName()
|
||||
+ " enabling cache for empty store " + node.getMessageId());
|
||||
LOG.trace(this + " - enabling cache for empty store " + node.getMessageId());
|
||||
}
|
||||
cacheEnabled=true;
|
||||
setCacheEnabled(true);
|
||||
}
|
||||
if (cacheEnabled) {
|
||||
if (isCacheEnabled()) {
|
||||
recoverMessage(node.getMessage(),true);
|
||||
lastCachedId = node.getMessageId();
|
||||
}
|
||||
} else if (cacheEnabled) {
|
||||
cacheEnabled=false;
|
||||
} else if (isCacheEnabled()) {
|
||||
setCacheEnabled(false);
|
||||
// sync with store on disabling the cache
|
||||
if (lastCachedId != null) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName()
|
||||
+ " disabling cache on size:" + size
|
||||
LOG.trace(this + " - disabling cache"
|
||||
+ ", lastCachedId: " + lastCachedId
|
||||
+ " current node Id: " + node.getMessageId());
|
||||
}
|
||||
|
@ -203,7 +200,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
|
||||
|
||||
public final synchronized void addMessageFirst(MessageReference node) throws Exception {
|
||||
cacheEnabled=false;
|
||||
setCacheEnabled(false);
|
||||
size++;
|
||||
}
|
||||
|
||||
|
@ -221,7 +218,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
|
||||
public final synchronized void remove(MessageReference node) {
|
||||
size--;
|
||||
cacheEnabled=false;
|
||||
setCacheEnabled(false);
|
||||
batchList.remove(node);
|
||||
}
|
||||
|
||||
|
@ -240,7 +237,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
batchList.clear();
|
||||
clearIterator(false);
|
||||
batchResetNeeded = true;
|
||||
this.cacheEnabled=false;
|
||||
setCacheEnabled(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -251,8 +248,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
|
||||
protected final synchronized void fillBatch() {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("fillBatch - batchResetNeeded=" + batchResetNeeded
|
||||
+ ", hasMessages=" + this.storeHasMessages + ", size=" + this.size + ", cacheEnabled=" + this.cacheEnabled);
|
||||
LOG.trace(this + " - fillBatch");
|
||||
}
|
||||
if (batchResetNeeded) {
|
||||
resetBatch();
|
||||
|
@ -263,7 +259,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
try {
|
||||
doFillBatch();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to fill batch", e);
|
||||
LOG.error(this + " - Failed to fill batch", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
if (!this.batchList.isEmpty() || !hadSpace) {
|
||||
|
@ -290,7 +286,11 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
|
||||
public String toString() {
|
||||
return regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
|
||||
+ ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled();
|
||||
}
|
||||
|
||||
protected abstract void doFillBatch() throws Exception;
|
||||
|
||||
|
|
|
@ -199,7 +199,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
|
|||
if (hasSpace() || this.store == null) {
|
||||
memoryList.add(node);
|
||||
node.incrementReferenceCount();
|
||||
cacheEnabled = true;
|
||||
setCacheEnabled(true);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -247,7 +247,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
|
|||
if (hasSpace()) {
|
||||
memoryList.addFirst(node);
|
||||
node.incrementReferenceCount();
|
||||
cacheEnabled = true;
|
||||
setCacheEnabled(true);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -428,7 +428,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
|
|||
|
||||
}
|
||||
memoryList.clear();
|
||||
cacheEnabled = false;
|
||||
setCacheEnabled(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -187,15 +187,15 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
|
|||
Destination dest = msg.getRegionDestination();
|
||||
TopicStorePrefetch tsp = topics.get(dest);
|
||||
if (tsp != null) {
|
||||
// cache can be come high priority cache for immediate dispatch
|
||||
// cache can become high priority cache for immediate dispatch
|
||||
final int priority = msg.getPriority();
|
||||
if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.cacheEnabled) {
|
||||
if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.isCacheEnabled()) {
|
||||
if (priority > tsp.getCurrentLowestPriority()) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("enabling cache for cursor on high priority message " + priority
|
||||
+ ", current lowest: " + tsp.getCurrentLowestPriority());
|
||||
}
|
||||
tsp.cacheEnabled = true;
|
||||
tsp.setCacheEnabled(true);
|
||||
cacheCurrentLowestPriority = tsp.getCurrentLowestPriority();
|
||||
}
|
||||
} else if (cacheCurrentLowestPriority != UNKNOWN && priority <= cacheCurrentLowestPriority) {
|
||||
|
@ -206,7 +206,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
|
|||
+ priority + ", tsp current lowest: " + tsp.getCurrentLowestPriority()
|
||||
+ " cache lowest: " + cacheCurrentLowestPriority);
|
||||
}
|
||||
tsp.cacheEnabled = false;
|
||||
tsp.setCacheEnabled(false);
|
||||
cacheCurrentLowestPriority = UNKNOWN;
|
||||
}
|
||||
tsp.addMessageLast(node);
|
||||
|
|
|
@ -297,7 +297,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
|
|||
|
||||
@Override
|
||||
public boolean isCacheEnabled() {
|
||||
cacheEnabled = isUseCache();
|
||||
boolean cacheEnabled = isUseCache();
|
||||
if (cacheEnabled) {
|
||||
if (persistent != null) {
|
||||
cacheEnabled &= persistent.isCacheEnabled();
|
||||
|
@ -305,6 +305,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
|
|||
if (nonPersistent != null) {
|
||||
cacheEnabled &= nonPersistent.isCacheEnabled();
|
||||
}
|
||||
setCacheEnabled(cacheEnabled);
|
||||
}
|
||||
return cacheEnabled;
|
||||
}
|
||||
|
|
|
@ -132,6 +132,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TopicStorePrefetch" + System.identityHashCode(this) + "(" + clientId + "," + subscriberName + ")";
|
||||
return "TopicStorePrefetch(" + clientId + "," + subscriberName + ")" + super.toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,6 +40,25 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
public class JDBCMessageStore extends AbstractMessageStore {
|
||||
|
||||
class Duration {
|
||||
static final int LIMIT = 100;
|
||||
final long start = System.currentTimeMillis();
|
||||
final String name;
|
||||
|
||||
Duration(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
void end() {
|
||||
end(null);
|
||||
}
|
||||
void end(Object o) {
|
||||
long duration = System.currentTimeMillis() - start;
|
||||
|
||||
if (duration > LIMIT) {
|
||||
System.err.println(name + " took a long time: " + duration + "ms " + o);
|
||||
}
|
||||
}
|
||||
}
|
||||
private static final Logger LOG = LoggerFactory.getLogger(JDBCMessageStore.class);
|
||||
protected final WireFormat wireFormat;
|
||||
protected final JDBCAdapter adapter;
|
||||
|
@ -58,7 +77,6 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
|||
}
|
||||
|
||||
public void addMessage(ConnectionContext context, Message message) throws IOException {
|
||||
|
||||
MessageId messageId = message.getMessageId();
|
||||
if (audit != null && audit.isDuplicate(message)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -90,6 +108,10 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
|||
} finally {
|
||||
c.close();
|
||||
}
|
||||
onAdd(sequenceId, message.getPriority());
|
||||
}
|
||||
|
||||
protected void onAdd(long sequenceId, byte priority) {
|
||||
}
|
||||
|
||||
public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
|
||||
|
|
|
@ -18,10 +18,10 @@ package org.apache.activemq.store.jdbc;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.activemq.ActiveMQMessageAudit;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
|
@ -102,22 +102,120 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
|
|||
}
|
||||
}
|
||||
|
||||
private class LastRecovered {
|
||||
long sequence = 0;
|
||||
byte priority = 9;
|
||||
private class LastRecovered implements Iterable<LastRecoveredEntry> {
|
||||
LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10];
|
||||
LastRecovered() {
|
||||
for (int i=0; i<perPriority.length; i++) {
|
||||
perPriority[i] = new LastRecoveredEntry(i);
|
||||
}
|
||||
}
|
||||
|
||||
public void update(long sequence, Message msg) {
|
||||
this.sequence = sequence;
|
||||
this.priority = msg.getPriority();
|
||||
public void updateStored(long sequence, int priority) {
|
||||
perPriority[priority].stored = sequence;
|
||||
}
|
||||
|
||||
public LastRecoveredEntry defaultPriority() {
|
||||
return perPriority[javax.jms.Message.DEFAULT_PRIORITY];
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "" + sequence + ":" + priority;
|
||||
return Arrays.deepToString(perPriority);
|
||||
}
|
||||
|
||||
public Iterator<LastRecoveredEntry> iterator() {
|
||||
return new PriorityIterator();
|
||||
}
|
||||
|
||||
class PriorityIterator implements Iterator<LastRecoveredEntry> {
|
||||
int current = 9;
|
||||
public boolean hasNext() {
|
||||
for (int i=current; i>=0; i--) {
|
||||
if (perPriority[i].hasMessages()) {
|
||||
current = i;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public LastRecoveredEntry next() {
|
||||
return perPriority[current];
|
||||
}
|
||||
|
||||
public void remove() {
|
||||
throw new RuntimeException("not implemented");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class LastRecoveredEntry {
|
||||
final int priority;
|
||||
long recovered = 0;
|
||||
long stored = Integer.MAX_VALUE;
|
||||
|
||||
public LastRecoveredEntry(int priority) {
|
||||
this.priority = priority;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return priority + "-" + stored + ":" + recovered;
|
||||
}
|
||||
|
||||
public void exhausted() {
|
||||
stored = recovered;
|
||||
}
|
||||
|
||||
public boolean hasMessages() {
|
||||
return stored > recovered;
|
||||
}
|
||||
}
|
||||
|
||||
class LastRecoveredAwareListener implements JDBCMessageRecoveryListener {
|
||||
final MessageRecoveryListener delegate;
|
||||
final int maxMessages;
|
||||
LastRecoveredEntry lastRecovered;
|
||||
int recoveredCount;
|
||||
int recoveredMarker;
|
||||
|
||||
public LastRecoveredAwareListener(MessageRecoveryListener delegate, int maxMessages) {
|
||||
this.delegate = delegate;
|
||||
this.maxMessages = maxMessages;
|
||||
}
|
||||
|
||||
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
|
||||
if (delegate.hasSpace()) {
|
||||
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
|
||||
msg.getMessageId().setBrokerSequenceId(sequenceId);
|
||||
if (delegate.recoverMessage(msg)) {
|
||||
lastRecovered.recovered = sequenceId;
|
||||
recoveredCount++;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean recoverMessageReference(String reference) throws Exception {
|
||||
return delegate.recoverMessageReference(new MessageId(reference));
|
||||
}
|
||||
|
||||
public void setLastRecovered(LastRecoveredEntry lastRecovered) {
|
||||
this.lastRecovered = lastRecovered;
|
||||
recoveredMarker = recoveredCount;
|
||||
}
|
||||
|
||||
public boolean complete() {
|
||||
return !delegate.hasSpace() || recoveredCount == maxMessages;
|
||||
}
|
||||
|
||||
public boolean stalled() {
|
||||
return recoveredMarker == recoveredCount;
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
|
||||
throws Exception {
|
||||
//Duration duration = new Duration("recoverNextMessages");
|
||||
TransactionContext c = persistenceAdapter.getTransactionContext();
|
||||
|
||||
String key = getSubscriptionKey(clientId, subscriptionName);
|
||||
|
@ -125,38 +223,38 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
|
|||
subscriberLastRecoveredMap.put(key, new LastRecovered());
|
||||
}
|
||||
final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);
|
||||
LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned);
|
||||
try {
|
||||
JDBCMessageRecoveryListener jdbcListener = new JDBCMessageRecoveryListener() {
|
||||
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
|
||||
if (listener.hasSpace()) {
|
||||
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
|
||||
msg.getMessageId().setBrokerSequenceId(sequenceId);
|
||||
if (listener.recoverMessage(msg)) {
|
||||
lastRecovered.update(sequenceId, msg);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean recoverMessageReference(String reference) throws Exception {
|
||||
return listener.recoverMessageReference(new MessageId(reference));
|
||||
}
|
||||
|
||||
};
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(key + " existing last recovered: " + lastRecovered);
|
||||
}
|
||||
if (isPrioritizedMessages()) {
|
||||
adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
|
||||
lastRecovered.sequence, lastRecovered.priority, maxReturned, jdbcListener);
|
||||
Iterator<LastRecoveredEntry> it = lastRecovered.iterator();
|
||||
for ( ; it.hasNext() && !recoveredAwareListener.complete(); ) {
|
||||
LastRecoveredEntry entry = it.next();
|
||||
recoveredAwareListener.setLastRecovered(entry);
|
||||
//Duration microDuration = new Duration("recoverNextMessages:loop");
|
||||
adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
|
||||
entry.recovered, entry.priority, maxReturned, recoveredAwareListener);
|
||||
//microDuration.end(entry);
|
||||
if (recoveredAwareListener.stalled()) {
|
||||
if (recoveredAwareListener.complete()) {
|
||||
break;
|
||||
} else {
|
||||
entry.exhausted();
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LastRecoveredEntry last = lastRecovered.defaultPriority();
|
||||
recoveredAwareListener.setLastRecovered(last);
|
||||
adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
|
||||
lastRecovered.sequence, 0, maxReturned, jdbcListener);
|
||||
last.recovered, 0, maxReturned, recoveredAwareListener);
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(key + " last recovered: " + lastRecovered);
|
||||
}
|
||||
//duration.end();
|
||||
} catch (SQLException e) {
|
||||
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
|
||||
} finally {
|
||||
|
@ -168,6 +266,14 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
|
|||
subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName));
|
||||
}
|
||||
|
||||
protected void onAdd(long sequenceId, byte priority) {
|
||||
// update last recovered state
|
||||
for (LastRecovered last : subscriberLastRecoveredMap.values()) {
|
||||
last.updateStored(sequenceId, priority);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
|
||||
TransactionContext c = persistenceAdapter.getTransactionContext();
|
||||
try {
|
||||
|
@ -223,6 +329,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
|
|||
}
|
||||
|
||||
public int getMessageCount(String clientId, String subscriberName) throws IOException {
|
||||
//Duration duration = new Duration("getMessageCount");
|
||||
int result = 0;
|
||||
TransactionContext c = persistenceAdapter.getTransactionContext();
|
||||
try {
|
||||
|
@ -236,6 +343,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
|
|||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result);
|
||||
}
|
||||
//duration.end();
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
@ -281,13 +281,13 @@ public class Statements {
|
|||
|
||||
public String getFindDurableSubMessagesByPriorityStatement() {
|
||||
if (findDurableSubMessagesByPriorityStatement == null) {
|
||||
findDurableSubMessagesByPriorityStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
|
||||
+ getFullAckTableName() + " D "
|
||||
findDurableSubMessagesByPriorityStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M,"
|
||||
+ " " + getFullAckTableName() + " D"
|
||||
+ " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
|
||||
+ " AND M.CONTAINER=D.CONTAINER"
|
||||
+ " AND M.PRIORITY=D.PRIORITY AND M.ID > D.LAST_ACKED_ID"
|
||||
+ " AND ( (M.ID > ?) OR (M.PRIORITY < ?) )"
|
||||
+ " ORDER BY M.PRIORITY DESC, M.ID";
|
||||
+ " AND M.ID > ? AND M.PRIORITY = ?"
|
||||
+ " ORDER BY M.ID";
|
||||
}
|
||||
return findDurableSubMessagesByPriorityStatement;
|
||||
}
|
||||
|
|
|
@ -23,15 +23,19 @@ import java.util.HashMap;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
@ -43,32 +47,28 @@ import org.apache.activemq.TestSupport;
|
|||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
//import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
||||
import org.apache.activemq.util.MessageIdList;
|
||||
import org.apache.activemq.util.Wait;
|
||||
//import org.apache.commons.dbcp.BasicDataSource;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentProducerDurableConsumerTest.class);
|
||||
private int consumerCount = 1;
|
||||
private int consumerCount = 5;
|
||||
BrokerService broker;
|
||||
protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
|
||||
protected Map<MessageConsumer, MessageIdList> consumers = new HashMap<MessageConsumer, MessageIdList>();
|
||||
protected Map<MessageConsumer, TimedMessageListener> consumers = new HashMap<MessageConsumer, TimedMessageListener>();
|
||||
protected MessageIdList allMessagesList = new MessageIdList();
|
||||
private int messageSize = 1024;
|
||||
|
||||
public void testPlaceHolder() throws Exception {
|
||||
}
|
||||
|
||||
public void x_initCombosForTestSendRateWithActivatingConsumers() throws Exception {
|
||||
public void initCombosForTestSendRateWithActivatingConsumers() throws Exception {
|
||||
addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC, PersistenceAdapterChoice.MEM});
|
||||
}
|
||||
|
||||
public void x_testSendRateWithActivatingConsumers() throws Exception {
|
||||
public void testSendRateWithActivatingConsumers() throws Exception {
|
||||
final Destination destination = createDestination();
|
||||
final ConnectionFactory factory = createConnectionFactory();
|
||||
startInactiveConsumers(factory, destination);
|
||||
|
@ -78,12 +78,12 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
MessageProducer producer = createMessageProducer(session, destination);
|
||||
|
||||
// preload the durable consumers
|
||||
double[] inactiveConsumerStats = produceMessages(destination, 200, 100, session, producer, null);
|
||||
double[] inactiveConsumerStats = produceMessages(destination, 500, 10, session, producer, null);
|
||||
LOG.info("With inactive consumers: ave: " + inactiveConsumerStats[1]
|
||||
+ ", max: " + inactiveConsumerStats[0] + ", multiplier: " + (inactiveConsumerStats[0]/inactiveConsumerStats[1]));
|
||||
|
||||
// periodically start a durable sub that is has a backlog
|
||||
final int consumersToActivate = 1;
|
||||
// periodically start a durable sub that has a backlog
|
||||
final int consumersToActivate = 5;
|
||||
final Object addConsumerSignal = new Object();
|
||||
Executors.newCachedThreadPool(new ThreadFactory() {
|
||||
@Override
|
||||
|
@ -96,16 +96,15 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
try {
|
||||
MessageConsumer consumer = null;
|
||||
for (int i = 0; i < consumersToActivate; i++) {
|
||||
LOG.info("Waiting for add signal");
|
||||
LOG.info("Waiting for add signal from producer...");
|
||||
synchronized (addConsumerSignal) {
|
||||
addConsumerSignal.wait(30 * 60 * 1000);
|
||||
}
|
||||
TimedMessageListener listener = new TimedMessageListener();
|
||||
consumer = createDurableSubscriber(factory.createConnection(), destination, "consumer" + (i + 1));
|
||||
LOG.info("Created consumer " + consumer);
|
||||
MessageIdList list = new MessageIdList();
|
||||
list.setParent(allMessagesList);
|
||||
consumer.setMessageListener(list);
|
||||
consumers.put(consumer, list);
|
||||
consumer.setMessageListener(listener);
|
||||
consumers.put(consumer, listener);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("failed to start consumer", e);
|
||||
|
@ -114,18 +113,44 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
});
|
||||
|
||||
|
||||
double[] stats = produceMessages(destination, 20, 100, session, producer, addConsumerSignal);
|
||||
double[] statsWithActive = produceMessages(destination, 300, 10, session, producer, addConsumerSignal);
|
||||
|
||||
LOG.info(" with concurrent activate, ave: " + stats[1] + ", max: " + stats[0] + ", multiplier: " + (stats[0]/stats[1]));
|
||||
assertTrue("max (" + stats[0] + ") within reasonable multiplier of ave (" + stats[1] + ")",
|
||||
stats[0] < 5 * stats[1]);
|
||||
LOG.info(" with concurrent activate, ave: " + statsWithActive[1] + ", max: " + statsWithActive[0] + ", multiplier: " + (statsWithActive[0]/ statsWithActive[1]));
|
||||
|
||||
while(consumers.size() < consumersToActivate) {
|
||||
TimeUnit.SECONDS.sleep(2);
|
||||
}
|
||||
|
||||
long timeToFirstAccumulator = 0;
|
||||
for (TimedMessageListener listener : consumers.values()) {
|
||||
long time = listener.getFirstReceipt();
|
||||
timeToFirstAccumulator += time;
|
||||
LOG.info("Time to first " + time);
|
||||
}
|
||||
LOG.info("Ave time to first message =" + timeToFirstAccumulator/consumers.size());
|
||||
|
||||
for (TimedMessageListener listener : consumers.values()) {
|
||||
LOG.info("Ave batch receipt time: " + listener.waitForReceivedLimit(5000) + " max receipt: " + listener.maxReceiptTime);
|
||||
}
|
||||
|
||||
//assertTrue("max (" + statsWithActive[0] + ") within reasonable multiplier of ave (" + statsWithActive[1] + ")",
|
||||
// statsWithActive[0] < 5 * statsWithActive[1]);
|
||||
|
||||
// compare no active to active
|
||||
LOG.info("Ave send time with active: " + statsWithActive[1]
|
||||
+ " as multiplier of ave with none active: " + inactiveConsumerStats[1]
|
||||
+ ", multiplier=" + (statsWithActive[1]/inactiveConsumerStats[1]));
|
||||
|
||||
assertTrue("Ave send time with active: " + statsWithActive[1]
|
||||
+ " within reasonable multpler of ave with none active: " + inactiveConsumerStats[1]
|
||||
+ ", multiplier " + (statsWithActive[1]/inactiveConsumerStats[1]),
|
||||
statsWithActive[1] < 15 * inactiveConsumerStats[1]);
|
||||
}
|
||||
|
||||
|
||||
public void x_initCombosForTestSendWithInactiveAndActiveConsumers() throws Exception {
|
||||
addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC, PersistenceAdapterChoice.MEM});
|
||||
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
|
||||
}
|
||||
|
||||
public void x_testSendWithInactiveAndActiveConsumers() throws Exception {
|
||||
|
@ -150,7 +175,7 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
|
||||
LOG.info("With consumer: " + withConsumerStats[1] + " , with noConsumer: " + noConsumerStats[1]
|
||||
+ ", multiplier: " + (withConsumerStats[1]/noConsumerStats[1]));
|
||||
final int reasonableMultiplier = 4; // not so reasonable, but on slow disks it can be
|
||||
final int reasonableMultiplier = 15; // not so reasonable but improving
|
||||
assertTrue("max X times as slow with consumer: " + withConsumerStats[1] + ", with no Consumer: "
|
||||
+ noConsumerStats[1] + ", multiplier: " + (withConsumerStats[1]/noConsumerStats[1]),
|
||||
withConsumerStats[1] < noConsumerStats[1] * reasonableMultiplier);
|
||||
|
@ -188,9 +213,8 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
protected void startConsumers(ConnectionFactory factory, Destination dest) throws Exception {
|
||||
MessageConsumer consumer;
|
||||
for (int i = 0; i < consumerCount; i++) {
|
||||
TimedMessageListener list = new TimedMessageListener();
|
||||
consumer = createDurableSubscriber(factory.createConnection(), dest, "consumer" + (i + 1));
|
||||
MessageIdList list = new MessageIdList();
|
||||
list.setParent(allMessagesList);
|
||||
consumer.setMessageListener(list);
|
||||
consumers.put(consumer, list);
|
||||
}
|
||||
|
@ -212,39 +236,44 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
* @throws Exception
|
||||
*/
|
||||
private double[] produceMessages(Destination destination,
|
||||
int toSend,
|
||||
int numIterations,
|
||||
final int toSend,
|
||||
final int numIterations,
|
||||
Session session,
|
||||
MessageProducer producer,
|
||||
Object addConsumerSignal) throws Exception {
|
||||
long start;
|
||||
long count = 0;
|
||||
double max = 0, sum = 0;
|
||||
double batchMax = 0, max = 0, sum = 0;
|
||||
for (int i=0; i<numIterations; i++) {
|
||||
start = System.currentTimeMillis();
|
||||
for (int j=0; j < toSend; j++) {
|
||||
long singleSendstart = System.currentTimeMillis();
|
||||
TextMessage msg = createTextMessage(session, "" + j);
|
||||
producer.send(msg);
|
||||
if (++count % 300 == 0) {
|
||||
max = Math.max(max, (System.currentTimeMillis() - singleSendstart));
|
||||
if (++count % 500 == 0) {
|
||||
if (addConsumerSignal != null) {
|
||||
synchronized (addConsumerSignal) {
|
||||
addConsumerSignal.notifyAll();
|
||||
LOG.info("Signaled add consumer");
|
||||
LOG.info("Signalled add consumer");
|
||||
}
|
||||
}
|
||||
}
|
||||
;
|
||||
if (count % 5000 == 0) {
|
||||
LOG.info("Sent " + count);
|
||||
LOG.info("Sent " + count + ", singleSendMax:" + max);
|
||||
}
|
||||
|
||||
}
|
||||
long duration = System.currentTimeMillis() - start;
|
||||
max = Math.max(max, duration);
|
||||
batchMax = Math.max(batchMax, duration);
|
||||
sum += duration;
|
||||
LOG.info("Iteration " + i + ", sent " + toSend + ", time: "
|
||||
+ duration + ", batchMax:" + batchMax + ", singleSendMax:" + max);
|
||||
}
|
||||
|
||||
LOG.info("Sent: " + toSend * numIterations + ", max send time: " + max);
|
||||
return new double[]{max, sum/numIterations};
|
||||
LOG.info("Sent: " + toSend * numIterations + ", batchMax: " + batchMax + " singleSendMax: " + max);
|
||||
return new double[]{batchMax, sum/numIterations};
|
||||
}
|
||||
|
||||
protected TextMessage createTextMessage(Session session, String initText) throws Exception {
|
||||
|
@ -297,12 +326,44 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
|
||||
PolicyEntry policy = new PolicyEntry();
|
||||
policy.setPrioritizedMessages(true);
|
||||
policy.setMaxPageSize(500);
|
||||
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
policyMap.setDefaultEntry(policy);
|
||||
brokerService.setDestinationPolicy(policyMap);
|
||||
|
||||
//setPersistenceAdapter(brokerService, PersistenceAdapterChoice.JDBC);
|
||||
setDefaultPersistenceAdapter(brokerService);
|
||||
if (false) {
|
||||
// external mysql works a lot faster
|
||||
//
|
||||
// JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
|
||||
// BasicDataSource ds = new BasicDataSource();
|
||||
// com.mysql.jdbc.Driver d = new com.mysql.jdbc.Driver();
|
||||
// ds.setDriverClassName("com.mysql.jdbc.Driver");
|
||||
// ds.setUrl("jdbc:mysql://localhost/activemq?relaxAutoCommit=true");
|
||||
// ds.setMaxActive(200);
|
||||
// ds.setUsername("root");
|
||||
// ds.setPassword("");
|
||||
// ds.setPoolPreparedStatements(true);
|
||||
// jdbc.setDataSource(ds);
|
||||
// brokerService.setPersistenceAdapter(jdbc);
|
||||
|
||||
/* add mysql bits to the pom in the testing dependencies
|
||||
<dependency>
|
||||
<groupId>mysql</groupId>
|
||||
<artifactId>mysql-connector-java</artifactId>
|
||||
<version>5.1.10</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-dbcp</groupId>
|
||||
<artifactId>commons-dbcp</artifactId>
|
||||
<version>1.2.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
*/
|
||||
} else {
|
||||
setDefaultPersistenceAdapter(brokerService);
|
||||
}
|
||||
return brokerService;
|
||||
}
|
||||
|
||||
|
@ -311,6 +372,8 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
|
||||
prefetchPolicy.setAll(1);
|
||||
factory.setPrefetchPolicy(prefetchPolicy);
|
||||
|
||||
factory.setDispatchAsync(true);
|
||||
return factory;
|
||||
}
|
||||
|
||||
|
@ -318,4 +381,55 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport {
|
|||
return suite(ConcurrentProducerDurableConsumerTest.class);
|
||||
}
|
||||
|
||||
class TimedMessageListener implements MessageListener {
|
||||
final int batchSize = 1000;
|
||||
CountDownLatch firstReceiptLatch = new CountDownLatch(1);
|
||||
long mark = System.currentTimeMillis();
|
||||
long firstReceipt = 0l;
|
||||
long receiptAccumulator = 0;
|
||||
long batchReceiptAccumulator = 0;
|
||||
long maxReceiptTime = 0;
|
||||
AtomicLong count = new AtomicLong(0);
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message) {
|
||||
final long current = System.currentTimeMillis();
|
||||
final long duration = current - mark;
|
||||
receiptAccumulator += duration;
|
||||
allMessagesList.onMessage(message);
|
||||
if (count.incrementAndGet() == 1) {
|
||||
firstReceipt = duration;
|
||||
firstReceiptLatch.countDown();
|
||||
LOG.info("First receipt in " + firstReceipt + "ms");
|
||||
} else if (count.get() % batchSize == 0) {
|
||||
LOG.info("Consumed " + batchSize + " in " + batchReceiptAccumulator + "ms");
|
||||
batchReceiptAccumulator=0;
|
||||
}
|
||||
maxReceiptTime = Math.max(maxReceiptTime, duration);
|
||||
receiptAccumulator += duration;
|
||||
batchReceiptAccumulator += duration;
|
||||
mark = current;
|
||||
}
|
||||
|
||||
long getMessageCount() {
|
||||
return count.get();
|
||||
}
|
||||
|
||||
long getFirstReceipt() throws Exception {
|
||||
firstReceiptLatch.await(30, TimeUnit.SECONDS);
|
||||
return firstReceipt;
|
||||
}
|
||||
|
||||
public long waitForReceivedLimit(long limit) throws Exception {
|
||||
final long expiry = System.currentTimeMillis() + 30*60*1000;
|
||||
while (count.get() < limit) {
|
||||
if (System.currentTimeMillis() > expiry) {
|
||||
throw new RuntimeException("Expired waiting for X messages, " + limit);
|
||||
}
|
||||
TimeUnit.SECONDS.sleep(2);
|
||||
}
|
||||
return receiptAccumulator/(limit/batchSize);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue