move decision about being a slave from the Broker to the ConnectionContext - so can be done on a Connection basis if required

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@557748 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-07-19 19:24:31 +00:00
parent 993f78caac
commit 83a6eff431
13 changed files with 73 additions and 51 deletions

View File

@ -191,12 +191,7 @@ public interface Broker extends Region, Service {
* @param messageDispatch
*/
public void processDispatch(MessageDispatch messageDispatch);
/**
* @return true if the broker is running as a slave
*/
public boolean isSlaveBroker();
/**
* @return true if the broker has stopped
*/
@ -229,7 +224,7 @@ public interface Broker extends Region, Service {
* @return true if fault tolerant
*/
public boolean isFaultTolerantConfiguration();
/**
* @return the connection context used to make administration operations on startup or via JMX MBeans
*/

View File

@ -200,11 +200,7 @@ public class BrokerFilter implements Broker {
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception{
next.processDispatchNotification(messageDispatchNotification);
}
public boolean isSlaveBroker(){
return next.isSlaveBroker();
}
public boolean isStopped(){
return next.isStopped();
}

View File

@ -154,6 +154,7 @@ public class BrokerService implements Service {
private boolean useLocalHostBrokerName = false;
private CountDownLatch stoppedLatch = new CountDownLatch(1);
private boolean supportFailOver = false;
private boolean clustered = false;
static{
String localHostName = "localhost";
@ -1120,6 +1121,20 @@ public class BrokerService implements Service {
public void setSupportFailOver(boolean supportFailOver){
this.supportFailOver=supportFailOver;
}
/**
* @return the clustered
*/
public boolean isClustered(){
return this.clustered;
}
/**
* @param clustered the clustered to set
*/
public void setClustered(boolean clustered){
this.clustered=clustered;
}
// Implementation methods
// -------------------------------------------------------------------------
@ -1697,6 +1712,5 @@ public class BrokerService implements Service {
broker.addDestination(adminConnectionContext, destination);
}
}
}
}
}

View File

@ -59,6 +59,7 @@ public class ConnectionContext {
private final AtomicBoolean stopping = new AtomicBoolean();
private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
private boolean dontSendReponse;
private boolean clientMaster=true;
public ConnectionContext() {
}
@ -267,6 +268,29 @@ public class ConnectionContext {
public boolean isDontSendReponse() {
return dontSendReponse;
}
}
/**
* @return the slave
*/
public boolean isSlave(){
return (this.broker!=null&&this.broker.getBrokerService().isSlave())||!this.clientMaster;
}
/**
* @return the clientMaster
*/
public boolean isClientMaster(){
return this.clientMaster;
}
/**
* @param clientMaster the clientMaster to set
*/
public void setClientMaster(boolean clientMaster){
this.clientMaster=clientMaster;
}
}

View File

@ -199,10 +199,7 @@ public class EmptyBroker implements Broker {
}
public boolean isSlaveBroker() {
return false;
}
public boolean isStopped() {
return false;
}

View File

@ -197,10 +197,7 @@ public class ErrorBroker implements Broker {
throw new BrokerStoppedException(this.message);
}
public boolean isSlaveBroker() {
throw new BrokerStoppedException(this.message);
}
public boolean isStopped() {
return true;
}

View File

@ -209,10 +209,7 @@ public class MutableBrokerFilter implements Broker {
getNext().processDispatchNotification(messageDispatchNotification);
}
public boolean isSlaveBroker(){
return getNext().isSlaveBroker();
}
public boolean isStopped(){
return getNext().isStopped();
}

View File

@ -658,6 +658,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
context.setClientId(clientId);
context.setUserName(info.getUserName());
context.setConnectionId(info.getConnectionId());
context.setClientMaster(info.isClientMaster());
context.setWireFormatInfo(wireFormatInfo);
context.setNetworkConnection(networkConnection);
context.incrementReference();
@ -1199,18 +1200,19 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
}
}
protected void disposeTransport() {
if( transportDisposed.compareAndSet(false, true) ) {
try {
transport.stop();
active = false;
log.debug("Stopped connection: "+transport.getRemoteAddress());
} catch (Exception e) {
log.debug("Could not stop transport: "+e,e);
}
}
}
protected void disposeTransport(){
if(transportDisposed.compareAndSet(false,true)){
try{
transport.stop();
active=false;
log.debug("Stopped connection: "+transport.getRemoteAddress());
}catch(Exception e){
log.debug("Could not stop transport: "+e,e);
}
}
}
public int getProtocolVersion() {
return protocolVersion.get();
}

