Reduce contention around Queues

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@607038 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-12-27 11:06:50 +00:00
parent ea0cd012b1
commit 2db7df72d6
11 changed files with 493 additions and 447 deletions

View File

@ -39,7 +39,16 @@ import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker; import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.ft.MasterConnector; import org.apache.activemq.broker.ft.MasterConnector;
import org.apache.activemq.broker.jmx.*; import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.ConnectorView;
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.FTConnectorView;
import org.apache.activemq.broker.jmx.JmsConnectorView;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.NetworkConnectorView;
import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
import org.apache.activemq.broker.jmx.ProxyConnectorView;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor; import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory; import org.apache.activemq.broker.region.DestinationFactory;
@ -151,6 +160,7 @@ public class BrokerService implements Service {
private CountDownLatch stoppedLatch = new CountDownLatch(1); private CountDownLatch stoppedLatch = new CountDownLatch(1);
private boolean supportFailOver; private boolean supportFailOver;
private boolean clustered; private boolean clustered;
static { static {
String localHostName = "localhost"; String localHostName = "localhost";
@ -363,7 +373,7 @@ public class BrokerService implements Service {
/** /**
* @return true if this Broker is a slave to a Master * @return true if this Broker is a slave to a Master
*/ */
public synchronized boolean isSlave() { public boolean isSlave() {
return masterConnector != null && masterConnector.isSlave(); return masterConnector != null && masterConnector.isSlave();
} }

View File

@ -56,18 +56,18 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
} }
public synchronized boolean isActive() { public boolean isActive() {
return active; return active;
} }
protected synchronized boolean isFull() { protected boolean isFull() {
return !active || super.isFull(); return !active || super.isFull();
} }
public synchronized void gc() { public void gc() {
} }
public synchronized void add(ConnectionContext context, Destination destination) throws Exception { public void add(ConnectionContext context, Destination destination) throws Exception {
super.add(context, destination); super.add(context, destination);
destinations.put(destination.getActiveMQDestination(), destination); destinations.put(destination.getActiveMQDestination(), destination);
if (active || keepDurableSubsActive) { if (active || keepDurableSubsActive) {
@ -77,38 +77,43 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
topic.recoverRetroactiveMessages(context, this); topic.recoverRetroactiveMessages(context, this);
} }
} }
dispatchMatched(); dispatchPending();
} }
public synchronized void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info) throws Exception { public void activate(SystemUsage memoryManager, ConnectionContext context,
ConsumerInfo info) throws Exception {
LOG.debug("Activating " + this); LOG.debug("Activating " + this);
if (!active) { if (!active) {
this.active = true; this.active = true;
this.context = context; this.context = context;
this.info = info; this.info = info;
if (!keepDurableSubsActive) { if (!keepDurableSubsActive) {
for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) { for (Iterator<Destination> iter = destinations.values()
Topic topic = (Topic)iter.next(); .iterator(); iter.hasNext();) {
Topic topic = (Topic) iter.next();
topic.activate(context, this); topic.activate(context, this);
} }
} }
pending.setSystemUsage(memoryManager); synchronized (pending) {
pending.start(); pending.setSystemUsage(memoryManager);
pending.start();
// If nothing was in the persistent store, then try to use the // If nothing was in the persistent store, then try to use the
// recovery policy. // recovery policy.
if (pending.isEmpty()) { if (pending.isEmpty()) {
for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) { for (Iterator<Destination> iter = destinations.values()
Topic topic = (Topic)iter.next(); .iterator(); iter.hasNext();) {
topic.recoverRetroactiveMessages(context, this); Topic topic = (Topic) iter.next();
topic.recoverRetroactiveMessages(context, this);
}
} }
} }
dispatchMatched(); dispatchPending();
this.usageManager.getMemoryUsage().addUsageListener(this); this.usageManager.getMemoryUsage().addUsageListener(this);
} }
} }
public synchronized void deactivate(boolean keepDurableSubsActive) throws Exception { public void deactivate(boolean keepDurableSubsActive) throws Exception {
active = false; active = false;
this.usageManager.getMemoryUsage().removeUsageListener(this); this.usageManager.getMemoryUsage().removeUsageListener(this);
synchronized (pending) { synchronized (pending) {
@ -136,7 +141,9 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
node.decrementReferenceCount(); node.decrementReferenceCount();
} }
} }
dispatched.clear(); synchronized(dispatched) {
dispatched.clear();
}
if (!keepDurableSubsActive && pending.isTransient()) { if (!keepDurableSubsActive && pending.isTransient()) {
synchronized (pending) { synchronized (pending) {
try { try {
@ -163,7 +170,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
return md; return md;
} }
public synchronized void add(MessageReference node) throws Exception { public void add(MessageReference node) throws Exception {
if (!active && !keepDurableSubsActive) { if (!active && !keepDurableSubsActive) {
return; return;
} }
@ -171,11 +178,13 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
super.add(node); super.add(node);
} }
protected synchronized void doAddRecoveredMessage(MessageReference message) throws Exception { protected void doAddRecoveredMessage(MessageReference message) throws Exception {
synchronized(pending) {
pending.addRecoveredMessage(message); pending.addRecoveredMessage(message);
}
} }
public synchronized int getPendingQueueSize() { public int getPendingQueueSize() {
if (active || keepDurableSubsActive) { if (active || keepDurableSubsActive) {
return super.getPendingQueueSize(); return super.getPendingQueueSize();
} }
@ -187,7 +196,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions"); throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
} }
protected synchronized boolean canDispatch(MessageReference node) { protected boolean canDispatch(MessageReference node) {
return active; return active;
} }
@ -217,24 +226,28 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
/** /**
* Release any references that we are holding. * Release any references that we are holding.
*/ */
public synchronized void destroy() { public void destroy() {
try { synchronized (pending) {
synchronized (pending) { try {
pending.reset(); pending.reset();
while (pending.hasNext()) { while (pending.hasNext()) {
MessageReference node = pending.next(); MessageReference node = pending.next();
node.decrementReferenceCount(); node.decrementReferenceCount();
} }
} finally {
pending.release();
pending.clear();
} }
} finally {
pending.release();
pending.clear();
} }
for (Iterator iter = dispatched.iterator(); iter.hasNext();) { synchronized(dispatched) {
MessageReference node = (MessageReference)iter.next(); for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
node.decrementReferenceCount(); MessageReference node = (MessageReference) iter.next();
node.decrementReferenceCount();
}
dispatched.clear();
} }
dispatched.clear();
} }
/** /**
@ -247,7 +260,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) { if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
try { try {
dispatchMatched(); dispatchPending();
} catch (IOException e) { } catch (IOException e) {
LOG.warn("problem calling dispatchMatched", e); LOG.warn("problem calling dispatchMatched", e);
} }

View File

@ -64,6 +64,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
private int maxAuditDepth=2048; private int maxAuditDepth=2048;
protected final SystemUsage usageManager; protected final SystemUsage usageManager;
protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit(); protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
private final Object pendingLock = new Object();
private final Object dispatchLock = new Object();
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException { public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
super(broker, context, info); super(broker, context, info);
@ -87,14 +89,14 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (getPrefetchSize() == 0 && !isSlave()) { if (getPrefetchSize() == 0 && !isSlave()) {
prefetchExtension++; prefetchExtension++;
final long dispatchCounterBeforePull = dispatchCounter; final long dispatchCounterBeforePull = dispatchCounter;
dispatchMatched(); dispatchPending();
// If there was nothing dispatched.. we may need to setup a timeout. // If there was nothing dispatched.. we may need to setup a timeout.
if (dispatchCounterBeforePull == dispatchCounter) { if (dispatchCounterBeforePull == dispatchCounter) {
// imediate timeout used by receiveNoWait() // imediate timeout used by receiveNoWait()
if (pull.getTimeout() == -1) { if (pull.getTimeout() == -1) {
// Send a NULL message. // Send a NULL message.
add(QueueMessageReference.NULL_MESSAGE); add(QueueMessageReference.NULL_MESSAGE);
dispatchMatched(); dispatchPending();
} }
if (pull.getTimeout() > 0) { if (pull.getTimeout() > 0) {
Scheduler.executeAfterDelay(new Runnable() { Scheduler.executeAfterDelay(new Runnable() {
@ -117,216 +119,238 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (dispatchCounterBeforePull == dispatchCounter) { if (dispatchCounterBeforePull == dispatchCounter) {
try { try {
add(QueueMessageReference.NULL_MESSAGE); add(QueueMessageReference.NULL_MESSAGE);
dispatchMatched(); dispatchPending();
} catch (Exception e) { } catch (Exception e) {
context.getConnection().serviceException(e); context.getConnection().serviceException(e);
} }
} }
} }
public synchronized void add(MessageReference node) throws Exception { public void add(MessageReference node) throws Exception {
boolean pendingEmpty = false; boolean pendingEmpty = false;
pendingEmpty = pending.isEmpty(); synchronized(pendingLock) {
pendingEmpty = pending.isEmpty();
}
enqueueCounter++; enqueueCounter++;
if (optimizedDispatch && !isFull() && pendingEmpty && !isSlave()) { if (optimizedDispatch && !isFull() && pendingEmpty && !isSlave()) {
dispatch(node); dispatch(node);
} else { } else {
optimizePrefetch(); optimizePrefetch();
synchronized (pending) { synchronized(pendingLock) {
if (pending.isEmpty() && LOG.isDebugEnabled()) { if (pending.isEmpty() && LOG.isDebugEnabled()) {
LOG.debug("Prefetch limit."); LOG.debug("Prefetch limit.");
} }
pending.addMessageLast(node); pending.addMessageLast(node);
dispatchMatched();
} }
dispatchPending();
} }
} }
public synchronized void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception { public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
try { synchronized(pendingLock) {
pending.reset(); try {
while (pending.hasNext()) { pending.reset();
MessageReference node = pending.next(); while (pending.hasNext()) {
if (node.getMessageId().equals(mdn.getMessageId())) { MessageReference node = pending.next();
pending.remove(); if (node.getMessageId().equals(mdn.getMessageId())) {
createMessageDispatch(node, node.getMessage()); pending.remove();
dispatched.add(node); createMessageDispatch(node, node.getMessage());
return; synchronized(dispatchLock) {
dispatched.add(node);
}
return;
}
} }
} finally {
pending.release();
} }
} finally {
pending.release();
} }
throw new JMSException("Slave broker out of sync with master: Dispatched message (" + mdn.getMessageId() + ") was not in the pending list"); throw new JMSException(
"Slave broker out of sync with master: Dispatched message ("
+ mdn.getMessageId() + ") was not in the pending list");
} }
public synchronized void acknowledge(final ConnectionContext context, public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
final MessageAck ack) throws Exception {
// Handle the standard acknowledgment case. // Handle the standard acknowledgment case.
boolean callDispatchMatched = false; boolean callDispatchMatched = false;
if (ack.isStandardAck()) { synchronized(dispatchLock) {
// Acknowledge all dispatched messages up till the message id of the if (ack.isStandardAck()) {
// acknowledgment. // Acknowledge all dispatched messages up till the message id of
int index = 0; // the
boolean inAckRange = false; // acknowledgment.
List<MessageReference> removeList = new ArrayList<MessageReference>(); int index = 0;
for (final MessageReference node : dispatched) { boolean inAckRange = false;
MessageId messageId = node.getMessageId(); List<MessageReference> removeList = new ArrayList<MessageReference>();
if (ack.getFirstMessageId() == null for (final MessageReference node : dispatched) {
|| ack.getFirstMessageId().equals(messageId)) { MessageId messageId = node.getMessageId();
inAckRange = true; if (ack.getFirstMessageId() == null
|| ack.getFirstMessageId().equals(messageId)) {
inAckRange = true;
}
if (inAckRange) {
// Don't remove the nodes until we are committed.
if (!context.isInTransaction()) {
dequeueCounter++;
node.getRegionDestination()
.getDestinationStatistics().getDequeues()
.increment();
removeList.add(node);
} else {
// setup a Synchronization to remove nodes from the
// dispatched list.
context.getTransaction().addSynchronization(
new Synchronization() {
public void afterCommit()
throws Exception {
synchronized(dispatchLock) {
dequeueCounter++;
dispatched.remove(node);
node
.getRegionDestination()
.getDestinationStatistics()
.getDequeues()
.increment();
prefetchExtension--;
}
}
public void afterRollback()
throws Exception {
super.afterRollback();
}
});
}
index++;
acknowledge(context, ack, node);
if (ack.getLastMessageId().equals(messageId)) {
if (context.isInTransaction()) {
// extend prefetch window only if not a pulling
// consumer
if (getPrefetchSize() != 0) {
prefetchExtension = Math.max(
prefetchExtension, index + 1);
}
} else {
prefetchExtension = Math.max(0,
prefetchExtension - (index + 1));
}
callDispatchMatched = true;
break;
}
}
} }
if (inAckRange) { for (final MessageReference node : removeList) {
// Don't remove the nodes until we are committed. dispatched.remove(node);
if (!context.isInTransaction()) { }
dequeueCounter++; // this only happens after a reconnect - get an ack which is not
// valid
if (!callDispatchMatched) {
if (LOG.isDebugEnabled()) {
LOG
.debug("Could not correlate acknowledgment with dispatched message: "
+ ack);
}
}
} else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch
// counters.
// Acknowledge all dispatched messages up till the message id of
// the
// acknowledgment.
int index = 0;
for (Iterator<MessageReference> iter = dispatched.iterator(); iter
.hasNext(); index++) {
final MessageReference node = iter.next();
if (ack.getLastMessageId().equals(node.getMessageId())) {
prefetchExtension = Math.max(prefetchExtension,
index + 1);
callDispatchMatched = true;
break;
}
}
if (!callDispatchMatched) {
throw new JMSException(
"Could not correlate acknowledgment with dispatched message: "
+ ack);
}
} else if (ack.isRedeliveredAck()) {
// Message was re-delivered but it was not yet considered to be
// a
// DLQ message.
// Acknowledge all dispatched messages up till the message id of
// the
// acknowledgment.
boolean inAckRange = false;
for (final MessageReference node : dispatched) {
MessageId messageId = node.getMessageId();
if (ack.getFirstMessageId() == null
|| ack.getFirstMessageId().equals(messageId)) {
inAckRange = true;
}
if (inAckRange) {
node.incrementRedeliveryCounter();
if (ack.getLastMessageId().equals(messageId)) {
callDispatchMatched = true;
break;
}
}
}
if (!callDispatchMatched) {
throw new JMSException(
"Could not correlate acknowledgment with dispatched message: "
+ ack);
}
} else if (ack.isPoisonAck()) {
// TODO: what if the message is already in a DLQ???
// Handle the poison ACK case: we need to send the message to a
// DLQ
if (ack.isInTransaction()) {
throw new JMSException("Poison ack cannot be transacted: "
+ ack);
}
// Acknowledge all dispatched messages up till the message id of
// the
// acknowledgment.
int index = 0;
boolean inAckRange = false;
List<MessageReference> removeList = new ArrayList<MessageReference>();
for (final MessageReference node : dispatched) {
MessageId messageId = node.getMessageId();
if (ack.getFirstMessageId() == null
|| ack.getFirstMessageId().equals(messageId)) {
inAckRange = true;
}
if (inAckRange) {
sendToDLQ(context, node);
node.getRegionDestination().getDestinationStatistics() node.getRegionDestination().getDestinationStatistics()
.getDequeues().increment(); .getDequeues().increment();
removeList.add(node); removeList.add(node);
} else { dequeueCounter++;
// setup a Synchronization to remove nodes from the index++;
// dispatched list. acknowledge(context, ack, node);
context.getTransaction().addSynchronization( if (ack.getLastMessageId().equals(messageId)) {
new Synchronization() {
public void afterCommit() throws Exception {
synchronized (PrefetchSubscription.this) {
dequeueCounter++;
dispatched.remove(node);
node.getRegionDestination()
.getDestinationStatistics()
.getDequeues().increment();
prefetchExtension--;
}
}
public void afterRollback()
throws Exception {
super.afterRollback();
}
});
}
index++;
acknowledge(context, ack, node);
if (ack.getLastMessageId().equals(messageId)) {
if (context.isInTransaction()) {
// extend prefetch window only if not a pulling
// consumer
if (getPrefetchSize() != 0) {
prefetchExtension = Math.max(prefetchExtension,
index + 1);
}
} else {
prefetchExtension = Math.max(0, prefetchExtension prefetchExtension = Math.max(0, prefetchExtension
- (index + 1)); - (index + 1));
callDispatchMatched = true;
break;
} }
callDispatchMatched = true;
break;
} }
} }
} for (final MessageReference node : removeList) {
for (final MessageReference node : removeList) { dispatched.remove(node);
dispatched.remove(node); }
} if (!callDispatchMatched) {
// this only happens after a reconnect - get an ack which is not throw new JMSException(
// valid "Could not correlate acknowledgment with dispatched message: "
if (!callDispatchMatched) {
if (LOG.isDebugEnabled()) {
LOG
.debug("Could not correlate acknowledgment with dispatched message: "
+ ack); + ack);
} }
} }
} else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch
// counters.
// Acknowledge all dispatched messages up till the message id of the
// acknowledgment.
int index = 0;
for (Iterator<MessageReference> iter = dispatched.iterator(); iter
.hasNext(); index++) {
final MessageReference node = iter.next();
if (ack.getLastMessageId().equals(node.getMessageId())) {
prefetchExtension = Math.max(prefetchExtension, index + 1);
callDispatchMatched = true;
break;
}
}
if (!callDispatchMatched) {
throw new JMSException(
"Could not correlate acknowledgment with dispatched message: "
+ ack);
}
} else if (ack.isRedeliveredAck()) {
// Message was re-delivered but it was not yet considered to be a
// DLQ message.
// Acknowledge all dispatched messages up till the message id of the
// acknowledgment.
boolean inAckRange = false;
for (final MessageReference node : dispatched) {
MessageId messageId = node.getMessageId();
if (ack.getFirstMessageId() == null
|| ack.getFirstMessageId().equals(messageId)) {
inAckRange = true;
}
if (inAckRange) {
node.incrementRedeliveryCounter();
if (ack.getLastMessageId().equals(messageId)) {
callDispatchMatched = true;
break;
}
}
}
if (!callDispatchMatched) {
throw new JMSException(
"Could not correlate acknowledgment with dispatched message: "
+ ack);
}
} else if (ack.isPoisonAck()) {
// TODO: what if the message is already in a DLQ???
// Handle the poison ACK case: we need to send the message to a DLQ
if (ack.isInTransaction()) {
throw new JMSException("Poison ack cannot be transacted: "
+ ack);
}
// Acknowledge all dispatched messages up till the message id of the
// acknowledgment.
int index = 0;
boolean inAckRange = false;
List<MessageReference> removeList = new ArrayList<MessageReference>();
for (final MessageReference node : dispatched) {
MessageId messageId = node.getMessageId();
if (ack.getFirstMessageId() == null
|| ack.getFirstMessageId().equals(messageId)) {
inAckRange = true;
}
if (inAckRange) {
sendToDLQ(context, node);
node.getRegionDestination().getDestinationStatistics()
.getDequeues().increment();
removeList.add(node);
dequeueCounter++;
index++;
acknowledge(context, ack, node);
if (ack.getLastMessageId().equals(messageId)) {
prefetchExtension = Math.max(0, prefetchExtension
- (index + 1));
callDispatchMatched = true;
break;
}
}
}
for (final MessageReference node : removeList) {
dispatched.remove(node);
}
if (!callDispatchMatched) {
throw new JMSException(
"Could not correlate acknowledgment with dispatched message: "
+ ack);
}
} }
if (callDispatchMatched) { if (callDispatchMatched) {
dispatchMatched(); dispatchPending();
} else { } else {
if (isSlave()) { if (isSlave()) {
throw new JMSException( throw new JMSException(
@ -356,45 +380,45 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
* *
* @return * @return
*/ */
protected synchronized boolean isFull() { protected boolean isFull() {
return isSlave() || dispatched.size() - prefetchExtension >= info.getPrefetchSize(); return isSlave() || dispatched.size() - prefetchExtension >= info.getPrefetchSize();
} }
/** /**
* @return true when 60% or more room is left for dispatching messages * @return true when 60% or more room is left for dispatching messages
*/ */
public synchronized boolean isLowWaterMark() { public boolean isLowWaterMark() {
return (dispatched.size() - prefetchExtension) <= (info.getPrefetchSize() * .4); return (dispatched.size() - prefetchExtension) <= (info.getPrefetchSize() * .4);
} }
/** /**
* @return true when 10% or less room is left for dispatching messages * @return true when 10% or less room is left for dispatching messages
*/ */
public synchronized boolean isHighWaterMark() { public boolean isHighWaterMark() {
return (dispatched.size() - prefetchExtension) >= (info.getPrefetchSize() * .9); return (dispatched.size() - prefetchExtension) >= (info.getPrefetchSize() * .9);
} }
public synchronized int countBeforeFull() { public int countBeforeFull() {
return info.getPrefetchSize() + prefetchExtension - dispatched.size(); return info.getPrefetchSize() + prefetchExtension - dispatched.size();
} }
public synchronized int getPendingQueueSize() { public int getPendingQueueSize() {
return pending.size(); return pending.size();
} }
public synchronized int getDispatchedQueueSize() { public int getDispatchedQueueSize() {
return dispatched.size(); return dispatched.size();
} }
public synchronized long getDequeueCounter() { public long getDequeueCounter() {
return dequeueCounter; return dequeueCounter;
} }
public synchronized long getDispatchedCounter() { public long getDispatchedCounter() {
return dispatchCounter; return dispatchCounter;
} }
public synchronized long getEnqueueCounter() { public long getEnqueueCounter() {
return enqueueCounter; return enqueueCounter;
} }
@ -402,11 +426,11 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
return pending.isRecoveryRequired(); return pending.isRecoveryRequired();
} }
public synchronized PendingMessageCursor getPending() { public PendingMessageCursor getPending() {
return this.pending; return this.pending;
} }
public synchronized void setPending(PendingMessageCursor pending) { public void setPending(PendingMessageCursor pending) {
this.pending = pending; this.pending = pending;
if (this.pending!=null) { if (this.pending!=null) {
this.pending.setSystemUsage(usageManager); this.pending.setSystemUsage(usageManager);
@ -430,51 +454,60 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
*/ */
} }
public synchronized void add(ConnectionContext context, Destination destination) throws Exception { public void add(ConnectionContext context, Destination destination) throws Exception {
super.add(context, destination); synchronized(pendingLock) {
pending.add(context, destination); super.add(context, destination);
pending.add(context, destination);
}
} }
public synchronized void remove(ConnectionContext context, Destination destination) throws Exception { public void remove(ConnectionContext context, Destination destination) throws Exception {
super.remove(context, destination); synchronized(pendingLock) {
pending.remove(context, destination); super.remove(context, destination);
pending.remove(context, destination);
}
} }
protected synchronized void dispatchMatched() throws IOException { protected void dispatchPending() throws IOException {
if (!isSlave()) { if (!isSlave()) {
try { synchronized(pendingLock) {
int numberToDispatch = countBeforeFull(); try {
if (numberToDispatch > 0) { int numberToDispatch = countBeforeFull();
pending.setMaxBatchSize(numberToDispatch); if (numberToDispatch > 0) {
int count = 0; pending.setMaxBatchSize(numberToDispatch);
pending.reset(); int count = 0;
while (pending.hasNext() && !isFull() && count < numberToDispatch) { pending.reset();
MessageReference node = pending.next(); while (pending.hasNext() && !isFull()
if (node == null) { && count < numberToDispatch) {
break; MessageReference node = pending.next();
} if (node == null) {
if (canDispatch(node)) { break;
pending.remove(); }
// Message may have been sitting in the pending list if (canDispatch(node)) {
// a while pending.remove();
// waiting for the consumer to ak the message. // Message may have been sitting in the pending
if (node != QueueMessageReference.NULL_MESSAGE && broker.isExpired(node)) { // list
broker.messageExpired(getContext(), node); // a while
dequeueCounter++; // waiting for the consumer to ak the message.
continue; if (node != QueueMessageReference.NULL_MESSAGE
&& broker.isExpired(node)) {
broker.messageExpired(getContext(), node);
dequeueCounter++;
continue;
}
dispatch(node);
count++;
} }
dispatch(node);
count++;
} }
} }
} finally {
pending.release();
} }
} finally {
pending.release();
} }
} }
} }
protected synchronized boolean dispatch(final MessageReference node) throws IOException { protected boolean dispatch(final MessageReference node) throws IOException {
final Message message = node.getMessage(); final Message message = node.getMessage();
if (message == null) { if (message == null) {
return false; return false;
@ -488,7 +521,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
dispatchCounter++; dispatchCounter++;
dispatched.add(node); dispatched.add(node);
if(pending != null) { if(pending != null) {
pending.dispatched(message); synchronized(pendingLock) {
pending.dispatched(message);
}
} }
} else { } else {
prefetchExtension = Math.max(0, prefetchExtension - 1); prefetchExtension = Math.max(0, prefetchExtension - 1);
@ -523,7 +558,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
} }
if (info.isDispatchAsync()) { if (info.isDispatchAsync()) {
try { try {
dispatchMatched(); dispatchPending();
} catch (IOException e) { } catch (IOException e) {
context.getConnection().serviceExceptionAsync(e); context.getConnection().serviceExceptionAsync(e);
} }

View File

@ -94,8 +94,8 @@ public class Queue extends BaseDestination implements Task {
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy(); private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
private int maximumPagedInMessages = garbageSizeBeforeCollection * 2; private int maximumPagedInMessages = garbageSizeBeforeCollection * 2;
private final MessageEvaluationContext queueMsgConext = new MessageEvaluationContext();
private final Object exclusiveLockMutex = new Object(); private final Object exclusiveLockMutex = new Object();
private final Object sendLock = new Object();
private final TaskRunner taskRunner; private final TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>(); private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
@ -204,150 +204,145 @@ public class Queue extends BaseDestination implements Task {
return true; return true;
} }
public synchronized void addSubscription(ConnectionContext context, Subscription sub) throws Exception { public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
sub.add(context, this); sub.add(context, this);
destinationStatistics.getConsumers().increment(); destinationStatistics.getConsumers().increment();
maximumPagedInMessages += sub.getConsumerInfo().getPrefetchSize(); maximumPagedInMessages += sub.getConsumerInfo().getPrefetchSize();
MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); MessageEvaluationContext msgContext = new MessageEvaluationContext();
try {
// needs to be synchronized - so no contention with dispatching
//needs to be synchronized - so no contention with dispatching synchronized (consumers) {
synchronized (consumers) { consumers.add(sub);
consumers.add(sub); if (sub.getConsumerInfo().isExclusive()) {
if (sub.getConsumerInfo().isExclusive()) { LockOwner owner = (LockOwner) sub;
LockOwner owner = (LockOwner)sub; if (exclusiveOwner == null) {
if (exclusiveOwner == null) { exclusiveOwner = owner;
} else {
// switch the owner if the priority is higher.
if (owner.getLockPriority() > exclusiveOwner
.getLockPriority()) {
exclusiveOwner = owner; exclusiveOwner = owner;
} else { }
// switch the owner if the priority is higher. }
if (owner.getLockPriority() > exclusiveOwner.getLockPriority()) { }
exclusiveOwner = owner; }
// we hold the lock on the dispatchValue - so lets build the paged in
// list directly;
buildList(false);
// synchronize with dispatch method so that no new messages are sent
// while
// setting up a subscription. avoid out of order messages,
// duplicates
// etc.
msgContext.setDestination(destination);
synchronized (pagedInMessages) {
// Add all the matching messages in the queue to the
// subscription.
for (Iterator<MessageReference> i = pagedInMessages.iterator(); i
.hasNext();) {
QueueMessageReference node = (QueueMessageReference) i.next();
if (node.isDropped()
|| (!sub.getConsumerInfo().isBrowser() && node
.getLockOwner() != null)) {
continue;
}
try {
msgContext.setMessageReference(node);
if (sub.matches(node, msgContext)) {
sub.add(node);
}
} catch (IOException e) {
log.warn("Could not load message: " + e, e);
}
}
}
}
public void removeSubscription(ConnectionContext context, Subscription sub)
throws Exception {
destinationStatistics.getConsumers().decrement();
maximumPagedInMessages -= sub.getConsumerInfo().getPrefetchSize();
// synchronize with dispatch method so that no new messages are sent
// while
// removing up a subscription.
synchronized (consumers) {
consumers.remove(sub);
if (sub.getConsumerInfo().isExclusive()) {
LockOwner owner = (LockOwner) sub;
// Did we loose the exclusive owner??
if (exclusiveOwner == owner) {
// Find the exclusive consumer with the higest Lock
// Priority.
exclusiveOwner = null;
for (Iterator<Subscription> iter = consumers.iterator(); iter
.hasNext();) {
Subscription s = iter.next();
LockOwner so = (LockOwner) s;
if (s.getConsumerInfo().isExclusive()
&& (exclusiveOwner == null || so
.getLockPriority() > exclusiveOwner
.getLockPriority())) {
exclusiveOwner = so;
} }
} }
} }
} }
if (consumers.isEmpty()) {
//we hold the lock on the dispatchValue - so lets build the paged in messages.gc();
//list directly; }
buildList(false); }
sub.remove(context, this);
// synchronize with dispatch method so that no new messages are sent boolean wasExclusiveOwner = false;
// while if (exclusiveOwner == sub) {
// setting up a subscription. avoid out of order messages, exclusiveOwner = null;
// duplicates wasExclusiveOwner = true;
// etc. }
ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
MessageGroupSet ownedGroups = getMessageGroupOwners().removeConsumer(
consumerId);
msgContext.setDestination(destination); if (!sub.getConsumerInfo().isBrowser()) {
synchronized (pagedInMessages) { MessageEvaluationContext msgContext = new MessageEvaluationContext();
// Add all the matching messages in the queue to the
// subscription. msgContext.setDestination(destination);
for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) { // lets copy the messages to dispatch to avoid deadlock
QueueMessageReference node = (QueueMessageReference)i.next(); List<QueueMessageReference> messagesToDispatch = new ArrayList<QueueMessageReference>();
if (node.isDropped() || (!sub.getConsumerInfo().isBrowser() && node.getLockOwner()!=null)) { synchronized (pagedInMessages) {
continue; for (Iterator<MessageReference> i = pagedInMessages.iterator(); i
} .hasNext();) {
try { QueueMessageReference node = (QueueMessageReference) i
msgContext.setMessageReference(node); .next();
if (sub.matches(node, msgContext)) { if (node.isDropped()) {
sub.add(node); continue;
} }
} catch (IOException e) { String groupID = node.getGroupID();
log.warn("Could not load message: " + e, e); // Re-deliver all messages that the sub locked
} if (node.getLockOwner() == sub
|| wasExclusiveOwner
|| (groupID != null && ownedGroups
.contains(groupID))) {
messagesToDispatch.add(node);
} }
} }
}
// now lets dispatch from the copy of the collection to
// avoid deadlocks
} finally { for (Iterator<QueueMessageReference> iter = messagesToDispatch
msgContext.clear(); .iterator(); iter.hasNext();) {
QueueMessageReference node = iter.next();
node.incrementRedeliveryCounter();
node.unlock();
msgContext.setMessageReference(node);
dispatchPolicy.dispatch(node, msgContext, consumers);
}
} }
} }
public synchronized void removeSubscription(ConnectionContext context,
Subscription sub) throws Exception{
destinationStatistics.getConsumers().decrement();
maximumPagedInMessages-=sub.getConsumerInfo().getPrefetchSize();
// synchronize with dispatch method so that no new messages are sent
// while
// removing up a subscription.
synchronized(consumers){
consumers.remove(sub);
if(sub.getConsumerInfo().isExclusive()){
LockOwner owner=(LockOwner)sub;
// Did we loose the exclusive owner??
if(exclusiveOwner==owner){
// Find the exclusive consumer with the higest Lock
// Priority.
exclusiveOwner=null;
for(Iterator<Subscription> iter=consumers.iterator();iter
.hasNext();){
Subscription s=iter.next();
LockOwner so=(LockOwner)s;
if(s.getConsumerInfo().isExclusive()
&&(exclusiveOwner==null||so.getLockPriority()>exclusiveOwner
.getLockPriority())){
exclusiveOwner=so;
}
}
}
}
if(consumers.isEmpty()){
messages.gc();
}
}
sub.remove(context,this);
boolean wasExclusiveOwner=false;
if(exclusiveOwner==sub){
exclusiveOwner=null;
wasExclusiveOwner=true;
}
ConsumerId consumerId=sub.getConsumerInfo().getConsumerId();
MessageGroupSet ownedGroups=getMessageGroupOwners().removeConsumer(
consumerId);
if(!sub.getConsumerInfo().isBrowser()){
MessageEvaluationContext msgContext=context
.getMessageEvaluationContext();
try{
msgContext.setDestination(destination);
// lets copy the messages to dispatch to avoid deadlock
List<QueueMessageReference> messagesToDispatch=new ArrayList<QueueMessageReference>();
synchronized(pagedInMessages){
for(Iterator<MessageReference> i=pagedInMessages.iterator();i
.hasNext();){
QueueMessageReference node=(QueueMessageReference)i
.next();
if(node.isDropped()){
continue;
}
String groupID=node.getGroupID();
// Re-deliver all messages that the sub locked
if(node.getLockOwner()==sub
||wasExclusiveOwner
||(groupID!=null&&ownedGroups.contains(groupID))){
messagesToDispatch.add(node);
}
}
}
// now lets dispatch from the copy of the collection to
// avoid deadlocks
for(Iterator<QueueMessageReference> iter=messagesToDispatch
.iterator();iter.hasNext();){
QueueMessageReference node=iter.next();
node.incrementRedeliveryCounter();
node.unlock();
msgContext.setMessageReference(node);
dispatchPolicy.dispatch(node,msgContext,consumers);
}
}finally{
msgContext.clear();
}
}
}
public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
final ConnectionContext context = producerExchange.getConnectionContext(); final ConnectionContext context = producerExchange.getConnectionContext();
// There is delay between the client sending it and it arriving at the // There is delay between the client sending it and it arriving at the
@ -445,16 +440,21 @@ public class Queue extends BaseDestination implements Task {
} }
} }
synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
final ConnectionContext context = producerExchange.getConnectionContext(); final ConnectionContext context = producerExchange.getConnectionContext();
message.setRegionDestination(this); synchronized (sendLock) {
if (store != null && message.isPersistent()) { message.setRegionDestination(this);
while (!systemUsage.getStoreUsage().waitForSpace(1000)) { if (store != null && message.isPersistent()) {
if (context.getStopping().get()) { while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
throw new IOException("Connection closed, send aborted."); if (context.getStopping().get()) {
throw new IOException(
"Connection closed, send aborted.");
}
} }
store.addMessage(context, message);
} }
store.addMessage(context, message);
} }
if (context.isInTransaction()) { if (context.isInTransaction()) {
// If this is a transacted message.. increase the usage now so that // If this is a transacted message.. increase the usage now so that
@ -1010,57 +1010,51 @@ public class Queue extends BaseDestination implements Task {
return result; return result;
} }
private synchronized List<MessageReference> buildList(boolean force) throws Exception { private List<MessageReference> buildList(boolean force) throws Exception {
final int toPageIn = maximumPagedInMessages - pagedInMessages.size(); final int toPageIn = maximumPagedInMessages - pagedInMessages.size();
List<MessageReference> result = null; List<MessageReference> result = null;
if ((force || !consumers.isEmpty()) && toPageIn > 0) { if ((force || !consumers.isEmpty()) && toPageIn > 0) {
messages.setMaxBatchSize(toPageIn); messages.setMaxBatchSize(toPageIn);
try { int count = 0;
int count = 0; result = new ArrayList<MessageReference>(toPageIn);
result = new ArrayList<MessageReference>(toPageIn); synchronized (messages) {
synchronized (messages) {
try { try {
messages.reset(); messages.reset();
while (messages.hasNext() && count < toPageIn) { while (messages.hasNext() && count < toPageIn) {
MessageReference node = messages.next(); MessageReference node = messages.next();
messages.remove(); messages.remove();
if (!broker.isExpired(node)) { if (!broker.isExpired(node)) {
node = createMessageReference(node.getMessage()); node = createMessageReference(node.getMessage());
result.add(node); result.add(node);
count++; count++;
} else { } else {
broker.messageExpired(createConnectionContext(), node); broker.messageExpired(createConnectionContext(),
destinationStatistics.getMessages().decrement(); node);
} destinationStatistics.getMessages().decrement();
} }
} finally {
messages.release();
} }
} finally {
messages.release();
} }
synchronized (pagedInMessages) { }
pagedInMessages.addAll(result); synchronized (pagedInMessages) {
} pagedInMessages.addAll(result);
} finally {
queueMsgConext.clear();
} }
} }
return result; return result;
} }
private synchronized void doDispatch(List<MessageReference> list) throws Exception { private synchronized void doDispatch(List<MessageReference> list) throws Exception {
if (list != null && !list.isEmpty()) { if (list != null && !list.isEmpty()) {
try { MessageEvaluationContext msgContext = new MessageEvaluationContext();
for (int i = 0; i < list.size(); i++) { for (int i = 0; i < list.size(); i++) {
MessageReference node = list.get(i); MessageReference node = list.get(i);
queueMsgConext.setDestination(destination); msgContext.setDestination(destination);
queueMsgConext.setMessageReference(node); msgContext.setMessageReference(node);
dispatchPolicy.dispatch(node, queueMsgConext, consumers); dispatchPolicy.dispatch(node, msgContext, consumers);
}
} finally {
queueMsgConext.clear();
} }
} }
} }

View File

@ -44,13 +44,6 @@ public class RoundRobinDispatchPolicy implements DispatchPolicy {
* org.apache.activemq.filter.MessageEvaluationContext, java.util.List) * org.apache.activemq.filter.MessageEvaluationContext, java.util.List)
*/ */
public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription> consumers) throws Exception { public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription> consumers) throws Exception {
// Big synch here so that only 1 message gets dispatched at a time.
// Ensures
// Everyone sees the same order and that the consumer list is not used
// while
// it's being rotated.
synchronized (consumers) {
int count = 0; int count = 0;
Subscription firstMatchingConsumer = null; Subscription firstMatchingConsumer = null;
@ -79,7 +72,5 @@ public class RoundRobinDispatchPolicy implements DispatchPolicy {
} }
} }
return count > 0; return count > 0;
}
} }
} }

