NO-JIRA Update test client to have no real linkage to the activemq

internals to make it easier to share the tests with Artemis.
This commit is contained in:
Timothy Bish 2015-11-04 12:47:57 -05:00
parent fd0f71a4c8
commit 82a5839fc7
18 changed files with 2706 additions and 406 deletions

View File

@ -21,7 +21,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.activemq.transport.amqp.client.util.ClientTcpTransport;
import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory;
import org.apache.qpid.proton.amqp.Symbol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -91,7 +92,7 @@ public class AmqpClient {
throw new IllegalArgumentException("Password must be null if user name value is null");
}
ClientTcpTransport transport = new ClientTcpTransport(remoteURI);
NettyTransport transport = NettyTransportFactory.createTransport(remoteURI);
AmqpConnection connection = new AmqpConnection(transport, username, password);
connection.setOfferedCapabilities(getOfferedCapabilities());

View File

@ -17,6 +17,9 @@
package org.apache.activemq.transport.amqp.client;
import static org.apache.activemq.transport.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.net.URI;
@ -34,10 +37,11 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator;
import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.ClientTcpTransport;
import org.apache.activemq.transport.amqp.client.util.IdGenerator;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection;
import org.apache.activemq.util.IdGenerator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
@ -47,11 +51,10 @@ import org.apache.qpid.proton.engine.Event.Type;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.CollectorImpl;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AmqpConnection extends AmqpAbstractResource<Connection> implements ClientTcpTransport.TransportListener {
public class AmqpConnection extends AmqpAbstractResource<Connection> implements NettyTransportListener {
private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
@ -69,7 +72,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicLong sessionIdGenerator = new AtomicLong();
private final Collector protonCollector = new CollectorImpl();
private final ClientTcpTransport transport;
private final NettyTransport transport;
private final Transport protonTransport = Transport.Factory.create();
private final String username;
@ -90,7 +93,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
public AmqpConnection(ClientTcpTransport transport, String username, String password) {
public AmqpConnection(NettyTransport transport, String username, String password) {
setEndpoint(Connection.Factory.create());
getEndpoint().collect(protonCollector);
@ -98,7 +101,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
this.username = username;
this.password = password;
this.connectionId = CONNECTION_ID_GENERATOR.generateId();
this.remoteURI = transport.getRemoteURI();
this.remoteURI = transport.getRemoteLocation();
this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@ -257,7 +260,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
public void run() {
checkClosed();
try {
transport.send(ByteBuffer.wrap(rawData));
transport.send(Unpooled.wrappedBuffer(rawData));
} catch (IOException e) {
fireClientException(e);
} finally {
@ -409,7 +412,9 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
while (!done) {
ByteBuffer toWrite = protonTransport.getOutputBuffer();
if (toWrite != null && toWrite.hasRemaining()) {
transport.send(toWrite);
ByteBuf outbound = transport.allocateSendBuffer(toWrite.remaining());
outbound.writeBytes(toWrite);
transport.send(outbound);
protonTransport.outputConsumed();
} else {
done = true;
@ -423,12 +428,16 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
//----- Transport listener event hooks -----------------------------------//
@Override
public void onData(final Buffer input) {
public void onData(final ByteBuf incoming) {
// We need to retain until the serializer gets around to processing it.
ReferenceCountUtil.retain(incoming);
serializer.execute(new Runnable() {
@Override
public void run() {
ByteBuffer source = input.toByteBuffer();
ByteBuffer source = incoming.nioBuffer();
LOG.trace("Received from Broker {} bytes:", source.remaining());
if (protonTransport.isClosed()) {
@ -446,6 +455,8 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
source.position(source.position() + limit);
} while (source.hasRemaining());
ReferenceCountUtil.release(incoming);
// Process the state changes from the latest data and then answer back
// any pending updates to the Broker.
processUpdates();
@ -498,7 +509,10 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
}
}
} catch (Exception e) {
try {
transport.close();
} catch (IOException e1) {
}
fireClientException(e);
}
}

View File

@ -32,8 +32,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.InvalidDestinationException;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableReceiver;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;

View File

@ -0,0 +1,389 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.transport;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.URI;
import java.security.Principal;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* TCP based transport that uses Netty as the underlying IO layer.
*/
public class NettyTransport {
private static final Logger LOG = LoggerFactory.getLogger(NettyTransport.class);
private static final int QUIET_PERIOD = 20;
private static final int SHUTDOWN_TIMEOUT = 100;
protected Bootstrap bootstrap;
protected EventLoopGroup group;
protected Channel channel;
protected NettyTransportListener listener;
protected NettyTransportOptions options;
protected final URI remote;
protected boolean secure;
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();
private final CountDownLatch connectLatch = new CountDownLatch(1);
private IOException failureCause;
private Throwable pendingFailure;
/**
* Create a new transport instance
*
* @param remoteLocation
* the URI that defines the remote resource to connect to.
* @param options
* the transport options used to configure the socket connection.
*/
public NettyTransport(URI remoteLocation, NettyTransportOptions options) {
this(null, remoteLocation, options);
}
/**
* Create a new transport instance
*
* @param listener
* the TransportListener that will receive events from this Transport.
* @param remoteLocation
* the URI that defines the remote resource to connect to.
* @param options
* the transport options used to configure the socket connection.
*/
public NettyTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
this.options = options;
this.listener = listener;
this.remote = remoteLocation;
this.secure = remoteLocation.getScheme().equalsIgnoreCase("ssl");
}
public void connect() throws IOException {
if (listener == null) {
throw new IllegalStateException("A transport listener must be set before connection attempts.");
}
group = new NioEventLoopGroup(1);
bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel connectedChannel) throws Exception {
configureChannel(connectedChannel);
}
});
configureNetty(bootstrap, getTransportOptions());
ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort());
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
handleConnected(future.channel());
} else if (future.isCancelled()) {
connectionFailed(new IOException("Connection attempt was cancelled"));
} else {
connectionFailed(IOExceptionSupport.create(future.cause()));
}
}
});
try {
connectLatch.await();
} catch (InterruptedException ex) {
LOG.debug("Transport connection was interrupted.");
Thread.interrupted();
failureCause = IOExceptionSupport.create(ex);
}
if (failureCause != null) {
// Close out any Netty resources now as they are no longer needed.
if (channel != null) {
channel.close();
channel = null;
}
if (group != null) {
group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
group = null;
}
throw failureCause;
} else {
// Connected, allow any held async error to fire now and close the transport.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (pendingFailure != null) {
channel.pipeline().fireExceptionCaught(pendingFailure);
}
}
});
}
}
public boolean isSSL() {
return secure;
}
public boolean isConnected() {
return connected.get();
}
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
connected.set(false);
if (channel != null) {
channel.close().syncUninterruptibly();
}
if (group != null) {
group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
}
}
}
public ByteBuf allocateSendBuffer(int size) throws IOException {
checkConnected();
return channel.alloc().ioBuffer(size, size);
}
public void send(ByteBuf output) throws IOException {
checkConnected();
int length = output.readableBytes();
if (length == 0) {
return;
}
LOG.trace("Attempted write of: {} bytes", length);
channel.writeAndFlush(output);
}
public NettyTransportListener getTransportListener() {
return listener;
}
public void setTransportListener(NettyTransportListener listener) {
this.listener = listener;
}
public NettyTransportOptions getTransportOptions() {
if (options == null) {
if (isSSL()) {
options = NettyTransportSslOptions.INSTANCE;
} else {
options = NettyTransportOptions.INSTANCE;
}
}
return options;
}
public URI getRemoteLocation() {
return remote;
}
public Principal getLocalPrincipal() {
if (!isSSL()) {
throw new UnsupportedOperationException("Not connected to a secure channel");
}
SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
return sslHandler.engine().getSession().getLocalPrincipal();
}
//----- Internal implementation details, can be overridden as needed --//
protected String getRemoteHost() {
return remote.getHost();
}
protected int getRemotePort() {
int port = remote.getPort();
if (port <= 0) {
if (isSSL()) {
port = getSslOptions().getDefaultSslPort();
} else {
port = getTransportOptions().getDefaultTcpPort();
}
}
return port;
}
protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
if (options.getSendBufferSize() != -1) {
bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
}
if (options.getReceiveBufferSize() != -1) {
bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
}
if (options.getTrafficClass() != -1) {
bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
}
}
protected void configureChannel(Channel channel) throws Exception {
if (isSSL()) {
channel.pipeline().addLast(NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions()));
}
channel.pipeline().addLast(new NettyTcpTransportHandler());
}
protected void handleConnected(final Channel channel) throws Exception {
if (isSSL()) {
SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
Future<Channel> channelFuture = sslHandler.handshakeFuture();
channelFuture.addListener(new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(Future<Channel> future) throws Exception {
if (future.isSuccess()) {
LOG.trace("SSL Handshake has completed: {}", channel);
connectionEstablished(channel);
} else {
LOG.trace("SSL Handshake has failed: {}", channel);
connectionFailed(IOExceptionSupport.create(future.cause()));
}
}
});
} else {
connectionEstablished(channel);
}
}
//----- State change handlers and checks ---------------------------------//
/**
* Called when the transport has successfully connected and is ready for use.
*/
protected void connectionEstablished(Channel connectedChannel) {
channel = connectedChannel;
connected.set(true);
connectLatch.countDown();
}
/**
* Called when the transport connection failed and an error should be returned.
*
* @param cause
* An IOException that describes the cause of the failed connection.
*/
protected void connectionFailed(IOException cause) {
failureCause = IOExceptionSupport.create(cause);
connected.set(false);
connectLatch.countDown();
}
private NettyTransportSslOptions getSslOptions() {
return (NettyTransportSslOptions) getTransportOptions();
}
private void checkConnected() throws IOException {
if (!connected.get()) {
throw new IOException("Cannot send to a non-connected transport.");
}
}
//----- Handle connection events -----------------------------------------//
private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext context) throws Exception {
LOG.trace("Channel has become active! Channel is {}", context.channel());
}
@Override
public void channelInactive(ChannelHandlerContext context) throws Exception {
LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportClosed listener");
listener.onTransportClosed();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
LOG.trace("Exception on channel! Channel is {}", context.channel());
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportError listener");
if (pendingFailure != null) {
listener.onTransportError(pendingFailure);
} else {
listener.onTransportError(cause);
}
} else {
// Hold the first failure for later dispatch if connect succeeds.
// This will then trigger disconnect using the first error reported.
if (pendingFailure != null) {
LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
pendingFailure = cause;
}
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer);
listener.onData(buffer);
}
}
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.transport;
import java.net.URI;
import java.util.Map;
import org.apache.qpid.jms.util.PropertyUtil;
/**
* Factory for creating the Netty based TCP Transport.
*/
public final class NettyTransportFactory {
private NettyTransportFactory() {}
/**
* Creates an instance of the given Transport and configures it using the
* properties set on the given remote broker URI.
*
* @param remoteURI
* The URI used to connect to a remote Peer.
*
* @return a new Transport instance.
*
* @throws Exception if an error occurs while creating the Transport instance.
*/
public static NettyTransport createTransport(URI remoteURI) throws Exception {
Map<String, String> map = PropertyUtil.parseQuery(remoteURI.getQuery());
Map<String, String> transportURIOptions = PropertyUtil.filterProperties(map, "transport.");
NettyTransportOptions transportOptions = null;
remoteURI = PropertyUtil.replaceQuery(remoteURI, map);
if (!remoteURI.getScheme().equalsIgnoreCase("ssl")) {
transportOptions = NettyTransportOptions.INSTANCE.clone();
} else {
transportOptions = NettyTransportSslOptions.INSTANCE.clone();
}
Map<String, String> unused = PropertyUtil.setProperties(transportOptions, transportURIOptions);
if (!unused.isEmpty()) {
String msg = " Not all transport options could be set on the TCP based" +
" Transport. Check the options are spelled correctly." +
" Unused parameters=[" + unused + "]." +
" This provider instance cannot be started.";
throw new IllegalArgumentException(msg);
}
NettyTransport result = new NettyTransport(remoteURI, transportOptions);
return result;
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.transport;
import io.netty.buffer.ByteBuf;
/**
* Listener interface that should be implemented by users of the various
* QpidJMS Transport classes.
*/
public interface NettyTransportListener {
/**
* Called when new incoming data has become available.
*
* @param incoming
* the next incoming packet of data.
*/
void onData(ByteBuf incoming);
/**
* Called if the connection state becomes closed.
*/
void onTransportClosed();
/**
* Called when an error occurs during normal Transport operations.
*
* @param cause
* the error that triggered this event.
*/
void onTransportError(Throwable cause);
}

View File

@ -0,0 +1,183 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.transport;
/**
* Encapsulates all the TCP Transport options in one configuration object.
*/
public class NettyTransportOptions {
public static final int DEFAULT_SEND_BUFFER_SIZE = 64 * 1024;
public static final int DEFAULT_RECEIVE_BUFFER_SIZE = DEFAULT_SEND_BUFFER_SIZE;
public static final int DEFAULT_TRAFFIC_CLASS = 0;
public static final boolean DEFAULT_TCP_NO_DELAY = true;
public static final boolean DEFAULT_TCP_KEEP_ALIVE = false;
public static final int DEFAULT_SO_LINGER = Integer.MIN_VALUE;
public static final int DEFAULT_SO_TIMEOUT = -1;
public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
public static final int DEFAULT_TCP_PORT = 5672;
public static final NettyTransportOptions INSTANCE = new NettyTransportOptions();
private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE;
private int trafficClass = DEFAULT_TRAFFIC_CLASS;
private int connectTimeout = DEFAULT_CONNECT_TIMEOUT;
private int soTimeout = DEFAULT_SO_TIMEOUT;
private int soLinger = DEFAULT_SO_LINGER;
private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE;
private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
private int defaultTcpPort = DEFAULT_TCP_PORT;
/**
* @return the currently set send buffer size in bytes.
*/
public int getSendBufferSize() {
return sendBufferSize;
}
/**
* Sets the send buffer size in bytes, the value must be greater than zero
* or an {@link IllegalArgumentException} will be thrown.
*
* @param sendBufferSize
* the new send buffer size for the TCP Transport.
*
* @throws IllegalArgumentException if the value given is not in the valid range.
*/
public void setSendBufferSize(int sendBufferSize) {
if (sendBufferSize <= 0) {
throw new IllegalArgumentException("The send buffer size must be > 0");
}
this.sendBufferSize = sendBufferSize;
}
/**
* @return the currently configured receive buffer size in bytes.
*/
public int getReceiveBufferSize() {
return receiveBufferSize;
}
/**
* Sets the receive buffer size in bytes, the value must be greater than zero
* or an {@link IllegalArgumentException} will be thrown.
*
* @param receiveBufferSize
* the new receive buffer size for the TCP Transport.
*
* @throws IllegalArgumentException if the value given is not in the valid range.
*/
public void setReceiveBufferSize(int receiveBufferSize) {
if (receiveBufferSize <= 0) {
throw new IllegalArgumentException("The send buffer size must be > 0");
}
this.receiveBufferSize = receiveBufferSize;
}
/**
* @return the currently configured traffic class value.
*/
public int getTrafficClass() {
return trafficClass;
}
/**
* Sets the traffic class value used by the TCP connection, valid
* range is between 0 and 255.
*
* @param trafficClass
* the new traffic class value.
*
* @throws IllegalArgumentException if the value given is not in the valid range.
*/
public void setTrafficClass(int trafficClass) {
if (trafficClass < 0 || trafficClass > 255) {
throw new IllegalArgumentException("Traffic class must be in the range [0..255]");
}
this.trafficClass = trafficClass;
}
public int getSoTimeout() {
return soTimeout;
}
public void setSoTimeout(int soTimeout) {
this.soTimeout = soTimeout;
}
public boolean isTcpNoDelay() {
return tcpNoDelay;
}
public void setTcpNoDelay(boolean tcpNoDelay) {
this.tcpNoDelay = tcpNoDelay;
}
public int getSoLinger() {
return soLinger;
}
public void setSoLinger(int soLinger) {
this.soLinger = soLinger;
}
public boolean isTcpKeepAlive() {
return tcpKeepAlive;
}
public void setTcpKeepAlive(boolean keepAlive) {
this.tcpKeepAlive = keepAlive;
}
public int getConnectTimeout() {
return connectTimeout;
}
public void setConnectTimeout(int connectTimeout) {
this.connectTimeout = connectTimeout;
}
public int getDefaultTcpPort() {
return defaultTcpPort;
}
public void setDefaultTcpPort(int defaultTcpPort) {
this.defaultTcpPort = defaultTcpPort;
}
@Override
public NettyTransportOptions clone() {
return copyOptions(new NettyTransportOptions());
}
protected NettyTransportOptions copyOptions(NettyTransportOptions copy) {
copy.setConnectTimeout(getConnectTimeout());
copy.setReceiveBufferSize(getReceiveBufferSize());
copy.setSendBufferSize(getSendBufferSize());
copy.setSoLinger(getSoLinger());
copy.setSoTimeout(getSoTimeout());
copy.setTcpKeepAlive(isTcpKeepAlive());
copy.setTcpNoDelay(isTcpNoDelay());
copy.setTrafficClass(getTrafficClass());
return copy;
}
}

View File

@ -0,0 +1,287 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.transport;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* Holds the defined SSL options for connections that operate over a secure
* transport. Options are read from the environment and can be overridden by
* specifying them on the connection URI.
*/
public class NettyTransportSslOptions extends NettyTransportOptions {
public static final String DEFAULT_STORE_TYPE = "jks";
public static final String DEFAULT_CONTEXT_PROTOCOL = "TLS";
public static final boolean DEFAULT_TRUST_ALL = false;
public static final boolean DEFAULT_VERIFY_HOST = true;
public static final List<String> DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[]{"SSLv2Hello", "SSLv3"}));
public static final int DEFAULT_SSL_PORT = 5671;
public static final NettyTransportSslOptions INSTANCE = new NettyTransportSslOptions();
private String keyStoreLocation;
private String keyStorePassword;
private String trustStoreLocation;
private String trustStorePassword;
private String storeType = DEFAULT_STORE_TYPE;
private String[] enabledCipherSuites;
private String[] disabledCipherSuites;
private String[] enabledProtocols;
private String[] disabledProtocols = DEFAULT_DISABLED_PROTOCOLS.toArray(new String[0]);
private String contextProtocol = DEFAULT_CONTEXT_PROTOCOL;
private boolean trustAll = DEFAULT_TRUST_ALL;
private boolean verifyHost = DEFAULT_VERIFY_HOST;
private String keyAlias;
private int defaultSslPort = DEFAULT_SSL_PORT;
static {
INSTANCE.setKeyStoreLocation(System.getProperty("javax.net.ssl.keyStore"));
INSTANCE.setKeyStorePassword(System.getProperty("javax.net.ssl.keyStorePassword"));
INSTANCE.setTrustStoreLocation(System.getProperty("javax.net.ssl.trustStore"));
INSTANCE.setTrustStorePassword(System.getProperty("javax.net.ssl.keyStorePassword"));
}
/**
* @return the keyStoreLocation currently configured.
*/
public String getKeyStoreLocation() {
return keyStoreLocation;
}
/**
* Sets the location on disk of the key store to use.
*
* @param keyStoreLocation
* the keyStoreLocation to use to create the key manager.
*/
public void setKeyStoreLocation(String keyStoreLocation) {
this.keyStoreLocation = keyStoreLocation;
}
/**
* @return the keyStorePassword
*/
public String getKeyStorePassword() {
return keyStorePassword;
}
/**
* @param keyStorePassword the keyStorePassword to set
*/
public void setKeyStorePassword(String keyStorePassword) {
this.keyStorePassword = keyStorePassword;
}
/**
* @return the trustStoreLocation
*/
public String getTrustStoreLocation() {
return trustStoreLocation;
}
/**
* @param trustStoreLocation the trustStoreLocation to set
*/
public void setTrustStoreLocation(String trustStoreLocation) {
this.trustStoreLocation = trustStoreLocation;
}
/**
* @return the trustStorePassword
*/
public String getTrustStorePassword() {
return trustStorePassword;
}
/**
* @param trustStorePassword the trustStorePassword to set
*/
public void setTrustStorePassword(String trustStorePassword) {
this.trustStorePassword = trustStorePassword;
}
/**
* @return the storeType
*/
public String getStoreType() {
return storeType;
}
/**
* @param storeType
* the format that the store files are encoded in.
*/
public void setStoreType(String storeType) {
this.storeType = storeType;
}
/**
* @return the enabledCipherSuites
*/
public String[] getEnabledCipherSuites() {
return enabledCipherSuites;
}
/**
* @param enabledCipherSuites the enabledCipherSuites to set
*/
public void setEnabledCipherSuites(String[] enabledCipherSuites) {
this.enabledCipherSuites = enabledCipherSuites;
}
/**
* @return the disabledCipherSuites
*/
public String[] getDisabledCipherSuites() {
return disabledCipherSuites;
}
/**
* @param disabledCipherSuites the disabledCipherSuites to set
*/
public void setDisabledCipherSuites(String[] disabledCipherSuites) {
this.disabledCipherSuites = disabledCipherSuites;
}
/**
* @return the enabledProtocols or null if the defaults should be used
*/
public String[] getEnabledProtocols() {
return enabledProtocols;
}
/**
* The protocols to be set as enabled.
*
* @param enabledProtocols the enabled protocols to set, or null if the defaults should be used.
*/
public void setEnabledProtocols(String[] enabledProtocols) {
this.enabledProtocols = enabledProtocols;
}
/**
*
* @return the protocols to disable or null if none should be
*/
public String[] getDisabledProtocols() {
return disabledProtocols;
}
/**
* The protocols to be disable.
*
* @param disabledProtocols the protocols to disable, or null if none should be.
*/
public void setDisabledProtocols(String[] disabledProtocols) {
this.disabledProtocols = disabledProtocols;
}
/**
* @return the context protocol to use
*/
public String getContextProtocol() {
return contextProtocol;
}
/**
* The protocol value to use when creating an SSLContext via
* SSLContext.getInstance(protocol).
*
* @param contextProtocol the context protocol to use.
*/
public void setContextProtocol(String contextProtocol) {
this.contextProtocol = contextProtocol;
}
/**
* @return the trustAll
*/
public boolean isTrustAll() {
return trustAll;
}
/**
* @param trustAll the trustAll to set
*/
public void setTrustAll(boolean trustAll) {
this.trustAll = trustAll;
}
/**
* @return the verifyHost
*/
public boolean isVerifyHost() {
return verifyHost;
}
/**
* @param verifyHost the verifyHost to set
*/
public void setVerifyHost(boolean verifyHost) {
this.verifyHost = verifyHost;
}
/**
* @return the key alias
*/
public String getKeyAlias() {
return keyAlias;
}
/**
* @param keyAlias the key alias to use
*/
public void setKeyAlias(String keyAlias) {
this.keyAlias = keyAlias;
}
public int getDefaultSslPort() {
return defaultSslPort;
}
public void setDefaultSslPort(int defaultSslPort) {
this.defaultSslPort = defaultSslPort;
}
@Override
public NettyTransportSslOptions clone() {
return copyOptions(new NettyTransportSslOptions());
}
protected NettyTransportSslOptions copyOptions(NettyTransportSslOptions copy) {
super.copyOptions(copy);
copy.setKeyStoreLocation(getKeyStoreLocation());
copy.setKeyStorePassword(getKeyStorePassword());
copy.setTrustStoreLocation(getTrustStoreLocation());
copy.setTrustStorePassword(getTrustStorePassword());
copy.setStoreType(getStoreType());
copy.setEnabledCipherSuites(getEnabledCipherSuites());
copy.setDisabledCipherSuites(getDisabledCipherSuites());
copy.setEnabledProtocols(getEnabledProtocols());
copy.setDisabledProtocols(getDisabledProtocols());
copy.setTrustAll(isTrustAll());
copy.setVerifyHost(isVerifyHost());
copy.setKeyAlias(getKeyAlias());
copy.setContextProtocol(getContextProtocol());
return copy;
}
}

