ARTEMIS-1428 support max frame payload length for STOMP websocket
This commit is contained in:
parent
24e18fdf6d
commit
abcf4b747e
|
@ -251,6 +251,10 @@ public class TransportConstants {
|
|||
|
||||
public static final long DEFAULT_CONNECTIONS_ALLOWED = -1L;
|
||||
|
||||
public static final String STOMP_MAX_FRAME_PAYLOAD_LENGTH = "stompMaxFramePayloadLength";
|
||||
|
||||
public static final int DEFAULT_STOMP_MAX_FRAME_PAYLOAD_LENGTH = 65536;
|
||||
|
||||
static {
|
||||
Set<String> allowableAcceptorKeys = new HashSet<>();
|
||||
allowableAcceptorKeys.add(TransportConstants.SSL_ENABLED_PROP_NAME);
|
||||
|
@ -294,6 +298,7 @@ public class TransportConstants {
|
|||
allowableAcceptorKeys.add(TransportConstants.HEART_BEAT_TO_CONNECTION_TTL_MODIFIER);
|
||||
allowableAcceptorKeys.add(TransportConstants.STOMP_ENABLE_MESSAGE_ID);
|
||||
allowableAcceptorKeys.add(TransportConstants.CONNECTIONS_ALLOWED);
|
||||
allowableAcceptorKeys.add(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH);
|
||||
allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword());
|
||||
allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec());
|
||||
allowableAcceptorKeys.add(TransportConstants.BACKLOG_PROP_NAME);
|
||||
|
|
|
@ -58,8 +58,6 @@ public class ProtocolHandler {
|
|||
|
||||
private NettyAcceptor nettyAcceptor;
|
||||
|
||||
private Map<String, Object> configuration;
|
||||
|
||||
private ScheduledExecutorService scheduledThreadPool;
|
||||
|
||||
private HttpKeepAliveRunnable httpKeepAliveRunnable;
|
||||
|
@ -68,11 +66,9 @@ public class ProtocolHandler {
|
|||
|
||||
public ProtocolHandler(Map<String, ProtocolManager> protocolMap,
|
||||
NettyAcceptor nettyAcceptor,
|
||||
final Map<String, Object> configuration,
|
||||
ScheduledExecutorService scheduledThreadPool) {
|
||||
this.protocolMap = protocolMap;
|
||||
this.nettyAcceptor = nettyAcceptor;
|
||||
this.configuration = configuration;
|
||||
this.scheduledThreadPool = scheduledThreadPool;
|
||||
|
||||
websocketSubprotocolIds = new ArrayList<>();
|
||||
|
@ -115,7 +111,7 @@ public class ProtocolHandler {
|
|||
HttpHeaders headers = request.headers();
|
||||
String upgrade = headers.get("upgrade");
|
||||
if (upgrade != null && upgrade.equalsIgnoreCase("websocket")) {
|
||||
ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds));
|
||||
ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds, ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH, TransportConstants.DEFAULT_STOMP_MAX_FRAME_PAYLOAD_LENGTH, nettyAcceptor.getConfiguration())));
|
||||
ctx.pipeline().addLast(new ProtocolDecoder(false, false));
|
||||
ctx.pipeline().remove(this);
|
||||
ctx.pipeline().remove("http-handler");
|
||||
|
@ -207,12 +203,12 @@ public class ProtocolHandler {
|
|||
p.addLast("http-encoder", new HttpResponseEncoder());
|
||||
//create it lazily if and when we need it
|
||||
if (httpKeepAliveRunnable == null) {
|
||||
long httpServerScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, TransportConstants.DEFAULT_HTTP_SERVER_SCAN_PERIOD, configuration);
|
||||
long httpServerScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, TransportConstants.DEFAULT_HTTP_SERVER_SCAN_PERIOD, nettyAcceptor.getConfiguration());
|
||||
httpKeepAliveRunnable = new HttpKeepAliveRunnable();
|
||||
Future<?> future = scheduledThreadPool.scheduleAtFixedRate(httpKeepAliveRunnable, httpServerScanPeriod, httpServerScanPeriod, TimeUnit.MILLISECONDS);
|
||||
httpKeepAliveRunnable.setFuture(future);
|
||||
}
|
||||
long httpResponseTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_RESPONSE_TIME_PROP_NAME, TransportConstants.DEFAULT_HTTP_RESPONSE_TIME, configuration);
|
||||
long httpResponseTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_RESPONSE_TIME_PROP_NAME, TransportConstants.DEFAULT_HTTP_RESPONSE_TIME, nettyAcceptor.getConfiguration());
|
||||
HttpAcceptorHandler httpHandler = new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime, ctx.channel());
|
||||
ctx.pipeline().addLast("http-handler", httpHandler);
|
||||
p.addLast(new ProtocolDecoder(false, true));
|
||||
|
|
|
@ -238,7 +238,7 @@ public class NettyAcceptor extends AbstractAcceptor {
|
|||
backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration);
|
||||
useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration);
|
||||
|
||||
this.protocolHandler = new ProtocolHandler(protocolMap, this, configuration, scheduledThreadPool);
|
||||
this.protocolHandler = new ProtocolHandler(protocolMap, this, scheduledThreadPool);
|
||||
|
||||
this.protocolsString = getProtocols(protocolMap);
|
||||
|
||||
|
|
|
@ -54,10 +54,12 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
|
|||
private HttpRequest httpRequest;
|
||||
private WebSocketServerHandshaker handshaker;
|
||||
private List<String> supportedProtocols;
|
||||
private int maxFramePayloadLength;
|
||||
private static final BinaryWebSocketEncoder BINARY_WEBSOCKET_ENCODER = new BinaryWebSocketEncoder();
|
||||
|
||||
public WebSocketServerHandler(List<String> supportedProtocols) {
|
||||
public WebSocketServerHandler(List<String> supportedProtocols, int maxFramePayloadLength) {
|
||||
this.supportedProtocols = supportedProtocols;
|
||||
this.maxFramePayloadLength = maxFramePayloadLength;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -82,7 +84,7 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
|
|||
|
||||
// Handshake
|
||||
String supportedProtocolsCSV = StringUtil.joinStringList(supportedProtocols, ",");
|
||||
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(this.getWebSocketLocation(req), supportedProtocolsCSV, false);
|
||||
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(this.getWebSocketLocation(req), supportedProtocolsCSV, false, maxFramePayloadLength);
|
||||
this.httpRequest = req;
|
||||
this.handshaker = wsFactory.newHandshaker(req);
|
||||
if (this.handshaker == null) {
|
||||
|
|
|
@ -546,6 +546,11 @@ available from [GitHub](http://github.com/jmesnil/stomp-websocket)
|
|||
(please see its [documentation](http://jmesnil.net/stomp-websocket/doc/)
|
||||
for a complete description).
|
||||
|
||||
The payload length of websocket frames can vary between client implementations. By default
|
||||
Apache ActiveMQ Artemis will accept frames with a payload length of 65,536. If the client
|
||||
needs to send payloads longer than this in a single frame this length can be adjusted by
|
||||
using the `stompMaxFramePayloadLength` URL parameter on the acceptor.
|
||||
|
||||
The `stomp-websockets` example shows how to configure Apache ActiveMQ Artemis server to
|
||||
have web browsers and Java applications exchanges messages on a JMS
|
||||
topic.
|
||||
|
|
Loading…
Reference in New Issue