Add support for AMQP client to connect using WebSockets.
This commit is contained in:
Timothy Bish 2016-06-17 16:26:52 -04:00
parent 83827f2770
commit 31c55f7510
53 changed files with 2229 additions and 563 deletions

View File

@ -93,6 +93,21 @@
<artifactId>activemq-spring</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-http</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-mqtt</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-server</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
@ -123,6 +138,36 @@
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>${netty-all-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>${netty-all-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty-all-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${netty-all-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty-all-version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>

View File

@ -22,13 +22,13 @@ import org.apache.activemq.command.Command;
/**
* Interface that defines the API for any AMQP protocol converter ised to
* map AMQP mechanincs to ActiveMQ and back.
* map AMQP mechanics to ActiveMQ and back.
*/
public interface AmqpProtocolConverter {
/**
* A new incoming data packet from the remote peer is handed off to the
* protocol converter for porcessing. The type can vary and be either an
* protocol converter for processing. The type can vary and be either an
* AmqpHeader at the handshake phase or a byte buffer containing the next
* incoming frame data from the remote.
*
@ -70,9 +70,9 @@ public interface AmqpProtocolConverter {
* empty frames or closing connections due to remote end being inactive
* for to long.
*
* @returns the amount of milliseconds to wait before performaing another check.
* @returns the amount of milliseconds to wait before performing another check.
*
* @throws IOException if an error occurs on writing heatbeats to the wire.
* @throws IOException if an error occurs on writing heart-beats to the wire.
*/
long keepAlive() throws IOException;

View File

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.security.cert.X509Certificate;
import org.apache.activemq.transport.TransportSupport;
import org.apache.activemq.transport.amqp.AmqpFrameParser.AMQPFrameSink;
import org.apache.activemq.transport.ws.WSTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
/**
* An AMQP based WebSocket transport implementation.
*/
public class AmqpWSTransport extends TransportSupport implements WSTransport, AMQPFrameSink {
private final AmqpFrameParser frameReader = new AmqpFrameParser(this);
private final URI remoteLocation;
private WSTransportSink outputSink;
private int receiveCounter;
private X509Certificate[] certificates;
/**
* Create a new Transport instance.
*
* @param location
* the remote location where the client connection is from.
* @param wireFormat
* the WireFormat instance that configures this Transport.
*/
public AmqpWSTransport(URI location, WireFormat wireFormat) {
super();
remoteLocation = location;
frameReader.setWireFormat((AmqpWireFormat) wireFormat);
}
@Override
public void setTransportSink(WSTransportSink outputSink) {
this.outputSink = outputSink;
}
@Override
public void oneway(Object command) throws IOException {
if (command instanceof ByteBuffer) {
outputSink.onSocketOutboundBinary((ByteBuffer) command);
} else {
throw new IOException("Unexpected output command.");
}
}
@Override
public String getRemoteAddress() {
return remoteLocation.toASCIIString();
}
@Override
public int getReceiveCounter() {
return receiveCounter;
}
@Override
public X509Certificate[] getPeerCertificates() {
return certificates;
}
@Override
public void setPeerCertificates(X509Certificate[] certificates) {
this.certificates = certificates;
}
@Override
public String getSubProtocol() {
return "amqp";
}
@Override
public WireFormat getWireFormat() {
return frameReader.getWireFormat();
}
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
// Currently nothing needed here since we have no async workers.
}
@Override
protected void doStart() throws Exception {
if (outputSink == null) {
throw new IllegalStateException("Transport started before output sink assigned.");
}
// Currently nothing needed here since we have no async workers.
}
//----- WebSocket event hooks --------------------------------------------//
@Override
public void onWebSocketText(String data) throws IOException {
onException(new IOException("Illegal text content receive on AMQP WebSocket channel."));
}
@Override
public void onWebSocketBinary(ByteBuffer data) throws IOException {
try {
frameReader.parse(data);
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
}
@Override
public void onWebSocketClosed() throws IOException {
onException(new IOException("Unexpected close of AMQP WebSocket channel."));
}
//----- AMQP Frame Data event hook ---------------------------------------//
@Override
public void onFrame(Object frame) {
doConsume(frame);
}
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Map;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
/**
* Factory for creating WebSocket aware AMQP Transports.
*/
public class AmqpWSTransportFactory extends TransportFactory implements BrokerServiceAware {
private BrokerService brokerService = null;
@Override
protected String getDefaultWireFormatType() {
return "amqp";
}
@Override
public TransportServer doBind(URI location) throws IOException {
throw new IOException("doBind() method not implemented! No Server over WS implemented.");
}
@Override
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
AmqpTransportFilter amqpTransport = new AmqpTransportFilter(transport, format, brokerService);
Map<String, Object> wireFormatOptions = IntrospectionSupport.extractProperties(options, "wireFormat.");
IntrospectionSupport.setProperties(amqpTransport, options);
IntrospectionSupport.setProperties(amqpTransport.getWireFormat(), wireFormatOptions);
// Now wrap the filter with the monitor
transport = createInactivityMonitor(amqpTransport, format);
IntrospectionSupport.setProperties(transport, options);
return super.compositeConfigure(transport, format, options);
}
/**
* Factory method to create a new transport
*
* @throws IOException
* @throws UnknownHostException
*/
@Override
protected Transport createTransport(URI location, WireFormat wireFormat) throws MalformedURLException, UnknownHostException, IOException {
return new AmqpWSTransport(location, wireFormat);
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
protected Transport createInactivityMonitor(AmqpTransportFilter transport, WireFormat format) {
AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
transport.setInactivityMonitor(monitor);
return monitor;
}
}

View File

@ -309,7 +309,7 @@ public class AmqpConnection implements AmqpProtocolConverter {
while (!done) {
ByteBuffer toWrite = protonTransport.getOutputBuffer();
if (toWrite != null && toWrite.hasRemaining()) {
LOG.trace("Sending {} bytes out", toWrite.limit());
LOG.trace("Server: Sending {} bytes out", toWrite.limit());
amqpTransport.sendToAmqp(toWrite);
protonTransport.outputConsumed();
} else {
@ -356,6 +356,8 @@ public class AmqpConnection implements AmqpProtocolConverter {
return;
}
LOG.trace("Server: Received from client: {} bytes", frame.getLength());
while (frame.length > 0) {
try {
int count = protonTransport.input(frame.data, frame.offset, frame.length);
@ -386,7 +388,7 @@ public class AmqpConnection implements AmqpProtocolConverter {
Event event = null;
while ((event = eventCollector.peek()) != null) {
if (amqpTransport.isTrace()) {
LOG.trace("Processing event: {}", event.getType());
LOG.trace("Server: Processing event: {}", event.getType());
}
switch (event.getType()) {
case CONNECTION_REMOTE_OPEN:
@ -484,7 +486,6 @@ public class AmqpConnection implements AmqpProtocolConverter {
protonConnection.close();
} else {
if (amqpTransport.isUseInactivityMonitor() && amqpWireFormat.getIdleTimeout() > 0) {
LOG.trace("Connection requesting Idle timeout of: {} mills", amqpWireFormat.getIdleTimeout());
protonTransport.setIdleTimeout(amqpWireFormat.getIdleTimeout());

View File

@ -54,14 +54,14 @@ public class AmqpAuthenticator {
}
/**
* @return true if the SASL exchange has conpleted, regardless of success.
* @return true if the SASL exchange has completed, regardless of success.
*/
public boolean isDone() {
return sasl.getOutcome() != Sasl.SaslOutcome.PN_SASL_NONE;
}
/**
* @return the list of all SASL mechanisms that are supported curretnly.
* @return the list of all SASL mechanisms that are supported currently.
*/
public String[] getSupportedMechanisms() {
return mechanisms;

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.amqp.AmqpWSTransportFactory

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.amqp.AmqpWSTransportFactory

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.conversions;
package org.apache.activemq.transport.amqp;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@ -130,6 +130,7 @@ public class AmqpAndMqttTest {
exception.printStackTrace();
}
});
connection.start();
return connection;
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.transport.amqp;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.URI;
import java.security.SecureRandom;
import java.util.Set;
@ -33,6 +34,7 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.net.ServerSocketFactory;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
@ -79,6 +81,11 @@ public class AmqpTestSupport {
protected URI amqpNioPlusSslURI;
protected int amqpNioPlusSslPort;
protected URI amqpWsURI;
protected int amqpWsPort;
protected URI amqpWssURI;
protected int amqpWssPort;
protected URI autoURI;
protected int autoPort;
protected URI autoSslURI;
@ -213,6 +220,20 @@ public class AmqpTestSupport {
autoNioPlusSslURI = connector.getPublishableConnectURI();
LOG.debug("Using auto+nio+ssl port " + autoNioPlusSslPort);
}
if (isUseWsConnector()) {
connector = brokerService.addConnector(
"ws://0.0.0.0:" + getProxyPort(amqpWsPort) + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpWsPort = connector.getConnectUri().getPort();
amqpWsURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp+ws port " + amqpWsPort);
}
if (isUseWssConnector()) {
connector = brokerService.addConnector(
"wss://0.0.0.0:" + getProxyPort(amqpWssPort) + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpWssPort = connector.getConnectUri().getPort();
amqpWssURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp+wss port " + amqpWssPort);
}
}
protected boolean isPersistent() {
@ -263,6 +284,14 @@ public class AmqpTestSupport {
return false;
}
protected boolean isUseWsConnector() {
return false;
}
protected boolean isUseWssConnector() {
return false;
}
protected String getAmqpTransformer() {
return "jms";
}
@ -355,6 +384,26 @@ public class AmqpTestSupport {
return name.getMethodName();
}
protected int getProxyPort(int proxyPort) {
if (proxyPort == 0) {
ServerSocket ss = null;
try {
ss = ServerSocketFactory.getDefault().createServerSocket(0);
proxyPort = ss.getLocalPort();
} catch (IOException e) { // ignore
} finally {
try {
if (ss != null ) {
ss.close();
}
} catch (IOException e) { // ignore
}
}
}
return proxyPort;
}
protected BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException {
ObjectName brokerViewMBean = new ObjectName(
"org.apache.activemq:type=Broker,brokerName=localhost");

View File

@ -17,13 +17,7 @@
package org.apache.activemq.transport.amqp;
import java.net.URI;
import java.security.SecureRandom;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -33,13 +27,6 @@ import org.slf4j.LoggerFactory;
public class JMSClientSslTest extends JMSClientTest {
protected static final Logger LOG = LoggerFactory.getLogger(JMSClientSslTest.class);
@BeforeClass
public static void beforeClass() throws Exception {
SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
SSLContext.setDefault(ctx);
}
@Override
protected URI getBrokerURI() {
return amqpSslURI;

View File

@ -57,7 +57,7 @@ import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
import org.apache.activemq.util.Wait;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Test;
import org.objectweb.jtests.jms.framework.TestConfig;
import org.slf4j.Logger;
@ -1180,8 +1180,8 @@ public class JMSClientTest extends JMSClientTestSupport {
@Test(timeout = 60000)
public void testZeroPrefetchWithTwoConsumers() throws Exception {
connection = createConnection();
((JmsConnection)connection).getPrefetchPolicy().setAll(0);
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=0"));
connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

View File

@ -21,8 +21,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory;
import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
import org.apache.qpid.proton.amqp.Symbol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -51,12 +51,12 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
@Override
protected boolean isUseTcpConnector() {
return !isUseSSL() && !connectorScheme.contains("nio");
return !isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("ws");
}
@Override
protected boolean isUseSslConnector() {
return isUseSSL() && !connectorScheme.contains("nio");
return isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("wss");
}
@Override
@ -69,13 +69,33 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
return isUseSSL() && connectorScheme.contains("nio");
}
@Override
protected boolean isUseWsConnector() {
return !isUseSSL() && connectorScheme.contains("ws");
}
@Override
protected boolean isUseWssConnector() {
return isUseSSL() && connectorScheme.contains("wss");
}
public URI getBrokerAmqpConnectionURI() {
boolean webSocket = false;
try {
int port = 0;
switch (connectorScheme) {
case "amqp":
port = this.amqpPort;
break;
case "amqp+ws":
port = this.amqpWsPort;
webSocket = true;
break;
case "amqp+wss":
port = this.amqpWssPort;
webSocket = true;
break;
case "amqp+ssl":
port = this.amqpSslPort;
break;
@ -92,10 +112,18 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
String uri = null;
if (isUseSSL()) {
if (webSocket) {
uri = "wss://127.0.0.1:" + port;
} else {
uri = "ssl://127.0.0.1:" + port;
}
} else {
if (webSocket) {
uri = "ws://127.0.0.1:" + port;
} else {
uri = "tcp://127.0.0.1:" + port;
}
}
if (!getAmqpConnectionURIOptions().isEmpty()) {
uri = uri + "?" + getAmqpConnectionURIOptions();

View File

@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator;
import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
@ -79,7 +78,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private final AtomicLong sessionIdGenerator = new AtomicLong();
private final AtomicLong txIdGenerator = new AtomicLong();
private final Collector protonCollector = new CollectorImpl();
private final NettyTransport transport;
private final org.apache.activemq.transport.amqp.client.transport.NettyTransport transport;
private final Transport protonTransport = Transport.Factory.create();
private final String username;
@ -103,7 +102,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
private long drainTimeout = DEFAULT_DRAIN_TIMEOUT;
public AmqpConnection(NettyTransport transport, String username, String password) {
public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport, String username, String password) {
setEndpoint(Connection.Factory.create());
getEndpoint().collect(protonCollector);
@ -490,7 +489,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
@Override
public void run() {
ByteBuffer source = incoming.nioBuffer();
LOG.trace("Received from Broker {} bytes:", source.remaining());
LOG.trace("Client Received from Broker {} bytes:", source.remaining());
if (protonTransport.isClosed()) {
LOG.debug("Ignoring incoming data because transport is closed");
@ -520,6 +519,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
@Override
public void onTransportClosed() {
LOG.debug("The transport has unexpectedly closed");
failed(getOpenAbortException());
}
@Override
@ -612,7 +612,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
Event protonEvent = null;
while ((protonEvent = protonCollector.peek()) != null) {
if (!protonEvent.getType().equals(Type.TRANSPORT)) {
LOG.trace("New Proton Event: {}", protonEvent.getType());
LOG.trace("Client: New Proton Event: {}", protonEvent.getType());
}
AmqpEventSink amqpEventSink = null;

View File

@ -0,0 +1,401 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.transport;
import java.io.IOException;
import java.net.URI;
import java.security.Principal;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
/**
* TCP based transport that uses Netty as the underlying IO layer.
*/
public class NettyTcpTransport implements NettyTransport {
private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
private static final int QUIET_PERIOD = 20;
private static final int SHUTDOWN_TIMEOUT = 100;
protected Bootstrap bootstrap;
protected EventLoopGroup group;
protected Channel channel;
protected NettyTransportListener listener;
protected NettyTransportOptions options;
protected final URI remote;
protected boolean secure;
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();
private final CountDownLatch connectLatch = new CountDownLatch(1);
private IOException failureCause;
private Throwable pendingFailure;
/**
* Create a new transport instance
*
* @param remoteLocation
* the URI that defines the remote resource to connect to.
* @param options
* the transport options used to configure the socket connection.
*/
public NettyTcpTransport(URI remoteLocation, NettyTransportOptions options) {
this(null, remoteLocation, options);
}
/**
* Create a new transport instance
*
* @param listener
* the TransportListener that will receive events from this Transport.
* @param remoteLocation
* the URI that defines the remote resource to connect to.
* @param options
* the transport options used to configure the socket connection.
*/
public NettyTcpTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
this.options = options;
this.listener = listener;
this.remote = remoteLocation;
this.secure = remoteLocation.getScheme().equalsIgnoreCase("ssl");
}
@Override
public void connect() throws IOException {
if (listener == null) {
throw new IllegalStateException("A transport listener must be set before connection attempts.");
}
group = new NioEventLoopGroup(1);
bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel connectedChannel) throws Exception {
configureChannel(connectedChannel);
}
});
configureNetty(bootstrap, getTransportOptions());
ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort());
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
handleConnected(future.channel());
} else if (future.isCancelled()) {
connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
} else {
connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
}
}
});
try {
connectLatch.await();
} catch (InterruptedException ex) {
LOG.debug("Transport connection was interrupted.");
Thread.interrupted();
failureCause = IOExceptionSupport.create(ex);
}
if (failureCause != null) {
// Close out any Netty resources now as they are no longer needed.
if (channel != null) {
channel.close().syncUninterruptibly();
channel = null;
}
if (group != null) {
group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
group = null;
}
throw failureCause;
} else {
// Connected, allow any held async error to fire now and close the transport.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (pendingFailure != null) {
channel.pipeline().fireExceptionCaught(pendingFailure);
}
}
});
}
}
@Override
public boolean isConnected() {
return connected.get();
}
@Override
public boolean isSSL() {
return secure;
}
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
connected.set(false);
if (channel != null) {
channel.close().syncUninterruptibly();
}
if (group != null) {
group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
}
}
}
@Override
public ByteBuf allocateSendBuffer(int size) throws IOException {
checkConnected();
return channel.alloc().ioBuffer(size, size);
}
@Override
public void send(ByteBuf output) throws IOException {
checkConnected();
int length = output.readableBytes();
if (length == 0) {
return;
}
LOG.trace("Attempted write of: {} bytes", length);
channel.writeAndFlush(output);
}
@Override
public NettyTransportListener getTransportListener() {
return listener;
}
@Override
public void setTransportListener(NettyTransportListener listener) {
this.listener = listener;
}
@Override
public NettyTransportOptions getTransportOptions() {
if (options == null) {
if (isSSL()) {
options = NettyTransportSslOptions.INSTANCE;
} else {
options = NettyTransportOptions.INSTANCE;
}
}
return options;
}
@Override
public URI getRemoteLocation() {
return remote;
}
@Override
public Principal getLocalPrincipal() {
if (!isSSL()) {
throw new UnsupportedOperationException("Not connected to a secure channel");
}
SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
return sslHandler.engine().getSession().getLocalPrincipal();
}
//----- Internal implementation details, can be overridden as needed --//
protected String getRemoteHost() {
return remote.getHost();
}
protected int getRemotePort() {
int port = remote.getPort();
if (port <= 0) {
if (isSSL()) {
port = getSslOptions().getDefaultSslPort();
} else {
port = getTransportOptions().getDefaultTcpPort();
}
}
return port;
}
protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
if (options.getSendBufferSize() != -1) {
bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
}
if (options.getReceiveBufferSize() != -1) {
bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
}
if (options.getTrafficClass() != -1) {
bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
}
}
protected void configureChannel(final Channel channel) throws Exception {
if (isSSL()) {
SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
LOG.trace("SSL Handshake has completed: {}", channel);
connectionEstablished(channel);
} else {
LOG.trace("SSL Handshake has failed: {}", channel);
connectionFailed(channel, IOExceptionSupport.create(future.cause()));
}
}
});
channel.pipeline().addLast(sslHandler);
}
channel.pipeline().addLast(new NettyTcpTransportHandler());
}
protected void handleConnected(final Channel channel) throws Exception {
if (!isSSL()) {
connectionEstablished(channel);
}
}
//----- State change handlers and checks ---------------------------------//
/**
* Called when the transport has successfully connected and is ready for use.
*/
protected void connectionEstablished(Channel connectedChannel) {
channel = connectedChannel;
connected.set(true);
connectLatch.countDown();
}
/**
* Called when the transport connection failed and an error should be returned.
*
* @param failedChannel
* The Channel instance that failed.
* @param cause
* An IOException that describes the cause of the failed connection.
*/
protected void connectionFailed(Channel failedChannel, IOException cause) {
failureCause = IOExceptionSupport.create(cause);
channel = failedChannel;
connected.set(false);
connectLatch.countDown();
}
private NettyTransportSslOptions getSslOptions() {
return (NettyTransportSslOptions) getTransportOptions();
}
private void checkConnected() throws IOException {
if (!connected.get()) {
throw new IOException("Cannot send to a non-connected transport.");
}
}
//----- Handle connection events -----------------------------------------//
private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext context) throws Exception {
LOG.trace("Channel has become active! Channel is {}", context.channel());
}
@Override
public void channelInactive(ChannelHandlerContext context) throws Exception {
LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportClosed listener");
listener.onTransportClosed();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
LOG.trace("Exception on channel! Channel is {}", context.channel());
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportError listener");
if (pendingFailure != null) {
listener.onTransportError(pendingFailure);
} else {
listener.onTransportError(cause);
}
} else {
// Hold the first failure for later dispatch if connect succeeds.
// This will then trigger disconnect using the first error reported.
if (pendingFailure != null) {
LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
pendingFailure = cause;
}
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer);
listener.onData(buffer);
}
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -16,375 +16,37 @@
*/
package org.apache.activemq.transport.amqp.client.transport;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.URI;
import java.security.Principal;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
/**
* TCP based transport that uses Netty as the underlying IO layer.
*/
public class NettyTransport {
private static final Logger LOG = LoggerFactory.getLogger(NettyTransport.class);
private static final int QUIET_PERIOD = 20;
private static final int SHUTDOWN_TIMEOUT = 100;
protected Bootstrap bootstrap;
protected EventLoopGroup group;
protected Channel channel;
protected NettyTransportListener listener;
protected NettyTransportOptions options;
protected final URI remote;
protected boolean secure;
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();
private final CountDownLatch connectLatch = new CountDownLatch(1);
private IOException failureCause;
private Throwable pendingFailure;
/**
* Create a new transport instance
*
* @param remoteLocation
* the URI that defines the remote resource to connect to.
* @param options
* the transport options used to configure the socket connection.
*/
public NettyTransport(URI remoteLocation, NettyTransportOptions options) {
this(null, remoteLocation, options);
}
public interface NettyTransport {
/**
* Create a new transport instance
*
* @param listener
* the TransportListener that will receive events from this Transport.
* @param remoteLocation
* the URI that defines the remote resource to connect to.
* @param options
* the transport options used to configure the socket connection.
*/
public NettyTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
this.options = options;
this.listener = listener;
this.remote = remoteLocation;
this.secure = remoteLocation.getScheme().equalsIgnoreCase("ssl");
}
void connect() throws IOException;
public void connect() throws IOException {
boolean isConnected();
if (listener == null) {
throw new IllegalStateException("A transport listener must be set before connection attempts.");
}
boolean isSSL();
group = new NioEventLoopGroup(1);
void close() throws IOException;
bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<Channel>() {
ByteBuf allocateSendBuffer(int size) throws IOException;
@Override
public void initChannel(Channel connectedChannel) throws Exception {
configureChannel(connectedChannel);
}
});
void send(ByteBuf output) throws IOException;
configureNetty(bootstrap, getTransportOptions());
NettyTransportListener getTransportListener();
ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort());
future.addListener(new ChannelFutureListener() {
void setTransportListener(NettyTransportListener listener);
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
handleConnected(future.channel());
} else if (future.isCancelled()) {
connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
} else {
connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
}
}
});
NettyTransportOptions getTransportOptions();
try {
connectLatch.await();
} catch (InterruptedException ex) {
LOG.debug("Transport connection was interrupted.");
Thread.interrupted();
failureCause = IOExceptionSupport.create(ex);
}
URI getRemoteLocation();
if (failureCause != null) {
// Close out any Netty resources now as they are no longer needed.
if (channel != null) {
channel.close().syncUninterruptibly();
channel = null;
}
if (group != null) {
group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
group = null;
}
Principal getLocalPrincipal();
throw failureCause;
} else {
// Connected, allow any held async error to fire now and close the transport.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (pendingFailure != null) {
channel.pipeline().fireExceptionCaught(pendingFailure);
}
}
});
}
}
public boolean isConnected() {
return connected.get();
}
public boolean isSSL() {
return secure;
}
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
connected.set(false);
if (channel != null) {
channel.close().syncUninterruptibly();
}
if (group != null) {
group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
}
}
}
public ByteBuf allocateSendBuffer(int size) throws IOException {
checkConnected();
return channel.alloc().ioBuffer(size, size);
}
public void send(ByteBuf output) throws IOException {
checkConnected();
int length = output.readableBytes();
if (length == 0) {
return;
}
LOG.trace("Attempted write of: {} bytes", length);
channel.writeAndFlush(output);
}
public NettyTransportListener getTransportListener() {
return listener;
}
public void setTransportListener(NettyTransportListener listener) {
this.listener = listener;
}
public NettyTransportOptions getTransportOptions() {
if (options == null) {
if (isSSL()) {
options = NettyTransportSslOptions.INSTANCE;
} else {
options = NettyTransportOptions.INSTANCE;
}
}
return options;
}
public URI getRemoteLocation() {
return remote;
}
public Principal getLocalPrincipal() {
if (!isSSL()) {
throw new UnsupportedOperationException("Not connected to a secure channel");
}
SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
return sslHandler.engine().getSession().getLocalPrincipal();
}
//----- Internal implementation details, can be overridden as needed --//
protected String getRemoteHost() {
return remote.getHost();
}
protected int getRemotePort() {
int port = remote.getPort();
if (port <= 0) {
if (isSSL()) {
port = getSslOptions().getDefaultSslPort();
} else {
port = getTransportOptions().getDefaultTcpPort();
}
}
return port;
}
protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
if (options.getSendBufferSize() != -1) {
bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
}
if (options.getReceiveBufferSize() != -1) {
bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
}
if (options.getTrafficClass() != -1) {
bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
}
}
protected void configureChannel(final Channel channel) throws Exception {
if (isSSL()) {
SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
LOG.trace("SSL Handshake has completed: {}", channel);
connectionEstablished(channel);
} else {
LOG.trace("SSL Handshake has failed: {}", channel);
connectionFailed(channel, IOExceptionSupport.create(future.cause()));
}
}
});
channel.pipeline().addLast(sslHandler);
}
channel.pipeline().addLast(new NettyTcpTransportHandler());
}
protected void handleConnected(final Channel channel) throws Exception {
if (!isSSL()) {
connectionEstablished(channel);
}
}
//----- State change handlers and checks ---------------------------------//
/**
* Called when the transport has successfully connected and is ready for use.
*/
protected void connectionEstablished(Channel connectedChannel) {
channel = connectedChannel;
connected.set(true);
connectLatch.countDown();
}
/**
* Called when the transport connection failed and an error should be returned.
*
* @param failedChannel
* The Channel instance that failed.
* @param cause
* An IOException that describes the cause of the failed connection.
*/
protected void connectionFailed(Channel failedChannel, IOException cause) {
failureCause = IOExceptionSupport.create(cause);
channel = failedChannel;
connected.set(false);
connectLatch.countDown();
}
private NettyTransportSslOptions getSslOptions() {
return (NettyTransportSslOptions) getTransportOptions();
}
private void checkConnected() throws IOException {
if (!connected.get()) {
throw new IOException("Cannot send to a non-connected transport.");
}
}
//----- Handle connection events -----------------------------------------//
private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext context) throws Exception {
LOG.trace("Channel has become active! Channel is {}", context.channel());
}
@Override
public void channelInactive(ChannelHandlerContext context) throws Exception {
LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportClosed listener");
listener.onTransportClosed();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
LOG.trace("Exception on channel! Channel is {}", context.channel());
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportError listener");
if (pendingFailure != null) {
listener.onTransportError(pendingFailure);
} else {
listener.onTransportError(cause);
}
} else {
// Hold the first failure for later dispatch if connect succeeds.
// This will then trigger disconnect using the first error reported.
if (pendingFailure != null) {
LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
pendingFailure = cause;
}
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer);
listener.onData(buffer);
}
}
}

