Merged in revision 418548 from trunk since that was needed for the stomp backport

http://issues.apache.org/activemq/browse/AMQ-793



git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/branches/activemq-4.0@419024 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-07-04 15:01:22 +00:00
parent 80c76a8882
commit f0713fecc3
7 changed files with 95 additions and 207 deletions

View File

@ -198,13 +198,34 @@ public abstract class TransportFactory {
return "default";
}
protected Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
IntrospectionSupport.setProperties(transport, options);
/**
* Fully configures and adds all need transport filters so that the transport
* can be used by the JMS client.
*
* @param transport
* @param wf
* @param options
* @return
* @throws Exception
*/
public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
transport = compositeConfigure(transport, wf, options);
transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport);
return transport;
}
/**
* Similar to configure(...) but this avoid adding in the MutexTransport and ResponseCorrelator transport layers
* so that the resulting transport can more efficiently be used as part of a composite transport.
*
* @param transport
* @param format
* @param options
* @return
*/
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
IntrospectionSupport.setProperties(transport, options);
return transport;

View File

@ -21,7 +21,7 @@ import org.apache.activemq.command.Response;
*/
public class TransportFilter implements TransportListener,Transport{
final protected Transport next;
private TransportListener transportListener;
protected TransportListener transportListener;
public TransportFilter(Transport next){
this.next=next;

View File

@ -23,13 +23,14 @@ import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Map;
import javax.net.SocketFactory;
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
@ -40,8 +41,6 @@ import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.net.SocketFactory;
/**
* An implementation of the {@link Transport} interface using raw tcp/ip
*
@ -60,10 +59,8 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
private boolean trace;
private boolean useLocalHost = true;
private int minmumWireFormatVersion;
private InetSocketAddress socketAddress;
private Map socketOptions;
private InetSocketAddress remoteAddress;
private InetSocketAddress localAddress;
/**
* Construct basic helpers
@ -74,19 +71,6 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
this.wireFormat = wireFormat;
}
/**
* Connect to a remote Node - e.g. a Broker
*
* @param wireFormat
* @param remoteLocation
* @throws IOException
* @throws UnknownHostException
*/
public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation) throws UnknownHostException, IOException {
this(wireFormat);
this.socket = createSocket(socketFactory, remoteLocation);
}
/**
* Connect to a remote Node - e.g. a Broker
*
@ -231,36 +215,22 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
* Factory method to create a new socket
*
* @param remoteLocation
* the URI to connect to
* @return the newly created socket
* @throws UnknownHostException
* @throws IOException
*/
protected Socket createSocket(SocketFactory socketFactory, URI remoteLocation) throws UnknownHostException, IOException {
String host = resolveHostName(remoteLocation.getHost());
socketAddress = new InetSocketAddress(host, remoteLocation.getPort());
Socket sock = socketFactory.createSocket();
return sock;
}
/**
* Factory method to create a new socket
*
* @param remoteLocation
* @param localLocation
* @param localLocation ignored if null
* @return
* @throws IOException
* @throws IOException
* @throws UnknownHostException
*/
protected Socket createSocket(SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws IOException, UnknownHostException {
String host = resolveHostName(remoteLocation.getHost());
SocketAddress sockAddress = new InetSocketAddress(host, remoteLocation.getPort());
SocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
Socket sock = socketFactory.createSocket();
initialiseSocket(sock);
sock.bind(localAddress);
sock.connect(sockAddress);
remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
if( localLocation!=null ) {
localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
}
Socket sock = socketFactory.createSocket();
return sock;
}
@ -293,12 +263,15 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
protected void doStart() throws Exception {
initialiseSocket(socket);
if (socketAddress != null) {
if( localAddress!=null ) {
socket.bind(localAddress);
}
if (remoteAddress != null) {
if (connectionTimeout >= 0) {
socket.connect(socketAddress, connectionTimeout);
socket.connect(remoteAddress, connectionTimeout);
}
else {
socket.connect(socketAddress);
socket.connect(remoteAddress);
}
}
initializeStreams();

View File

@ -29,8 +29,6 @@ import javax.net.SocketFactory;
import org.apache.activeio.command.WireFormat;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportLogger;
@ -49,7 +47,7 @@ public class TcpTransportFactory extends TransportFactory {
Map options = new HashMap(URISupport.parseParamters(location));
ServerSocketFactory serverSocketFactory = createServerSocketFactory();
TcpTransportServer server = new TcpTransportServer(location, serverSocketFactory);
TcpTransportServer server = createTcpTransportServer(location, serverSocketFactory);
server.setWireFormatFactory(createWireFormatFactory(options));
IntrospectionSupport.setProperties(server, options);
Map transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
@ -62,31 +60,24 @@ public class TcpTransportFactory extends TransportFactory {
}
}
public Transport configure(Transport transport, WireFormat format, Map options) {
IntrospectionSupport.setProperties(transport, options);
TcpTransport tcpTransport = (TcpTransport) transport;
Map socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
tcpTransport.setSocketOptions(socketOptions);
if (tcpTransport.isTrace()) {
transport = new TransportLogger(transport);
}
transport = new InactivityMonitor(transport);
// Only need the OpenWireFormat if using openwire
if( format instanceof OpenWireFormat ) {
transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
}
transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport);
return transport;
}
/**
* Allows subclasses of TcpTransportFactory to create custom instances of TcpTransportServer.
*
* @param location
* @param serverSocketFactory
* @return
* @throws IOException
* @throws URISyntaxException
*/
protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new TcpTransportServer(this, location, serverSocketFactory);
}
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
IntrospectionSupport.setProperties(transport, options);
TcpTransport tcpTransport = (TcpTransport) transport;
TcpTransport tcpTransport = (TcpTransport) transport.narrow(TcpTransport.class);
IntrospectionSupport.setProperties(tcpTransport, options);
Map socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
tcpTransport.setSocketOptions(socketOptions);
@ -96,7 +87,7 @@ public class TcpTransportFactory extends TransportFactory {
transport = new InactivityMonitor(transport);
// Only need the OpenWireFormat if using openwire
// Only need the WireFormatNegotiator if using openwire
if( format instanceof OpenWireFormat ) {
transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
}
@ -120,12 +111,24 @@ public class TcpTransportFactory extends TransportFactory {
}
}
SocketFactory socketFactory = createSocketFactory();
if (localLocation != null) {
return new TcpTransport(wf, socketFactory, location, localLocation);
}
return new TcpTransport(wf, socketFactory, location);
return createTcpTransport(wf, socketFactory, location, localLocation);
}
/**
* Allows subclasses of TcpTransportFactory to provide a create custom TcpTransport intances.
*
* @param location
* @param wf
* @param socketFactory
* @param localLocation
* @return
* @throws UnknownHostException
* @throws IOException
*/
private TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
return new TcpTransport(wf, socketFactory, location, localLocation);
}
protected ServerSocketFactory createServerSocketFactory() {
return ServerSocketFactory.getDefault();
}

