From 83a6eff431007e259fce89905f3245669f65c8fc Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Thu, 19 Jul 2007 19:24:31 +0000 Subject: [PATCH] move decision about being a slave from the Broker to the ConnectionContext - so can be done on a Connection basis if required git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@557748 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/activemq/broker/Broker.java | 9 ++---- .../apache/activemq/broker/BrokerFilter.java | 6 +--- .../apache/activemq/broker/BrokerService.java | 18 ++++++++++-- .../activemq/broker/ConnectionContext.java | 28 +++++++++++++++++-- .../apache/activemq/broker/EmptyBroker.java | 5 +--- .../apache/activemq/broker/ErrorBroker.java | 5 +--- .../activemq/broker/MutableBrokerFilter.java | 5 +--- .../activemq/broker/TransportConnection.java | 26 +++++++++-------- .../broker/region/AbstractSubscription.java | 4 +-- .../broker/region/PrefetchSubscription.java | 12 ++++---- .../activemq/broker/region/Subscription.java | 2 +- .../broker/region/TopicSubscription.java | 2 +- .../activemq/command/ConnectionInfo.java | 2 +- 13 files changed, 73 insertions(+), 51 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java index 69d8c2370b..986f6ceb10 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java @@ -191,12 +191,7 @@ public interface Broker extends Region, Service { * @param messageDispatch */ public void processDispatch(MessageDispatch messageDispatch); - - /** - * @return true if the broker is running as a slave - */ - public boolean isSlaveBroker(); - + /** * @return true if the broker has stopped */ @@ -229,7 +224,7 @@ public interface Broker extends Region, Service { * @return true if fault tolerant */ public boolean isFaultTolerantConfiguration(); - + /** * @return the connection context used to make administration operations on startup or via JMX MBeans */ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java index c78853fddd..e972eeeaf8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -200,11 +200,7 @@ public class BrokerFilter implements Broker { public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception{ next.processDispatchNotification(messageDispatchNotification); } - - public boolean isSlaveBroker(){ - return next.isSlaveBroker(); - } - + public boolean isStopped(){ return next.isStopped(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 727b0c8d88..963c259572 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -154,6 +154,7 @@ public class BrokerService implements Service { private boolean useLocalHostBrokerName = false; private CountDownLatch stoppedLatch = new CountDownLatch(1); private boolean supportFailOver = false; + private boolean clustered = false; static{ String localHostName = "localhost"; @@ -1120,6 +1121,20 @@ public class BrokerService implements Service { public void setSupportFailOver(boolean supportFailOver){ this.supportFailOver=supportFailOver; } + + /** + * @return the clustered + */ + public boolean isClustered(){ + return this.clustered; + } + + /** + * @param clustered the clustered to set + */ + public void setClustered(boolean clustered){ + this.clustered=clustered; + } // Implementation methods // ------------------------------------------------------------------------- @@ -1697,6 +1712,5 @@ public class BrokerService implements Service { broker.addDestination(adminConnectionContext, destination); } } - } - + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java b/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java index d3db30fc64..559bf69ca4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java @@ -59,6 +59,7 @@ public class ConnectionContext { private final AtomicBoolean stopping = new AtomicBoolean(); private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext(); private boolean dontSendReponse; + private boolean clientMaster=true; public ConnectionContext() { } @@ -267,6 +268,29 @@ public class ConnectionContext { public boolean isDontSendReponse() { return dontSendReponse; - } - + } + + + /** + * @return the slave + */ + public boolean isSlave(){ + return (this.broker!=null&&this.broker.getBrokerService().isSlave())||!this.clientMaster; + } + + + /** + * @return the clientMaster + */ + public boolean isClientMaster(){ + return this.clientMaster; + } + + + /** + * @param clientMaster the clientMaster to set + */ + public void setClientMaster(boolean clientMaster){ + this.clientMaster=clientMaster; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java index bef07ae7d3..f5f961f2f1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -199,10 +199,7 @@ public class EmptyBroker implements Broker { } - public boolean isSlaveBroker() { - return false; - } - + public boolean isStopped() { return false; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java index 57e58ec3a0..a714c9a1cc 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -197,10 +197,7 @@ public class ErrorBroker implements Broker { throw new BrokerStoppedException(this.message); } - public boolean isSlaveBroker() { - throw new BrokerStoppedException(this.message); - } - + public boolean isStopped() { return true; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index 7c544e5761..d875287d86 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -209,10 +209,7 @@ public class MutableBrokerFilter implements Broker { getNext().processDispatchNotification(messageDispatchNotification); } - public boolean isSlaveBroker(){ - return getNext().isSlaveBroker(); - } - + public boolean isStopped(){ return getNext().isStopped(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 1a6efbda4f..f93b04a6e3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -658,6 +658,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit context.setClientId(clientId); context.setUserName(info.getUserName()); context.setConnectionId(info.getConnectionId()); + context.setClientMaster(info.isClientMaster()); context.setWireFormatInfo(wireFormatInfo); context.setNetworkConnection(networkConnection); context.incrementReference(); @@ -1199,18 +1200,19 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit } } - protected void disposeTransport() { - if( transportDisposed.compareAndSet(false, true) ) { - try { - transport.stop(); - active = false; - log.debug("Stopped connection: "+transport.getRemoteAddress()); - } catch (Exception e) { - log.debug("Could not stop transport: "+e,e); - } - } - } - + protected void disposeTransport(){ + if(transportDisposed.compareAndSet(false,true)){ + try{ + transport.stop(); + active=false; + log.debug("Stopped connection: "+transport.getRemoteAddress()); + }catch(Exception e){ + log.debug("Could not stop transport: "+e,e); + } + } + } + + public int getProtocolVersion() { return protocolVersion.get(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java index 6f30b45788..2d1d7d5f6b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java @@ -115,8 +115,8 @@ abstract public class AbstractSubscription implements Subscription { public void gc() { } - public boolean isSlaveBroker(){ - return broker.isSlaveBroker(); + public boolean isSlave(){ + return getContext().isSlave(); } public ConnectionContext getContext() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 86edd23f88..c2b047f96c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -74,7 +74,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ // The slave should not deliver pull messages. TODO: when the slave becomes a master, // He should send a NULL message to all the consumers to 'wake them up' in case // they were waiting for a message. - if(getPrefetchSize()==0&&!isSlaveBroker()){ + if(getPrefetchSize()==0&&!isSlave()){ prefetchExtension++; final long dispatchCounterBeforePull=dispatchCounter; dispatchMatched(); @@ -119,7 +119,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ pendingEmpty=pending.isEmpty(); enqueueCounter++; - if(!isFull()&&pendingEmpty&&!broker.isSlaveBroker()){ + if(!isFull()&&pendingEmpty&&!isSlave()){ dispatch(node); }else{ optimizePrefetch(); @@ -260,7 +260,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ if(callDispatchMatched){ dispatchMatched(); }else{ - if(isSlaveBroker()){ + if(isSlave()){ throw new JMSException("Slave broker out of sync with master: Acknowledgment ("+ack +") was not in the dispatch list: "+dispatched); }else{ @@ -295,7 +295,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ * @return */ protected synchronized boolean isFull(){ - return isSlaveBroker()||dispatched.size()-prefetchExtension>=info.getPrefetchSize(); + return isSlave()||dispatched.size()-prefetchExtension>=info.getPrefetchSize(); } /** @@ -377,7 +377,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ } protected synchronized void dispatchMatched() throws IOException{ - if(!broker.isSlaveBroker()){ + if(!isSlave()){ try{ int numberToDispatch=countBeforeFull(); if(numberToDispatch>0){ @@ -412,7 +412,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ return false; } // Make sure we can dispatch a message. - if(canDispatch(node)&&!isSlaveBroker()){ + if(canDispatch(node)&&!isSlave()){ MessageDispatch md=createMessageDispatch(node,message); // NULL messages don't count... they don't get Acked. if(node!=QueueMessageReference.NULL_MESSAGE){ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java index abae931b3d..1b9b54cf21 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java @@ -111,7 +111,7 @@ public interface Subscription extends SubscriptionRecovery { /** * @return true if the broker is currently in slave mode */ - boolean isSlaveBroker(); + boolean isSlave(); /** * @return number of messages pending delivery diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index d9c8191593..96bc9e4ec2 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -74,7 +74,7 @@ public class TopicSubscription extends AbstractSubscription{ public void add(MessageReference node) throws Exception{ enqueueCounter.incrementAndGet(); node.incrementReferenceCount(); - if(!isFull()&&!isSlaveBroker()){ + if(!isFull()&&!isSlave()){ optimizePrefetch(); // if maximumPendingMessages is set we will only discard messages which // have not been dispatched (i.e. we allow the prefetch buffer to be filled) diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java b/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java index fb123dbb50..7a45bbb71f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java @@ -36,7 +36,7 @@ public class ConnectionInfo extends BaseCommand { protected BrokerId[] brokerPath; protected boolean brokerMasterConnector; protected boolean manageable; - protected boolean clientMaster; + protected boolean clientMaster=true; protected transient Object transportContext; public ConnectionInfo() {