ARTEMIS-1511 Enable WebSocket Transport in STOMP test client

This commit is contained in:
Martyn Taylor 2017-11-10 12:33:31 +00:00 committed by Clebert Suconic
parent 5211afdf86
commit c6e5163a51
8 changed files with 191 additions and 46 deletions

View File

@ -24,6 +24,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
public abstract class AbstractClientStompFrame implements ClientStompFrame {
@ -88,6 +90,16 @@ public abstract class AbstractClientStompFrame implements ClientStompFrame {
return toByteBufferInternal(str);
}
@Override
public ByteBuf toNettyByteBuf() {
return Unpooled.copiedBuffer(toByteBuffer());
}
@Override
public ByteBuf toNettyByteBufWithExtras(String str) {
return Unpooled.copiedBuffer(toByteBufferWithExtra(str));
}
public ByteBuffer toByteBufferInternal(String str) {
StringBuffer sb = new StringBuffer();
sb.append(command + EOL);

View File

@ -17,9 +17,8 @@
package org.apache.activemq.artemis.tests.integration.stomp.util;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@ -27,8 +26,15 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.netty.NettyTransport;
import org.apache.activemq.transport.netty.NettyTransportFactory;
import org.apache.activemq.transport.netty.NettyTransportListener;
public abstract class AbstractStompClientConnection implements StompClientConnection {
@ -39,41 +45,53 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
protected String username;
protected String passcode;
protected StompFrameFactory factory;
protected final SocketChannel socketChannel;
protected NettyTransport transport;
protected ByteBuffer readBuffer;
protected List<Byte> receiveList;
protected BlockingQueue<ClientStompFrame> frameQueue = new LinkedBlockingQueue<>();
protected boolean connected = false;
protected int serverPingCounter;
protected ReaderThread readerThread;
//protected ReaderThread readerThread;
protected String scheme;
@Deprecated
public AbstractStompClientConnection(String version, String host, int port) throws IOException {
this.version = version;
this.host = host;
this.port = port;
this.scheme = "tcp";
this.factory = StompFrameFactoryFactory.getFactory(version);
socketChannel = SocketChannel.open();
initSocket();
}
private void initSocket() throws IOException {
socketChannel.configureBlocking(true);
InetSocketAddress remoteAddr = new InetSocketAddress(host, port);
socketChannel.connect(remoteAddr);
public AbstractStompClientConnection(URI uri) throws Exception {
parseURI(uri);
this.factory = StompFrameFactoryFactory.getFactory(version);
startReaderThread();
}
private void startReaderThread() {
readBuffer = ByteBuffer.allocateDirect(10240);
receiveList = new ArrayList<>(10240);
readerThread = new ReaderThread();
readerThread.start();
transport = NettyTransportFactory.createTransport(uri);
transport.setTransportListener(new StompTransportListener());
transport.connect();
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return transport.isConnected();
}
}, 10000);
if (!transport.isConnected()) {
throw new RuntimeException("Could not connect transport");
}
}
public void killReaderThread() {
readerThread.stop();
private void parseURI(URI uri) {
scheme = uri.getScheme() == null ? "tcp" : uri.getScheme();
host = uri.getHost();
port = uri.getPort();
this.version = StompClientConnectionFactory.getStompVersionFromURI(uri);
}
private ClientStompFrame sendFrameInternal(ClientStompFrame frame, boolean wicked) throws IOException, InterruptedException {
@ -85,8 +103,17 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
} else {
buffer = frame.toByteBuffer();
}
while (buffer.remaining() > 0) {
socketChannel.write(buffer);
ByteBuf buf = Unpooled.copiedBuffer(buffer);
try {
buf.retain();
ChannelFuture future = transport.send(buf);
if (future != null) {
future.awaitUninterruptibly();
}
} finally {
buf.release();
}
//now response
@ -179,35 +206,78 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
}
protected void close() throws IOException {
socketChannel.close();
transport.close();
}
private class ReaderThread extends Thread {
private class StompTransportListener implements NettyTransportListener {
/**
* Called when new incoming data has become available.
*
* @param incoming the next incoming packet of data.
*/
@Override
public void run() {
try {
int n = socketChannel.read(readBuffer);
while (n >= 0) {
if (n > 0) {
receiveBytes(n);
}
n = socketChannel.read(readBuffer);
}
//peer closed
close();
} catch (IOException e) {
try {
close();
} catch (IOException e1) {
//ignore
public void onData(ByteBuf incoming) {
while (incoming.readableBytes() > 0) {
int bytes = incoming.readableBytes();
if (incoming.readableBytes() < readBuffer.remaining()) {
ByteBuffer byteBuffer = ByteBuffer.allocate(incoming.readableBytes());
incoming.readBytes(byteBuffer);
byteBuffer.rewind();
readBuffer.put(byteBuffer);
receiveBytes(bytes);
} else {
incoming.readBytes(readBuffer);
receiveBytes(bytes - incoming.readableBytes());
}
}
}
/**
* Called if the connection state becomes closed.
*/
@Override
public void onTransportClosed() {
}
/**
* Called when an error occurs during normal Transport operations.
*
* @param cause the error that triggered this event.
*/
@Override
public void onTransportError(Throwable cause) {
throw new RuntimeException(cause);
}
}
// private class ReaderThread extends Thread {
//
// @Override
// public void run() {
// try {
// transport.setTransportListener();
// int n = Z..read(readBuffer);
//
// while (n >= 0) {
// if (n > 0) {
// receiveBytes(n);
// }
// n = socketChannel.read(readBuffer);
// }
// //peer closed
// close();
//
// } catch (IOException e) {
// try {
// close();
// } catch (IOException e1) {
// //ignore
// }
// }
// }
// }
@Override
public ClientStompFrame connect() throws Exception {
return connect(null, null);
@ -230,7 +300,7 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
@Override
public boolean isConnected() {
return connected && socketChannel.isConnected();
return connected && transport.isConnected();
}
@Override
@ -243,6 +313,11 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
return this.frameQueue.size();
}
@Override
public void closeTransport() throws IOException {
transport.close();
}
protected class Pinger extends Thread {
long pingInterval;

View File

@ -18,6 +18,8 @@ package org.apache.activemq.artemis.tests.integration.stomp.util;
import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
/**
* pls use factory to create frames.
*/
@ -25,6 +27,8 @@ public interface ClientStompFrame {
ByteBuffer toByteBuffer();
ByteBuf toNettyByteBuf();
boolean needsReply();
ClientStompFrame setCommand(String command);
@ -41,6 +45,8 @@ public interface ClientStompFrame {
ByteBuffer toByteBufferWithExtra(String str);
ByteBuf toNettyByteBufWithExtras(String str);
boolean isPing();
ClientStompFrame setForceOneway();

View File

@ -53,5 +53,6 @@ public interface StompClientConnection {
int getServerPingNumber();
void closeTransport() throws IOException;
}

View File

@ -17,9 +17,12 @@
package org.apache.activemq.artemis.tests.integration.stomp.util;
import java.io.IOException;
import java.net.URI;
public class StompClientConnectionFactory {
public static final String LATEST_VERSION = "1.2";
//create a raw connection to the host.
public static StompClientConnection createClientConnection(String version,
String host,
@ -36,6 +39,34 @@ public class StompClientConnectionFactory {
return null;
}
public static StompClientConnection createClientConnection(URI uri) throws Exception {
String version = getStompVersionFromURI(uri);
if ("1.0".equals(version)) {
return new StompClientConnectionV10(uri);
}
if ("1.1".equals(version)) {
return new StompClientConnectionV11(uri);
}
if ("1.2".equals(version)) {
return new StompClientConnectionV12(uri);
}
return null;
}
public static String getStompVersionFromURI(URI uri) {
String scheme = uri.getScheme();
if (scheme.contains("10")) {
return "1.0";
}
if (scheme.contains("11")) {
return "1.1";
}
if (scheme.contains("12")) {
return "1.2";
}
return LATEST_VERSION;
}
public static void main(String[] args) throws Exception {
StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.0", "localhost", 61613);

View File

@ -17,12 +17,14 @@
package org.apache.activemq.artemis.tests.integration.stomp.util;
import java.io.IOException;
import java.net.URI;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
public class StompClientConnectionV10 extends AbstractStompClientConnection {
public StompClientConnectionV10(String host, int port) throws IOException {
super("1.0", host, port);
}
@ -31,6 +33,10 @@ public class StompClientConnectionV10 extends AbstractStompClientConnection {
super(version, host, port);
}
public StompClientConnectionV10(URI uri) throws Exception {
super(uri);
}
@Override
public ClientStompFrame connect(String username, String passcode) throws IOException, InterruptedException {
return connect(username, passcode, null);

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.stomp.util;
import java.io.IOException;
import java.net.URI;
import java.util.UUID;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
@ -31,6 +32,10 @@ public class StompClientConnectionV11 extends StompClientConnectionV10 {
super(version, host, port);
}
public StompClientConnectionV11(URI uri) throws Exception {
super(uri);
}
@Override
public ClientStompFrame connect(String username, String passcode, String clientID) throws IOException, InterruptedException {
ClientStompFrame frame = factory.newFrame(Stomp.Commands.CONNECT);
@ -96,12 +101,16 @@ public class StompClientConnectionV11 extends StompClientConnectionV10 {
frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
ClientStompFrame result = this.sendFrame(frame);
if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!uuid.equals(result.getHeader(Stomp.Headers.Response.RECEIPT_ID)))) {
throw new IOException("Disconnect failed! " + result);
try {
if (!transport.isConnected()) {
ClientStompFrame result = this.sendFrame(frame);
if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!uuid.equals(result.getHeader(Stomp.Headers.Response.RECEIPT_ID)))) {
throw new IOException("Disconnect failed! " + result);
}
}
} catch (Exception e) {
// Transport may have been closed
}
close();
connected = false;

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.stomp.util;
import java.io.IOException;
import java.net.URI;
public class StompClientConnectionV12 extends StompClientConnectionV11 {
@ -24,6 +25,10 @@ public class StompClientConnectionV12 extends StompClientConnectionV11 {
super("1.2", host, port);
}
public StompClientConnectionV12(URI uri) throws Exception {
super(uri);
}
public ClientStompFrame createAnyFrame(String command) {
return factory.newAnyFrame(command);
}