From 82a5839fc733c24cef6e2178b52d8fb649b69879 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 4 Nov 2015 12:47:57 -0500 Subject: [PATCH] NO-JIRA Update test client to have no real linkage to the activemq internals to make it easier to share the tests with Artemis. --- .../transport/amqp/client/AmqpClient.java | 5 +- .../transport/amqp/client/AmqpConnection.java | 38 +- .../transport/amqp/client/AmqpReceiver.java | 2 +- .../amqp/client/transport/NettyTransport.java | 389 ++++++++++++ .../transport/NettyTransportFactory.java | 68 ++ .../transport/NettyTransportListener.java | 48 ++ .../transport/NettyTransportOptions.java | 183 ++++++ .../transport/NettyTransportSslOptions.java | 287 +++++++++ .../transport/NettyTransportSupport.java | 299 +++++++++ .../PartialPooledByteBufAllocator.java | 135 ++++ .../client/transport/X509AliasKeyManager.java | 86 +++ .../amqp/client/util/ClientFuture.java | 2 - .../amqp/client/util/ClientTcpTransport.java | 389 ------------ .../amqp/client/util/IOExceptionSupport.java | 47 ++ .../amqp/client/util/IdGenerator.java | 272 ++++++++ .../amqp/client/util/PropertyUtil.java | 589 ++++++++++++++++++ .../client/util/StringArrayConverter.java | 64 ++ .../client/util/TypeConversionSupport.java | 209 +++++++ 18 files changed, 2706 insertions(+), 406 deletions(-) create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java delete mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/IOExceptionSupport.java create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/IdGenerator.java create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/PropertyUtil.java create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/StringArrayConverter.java create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/TypeConversionSupport.java diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java index 175a8de1a8..72d0a97b4d 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java @@ -21,7 +21,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.activemq.transport.amqp.client.util.ClientTcpTransport; +import org.apache.activemq.transport.amqp.client.transport.NettyTransport; +import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory; import org.apache.qpid.proton.amqp.Symbol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,7 +92,7 @@ public class AmqpClient { throw new IllegalArgumentException("Password must be null if user name value is null"); } - ClientTcpTransport transport = new ClientTcpTransport(remoteURI); + NettyTransport transport = NettyTransportFactory.createTransport(remoteURI); AmqpConnection connection = new AmqpConnection(transport, username, password); connection.setOfferedCapabilities(getOfferedCapabilities()); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index 000512c37e..c24a90429d 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -17,6 +17,9 @@ package org.apache.activemq.transport.amqp.client; import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCountUtil; import java.io.IOException; import java.net.URI; @@ -34,10 +37,11 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.transport.InactivityIOException; import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator; +import org.apache.activemq.transport.amqp.client.transport.NettyTransport; +import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener; import org.apache.activemq.transport.amqp.client.util.ClientFuture; -import org.apache.activemq.transport.amqp.client.util.ClientTcpTransport; +import org.apache.activemq.transport.amqp.client.util.IdGenerator; import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection; -import org.apache.activemq.util.IdGenerator; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; @@ -47,11 +51,10 @@ import org.apache.qpid.proton.engine.Event.Type; import org.apache.qpid.proton.engine.Sasl; import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.impl.CollectorImpl; -import org.fusesource.hawtbuf.Buffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AmqpConnection extends AmqpAbstractResource implements ClientTcpTransport.TransportListener { +public class AmqpConnection extends AmqpAbstractResource implements NettyTransportListener { private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class); @@ -69,7 +72,7 @@ public class AmqpConnection extends AmqpAbstractResource implements private final AtomicBoolean connected = new AtomicBoolean(); private final AtomicLong sessionIdGenerator = new AtomicLong(); private final Collector protonCollector = new CollectorImpl(); - private final ClientTcpTransport transport; + private final NettyTransport transport; private final Transport protonTransport = Transport.Factory.create(); private final String username; @@ -90,7 +93,7 @@ public class AmqpConnection extends AmqpAbstractResource implements private long connectTimeout = DEFAULT_CONNECT_TIMEOUT; private long closeTimeout = DEFAULT_CLOSE_TIMEOUT; - public AmqpConnection(ClientTcpTransport transport, String username, String password) { + public AmqpConnection(NettyTransport transport, String username, String password) { setEndpoint(Connection.Factory.create()); getEndpoint().collect(protonCollector); @@ -98,7 +101,7 @@ public class AmqpConnection extends AmqpAbstractResource implements this.username = username; this.password = password; this.connectionId = CONNECTION_ID_GENERATOR.generateId(); - this.remoteURI = transport.getRemoteURI(); + this.remoteURI = transport.getRemoteLocation(); this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @@ -257,7 +260,7 @@ public class AmqpConnection extends AmqpAbstractResource implements public void run() { checkClosed(); try { - transport.send(ByteBuffer.wrap(rawData)); + transport.send(Unpooled.wrappedBuffer(rawData)); } catch (IOException e) { fireClientException(e); } finally { @@ -409,7 +412,9 @@ public class AmqpConnection extends AmqpAbstractResource implements while (!done) { ByteBuffer toWrite = protonTransport.getOutputBuffer(); if (toWrite != null && toWrite.hasRemaining()) { - transport.send(toWrite); + ByteBuf outbound = transport.allocateSendBuffer(toWrite.remaining()); + outbound.writeBytes(toWrite); + transport.send(outbound); protonTransport.outputConsumed(); } else { done = true; @@ -423,12 +428,16 @@ public class AmqpConnection extends AmqpAbstractResource implements //----- Transport listener event hooks -----------------------------------// @Override - public void onData(final Buffer input) { + public void onData(final ByteBuf incoming) { + + // We need to retain until the serializer gets around to processing it. + ReferenceCountUtil.retain(incoming); + serializer.execute(new Runnable() { @Override public void run() { - ByteBuffer source = input.toByteBuffer(); + ByteBuffer source = incoming.nioBuffer(); LOG.trace("Received from Broker {} bytes:", source.remaining()); if (protonTransport.isClosed()) { @@ -446,6 +455,8 @@ public class AmqpConnection extends AmqpAbstractResource implements source.position(source.position() + limit); } while (source.hasRemaining()); + ReferenceCountUtil.release(incoming); + // Process the state changes from the latest data and then answer back // any pending updates to the Broker. processUpdates(); @@ -498,7 +509,10 @@ public class AmqpConnection extends AmqpAbstractResource implements } } } catch (Exception e) { - transport.close(); + try { + transport.close(); + } catch (IOException e1) { + } fireClientException(e); } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index a40b10c575..9d139e6866 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -32,8 +32,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.InvalidDestinationException; import org.apache.activemq.transport.amqp.client.util.ClientFuture; +import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; import org.apache.activemq.transport.amqp.client.util.UnmodifiableReceiver; -import org.apache.activemq.util.IOExceptionSupport; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java new file mode 100644 index 0000000000..81e3a1dd45 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java @@ -0,0 +1,389 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.transport; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; + +import java.io.IOException; +import java.net.URI; +import java.security.Principal; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TCP based transport that uses Netty as the underlying IO layer. + */ +public class NettyTransport { + + private static final Logger LOG = LoggerFactory.getLogger(NettyTransport.class); + + private static final int QUIET_PERIOD = 20; + private static final int SHUTDOWN_TIMEOUT = 100; + + protected Bootstrap bootstrap; + protected EventLoopGroup group; + protected Channel channel; + protected NettyTransportListener listener; + protected NettyTransportOptions options; + protected final URI remote; + protected boolean secure; + + private final AtomicBoolean connected = new AtomicBoolean(); + private final AtomicBoolean closed = new AtomicBoolean(); + private final CountDownLatch connectLatch = new CountDownLatch(1); + private IOException failureCause; + private Throwable pendingFailure; + + /** + * Create a new transport instance + * + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. + */ + public NettyTransport(URI remoteLocation, NettyTransportOptions options) { + this(null, remoteLocation, options); + } + + /** + * Create a new transport instance + * + * @param listener + * the TransportListener that will receive events from this Transport. + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. + */ + public NettyTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) { + this.options = options; + this.listener = listener; + this.remote = remoteLocation; + this.secure = remoteLocation.getScheme().equalsIgnoreCase("ssl"); + } + + public void connect() throws IOException { + + if (listener == null) { + throw new IllegalStateException("A transport listener must be set before connection attempts."); + } + + group = new NioEventLoopGroup(1); + + bootstrap = new Bootstrap(); + bootstrap.group(group); + bootstrap.channel(NioSocketChannel.class); + bootstrap.handler(new ChannelInitializer() { + + @Override + public void initChannel(Channel connectedChannel) throws Exception { + configureChannel(connectedChannel); + } + }); + + configureNetty(bootstrap, getTransportOptions()); + + ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort()); + future.addListener(new ChannelFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + handleConnected(future.channel()); + } else if (future.isCancelled()) { + connectionFailed(new IOException("Connection attempt was cancelled")); + } else { + connectionFailed(IOExceptionSupport.create(future.cause())); + } + } + }); + + try { + connectLatch.await(); + } catch (InterruptedException ex) { + LOG.debug("Transport connection was interrupted."); + Thread.interrupted(); + failureCause = IOExceptionSupport.create(ex); + } + + if (failureCause != null) { + // Close out any Netty resources now as they are no longer needed. + if (channel != null) { + channel.close(); + channel = null; + } + if (group != null) { + group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + group = null; + } + + throw failureCause; + } else { + // Connected, allow any held async error to fire now and close the transport. + channel.eventLoop().execute(new Runnable() { + + @Override + public void run() { + if (pendingFailure != null) { + channel.pipeline().fireExceptionCaught(pendingFailure); + } + } + }); + } + } + + public boolean isSSL() { + return secure; + } + + public boolean isConnected() { + return connected.get(); + } + + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + connected.set(false); + if (channel != null) { + channel.close().syncUninterruptibly(); + } + if (group != null) { + group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + } + } + } + + public ByteBuf allocateSendBuffer(int size) throws IOException { + checkConnected(); + return channel.alloc().ioBuffer(size, size); + } + + public void send(ByteBuf output) throws IOException { + checkConnected(); + int length = output.readableBytes(); + if (length == 0) { + return; + } + + LOG.trace("Attempted write of: {} bytes", length); + + channel.writeAndFlush(output); + } + + public NettyTransportListener getTransportListener() { + return listener; + } + + public void setTransportListener(NettyTransportListener listener) { + this.listener = listener; + } + + public NettyTransportOptions getTransportOptions() { + if (options == null) { + if (isSSL()) { + options = NettyTransportSslOptions.INSTANCE; + } else { + options = NettyTransportOptions.INSTANCE; + } + } + + return options; + } + + public URI getRemoteLocation() { + return remote; + } + + public Principal getLocalPrincipal() { + if (!isSSL()) { + throw new UnsupportedOperationException("Not connected to a secure channel"); + } + + SslHandler sslHandler = channel.pipeline().get(SslHandler.class); + + return sslHandler.engine().getSession().getLocalPrincipal(); + } + + //----- Internal implementation details, can be overridden as needed --// + + protected String getRemoteHost() { + return remote.getHost(); + } + + protected int getRemotePort() { + int port = remote.getPort(); + + if (port <= 0) { + if (isSSL()) { + port = getSslOptions().getDefaultSslPort(); + } else { + port = getTransportOptions().getDefaultTcpPort(); + } + } + + return port; + } + + protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) { + bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay()); + bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout()); + bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); + bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger()); + bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); + + if (options.getSendBufferSize() != -1) { + bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize()); + } + + if (options.getReceiveBufferSize() != -1) { + bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize()); + bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize())); + } + + if (options.getTrafficClass() != -1) { + bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass()); + } + } + + protected void configureChannel(Channel channel) throws Exception { + if (isSSL()) { + channel.pipeline().addLast(NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions())); + } + + channel.pipeline().addLast(new NettyTcpTransportHandler()); + } + + protected void handleConnected(final Channel channel) throws Exception { + if (isSSL()) { + SslHandler sslHandler = channel.pipeline().get(SslHandler.class); + + Future channelFuture = sslHandler.handshakeFuture(); + channelFuture.addListener(new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + LOG.trace("SSL Handshake has completed: {}", channel); + connectionEstablished(channel); + } else { + LOG.trace("SSL Handshake has failed: {}", channel); + connectionFailed(IOExceptionSupport.create(future.cause())); + } + } + }); + } else { + connectionEstablished(channel); + } + } + + //----- State change handlers and checks ---------------------------------// + + /** + * Called when the transport has successfully connected and is ready for use. + */ + protected void connectionEstablished(Channel connectedChannel) { + channel = connectedChannel; + connected.set(true); + connectLatch.countDown(); + } + + /** + * Called when the transport connection failed and an error should be returned. + * + * @param cause + * An IOException that describes the cause of the failed connection. + */ + protected void connectionFailed(IOException cause) { + failureCause = IOExceptionSupport.create(cause); + connected.set(false); + connectLatch.countDown(); + } + + private NettyTransportSslOptions getSslOptions() { + return (NettyTransportSslOptions) getTransportOptions(); + } + + private void checkConnected() throws IOException { + if (!connected.get()) { + throw new IOException("Cannot send to a non-connected transport."); + } + } + + //----- Handle connection events -----------------------------------------// + + private class NettyTcpTransportHandler extends SimpleChannelInboundHandler { + + @Override + public void channelActive(ChannelHandlerContext context) throws Exception { + LOG.trace("Channel has become active! Channel is {}", context.channel()); + } + + @Override + public void channelInactive(ChannelHandlerContext context) throws Exception { + LOG.trace("Channel has gone inactive! Channel is {}", context.channel()); + if (connected.compareAndSet(true, false) && !closed.get()) { + LOG.trace("Firing onTransportClosed listener"); + listener.onTransportClosed(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { + LOG.trace("Exception on channel! Channel is {}", context.channel()); + if (connected.compareAndSet(true, false) && !closed.get()) { + LOG.trace("Firing onTransportError listener"); + if (pendingFailure != null) { + listener.onTransportError(pendingFailure); + } else { + listener.onTransportError(cause); + } + } else { + // Hold the first failure for later dispatch if connect succeeds. + // This will then trigger disconnect using the first error reported. + if (pendingFailure != null) { + LOG.trace("Holding error until connect succeeds: {}", cause.getMessage()); + pendingFailure = cause; + } + } + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { + LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer); + listener.onData(buffer); + } + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java new file mode 100644 index 0000000000..a002eae147 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.transport; + +import java.net.URI; +import java.util.Map; + +import org.apache.qpid.jms.util.PropertyUtil; + +/** + * Factory for creating the Netty based TCP Transport. + */ +public final class NettyTransportFactory { + + private NettyTransportFactory() {} + + /** + * Creates an instance of the given Transport and configures it using the + * properties set on the given remote broker URI. + * + * @param remoteURI + * The URI used to connect to a remote Peer. + * + * @return a new Transport instance. + * + * @throws Exception if an error occurs while creating the Transport instance. + */ + public static NettyTransport createTransport(URI remoteURI) throws Exception { + Map map = PropertyUtil.parseQuery(remoteURI.getQuery()); + Map transportURIOptions = PropertyUtil.filterProperties(map, "transport."); + NettyTransportOptions transportOptions = null; + + remoteURI = PropertyUtil.replaceQuery(remoteURI, map); + + if (!remoteURI.getScheme().equalsIgnoreCase("ssl")) { + transportOptions = NettyTransportOptions.INSTANCE.clone(); + } else { + transportOptions = NettyTransportSslOptions.INSTANCE.clone(); + } + + Map unused = PropertyUtil.setProperties(transportOptions, transportURIOptions); + if (!unused.isEmpty()) { + String msg = " Not all transport options could be set on the TCP based" + + " Transport. Check the options are spelled correctly." + + " Unused parameters=[" + unused + "]." + + " This provider instance cannot be started."; + throw new IllegalArgumentException(msg); + } + + NettyTransport result = new NettyTransport(remoteURI, transportOptions); + + return result; + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java new file mode 100644 index 0000000000..09959a32f8 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.transport; + +import io.netty.buffer.ByteBuf; + +/** + * Listener interface that should be implemented by users of the various + * QpidJMS Transport classes. + */ +public interface NettyTransportListener { + + /** + * Called when new incoming data has become available. + * + * @param incoming + * the next incoming packet of data. + */ + void onData(ByteBuf incoming); + + /** + * Called if the connection state becomes closed. + */ + void onTransportClosed(); + + /** + * Called when an error occurs during normal Transport operations. + * + * @param cause + * the error that triggered this event. + */ + void onTransportError(Throwable cause); + +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java new file mode 100644 index 0000000000..745a40d208 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.transport; + +/** + * Encapsulates all the TCP Transport options in one configuration object. + */ +public class NettyTransportOptions { + + public static final int DEFAULT_SEND_BUFFER_SIZE = 64 * 1024; + public static final int DEFAULT_RECEIVE_BUFFER_SIZE = DEFAULT_SEND_BUFFER_SIZE; + public static final int DEFAULT_TRAFFIC_CLASS = 0; + public static final boolean DEFAULT_TCP_NO_DELAY = true; + public static final boolean DEFAULT_TCP_KEEP_ALIVE = false; + public static final int DEFAULT_SO_LINGER = Integer.MIN_VALUE; + public static final int DEFAULT_SO_TIMEOUT = -1; + public static final int DEFAULT_CONNECT_TIMEOUT = 60000; + public static final int DEFAULT_TCP_PORT = 5672; + + public static final NettyTransportOptions INSTANCE = new NettyTransportOptions(); + + private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE; + private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE; + private int trafficClass = DEFAULT_TRAFFIC_CLASS; + private int connectTimeout = DEFAULT_CONNECT_TIMEOUT; + private int soTimeout = DEFAULT_SO_TIMEOUT; + private int soLinger = DEFAULT_SO_LINGER; + private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE; + private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY; + private int defaultTcpPort = DEFAULT_TCP_PORT; + + /** + * @return the currently set send buffer size in bytes. + */ + public int getSendBufferSize() { + return sendBufferSize; + } + + /** + * Sets the send buffer size in bytes, the value must be greater than zero + * or an {@link IllegalArgumentException} will be thrown. + * + * @param sendBufferSize + * the new send buffer size for the TCP Transport. + * + * @throws IllegalArgumentException if the value given is not in the valid range. + */ + public void setSendBufferSize(int sendBufferSize) { + if (sendBufferSize <= 0) { + throw new IllegalArgumentException("The send buffer size must be > 0"); + } + + this.sendBufferSize = sendBufferSize; + } + + /** + * @return the currently configured receive buffer size in bytes. + */ + public int getReceiveBufferSize() { + return receiveBufferSize; + } + + /** + * Sets the receive buffer size in bytes, the value must be greater than zero + * or an {@link IllegalArgumentException} will be thrown. + * + * @param receiveBufferSize + * the new receive buffer size for the TCP Transport. + * + * @throws IllegalArgumentException if the value given is not in the valid range. + */ + public void setReceiveBufferSize(int receiveBufferSize) { + if (receiveBufferSize <= 0) { + throw new IllegalArgumentException("The send buffer size must be > 0"); + } + + this.receiveBufferSize = receiveBufferSize; + } + + /** + * @return the currently configured traffic class value. + */ + public int getTrafficClass() { + return trafficClass; + } + + /** + * Sets the traffic class value used by the TCP connection, valid + * range is between 0 and 255. + * + * @param trafficClass + * the new traffic class value. + * + * @throws IllegalArgumentException if the value given is not in the valid range. + */ + public void setTrafficClass(int trafficClass) { + if (trafficClass < 0 || trafficClass > 255) { + throw new IllegalArgumentException("Traffic class must be in the range [0..255]"); + } + + this.trafficClass = trafficClass; + } + + public int getSoTimeout() { + return soTimeout; + } + + public void setSoTimeout(int soTimeout) { + this.soTimeout = soTimeout; + } + + public boolean isTcpNoDelay() { + return tcpNoDelay; + } + + public void setTcpNoDelay(boolean tcpNoDelay) { + this.tcpNoDelay = tcpNoDelay; + } + + public int getSoLinger() { + return soLinger; + } + + public void setSoLinger(int soLinger) { + this.soLinger = soLinger; + } + + public boolean isTcpKeepAlive() { + return tcpKeepAlive; + } + + public void setTcpKeepAlive(boolean keepAlive) { + this.tcpKeepAlive = keepAlive; + } + + public int getConnectTimeout() { + return connectTimeout; + } + + public void setConnectTimeout(int connectTimeout) { + this.connectTimeout = connectTimeout; + } + + public int getDefaultTcpPort() { + return defaultTcpPort; + } + + public void setDefaultTcpPort(int defaultTcpPort) { + this.defaultTcpPort = defaultTcpPort; + } + + @Override + public NettyTransportOptions clone() { + return copyOptions(new NettyTransportOptions()); + } + + protected NettyTransportOptions copyOptions(NettyTransportOptions copy) { + copy.setConnectTimeout(getConnectTimeout()); + copy.setReceiveBufferSize(getReceiveBufferSize()); + copy.setSendBufferSize(getSendBufferSize()); + copy.setSoLinger(getSoLinger()); + copy.setSoTimeout(getSoTimeout()); + copy.setTcpKeepAlive(isTcpKeepAlive()); + copy.setTcpNoDelay(isTcpNoDelay()); + copy.setTrafficClass(getTrafficClass()); + + return copy; + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java new file mode 100644 index 0000000000..92ffd3c42a --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java @@ -0,0 +1,287 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.transport; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * Holds the defined SSL options for connections that operate over a secure + * transport. Options are read from the environment and can be overridden by + * specifying them on the connection URI. + */ +public class NettyTransportSslOptions extends NettyTransportOptions { + + public static final String DEFAULT_STORE_TYPE = "jks"; + public static final String DEFAULT_CONTEXT_PROTOCOL = "TLS"; + public static final boolean DEFAULT_TRUST_ALL = false; + public static final boolean DEFAULT_VERIFY_HOST = true; + public static final List DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[]{"SSLv2Hello", "SSLv3"})); + public static final int DEFAULT_SSL_PORT = 5671; + + public static final NettyTransportSslOptions INSTANCE = new NettyTransportSslOptions(); + + private String keyStoreLocation; + private String keyStorePassword; + private String trustStoreLocation; + private String trustStorePassword; + private String storeType = DEFAULT_STORE_TYPE; + private String[] enabledCipherSuites; + private String[] disabledCipherSuites; + private String[] enabledProtocols; + private String[] disabledProtocols = DEFAULT_DISABLED_PROTOCOLS.toArray(new String[0]); + private String contextProtocol = DEFAULT_CONTEXT_PROTOCOL; + + private boolean trustAll = DEFAULT_TRUST_ALL; + private boolean verifyHost = DEFAULT_VERIFY_HOST; + private String keyAlias; + private int defaultSslPort = DEFAULT_SSL_PORT; + + static { + INSTANCE.setKeyStoreLocation(System.getProperty("javax.net.ssl.keyStore")); + INSTANCE.setKeyStorePassword(System.getProperty("javax.net.ssl.keyStorePassword")); + INSTANCE.setTrustStoreLocation(System.getProperty("javax.net.ssl.trustStore")); + INSTANCE.setTrustStorePassword(System.getProperty("javax.net.ssl.keyStorePassword")); + } + + /** + * @return the keyStoreLocation currently configured. + */ + public String getKeyStoreLocation() { + return keyStoreLocation; + } + + /** + * Sets the location on disk of the key store to use. + * + * @param keyStoreLocation + * the keyStoreLocation to use to create the key manager. + */ + public void setKeyStoreLocation(String keyStoreLocation) { + this.keyStoreLocation = keyStoreLocation; + } + + /** + * @return the keyStorePassword + */ + public String getKeyStorePassword() { + return keyStorePassword; + } + + /** + * @param keyStorePassword the keyStorePassword to set + */ + public void setKeyStorePassword(String keyStorePassword) { + this.keyStorePassword = keyStorePassword; + } + + /** + * @return the trustStoreLocation + */ + public String getTrustStoreLocation() { + return trustStoreLocation; + } + + /** + * @param trustStoreLocation the trustStoreLocation to set + */ + public void setTrustStoreLocation(String trustStoreLocation) { + this.trustStoreLocation = trustStoreLocation; + } + + /** + * @return the trustStorePassword + */ + public String getTrustStorePassword() { + return trustStorePassword; + } + + /** + * @param trustStorePassword the trustStorePassword to set + */ + public void setTrustStorePassword(String trustStorePassword) { + this.trustStorePassword = trustStorePassword; + } + + /** + * @return the storeType + */ + public String getStoreType() { + return storeType; + } + + /** + * @param storeType + * the format that the store files are encoded in. + */ + public void setStoreType(String storeType) { + this.storeType = storeType; + } + + /** + * @return the enabledCipherSuites + */ + public String[] getEnabledCipherSuites() { + return enabledCipherSuites; + } + + /** + * @param enabledCipherSuites the enabledCipherSuites to set + */ + public void setEnabledCipherSuites(String[] enabledCipherSuites) { + this.enabledCipherSuites = enabledCipherSuites; + } + + /** + * @return the disabledCipherSuites + */ + public String[] getDisabledCipherSuites() { + return disabledCipherSuites; + } + + /** + * @param disabledCipherSuites the disabledCipherSuites to set + */ + public void setDisabledCipherSuites(String[] disabledCipherSuites) { + this.disabledCipherSuites = disabledCipherSuites; + } + + /** + * @return the enabledProtocols or null if the defaults should be used + */ + public String[] getEnabledProtocols() { + return enabledProtocols; + } + + /** + * The protocols to be set as enabled. + * + * @param enabledProtocols the enabled protocols to set, or null if the defaults should be used. + */ + public void setEnabledProtocols(String[] enabledProtocols) { + this.enabledProtocols = enabledProtocols; + } + + /** + * + * @return the protocols to disable or null if none should be + */ + public String[] getDisabledProtocols() { + return disabledProtocols; + } + + /** + * The protocols to be disable. + * + * @param disabledProtocols the protocols to disable, or null if none should be. + */ + public void setDisabledProtocols(String[] disabledProtocols) { + this.disabledProtocols = disabledProtocols; + } + + /** + * @return the context protocol to use + */ + public String getContextProtocol() { + return contextProtocol; + } + + /** + * The protocol value to use when creating an SSLContext via + * SSLContext.getInstance(protocol). + * + * @param contextProtocol the context protocol to use. + */ + public void setContextProtocol(String contextProtocol) { + this.contextProtocol = contextProtocol; + } + + /** + * @return the trustAll + */ + public boolean isTrustAll() { + return trustAll; + } + + /** + * @param trustAll the trustAll to set + */ + public void setTrustAll(boolean trustAll) { + this.trustAll = trustAll; + } + + /** + * @return the verifyHost + */ + public boolean isVerifyHost() { + return verifyHost; + } + + /** + * @param verifyHost the verifyHost to set + */ + public void setVerifyHost(boolean verifyHost) { + this.verifyHost = verifyHost; + } + + /** + * @return the key alias + */ + public String getKeyAlias() { + return keyAlias; + } + + /** + * @param keyAlias the key alias to use + */ + public void setKeyAlias(String keyAlias) { + this.keyAlias = keyAlias; + } + + public int getDefaultSslPort() { + return defaultSslPort; + } + + public void setDefaultSslPort(int defaultSslPort) { + this.defaultSslPort = defaultSslPort; + } + + @Override + public NettyTransportSslOptions clone() { + return copyOptions(new NettyTransportSslOptions()); + } + + protected NettyTransportSslOptions copyOptions(NettyTransportSslOptions copy) { + super.copyOptions(copy); + + copy.setKeyStoreLocation(getKeyStoreLocation()); + copy.setKeyStorePassword(getKeyStorePassword()); + copy.setTrustStoreLocation(getTrustStoreLocation()); + copy.setTrustStorePassword(getTrustStorePassword()); + copy.setStoreType(getStoreType()); + copy.setEnabledCipherSuites(getEnabledCipherSuites()); + copy.setDisabledCipherSuites(getDisabledCipherSuites()); + copy.setEnabledProtocols(getEnabledProtocols()); + copy.setDisabledProtocols(getDisabledProtocols()); + copy.setTrustAll(isTrustAll()); + copy.setVerifyHost(isVerifyHost()); + copy.setKeyAlias(getKeyAlias()); + copy.setContextProtocol(getContextProtocol()); + return copy; + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java new file mode 100644 index 0000000000..18f9630716 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java @@ -0,0 +1,299 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.transport; + +import io.netty.handler.ssl.SslHandler; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.net.URI; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.SecureRandom; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import javax.net.ssl.KeyManager; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLParameters; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; +import javax.net.ssl.X509ExtendedKeyManager; +import javax.net.ssl.X509TrustManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Static class that provides various utility methods used by Transport implementations. + */ +public class NettyTransportSupport { + + private static final Logger LOG = LoggerFactory.getLogger(NettyTransportSupport.class); + + /** + * Creates a Netty SslHandler instance for use in Transports that require + * an SSL encoder / decoder. + * + * @param remote + * The URI of the remote peer that the SslHandler will be used against. + * @param options + * The SSL options object to build the SslHandler instance from. + * + * @return a new SslHandler that is configured from the given options. + * + * @throws Exception if an error occurs while creating the SslHandler instance. + */ + public static SslHandler createSslHandler(URI remote, NettyTransportSslOptions options) throws Exception { + return new SslHandler(createSslEngine(remote, createSslContext(options), options)); + } + + /** + * Create a new SSLContext using the options specific in the given TransportSslOptions + * instance. + * + * @param options + * the configured options used to create the SSLContext. + * + * @return a new SSLContext instance. + * + * @throws Exception if an error occurs while creating the context. + */ + public static SSLContext createSslContext(NettyTransportSslOptions options) throws Exception { + try { + String contextProtocol = options.getContextProtocol(); + LOG.trace("Getting SSLContext instance using protocol: {}", contextProtocol); + + SSLContext context = SSLContext.getInstance(contextProtocol); + KeyManager[] keyMgrs = loadKeyManagers(options); + TrustManager[] trustManagers = loadTrustManagers(options); + + context.init(keyMgrs, trustManagers, new SecureRandom()); + return context; + } catch (Exception e) { + LOG.error("Failed to create SSLContext: {}", e, e); + throw e; + } + } + + /** + * Create a new SSLEngine instance in client mode from the given SSLContext and + * TransportSslOptions instances. + * + * @param context + * the SSLContext to use when creating the engine. + * @param options + * the TransportSslOptions to use to configure the new SSLEngine. + * + * @return a new SSLEngine instance in client mode. + * + * @throws Exception if an error occurs while creating the new SSLEngine. + */ + public static SSLEngine createSslEngine(SSLContext context, NettyTransportSslOptions options) throws Exception { + return createSslEngine(null, context, options); + } + + /** + * Create a new SSLEngine instance in client mode from the given SSLContext and + * TransportSslOptions instances. + * + * @param remote + * the URI of the remote peer that will be used to initialize the engine, may be null if none should. + * @param context + * the SSLContext to use when creating the engine. + * @param options + * the TransportSslOptions to use to configure the new SSLEngine. + * + * @return a new SSLEngine instance in client mode. + * + * @throws Exception if an error occurs while creating the new SSLEngine. + */ + public static SSLEngine createSslEngine(URI remote, SSLContext context, NettyTransportSslOptions options) throws Exception { + SSLEngine engine = null; + if(remote == null) { + engine = context.createSSLEngine(); + } else { + engine = context.createSSLEngine(remote.getHost(), remote.getPort()); + } + + engine.setEnabledProtocols(buildEnabledProtocols(engine, options)); + engine.setEnabledCipherSuites(buildEnabledCipherSuites(engine, options)); + engine.setUseClientMode(true); + + if (options.isVerifyHost()) { + SSLParameters sslParameters = engine.getSSLParameters(); + sslParameters.setEndpointIdentificationAlgorithm("HTTPS"); + engine.setSSLParameters(sslParameters); + } + + return engine; + } + + private static String[] buildEnabledProtocols(SSLEngine engine, NettyTransportSslOptions options) { + List enabledProtocols = new ArrayList(); + + if (options.getEnabledProtocols() != null) { + List configuredProtocols = Arrays.asList(options.getEnabledProtocols()); + LOG.trace("Configured protocols from transport options: {}", configuredProtocols); + enabledProtocols.addAll(configuredProtocols); + } else { + List engineProtocols = Arrays.asList(engine.getEnabledProtocols()); + LOG.trace("Default protocols from the SSLEngine: {}", engineProtocols); + enabledProtocols.addAll(engineProtocols); + } + + String[] disabledProtocols = options.getDisabledProtocols(); + if (disabledProtocols != null) { + List disabled = Arrays.asList(disabledProtocols); + LOG.trace("Disabled protocols: {}", disabled); + enabledProtocols.removeAll(disabled); + } + + LOG.trace("Enabled protocols: {}", enabledProtocols); + + return enabledProtocols.toArray(new String[0]); + } + + private static String[] buildEnabledCipherSuites(SSLEngine engine, NettyTransportSslOptions options) { + List enabledCipherSuites = new ArrayList(); + + if (options.getEnabledCipherSuites() != null) { + List configuredCipherSuites = Arrays.asList(options.getEnabledCipherSuites()); + LOG.trace("Configured cipher suites from transport options: {}", configuredCipherSuites); + enabledCipherSuites.addAll(configuredCipherSuites); + } else { + List engineCipherSuites = Arrays.asList(engine.getEnabledCipherSuites()); + LOG.trace("Default cipher suites from the SSLEngine: {}", engineCipherSuites); + enabledCipherSuites.addAll(engineCipherSuites); + } + + String[] disabledCipherSuites = options.getDisabledCipherSuites(); + if (disabledCipherSuites != null) { + List disabled = Arrays.asList(disabledCipherSuites); + LOG.trace("Disabled cipher suites: {}", disabled); + enabledCipherSuites.removeAll(disabled); + } + + LOG.trace("Enabled cipher suites: {}", enabledCipherSuites); + + return enabledCipherSuites.toArray(new String[0]); + } + + private static TrustManager[] loadTrustManagers(NettyTransportSslOptions options) throws Exception { + if (options.isTrustAll()) { + return new TrustManager[] { createTrustAllTrustManager() }; + } + + if (options.getTrustStoreLocation() == null) { + return null; + } + + TrustManagerFactory fact = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + + String storeLocation = options.getTrustStoreLocation(); + String storePassword = options.getTrustStorePassword(); + String storeType = options.getStoreType(); + + LOG.trace("Attempt to load TrustStore from location {} of type {}", storeLocation, storeType); + + KeyStore trustStore = loadStore(storeLocation, storePassword, storeType); + fact.init(trustStore); + + return fact.getTrustManagers(); + } + + private static KeyManager[] loadKeyManagers(NettyTransportSslOptions options) throws Exception { + if (options.getKeyStoreLocation() == null) { + return null; + } + + KeyManagerFactory fact = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + + String storeLocation = options.getKeyStoreLocation(); + String storePassword = options.getKeyStorePassword(); + String storeType = options.getStoreType(); + String alias = options.getKeyAlias(); + + LOG.trace("Attempt to load KeyStore from location {} of type {}", storeLocation, storeType); + + KeyStore keyStore = loadStore(storeLocation, storePassword, storeType); + fact.init(keyStore, storePassword != null ? storePassword.toCharArray() : null); + + if (alias == null) { + return fact.getKeyManagers(); + } else { + validateAlias(keyStore, alias); + return wrapKeyManagers(alias, fact.getKeyManagers()); + } + } + + private static KeyManager[] wrapKeyManagers(String alias, KeyManager[] origKeyManagers) { + KeyManager[] keyManagers = new KeyManager[origKeyManagers.length]; + for (int i = 0; i < origKeyManagers.length; i++) { + KeyManager km = origKeyManagers[i]; + if (km instanceof X509ExtendedKeyManager) { + km = new X509AliasKeyManager(alias, (X509ExtendedKeyManager) km); + } + + keyManagers[i] = km; + } + + return keyManagers; + } + + private static void validateAlias(KeyStore store, String alias) throws IllegalArgumentException, KeyStoreException { + if (!store.containsAlias(alias)) { + throw new IllegalArgumentException("The alias '" + alias + "' doesn't exist in the key store"); + } + + if (!store.isKeyEntry(alias)) { + throw new IllegalArgumentException("The alias '" + alias + "' in the keystore doesn't represent a key entry"); + } + } + + private static KeyStore loadStore(String storePath, final String password, String storeType) throws Exception { + KeyStore store = KeyStore.getInstance(storeType); + try (InputStream in = new FileInputStream(new File(storePath));) { + store.load(in, password != null ? password.toCharArray() : null); + } + + return store; + } + + private static TrustManager createTrustAllTrustManager() { + return new X509TrustManager() { + @Override + public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { + } + + @Override + public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + }; + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java new file mode 100644 index 0000000000..aa3e89283d --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.transport; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; + +/** + * A {@link ByteBufAllocator} which is partial pooled. Which means only direct + * {@link ByteBuf}s are pooled. The rest is unpooled. + * + * @author Norman Maurer + */ +public class PartialPooledByteBufAllocator implements ByteBufAllocator { + + private static final ByteBufAllocator POOLED = new PooledByteBufAllocator(false); + private static final ByteBufAllocator UNPOOLED = new UnpooledByteBufAllocator(false); + + public static final PartialPooledByteBufAllocator INSTANCE = new PartialPooledByteBufAllocator(); + + private PartialPooledByteBufAllocator() { + } + + @Override + public ByteBuf buffer() { + return UNPOOLED.heapBuffer(); + } + + @Override + public ByteBuf buffer(int initialCapacity) { + return UNPOOLED.heapBuffer(initialCapacity); + } + + @Override + public ByteBuf buffer(int initialCapacity, int maxCapacity) { + return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf ioBuffer() { + return UNPOOLED.heapBuffer(); + } + + @Override + public ByteBuf ioBuffer(int initialCapacity) { + return UNPOOLED.heapBuffer(initialCapacity); + } + + @Override + public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) { + return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf heapBuffer() { + return UNPOOLED.heapBuffer(); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity) { + return UNPOOLED.heapBuffer(initialCapacity); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { + return UNPOOLED.heapBuffer(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf directBuffer() { + return POOLED.directBuffer(); + } + + @Override + public ByteBuf directBuffer(int initialCapacity) { + return POOLED.directBuffer(initialCapacity); + } + + @Override + public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { + return POOLED.directBuffer(initialCapacity, maxCapacity); + } + + @Override + public CompositeByteBuf compositeBuffer() { + return UNPOOLED.compositeHeapBuffer(); + } + + @Override + public CompositeByteBuf compositeBuffer(int maxNumComponents) { + return UNPOOLED.compositeHeapBuffer(maxNumComponents); + } + + @Override + public CompositeByteBuf compositeHeapBuffer() { + return UNPOOLED.compositeHeapBuffer(); + } + + @Override + public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) { + return UNPOOLED.compositeHeapBuffer(maxNumComponents); + } + + @Override + public CompositeByteBuf compositeDirectBuffer() { + return POOLED.compositeDirectBuffer(); + } + + @Override + public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) { + return POOLED.compositeDirectBuffer(); + } + + @Override + public boolean isDirectBufferPooled() { + return true; + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java new file mode 100644 index 0000000000..f47f85e105 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.activemq.transport.amqp.client.transport; + +import java.net.Socket; +import java.security.Principal; +import java.security.PrivateKey; +import java.security.cert.X509Certificate; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.X509ExtendedKeyManager; + +/** + * An X509ExtendedKeyManager wrapper which always chooses and only + * returns the given alias, and defers retrieval to the delegate + * key manager. + */ +public class X509AliasKeyManager extends X509ExtendedKeyManager { + private X509ExtendedKeyManager delegate; + private String alias; + + public X509AliasKeyManager(String alias, X509ExtendedKeyManager delegate) throws IllegalArgumentException { + if (alias == null) { + throw new IllegalArgumentException("The given key alias must not be null."); + } + + this.alias = alias; + this.delegate = delegate; + } + + @Override + public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) { + return alias; + } + + @Override + public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) { + return alias; + } + + @Override + public X509Certificate[] getCertificateChain(String alias) { + return delegate.getCertificateChain(alias); + } + + @Override + public String[] getClientAliases(String keyType, Principal[] issuers) { + return new String[] { alias }; + } + + @Override + public PrivateKey getPrivateKey(String alias) { + return delegate.getPrivateKey(alias); + } + + @Override + public String[] getServerAliases(String keyType, Principal[] issuers) { + return new String[] { alias }; + } + + @Override + public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine) { + return alias; + } + + @Override + public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) { + return alias; + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java index 9f83a1d7ff..01ececb742 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java @@ -20,8 +20,6 @@ import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.activemq.util.IOExceptionSupport; - /** * Asynchronous Client Future class. */ diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java deleted file mode 100644 index 5708088361..0000000000 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/ClientTcpTransport.java +++ /dev/null @@ -1,389 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.util; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketException; -import java.net.URI; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import javax.net.SocketFactory; -import javax.net.ssl.SSLSocketFactory; - -import org.apache.activemq.transport.tcp.TcpBufferedInputStream; -import org.apache.activemq.transport.tcp.TcpBufferedOutputStream; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.InetAddressUtil; -import org.fusesource.hawtbuf.Buffer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Simple TCP based transport used by the client. - */ -public class ClientTcpTransport implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(ClientTcpTransport.class); - - public interface TransportListener { - - /** - * Called when new incoming data has become available. - * - * @param incoming - * the next incoming packet of data. - */ - void onData(Buffer incoming); - - /** - * Called if the connection state becomes closed. - */ - void onTransportClosed(); - - /** - * Called when an error occurs during normal Transport operations. - * - * @param cause - * the error that triggered this event. - */ - void onTransportError(Throwable cause); - - } - - private final URI remoteLocation; - private final AtomicBoolean connected = new AtomicBoolean(); - private final AtomicBoolean closed = new AtomicBoolean(); - private final AtomicReference connectionError = new AtomicReference(); - - private final Socket socket; - private DataOutputStream dataOut; - private DataInputStream dataIn; - private Thread runner; - private TransportListener listener; - - private int socketBufferSize = 64 * 1024; - private int soTimeout = 0; - private int soLinger = Integer.MIN_VALUE; - private Boolean keepAlive; - private Boolean tcpNoDelay = true; - private boolean useLocalHost = false; - private int ioBufferSize = 8 * 1024; - - /** - * Create a new instance of the transport. - * - * @param listener - * The TransportListener that will receive data from this Transport instance. - * @param remoteLocation - * The remote location where this transport should connection to. - */ - public ClientTcpTransport(URI remoteLocation) { - this.remoteLocation = remoteLocation; - - Socket temp = null; - try { - temp = createSocketFactory().createSocket(); - } catch (IOException e) { - connectionError.set(e); - } - - this.socket = temp; - } - - public void connect() throws IOException { - if (connectionError.get() != null) { - throw IOExceptionSupport.create(connectionError.get()); - } - - if (listener == null) { - throw new IllegalStateException("Cannot connect until a listener has been set."); - } - - if (socket == null) { - throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set"); - } - - InetSocketAddress remoteAddress = null; - - if (remoteLocation != null) { - String host = resolveHostName(remoteLocation.getHost()); - remoteAddress = new InetSocketAddress(host, remoteLocation.getPort()); - } - - socket.connect(remoteAddress); - - connected.set(true); - - initialiseSocket(socket); - initializeStreams(); - - runner = new Thread(null, this, "ClientTcpTransport: " + toString()); - runner.setDaemon(false); - runner.start(); - } - - public void close() { - if (closed.compareAndSet(false, true)) { - if (socket == null) { - return; - } - - // Closing the streams flush the sockets before closing.. if the socket - // is hung.. then this hangs the close so we perform an asynchronous close - // by default which will timeout if the close doesn't happen after a delay. - final CountDownLatch latch = new CountDownLatch(1); - - final ExecutorService closer = Executors.newSingleThreadExecutor(); - closer.execute(new Runnable() { - @Override - public void run() { - LOG.trace("Closing socket {}", socket); - try { - socket.close(); - LOG.debug("Closed socket {}", socket); - } catch (IOException e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e); - } - } finally { - latch.countDown(); - } - } - }); - - try { - latch.await(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - closer.shutdownNow(); - } - } - } - - public void send(ByteBuffer output) throws IOException { - checkConnected(); - LOG.trace("Client Transport sending packet of size: {}", output.remaining()); - WritableByteChannel channel = Channels.newChannel(dataOut); - channel.write(output); - dataOut.flush(); - } - - public void send(Buffer output) throws IOException { - checkConnected(); - send(output.toByteBuffer()); - } - - public URI getRemoteURI() { - return this.remoteLocation; - } - - public boolean isConnected() { - return this.connected.get(); - } - - public TransportListener getTransportListener() { - return this.listener; - } - - public void setTransportListener(TransportListener listener) { - if (listener == null) { - throw new IllegalArgumentException("Listener cannot be set to null"); - } - - this.listener = listener; - } - - public int getSocketBufferSize() { - return socketBufferSize; - } - - public void setSocketBufferSize(int socketBufferSize) { - this.socketBufferSize = socketBufferSize; - } - - public int getSoTimeout() { - return soTimeout; - } - - public void setSoTimeout(int soTimeout) { - this.soTimeout = soTimeout; - } - - public boolean isTcpNoDelay() { - return tcpNoDelay; - } - - public void setTcpNoDelay(Boolean tcpNoDelay) { - this.tcpNoDelay = tcpNoDelay; - } - - public int getSoLinger() { - return soLinger; - } - - public void setSoLinger(int soLinger) { - this.soLinger = soLinger; - } - - public boolean isKeepAlive() { - return keepAlive; - } - - public void setKeepAlive(Boolean keepAlive) { - this.keepAlive = keepAlive; - } - - public boolean isUseLocalHost() { - return useLocalHost; - } - - public void setUseLocalHost(boolean useLocalHost) { - this.useLocalHost = useLocalHost; - } - - public int getIoBufferSize() { - return ioBufferSize; - } - - public void setIoBufferSize(int ioBufferSize) { - this.ioBufferSize = ioBufferSize; - } - - //---------- Transport internal implementation ---------------------------// - - @Override - public void run() { - LOG.trace("TCP consumer thread for {} starting", this); - try { - while (isConnected()) { - doRun(); - } - } catch (IOException e) { - connectionError.set(e); - onException(e); - } catch (Throwable e) { - IOException ioe = new IOException("Unexpected error occured: " + e); - connectionError.set(ioe); - ioe.initCause(e); - onException(ioe); - } - } - - protected void doRun() throws IOException { - int size = dataIn.available(); - if (size <= 0) { - try { - TimeUnit.NANOSECONDS.sleep(1); - } catch (InterruptedException e) { - } - return; - } - - byte[] buffer = new byte[size]; - dataIn.readFully(buffer); - Buffer incoming = new Buffer(buffer); - listener.onData(incoming); - } - - /** - * Passes any IO exceptions into the transport listener - */ - public void onException(IOException e) { - if (listener != null) { - try { - listener.onTransportError(e); - } catch (RuntimeException e2) { - LOG.debug("Unexpected runtime exception: {}", e2.getMessage(), e2); - } - } - } - - protected SocketFactory createSocketFactory() throws IOException { - if (remoteLocation.getScheme().equalsIgnoreCase("ssl")) { - return SSLSocketFactory.getDefault(); - } else { - return SocketFactory.getDefault(); - } - } - - protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException { - try { - sock.setReceiveBufferSize(socketBufferSize); - sock.setSendBufferSize(socketBufferSize); - } catch (SocketException se) { - LOG.warn("Cannot set socket buffer size = {}", socketBufferSize); - LOG.debug("Cannot set socket buffer size. Reason: {}. This exception is ignored.", se.getMessage(), se); - } - - sock.setSoTimeout(soTimeout); - - if (keepAlive != null) { - sock.setKeepAlive(keepAlive.booleanValue()); - } - - if (soLinger > -1) { - sock.setSoLinger(true, soLinger); - } else if (soLinger == -1) { - sock.setSoLinger(false, 0); - } - - if (tcpNoDelay != null) { - sock.setTcpNoDelay(tcpNoDelay.booleanValue()); - } - } - - protected void initializeStreams() throws IOException { - try { - TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize); - this.dataIn = new DataInputStream(buffIn); - TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize); - this.dataOut = new DataOutputStream(outputStream); - } catch (Throwable e) { - throw IOExceptionSupport.create(e); - } - } - - protected String resolveHostName(String host) throws UnknownHostException { - if (isUseLocalHost()) { - String localName = InetAddressUtil.getLocalHostName(); - if (localName != null && localName.equals(host)) { - return "localhost"; - } - } - return host; - } - - private void checkConnected() throws IOException { - if (!connected.get()) { - throw new IOException("Cannot send to a non-connected transport."); - } - } -} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/IOExceptionSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/IOExceptionSupport.java new file mode 100644 index 0000000000..b0f35c5ff5 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/IOExceptionSupport.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +import java.io.IOException; + +/** + * Used to make throwing IOException instances easier. + */ +public class IOExceptionSupport { + + /** + * Checks the given cause to determine if it's already an IOException type and + * if not creates a new IOException to wrap it. + * + * @param cause + * The initiating exception that should be cast or wrapped. + * + * @return an IOException instance. + */ + public static IOException create(Throwable cause) { + if (cause instanceof IOException) { + return (IOException) cause; + } + + String message = cause.getMessage(); + if (message == null || message.length() == 0) { + message = cause.toString(); + } + + return new IOException(message, cause); + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/IdGenerator.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/IdGenerator.java new file mode 100644 index 0000000000..bdad73b25d --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/IdGenerator.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.UnknownHostException; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Generator for Globally unique Strings. + */ +public class IdGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(IdGenerator.class); + private static final String UNIQUE_STUB; + private static int instanceCount; + private static String hostName; + private String seed; + private final AtomicLong sequence = new AtomicLong(1); + private int length; + public static final String PROPERTY_IDGENERATOR_PORT = "activemq.idgenerator.port"; + + static { + String stub = ""; + boolean canAccessSystemProps = true; + try { + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + sm.checkPropertiesAccess(); + } + } catch (SecurityException se) { + canAccessSystemProps = false; + } + + if (canAccessSystemProps) { + int idGeneratorPort = 0; + ServerSocket ss = null; + try { + idGeneratorPort = Integer.parseInt(System.getProperty(PROPERTY_IDGENERATOR_PORT, "0")); + LOG.trace("Using port {}", idGeneratorPort); + hostName = getLocalHostName(); + ss = new ServerSocket(idGeneratorPort); + stub = "-" + ss.getLocalPort() + "-" + System.currentTimeMillis() + "-"; + Thread.sleep(100); + } catch (Exception e) { + if (LOG.isTraceEnabled()) { + LOG.trace("could not generate unique stub by using DNS and binding to local port", e); + } else { + LOG.warn("could not generate unique stub by using DNS and binding to local port: {} {}", e.getClass().getCanonicalName(), e.getMessage()); + } + + // Restore interrupted state so higher level code can deal with it. + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + } finally { + if (ss != null) { + try { + ss.close(); + } catch (IOException ioe) { + if (LOG.isTraceEnabled()) { + LOG.trace("Closing the server socket failed", ioe); + } else { + LOG.warn("Closing the server socket failed" + " due " + ioe.getMessage()); + } + } + } + } + } + + if (hostName == null) { + hostName = "localhost"; + } + hostName = sanitizeHostName(hostName); + + if (stub.length() == 0) { + stub = "-1-" + System.currentTimeMillis() + "-"; + } + UNIQUE_STUB = stub; + } + + /** + * Construct an IdGenerator + * + * @param prefix + * The prefix value that is applied to all generated IDs. + */ + public IdGenerator(String prefix) { + synchronized (UNIQUE_STUB) { + this.seed = prefix + UNIQUE_STUB + (instanceCount++) + ":"; + this.length = this.seed.length() + ("" + Long.MAX_VALUE).length(); + } + } + + public IdGenerator() { + this("ID:" + hostName); + } + + /** + * As we have to find the host name as a side-affect of generating a unique stub, we allow + * it's easy retrieval here + * + * @return the local host name + */ + public static String getHostName() { + return hostName; + } + + /** + * Generate a unique id + * + * @return a unique id + */ + public synchronized String generateId() { + StringBuilder sb = new StringBuilder(length); + sb.append(seed); + sb.append(sequence.getAndIncrement()); + return sb.toString(); + } + + public static String sanitizeHostName(String hostName) { + boolean changed = false; + + StringBuilder sb = new StringBuilder(); + for (char ch : hostName.toCharArray()) { + // only include ASCII chars + if (ch < 127) { + sb.append(ch); + } else { + changed = true; + } + } + + if (changed) { + String newHost = sb.toString(); + LOG.info("Sanitized hostname from: {} to: {}", hostName, newHost); + return newHost; + } else { + return hostName; + } + } + + /** + * Generate a unique ID - that is friendly for a URL or file system + * + * @return a unique id + */ + public String generateSanitizedId() { + String result = generateId(); + result = result.replace(':', '-'); + result = result.replace('_', '-'); + result = result.replace('.', '-'); + return result; + } + + /** + * From a generated id - return the seed (i.e. minus the count) + * + * @param id + * the generated identifier + * @return the seed + */ + public static String getSeedFromId(String id) { + String result = id; + if (id != null) { + int index = id.lastIndexOf(':'); + if (index > 0 && (index + 1) < id.length()) { + result = id.substring(0, index); + } + } + return result; + } + + /** + * From a generated id - return the generator count + * + * @param id + * The ID that will be parsed for a sequence number. + * + * @return the sequence value parsed from the given ID. + */ + public static long getSequenceFromId(String id) { + long result = -1; + if (id != null) { + int index = id.lastIndexOf(':'); + + if (index > 0 && (index + 1) < id.length()) { + String numStr = id.substring(index + 1, id.length()); + result = Long.parseLong(numStr); + } + } + return result; + } + + /** + * Does a proper compare on the Id's + * + * @param id1 the lhs of the comparison. + * @param id2 the rhs of the comparison. + * + * @return 0 if equal else a positive if {@literal id1 > id2} ... + */ + public static int compare(String id1, String id2) { + int result = -1; + String seed1 = IdGenerator.getSeedFromId(id1); + String seed2 = IdGenerator.getSeedFromId(id2); + if (seed1 != null && seed2 != null) { + result = seed1.compareTo(seed2); + if (result == 0) { + long count1 = IdGenerator.getSequenceFromId(id1); + long count2 = IdGenerator.getSequenceFromId(id2); + result = (int) (count1 - count2); + } + } + return result; + } + + /** + * When using the {@link java.net.InetAddress#getHostName()} method in an + * environment where neither a proper DNS lookup nor an /etc/hosts + * entry exists for a given host, the following exception will be thrown: + * + * java.net.UnknownHostException: <hostname>: <hostname> + * at java.net.InetAddress.getLocalHost(InetAddress.java:1425) + * ... + * + * Instead of just throwing an UnknownHostException and giving up, this + * method grabs a suitable hostname from the exception and prevents the + * exception from being thrown. If a suitable hostname cannot be acquired + * from the exception, only then is the UnknownHostException thrown. + * + * @return The hostname + * + * @throws UnknownHostException if the given host cannot be looked up. + * + * @see java.net.InetAddress#getLocalHost() + * @see java.net.InetAddress#getHostName() + */ + protected static String getLocalHostName() throws UnknownHostException { + try { + return (InetAddress.getLocalHost()).getHostName(); + } catch (UnknownHostException uhe) { + String host = uhe.getMessage(); // host = "hostname: hostname" + if (host != null) { + int colon = host.indexOf(':'); + if (colon > 0) { + return host.substring(0, colon); + } + } + throw uhe; + } + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/PropertyUtil.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/PropertyUtil.java new file mode 100644 index 0000000000..9be74978a0 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/PropertyUtil.java @@ -0,0 +1,589 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +import java.beans.BeanInfo; +import java.beans.Introspector; +import java.beans.PropertyDescriptor; +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Method; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; + +import javax.net.ssl.SSLContext; + +/** + * Utilities for properties + */ +public class PropertyUtil { + + /** + * Creates a URI from the original URI and the given parameters. + * + * @param originalURI + * The URI whose current parameters are removed and replaced with the given remainder value. + * @param params + * The URI params that should be used to replace the current ones in the target. + * + * @return a new URI that matches the original one but has its query options replaced with + * the given ones. + * + * @throws URISyntaxException if the given URI is invalid. + */ + public static URI replaceQuery(URI originalURI, Map params) throws URISyntaxException { + String s = createQueryString(params); + if (s.length() == 0) { + s = null; + } + return replaceQuery(originalURI, s); + } + + /** + * Creates a URI with the given query, removing an previous query value from the given URI. + * + * @param uri + * The source URI whose existing query is replaced with the newly supplied one. + * @param query + * The new URI query string that should be appended to the given URI. + * + * @return a new URI that is a combination of the original URI and the given query string. + * + * @throws URISyntaxException if the given URI is invalid. + */ + public static URI replaceQuery(URI uri, String query) throws URISyntaxException { + String schemeSpecificPart = uri.getRawSchemeSpecificPart(); + // strip existing query if any + int questionMark = schemeSpecificPart.lastIndexOf("?"); + // make sure question mark is not within parentheses + if (questionMark < schemeSpecificPart.lastIndexOf(")")) { + questionMark = -1; + } + if (questionMark > 0) { + schemeSpecificPart = schemeSpecificPart.substring(0, questionMark); + } + if (query != null && query.length() > 0) { + schemeSpecificPart += "?" + query; + } + return new URI(uri.getScheme(), schemeSpecificPart, uri.getFragment()); + } + + /** + * Creates a URI with the given query, removing an previous query value from the given URI. + * + * @param uri + * The source URI whose existing query is replaced with the newly supplied one. + * + * @return a new URI that is a combination of the original URI and the given query string. + * + * @throws URISyntaxException if the given URI is invalid. + */ + public static URI eraseQuery(URI uri) throws URISyntaxException { + return replaceQuery(uri, (String) null); + } + + /** + * Given a key / value mapping, create and return a URI formatted query string that is valid + * and can be appended to a URI. + * + * @param options + * The Mapping that will create the new Query string. + * + * @return a URI formatted query string. + * + * @throws URISyntaxException if the given URI is invalid. + */ + public static String createQueryString(Map options) throws URISyntaxException { + try { + if (options.size() > 0) { + StringBuffer rc = new StringBuffer(); + boolean first = true; + for (Entry entry : options.entrySet()) { + if (first) { + first = false; + } else { + rc.append("&"); + } + rc.append(URLEncoder.encode(entry.getKey(), "UTF-8")); + rc.append("="); + rc.append(URLEncoder.encode((String) entry.getValue(), "UTF-8")); + } + return rc.toString(); + } else { + return ""; + } + } catch (UnsupportedEncodingException e) { + throw (URISyntaxException) new URISyntaxException(e.toString(), "Invalid encoding").initCause(e); + } + } + + /** + * Get properties from a URI and return them in a new {@code Map} instance. + * + * If the URI is null or the query string of the URI is null an empty Map is returned. + * + * @param uri + * the URI whose parameters are to be parsed. + * + * @return Map of properties + * + * @throws Exception if an error occurs while parsing the query options. + */ + public static Map parseParameters(URI uri) throws Exception { + if (uri == null || uri.getQuery() == null) { + return Collections.emptyMap(); + } + + return parseQuery(stripPrefix(uri.getQuery(), "?")); + } + + /** + * Parse properties from a named resource -eg. a URI or a simple name e.g. + * {@literal foo?name="fred"&size=2} + * + * @param uri + * the URI whose parameters are to be parsed. + * + * @return Map of properties + * + * @throws Exception if an error occurs while parsing the query options. + */ + public static Map parseParameters(String uri) throws Exception { + if (uri == null) { + return Collections.emptyMap(); + } + + return parseQuery(stripUpto(uri, '?')); + } + + /** + * Get properties from a URI query string. + * + * @param queryString + * the string value returned from a call to the URI class getQuery method. + * + * @return Map of properties from the parsed string. + * + * @throws Exception if an error occurs while parsing the query options. + */ + public static Map parseQuery(String queryString) throws Exception { + if (queryString != null && !queryString.isEmpty()) { + Map rc = new HashMap(); + String[] parameters = queryString.split("&"); + for (int i = 0; i < parameters.length; i++) { + int p = parameters[i].indexOf("="); + if (p >= 0) { + String name = URLDecoder.decode(parameters[i].substring(0, p), "UTF-8"); + String value = URLDecoder.decode(parameters[i].substring(p + 1), "UTF-8"); + rc.put(name, value); + } else { + rc.put(parameters[i], null); + } + } + return rc; + } + + return Collections.emptyMap(); + } + + /** + * Given a map of properties, filter out only those prefixed with the given value, the + * values filtered are returned in a new Map instance. + * + * @param properties + * The map of properties to filter. + * @param optionPrefix + * The prefix value to use when filtering. + * + * @return a filter map with only values that match the given prefix. + */ + public static Map filterProperties(Map properties, String optionPrefix) { + if (properties == null) { + throw new IllegalArgumentException("The given properties object was null."); + } + + HashMap rc = new HashMap(properties.size()); + + for (Iterator> iter = properties.entrySet().iterator(); iter.hasNext();) { + Entry entry = iter.next(); + if (entry.getKey().startsWith(optionPrefix)) { + String name = entry.getKey().substring(optionPrefix.length()); + rc.put(name, entry.getValue()); + iter.remove(); + } + } + + return rc; + } + + /** + * Enumerate the properties of the target object and add them as additional entries + * to the query string of the given string URI. + * + * @param uri + * The string URI value to append the object properties to. + * @param bean + * The Object whose properties will be added to the target URI. + * + * @return a new String value that is the original URI with the added bean properties. + * + * @throws Exception if an error occurs while enumerating the bean properties. + */ + public static String addPropertiesToURIFromBean(String uri, Object bean) throws Exception { + Map properties = PropertyUtil.getProperties(bean); + return PropertyUtil.addPropertiesToURI(uri, properties); + } + + /** + * Enumerate the properties of the target object and add them as additional entries + * to the query string of the given URI. + * + * @param uri + * The URI value to append the object properties to. + * @param properties + * The Object whose properties will be added to the target URI. + * + * @return a new String value that is the original URI with the added bean properties. + * + * @throws Exception if an error occurs while enumerating the bean properties. + */ + public static String addPropertiesToURI(URI uri, Map properties) throws Exception { + return addPropertiesToURI(uri.toString(), properties); + } + + /** + * Append the given properties to the query portion of the given URI. + * + * @param uri + * The string URI value to append the object properties to. + * @param properties + * The properties that will be added to the target URI. + * + * @return a new String value that is the original URI with the added properties. + * + * @throws Exception if an error occurs while building the new URI string. + */ + public static String addPropertiesToURI(String uri, Map properties) throws Exception { + String result = uri; + if (uri != null && properties != null) { + StringBuilder base = new StringBuilder(stripBefore(uri, '?')); + Map map = parseParameters(uri); + if (!map.isEmpty()) { + map.putAll(properties); + } else { + map = properties; + } + if (!map.isEmpty()) { + base.append('?'); + boolean first = true; + for (Map.Entry entry : map.entrySet()) { + if (!first) { + base.append('&'); + } + first = false; + base.append(entry.getKey()).append("=").append(entry.getValue()); + } + result = base.toString(); + } + } + return result; + } + + /** + * Set properties on an object using the provided map. The return value + * indicates if all properties from the given map were set on the target object. + * + * @param target + * the object whose properties are to be set from the map options. + * @param properties + * the properties that should be applied to the given object. + * + * @return true if all values in the properties map were applied to the target object. + */ + public static Map setProperties(Object target, Map properties) { + if (target == null) { + throw new IllegalArgumentException("target object cannot be null"); + } + if (properties == null) { + throw new IllegalArgumentException("Given Properties object cannot be null"); + } + + Map unmatched = new HashMap(); + + for (Map.Entry entry : properties.entrySet()) { + if (!setProperty(target, entry.getKey(), entry.getValue())) { + unmatched.put(entry.getKey(), entry.getValue()); + } + } + + return Collections.unmodifiableMap(unmatched); + } + + //TODO: common impl for above and below methods. + + /** + * Set properties on an object using the provided Properties object. The return value + * indicates if all properties from the given map were set on the target object. + * + * @param target + * the object whose properties are to be set from the map options. + * @param properties + * the properties that should be applied to the given object. + * + * @return an unmodifiable map with any values that could not be applied to the target. + */ + public static Map setProperties(Object target, Properties properties) { + if (target == null) { + throw new IllegalArgumentException("target object cannot be null"); + } + if (properties == null) { + throw new IllegalArgumentException("Given Properties object cannot be null"); + } + + Map unmatched = new HashMap(); + + for (Map.Entry entry : properties.entrySet()) { + if (!setProperty(target, (String) entry.getKey(), entry.getValue())) { + unmatched.put((String) entry.getKey(), entry.getValue()); + } + } + + return Collections.unmodifiableMap(unmatched); + } + + /** + * Get properties from an object using reflection. If the passed object is null an + * empty Map is returned. + * + * @param object + * the Object whose properties are to be extracted. + * + * @return Map of properties extracted from the given object. + * + * @throws Exception if an error occurs while examining the object's properties. + */ + public static Map getProperties(Object object) throws Exception { + if (object == null) { + return Collections.emptyMap(); + } + + Map properties = new LinkedHashMap(); + BeanInfo beanInfo = Introspector.getBeanInfo(object.getClass()); + Object[] NULL_ARG = {}; + PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors(); + if (propertyDescriptors != null) { + for (int i = 0; i < propertyDescriptors.length; i++) { + PropertyDescriptor pd = propertyDescriptors[i]; + if (pd.getReadMethod() != null && !pd.getName().equals("class") && !pd.getName().equals("properties") && !pd.getName().equals("reference")) { + Object value = pd.getReadMethod().invoke(object, NULL_ARG); + if (value != null) { + if (value instanceof Boolean || value instanceof Number || value instanceof String || value instanceof URI || value instanceof URL) { + properties.put(pd.getName(), ("" + value)); + } else if (value instanceof SSLContext) { + // ignore this one.. + } else { + Map inner = getProperties(value); + for (Map.Entry entry : inner.entrySet()) { + properties.put(pd.getName() + "." + entry.getKey(), entry.getValue()); + } + } + } + } + } + } + + return properties; + } + + /** + * Find a specific property getter in a given object based on a property name. + * + * @param object + * the object to search. + * @param name + * the property name to search for. + * + * @return the result of invoking the specific property get method. + * + * @throws Exception if an error occurs while searching the object's bean info. + */ + public static Object getProperty(Object object, String name) throws Exception { + BeanInfo beanInfo = Introspector.getBeanInfo(object.getClass()); + PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors(); + if (propertyDescriptors != null) { + for (int i = 0; i < propertyDescriptors.length; i++) { + PropertyDescriptor pd = propertyDescriptors[i]; + if (pd.getReadMethod() != null && pd.getName().equals(name)) { + return pd.getReadMethod().invoke(object); + } + } + } + return null; + } + + /** + * Set a property named property on a given Object. + *

+ * The object is searched for an set method that would match the given named + * property and if one is found. If necessary an attempt will be made to convert + * the new value to an acceptable type. + * + * @param target + * The object whose property is to be set. + * @param name + * The name of the property to set. + * @param value + * The new value to set for the named property. + * + * @return true if the property was able to be set on the target object. + */ + public static boolean setProperty(Object target, String name, Object value) { + try { + int dotPos = name.indexOf("."); + while (dotPos >= 0) { + String getterName = name.substring(0, dotPos); + target = getProperty(target, getterName); + name = name.substring(dotPos + 1); + dotPos = name.indexOf("."); + } + + Class clazz = target.getClass(); + Method setter = findSetterMethod(clazz, name); + if (setter == null) { + return false; + } + // If the type is null or it matches the needed type, just use the + // value directly + if (value == null || value.getClass() == setter.getParameterTypes()[0]) { + setter.invoke(target, new Object[] { value }); + } else { + setter.invoke(target, new Object[] { convert(value, setter.getParameterTypes()[0]) }); + } + return true; + } catch (Throwable ignore) { + return false; + } + } + + /** + * Return a String minus the given prefix. If the string does not start + * with the given prefix the original string value is returned. + * + * @param value + * The String whose prefix is to be removed. + * @param prefix + * The prefix string to remove from the target string. + * + * @return stripped version of the original input string. + */ + public static String stripPrefix(String value, String prefix) { + if (value != null && prefix != null && value.startsWith(prefix)) { + return value.substring(prefix.length()); + } + return value; + } + + /** + * Return a portion of a String value by looking beyond the given + * character. + * + * @param value + * The string value to split + * @param c + * The character that marks the split point. + * + * @return the sub-string value starting beyond the given character. + */ + public static String stripUpto(String value, char c) { + String result = null; + if (value != null) { + int index = value.indexOf(c); + if (index > 0) { + result = value.substring(index + 1); + } + } + return result; + } + + /** + * Return a String up to and including character + * + * @param value + * The string value to split + * @param c + * The character that marks the start of split point. + * + * @return the sub-string value starting from the given character. + */ + public static String stripBefore(String value, char c) { + String result = value; + if (value != null) { + int index = value.indexOf(c); + if (index > 0) { + result = value.substring(0, index); + } + } + return result; + } + + private static Method findSetterMethod(Class clazz, String name) { + // Build the method name. + name = "set" + name.substring(0, 1).toUpperCase() + name.substring(1); + Method[] methods = clazz.getMethods(); + for (int i = 0; i < methods.length; i++) { + Method method = methods[i]; + Class params[] = method.getParameterTypes(); + if (method.getName().equals(name) && params.length == 1) { + return method; + } + } + return null; + } + + private static Object convert(Object value, Class type) throws Exception { + if (value == null) { + if (boolean.class.isAssignableFrom(type)) { + return Boolean.FALSE; + } + return null; + } + + if (type.isAssignableFrom(value.getClass())) { + return type.cast(value); + } + + // special for String[] as we do not want to use a PropertyEditor for that + if (type.isAssignableFrom(String[].class)) { + return StringArrayConverter.convertToStringArray(value); + } + + if (type == URI.class) { + return new URI(value.toString()); + } + + return TypeConversionSupport.convert(value, type); + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/StringArrayConverter.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/StringArrayConverter.java new file mode 100644 index 0000000000..e595105826 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/StringArrayConverter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.StringTokenizer; + +/** + * Class for converting to/from String[] to be used instead of a + * {@link java.beans.PropertyEditor} which otherwise causes memory leaks as the + * JDK {@link java.beans.PropertyEditorManager} is a static class and has strong + * references to classes, causing problems in hot-deployment environments. + */ +public class StringArrayConverter { + + public static String[] convertToStringArray(Object value) { + if (value == null) { + return null; + } + + String text = value.toString(); + if (text == null || text.isEmpty()) { + return null; + } + + StringTokenizer stok = new StringTokenizer(text, ","); + final List list = new ArrayList(); + + while (stok.hasMoreTokens()) { + list.add(stok.nextToken()); + } + + String[] array = list.toArray(new String[list.size()]); + return array; + } + + public static String convertToString(String[] value) { + if (value == null || value.length == 0) { + return null; + } + + StringBuffer result = new StringBuffer(String.valueOf(value[0])); + for (int i = 1; i < value.length; i++) { + result.append(",").append(value[i]); + } + + return result.toString(); + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/TypeConversionSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/TypeConversionSupport.java new file mode 100644 index 0000000000..efa10b594c --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/TypeConversionSupport.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +import java.util.Date; +import java.util.HashMap; + +public final class TypeConversionSupport { + + static class ConversionKey { + final Class from; + final Class to; + final int hashCode; + + public ConversionKey(Class from, Class to) { + this.from = from; + this.to = to; + this.hashCode = from.hashCode() ^ (to.hashCode() << 1); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || o.getClass() != this.getClass()) { + return false; + } + + ConversionKey x = (ConversionKey) o; + return x.from == from && x.to == to; + } + + @Override + public int hashCode() { + return hashCode; + } + } + + interface Converter { + Object convert(Object value); + } + + private static final HashMap CONVERSION_MAP = new HashMap(); + + static { + Converter toStringConverter = new Converter() { + @Override + public Object convert(Object value) { + return value.toString(); + } + }; + CONVERSION_MAP.put(new ConversionKey(Boolean.class, String.class), toStringConverter); + CONVERSION_MAP.put(new ConversionKey(Byte.class, String.class), toStringConverter); + CONVERSION_MAP.put(new ConversionKey(Short.class, String.class), toStringConverter); + CONVERSION_MAP.put(new ConversionKey(Integer.class, String.class), toStringConverter); + CONVERSION_MAP.put(new ConversionKey(Long.class, String.class), toStringConverter); + CONVERSION_MAP.put(new ConversionKey(Float.class, String.class), toStringConverter); + CONVERSION_MAP.put(new ConversionKey(Double.class, String.class), toStringConverter); + + CONVERSION_MAP.put(new ConversionKey(String.class, Boolean.class), new Converter() { + @Override + public Object convert(Object value) { + return Boolean.valueOf((String) value); + } + }); + CONVERSION_MAP.put(new ConversionKey(String.class, Byte.class), new Converter() { + @Override + public Object convert(Object value) { + return Byte.valueOf((String) value); + } + }); + CONVERSION_MAP.put(new ConversionKey(String.class, Short.class), new Converter() { + @Override + public Object convert(Object value) { + return Short.valueOf((String) value); + } + }); + CONVERSION_MAP.put(new ConversionKey(String.class, Integer.class), new Converter() { + @Override + public Object convert(Object value) { + return Integer.valueOf((String) value); + } + }); + CONVERSION_MAP.put(new ConversionKey(String.class, Long.class), new Converter() { + @Override + public Object convert(Object value) { + return Long.valueOf((String) value); + } + }); + CONVERSION_MAP.put(new ConversionKey(String.class, Float.class), new Converter() { + @Override + public Object convert(Object value) { + return Float.valueOf((String) value); + } + }); + CONVERSION_MAP.put(new ConversionKey(String.class, Double.class), new Converter() { + @Override + public Object convert(Object value) { + return Double.valueOf((String) value); + } + }); + + Converter longConverter = new Converter() { + @Override + public Object convert(Object value) { + return Long.valueOf(((Number) value).longValue()); + } + }; + CONVERSION_MAP.put(new ConversionKey(Byte.class, Long.class), longConverter); + CONVERSION_MAP.put(new ConversionKey(Short.class, Long.class), longConverter); + CONVERSION_MAP.put(new ConversionKey(Integer.class, Long.class), longConverter); + CONVERSION_MAP.put(new ConversionKey(Date.class, Long.class), new Converter() { + @Override + public Object convert(Object value) { + return Long.valueOf(((Date) value).getTime()); + } + }); + + Converter intConverter = new Converter() { + @Override + public Object convert(Object value) { + return Integer.valueOf(((Number) value).intValue()); + } + }; + CONVERSION_MAP.put(new ConversionKey(Byte.class, Integer.class), intConverter); + CONVERSION_MAP.put(new ConversionKey(Short.class, Integer.class), intConverter); + + CONVERSION_MAP.put(new ConversionKey(Byte.class, Short.class), new Converter() { + @Override + public Object convert(Object value) { + return Short.valueOf(((Number) value).shortValue()); + } + }); + + CONVERSION_MAP.put(new ConversionKey(Float.class, Double.class), new Converter() { + @Override + public Object convert(Object value) { + return new Double(((Number) value).doubleValue()); + } + }); + } + + public static Object convert(Object value, Class toClass) { + + assert value != null && toClass != null; + + if (value.getClass() == toClass) { + return value; + } + + Class fromClass = value.getClass(); + + if (fromClass.isPrimitive()) { + fromClass = convertPrimitiveTypeToWrapperType(fromClass); + } + + if (toClass.isPrimitive()) { + toClass = convertPrimitiveTypeToWrapperType(toClass); + } + + Converter c = CONVERSION_MAP.get(new ConversionKey(fromClass, toClass)); + if (c == null) { + return null; + } + + return c.convert(value); + } + + private static Class convertPrimitiveTypeToWrapperType(Class type) { + Class rc = type; + if (type.isPrimitive()) { + if (type == int.class) { + rc = Integer.class; + } else if (type == long.class) { + rc = Long.class; + } else if (type == double.class) { + rc = Double.class; + } else if (type == float.class) { + rc = Float.class; + } else if (type == short.class) { + rc = Short.class; + } else if (type == byte.class) { + rc = Byte.class; + } else if (type == boolean.class) { + rc = Boolean.class; + } + } + + return rc; + } + + private TypeConversionSupport() {} +}