View File

@ -46,7 +46,7 @@ public final class NettyTransportFactory {
remoteURI = PropertyUtil.replaceQuery(remoteURI, map);
if (!remoteURI.getScheme().equalsIgnoreCase("ssl")) {
if (!remoteURI.getScheme().equalsIgnoreCase("ssl") && !remoteURI.getScheme().equalsIgnoreCase("wss")) {
transportOptions = NettyTransportOptions.INSTANCE.clone();
} else {
transportOptions = NettyTransportSslOptions.INSTANCE.clone();
@ -61,7 +61,20 @@ public final class NettyTransportFactory {
throw new IllegalArgumentException(msg);
}
NettyTransport result = new NettyTransport(remoteURI, transportOptions);
NettyTransport result = null;
switch (remoteURI.getScheme().toLowerCase()) {
case "tcp":
case "ssl":
result = new NettyTcpTransport(remoteURI, transportOptions);
break;
case "ws":
case "wss":
result = new NettyWSTransport(remoteURI, transportOptions);
break;
default:
throw new IllegalArgumentException("Invalid URI Scheme: " + remoteURI.getScheme());
}
return result;
}

View File

@ -30,7 +30,7 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
public static final String DEFAULT_STORE_TYPE = "jks";
public static final String DEFAULT_CONTEXT_PROTOCOL = "TLS";
public static final boolean DEFAULT_TRUST_ALL = false;
public static final boolean DEFAULT_VERIFY_HOST = true;
public static final boolean DEFAULT_VERIFY_HOST = false;
public static final List<String> DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[]{"SSLv2Hello", "SSLv3"}));
public static final int DEFAULT_SSL_PORT = 5671;

