cursor fixes

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@619387 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-02-07 12:55:02 +00:00
parent 1a6cbfa651
commit 6d0c552c8a
12 changed files with 99 additions and 56 deletions

View File

@ -165,7 +165,6 @@ public abstract class AbstractRegion implements Region {
// the destination and that they should un-subscribe.. Then wait up
// to timeout time before
// dropping the subscription.
}
LOG.debug("Removing destination: " + destination);

View File

@ -37,9 +37,11 @@ public abstract class BaseDestination implements Destination {
protected final MemoryUsage memoryUsage;
private boolean producerFlowControl = true;
private int maxProducersToAudit=1024;
private int maxAuditDepth=1;
private int maxAuditDepth=2048;
private boolean enableAudit=true;
private int maxPageSize=1000;
private boolean useCache=true;
private int minimumMessageSize=1024;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
/**
@ -160,5 +162,21 @@ public abstract class BaseDestination implements Destination {
public void setMaxPageSize(int maxPageSize) {
this.maxPageSize = maxPageSize;
}
public boolean isUseCache() {
return useCache;
}
public void setUseCache(boolean useCache) {
this.useCache = useCache;
}
public int getMinimumMessageSize() {
return minimumMessageSize;
}
public void setMinimumMessageSize(int minimumMessageSize) {
this.minimumMessageSize = minimumMessageSize;
}
}

View File

@ -88,4 +88,12 @@ public interface Destination extends Service {
int getMaxPageSize();
public void setMaxPageSize(int maxPageSize);
public boolean isUseCache();
public void setUseCache(boolean useCache);
public int getMinimumMessageSize();
public void setMinimumMessageSize(int minimumMessageSize);
}

View File

@ -153,7 +153,6 @@ public class DestinationFilter implements Destination {
public boolean isEnableAudit() {
return next.isEnableAudit();
}
public void setEnableAudit(boolean enableAudit) {
next.setEnableAudit(enableAudit);
@ -179,4 +178,20 @@ public class DestinationFilter implements Destination {
public void setMaxPageSize(int maxPageSize) {
next.setMaxPageSize(maxPageSize);
}
public boolean isUseCache() {
return next.isUseCache();
}
public void setUseCache(boolean useCache) {
next.setUseCache(useCache);
}
public int getMinimumMessageSize() {
return next.getMinimumMessageSize();
}
public void setMinimumMessageSize(int minimumMessageSize) {
next.setMinimumMessageSize(minimumMessageSize);
}
}

View File

@ -111,6 +111,7 @@ public class Queue extends BaseDestination implements Task {
messages.setEnableAudit(isEnableAudit());
messages.setMaxAuditDepth(getMaxAuditDepth());
messages.setMaxProducersToAudit(getMaxProducersToAudit());
messages.setUseCache(isUseCache());
if (messages.isRecoveryRequired()) {
store.recover(new MessageRecoveryListener() {

View File

@ -36,10 +36,10 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
protected int maxBatchSize = 100;
protected SystemUsage systemUsage;
protected int maxProducersToAudit=1024;
protected int maxAuditDepth=1;
protected int maxAuditDepth=1000;
protected boolean enableAudit=true;
protected ActiveMQMessageAudit audit;
protected boolean useCache=true;
protected boolean useCache=false;
private boolean started=false;

View File

@ -16,10 +16,8 @@
*/
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.activemq.broker.region.Destination;
@ -38,7 +36,6 @@ import org.apache.commons.logging.LogFactory;
*/
public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener {
private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class);
protected static final int MAX_FILL_ATTEMPTS=3;
protected final Destination regionDestination;
protected final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
protected boolean cacheEnabled=false;
@ -52,15 +49,15 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
public final synchronized void start() throws Exception{
if (!isStarted()) {
super.start();
clear();
resetBatch();
this.size = getStoreSize();
this.storeHasMessages=this.size > 0;
if (!this.storeHasMessages&&useCache) {
cacheEnabled=true;
}
}
super.start();
clear();
resetBatch();
}
getSystemUsage().getMemoryUsage().addUsageListener(this);
}
@ -181,10 +178,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
resetBatch();
this.batchResetNeeded = false;
}
//we may have to move the store cursor past messages that have
//already been delivered - but we also don't want it to spin
int fillAttempts=0;
while (fillAttempts < MAX_FILL_ATTEMPTS && this.batchList.isEmpty() && (this.storeHasMessages ||this.size >0)) {
if( this.batchList.isEmpty() && (this.storeHasMessages ||this.size >0)) {
this.storeHasMessages = false;
try {
doFillBatch();
@ -195,7 +190,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
if (!this.batchList.isEmpty()) {
this.storeHasMessages=true;
}
fillAttempts++;
}
}

View File

@ -121,7 +121,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
public synchronized boolean hasNext() {
boolean result = pendingCount > 0;
boolean result = true;//pendingCount > 0;
if (result) {
try {
currentCursor = getNextCursor();

View File

@ -51,12 +51,14 @@ public class PolicyEntry extends DestinationMapEntry {
private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
private int maxProducersToAudit=32;
private int maxAuditDepth=1024;
private int maxQueueAuditDepth=1;
private int maxAuditDepth=2048;
private int maxQueueAuditDepth=2048;
private boolean enableAudit=true;
private boolean producerFlowControl = true;
private boolean optimizedDispatch=false;
private int maxPageSize=1000;
private boolean useCache=true;
private long minimumMessageSize=1024;
public void configure(Broker broker,Queue queue) {
if (dispatchPolicy != null) {
@ -78,6 +80,8 @@ public class PolicyEntry extends DestinationMapEntry {
queue.setMaxAuditDepth(getMaxQueueAuditDepth());
queue.setMaxProducersToAudit(getMaxProducersToAudit());
queue.setMaxPageSize(getMaxPageSize());
queue.setUseCache(isUseCache());
queue.setMinimumMessageSize((int) getMinimumMessageSize());
}
public void configure(Topic topic) {
@ -99,6 +103,8 @@ public class PolicyEntry extends DestinationMapEntry {
topic.setMaxAuditDepth(getMaxAuditDepth());
topic.setMaxProducersToAudit(getMaxProducersToAudit());
topic.setMaxPageSize(getMaxPageSize());
topic.setUseCache(isUseCache());
topic.setMinimumMessageSize((int) getMinimumMessageSize());
}
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
@ -360,5 +366,24 @@ public class PolicyEntry extends DestinationMapEntry {
public void setMaxPageSize(int maxPageSize) {
this.maxPageSize = maxPageSize;
}
public boolean isUseCache() {
return useCache;
}
public void setUseCache(boolean useCache) {
this.useCache = useCache;
}
public long getMinimumMessageSize() {
return minimumMessageSize;
}
/**
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
*/
public void setMinimumMessageSize(long minimumMessageSize) {
this.minimumMessageSize = minimumMessageSize;
}
}

View File

@ -138,7 +138,7 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
public int getSize() {
if (size == 0 && content == null && text != null) {
size = AVERAGE_MESSAGE_SIZE_OVERHEAD;
size = getMinimumMessageSize();
if (marshalledProperties != null) {
size += marshalledProperties.getLength();
}

View File

@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.ByteArrayInputStream;
@ -41,7 +42,10 @@ import org.apache.activemq.wireformat.WireFormat;
*/
public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
public static final int AVERAGE_MESSAGE_SIZE_OVERHEAD = 8 * 1024;
/**
* The default minimum amount of memory a message is assumed to use
*/
public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
protected MessageId messageId;
protected ActiveMQDestination originalDestination;
@ -620,8 +624,9 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
}
public int getSize() {
if (size <= AVERAGE_MESSAGE_SIZE_OVERHEAD) {
size = AVERAGE_MESSAGE_SIZE_OVERHEAD;
int minimumMessageSize = getMinimumMessageSize();
if (size < minimumMessageSize) {
size = minimumMessageSize;
if (marshalledProperties != null) {
size += marshalledProperties.getLength();
}
@ -631,6 +636,16 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
}
return size;
}
protected int getMinimumMessageSize() {
int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
//let destination override
Destination dest = regionDestination;
if (dest != null) {
result=dest.getMinimumMessageSize();
}
return result;
}
/**
* @openwire:property version=1

View File

@ -525,44 +525,12 @@ public class AMQMessageStore implements MessageStore {
}
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(
this, listener);
if (referenceStore.supportsExternalBatchControl()) {
lock.lock();
try {
referenceStore.recoverNextMessages(maxReturned,
recoveryListener);
if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
int count = 0;
Iterator<Entry<MessageId, ReferenceData>> iterator = messages
.entrySet().iterator();
while (iterator.hasNext() && count < maxReturned
&& recoveryListener.hasSpace()) {
Entry<MessageId, ReferenceData> entry = iterator.next();
ReferenceData data = entry.getValue();
Message message = getMessage(data);
recoveryListener.recoverMessage(message);
count++;
}
referenceStore.setBatch(recoveryListener
.getLastRecoveredMessageId());
}
}finally {
lock.unlock();
}
} else {
flush();
referenceStore.recoverNextMessages(maxReturned, recoveryListener);
}
/*
RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
referenceStore.recoverNextMessages(maxReturned, recoveryListener);
if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
flush();
referenceStore.recoverNextMessages(maxReturned, recoveryListener);
}
*/
}
Message getMessage(ReferenceData data) throws IOException {