View File

@ -0,0 +1,299 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.transport;
import io.netty.handler.ssl.SslHandler;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URI;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509ExtendedKeyManager;
import javax.net.ssl.X509TrustManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Static class that provides various utility methods used by Transport implementations.
*/
public class NettyTransportSupport {
private static final Logger LOG = LoggerFactory.getLogger(NettyTransportSupport.class);
/**
* Creates a Netty SslHandler instance for use in Transports that require
* an SSL encoder / decoder.
*
* @param remote
* The URI of the remote peer that the SslHandler will be used against.
* @param options
* The SSL options object to build the SslHandler instance from.
*
* @return a new SslHandler that is configured from the given options.
*
* @throws Exception if an error occurs while creating the SslHandler instance.
*/
public static SslHandler createSslHandler(URI remote, NettyTransportSslOptions options) throws Exception {
return new SslHandler(createSslEngine(remote, createSslContext(options), options));
}
/**
* Create a new SSLContext using the options specific in the given TransportSslOptions
* instance.
*
* @param options
* the configured options used to create the SSLContext.
*
* @return a new SSLContext instance.
*
* @throws Exception if an error occurs while creating the context.
*/
public static SSLContext createSslContext(NettyTransportSslOptions options) throws Exception {
try {
String contextProtocol = options.getContextProtocol();
LOG.trace("Getting SSLContext instance using protocol: {}", contextProtocol);
SSLContext context = SSLContext.getInstance(contextProtocol);
KeyManager[] keyMgrs = loadKeyManagers(options);
TrustManager[] trustManagers = loadTrustManagers(options);
context.init(keyMgrs, trustManagers, new SecureRandom());
return context;
} catch (Exception e) {
LOG.error("Failed to create SSLContext: {}", e, e);
throw e;
}
}
/**
* Create a new SSLEngine instance in client mode from the given SSLContext and
* TransportSslOptions instances.
*
* @param context
* the SSLContext to use when creating the engine.
* @param options
* the TransportSslOptions to use to configure the new SSLEngine.
*
* @return a new SSLEngine instance in client mode.
*
* @throws Exception if an error occurs while creating the new SSLEngine.
*/
public static SSLEngine createSslEngine(SSLContext context, NettyTransportSslOptions options) throws Exception {
return createSslEngine(null, context, options);
}
/**
* Create a new SSLEngine instance in client mode from the given SSLContext and
* TransportSslOptions instances.
*
* @param remote
* the URI of the remote peer that will be used to initialize the engine, may be null if none should.
* @param context
* the SSLContext to use when creating the engine.
* @param options
* the TransportSslOptions to use to configure the new SSLEngine.
*
* @return a new SSLEngine instance in client mode.
*
* @throws Exception if an error occurs while creating the new SSLEngine.
*/
public static SSLEngine createSslEngine(URI remote, SSLContext context, NettyTransportSslOptions options) throws Exception {
SSLEngine engine = null;
if(remote == null) {
engine = context.createSSLEngine();
} else {
engine = context.createSSLEngine(remote.getHost(), remote.getPort());
}
engine.setEnabledProtocols(buildEnabledProtocols(engine, options));
engine.setEnabledCipherSuites(buildEnabledCipherSuites(engine, options));
engine.setUseClientMode(true);
if (options.isVerifyHost()) {
SSLParameters sslParameters = engine.getSSLParameters();
sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
engine.setSSLParameters(sslParameters);
}
return engine;
}
private static String[] buildEnabledProtocols(SSLEngine engine, NettyTransportSslOptions options) {
List<String> enabledProtocols = new ArrayList<String>();
if (options.getEnabledProtocols() != null) {
List<String> configuredProtocols = Arrays.asList(options.getEnabledProtocols());
LOG.trace("Configured protocols from transport options: {}", configuredProtocols);
enabledProtocols.addAll(configuredProtocols);
} else {
List<String> engineProtocols = Arrays.asList(engine.getEnabledProtocols());
LOG.trace("Default protocols from the SSLEngine: {}", engineProtocols);
enabledProtocols.addAll(engineProtocols);
}
String[] disabledProtocols = options.getDisabledProtocols();
if (disabledProtocols != null) {
List<String> disabled = Arrays.asList(disabledProtocols);
LOG.trace("Disabled protocols: {}", disabled);
enabledProtocols.removeAll(disabled);
}
LOG.trace("Enabled protocols: {}", enabledProtocols);
return enabledProtocols.toArray(new String[0]);
}
private static String[] buildEnabledCipherSuites(SSLEngine engine, NettyTransportSslOptions options) {
List<String> enabledCipherSuites = new ArrayList<String>();
if (options.getEnabledCipherSuites() != null) {
List<String> configuredCipherSuites = Arrays.asList(options.getEnabledCipherSuites());
LOG.trace("Configured cipher suites from transport options: {}", configuredCipherSuites);
enabledCipherSuites.addAll(configuredCipherSuites);
} else {
List<String> engineCipherSuites = Arrays.asList(engine.getEnabledCipherSuites());
LOG.trace("Default cipher suites from the SSLEngine: {}", engineCipherSuites);
enabledCipherSuites.addAll(engineCipherSuites);
}
String[] disabledCipherSuites = options.getDisabledCipherSuites();
if (disabledCipherSuites != null) {
List<String> disabled = Arrays.asList(disabledCipherSuites);
LOG.trace("Disabled cipher suites: {}", disabled);
enabledCipherSuites.removeAll(disabled);
}
LOG.trace("Enabled cipher suites: {}", enabledCipherSuites);
return enabledCipherSuites.toArray(new String[0]);
}
private static TrustManager[] loadTrustManagers(NettyTransportSslOptions options) throws Exception {
if (options.isTrustAll()) {
return new TrustManager[] { createTrustAllTrustManager() };
}
if (options.getTrustStoreLocation() == null) {
return null;
}
TrustManagerFactory fact = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
String storeLocation = options.getTrustStoreLocation();
String storePassword = options.getTrustStorePassword();
String storeType = options.getStoreType();
LOG.trace("Attempt to load TrustStore from location {} of type {}", storeLocation, storeType);
KeyStore trustStore = loadStore(storeLocation, storePassword, storeType);
fact.init(trustStore);
return fact.getTrustManagers();
}
private static KeyManager[] loadKeyManagers(NettyTransportSslOptions options) throws Exception {
if (options.getKeyStoreLocation() == null) {
return null;
}
KeyManagerFactory fact = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
String storeLocation = options.getKeyStoreLocation();
String storePassword = options.getKeyStorePassword();
String storeType = options.getStoreType();
String alias = options.getKeyAlias();
LOG.trace("Attempt to load KeyStore from location {} of type {}", storeLocation, storeType);
KeyStore keyStore = loadStore(storeLocation, storePassword, storeType);
fact.init(keyStore, storePassword != null ? storePassword.toCharArray() : null);
if (alias == null) {
return fact.getKeyManagers();
} else {
validateAlias(keyStore, alias);
return wrapKeyManagers(alias, fact.getKeyManagers());
}
}
private static KeyManager[] wrapKeyManagers(String alias, KeyManager[] origKeyManagers) {
KeyManager[] keyManagers = new KeyManager[origKeyManagers.length];
for (int i = 0; i < origKeyManagers.length; i++) {
KeyManager km = origKeyManagers[i];
if (km instanceof X509ExtendedKeyManager) {
km = new X509AliasKeyManager(alias, (X509ExtendedKeyManager) km);
}
keyManagers[i] = km;
}
return keyManagers;
}
private static void validateAlias(KeyStore store, String alias) throws IllegalArgumentException, KeyStoreException {
if (!store.containsAlias(alias)) {
throw new IllegalArgumentException("The alias '" + alias + "' doesn't exist in the key store");
}
if (!store.isKeyEntry(alias)) {
throw new IllegalArgumentException("The alias '" + alias + "' in the keystore doesn't represent a key entry");
}
}
private static KeyStore loadStore(String storePath, final String password, String storeType) throws Exception {
KeyStore store = KeyStore.getInstance(storeType);
try (InputStream in = new FileInputStream(new File(storePath));) {
store.load(in, password != null ? password.toCharArray() : null);
}
return store;
}
private static TrustManager createTrustAllTrustManager() {
return new X509TrustManager() {
@Override
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
};
}
}