View File

@ -0,0 +1,470 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.transport;
import java.io.IOException;
import java.net.URI;
import java.security.Principal;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
/**
* Transport for communicating over WebSockets
*/
public class NettyWSTransport implements NettyTransport {
private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class);
private static final int QUIET_PERIOD = 20;
private static final int SHUTDOWN_TIMEOUT = 100;
protected Bootstrap bootstrap;
protected EventLoopGroup group;
protected Channel channel;
protected NettyTransportListener listener;
protected NettyTransportOptions options;
protected final URI remote;
protected boolean secure;
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();
private ChannelPromise handshakeFuture;
private IOException failureCause;
private Throwable pendingFailure;
/**
* Create a new transport instance
*
* @param remoteLocation
* the URI that defines the remote resource to connect to.
* @param options
* the transport options used to configure the socket connection.
*/
public NettyWSTransport(URI remoteLocation, NettyTransportOptions options) {
this(null, remoteLocation, options);
}
/**
* Create a new transport instance
*
* @param listener
* the TransportListener that will receive events from this Transport.
* @param remoteLocation
* the URI that defines the remote resource to connect to.
* @param options
* the transport options used to configure the socket connection.
*/
public NettyWSTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
this.options = options;
this.listener = listener;
this.remote = remoteLocation;
this.secure = remoteLocation.getScheme().equalsIgnoreCase("wss");
}
@Override
public void connect() throws IOException {
if (listener == null) {
throw new IllegalStateException("A transport listener must be set before connection attempts.");
}
group = new NioEventLoopGroup(1);
bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel connectedChannel) throws Exception {
configureChannel(connectedChannel);
}
});
configureNetty(bootstrap, getTransportOptions());
ChannelFuture future;
try {
future = bootstrap.connect(getRemoteHost(), getRemotePort());
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
handleConnected(future.channel());
} else if (future.isCancelled()) {
connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
} else {
connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
}
}
});
future.sync();
// Now wait for WS protocol level handshake completion
handshakeFuture.await();
} catch (InterruptedException ex) {
LOG.debug("Transport connection attempt was interrupted.");
Thread.interrupted();
failureCause = IOExceptionSupport.create(ex);
}
if (failureCause != null) {
// Close out any Netty resources now as they are no longer needed.
if (channel != null) {
channel.close().syncUninterruptibly();
channel = null;
}
if (group != null) {
group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
group = null;
}
throw failureCause;
} else {
// Connected, allow any held async error to fire now and close the transport.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (pendingFailure != null) {
channel.pipeline().fireExceptionCaught(pendingFailure);
}
}
});
}
}
@Override
public boolean isConnected() {
return connected.get();
}
@Override
public boolean isSSL() {
return secure;
}
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
connected.set(false);
if (channel != null) {
channel.close().syncUninterruptibly();
}
if (group != null) {
group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
}
}
}
@Override
public ByteBuf allocateSendBuffer(int size) throws IOException {
checkConnected();
return channel.alloc().ioBuffer(size, size);
}
@Override
public void send(ByteBuf output) throws IOException {
checkConnected();
int length = output.readableBytes();
if (length == 0) {
return;
}
LOG.trace("Attempted write of: {} bytes", length);
channel.writeAndFlush(new BinaryWebSocketFrame(output));
}
@Override
public NettyTransportListener getTransportListener() {
return listener;
}
@Override
public void setTransportListener(NettyTransportListener listener) {
this.listener = listener;
}
@Override
public NettyTransportOptions getTransportOptions() {
if (options == null) {
if (isSSL()) {
options = NettyTransportSslOptions.INSTANCE;
} else {
options = NettyTransportOptions.INSTANCE;
}
}
return options;
}
@Override
public URI getRemoteLocation() {
return remote;
}
@Override
public Principal getLocalPrincipal() {
if (!isSSL()) {
throw new UnsupportedOperationException("Not connected to a secure channel");
}
SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
return sslHandler.engine().getSession().getLocalPrincipal();
}
//----- Internal implementation details, can be overridden as needed --//
protected String getRemoteHost() {
return remote.getHost();
}
protected int getRemotePort() {
int port = remote.getPort();
if (port <= 0) {
if (isSSL()) {
port = getSslOptions().getDefaultSslPort();
} else {
port = getTransportOptions().getDefaultTcpPort();
}
}
return port;
}
protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
if (options.getSendBufferSize() != -1) {
bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
}
if (options.getReceiveBufferSize() != -1) {
bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
}
if (options.getTrafficClass() != -1) {
bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
}
}
protected void configureChannel(final Channel channel) throws Exception {
if (isSSL()) {
SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
LOG.trace("SSL Handshake has completed: {}", channel);
connectionEstablished(channel);
} else {
LOG.trace("SSL Handshake has failed: {}", channel);
connectionFailed(channel, IOExceptionSupport.create(future.cause()));
}
}
});
channel.pipeline().addLast(sslHandler);
}
channel.pipeline().addLast(new HttpClientCodec());
channel.pipeline().addLast(new HttpObjectAggregator(8192));
channel.pipeline().addLast(new NettyTcpTransportHandler());
}
protected void handleConnected(final Channel channel) throws Exception {
if (!isSSL()) {
connectionEstablished(channel);
}
}
//----- State change handlers and checks ---------------------------------//
/**
* Called when the transport has successfully connected and is ready for use.
*/
protected void connectionEstablished(Channel connectedChannel) {
LOG.info("WebSocket connectionEstablished! {}", connectedChannel);
channel = connectedChannel;
connected.set(true);
}
/**
* Called when the transport connection failed and an error should be returned.
*
* @param failedChannel
* The Channel instance that failed.
* @param cause
* An IOException that describes the cause of the failed connection.
*/
protected void connectionFailed(Channel failedChannel, IOException cause) {
failureCause = IOExceptionSupport.create(cause);
channel = failedChannel;
connected.set(false);
handshakeFuture.setFailure(cause);
}
private NettyTransportSslOptions getSslOptions() {
return (NettyTransportSslOptions) getTransportOptions();
}
private void checkConnected() throws IOException {
if (!connected.get()) {
throw new IOException("Cannot send to a non-connected transport.");
}
}
//----- Handle connection events -----------------------------------------//
private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<Object> {
private final WebSocketClientHandshaker handshaker;
public NettyTcpTransportHandler() {
handshaker = WebSocketClientHandshakerFactory.newHandshaker(
remote, WebSocketVersion.V13, "amqp", false, new DefaultHttpHeaders());
}
@Override
public void handlerAdded(ChannelHandlerContext context) {
LOG.trace("Handler has become added! Channel is {}", context.channel());
handshakeFuture = context.newPromise();
}
@Override
public void channelActive(ChannelHandlerContext context) throws Exception {
LOG.trace("Channel has become active! Channel is {}", context.channel());
handshaker.handshake(context.channel());
}
@Override
public void channelInactive(ChannelHandlerContext context) throws Exception {
LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportClosed listener");
listener.onTransportClosed();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
LOG.trace("Exception on channel! Channel is {} -> {}", context.channel(), cause.getMessage());
LOG.trace("Error Stack: ", cause);
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportError listener");
if (pendingFailure != null) {
listener.onTransportError(pendingFailure);
} else {
listener.onTransportError(cause);
}
} else {
// Hold the first failure for later dispatch if connect succeeds.
// This will then trigger disconnect using the first error reported.
if (pendingFailure != null) {
LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
pendingFailure = cause;
}
if (!handshakeFuture.isDone()) {
handshakeFuture.setFailure(cause);
}
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception {
LOG.trace("New data read: incoming: {}", message);
Channel ch = ctx.channel();
if (!handshaker.isHandshakeComplete()) {
handshaker.finishHandshake(ch, (FullHttpResponse) message);
LOG.info("WebSocket Client connected! {}", ctx.channel());
handshakeFuture.setSuccess();
return;
}
// We shouldn't get this since we handle the handshake previously.
if (message instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) message;
throw new IllegalStateException(
"Unexpected FullHttpResponse (getStatus=" + response.getStatus() +
", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
}
WebSocketFrame frame = (WebSocketFrame) message;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
LOG.warn("WebSocket Client received message: " + textFrame.text());
ctx.fireExceptionCaught(new IOException("Received invalid frame over WebSocket."));
} else if (frame instanceof BinaryWebSocketFrame) {
BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
LOG.info("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes());
listener.onData(binaryFrame.content());
} else if (frame instanceof PongWebSocketFrame) {
LOG.trace("WebSocket Client received pong");
} else if (frame instanceof CloseWebSocketFrame) {
LOG.trace("WebSocket Client received closing");
ch.close();
}
}
}
}

