git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@646880 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-04-10 17:18:05 +00:00
parent 0e5dda068c
commit dfcf776e5d
10 changed files with 43 additions and 4 deletions

View File

@ -312,5 +312,10 @@ public interface Broker extends Region, Service {
* @param messageReference
*/
void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference);
/**
* @return the broker sequence id
*/
long getBrokerSequenceId();
}

View File

@ -260,4 +260,8 @@ public class BrokerFilter implements Broker {
public Broker getRoot() {
return next.getRoot();
}
public long getBrokerSequenceId() {
return next.getBrokerSequenceId();
}
}

View File

@ -252,4 +252,8 @@ public class EmptyBroker implements Broker {
public Broker getRoot() {
return null;
}
public long getBrokerSequenceId() {
return -1l;
}
}

View File

@ -263,4 +263,8 @@ public class ErrorBroker implements Broker {
public Broker getRoot() {
throw new BrokerStoppedException(this.message);
}
public long getBrokerSequenceId() {
throw new BrokerStoppedException(this.message);
}
}

View File

@ -273,5 +273,9 @@ public class MutableBrokerFilter implements Broker {
public Broker getRoot() {
return getNext().getRoot();
}
public long getBrokerSequenceId() {
return getNext().getBrokerSequenceId();
}
}

View File

@ -46,6 +46,7 @@ public abstract class BaseDestination implements Destination {
private boolean lazyDispatch=false;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
protected final BrokerService brokerService;
protected final Broker regionBroker;
/**
* @param broker
@ -66,6 +67,7 @@ public abstract class BaseDestination implements Destination {
this.systemUsage = brokerService.getProducerSystemUsage();
this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
this.memoryUsage.setUsagePortion(1.0f);
this.regionBroker = brokerService.getRegionBroker();
}
/**
@ -193,5 +195,9 @@ public abstract class BaseDestination implements Destination {
public void setLazyDispatch(boolean lazyDispatch) {
this.lazyDispatch = lazyDispatch;
}
}
protected long getDestinationSequenceId() {
return regionBroker.getBrokerSequenceId();
}
}

View File

@ -430,7 +430,7 @@ public class Queue extends BaseDestination implements Task {
"Connection closed, send aborted.");
}
}
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
store.addMessage(context, message);
}

View File

@ -418,8 +418,6 @@ public class RegionBroker implements Broker {
}
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
long si = sequenceGenerator.getNextSequenceId();
message.getMessageId().setBrokerSequenceId(si);
message.setBrokerInTime(System.currentTimeMillis());
if (producerExchange.isMutable() || producerExchange.getRegion() == null) {
ActiveMQDestination destination = message.getDestination();
@ -730,4 +728,13 @@ public class RegionBroker implements Broker {
throw new RuntimeException("The broker from the BrokerService should not throw an exception");
}
}
/**
* @return the broker sequence id
*/
public long getBrokerSequenceId() {
synchronized(sequenceGenerator) {
return sequenceGenerator.getNextSequenceId();
}
}
}

View File

@ -382,6 +382,7 @@ public class Topic extends BaseDestination implements Task{
final ConnectionContext context = producerExchange
.getConnectionContext();
message.setRegionDestination(this);
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
if (topicStore != null && message.isPersistent()
&& !canOptimizeOutPersistence()) {

View File

@ -258,4 +258,8 @@ public class StubBroker implements Broker {
public Broker getRoot() {
return this;
}
public long getBrokerSequenceId() {
return -1l;
}
}