View File

@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.transport;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
/**
* A {@link ByteBufAllocator} which is partial pooled. Which means only direct
* {@link ByteBuf}s are pooled. The rest is unpooled.
*
* @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a>
*/
public class PartialPooledByteBufAllocator implements ByteBufAllocator {
private static final ByteBufAllocator POOLED = new PooledByteBufAllocator(false);
private static final ByteBufAllocator UNPOOLED = new UnpooledByteBufAllocator(false);
public static final PartialPooledByteBufAllocator INSTANCE = new PartialPooledByteBufAllocator();
private PartialPooledByteBufAllocator() {
}
@Override
public ByteBuf buffer() {
return UNPOOLED.heapBuffer();
}
@Override
public ByteBuf buffer(int initialCapacity) {
return UNPOOLED.heapBuffer(initialCapacity);
}
@Override
public ByteBuf buffer(int initialCapacity, int maxCapacity) {
return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
}
@Override
public ByteBuf ioBuffer() {
return UNPOOLED.heapBuffer();
}
@Override
public ByteBuf ioBuffer(int initialCapacity) {
return UNPOOLED.heapBuffer(initialCapacity);
}
@Override
public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
}
@Override
public ByteBuf heapBuffer() {
return UNPOOLED.heapBuffer();
}
@Override
public ByteBuf heapBuffer(int initialCapacity) {
return UNPOOLED.heapBuffer(initialCapacity);
}
@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
}
@Override
public ByteBuf directBuffer() {
return POOLED.directBuffer();
}
@Override
public ByteBuf directBuffer(int initialCapacity) {
return POOLED.directBuffer(initialCapacity);
}
@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
return POOLED.directBuffer(initialCapacity, maxCapacity);
}
@Override
public CompositeByteBuf compositeBuffer() {
return UNPOOLED.compositeHeapBuffer();
}
@Override
public CompositeByteBuf compositeBuffer(int maxNumComponents) {
return UNPOOLED.compositeHeapBuffer(maxNumComponents);
}
@Override
public CompositeByteBuf compositeHeapBuffer() {
return UNPOOLED.compositeHeapBuffer();
}
@Override
public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
return UNPOOLED.compositeHeapBuffer(maxNumComponents);
}
@Override
public CompositeByteBuf compositeDirectBuffer() {
return POOLED.compositeDirectBuffer();
}
@Override
public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
return POOLED.compositeDirectBuffer();
}
@Override
public boolean isDirectBufferPooled() {
return true;
}
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.activemq.transport.amqp.client.transport;
import java.net.Socket;
import java.security.Principal;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.X509ExtendedKeyManager;
/**
* An X509ExtendedKeyManager wrapper which always chooses and only
* returns the given alias, and defers retrieval to the delegate
* key manager.
*/
public class X509AliasKeyManager extends X509ExtendedKeyManager {
private X509ExtendedKeyManager delegate;
private String alias;
public X509AliasKeyManager(String alias, X509ExtendedKeyManager delegate) throws IllegalArgumentException {
if (alias == null) {
throw new IllegalArgumentException("The given key alias must not be null.");
}
this.alias = alias;
this.delegate = delegate;
}
@Override
public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) {
return alias;
}
@Override
public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) {
return alias;
}
@Override
public X509Certificate[] getCertificateChain(String alias) {
return delegate.getCertificateChain(alias);
}
@Override
public String[] getClientAliases(String keyType, Principal[] issuers) {
return new String[] { alias };
}
@Override
public PrivateKey getPrivateKey(String alias) {
return delegate.getPrivateKey(alias);
}
@Override
public String[] getServerAliases(String keyType, Principal[] issuers) {
return new String[] { alias };
}
@Override
public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine) {
return alias;
}
@Override
public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) {
return alias;
}
}