View File

@ -21,6 +21,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -32,14 +34,30 @@ import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.activemq.util.Wait;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
/**
* Test handling of heartbeats requested by the broker.
*/
@RunWith(Parameterized.class)
public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
private final int TEST_IDLE_TIMEOUT = 3000;
@Parameters(name="connector={0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{"amqp", false},
{"amqp+ws", false},
});
}
public AmqpBrokerReuqestedHearbeatsTest(String connectorScheme, boolean secure) {
super(connectorScheme, secure);
}
@Override
protected String getAdditionalConfig() {
return "&transport.wireFormat.idleTimeout=" + TEST_IDLE_TIMEOUT;

View File

@ -21,6 +21,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -32,14 +34,30 @@ import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.activemq.util.Wait;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
/**
* Tests that cover broker behavior when the client requests heartbeats
*/
@RunWith(Parameterized.class)
public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport {
private final int TEST_IDLE_TIMEOUT = 3000;
@Parameters(name="connector={0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{"amqp", false},
{"amqp+ws", false},
});
}
public AmqpClientRequestsHeartbeatsTest(String connectorScheme, boolean secure) {
super(connectorScheme, secure);
}
@Override
protected String getAdditionalConfig() {
return "&transport.wireFormat.idleTimeout=0";

View File

@ -25,6 +25,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import org.apache.activemq.transport.amqp.AmqpSupport;
@ -37,16 +39,34 @@ import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
/**
* Test broker handling of AMQP connections with various configurations.
*/
@RunWith(Parameterized.class)
public class AmqpConnectionsTest extends AmqpClientTestSupport {
private static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
private static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
@Parameters(name="{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{"amqp", false},
{"amqp+ws", false},
{"amqp+ssl", true},
{"amqp+wss", true}
});
}
public AmqpConnectionsTest(String connectorScheme, boolean secure) {
super(connectorScheme, secure);
}
@Test(timeout = 60000)
public void testCanConnect() throws Exception {
AmqpClient client = createAmqpClient();

View File

@ -20,7 +20,7 @@
#
log4j.rootLogger=WARN, console, file
log4j.logger.org.apache.activemq=INFO
log4j.logger.org.apache.activemq.transport.amqp=DEBUG
log4j.logger.org.apache.activemq.transport.amqp=TRACE
log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO
log4j.logger.org.fusesource=INFO

View File

@ -19,6 +19,7 @@ package org.apache.activemq.transport.vm;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.security.cert.X509Certificate;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@ -34,6 +35,7 @@ import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -438,4 +440,19 @@ public class VMTransport implements Transport, Task {
public int getReceiveCounter() {
return receiveCounter;
}
@Override
public X509Certificate[] getPeerCertificates() {
return null;
}
@Override
public void setPeerCertificates(X509Certificate[] certificates) {
}
@Override
public WireFormat getWireFormat() {
return null;
}
}

View File

@ -18,7 +18,10 @@ package org.apache.activemq.transport;
import java.io.IOException;
import java.net.URI;
import java.security.cert.X509Certificate;
import org.apache.activemq.Service;
import org.apache.activemq.wireformat.WireFormat;
/**
* Represents the client side of a transport allowing messages to be sent
@ -116,6 +119,7 @@ public interface Transport extends Service {
* @return true if updating uris is supported
*/
boolean isUpdateURIsSupported();
/**
* reconnect to another location
* @param uri
@ -139,4 +143,25 @@ public interface Transport extends Service {
* @return a counter which gets incremented as data is read from the transport.
*/
int getReceiveCounter();
/**
* @return the Certificates provided by the peer, or null if not a secure channel.
*/
X509Certificate[] getPeerCertificates();
/**
* Sets the certificates provided by the connected peer.
*
* @param certificates
* the Certificates provided by the peer.
*/
void setPeerCertificates(X509Certificate[] certificates);
/**
* Retrieves the WireFormat instance associated with this Transport instance.
*
* @return the WireFormat in use.
*/
WireFormat getWireFormat();
}

