Improve concurrency by using read/write locks

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@961245 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-07-07 05:10:17 +00:00
parent cc700dd841
commit 28180b35bd
1 changed files with 327 additions and 205 deletions

View File

@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.ResourceAllocationException;
@ -92,11 +93,15 @@ public class Queue extends BaseDestination implements Task, UsageListener {
protected static final Log LOG = LogFactory.getLog(Queue.class);
protected final TaskRunnerFactory taskFactory;
protected TaskRunner taskRunner;
private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock();
protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
private final ReentrantReadWriteLock messagesLock = new ReentrantReadWriteLock();
protected PendingMessageCursor messages;
private final ReentrantReadWriteLock pagedInMessagesLock = new ReentrantReadWriteLock();
private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
// Messages that are paged in but have not yet been targeted at a
// subscription
private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100);
private List<QueueMessageReference> redeliveredWaitingDispatch = new ArrayList<QueueMessageReference>();
private MessageGroupMap messageGroupOwners;
@ -106,7 +111,6 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private ExecutorService executor;
protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections
.synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
private final Object dispatchMutex = new Object();
private boolean useConsumerPriority = true;
private boolean strictOrderDispatch = false;
private final QueueDispatchSelector dispatchSelector;
@ -219,8 +223,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
public List<Subscription> getConsumers() {
synchronized (consumers) {
consumersLock.readLock().lock();
try {
return new ArrayList<Subscription>(consumers);
}finally {
consumersLock.readLock().unlock();
}
}
@ -284,12 +291,15 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
if (hasSpace()) {
message.setRegionDestination(Queue.this);
synchronized (messages) {
messagesLock.writeLock().lock();
try{
try {
messages.addMessageLast(message);
} catch (Exception e) {
LOG.fatal("Failed to add message to cursor", e);
}
}finally {
messagesLock.writeLock().unlock();
}
destinationStatistics.getMessages().increment();
return true;
@ -348,13 +358,16 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// synchronize with dispatch method so that no new messages are sent
// while setting up a subscription. avoid out of order messages,
// duplicates, etc.
synchronized (dispatchMutex) {
pagedInPendingDispatchLock.writeLock().lock();
try {
sub.add(context, this);
destinationStatistics.getConsumers().increment();
// needs to be synchronized - so no contention with dispatching
synchronized (consumers) {
// consumersLock.
consumersLock.writeLock().lock();
try {
// set a flag if this is a first consumer
if (consumers.size() == 0) {
@ -378,20 +391,27 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
}
}finally {
consumersLock.writeLock().unlock();
}
if (sub instanceof QueueBrowserSubscription) {
// tee up for dispatch in next iterate
QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
synchronized (pagedInMessages) {
pagedInMessagesLock.readLock().lock();
try{
BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
browserDispatches.addLast(browserDispatch);
}finally {
pagedInMessagesLock.readLock().unlock();
}
}
if (!(this.optimizedDispatch || isSlave())) {
wakeup();
}
}finally {
pagedInPendingDispatchLock.writeLock().unlock();
}
if (this.optimizedDispatch || isSlave()) {
// Outside of dispatchLock() to maintain the lock hierarchy of
@ -406,14 +426,16 @@ public class Queue extends BaseDestination implements Task, UsageListener {
destinationStatistics.getConsumers().decrement();
// synchronize with dispatch method so that no new messages are sent
// while removing up a subscription.
synchronized (dispatchMutex) {
pagedInPendingDispatchLock.writeLock().lock();
try {
if (LOG.isDebugEnabled()) {
LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: "
+ getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
+ getDestinationStatistics().getDispatched().getCount() + ", inflight: "
+ getDestinationStatistics().getInflight().getCount());
}
synchronized (consumers) {
consumersLock.writeLock().lock();
try {
removeFromConsumerList(sub);
if (sub.getConsumerInfo().isExclusive()) {
Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
@ -461,10 +483,14 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (!redeliveredWaitingDispatch.isEmpty()) {
doDispatch(new ArrayList<QueueMessageReference>());
}
}finally {
consumersLock.writeLock().unlock();
}
if (!(this.optimizedDispatch || isSlave())) {
wakeup();
}
}finally {
pagedInPendingDispatchLock.writeLock().unlock();
}
if (this.optimizedDispatch || isSlave()) {
// Outside of dispatchLock() to maintain the lock hierarchy of
@ -758,8 +784,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
@Override
public String toString() {
int size = 0;
synchronized (messages) {
messagesLock.readLock().lock();
try{
size = messages.size();
}finally {
messagesLock.readLock().unlock();
}
return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size()
+ ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups="
@ -919,58 +948,70 @@ public class Queue extends BaseDestination implements Task, UsageListener {
try {
pageInMessages(false);
List<MessageReference> toExpire = new ArrayList<MessageReference>();
synchronized (dispatchMutex) {
synchronized (pagedInPendingDispatch) {
addAll(pagedInPendingDispatch, browseList, max, toExpire);
for (MessageReference ref : toExpire) {
pagedInPendingDispatch.remove(ref);
if (broker.isExpired(ref)) {
LOG.debug("expiring from pagedInPending: " + ref);
messageExpired(connectionContext, ref);
}
}
}
toExpire.clear();
synchronized (pagedInMessages) {
addAll(pagedInMessages.values(), browseList, max, toExpire);
}
for (MessageReference ref : toExpire) {
if (broker.isExpired(ref)) {
LOG.debug("expiring from pagedInMessages: " + ref);
messageExpired(connectionContext, ref);
} else {
synchronized (pagedInMessages) {
pagedInMessages.remove(ref.getMessageId());
}
}
}
if (browseList.size() < getMaxBrowsePageSize()) {
synchronized (messages) {
try {
messages.reset();
while (messages.hasNext() && browseList.size() < max) {
MessageReference node = messages.next();
if (node.isExpired()) {
if (broker.isExpired(node)) {
LOG.debug("expiring from messages: " + node);
messageExpired(connectionContext, createMessageReference(node.getMessage()));
}
messages.remove();
} else {
messages.rollback(node.getMessageId());
if (browseList.contains(node.getMessage()) == false) {
browseList.add(node.getMessage());
}
}
node.decrementReferenceCount();
}
} finally {
messages.release();
}
pagedInPendingDispatchLock.writeLock().lock();
try {
addAll(pagedInPendingDispatch, browseList, max, toExpire);
for (MessageReference ref : toExpire) {
pagedInPendingDispatch.remove(ref);
if (broker.isExpired(ref)) {
LOG.debug("expiring from pagedInPending: " + ref);
messageExpired(connectionContext, ref);
}
}
} finally {
pagedInPendingDispatchLock.writeLock().unlock();
}
toExpire.clear();
pagedInMessagesLock.readLock().lock();
try {
addAll(pagedInMessages.values(), browseList, max, toExpire);
} finally {
pagedInMessagesLock.readLock().unlock();
}
for (MessageReference ref : toExpire) {
if (broker.isExpired(ref)) {
LOG.debug("expiring from pagedInMessages: " + ref);
messageExpired(connectionContext, ref);
} else {
pagedInMessagesLock.writeLock().lock();
try {
pagedInMessages.remove(ref.getMessageId());
} finally {
pagedInMessagesLock.writeLock().unlock();
}
}
}
if (browseList.size() < getMaxBrowsePageSize()) {
messagesLock.writeLock().lock();
try {
try {
messages.reset();
while (messages.hasNext() && browseList.size() < max) {
MessageReference node = messages.next();
if (node.isExpired()) {
if (broker.isExpired(node)) {
LOG.debug("expiring from messages: " + node);
messageExpired(connectionContext, createMessageReference(node.getMessage()));
}
messages.remove();
} else {
messages.rollback(node.getMessageId());
if (browseList.contains(node.getMessage()) == false) {
browseList.add(node.getMessage());
}
}
node.decrementReferenceCount();
}
} finally {
messages.release();
}
} finally {
messagesLock.writeLock().unlock();
}
}
} catch (Exception e) {
LOG.error("Problem retrieving message for browse", e);
}
@ -990,13 +1031,17 @@ public class Queue extends BaseDestination implements Task, UsageListener {
public Message getMessage(String id) {
MessageId msgId = new MessageId(id);
synchronized (pagedInMessages) {
pagedInMessagesLock.readLock().lock();
try{
QueueMessageReference r = this.pagedInMessages.get(msgId);
if (r != null) {
return r.getMessage();
}
}finally {
pagedInMessagesLock.readLock().unlock();
}
synchronized (messages) {
messagesLock.readLock().lock();
try{
try {
messages.reset();
while (messages.hasNext()) {
@ -1014,6 +1059,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} finally {
messages.release();
}
}finally {
messagesLock.readLock().unlock();
}
return null;
}
@ -1023,8 +1070,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
List<MessageReference> list = null;
do {
doPageIn(true);
synchronized (pagedInMessages) {
pagedInMessagesLock.readLock().lock();
try {
list = new ArrayList<MessageReference>(pagedInMessages.values());
}finally {
pagedInMessagesLock.readLock().unlock();
}
for (MessageReference ref : list) {
@ -1085,8 +1135,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
ConnectionContext context = createConnectionContext();
do {
doPageIn(true);
synchronized (pagedInMessages) {
pagedInMessagesLock.readLock().lock();
try{
set.addAll(pagedInMessages.values());
}finally {
pagedInMessagesLock.readLock().unlock();
}
List<MessageReference> list = new ArrayList<MessageReference>(set);
for (MessageReference ref : list) {
@ -1149,8 +1202,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
doPageIn(true);
setMaxPageSize(oldMaxSize);
synchronized (pagedInMessages) {
pagedInMessagesLock.readLock().lock();
try {
set.addAll(pagedInMessages.values());
}finally {
pagedInMessagesLock.readLock().unlock();
}
List<MessageReference> list = new ArrayList<MessageReference>(set);
for (MessageReference ref : list) {
@ -1189,8 +1245,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
QueueMessageReference r = createMessageReference(m);
BrokerSupport.resend(context, m, dest);
removeMessage(context, r);
synchronized (messages) {
messagesLock.writeLock().lock();
try{
messages.rollback(r.getMessageId());
}finally {
messagesLock.writeLock().unlock();
}
return true;
}
@ -1232,8 +1291,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
do {
doPageIn(true);
synchronized (pagedInMessages) {
pagedInMessagesLock.readLock().lock();
try{
set.addAll(pagedInMessages.values());
}finally {
pagedInMessagesLock.readLock().unlock();
}
List<MessageReference> list = new ArrayList<MessageReference>(set);
for (MessageReference ref : list) {
@ -1252,11 +1314,14 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
BrowserDispatch getNextBrowserDispatch() {
synchronized (pagedInMessages) {
pagedInMessagesLock.readLock().lock();
try{
if (browserDispatches.isEmpty()) {
return null;
}
return browserDispatches.removeFirst();
}finally {
pagedInMessagesLock.readLock().unlock();
}
}
@ -1317,15 +1382,18 @@ public class Queue extends BaseDestination implements Task, UsageListener {
BrowserDispatch pendingBrowserDispatch = getNextBrowserDispatch();
synchronized (messages) {
messagesLock.readLock().lock();
try{
pageInMoreMessages |= !messages.isEmpty();
}finally {
messagesLock.readLock().unlock();
}
// Kinda ugly.. but I think dispatchLock is the only mutex
// protecting the
// pagedInPendingDispatch variable.
synchronized (dispatchMutex) {
pagedInPendingDispatchLock.readLock().lock();
try {
pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
}finally {
pagedInPendingDispatchLock.readLock().unlock();
}
// Perhaps we should page always into the pagedInPendingDispatch
@ -1344,8 +1412,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if (pendingBrowserDispatch != null) {
ArrayList<QueueMessageReference> alreadyDispatchedMessages = null;
synchronized (pagedInMessages) {
pagedInMessagesLock.readLock().lock();
try{
alreadyDispatchedMessages = new ArrayList<QueueMessageReference>(pagedInMessages.values());
}finally {
pagedInMessagesLock.readLock().unlock();
}
if (LOG.isDebugEnabled()) {
LOG.debug("dispatch to browser: " + pendingBrowserDispatch.getBrowser()
@ -1412,11 +1483,13 @@ public class Queue extends BaseDestination implements Task, UsageListener {
protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException {
removeMessage(c, null, r);
synchronized (dispatchMutex) {
synchronized (pagedInPendingDispatch) {
pagedInPendingDispatch.remove(r);
}
pagedInPendingDispatchLock.writeLock().lock();
try {
pagedInPendingDispatch.remove(r);
} finally {
pagedInPendingDispatchLock.writeLock().unlock();
}
}
protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException {
@ -1457,8 +1530,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
if (ack.isPoisonAck()) {
// message gone to DLQ, is ok to allow redelivery
synchronized (messages) {
messagesLock.writeLock().lock();
try{
messages.rollback(reference.getMessageId());
}finally {
messagesLock.writeLock().unlock();
}
}
@ -1467,8 +1543,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private void dropMessage(QueueMessageReference reference) {
reference.drop();
destinationStatistics.getMessages().decrement();
synchronized (pagedInMessages) {
pagedInMessagesLock.writeLock().lock();
try{
pagedInMessages.remove(reference.getMessageId());
}finally {
pagedInMessagesLock.writeLock().unlock();
}
}
@ -1498,8 +1577,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
final void sendMessage(final Message msg) throws Exception {
synchronized (messages) {
messagesLock.writeLock().lock();
try{
messages.addMessageLast(msg);
}finally {
messagesLock.writeLock().unlock();
}
}
@ -1507,10 +1589,13 @@ public class Queue extends BaseDestination implements Task, UsageListener {
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();
messageDelivered(context, msg);
synchronized (consumers) {
consumersLock.readLock().lock();
try {
if (consumers.isEmpty()) {
onMessageWithNoConsumers(context, msg);
}
}finally {
consumersLock.readLock().unlock();
}
wakeup();
}
@ -1540,99 +1625,118 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private List<QueueMessageReference> doPageIn(boolean force) throws Exception {
List<QueueMessageReference> result = null;
List<QueueMessageReference> resultList = null;
synchronized (dispatchMutex) {
int toPageIn = Math.min(getMaxPageSize(), messages.size());
if (LOG.isDebugEnabled()) {
LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: "
+ destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
+ pagedInMessages.size() + ", enqueueSize: " + destinationStatistics.getEnqueues().getCount());
}
if (isLazyDispatch() && !force) {
// Only page in the minimum number of messages which can be
// dispatched immediately.
toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
}
if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingDispatch.size() < getMaxPageSize()))) {
int count = 0;
result = new ArrayList<QueueMessageReference>(toPageIn);
synchronized (messages) {
try {
messages.setMaxBatchSize(toPageIn);
messages.reset();
while (messages.hasNext() && count < toPageIn) {
MessageReference node = messages.next();
messages.remove();
QueueMessageReference ref = createMessageReference(node.getMessage());
if (ref.isExpired()) {
if (broker.isExpired(ref)) {
messageExpired(createConnectionContext(), ref);
} else {
ref.decrementReferenceCount();
}
} else {
result.add(ref);
count++;
}
}
} finally {
messages.release();
}
}
// Only add new messages, not already pagedIn to avoid multiple
// dispatch attempts
synchronized (pagedInMessages) {
resultList = new ArrayList<QueueMessageReference>(result.size());
for (QueueMessageReference ref : result) {
if (!pagedInMessages.containsKey(ref.getMessageId())) {
pagedInMessages.put(ref.getMessageId(), ref);
resultList.add(ref);
} else {
ref.decrementReferenceCount();
}
}
}
} else {
// Avoid return null list, if condition is not validated
resultList = new ArrayList<QueueMessageReference>();
}
int toPageIn = Math.min(getMaxPageSize(), messages.size());
if (LOG.isDebugEnabled()) {
LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: "
+ destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
+ pagedInMessages.size() + ", enqueueSize: " + destinationStatistics.getEnqueues().getCount());
}
if (isLazyDispatch() && !force) {
// Only page in the minimum number of messages which can be
// dispatched immediately.
toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
}
int pagedInPendingSize = 0;
pagedInPendingDispatchLock.readLock().lock();
try {
pagedInPendingSize = pagedInPendingDispatch.size();
} finally {
pagedInPendingDispatchLock.readLock().unlock();
}
if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingSize < getMaxPageSize()))) {
int count = 0;
result = new ArrayList<QueueMessageReference>(toPageIn);
messagesLock.writeLock().lock();
try {
try {
messages.setMaxBatchSize(toPageIn);
messages.reset();
while (messages.hasNext() && count < toPageIn) {
MessageReference node = messages.next();
messages.remove();
QueueMessageReference ref = createMessageReference(node.getMessage());
if (ref.isExpired()) {
if (broker.isExpired(ref)) {
messageExpired(createConnectionContext(), ref);
} else {
ref.decrementReferenceCount();
}
} else {
result.add(ref);
count++;
}
}
} finally {
messages.release();
}
} finally {
messagesLock.writeLock().unlock();
}
// Only add new messages, not already pagedIn to avoid multiple
// dispatch attempts
pagedInMessagesLock.readLock().lock();
try {
resultList = new ArrayList<QueueMessageReference>(result.size());
for (QueueMessageReference ref : result) {
if (!pagedInMessages.containsKey(ref.getMessageId())) {
pagedInMessagesLock.readLock().unlock();
pagedInMessagesLock.writeLock().lock();
pagedInMessages.put(ref.getMessageId(), ref);
pagedInMessagesLock.readLock().lock();
pagedInMessagesLock.writeLock().unlock();
resultList.add(ref);
} else {
ref.decrementReferenceCount();
}
}
} finally {
pagedInMessagesLock.readLock().unlock();
}
} else {
// Avoid return null list, if condition is not validated
resultList = new ArrayList<QueueMessageReference>();
}
return resultList;
}
private void doDispatch(List<QueueMessageReference> list) throws Exception {
boolean doWakeUp = false;
synchronized (dispatchMutex) {
synchronized (pagedInPendingDispatch) {
if (!redeliveredWaitingDispatch.isEmpty()) {
// Try first to dispatch redelivered messages to keep an
// proper order
redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
}
if (!pagedInPendingDispatch.isEmpty()) {
// Next dispatch anything that had not been
// dispatched before.
pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
}
// and now see if we can dispatch the new stuff.. and append to
// the pending
// list anything that does not actually get dispatched.
if (list != null && !list.isEmpty()) {
if (pagedInPendingDispatch.isEmpty()) {
pagedInPendingDispatch.addAll(doActualDispatch(list));
} else {
for (QueueMessageReference qmr : list) {
if (!pagedInPendingDispatch.contains(qmr)) {
pagedInPendingDispatch.add(qmr);
}
pagedInPendingDispatchLock.writeLock().lock();
try {
if (!redeliveredWaitingDispatch.isEmpty()) {
// Try first to dispatch redelivered messages to keep an
// proper order
redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
}
if (!pagedInPendingDispatch.isEmpty()) {
// Next dispatch anything that had not been
// dispatched before.
pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
}
// and now see if we can dispatch the new stuff.. and append to
// the pending
// list anything that does not actually get dispatched.
if (list != null && !list.isEmpty()) {
if (pagedInPendingDispatch.isEmpty()) {
pagedInPendingDispatch.addAll(doActualDispatch(list));
} else {
for (QueueMessageReference qmr : list) {
if (!pagedInPendingDispatch.contains(qmr)) {
pagedInPendingDispatch.add(qmr);
}
doWakeUp = true;
}
doWakeUp = true;
}
}
} finally {
pagedInPendingDispatchLock.writeLock().unlock();
}
if (doWakeUp) {
// avoid lock order contention
asyncWakeup();
@ -1645,13 +1749,15 @@ public class Queue extends BaseDestination implements Task, UsageListener {
*/
private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference> list) throws Exception {
List<Subscription> consumers;
synchronized (this.consumers) {
consumersLock.writeLock().lock();
try {
if (this.consumers.isEmpty() || isSlave()) {
// slave dispatch happens in processDispatchNotification
return list;
}
consumers = new ArrayList<Subscription>(this.consumers);
}finally {
consumersLock.writeLock().unlock();
}
List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size());
@ -1698,11 +1804,14 @@ public class Queue extends BaseDestination implements Task, UsageListener {
// distribution.
if (target != null && !strictOrderDispatch && consumers.size() > 1
&& !dispatchSelector.isExclusiveConsumer(target)) {
synchronized (this.consumers) {
consumersLock.writeLock().lock();
try {
if (removeFromConsumerList(target)) {
addToConsumerList(target);
consumers = new ArrayList<Subscription>(this.consumers);
}
}finally {
consumersLock.writeLock().unlock();
}
}
}
@ -1730,12 +1839,15 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private int getConsumerMessageCountBeforeFull() throws Exception {
int total = 0;
boolean zeroPrefetch = false;
synchronized (consumers) {
consumersLock.readLock().lock();
try{
for (Subscription s : consumers) {
zeroPrefetch |= s.getPrefetchSize() == 0;
int countBeforeFull = s.countBeforeFull();
total += countBeforeFull;
}
}finally {
consumersLock.readLock().unlock();
}
if (total == 0 && zeroPrefetch) {
total = 1;
@ -1768,50 +1880,57 @@ public class Queue extends BaseDestination implements Task, UsageListener {
QueueMessageReference message = null;
MessageId messageId = messageDispatchNotification.getMessageId();
synchronized (dispatchMutex) {
synchronized (pagedInPendingDispatch) {
for (QueueMessageReference ref : pagedInPendingDispatch) {
if (messageId.equals(ref.getMessageId())) {
message = ref;
pagedInPendingDispatch.remove(ref);
break;
}
pagedInPendingDispatchLock.writeLock().lock();
try {
for (QueueMessageReference ref : pagedInPendingDispatch) {
if (messageId.equals(ref.getMessageId())) {
message = ref;
pagedInPendingDispatch.remove(ref);
break;
}
}
if (message == null) {
synchronized (pagedInMessages) {
message = pagedInMessages.get(messageId);
}
}
if (message == null) {
synchronized (messages) {
try {
messages.setMaxBatchSize(getMaxPageSize());
messages.reset();
while (messages.hasNext()) {
MessageReference node = messages.next();
messages.remove();
if (messageId.equals(node.getMessageId())) {
message = this.createMessageReference(node.getMessage());
break;
}
}
} finally {
messages.release();
}
}
}
if (message == null) {
Message msg = loadMessage(messageId);
if (msg != null) {
message = this.createMessageReference(msg);
}
}
} finally {
pagedInPendingDispatchLock.writeLock().unlock();
}
if (message == null) {
pagedInMessagesLock.readLock().lock();
try {
message = pagedInMessages.get(messageId);
} finally {
pagedInMessagesLock.readLock().unlock();
}
}
if (message == null) {
messagesLock.writeLock().lock();
try {
try {
messages.setMaxBatchSize(getMaxPageSize());
messages.reset();
while (messages.hasNext()) {
MessageReference node = messages.next();
messages.remove();
if (messageId.equals(node.getMessageId())) {
message = this.createMessageReference(node.getMessage());
break;
}
}
} finally {
messages.release();
}
} finally {
messagesLock.writeLock().unlock();
}
}
if (message == null) {
Message msg = loadMessage(messageId);
if (msg != null) {
message = this.createMessageReference(msg);
}
}
if (message == null) {
throw new JMSException("Slave broker out of sync with master - Message: "
+ messageDispatchNotification.getMessageId() + " on "
@ -1832,13 +1951,16 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification)
throws JMSException {
Subscription sub = null;
synchronized (consumers) {
consumersLock.readLock().lock();
try {
for (Subscription s : consumers) {
if (messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId())) {
sub = s;
break;
}
}
}finally {
consumersLock.readLock().unlock();
}
return sub;
}