diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java index f432e7b46e..f8043f25f3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java @@ -28,12 +28,14 @@ import java.net.SocketException; import java.net.SocketTimeoutException; import java.net.URI; import java.net.UnknownHostException; +import java.util.Map; import org.apache.activeio.command.WireFormat; import org.apache.activemq.Service; import org.apache.activemq.command.Command; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportThreadSupport; +import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.ServiceStopper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -60,6 +62,8 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S private int minmumWireFormatVersion; private InetSocketAddress socketAddress; + private Map socketOptions; + /** * Construct basic helpers @@ -323,4 +327,8 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S dataIn.close(); } } + + public void setSocketOptions(Map socketOptions) { + IntrospectionSupport.setProperties(socket, socketOptions); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java index a71678ae36..71ad5189a5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java @@ -52,7 +52,9 @@ public class TcpTransportFactory extends TransportFactory { TcpTransportServer server = new TcpTransportServer(location, serverSocketFactory); server.setWireFormatFactory(createWireFormatFactory(options)); IntrospectionSupport.setProperties(server, options); - + Map transportOptions = IntrospectionSupport.extractProperties(options, "transport."); + server.setTransportOption(transportOptions); + return server; } catch (URISyntaxException e) { @@ -63,6 +65,9 @@ public class TcpTransportFactory extends TransportFactory { public Transport configure(Transport transport, WireFormat format, Map options) { IntrospectionSupport.setProperties(transport, options); TcpTransport tcpTransport = (TcpTransport) transport; + Map socketOptions = IntrospectionSupport.extractProperties(options, "socket."); + tcpTransport.setSocketOptions(socketOptions); + if (tcpTransport.isTrace()) { transport = new TransportLogger(transport); } @@ -82,6 +87,9 @@ public class TcpTransportFactory extends TransportFactory { public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { IntrospectionSupport.setProperties(transport, options); TcpTransport tcpTransport = (TcpTransport) transport; + Map socketOptions = IntrospectionSupport.extractProperties(options, "socket."); + tcpTransport.setSocketOptions(socketOptions); + if (tcpTransport.isTrace()) { transport = new TransportLogger(transport); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java index f8e5e2671d..35880459e5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java @@ -26,6 +26,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.HashMap; +import java.util.Map; import org.apache.activeio.command.WireFormat; import org.apache.activeio.command.WireFormatFactory; @@ -55,6 +56,7 @@ public class TcpTransportServer extends TransportServerThreadSupport { private long maxInactivityDuration = 30000; private int minmumWireFormatVersion; private boolean trace; + private Map transportOptions; public TcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { super(location); @@ -128,6 +130,7 @@ public class TcpTransportServer extends TransportServerThreadSupport { options.put("maxInactivityDuration", new Long(maxInactivityDuration)); options.put("minmumWireFormatVersion", new Integer(minmumWireFormatVersion)); options.put("trace", new Boolean(trace)); + options.putAll(transportOptions); WireFormat format = wireFormatFactory.createWireFormat(); TcpTransport transport = new TcpTransport(format, socket); Transport configuredTransport = transportFactory.configure(transport, format, options); @@ -213,4 +216,8 @@ public class TcpTransportServer extends TransportServerThreadSupport { public InetSocketAddress getSocketAddress() { return (InetSocketAddress)serverSocket.getLocalSocketAddress(); } + + public void setTransportOption(Map transportOptions) { + this.transportOptions = transportOptions; + } }