minor refactor to make it easier for derivations to expose exceptions on initialization

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@467078 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-10-23 18:48:18 +00:00
parent 7a25bcf203
commit e73f5aabf6
1 changed files with 103 additions and 95 deletions

View File

@ -17,6 +17,16 @@
*/ */
package org.apache.activemq.transport.tcp; package org.apache.activemq.transport.tcp;
import org.apache.activemq.Service;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.net.SocketFactory;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
@ -31,20 +41,9 @@ import java.net.UnknownHostException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import javax.net.SocketFactory;
import org.apache.activemq.Service;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/** /**
* An implementation of the {@link Transport} interface using raw tcp/ip * An implementation of the {@link Transport} interface using raw tcp/ip
* *
* @version $Revision$ * @version $Revision$
*/ */
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable { public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
@ -65,38 +64,39 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
protected int minmumWireFormatVersion; protected int minmumWireFormatVersion;
protected SocketFactory socketFactory; protected SocketFactory socketFactory;
private Map socketOptions; private Map socketOptions;
private Boolean keepAlive; private Boolean keepAlive;
private Boolean tcpNoDelay; private Boolean tcpNoDelay;
/** /**
* Connect to a remote Node - e.g. a Broker * Connect to a remote Node - e.g. a Broker
* *
* @param wireFormat * @param wireFormat
* @param socketFactory * @param socketFactory
* @param remoteLocation * @param remoteLocation
* @param localLocation - * @param localLocation -
* e.g. local InetAddress and local port * e.g. local InetAddress and local port
* @throws IOException * @throws IOException
* @throws UnknownHostException * @throws UnknownHostException
*/ */
public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
this.wireFormat = wireFormat; this.wireFormat = wireFormat;
this.socketFactory = socketFactory; this.socketFactory = socketFactory;
try { try {
this.socket = socketFactory.createSocket(); this.socket = socketFactory.createSocket();
} catch (SocketException e) { }
this.socket = null; catch (SocketException e) {
} this.socket = null;
this.remoteLocation = remoteLocation; }
this.localLocation = localLocation; this.remoteLocation = remoteLocation;
this.localLocation = localLocation;
setDaemon(false); setDaemon(false);
} }
/** /**
* Initialize from a server Socket * Initialize from a server Socket
* *
* @param wireFormat * @param wireFormat
* @param socket * @param socket
* @throws IOException * @throws IOException
@ -104,8 +104,8 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException { public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
this.wireFormat = wireFormat; this.wireFormat = wireFormat;
this.socket = socket; this.socket = socket;
this.remoteLocation = null; this.remoteLocation = null;
this.localLocation = null; this.localLocation = null;
setDaemon(true); setDaemon(true);
} }
@ -122,7 +122,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
* @return pretty print of 'this' * @return pretty print of 'this'
*/ */
public String toString() { public String toString() {
return "tcp://"+socket.getInetAddress()+":"+socket.getPort(); return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
} }
/** /**
@ -132,7 +132,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
log.trace("TCP consumer thread starting"); log.trace("TCP consumer thread starting");
while (!isStopped()) { while (!isStopped()) {
try { try {
Object command = wireFormat.unmarshal(dataIn); Object command = readCommand();
doConsume(command); doConsume(command);
} }
catch (SocketTimeoutException e) { catch (SocketTimeoutException e) {
@ -151,6 +151,10 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
} }
} }
protected Object readCommand() throws IOException {
return wireFormat.unmarshal(dataIn);
}
// Properties // Properties
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
@ -182,7 +186,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
public void setUseLocalHost(boolean useLocalHost) { public void setUseLocalHost(boolean useLocalHost) {
this.useLocalHost = useLocalHost; this.useLocalHost = useLocalHost;
} }
public int getSocketBufferSize() { public int getSocketBufferSize() {
return socketBufferSize; return socketBufferSize;
} }
@ -253,15 +257,15 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
/** /**
* Configures the socket for use * Configures the socket for use
* *
* @param sock * @param sock
* @throws SocketException * @throws SocketException
*/ */
protected void initialiseSocket(Socket sock) throws SocketException { protected void initialiseSocket(Socket sock) throws SocketException {
if( socketOptions != null ) { if (socketOptions != null) {
IntrospectionSupport.setProperties(socket, socketOptions); IntrospectionSupport.setProperties(socket, socketOptions);
} }
try { try {
sock.setReceiveBufferSize(socketBufferSize); sock.setReceiveBufferSize(socketBufferSize);
sock.setSendBufferSize(socketBufferSize); sock.setSendBufferSize(socketBufferSize);
@ -271,7 +275,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
log.debug("Cannot set socket buffer size. Reason: " + se, se); log.debug("Cannot set socket buffer size. Reason: " + se, se);
} }
sock.setSoTimeout(soTimeout); sock.setSoTimeout(soTimeout);
if (keepAlive != null) { if (keepAlive != null) {
sock.setKeepAlive(keepAlive.booleanValue()); sock.setKeepAlive(keepAlive.booleanValue());
} }
@ -285,66 +289,70 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
super.doStart(); super.doStart();
} }
protected void connect() throws SocketException, IOException { protected void connect() throws Exception {
if( socket == null && socketFactory == null ) {
throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
}
InetSocketAddress localAddress=null;
InetSocketAddress remoteAddress=null;
if( localLocation!=null ) {
localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
}
if( remoteLocation!=null ) {
String host = resolveHostName(remoteLocation.getHost());
remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
}
if( socket!=null ) {
if( localAddress!=null )
socket.bind(localAddress);
// If it's a server accepted socket.. we don't need to connect it
// to a remote address.
if ( remoteAddress!=null ) {
if (connectionTimeout >= 0) {
socket.connect(remoteAddress, connectionTimeout);
} else {
socket.connect(remoteAddress);
}
}
} else {
// For SSL sockets.. you can't create an unconnected socket :(
// This means the timout option are not supported either.
if( localAddress!=null ) {
socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
} else {
socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
}
}
initialiseSocket(socket);
initializeStreams();
}
protected void doStop(ServiceStopper stopper) throws Exception { if (socket == null && socketFactory == null) {
// Closing the streams flush the sockets before closing.. if the socket throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
// is hung.. then this hangs the close. }
InetSocketAddress localAddress = null;
InetSocketAddress remoteAddress = null;
if (localLocation != null) {
localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
}
if (remoteLocation != null) {
String host = resolveHostName(remoteLocation.getHost());
remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
}
if (socket != null) {
if (localAddress != null) {
socket.bind(localAddress);
}
// If it's a server accepted socket.. we don't need to connect it
// to a remote address.
if (remoteAddress != null) {
if (connectionTimeout >= 0) {
socket.connect(remoteAddress, connectionTimeout);
}
else {
socket.connect(remoteAddress);
}
}
}
else {
// For SSL sockets.. you can't create an unconnected socket :(
// This means the timout option are not supported either.
if (localAddress != null) {
socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
}
else {
socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
}
}
initialiseSocket(socket);
initializeStreams();
}
protected void doStop(ServiceStopper stopper) throws Exception {
// Closing the streams flush the sockets before closing.. if the socket
// is hung.. then this hangs the close.
// closeStreams(); // closeStreams();
if (socket != null) { if (socket != null) {
socket.close(); socket.close();
} }
} }
protected void initializeStreams() throws IOException { protected void initializeStreams() throws Exception {
TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), 8*1024); TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), 8 * 1024);
this.dataIn = new DataInputStream(buffIn); this.dataIn = new DataInputStream(buffIn);
TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), 16*1024); TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), 16 * 1024);
this.dataOut = new DataOutputStream(buffOut); this.dataOut = new DataOutputStream(buffOut);
} }
@ -358,13 +366,13 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
} }
public void setSocketOptions(Map socketOptions) { public void setSocketOptions(Map socketOptions) {
this.socketOptions = new HashMap(socketOptions); this.socketOptions = new HashMap(socketOptions);
} }
public String getRemoteAddress() { public String getRemoteAddress() {
if(socket != null){ if (socket != null) {
return "" + socket.getRemoteSocketAddress(); return "" + socket.getRemoteSocketAddress();
} }
return null; return null;
} }
} }