ARTEMIS-1511 Refactor AMQP Transport for use with other test clients

This commit is contained in:
Martyn Taylor 2017-11-10 12:31:29 +00:00 committed by Clebert Suconic
parent 63b156e290
commit 5211afdf86
12 changed files with 51 additions and 38 deletions

View File

@ -21,8 +21,8 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.activemq.transport.amqp.client.transport.NettyTransport; import org.apache.activemq.transport.netty.NettyTransport;
import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory; import org.apache.activemq.transport.netty.NettyTransportFactory;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;

View File

@ -33,8 +33,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.transport.InactivityIOException; import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.netty.NettyTransport;
import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator; import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator;
import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener; import org.apache.activemq.transport.netty.NettyTransportListener;
import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.IdGenerator; import org.apache.activemq.transport.amqp.client.util.IdGenerator;
@ -80,7 +81,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private final AtomicLong sessionIdGenerator = new AtomicLong(); private final AtomicLong sessionIdGenerator = new AtomicLong();
private final AtomicLong txIdGenerator = new AtomicLong(); private final AtomicLong txIdGenerator = new AtomicLong();
private final Collector protonCollector = new CollectorImpl(); private final Collector protonCollector = new CollectorImpl();
private final org.apache.activemq.transport.amqp.client.transport.NettyTransport transport; private final NettyTransport transport;
private final Transport protonTransport = Transport.Factory.create(); private final Transport protonTransport = Transport.Factory.create();
private final String username; private final String username;
@ -109,7 +110,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private boolean trace; private boolean trace;
private boolean noContainerID = false; private boolean noContainerID = false;
public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport, String username, String password) { public AmqpConnection(NettyTransport transport, String username, String password) {
setEndpoint(Connection.Factory.create()); setEndpoint(Connection.Factory.create());
getEndpoint().collect(protonCollector); getEndpoint().collect(protonCollector);

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.transport.amqp.client.transport; package org.apache.activemq.transport.netty;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
@ -223,16 +223,16 @@ public class NettyTcpTransport implements NettyTransport {
} }
@Override @Override
public void send(ByteBuf output) throws IOException { public ChannelFuture send(ByteBuf output) throws IOException {
checkConnected(); checkConnected();
int length = output.readableBytes(); int length = output.readableBytes();
if (length == 0) { if (length == 0) {
return; return null;
} }
LOG.trace("Attempted write of: {} bytes", length); LOG.trace("Attempted write of: {} bytes", length);
channel.writeAndFlush(output); return channel.writeAndFlush(output);
} }
@Override @Override

View File

@ -14,13 +14,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.transport.amqp.client.transport; package org.apache.activemq.transport.netty;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.security.Principal; import java.security.Principal;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
/** /**
* Base for all Netty based Transports in this client. * Base for all Netty based Transports in this client.
@ -37,7 +38,7 @@ public interface NettyTransport {
ByteBuf allocateSendBuffer(int size) throws IOException; ByteBuf allocateSendBuffer(int size) throws IOException;
void send(ByteBuf output) throws IOException; ChannelFuture send(ByteBuf output) throws IOException;
NettyTransportListener getTransportListener(); NettyTransportListener getTransportListener();

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.transport.amqp.client.transport; package org.apache.activemq.transport.netty;
import java.net.URI; import java.net.URI;
import java.util.Map; import java.util.Map;
@ -65,19 +65,18 @@ public final class NettyTransportFactory {
NettyTransport result = null; NettyTransport result = null;
switch (remoteURI.getScheme().toLowerCase()) { String scheme = remoteURI.getScheme().toLowerCase();
case "tcp": if (scheme.startsWith("tcp") || scheme.startsWith("ssl")) {
case "ssl": result = new NettyTcpTransport(remoteURI, transportOptions);
result = new NettyTcpTransport(remoteURI, transportOptions); } else if (scheme.startsWith("ws") || scheme.startsWith("wss")) {
break; // Check for ws subprotocol
case "ws": if (scheme.contains("+")) {
case "wss": transportOptions.setWsSubProtocol(scheme.substring(scheme.indexOf("+") + 1));
result = new NettyWSTransport(remoteURI, transportOptions); }
break; result = new NettyWSTransport(remoteURI, transportOptions);
default: } else {
throw new IllegalArgumentException("Invalid URI Scheme: " + remoteURI.getScheme()); throw new IllegalArgumentException("Invalid URI Scheme: " + remoteURI.getScheme());
} }
return result; return result;
} }
} }

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.transport.amqp.client.transport; package org.apache.activemq.transport.netty;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.transport.amqp.client.transport; package org.apache.activemq.transport.netty;
/** /**
* Encapsulates all the TCP Transport options in one configuration object. * Encapsulates all the TCP Transport options in one configuration object.
@ -31,6 +31,7 @@ public class NettyTransportOptions implements Cloneable {
public static final int DEFAULT_CONNECT_TIMEOUT = 60000; public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
public static final int DEFAULT_TCP_PORT = 5672; public static final int DEFAULT_TCP_PORT = 5672;
public static final boolean DEFAULT_TRACE_BYTES = false; public static final boolean DEFAULT_TRACE_BYTES = false;
public static final String DEFAULT_WS_SUBPROTOCOL = NettyWSTransport.AMQP_SUB_PROTOCOL;
public static final NettyTransportOptions INSTANCE = new NettyTransportOptions(); public static final NettyTransportOptions INSTANCE = new NettyTransportOptions();
@ -44,6 +45,7 @@ public class NettyTransportOptions implements Cloneable {
private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY; private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
private int defaultTcpPort = DEFAULT_TCP_PORT; private int defaultTcpPort = DEFAULT_TCP_PORT;
private boolean traceBytes = DEFAULT_TRACE_BYTES; private boolean traceBytes = DEFAULT_TRACE_BYTES;
private String wsSubProtocol = DEFAULT_WS_SUBPROTOCOL;
/** /**
* @return the currently set send buffer size in bytes. * @return the currently set send buffer size in bytes.
@ -188,6 +190,14 @@ public class NettyTransportOptions implements Cloneable {
return false; return false;
} }
public String getWsSubProtocol() {
return wsSubProtocol;
}
public void setWsSubProtocol(String wsSubProtocol) {
this.wsSubProtocol = wsSubProtocol;
}
@Override @Override
public NettyTransportOptions clone() { public NettyTransportOptions clone() {
return copyOptions(new NettyTransportOptions()); return copyOptions(new NettyTransportOptions());
@ -202,6 +212,7 @@ public class NettyTransportOptions implements Cloneable {
copy.setTcpKeepAlive(isTcpKeepAlive()); copy.setTcpKeepAlive(isTcpKeepAlive());
copy.setTcpNoDelay(isTcpNoDelay()); copy.setTcpNoDelay(isTcpNoDelay());
copy.setTrafficClass(getTrafficClass()); copy.setTrafficClass(getTrafficClass());
copy.setWsSubProtocol(getWsSubProtocol());
return copy; return copy;
} }

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.transport.amqp.client.transport; package org.apache.activemq.transport.netty;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.transport.amqp.client.transport; package org.apache.activemq.transport.netty;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;

View File

@ -14,12 +14,13 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.transport.amqp.client.transport; package org.apache.activemq.transport.netty;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import io.netty.channel.ChannelFuture;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -50,7 +51,7 @@ public class NettyWSTransport extends NettyTcpTransport {
private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class); private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class);
private static final String AMQP_SUB_PROTOCOL = "amqp"; public static final String AMQP_SUB_PROTOCOL = "amqp";
/** /**
* Create a new transport instance * Create a new transport instance
@ -79,16 +80,16 @@ public class NettyWSTransport extends NettyTcpTransport {
} }
@Override @Override
public void send(ByteBuf output) throws IOException { public ChannelFuture send(ByteBuf output) throws IOException {
checkConnected(); checkConnected();
int length = output.readableBytes(); int length = output.readableBytes();
if (length == 0) { if (length == 0) {
return; return null;
} }
LOG.trace("Attempted write of: {} bytes", length); LOG.trace("Attempted write of: {} bytes", length);
channel.writeAndFlush(new BinaryWebSocketFrame(output)); return channel.writeAndFlush(new BinaryWebSocketFrame(output));
} }
@Override @Override
@ -115,7 +116,7 @@ public class NettyWSTransport extends NettyTcpTransport {
NettyWebSocketTransportHandler() { NettyWebSocketTransportHandler() {
handshaker = WebSocketClientHandshakerFactory.newHandshaker( handshaker = WebSocketClientHandshakerFactory.newHandshaker(
getRemoteLocation(), WebSocketVersion.V13, AMQP_SUB_PROTOCOL, getRemoteLocation(), WebSocketVersion.V13, options.getWsSubProtocol(),
true, new DefaultHttpHeaders(), getMaxFrameSize()); true, new DefaultHttpHeaders(), getMaxFrameSize());
} }

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations * specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package org.apache.activemq.transport.amqp.client.transport; package org.apache.activemq.transport.netty;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
import javax.net.ssl.X509ExtendedKeyManager; import javax.net.ssl.X509ExtendedKeyManager;

View File

@ -25,9 +25,9 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.junit.Wait; import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.transport.amqp.client.transport.NettyTransport; import org.apache.activemq.transport.netty.NettyTransport;
import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory; import org.apache.activemq.transport.netty.NettyTransportFactory;
import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener; import org.apache.activemq.transport.netty.NettyTransportListener;
import org.junit.Test; import org.junit.Test;
import java.net.URI; import java.net.URI;