View File

@ -25,8 +25,8 @@ import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -47,7 +47,6 @@ import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.Callback; import org.apache.activemq.util.Callback;
import org.apache.activemq.util.TransactionTemplate; import org.apache.activemq.util.TransactionTemplate;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;

View File

@ -31,15 +31,15 @@ public class AMQStoreDurableTopicTest extends SimpleDurableTopicTest {
answer.setDeleteAllMessagesOnStartup(true); answer.setDeleteAllMessagesOnStartup(true);
AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter(); AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter();
adaptor.setArchiveDataLogs(true); adaptor.setArchiveDataLogs(true);
adaptor.setMaxFileLength(1024 * 64); //adaptor.setMaxFileLength(1024 * 64);
answer.setDataDirectoryFile(dataFileDir); answer.setDataDirectoryFile(dataFileDir);
answer.setPersistenceAdapter(adaptor); answer.setPersistenceAdapter(adaptor);
answer.addConnector(uri); answer.addConnector(uri);
} }
protected void setUp() throws Exception { protected void setUp() throws Exception {
numberofProducers=6; numberofProducers=2;
numberOfConsumers=6; numberOfConsumers=10;
this.consumerSleepDuration=0; this.consumerSleepDuration=0;
super.setUp(); super.setUp();
} }