View File

@ -20,8 +20,6 @@ import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.util.IOExceptionSupport;
/**
* Asynchronous Client Future class.
*/

View File

@ -1,389 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.transport.tcp.TcpBufferedInputStream;
import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.InetAddressUtil;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Simple TCP based transport used by the client.
*/
public class ClientTcpTransport implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ClientTcpTransport.class);
public interface TransportListener {
/**
* Called when new incoming data has become available.
*
* @param incoming
* the next incoming packet of data.
*/
void onData(Buffer incoming);
/**
* Called if the connection state becomes closed.
*/
void onTransportClosed();
/**
* Called when an error occurs during normal Transport operations.
*
* @param cause
* the error that triggered this event.
*/
void onTransportError(Throwable cause);
}
private final URI remoteLocation;
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicReference<Throwable> connectionError = new AtomicReference<Throwable>();
private final Socket socket;
private DataOutputStream dataOut;
private DataInputStream dataIn;
private Thread runner;
private TransportListener listener;
private int socketBufferSize = 64 * 1024;
private int soTimeout = 0;
private int soLinger = Integer.MIN_VALUE;
private Boolean keepAlive;
private Boolean tcpNoDelay = true;
private boolean useLocalHost = false;
private int ioBufferSize = 8 * 1024;
/**
* Create a new instance of the transport.
*
* @param listener
* The TransportListener that will receive data from this Transport instance.
* @param remoteLocation
* The remote location where this transport should connection to.
*/
public ClientTcpTransport(URI remoteLocation) {
this.remoteLocation = remoteLocation;
Socket temp = null;
try {
temp = createSocketFactory().createSocket();
} catch (IOException e) {
connectionError.set(e);
}
this.socket = temp;
}
public void connect() throws IOException {
if (connectionError.get() != null) {
throw IOExceptionSupport.create(connectionError.get());
}
if (listener == null) {
throw new IllegalStateException("Cannot connect until a listener has been set.");
}
if (socket == null) {
throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
}
InetSocketAddress remoteAddress = null;
if (remoteLocation != null) {
String host = resolveHostName(remoteLocation.getHost());
remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
}
socket.connect(remoteAddress);
connected.set(true);
initialiseSocket(socket);
initializeStreams();
runner = new Thread(null, this, "ClientTcpTransport: " + toString());
runner.setDaemon(false);
runner.start();
}
public void close() {
if (closed.compareAndSet(false, true)) {
if (socket == null) {
return;
}
// Closing the streams flush the sockets before closing.. if the socket
// is hung.. then this hangs the close so we perform an asynchronous close
// by default which will timeout if the close doesn't happen after a delay.
final CountDownLatch latch = new CountDownLatch(1);
final ExecutorService closer = Executors.newSingleThreadExecutor();
closer.execute(new Runnable() {
@Override
public void run() {
LOG.trace("Closing socket {}", socket);
try {
socket.close();
LOG.debug("Closed socket {}", socket);
} catch (IOException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
}
} finally {
latch.countDown();
}
}
});
try {
latch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
closer.shutdownNow();
}
}
}
public void send(ByteBuffer output) throws IOException {
checkConnected();
LOG.trace("Client Transport sending packet of size: {}", output.remaining());
WritableByteChannel channel = Channels.newChannel(dataOut);
channel.write(output);
dataOut.flush();
}
public void send(Buffer output) throws IOException {
checkConnected();
send(output.toByteBuffer());
}
public URI getRemoteURI() {
return this.remoteLocation;
}
public boolean isConnected() {
return this.connected.get();
}
public TransportListener getTransportListener() {
return this.listener;
}
public void setTransportListener(TransportListener listener) {
if (listener == null) {
throw new IllegalArgumentException("Listener cannot be set to null");
}
this.listener = listener;
}
public int getSocketBufferSize() {
return socketBufferSize;
}
public void setSocketBufferSize(int socketBufferSize) {
this.socketBufferSize = socketBufferSize;
}
public int getSoTimeout() {
return soTimeout;
}
public void setSoTimeout(int soTimeout) {
this.soTimeout = soTimeout;
}
public boolean isTcpNoDelay() {
return tcpNoDelay;
}
public void setTcpNoDelay(Boolean tcpNoDelay) {
this.tcpNoDelay = tcpNoDelay;
}
public int getSoLinger() {
return soLinger;
}
public void setSoLinger(int soLinger) {
this.soLinger = soLinger;
}
public boolean isKeepAlive() {
return keepAlive;
}
public void setKeepAlive(Boolean keepAlive) {
this.keepAlive = keepAlive;
}
public boolean isUseLocalHost() {
return useLocalHost;
}
public void setUseLocalHost(boolean useLocalHost) {
this.useLocalHost = useLocalHost;
}
public int getIoBufferSize() {
return ioBufferSize;
}
public void setIoBufferSize(int ioBufferSize) {
this.ioBufferSize = ioBufferSize;
}
//---------- Transport internal implementation ---------------------------//
@Override
public void run() {
LOG.trace("TCP consumer thread for {} starting", this);
try {
while (isConnected()) {
doRun();
}
} catch (IOException e) {
connectionError.set(e);
onException(e);
} catch (Throwable e) {
IOException ioe = new IOException("Unexpected error occured: " + e);
connectionError.set(ioe);
ioe.initCause(e);
onException(ioe);
}
}
protected void doRun() throws IOException {
int size = dataIn.available();
if (size <= 0) {
try {
TimeUnit.NANOSECONDS.sleep(1);
} catch (InterruptedException e) {
}
return;
}
byte[] buffer = new byte[size];
dataIn.readFully(buffer);
Buffer incoming = new Buffer(buffer);
listener.onData(incoming);
}
/**
* Passes any IO exceptions into the transport listener
*/
public void onException(IOException e) {
if (listener != null) {
try {
listener.onTransportError(e);
} catch (RuntimeException e2) {
LOG.debug("Unexpected runtime exception: {}", e2.getMessage(), e2);
}
}
}
protected SocketFactory createSocketFactory() throws IOException {
if (remoteLocation.getScheme().equalsIgnoreCase("ssl")) {
return SSLSocketFactory.getDefault();
} else {
return SocketFactory.getDefault();
}
}
protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException {
try {
sock.setReceiveBufferSize(socketBufferSize);
sock.setSendBufferSize(socketBufferSize);
} catch (SocketException se) {
LOG.warn("Cannot set socket buffer size = {}", socketBufferSize);
LOG.debug("Cannot set socket buffer size. Reason: {}. This exception is ignored.", se.getMessage(), se);
}
sock.setSoTimeout(soTimeout);
if (keepAlive != null) {
sock.setKeepAlive(keepAlive.booleanValue());
}
if (soLinger > -1) {
sock.setSoLinger(true, soLinger);
} else if (soLinger == -1) {
sock.setSoLinger(false, 0);
}
if (tcpNoDelay != null) {
sock.setTcpNoDelay(tcpNoDelay.booleanValue());
}
}
protected void initializeStreams() throws IOException {
try {
TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
this.dataIn = new DataInputStream(buffIn);
TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
this.dataOut = new DataOutputStream(outputStream);
} catch (Throwable e) {
throw IOExceptionSupport.create(e);
}
}
protected String resolveHostName(String host) throws UnknownHostException {
if (isUseLocalHost()) {
String localName = InetAddressUtil.getLocalHostName();
if (localName != null && localName.equals(host)) {
return "localhost";
}
}
return host;
}
private void checkConnected() throws IOException {
if (!connected.get()) {
throw new IOException("Cannot send to a non-connected transport.");
}
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import java.io.IOException;
/**
* Used to make throwing IOException instances easier.
*/
public class IOExceptionSupport {
/**
* Checks the given cause to determine if it's already an IOException type and
* if not creates a new IOException to wrap it.
*
* @param cause
* The initiating exception that should be cast or wrapped.
*
* @return an IOException instance.
*/
public static IOException create(Throwable cause) {
if (cause instanceof IOException) {
return (IOException) cause;
}
String message = cause.getMessage();
if (message == null || message.length() == 0) {
message = cause.toString();
}
return new IOException(message, cause);
}
}

View File

@ -0,0 +1,272 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Generator for Globally unique Strings.
*/
public class IdGenerator {
private static final Logger LOG = LoggerFactory.getLogger(IdGenerator.class);
private static final String UNIQUE_STUB;
private static int instanceCount;
private static String hostName;
private String seed;
private final AtomicLong sequence = new AtomicLong(1);
private int length;
public static final String PROPERTY_IDGENERATOR_PORT = "activemq.idgenerator.port";
static {
String stub = "";
boolean canAccessSystemProps = true;
try {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPropertiesAccess();
}
} catch (SecurityException se) {
canAccessSystemProps = false;
}
if (canAccessSystemProps) {
int idGeneratorPort = 0;
ServerSocket ss = null;
try {
idGeneratorPort = Integer.parseInt(System.getProperty(PROPERTY_IDGENERATOR_PORT, "0"));
LOG.trace("Using port {}", idGeneratorPort);
hostName = getLocalHostName();
ss = new ServerSocket(idGeneratorPort);
stub = "-" + ss.getLocalPort() + "-" + System.currentTimeMillis() + "-";
Thread.sleep(100);
} catch (Exception e) {
if (LOG.isTraceEnabled()) {
LOG.trace("could not generate unique stub by using DNS and binding to local port", e);
} else {
LOG.warn("could not generate unique stub by using DNS and binding to local port: {} {}", e.getClass().getCanonicalName(), e.getMessage());
}
// Restore interrupted state so higher level code can deal with it.
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
} finally {
if (ss != null) {
try {
ss.close();
} catch (IOException ioe) {
if (LOG.isTraceEnabled()) {
LOG.trace("Closing the server socket failed", ioe);
} else {
LOG.warn("Closing the server socket failed" + " due " + ioe.getMessage());
}
}
}
}
}
if (hostName == null) {
hostName = "localhost";
}
hostName = sanitizeHostName(hostName);
if (stub.length() == 0) {
stub = "-1-" + System.currentTimeMillis() + "-";
}
UNIQUE_STUB = stub;
}
/**
* Construct an IdGenerator
*
* @param prefix
* The prefix value that is applied to all generated IDs.
*/
public IdGenerator(String prefix) {
synchronized (UNIQUE_STUB) {
this.seed = prefix + UNIQUE_STUB + (instanceCount++) + ":";
this.length = this.seed.length() + ("" + Long.MAX_VALUE).length();
}
}
public IdGenerator() {
this("ID:" + hostName);
}
/**
* As we have to find the host name as a side-affect of generating a unique stub, we allow
* it's easy retrieval here
*
* @return the local host name
*/
public static String getHostName() {
return hostName;
}
/**
* Generate a unique id
*
* @return a unique id
*/
public synchronized String generateId() {
StringBuilder sb = new StringBuilder(length);
sb.append(seed);
sb.append(sequence.getAndIncrement());
return sb.toString();
}
public static String sanitizeHostName(String hostName) {
boolean changed = false;
StringBuilder sb = new StringBuilder();
for (char ch : hostName.toCharArray()) {
// only include ASCII chars
if (ch < 127) {
sb.append(ch);
} else {
changed = true;
}
}
if (changed) {
String newHost = sb.toString();
LOG.info("Sanitized hostname from: {} to: {}", hostName, newHost);
return newHost;
} else {
return hostName;
}
}
/**
* Generate a unique ID - that is friendly for a URL or file system
*
* @return a unique id
*/
public String generateSanitizedId() {
String result = generateId();
result = result.replace(':', '-');
result = result.replace('_', '-');
result = result.replace('.', '-');
return result;
}
/**
* From a generated id - return the seed (i.e. minus the count)
*
* @param id
* the generated identifier
* @return the seed
*/
public static String getSeedFromId(String id) {
String result = id;
if (id != null) {
int index = id.lastIndexOf(':');
if (index > 0 && (index + 1) < id.length()) {
result = id.substring(0, index);
}
}
return result;
}
/**
* From a generated id - return the generator count
*
* @param id
* The ID that will be parsed for a sequence number.
*
* @return the sequence value parsed from the given ID.
*/
public static long getSequenceFromId(String id) {
long result = -1;
if (id != null) {
int index = id.lastIndexOf(':');
if (index > 0 && (index + 1) < id.length()) {
String numStr = id.substring(index + 1, id.length());
result = Long.parseLong(numStr);
}
}
return result;
}
/**
* Does a proper compare on the Id's
*
* @param id1 the lhs of the comparison.
* @param id2 the rhs of the comparison.
*
* @return 0 if equal else a positive if {@literal id1 > id2} ...
*/
public static int compare(String id1, String id2) {
int result = -1;
String seed1 = IdGenerator.getSeedFromId(id1);
String seed2 = IdGenerator.getSeedFromId(id2);
if (seed1 != null && seed2 != null) {
result = seed1.compareTo(seed2);
if (result == 0) {
long count1 = IdGenerator.getSequenceFromId(id1);
long count2 = IdGenerator.getSequenceFromId(id2);
result = (int) (count1 - count2);
}
}
return result;
}
/**
* When using the {@link java.net.InetAddress#getHostName()} method in an
* environment where neither a proper DNS lookup nor an <tt>/etc/hosts</tt>
* entry exists for a given host, the following exception will be thrown:
* <code>
* java.net.UnknownHostException: &lt;hostname&gt;: &lt;hostname&gt;
* at java.net.InetAddress.getLocalHost(InetAddress.java:1425)
* ...
* </code>
* Instead of just throwing an UnknownHostException and giving up, this
* method grabs a suitable hostname from the exception and prevents the
* exception from being thrown. If a suitable hostname cannot be acquired
* from the exception, only then is the <tt>UnknownHostException</tt> thrown.
*
* @return The hostname
*
* @throws UnknownHostException if the given host cannot be looked up.
*
* @see java.net.InetAddress#getLocalHost()
* @see java.net.InetAddress#getHostName()
*/
protected static String getLocalHostName() throws UnknownHostException {
try {
return (InetAddress.getLocalHost()).getHostName();
} catch (UnknownHostException uhe) {
String host = uhe.getMessage(); // host = "hostname: hostname"
if (host != null) {
int colon = host.indexOf(':');
if (colon > 0) {
return host.substring(0, colon);
}
}
throw uhe;
}
}
}

