-Cleaned up missing license headers and refactored packages.
-Added configuration options for a protocol detection timeout and
for the max number of connections accepted at the same time.
-Fixed a regression with connection counts
-Also added some more tests
This commit is contained in:
Christopher L. Shannon (cshannon) 2015-08-12 12:55:00 +00:00
parent e5a94bfee2
commit e14aca871c
26 changed files with 598 additions and 141 deletions

View File

@ -18,7 +18,6 @@ package org.apache.activemq.transport.amqp.auto;
import java.net.URI; import java.net.URI;
import org.apache.activemq.transport.amqp.JMSClientNioPlusSslTest;
import org.apache.activemq.transport.amqp.JMSClientSslTest; import org.apache.activemq.transport.amqp.JMSClientSslTest;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;

View File

@ -1,9 +0,0 @@
package org.apache.activemq.broker.transport.protocol;
public interface ProtocolVerifier {
public boolean isProtocol(byte[] value);
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.broker.transport.auto; package org.apache.activemq.transport.auto;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;

View File

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.broker.transport.auto; package org.apache.activemq.transport.auto;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.broker.transport.auto; package org.apache.activemq.transport.auto;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;
@ -23,27 +23,19 @@ import java.net.URISyntaxException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.net.ServerSocketFactory; import javax.net.ServerSocketFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.openwire.OpenWireFormatFactory; import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.tcp.TcpTransport; import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport; import org.apache.activemq.util.URISupport;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
/** /**
* *

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.broker.transport.auto; package org.apache.activemq.transport.auto;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -28,20 +28,27 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ServerSocketFactory; import javax.net.ServerSocketFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.transport.protocol.AmqpProtocolVerifier;
import org.apache.activemq.broker.transport.protocol.MqttProtocolVerifier;
import org.apache.activemq.broker.transport.protocol.OpenWireProtocolVerifier;
import org.apache.activemq.broker.transport.protocol.ProtocolVerifier;
import org.apache.activemq.broker.transport.protocol.StompProtocolVerifier;
import org.apache.activemq.openwire.OpenWireFormatFactory; import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.protocol.AmqpProtocolVerifier;
import org.apache.activemq.transport.protocol.MqttProtocolVerifier;
import org.apache.activemq.transport.protocol.OpenWireProtocolVerifier;
import org.apache.activemq.transport.protocol.ProtocolVerifier;
import org.apache.activemq.transport.protocol.StompProtocolVerifier;
import org.apache.activemq.transport.tcp.TcpTransport; import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer; import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportFactory; import org.apache.activemq.transport.tcp.TcpTransportFactory;
@ -49,9 +56,9 @@ import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.FactoryFinder; import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory; import org.apache.activemq.wireformat.WireFormatFactory;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -69,6 +76,9 @@ public class AutoTcpTransportServer extends TcpTransportServer {
protected BrokerService brokerService; protected BrokerService brokerService;
protected int maxConnectionThreadPoolSize = Integer.MAX_VALUE;
protected int protocolDetectionTimeOut = 30000;
private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/"); private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
private final ConcurrentMap<String, TransportFactory> transportFactories = new ConcurrentHashMap<String, TransportFactory>(); private final ConcurrentMap<String, TransportFactory> transportFactories = new ConcurrentHashMap<String, TransportFactory>();
@ -104,8 +114,9 @@ public class AutoTcpTransportServer extends TcpTransportServer {
// Try to load if from a META-INF property. // Try to load if from a META-INF property.
try { try {
tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme); tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
if (options != null) if (options != null) {
IntrospectionSupport.setProperties(tf, options); IntrospectionSupport.setProperties(tf, options);
}
transportFactories.put(scheme, tf); transportFactories.put(scheme, tf);
} catch (Throwable e) { } catch (Throwable e) {
throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e); throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
@ -136,12 +147,35 @@ public class AutoTcpTransportServer extends TcpTransportServer {
Set<String> enabledProtocols) Set<String> enabledProtocols)
throws IOException, URISyntaxException { throws IOException, URISyntaxException {
super(transportFactory, location, serverSocketFactory); super(transportFactory, location, serverSocketFactory);
service = Executors.newCachedThreadPool();
//Use an executor service here to handle new connections. Setting the max number
//of threads to the maximum number of connections the thread count isn't unbounded
service = new ThreadPoolExecutor(maxConnectionThreadPoolSize,
maxConnectionThreadPoolSize,
30L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
//allow the thread pool to shrink if the max number of threads isn't needed
service.allowCoreThreadTimeOut(true);
this.brokerService = brokerService; this.brokerService = brokerService;
this.enabledProtocols = enabledProtocols; this.enabledProtocols = enabledProtocols;
initProtocolVerifiers(); initProtocolVerifiers();
} }
public int getMaxConnectionThreadPoolSize() {
return maxConnectionThreadPoolSize;
}
public void setMaxConnectionThreadPoolSize(int maxConnectionThreadPoolSize) {
this.maxConnectionThreadPoolSize = maxConnectionThreadPoolSize;
service.setCorePoolSize(maxConnectionThreadPoolSize);
service.setMaximumPoolSize(maxConnectionThreadPoolSize);
}
public void setProtocolDetectionTimeOut(int protocolDetectionTimeOut) {
this.protocolDetectionTimeOut = protocolDetectionTimeOut;
}
@Override @Override
public void setWireFormatFactory(WireFormatFactory factory) { public void setWireFormatFactory(WireFormatFactory factory) {
super.setWireFormatFactory(factory); super.setWireFormatFactory(factory);
@ -179,7 +213,7 @@ public class AutoTcpTransportServer extends TcpTransportServer {
} }
protected final ExecutorService service; protected final ThreadPoolExecutor service;
/** /**
@ -190,7 +224,6 @@ public class AutoTcpTransportServer extends TcpTransportServer {
@Override @Override
protected void handleSocket(final Socket socket) { protected void handleSocket(final Socket socket) {
final AutoTcpTransportServer server = this; final AutoTcpTransportServer server = this;
//This needs to be done in a new thread because //This needs to be done in a new thread because
//the socket might be waiting on the client to send bytes //the socket might be waiting on the client to send bytes
//doHandleSocket can't complete until the protocol can be detected //doHandleSocket can't complete until the protocol can be detected
@ -204,16 +237,47 @@ public class AutoTcpTransportServer extends TcpTransportServer {
@Override @Override
protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception { protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
InputStream is = socket.getInputStream(); final InputStream is = socket.getInputStream();
ExecutorService executor = Executors.newSingleThreadExecutor();
final AtomicInteger readBytes = new AtomicInteger(0);
final ByteBuffer data = ByteBuffer.allocate(8);
// We need to peak at the first 8 bytes of the buffer to detect the protocol // We need to peak at the first 8 bytes of the buffer to detect the protocol
Buffer magic = new Buffer(8); Future<?> future = executor.submit(new Runnable() {
magic.readFrom(is); @Override
public void run() {
try {
do {
int read = is.read();
if (read == -1) {
throw new IOException("Connection faild, stream is closed.");
}
data.put((byte) read);
readBytes.incrementAndGet();
} while (readBytes.get() < 8);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
});
ProtocolInfo protocolInfo = detectProtocol(magic.getData()); try {
//Wait for protocolDetectionTimeOut if defined
if (protocolDetectionTimeOut > 0) {
future.get(protocolDetectionTimeOut, TimeUnit.MILLISECONDS);
} else {
future.get();
}
data.flip();
} catch (TimeoutException e) {
throw new InactivityIOException("Client timed out before wire format could be detected. " +
" 8 bytes are required to detect the protocol but only: " + readBytes + " were sent.");
}
initBuffer = new InitBuffer(8, ByteBuffer.allocate(8)); ProtocolInfo protocolInfo = detectProtocol(data.array());
initBuffer.buffer.put(magic.getData());
initBuffer = new InitBuffer(readBytes.get(), ByteBuffer.allocate(readBytes.get()));
initBuffer.buffer.put(data.array());
if (protocolInfo.detectedTransportFactory instanceof BrokerServiceAware) { if (protocolInfo.detectedTransportFactory instanceof BrokerServiceAware) {
((BrokerServiceAware) protocolInfo.detectedTransportFactory).setBrokerService(brokerService); ((BrokerServiceAware) protocolInfo.detectedTransportFactory).setBrokerService(brokerService);
@ -251,9 +315,17 @@ public class AutoTcpTransportServer extends TcpTransportServer {
public void setAutoTransportOptions(Map<String, Object> autoTransportOptions) { public void setAutoTransportOptions(Map<String, Object> autoTransportOptions) {
this.autoTransportOptions = autoTransportOptions; this.autoTransportOptions = autoTransportOptions;
if (autoTransportOptions.get("protocols") != null) if (autoTransportOptions.get("protocols") != null) {
this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoTransportOptions.get("protocols")); this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoTransportOptions.get("protocols"));
} }
}
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
if (service != null) {
service.shutdown();
}
super.doStop(stopper);
}
protected ProtocolInfo detectProtocol(byte[] buffer) throws IOException { protected ProtocolInfo detectProtocol(byte[] buffer) throws IOException {
TcpTransportFactory detectedTransportFactory = transportFactory; TcpTransportFactory detectedTransportFactory = transportFactory;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.broker.transport.auto; package org.apache.activemq.transport.auto;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;

View File

@ -1,4 +1,4 @@
package org.apache.activemq.broker.transport.auto.nio; package org.apache.activemq.transport.auto.nio;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;
@ -6,6 +6,12 @@ import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.net.ServerSocketFactory; import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
@ -13,8 +19,9 @@ import javax.net.ssl.SSLEngine;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.transport.auto.AutoTcpTransportServer; import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.auto.AutoTcpTransportServer;
import org.apache.activemq.transport.nio.AutoInitNioSSLTransport; import org.apache.activemq.transport.nio.AutoInitNioSSLTransport;
import org.apache.activemq.transport.nio.NIOSSLTransport; import org.apache.activemq.transport.nio.NIOSSLTransport;
import org.apache.activemq.transport.tcp.TcpTransport; import org.apache.activemq.transport.tcp.TcpTransport;
@ -25,6 +32,22 @@ import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class AutoNIOSSLTransportServer extends AutoTcpTransportServer { public class AutoNIOSSLTransportServer extends AutoTcpTransportServer {
private static final Logger LOG = LoggerFactory.getLogger(AutoNIOSSLTransportServer.class); private static final Logger LOG = LoggerFactory.getLogger(AutoNIOSSLTransportServer.class);
@ -84,27 +107,41 @@ public class AutoNIOSSLTransportServer extends AutoTcpTransportServer {
@Override @Override
protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception { protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
//The SSLEngine needs to be initialized and handshake done to get the first command and detect the format //The SSLEngine needs to be initialized and handshake done to get the first command and detect the format
AutoInitNioSSLTransport in = new AutoInitNioSSLTransport(wireFormatFactory.createWireFormat(), socket); final AutoInitNioSSLTransport in = new AutoInitNioSSLTransport(wireFormatFactory.createWireFormat(), socket);
if (context != null) { if (context != null) {
in.setSslContext(context); in.setSslContext(context);
} }
in.start(); in.start();
SSLEngine engine = in.getSslSession(); SSLEngine engine = in.getSslSession();
Future<Integer> future = executor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
//Wait for handshake to finish initializing //Wait for handshake to finish initializing
byte[] read = null;
do { do {
in.serviceRead(); in.serviceRead();
} while((read = in.read) == null); } while(in.readSize < 8);
return in.readSize;
}
});
try {
future.get(protocolDetectionTimeOut, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new InactivityIOException("Client timed out before wire format could be detected. " +
" 8 bytes are required to detect the protocol but only: " + in.readSize + " were sent.");
}
in.stop(); in.stop();
initBuffer = new InitBuffer(in.readSize, ByteBuffer.allocate(read.length)); initBuffer = new InitBuffer(in.readSize, ByteBuffer.allocate(in.read.length));
initBuffer.buffer.put(read); initBuffer.buffer.put(in.read);
ProtocolInfo protocolInfo = detectProtocol(read); ProtocolInfo protocolInfo = detectProtocol(in.read);
if (protocolInfo.detectedTransportFactory instanceof BrokerServiceAware) { if (protocolInfo.detectedTransportFactory instanceof BrokerServiceAware) {
((BrokerServiceAware) protocolInfo.detectedTransportFactory).setBrokerService(brokerService); ((BrokerServiceAware) protocolInfo.detectedTransportFactory).setBrokerService(brokerService);

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.broker.transport.auto.nio; package org.apache.activemq.transport.auto.nio;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;
@ -58,8 +58,8 @@ public class AutoNIOTransport extends NIOTransport {
protected int readFromBuffer() throws IOException { protected int readFromBuffer() throws IOException {
int readSize = 0; int readSize = 0;
if (!doneInitBuffer) { if (!doneInitBuffer) {
if (initBuffer == null) { if (initBuffer == null || initBuffer.readSize < 8) {
throw new IOException("Null initBuffer"); throw new IOException("Protocol type could not be determined.");
} }
if (nextFrameSize == -1) { if (nextFrameSize == -1) {
readSize = 4; readSize = 4;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.broker.transport.auto.nio; package org.apache.activemq.transport.auto.nio;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;
@ -31,11 +31,11 @@ import javax.net.ssl.SSLEngine;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.SslContext; import org.apache.activemq.broker.SslContext;
import org.apache.activemq.broker.transport.auto.AutoTcpTransportServer;
import org.apache.activemq.broker.transport.auto.AutoTransportUtils;
import org.apache.activemq.openwire.OpenWireFormatFactory; import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.auto.AutoTcpTransportServer;
import org.apache.activemq.transport.auto.AutoTransportUtils;
import org.apache.activemq.transport.nio.NIOSSLTransport; import org.apache.activemq.transport.nio.NIOSSLTransport;
import org.apache.activemq.transport.nio.NIOSSLTransportFactory; import org.apache.activemq.transport.nio.NIOSSLTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer; import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.broker.transport.auto.nio; package org.apache.activemq.transport.auto.nio;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;
@ -28,11 +28,11 @@ import javax.net.ServerSocketFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.transport.auto.AutoTcpTransportServer;
import org.apache.activemq.broker.transport.auto.AutoTransportUtils;
import org.apache.activemq.openwire.OpenWireFormatFactory; import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.auto.AutoTcpTransportServer;
import org.apache.activemq.transport.auto.AutoTransportUtils;
import org.apache.activemq.transport.nio.NIOTransport; import org.apache.activemq.transport.nio.NIOTransport;
import org.apache.activemq.transport.nio.NIOTransportFactory; import org.apache.activemq.transport.nio.NIOTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransport; import org.apache.activemq.transport.tcp.TcpTransport;

View File

@ -172,17 +172,11 @@ public class AutoInitNioSSLTransport extends NIOSSLTransport {
while (true) { while (true) {
if (!plain.hasRemaining()) { if (!plain.hasRemaining()) {
int readCount = secureRead(plain); int readCount = secureRead(plain);
if (readCount == 0) {
break;
}
// channel is closed, cleanup // channel is closed, cleanup
if (readCount == -1) { if (readCount == -1) {
onException(new EOFException()); onException(new EOFException());
selection.close();
break; break;
} }
@ -191,10 +185,13 @@ public class AutoInitNioSSLTransport extends NIOSSLTransport {
if (status == SSLEngineResult.Status.OK && handshakeStatus != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) { if (status == SSLEngineResult.Status.OK && handshakeStatus != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {
processCommand(plain); processCommand(plain);
//Break when command is found //we have received enough bytes to detect the protocol
if (receiveCounter >= 8) {
readSize = receiveCounter;
break; break;
} }
} }
}
} catch (IOException e) { } catch (IOException e) {
onException(e); onException(e);
} catch (Throwable e) { } catch (Throwable e) {
@ -204,8 +201,13 @@ public class AutoInitNioSSLTransport extends NIOSSLTransport {
@Override @Override
protected void processCommand(ByteBuffer plain) throws Exception { protected void processCommand(ByteBuffer plain) throws Exception {
read = plain.array(); ByteBuffer newBuffer = ByteBuffer.allocate(receiveCounter);
readSize = receiveCounter; if (read != null) {
newBuffer.put(read);
}
newBuffer.put(plain);
newBuffer.flip();
read = newBuffer.array();
} }
@ -214,7 +216,6 @@ public class AutoInitNioSSLTransport extends NIOSSLTransport {
taskRunnerFactory = new TaskRunnerFactory("ActiveMQ NIOSSLTransport Task"); taskRunnerFactory = new TaskRunnerFactory("ActiveMQ NIOSSLTransport Task");
// no need to init as we can delay that until demand (eg in doHandshake) // no need to init as we can delay that until demand (eg in doHandshake)
connect(); connect();
//super.doStart();
} }
@ -224,10 +225,6 @@ public class AutoInitNioSSLTransport extends NIOSSLTransport {
taskRunnerFactory.shutdownNow(); taskRunnerFactory.shutdownNow();
taskRunnerFactory = null; taskRunnerFactory = null;
} }
// if (selection != null) {
// selection.close();
// selection = null;
// }
} }

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.broker.transport.protocol; package org.apache.activemq.transport.protocol;
/** /**
@ -28,9 +28,10 @@ public class AmqpProtocolVerifier implements ProtocolVerifier {
@Override @Override
public boolean isProtocol(byte[] value) { public boolean isProtocol(byte[] value) {
for (int i = 0; i < PREFIX.length; i++) { for (int i = 0; i < PREFIX.length; i++) {
if (value[i] != PREFIX[i]) if (value[i] != PREFIX[i]) {
return false; return false;
} }
}
return true; return true;
} }
} }

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.broker.transport.protocol; package org.apache.activemq.transport.protocol;
/** /**
* *

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.broker.transport.protocol; package org.apache.activemq.transport.protocol;
import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.protocol;
public interface ProtocolVerifier {
public boolean isProtocol(byte[] value);
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.broker.transport.protocol; package org.apache.activemq.transport.protocol;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;

View File

@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and ## See the License for the specific language governing permissions and
## limitations under the License. ## limitations under the License.
## --------------------------------------------------------------------------- ## ---------------------------------------------------------------------------
class=org.apache.activemq.broker.transport.auto.AutoTcpTransportFactory class=org.apache.activemq.transport.auto.AutoTcpTransportFactory

View File

@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and ## See the License for the specific language governing permissions and
## limitations under the License. ## limitations under the License.
## --------------------------------------------------------------------------- ## ---------------------------------------------------------------------------
class=org.apache.activemq.broker.transport.auto.nio.AutoNioTransportFactory class=org.apache.activemq.transport.auto.nio.AutoNioTransportFactory

View File

@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and ## See the License for the specific language governing permissions and
## limitations under the License. ## limitations under the License.
## --------------------------------------------------------------------------- ## ---------------------------------------------------------------------------
class=org.apache.activemq.broker.transport.auto.nio.AutoNioSslTransportFactory class=org.apache.activemq.transport.auto.nio.AutoNioSslTransportFactory

View File

@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and ## See the License for the specific language governing permissions and
## limitations under the License. ## limitations under the License.
## --------------------------------------------------------------------------- ## ---------------------------------------------------------------------------
class=org.apache.activemq.broker.transport.auto.AutoSslTransportFactory class=org.apache.activemq.transport.auto.AutoSslTransportFactory

View File

@ -73,8 +73,9 @@ public class NIOSSLTransport extends NIOTransport {
ByteBuffer inputBuffer) throws IOException { ByteBuffer inputBuffer) throws IOException {
super(wireFormat, socket, initBuffer); super(wireFormat, socket, initBuffer);
this.sslEngine = engine; this.sslEngine = engine;
if (engine != null) if (engine != null) {
this.sslSession = engine.getSession(); this.sslSession = engine.getSession();
}
this.inputBuffer = inputBuffer; this.inputBuffer = inputBuffer;
} }
@ -146,11 +147,13 @@ public class NIOSSLTransport extends NIOTransport {
this.buffOut = outputStream; this.buffOut = outputStream;
//If the sslEngine was not passed in, then handshake //If the sslEngine was not passed in, then handshake
if (!hasSslEngine) if (!hasSslEngine) {
sslEngine.beginHandshake(); sslEngine.beginHandshake();
}
handshakeStatus = sslEngine.getHandshakeStatus(); handshakeStatus = sslEngine.getHandshakeStatus();
if (!hasSslEngine) if (!hasSslEngine) {
doHandshake(); doHandshake();
}
// if (hasSslEngine) { // if (hasSslEngine) {
selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() { selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
@ -328,9 +331,9 @@ public class NIOSSLTransport extends NIOTransport {
currentBuffer.putInt(nextFrameSize); currentBuffer.putInt(nextFrameSize);
} else { } else {
// If its all in one read then we can just take it all, otherwise take only // If its all in one read then we can just take it all, otherwise take only
// the current frame size and the next iteration starts a new command. // the current frame size and the next iteration starts a new command.
if (currentBuffer != null) {
if (currentBuffer.remaining() >= plain.remaining()) { if (currentBuffer.remaining() >= plain.remaining()) {
currentBuffer.put(plain); currentBuffer.put(plain);
} else { } else {
@ -351,6 +354,7 @@ public class NIOSSLTransport extends NIOTransport {
} }
} }
} }
}
protected int secureRead(ByteBuffer plain) throws Exception { protected int secureRead(ByteBuffer plain) throws Exception {

View File

@ -481,14 +481,24 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
final protected void doHandleSocket(Socket socket) { final protected void doHandleSocket(Socket socket) {
boolean closeSocket = true; boolean closeSocket = true;
boolean countIncremented = false;
try { try {
if (this.currentTransportCount.get() >= this.maximumConnections) { int currentCount;
do {
currentCount = currentTransportCount.get();
if (currentCount >= this.maximumConnections) {
throw new ExceededMaximumConnectionsException( throw new ExceededMaximumConnectionsException(
"Exceeded the maximum number of allowed client connections. See the '" + "Exceeded the maximum number of allowed client connections. See the '" +
"maximumConnections' property on the TCP transport configuration URI " + "maximumConnections' property on the TCP transport configuration URI " +
"in the ActiveMQ configuration file (e.g., activemq.xml)"); "in the ActiveMQ configuration file (e.g., activemq.xml)");
} else { }
currentTransportCount.incrementAndGet();
//Increment this value before configuring the transport
//This is necessary because some of the transport servers must read from the
//socket during configureTransport() so we want to make sure this value is
//accurate as the transport server could pause here waiting for data to be sent from a client
} while(!currentTransportCount.compareAndSet(currentCount, currentCount + 1));
countIncremented = true;
HashMap<String, Object> options = new HashMap<String, Object>(); HashMap<String, Object> options = new HashMap<String, Object>();
options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration)); options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
@ -514,14 +524,17 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
transportInfo.transport, transportInfo.format, options); transportInfo.transport, transportInfo.format, options);
getAcceptListener().onAccept(configuredTransport); getAcceptListener().onAccept(configuredTransport);
}
} catch (SocketTimeoutException ste) { } catch (SocketTimeoutException ste) {
// expect this to happen // expect this to happen
currentTransportCount.decrementAndGet();
} catch (Exception e) { } catch (Exception e) {
currentTransportCount.decrementAndGet();
if (closeSocket) { if (closeSocket) {
try { try {
//if closing the socket, only decrement the count it was actually incremented
//where it was incremented
if (countIncremented) {
currentTransportCount.decrementAndGet();
}
socket.close(); socket.close();
} catch (Exception ignore) { } catch (Exception ignore) {
} }

View File

@ -0,0 +1,180 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp.auto;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.Socket;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.transport.auto.AutoTcpTransportServer;
import org.apache.activemq.transport.stomp.StompTestSupport;
import org.apache.activemq.util.Wait;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test that connection attempts that don't send the connect get cleaned by
* by the protocolDetectionTimeOut property
*/
@RunWith(Parameterized.class)
public class AutoStompConnectTimeoutTest extends StompTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(AutoStompConnectTimeoutTest.class);
private Socket connection;
protected String connectorScheme;
@Parameters(name="{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{"auto"},
{"auto+ssl"},
{"auto+nio"},
{"auto+nio+ssl"}
});
}
public AutoStompConnectTimeoutTest(String connectorScheme) {
this.connectorScheme = connectorScheme;
}
protected String getConnectorScheme() {
return connectorScheme;
}
@Override
public void tearDown() throws Exception {
if (connection != null) {
try {
connection.close();
} catch (Throwable e) {}
connection = null;
}
super.tearDown();
}
@Override
public String getAdditionalConfig() {
return "?protocolDetectionTimeOut=1500";
}
@Test(timeout = 15000)
public void testInactivityMonitor() throws Exception {
Thread t1 = new Thread() {
@Override
public void run() {
try {
connection = createSocket();
connection.getOutputStream().write('C');
connection.getOutputStream().flush();
} catch (Exception ex) {
LOG.error("unexpected exception on connect/disconnect", ex);
exceptions.add(ex);
}
}
};
t1.start();
assertTrue("one connection", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
AutoTcpTransportServer server = (AutoTcpTransportServer) brokerService.getTransportConnectorByScheme(getConnectorScheme()).getServer();
return 1 == server.getCurrentTransportCount().get();
}
}, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(250)));
// and it should be closed due to inactivity
assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
AutoTcpTransportServer server = (AutoTcpTransportServer) brokerService.getTransportConnectorByScheme(getConnectorScheme()).getServer();
return 0 == server.getCurrentTransportCount().get();
}
}, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(500)));
assertTrue("no exceptions", exceptions.isEmpty());
}
@Override
protected boolean isUseTcpConnector() {
return false;
}
@Override
protected boolean isUseAutoConnector() {
return connectorScheme.equalsIgnoreCase("auto");
}
@Override
protected boolean isUseAutoSslConnector() {
return connectorScheme.equalsIgnoreCase("auto+ssl");
}
@Override
protected boolean isUseAutoNioConnector() {
return connectorScheme.equalsIgnoreCase("auto+nio");
}
@Override
protected boolean isUseAutoNioPlusSslConnector() {
return connectorScheme.equalsIgnoreCase("auto+nio+ssl");
}
@Override
protected Socket createSocket() throws IOException {
boolean useSSL = false;
int port = 0;
switch (connectorScheme) {
case "auto":
port = this.autoPort;
break;
case "auto+ssl":
useSSL = true;
port = this.autoSslPort;
break;
case "auto+nio":
port = this.autoNioPort;
break;
case "auto+nio+ssl":
useSSL = true;
port = this.autoNioSslPort;
break;
default:
throw new IOException("Invalid STOMP connector scheme passed to test.");
}
if (useSSL) {
return SSLSocketFactory.getDefault().createSocket("localhost", port);
} else {
return new Socket("localhost", port);
}
}
}

