From 11be076ac9d69171a54a03e03169b7d78685e095 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Wed, 8 Mar 2006 13:09:17 +0000 Subject: [PATCH] refactor to move the useful code for dealing with start/stop lifecycles and the start, closed, closing properties into the base ServiceSupport class git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@384208 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/TransportServerSupport.java | 4 +- .../TransportServerThreadSupport.java | 55 +++--------------- .../activemq/transport/TransportSupport.java | 3 +- .../transport/TransportThreadSupport.java | 37 ------------ .../transport/tcp/TcpTransportServer.java | 49 ++++++++-------- .../apache/activemq/util/ServiceSupport.java | 58 ++++++++++++++++--- 6 files changed, 88 insertions(+), 118 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java index e37870c1c1..01bd9e6c90 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.transport; +import org.apache.activemq.util.ServiceSupport; + import java.net.URI; /** @@ -23,7 +25,7 @@ import java.net.URI; * * @version $Revision: 1.1 $ */ -public abstract class TransportServerSupport implements TransportServer { +public abstract class TransportServerSupport extends ServiceSupport implements TransportServer { private URI location; private TransportAcceptListener acceptListener; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java index 1fde3cbc5f..b3dae7d0e4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java @@ -16,7 +16,6 @@ */ package org.apache.activemq.transport; -import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.ThreadPriorities; import org.apache.activemq.util.ServiceStopper; @@ -34,9 +33,6 @@ import java.net.URI; public abstract class TransportServerThreadSupport extends TransportServerSupport implements Runnable { private static final Log log = LogFactory.getLog(TransportServerThreadSupport.class); - private AtomicBoolean closed = new AtomicBoolean(false); - private AtomicBoolean started = new AtomicBoolean(false); - private AtomicBoolean closing = new AtomicBoolean(false); private boolean daemon = true; private boolean joinOnStop = true; private Thread runner; @@ -48,48 +44,6 @@ public abstract class TransportServerThreadSupport extends TransportServerSuppor super(location); } - public void start() throws Exception { - if (started.compareAndSet(false, true)) { - doStart(); - } - } - - public void stop() throws Exception { - if (closed.compareAndSet(false, true)) { - closing.set(true); - ServiceStopper stopper = new ServiceStopper(); - try { - doStop(stopper); - } - catch (Exception e) { - stopper.onException(this, e); - } - if (runner != null && joinOnStop) { - runner.join(); - runner = null; - } - closed.set(true); - started.set(false); - closing.set(false); - stopper.throwFirstException(); - } - } - - public boolean isStarted() { - return started.get(); - } - - /** - * @return true if the transport server is in the process of closing down. - */ - public boolean isClosing() { - return closing.get(); - } - - public boolean isClosed() { - return closed.get(); - } - public boolean isDaemon() { return daemon; } @@ -113,7 +67,7 @@ public abstract class TransportServerThreadSupport extends TransportServerSuppor this.joinOnStop = joinOnStop; } - protected void doStart() { + protected void doStart() throws Exception { log.info("Listening for connections at: " + getLocation()); runner = new Thread(this, toString()); runner.setDaemon(daemon); @@ -121,5 +75,10 @@ public abstract class TransportServerThreadSupport extends TransportServerSuppor runner.start(); } - protected abstract void doStop(ServiceStopper stopper) throws Exception; + protected void doStop(ServiceStopper stopper) throws Exception { + if (runner != null && joinOnStop) { + runner.join(); + runner = null; + } + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java index 90fba9a998..378eebb73b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java @@ -18,6 +18,7 @@ package org.apache.activemq.transport; import org.apache.activemq.command.Command; import org.apache.activemq.command.Response; +import org.apache.activemq.util.ServiceSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -28,7 +29,7 @@ import java.io.IOException; * * @version $Revision: 1.1 $ */ -public abstract class TransportSupport implements Transport { +public abstract class TransportSupport extends ServiceSupport implements Transport { private static final Log log = LogFactory.getLog(TransportSupport.class); private TransportListener transportListener; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java index 7a39200227..b8adeba97e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportThreadSupport.java @@ -19,12 +19,9 @@ package org.apache.activemq.transport; import org.apache.activemq.command.Command; import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.ShutdownInfo; -import org.apache.activemq.util.ServiceStopper; import java.io.IOException; -import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; - /** * A useful base class for a transport implementation which has a background * reading thread. @@ -33,40 +30,9 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; */ public abstract class TransportThreadSupport extends TransportSupport implements Runnable { - private AtomicBoolean closed = new AtomicBoolean(false); - private AtomicBoolean started = new AtomicBoolean(false); private boolean daemon = false; private Thread runner; - public void start() throws Exception { - if (started.compareAndSet(false, true)) { - doStart(); - } - } - - public void stop() throws Exception { - if (closed.compareAndSet(false, true)) { - started.set(false); - ServiceStopper stopper = new ServiceStopper(); - try { - doStop(stopper); - } - catch (Exception e) { - stopper.onException(this, e); - } - stopper.throwFirstException(); - } - closed.set(true); - } - - public boolean isStarted() { - return started.get(); - } - - public boolean isClosed() { - return closed.get(); - } - public boolean isDaemon() { return daemon; } @@ -75,15 +41,12 @@ public abstract class TransportThreadSupport extends TransportSupport implements this.daemon = daemon; } - protected void doStart() throws Exception { runner = new Thread(this, toString()); runner.setDaemon(daemon); runner.start(); } - protected abstract void doStop(ServiceStopper stopper) throws Exception; - protected void checkStarted(Command command) throws IOException { if (!isStarted()) { // we might try to shut down the transport before it was ever started in some test cases diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java index 7473a584a9..58708d9739 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java @@ -90,6 +90,30 @@ public class TcpTransportServer extends TransportServerThreadSupport { public void setBrokerInfo(BrokerInfo brokerInfo) { } + public long getMaxInactivityDuration() { + return maxInactivityDuration; + } + + public void setMaxInactivityDuration(long maxInactivityDuration) { + this.maxInactivityDuration = maxInactivityDuration; + } + + public int getMinmumWireFormatVersion() { + return minmumWireFormatVersion; + } + + public void setMinmumWireFormatVersion(int minmumWireFormatVersion) { + this.minmumWireFormatVersion = minmumWireFormatVersion; + } + + public boolean isTrace() { + return trace; + } + + public void setTrace(boolean trace) { + this.trace = trace; + } + /** * pull Sockets from the ServerSocket */ @@ -183,32 +207,9 @@ public class TcpTransportServer extends TransportServerThreadSupport { } protected void doStop(ServiceStopper stopper) throws Exception { + super.doStop(stopper); if (serverSocket != null) { serverSocket.close(); } } - - public long getMaxInactivityDuration() { - return maxInactivityDuration; - } - - public void setMaxInactivityDuration(long maxInactivityDuration) { - this.maxInactivityDuration = maxInactivityDuration; - } - - public int getMinmumWireFormatVersion() { - return minmumWireFormatVersion; - } - - public void setMinmumWireFormatVersion(int minmumWireFormatVersion) { - this.minmumWireFormatVersion = minmumWireFormatVersion; - } - - public boolean isTrace() { - return trace; - } - - public void setTrace(boolean trace) { - this.trace = trace; - } } diff --git a/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java b/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java index 9a9c2dc351..b93d41c94b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java @@ -16,18 +16,24 @@ */ package org.apache.activemq.util; +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; + import org.apache.activemq.Service; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** - * A helper class for working with services + * A helper class for working with services together with a useful base class for service implementations. * * @version $Revision: 1.1 $ */ public abstract class ServiceSupport { private static final Log log = LogFactory.getLog(ServiceSupport.class); + private AtomicBoolean closed = new AtomicBoolean(false); + private AtomicBoolean started = new AtomicBoolean(false); + private AtomicBoolean closing = new AtomicBoolean(false); + public static void dispose(Service service) { try { service.stop(); @@ -36,15 +42,53 @@ public abstract class ServiceSupport { log.error("Could not stop service: " + service + ". Reason: " + e, e); } } - + + public void start() throws Exception { + if (started.compareAndSet(false, true)) { + doStart(); + } + } + public void stop() throws Exception { - ServiceStopper stopper = new ServiceStopper(); - stop(stopper); - stopper.throwFirstException(); + if (closed.compareAndSet(false, true)) { + closing.set(true); + ServiceStopper stopper = new ServiceStopper(); + try { + doStop(stopper); + } + catch (Exception e) { + stopper.onException(this, e); + } + closed.set(true); + started.set(false); + closing.set(false); + stopper.throwFirstException(); + } } /** - * Provides a way for derived classes to stop resources cleanly, handling exceptions + * @return true if this service has been started */ - protected abstract void stop(ServiceStopper stopper); + public boolean isStarted() { + return started.get(); + } + + /** + * @return true if this service is in the process of closing + */ + public boolean isClosing() { + return closing.get(); + } + + + /** + * @return true if this service is closed + */ + public boolean isClosed() { + return closed.get(); + } + + protected abstract void doStop(ServiceStopper stopper) throws Exception; + + protected abstract void doStart() throws Exception; }