Adding support for auto detection of wire protocols over a transport.
OpenWire, AMQP, STOMP, and MQTT can all be detected and the broker
will properly handle each one over a given Transport.  Currently
auto TCP, NIO, SSL, and NIO+SSL transports can handle auto-detection
of the wire format and client but support could be added in the
future for other transports like websockets.
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-07-16 18:23:24 +00:00
parent 9f50ce3d00
commit 04ee70a161
97 changed files with 3917 additions and 113 deletions

View File

@ -205,6 +205,27 @@
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
<include>**/auto/*Test.java</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests.windows.excludes</id>

View File

@ -23,6 +23,7 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import javax.net.SocketFactory;
import javax.net.ssl.SSLEngine;
import org.apache.activemq.transport.nio.NIOSSLTransport;
import org.apache.activemq.wireformat.WireFormat;
@ -38,7 +39,14 @@ public class AmqpNioSslTransport extends NIOSSLTransport {
}
public AmqpNioSslTransport(WireFormat wireFormat, Socket socket) throws IOException {
super(wireFormat, socket);
super(wireFormat, socket, null, null, null);
frameReader.setWireFormat((AmqpWireFormat) wireFormat);
}
public AmqpNioSslTransport(WireFormat wireFormat, Socket socket,
SSLEngine engine, InitBuffer initBuffer, ByteBuffer inputBuffer) throws IOException {
super(wireFormat, socket, engine, initBuffer, inputBuffer);
frameReader.setWireFormat((AmqpWireFormat) wireFormat);
}
@ -55,4 +63,37 @@ public class AmqpNioSslTransport extends NIOSSLTransport {
protected void processCommand(ByteBuffer plain) throws Exception {
frameReader.parse(plain);
}
/* (non-Javadoc)
* @see org.apache.activemq.transport.nio.NIOSSLTransport#secureRead(java.nio.ByteBuffer)
*/
@Override
protected void doInit() {
if (initBuffer != null) {
nextFrameSize = -1;
serviceRead();
}
}
@Override
protected int secureRead(ByteBuffer plain) throws Exception {
if (initBuffer != null) {
initBuffer.buffer.flip();
if (initBuffer.buffer.hasRemaining()) {
plain.flip();
for (int i =0; i < 8; i++) {
plain.put(initBuffer.buffer.get());
}
plain.flip();
processCommand(plain);
initBuffer.buffer.clear();
return 8;
}
}
return super.secureRead(plain);
}
}

View File