View File

@ -115,8 +115,8 @@ abstract public class AbstractSubscription implements Subscription {
public void gc() {
}
public boolean isSlaveBroker(){
return broker.isSlaveBroker();
public boolean isSlave(){
return getContext().isSlave();
}
public ConnectionContext getContext() {

View File

@ -74,7 +74,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
// The slave should not deliver pull messages. TODO: when the slave becomes a master,
// He should send a NULL message to all the consumers to 'wake them up' in case
// they were waiting for a message.
if(getPrefetchSize()==0&&!isSlaveBroker()){
if(getPrefetchSize()==0&&!isSlave()){
prefetchExtension++;
final long dispatchCounterBeforePull=dispatchCounter;
dispatchMatched();
@ -119,7 +119,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
pendingEmpty=pending.isEmpty();
enqueueCounter++;
if(!isFull()&&pendingEmpty&&!broker.isSlaveBroker()){
if(!isFull()&&pendingEmpty&&!isSlave()){
dispatch(node);
}else{
optimizePrefetch();
@ -260,7 +260,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
if(callDispatchMatched){
dispatchMatched();
}else{
if(isSlaveBroker()){
if(isSlave()){
throw new JMSException("Slave broker out of sync with master: Acknowledgment ("+ack
+") was not in the dispatch list: "+dispatched);
}else{
@ -295,7 +295,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
* @return
*/
protected synchronized boolean isFull(){
return isSlaveBroker()||dispatched.size()-prefetchExtension>=info.getPrefetchSize();
return isSlave()||dispatched.size()-prefetchExtension>=info.getPrefetchSize();
}
/**
@ -377,7 +377,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
}
protected synchronized void dispatchMatched() throws IOException{
if(!broker.isSlaveBroker()){
if(!isSlave()){
try{
int numberToDispatch=countBeforeFull();
if(numberToDispatch>0){
@ -412,7 +412,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
return false;
}
// Make sure we can dispatch a message.
if(canDispatch(node)&&!isSlaveBroker()){
if(canDispatch(node)&&!isSlave()){
MessageDispatch md=createMessageDispatch(node,message);
// NULL messages don't count... they don't get Acked.
if(node!=QueueMessageReference.NULL_MESSAGE){

View File

@ -111,7 +111,7 @@ public interface Subscription extends SubscriptionRecovery {
/**
* @return true if the broker is currently in slave mode
*/
boolean isSlaveBroker();
boolean isSlave();
/**
* @return number of messages pending delivery

View File

@ -74,7 +74,7 @@ public class TopicSubscription extends AbstractSubscription{
public void add(MessageReference node) throws Exception{
enqueueCounter.incrementAndGet();
node.incrementReferenceCount();
if(!isFull()&&!isSlaveBroker()){
if(!isFull()&&!isSlave()){
optimizePrefetch();
// if maximumPendingMessages is set we will only discard messages which
// have not been dispatched (i.e. we allow the prefetch buffer to be filled)

View File

@ -36,7 +36,7 @@ public class ConnectionInfo extends BaseCommand {
protected BrokerId[] brokerPath;
protected boolean brokerMasterConnector;
protected boolean manageable;
protected boolean clientMaster;
protected boolean clientMaster=true;
protected transient Object transportContext;
public ConnectionInfo() {