diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java index c48fd8de33..2f8a11fb16 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java @@ -24,6 +24,8 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; public abstract class AbstractClientStompFrame implements ClientStompFrame { @@ -88,6 +90,16 @@ public abstract class AbstractClientStompFrame implements ClientStompFrame { return toByteBufferInternal(str); } + @Override + public ByteBuf toNettyByteBuf() { + return Unpooled.copiedBuffer(toByteBuffer()); + } + + @Override + public ByteBuf toNettyByteBufWithExtras(String str) { + return Unpooled.copiedBuffer(toByteBufferWithExtra(str)); + } + public ByteBuffer toByteBufferInternal(String str) { StringBuffer sb = new StringBuffer(); sb.append(command + EOL); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java index 707c8a17bd..78c9c4b310 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java @@ -17,9 +17,8 @@ package org.apache.activemq.artemis.tests.integration.stomp.util; import java.io.IOException; -import java.net.InetSocketAddress; +import java.net.URI; import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; @@ -27,8 +26,15 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.transport.netty.NettyTransport; +import org.apache.activemq.transport.netty.NettyTransportFactory; +import org.apache.activemq.transport.netty.NettyTransportListener; public abstract class AbstractStompClientConnection implements StompClientConnection { @@ -39,41 +45,53 @@ public abstract class AbstractStompClientConnection implements StompClientConnec protected String username; protected String passcode; protected StompFrameFactory factory; - protected final SocketChannel socketChannel; + protected NettyTransport transport; protected ByteBuffer readBuffer; protected List receiveList; protected BlockingQueue frameQueue = new LinkedBlockingQueue<>(); protected boolean connected = false; protected int serverPingCounter; - protected ReaderThread readerThread; + //protected ReaderThread readerThread; + protected String scheme; + @Deprecated public AbstractStompClientConnection(String version, String host, int port) throws IOException { this.version = version; this.host = host; this.port = port; + this.scheme = "tcp"; + this.factory = StompFrameFactoryFactory.getFactory(version); - socketChannel = SocketChannel.open(); - initSocket(); } - private void initSocket() throws IOException { - socketChannel.configureBlocking(true); - InetSocketAddress remoteAddr = new InetSocketAddress(host, port); - socketChannel.connect(remoteAddr); + public AbstractStompClientConnection(URI uri) throws Exception { + parseURI(uri); + this.factory = StompFrameFactoryFactory.getFactory(version); - startReaderThread(); - } - - private void startReaderThread() { readBuffer = ByteBuffer.allocateDirect(10240); receiveList = new ArrayList<>(10240); - readerThread = new ReaderThread(); - readerThread.start(); + transport = NettyTransportFactory.createTransport(uri); + transport.setTransportListener(new StompTransportListener()); + transport.connect(); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return transport.isConnected(); + } + }, 10000); + + if (!transport.isConnected()) { + throw new RuntimeException("Could not connect transport"); + } } - public void killReaderThread() { - readerThread.stop(); + private void parseURI(URI uri) { + scheme = uri.getScheme() == null ? "tcp" : uri.getScheme(); + host = uri.getHost(); + port = uri.getPort(); + this.version = StompClientConnectionFactory.getStompVersionFromURI(uri); } private ClientStompFrame sendFrameInternal(ClientStompFrame frame, boolean wicked) throws IOException, InterruptedException { @@ -85,8 +103,17 @@ public abstract class AbstractStompClientConnection implements StompClientConnec } else { buffer = frame.toByteBuffer(); } - while (buffer.remaining() > 0) { - socketChannel.write(buffer); + + ByteBuf buf = Unpooled.copiedBuffer(buffer); + + try { + buf.retain(); + ChannelFuture future = transport.send(buf); + if (future != null) { + future.awaitUninterruptibly(); + } + } finally { + buf.release(); } //now response @@ -179,35 +206,78 @@ public abstract class AbstractStompClientConnection implements StompClientConnec } protected void close() throws IOException { - socketChannel.close(); + transport.close(); } - private class ReaderThread extends Thread { + private class StompTransportListener implements NettyTransportListener { + /** + * Called when new incoming data has become available. + * + * @param incoming the next incoming packet of data. + */ @Override - public void run() { - try { - int n = socketChannel.read(readBuffer); - - while (n >= 0) { - if (n > 0) { - receiveBytes(n); - } - n = socketChannel.read(readBuffer); - } - //peer closed - close(); - - } catch (IOException e) { - try { - close(); - } catch (IOException e1) { - //ignore + public void onData(ByteBuf incoming) { + while (incoming.readableBytes() > 0) { + int bytes = incoming.readableBytes(); + if (incoming.readableBytes() < readBuffer.remaining()) { + ByteBuffer byteBuffer = ByteBuffer.allocate(incoming.readableBytes()); + incoming.readBytes(byteBuffer); + byteBuffer.rewind(); + readBuffer.put(byteBuffer); + receiveBytes(bytes); + } else { + incoming.readBytes(readBuffer); + receiveBytes(bytes - incoming.readableBytes()); } } } + + /** + * Called if the connection state becomes closed. + */ + @Override + public void onTransportClosed() { + } + + /** + * Called when an error occurs during normal Transport operations. + * + * @param cause the error that triggered this event. + */ + @Override + public void onTransportError(Throwable cause) { + throw new RuntimeException(cause); + } } +// private class ReaderThread extends Thread { +// +// @Override +// public void run() { +// try { +// transport.setTransportListener(); +// int n = Z..read(readBuffer); +// +// while (n >= 0) { +// if (n > 0) { +// receiveBytes(n); +// } +// n = socketChannel.read(readBuffer); +// } +// //peer closed +// close(); +// +// } catch (IOException e) { +// try { +// close(); +// } catch (IOException e1) { +// //ignore +// } +// } +// } +// } + @Override public ClientStompFrame connect() throws Exception { return connect(null, null); @@ -230,7 +300,7 @@ public abstract class AbstractStompClientConnection implements StompClientConnec @Override public boolean isConnected() { - return connected && socketChannel.isConnected(); + return connected && transport.isConnected(); } @Override @@ -243,6 +313,11 @@ public abstract class AbstractStompClientConnection implements StompClientConnec return this.frameQueue.size(); } + @Override + public void closeTransport() throws IOException { + transport.close(); + } + protected class Pinger extends Thread { long pingInterval; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java index 93801f9675..1b77e127d4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java @@ -18,6 +18,8 @@ package org.apache.activemq.artemis.tests.integration.stomp.util; import java.nio.ByteBuffer; +import io.netty.buffer.ByteBuf; + /** * pls use factory to create frames. */ @@ -25,6 +27,8 @@ public interface ClientStompFrame { ByteBuffer toByteBuffer(); + ByteBuf toNettyByteBuf(); + boolean needsReply(); ClientStompFrame setCommand(String command); @@ -41,6 +45,8 @@ public interface ClientStompFrame { ByteBuffer toByteBufferWithExtra(String str); + ByteBuf toNettyByteBufWithExtras(String str); + boolean isPing(); ClientStompFrame setForceOneway(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java index 7be09a5447..012bb49054 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java @@ -53,5 +53,6 @@ public interface StompClientConnection { int getServerPingNumber(); + void closeTransport() throws IOException; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionFactory.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionFactory.java index 3a40c99926..06d18455e6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionFactory.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionFactory.java @@ -17,9 +17,12 @@ package org.apache.activemq.artemis.tests.integration.stomp.util; import java.io.IOException; +import java.net.URI; public class StompClientConnectionFactory { + public static final String LATEST_VERSION = "1.2"; + //create a raw connection to the host. public static StompClientConnection createClientConnection(String version, String host, @@ -36,6 +39,34 @@ public class StompClientConnectionFactory { return null; } + public static StompClientConnection createClientConnection(URI uri) throws Exception { + String version = getStompVersionFromURI(uri); + if ("1.0".equals(version)) { + return new StompClientConnectionV10(uri); + } + if ("1.1".equals(version)) { + return new StompClientConnectionV11(uri); + } + if ("1.2".equals(version)) { + return new StompClientConnectionV12(uri); + } + return null; + } + + public static String getStompVersionFromURI(URI uri) { + String scheme = uri.getScheme(); + if (scheme.contains("10")) { + return "1.0"; + } + if (scheme.contains("11")) { + return "1.1"; + } + if (scheme.contains("12")) { + return "1.2"; + } + return LATEST_VERSION; + } + public static void main(String[] args) throws Exception { StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.0", "localhost", 61613); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java index d32823bfcd..56c72dba4e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java @@ -17,12 +17,14 @@ package org.apache.activemq.artemis.tests.integration.stomp.util; import java.io.IOException; +import java.net.URI; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; public class StompClientConnectionV10 extends AbstractStompClientConnection { + public StompClientConnectionV10(String host, int port) throws IOException { super("1.0", host, port); } @@ -31,6 +33,10 @@ public class StompClientConnectionV10 extends AbstractStompClientConnection { super(version, host, port); } + public StompClientConnectionV10(URI uri) throws Exception { + super(uri); + } + @Override public ClientStompFrame connect(String username, String passcode) throws IOException, InterruptedException { return connect(username, passcode, null); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java index 6ef88cb653..5f0cca3c5f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.tests.integration.stomp.util; import java.io.IOException; +import java.net.URI; import java.util.UUID; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; @@ -31,6 +32,10 @@ public class StompClientConnectionV11 extends StompClientConnectionV10 { super(version, host, port); } + public StompClientConnectionV11(URI uri) throws Exception { + super(uri); + } + @Override public ClientStompFrame connect(String username, String passcode, String clientID) throws IOException, InterruptedException { ClientStompFrame frame = factory.newFrame(Stomp.Commands.CONNECT); @@ -96,12 +101,16 @@ public class StompClientConnectionV11 extends StompClientConnectionV10 { frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid); - ClientStompFrame result = this.sendFrame(frame); - - if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!uuid.equals(result.getHeader(Stomp.Headers.Response.RECEIPT_ID)))) { - throw new IOException("Disconnect failed! " + result); + try { + if (!transport.isConnected()) { + ClientStompFrame result = this.sendFrame(frame); + if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!uuid.equals(result.getHeader(Stomp.Headers.Response.RECEIPT_ID)))) { + throw new IOException("Disconnect failed! " + result); + } + } + } catch (Exception e) { + // Transport may have been closed } - close(); connected = false; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java index 2d8f354599..afa1f08a72 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.tests.integration.stomp.util; import java.io.IOException; +import java.net.URI; public class StompClientConnectionV12 extends StompClientConnectionV11 { @@ -24,6 +25,10 @@ public class StompClientConnectionV12 extends StompClientConnectionV11 { super("1.2", host, port); } + public StompClientConnectionV12(URI uri) throws Exception { + super(uri); + } + public ClientStompFrame createAnyFrame(String command) { return factory.newAnyFrame(command); }