View File

@ -0,0 +1,589 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import java.beans.BeanInfo;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import javax.net.ssl.SSLContext;
/**
* Utilities for properties
*/
public class PropertyUtil {
/**
* Creates a URI from the original URI and the given parameters.
*
* @param originalURI
* The URI whose current parameters are removed and replaced with the given remainder value.
* @param params
* The URI params that should be used to replace the current ones in the target.
*
* @return a new URI that matches the original one but has its query options replaced with
* the given ones.
*
* @throws URISyntaxException if the given URI is invalid.
*/
public static URI replaceQuery(URI originalURI, Map<String, String> params) throws URISyntaxException {
String s = createQueryString(params);
if (s.length() == 0) {
s = null;
}
return replaceQuery(originalURI, s);
}
/**
* Creates a URI with the given query, removing an previous query value from the given URI.
*
* @param uri
* The source URI whose existing query is replaced with the newly supplied one.
* @param query
* The new URI query string that should be appended to the given URI.
*
* @return a new URI that is a combination of the original URI and the given query string.
*
* @throws URISyntaxException if the given URI is invalid.
*/
public static URI replaceQuery(URI uri, String query) throws URISyntaxException {
String schemeSpecificPart = uri.getRawSchemeSpecificPart();
// strip existing query if any
int questionMark = schemeSpecificPart.lastIndexOf("?");
// make sure question mark is not within parentheses
if (questionMark < schemeSpecificPart.lastIndexOf(")")) {
questionMark = -1;
}
if (questionMark > 0) {
schemeSpecificPart = schemeSpecificPart.substring(0, questionMark);
}
if (query != null && query.length() > 0) {
schemeSpecificPart += "?" + query;
}
return new URI(uri.getScheme(), schemeSpecificPart, uri.getFragment());
}
/**
* Creates a URI with the given query, removing an previous query value from the given URI.
*
* @param uri
* The source URI whose existing query is replaced with the newly supplied one.
*
* @return a new URI that is a combination of the original URI and the given query string.
*
* @throws URISyntaxException if the given URI is invalid.
*/
public static URI eraseQuery(URI uri) throws URISyntaxException {
return replaceQuery(uri, (String) null);
}
/**
* Given a key / value mapping, create and return a URI formatted query string that is valid
* and can be appended to a URI.
*
* @param options
* The Mapping that will create the new Query string.
*
* @return a URI formatted query string.
*
* @throws URISyntaxException if the given URI is invalid.
*/
public static String createQueryString(Map<String, ? extends Object> options) throws URISyntaxException {
try {
if (options.size() > 0) {
StringBuffer rc = new StringBuffer();
boolean first = true;
for (Entry<String, ? extends Object> entry : options.entrySet()) {
if (first) {
first = false;
} else {
rc.append("&");
}
rc.append(URLEncoder.encode(entry.getKey(), "UTF-8"));
rc.append("=");
rc.append(URLEncoder.encode((String) entry.getValue(), "UTF-8"));
}
return rc.toString();
} else {
return "";
}
} catch (UnsupportedEncodingException e) {
throw (URISyntaxException) new URISyntaxException(e.toString(), "Invalid encoding").initCause(e);
}
}
/**
* Get properties from a URI and return them in a new {@code Map<String, String>} instance.
*
* If the URI is null or the query string of the URI is null an empty Map is returned.
*
* @param uri
* the URI whose parameters are to be parsed.
*
* @return <Code>Map</Code> of properties
*
* @throws Exception if an error occurs while parsing the query options.
*/
public static Map<String, String> parseParameters(URI uri) throws Exception {
if (uri == null || uri.getQuery() == null) {
return Collections.emptyMap();
}
return parseQuery(stripPrefix(uri.getQuery(), "?"));
}
/**
* Parse properties from a named resource -eg. a URI or a simple name e.g.
* {@literal foo?name="fred"&size=2}
*
* @param uri
* the URI whose parameters are to be parsed.
*
* @return <Code>Map</Code> of properties
*
* @throws Exception if an error occurs while parsing the query options.
*/
public static Map<String, String> parseParameters(String uri) throws Exception {
if (uri == null) {
return Collections.emptyMap();
}
return parseQuery(stripUpto(uri, '?'));
}
/**
* Get properties from a URI query string.
*
* @param queryString
* the string value returned from a call to the URI class getQuery method.
*
* @return <Code>Map</Code> of properties from the parsed string.
*
* @throws Exception if an error occurs while parsing the query options.
*/
public static Map<String, String> parseQuery(String queryString) throws Exception {
if (queryString != null && !queryString.isEmpty()) {
Map<String, String> rc = new HashMap<String, String>();
String[] parameters = queryString.split("&");
for (int i = 0; i < parameters.length; i++) {
int p = parameters[i].indexOf("=");
if (p >= 0) {
String name = URLDecoder.decode(parameters[i].substring(0, p), "UTF-8");
String value = URLDecoder.decode(parameters[i].substring(p + 1), "UTF-8");
rc.put(name, value);
} else {
rc.put(parameters[i], null);
}
}
return rc;
}
return Collections.emptyMap();
}
/**
* Given a map of properties, filter out only those prefixed with the given value, the
* values filtered are returned in a new Map instance.
*
* @param properties
* The map of properties to filter.
* @param optionPrefix
* The prefix value to use when filtering.
*
* @return a filter map with only values that match the given prefix.
*/
public static Map<String, String> filterProperties(Map<String, String> properties, String optionPrefix) {
if (properties == null) {
throw new IllegalArgumentException("The given properties object was null.");
}
HashMap<String, String> rc = new HashMap<String, String>(properties.size());
for (Iterator<Entry<String, String>> iter = properties.entrySet().iterator(); iter.hasNext();) {
Entry<String, String> entry = iter.next();
if (entry.getKey().startsWith(optionPrefix)) {
String name = entry.getKey().substring(optionPrefix.length());
rc.put(name, entry.getValue());
iter.remove();
}
}
return rc;
}
/**
* Enumerate the properties of the target object and add them as additional entries
* to the query string of the given string URI.
*
* @param uri
* The string URI value to append the object properties to.
* @param bean
* The Object whose properties will be added to the target URI.
*
* @return a new String value that is the original URI with the added bean properties.
*
* @throws Exception if an error occurs while enumerating the bean properties.
*/
public static String addPropertiesToURIFromBean(String uri, Object bean) throws Exception {
Map<String, String> properties = PropertyUtil.getProperties(bean);
return PropertyUtil.addPropertiesToURI(uri, properties);
}
/**
* Enumerate the properties of the target object and add them as additional entries
* to the query string of the given URI.
*
* @param uri
* The URI value to append the object properties to.
* @param properties
* The Object whose properties will be added to the target URI.
*
* @return a new String value that is the original URI with the added bean properties.
*
* @throws Exception if an error occurs while enumerating the bean properties.
*/
public static String addPropertiesToURI(URI uri, Map<String, String> properties) throws Exception {
return addPropertiesToURI(uri.toString(), properties);
}
/**
* Append the given properties to the query portion of the given URI.
*
* @param uri
* The string URI value to append the object properties to.
* @param properties
* The properties that will be added to the target URI.
*
* @return a new String value that is the original URI with the added properties.
*
* @throws Exception if an error occurs while building the new URI string.
*/
public static String addPropertiesToURI(String uri, Map<String, String> properties) throws Exception {
String result = uri;
if (uri != null && properties != null) {
StringBuilder base = new StringBuilder(stripBefore(uri, '?'));
Map<String, String> map = parseParameters(uri);
if (!map.isEmpty()) {
map.putAll(properties);
} else {
map = properties;
}
if (!map.isEmpty()) {
base.append('?');
boolean first = true;
for (Map.Entry<String, String> entry : map.entrySet()) {
if (!first) {
base.append('&');
}
first = false;
base.append(entry.getKey()).append("=").append(entry.getValue());
}
result = base.toString();
}
}
return result;
}
/**
* Set properties on an object using the provided map. The return value
* indicates if all properties from the given map were set on the target object.
*
* @param target
* the object whose properties are to be set from the map options.
* @param properties
* the properties that should be applied to the given object.
*
* @return true if all values in the properties map were applied to the target object.
*/
public static Map<String, String> setProperties(Object target, Map<String, String> properties) {
if (target == null) {
throw new IllegalArgumentException("target object cannot be null");
}
if (properties == null) {
throw new IllegalArgumentException("Given Properties object cannot be null");
}
Map<String, String> unmatched = new HashMap<String, String>();
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (!setProperty(target, entry.getKey(), entry.getValue())) {
unmatched.put(entry.getKey(), entry.getValue());
}
}
return Collections.unmodifiableMap(unmatched);
}
//TODO: common impl for above and below methods.
/**
* Set properties on an object using the provided Properties object. The return value
* indicates if all properties from the given map were set on the target object.
*
* @param target
* the object whose properties are to be set from the map options.
* @param properties
* the properties that should be applied to the given object.
*
* @return an unmodifiable map with any values that could not be applied to the target.
*/
public static Map<String, Object> setProperties(Object target, Properties properties) {
if (target == null) {
throw new IllegalArgumentException("target object cannot be null");
}
if (properties == null) {
throw new IllegalArgumentException("Given Properties object cannot be null");
}
Map<String, Object> unmatched = new HashMap<String, Object>();
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
if (!setProperty(target, (String) entry.getKey(), entry.getValue())) {
unmatched.put((String) entry.getKey(), entry.getValue());
}
}
return Collections.<String, Object>unmodifiableMap(unmatched);
}
/**
* Get properties from an object using reflection. If the passed object is null an
* empty <code>Map</code> is returned.
*
* @param object
* the Object whose properties are to be extracted.
*
* @return <Code>Map</Code> of properties extracted from the given object.
*
* @throws Exception if an error occurs while examining the object's properties.
*/
public static Map<String, String> getProperties(Object object) throws Exception {
if (object == null) {
return Collections.emptyMap();
}
Map<String, String> properties = new LinkedHashMap<String, String>();
BeanInfo beanInfo = Introspector.getBeanInfo(object.getClass());
Object[] NULL_ARG = {};
PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors();
if (propertyDescriptors != null) {
for (int i = 0; i < propertyDescriptors.length; i++) {
PropertyDescriptor pd = propertyDescriptors[i];
if (pd.getReadMethod() != null && !pd.getName().equals("class") && !pd.getName().equals("properties") && !pd.getName().equals("reference")) {
Object value = pd.getReadMethod().invoke(object, NULL_ARG);
if (value != null) {
if (value instanceof Boolean || value instanceof Number || value instanceof String || value instanceof URI || value instanceof URL) {
properties.put(pd.getName(), ("" + value));
} else if (value instanceof SSLContext) {
// ignore this one..
} else {
Map<String, String> inner = getProperties(value);
for (Map.Entry<String, String> entry : inner.entrySet()) {
properties.put(pd.getName() + "." + entry.getKey(), entry.getValue());
}
}
}
}
}
}
return properties;
}
/**
* Find a specific property getter in a given object based on a property name.
*
* @param object
* the object to search.
* @param name
* the property name to search for.
*
* @return the result of invoking the specific property get method.
*
* @throws Exception if an error occurs while searching the object's bean info.
*/
public static Object getProperty(Object object, String name) throws Exception {
BeanInfo beanInfo = Introspector.getBeanInfo(object.getClass());
PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors();
if (propertyDescriptors != null) {
for (int i = 0; i < propertyDescriptors.length; i++) {
PropertyDescriptor pd = propertyDescriptors[i];
if (pd.getReadMethod() != null && pd.getName().equals(name)) {
return pd.getReadMethod().invoke(object);
}
}
}
return null;
}
/**
* Set a property named property on a given Object.
* <p>
* The object is searched for an set method that would match the given named
* property and if one is found. If necessary an attempt will be made to convert
* the new value to an acceptable type.
*
* @param target
* The object whose property is to be set.
* @param name
* The name of the property to set.
* @param value
* The new value to set for the named property.
*
* @return true if the property was able to be set on the target object.
*/
public static boolean setProperty(Object target, String name, Object value) {
try {
int dotPos = name.indexOf(".");
while (dotPos >= 0) {
String getterName = name.substring(0, dotPos);
target = getProperty(target, getterName);
name = name.substring(dotPos + 1);
dotPos = name.indexOf(".");
}
Class<? extends Object> clazz = target.getClass();
Method setter = findSetterMethod(clazz, name);
if (setter == null) {
return false;
}
// If the type is null or it matches the needed type, just use the
// value directly
if (value == null || value.getClass() == setter.getParameterTypes()[0]) {
setter.invoke(target, new Object[] { value });
} else {
setter.invoke(target, new Object[] { convert(value, setter.getParameterTypes()[0]) });
}
return true;
} catch (Throwable ignore) {
return false;
}
}
/**
* Return a String minus the given prefix. If the string does not start
* with the given prefix the original string value is returned.
*
* @param value
* The String whose prefix is to be removed.
* @param prefix
* The prefix string to remove from the target string.
*
* @return stripped version of the original input string.
*/
public static String stripPrefix(String value, String prefix) {
if (value != null && prefix != null && value.startsWith(prefix)) {
return value.substring(prefix.length());
}
return value;
}
/**
* Return a portion of a String value by looking beyond the given
* character.
*
* @param value
* The string value to split
* @param c
* The character that marks the split point.
*
* @return the sub-string value starting beyond the given character.
*/
public static String stripUpto(String value, char c) {
String result = null;
if (value != null) {
int index = value.indexOf(c);
if (index > 0) {
result = value.substring(index + 1);
}
}
return result;
}
/**
* Return a String up to and including character
*
* @param value
* The string value to split
* @param c
* The character that marks the start of split point.
*
* @return the sub-string value starting from the given character.
*/
public static String stripBefore(String value, char c) {
String result = value;
if (value != null) {
int index = value.indexOf(c);
if (index > 0) {
result = value.substring(0, index);
}
}
return result;
}
private static Method findSetterMethod(Class<? extends Object> clazz, String name) {
// Build the method name.
name = "set" + name.substring(0, 1).toUpperCase() + name.substring(1);
Method[] methods = clazz.getMethods();
for (int i = 0; i < methods.length; i++) {
Method method = methods[i];
Class<? extends Object> params[] = method.getParameterTypes();
if (method.getName().equals(name) && params.length == 1) {
return method;
}
}
return null;
}
private static Object convert(Object value, Class<?> type) throws Exception {
if (value == null) {
if (boolean.class.isAssignableFrom(type)) {
return Boolean.FALSE;
}
return null;
}
if (type.isAssignableFrom(value.getClass())) {
return type.cast(value);
}
// special for String[] as we do not want to use a PropertyEditor for that
if (type.isAssignableFrom(String[].class)) {
return StringArrayConverter.convertToStringArray(value);
}
if (type == URI.class) {
return new URI(value.toString());
}
return TypeConversionSupport.convert(value, type);
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
/**
* Class for converting to/from String[] to be used instead of a
* {@link java.beans.PropertyEditor} which otherwise causes memory leaks as the
* JDK {@link java.beans.PropertyEditorManager} is a static class and has strong
* references to classes, causing problems in hot-deployment environments.
*/
public class StringArrayConverter {
public static String[] convertToStringArray(Object value) {
if (value == null) {
return null;
}
String text = value.toString();
if (text == null || text.isEmpty()) {
return null;
}
StringTokenizer stok = new StringTokenizer(text, ",");
final List<String> list = new ArrayList<String>();
while (stok.hasMoreTokens()) {
list.add(stok.nextToken());
}
String[] array = list.toArray(new String[list.size()]);
return array;
}
public static String convertToString(String[] value) {
if (value == null || value.length == 0) {
return null;
}
StringBuffer result = new StringBuffer(String.valueOf(value[0]));
for (int i = 1; i < value.length; i++) {
result.append(",").append(value[i]);
}
return result.toString();
}
}

View File

@ -0,0 +1,209 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import java.util.Date;
import java.util.HashMap;
public final class TypeConversionSupport {
static class ConversionKey {
final Class<?> from;
final Class<?> to;
final int hashCode;
public ConversionKey(Class<?> from, Class<?> to) {
this.from = from;
this.to = to;
this.hashCode = from.hashCode() ^ (to.hashCode() << 1);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || o.getClass() != this.getClass()) {
return false;
}
ConversionKey x = (ConversionKey) o;
return x.from == from && x.to == to;
}
@Override
public int hashCode() {
return hashCode;
}
}
interface Converter {
Object convert(Object value);
}
private static final HashMap<ConversionKey, Converter> CONVERSION_MAP = new HashMap<ConversionKey, Converter>();
static {
Converter toStringConverter = new Converter() {
@Override
public Object convert(Object value) {
return value.toString();
}
};
CONVERSION_MAP.put(new ConversionKey(Boolean.class, String.class), toStringConverter);
CONVERSION_MAP.put(new ConversionKey(Byte.class, String.class), toStringConverter);
CONVERSION_MAP.put(new ConversionKey(Short.class, String.class), toStringConverter);
CONVERSION_MAP.put(new ConversionKey(Integer.class, String.class), toStringConverter);
CONVERSION_MAP.put(new ConversionKey(Long.class, String.class), toStringConverter);
CONVERSION_MAP.put(new ConversionKey(Float.class, String.class), toStringConverter);
CONVERSION_MAP.put(new ConversionKey(Double.class, String.class), toStringConverter);
CONVERSION_MAP.put(new ConversionKey(String.class, Boolean.class), new Converter() {
@Override
public Object convert(Object value) {
return Boolean.valueOf((String) value);
}
});
CONVERSION_MAP.put(new ConversionKey(String.class, Byte.class), new Converter() {
@Override
public Object convert(Object value) {
return Byte.valueOf((String) value);
}
});
CONVERSION_MAP.put(new ConversionKey(String.class, Short.class), new Converter() {
@Override
public Object convert(Object value) {
return Short.valueOf((String) value);
}
});
CONVERSION_MAP.put(new ConversionKey(String.class, Integer.class), new Converter() {
@Override
public Object convert(Object value) {
return Integer.valueOf((String) value);
}
});
CONVERSION_MAP.put(new ConversionKey(String.class, Long.class), new Converter() {
@Override
public Object convert(Object value) {
return Long.valueOf((String) value);
}
});
CONVERSION_MAP.put(new ConversionKey(String.class, Float.class), new Converter() {
@Override
public Object convert(Object value) {
return Float.valueOf((String) value);
}
});
CONVERSION_MAP.put(new ConversionKey(String.class, Double.class), new Converter() {
@Override
public Object convert(Object value) {
return Double.valueOf((String) value);
}
});
Converter longConverter = new Converter() {
@Override
public Object convert(Object value) {
return Long.valueOf(((Number) value).longValue());
}
};
CONVERSION_MAP.put(new ConversionKey(Byte.class, Long.class), longConverter);
CONVERSION_MAP.put(new ConversionKey(Short.class, Long.class), longConverter);
CONVERSION_MAP.put(new ConversionKey(Integer.class, Long.class), longConverter);
CONVERSION_MAP.put(new ConversionKey(Date.class, Long.class), new Converter() {
@Override
public Object convert(Object value) {
return Long.valueOf(((Date) value).getTime());
}
});
Converter intConverter = new Converter() {
@Override
public Object convert(Object value) {
return Integer.valueOf(((Number) value).intValue());
}
};
CONVERSION_MAP.put(new ConversionKey(Byte.class, Integer.class), intConverter);
CONVERSION_MAP.put(new ConversionKey(Short.class, Integer.class), intConverter);
CONVERSION_MAP.put(new ConversionKey(Byte.class, Short.class), new Converter() {
@Override
public Object convert(Object value) {
return Short.valueOf(((Number) value).shortValue());
}
});
CONVERSION_MAP.put(new ConversionKey(Float.class, Double.class), new Converter() {
@Override
public Object convert(Object value) {
return new Double(((Number) value).doubleValue());
}
});
}
public static Object convert(Object value, Class<?> toClass) {
assert value != null && toClass != null;
if (value.getClass() == toClass) {
return value;
}
Class<?> fromClass = value.getClass();
if (fromClass.isPrimitive()) {
fromClass = convertPrimitiveTypeToWrapperType(fromClass);
}
if (toClass.isPrimitive()) {
toClass = convertPrimitiveTypeToWrapperType(toClass);
}
Converter c = CONVERSION_MAP.get(new ConversionKey(fromClass, toClass));
if (c == null) {
return null;
}
return c.convert(value);
}
private static Class<?> convertPrimitiveTypeToWrapperType(Class<?> type) {
Class<?> rc = type;
if (type.isPrimitive()) {
if (type == int.class) {
rc = Integer.class;
} else if (type == long.class) {
rc = Long.class;
} else if (type == double.class) {
rc = Double.class;
} else if (type == float.class) {
rc = Float.class;
} else if (type == short.class) {
rc = Short.class;
} else if (type == byte.class) {
rc = Byte.class;
} else if (type == boolean.class) {
rc = Boolean.class;
}
}
return rc;
}
private TypeConversionSupport() {}
}