Cleaned up TCP transport a little.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@418548 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-07-02 04:08:53 +00:00
parent e423bc58a0
commit b4b8499be5
6 changed files with 95 additions and 86 deletions

View File

@ -198,13 +198,34 @@ public abstract class TransportFactory {
return "default";
}
protected Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
IntrospectionSupport.setProperties(transport, options);
/**
* Fully configures and adds all need transport filters so that the transport
* can be used by the JMS client.
*
* @param transport
* @param wf
* @param options
* @return
* @throws Exception
*/
public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
transport = compositeConfigure(transport, wf, options);
transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport);
return transport;
}
/**
* Similar to configure(...) but this avoid adding in the MutexTransport and ResponseCorrelator transport layers
* so that the resulting transport can more efficiently be used as part of a composite transport.
*
* @param transport
* @param format
* @param options
* @return
*/
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
IntrospectionSupport.setProperties(transport, options);
return transport;

View File

@ -21,7 +21,7 @@ import org.apache.activemq.command.Response;
*/
public class TransportFilter implements TransportListener,Transport{
final protected Transport next;
private TransportListener transportListener;
protected TransportListener transportListener;
public TransportFilter(Transport next){
this.next=next;

View File

@ -23,13 +23,14 @@ import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Map;
import javax.net.SocketFactory;
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
@ -40,8 +41,6 @@ import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.net.SocketFactory;
/**
* An implementation of the {@link Transport} interface using raw tcp/ip
*
@ -60,10 +59,8 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
private boolean trace;
private boolean useLocalHost = true;
private int minmumWireFormatVersion;
private InetSocketAddress socketAddress;
private Map socketOptions;
private InetSocketAddress remoteAddress;
private InetSocketAddress localAddress;
/**
* Construct basic helpers
@ -74,19 +71,6 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
this.wireFormat = wireFormat;
}
/**
* Connect to a remote Node - e.g. a Broker
*
* @param wireFormat
* @param remoteLocation
* @throws IOException
* @throws UnknownHostException
*/
public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation) throws UnknownHostException, IOException {
this(wireFormat);
this.socket = createSocket(socketFactory, remoteLocation);
}
/**
* Connect to a remote Node - e.g. a Broker
*
@ -231,36 +215,22 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
* Factory method to create a new socket
*
* @param remoteLocation
* the URI to connect to
* @return the newly created socket
* @throws UnknownHostException
* @throws IOException
*/
protected Socket createSocket(SocketFactory socketFactory, URI remoteLocation) throws UnknownHostException, IOException {
String host = resolveHostName(remoteLocation.getHost());
socketAddress = new InetSocketAddress(host, remoteLocation.getPort());
Socket sock = socketFactory.createSocket();
return sock;
}
/**
* Factory method to create a new socket
*
* @param remoteLocation
* @param localLocation
* @param localLocation ignored if null
* @return
* @throws IOException
* @throws IOException
* @throws UnknownHostException
*/
protected Socket createSocket(SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws IOException, UnknownHostException {
String host = resolveHostName(remoteLocation.getHost());
SocketAddress sockAddress = new InetSocketAddress(host, remoteLocation.getPort());
SocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
if( localLocation!=null ) {
localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
}
Socket sock = socketFactory.createSocket();
initialiseSocket(sock);
sock.bind(localAddress);
sock.connect(sockAddress);
return sock;
}
@ -293,12 +263,15 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
protected void doStart() throws Exception {
initialiseSocket(socket);
if (socketAddress != null) {
if( localAddress!=null ) {
socket.bind(localAddress);
}
if (remoteAddress != null) {
if (connectionTimeout >= 0) {
socket.connect(socketAddress, connectionTimeout);
socket.connect(remoteAddress, connectionTimeout);
}
else {
socket.connect(socketAddress);
socket.connect(remoteAddress);
}
}
initializeStreams();

View File

@ -29,8 +29,6 @@ import javax.net.SocketFactory;
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportLogger;
@ -49,7 +47,7 @@ public class TcpTransportFactory extends TransportFactory {
Map options = new HashMap(URISupport.parseParamters(location));
ServerSocketFactory serverSocketFactory = createServerSocketFactory();
TcpTransportServer server = new TcpTransportServer(location, serverSocketFactory);
TcpTransportServer server = createTcpTransportServer(location, serverSocketFactory);
server.setWireFormatFactory(createWireFormatFactory(options));
IntrospectionSupport.setProperties(server, options);
Map transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
@ -62,31 +60,24 @@ public class TcpTransportFactory extends TransportFactory {
}
}
public Transport configure(Transport transport, WireFormat format, Map options) {
IntrospectionSupport.setProperties(transport, options);
TcpTransport tcpTransport = (TcpTransport) transport;
Map socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
tcpTransport.setSocketOptions(socketOptions);
if (tcpTransport.isTrace()) {
transport = new TransportLogger(transport);
}
transport = new InactivityMonitor(transport);
// Only need the OpenWireFormat if using openwire
if( format instanceof OpenWireFormat ) {
transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
}
transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport);
return transport;
/**
* Allows subclasses of TcpTransportFactory to create custom instances of TcpTransportServer.
*
* @param location
* @param serverSocketFactory
* @return
* @throws IOException
* @throws URISyntaxException
*/
protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new TcpTransportServer(this, location, serverSocketFactory);
}
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
IntrospectionSupport.setProperties(transport, options);
TcpTransport tcpTransport = (TcpTransport) transport;
TcpTransport tcpTransport = (TcpTransport) transport.narrow(TcpTransport.class);
IntrospectionSupport.setProperties(tcpTransport, options);
Map socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
tcpTransport.setSocketOptions(socketOptions);
@ -96,7 +87,7 @@ public class TcpTransportFactory extends TransportFactory {
transport = new InactivityMonitor(transport);
// Only need the OpenWireFormat if using openwire
// Only need the WireFormatNegotiator if using openwire
if( format instanceof OpenWireFormat ) {
transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
}
@ -120,10 +111,22 @@ public class TcpTransportFactory extends TransportFactory {
}
}
SocketFactory socketFactory = createSocketFactory();
if (localLocation != null) {
return new TcpTransport(wf, socketFactory, location, localLocation);
return createTcpTransport(wf, socketFactory, location, localLocation);
}
return new TcpTransport(wf, socketFactory, location);
/**
* Allows subclasses of TcpTransportFactory to provide a create custom TcpTransport intances.
*
* @param location
* @param wf
* @param socketFactory
* @param localLocation
* @return
* @throws UnknownHostException
* @throws IOException
*/
private TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
return new TcpTransport(wf, socketFactory, location, localLocation);
}
protected ServerSocketFactory createServerSocketFactory() {

View File

@ -52,16 +52,17 @@ public class TcpTransportServer extends TransportServerThreadSupport {
private ServerSocket serverSocket;
private int backlog = 5000;
private WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
private TcpTransportFactory transportFactory = new TcpTransportFactory();
private final TcpTransportFactory transportFactory;
private long maxInactivityDuration = 30000;
private int minmumWireFormatVersion;
private boolean trace;
private Map transportOptions;
public TcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
super(location);
serverSocket = createServerSocket(location, serverSocketFactory);
serverSocket.setSoTimeout(2000);
this.transportFactory=transportFactory;
this.serverSocket = createServerSocket(location, serverSocketFactory);
this.serverSocket.setSoTimeout(2000);
updatePhysicalUri(location);
}
@ -132,7 +133,7 @@ public class TcpTransportServer extends TransportServerThreadSupport {
options.put("trace", new Boolean(trace));
options.putAll(transportOptions);
WireFormat format = wireFormatFactory.createWireFormat();
TcpTransport transport = new TcpTransport(format, socket);
Transport transport = createTransport(socket, format);
Transport configuredTransport = transportFactory.configure(transport, format, options);
getAcceptListener().onAccept(configuredTransport);
}
@ -152,6 +153,17 @@ public class TcpTransportServer extends TransportServerThreadSupport {
}
}
/**
* Allow derived classes to override the Transport implementation that this transport server creates.
* @param socket
* @param format
* @return
* @throws IOException
*/
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
return new TcpTransport(format, socket);
}
/**
* @return pretty print of this
*/

View File

@ -148,7 +148,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
//
// Manually create a client transport so that it does not send KeepAlive packets.
// this should simulate a client hang.
clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:61616"));
clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:61616"), null);
clientTransport.setTransportListener(new TransportListener() {
public void onCommand(Command command) {
clientReceiveCount.incrementAndGet();