View File

@ -49,7 +49,6 @@ public class AutoNIOSslTransportBrokerTest extends TransportBrokerTestSupport {
System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE); System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD); System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE); System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
//System.setProperty("javax.net.debug", "ssl,handshake,data,trustmanager");
maxWait = 10000; maxWait = 10000;
super.setUp(); super.setUp();

View File

@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.auto;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.springframework.jms.support.JmsUtils;
@RunWith(Parameterized.class)
public class AutoTransportMaxConnectionsTest {
public static final String KEYSTORE_TYPE = "jks";
public static final String PASSWORD = "password";
public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";
private static final int maxConnections = 20;
private final ExecutorService executor = Executors.newCachedThreadPool();
private String connectionUri;
private BrokerService service;
private TransportConnector connector;
private final String transportType;
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{"auto"},
{"auto+nio"},
{"auto+ssl"},
{"auto+nio+ssl"},
});
}
public AutoTransportMaxConnectionsTest(String transportType) {
super();
this.transportType = transportType;
}
@Before
public void setUp() throws Exception {
System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
service = new BrokerService();
service.setPersistent(false);
service.setUseJmx(false);
connector = service.addConnector(transportType + "://0.0.0.0:0?maxConnectionThreadPoolSize=10&maximumConnections="+maxConnections);
connectionUri = connector.getPublishableConnectString();
service.start();
service.waitUntilStarted();
}
protected ConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(connectionUri);
}
@Test
public void testMaxConnectionControl() throws Exception {
final ConnectionFactory cf = createConnectionFactory();
final CountDownLatch startupLatch = new CountDownLatch(1);
for(int i = 0; i < maxConnections + 20; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
Connection conn = null;
try {
startupLatch.await();
conn = cf.createConnection();
conn.start();
} catch (Exception e) {
//JmsUtils.closeConnection(conn);
}
}
});
}
TcpTransportServer transportServer = (TcpTransportServer)connector.getServer();
// ensure the max connections is in effect
assertEquals(maxConnections, transportServer.getMaximumConnections());
// No connections at first
assertEquals(0, connector.getConnections().size());
// Release the latch to set up connections in parallel
startupLatch.countDown();
final TransportConnector connector = this.connector;
// Expect the max connections is created
assertTrue("Expected: " + maxConnections + " found: " + connector.getConnections().size(),
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return connector.getConnections().size() == maxConnections;
}
})
);
}
@After
public void tearDown() throws Exception {
executor.shutdown();
service.stop();
service.waitUntilStopped();
}
}