git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@660555 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-05-27 15:20:41 +00:00
parent 041b1b8ad5
commit 8ab5f45216
3 changed files with 257 additions and 137 deletions

View File

@ -1062,9 +1062,7 @@ public class Queue extends BaseDestination implements Task {
} }
final void sendMessage(final ConnectionContext context, Message msg) throws Exception { final void sendMessage(final ConnectionContext context, Message msg) throws Exception {
synchronized (messages) { messages.addMessageLast(msg);
messages.addMessageLast(msg);
}
destinationStatistics.getEnqueues().increment(); destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment(); destinationStatistics.getMessages().increment();
messageDelivered(context, msg); messageDelivered(context, msg);

View File

@ -21,6 +21,7 @@ import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
@ -59,6 +60,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
private boolean flushRequired; private boolean flushRequired;
private AtomicBoolean started = new AtomicBoolean(); private AtomicBoolean started = new AtomicBoolean();
private MessageReference last = null; private MessageReference last = null;
private ReentrantLock lock = new ReentrantLock(true);
/** /**
* @param name * @param name
@ -93,20 +95,25 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
/** /**
* @return true if there are no pending messages * @return true if there are no pending messages
*/ */
public synchronized boolean isEmpty() { public boolean isEmpty() {
if(memoryList.isEmpty() && isDiskListEmpty()){ lock.lock();
return true; try {
} if(memoryList.isEmpty() && isDiskListEmpty()){
for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { return true;
MessageReference node = iterator.next();
if (node== QueueMessageReference.NULL_MESSAGE){
continue;
} }
if (!node.isDropped()) { for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
return false; MessageReference node = iterator.next();
if (node== QueueMessageReference.NULL_MESSAGE){
continue;
}
if (!node.isDropped()) {
return false;
}
// We can remove dropped references.
iterator.remove();
} }
// We can remove dropped references. } finally {
iterator.remove(); lock.unlock();
} }
return isDiskListEmpty(); return isDiskListEmpty();
} }
@ -116,48 +123,71 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
/** /**
* reset the cursor * reset the cursor
*/ */
public synchronized void reset() { public void reset() {
iterating = true; lock.lock();
last = null; try {
iter = isDiskListEmpty() ? memoryList.iterator() : getDiskList().listIterator(); iterating = true;
} last = null;
iter = isDiskListEmpty() ? memoryList.iterator() : getDiskList().listIterator();
public synchronized void release() { } finally {
iterating = false; lock.unlock();
if (flushRequired) {
flushRequired = false;
flushToDisk();
} }
} }
public synchronized void destroy() throws Exception { public void release() {
stop(); lock.lock();
for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) { try {
Message node = (Message)i.next(); synchronized(this) {
node.decrementReferenceCount(); iterating = false;
} this.notifyAll();
memoryList.clear(); }
if (!isDiskListEmpty()) { if (flushRequired) {
getDiskList().clear(); flushRequired = false;
flushToDisk();
}
} finally {
lock.unlock();
} }
} }
public synchronized LinkedList<MessageReference> pageInList(int maxItems) { public void destroy() throws Exception {
LinkedList<MessageReference> result = new LinkedList<MessageReference>(); lock.lock();
try {
stop();
for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
Message node = (Message)i.next();
node.decrementReferenceCount();
}
memoryList.clear();
if (!isDiskListEmpty()) {
getDiskList().clear();
}
} finally {
lock.unlock();
}
}
public LinkedList<MessageReference> pageInList(int maxItems) {
int count = 0; int count = 0;
for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) { LinkedList<MessageReference> result = new LinkedList<MessageReference>();
result.add(i.next()); lock.lock();
count++; try {
} for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
if (count < maxItems && !isDiskListEmpty()) { result.add(i.next());
for (Iterator<MessageReference> i = getDiskList().iterator(); i.hasNext() && count < maxItems;) {
Message message = (Message)i.next();
message.setRegionDestination(regionDestination);
message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
message.incrementReferenceCount();
result.add(message);
count++; count++;
} }
if (count < maxItems && !isDiskListEmpty()) {
for (Iterator<MessageReference> i = getDiskList().iterator(); i.hasNext() && count < maxItems;) {
Message message = (Message)i.next();
message.setRegionDestination(regionDestination);
message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
message.incrementReferenceCount();
result.add(message);
count++;
}
}
} finally {
lock.unlock();
} }
return result; return result;
} }
@ -167,35 +197,52 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
* *
* @param node * @param node
*/ */
public synchronized void addMessageLast(MessageReference node) { public void addMessageLast(MessageReference node) {
if (!node.isExpired()) { if (!node.isExpired()) {
try { try {
regionDestination = node.getMessage().getRegionDestination(); lock.lock();
if (isDiskListEmpty()) { try {
if (hasSpace() || this.store==null) { while (iterating) {
memoryList.add(node); lock.unlock();
node.incrementReferenceCount(); synchronized(this) {
return; try {
this.wait();
} catch (InterruptedException ie) {}
}
lock.lock();
} }
} regionDestination = node.getMessage().getRegionDestination();
if (!hasSpace()) {
if (isDiskListEmpty()) { if (isDiskListEmpty()) {
expireOldMessages(); if (hasSpace() || this.store==null) {
if (hasSpace()) {
memoryList.add(node); memoryList.add(node);
node.incrementReferenceCount(); node.incrementReferenceCount();
return; return;
} else {
flushToDisk();
} }
} }
if (!hasSpace()) {
if (isDiskListEmpty()) {
expireOldMessages();
if (hasSpace()) {
memoryList.add(node);
node.incrementReferenceCount();
return;
} else {
flushToDisk();
}
}
}
if (systemUsage.getTempUsage().isFull()) {
lock.unlock();
systemUsage.getTempUsage().waitForSpace();
lock.lock();
}
getDiskList().add(node);
} finally {
lock.unlock();
} }
systemUsage.getTempUsage().waitForSpace();
getDiskList().add(node);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Caught an Exception adding a message: " + node LOG.error("Caught an Exception adding a message: " + node
+ " first to FilePendingMessageCursor ", e); + " last to FilePendingMessageCursor ", e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} else { } else {
@ -208,32 +255,50 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
* *
* @param node * @param node
*/ */
public synchronized void addMessageFirst(MessageReference node) { public void addMessageFirst(MessageReference node) {
if (!node.isExpired()) { if (!node.isExpired()) {
try { try {
regionDestination = node.getMessage().getRegionDestination(); lock.lock();
if (isDiskListEmpty()) { try {
if (hasSpace()) { while (iterating) {
memoryList.addFirst(node); lock.unlock();
node.incrementReferenceCount(); synchronized(this) {
return; try {
this.wait();
} catch (InterruptedException ie) {}
}
lock.lock();
} }
} regionDestination = node.getMessage().getRegionDestination();
if (!hasSpace()) {
if (isDiskListEmpty()) { if (isDiskListEmpty()) {
expireOldMessages();
if (hasSpace()) { if (hasSpace()) {
memoryList.addFirst(node); memoryList.addFirst(node);
node.incrementReferenceCount(); node.incrementReferenceCount();
return; return;
} else {
flushToDisk();
} }
} }
if (!hasSpace()) {
if (isDiskListEmpty()) {
expireOldMessages();
if (hasSpace()) {
memoryList.addFirst(node);
node.incrementReferenceCount();
return;
} else {
flushToDisk();
}
}
}
if (systemUsage.getTempUsage().isFull()) {
lock.unlock();
systemUsage.getTempUsage().waitForSpace();
lock.lock();
}
node.decrementReferenceCount();
getDiskList().addFirst(node);
} finally {
lock.unlock();
} }
systemUsage.getTempUsage().waitForSpace();
node.decrementReferenceCount();
getDiskList().addFirst(node);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Caught an Exception adding a message: " + node LOG.error("Caught an Exception adding a message: " + node
@ -244,25 +309,38 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
discard(node); discard(node);
} }
} }
/** /**
* @return true if there pending messages to dispatch * @return true if there pending messages to dispatch
*/ */
public synchronized boolean hasNext() { public boolean hasNext() {
return iter.hasNext(); boolean result;
lock.lock();
try {
result = iter.hasNext();
} finally {
lock.unlock();
}
return result;
} }
/** /**
* @return the next pending message * @return the next pending message
*/ */
public synchronized MessageReference next() { public MessageReference next() {
Message message = (Message)iter.next(); Message message;
last = message; lock.lock();
if (!isDiskListEmpty()) { try {
// got from disk message = (Message)iter.next();
message.setRegionDestination(regionDestination); last = message;
message.setMemoryUsage(this.getSystemUsage().getMemoryUsage()); if (!isDiskListEmpty()) {
message.incrementReferenceCount(); // got from disk
message.setRegionDestination(regionDestination);
message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
message.incrementReferenceCount();
}
} finally {
lock.unlock();
} }
return message; return message;
} }
@ -270,10 +348,15 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
/** /**
* remove the message at the cursor position * remove the message at the cursor position
*/ */
public synchronized void remove() { public void remove() {
iter.remove(); lock.lock();
if (last != null) { try {
last.decrementReferenceCount(); iter.remove();
if (last != null) {
last.decrementReferenceCount();
}
} finally {
lock.unlock();
} }
} }
@ -281,36 +364,61 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
* @param node * @param node
* @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference) * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
*/ */
public synchronized void remove(MessageReference node) { public void remove(MessageReference node) {
if (memoryList.remove(node)) { lock.lock();
node.decrementReferenceCount(); try {
} if (memoryList.remove(node)) {
if (!isDiskListEmpty()) { node.decrementReferenceCount();
getDiskList().remove(node); }
if (!isDiskListEmpty()) {
getDiskList().remove(node);
}
} finally {
lock.unlock();
} }
} }
/** /**
* @return the number of pending messages * @return the number of pending messages
*/ */
public synchronized int size() { public int size() {
return memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size()); int result;
lock.lock();
try {
result = memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size());
} finally {
lock.unlock();
}
return result;
} }
/** /**
* clear all pending messages * clear all pending messages
*/ */
public synchronized void clear() { public void clear() {
memoryList.clear(); lock.lock();
if (!isDiskListEmpty()) { try {
getDiskList().clear(); memoryList.clear();
if (!isDiskListEmpty()) {
getDiskList().clear();
}
last=null;
} finally {
lock.unlock();
} }
last=null;
} }
public synchronized boolean isFull() { public boolean isFull() {
// we always have space - as we can persist to disk boolean result;
return false; lock.lock();
try {
// we always have space - as we can persist to disk
// TODO: not necessarily true.
result = false;
} finally {
lock.unlock();
}
return result;
} }
public boolean hasMessagesBufferedToDeliver() { public boolean hasMessagesBufferedToDeliver() {
@ -324,7 +432,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
public void onUsageChanged(Usage usage, int oldPercentUsage, public void onUsageChanged(Usage usage, int oldPercentUsage,
int newPercentUsage) { int newPercentUsage) {
if (newPercentUsage >= getMemoryUsageHighWaterMark()) { if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
synchronized (this) { lock.lock();
try {
flushRequired = true; flushRequired = true;
if (!iterating) { if (!iterating) {
expireOldMessages(); expireOldMessages();
@ -333,6 +442,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
flushRequired = false; flushRequired = false;
} }
} }
} finally {
lock.unlock();
} }
} }
} }
@ -345,31 +456,39 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
return hasSpace() && isDiskListEmpty(); return hasSpace() && isDiskListEmpty();
} }
protected synchronized void expireOldMessages() { protected void expireOldMessages() {
if (!memoryList.isEmpty()) { lock.lock();
LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList); try {
this.memoryList = new LinkedList<MessageReference>(); if (!memoryList.isEmpty()) {
while (!tmpList.isEmpty()) { LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList);
MessageReference node = tmpList.removeFirst(); this.memoryList = new LinkedList<MessageReference>();
if (node.isExpired()) { while (!tmpList.isEmpty()) {
discard(node); MessageReference node = tmpList.removeFirst();
}else { if (node.isExpired()) {
memoryList.add(node); discard(node);
} }else {
memoryList.add(node);
}
}
} }
} finally {
lock.unlock();
} }
} }
protected synchronized void flushToDisk() { protected void flushToDisk() {
lock.lock();
if (!memoryList.isEmpty()) { try {
while (!memoryList.isEmpty()) { if (!memoryList.isEmpty()) {
MessageReference node = memoryList.removeFirst(); while (!memoryList.isEmpty()) {
node.decrementReferenceCount(); MessageReference node = memoryList.removeFirst();
getDiskList().addLast(node); node.decrementReferenceCount();
getDiskList().addLast(node);
}
memoryList.clear();
} }
memoryList.clear(); } finally {
lock.unlock();
} }
} }

