git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@661295 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-05-29 11:27:33 +00:00
parent 4dde3239ff
commit 0f1ef01d1b
3 changed files with 165 additions and 282 deletions

View File

@ -1062,7 +1062,12 @@ public class Queue extends BaseDestination implements Task {
}
final void sendMessage(final ConnectionContext context, Message msg) throws Exception {
messages.addMessageLast(msg);
if (!msg.isPersistent() && messages.getSystemUsage() != null) {
messages.getSystemUsage().getTempUsage().waitForSpace();
}
synchronized(messages) {
messages.addMessageLast(msg);
}
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();
messageDelivered(context, msg);

View File

@ -21,7 +21,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
@ -60,7 +59,6 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
private boolean flushRequired;
private AtomicBoolean started = new AtomicBoolean();
private MessageReference last = null;
private ReentrantLock lock = new ReentrantLock(true);
/**
* @param name
@ -95,25 +93,20 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
/**
* @return true if there are no pending messages
*/
public boolean isEmpty() {
lock.lock();
try {
if(memoryList.isEmpty() && isDiskListEmpty()){
return true;
public synchronized boolean isEmpty() {
if(memoryList.isEmpty() && isDiskListEmpty()){
return true;
}
for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
MessageReference node = iterator.next();
if (node== QueueMessageReference.NULL_MESSAGE){
continue;
}
for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) {
MessageReference node = iterator.next();
if (node== QueueMessageReference.NULL_MESSAGE){
continue;
}
if (!node.isDropped()) {
return false;
}
// We can remove dropped references.
iterator.remove();
if (!node.isDropped()) {
return false;
}
} finally {
lock.unlock();
// We can remove dropped references.
iterator.remove();
}
return isDiskListEmpty();
}
@ -123,71 +116,48 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
/**
* reset the cursor
*/
public void reset() {
lock.lock();
try {
iterating = true;
last = null;
iter = isDiskListEmpty() ? memoryList.iterator() : getDiskList().listIterator();
} finally {
lock.unlock();
public synchronized void reset() {
iterating = true;
last = null;
iter = isDiskListEmpty() ? memoryList.iterator() : getDiskList().listIterator();
}
public synchronized void release() {
iterating = false;
if (flushRequired) {
flushRequired = false;
flushToDisk();
}
}
public void release() {
lock.lock();
try {
synchronized(this) {
iterating = false;
this.notifyAll();
}
if (flushRequired) {
flushRequired = false;
flushToDisk();
}
} finally {
lock.unlock();
public synchronized void destroy() throws Exception {
stop();
for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
Message node = (Message)i.next();
node.decrementReferenceCount();
}
memoryList.clear();
if (!isDiskListEmpty()) {
getDiskList().clear();
}
}
public void destroy() throws Exception {
lock.lock();
try {
stop();
for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
Message node = (Message)i.next();
node.decrementReferenceCount();
}
memoryList.clear();
if (!isDiskListEmpty()) {
getDiskList().clear();
}
} finally {
lock.unlock();
}
}
public LinkedList<MessageReference> pageInList(int maxItems) {
int count = 0;
public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
LinkedList<MessageReference> result = new LinkedList<MessageReference>();
lock.lock();
try {
for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
result.add(i.next());
int count = 0;
for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext() && count < maxItems;) {
result.add(i.next());
count++;
}
if (count < maxItems && !isDiskListEmpty()) {
for (Iterator<MessageReference> i = getDiskList().iterator(); i.hasNext() && count < maxItems;) {
Message message = (Message)i.next();
message.setRegionDestination(regionDestination);
message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
message.incrementReferenceCount();
result.add(message);
count++;
}
if (count < maxItems && !isDiskListEmpty()) {
for (Iterator<MessageReference> i = getDiskList().iterator(); i.hasNext() && count < maxItems;) {
Message message = (Message)i.next();
message.setRegionDestination(regionDestination);
message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
message.incrementReferenceCount();
result.add(message);
count++;
}
}
} finally {
lock.unlock();
}
return result;
}
@ -197,108 +167,31 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
*
* @param node
*/
public void addMessageLast(MessageReference node) {
public synchronized void addMessageLast(MessageReference node) {
if (!node.isExpired()) {
try {
lock.lock();
try {
while (iterating) {
lock.unlock();
synchronized(this) {
try {
this.wait();
} catch (InterruptedException ie) {}
}
lock.lock();
regionDestination = node.getMessage().getRegionDestination();
if (isDiskListEmpty()) {
if (hasSpace() || this.store==null) {
memoryList.add(node);
node.incrementReferenceCount();
return;
}
regionDestination = node.getMessage().getRegionDestination();
}
if (!hasSpace()) {
if (isDiskListEmpty()) {
if (hasSpace() || this.store==null) {
expireOldMessages();
if (hasSpace()) {
memoryList.add(node);
node.incrementReferenceCount();
return;
} else {
flushToDisk();
}
}
if (!hasSpace()) {
if (isDiskListEmpty()) {
expireOldMessages();
if (hasSpace()) {
memoryList.add(node);
node.incrementReferenceCount();
return;
} else {
flushToDisk();
}
}
}
if (systemUsage.getTempUsage().isFull()) {
lock.unlock();
systemUsage.getTempUsage().waitForSpace();
lock.lock();
}
getDiskList().add(node);
} finally {
lock.unlock();
}
} catch (Exception e) {
LOG.error("Caught an Exception adding a message: " + node
+ " last to FilePendingMessageCursor ", e);
throw new RuntimeException(e);
}
} else {
discard(node);
}
}
/**
* add message to await dispatch
*
* @param node
*/
public void addMessageFirst(MessageReference node) {
if (!node.isExpired()) {
try {
lock.lock();
try {
while (iterating) {
lock.unlock();
synchronized(this) {
try {
this.wait();
} catch (InterruptedException ie) {}
}
lock.lock();
}
regionDestination = node.getMessage().getRegionDestination();
if (isDiskListEmpty()) {
if (hasSpace()) {
memoryList.addFirst(node);
node.incrementReferenceCount();
return;
}
}
if (!hasSpace()) {
if (isDiskListEmpty()) {
expireOldMessages();
if (hasSpace()) {
memoryList.addFirst(node);
node.incrementReferenceCount();
return;
} else {
flushToDisk();
}
}
}
if (systemUsage.getTempUsage().isFull()) {
lock.unlock();
systemUsage.getTempUsage().waitForSpace();
lock.lock();
}
node.decrementReferenceCount();
getDiskList().addFirst(node);
} finally {
lock.unlock();
}
systemUsage.getTempUsage().waitForSpace();
getDiskList().add(node);
} catch (Exception e) {
LOG.error("Caught an Exception adding a message: " + node
@ -309,38 +202,67 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
discard(node);
}
}
/**
* add message to await dispatch
*
* @param node
*/
public synchronized void addMessageFirst(MessageReference node) {
if (!node.isExpired()) {
try {
regionDestination = node.getMessage().getRegionDestination();
if (isDiskListEmpty()) {
if (hasSpace()) {
memoryList.addFirst(node);
node.incrementReferenceCount();
return;
}
}
if (!hasSpace()) {
if (isDiskListEmpty()) {
expireOldMessages();
if (hasSpace()) {
memoryList.addFirst(node);
node.incrementReferenceCount();
return;
} else {
flushToDisk();
}
}
}
systemUsage.getTempUsage().waitForSpace();
node.decrementReferenceCount();
getDiskList().addFirst(node);
} catch (Exception e) {
LOG.error("Caught an Exception adding a message: " + node
+ " first to FilePendingMessageCursor ", e);
throw new RuntimeException(e);
}
} else {
discard(node);
}
}
/**
* @return true if there pending messages to dispatch
*/
public boolean hasNext() {
boolean result;
lock.lock();
try {
result = iter.hasNext();
} finally {
lock.unlock();
}
return result;
public synchronized boolean hasNext() {
return iter.hasNext();
}
/**
* @return the next pending message
*/
public MessageReference next() {
Message message;
lock.lock();
try {
message = (Message)iter.next();
last = message;
if (!isDiskListEmpty()) {
// got from disk
message.setRegionDestination(regionDestination);
message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
message.incrementReferenceCount();
}
} finally {
lock.unlock();
public synchronized MessageReference next() {
Message message = (Message)iter.next();
last = message;
if (!isDiskListEmpty()) {
// got from disk
message.setRegionDestination(regionDestination);
message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
message.incrementReferenceCount();
}
return message;
}
@ -348,15 +270,10 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
/**
* remove the message at the cursor position
*/
public void remove() {
lock.lock();
try {
iter.remove();
if (last != null) {
last.decrementReferenceCount();
}
} finally {
lock.unlock();
public synchronized void remove() {
iter.remove();
if (last != null) {
last.decrementReferenceCount();
}
}
@ -364,61 +281,36 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
* @param node
* @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
*/
public void remove(MessageReference node) {
lock.lock();
try {
if (memoryList.remove(node)) {
node.decrementReferenceCount();
}
if (!isDiskListEmpty()) {
getDiskList().remove(node);
}
} finally {
lock.unlock();
public synchronized void remove(MessageReference node) {
if (memoryList.remove(node)) {
node.decrementReferenceCount();
}
if (!isDiskListEmpty()) {
getDiskList().remove(node);
}
}
/**
* @return the number of pending messages
*/
public int size() {
int result;
lock.lock();
try {
result = memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size());
} finally {
lock.unlock();
}
return result;
public synchronized int size() {
return memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size());
}
/**
* clear all pending messages
*/
public void clear() {
lock.lock();
try {
memoryList.clear();
if (!isDiskListEmpty()) {
getDiskList().clear();
}
last=null;
} finally {
lock.unlock();
public synchronized void clear() {
memoryList.clear();
if (!isDiskListEmpty()) {
getDiskList().clear();
}
last=null;
}
public boolean isFull() {
boolean result;
lock.lock();
try {
// we always have space - as we can persist to disk
// TODO: not necessarily true.
result = false;
} finally {
lock.unlock();
}
return result;
public synchronized boolean isFull() {
// we always have space - as we can persist to disk
return false;
}
public boolean hasMessagesBufferedToDeliver() {
@ -432,8 +324,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
public void onUsageChanged(Usage usage, int oldPercentUsage,
int newPercentUsage) {
if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
lock.lock();
try {
synchronized (this) {
flushRequired = true;
if (!iterating) {
expireOldMessages();
@ -442,8 +333,6 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
flushRequired = false;
}
}
} finally {
lock.unlock();
}
}
}
@ -456,39 +345,31 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
return hasSpace() && isDiskListEmpty();
}
protected void expireOldMessages() {
lock.lock();
try {
if (!memoryList.isEmpty()) {
LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList);
this.memoryList = new LinkedList<MessageReference>();
while (!tmpList.isEmpty()) {
MessageReference node = tmpList.removeFirst();
if (node.isExpired()) {
discard(node);
}else {
memoryList.add(node);
}
}
protected synchronized void expireOldMessages() {
if (!memoryList.isEmpty()) {
LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList);
this.memoryList = new LinkedList<MessageReference>();
while (!tmpList.isEmpty()) {
MessageReference node = tmpList.removeFirst();
if (node.isExpired()) {
discard(node);
}else {
memoryList.add(node);
}
}
} finally {
lock.unlock();
}
}
protected void flushToDisk() {
lock.lock();
try {
if (!memoryList.isEmpty()) {
while (!memoryList.isEmpty()) {
MessageReference node = memoryList.removeFirst();
node.decrementReferenceCount();
getDiskList().addLast(node);
}
memoryList.clear();
protected synchronized void flushToDisk() {
if (!memoryList.isEmpty()) {
while (!memoryList.isEmpty()) {
MessageReference node = memoryList.removeFirst();
node.decrementReferenceCount();
getDiskList().addLast(node);
}
} finally {
lock.unlock();
memoryList.clear();
}
}

View File

@ -16,10 +16,12 @@
*/
package org.apache.activemq.broker.region.cursors;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -87,7 +89,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
pendingCount = 0;
}
public void addMessageLast(MessageReference node) throws Exception {
public synchronized void addMessageLast(MessageReference node) throws Exception {
if (node != null) {
Message msg = node.getMessage();
if (started) {
@ -102,7 +104,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
}
}
public void addMessageFirst(MessageReference node) throws Exception {
public synchronized void addMessageFirst(MessageReference node) throws Exception {
if (node != null) {
Message msg = node.getMessage();
if (started) {
@ -140,11 +142,6 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
MessageReference result = currentCursor != null ? currentCursor.next() : null;
return result;
}
public synchronized void release() {
nonPersistent.release();
persistent.release();
}
public synchronized void remove() {
if (currentCursor != null) {
@ -162,7 +159,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
pendingCount--;
}
public void reset() {
public synchronized void reset() {
nonPersistent.reset();
persistent.reset();
}