From dfcf776e5d357953b95081385676b41a42ac8431 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Thu, 10 Apr 2008 17:18:05 +0000 Subject: [PATCH] Fix for https://issues.apache.org/activemq/browse/AMQ-1656 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@646880 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/activemq/broker/Broker.java | 5 +++++ .../java/org/apache/activemq/broker/BrokerFilter.java | 4 ++++ .../java/org/apache/activemq/broker/EmptyBroker.java | 4 ++++ .../java/org/apache/activemq/broker/ErrorBroker.java | 4 ++++ .../apache/activemq/broker/MutableBrokerFilter.java | 4 ++++ .../activemq/broker/region/BaseDestination.java | 8 +++++++- .../java/org/apache/activemq/broker/region/Queue.java | 2 +- .../apache/activemq/broker/region/RegionBroker.java | 11 +++++++++-- .../java/org/apache/activemq/broker/region/Topic.java | 1 + .../java/org/apache/activemq/broker/StubBroker.java | 4 ++++ 10 files changed, 43 insertions(+), 4 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java index 62ed76469b..ac0021a131 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java @@ -312,5 +312,10 @@ public interface Broker extends Region, Service { * @param messageReference */ void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference); + + /** + * @return the broker sequence id + */ + long getBrokerSequenceId(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java index 5ef26d41e4..4600722667 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -260,4 +260,8 @@ public class BrokerFilter implements Broker { public Broker getRoot() { return next.getRoot(); } + + public long getBrokerSequenceId() { + return next.getBrokerSequenceId(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java index 2e27c30063..d4e4c5827d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -252,4 +252,8 @@ public class EmptyBroker implements Broker { public Broker getRoot() { return null; } + + public long getBrokerSequenceId() { + return -1l; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java index 4d4587ba79..4f12af2f38 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -263,4 +263,8 @@ public class ErrorBroker implements Broker { public Broker getRoot() { throw new BrokerStoppedException(this.message); } + + public long getBrokerSequenceId() { + throw new BrokerStoppedException(this.message); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index 9b5e8f977d..ba7a34aa78 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -273,5 +273,9 @@ public class MutableBrokerFilter implements Broker { public Broker getRoot() { return getNext().getRoot(); } + + public long getBrokerSequenceId() { + return getNext().getBrokerSequenceId(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 6e0befd743..27ad5c7251 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -46,6 +46,7 @@ public abstract class BaseDestination implements Destination { private boolean lazyDispatch=false; protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); protected final BrokerService brokerService; + protected final Broker regionBroker; /** * @param broker @@ -66,6 +67,7 @@ public abstract class BaseDestination implements Destination { this.systemUsage = brokerService.getProducerSystemUsage(); this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString()); this.memoryUsage.setUsagePortion(1.0f); + this.regionBroker = brokerService.getRegionBroker(); } /** @@ -193,5 +195,9 @@ public abstract class BaseDestination implements Destination { public void setLazyDispatch(boolean lazyDispatch) { this.lazyDispatch = lazyDispatch; - } + } + + protected long getDestinationSequenceId() { + return regionBroker.getBrokerSequenceId(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 0033f79e9a..eaa8f232cd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -430,7 +430,7 @@ public class Queue extends BaseDestination implements Task { "Connection closed, send aborted."); } } - + message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); store.addMessage(context, message); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 47ee75e362..8f067da735 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -418,8 +418,6 @@ public class RegionBroker implements Broker { } public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { - long si = sequenceGenerator.getNextSequenceId(); - message.getMessageId().setBrokerSequenceId(si); message.setBrokerInTime(System.currentTimeMillis()); if (producerExchange.isMutable() || producerExchange.getRegion() == null) { ActiveMQDestination destination = message.getDestination(); @@ -730,4 +728,13 @@ public class RegionBroker implements Broker { throw new RuntimeException("The broker from the BrokerService should not throw an exception"); } } + + /** + * @return the broker sequence id + */ + public long getBrokerSequenceId() { + synchronized(sequenceGenerator) { + return sequenceGenerator.getNextSequenceId(); + } + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 2abcc74b0d..e25087f075 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -382,6 +382,7 @@ public class Topic extends BaseDestination implements Task{ final ConnectionContext context = producerExchange .getConnectionContext(); message.setRegionDestination(this); + message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) { diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java b/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java index f098c1c022..b6e14d1990 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java @@ -258,4 +258,8 @@ public class StubBroker implements Broker { public Broker getRoot() { return this; } + + public long getBrokerSequenceId() { + return -1l; + } }