mirror of https://github.com/apache/activemq.git
Support "transport." properties when creating a tranport server
Support "socket." properties when creating a transport. git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@395640 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fa8b889906
commit
2ee58ca235
|
@ -28,12 +28,14 @@ import java.net.SocketException;
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.activeio.command.WireFormat;
|
import org.apache.activeio.command.WireFormat;
|
||||||
import org.apache.activemq.Service;
|
import org.apache.activemq.Service;
|
||||||
import org.apache.activemq.command.Command;
|
import org.apache.activemq.command.Command;
|
||||||
import org.apache.activemq.transport.Transport;
|
import org.apache.activemq.transport.Transport;
|
||||||
import org.apache.activemq.transport.TransportThreadSupport;
|
import org.apache.activemq.transport.TransportThreadSupport;
|
||||||
|
import org.apache.activemq.util.IntrospectionSupport;
|
||||||
import org.apache.activemq.util.ServiceStopper;
|
import org.apache.activemq.util.ServiceStopper;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -60,6 +62,8 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
||||||
private int minmumWireFormatVersion;
|
private int minmumWireFormatVersion;
|
||||||
private InetSocketAddress socketAddress;
|
private InetSocketAddress socketAddress;
|
||||||
|
|
||||||
|
private Map socketOptions;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct basic helpers
|
* Construct basic helpers
|
||||||
|
@ -323,4 +327,8 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
||||||
dataIn.close();
|
dataIn.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setSocketOptions(Map socketOptions) {
|
||||||
|
IntrospectionSupport.setProperties(socket, socketOptions);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,7 +52,9 @@ public class TcpTransportFactory extends TransportFactory {
|
||||||
TcpTransportServer server = new TcpTransportServer(location, serverSocketFactory);
|
TcpTransportServer server = new TcpTransportServer(location, serverSocketFactory);
|
||||||
server.setWireFormatFactory(createWireFormatFactory(options));
|
server.setWireFormatFactory(createWireFormatFactory(options));
|
||||||
IntrospectionSupport.setProperties(server, options);
|
IntrospectionSupport.setProperties(server, options);
|
||||||
|
Map transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
|
||||||
|
server.setTransportOption(transportOptions);
|
||||||
|
|
||||||
return server;
|
return server;
|
||||||
}
|
}
|
||||||
catch (URISyntaxException e) {
|
catch (URISyntaxException e) {
|
||||||
|
@ -63,6 +65,9 @@ public class TcpTransportFactory extends TransportFactory {
|
||||||
public Transport configure(Transport transport, WireFormat format, Map options) {
|
public Transport configure(Transport transport, WireFormat format, Map options) {
|
||||||
IntrospectionSupport.setProperties(transport, options);
|
IntrospectionSupport.setProperties(transport, options);
|
||||||
TcpTransport tcpTransport = (TcpTransport) transport;
|
TcpTransport tcpTransport = (TcpTransport) transport;
|
||||||
|
Map socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
|
||||||
|
tcpTransport.setSocketOptions(socketOptions);
|
||||||
|
|
||||||
if (tcpTransport.isTrace()) {
|
if (tcpTransport.isTrace()) {
|
||||||
transport = new TransportLogger(transport);
|
transport = new TransportLogger(transport);
|
||||||
}
|
}
|
||||||
|
@ -82,6 +87,9 @@ public class TcpTransportFactory extends TransportFactory {
|
||||||
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
|
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
|
||||||
IntrospectionSupport.setProperties(transport, options);
|
IntrospectionSupport.setProperties(transport, options);
|
||||||
TcpTransport tcpTransport = (TcpTransport) transport;
|
TcpTransport tcpTransport = (TcpTransport) transport;
|
||||||
|
Map socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
|
||||||
|
tcpTransport.setSocketOptions(socketOptions);
|
||||||
|
|
||||||
if (tcpTransport.isTrace()) {
|
if (tcpTransport.isTrace()) {
|
||||||
transport = new TransportLogger(transport);
|
transport = new TransportLogger(transport);
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.activeio.command.WireFormat;
|
import org.apache.activeio.command.WireFormat;
|
||||||
import org.apache.activeio.command.WireFormatFactory;
|
import org.apache.activeio.command.WireFormatFactory;
|
||||||
|
@ -55,6 +56,7 @@ public class TcpTransportServer extends TransportServerThreadSupport {
|
||||||
private long maxInactivityDuration = 30000;
|
private long maxInactivityDuration = 30000;
|
||||||
private int minmumWireFormatVersion;
|
private int minmumWireFormatVersion;
|
||||||
private boolean trace;
|
private boolean trace;
|
||||||
|
private Map transportOptions;
|
||||||
|
|
||||||
public TcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
|
public TcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
|
||||||
super(location);
|
super(location);
|
||||||
|
@ -128,6 +130,7 @@ public class TcpTransportServer extends TransportServerThreadSupport {
|
||||||
options.put("maxInactivityDuration", new Long(maxInactivityDuration));
|
options.put("maxInactivityDuration", new Long(maxInactivityDuration));
|
||||||
options.put("minmumWireFormatVersion", new Integer(minmumWireFormatVersion));
|
options.put("minmumWireFormatVersion", new Integer(minmumWireFormatVersion));
|
||||||
options.put("trace", new Boolean(trace));
|
options.put("trace", new Boolean(trace));
|
||||||
|
options.putAll(transportOptions);
|
||||||
WireFormat format = wireFormatFactory.createWireFormat();
|
WireFormat format = wireFormatFactory.createWireFormat();
|
||||||
TcpTransport transport = new TcpTransport(format, socket);
|
TcpTransport transport = new TcpTransport(format, socket);
|
||||||
Transport configuredTransport = transportFactory.configure(transport, format, options);
|
Transport configuredTransport = transportFactory.configure(transport, format, options);
|
||||||
|
@ -213,4 +216,8 @@ public class TcpTransportServer extends TransportServerThreadSupport {
|
||||||
public InetSocketAddress getSocketAddress() {
|
public InetSocketAddress getSocketAddress() {
|
||||||
return (InetSocketAddress)serverSocket.getLocalSocketAddress();
|
return (InetSocketAddress)serverSocket.getLocalSocketAddress();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setTransportOption(Map transportOptions) {
|
||||||
|
this.transportOptions = transportOptions;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue