This commit is contained in:
Clebert Suconic 2017-05-15 09:00:50 -04:00
commit 47e0cffc92
4 changed files with 25 additions and 5 deletions

View File

@ -109,9 +109,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private boolean trace;
private boolean noContainerID = false;
public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport,
String username,
String password) {
public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport, String username, String password) {
setEndpoint(Connection.Factory.create());
getEndpoint().collect(protonCollector);
@ -137,6 +135,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
this.serializer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
this.transport.setTransportListener(this);
this.transport.setMaxFrameSize(getMaxFrameSize());
}
public void connect() throws Exception {

View File

@ -55,6 +55,7 @@ public class NettyTcpTransport implements NettyTransport {
private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
private static final int SHUTDOWN_TIMEOUT = 100;
public static final int DEFAULT_MAX_FRAME_SIZE = 65535;
protected Bootstrap bootstrap;
protected EventLoopGroup group;
@ -62,6 +63,7 @@ public class NettyTcpTransport implements NettyTransport {
protected NettyTransportListener listener;
protected final NettyTransportOptions options;
protected final URI remote;
protected int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();
@ -265,6 +267,20 @@ public class NettyTcpTransport implements NettyTransport {
return result;
}
@Override
public void setMaxFrameSize(int maxFrameSize) {
if (connected.get()) {
throw new IllegalStateException("Cannot change Max Frame Size while connected.");
}
this.maxFrameSize = maxFrameSize;
}
@Override
public int getMaxFrameSize() {
return maxFrameSize;
}
// ----- Internal implementation details, can be overridden as needed -----//
protected String getRemoteHost() {

View File

@ -49,4 +49,8 @@ public interface NettyTransport {
Principal getLocalPrincipal();
void setMaxFrameSize(int maxFrameSize);
int getMaxFrameSize();
}

View File

@ -113,8 +113,9 @@ public class NettyWSTransport extends NettyTcpTransport {
private final WebSocketClientHandshaker handshaker;
NettyWebSocketTransportHandler() {
handshaker = WebSocketClientHandshakerFactory.newHandshaker(getRemoteLocation(), WebSocketVersion.V13, AMQP_SUB_PROTOCOL, true,
new DefaultHttpHeaders());
handshaker = WebSocketClientHandshakerFactory.newHandshaker(
getRemoteLocation(), WebSocketVersion.V13, AMQP_SUB_PROTOCOL,
true, new DefaultHttpHeaders(), getMaxFrameSize());
}
@Override