View File

@ -52,16 +52,17 @@ public class TcpTransportServer extends TransportServerThreadSupport {
private ServerSocket serverSocket;
private int backlog = 5000;
private WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
private TcpTransportFactory transportFactory = new TcpTransportFactory();
private final TcpTransportFactory transportFactory;
private long maxInactivityDuration = 30000;
private int minmumWireFormatVersion;
private boolean trace;
private Map transportOptions;
public TcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
super(location);
serverSocket = createServerSocket(location, serverSocketFactory);
serverSocket.setSoTimeout(2000);
this.transportFactory=transportFactory;
this.serverSocket = createServerSocket(location, serverSocketFactory);
this.serverSocket.setSoTimeout(2000);
updatePhysicalUri(location);
}
@ -132,7 +133,7 @@ public class TcpTransportServer extends TransportServerThreadSupport {
options.put("trace", new Boolean(trace));
options.putAll(transportOptions);
WireFormat format = wireFormatFactory.createWireFormat();
TcpTransport transport = new TcpTransport(format, socket);
Transport transport = createTransport(socket, format);
Transport configuredTransport = transportFactory.configure(transport, format, options);
getAcceptListener().onAccept(configuredTransport);
}
@ -152,6 +153,17 @@ public class TcpTransportServer extends TransportServerThreadSupport {
}
}
/**
* Allow derived classes to override the Transport implementation that this transport server creates.
* @param socket
* @param format
* @return
* @throws IOException
*/
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
return new TcpTransport(format, socket);
}
/**
* @return pretty print of this
*/