@ -21,15 +21,18 @@ import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.wireformat.WireFormat;
@ -61,6 +64,13 @@ public class AmqpNioSslTransportFactory extends AmqpNioTransportFactory {
return new AmqpNioSslTransport(wf, socketFactory, location, localLocation);
}
@Override
public TcpTransport createTransport(WireFormat wireFormat, Socket socket,
SSLEngine engine, InitBuffer initBuffer, ByteBuffer inputBuffer)
throws IOException {
return new AmqpNioSslTransport(wireFormat, socket, engine, initBuffer, inputBuffer);
}
@Override
public TransportServer doBind(URI location) throws IOException {
if (SslContext.getCurrentSslContext() != null) {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.amqp;
import java.io.ByteArrayInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
@ -63,6 +64,12 @@ public class AmqpNioTransport extends TcpTransport {
frameReader.setWireFormat((AmqpWireFormat) wireFormat);
}
public AmqpNioTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer) throws IOException {
super(wireFormat, socket, initBuffer);
frameReader.setWireFormat((AmqpWireFormat) wireFormat);
}
@Override
protected void initializeStreams() throws IOException {
channel = socket.getChannel();
@ -91,6 +98,17 @@ public class AmqpNioTransport extends TcpTransport {
NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
this.dataOut = new DataOutputStream(outPutStream);
this.buffOut = outPutStream;
try {
if (initBuffer != null) {
processBuffer(initBuffer.buffer, initBuffer.readSize);
}
} catch (IOException e) {
onException(e);
} catch (Throwable e) {
onException(IOExceptionSupport.create(e));
}
}
boolean magicRead = false;
@ -101,6 +119,7 @@ public class AmqpNioTransport extends TcpTransport {
while (isStarted()) {
// read channel
int readSize = channel.read(inputBuffer);
// channel is closed, cleanup
if (readSize == -1) {
onException(new EOFException());
@ -112,11 +131,7 @@ public class AmqpNioTransport extends TcpTransport {
break;
}
receiveCounter += readSize;
inputBuffer.flip();
frameReader.parse(inputBuffer);
inputBuffer.clear();
processBuffer(inputBuffer, readSize);
}
} catch (IOException e) {
onException(e);
@ -125,6 +140,14 @@ public class AmqpNioTransport extends TcpTransport {
}
}
protected void processBuffer(ByteBuffer buffer, int readSize) throws Exception {
receiveCounter += readSize;
buffer.flip();
frameReader.parse(buffer);
buffer.clear();
}
@Override
protected void doStart() throws Exception {
connect();

View File

@ -33,6 +33,7 @@ import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.nio.NIOTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
@ -64,6 +65,12 @@ public class AmqpNioTransportFactory extends NIOTransportFactory implements Brok
return new AmqpNioTransport(wf, socketFactory, location, localLocation);
}
@Override
public TcpTransport createTransport(WireFormat wireFormat, Socket socket,
InitBuffer initBuffer) throws IOException {
return new AmqpNioTransport(wireFormat, socket, initBuffer);
}
@SuppressWarnings("rawtypes")
@Override
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {

View File

@ -79,6 +79,15 @@ public class AmqpTestSupport {
protected URI amqpNioPlusSslURI;
protected int amqpNioPlusSslPort;
protected URI autoURI;
protected int autoPort;
protected URI autoSslURI;
protected int autoSslPort;
protected URI autoNioURI;
protected int autoNioPort;
protected URI autoNioPlusSslURI;
protected int autoNioPlusSslPort;
protected URI openwireURI;
protected int openwirePort;
@ -176,6 +185,34 @@ public class AmqpTestSupport {
amqpNioPlusSslURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp+nio+ssl port " + amqpNioPlusSslPort);
}
if (isUseAutoConnector()) {
connector = brokerService.addConnector(
"auto://0.0.0.0:" + autoPort + getAdditionalConfig());
autoPort = connector.getConnectUri().getPort();
autoURI = connector.getPublishableConnectURI();
LOG.debug("Using auto port " + autoPort);
}
if (isUseAutoSslConnector()) {
connector = brokerService.addConnector(
"auto+ssl://0.0.0.0:" + autoSslPort + getAdditionalConfig());
autoSslPort = connector.getConnectUri().getPort();
autoSslURI = connector.getPublishableConnectURI();
LOG.debug("Using auto+ssl port " + autoSslPort);
}
if (isUseAutoNioConnector()) {
connector = brokerService.addConnector(
"auto+nio://0.0.0.0:" + autoNioPort + getAdditionalConfig());
autoNioPort = connector.getConnectUri().getPort();
autoNioURI = connector.getPublishableConnectURI();
LOG.debug("Using auto+nio port " + autoNioPort);
}
if (isUseAutoNioPlusSslConnector()) {
connector = brokerService.addConnector(
"auto+nio+ssl://0.0.0.0:" + autoNioPlusSslPort + getAdditionalConfig());
autoNioPlusSslPort = connector.getConnectUri().getPort();
autoNioPlusSslURI = connector.getPublishableConnectURI();
LOG.debug("Using auto+nio+ssl port " + autoNioPlusSslPort);
}
}
protected boolean isPersistent() {
@ -206,6 +243,22 @@ public class AmqpTestSupport {
return false;
}
protected boolean isUseAutoConnector() {
return false;
}
protected boolean isUseAutoSslConnector() {
return false;
}
protected boolean isUseAutoNioConnector() {
return false;
}
protected boolean isUseAutoNioPlusSslConnector() {
return false;
}
protected String getAmqpTransformer() {
return "jms";
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.auto;
import java.net.URI;
import org.apache.activemq.transport.amqp.JMSClientNioPlusSslTest;
import org.apache.activemq.transport.amqp.JMSClientSslTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test the JMS client when connected to the NIO+SSL transport.
*/
public class JMSClientAutoNioPlusSslTest extends JMSClientSslTest {
protected static final Logger LOG = LoggerFactory.getLogger(JMSClientAutoNioPlusSslTest.class);
@Override
protected URI getBrokerURI() {
return autoNioPlusSslURI;
}
@Override
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected boolean isUseAutoNioPlusSslConnector() {
return true;
}
@Override
protected String getTargetConnectorName() {
return "auto+nio+ssl";
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.auto;
import java.net.URI;
import org.apache.activemq.transport.amqp.JMSClientNioTest;
import org.apache.activemq.transport.amqp.JMSClientTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test the JMS client when connected to the NIO transport.
*/
public class JMSClientAutoNioTest extends JMSClientTest {
protected static final Logger LOG = LoggerFactory.getLogger(JMSClientAutoNioTest.class);
@Override
protected URI getBrokerURI() {
return autoNioURI;
}
@Override
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected boolean isUseAutoNioConnector() {
return true;
}
@Override
protected String getTargetConnectorName() {
return "auto+nio";
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.auto;
import java.net.URI;
import org.apache.activemq.transport.amqp.JMSClientNioPlusSslTest;
import org.apache.activemq.transport.amqp.JMSClientSslTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test the JMS client when connected to the NIO+SSL transport.
*/
public class JMSClientAutoPlusSslTest extends JMSClientSslTest {
protected static final Logger LOG = LoggerFactory.getLogger(JMSClientAutoPlusSslTest.class);
@Override
protected URI getBrokerURI() {
return autoSslURI;
}
@Override
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected boolean isUseAutoSslConnector() {
return true;
}
@Override
protected String getTargetConnectorName() {
return "auto+ssl";
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.auto;
import java.net.URI;
import org.apache.activemq.transport.amqp.JMSClientTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test the JMS client when connected to the NIO transport.
*/
public class JMSClientAutoTest extends JMSClientTest {
protected static final Logger LOG = LoggerFactory.getLogger(JMSClientAutoTest.class);
@Override
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected boolean isUseAutoConnector() {
return true;
}
@Override
protected URI getBrokerURI() {
return autoURI;
}
@Override
protected String getTargetConnectorName() {
return "auto";
}
}

View File

@ -237,6 +237,27 @@
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -239,6 +239,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
if (!stopping.get() && !pendingStop) {
transportException.set(e);
e.printStackTrace();
if (TRANSPORTLOG.isDebugEnabled()) {
TRANSPORTLOG.debug(this + " failed: " + e, e);
} else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {

View File

@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.transport.auto;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLServerSocketFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.tcp.SslTransportFactory;
import org.apache.activemq.transport.tcp.SslTransportServer;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AutoSslTransportFactory extends SslTransportFactory implements BrokerServiceAware {
private static final Logger LOG = LoggerFactory.getLogger(AutoSslTransportFactory.class);
protected BrokerService brokerService;
/* (non-Javadoc)
* @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService)
*/
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
private Set<String> enabledProtocols;
/**
* Overriding to use SslTransportServer and allow for proper reflection.
*/
@Override
public TransportServer doBind(final URI location) throws IOException {
try {
Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
Map<String, Object> autoProperties = IntrospectionSupport.extractProperties(options, "auto.");
this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoProperties.get("protocols"));
ServerSocketFactory serverSocketFactory = createServerSocketFactory();
AutoSslTransportServer server = createAutoSslTransportServer(location, (SSLServerSocketFactory)serverSocketFactory);
if (options.get("allowLinkStealing") != null){
allowLinkStealingSet = true;
}
IntrospectionSupport.setProperties(server, options);
server.setAutoTransportOptions(IntrospectionSupport.extractProperties(options, "auto."));
server.setTransportOption(IntrospectionSupport.extractProperties(options, "transport."));
server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options));
server.bind();
return server;
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
}
boolean allowLinkStealingSet = false;
/**
* Allows subclasses of SslTransportFactory to create custom instances of
* SslTransportServer.
*
* @param location
* @param serverSocketFactory
* @return
* @throws IOException
* @throws URISyntaxException
*/
// @Override
protected AutoSslTransportServer createAutoSslTransportServer(final URI location, SSLServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
AutoSslTransportServer server = new AutoSslTransportServer(this, location, serverSocketFactory,
this.brokerService, enabledProtocols) {
@Override
protected TcpTransport createTransport(Socket socket, WireFormat format)
throws IOException {
if (format.getClass().toString().contains("MQTT") && !allowLinkStealingSet) {
this.setAllowLinkStealing(true);
}
return super.createTransport(socket, format);
}
};
return server;
}
}

View File

@ -0,0 +1,149 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.transport.auto;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Set;
import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLServerSocket;
import javax.net.ssl.SSLServerSocketFactory;
import javax.net.ssl.SSLSocket;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.transport.tcp.SslTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.wireformat.WireFormat;
/**
* An SSL TransportServer.
*
* Allows for client certificate authentication (refer to setNeedClientAuth for
* details).
* NOTE: Client certificate authentication is disabled by default.
*
*/
public class AutoSslTransportServer extends AutoTcpTransportServer {
// Specifies if sockets created from this server should needClientAuth.
private boolean needClientAuth;
// Specifies if sockets created from this server should wantClientAuth.
private boolean wantClientAuth;
// /**
// * Creates a ssl transport server for the specified url using the provided
// * serverSocketFactory
// *
// * @param transportFactory The factory used to create transports when connections arrive.
// * @param location The location of the broker to bind to.
// * @param serverSocketFactory The factory used to create this server.
// * @throws IOException passed up from TcpTransportFactory.
// * @throws URISyntaxException passed up from TcpTransportFactory.
// */
// public SslTransportServer(SslTransportFactory transportFactory, URI location, SSLServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
// super(transportFactory, location, serverSocketFactory);
// }
public AutoSslTransportServer(SslTransportFactory transportFactory,
URI location, SSLServerSocketFactory serverSocketFactory,
BrokerService brokerService, Set<String> enabledProtocols) throws IOException, URISyntaxException {
super(transportFactory, location, serverSocketFactory, brokerService, enabledProtocols);
// TODO Auto-generated constructor stub
}
/**
* Sets whether client authentication should be required
* Must be called before {@link #bind()}
* Note: Calling this method clears the wantClientAuth flag
* in the underlying implementation.
*/
public void setNeedClientAuth(boolean needAuth) {
this.needClientAuth = needAuth;
}
/**
* Returns whether client authentication should be required.
*/
public boolean getNeedClientAuth() {
return this.needClientAuth;
}
/**
* Returns whether client authentication should be requested.
*/
public boolean getWantClientAuth() {
return this.wantClientAuth;
}
/**
* Sets whether client authentication should be requested.
* Must be called before {@link #bind()}
* Note: Calling this method clears the needClientAuth flag
* in the underlying implementation.
*/
public void setWantClientAuth(boolean wantAuth) {
this.wantClientAuth = wantAuth;
}
/**
* Binds this socket to the previously specified URI.
*
* Overridden to allow for proper handling of needClientAuth.
*
* @throws IOException passed up from TcpTransportServer.
*/
@Override
public void bind() throws IOException {
super.bind();
if (needClientAuth) {
((SSLServerSocket)this.serverSocket).setNeedClientAuth(true);
} else if (wantClientAuth) {
((SSLServerSocket)this.serverSocket).setWantClientAuth(true);
}
}
/**
* Used to create Transports for this server.
*
* Overridden to allow the use of SslTransports (instead of TcpTransports).
*
* @param socket The incoming socket that will be wrapped into the new Transport.
* @param format The WireFormat being used.
* @return The newly return (SSL) Transport.
* @throws IOException
*/
@Override
protected TcpTransport createTransport(Socket socket, WireFormat format) throws IOException {
return new SslTransport(format, (SSLSocket)socket, this.initBuffer);
}
@Override
public boolean isSslServer() {
return true;
}
}

View File

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.transport.auto;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.net.ServerSocketFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
/**
*
*
*/
public class AutoTcpTransportFactory extends TcpTransportFactory implements BrokerServiceAware {
protected BrokerService brokerService;
/* (non-Javadoc)
* @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService)
*/
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
@Override
public TransportServer doBind(final URI location) throws IOException {
try {
Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
Map<String, Object> autoProperties = IntrospectionSupport.extractProperties(options, "auto.");
this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoProperties.get("protocols"));
ServerSocketFactory serverSocketFactory = createServerSocketFactory();
AutoTcpTransportServer server = createTcpTransportServer(location, serverSocketFactory);
//server.setWireFormatFactory(createWireFormatFactory(options));
server.setWireFormatFactory(new OpenWireFormatFactory());
if (options.get("allowLinkStealing") != null){
allowLinkStealingSet = true;
}
IntrospectionSupport.setProperties(server, options);
server.setTransportOption(IntrospectionSupport.extractProperties(options, "transport."));
server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options));
server.bind();
return server;
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
}
boolean allowLinkStealingSet = false;
private Set<String> enabledProtocols;
@Override
protected AutoTcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
AutoTcpTransportServer server = new AutoTcpTransportServer(this, location, serverSocketFactory, brokerService, enabledProtocols) {
@Override
protected TcpTransport createTransport(Socket socket, WireFormat format)
throws IOException {
if (format.getClass().toString().contains("MQTT") && !allowLinkStealingSet) {
this.setAllowLinkStealing(true);
}
return super.createTransport(socket, format);
}
};
return server;
}
}

View File

@ -0,0 +1,298 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.transport.auto;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.net.ServerSocketFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.transport.protocol.AmqpProtocolVerifier;
import org.apache.activemq.broker.transport.protocol.MqttProtocolVerifier;
import org.apache.activemq.broker.transport.protocol.OpenWireProtocolVerifier;
import org.apache.activemq.broker.transport.protocol.ProtocolVerifier;
import org.apache.activemq.broker.transport.protocol.StompProtocolVerifier;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A TCP based implementation of {@link TransportServer}
*/
public class AutoTcpTransportServer extends TcpTransportServer {
private static final Logger LOG = LoggerFactory.getLogger(AutoTcpTransportServer.class);
protected Map<String, Map<String, Object>> wireFormatOptions;
protected Map<String, Object> autoTransportOptions;
protected Set<String> enabledProtocols;
protected final Map<String, ProtocolVerifier> protocolVerifiers = new ConcurrentHashMap<String, ProtocolVerifier>();
protected BrokerService brokerService;
private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
private final ConcurrentMap<String, TransportFactory> transportFactories = new ConcurrentHashMap<String, TransportFactory>();
private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
public WireFormatFactory findWireFormatFactory(String scheme, Map<String, Map<String, Object>> options) throws IOException {
WireFormatFactory wff = null;
try {
wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(scheme);
if (options != null) {
IntrospectionSupport.setProperties(wff, options.get(AutoTransportUtils.ALL));
IntrospectionSupport.setProperties(wff, options.get(scheme));
}
if (wff instanceof OpenWireFormatFactory) {
protocolVerifiers.put(AutoTransportUtils.OPENWIRE, new OpenWireProtocolVerifier((OpenWireFormatFactory) wff));
}
return wff;
} catch (Throwable e) {
throw IOExceptionSupport.create("Could not create wire format factory for: " + scheme + ", reason: " + e, e);
}
}
public TransportFactory findTransportFactory(String scheme, Map<String, ?> options) throws IOException {
scheme = append(scheme, "nio");
scheme = append(scheme, "ssl");
if (scheme.isEmpty()) {
scheme = "tcp";
}
TransportFactory tf = transportFactories.get(scheme);
if (tf == null) {
// Try to load if from a META-INF property.
try {
tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
if (options != null)
IntrospectionSupport.setProperties(tf, options);
transportFactories.put(scheme, tf);
} catch (Throwable e) {
throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
}
}
return tf;
}
protected String append(String currentScheme, String scheme) {
if (this.getBindLocation().getScheme().contains(scheme)) {
if (!currentScheme.isEmpty()) {
currentScheme += "+";
}
currentScheme += scheme;
}
return currentScheme;
}
/**
* @param transportFactory
* @param location
* @param serverSocketFactory
* @throws IOException
* @throws URISyntaxException
*/
public AutoTcpTransportServer(TcpTransportFactory transportFactory,
URI location, ServerSocketFactory serverSocketFactory, BrokerService brokerService,
Set<String> enabledProtocols)
throws IOException, URISyntaxException {
super(transportFactory, location, serverSocketFactory);
service = Executors.newCachedThreadPool();
this.brokerService = brokerService;
this.enabledProtocols = enabledProtocols;
initProtocolVerifiers();
}
@Override
public void setWireFormatFactory(WireFormatFactory factory) {
super.setWireFormatFactory(factory);
initOpenWireProtocolVerifier();
}
protected void initProtocolVerifiers() {
initOpenWireProtocolVerifier();
if (isAllProtocols() || enabledProtocols.contains(AutoTransportUtils.AMQP)) {
protocolVerifiers.put(AutoTransportUtils.AMQP, new AmqpProtocolVerifier());
}
if (isAllProtocols() || enabledProtocols.contains(AutoTransportUtils.STOMP)) {
protocolVerifiers.put(AutoTransportUtils.STOMP, new StompProtocolVerifier());
}
if (isAllProtocols()|| enabledProtocols.contains(AutoTransportUtils.MQTT)) {
protocolVerifiers.put(AutoTransportUtils.MQTT, new MqttProtocolVerifier());
}
}
protected void initOpenWireProtocolVerifier() {
if (isAllProtocols() || enabledProtocols.contains(AutoTransportUtils.OPENWIRE)) {
OpenWireProtocolVerifier owpv;
if (wireFormatFactory instanceof OpenWireFormatFactory) {
owpv = new OpenWireProtocolVerifier((OpenWireFormatFactory) wireFormatFactory);
} else {
owpv = new OpenWireProtocolVerifier(new OpenWireFormatFactory());
}
protocolVerifiers.put(AutoTransportUtils.OPENWIRE, owpv);
}
}
protected boolean isAllProtocols() {
return enabledProtocols == null || enabledProtocols.isEmpty();
}
protected final ExecutorService service;
/**
* This holds the initial buffer that has been read to detect the protocol.
*/
public InitBuffer initBuffer;
@Override
protected void handleSocket(final Socket socket) {
final AutoTcpTransportServer server = this;
//This needs to be done in a new thread because
//the socket might be waiting on the client to send bytes
//doHandleSocket can't complete until the protocol can be detected
service.submit(new Runnable() {
@Override
public void run() {
server.doHandleSocket(socket);
}
});
}
@Override
protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
InputStream is = socket.getInputStream();
//We need to peak at the first 8 bytes of the buffer to detect the protocol
Buffer magic = new Buffer(8);
magic.readFrom(is);
ProtocolInfo protocolInfo = detectProtocol(magic.getData());
initBuffer = new InitBuffer(8, ByteBuffer.allocate(8));
initBuffer.buffer.put(magic.getData());
if (protocolInfo.detectedTransportFactory instanceof BrokerServiceAware) {
((BrokerServiceAware) protocolInfo.detectedTransportFactory).setBrokerService(brokerService);
}
WireFormat format = protocolInfo.detectedWireFormatFactory.createWireFormat();
Transport transport = createTransport(socket, format, protocolInfo.detectedTransportFactory);
return new TransportInfo(format, transport, protocolInfo.detectedTransportFactory);
}
@Override
protected TcpTransport createTransport(Socket socket, WireFormat format) throws IOException {
return new TcpTransport(format, socket, this.initBuffer);
}
/**
* @param socket
* @param format
* @param detectedTransportFactory
* @return
*/
protected TcpTransport createTransport(Socket socket, WireFormat format,
TcpTransportFactory detectedTransportFactory) throws IOException {
return createTransport(socket, format);
}
public void setWireFormatOptions(Map<String, Map<String, Object>> wireFormatOptions) {
this.wireFormatOptions = wireFormatOptions;
}
public void setEnabledProtocols(Set<String> enabledProtocols) {
this.enabledProtocols = enabledProtocols;
}
public void setAutoTransportOptions(Map<String, Object> autoTransportOptions) {
this.autoTransportOptions = autoTransportOptions;
if (autoTransportOptions.get("protocols") != null)
this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoTransportOptions.get("protocols"));
}
protected ProtocolInfo detectProtocol(byte[] buffer) throws IOException {
TcpTransportFactory detectedTransportFactory = transportFactory;
WireFormatFactory detectedWireFormatFactory = wireFormatFactory;
boolean found = false;
for (String scheme : protocolVerifiers.keySet()) {
if (protocolVerifiers.get(scheme).isProtocol(buffer)) {
LOG.debug("Detected " + scheme);
detectedWireFormatFactory = findWireFormatFactory(scheme, wireFormatOptions);
if (scheme.equals("default")) {
scheme = "";
}
detectedTransportFactory = (TcpTransportFactory) findTransportFactory(scheme, transportOptions);
found = true;
break;
}
}
if (!found) {
throw new IllegalStateException("Could not detect wire format");
}
return new ProtocolInfo(detectedTransportFactory, detectedWireFormatFactory);
}
protected class ProtocolInfo {
public final TcpTransportFactory detectedTransportFactory;
public final WireFormatFactory detectedWireFormatFactory;
public ProtocolInfo(TcpTransportFactory detectedTransportFactory,
WireFormatFactory detectedWireFormatFactory) {
super();
this.detectedTransportFactory = detectedTransportFactory;
this.detectedWireFormatFactory = detectedWireFormatFactory;
}
}
}

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.transport.auto;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.util.IntrospectionSupport;
/**
*
*
*/
public class AutoTransportUtils {
//wireformats
public static String ALL = "all";
public static String OPENWIRE = "default";
public static String STOMP = "stomp";
public static String AMQP = "amqp";
public static String MQTT = "mqtt";
//transports
public static String AUTO = "auto";
public static Map<String, Map<String, Object>> extractWireFormatOptions(Map<String, String> options ) {
Map<String, Map<String, Object>> wireFormatOptions = new HashMap<>();
if (options != null) {
wireFormatOptions.put(OPENWIRE, IntrospectionSupport.extractProperties(options, "wireFormat.default."));
wireFormatOptions.put(STOMP, IntrospectionSupport.extractProperties(options, "wireFormat.stomp."));
wireFormatOptions.put(AMQP, IntrospectionSupport.extractProperties(options, "wireFormat.amqp."));
wireFormatOptions.put(MQTT, IntrospectionSupport.extractProperties(options, "wireFormat.mqtt."));
wireFormatOptions.put(ALL, IntrospectionSupport.extractProperties(options, "wireFormat."));
}
return wireFormatOptions;
}
public static Set<String> parseProtocols(String protocolString) {
Set<String> protocolSet = new HashSet<>();;
if (protocolString != null && !protocolString.isEmpty()) {
protocolSet.addAll(Arrays.asList(protocolString.split(",")));
}
return protocolSet;
}
}

View File

@ -0,0 +1,122 @@
package org.apache.activemq.broker.transport.auto.nio;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Set;
import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.transport.auto.AutoTcpTransportServer;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.nio.AutoInitNioSSLTransport;
import org.apache.activemq.transport.nio.NIOSSLTransport;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AutoNIOSSLTransportServer extends AutoTcpTransportServer {
private static final Logger LOG = LoggerFactory.getLogger(AutoNIOSSLTransportServer.class);
private SSLContext context;
public AutoNIOSSLTransportServer(SSLContext context, TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory,
BrokerService brokerService, Set<String> enabledProtocols) throws IOException, URISyntaxException {
super(transportFactory, location, serverSocketFactory, brokerService, enabledProtocols);
this.context = context;
}
private boolean needClientAuth;
private boolean wantClientAuth;
protected Transport createTransport(Socket socket, WireFormat format, SSLEngine engine,
InitBuffer initBuffer, ByteBuffer inputBuffer, TcpTransportFactory detectedFactory) throws IOException {
NIOSSLTransport transport = new NIOSSLTransport(format, socket, engine, initBuffer, inputBuffer);
if (context != null) {
transport.setSslContext(context);
}
transport.setNeedClientAuth(needClientAuth);
transport.setWantClientAuth(wantClientAuth);
return transport;
}
@Override
protected TcpTransport createTransport(Socket socket, WireFormat format) throws IOException {
throw new UnsupportedOperationException("method not supported");
}
@Override
public boolean isSslServer() {
return true;
}
public boolean isNeedClientAuth() {
return this.needClientAuth;
}
public void setNeedClientAuth(boolean value) {
this.needClientAuth = value;
}
public boolean isWantClientAuth() {
return this.wantClientAuth;
}
public void setWantClientAuth(boolean value) {
this.wantClientAuth = value;
}
@Override
protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
//The SSLEngine needs to be initialized and handshake done to get the first command and detect the format
AutoInitNioSSLTransport in = new AutoInitNioSSLTransport(wireFormatFactory.createWireFormat(), socket);
if (context != null) {
in.setSslContext(context);
}
in.start();
SSLEngine engine = in.getSslSession();
//Wait for handshake to finish initializing
byte[] read = null;
do {
in.serviceRead();
} while((read = in.read) == null);
in.stop();
initBuffer = new InitBuffer(in.readSize, ByteBuffer.allocate(read.length));
initBuffer.buffer.put(read);
ProtocolInfo protocolInfo = detectProtocol(read);
if (protocolInfo.detectedTransportFactory instanceof BrokerServiceAware) {
((BrokerServiceAware) protocolInfo.detectedTransportFactory).setBrokerService(brokerService);
}
WireFormat format = protocolInfo.detectedWireFormatFactory.createWireFormat();
Transport transport = createTransport(socket, format, engine, initBuffer, in.getInputBuffer(), protocolInfo.detectedTransportFactory);
return new TransportInfo(format, transport, protocolInfo.detectedTransportFactory);
}
}

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.transport.auto.nio;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import javax.net.SocketFactory;
import org.apache.activemq.transport.nio.NIOTransport;
import org.apache.activemq.wireformat.WireFormat;
/**
*
*
*/
public class AutoNIOTransport extends NIOTransport {
public AutoNIOTransport(WireFormat format, Socket socket,
InitBuffer initBuffer) throws IOException {
super(format, socket, initBuffer);
}
public AutoNIOTransport(WireFormat wireFormat, Socket socket)
throws IOException {
super(wireFormat, socket);
}
public AutoNIOTransport(WireFormat wireFormat, SocketFactory socketFactory,
URI remoteLocation, URI localLocation) throws UnknownHostException,
IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
}
boolean doneInitBuffer = false;
/**
* Read from the initial buffer if it is set
*/
@Override
protected int readFromBuffer() throws IOException {
int readSize = 0;
if (!doneInitBuffer) {
if (initBuffer == null) {
throw new IOException("Null initBuffer");
}
if (nextFrameSize == -1) {
readSize = 4;
this.initBuffer.buffer.flip();
for (int i = 0; i < 4; i++) {
currentBuffer.put(initBuffer.buffer.get());
}
} else {
for (int i = 0; i < 4; i++) {
currentBuffer.put(initBuffer.buffer.get());
}
readSize = 4;
doneInitBuffer = true;
}
} else {
readSize += channel.read(currentBuffer);
}
return readSize;
}
}

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.transport.auto.nio;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLEngine;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.broker.transport.auto.AutoTcpTransportServer;
import org.apache.activemq.broker.transport.auto.AutoTransportUtils;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.nio.NIOSSLTransport;
import org.apache.activemq.transport.nio.NIOSSLTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.wireformat.WireFormat;
/**
*
*
*/
public class AutoNioSslTransportFactory extends NIOSSLTransportFactory implements BrokerServiceAware {
protected BrokerService brokerService;
/* (non-Javadoc)
* @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService)
*/
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
@Override
protected AutoNIOSSLTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new AutoNIOSSLTransportServer(context, this, location, serverSocketFactory, brokerService, enabledProtocols) {
@Override
protected Transport createTransport(Socket socket, WireFormat format, SSLEngine engine, InitBuffer initBuffer,
ByteBuffer inputBuffer, TcpTransportFactory detectedFactory) throws IOException {
NIOSSLTransport nioSslTransport = (NIOSSLTransport) detectedFactory.createTransport(
format, socket, engine, initBuffer, inputBuffer);
if (format.getClass().toString().contains("MQTT")) {
if (!allowLinkStealingSet) {
this.setAllowLinkStealing(true);
}
}
if (context != null) {
nioSslTransport.setSslContext(context);
}
nioSslTransport.setNeedClientAuth(isNeedClientAuth());
nioSslTransport.setWantClientAuth(isWantClientAuth());
return nioSslTransport;
}
};
}
boolean allowLinkStealingSet = false;
private Set<String> enabledProtocols;
@Override
public TransportServer doBind(final URI location) throws IOException {
try {
if (SslContext.getCurrentSslContext() != null) {
try {
context = SslContext.getCurrentSslContext().getSSLContext();
} catch (Exception e) {
throw new IOException(e);
}
}
Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
Map<String, Object> autoProperties = IntrospectionSupport.extractProperties(options, "auto.");
this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoProperties.get("protocols"));
ServerSocketFactory serverSocketFactory = createServerSocketFactory();
AutoTcpTransportServer server = createTcpTransportServer(location, serverSocketFactory);
server.setWireFormatFactory(new OpenWireFormatFactory());
if (options.get("allowLinkStealing") != null){
allowLinkStealingSet = true;
}
IntrospectionSupport.setProperties(server, options);
server.setAutoTransportOptions(IntrospectionSupport.extractProperties(options, "auto."));
server.setTransportOption(IntrospectionSupport.extractProperties(options, "transport."));
server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options));
server.bind();
return server;
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
}
}

View File

@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.transport.auto.nio;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.net.ServerSocketFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.transport.auto.AutoTcpTransportServer;
import org.apache.activemq.broker.transport.auto.AutoTransportUtils;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.nio.NIOTransport;
import org.apache.activemq.transport.nio.NIOTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.wireformat.WireFormat;
/**
*
*
*/
public class AutoNioTransportFactory extends NIOTransportFactory implements BrokerServiceAware {
protected BrokerService brokerService;
/* (non-Javadoc)
* @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService)
*/
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
@Override
protected AutoTcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new AutoTcpTransportServer(this, location, serverSocketFactory, brokerService, enabledProtocols) {
@Override
protected TcpTransport createTransport(Socket socket, WireFormat format, TcpTransportFactory detectedTransportFactory) throws IOException {
TcpTransport nioTransport = null;
if (detectedTransportFactory.getClass().equals(NIOTransportFactory.class)) {
nioTransport = new AutoNIOTransport(format, socket,this.initBuffer);
} else {
nioTransport = detectedTransportFactory.createTransport(
format, socket, this.initBuffer);
}
if (format.getClass().toString().contains("MQTT")) {
if (!allowLinkStealingSet) {
this.setAllowLinkStealing(true);
}
}
return nioTransport;
}
};
}
boolean allowLinkStealingSet = false;
private Set<String> enabledProtocols;
@Override
public TransportServer doBind(final URI location) throws IOException {
try {
Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
Map<String, Object> autoProperties = IntrospectionSupport.extractProperties(options, "auto.");
this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoProperties.get("protocols"));
ServerSocketFactory serverSocketFactory = createServerSocketFactory();
AutoTcpTransportServer server = createTcpTransportServer(location, serverSocketFactory);
//server.setWireFormatFactory(createWireFormatFactory(options));
server.setWireFormatFactory(new OpenWireFormatFactory());
if (options.get("allowLinkStealing") != null){
allowLinkStealingSet = true;
}
IntrospectionSupport.setProperties(server, options);
server.setTransportOption(IntrospectionSupport.extractProperties(options, "transport."));
server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options));
server.bind();
return server;
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.transport.protocol;
/**
*
*
*/
public class AmqpProtocolVerifier implements ProtocolVerifier {
static final byte[] PREFIX = new byte[] { 'A', 'M', 'Q', 'P' };
@Override
public boolean isProtocol(byte[] value) {
for (int i = 0; i < PREFIX.length; i++) {
if (value[i] != PREFIX[i])
return false;
}
return true;
}
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.transport.protocol;
/**
*
*
*/
public class MqttProtocolVerifier implements ProtocolVerifier {
/* (non-Javadoc)
* @see org.apache.activemq.broker.transport.protocol.ProtocolVerifier#isProtocol(byte[])
*/
@Override
public boolean isProtocol(byte[] value) {
boolean mqtt311 = value[4] == 77 && // M
value[5] == 81 && // Q
value[6] == 84 && // T
value[7] == 84; // T
boolean mqtt31 = value[4] == 77 && // M
value[5] == 81 && // Q
value[6] == 73 && // I
value[7] == 115; // s
return mqtt311 || mqtt31;
}
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.transport.protocol;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.openwire.OpenWireFormatFactory;
/**
*
*
*/
public class OpenWireProtocolVerifier implements ProtocolVerifier {
protected final OpenWireFormatFactory wireFormatFactory;
public OpenWireProtocolVerifier(OpenWireFormatFactory wireFormatFactory) {
this.wireFormatFactory = wireFormatFactory;
}
/* (non-Javadoc)
* @see org.apache.activemq.broker.transport.protocol.ProtocolVerifier#isProtocol(byte[])
*/
@Override
public boolean isProtocol(byte[] value) {
if (value.length < 8) {
throw new IllegalArgumentException("Protocol header length changed "
+ value.length);
}
int start = !((OpenWireFormat)wireFormatFactory.createWireFormat()).isSizePrefixDisabled() ? 4 : 0;
int j = 0;
// type
if (value[start] != WireFormatInfo.DATA_STRUCTURE_TYPE) {
return false;
}
start++;
WireFormatInfo info = new WireFormatInfo();
final byte[] magic = info.getMagic();
int remainingLen = value.length - start;
int useLen = remainingLen > magic.length ? magic.length : remainingLen;
useLen += start;
// magic
for (int i = start; i < useLen; i++) {
if (value[i] != magic[j]) {
return false;
}
j++;
}
return true;
}
}

View File

@ -0,0 +1,9 @@
package org.apache.activemq.broker.transport.protocol;
public interface ProtocolVerifier {
public boolean isProtocol(byte[] value);
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.transport.protocol;
import java.nio.charset.StandardCharsets;
/**
*
*
*/
public class StompProtocolVerifier implements ProtocolVerifier {
/* (non-Javadoc)
* @see org.apache.activemq.broker.transport.protocol.ProtocolVerifier#isProtocol(byte[])
*/
@Override
public boolean isProtocol(byte[] value) {
String frameStart = new String(value, StandardCharsets.US_ASCII);
return frameStart.startsWith("CONNECT") || frameStart.startsWith("STOMP");
}
}

View File

@ -0,0 +1,234 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.nio;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This transport initializes the SSLEngine and reads the first command before
* handing off to the detected transport.
*
*/
public class AutoInitNioSSLTransport extends NIOSSLTransport {
private static final Logger LOG = LoggerFactory.getLogger(AutoInitNioSSLTransport.class);
public AutoInitNioSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
}
public AutoInitNioSSLTransport(WireFormat wireFormat, Socket socket) throws IOException {
super(wireFormat, socket, null, null, null);
}
@Override
public void setSslContext(SSLContext sslContext) {
this.sslContext = sslContext;
}
public ByteBuffer getInputBuffer() {
return this.inputBuffer;
}
@Override
protected void initializeStreams() throws IOException {
NIOOutputStream outputStream = null;
try {
channel = socket.getChannel();
channel.configureBlocking(false);
if (sslContext == null) {
sslContext = SSLContext.getDefault();
}
String remoteHost = null;
int remotePort = -1;
try {
URI remoteAddress = new URI(this.getRemoteAddress());
remoteHost = remoteAddress.getHost();
remotePort = remoteAddress.getPort();
} catch (Exception e) {
}
// initialize engine, the initial sslSession we get will need to be
// updated once the ssl handshake process is completed.
if (remoteHost != null && remotePort != -1) {
sslEngine = sslContext.createSSLEngine(remoteHost, remotePort);
} else {
sslEngine = sslContext.createSSLEngine();
}
sslEngine.setUseClientMode(false);
if (enabledCipherSuites != null) {
sslEngine.setEnabledCipherSuites(enabledCipherSuites);
}
if (enabledProtocols != null) {
sslEngine.setEnabledProtocols(enabledProtocols);
}
if (wantClientAuth) {
sslEngine.setWantClientAuth(wantClientAuth);
}
if (needClientAuth) {
sslEngine.setNeedClientAuth(needClientAuth);
}
sslSession = sslEngine.getSession();
inputBuffer = ByteBuffer.allocate(sslSession.getPacketBufferSize());
inputBuffer.clear();
outputStream = new NIOOutputStream(channel);
outputStream.setEngine(sslEngine);
this.dataOut = new DataOutputStream(outputStream);
this.buffOut = outputStream;
sslEngine.beginHandshake();
handshakeStatus = sslEngine.getHandshakeStatus();
doHandshake();
// detectReadyState();
} catch (Exception e) {
try {
if(outputStream != null) {
outputStream.close();
}
super.closeStreams();
} catch (Exception ex) {}
throw new IOException(e);
}
}
@Override
protected void doOpenWireInit() throws Exception {
}
@Override
protected void finishHandshake() throws Exception {
if (handshakeInProgress) {
handshakeInProgress = false;
nextFrameSize = -1;
// Once handshake completes we need to ask for the now real sslSession
// otherwise the session would return 'SSL_NULL_WITH_NULL_NULL' for the
// cipher suite.
sslSession = sslEngine.getSession();
}
}
public SSLEngine getSslSession() {
return this.sslEngine;
}
public volatile byte[] read;
public volatile int readSize;
@Override
public void serviceRead() {
try {
if (handshakeInProgress) {
doHandshake();
}
ByteBuffer plain = ByteBuffer.allocate(sslSession.getApplicationBufferSize());
plain.position(plain.limit());
while (true) {
if (!plain.hasRemaining()) {
int readCount = secureRead(plain);
if (readCount == 0) {
break;
}
// channel is closed, cleanup
if (readCount == -1) {
onException(new EOFException());
selection.close();
break;
}
receiveCounter += readCount;
}
if (status == SSLEngineResult.Status.OK && handshakeStatus != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {
processCommand(plain);
//Break when command is found
break;
}
}
} catch (IOException e) {
onException(e);
} catch (Throwable e) {
onException(IOExceptionSupport.create(e));
}
}
@Override
protected void processCommand(ByteBuffer plain) throws Exception {
read = plain.array();
readSize = receiveCounter;
}
@Override
public void doStart() throws Exception {
taskRunnerFactory = new TaskRunnerFactory("ActiveMQ NIOSSLTransport Task");
// no need to init as we can delay that until demand (eg in doHandshake)
connect();
//super.doStart();
}
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
if (taskRunnerFactory != null) {
taskRunnerFactory.shutdownNow();
taskRunnerFactory = null;
}
// if (selection != null) {
// selection.close();
// selection = null;
// }
}
}

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.broker.transport.auto.AutoTcpTransportFactory

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.broker.transport.auto.nio.AutoNioTransportFactory

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.broker.transport.auto.nio.AutoNioSslTransportFactory

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.broker.transport.auto.AutoSslTransportFactory

View File

@ -254,7 +254,27 @@
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -344,6 +344,27 @@
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.AccessController;
@ -314,7 +315,22 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
*/
protected Transport createTransport() throws JMSException {
try {
return TransportFactory.connect(brokerURL);
URI connectBrokerUL = brokerURL;
String scheme = brokerURL.getScheme();
if (scheme == null) {
throw new IOException("Transport not scheme specified: [" + brokerURL + "]");
}
if (scheme.equals("auto")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp"));
} else if (scheme.equals("auto+ssl")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl"));
} else if (scheme.equals("auto+nio")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio"));
} else if (scheme.equals("auto+nio+ssl")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
}
return TransportFactory.connect(connectBrokerUL);
} catch (Exception e) {
throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
}

View File

@ -18,15 +18,19 @@ package org.apache.activemq.transport;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import javax.net.ssl.SSLEngine;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
@ -119,6 +123,9 @@ public abstract class TransportFactory {
WireFormat wf = createWireFormat(options);
Transport transport = createTransport(location, wf);
Transport rc = configure(transport, wf, options);
//remove auto
IntrospectionSupport.extractProperties(options, "auto.");
if (!options.isEmpty()) {
throw new IllegalArgumentException("Invalid connect parameters: " + options);
}

View File

@ -69,16 +69,26 @@ public class NIOSSLTransport extends NIOTransport {
super(wireFormat, socketFactory, remoteLocation, localLocation);
}
public NIOSSLTransport(WireFormat wireFormat, Socket socket) throws IOException {
super(wireFormat, socket);
public NIOSSLTransport(WireFormat wireFormat, Socket socket, SSLEngine engine, InitBuffer initBuffer,
ByteBuffer inputBuffer) throws IOException {
super(wireFormat, socket, initBuffer);
this.sslEngine = engine;
if (engine != null)
this.sslSession = engine.getSession();
this.inputBuffer = inputBuffer;
}
public void setSslContext(SSLContext sslContext) {
this.sslContext = sslContext;
}
volatile boolean hasSslEngine = false;
@Override
protected void initializeStreams() throws IOException {
if (sslEngine != null) {
hasSslEngine = true;
}
NIOOutputStream outputStream = null;
try {
channel = socket.getChannel();
@ -100,41 +110,66 @@ public class NIOSSLTransport extends NIOTransport {
// initialize engine, the initial sslSession we get will need to be
// updated once the ssl handshake process is completed.
if (remoteHost != null && remotePort != -1) {
sslEngine = sslContext.createSSLEngine(remoteHost, remotePort);
} else {
sslEngine = sslContext.createSSLEngine();
if (!hasSslEngine) {
if (remoteHost != null && remotePort != -1) {
sslEngine = sslContext.createSSLEngine(remoteHost, remotePort);
} else {
sslEngine = sslContext.createSSLEngine();
}
sslEngine.setUseClientMode(false);
if (enabledCipherSuites != null) {
sslEngine.setEnabledCipherSuites(enabledCipherSuites);
}
if (enabledProtocols != null) {
sslEngine.setEnabledProtocols(enabledProtocols);
}
if (wantClientAuth) {
sslEngine.setWantClientAuth(wantClientAuth);
}
if (needClientAuth) {
sslEngine.setNeedClientAuth(needClientAuth);
}
sslSession = sslEngine.getSession();
inputBuffer = ByteBuffer.allocate(sslSession.getPacketBufferSize());
inputBuffer.clear();
}
sslEngine.setUseClientMode(false);
if (enabledCipherSuites != null) {
sslEngine.setEnabledCipherSuites(enabledCipherSuites);
}
if (enabledProtocols != null) {
sslEngine.setEnabledProtocols(enabledProtocols);
}
if (wantClientAuth) {
sslEngine.setWantClientAuth(wantClientAuth);
}
if (needClientAuth) {
sslEngine.setNeedClientAuth(needClientAuth);
}
sslSession = sslEngine.getSession();
inputBuffer = ByteBuffer.allocate(sslSession.getPacketBufferSize());
inputBuffer.clear();
outputStream = new NIOOutputStream(channel);
outputStream.setEngine(sslEngine);
this.dataOut = new DataOutputStream(outputStream);
this.buffOut = outputStream;
sslEngine.beginHandshake();
//If the sslEngine was not passed in, then handshake
if (!hasSslEngine)
sslEngine.beginHandshake();
handshakeStatus = sslEngine.getHandshakeStatus();
doHandshake();
if (!hasSslEngine)
doHandshake();
// if (hasSslEngine) {
selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
@Override
public void onSelect(SelectorSelection selection) {
serviceRead();
}
@Override
public void onError(SelectorSelection selection, Throwable error) {
if (error instanceof IOException) {
onException((IOException) error);
} else {
onException(IOExceptionSupport.create(error));
}
}
});
doInit();
} catch (Exception e) {
try {
if(outputStream != null) {
@ -146,6 +181,24 @@ public class NIOSSLTransport extends NIOTransport {
}
}
protected void doInit() throws Exception {
}
protected void doOpenWireInit() throws Exception {
//Do this later to let wire format negotiation happen
if (initBuffer != null && this.wireFormat instanceof OpenWireFormat) {
initBuffer.buffer.flip();
if (initBuffer.buffer.hasRemaining()) {
nextFrameSize = -1;
receiveCounter += initBuffer.readSize;
processCommand(initBuffer.buffer);
processCommand(initBuffer.buffer);
initBuffer.buffer.clear();
}
}
}
protected void finishHandshake() throws Exception {
if (handshakeInProgress) {
handshakeInProgress = false;
@ -176,12 +229,14 @@ public class NIOSSLTransport extends NIOTransport {
}
@Override
protected void serviceRead() {
public void serviceRead() {
try {
if (handshakeInProgress) {
doHandshake();
}
doOpenWireInit();
ByteBuffer plain = ByteBuffer.allocate(sslSession.getApplicationBufferSize());
plain.position(plain.limit());
@ -293,7 +348,7 @@ public class NIOSSLTransport extends NIOTransport {
doConsume(command);
nextFrameSize = -1;
currentBuffer = null;
}
}
}
}

View File

@ -18,20 +18,25 @@
package org.apache.activemq.transport.nio;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Map;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.tcp.SslTransport;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
@ -44,6 +49,7 @@ public class NIOSSLTransportFactory extends NIOTransportFactory {
protected SSLContext context;
@Override
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new NIOSSLTransportServer(context, this, location, serverSocketFactory);
}
@ -64,6 +70,7 @@ public class NIOSSLTransportFactory extends NIOTransportFactory {
* Overriding to allow for proper configuration through reflection but
* delegate to get common configuration
*/
@Override
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
if (transport instanceof SslTransport) {
SslTransport sslTransport = (SslTransport) transport.narrow(SslTransport.class);
@ -79,6 +86,7 @@ public class NIOSSLTransportFactory extends NIOTransportFactory {
/**
* Overriding to use SslTransports.
*/
@Override
protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
URI localLocation = null;
@ -98,6 +106,13 @@ public class NIOSSLTransportFactory extends NIOTransportFactory {
return new SslTransport(wf, (SSLSocketFactory) socketFactory, location, localLocation, false);
}
@Override
public TcpTransport createTransport(WireFormat wireFormat, Socket socket,
SSLEngine engine, InitBuffer initBuffer, ByteBuffer inputBuffer)
throws IOException {
return new NIOSSLTransport(wireFormat, socket, engine, initBuffer, inputBuffer);
}
/**
* Creates a new SSL SocketFactory. The given factory will use user-provided
* key and trust managers (if the user provided them).
@ -105,6 +120,7 @@ public class NIOSSLTransportFactory extends NIOTransportFactory {
* @return Newly created (Ssl)SocketFactory.
* @throws IOException
*/
@Override
protected SocketFactory createSocketFactory() throws IOException {
if (SslContext.getCurrentSslContext() != null) {
SslContext ctx = SslContext.getCurrentSslContext();

View File

@ -44,7 +44,7 @@ public class NIOSSLTransportServer extends TcpTransportServer {
@Override
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
NIOSSLTransport transport = new NIOSSLTransport(format, socket);
NIOSSLTransport transport = new NIOSSLTransport(format, socket, null, null, null);
if (context != null) {
transport.setSslContext(context);
}

View File

@ -58,6 +58,16 @@ public class NIOTransport extends TcpTransport {
super(wireFormat, socket);
}
/**
* @param format
* @param socket
* @param initBuffer
* @throws IOException
*/
public NIOTransport(WireFormat format, Socket socket, InitBuffer initBuffer) throws IOException {
super(format, socket, initBuffer);
}
@Override
protected void initializeStreams() throws IOException {
channel = socket.getChannel();
@ -91,11 +101,15 @@ public class NIOTransport extends TcpTransport {
this.buffOut = outPutStream;
}
protected int readFromBuffer() throws IOException {
return channel.read(currentBuffer);
}
protected void serviceRead() {
try {
while (true) {
int readSize = channel.read(currentBuffer);
int readSize = readFromBuffer();
if (readSize == -1) {
onException(new EOFException());
selection.close();

View File

@ -32,38 +32,52 @@ import javax.net.SocketFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.wireformat.WireFormat;
public class NIOTransportFactory extends TcpTransportFactory {
@Override
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new TcpTransportServer(this, location, serverSocketFactory) {
@Override
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
return new NIOTransport(format, socket);
}
};
}
@Override
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
return new NIOTransport(wf, socketFactory, location, localLocation);
}
@Override
public TcpTransport createTransport(WireFormat wireFormat, Socket socket,
InitBuffer initBuffer) throws IOException {
return new NIOTransport(wireFormat, socket, initBuffer);
}
@Override
protected ServerSocketFactory createServerSocketFactory() {
return new ServerSocketFactory() {
@Override
public ServerSocket createServerSocket(int port) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port));
return serverSocketChannel.socket();
}
@Override
public ServerSocket createServerSocket(int port, int backlog) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port), backlog);
return serverSocketChannel.socket();
}
@Override
public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddress) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(ifAddress, port), backlog);
@ -72,26 +86,31 @@ public class NIOTransportFactory extends TcpTransportFactory {
};
}
@Override
protected SocketFactory createSocketFactory() throws IOException {
return new SocketFactory() {
@Override
public Socket createSocket() throws IOException {
SocketChannel channel = SocketChannel.open();
return channel.socket();
}
@Override
public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
SocketChannel channel = SocketChannel.open();
channel.connect(new InetSocketAddress(host, port));
return channel.socket();
}
@Override
public Socket createSocket(InetAddress address, int port) throws IOException {
SocketChannel channel = SocketChannel.open();
channel.connect(new InetSocketAddress(address, port));
return channel.socket();
}
@Override
public Socket createSocket(String address, int port, InetAddress localAddresss, int localPort) throws IOException, UnknownHostException {
SocketChannel channel = SocketChannel.open();
channel.socket().bind(new InetSocketAddress(localAddresss, localPort));
@ -99,6 +118,7 @@ public class NIOTransportFactory extends TcpTransportFactory {
return channel.socket();
}
@Override
public Socket createSocket(InetAddress address, int port, InetAddress localAddresss, int localPort) throws IOException {
SocketChannel channel = SocketChannel.open();
channel.socket().bind(new InetSocketAddress(localAddresss, localPort));

View File

@ -29,7 +29,6 @@ import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
@ -46,7 +45,7 @@ import org.apache.activemq.wireformat.WireFormat;
public class SslTransport extends TcpTransport {
/**
* Connect to a remote node such as a Broker.
*
*
* @param wireFormat The WireFormat to be used.
* @param socketFactory The socket factory to be used. Forcing SSLSockets
* for obvious reasons.
@ -76,7 +75,7 @@ public class SslTransport extends TcpTransport {
/**
* Initialize from a ServerSocket. No access to needClientAuth is given
* since it is already set within the provided socket.
*
*
* @param wireFormat The WireFormat to be used.
* @param socket The Socket to be used. Forcing SSL.
* @throws IOException If TcpTransport throws.
@ -85,12 +84,18 @@ public class SslTransport extends TcpTransport {
super(wireFormat, socket);
}
public SslTransport(WireFormat format, SSLSocket socket,
InitBuffer initBuffer) throws IOException {
super(format, socket, initBuffer);
}
/**
* Overriding in order to add the client's certificates to ConnectionInfo
* Commmands.
*
*
* @param command The Command coming in.
*/
@Override
public void doConsume(Object command) {
// The instanceof can be avoided, but that would require modifying the
// Command clas tree and that would require too much effort right
@ -98,15 +103,15 @@ public class SslTransport extends TcpTransport {
if (command instanceof ConnectionInfo) {
ConnectionInfo connectionInfo = (ConnectionInfo)command;
connectionInfo.setTransportContext(getPeerCertificates());
}
}
super.doConsume(command);
}
/**
* @return peer certificate chain associated with the ssl socket
*/
public X509Certificate[] getPeerCertificates() {
SSLSocket sslSocket = (SSLSocket)this.socket;
SSLSession sslSession = sslSocket.getSession();
@ -117,13 +122,14 @@ public class SslTransport extends TcpTransport {
} catch (SSLPeerUnverifiedException e) {
clientCertChain = null;
}
return clientCertChain;
}
/**
* @return pretty print of 'this'
*/
@Override
public String toString() {
return "ssl://" + socket.getInetAddress() + ":" + socket.getPort();
}

View File

@ -22,8 +22,8 @@ import java.io.InputStream;
/**
* An optimized buffered input stream for Tcp
*
*
*
*
*/
public class TcpBufferedInputStream extends FilterInputStream {
private static final int DEFAULT_BUFFER_SIZE = 8192;
@ -53,6 +53,7 @@ public class TcpBufferedInputStream extends FilterInputStream {
}
}
@Override
public int read() throws IOException {
if (position >= count) {
fill();
@ -81,6 +82,7 @@ public class TcpBufferedInputStream extends FilterInputStream {
return cnt;
}
@Override
public int read(byte b[], int off, int len) throws IOException {
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
@ -105,6 +107,7 @@ public class TcpBufferedInputStream extends FilterInputStream {
}
}
@Override
public long skip(long n) throws IOException {
if (n <= 0) {
return 0;
@ -118,17 +121,34 @@ public class TcpBufferedInputStream extends FilterInputStream {
return skipped;
}
@Override
public int available() throws IOException {
return in.available() + (count - position);
}
@Override
public boolean markSupported() {
return false;
}
@Override
public void close() throws IOException {
if (in != null) {
in.close();
}
}
/**
* @param array
* @throws IOException
*/
public void unread(byte[] array) throws IOException {
int avail = internalBuffer.length - position;
if (array.length > avail) {
throw new IOException("Buffer is full, can't unread");
}
System.arraycopy(array, position, internalBuffer, 0, array.length);
count += array.length;
}
}

View File

@ -16,6 +16,27 @@ gxfdgvdfg * Licensed to the Apache Software Foundation (ASF) under one or more
*/
package org.apache.activemq.transport.tcp;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
import org.apache.activemq.Service;
import org.apache.activemq.TransportLoggerSupport;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -28,18 +49,6 @@ import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.SocketFactory;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* An implementation of the {@link Transport} interface using raw tcp/ip
*
@ -62,6 +71,8 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
protected DataInputStream dataIn;
protected TimeStampStream buffOut = null;
protected final InitBuffer initBuffer;
/**
* The Traffic Class to be set on the socket.
*/
@ -149,6 +160,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
}
this.remoteLocation = remoteLocation;
this.localLocation = localLocation;
this.initBuffer = null;
setDaemon(false);
}
@ -160,16 +172,22 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
* @throws IOException
*/
public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
this(wireFormat, socket, null);
}
public TcpTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer) throws IOException {
this.wireFormat = wireFormat;
this.socket = socket;
this.remoteLocation = null;
this.localLocation = null;
this.initBuffer = initBuffer;
setDaemon(true);
}
/**
* A one way asynchronous send
*/
@Override
public void oneway(Object command) throws IOException {
checkStarted();
wireFormat.marshal(command, dataOut);
@ -188,6 +206,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
/**
* reads packets from a Socket
*/
@Override
public void run() {
LOG.trace("TCP consumer thread for " + this + " starting");
this.runnerThread=Thread.currentThread();
@ -536,6 +555,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
// need a async task for this
final TaskRunnerFactory taskRunnerFactory = new TaskRunnerFactory();
taskRunnerFactory.execute(new Runnable() {
@Override
public void run() {
LOG.trace("Closing socket {}", socket);
try {
@ -609,10 +629,16 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
super.fill();
}
};
//Unread the initBuffer that was used for protocol detection if it exists
//so the stream can start over
if (initBuffer != null) {
buffIn.unread(initBuffer.buffer.array());
}
this.dataIn = new DataInputStream(buffIn);
TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
this.dataOut = new DataOutputStream(outputStream);
this.buffOut = outputStream;
}
protected void closeStreams() throws IOException {
@ -628,6 +654,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
this.socketOptions = new HashMap<String, Object>(socketOptions);
}
@Override
public String getRemoteAddress() {
if (socket != null) {
SocketAddress address = socket.getRemoteSocketAddress();
@ -650,10 +677,24 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
return super.narrow(target);
}
@Override
public int getReceiveCounter() {
return receiveCounter;
}
public static class InitBuffer {
public final int readSize;
public final ByteBuffer buffer;
public InitBuffer(int readSize, ByteBuffer buffer) {
if (buffer == null) {
throw new IllegalArgumentException("Null buffer not allowed.");
}
this.readSize = readSize;
this.buffer = buffer;
}
}
/**
* @param sock The socket on which to set the Traffic Class.
* @return Whether or not the Traffic Class was set on the given socket.

View File

@ -17,14 +17,17 @@
package org.apache.activemq.transport.tcp;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import javax.net.ssl.SSLEngine;
import org.apache.activemq.TransportLoggerSupport;
import org.apache.activemq.openwire.OpenWireFormat;
@ -33,6 +36,7 @@ import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.WireFormatNegotiator;
import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
@ -139,6 +143,15 @@ public class TcpTransportFactory extends TransportFactory {
return createTcpTransport(wf, socketFactory, location, localLocation);
}
public TcpTransport createTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer) throws IOException {
throw new IOException("createTransport() method not implemented!");
}
public TcpTransport createTransport(WireFormat wireFormat, Socket socket,
SSLEngine engine, InitBuffer initBuffer, ByteBuffer inputBuffer) throws IOException {
throw new IOException("createTransport() method not implemented!");
}
/**
* Allows subclasses of TcpTransportFactory to provide a create custom
* TcpTransport instances.

View File

@ -44,6 +44,7 @@ import org.apache.activemq.TransportLoggerSupport;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerThreadSupport;
import org.apache.activemq.transport.nio.SelectorManager;
@ -474,7 +475,11 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
return (InetSocketAddress) serverSocket.getLocalSocketAddress();
}
protected final void handleSocket(Socket socket) {
protected void handleSocket(Socket socket) {
doHandleSocket(socket);
}
final protected void doHandleSocket(Socket socket) {
boolean closeSocket = true;
try {
if (this.currentTransportCount.get() >= this.maximumConnections) {
@ -483,6 +488,8 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
"maximumConnections' property on the TCP transport configuration URI " +
"in the ActiveMQ configuration file (e.g., activemq.xml)");
} else {
currentTransportCount.incrementAndGet();
HashMap<String, Object> options = new HashMap<String, Object>();
options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
@ -496,22 +503,23 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
options.put("startLogging", Boolean.valueOf(startLogging));
options.putAll(transportOptions);
WireFormat format = wireFormatFactory.createWireFormat();
Transport transport = createTransport(socket, format);
TransportInfo transportInfo = configureTransport(this, socket);
closeSocket = false;
if (transport instanceof ServiceSupport) {
((ServiceSupport) transport).addServiceListener(this);
if (transportInfo.transport instanceof ServiceSupport) {
((ServiceSupport) transportInfo.transport).addServiceListener(this);
}
Transport configuredTransport = transportFactory.serverConfigure(transport, format, options);
Transport configuredTransport = transportInfo.transportFactory.serverConfigure(
transportInfo.transport, transportInfo.format, options);
getAcceptListener().onAccept(configuredTransport);
currentTransportCount.incrementAndGet();
}
} catch (SocketTimeoutException ste) {
// expect this to happen
currentTransportCount.decrementAndGet();
} catch (Exception e) {
currentTransportCount.decrementAndGet();
if (closeSocket) {
try {
socket.close();
@ -528,6 +536,24 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
}
}
protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
WireFormat format = wireFormatFactory.createWireFormat();
Transport transport = createTransport(socket, format);
return new TransportInfo(format, transport, transportFactory);
}
protected class TransportInfo {
final WireFormat format;
final Transport transport;
final TransportFactory transportFactory;
public TransportInfo(WireFormat format, Transport transport, TransportFactory transportFactory) {
this.format = format;
this.transport = transport;
this.transportFactory = transportFactory;
}
}
public int getSoTimeout() {
return soTimeout;
}
@ -567,6 +593,10 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
this.maximumConnections = maximumConnections;
}
public AtomicInteger getCurrentTransportCount() {
return currentTransportCount;
}
@Override
public void started(Service service) {
}

View File

@ -220,6 +220,26 @@
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -147,6 +147,27 @@
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -131,6 +131,27 @@
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>

View File

@ -251,7 +251,28 @@
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests.aix.excludes</id>
<activation>

View File

@ -266,6 +266,27 @@
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>

View File

@ -512,6 +512,27 @@
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests.windows.excludes</id>

View File

@ -289,6 +289,27 @@
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
<include>**/auto/*Test.java</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<repositories>

View File

@ -23,6 +23,7 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import javax.net.SocketFactory;
import javax.net.ssl.SSLEngine;
import org.apache.activemq.transport.nio.NIOSSLTransport;
import org.apache.activemq.wireformat.WireFormat;
@ -37,7 +38,12 @@ public class MQTTNIOSSLTransport extends NIOSSLTransport {
}
public MQTTNIOSSLTransport(WireFormat wireFormat, Socket socket) throws IOException {
super(wireFormat, socket);
super(wireFormat, socket, null, null, null);
}
public MQTTNIOSSLTransport(WireFormat wireFormat, Socket socket,
SSLEngine engine, InitBuffer initBuffer, ByteBuffer inputBuffer) throws IOException {
super(wireFormat, socket, engine, initBuffer, inputBuffer);
}
@Override
@ -56,4 +62,19 @@ public class MQTTNIOSSLTransport extends NIOSSLTransport {
DataByteArrayInputStream dis = new DataByteArrayInputStream(fill);
codec.parse(dis, fill.length);
}
/* (non-Javadoc)
* @see org.apache.activemq.transport.nio.NIOSSLTransport#doInit()
*/
@Override
protected void doInit() throws Exception {
if (initBuffer != null) {
nextFrameSize = -1;
receiveCounter += initBuffer.readSize;
initBuffer.buffer.flip();
processCommand(initBuffer.buffer);
}
}
}

View File

@ -21,14 +21,18 @@ import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.wireformat.WireFormat;
@ -39,6 +43,7 @@ public class MQTTNIOSSLTransportFactory extends MQTTNIOTransportFactory {
@Override
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory) {
@Override
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
MQTTNIOSSLTransport transport = new MQTTNIOSSLTransport(format, socket);
if (context != null) {
@ -56,6 +61,13 @@ public class MQTTNIOSSLTransportFactory extends MQTTNIOTransportFactory {
return new MQTTNIOSSLTransport(wf, socketFactory, location, localLocation);
}
@Override
public TcpTransport createTransport(WireFormat wireFormat, Socket socket,
SSLEngine engine, InitBuffer initBuffer, ByteBuffer inputBuffer)
throws IOException {
return new MQTTNIOSSLTransport(wireFormat, socket, engine, initBuffer, inputBuffer);
}
@Override
public TransportServer doBind(URI location) throws IOException {
if (SslContext.getCurrentSslContext() != null) {

View File

@ -56,6 +56,10 @@ public class MQTTNIOTransport extends TcpTransport {
super(wireFormat, socket);
}
public MQTTNIOTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer) throws IOException {
super(wireFormat, socket, initBuffer);
}
@Override
protected void initializeStreams() throws IOException {
channel = socket.getChannel();
@ -84,6 +88,16 @@ public class MQTTNIOTransport extends TcpTransport {
dataOut = new DataOutputStream(outPutStream);
buffOut = outPutStream;
codec = new MQTTCodec(this, (MQTTWireFormat) getWireFormat());
try {
if (initBuffer != null) {
processBuffer(initBuffer.buffer, initBuffer.readSize);
}
} catch (IOException e) {
onException(e);
} catch (Throwable e) {
onException(IOExceptionSupport.create(e));
}
}
private void serviceRead() {
@ -103,14 +117,7 @@ public class MQTTNIOTransport extends TcpTransport {
break;
}
inputBuffer.flip();
DataByteArrayInputStream dis = new DataByteArrayInputStream(inputBuffer.array());
codec.parse(dis, readSize);
receiveCounter += readSize;
// clear the buffer
inputBuffer.clear();
processBuffer(inputBuffer, readSize);
}
} catch (IOException e) {
onException(e);
@ -119,6 +126,17 @@ public class MQTTNIOTransport extends TcpTransport {
}
}
protected void processBuffer(ByteBuffer buffer, int readSize) throws Exception {
buffer.flip();
DataByteArrayInputStream dis = new DataByteArrayInputStream(buffer.array());
codec.parse(dis, readSize);
receiveCounter += readSize;
// clear the buffer
buffer.clear();
}
@Override
protected void doStart() throws Exception {
connect();

View File

@ -33,6 +33,7 @@ import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.nio.NIOTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
@ -66,6 +67,12 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok
return new MQTTNIOTransport(wf, socketFactory, location, localLocation);
}
@Override
public TcpTransport createTransport(WireFormat wireFormat, Socket socket,
InitBuffer initBuffer) throws IOException {
return new MQTTNIOTransport(wireFormat, socket, initBuffer);
}
@SuppressWarnings("rawtypes")
@Override
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt.auto;
import org.apache.activemq.transport.mqtt.MQTTTest;
/**
* Run the basic tests with the NIO Transport.
*/
public class MQTTAutoNioSslTest extends MQTTTest {
@Override
public String getProtocolScheme() {
return "auto+nio+ssl";
}
@Override
public boolean isUseSSL() {
return true;
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt.auto;
import org.apache.activemq.transport.mqtt.MQTTTest;
/**
* Run the basic tests with the NIO Transport.
*/
public class MQTTAutoNioTest extends MQTTTest {
@Override
public String getProtocolScheme() {
return "auto+nio";
}
@Override
public boolean isUseSSL() {
return false;
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt.auto;
import org.apache.activemq.transport.mqtt.MQTTTest;
/**
* Run the basic tests with the NIO Transport.
*/
public class MQTTAutoSslTest extends MQTTTest {
@Override
public String getProtocolScheme() {
return "auto+ssl";
}
@Override
public boolean isUseSSL() {
return true;
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.mqtt.auto;
import org.apache.activemq.transport.mqtt.MQTTTest;
/**
* Run the basic tests with the NIO Transport.
*/
public class MQTTAutoTest extends MQTTTest {
@Override
public String getProtocolScheme() {
return "auto";
}
@Override
public boolean isUseSSL() {
return false;
}
}

View File

@ -129,6 +129,27 @@
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>

View File

@ -104,5 +104,29 @@
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -168,6 +168,27 @@
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>

View File

@ -207,6 +207,27 @@
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>

View File

@ -323,6 +323,27 @@
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests.windows.excludes</id>

View File

@ -164,6 +164,27 @@
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
<include>**/auto/*Test.java</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests.windows.excludes</id>

View File

@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
import java.security.cert.X509Certificate;
import javax.net.SocketFactory;
import javax.net.ssl.SSLEngine;
import org.apache.activemq.transport.nio.NIOSSLTransport;
import org.apache.activemq.wireformat.WireFormat;
@ -40,7 +41,14 @@ public class StompNIOSSLTransport extends NIOSSLTransport {
}
public StompNIOSSLTransport(WireFormat wireFormat, Socket socket) throws IOException {
super(wireFormat, socket);
super(wireFormat, socket, null, null, null);
}
public StompNIOSSLTransport(WireFormat wireFormat, Socket socket,
SSLEngine engine, InitBuffer initBuffer, ByteBuffer inputBuffer) throws IOException {
super(wireFormat, socket, engine, initBuffer, inputBuffer);
}
@Override
@ -71,4 +79,18 @@ public class StompNIOSSLTransport extends NIOSSLTransport {
super.doConsume(command);
}
/* (non-Javadoc)
* @see org.apache.activemq.transport.nio.NIOSSLTransport#doInit()
*/
@Override
protected void doInit() throws Exception {
if (initBuffer != null) {
nextFrameSize = -1;
// System.out.println("length1: " + initBuffer.array().length);
receiveCounter += initBuffer.readSize;
initBuffer.buffer.flip();
processCommand(initBuffer.buffer);
}
}
}

View File

@ -21,16 +21,19 @@ import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.nio.NIOSSLTransportServer;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.wireformat.WireFormat;
@ -62,6 +65,13 @@ public class StompNIOSSLTransportFactory extends StompNIOTransportFactory {
return new StompNIOSSLTransport(wf, socketFactory, location, localLocation);
}
@Override
public TcpTransport createTransport(WireFormat wireFormat, Socket socket,
SSLEngine engine, InitBuffer initBuffer, ByteBuffer inputBuffer)
throws IOException {
return new StompNIOSSLTransport(wireFormat, socket, engine, initBuffer, inputBuffer);
}
@Override
public TransportServer doBind(URI location) throws IOException {
if (SslContext.getCurrentSslContext() != null) {

View File

@ -57,6 +57,10 @@ public class StompNIOTransport extends TcpTransport {
super(wireFormat, socket);
}
public StompNIOTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer) throws IOException {
super(wireFormat, socket, initBuffer);
}
@Override
protected void initializeStreams() throws IOException {
channel = socket.getChannel();
@ -84,14 +88,24 @@ public class StompNIOTransport extends TcpTransport {
this.dataOut = new DataOutputStream(outPutStream);
this.buffOut = outPutStream;
codec = new StompCodec(this);
try {
if (initBuffer != null) {
processBuffer(initBuffer.buffer, initBuffer.readSize);
}
} catch (IOException e) {
onException(e);
} catch (Throwable e) {
onException(IOExceptionSupport.create(e));
}
}
private void serviceRead() {
try {
while (true) {
// read channel
int readSize = channel.read(inputBuffer);
// channel is closed, cleanup
if (readSize == -1) {
onException(new EOFException());
@ -104,15 +118,7 @@ public class StompNIOTransport extends TcpTransport {
break;
}
receiveCounter += readSize;
inputBuffer.flip();
ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
codec.parse(input, readSize);
// clear the buffer
inputBuffer.clear();
processBuffer(inputBuffer, readSize);
}
} catch (IOException e) {
onException(e);
@ -121,6 +127,18 @@ public class StompNIOTransport extends TcpTransport {
}
}
protected void processBuffer(ByteBuffer buffer, int readSize) throws Exception {
receiveCounter += readSize;
buffer.flip();
ByteArrayInputStream input = new ByteArrayInputStream(buffer.array());
codec.parse(input, readSize);
// clear the buffer
buffer.clear();
}
@Override
protected void doStart() throws Exception {
connect();

View File

@ -34,6 +34,7 @@ import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.nio.NIOTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
@ -47,22 +48,32 @@ public class StompNIOTransportFactory extends NIOTransportFactory implements Bro
private BrokerContext brokerContext = null;
@Override
protected String getDefaultWireFormatType() {
return "stomp";
}
@Override
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new TcpTransportServer(this, location, serverSocketFactory) {
@Override
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
return new StompNIOTransport(format, socket);
}
};
}
@Override
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
return new StompNIOTransport(wf, socketFactory, location, localLocation);
}
@Override
public TcpTransport createTransport(WireFormat wireFormat, Socket socket,
InitBuffer initBuffer) throws IOException {
return new StompNIOTransport(wireFormat, socket, initBuffer);
}
@SuppressWarnings("rawtypes")
@Override
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
@ -76,6 +87,7 @@ public class StompNIOTransportFactory extends NIOTransportFactory implements Bro
return transport;
}
@Override
@SuppressWarnings("rawtypes")
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
transport = new StompTransportFilter(transport, format, brokerContext);
@ -83,6 +95,7 @@ public class StompNIOTransportFactory extends NIOTransportFactory implements Bro
return super.compositeConfigure(transport, format, options);
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerContext = brokerService.getBrokerContext();
}

View File

@ -62,6 +62,10 @@ public class StompTestSupport {
protected int sslPort;
protected int nioPort;
protected int nioSslPort;
protected int autoPort;
protected int autoSslPort;
protected int autoNioPort;
protected int autoNioSslPort;
protected String jmsUri = "vm://localhost";
protected StompConnection stompConnection;
protected ActiveMQConnectionFactory cf;
@ -81,6 +85,10 @@ public class StompTestSupport {
s.port = 5676;
s.nioPort = 5677;
s.nioSslPort = 5678;
s.autoPort = 5679;
s.autoSslPort = 5680;
s.autoNioPort = 5681;
s.autoNioSslPort = 5682;
s.startBroker();
while(true) {
@ -288,25 +296,49 @@ public class StompTestSupport {
connector = brokerService.addConnector(
"stomp://0.0.0.0:" + port + getAdditionalConfig());
port = connector.getConnectUri().getPort();
LOG.debug("Using amqp port " + port);
LOG.debug("Using stomp port " + port);
}
if (isUseSslConnector()) {
connector = brokerService.addConnector(
"stomp+ssl://0.0.0.0:" + sslPort + getAdditionalConfig());
sslPort = connector.getConnectUri().getPort();
LOG.debug("Using amqp+ssl port " + sslPort);
LOG.debug("Using stomp+ssl port " + sslPort);
}
if (isUseNioConnector()) {
connector = brokerService.addConnector(
"stomp+nio://0.0.0.0:" + nioPort + getAdditionalConfig());
nioPort = connector.getConnectUri().getPort();
LOG.debug("Using amqp+nio port " + nioPort);
LOG.debug("Using stomp+nio port " + nioPort);
}
if (isUseNioPlusSslConnector()) {
connector = brokerService.addConnector(
"stomp+nio+ssl://0.0.0.0:" + nioSslPort + getAdditionalConfig());
nioSslPort = connector.getConnectUri().getPort();
LOG.debug("Using amqp+nio+ssl port " + nioSslPort);
LOG.debug("Using stomp+nio+ssl port " + nioSslPort);
}
if (isUseAutoConnector()) {
connector = brokerService.addConnector(
"auto://0.0.0.0:" + autoPort + getAdditionalConfig());
autoPort = connector.getConnectUri().getPort();
LOG.debug("Using auto port " + autoPort);
}
if (isUseAutoSslConnector()) {
connector = brokerService.addConnector(
"auto+ssl://0.0.0.0:" + autoSslPort + getAdditionalConfig());
autoSslPort = connector.getConnectUri().getPort();
LOG.debug("Using auto+ssl port " + autoSslPort);
}
if (isUseAutoNioConnector()) {
connector = brokerService.addConnector(
"auto+nio://0.0.0.0:" + autoNioPort + getAdditionalConfig());
autoNioPort = connector.getConnectUri().getPort();
LOG.debug("Using auto+nio port " + autoNioPort);
}
if (isUseAutoNioPlusSslConnector()) {
connector = brokerService.addConnector(
"auto+nio+ssl://0.0.0.0:" + autoNioSslPort + getAdditionalConfig());
autoNioSslPort = connector.getConnectUri().getPort();
LOG.debug("Using auto+nio+ssl port " + autoNioSslPort);
}
}
@ -334,6 +366,22 @@ public class StompTestSupport {
return false;
}
protected boolean isUseAutoConnector() {
return false;
}
protected boolean isUseAutoSslConnector() {
return false;
}
protected boolean isUseAutoNioConnector() {
return false;
}
protected boolean isUseAutoNioPlusSslConnector() {
return false;
}
protected String getAdditionalConfig() {
return "";
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp.auto;
import java.io.IOException;
import java.net.Socket;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.transport.stomp.StompTest;
public class StompAutoNioSslTest extends StompTest {
@Override
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected boolean isUseAutoNioPlusSslConnector() {
return true;
}
@Override
protected Socket createSocket() throws IOException {
SocketFactory factory = SSLSocketFactory.getDefault();
return factory.createSocket("127.0.0.1", this.autoNioSslPort);
}
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp.auto;
import java.io.IOException;
import java.net.Socket;
import org.apache.activemq.transport.stomp.StompTest;
public class StompAutoNioTest extends StompTest {
@Override
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected boolean isUseAutoNioConnector() {
return true;
}
@Override
protected Socket createSocket() throws IOException {
return new Socket("127.0.0.1", this.autoNioPort);
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp.auto;
import java.io.IOException;
import java.net.Socket;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.transport.stomp.StompTest;
public class StompAutoSslTest extends StompTest {
@Override
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected boolean isUseAutoSslConnector() {
return true;
}
@Override
protected Socket createSocket() throws IOException {
SocketFactory factory = SSLSocketFactory.getDefault();
return factory.createSocket("127.0.0.1", this.autoSslPort);
}
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp.auto;
import java.io.IOException;
import java.net.Socket;
import org.apache.activemq.transport.stomp.StompTest;
public class StompAutoTest extends StompTest {
@Override
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected boolean isUseAutoConnector() {
return true;
}
@Override
protected Socket createSocket() throws IOException {
return new Socket("127.0.0.1", this.autoPort);
}
}

View File

@ -579,6 +579,27 @@
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
<include>**/transport/auto/**/*Test.java</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>activemq.exclude-broken-tests</id>
<activation>

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.auto;
import java.net.URI;
import java.net.URISyntaxException;
import junit.framework.Test;
import junit.textui.TestRunner;
import org.apache.activemq.transport.TransportBrokerTestSupport;
public class AutoNIOSslTransportBrokerTest extends TransportBrokerTestSupport {
public static final String KEYSTORE_TYPE = "jks";
public static final String PASSWORD = "password";
public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";
@Override
protected String getBindLocation() {
return "auto+nio+ssl://localhost:0?transport.soWriteTimeout=20000";
}
@Override
protected URI getBindURI() throws URISyntaxException {
return new URI("auto+nio+ssl://localhost:0?soWriteTimeout=20000");
}
@Override
protected void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
//System.setProperty("javax.net.debug", "ssl,handshake,data,trustmanager");
maxWait = 10000;
super.setUp();
}
public static Test suite() {
return suite(AutoNIOSslTransportBrokerTest.class);
}
public static void main(String[] args) {
TestRunner.run(suite());
}
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.auto;
import junit.framework.Test;
import junit.textui.TestRunner;
import org.apache.activemq.transport.TransportBrokerTestSupport;
public class AutoNIOTransportBrokerTest extends TransportBrokerTestSupport {
@Override
protected String getBindLocation() {
return "auto+nio://localhost:0";
}
public static Test suite() {
return suite(AutoNIOTransportBrokerTest.class);
}
public static void main(String[] args) {
TestRunner.run(suite());
}
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.auto;
import java.net.URI;
import java.net.URISyntaxException;
import junit.framework.Test;
import junit.textui.TestRunner;
import org.apache.activemq.transport.TransportBrokerTestSupport;
public class AutoSslTransportBrokerTest extends TransportBrokerTestSupport {
public static final String KEYSTORE_TYPE = "jks";
public static final String PASSWORD = "password";
public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";
@Override
protected String getBindLocation() {
return "auto+ssl://localhost:0?transport.soWriteTimeout=20000";
}
@Override
protected URI getBindURI() throws URISyntaxException {
return new URI("auto+ssl://localhost:0?soWriteTimeout=20000");
}
@Override
protected void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
//System.setProperty("javax.net.debug", "ssl,handshake,data,trustmanager");
maxWait = 10000;
super.setUp();
}
public static Test suite() {
return suite(AutoSslTransportBrokerTest.class);
}
public static void main(String[] args) {
TestRunner.run(suite());
}
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.auto;
import junit.framework.Test;
import junit.textui.TestRunner;
import org.apache.activemq.transport.TransportBrokerTestSupport;
public class AutoTransportBrokerTest extends TransportBrokerTestSupport {
@Override
protected String getBindLocation() {
return "auto://localhost:0";
}
public static Test suite() {
return suite(AutoTransportBrokerTest.class);
}
public static void main(String[] args) {
TestRunner.run(suite());
}
}

View File

@ -0,0 +1,156 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.auto;
import java.util.Arrays;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class AutoTransportConfigureTest {
public static final String KEYSTORE_TYPE = "jks";
public static final String PASSWORD = "password";
public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";
private BrokerService brokerService;
private String url;
@Parameters
public static Iterable<Object[]> parameters() {
return Arrays.asList(new Object[][]{
{"auto"},
{"auto+nio"},
{"auto+ssl"},
{"auto+nio+ssl"}
});
}
private String transportType;
public AutoTransportConfigureTest(String transportType) {
super();
this.transportType = transportType;
}
@Before
public void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
}
@After
public void tearDown() throws Exception{
if (this.brokerService != null) {
this.brokerService.stop();
this.brokerService.waitUntilStopped();
}
}
protected void createBroker(String uriConfig) throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(false);
url = brokerService.addConnector(uriConfig).getPublishableConnectString();
brokerService.start();
brokerService.waitUntilStarted();
}
@Test(expected=JMSException.class)
public void testUrlConfiguration() throws Exception {
createBroker(transportType + "://localhost:0?wireFormat.maxFrameSize=10");
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
sendMessage(factory.createConnection());
}
@Test(expected=JMSException.class)
public void testUrlConfigurationOpenWireFail() throws Exception {
createBroker(transportType + "://localhost:0?wireFormat.default.maxFrameSize=10");
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
sendMessage(factory.createConnection());
}
@Test
public void testUrlConfigurationOpenWireSuccess() throws Exception {
//Will work because max frame size only applies to amqp
createBroker(transportType + "://localhost:0?wireFormat.amqp.maxFrameSize=10");
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
sendMessage(factory.createConnection());
}
@Test(expected=JMSException.class)
public void testUrlConfigurationOpenWireNotAvailable() throws Exception {
//only amqp is available so should fail
createBroker(transportType + "://localhost:0?auto.protocols=amqp");
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
sendMessage(factory.createConnection());
}
@Test
public void testUrlConfigurationOpenWireAvailable() throws Exception {
//only open wire is available
createBroker(transportType + "://localhost:0?auto.protocols=default");
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
sendMessage(factory.createConnection());
}
@Test
public void testUrlConfigurationOpenWireAndAmqpAvailable() throws Exception {
createBroker(transportType + "://localhost:0?auto.protocols=default,amqp");
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
sendMessage(factory.createConnection());
}
protected void sendMessage(Connection connection) throws JMSException {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(new ActiveMQQueue("test"));
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText("this is a test");
producer.send(message);
}
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.auto.failover;
import org.apache.activemq.transport.failover.FailoverClusterTest;
public class AutoFailoverClusterTest extends FailoverClusterTest {
@Override
protected String getBindAddress() {
return "auto://0.0.0.0:0";
}
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.auto.failover;
import org.apache.activemq.transport.failover.FailoverTimeoutTest;
public class AutoFailoverTimeoutTest extends FailoverTimeoutTest {
@Override
protected String getTransportUri() {
return "auto://localhost:0";
}
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.auto.failover;
import org.apache.activemq.transport.failover.FailoverRandomTest;
public class FailoverAutoRandomTest extends FailoverRandomTest {
@Override
protected String getBrokerUrl() {
return "auto://localhost:0";
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.auto.failover;
import junit.framework.Test;
import org.apache.activemq.transport.failover.FailoverTransportBrokerTest;
public class FailoverAutoTransportBrokerTest extends FailoverTransportBrokerTest {
@Override
protected String getLocalURI() {
return "auto://localhost:0?wireFormat.tcpNoDelayEnabled=true";
}
@Override
protected String getRemoteURI() {
return "auto://localhost:0?wireFormat.tcpNoDelayEnabled=true";
}
public static Test suite() {
return suite(FailoverAutoTransportBrokerTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.auto.nio;
import org.apache.activemq.transport.nio.NIOJmsDurableTopicSendReceiveTest;
public class AutoNIOJmsDurableTopicSendReceiveTest extends NIOJmsDurableTopicSendReceiveTest {
@Override
protected String getBrokerURL() {
return "auto+nio://localhost:61616";
}
}

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.auto.nio;
import org.apache.activemq.transport.nio.NIOJmsSendAndReceiveTest;
/**
*
*/
public class AutoNIOJmsSendAndReceiveTest extends NIOJmsSendAndReceiveTest {
@Override
protected String getBrokerURL() {
return "auto+nio://localhost:61616";
}
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.auto.nio;
import org.apache.activemq.transport.nio.NIOPersistentSendAndReceiveTest;
public class AutoNIOPersistentSendAndReceiveTest extends NIOPersistentSendAndReceiveTest {
@Override
protected String getBrokerURL() {
return "auto+nio://localhost:61616";
}
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.auto.nio;
import org.apache.activemq.transport.nio.NIOSSLBasicTest;
public class AutoNIOSSLBasicTest extends NIOSSLBasicTest {
@Override
protected String getTransportType() {
return "auto+nio+ssl";
}
}

View File

@ -44,10 +44,14 @@ public class FailoverClusterTest extends TestCase {
private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>();
protected String getBindAddress() {
return BROKER_BIND_ADDRESS;
}
public void testClusterConnectedAfterClients() throws Exception {
createClients();
if (brokerB == null) {
brokerB = createBrokerB(BROKER_BIND_ADDRESS);
brokerB = createBrokerB(getBindAddress());
}
Thread.sleep(3000);
Set<String> set = new HashSet<String>();
@ -61,7 +65,7 @@ public class FailoverClusterTest extends TestCase {
createClients();
if (brokerB == null) {
// add in server side only url param, should not be propagated
brokerB = createBrokerB(BROKER_BIND_ADDRESS + "?transport.closeAsync=false");
brokerB = createBrokerB(getBindAddress() + "?transport.closeAsync=false");
}
Thread.sleep(3000);
Set<String> set = new HashSet<String>();
@ -74,7 +78,7 @@ public class FailoverClusterTest extends TestCase {
public void testClusterConnectedBeforeClients() throws Exception {
if (brokerB == null) {
brokerB = createBrokerB(BROKER_BIND_ADDRESS);
brokerB = createBrokerB(getBindAddress());
}
Thread.sleep(5000);
createClients();
@ -92,7 +96,7 @@ public class FailoverClusterTest extends TestCase {
@Override
protected void setUp() throws Exception {
if (brokerA == null) {
brokerA = createBrokerA(BROKER_BIND_ADDRESS + "?transport.closeAsync=false");
brokerA = createBrokerA(getBindAddress() + "?transport.closeAsync=false");
clientUrl = "failover://(" + brokerA.getTransportConnectors().get(0).getPublishableConnectString() + ")";
}
}

View File

@ -24,23 +24,29 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
public class FailoverRandomTest extends TestCase {
BrokerService brokerA, brokerB;
@Override
public void setUp() throws Exception {
brokerA = createBroker("A");
brokerB = createBroker("B");
}
@Override
public void tearDown() throws Exception {
brokerA.stop();
brokerB.stop();
}
protected String getBrokerUrl() {
return "tcp://localhost:0";
}
private BrokerService createBroker(String name) throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName("Broker"+ name);
broker.addConnector("tcp://localhost:0");
broker.addConnector(getBrokerUrl());
broker.getManagementContext().setCreateConnector(false);
broker.setPersistent(false);
broker.setUseJmx(false);
@ -55,14 +61,14 @@ public class FailoverRandomTest extends TestCase {
+ brokerB.getTransportConnectors().get(0).getConnectUri()
+ ")";
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(failoverUrl);
ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
connection.start();
String brokerName1 = connection.getBrokerName();
assertNotNull(brokerName1);
connection.close();
String brokerName2 = brokerName1;
int attempts = 40;
while (brokerName1.equals(brokerName2) && attempts-- > 0) {

View File

@ -61,6 +61,10 @@ public class FailoverTimeoutTest {
}
}
protected String getTransportUri() {
return "tcp://localhost:0";
}
@Test
public void testTimoutDoesNotFailConnectionAttempts() throws Exception {
bs.stop();

View File

@ -78,21 +78,21 @@ public class NIOSSLBasicTest {
@Test
public void basicConnector() throws Exception {
BrokerService broker = createBroker("nio+ssl", "nio+ssl://localhost:0?transport.needClientAuth=true");
BrokerService broker = createBroker("nio+ssl", getTransportType() + "://localhost:0?transport.needClientAuth=true");
basicSendReceive("ssl://localhost:" + broker.getConnectorByName("nio+ssl").getConnectUri().getPort());
stopBroker(broker);
}
@Test
public void enabledCipherSuites() throws Exception {
BrokerService broker = createBroker("nio+ssl", "nio+ssl://localhost:0?transport.needClientAuth=true&transport.enabledCipherSuites=SSL_RSA_WITH_RC4_128_SHA,SSL_DH_anon_WITH_3DES_EDE_CBC_SHA");
BrokerService broker = createBroker("nio+ssl", getTransportType() + "://localhost:0?transport.needClientAuth=true&transport.enabledCipherSuites=SSL_RSA_WITH_RC4_128_SHA,SSL_DH_anon_WITH_3DES_EDE_CBC_SHA");
basicSendReceive("ssl://localhost:" + broker.getConnectorByName("nio+ssl").getConnectUri().getPort());
stopBroker(broker);
}
@Test
public void enabledProtocols() throws Exception {
BrokerService broker = createBroker("nio+ssl", "nio+ssl://localhost:61616?transport.needClientAuth=true&transport.enabledProtocols=TLSv1,TLSv1.1,TLSv1.2");
BrokerService broker = createBroker("nio+ssl", getTransportType() + "://localhost:61616?transport.needClientAuth=true&transport.enabledProtocols=TLSv1,TLSv1.1,TLSv1.2");
basicSendReceive("ssl://localhost:" + broker.getConnectorByName("nio+ssl").getConnectUri().getPort());
stopBroker(broker);
}
@ -112,4 +112,8 @@ public class NIOSSLBasicTest {
Message received = consumer.receive(2000);
TestCase.assertEquals(body, ((TextMessage)received).getText());
}
protected String getTransportType() {
return "nio+ssl";
}
}

View File

@ -29,6 +29,7 @@ public class NIOSSLTransportBrokerTest extends TransportBrokerTestSupport {
public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";
@Override
protected String getBindLocation() {
return "nio+ssl://localhost:0?transport.soWriteTimeout=20000";
}
@ -38,10 +39,11 @@ public class NIOSSLTransportBrokerTest extends TransportBrokerTestSupport {
return new URI("nio+ssl://localhost:0?soWriteTimeout=20000");
}
@Override
protected void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);

View File

@ -257,6 +257,27 @@
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -160,6 +160,27 @@
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -574,6 +574,27 @@
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>