minor refactor to use "stopped" and "stopping" as properties on ServiceSupport and the transport classes. Also made the TransportListener getter visible and avoided the need for protected variables in the various TransportFilter implementations (due to inlining, often getter methods are faster in hotspot than access to protected fields, plus it simplifies the code & makes it less brittle)

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@384220 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-03-08 14:10:27 +00:00
parent 11be076ac9
commit 436fe42778
16 changed files with 75 additions and 61 deletions

View File

@ -112,7 +112,7 @@ public class InactivityMonitor extends TransportFilter implements Runnable {
public void onCommand(Command command) {
inReceive.set(true);
try {
commandListener.onCommand(command);
getTransportListener().onCommand(command);
} finally {
inReceive.set(false);
commandReceived.set(true);
@ -132,6 +132,6 @@ public class InactivityMonitor extends TransportFilter implements Runnable {
public void onException(IOException error) {
Scheduler.cancel(this);
commandListener.onException(error);
getTransportListener().onException(error);
}
}

View File

@ -38,9 +38,9 @@ public class MarshallingTransportFilter extends TransportFilter {
public void onCommand(Command command) {
try {
commandListener.onCommand((Command)localWireFormat.unmarshal(remoteWireFormat.marshal(command)));
getTransportListener().onCommand((Command)localWireFormat.unmarshal(remoteWireFormat.marshal(command)));
} catch (IOException e) {
commandListener.onException(e);
getTransportListener().onException(e);
}
}

View File

@ -79,7 +79,7 @@ final public class ResponseCorrelator extends TransportFilter {
if( debug ) log.debug("Received unexpected response for command id: "+response.getCorrelationId());
}
} else {
commandListener.onCommand(command);
getTransportListener().onCommand(command);
}
}

View File

@ -46,6 +46,11 @@ public interface Transport extends Service {
*/
public Response request(Command command) throws IOException;
/**
* Returns the current transport listener
*/
public TransportListener getTransportListener();
/**
* Registers an inbound command listener
*/

View File

@ -28,16 +28,18 @@ import org.apache.activemq.command.Response;
public class TransportFilter extends DefaultTransportListener implements Transport {
final protected Transport next;
protected TransportListener commandListener;
private TransportListener transportListener;
public TransportFilter(Transport next) {
this.next = next;
}
/**
*/
public TransportListener getTransportListener() {
return transportListener;
}
public void setTransportListener(TransportListener channelListener) {
this.commandListener = channelListener;
this.transportListener = channelListener;
if (channelListener == null)
next.setTransportListener(null);
else
@ -52,7 +54,7 @@ public class TransportFilter extends DefaultTransportListener implements Transpo
public void start() throws Exception {
if( next == null )
throw new IOException("The next channel has not been set.");
if( commandListener == null )
if( transportListener == null )
throw new IOException("The command listener has not been set.");
next.start();
}
@ -65,7 +67,7 @@ public class TransportFilter extends DefaultTransportListener implements Transpo
}
public void onCommand(Command command) {
commandListener.onCommand(command);
transportListener.onCommand(command);
}
/**
@ -75,12 +77,6 @@ public class TransportFilter extends DefaultTransportListener implements Transpo
return next;
}
/**
* @return Returns the packetListener.
*/
public TransportListener getCommandListener() {
return commandListener;
}
public String toString() {
return next.toString();
@ -99,7 +95,7 @@ public class TransportFilter extends DefaultTransportListener implements Transpo
}
public void onException(IOException error) {
commandListener.onException(error);
transportListener.onException(error);
}
public Object narrow(Class target) {

View File

@ -55,14 +55,14 @@ public class TransportLogger extends TransportFilter {
if( log.isDebugEnabled() ) {
log.debug("RECEIVED: "+command);
}
commandListener.onCommand(command);
getTransportListener().onCommand(command);
}
public void onException(IOException error) {
if( log.isDebugEnabled() ) {
log.debug("RECEIVED Exception: "+error, error);
}
commandListener.onException(error);
getTransportListener().onException(error);
}
public String toString() {

View File

@ -17,7 +17,9 @@
package org.apache.activemq.transport;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -32,8 +34,11 @@ import java.io.IOException;
public abstract class TransportSupport extends ServiceSupport implements Transport {
private static final Log log = LogFactory.getLog(TransportSupport.class);
private TransportListener transportListener;
TransportListener transportListener;
/**
* Returns the current transport listener
*/
public TransportListener getTransportListener() {
return transportListener;
}
@ -92,4 +97,13 @@ public abstract class TransportSupport extends ServiceSupport implements Transpo
}
}
protected void checkStarted(Command command) throws IOException {
if (!isStarted()) {
// we might try to shut down the transport before it was ever started in some test cases
if (!(command instanceof ShutdownInfo || command instanceof RemoveInfo)) {
throw new IOException("The transport " + this + " of type: " + getClass().getName() + " is not running.");
}
}
}
}

View File

@ -16,11 +16,7 @@
*/
package org.apache.activemq.transport;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.ShutdownInfo;
import java.io.IOException;
/**
* A useful base class for a transport implementation which has a background
@ -46,13 +42,4 @@ public abstract class TransportThreadSupport extends TransportSupport implements
runner.setDaemon(daemon);
runner.start();
}
protected void checkStarted(Command command) throws IOException {
if (!isStarted()) {
// we might try to shut down the transport before it was ever started in some test cases
if (!(command instanceof ShutdownInfo || command instanceof RemoveInfo)) {
throw new IOException("The transport " + this + " of type: " + getClass().getName() + " is not running.");
}
}
}
}

View File

@ -90,9 +90,9 @@ public class WireFormatNegotiator extends TransportFilter {
}
if( !info.isValid() ) {
commandListener.onException(new IOException("Remote wire format magic is invalid"));
getTransportListener().onException(new IOException("Remote wire format magic is invalid"));
} else if( info.getVersion() < minimumVersion ) {
commandListener.onException(new IOException("Remote wire format ("+info.getVersion()+") is lower the minimum version required ("+minimumVersion+")"));
getTransportListener().onException(new IOException("Remote wire format ("+info.getVersion()+") is lower the minimum version required ("+minimumVersion+")"));
} else if ( info.getVersion()!=wireFormat.getVersion() ) {
// Match the remote side.
wireFormat.setVersion(info.getVersion());
@ -115,14 +115,14 @@ public class WireFormatNegotiator extends TransportFilter {
((OpenWireFormat)wireFormat).setTightEncodingEnabled(false);
}
} catch (IOException e) {
commandListener.onException(e);
getTransportListener().onException(e);
}
}
readyCountDownLatch.countDown();
}
commandListener.onCommand(command);
getTransportListener().onCommand(command);
}
public String toString() {

View File

@ -443,6 +443,10 @@ public class FailoverTransport implements CompositeTransport {
return l;
}
public TransportListener getTransportListener() {
return transportListener;
}
public void setTransportListener(TransportListener commandListener) {
this.transportListener = commandListener;
}

View File

@ -428,6 +428,10 @@ public class FanoutTransport implements CompositeTransport {
}
}
public TransportListener getTransportListener() {
return transportListener;
}
public void setTransportListener(TransportListener commandListener) {
this.transportListener = commandListener;
}

View File

@ -33,7 +33,7 @@ import org.apache.activemq.transport.TransportListener;
public class MockTransport extends DefaultTransportListener implements Transport {
protected Transport next;
protected TransportListener commandListener;
protected TransportListener transportListener;
public MockTransport(Transport next) {
this.next = next;
@ -42,7 +42,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
/**
*/
synchronized public void setTransportListener(TransportListener channelListener) {
this.commandListener = channelListener;
this.transportListener = channelListener;
if (channelListener == null)
next.setTransportListener(null);
else
@ -57,7 +57,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
public void start() throws Exception {
if( next == null )
throw new IOException("The next channel has not been set.");
if( commandListener == null )
if( transportListener == null )
throw new IOException("The command listener has not been set.");
next.start();
}
@ -70,7 +70,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
}
synchronized public void onCommand(Command command) {
commandListener.onCommand(command);
transportListener.onCommand(command);
}
/**
@ -83,8 +83,8 @@ public class MockTransport extends DefaultTransportListener implements Transport
/**
* @return Returns the packetListener.
*/
synchronized public TransportListener getCommandListener() {
return commandListener;
synchronized public TransportListener getTransportListener() {
return transportListener;
}
synchronized public String toString() {
@ -104,7 +104,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
}
synchronized public void onException(IOException error) {
commandListener.onException(error);
transportListener.onException(error);
}
synchronized public Object narrow(Class target) {

View File

@ -131,7 +131,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
*/
public void run() {
log.trace("TCP consumer thread starting");
while (!isClosed()) {
while (!isStopped()) {
try {
Command command = (Command) wireFormat.unmarshal(dataIn);
doConsume(command);

View File

@ -118,12 +118,12 @@ public class TcpTransportServer extends TransportServerThreadSupport {
* pull Sockets from the ServerSocket
*/
public void run() {
while (!isClosed()) {
while (!isStopped()) {
Socket socket = null;
try {
socket = serverSocket.accept();
if (socket != null) {
if (isClosed() || getAcceptListener() == null) {
if (isStopped() || getAcceptListener() == null) {
socket.close();
}
else {
@ -142,9 +142,9 @@ public class TcpTransportServer extends TransportServerThreadSupport {
// expect this to happen
}
catch (Exception e) {
if (!isClosing()) {
if (!isStopping()) {
onAcceptError(e);
} else if (!isClosed()) {
} else if (!isStopped()) {
log.warn("run()", e);
onAcceptError(e);
}

View File

@ -87,6 +87,10 @@ public class VMTransport implements Transport{
throw new AssertionError("Unsupported Method");
}
public synchronized TransportListener getTransportListener() {
return transportListener;
}
synchronized public void setTransportListener(TransportListener commandListener){
this.transportListener=commandListener;
}

View File

@ -30,9 +30,9 @@ import org.apache.commons.logging.LogFactory;
public abstract class ServiceSupport {
private static final Log log = LogFactory.getLog(ServiceSupport.class);
private AtomicBoolean closed = new AtomicBoolean(false);
private AtomicBoolean started = new AtomicBoolean(false);
private AtomicBoolean closing = new AtomicBoolean(false);
private AtomicBoolean stopping = new AtomicBoolean(false);
private AtomicBoolean stopped = new AtomicBoolean(false);
public static void dispose(Service service) {
try {
@ -50,8 +50,8 @@ public abstract class ServiceSupport {
}
public void stop() throws Exception {
if (closed.compareAndSet(false, true)) {
closing.set(true);
if (stopped.compareAndSet(false, true)) {
stopping.set(true);
ServiceStopper stopper = new ServiceStopper();
try {
doStop(stopper);
@ -59,9 +59,9 @@ public abstract class ServiceSupport {
catch (Exception e) {
stopper.onException(this, e);
}
closed.set(true);
stopped.set(true);
started.set(false);
closing.set(false);
stopping.set(false);
stopper.throwFirstException();
}
}
@ -76,16 +76,16 @@ public abstract class ServiceSupport {
/**
* @return true if this service is in the process of closing
*/
public boolean isClosing() {
return closing.get();
public boolean isStopping() {
return stopping.get();
}
/**
* @return true if this service is closed
*/
public boolean isClosed() {
return closed.get();
public boolean isStopped() {
return stopped.get();
}
protected abstract void doStop(ServiceStopper stopper) throws Exception;