git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@729939 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-12-29 16:48:28 +00:00
parent 8242a3f58c
commit 63e3f41af2
23 changed files with 538 additions and 244 deletions

View File

@ -28,31 +28,29 @@ import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
/**
* @version $Revision: 1.12 $
*/
public abstract class BaseDestination implements Destination {
/**
* The maximum number of messages to page in to the destination
* from persistent storage
* The maximum number of messages to page in to the destination from persistent storage
*/
public static final int MAX_PAGE_SIZE=200;
public static final int MAX_BROWSE_PAGE_SIZE=MAX_PAGE_SIZE*2;
public static final int MAX_PAGE_SIZE = 200;
public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
protected final ActiveMQDestination destination;
protected final Broker broker;
protected final MessageStore store;
protected SystemUsage systemUsage;
protected MemoryUsage memoryUsage;
private boolean producerFlowControl = true;
private int maxProducersToAudit=1024;
private int maxAuditDepth=2048;
private boolean enableAudit=true;
private int maxPageSize=MAX_PAGE_SIZE;
private int maxBrowsePageSize=MAX_BROWSE_PAGE_SIZE;
private boolean useCache=true;
private int minimumMessageSize=1024;
private boolean lazyDispatch=false;
private int maxProducersToAudit = 1024;
private int maxAuditDepth = 2048;
private boolean enableAudit = true;
private int maxPageSize = MAX_PAGE_SIZE;
private int maxBrowsePageSize = MAX_BROWSE_PAGE_SIZE;
private boolean useCache = true;
private int minimumMessageSize = 1024;
private boolean lazyDispatch = false;
private boolean advisoryForSlowConsumers;
private boolean advisdoryForFastProducers;
private boolean advisoryForDiscardingMessages;
@ -63,31 +61,32 @@ public abstract class BaseDestination implements Destination {
protected final BrokerService brokerService;
protected final Broker regionBroker;
protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
/**
* @param broker
* @param store
* @param broker
* @param store
* @param destination
* @param parentStats
* @throws Exception
* @throws Exception
*/
public BaseDestination(BrokerService brokerService,MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception {
public BaseDestination(BrokerService brokerService, MessageStore store, ActiveMQDestination destination,
DestinationStatistics parentStats) throws Exception {
this.brokerService = brokerService;
this.broker=brokerService.getBroker();
this.store=store;
this.destination=destination;
this.broker = brokerService.getBroker();
this.store = store;
this.destination = destination;
// let's copy the enabled property from the parent DestinationStatistics
this.destinationStatistics.setEnabled(parentStats.isEnabled());
this.destinationStatistics.setParent(parentStats);
this.destinationStatistics.setParent(parentStats);
this.systemUsage = brokerService.getProducerSystemUsage();
this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
this.memoryUsage.setUsagePortion(1.0f);
this.regionBroker = brokerService.getRegionBroker();
}
/**
* initialize the destination
*
* @throws Exception
*/
public void initialize() throws Exception {
@ -95,66 +94,77 @@ public abstract class BaseDestination implements Destination {
// flush messages to disk when usage gets high.
if (store != null) {
store.setMemoryUsage(this.memoryUsage);
}
}
}
/**
* @return the producerFlowControl
*/
public boolean isProducerFlowControl() {
return producerFlowControl;
}
/**
* @param producerFlowControl the producerFlowControl to set
* @param producerFlowControl
* the producerFlowControl to set
*/
public void setProducerFlowControl(boolean producerFlowControl) {
this.producerFlowControl = producerFlowControl;
}
/**
* @return the maxProducersToAudit
*/
public int getMaxProducersToAudit() {
return maxProducersToAudit;
}
/**
* @param maxProducersToAudit the maxProducersToAudit to set
* @param maxProducersToAudit
* the maxProducersToAudit to set
*/
public void setMaxProducersToAudit(int maxProducersToAudit) {
this.maxProducersToAudit = maxProducersToAudit;
}
/**
* @return the maxAuditDepth
*/
public int getMaxAuditDepth() {
return maxAuditDepth;
}
/**
* @param maxAuditDepth the maxAuditDepth to set
* @param maxAuditDepth
* the maxAuditDepth to set
*/
public void setMaxAuditDepth(int maxAuditDepth) {
this.maxAuditDepth = maxAuditDepth;
}
/**
* @return the enableAudit
*/
public boolean isEnableAudit() {
return enableAudit;
}
/**
* @param enableAudit the enableAudit to set
* @param enableAudit
* the enableAudit to set
*/
public void setEnableAudit(boolean enableAudit) {
this.enableAudit = enableAudit;
}
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
destinationStatistics.getProducers().increment();
}
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
destinationStatistics.getProducers().decrement();
}
public final MemoryUsage getMemoryUsage() {
return memoryUsage;
}
@ -167,20 +177,19 @@ public abstract class BaseDestination implements Destination {
return destination;
}
public final String getName() {
return getActiveMQDestination().getPhysicalName();
}
public final MessageStore getMessageStore() {
return store;
}
public final boolean isActive() {
return destinationStatistics.getConsumers().getCount() != 0 ||
destinationStatistics.getProducers().getCount() != 0;
return destinationStatistics.getConsumers().getCount() != 0
|| destinationStatistics.getProducers().getCount() != 0;
}
public int getMaxPageSize() {
return maxPageSize;
}
@ -188,14 +197,14 @@ public abstract class BaseDestination implements Destination {
public void setMaxPageSize(int maxPageSize) {
this.maxPageSize = maxPageSize;
}
public int getMaxBrowsePageSize() {
return this.maxBrowsePageSize;
}
public void setMaxBrowsePageSize(int maxPageSize) {
this.maxBrowsePageSize = maxPageSize;
}
}
public boolean isUseCache() {
return useCache;
@ -219,8 +228,8 @@ public abstract class BaseDestination implements Destination {
public void setLazyDispatch(boolean lazyDispatch) {
this.lazyDispatch = lazyDispatch;
}
}
protected long getDestinationSequenceId() {
return regionBroker.getBrokerSequenceId();
}
@ -233,7 +242,8 @@ public abstract class BaseDestination implements Destination {
}
/**
* @param advisoryForSlowConsumers the advisoryForSlowConsumers to set
* @param advisoryForSlowConsumers
* the advisoryForSlowConsumers to set
*/
public void setAdvisoryForSlowConsumers(boolean advisoryForSlowConsumers) {
this.advisoryForSlowConsumers = advisoryForSlowConsumers;
@ -247,10 +257,10 @@ public abstract class BaseDestination implements Destination {
}
/**
* @param advisoryForDiscardingMessages the advisoryForDiscardingMessages to set
* @param advisoryForDiscardingMessages
* the advisoryForDiscardingMessages to set
*/
public void setAdvisoryForDiscardingMessages(
boolean advisoryForDiscardingMessages) {
public void setAdvisoryForDiscardingMessages(boolean advisoryForDiscardingMessages) {
this.advisoryForDiscardingMessages = advisoryForDiscardingMessages;
}
@ -262,7 +272,8 @@ public abstract class BaseDestination implements Destination {
}
/**
* @param advisoryWhenFull the advisoryWhenFull to set
* @param advisoryWhenFull
* the advisoryWhenFull to set
*/
public void setAdvisoryWhenFull(boolean advisoryWhenFull) {
this.advisoryWhenFull = advisoryWhenFull;
@ -276,7 +287,8 @@ public abstract class BaseDestination implements Destination {
}
/**
* @param advisoryForDelivery the advisoryForDelivery to set
* @param advisoryForDelivery
* the advisoryForDelivery to set
*/
public void setAdvisoryForDelivery(boolean advisoryForDelivery) {
this.advisoryForDelivery = advisoryForDelivery;
@ -290,7 +302,8 @@ public abstract class BaseDestination implements Destination {
}
/**
* @param advisoryForConsumed the advisoryForConsumed to set
* @param advisoryForConsumed
* the advisoryForConsumed to set
*/
public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
this.advisoryForConsumed = advisoryForConsumed;
@ -304,12 +317,13 @@ public abstract class BaseDestination implements Destination {
}
/**
* @param advisdoryForFastProducers the advisdoryForFastProducers to set
* @param advisdoryForFastProducers
* the advisdoryForFastProducers to set
*/
public void setAdvisdoryForFastProducers(boolean advisdoryForFastProducers) {
this.advisdoryForFastProducers = advisdoryForFastProducers;
}
/**
* @return the dead letter strategy
*/
@ -319,13 +333,16 @@ public abstract class BaseDestination implements Destination {
/**
* set the dead letter strategy
*
* @param deadLetterStrategy
*/
public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
this.deadLetterStrategy = deadLetterStrategy;
}
/**
* called when message is consumed
*
* @param context
* @param messageReference
*/
@ -334,21 +351,23 @@ public abstract class BaseDestination implements Destination {
broker.messageConsumed(context, messageReference);
}
}
/**
* Called when message is delivered to the broker
*
* @param context
* @param messageReference
*/
public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
if(advisoryForDelivery) {
if (advisoryForDelivery) {
broker.messageDelivered(context, messageReference);
}
}
/**
* Called when a message is discarded - e.g. running low on memory
* This will happen only if the policy is enabled - e.g. non durable topics
* Called when a message is discarded - e.g. running low on memory This will happen only if the policy is enabled -
* e.g. non durable topics
*
* @param context
* @param messageReference
*/
@ -357,43 +376,48 @@ public abstract class BaseDestination implements Destination {
broker.messageDiscarded(context, messageReference);
}
}
/**
* Called when there is a slow consumer
*
* @param context
* @param subs
*/
public void slowConsumer(ConnectionContext context, Subscription subs) {
if(advisoryForSlowConsumers) {
if (advisoryForSlowConsumers) {
broker.slowConsumer(context, this, subs);
}
}
/**
* Called to notify a producer is too fast
*
* @param context
* @param producerInfo
*/
public void fastProducer(ConnectionContext context,ProducerInfo producerInfo) {
if(advisdoryForFastProducers) {
public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
if (advisdoryForFastProducers) {
broker.fastProducer(context, producerInfo);
}
}
/**
* Called when a Usage reaches a limit
*
* @param context
* @param usage
*/
public void isFull(ConnectionContext context,Usage usage) {
if(advisoryWhenFull) {
broker.isFull(context,this, usage);
public void isFull(ConnectionContext context, Usage usage) {
if (advisoryWhenFull) {
broker.isFull(context, this, usage);
}
}
public void dispose(ConnectionContext context) throws IOException {
destinationStatistics.setParent(null);
if (this.store != null) {
this.store.dispose(context);
}
this.destinationStatistics.setParent(null);
this.memoryUsage.stop();
}
}

View File

@ -475,13 +475,6 @@ public class Queue extends BaseDestination implements Task {
}
}
public void dispose(ConnectionContext context) throws IOException {
super.dispose(context);
if (store != null) {
store.removeAllMessages(context);
}
}
public void gc(){
}
@ -543,6 +536,9 @@ public class Queue extends BaseDestination implements Task {
if (memoryUsage != null) {
memoryUsage.stop();
}
if (store!=null) {
store.stop();
}
}
// Properties

View File

@ -449,13 +449,7 @@ public class Topic extends BaseDestination implements Task{
messageConsumed(context, node);
}
public void dispose(ConnectionContext context) throws IOException {
super.dispose(context);
if (topicStore != null) {
topicStore.removeAllMessages(context);
}
}
public void gc() {
}
@ -479,6 +473,9 @@ public class Topic extends BaseDestination implements Task{
if (memoryUsage != null) {
memoryUsage.stop();
}
if(this.topicStore != null) {
this.topicStore.stop();
}
}
public Message[] browse() {

View File

@ -283,4 +283,9 @@ public interface MapContainer<K, V> extends Map<K, V> {
* @return the Index MBean
*/
IndexMBean getIndexMBean();
/**
* Clean up all state associated with this container.
*/
void delete();
}

View File

@ -142,6 +142,16 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
}
}
public synchronized void delete() {
unload();
try {
index.delete();
} catch (IOException e) {
LOG.warn("Failed to unload the index", e);
}
}
public synchronized void setKeyMarshaller(Marshaller keyMarshaller) {
checkClosed();
this.keyMarshaller = keyMarshaller;
@ -578,7 +588,6 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
public IndexMBean getIndexMBean() {
return (IndexMBean) index;
}
public int getIndexMaxBinSize() {
return indexMaxBinSize;
}

View File

@ -96,4 +96,11 @@ public interface Index {
* @return
*/
int getSize();
/**
* delete all state associated with the index
*
* @throws IOException
*/
void delete() throws IOException;
}

View File

@ -28,7 +28,7 @@ import org.apache.commons.logging.LogFactory;
/**
* Index implementation using a HashMap
*
*
* @version $Revision: 1.2 $
*/
public class VMIndex implements Index, IndexMBean {
@ -41,7 +41,7 @@ public class VMIndex implements Index, IndexMBean {
}
/**
*
*
* @see org.apache.activemq.kaha.impl.index.Index#clear()
*/
public void clear() {
@ -122,9 +122,13 @@ public class VMIndex implements Index, IndexMBean {
map.clear();
}
public void delete() throws IOException {
unload();
}
public void setKeyMarshaller(Marshaller marshaller) {
}
public int getSize() {
return map.size();
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.usage.MemoryUsage;
abstract public class AbstractMessageStore implements MessageStore {
protected final ActiveMQDestination destination;
public AbstractMessageStore(ActiveMQDestination destination) {
this.destination = destination;
}
public void dispose(ConnectionContext context) {
}
public void start() throws Exception {
}
public void stop() throws Exception {
}
public ActiveMQDestination getDestination() {
return destination;
}
public void setMemoryUsage(MemoryUsage memoryUsage) {
}
}

View File

@ -109,4 +109,5 @@ public interface MessageStore extends Service {
void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception;
void dispose(ConnectionContext context);
}

View File

@ -67,6 +67,10 @@ public class ProxyMessageStore implements MessageStore {
delegate.stop();
}
public void dispose(ConnectionContext context) {
delegate.dispose(context);
}
public ActiveMQDestination getDestination() {
return delegate.getDestination();
}

View File

@ -126,6 +126,10 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
}
public void dispose(ConnectionContext context) {
delegate.dispose(context);
}
public void resetBatching() {
delegate.resetBatching();

View File

@ -30,7 +30,6 @@ import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.DataStructure;
@ -40,8 +39,8 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStore.ReferenceData;
@ -59,21 +58,17 @@ import org.apache.commons.logging.LogFactory;
*
* @version $Revision: 1.14 $
*/
public class AMQMessageStore implements MessageStore {
public class AMQMessageStore extends AbstractMessageStore {
private static final Log LOG = LogFactory.getLog(AMQMessageStore.class);
protected final AMQPersistenceAdapter peristenceAdapter;
protected final AMQTransactionStore transactionStore;
protected final ReferenceStore referenceStore;
protected final ActiveMQDestination destination;
protected final TransactionTemplate transactionTemplate;
protected Location lastLocation;
protected Location lastWrittenLocation;
protected Set<Location> inFlightTxLocations = new HashSet<Location>();
protected final TaskRunner asyncWriteTask;
protected CountDownLatch flushLatch;
private Map<MessageId, ReferenceData> messages = new LinkedHashMap<MessageId, ReferenceData>();
private List<MessageAck> messageAcks = new ArrayList<MessageAck>();
/** A MessageStore that we can use to retrieve messages quickly. */
@ -82,15 +77,15 @@ public class AMQMessageStore implements MessageStore {
private final AtomicReference<Location> mark = new AtomicReference<Location>();
protected final Lock lock;
public AMQMessageStore(AMQPersistenceAdapter adapter,ReferenceStore referenceStore, ActiveMQDestination destination) {
public AMQMessageStore(AMQPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) {
super(destination);
this.peristenceAdapter = adapter;
this.lock=referenceStore.getStoreLock();
this.lock = referenceStore.getStoreLock();
this.transactionStore = adapter.getTransactionStore();
this.referenceStore = referenceStore;
this.destination = destination;
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(
new NonCachedMessageEvaluationContext()));
asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task() {
public boolean iterate() {
asyncWrite();
return false;
@ -103,8 +98,8 @@ public class AMQMessageStore implements MessageStore {
}
/**
* Not synchronize since the Journal has better throughput if you increase
* the number of concurrent writes that it is doing.
* Not synchronize since the Journal has better throughput if you increase the number of concurrent writes that it
* is doing.
*/
public final void addMessage(ConnectionContext context, final Message message) throws IOException {
final MessageId id = message.getMessageId();
@ -122,12 +117,11 @@ public class AMQMessageStore implements MessageStore {
lock.lock();
try {
inFlightTxLocations.add(location);
}finally {
} finally {
lock.unlock();
}
transactionStore.addMessage(this, message, location);
context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
if (debug) {
LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
@ -135,7 +129,7 @@ public class AMQMessageStore implements MessageStore {
lock.lock();
try {
inFlightTxLocations.remove(location);
}finally {
} finally {
lock.unlock();
}
addMessage(message, location);
@ -148,7 +142,7 @@ public class AMQMessageStore implements MessageStore {
lock.lock();
try {
inFlightTxLocations.remove(location);
}finally {
} finally {
lock.unlock();
}
}
@ -161,15 +155,14 @@ public class AMQMessageStore implements MessageStore {
data.setExpiration(message.getExpiration());
data.setFileId(location.getDataFileId());
data.setOffset(location.getOffset());
lock.lock();
try {
lock.lock();
try {
lastLocation = location;
messages.put(message.getMessageId(), data);
}finally {
} finally {
lock.unlock();
}
if (messages.size() > this.peristenceAdapter
.getMaxCheckpointMessageAddSize()) {
if (messages.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
flush();
} else {
try {
@ -194,7 +187,8 @@ public class AMQMessageStore implements MessageStore {
return true;
}
} catch (Throwable e) {
LOG.warn("Could not replay add for message '" + id + "'. Message may have already been added. reason: " + e, e);
LOG.warn("Could not replay add for message '" + id + "'. Message may have already been added. reason: "
+ e, e);
}
return false;
}
@ -210,7 +204,7 @@ public class AMQMessageStore implements MessageStore {
if (debug) {
LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
}
removeMessage(ack,location);
removeMessage(ack, location);
} else {
if (debug) {
LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
@ -218,33 +212,34 @@ public class AMQMessageStore implements MessageStore {
lock.lock();
try {
inFlightTxLocations.add(location);
}finally {
} finally {
lock.unlock();
}
transactionStore.removeMessage(this, ack, location);
context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
if (debug) {
LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location);
LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: "
+ location);
}
lock.lock();
try {
inFlightTxLocations.remove(location);
}finally {
} finally {
lock.unlock();
}
removeMessage(ack,location);
removeMessage(ack, location);
}
public void afterRollback() throws Exception {
if (debug) {
LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location);
LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: "
+ location);
}
lock.lock();
try {
inFlightTxLocations.remove(location);
}finally {
} finally {
lock.unlock();
}
}
@ -255,7 +250,7 @@ public class AMQMessageStore implements MessageStore {
final void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException {
ReferenceData data;
lock.lock();
try{
try {
lastLocation = location;
MessageId id = ack.getLastMessageId();
data = messages.remove(id);
@ -265,13 +260,12 @@ public class AMQMessageStore implements MessageStore {
// message never got written so datafileReference will still exist
AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, data.getFileId());
}
}finally {
} finally {
lock.unlock();
}
if (messageAcks.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
flush();
}
else if (data == null) {
} else if (data == null) {
try {
asyncWriteTask.wakeup();
} catch (InterruptedException e) {
@ -279,7 +273,7 @@ public class AMQMessageStore implements MessageStore {
}
}
}
public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
try {
// Only remove the message if it has not already been removed.
@ -289,7 +283,8 @@ public class AMQMessageStore implements MessageStore {
return true;
}
} catch (Throwable e) {
LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e);
LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId()
+ "'. Message may have already been acknowledged. reason: " + e);
}
return false;
}
@ -313,7 +308,7 @@ public class AMQMessageStore implements MessageStore {
flushLatch = new CountDownLatch(1);
}
countDown = flushLatch;
}finally {
} finally {
lock.unlock();
}
try {
@ -338,7 +333,7 @@ public class AMQMessageStore implements MessageStore {
try {
countDown = flushLatch;
flushLatch = null;
}finally {
} finally {
lock.unlock();
}
mark.set(doAsyncWrite());
@ -368,14 +363,14 @@ public class AMQMessageStore implements MessageStore {
this.messages = new LinkedHashMap<MessageId, ReferenceData>();
this.messageAcks = new ArrayList<MessageAck>();
lastLocation = this.lastLocation;
}finally {
} finally {
lock.unlock();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Doing batch update... adding: " + cpAddedMessageIds.size() + " removing: " + cpRemovedMessageLocations.size() + " ");
LOG.debug("Doing batch update... adding: " + cpAddedMessageIds.size() + " removing: "
+ cpRemovedMessageLocations.size() + " ");
}
transactionTemplate.run(new Callback() {
public void execute() throws Exception {
int size = 0;
PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
@ -386,7 +381,8 @@ public class AMQMessageStore implements MessageStore {
Entry<MessageId, ReferenceData> entry = iterator.next();
try {
referenceStore.addMessageReference(context, entry.getKey(), entry.getValue());
AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this,entry.getValue().getFileId());
AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, entry
.getValue().getFileId());
} catch (Throwable e) {
LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
}
@ -415,7 +411,7 @@ public class AMQMessageStore implements MessageStore {
try {
cpAddedMessageIds = null;
lastWrittenLocation = lastLocation;
}finally {
} finally {
lock.unlock();
}
if (cpActiveJournalLocations.size() > 0) {
@ -436,14 +432,13 @@ public class AMQMessageStore implements MessageStore {
try {
return (Message) rc;
} catch (ClassCastException e) {
throw new IOException("Could not read message " + identity
+ " at location " + location
throw new IOException("Could not read message " + identity + " at location " + location
+ ", expected a message, but got: " + rc);
}
}
return null;
}
protected Location getLocation(MessageId messageId) throws IOException {
ReferenceData data = null;
lock.lock();
@ -453,7 +448,7 @@ public class AMQMessageStore implements MessageStore {
if (data == null && cpAddedMessageIds != null) {
data = cpAddedMessageIds.get(messageId);
}
}finally {
} finally {
lock.unlock();
}
if (data == null) {
@ -469,16 +464,15 @@ public class AMQMessageStore implements MessageStore {
}
/**
* Replays the referenceStore first as those messages are the oldest ones,
* then messages are replayed from the transaction log and then the cache is
* updated.
* Replays the referenceStore first as those messages are the oldest ones, then messages are replayed from the
* transaction log and then the cache is updated.
*
* @param listener
* @throws Exception
*/
public void recover(final MessageRecoveryListener listener) throws Exception {
flush();
referenceStore.recover(new RecoveryListenerAdapter(this, listener));
referenceStore.recover(new RecoveryListenerAdapter(this, listener));
}
public void start() throws Exception {
@ -506,11 +500,8 @@ public class AMQMessageStore implements MessageStore {
referenceStore.removeAllMessages(context);
}
public ActiveMQDestination getDestination() {
return destination;
}
public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime,
String messageRef) throws IOException {
throw new IOException("The journal does not support message references.");
}
@ -543,9 +534,10 @@ public class AMQMessageStore implements MessageStore {
location.setOffset(data.getOffset());
DataStructure rc = peristenceAdapter.readCommand(location);
try {
return (Message)rc;
return (Message) rc;
} catch (ClassCastException e) {
throw new IOException("Could not read message at location " + location + ", expected a message, but got: " + rc);
throw new IOException("Could not read message at location " + location + ", expected a message, but got: "
+ rc);
}
}
@ -556,4 +548,14 @@ public class AMQMessageStore implements MessageStore {
public Location getMark() {
return mark.get();
}
public void dispose(ConnectionContext context) {
try {
flush();
} catch (InterruptedIOException e) {
Thread.currentThread().interrupt();
}
referenceStore.dispose(context);
super.dispose(context);
}
}

View File

@ -27,8 +27,8 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.IOExceptionSupport;
@ -37,19 +37,18 @@ import org.apache.activemq.wireformat.WireFormat;
/**
* @version $Revision: 1.10 $
*/
public class JDBCMessageStore implements MessageStore {
public class JDBCMessageStore extends AbstractMessageStore {
protected final WireFormat wireFormat;
protected final ActiveMQDestination destination;
protected final JDBCAdapter adapter;
protected final JDBCPersistenceAdapter persistenceAdapter;
protected AtomicLong lastMessageId = new AtomicLong(-1);
public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination) {
super(destination);
this.persistenceAdapter = persistenceAdapter;
this.adapter = adapter;
this.wireFormat = wireFormat;
this.destination = destination;
}
public void addMessage(ConnectionContext context, Message message) throws IOException {
@ -169,12 +168,6 @@ public class JDBCMessageStore implements MessageStore {
}
}
public void start() {
}
public void stop() {
}
/**
* @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
*/
@ -191,15 +184,6 @@ public class JDBCMessageStore implements MessageStore {
}
}
public ActiveMQDestination getDestination() {
return destination;
}
public void setMemoryUsage(MemoryUsage memoryUsage) {
//can ignore as messages aren't buffered
}
public int getMessageCount() throws IOException {
int result = 0;
TransactionContext c = persistenceAdapter.getTransactionContext();

View File

@ -37,6 +37,7 @@ import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
@ -50,14 +51,13 @@ import org.apache.commons.logging.LogFactory;
*
* @version $Revision: 1.14 $
*/
public class JournalMessageStore implements MessageStore {
public class JournalMessageStore extends AbstractMessageStore {
private static final Log LOG = LogFactory.getLog(JournalMessageStore.class);
protected final JournalPersistenceAdapter peristenceAdapter;
protected final JournalTransactionStore transactionStore;
protected final MessageStore longTermStore;
protected final ActiveMQDestination destination;
protected final TransactionTemplate transactionTemplate;
protected RecordLocation lastLocation;
protected Set<RecordLocation> inFlightTxLocations = new HashSet<RecordLocation>();
@ -72,10 +72,10 @@ public class JournalMessageStore implements MessageStore {
private MemoryUsage memoryUsage;
public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
super(destination);
this.peristenceAdapter = adapter;
this.transactionStore = adapter.getTransactionStore();
this.longTermStore = checkpointStore;
this.destination = destination;
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
}
@ -382,10 +382,6 @@ public class JournalMessageStore implements MessageStore {
longTermStore.removeAllMessages(context);
}
public ActiveMQDestination getDestination() {
return destination;
}
public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
throw new IOException("The journal does not support message references.");
}

View File

@ -26,6 +26,7 @@ import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
@ -35,16 +36,15 @@ import org.apache.activemq.usage.SystemUsage;
*
* @version $Revision: 1.7 $
*/
public class KahaMessageStore implements MessageStore {
public class KahaMessageStore extends AbstractMessageStore {
protected final ActiveMQDestination destination;
protected final MapContainer<MessageId, Message> messageContainer;
protected StoreEntry batchEntry;
public KahaMessageStore(MapContainer<MessageId, Message> container, ActiveMQDestination destination)
throws IOException {
super(destination);
this.messageContainer = container;
this.destination = destination;
}
protected MessageId getMessageId(Object object) {
@ -101,27 +101,14 @@ public class KahaMessageStore implements MessageStore {
}
}
public void start() {
}
public void stop() {
}
public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
messageContainer.clear();
}
public ActiveMQDestination getDestination() {
return destination;
}
public synchronized void delete() {
messageContainer.clear();
}
public void setMemoryUsage(MemoryUsage memoryUsage) {
}
/**
* @return the number of messages held by this destination
* @see org.apache.activemq.store.MessageStore#getMessageCount()

View File

@ -31,15 +31,15 @@ import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.usage.MemoryUsage;
/**
* @author rajdavies
*
*/
public class KahaReferenceStore implements ReferenceStore {
public class KahaReferenceStore extends AbstractMessageStore implements ReferenceStore {
protected final ActiveMQDestination destination;
protected final MapContainer<MessageId, ReferenceRecord> messageContainer;
protected KahaReferenceStoreAdapter adapter;
private StoreEntry batchEntry;
@ -48,19 +48,19 @@ public class KahaReferenceStore implements ReferenceStore {
public KahaReferenceStore(KahaReferenceStoreAdapter adapter, MapContainer<MessageId, ReferenceRecord> container,
ActiveMQDestination destination) throws IOException {
super(destination);
this.adapter = adapter;
this.messageContainer = container;
this.destination = destination;
}
public Lock getStoreLock() {
return lock;
}
public void start() {
}
public void stop() {
public void dispose(ConnectionContext context) {
super.dispose(context);
this.messageContainer.delete();
this.adapter.removeReferenceStore(this);
}
protected MessageId getMessageId(Object object) {
@ -204,10 +204,6 @@ public class KahaReferenceStore implements ReferenceStore {
}
}
public ActiveMQDestination getDestination() {
return destination;
}
public void delete() {
lock.lock();
try {
@ -231,9 +227,6 @@ public class KahaReferenceStore implements ReferenceStore {
return messageContainer.size();
}
public void setMemoryUsage(MemoryUsage memoryUsage) {
}
public boolean isSupportForCursors() {
return true;
}

View File

@ -33,6 +33,7 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
@ -179,6 +180,16 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
}
return rc;
}
public void removeReferenceStore(KahaReferenceStore store) {
ActiveMQDestination destination = store.getDestination();
if (destination.isQueue()) {
queues.remove(destination);
} else {
topics.remove(destination);
}
messageStores.remove(destination);
}
/*
public void buildReferenceFileIdsInUse() throws IOException {
recordReferences = new HashMap<Integer, AtomicInteger>();
@ -239,7 +250,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
}
/**
*
*
* @throws IOException
* @see org.apache.activemq.store.ReferenceStoreAdapter#clearMessages()
*/
@ -250,13 +261,13 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
}
/**
*
*
* @throws IOException
* @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState()
*/
public void recoverState() throws IOException {
Set<SubscriptionInfo> set = new HashSet<SubscriptionInfo>(this.durableSubscribers);
Set<SubscriptionInfo> set = new HashSet<SubscriptionInfo>(this.durableSubscribers);
for (SubscriptionInfo info:set) {
LOG.info("Recovering subscriber state for durable subscriber: " + info);
TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
@ -312,7 +323,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
StoreFactory.delete(stateDirectory);
}
}
public boolean isPersistentIndex() {
return persistentIndex;
}
@ -363,7 +374,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
public void setIndexPageSize(int indexPageSize) {
this.indexPageSize = indexPageSize;
}
public int getIndexMaxBinSize() {
return indexMaxBinSize;
}
@ -371,7 +382,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
public void setIndexMaxBinSize(int maxBinSize) {
this.indexMaxBinSize = maxBinSize;
}
/**
* @return the loadFactor
*/

View File

@ -39,7 +39,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
protected ListContainer<TopicSubAck> ackContainer;
protected Map<String, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<String, TopicSubContainer>();
private Map<String, SubscriptionInfo> subscriberContainer;
private MapContainer<String, SubscriptionInfo> subscriberContainer;
private Store store;
private static final String TOPIC_SUB_NAME = "tsn";
@ -58,6 +58,11 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
}
}
public void dispose(ConnectionContext context) {
super.dispose(context);
subscriberContainer.delete();
}
protected MessageId getMessageId(Object object) {
return new MessageId(((ReferenceRecord)object).getMessageId());
}

View File

@ -30,6 +30,7 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
@ -39,9 +40,8 @@ import org.apache.activemq.usage.SystemUsage;
*
* @version $Revision: 1.7 $
*/
public class MemoryMessageStore implements MessageStore {
public class MemoryMessageStore extends AbstractMessageStore {
protected final ActiveMQDestination destination;
protected final Map<MessageId, Message> messageTable;
protected MessageId lastBatchId;
@ -50,7 +50,7 @@ public class MemoryMessageStore implements MessageStore {
}
public MemoryMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable) {
this.destination = destination;
super(destination);
this.messageTable = Collections.synchronizedMap(messageTable);
}
@ -108,22 +108,12 @@ public class MemoryMessageStore implements MessageStore {
}
}
public void start() {
}
public void stop() {
}
public void removeAllMessages(ConnectionContext context) throws IOException {
synchronized (messageTable) {
messageTable.clear();
}
}
public ActiveMQDestination getDestination() {
return destination;
}
public void delete() {
synchronized (messageTable) {
messageTable.clear();
@ -160,13 +150,4 @@ public class MemoryMessageStore implements MessageStore {
public void resetBatching() {
lastBatchId = null;
}
/**
* @param memoeyUSage
* @see org.apache.activemq.store.MessageStore#setMemoryUsage(org.apache.activemq.usage.MemoryUsage)
*/
public void setMemoryUsage(MemoryUsage memoeyUSage){
// TODO Auto-generated method stub
}
}

View File

@ -35,9 +35,9 @@ public class SimpleTopicTest extends TestCase {
private static final Log LOG = LogFactory.getLog(SimpleTopicTest.class);
protected BrokerService broker;
protected String clientURI="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false&wireFormat.maxInactivityDuration=50000";
protected String clientURI="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false&wireFormat.maxInactivityDuration=0";
//protected String clientURI="tcp://localhost:61616";
protected String bindAddress="tcp://localhost:61616";
protected String bindAddress="tcp://localhost:61616?wireFormat.maxInactivityDuration=0";
//protected String bindAddress = "tcp://localhost:61616";
//protected String bindAddress="vm://localhost?marshal=true";
//protected String bindAddress="vm://localhost";
@ -51,7 +51,7 @@ public class SimpleTopicTest extends TestCase {
protected int numberofProducers = 1;
protected int totalNumberOfProducers;
protected int totalNumberOfConsumers;
protected int playloadSize = 1024;
protected int playloadSize = 12;
protected byte[] array;
protected ConnectionFactory factory;

View File

@ -42,28 +42,32 @@ public class SlowConsumerTopicTest extends SimpleTopicTest {
protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
return new SlowConsumer(fac, dest);
PerfConsumer result = new SlowConsumer(fac, dest);
return result;
}
protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, int number, byte[] payload) throws JMSException {
PerfProducer result = super.createProducer(fac, dest, number, payload);
result.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
result.setSleep(10);
return result;
}
protected BrokerService createBroker() throws Exception {
protected BrokerService createBroker(String url) throws Exception {
Resource resource = new ClassPathResource("org/apache/activemq/perf/slowConsumerBroker.xml");
System.err.println("CREATE BROKER FROM " + resource);
BrokerFactoryBean factory = new BrokerFactoryBean(resource);
factory.afterPropertiesSet();
BrokerService broker = factory.getBroker();
broker.start();
return broker;
}
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
ActiveMQConnectionFactory result = super.createConnectionFactory(bindAddress);
protected ActiveMQConnectionFactory createConnectionFactory(String uri) throws Exception {
ActiveMQConnectionFactory result = super.createConnectionFactory(uri);
ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
policy.setTopicPrefetch(1000);
policy.setTopicPrefetch(10);
result.setPrefetchPolicy(policy);
return result;
}

View File

@ -0,0 +1,235 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @author Rajani Chennamaneni
*
*/
public class DispatchMultipleConsumersTest extends TestCase {
private final static Log logger = LogFactory.getLog(DispatchMultipleConsumersTest.class);
BrokerService broker;
Destination dest;
String destinationName = "TEST.Q";
String msgStr = "Test text message";
int messagesPerThread = 20;
int producerThreads = 50;
int consumerCount = 2;
AtomicInteger sentCount;
AtomicInteger consumedCount;
CountDownLatch producerLatch;
CountDownLatch consumerLatch;
String brokerURL = "tcp://localhost:61616";
String userName = "";
String password = "";
@Override
protected void setUp() throws Exception {
super.setUp();
broker = new BrokerService();
broker.setPersistent(true);
broker.setUseJmx(true);
broker.deleteAllMessages();
broker.addConnector("tcp://localhost:61616");
broker.start();
dest = new ActiveMQQueue(destinationName);
resetCounters();
}
@Override
protected void tearDown() throws Exception {
// broker.stop();
super.tearDown();
}
private void resetCounters() {
sentCount = new AtomicInteger(0);
consumedCount = new AtomicInteger(0);
producerLatch = new CountDownLatch(producerThreads);
consumerLatch = new CountDownLatch(consumerCount);
}
public void testDispatch1() {
for (int i = 1; i <= 5; i++) {
resetCounters();
dispatch();
/*try {
System.out.print("Press Enter to continue/finish:");
//pause to check the counts on JConsole
System.in.read();
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}*/
//check for consumed messages count
assertEquals("Incorrect messages in Iteration " + i, sentCount.get(), consumedCount.get());
}
}
private void dispatch() {
startConsumers();
startProducers();
try {
producerLatch.await();
consumerLatch.await();
} catch (InterruptedException e) {
fail("test interrupted!");
}
}
private void startConsumers() {
ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(userName, password, brokerURL);
Connection conn;
try {
conn = connFactory.createConnection();
conn.start();
for (int i = 0; i < consumerCount; i++) {
new ConsumerThread(conn, "ConsumerThread"+i);
}
} catch (JMSException e) {
logger.error("Failed to start consumers", e);
}
}
private void startProducers() {
ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(userName, password, brokerURL);
for (int i = 0; i < producerThreads; i++) {
new ProducerThread(connFactory, messagesPerThread, "ProducerThread"+i);
}
}
private class ConsumerThread extends Thread {
Connection conn;
Session session;
MessageConsumer consumer;
public ConsumerThread(Connection conn, String name) {
super();
this.conn = conn;
this.setName(name);
logger.info("Created new consumer thread:" + name);
try {
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(dest);
start();
} catch (JMSException e) {
logger.error("Failed to start consumer thread:" + name, e);
}
}
@Override
public void run() {
int msgCount = 0;
int nullCount = 0;
while (true) {
try {
Message msg = consumer.receive(1000);
if (msg == null) {
if (producerLatch.getCount() > 0) {
continue;
}
nullCount++;
if (nullCount > 10) {
//assume that we are not getting any more messages
break;
} else {
continue;
}
} else {
nullCount = 0;
}
Thread.sleep(100);
logger.info("Message received:" + msg.getJMSMessageID());
msgCount++;
} catch (JMSException e) {
logger.error("Failed to consume:", e);
} catch (InterruptedException e) {
logger.error("Interrupted!", e);
}
}
try {
consumer.close();
} catch (JMSException e) {
logger.error("Failed to close consumer " + getName(), e);
}
consumedCount.addAndGet(msgCount);
consumerLatch.countDown();
logger.info("Consumed " + msgCount + " messages using thread " + getName());
}
}
private class ProducerThread extends Thread {
int count;
Connection conn;
Session session;
MessageProducer producer;
public ProducerThread(ActiveMQConnectionFactory connFactory, int count, String name) {
super();
this.count = count;
this.setName(name);
logger.info("Created new producer thread:" + name);
try {
conn = connFactory.createConnection();
conn.start();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(dest);
start();
} catch (JMSException e) {
logger.error("Failed to start producer thread:" + name, e);
}
}
@Override
public void run() {
int i = 0;
try {
for (; i < count; i++) {
producer.send(session.createTextMessage(msgStr));
Thread.sleep(500);
}
conn.close();
} catch (JMSException e) {
logger.error(e.getMessage(), e);
} catch (InterruptedException e) {
logger.error("Interrupted!", e);
}
sentCount.addAndGet(i);
producerLatch.countDown();
logger.info("Sent " + i + " messages from thread " + getName());
}
}
}

View File

@ -25,10 +25,10 @@
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="blob">
<policyEntry topic=">" producerFlowControl="false">
<!-- lets force old messages to be discarded for slow consumers -->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="10"/>
<constantPendingMessageLimitStrategy limit="0"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>