diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java index f74277c89e..5e1f872483 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java @@ -198,13 +198,34 @@ public abstract class TransportFactory { return "default"; } - protected Transport configure(Transport transport, WireFormat wf, Map options) throws Exception { - IntrospectionSupport.setProperties(transport, options); + /** + * Fully configures and adds all need transport filters so that the transport + * can be used by the JMS client. + * + * @param transport + * @param wf + * @param options + * @return + * @throws Exception + */ + public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception { + transport = compositeConfigure(transport, wf, options); + transport = new MutexTransport(transport); transport = new ResponseCorrelator(transport); + return transport; } + /** + * Similar to configure(...) but this avoid adding in the MutexTransport and ResponseCorrelator transport layers + * so that the resulting transport can more efficiently be used as part of a composite transport. + * + * @param transport + * @param format + * @param options + * @return + */ public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { IntrospectionSupport.setProperties(transport, options); return transport; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java index 8627d07fe3..92f29f2afd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java @@ -21,7 +21,7 @@ import org.apache.activemq.command.Response; */ public class TransportFilter implements TransportListener,Transport{ final protected Transport next; - private TransportListener transportListener; + protected TransportListener transportListener; public TransportFilter(Transport next){ this.next=next; diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java index f8043f25f3..8dafe45801 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java @@ -23,13 +23,14 @@ import java.io.InterruptedIOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; -import java.net.SocketAddress; import java.net.SocketException; import java.net.SocketTimeoutException; import java.net.URI; import java.net.UnknownHostException; import java.util.Map; +import javax.net.SocketFactory; + import org.apache.activeio.command.WireFormat; import org.apache.activemq.Service; import org.apache.activemq.command.Command; @@ -40,8 +41,6 @@ import org.apache.activemq.util.ServiceStopper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import javax.net.SocketFactory; - /** * An implementation of the {@link Transport} interface using raw tcp/ip * @@ -60,10 +59,8 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S private boolean trace; private boolean useLocalHost = true; private int minmumWireFormatVersion; - private InetSocketAddress socketAddress; - - private Map socketOptions; - + private InetSocketAddress remoteAddress; + private InetSocketAddress localAddress; /** * Construct basic helpers @@ -74,19 +71,6 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S this.wireFormat = wireFormat; } - /** - * Connect to a remote Node - e.g. a Broker - * - * @param wireFormat - * @param remoteLocation - * @throws IOException - * @throws UnknownHostException - */ - public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation) throws UnknownHostException, IOException { - this(wireFormat); - this.socket = createSocket(socketFactory, remoteLocation); - } - /** * Connect to a remote Node - e.g. a Broker * @@ -231,36 +215,22 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S * Factory method to create a new socket * * @param remoteLocation - * the URI to connect to - * @return the newly created socket - * @throws UnknownHostException - * @throws IOException - */ - protected Socket createSocket(SocketFactory socketFactory, URI remoteLocation) throws UnknownHostException, IOException { - String host = resolveHostName(remoteLocation.getHost()); - socketAddress = new InetSocketAddress(host, remoteLocation.getPort()); - Socket sock = socketFactory.createSocket(); - return sock; - } - - /** - * Factory method to create a new socket - * - * @param remoteLocation - * @param localLocation + * @param localLocation ignored if null * @return * @throws IOException * @throws IOException * @throws UnknownHostException */ protected Socket createSocket(SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws IOException, UnknownHostException { + String host = resolveHostName(remoteLocation.getHost()); - SocketAddress sockAddress = new InetSocketAddress(host, remoteLocation.getPort()); - SocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort()); - Socket sock = socketFactory.createSocket(); - initialiseSocket(sock); - sock.bind(localAddress); - sock.connect(sockAddress); + remoteAddress = new InetSocketAddress(host, remoteLocation.getPort()); + + if( localLocation!=null ) { + localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort()); + } + + Socket sock = socketFactory.createSocket(); return sock; } @@ -293,12 +263,15 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S protected void doStart() throws Exception { initialiseSocket(socket); - if (socketAddress != null) { + if( localAddress!=null ) { + socket.bind(localAddress); + } + if (remoteAddress != null) { if (connectionTimeout >= 0) { - socket.connect(socketAddress, connectionTimeout); + socket.connect(remoteAddress, connectionTimeout); } else { - socket.connect(socketAddress); + socket.connect(remoteAddress); } } initializeStreams(); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java index 71ad5189a5..42ca71447e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java @@ -29,8 +29,6 @@ import javax.net.SocketFactory; import org.apache.activeio.command.WireFormat; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.transport.InactivityMonitor; -import org.apache.activemq.transport.MutexTransport; -import org.apache.activemq.transport.ResponseCorrelator; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportLogger; @@ -49,7 +47,7 @@ public class TcpTransportFactory extends TransportFactory { Map options = new HashMap(URISupport.parseParamters(location)); ServerSocketFactory serverSocketFactory = createServerSocketFactory(); - TcpTransportServer server = new TcpTransportServer(location, serverSocketFactory); + TcpTransportServer server = createTcpTransportServer(location, serverSocketFactory); server.setWireFormatFactory(createWireFormatFactory(options)); IntrospectionSupport.setProperties(server, options); Map transportOptions = IntrospectionSupport.extractProperties(options, "transport."); @@ -62,31 +60,24 @@ public class TcpTransportFactory extends TransportFactory { } } - public Transport configure(Transport transport, WireFormat format, Map options) { - IntrospectionSupport.setProperties(transport, options); - TcpTransport tcpTransport = (TcpTransport) transport; - Map socketOptions = IntrospectionSupport.extractProperties(options, "socket."); - tcpTransport.setSocketOptions(socketOptions); - - if (tcpTransport.isTrace()) { - transport = new TransportLogger(transport); - } - - transport = new InactivityMonitor(transport); - - // Only need the OpenWireFormat if using openwire - if( format instanceof OpenWireFormat ) { - transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion()); - } - - transport = new MutexTransport(transport); - transport = new ResponseCorrelator(transport); - return transport; - } + /** + * Allows subclasses of TcpTransportFactory to create custom instances of TcpTransportServer. + * + * @param location + * @param serverSocketFactory + * @return + * @throws IOException + * @throws URISyntaxException + */ + protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + return new TcpTransportServer(this, location, serverSocketFactory); + } public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { - IntrospectionSupport.setProperties(transport, options); - TcpTransport tcpTransport = (TcpTransport) transport; + + TcpTransport tcpTransport = (TcpTransport) transport.narrow(TcpTransport.class); + IntrospectionSupport.setProperties(tcpTransport, options); + Map socketOptions = IntrospectionSupport.extractProperties(options, "socket."); tcpTransport.setSocketOptions(socketOptions); @@ -96,7 +87,7 @@ public class TcpTransportFactory extends TransportFactory { transport = new InactivityMonitor(transport); - // Only need the OpenWireFormat if using openwire + // Only need the WireFormatNegotiator if using openwire if( format instanceof OpenWireFormat ) { transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion()); } @@ -120,12 +111,24 @@ public class TcpTransportFactory extends TransportFactory { } } SocketFactory socketFactory = createSocketFactory(); - if (localLocation != null) { - return new TcpTransport(wf, socketFactory, location, localLocation); - } - return new TcpTransport(wf, socketFactory, location); + return createTcpTransport(wf, socketFactory, location, localLocation); } + /** + * Allows subclasses of TcpTransportFactory to provide a create custom TcpTransport intances. + * + * @param location + * @param wf + * @param socketFactory + * @param localLocation + * @return + * @throws UnknownHostException + * @throws IOException + */ + private TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException { + return new TcpTransport(wf, socketFactory, location, localLocation); + } + protected ServerSocketFactory createServerSocketFactory() { return ServerSocketFactory.getDefault(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java index 2109c11752..eb6e19ec15 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java @@ -52,16 +52,17 @@ public class TcpTransportServer extends TransportServerThreadSupport { private ServerSocket serverSocket; private int backlog = 5000; private WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); - private TcpTransportFactory transportFactory = new TcpTransportFactory(); + private final TcpTransportFactory transportFactory; private long maxInactivityDuration = 30000; private int minmumWireFormatVersion; private boolean trace; private Map transportOptions; - public TcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { super(location); - serverSocket = createServerSocket(location, serverSocketFactory); - serverSocket.setSoTimeout(2000); + this.transportFactory=transportFactory; + this.serverSocket = createServerSocket(location, serverSocketFactory); + this.serverSocket.setSoTimeout(2000); updatePhysicalUri(location); } @@ -132,7 +133,7 @@ public class TcpTransportServer extends TransportServerThreadSupport { options.put("trace", new Boolean(trace)); options.putAll(transportOptions); WireFormat format = wireFormatFactory.createWireFormat(); - TcpTransport transport = new TcpTransport(format, socket); + Transport transport = createTransport(socket, format); Transport configuredTransport = transportFactory.configure(transport, format, options); getAcceptListener().onAccept(configuredTransport); } @@ -152,6 +153,17 @@ public class TcpTransportServer extends TransportServerThreadSupport { } } + /** + * Allow derived classes to override the Transport implementation that this transport server creates. + * @param socket + * @param format + * @return + * @throws IOException + */ + protected Transport createTransport(Socket socket, WireFormat format) throws IOException { + return new TcpTransport(format, socket); + } + /** * @return pretty print of this */ diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompWireFormatTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompWireFormatTest.java deleted file mode 100644 index 26c150c5f9..0000000000 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompWireFormatTest.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * - * Copyright 2005-2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.stomp; - -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.Command; -import org.apache.activemq.command.*; -import org.apache.activemq.transport.stomp.Stomp; -import org.apache.activemq.transport.stomp.StompWireFormat; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import javax.jms.JMSException; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -import junit.framework.TestCase; - -public class StompWireFormatTest extends TestCase { - - protected static final Log log = LogFactory.getLog(StompWireFormatTest.class); - - private StompWireFormat wire; - - public void setUp() throws Exception { - wire = new StompWireFormat(); - } - - public void testValidConnectHandshake() throws Exception { - ByteArrayOutputStream bout = new ByteArrayOutputStream(); - DataOutputStream dout = new DataOutputStream(bout); - - ConnectionInfo ci = (ConnectionInfo) parseCommand("CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n" + Stomp.NULL); - assertNotNull(ci); - assertTrue(ci.isResponseRequired()); - - Response cr = new Response(); - cr.setCorrelationId(ci.getCommandId()); - - String response = writeCommand(cr); - log.info("Received: " + response); - - SessionInfo si = (SessionInfo) wire.readCommand(null); - assertNotNull(si); - assertTrue(!si.isResponseRequired()); - - ProducerInfo pi = (ProducerInfo) wire.readCommand(null); - assertNotNull(pi); - assertTrue(pi.isResponseRequired()); - - Response sr = new Response(); - sr.setCorrelationId(pi.getCommandId()); - response = writeCommand(sr); - log.info("Received: " + response); - assertTrue("Response should start with CONNECTED: " + response, response.startsWith("CONNECTED")); - - // now lets test subscribe - ConsumerInfo consumerInfo = (ConsumerInfo) parseCommand("SUBSCRIBE\n" + "destination: /queue/foo\n" + "ack: client\n" + "activemq.prefetchSize: 1\n" - + "\n" + Stomp.NULL); - assertNotNull(consumerInfo); - // assertTrue(consumerInfo.isResponseRequired()); - assertEquals("prefetch size", 1, consumerInfo.getPrefetchSize()); - - cr = new Response(); - cr.setCorrelationId(consumerInfo.getCommandId()); - response = writeCommand(cr); - log.info("Received: " + response); - } - - public void _testFakeServer() throws Exception { - final BrokerService container = new BrokerService(); - new Thread(new Runnable() { - public void run() { - try { - container.addConnector("stomp://localhost:61613"); - container.start(); - } - catch (Exception e) { - System.err.println("ARGH: caught: " + e); - e.printStackTrace(); - } - } - }).start(); - System.err.println("started container"); - System.err.println("okay, go play"); - - System.err.println(System.in.read()); - } - - protected Command parseCommand(String connect_frame) throws IOException, JMSException { - DataInputStream din = new DataInputStream(new ByteArrayInputStream(connect_frame.getBytes())); - - return wire.readCommand(din); - } - - protected String writeCommand(Command command) throws IOException, JMSException { - ByteArrayOutputStream bout = new ByteArrayOutputStream(); - DataOutputStream dout = new DataOutputStream(bout); - wire.writeCommand(command, dout); - return new String(bout.toByteArray()); - } - -} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java index 607dbf6758..c9ab3a307d 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java @@ -148,7 +148,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra // // Manually create a client transport so that it does not send KeepAlive packets. // this should simulate a client hang. - clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:61616")); + clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:61616"), null); clientTransport.setTransportListener(new TransportListener() { public void onCommand(Command command) { clientReceiveCount.incrementAndGet();