From 24404740a16d6b7d8afd28845cf30c35abf7406a Mon Sep 17 00:00:00 2001 From: James Strachan Date: Tue, 28 Mar 2006 16:29:41 +0000 Subject: [PATCH] moved the SSL transport into the main core module and switched to using our own TCP transport by default (as well as refactoring to make the [Server]SocketFactory objects pluggable git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@389536 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/tcp/SslTransportFactory.java | 41 ++++++++++++++ .../activemq/transport/tcp/TcpTransport.java | 19 ++++--- .../transport/tcp/TcpTransportFactory.java | 27 +++++----- .../transport/tcp/TcpTransportServer.java | 19 +++---- .../org/apache/activemq/transport/activeiossl | 1 + .../org/apache/activemq/transport/ssl | 2 +- .../transport/tcp/InactivityMonitorTest.java | 4 +- .../transport/tcp/SslTransportBrokerTest.java | 51 ++++++++++++++++++ .../src/test/resources/client.keystore | Bin 0 -> 646 bytes .../src/test/resources/server.keystore | Bin 0 -> 1352 bytes 10 files changed, 130 insertions(+), 34 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java create mode 100644 activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/activeiossl create mode 100755 activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslTransportBrokerTest.java create mode 100755 activemq-core/src/test/resources/client.keystore create mode 100755 activemq-core/src/test/resources/server.keystore diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java new file mode 100644 index 0000000000..0a5b64ed36 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java @@ -0,0 +1,41 @@ +/* + * Copyright 2005-2006 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.tcp; + +import javax.net.ServerSocketFactory; +import javax.net.SocketFactory; +import javax.net.ssl.SSLServerSocketFactory; +import javax.net.ssl.SSLSocketFactory; + +/** + * An implementation of the TCP Transport using SSL + * + * @version $Revision: $ + */ +public class SslTransportFactory extends TcpTransportFactory { + + public SslTransportFactory() { + } + + protected ServerSocketFactory createServerSocketFactory() { + return SSLServerSocketFactory.getDefault(); + } + + protected SocketFactory createSocketFactory() { + return SSLSocketFactory.getDefault(); + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java index d2e6f0ffa3..9f4e44bcae 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java @@ -38,6 +38,8 @@ import org.apache.activemq.util.ServiceStopper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import javax.net.SocketFactory; + /** * An implementation of the {@link Transport} interface using raw tcp/ip * @@ -76,24 +78,25 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S * @throws IOException * @throws UnknownHostException */ - public TcpTransport(WireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException { + public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation) throws UnknownHostException, IOException { this(wireFormat); - this.socket = createSocket(remoteLocation); + this.socket = createSocket(socketFactory, remoteLocation); } /** * Connect to a remote Node - e.g. a Broker * * @param wireFormat + * @param socketFactory * @param remoteLocation * @param localLocation - * e.g. local InetAddress and local port * @throws IOException * @throws UnknownHostException */ - public TcpTransport(WireFormat wireFormat, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { + public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { this(wireFormat); - this.socket = createSocket(remoteLocation, localLocation); + this.socket = createSocket(socketFactory, remoteLocation, localLocation); } /** @@ -229,10 +232,10 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S * @throws UnknownHostException * @throws IOException */ - protected Socket createSocket(URI remoteLocation) throws UnknownHostException, IOException { + protected Socket createSocket(SocketFactory socketFactory, URI remoteLocation) throws UnknownHostException, IOException { String host = resolveHostName(remoteLocation.getHost()); socketAddress = new InetSocketAddress(host, remoteLocation.getPort()); - Socket sock = new Socket(); + Socket sock = socketFactory.createSocket(); return sock; } @@ -246,11 +249,11 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S * @throws IOException * @throws UnknownHostException */ - protected Socket createSocket(URI remoteLocation, URI localLocation) throws IOException, UnknownHostException { + protected Socket createSocket(SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws IOException, UnknownHostException { String host = resolveHostName(remoteLocation.getHost()); SocketAddress sockAddress = new InetSocketAddress(host, remoteLocation.getPort()); SocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort()); - Socket sock = new Socket(); + Socket sock = socketFactory.createSocket(); initialiseSocket(sock); sock.bind(localAddress); sock.connect(sockAddress); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java index 6679157685..a71678ae36 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java @@ -48,7 +48,8 @@ public class TcpTransportFactory extends TransportFactory { try { Map options = new HashMap(URISupport.parseParamters(location)); - TcpTransportServer server = new TcpTransportServer(location); + ServerSocketFactory serverSocketFactory = createServerSocketFactory(); + TcpTransportServer server = new TcpTransportServer(location, serverSocketFactory); server.setWireFormatFactory(createWireFormatFactory(options)); IntrospectionSupport.setProperties(server, options); @@ -99,20 +100,22 @@ public class TcpTransportFactory extends TransportFactory { URI localLocation=null; String path=location.getPath(); // see if the path is a local URI location - if(path!=null&&path.length()>0){ - int localPortIndex=path.indexOf(':'); - try{ - Integer.parseInt(path.substring((localPortIndex+1),path.length())); - String localString=location.getScheme()+ ":/" + path; - localLocation=new URI(localString); - }catch(Exception e){ - log.warn("path isn't a valid local location for TcpTransport to use",e); + if (path != null && path.length() > 0) { + int localPortIndex = path.indexOf(':'); + try { + Integer.parseInt(path.substring((localPortIndex + 1), path.length())); + String localString = location.getScheme() + ":/" + path; + localLocation = new URI(localString); + } + catch (Exception e) { + log.warn("path isn't a valid local location for TcpTransport to use", e); } } - if(localLocation!=null){ - return new TcpTransport(wf,location,localLocation); + SocketFactory socketFactory = createSocketFactory(); + if (localLocation != null) { + return new TcpTransport(wf, socketFactory, location, localLocation); } - return new TcpTransport(wf,location); + return new TcpTransport(wf, socketFactory, location); } protected ServerSocketFactory createServerSocketFactory() { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java index 90c95813c3..f8e5e2671d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java @@ -38,6 +38,8 @@ import org.apache.activemq.util.ServiceStopper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import javax.net.ServerSocketFactory; + /** * A TCP based implementation of {@link TransportServer} * @@ -54,16 +56,9 @@ public class TcpTransportServer extends TransportServerThreadSupport { private int minmumWireFormatVersion; private boolean trace; - /** - * Constructor - * - * @param location - * @throws IOException - * @throws URISyntaxException - */ - public TcpTransportServer(URI location) throws IOException, URISyntaxException { + public TcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { super(location); - serverSocket = createServerSocket(location); + serverSocket = createServerSocket(location, serverSocketFactory); serverSocket.setSoTimeout(2000); updatePhysicalUri(location); } @@ -194,16 +189,16 @@ public class TcpTransportServer extends TransportServerThreadSupport { * @throws UnknownHostException * @throws IOException */ - protected ServerSocket createServerSocket(URI bind) throws UnknownHostException, IOException { + protected ServerSocket createServerSocket(URI bind, ServerSocketFactory factory) throws UnknownHostException, IOException { ServerSocket answer = null; String host = bind.getHost(); host = (host == null || host.length() == 0) ? "localhost" : host; InetAddress addr = InetAddress.getByName(host); if (host.trim().equals("localhost") || addr.equals(InetAddress.getLocalHost())) { - answer = new ServerSocket(bind.getPort(), backlog); + answer = factory.createServerSocket(bind.getPort(), backlog); } else { - answer = new ServerSocket(bind.getPort(), backlog, addr); + answer = factory.createServerSocket(bind.getPort(), backlog, addr); } return answer; } diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/activeiossl b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/activeiossl new file mode 100644 index 0000000000..4eed7519c9 --- /dev/null +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/activeiossl @@ -0,0 +1 @@ +class=org.apache.activemq.transport.activeio.ActiveIOTransportFactory diff --git a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/ssl b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/ssl index 4eed7519c9..844e3c8038 100644 --- a/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/ssl +++ b/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/transport/ssl @@ -1 +1 @@ -class=org.apache.activemq.transport.activeio.ActiveIOTransportFactory +class=org.apache.activemq.transport.tcp.SslTransportFactory diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java index c82722cf8b..953493315b 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java @@ -29,6 +29,8 @@ import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.TransportServer; +import javax.net.SocketFactory; + import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; @@ -146,7 +148,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra // // Manually create a client transport so that it does not send KeepAlive packets. // this should simulate a client hang. - clientTransport = new TcpTransport(new OpenWireFormat(), new URI("tcp://localhost:61616")); + clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:61616")); clientTransport.setTransportListener(new TransportListener() { public void onCommand(Command command) { clientReceiveCount.incrementAndGet(); diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslTransportBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslTransportBrokerTest.java new file mode 100755 index 0000000000..d149e64c2e --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslTransportBrokerTest.java @@ -0,0 +1,51 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.tcp; + +import org.apache.activemq.transport.TransportBrokerTestSupport; + +import junit.framework.Test; +import junit.textui.TestRunner; + +public class SslTransportBrokerTest extends TransportBrokerTestSupport { + + protected String getBindLocation() { + return "ssl://localhost:0"; + } + + protected void setUp() throws Exception { + System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore"); + System.setProperty("javax.net.ssl.trustStorePassword", "password"); + System.setProperty("javax.net.ssl.trustStoreType", "jks"); + System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore"); + System.setProperty("javax.net.ssl.keyStorePassword", "password"); + System.setProperty("javax.net.ssl.keyStoreType", "jks"); + //System.setProperty("javax.net.debug", "ssl,handshake,data,trustmanager"); + + MAX_WAIT = 2000; + super.setUp(); + } + + public static Test suite() { + return suite(SslTransportBrokerTest.class); + } + + public static void main(String[] args) { + TestRunner.run(suite()); + } + +} diff --git a/activemq-core/src/test/resources/client.keystore b/activemq-core/src/test/resources/client.keystore new file mode 100755 index 0000000000000000000000000000000000000000..85806727779f7b648400bb41b54beecb4f4cf74a GIT binary patch literal 646 zcmezO_TO6u1_mY|W(3o$xs}|_yE zckq1;`86 z0{zwibw~M`AHUpNxJ_}@da-r?Bed2(Y~T9HB+%=iq@KF}f0dcK+KXNu>8n^aA?$|n z+joH*yf#l|VrFDuM0Or9nwWv^3OXgRzs_T0D9g-xZKFMm&j5sG8&3c>mxJ-N&=!PFfUy=YBTV?L5*%n`1$thICwE51<`Q;3W*JCt1QcfRJxXpZ|boa`L zt|*p=AN+Pz6f`-UaZb`**Am+#qa40yQe*gWf8z@EH9Nk)-u?B3l=8WRTU<@oTi0xl zvz3?hC{<6~86j~@L3_?=?}>-5R;V0I&d&e*OhoBV^^0>HCeOFM`+aT_ORI`qvLeUF z6Jh21*47ADXecd6jbXM{;n>xtbHqkyNtr|>$Ca$f`NmaO%Qx8`e^ZqHM>2NeLfg#L zjv&!ncY>I_MO7QxbNG%WyY}eSJy_7iA@|R@R^#fV=To){>Ps>(v8i&dsIGYNUQB%X z;nSj14^G~vy(lxHoPE*DRWmPII|WaXUUhzR_0;<-vlBJ{%}hVAPk(v)g&(nNwwx9Y z&xzT-C3M4`y6chs*}pV41Zyk*x@I?fao*dhW}8nOJA6EKkFKk#%^}a9*PIWoVh?y^ zasAc+8hyU?giI%Qjxff1}+*4l6JY4?qzw~|6BiAJV+d#~?VzNy@z=fA9%_$sBJA?d{l`j7-2p=WAf2~5Zyz=Z5# z(8RcwiN%R&`ve1CHcqWJkGAi;jEpR-3ld~g?!!FF^ z<7g;qAPiE*CCrnUT#{Lqnp>!sUz83L=0+11G7tc%W)|km$xlwq$;dA*F_07IH83?W zGB7qWHZe6ahyrs>4NRb1dOD?XK5{q!b2f8hFM~m2CsSi1!^Rnt$_|$w&A+HS-TvR{ zhPygtN!GSM(|7QF4f~jUwm|6n#wV#dlUpr|%9y_xNu87^nJJUb_2|3ap<7Q@W{H&j z^xyZ@^SHD7@&f(V0Ch+CnIFI0TewYe)q1gY|0A^4KWyLn$t2M0proF<|9_R4y4s6g z9_g!CHX-bW@!NNS8@x78WnyMzU_^EvFq)Ww?g~03vA@n^V<^kadTpt$kjp1S)k}W= zJf)!_P^=_-tyni9Rbu%ntI6Mc5`2z|xgUGm#bbZst=Za#O-ZXiUQ2!&6}DgNJmc@+ zMHf7qCn^i<^O${GK0Z%NplwZ}tzn0_%(b}tg8OZ}f4wrfl4>`@e#=8S*>Z-TcB(=L hd*8XL$DW*VWs{eP@