View File

@ -18,6 +18,9 @@ package org.apache.activemq.transport;
import java.io.IOException;
import java.net.URI;
import java.security.cert.X509Certificate;
import org.apache.activemq.wireformat.WireFormat;
/**
*
@ -30,10 +33,12 @@ public class TransportFilter implements TransportListener, Transport {
this.next = next;
}
@Override
public TransportListener getTransportListener() {
return transportListener;
}
@Override
public void setTransportListener(TransportListener channelListener) {
this.transportListener = channelListener;
if (channelListener == null) {
@ -48,6 +53,7 @@ public class TransportFilter implements TransportListener, Transport {
* @throws IOException
* if the next channel has not been set.
*/
@Override
public void start() throws Exception {
if (next == null) {
throw new IOException("The next channel has not been set.");
@ -61,10 +67,12 @@ public class TransportFilter implements TransportListener, Transport {
/**
* @see org.apache.activemq.Service#stop()
*/
@Override
public void stop() throws Exception {
next.stop();
}
@Override
public void onCommand(Object command) {
transportListener.onCommand(command);
}
@ -81,34 +89,42 @@ public class TransportFilter implements TransportListener, Transport {
return next.toString();
}
@Override
public void oneway(Object command) throws IOException {
next.oneway(command);
}
@Override
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
return next.asyncRequest(command, null);
}
@Override
public Object request(Object command) throws IOException {
return next.request(command);
}
@Override
public Object request(Object command, int timeout) throws IOException {
return next.request(command, timeout);
}
@Override
public void onException(IOException error) {
transportListener.onException(error);
}
@Override
public void transportInterupted() {
transportListener.transportInterupted();
}
@Override
public void transportResumed() {
transportListener.transportResumed();
}
@Override
public <T> T narrow(Class<T> target) {
if (target.isAssignableFrom(getClass())) {
return target.cast(this);
@ -116,6 +132,7 @@ public class TransportFilter implements TransportListener, Transport {
return next.narrow(target);
}
@Override
public String getRemoteAddress() {
return next.getRemoteAddress();
}
@ -124,35 +141,58 @@ public class TransportFilter implements TransportListener, Transport {
* @return
* @see org.apache.activemq.transport.Transport#isFaultTolerant()
*/
@Override
public boolean isFaultTolerant() {
return next.isFaultTolerant();
}
@Override
public boolean isDisposed() {
return next.isDisposed();
}
@Override
public boolean isConnected() {
return next.isConnected();
}
@Override
public void reconnect(URI uri) throws IOException {
next.reconnect(uri);
}
@Override
public int getReceiveCounter() {
return next.getReceiveCounter();
}
@Override
public boolean isReconnectSupported() {
return next.isReconnectSupported();
}
@Override
public boolean isUpdateURIsSupported() {
return next.isUpdateURIsSupported();
}
@Override
public void updateURIs(boolean rebalance,URI[] uris) throws IOException {
next.updateURIs(rebalance,uris);
}
@Override
public X509Certificate[] getPeerCertificates() {
return next.getPeerCertificates();
}
@Override
public void setPeerCertificates(X509Certificate[] certificates) {
next.setPeerCertificates(certificates);
}
@Override
public WireFormat getWireFormat() {
return next.getWireFormat();
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -26,6 +26,7 @@ import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@ -61,6 +62,7 @@ import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -1448,4 +1450,28 @@ public class FailoverTransport implements CompositeTransport {
public void setWarnAfterReconnectAttempts(int warnAfterReconnectAttempts) {
this.warnAfterReconnectAttempts = warnAfterReconnectAttempts;
}
@Override
public X509Certificate[] getPeerCertificates() {
Transport transport = connectedTransport.get();
if (transport != null) {
return transport.getPeerCertificates();
} else {
return null;
}
}
@Override
public void setPeerCertificates(X509Certificate[] certificates) {
}
@Override
public WireFormat getWireFormat() {
Transport transport = connectedTransport.get();
if (transport != null) {
return transport.getWireFormat();
} else {
return null;
}
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -19,6 +19,7 @@ package org.apache.activemq.transport.fanout;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
@ -44,13 +45,12 @@ import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Transport that fans out a connection to multiple brokers.
*
*
*/
public class FanoutTransport implements CompositeTransport {
@ -191,7 +191,7 @@ public class FanoutTransport implements CompositeTransport {
// Try to connect them up.
Iterator<FanoutTransportHandler> iter = transports.iterator();
for (int i = 0; iter.hasNext() && !disposed; i++) {
while (iter.hasNext() && !disposed) {
long now = System.currentTimeMillis();
@ -256,14 +256,13 @@ public class FanoutTransport implements CompositeTransport {
}
}
}
if (transports.size() == connectedCount || disposed) {
reconnectMutex.notifyAll();
return false;
}
}
}
}
try {
@ -428,7 +427,6 @@ public class FanoutTransport implements CompositeTransport {
primary.onException(e);
}
}
}
} catch (InterruptedException e) {
// Some one may be trying to stop our thread.
@ -448,8 +446,7 @@ public class FanoutTransport implements CompositeTransport {
}
return ((Message) command).getDestination().isTopic();
}
if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE ||
command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE || command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
return false;
}
return true;
@ -491,7 +488,6 @@ public class FanoutTransport implements CompositeTransport {
@Override
public <T> T narrow(Class<T> target) {
if (target.isAssignableFrom(getClass())) {
return target.cast(this);
}
@ -509,7 +505,6 @@ public class FanoutTransport implements CompositeTransport {
}
return null;
}
protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException {
@ -524,7 +519,6 @@ public class FanoutTransport implements CompositeTransport {
@Override
public void add(boolean reblance, URI uris[]) {
synchronized (reconnectMutex) {
for (int i = 0; i < uris.length; i++) {
URI uri = uris[i];
@ -537,6 +531,7 @@ public class FanoutTransport implements CompositeTransport {
break;
}
}
if (!match) {
FanoutTransportHandler th = new FanoutTransportHandler(uri);
transports.add(th);
@ -544,12 +539,10 @@ public class FanoutTransport implements CompositeTransport {
}
}
}
}
@Override
public void remove(boolean rebalance, URI uris[]) {
synchronized (reconnectMutex) {
for (int i = 0; i < uris.length; i++) {
URI uri = uris[i];
@ -567,13 +560,11 @@ public class FanoutTransport implements CompositeTransport {
}
}
}
}
@Override
public void reconnect(URI uri) throws IOException {
add(true, new URI[] { uri });
}
@Override
@ -585,12 +576,12 @@ public class FanoutTransport implements CompositeTransport {
public boolean isUpdateURIsSupported() {
return true;
}
@Override
public void updateURIs(boolean reblance, URI[] uris) throws IOException {
add(reblance, uris);
}
@Override
public String getRemoteAddress() {
if (primary != null) {
@ -625,7 +616,6 @@ public class FanoutTransport implements CompositeTransport {
return disposed;
}
@Override
public boolean isConnected() {
return connected;
@ -643,4 +633,19 @@ public class FanoutTransport implements CompositeTransport {
}
return rc;
}
@Override
public X509Certificate[] getPeerCertificates() {
return null;
}
@Override
public void setPeerCertificates(X509Certificate[] certificates) {
}
@Override
public WireFormat getWireFormat() {
return null;
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -18,16 +18,16 @@ package org.apache.activemq.transport.mock;
import java.io.IOException;
import java.net.URI;
import java.security.cert.X509Certificate;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.wireformat.WireFormat;
/**
*
*/
public class MockTransport extends DefaultTransportListener implements Transport {
protected Transport next;
@ -37,8 +37,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
this.next = next;
}
/**
*/
@Override
public synchronized void setTransportListener(TransportListener channelListener) {
this.transportListener = channelListener;
if (channelListener == null) {
@ -50,8 +49,10 @@ public class MockTransport extends DefaultTransportListener implements Transport
/**
* @see org.apache.activemq.Service#start()
* @throws IOException if the next channel has not been set.
* @throws IOException
* if the next channel has not been set.
*/
@Override
public void start() throws Exception {
if (getNext() == null) {
throw new IOException("The next channel has not been set.");
@ -65,6 +66,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
/**
* @see org.apache.activemq.Service#stop()
*/
@Override
public void stop() throws Exception {
getNext().stop();
}
@ -84,6 +86,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
/**
* @return Returns the packetListener.
*/
@Override
public synchronized TransportListener getTransportListener() {
return transportListener;
}
@ -93,18 +96,22 @@ public class MockTransport extends DefaultTransportListener implements Transport
return getNext().toString();
}
@Override
public void oneway(Object command) throws IOException {
getNext().oneway(command);
}
@Override
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
return getNext().asyncRequest(command, null);
}
@Override
public Object request(Object command) throws IOException {
return getNext().request(command);
}
@Override
public Object request(Object command, int timeout) throws IOException {
return getNext().request(command, timeout);
}
@ -114,6 +121,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
getTransportListener().onException(error);
}
@Override
public <T> T narrow(Class<T> target) {
if (target.isAssignableFrom(getClass())) {
return target.cast(this);
@ -131,6 +139,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
setNext(filter);
}
@Override
public String getRemoteAddress() {
return getNext().getRemoteAddress();
}
@ -138,35 +147,58 @@ public class MockTransport extends DefaultTransportListener implements Transport
/**
* @see org.apache.activemq.transport.Transport#isFaultTolerant()
*/
@Override
public boolean isFaultTolerant() {
return getNext().isFaultTolerant();
}
@Override
public boolean isDisposed() {
return getNext().isDisposed();
}
@Override
public boolean isConnected() {
return getNext().isConnected();
}
@Override
public void reconnect(URI uri) throws IOException {
getNext().reconnect(uri);
}
@Override
public int getReceiveCounter() {
return getNext().getReceiveCounter();
}
@Override
public boolean isReconnectSupported() {
return getNext().isReconnectSupported();
}
@Override
public boolean isUpdateURIsSupported() {
return getNext().isUpdateURIsSupported();
}
@Override
public void updateURIs(boolean reblance, URI[] uris) throws IOException {
getNext().updateURIs(reblance, uris);
}
@Override
public X509Certificate[] getPeerCertificates() {
return getNext().getPeerCertificates();
}
@Override
public void setPeerCertificates(X509Certificate[] certificates) {
getNext().setPeerCertificates(certificates);
}
@Override
public WireFormat getWireFormat() {
return getNext().getWireFormat();
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.tcp;
import java.io.IOException;
@ -43,6 +42,7 @@ import org.apache.activemq.wireformat.WireFormat;
* unexpected situations may occur.
*/
public class SslTransport extends TcpTransport {
/**
* Connect to a remote node such as a Broker.
*
@ -56,6 +56,7 @@ public class SslTransport extends TcpTransport {
* @throws UnknownHostException If TcpTransport throws.
* @throws IOException If TcpTransport throws.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public SslTransport(WireFormat wireFormat, SSLSocketFactory socketFactory, URI remoteLocation, URI localLocation, boolean needClientAuth) throws IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
if (this.socket != null) {
@ -65,7 +66,7 @@ public class SslTransport extends TcpTransport {
// a single proxy to route to different messaging apps.
// On java 1.7 it seems like it can only be configured via reflection.
// todo: find out if this will work on java 1.8
// TODO: find out if this will work on java 1.8
HashMap props = new HashMap();
props.put("host", remoteLocation.getHost());
IntrospectionSupport.setProperties(this.socket, props);
@ -110,6 +111,7 @@ public class SslTransport extends TcpTransport {
/**
* @return peer certificate chain associated with the ssl socket
*/
@Override
public X509Certificate[] getPeerCertificates() {
SSLSocket sslSocket = (SSLSocket)this.socket;
@ -133,5 +135,4 @@ public class SslTransport extends TcpTransport {
public String toString() {
return "ssl://" + socket.getInetAddress() + ":" + socket.getPort();
}
}

View File

@ -1,5 +1,5 @@
/**
gxfdgvdfg * Licensed to the Apache Software Foundation (ASF) under one or more
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
@ -29,6 +29,7 @@ import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.cert.X509Certificate;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@ -51,12 +52,11 @@ import org.slf4j.LoggerFactory;
/**
* An implementation of the {@link Transport} interface using raw tcp/ip
*
* @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
*
*/
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
protected final URI remoteLocation;
protected final URI localLocation;
protected final WireFormat wireFormat;
@ -754,4 +754,13 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
public WireFormat getWireFormat() {
return wireFormat;
}
@Override
public X509Certificate[] getPeerCertificates() {
return null;
}
@Override
public void setPeerCertificates(X509Certificate[] certificates) {
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -20,7 +20,6 @@ import java.io.EOFException;
import java.io.IOException;
import java.net.BindException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
@ -28,6 +27,7 @@ import java.net.URI;
import java.net.UnknownHostException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.DatagramChannel;
import java.security.cert.X509Certificate;
import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
@ -47,10 +47,9 @@ import org.slf4j.LoggerFactory;
/**
* An implementation of the {@link Transport} interface using raw UDP
*
*
*/
public class UdpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
private static final Logger LOG = LoggerFactory.getLogger(UdpTransport.class);
private static final int MAX_BIND_ATTEMPTS = 50;
@ -112,6 +111,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
/**
* A one way asynchronous send
*/
@Override
public void oneway(Object command) throws IOException {
oneway(command, targetAddress);
}
@ -130,6 +130,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
/**
* @return pretty print of 'this'
*/
@Override
public String toString() {
if (description != null) {
return description + port;
@ -141,6 +142,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
/**
* reads packets from a Socket
*/
@Override
public void run() {
LOG.trace("Consumer thread starting for: " + toString());
while (!isStopped()) {
@ -350,6 +352,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
return host;
}
@Override
protected void doStart() throws Exception {
getCommandChannel().start();
@ -419,6 +422,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
return new InetSocketAddress(port);
}
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
if (channel != null) {
channel.close();
@ -457,6 +461,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
}
}
@Override
public String getRemoteAddress() {
if (targetAddress != null) {
return "" + targetAddress;
@ -464,10 +469,20 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
return null;
}
@Override
public int getReceiveCounter() {
if (commandChannel == null) {
return 0;
}
return commandChannel.getReceiveCounter();
}
@Override
public X509Certificate[] getPeerCertificates() {
return null;
}
@Override
public void setPeerCertificates(X509Certificate[] certificates) {
}
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.ws;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.activemq.transport.Transport;
/**
* Interface for a WebSocket Transport which provide hooks that a servlet can
* use to pass along WebSocket data and events.
*/
public interface WSTransport extends Transport {
/**
* WS Transport output sink, used to give the WS Transport implementation
* a way to produce output back to the WS connection without coupling it
* to the implementation.
*/
public interface WSTransportSink {
/**
* Called from the Transport when new outgoing String data is ready.
*
* @param data
* The newly prepared outgoing string data.
*
* @throws IOException if an error occurs or the socket doesn't support text data.
*/
void onSocketOutboundText(String data) throws IOException;
/**
* Called from the Transport when new outgoing String data is ready.
*
* @param data
* The newly prepared outgoing string data.
*
* @throws IOException if an error occurs or the socket doesn't support text data.
*/
void onSocketOutboundBinary(ByteBuffer data) throws IOException;
}
/**
* @return the WS sub-protocol that this transport is supplying.
*/
String getSubProtocol();
/**
* Called to provide the WS with the output data sink.
*/
void setTransportSink(WSTransportSink outputSink);
/**
* Called from the WebSocket framework when new incoming String data is received.
*
* @param data
* The newly received incoming data.
*
* @throws IOException if an error occurs or the socket doesn't support text data.
*/
void onWebSocketText(String data) throws IOException;
/**
* Called from the WebSocket framework when new incoming Binary data is received.
*
* @param data
* The newly received incoming data.
*
* @throws IOException if an error occurs or the socket doesn't support binary data.
*/
void onWebSocketBinary(ByteBuffer data) throws IOException;
/**
* Called from the WebSocket framework when the socket has been closed unexpectedly.
*
* @throws IOException if an error while processing the close.
*/
void onWebSocketClosed() throws IOException;
}

View File

@ -17,21 +17,22 @@
package org.apache.activemq.transport.http;
import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.TransportSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
/**
* A server side HTTP based TransportChannel which processes incoming packets
* and adds outgoing packets onto a {@link Queue} so that they can be dispatched
* by the HTTP GET requests from the client.
*
*
*/
public class BlockingQueueTransport extends TransportSupport {
public static final long MAX_TIMEOUT = 30000L;
private BlockingQueue<Object> queue;
@ -44,6 +45,7 @@ public class BlockingQueueTransport extends TransportSupport {
return queue;
}
@Override
public void oneway(Object command) throws IOException {
try {
boolean success = queue.offer(command, MAX_TIMEOUT, TimeUnit.MILLISECONDS);
@ -55,18 +57,35 @@ public class BlockingQueueTransport extends TransportSupport {
}
}
@Override
public String getRemoteAddress() {
return "blockingQueue_" + queue.hashCode();
}
@Override
protected void doStart() throws Exception {
}
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
}
@Override
public int getReceiveCounter() {
return 0;
}
@Override
public X509Certificate[] getPeerCertificates() {
return null;
}
@Override
public void setPeerCertificates(X509Certificate[] certificates) {
}
@Override
public WireFormat getWireFormat() {
return null;
}
}

View File

@ -20,6 +20,7 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.security.cert.X509Certificate;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@ -30,6 +31,7 @@ import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
@ -396,4 +398,17 @@ public class HttpClientTransport extends HttpTransportSupport {
this.minSendAsCompressedSize = minSendAsCompressedSize;
}
@Override
public X509Certificate[] getPeerCertificates() {
return null;
}
@Override
public void setPeerCertificates(X509Certificate[] certificates) {
}
@Override
public WireFormat getWireFormat() {
return getTextWireFormat();
}
}

View File

@ -33,4 +33,25 @@ public class HttpTransportUtils {
remoteAddress.append(request.getRemotePort());
return remoteAddress.toString();
}
public static String generateWsRemoteAddress(HttpServletRequest request, String schemePrefix) {
if (request == null) {
throw new IllegalArgumentException("HttpServletRequest must not be null.");
}
StringBuilder remoteAddress = new StringBuilder();
String scheme = request.getScheme();
if (scheme != null && scheme.equalsIgnoreCase("https")) {
scheme = schemePrefix + "+wss://";
} else {
scheme = schemePrefix + "+ws://";
}
remoteAddress.append(scheme);
remoteAddress.append(request.getRemoteAddr());
remoteAddress.append(":");
remoteAddress.append(request.getRemotePort());
return remoteAddress.toString();
}
}

View File

@ -147,7 +147,7 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
stompInactivityMonitor.onCommand(new KeepAliveInfo());
} else {
StompFrame frame = (StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes("UTF-8")));
frame.setTransportContext(getCertificates());
frame.setTransportContext(getPeerCertificates());
protocolConverter.onStompCommand(frame);
}
}
@ -162,11 +162,13 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
return socketTransportStarted.getCount() == 0;
}
public X509Certificate[] getCertificates() {
@Override
public X509Certificate[] getPeerCertificates() {
return certificates;
}
public void setCertificates(X509Certificate[] certificates) {
@Override
public void setPeerCertificates(X509Certificate[] certificates) {
this.certificates = certificates;
}
}

View File

@ -129,10 +129,6 @@ public class StompWSConnection extends WebSocketAdapter implements WebSocketList
}
}
/* (non-Javadoc)
* @see org.eclipse.jetty.websocket.api.WebSocketListener#onWebSocketClose(int, java.lang.String)
*/
@Override
public void onWebSocketClose(int statusCode, String reason) {
LOG.trace("STOMP WS Connection closed, code:{} message:{}", statusCode, reason);
@ -140,15 +136,10 @@ public class StompWSConnection extends WebSocketAdapter implements WebSocketList
this.connection = null;
this.closeCode = statusCode;
this.closeMessage = reason;
}
/* (non-Javadoc)
* @see org.eclipse.jetty.websocket.api.WebSocketListener#onWebSocketConnect(org.eclipse.jetty.websocket.api.Session)
*/
@Override
public void onWebSocketConnect(
org.eclipse.jetty.websocket.api.Session session) {
public void onWebSocketConnect(org.eclipse.jetty.websocket.api.Session session) {
this.connection = session;
this.connectLatch.countDown();
}

View File

@ -23,6 +23,8 @@ import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.util.IOExceptionSupport;
@ -32,7 +34,9 @@ import org.apache.activemq.util.URISupport;
/**
* Factory for WebSocket (ws) transport
*/
public class WSTransportFactory extends TransportFactory {
public class WSTransportFactory extends TransportFactory implements BrokerServiceAware {
private BrokerService brokerService;
@Override
public TransportServer doBind(URI location) throws IOException {
@ -42,6 +46,7 @@ public class WSTransportFactory extends TransportFactory {
Map<String, Object> httpOptions = IntrospectionSupport.extractProperties(options, "http.");
Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "");
IntrospectionSupport.setProperties(result, transportOptions);
result.setBrokerService(brokerService);
result.setTransportOption(transportOptions);
result.setHttpOptions(httpOptions);
return result;
@ -49,4 +54,9 @@ public class WSTransportFactory extends TransportFactory {
throw IOExceptionSupport.create(e);
}
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
}

View File

@ -0,0 +1,270 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.ws;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.cert.X509Certificate;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportSupport;
import org.apache.activemq.transport.ws.WSTransport.WSTransportSink;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A proxy class that manages sending WebSocket events to the wrapped protocol level
* WebSocket Transport.
*/
public final class WSTransportProxy extends TransportSupport implements Transport, WebSocketListener, BrokerServiceAware, WSTransportSink {
private static final Logger LOG = LoggerFactory.getLogger(WSTransportProxy.class);
private final int ORDERLY_CLOSE_TIMEOUT = 10;
private final ReentrantLock protocolLock = new ReentrantLock();
private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
private final String remoteAddress;
private final Transport transport;
private final WSTransport wsTransport;
private Session session;
/**
* Create a WebSocket Transport Proxy instance that will pass
* along WebSocket event to the underlying protocol level transport.
*
* @param remoteAddress
* the provided remote address to report being connected to.
* @param transport
* The protocol level WebSocket Transport
*/
public WSTransportProxy(String remoteAddress, Transport transport) {
this.remoteAddress = remoteAddress;
this.transport = transport;
this.wsTransport = transport.narrow(WSTransport.class);
if (wsTransport == null) {
throw new IllegalArgumentException("Provided Transport does not contains a WSTransport implementation");
} else {
wsTransport.setTransportSink(this);
}
}
/**
* @return the sub-protocol of the proxied transport.
*/
public String getSubProtocol() {
return wsTransport.getSubProtocol();
}
/**
* Apply any configure Transport options on the wrapped Transport and its contained
* wireFormat instance.
*/
public void setTransportOptions(Map<String, Object> options) {
Map<String, Object> wireFormatOptions = IntrospectionSupport.extractProperties(options, "wireFormat.");
IntrospectionSupport.setProperties(transport, options);
IntrospectionSupport.setProperties(transport.getWireFormat(), wireFormatOptions);
}
@Override
public void setBrokerService(BrokerService brokerService) {
if (transport instanceof BrokerServiceAware) {
((BrokerServiceAware) transport).setBrokerService(brokerService);
}
}
@Override
public void oneway(Object command) throws IOException {
protocolLock.lock();
try {
transport.oneway(command);
} catch (Exception e) {
onException(IOExceptionSupport.create(e));
} finally {
protocolLock.unlock();
}
}
@Override
public X509Certificate[] getPeerCertificates() {
return transport.getPeerCertificates();
}
@Override
public void setPeerCertificates(X509Certificate[] certificates) {
transport.setPeerCertificates(certificates);
}
@Override
public String getRemoteAddress() {
return remoteAddress;
}
@Override
public WireFormat getWireFormat() {
return transport.getWireFormat();
}
@Override
public int getReceiveCounter() {
return transport.getReceiveCounter();
}
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
transport.stop();
if (session != null && session.isOpen()) {
session.close();
}
}
@Override
protected void doStart() throws Exception {
socketTransportStarted.countDown();
transport.setTransportListener(getTransportListener());
transport.start();
}
//----- WebSocket methods being proxied to the WS Transport --------------//
@Override
public void onWebSocketBinary(byte[] payload, int offset, int length) {
if (!transportStartedAtLeastOnce()) {
LOG.debug("Waiting for WebSocket to be properly started...");
try {
socketTransportStarted.await();
} catch (InterruptedException e) {
LOG.warn("While waiting for WebSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
}
}
protocolLock.lock();
try {
wsTransport.onWebSocketBinary(ByteBuffer.wrap(payload, offset, length));
} catch (Exception e) {
onException(IOExceptionSupport.create(e));
} finally {
protocolLock.unlock();
}
}
@Override
public void onWebSocketText(String data) {
if (!transportStartedAtLeastOnce()) {
LOG.debug("Waiting for WebSocket to be properly started...");
try {
socketTransportStarted.await();
} catch (InterruptedException e) {
LOG.warn("While waiting for WebSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
}
}
protocolLock.lock();
try {
wsTransport.onWebSocketText(data);
} catch (Exception e) {
onException(IOExceptionSupport.create(e));
} finally {
protocolLock.unlock();
}
}
@Override
public void onWebSocketClose(int statusCode, String reason) {
try {
if (protocolLock.tryLock() || protocolLock.tryLock(ORDERLY_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
LOG.debug("WebSocket closed: code[{}] message[{}]", statusCode, reason);
wsTransport.onWebSocketClosed();
}
} catch (Exception e) {
LOG.debug("Failed to close WebSocket cleanly", e);
} finally {
if (protocolLock.isHeldByCurrentThread()) {
protocolLock.unlock();
}
}
}
@Override
public void onWebSocketConnect(Session session) {
this.session = session;
}
@Override
public void onWebSocketError(Throwable cause) {
onException(IOExceptionSupport.create(cause));
}
@Override
public void onSocketOutboundText(String data) throws IOException {
if (!transportStartedAtLeastOnce()) {
LOG.debug("Waiting for WebSocket to be properly started...");
try {
socketTransportStarted.await();
} catch (InterruptedException e) {
LOG.warn("While waiting for WebSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
}
}
LOG.trace("WS Proxy sending string of size {} out", data.length());
session.getRemote().sendString(data);
}
@Override
public void onSocketOutboundBinary(ByteBuffer data) throws IOException {
if (!transportStartedAtLeastOnce()) {
LOG.debug("Waiting for WebSocket to be properly started...");
try {
socketTransportStarted.await();
} catch (InterruptedException e) {
LOG.warn("While waiting for WebSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
}
}
LOG.trace("WS Proxy sending {} bytes out", data.remaining());
int limit = data.limit();
session.getRemote().sendBytes(data);
// Reset back to original limit and move position to match limit indicating
// that we read everything, the websocket sender clears the passed buffer
// which can make it look as if nothing was written.
data.limit(limit);
data.position(limit);
}
//----- Internal implementation ------------------------------------------//
private boolean transportStartedAtLeastOnce() {
return socketTransportStarted.getCount() == 0;
}
}

View File

@ -23,6 +23,8 @@ import java.util.Map;
import javax.servlet.Servlet;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.transport.SocketConnectorFactory;
import org.apache.activemq.transport.WebTransportServerSupport;
@ -41,10 +43,13 @@ import org.slf4j.LoggerFactory;
* Creates a web server and registers web socket server
*
*/
public class WSTransportServer extends WebTransportServerSupport {
public class WSTransportServer extends WebTransportServerSupport implements BrokerServiceAware {
private static final Logger LOG = LoggerFactory.getLogger(WSTransportServer.class);
private BrokerService brokerService;
private WSServlet servlet;
public WSTransportServer(URI location) {
super(location);
this.bindAddress = location;
@ -105,8 +110,10 @@ public class WSTransportServer extends WebTransportServerSupport {
}
private Servlet createWSServlet() throws Exception {
WSServlet servlet = new WSServlet();
servlet = new WSServlet();
servlet.setTransportOptions(transportOptions);
servlet.setBrokerService(brokerService);
return servlet;
}
@ -147,4 +154,12 @@ public class WSTransportServer extends WebTransportServerSupport {
public boolean isSslServer() {
return false;
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
if (servlet != null) {
servlet.setBrokerService(brokerService);
}
}
}

View File

@ -18,17 +18,26 @@
package org.apache.activemq.transport.ws.jetty9;
import java.io.IOException;
import java.util.*;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.activemq.jms.pool.IntrospectionSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.util.HttpTransportUtils;
import org.apache.activemq.transport.ws.WSTransportProxy;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
@ -39,7 +48,7 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
/**
* Handle connection upgrade requests and creates web sockets
*/
public class WSServlet extends WebSocketServlet {
public class WSServlet extends WebSocketServlet implements BrokerServiceAware {
private static final long serialVersionUID = -4716657876092884139L;
@ -49,6 +58,11 @@ public class WSServlet extends WebSocketServlet {
private final static Map<String, Integer> mqttProtocols = new ConcurrentHashMap<>();
private Map<String, Object> transportOptions;
private BrokerService brokerService;
private enum Protocol {
MQTT, STOMP, UNKNOWN
}
static {
stompProtocols.put("v12.stomp", 3);
@ -80,33 +94,90 @@ public class WSServlet extends WebSocketServlet {
@Override
public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
WebSocketListener socket;
boolean isMqtt = false;
Protocol requestedProtocol = Protocol.UNKNOWN;
// When no sub-protocol is requested we default to STOMP for legacy reasons.
if (!req.getSubProtocols().isEmpty()) {
for (String subProtocol : req.getSubProtocols()) {
if (subProtocol.startsWith("mqtt")) {
isMqtt = true;
requestedProtocol = Protocol.MQTT;
} else if (subProtocol.contains("stomp")) {
requestedProtocol = Protocol.STOMP;
}
}
if (isMqtt) {
socket = new MQTTSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest()));
resp.setAcceptedSubProtocol(getAcceptedSubProtocol(mqttProtocols,req.getSubProtocols(), "mqtt"));
((MQTTSocket)socket).setTransportOptions(new HashMap(transportOptions));
((MQTTSocket)socket).setPeerCertificates(req.getCertificates());
} else {
socket = new StompSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest()));
((StompSocket)socket).setCertificates(req.getCertificates());
resp.setAcceptedSubProtocol(getAcceptedSubProtocol(stompProtocols,req.getSubProtocols(), "stomp"));
requestedProtocol = Protocol.STOMP;
}
switch (requestedProtocol) {
case MQTT:
socket = new MQTTSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest()));
((MQTTSocket) socket).setTransportOptions(new HashMap<String, Object>(transportOptions));
((MQTTSocket) socket).setPeerCertificates(req.getCertificates());
resp.setAcceptedSubProtocol(getAcceptedSubProtocol(mqttProtocols, req.getSubProtocols(), "mqtt"));
break;
case UNKNOWN:
socket = findWSTransport(req, resp);
if (socket != null) {
break;
}
case STOMP:
socket = new StompSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest()));
((StompSocket) socket).setPeerCertificates(req.getCertificates());
resp.setAcceptedSubProtocol(getAcceptedSubProtocol(stompProtocols, req.getSubProtocols(), "stomp"));
break;
default:
socket = null;
listener.onAcceptError(new IOException("Unknown protocol requested"));
break;
}
if (socket != null) {
listener.onAccept((Transport) socket);
}
return socket;
}
});
}
private String getAcceptedSubProtocol(final Map<String, Integer> protocols,
List<String> subProtocols, String defaultProtocol) {
private WebSocketListener findWSTransport(ServletUpgradeRequest request, ServletUpgradeResponse response) {
WSTransportProxy proxy = null;
for (String subProtocol : request.getSubProtocols()) {
try {
String remoteAddress = HttpTransportUtils.generateWsRemoteAddress(request.getHttpServletRequest(), subProtocol);
URI remoteURI = new URI(remoteAddress);
TransportFactory factory = TransportFactory.findTransportFactory(remoteURI);
if (factory instanceof BrokerServiceAware) {
((BrokerServiceAware) factory).setBrokerService(brokerService);
}
Transport transport = factory.doConnect(remoteURI);
proxy = new WSTransportProxy(remoteAddress, transport);
proxy.setPeerCertificates(request.getCertificates());
proxy.setTransportOptions(transportOptions);
response.setAcceptedSubProtocol(proxy.getSubProtocol());
} catch (Exception e) {
proxy = null;
// Keep going and try any other sub-protocols present.
continue;
}
}
return proxy;
}
private String getAcceptedSubProtocol(final Map<String, Integer> protocols, List<String> subProtocols, String defaultProtocol) {
List<SubProtocol> matchedProtocols = new ArrayList<>();
if (subProtocols != null && subProtocols.size() > 0) {
//detect which subprotocols match accepted protocols and add to the list
// detect which subprotocols match accepted protocols and add to the
// list
for (String subProtocol : subProtocols) {
Integer priority = protocols.get(subProtocol);
if (subProtocol != null && priority != null) {
@ -131,6 +202,7 @@ public class WSServlet extends WebSocketServlet {
private class SubProtocol {
private String protocol;
private Integer priority;
public SubProtocol(String protocol, Integer priority) {
this.protocol = protocol;
this.priority = priority;
@ -140,4 +212,9 @@ public class WSServlet extends WebSocketServlet {
public void setTransportOptions(Map<String, Object> transportOptions) {
this.transportOptions = transportOptions;
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
}

View File

@ -22,6 +22,8 @@ import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
@ -32,7 +34,9 @@ import org.apache.activemq.util.URISupport;
/**
* Factory for Secure WebSocket (wss) transport
*/
public class WSSTransportFactory extends TransportFactory {
public class WSSTransportFactory extends TransportFactory implements BrokerServiceAware {
private BrokerService brokerService;
@Override
public TransportServer doBind(URI location) throws IOException {
@ -44,9 +48,15 @@ public class WSSTransportFactory extends TransportFactory {
IntrospectionSupport.setProperties(result, transportOptions);
result.setTransportOption(transportOptions);
result.setHttpOptions(httpOptions);
result.setBrokerService(brokerService);
return result;
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
}

View File

@ -250,9 +250,6 @@ public class MQTTWSConnection extends WebSocketAdapter implements WebSocketListe
}
}
/* (non-Javadoc)
* @see org.eclipse.jetty.websocket.api.WebSocketListener#onWebSocketClose(int, java.lang.String)
*/
@Override
public void onWebSocketClose(int statusCode, String reason) {
LOG.trace("MQTT WS Connection closed, code:{} message:{}", statusCode, reason);
@ -263,14 +260,9 @@ public class MQTTWSConnection extends WebSocketAdapter implements WebSocketListe
}
/* (non-Javadoc)
* @see org.eclipse.jetty.websocket.api.WebSocketListener#onWebSocketConnect(org.eclipse.jetty.websocket.api.Session)
*/
@Override
public void onWebSocketConnect(
org.eclipse.jetty.websocket.api.Session session) {
public void onWebSocketConnect(org.eclipse.jetty.websocket.api.Session session) {
this.connection = session;
this.connectLatch.countDown();
}
}

View File

@ -43,13 +43,9 @@ public class StompWSConnectionTimeoutTest extends WSTransportTestSupport {
super.setUp();
wsStompConnection = new StompWSConnection();
// WebSocketClientFactory clientFactory = new WebSocketClientFactory();
// clientFactory.start();
wsClient = new WebSocketClient();
wsClient.start();
wsClient.connect(wsStompConnection, wsConnectUri);
if (!wsStompConnection.awaitConnection(30, TimeUnit.SECONDS)) {
throw new IOException("Could not connect to STOMP WS endpoint");

View File

@ -69,18 +69,16 @@ public class WSTransportTestSupport {
LOG.info("========== Finished test: {} ==========", name.getMethodName());
}
// protected String getWSConnectorURI() {
// return "ws://127.0.0.1:" + getProxyPort() +
// "?allowLinkStealing=" + isAllowLinkStealing() +
// "&websocket.maxTextMessageSize=99999&" +
// "transport.maxIdleTime=1001";
// }
protected String getWSConnectionURI() {
return "ws://127.0.0.1:" + getProxyPort();
}
protected String getWSConnectorURI() {
return "ws://127.0.0.1:" + getProxyPort() +
"?allowLinkStealing=" + isAllowLinkStealing() +
"&websocket.maxTextMessageSize=99999&" +
"transport.idleTimeout=1001";
"&websocket.maxTextMessageSize=99999" +
"&transport.idleTimeout=1001" +
"&trace=true&transport.trace=true";
}
protected boolean isAllowLinkStealing() {

View File

@ -20,8 +20,10 @@
#
log4j.rootLogger=INFO, out, stdout
log4j.logger.org.apache.activemq.transport.ws=DEBUG
log4j.logger.org.apache.activemq.transport.ws=TRACE
log4j.logger.org.apache.activemq.transport.http=DEBUG
log4j.logger.org.apache.activemq.transport.amqp=TRACE
log4j.logger.org.apache.activemq.transport.amqp.FRAMES=TRACE
#log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
#log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG

View File

@ -62,10 +62,6 @@
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-stomp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-partition</artifactId>

View File

@ -17,26 +17,27 @@
package org.apache.activemq.transport;
import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
/**
*
*
*/
public class StubTransport extends TransportSupport {
private Queue<Object> queue = new ConcurrentLinkedQueue<Object>();
private volatile int receiveCounter;
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
}
@Override
protected void doStart() throws Exception {
}
@Override
public void oneway(Object command) throws IOException {
receiveCounter++;
queue.add(command);
@ -46,12 +47,28 @@ public class StubTransport extends TransportSupport {
return queue;
}
@Override
public String getRemoteAddress() {
return null;
}
@Override
public int getReceiveCounter() {
return receiveCounter;
}
@Override
public X509Certificate[] getPeerCertificates() {
return null;
}
@Override
public void setPeerCertificates(X509Certificate[] certificates) {
}
@Override
public WireFormat getWireFormat() {
return null;
}
}

View File

@ -35,7 +35,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class AutoTransportConfigureTest {
@ -49,12 +48,7 @@ public class AutoTransportConfigureTest {
@Parameters
public static Iterable<Object[]> parameters() {
return Arrays.asList(new Object[][]{
{"auto"},
{"auto+nio"},
{"auto+ssl"},
{"auto+nio+ssl"}
});
return Arrays.asList(new Object[][] { { "auto" }, { "auto+nio" }, { "auto+ssl" }, { "auto+nio+ssl" } });
}
private String transportType;
@ -110,8 +104,8 @@ public class AutoTransportConfigureTest {
@Test
public void testUrlConfigurationOpenWireSuccess() throws Exception {
//Will work because max frame size only applies to amqp
createBroker(transportType + "://localhost:0?wireFormat.amqp.maxFrameSize=10");
// Will work because max frame size only applies to stomp
createBroker(transportType + "://localhost:0?wireFormat.stomp.maxFrameSize=10");
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
sendMessage(factory.createConnection());
@ -119,8 +113,8 @@ public class AutoTransportConfigureTest {
@Test(expected = JMSException.class)
public void testUrlConfigurationOpenWireNotAvailable() throws Exception {
//only amqp is available so should fail
createBroker(transportType + "://localhost:0?auto.protocols=amqp");
// only stomp is available so should fail
createBroker(transportType + "://localhost:0?auto.protocols=stomp");
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
sendMessage(factory.createConnection());
@ -137,13 +131,12 @@ public class AutoTransportConfigureTest {
@Test
public void testUrlConfigurationOpenWireAndAmqpAvailable() throws Exception {
createBroker(transportType + "://localhost:0?auto.protocols=default,amqp");
createBroker(transportType + "://localhost:0?auto.protocols=default,stomp");
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
sendMessage(factory.createConnection());
}
protected void sendMessage(Connection connection) throws JMSException {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -152,5 +145,4 @@ public class AutoTransportConfigureTest {
message.setText("this is a test");
producer.send(message);
}
}

View File

@ -103,7 +103,7 @@
<zookeeper-version>3.4.6</zookeeper-version>
<qpid-proton-version>0.13.0</qpid-proton-version>
<qpid-jms-version>0.9.0</qpid-jms-version>
<netty-all-version>4.0.33.Final</netty-all-version>
<netty-all-version>4.0.37.Final</netty-all-version>
<regexp-version>1.3</regexp-version>
<rome-version>1.0</rome-version>
<saxon-version>9.5.1-5</saxon-version>