View File

@ -16,12 +16,10 @@
*/ */
package org.apache.activemq.broker.region.cursors; package org.apache.activemq.broker.region.cursors;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -89,7 +87,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
pendingCount = 0; pendingCount = 0;
} }
public synchronized void addMessageLast(MessageReference node) throws Exception { public void addMessageLast(MessageReference node) throws Exception {
if (node != null) { if (node != null) {
Message msg = node.getMessage(); Message msg = node.getMessage();
if (started) { if (started) {
@ -104,7 +102,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
} }
} }
public synchronized void addMessageFirst(MessageReference node) throws Exception { public void addMessageFirst(MessageReference node) throws Exception {
if (node != null) { if (node != null) {
Message msg = node.getMessage(); Message msg = node.getMessage();
if (started) { if (started) {
@ -142,6 +140,11 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
MessageReference result = currentCursor != null ? currentCursor.next() : null; MessageReference result = currentCursor != null ? currentCursor.next() : null;
return result; return result;
} }
public synchronized void release() {
nonPersistent.release();
persistent.release();
}
public synchronized void remove() { public synchronized void remove() {
if (currentCursor != null) { if (currentCursor != null) {
@ -159,7 +162,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
pendingCount--; pendingCount--;
} }
public synchronized void reset() { public void reset() {
nonPersistent.reset(); nonPersistent.reset();
persistent.reset(); persistent.reset();
} }