Robert Davies 2008-09-02 06:14:22 +00:00
parent 1bd6be8a8e
commit 47cfa5590b
10 changed files with 212 additions and 123 deletions

View File

@ -38,7 +38,7 @@ public abstract class BaseDestination implements Destination {
* from persistent storage
*/
public static final int MAX_PAGE_SIZE=200;
public static final int MAX_BROWSE_PAGE_SIZE=MAX_PAGE_SIZE*2;
protected final ActiveMQDestination destination;
protected final Broker broker;
protected final MessageStore store;
@ -49,6 +49,7 @@ public abstract class BaseDestination implements Destination {
private int maxAuditDepth=2048;
private boolean enableAudit=true;
private int maxPageSize=MAX_PAGE_SIZE;
private int maxBrowsePageSize=MAX_BROWSE_PAGE_SIZE;
private boolean useCache=true;
private int minimumMessageSize=1024;
private boolean lazyDispatch=false;
@ -187,6 +188,14 @@ public abstract class BaseDestination implements Destination {
public void setMaxPageSize(int maxPageSize) {
this.maxPageSize = maxPageSize;
}
public int getMaxBrowsePageSize() {
return this.maxBrowsePageSize;
}
public void setMaxBrowsePageSize(int maxPageSize) {
this.maxBrowsePageSize = maxPageSize;
}
public boolean isUseCache() {
return useCache;

View File

@ -91,6 +91,10 @@ public interface Destination extends Service, Task {
public void setMaxPageSize(int maxPageSize);
public int getMaxBrowsePageSize();
public void setMaxBrowsePageSize(int maxPageSize);
public boolean isUseCache();
public void setUseCache(boolean useCache);

View File

@ -249,4 +249,12 @@ public class DestinationFilter implements Destination {
public void messageExpired(ConnectionContext context, Subscription subs,MessageReference node) {
next.messageExpired(context,subs, node);
}
public int getMaxBrowsePageSize() {
return next.getMaxBrowsePageSize();
}
public void setMaxBrowsePageSize(int maxPageSize) {
next.setMaxBrowsePageSize(maxPageSize);
}
}

View File

@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@ -616,80 +617,91 @@ public class Queue extends BaseDestination implements Task {
}
public Message[] browse() {
int count = 0;
List<Message> l = new ArrayList<Message>();
try {
doPageIn(true);
} catch (Exception e) {
LOG.error("caught an exception browsing " + this, e);
}
synchronized (pagedInMessages) {
for (QueueMessageReference node:pagedInMessages.values()){
node.incrementReferenceCount();
try {
Message m = node.getMessage();
if (m != null) {
l.add(m);
}
} catch (IOException e) {
LOG.error("caught an exception browsing " + this, e);
} finally {
node.decrementReferenceCount();
synchronized (this.pagedInPendingDispatch) {
for (Iterator<QueueMessageReference> i = this.pagedInPendingDispatch
.iterator(); i.hasNext()
&& count < getMaxBrowsePageSize();) {
l.add(i.next().getMessage());
count++;
}
}
}
synchronized (messages) {
try {
messages.reset();
while (messages.hasNext()) {
try {
MessageReference r = messages.next();
r.incrementReferenceCount();
try {
Message m = r.getMessage();
if (m != null) {
l.add(m);
}
} finally {
r.decrementReferenceCount();
if (count < getMaxBrowsePageSize()) {
synchronized (pagedInMessages) {
for (Iterator<QueueMessageReference> i = this.pagedInMessages
.values().iterator(); i.hasNext()
&& count < getMaxBrowsePageSize();) {
Message m = i.next().getMessage();
if (l.contains(m) == false) {
l.add(m);
count++;
}
} catch (IOException e) {
LOG.error("caught an exception brwsing " + this, e);
}
}
} finally {
messages.release();
}
if (count < getMaxBrowsePageSize()) {
synchronized (messages) {
try {
messages.reset();
while (messages.hasNext()
&& count < getMaxBrowsePageSize()) {
MessageReference node = messages.next();
messages.rollback(node.getMessageId());
if (node != null) {
Message m = node.getMessage();
if (l.contains(m) == false) {
l.add(m);
count++;
}
}
}
} finally {
messages.release();
}
}
}
} catch (IOException e) {
LOG.error("Problem retrieving message in browse() ", e);
}
return l.toArray(new Message[l.size()]);
}
public Message getMessage(String messageId) {
synchronized (messages) {
try {
messages.reset();
while (messages.hasNext()) {
try {
MessageReference r = messages.next();
if (messageId.equals(r.getMessageId().toString())) {
r.incrementReferenceCount();
try {
public Message getMessage(String id) {
MessageId msgId = new MessageId(id);
try {
synchronized (pagedInMessages) {
QueueMessageReference r = this.pagedInMessages.get(msgId);
if (r != null) {
return r.getMessage();
}
}
synchronized (messages) {
try {
messages.reset();
while (messages.hasNext()) {
try {
MessageReference r = messages.next();
messages.rollback(r.getMessageId());
if (msgId.equals(r.getMessageId())) {
Message m = r.getMessage();
if (m != null) {
return m;
}
} finally {
r.decrementReferenceCount();
break;
}
break;
} catch (IOException e) {
LOG.error("got an exception retrieving message "
+ id);
}
} catch (IOException e) {
LOG.error("got an exception retrieving message " + messageId);
}
} finally {
messages.release();
}
} finally {
messages.release();
}
} catch (IOException e) {
LOG.error("got an exception retrieving message " + id);
}
return null;
}
@ -852,7 +864,7 @@ public class Queue extends BaseDestination implements Task {
* @return the number of messages removed
*/
public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception {
return moveMatchingMessagesTo(context, selector, dest, -1);
return moveMatchingMessagesTo(context, selector, dest,Integer.MAX_VALUE);
}
/**
@ -867,7 +879,9 @@ public class Queue extends BaseDestination implements Task {
* Moves the messages matching the given filter up to the maximum number of
* matched messages
*/
public int moveMatchingMessagesTo(ConnectionContext context,MessageReferenceFilter filter, ActiveMQDestination dest,int maximumMessages) throws Exception {
public int moveMatchingMessagesTo(ConnectionContext context,
MessageReferenceFilter filter, ActiveMQDestination dest,
int maximumMessages) throws Exception {
int movedCounter = 0;
Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
do {
@ -875,28 +889,27 @@ public class Queue extends BaseDestination implements Task {
synchronized (pagedInMessages) {
set.addAll(pagedInMessages.values());
}
List <MessageReference>list = new ArrayList<MessageReference>(set);
for (MessageReference ref:list) {
List<MessageReference> list = new ArrayList<MessageReference>(set);
for (MessageReference ref : list) {
IndirectMessageReference r = (IndirectMessageReference) ref;
if (filter.evaluate(context, r)) {
// We should only move messages that can be locked.
r.incrementReferenceCount();
try {
Message m = r.getMessage();
BrokerSupport.resend(context, m, dest);
removeMessage(context, r);
set.remove(r);
if (++movedCounter >= maximumMessages
&& maximumMessages > 0) {
return movedCounter;
}
} finally {
r.decrementReferenceCount();
Message m = r.getMessage();
BrokerSupport.resend(context, m, dest);
removeMessage(context, r);
set.remove(r);
if (++movedCounter >= maximumMessages
&& maximumMessages > 0) {
return movedCounter;
}
} else {
synchronized (messages) {
messages.rollback(r.getMessageId());
}
}
}
} while (set.size() < this.destinationStatistics.getMessages().getCount());
} while (set.size() < this.destinationStatistics.getMessages().getCount()
&& set.size() < maximumMessages);
return movedCounter;
}
@ -936,7 +949,9 @@ public class Queue extends BaseDestination implements Task {
// make sure it gets queued for dispatched again
dispatchLock.lock();
try {
pagedInPendingDispatch.add(node);
synchronized(pagedInPendingDispatch) {
pagedInPendingDispatch.add(node);
}
} finally {
dispatchLock.unlock();
}
@ -993,6 +1008,9 @@ public class Queue extends BaseDestination implements Task {
public boolean evaluate(ConnectionContext context, MessageReference r) {
return messageId.equals(r.getMessageId().toString());
}
public String toString() {
return "MessageIdFilter: "+messageId;
}
};
}
@ -1031,21 +1049,13 @@ public class Queue extends BaseDestination implements Task {
acknowledge(context, sub, ack, reference);
if (!ack.isInTransaction()) {
reference.drop();
destinationStatistics.getMessages().decrement();
synchronized(pagedInMessages) {
pagedInMessages.remove(reference.getMessageId());
}
dropMessage(reference);
wakeup();
} else {
context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
reference.drop();
destinationStatistics.getMessages().decrement();
synchronized(pagedInMessages) {
pagedInMessages.remove(reference.getMessageId());
}
dropMessage(reference);
wakeup();
}
@ -1057,6 +1067,17 @@ public class Queue extends BaseDestination implements Task {
}
private void dropMessage(QueueMessageReference reference) {
reference.drop();
destinationStatistics.getMessages().decrement();
synchronized(pagedInMessages) {
pagedInMessages.remove(reference.getMessageId());
}
synchronized(messages) {
messages.rollback(reference.getMessageId());
}
}
public void messageExpired(ConnectionContext context,MessageReference reference) {
messageExpired(context,null,reference);
}
@ -1117,8 +1138,16 @@ public class Queue extends BaseDestination implements Task {
List<QueueMessageReference> result = null;
dispatchLock.lock();
try{
int toPageIn = (getMaxPageSize()+(int)destinationStatistics.getInflight().getCount()) - pagedInMessages.size();
toPageIn = Math.min(toPageIn,getMaxPageSize());
int toPageIn = 0;
if (force) {
toPageIn = getMaxPageSize();
} else {
toPageIn = (getMaxPageSize() + (int) destinationStatistics
.getInflight().getCount())
- pagedInMessages.size();
toPageIn = Math.min(toPageIn, getMaxPageSize());
}
if (isLazyDispatch()&& !force) {
// Only page in the minimum number of messages which can be dispatched immediately.
toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
@ -1129,6 +1158,7 @@ public class Queue extends BaseDestination implements Task {
result = new ArrayList<QueueMessageReference>(toPageIn);
synchronized (messages) {
try {
messages.reset();
while (messages.hasNext() && count < toPageIn) {
MessageReference node = messages.next();
@ -1161,17 +1191,19 @@ public class Queue extends BaseDestination implements Task {
private void doDispatch(List<QueueMessageReference> list) throws Exception {
dispatchLock.lock();
try {
if(!pagedInPendingDispatch.isEmpty()) {
// Try to first dispatch anything that had not been dispatched before.
pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
}
// and now see if we can dispatch the new stuff.. and append to the pending
// list anything that does not actually get dispatched.
if (list != null && !list.isEmpty()) {
if (pagedInPendingDispatch.isEmpty()) {
pagedInPendingDispatch.addAll(doActualDispatch(list));
} else {
pagedInPendingDispatch.addAll(list);
synchronized(pagedInPendingDispatch) {
if(!pagedInPendingDispatch.isEmpty()) {
// Try to first dispatch anything that had not been dispatched before.
pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
}
// and now see if we can dispatch the new stuff.. and append to the pending
// list anything that does not actually get dispatched.
if (list != null && !list.isEmpty()) {
if (pagedInPendingDispatch.isEmpty()) {
pagedInPendingDispatch.addAll(doActualDispatch(list));
} else {
pagedInPendingDispatch.addAll(list);
}
}
}
} finally {

View File

@ -21,6 +21,7 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId;
@ -34,7 +35,7 @@ import org.apache.activemq.usage.SystemUsage;
*/
public class AbstractPendingMessageCursor implements PendingMessageCursor {
protected int memoryUsageHighWaterMark = 70;
protected int maxBatchSize = 100;
protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
protected SystemUsage systemUsage;
protected int maxProducersToAudit=1024;
protected int maxAuditDepth=1000;
@ -285,7 +286,7 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
return this.audit.isDuplicate(messageId);
}
protected synchronized void rollback(MessageId id) {
public synchronized void rollback(MessageId id) {
if (this.audit != null) {
audit.rollback(id);
}

View File

@ -41,6 +41,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
protected boolean cacheEnabled=false;
protected boolean batchResetNeeded = true;
protected boolean storeHasMessages = false;
protected Iterator<Entry<MessageId, Message>> iterator = null;
protected int size;
protected AbstractStoreCursor(Destination destination) {
@ -93,12 +94,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
public final void reset() {
}
public final void finished() {
}
public final synchronized boolean hasNext() {
if (batchList.isEmpty()) {
try {
fillBatch();
@ -107,17 +102,39 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
throw new RuntimeException(e);
}
}
boolean result= !batchList.isEmpty();
return result;
this.iterator = this.batchList.entrySet().iterator();
}
public void release() {
this.iterator=null;
}
public final void finished() {
}
public final synchronized boolean hasNext() {
if (batchList.isEmpty()) {
try {
fillBatch();
this.iterator = this.batchList.entrySet().iterator();
} catch (Exception e) {
LOG.error("Failed to fill batch", e);
throw new RuntimeException(e);
}
}else {
if (this.iterator==null) {
this.iterator=this.batchList.entrySet().iterator();
}
}
return this.iterator.hasNext();
}
public final synchronized MessageReference next() {
Message result = null;
if (!this.batchList.isEmpty()) {
Iterator<Entry<MessageId, Message>> i = this.batchList.entrySet().iterator();
result = i.next().getValue();
if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
result = this.iterator.next().getValue();
result.decrementReferenceCount();
i.remove();
}
return result;
}
@ -141,6 +158,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
if (size==0 && isStarted() && cacheEnabled) {
cacheEnabled=true;
}
if (iterator!=null) {
iterator.remove();
}
}
public final synchronized void remove(MessageReference node) {

View File

@ -24,6 +24,7 @@ import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.SystemUsage;
/**
@ -282,5 +283,11 @@ public interface PendingMessageCursor extends Service {
* @return true if a cache is being used
*/
public boolean isUseCache();
/**
* remove from auditing the message id
* @param id
*/
public void rollback(MessageId id);
}

View File

@ -16,12 +16,10 @@
*/
package org.apache.activemq.broker.region.cursors;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -163,6 +161,12 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
nonPersistent.reset();
persistent.reset();
}
public void release() {
nonPersistent.release();
persistent.release();
}
public synchronized int size() {
return pendingCount;

View File

@ -59,6 +59,7 @@ public class PolicyEntry extends DestinationMapEntry {
private boolean producerFlowControl = true;
private boolean optimizedDispatch=false;
private int maxPageSize=BaseDestination.MAX_PAGE_SIZE;
private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE;
private boolean useCache=true;
private long minimumMessageSize=1024;
private boolean useConsumerPriority=true;
@ -119,6 +120,7 @@ public class PolicyEntry extends DestinationMapEntry {
destination.setMaxAuditDepth(getMaxQueueAuditDepth());
destination.setMaxProducersToAudit(getMaxProducersToAudit());
destination.setMaxPageSize(getMaxPageSize());
destination.setMaxBrowsePageSize(getMaxBrowsePageSize());
destination.setUseCache(isUseCache());
destination.setMinimumMessageSize((int) getMinimumMessageSize());
destination.setAdvisoryForConsumed(isAdvisoryForConsumed());
@ -387,7 +389,15 @@ public class PolicyEntry extends DestinationMapEntry {
public void setMaxPageSize(int maxPageSize) {
this.maxPageSize = maxPageSize;
}
}
public int getMaxBrowsePageSize() {
return maxBrowsePageSize;
}
public void setMaxBrowsePageSize(int maxPageSize) {
this.maxBrowsePageSize = maxPageSize;
}
public boolean isUseCache() {
return useCache;

View File

@ -100,9 +100,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
else {
echo("Current queue size: " + initialQueueSize);
}
// TODO uncommenting this line causes a hang!
//int messageCount = initialQueueSize;
int messageCount = 10;
int messageCount = initialQueueSize;
String[] messageIDs = new String[messageCount];
for (int i = 0; i < messageCount; i++) {
CompositeData cdata = compdatalist[i];
@ -124,8 +122,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
compdatalist = queue.browse();
int actualCount = compdatalist.length;
echo("Current queue size: " + actualCount);
// TODO we seem to have browsed the queue and now there are messages missing!
//assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, actualCount);
assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, actualCount);
echo("Now browsing the second queue");
@ -137,7 +134,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertEquals("Unexpected number of messages ",messageCount, newQueuesize);
}
public void TODO_testRetryMessages() throws Exception {
public void testRetryMessages() throws Exception {
// lets speed up redelivery
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory;
factory.getRedeliveryPolicy().setCollisionAvoidancePercent((short) 0);
@ -186,10 +183,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
else {
echo("Current DLQ queue size: " + dlqQueueSize);
}
// TODO uncommenting this line causes a hang!
//int messageCount = dlqQueueSize;
int messageCount = 10;
int messageCount = dlqQueueSize;
String[] messageIDs = new String[messageCount];
for (int i = 0; i < messageCount; i++) {
CompositeData cdata = compdatalist[i];