View File

@ -1,121 +0,0 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.*;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompWireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.JMSException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import junit.framework.TestCase;
public class StompWireFormatTest extends TestCase {
protected static final Log log = LogFactory.getLog(StompWireFormatTest.class);
private StompWireFormat wire;
public void setUp() throws Exception {
wire = new StompWireFormat();
}
public void testValidConnectHandshake() throws Exception {
ByteArrayOutputStream bout = new ByteArrayOutputStream();
DataOutputStream dout = new DataOutputStream(bout);
ConnectionInfo ci = (ConnectionInfo) parseCommand("CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "\n" + Stomp.NULL);
assertNotNull(ci);
assertTrue(ci.isResponseRequired());
Response cr = new Response();
cr.setCorrelationId(ci.getCommandId());
String response = writeCommand(cr);
log.info("Received: " + response);
SessionInfo si = (SessionInfo) wire.readCommand(null);
assertNotNull(si);
assertTrue(!si.isResponseRequired());
ProducerInfo pi = (ProducerInfo) wire.readCommand(null);
assertNotNull(pi);
assertTrue(pi.isResponseRequired());
Response sr = new Response();
sr.setCorrelationId(pi.getCommandId());
response = writeCommand(sr);
log.info("Received: " + response);
assertTrue("Response should start with CONNECTED: " + response, response.startsWith("CONNECTED"));
// now lets test subscribe
ConsumerInfo consumerInfo = (ConsumerInfo) parseCommand("SUBSCRIBE\n" + "destination: /queue/foo\n" + "ack: client\n" + "activemq.prefetchSize: 1\n"
+ "\n" + Stomp.NULL);
assertNotNull(consumerInfo);
// assertTrue(consumerInfo.isResponseRequired());
assertEquals("prefetch size", 1, consumerInfo.getPrefetchSize());
cr = new Response();
cr.setCorrelationId(consumerInfo.getCommandId());
response = writeCommand(cr);
log.info("Received: " + response);
}
public void _testFakeServer() throws Exception {
final BrokerService container = new BrokerService();
new Thread(new Runnable() {
public void run() {
try {
container.addConnector("stomp://localhost:61613");
container.start();
}
catch (Exception e) {
System.err.println("ARGH: caught: " + e);
e.printStackTrace();
}
}
}).start();
System.err.println("started container");
System.err.println("okay, go play");
System.err.println(System.in.read());
}
protected Command parseCommand(String connect_frame) throws IOException, JMSException {
DataInputStream din = new DataInputStream(new ByteArrayInputStream(connect_frame.getBytes()));
return wire.readCommand(din);
}
protected String writeCommand(Command command) throws IOException, JMSException {
ByteArrayOutputStream bout = new ByteArrayOutputStream();
DataOutputStream dout = new DataOutputStream(bout);
wire.writeCommand(command, dout);
return new String(bout.toByteArray());
}
}

View File

@ -148,7 +148,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
//
// Manually create a client transport so that it does not send KeepAlive packets.
// this should simulate a client hang.
clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:61616"));
clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:61616"), null);
clientTransport.setTransportListener(new TransportListener() {
public void onCommand(Command command) {
clientReceiveCount.incrementAndGet();