mirror of https://github.com/apache/activemq.git
Changes to address memory usage for large transactions for:
https://issues.apache.org/activemq/browse/AMQ-1490 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@600891 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2913fb8ada
commit
f472000467
|
@ -405,6 +405,11 @@
|
|||
<!-- A test used for memory profiling only. -->
|
||||
<exclude>**/NetworkConnectionsCleanedupTest.*/**</exclude>
|
||||
|
||||
<exclude>**/NetworkConnectionsCleanedupTest.*/**</exclude>
|
||||
|
||||
<exclude>**/amq1490/*</exclude>
|
||||
<exclude>**/archive/*</exclude>
|
||||
|
||||
<exclude>**/AMQDeadlockTest3.*</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
|
|
|
@ -113,7 +113,7 @@ public class BrokerService implements Service {
|
|||
private TaskRunnerFactory persistenceTaskRunnerFactory;
|
||||
private SystemUsage systemUsage;
|
||||
private SystemUsage producerSystemUsage;
|
||||
private SystemUsage storeSystemUsage;
|
||||
private SystemUsage consumerSystemUsaage;
|
||||
private PersistenceAdapter persistenceAdapter;
|
||||
private PersistenceAdapterFactory persistenceFactory;
|
||||
private DestinationFactory destinationFactory;
|
||||
|
@ -668,23 +668,23 @@ public class BrokerService implements Service {
|
|||
* @throws IOException
|
||||
*/
|
||||
public SystemUsage getConsumerSystemUsage() throws IOException {
|
||||
if (this.storeSystemUsage == null) {
|
||||
this.storeSystemUsage = new SystemUsage(getSystemUsage(), "Store");
|
||||
this.storeSystemUsage.getMemoryUsage().setUsagePortion(0.5f);
|
||||
addService(this.storeSystemUsage);
|
||||
if (this.consumerSystemUsaage == null) {
|
||||
this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
|
||||
this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(0.5f);
|
||||
addService(this.consumerSystemUsaage);
|
||||
}
|
||||
return this.storeSystemUsage;
|
||||
return this.consumerSystemUsaage;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param storeSystemUsage the storeSystemUsage to set
|
||||
* @param consumerSystemUsaage the storeSystemUsage to set
|
||||
*/
|
||||
public void setConsumerSystemUsage(SystemUsage storeSystemUsage) {
|
||||
if (this.storeSystemUsage != null) {
|
||||
removeService(this.storeSystemUsage);
|
||||
public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
|
||||
if (this.consumerSystemUsaage != null) {
|
||||
removeService(this.consumerSystemUsaage);
|
||||
}
|
||||
this.storeSystemUsage = storeSystemUsage;
|
||||
addService(this.storeSystemUsage);
|
||||
this.consumerSystemUsaage = consumerSystemUsaage;
|
||||
addService(this.consumerSystemUsaage);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -17,7 +17,9 @@
|
|||
package org.apache.activemq.broker.region;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import javax.jms.InvalidSelectorException;
|
||||
import org.apache.activemq.broker.Broker;
|
||||
|
@ -118,9 +120,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
|
|||
topic.deactivate(context, this);
|
||||
}
|
||||
}
|
||||
for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
|
||||
for (final MessageReference node : dispatched) {
|
||||
// Mark the dispatched messages as redelivered for next time.
|
||||
MessageReference node = (MessageReference)iter.next();
|
||||
Integer count = redeliveredMessages.get(node.getMessageId());
|
||||
if (count != null) {
|
||||
redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
|
||||
|
@ -134,8 +135,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
|
|||
} else {
|
||||
node.decrementReferenceCount();
|
||||
}
|
||||
iter.remove();
|
||||
}
|
||||
dispatched.clear();
|
||||
if (!keepDurableSubsActive && pending.isTransient()) {
|
||||
synchronized (pending) {
|
||||
try {
|
||||
|
|
|
@ -17,8 +17,10 @@
|
|||
package org.apache.activemq.broker.region;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
import javax.jms.InvalidSelectorException;
|
||||
import javax.jms.JMSException;
|
||||
|
@ -52,12 +54,12 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
|
||||
private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
|
||||
protected PendingMessageCursor pending;
|
||||
protected final LinkedList<MessageReference> dispatched = new LinkedList<MessageReference>();
|
||||
protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
|
||||
protected int prefetchExtension;
|
||||
protected long enqueueCounter;
|
||||
protected long dispatchCounter;
|
||||
protected long dequeueCounter;
|
||||
protected boolean optimizedDispatch=false;
|
||||
protected boolean optimizedDispatch=true;
|
||||
private int maxProducersToAudit=32;
|
||||
private int maxAuditDepth=2048;
|
||||
protected final SystemUsage usageManager;
|
||||
|
@ -148,7 +150,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
if (node.getMessageId().equals(mdn.getMessageId())) {
|
||||
pending.remove();
|
||||
createMessageDispatch(node, node.getMessage());
|
||||
dispatched.addLast(node);
|
||||
dispatched.add(node);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -158,7 +160,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
throw new JMSException("Slave broker out of sync with master: Dispatched message (" + mdn.getMessageId() + ") was not in the pending list");
|
||||
}
|
||||
|
||||
public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
|
||||
public synchronized void acknowledge(final ConnectionContext context,
|
||||
final MessageAck ack) throws Exception {
|
||||
// Handle the standard acknowledgment case.
|
||||
boolean callDispatchMatched = false;
|
||||
if (ack.isStandardAck()) {
|
||||
|
@ -166,36 +169,42 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
// acknowledgment.
|
||||
int index = 0;
|
||||
boolean inAckRange = false;
|
||||
for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();) {
|
||||
final MessageReference node = iter.next();
|
||||
List<MessageReference> removeList = new ArrayList<MessageReference>();
|
||||
for (final MessageReference node : dispatched) {
|
||||
MessageId messageId = node.getMessageId();
|
||||
if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
|
||||
if (ack.getFirstMessageId() == null
|
||||
|| ack.getFirstMessageId().equals(messageId)) {
|
||||
inAckRange = true;
|
||||
}
|
||||
if (inAckRange) {
|
||||
// Don't remove the nodes until we are committed.
|
||||
if (!context.isInTransaction()) {
|
||||
dequeueCounter++;
|
||||
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
|
||||
iter.remove();
|
||||
node.getRegionDestination().getDestinationStatistics()
|
||||
.getDequeues().increment();
|
||||
removeList.add(node);
|
||||
} else {
|
||||
// setup a Synchronization to remove nodes from the
|
||||
// dispatched list.
|
||||
context.getTransaction().addSynchronization(new Synchronization() {
|
||||
context.getTransaction().addSynchronization(
|
||||
new Synchronization() {
|
||||
|
||||
public void afterCommit() throws Exception {
|
||||
synchronized (PrefetchSubscription.this) {
|
||||
dequeueCounter++;
|
||||
dispatched.remove(node);
|
||||
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
|
||||
prefetchExtension--;
|
||||
}
|
||||
}
|
||||
public void afterCommit() throws Exception {
|
||||
synchronized (PrefetchSubscription.this) {
|
||||
dequeueCounter++;
|
||||
dispatched.remove(node);
|
||||
node.getRegionDestination()
|
||||
.getDestinationStatistics()
|
||||
.getDequeues().increment();
|
||||
prefetchExtension--;
|
||||
}
|
||||
}
|
||||
|
||||
public void afterRollback() throws Exception {
|
||||
super.afterRollback();
|
||||
}
|
||||
});
|
||||
public void afterRollback()
|
||||
throws Exception {
|
||||
super.afterRollback();
|
||||
}
|
||||
});
|
||||
}
|
||||
index++;
|
||||
acknowledge(context, ack, node);
|
||||
|
@ -204,21 +213,28 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
// extend prefetch window only if not a pulling
|
||||
// consumer
|
||||
if (getPrefetchSize() != 0) {
|
||||
prefetchExtension = Math.max(prefetchExtension, index + 1);
|
||||
prefetchExtension = Math.max(prefetchExtension,
|
||||
index + 1);
|
||||
}
|
||||
} else {
|
||||
prefetchExtension = Math.max(0, prefetchExtension - (index + 1));
|
||||
prefetchExtension = Math.max(0, prefetchExtension
|
||||
- (index + 1));
|
||||
}
|
||||
callDispatchMatched = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
for (final MessageReference node : removeList) {
|
||||
dispatched.remove(node);
|
||||
}
|
||||
// this only happens after a reconnect - get an ack which is not
|
||||
// valid
|
||||
if (!callDispatchMatched) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Could not correlate acknowledgment with dispatched message: " + ack);
|
||||
LOG
|
||||
.debug("Could not correlate acknowledgment with dispatched message: "
|
||||
+ ack);
|
||||
}
|
||||
}
|
||||
} else if (ack.isDeliveredAck()) {
|
||||
|
@ -227,7 +243,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
// Acknowledge all dispatched messages up till the message id of the
|
||||
// acknowledgment.
|
||||
int index = 0;
|
||||
for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
|
||||
for (Iterator<MessageReference> iter = dispatched.iterator(); iter
|
||||
.hasNext(); index++) {
|
||||
final MessageReference node = iter.next();
|
||||
if (ack.getLastMessageId().equals(node.getMessageId())) {
|
||||
prefetchExtension = Math.max(prefetchExtension, index + 1);
|
||||
|
@ -236,17 +253,20 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
}
|
||||
}
|
||||
if (!callDispatchMatched) {
|
||||
throw new JMSException("Could not correlate acknowledgment with dispatched message: " + ack);
|
||||
throw new JMSException(
|
||||
"Could not correlate acknowledgment with dispatched message: "
|
||||
+ ack);
|
||||
}
|
||||
} else if (ack.isRedeliveredAck() ) {
|
||||
// Message was re-delivered but it was not yet considered to be a DLQ message.
|
||||
} else if (ack.isRedeliveredAck()) {
|
||||
// Message was re-delivered but it was not yet considered to be a
|
||||
// DLQ message.
|
||||
// Acknowledge all dispatched messages up till the message id of the
|
||||
// acknowledgment.
|
||||
boolean inAckRange = false;
|
||||
for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();) {
|
||||
final MessageReference node = iter.next();
|
||||
for (final MessageReference node : dispatched) {
|
||||
MessageId messageId = node.getMessageId();
|
||||
if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
|
||||
if (ack.getFirstMessageId() == null
|
||||
|| ack.getFirstMessageId().equals(messageId)) {
|
||||
inAckRange = true;
|
||||
}
|
||||
if (inAckRange) {
|
||||
|
@ -258,49 +278,65 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
}
|
||||
}
|
||||
if (!callDispatchMatched) {
|
||||
throw new JMSException("Could not correlate acknowledgment with dispatched message: " + ack);
|
||||
throw new JMSException(
|
||||
"Could not correlate acknowledgment with dispatched message: "
|
||||
+ ack);
|
||||
}
|
||||
} else if (ack.isPoisonAck()) {
|
||||
// TODO: what if the message is already in a DLQ???
|
||||
// Handle the poison ACK case: we need to send the message to a DLQ
|
||||
if (ack.isInTransaction()) {
|
||||
throw new JMSException("Poison ack cannot be transacted: " + ack);
|
||||
throw new JMSException("Poison ack cannot be transacted: "
|
||||
+ ack);
|
||||
}
|
||||
// Acknowledge all dispatched messages up till the message id of the
|
||||
// acknowledgment.
|
||||
int index = 0;
|
||||
boolean inAckRange = false;
|
||||
for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();) {
|
||||
final MessageReference node = iter.next();
|
||||
List<MessageReference> removeList = new ArrayList<MessageReference>();
|
||||
for (final MessageReference node : dispatched) {
|
||||
MessageId messageId = node.getMessageId();
|
||||
if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
|
||||
if (ack.getFirstMessageId() == null
|
||||
|| ack.getFirstMessageId().equals(messageId)) {
|
||||
inAckRange = true;
|
||||
}
|
||||
if (inAckRange) {
|
||||
sendToDLQ(context, node);
|
||||
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
|
||||
iter.remove();
|
||||
node.getRegionDestination().getDestinationStatistics()
|
||||
.getDequeues().increment();
|
||||
removeList.add(node);
|
||||
dequeueCounter++;
|
||||
index++;
|
||||
acknowledge(context, ack, node);
|
||||
if (ack.getLastMessageId().equals(messageId)) {
|
||||
prefetchExtension = Math.max(0, prefetchExtension - (index + 1));
|
||||
prefetchExtension = Math.max(0, prefetchExtension
|
||||
- (index + 1));
|
||||
callDispatchMatched = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
for (final MessageReference node : removeList) {
|
||||
dispatched.remove(node);
|
||||
}
|
||||
if (!callDispatchMatched) {
|
||||
throw new JMSException("Could not correlate acknowledgment with dispatched message: " + ack);
|
||||
throw new JMSException(
|
||||
"Could not correlate acknowledgment with dispatched message: "
|
||||
+ ack);
|
||||
}
|
||||
}
|
||||
if (callDispatchMatched) {
|
||||
dispatchMatched();
|
||||
} else {
|
||||
if (isSlave()) {
|
||||
throw new JMSException("Slave broker out of sync with master: Acknowledgment (" + ack + ") was not in the dispatch list: " + dispatched);
|
||||
throw new JMSException(
|
||||
"Slave broker out of sync with master: Acknowledgment ("
|
||||
+ ack + ") was not in the dispatch list: "
|
||||
+ dispatched);
|
||||
} else {
|
||||
LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): " + ack);
|
||||
LOG
|
||||
.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
|
||||
+ ack);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -450,7 +486,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
// NULL messages don't count... they don't get Acked.
|
||||
if (node != QueueMessageReference.NULL_MESSAGE) {
|
||||
dispatchCounter++;
|
||||
dispatched.addLast(node);
|
||||
dispatched.add(node);
|
||||
if(pending != null) {
|
||||
pending.dispatched(message);
|
||||
}
|
||||
|
|
|
@ -383,11 +383,15 @@ public class Topic extends BaseDestination implements Task{
|
|||
* @throws IOException
|
||||
* @throws Exception
|
||||
*/
|
||||
synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
|
||||
final ConnectionContext context = producerExchange.getConnectionContext();
|
||||
synchronized void doMessageSend(
|
||||
final ProducerBrokerExchange producerExchange, final Message message)
|
||||
throws IOException, Exception {
|
||||
final ConnectionContext context = producerExchange
|
||||
.getConnectionContext();
|
||||
message.setRegionDestination(this);
|
||||
|
||||
if (store != null && message.isPersistent() && !canOptimizeOutPersistence()) {
|
||||
if (store != null && message.isPersistent()
|
||||
&& !canOptimizeOutPersistence()) {
|
||||
while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
|
||||
if (context.getStopping().get()) {
|
||||
throw new IOException("Connection closed, send aborted.");
|
||||
|
@ -397,31 +401,35 @@ public class Topic extends BaseDestination implements Task{
|
|||
}
|
||||
|
||||
message.incrementReferenceCount();
|
||||
try {
|
||||
|
||||
if (context.isInTransaction()) {
|
||||
context.getTransaction().addSynchronization(new Synchronization() {
|
||||
public void afterCommit() throws Exception {
|
||||
// It could take while before we receive the commit
|
||||
// operration.. by that time the message could have
|
||||
// expired..
|
||||
if (broker.isExpired(message)) {
|
||||
broker.messageExpired(context, message);
|
||||
message.decrementReferenceCount();
|
||||
destinationStatistics.getMessages().decrement();
|
||||
return;
|
||||
}
|
||||
dispatch(context, message);
|
||||
if (context.isInTransaction()) {
|
||||
context.getTransaction().addSynchronization(new Synchronization() {
|
||||
public void afterCommit() throws Exception {
|
||||
// It could take while before we receive the commit
|
||||
// operration.. by that time the message could have
|
||||
// expired..
|
||||
if (broker.isExpired(message)) {
|
||||
broker.messageExpired(context, message);
|
||||
message.decrementReferenceCount();
|
||||
destinationStatistics.getMessages().decrement();
|
||||
return;
|
||||
}
|
||||
});
|
||||
try {
|
||||
dispatch(context, message);
|
||||
} finally {
|
||||
message.decrementReferenceCount();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
} else {
|
||||
} else {
|
||||
try {
|
||||
dispatch(context, message);
|
||||
} finally {
|
||||
message.decrementReferenceCount();
|
||||
}
|
||||
|
||||
} finally {
|
||||
message.decrementReferenceCount();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private boolean canOptimizeOutPersistence() {
|
||||
|
|
|
@ -50,6 +50,11 @@ public interface Store {
|
|||
*/
|
||||
Marshaller COMMAND_MARSHALLER = new CommandMarshaller();
|
||||
|
||||
/**
|
||||
* MessageId marshaller
|
||||
*/
|
||||
Marshaller MESSAGEID_MARSHALLER = new MessageIdMarshaller();
|
||||
|
||||
/**
|
||||
* close the store
|
||||
*
|
||||
|
@ -270,4 +275,5 @@ public interface Store {
|
|||
public boolean isPersistentIndex();
|
||||
|
||||
public void setPersistentIndex(boolean persistentIndex);
|
||||
|
||||
}
|
||||
|
|
|
@ -178,15 +178,7 @@ public class KahaStore implements Store {
|
|||
}
|
||||
}
|
||||
if (directory != null && directory.isDirectory()) {
|
||||
File[] files = directory.listFiles();
|
||||
if (files != null) {
|
||||
for (int i = 0; i < files.length; i++) {
|
||||
File file = files[i];
|
||||
if (!file.isDirectory()) {
|
||||
result &= file.delete();
|
||||
}
|
||||
}
|
||||
}
|
||||
result =IOHelper.deleteChildren(directory);
|
||||
String str = result ? "successfully deleted" : "failed to delete";
|
||||
LOG.info("Kaha Store " + str + " data directory " + directory);
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.activemq.kaha.impl.index.IndexLinkedList;
|
|||
import org.apache.activemq.kaha.impl.index.IndexManager;
|
||||
import org.apache.activemq.kaha.impl.index.VMIndex;
|
||||
import org.apache.activemq.kaha.impl.index.hash.HashIndex;
|
||||
import org.apache.activemq.util.IOHelper;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
@ -67,7 +68,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
if (index == null) {
|
||||
if (persistentIndex) {
|
||||
String name = containerId.getDataContainerName() + "_" + containerId.getKey();
|
||||
name = name.replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_");
|
||||
name=IOHelper.toFileSystemSafeName(name);
|
||||
try {
|
||||
HashIndex hashIndex = new HashIndex(directory, name, indexManager);
|
||||
hashIndex.setNumberOfBins(getIndexBinSize());
|
||||
|
|
|
@ -56,7 +56,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
|
|||
private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class);
|
||||
private static final String STORE_STATE = "store-state";
|
||||
private static final String INDEX_VERSION_NAME = "INDEX_VERSION";
|
||||
private static final Integer INDEX_VERSION = new Integer(2);
|
||||
private static final Integer INDEX_VERSION = new Integer(3);
|
||||
private static final String RECORD_REFERENCES = "record-references";
|
||||
private static final String TRANSACTIONS = "transactions-state";
|
||||
private MapContainer stateMap;
|
||||
|
|
|
@ -117,7 +117,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
|
|||
subscriberContainer.put(key, info);
|
||||
}
|
||||
// add the subscriber
|
||||
ListContainer container = addSubscriberMessageContainer(key);
|
||||
addSubscriberMessageContainer(key);
|
||||
/*
|
||||
* if(retroactive){ for(StoreEntry
|
||||
* entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
|
||||
|
@ -200,33 +200,39 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
|
|||
return result;
|
||||
}
|
||||
|
||||
protected ListContainer addSubscriberMessageContainer(Object key) throws IOException {
|
||||
ListContainer container = store.getListContainer(key, "topic-subs");
|
||||
protected MapContainer addSubscriberMessageContainer(Object key) throws IOException {
|
||||
MapContainer container = store.getMapContainer(key, "topic-subs");
|
||||
container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER);
|
||||
Marshaller marshaller = new ConsumerMessageRefMarshaller();
|
||||
container.setMarshaller(marshaller);
|
||||
container.setValueMarshaller(marshaller);
|
||||
TopicSubContainer tsc = new TopicSubContainer(container);
|
||||
subscriberMessages.put(key, tsc);
|
||||
return container;
|
||||
}
|
||||
|
||||
protected void removeSubscriberMessageContainer(Object key) throws IOException {
|
||||
protected void removeSubscriberMessageContainer(Object key)
|
||||
throws IOException {
|
||||
subscriberContainer.remove(key);
|
||||
TopicSubContainer container = subscriberMessages.remove(key);
|
||||
for (Iterator i = container.iterator(); i.hasNext();) {
|
||||
ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
|
||||
if (ref != null) {
|
||||
TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
|
||||
if (tsa != null) {
|
||||
if (tsa.decrementCount() <= 0) {
|
||||
ackContainer.remove(ref.getAckEntry());
|
||||
messageContainer.remove(tsa.getMessageEntry());
|
||||
} else {
|
||||
ackContainer.update(ref.getAckEntry(), tsa);
|
||||
if (container != null) {
|
||||
for (Iterator i = container.iterator(); i.hasNext();) {
|
||||
ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
|
||||
if (ref != null) {
|
||||
TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
|
||||
if (tsa != null) {
|
||||
if (tsa.decrementCount() <= 0) {
|
||||
ackContainer.remove(ref.getAckEntry());
|
||||
messageContainer.remove(tsa.getMessageEntry());
|
||||
} else {
|
||||
ackContainer.update(ref.getAckEntry(), tsa);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
container.clear();
|
||||
}
|
||||
store.deleteListContainer(key, "topic-subs");
|
||||
|
||||
}
|
||||
|
||||
public int getMessageCount(String clientId, String subscriberName) throws IOException {
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.activemq.kaha.Store;
|
|||
import org.apache.activemq.kaha.StoreEntry;
|
||||
import org.apache.activemq.store.MessageRecoveryListener;
|
||||
import org.apache.activemq.store.TopicReferenceStore;
|
||||
import org.apache.activemq.util.SubscriptionKey;
|
||||
|
||||
public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore {
|
||||
|
||||
|
@ -40,6 +41,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
|
|||
protected Map<String, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<String, TopicSubContainer>();
|
||||
private Map<String, SubscriptionInfo> subscriberContainer;
|
||||
private Store store;
|
||||
private static final String TOPIC_SUB_NAME = "tsn";
|
||||
|
||||
public KahaTopicReferenceStore(Store store, KahaReferenceStoreAdapter adapter,
|
||||
MapContainer<MessageId, ReferenceRecord> messageContainer, ListContainer<TopicSubAck> ackContainer,
|
||||
|
@ -108,10 +110,12 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
|
|||
}
|
||||
}
|
||||
|
||||
protected ListContainer addSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
|
||||
ListContainer container = store.getListContainer(clientId+":"+subscriptionName+":"+destination.getQualifiedName(), "topic-subs-references");
|
||||
|
||||
protected MapContainer addSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
|
||||
MapContainer container = store.getMapContainer(getSubscriptionContainerName(getSubscriptionKey(clientId, subscriptionName)));
|
||||
container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER);
|
||||
Marshaller marshaller = new ConsumerMessageRefMarshaller();
|
||||
container.setMarshaller(marshaller);
|
||||
container.setValueMarshaller(marshaller);
|
||||
TopicSubContainer tsc = new TopicSubContainer(container);
|
||||
subscriberMessages.put(getSubscriptionKey(clientId, subscriptionName), tsc);
|
||||
return container;
|
||||
|
@ -192,7 +196,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
|
|||
adapter.addSubscriberState(info);
|
||||
}
|
||||
// add the subscriber
|
||||
ListContainer container = addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
|
||||
addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
|
||||
if (retroactive) {
|
||||
/*
|
||||
* for(StoreEntry
|
||||
|
@ -210,8 +214,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
|
|||
if (info != null) {
|
||||
adapter.removeSubscriberState(info);
|
||||
}
|
||||
String key = getSubscriptionKey(clientId, subscriptionName);
|
||||
removeSubscriberMessageContainer(key);
|
||||
removeSubscriberMessageContainer(clientId,subscriptionName);
|
||||
}
|
||||
|
||||
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
|
||||
|
@ -293,9 +296,11 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
|
|||
}
|
||||
}
|
||||
|
||||
protected void removeSubscriberMessageContainer(String key) throws IOException {
|
||||
subscriberContainer.remove(key);
|
||||
TopicSubContainer container = subscriberMessages.remove(key);
|
||||
protected void removeSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
|
||||
String subscriberKey = getSubscriptionKey(clientId, subscriptionName);
|
||||
String containerName = getSubscriptionContainerName(subscriberKey);
|
||||
subscriberContainer.remove(subscriberKey);
|
||||
TopicSubContainer container = subscriberMessages.remove(subscriberKey);
|
||||
for (Iterator i = container.iterator(); i.hasNext();) {
|
||||
ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
|
||||
if (ref != null) {
|
||||
|
@ -310,12 +315,18 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
|
|||
}
|
||||
}
|
||||
}
|
||||
store.deleteListContainer(destination, "topic-subs-references-" + key);
|
||||
store.deleteMapContainer(containerName);
|
||||
}
|
||||
|
||||
protected String getSubscriptionKey(String clientId, String subscriberName) {
|
||||
String result = clientId + ":";
|
||||
result += subscriberName != null ? subscriberName : "NOT_SET";
|
||||
return result;
|
||||
StringBuffer buffer = new StringBuffer();
|
||||
buffer.append(clientId).append(":");
|
||||
String name = subscriberName != null ? subscriberName : "NOT_SET";
|
||||
return buffer.append(name).toString();
|
||||
}
|
||||
|
||||
private String getSubscriptionContainerName(String subscriptionKey) {
|
||||
StringBuffer buffer = new StringBuffer(subscriptionKey);
|
||||
return buffer.append(":").append(destination.getQualifiedName()).append(TOPIC_SUB_NAME).toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.activemq.store.kahadaptor;
|
|||
import java.util.Iterator;
|
||||
|
||||
import org.apache.activemq.command.MessageId;
|
||||
import org.apache.activemq.kaha.ListContainer;
|
||||
import org.apache.activemq.kaha.MapContainer;
|
||||
import org.apache.activemq.kaha.StoreEntry;
|
||||
|
||||
/**
|
||||
|
@ -28,11 +28,11 @@ import org.apache.activemq.kaha.StoreEntry;
|
|||
* @version $Revision: 1.10 $
|
||||
*/
|
||||
public class TopicSubContainer {
|
||||
private transient ListContainer listContainer;
|
||||
private transient MapContainer mapContainer;
|
||||
private transient StoreEntry batchEntry;
|
||||
|
||||
public TopicSubContainer(ListContainer container) {
|
||||
this.listContainer = container;
|
||||
public TopicSubContainer(MapContainer container) {
|
||||
this.mapContainer = container;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -55,78 +55,56 @@ public class TopicSubContainer {
|
|||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return listContainer.isEmpty();
|
||||
return mapContainer.isEmpty();
|
||||
}
|
||||
|
||||
public StoreEntry add(ConsumerMessageRef ref) {
|
||||
return listContainer.placeLast(ref);
|
||||
return mapContainer.place(ref.getMessageId(),ref);
|
||||
}
|
||||
|
||||
public ConsumerMessageRef remove(MessageId id) {
|
||||
ConsumerMessageRef result = null;
|
||||
if (!listContainer.isEmpty()) {
|
||||
StoreEntry entry = listContainer.getFirst();
|
||||
while (entry != null) {
|
||||
ConsumerMessageRef ref = (ConsumerMessageRef)listContainer.get(entry);
|
||||
if (ref != null && ref.getMessageId().equals(id)) {
|
||||
result = ref;
|
||||
listContainer.remove(entry);
|
||||
if (batchEntry != null && batchEntry.equals(entry)) {
|
||||
reset();
|
||||
}
|
||||
break;
|
||||
}
|
||||
entry = listContainer.getNext(entry);
|
||||
StoreEntry entry = mapContainer.getEntry(id);
|
||||
if (entry != null) {
|
||||
result = (ConsumerMessageRef) mapContainer.getValue(entry);
|
||||
mapContainer.remove(entry);
|
||||
if (batchEntry != null && batchEntry.equals(entry)) {
|
||||
reset();
|
||||
}
|
||||
}
|
||||
if (listContainer != null && (listContainer.isEmpty() )) {
|
||||
if(mapContainer.isEmpty()) {
|
||||
reset();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public ConsumerMessageRef removeFirst() {
|
||||
ConsumerMessageRef result = null;
|
||||
if (!listContainer.isEmpty()) {
|
||||
StoreEntry entry = listContainer.getFirst();
|
||||
|
||||
result = (ConsumerMessageRef) listContainer.get(entry);
|
||||
listContainer.remove(entry);
|
||||
if (listContainer != null && batchEntry != null
|
||||
&& (listContainer.isEmpty() || batchEntry.equals(entry))) {
|
||||
reset();
|
||||
}
|
||||
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public ConsumerMessageRef get(StoreEntry entry) {
|
||||
return (ConsumerMessageRef)listContainer.get(entry);
|
||||
return (ConsumerMessageRef)mapContainer.getValue(entry);
|
||||
}
|
||||
|
||||
public StoreEntry getEntry() {
|
||||
return listContainer.getFirst();
|
||||
return mapContainer.getFirst();
|
||||
}
|
||||
|
||||
public StoreEntry refreshEntry(StoreEntry entry) {
|
||||
return listContainer.refresh(entry);
|
||||
return mapContainer.refresh(entry);
|
||||
}
|
||||
|
||||
public StoreEntry getNextEntry(StoreEntry entry) {
|
||||
return listContainer.getNext(entry);
|
||||
return mapContainer.getNext(entry);
|
||||
}
|
||||
|
||||
public Iterator iterator() {
|
||||
return listContainer.iterator();
|
||||
return mapContainer.values().iterator();
|
||||
}
|
||||
|
||||
public int size() {
|
||||
return listContainer.size();
|
||||
return mapContainer.size();
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
reset();
|
||||
listContainer.clear();
|
||||
mapContainer.clear();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ import java.io.IOException;
|
|||
* @version $Revision$
|
||||
*/
|
||||
public final class IOHelper {
|
||||
|
||||
protected static final int MAX_FILE_NAME_LENGTH;
|
||||
private IOHelper() {
|
||||
}
|
||||
|
||||
|
@ -74,6 +74,10 @@ public final class IOHelper {
|
|||
rc.append(HexSupport.toHexFromInt(c, true));
|
||||
}
|
||||
}
|
||||
String result = rc.toString();
|
||||
if (result.length() > MAX_FILE_NAME_LENGTH) {
|
||||
result = result.substring(0,MAX_FILE_NAME_LENGTH);
|
||||
}
|
||||
return rc.toString();
|
||||
}
|
||||
|
||||
|
@ -121,5 +125,9 @@ public final class IOHelper {
|
|||
}
|
||||
}
|
||||
|
||||
static {
|
||||
MAX_FILE_NAME_LENGTH = Integer.valueOf(System.getProperty("MaximumFileNameLength","200")).intValue();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue