refactor to move the useful code for dealing with start/stop lifecycles and the start, closed, closing properties into the base ServiceSupport class

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@384208 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-03-08 13:09:17 +00:00
parent 8568d6ee05
commit 11be076ac9
6 changed files with 88 additions and 118 deletions

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.transport;
import org.apache.activemq.util.ServiceSupport;
import java.net.URI;
/**
@ -23,7 +25,7 @@ import java.net.URI;
*
* @version $Revision: 1.1 $
*/
public abstract class TransportServerSupport implements TransportServer {
public abstract class TransportServerSupport extends ServiceSupport implements TransportServer {
private URI location;
private TransportAcceptListener acceptListener;

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.transport;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.ThreadPriorities;
import org.apache.activemq.util.ServiceStopper;
@ -34,9 +33,6 @@ import java.net.URI;
public abstract class TransportServerThreadSupport extends TransportServerSupport implements Runnable {
private static final Log log = LogFactory.getLog(TransportServerThreadSupport.class);
private AtomicBoolean closed = new AtomicBoolean(false);
private AtomicBoolean started = new AtomicBoolean(false);
private AtomicBoolean closing = new AtomicBoolean(false);
private boolean daemon = true;
private boolean joinOnStop = true;
private Thread runner;
@ -48,48 +44,6 @@ public abstract class TransportServerThreadSupport extends TransportServerSuppor
super(location);
}
public void start() throws Exception {
if (started.compareAndSet(false, true)) {
doStart();
}
}
public void stop() throws Exception {
if (closed.compareAndSet(false, true)) {
closing.set(true);
ServiceStopper stopper = new ServiceStopper();
try {
doStop(stopper);
}
catch (Exception e) {
stopper.onException(this, e);
}
if (runner != null && joinOnStop) {
runner.join();
runner = null;
}
closed.set(true);
started.set(false);
closing.set(false);
stopper.throwFirstException();
}
}
public boolean isStarted() {
return started.get();
}
/**
* @return true if the transport server is in the process of closing down.
*/
public boolean isClosing() {
return closing.get();
}
public boolean isClosed() {
return closed.get();
}
public boolean isDaemon() {
return daemon;
}
@ -113,7 +67,7 @@ public abstract class TransportServerThreadSupport extends TransportServerSuppor
this.joinOnStop = joinOnStop;
}
protected void doStart() {
protected void doStart() throws Exception {
log.info("Listening for connections at: " + getLocation());
runner = new Thread(this, toString());
runner.setDaemon(daemon);
@ -121,5 +75,10 @@ public abstract class TransportServerThreadSupport extends TransportServerSuppor
runner.start();
}
protected abstract void doStop(ServiceStopper stopper) throws Exception;
protected void doStop(ServiceStopper stopper) throws Exception {
if (runner != null && joinOnStop) {
runner.join();
runner = null;
}
}
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.transport;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -28,7 +29,7 @@ import java.io.IOException;
*
* @version $Revision: 1.1 $
*/
public abstract class TransportSupport implements Transport {
public abstract class TransportSupport extends ServiceSupport implements Transport {
private static final Log log = LogFactory.getLog(TransportSupport.class);
private TransportListener transportListener;

View File

@ -19,12 +19,9 @@ package org.apache.activemq.transport;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.util.ServiceStopper;
import java.io.IOException;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* A useful base class for a transport implementation which has a background
* reading thread.
@ -33,40 +30,9 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
*/
public abstract class TransportThreadSupport extends TransportSupport implements Runnable {
private AtomicBoolean closed = new AtomicBoolean(false);
private AtomicBoolean started = new AtomicBoolean(false);
private boolean daemon = false;
private Thread runner;
public void start() throws Exception {
if (started.compareAndSet(false, true)) {
doStart();
}
}
public void stop() throws Exception {
if (closed.compareAndSet(false, true)) {
started.set(false);
ServiceStopper stopper = new ServiceStopper();
try {
doStop(stopper);
}
catch (Exception e) {
stopper.onException(this, e);
}
stopper.throwFirstException();
}
closed.set(true);
}
public boolean isStarted() {
return started.get();
}
public boolean isClosed() {
return closed.get();
}
public boolean isDaemon() {
return daemon;
}
@ -75,15 +41,12 @@ public abstract class TransportThreadSupport extends TransportSupport implements
this.daemon = daemon;
}
protected void doStart() throws Exception {
runner = new Thread(this, toString());
runner.setDaemon(daemon);
runner.start();
}
protected abstract void doStop(ServiceStopper stopper) throws Exception;
protected void checkStarted(Command command) throws IOException {
if (!isStarted()) {
// we might try to shut down the transport before it was ever started in some test cases

View File

@ -90,6 +90,30 @@ public class TcpTransportServer extends TransportServerThreadSupport {
public void setBrokerInfo(BrokerInfo brokerInfo) {
}
public long getMaxInactivityDuration() {
return maxInactivityDuration;
}
public void setMaxInactivityDuration(long maxInactivityDuration) {
this.maxInactivityDuration = maxInactivityDuration;
}
public int getMinmumWireFormatVersion() {
return minmumWireFormatVersion;
}
public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
this.minmumWireFormatVersion = minmumWireFormatVersion;
}
public boolean isTrace() {
return trace;
}
public void setTrace(boolean trace) {
this.trace = trace;
}
/**
* pull Sockets from the ServerSocket
*/
@ -183,32 +207,9 @@ public class TcpTransportServer extends TransportServerThreadSupport {
}
protected void doStop(ServiceStopper stopper) throws Exception {
super.doStop(stopper);
if (serverSocket != null) {
serverSocket.close();
}
}
public long getMaxInactivityDuration() {
return maxInactivityDuration;
}
public void setMaxInactivityDuration(long maxInactivityDuration) {
this.maxInactivityDuration = maxInactivityDuration;
}
public int getMinmumWireFormatVersion() {
return minmumWireFormatVersion;
}
public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
this.minmumWireFormatVersion = minmumWireFormatVersion;
}
public boolean isTrace() {
return trace;
}
public void setTrace(boolean trace) {
this.trace = trace;
}
}

View File

@ -16,18 +16,24 @@
*/
package org.apache.activemq.util;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A helper class for working with services
* A helper class for working with services together with a useful base class for service implementations.
*
* @version $Revision: 1.1 $
*/
public abstract class ServiceSupport {
private static final Log log = LogFactory.getLog(ServiceSupport.class);
private AtomicBoolean closed = new AtomicBoolean(false);
private AtomicBoolean started = new AtomicBoolean(false);
private AtomicBoolean closing = new AtomicBoolean(false);
public static void dispose(Service service) {
try {
service.stop();
@ -36,15 +42,53 @@ public abstract class ServiceSupport {
log.error("Could not stop service: " + service + ". Reason: " + e, e);
}
}
public void start() throws Exception {
if (started.compareAndSet(false, true)) {
doStart();
}
}
public void stop() throws Exception {
ServiceStopper stopper = new ServiceStopper();
stop(stopper);
stopper.throwFirstException();
if (closed.compareAndSet(false, true)) {
closing.set(true);
ServiceStopper stopper = new ServiceStopper();
try {
doStop(stopper);
}
catch (Exception e) {
stopper.onException(this, e);
}
closed.set(true);
started.set(false);
closing.set(false);
stopper.throwFirstException();
}
}
/**
* Provides a way for derived classes to stop resources cleanly, handling exceptions
* @return true if this service has been started
*/
protected abstract void stop(ServiceStopper stopper);
public boolean isStarted() {
return started.get();
}
/**
* @return true if this service is in the process of closing
*/
public boolean isClosing() {
return closing.get();
}
/**
* @return true if this service is closed
*/
public boolean isClosed() {
return closed.get();
}
protected abstract void doStop(ServiceStopper stopper) throws Exception;
protected abstract void doStart() throws Exception;
}