ARTEMIS-2323 NettyTransport should also send requests with void promises

This commit is contained in:
Francesco Nigro 2019-04-26 13:46:26 +02:00 committed by Clebert Suconic
parent d0a1cb37c4
commit 5f040cf38b
4 changed files with 34 additions and 40 deletions

View File

@ -57,7 +57,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
public class AmqpConnection extends AmqpAbstractResource<Connection> implements NettyTransportListener {
@ -295,31 +294,6 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
return session;
}
//----- Access to low level IO for specific test cases -------------------//
public void sendRawBytes(final byte[] rawData) throws Exception {
checkClosed();
final ClientFuture request = new ClientFuture();
serializer.execute(new Runnable() {
@Override
public void run() {
checkClosed();
try {
transport.send(Unpooled.wrappedBuffer(rawData));
} catch (IOException e) {
fireClientException(e);
} finally {
request.onSuccess();
}
}
});
request.sync();
}
//----- Configuration accessors ------------------------------------------//
/**
@ -550,7 +524,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
if (toWrite != null && toWrite.hasRemaining()) {
ByteBuf outbound = transport.allocateSendBuffer(toWrite.remaining());
outbound.writeBytes(toWrite);
transport.send(outbound);
transport.sendVoidPromise(outbound);
protonTransport.outputConsumed();
} else {
done = true;

View File

@ -22,7 +22,11 @@ import java.security.Principal;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import static java.util.function.Function.identity;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCounted;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -222,17 +226,34 @@ public class NettyTcpTransport implements NettyTransport {
return channel.alloc().ioBuffer(size, size);
}
@Override
public ChannelFuture send(ByteBuf output) throws IOException {
checkConnected();
protected final ChannelFuture writeAndFlush(ByteBuf output,
ChannelPromise promise,
Function<? super ByteBuf, ? extends ReferenceCounted> bufferTransformer) throws IOException {
try {
checkConnected();
} catch (IOException ioEx) {
output.release();
throw ioEx;
}
int length = output.readableBytes();
if (length == 0) {
output.release();
return null;
}
LOG.trace("Attempted write of: {} bytes", length);
return channel.writeAndFlush(output);
return channel.writeAndFlush(bufferTransformer.apply(output), promise);
}
@Override
public ChannelFuture send(ByteBuf output) throws IOException {
return writeAndFlush(output, channel.newPromise(), identity());
}
@Override
public void sendVoidPromise(ByteBuf output) throws IOException {
writeAndFlush(output, channel.voidPromise(), identity());
}
@Override

View File

@ -38,6 +38,8 @@ public interface NettyTransport {
ByteBuf allocateSendBuffer(int size) throws IOException;
void sendVoidPromise(ByteBuf output) throws IOException;
ChannelFuture send(ByteBuf output) throws IOException;
NettyTransportListener getTransportListener();

View File

@ -79,17 +79,14 @@ public class NettyWSTransport extends NettyTcpTransport {
super(listener, remoteLocation, options);
}
@Override
public void sendVoidPromise(ByteBuf output) throws IOException {
writeAndFlush(output, channel.voidPromise(), BinaryWebSocketFrame::new);
}
@Override
public ChannelFuture send(ByteBuf output) throws IOException {
checkConnected();
int length = output.readableBytes();
if (length == 0) {
return null;
}
LOG.trace("Attempted write of: {} bytes", length);
return channel.writeAndFlush(new BinaryWebSocketFrame(output));
return writeAndFlush(output, channel.newPromise(), BinaryWebSocketFrame::new);
}
@Override