View File

@ -79,7 +79,7 @@ public class PerfConsumer implements MessageListener {
public void onMessage(Message msg) { public void onMessage(Message msg) {
rate.increment(); rate.increment();
try { try {
if (!this.audit.isInOrder(msg.getJMSMessageID())) { if (msg.getJMSDestination() instanceof Topic && !this.audit.isInOrder(msg.getJMSMessageID())) {
LOG.error("Message out of order!!" + msg); LOG.error("Message out of order!!" + msg);
} }
if (this.audit.isDuplicate(msg)){ if (this.audit.isDuplicate(msg)){

View File

@ -30,8 +30,8 @@ public class SimpleDurableTopicNetworkTest extends SimpleNetworkTest {
protected void setUp() throws Exception { protected void setUp() throws Exception {
numberofProducers=6; numberofProducers=6;
numberOfConsumers=6; numberOfConsumers=6;
samepleCount=100; samepleCount=1000;
playloadSize = 1; playloadSize = 1024;
super.setUp(); super.setUp();
} }

View File

@ -30,7 +30,9 @@ public class SimpleQueueTest extends SimpleTopicTest {
} }
protected void setUp() throws Exception { protected void setUp() throws Exception {
this.consumerSleepDuration=2000; numberOfConsumers = 50;
numberofProducers = 50;
this.consumerSleepDuration=10;
super.setUp(); super.setUp();
} }

View File

@ -66,8 +66,10 @@ public class ReconnectTest extends TestCase {
private ActiveMQConnection connection; private ActiveMQConnection connection;
private AtomicBoolean stop = new AtomicBoolean(false); private AtomicBoolean stop = new AtomicBoolean(false);
private Throwable error; private Throwable error;
private String name;
public Worker() throws URISyntaxException, JMSException { public Worker(String name) throws URISyntaxException, JMSException {
this.name=name;
URI uri = new URI("failover://(mock://(" + tcpUri + "))"); URI uri = new URI("failover://(mock://(" + tcpUri + "))");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
connection = (ActiveMQConnection)factory.createConnection(); connection = (ActiveMQConnection)factory.createConnection();
@ -115,7 +117,7 @@ public class ReconnectTest extends TestCase {
public void run() { public void run() {
try { try {
ActiveMQQueue queue = new ActiveMQQueue("FOO"); ActiveMQQueue queue = new ActiveMQQueue("FOO_"+name);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
@ -213,7 +215,7 @@ public class ReconnectTest extends TestCase {
workers = new Worker[WORKER_COUNT]; workers = new Worker[WORKER_COUNT];
for (int i = 0; i < WORKER_COUNT; i++) { for (int i = 0; i < WORKER_COUNT; i++) {
workers[i] = new Worker(); workers[i] = new Worker(""+i);
workers[i].start(); workers[i].start();
} }