moved the SSL transport into the main core module and switched to using our own TCP transport by default (as well as refactoring to make the [Server]SocketFactory objects pluggable

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@389536 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-03-28 16:29:41 +00:00
parent 2a076d0710
commit 24404740a1
10 changed files with 130 additions and 34 deletions

View File

@ -0,0 +1,41 @@
/*
* Copyright 2005-2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.tcp;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import javax.net.ssl.SSLServerSocketFactory;
import javax.net.ssl.SSLSocketFactory;
/**
* An implementation of the TCP Transport using SSL
*
* @version $Revision: $
*/
public class SslTransportFactory extends TcpTransportFactory {
public SslTransportFactory() {
}
protected ServerSocketFactory createServerSocketFactory() {
return SSLServerSocketFactory.getDefault();
}
protected SocketFactory createSocketFactory() {
return SSLSocketFactory.getDefault();
}
}

View File

@ -38,6 +38,8 @@ import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import javax.net.SocketFactory;
/** /**
* An implementation of the {@link Transport} interface using raw tcp/ip * An implementation of the {@link Transport} interface using raw tcp/ip
* *
@ -76,24 +78,25 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
* @throws IOException * @throws IOException
* @throws UnknownHostException * @throws UnknownHostException
*/ */
public TcpTransport(WireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException { public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation) throws UnknownHostException, IOException {
this(wireFormat); this(wireFormat);
this.socket = createSocket(remoteLocation); this.socket = createSocket(socketFactory, remoteLocation);
} }
/** /**
* Connect to a remote Node - e.g. a Broker * Connect to a remote Node - e.g. a Broker
* *
* @param wireFormat * @param wireFormat
* @param socketFactory
* @param remoteLocation * @param remoteLocation
* @param localLocation - * @param localLocation -
* e.g. local InetAddress and local port * e.g. local InetAddress and local port
* @throws IOException * @throws IOException
* @throws UnknownHostException * @throws UnknownHostException
*/ */
public TcpTransport(WireFormat wireFormat, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
this(wireFormat); this(wireFormat);
this.socket = createSocket(remoteLocation, localLocation); this.socket = createSocket(socketFactory, remoteLocation, localLocation);
} }
/** /**
@ -229,10 +232,10 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
* @throws UnknownHostException * @throws UnknownHostException
* @throws IOException * @throws IOException
*/ */
protected Socket createSocket(URI remoteLocation) throws UnknownHostException, IOException { protected Socket createSocket(SocketFactory socketFactory, URI remoteLocation) throws UnknownHostException, IOException {
String host = resolveHostName(remoteLocation.getHost()); String host = resolveHostName(remoteLocation.getHost());
socketAddress = new InetSocketAddress(host, remoteLocation.getPort()); socketAddress = new InetSocketAddress(host, remoteLocation.getPort());
Socket sock = new Socket(); Socket sock = socketFactory.createSocket();
return sock; return sock;
} }
@ -246,11 +249,11 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
* @throws IOException * @throws IOException
* @throws UnknownHostException * @throws UnknownHostException
*/ */
protected Socket createSocket(URI remoteLocation, URI localLocation) throws IOException, UnknownHostException { protected Socket createSocket(SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws IOException, UnknownHostException {
String host = resolveHostName(remoteLocation.getHost()); String host = resolveHostName(remoteLocation.getHost());
SocketAddress sockAddress = new InetSocketAddress(host, remoteLocation.getPort()); SocketAddress sockAddress = new InetSocketAddress(host, remoteLocation.getPort());
SocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort()); SocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
Socket sock = new Socket(); Socket sock = socketFactory.createSocket();
initialiseSocket(sock); initialiseSocket(sock);
sock.bind(localAddress); sock.bind(localAddress);
sock.connect(sockAddress); sock.connect(sockAddress);

View File

@ -48,7 +48,8 @@ public class TcpTransportFactory extends TransportFactory {
try { try {
Map options = new HashMap(URISupport.parseParamters(location)); Map options = new HashMap(URISupport.parseParamters(location));
TcpTransportServer server = new TcpTransportServer(location); ServerSocketFactory serverSocketFactory = createServerSocketFactory();
TcpTransportServer server = new TcpTransportServer(location, serverSocketFactory);
server.setWireFormatFactory(createWireFormatFactory(options)); server.setWireFormatFactory(createWireFormatFactory(options));
IntrospectionSupport.setProperties(server, options); IntrospectionSupport.setProperties(server, options);
@ -99,20 +100,22 @@ public class TcpTransportFactory extends TransportFactory {
URI localLocation=null; URI localLocation=null;
String path=location.getPath(); String path=location.getPath();
// see if the path is a local URI location // see if the path is a local URI location
if(path!=null&&path.length()>0){ if (path != null && path.length() > 0) {
int localPortIndex=path.indexOf(':'); int localPortIndex = path.indexOf(':');
try{ try {
Integer.parseInt(path.substring((localPortIndex+1),path.length())); Integer.parseInt(path.substring((localPortIndex + 1), path.length()));
String localString=location.getScheme()+ ":/" + path; String localString = location.getScheme() + ":/" + path;
localLocation=new URI(localString); localLocation = new URI(localString);
}catch(Exception e){ }
log.warn("path isn't a valid local location for TcpTransport to use",e); catch (Exception e) {
log.warn("path isn't a valid local location for TcpTransport to use", e);
} }
} }
if(localLocation!=null){ SocketFactory socketFactory = createSocketFactory();
return new TcpTransport(wf,location,localLocation); if (localLocation != null) {
return new TcpTransport(wf, socketFactory, location, localLocation);
} }
return new TcpTransport(wf,location); return new TcpTransport(wf, socketFactory, location);
} }
protected ServerSocketFactory createServerSocketFactory() { protected ServerSocketFactory createServerSocketFactory() {

View File

@ -38,6 +38,8 @@ import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import javax.net.ServerSocketFactory;
/** /**
* A TCP based implementation of {@link TransportServer} * A TCP based implementation of {@link TransportServer}
* *
@ -54,16 +56,9 @@ public class TcpTransportServer extends TransportServerThreadSupport {
private int minmumWireFormatVersion; private int minmumWireFormatVersion;
private boolean trace; private boolean trace;
/** public TcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
* Constructor
*
* @param location
* @throws IOException
* @throws URISyntaxException
*/
public TcpTransportServer(URI location) throws IOException, URISyntaxException {
super(location); super(location);
serverSocket = createServerSocket(location); serverSocket = createServerSocket(location, serverSocketFactory);
serverSocket.setSoTimeout(2000); serverSocket.setSoTimeout(2000);
updatePhysicalUri(location); updatePhysicalUri(location);
} }
@ -194,16 +189,16 @@ public class TcpTransportServer extends TransportServerThreadSupport {
* @throws UnknownHostException * @throws UnknownHostException
* @throws IOException * @throws IOException
*/ */
protected ServerSocket createServerSocket(URI bind) throws UnknownHostException, IOException { protected ServerSocket createServerSocket(URI bind, ServerSocketFactory factory) throws UnknownHostException, IOException {
ServerSocket answer = null; ServerSocket answer = null;
String host = bind.getHost(); String host = bind.getHost();
host = (host == null || host.length() == 0) ? "localhost" : host; host = (host == null || host.length() == 0) ? "localhost" : host;
InetAddress addr = InetAddress.getByName(host); InetAddress addr = InetAddress.getByName(host);
if (host.trim().equals("localhost") || addr.equals(InetAddress.getLocalHost())) { if (host.trim().equals("localhost") || addr.equals(InetAddress.getLocalHost())) {
answer = new ServerSocket(bind.getPort(), backlog); answer = factory.createServerSocket(bind.getPort(), backlog);
} }
else { else {
answer = new ServerSocket(bind.getPort(), backlog, addr); answer = factory.createServerSocket(bind.getPort(), backlog, addr);
} }
return answer; return answer;
} }

View File

@ -0,0 +1 @@
class=org.apache.activemq.transport.activeio.ActiveIOTransportFactory

View File

@ -1 +1 @@
class=org.apache.activemq.transport.activeio.ActiveIOTransportFactory class=org.apache.activemq.transport.tcp.SslTransportFactory

View File

@ -29,6 +29,8 @@ import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import javax.net.SocketFactory;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
@ -146,7 +148,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
// //
// Manually create a client transport so that it does not send KeepAlive packets. // Manually create a client transport so that it does not send KeepAlive packets.
// this should simulate a client hang. // this should simulate a client hang.
clientTransport = new TcpTransport(new OpenWireFormat(), new URI("tcp://localhost:61616")); clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:61616"));
clientTransport.setTransportListener(new TransportListener() { clientTransport.setTransportListener(new TransportListener() {
public void onCommand(Command command) { public void onCommand(Command command) {
clientReceiveCount.incrementAndGet(); clientReceiveCount.incrementAndGet();

View File

@ -0,0 +1,51 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.tcp;
import org.apache.activemq.transport.TransportBrokerTestSupport;
import junit.framework.Test;
import junit.textui.TestRunner;
public class SslTransportBrokerTest extends TransportBrokerTestSupport {
protected String getBindLocation() {
return "ssl://localhost:0";
}
protected void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
//System.setProperty("javax.net.debug", "ssl,handshake,data,trustmanager");
MAX_WAIT = 2000;
super.setUp();
}
public static Test suite() {
return suite(SslTransportBrokerTest.class);
}
public static void main(String[] args) {
TestRunner.run(suite());
}
}

Binary file not shown.

Binary file not shown.