This commit is contained in:
Clebert Suconic 2017-09-26 15:17:08 -04:00
commit 0e655f9a8e
5 changed files with 18 additions and 10 deletions

View File

@ -251,6 +251,10 @@ public class TransportConstants {
public static final long DEFAULT_CONNECTIONS_ALLOWED = -1L; public static final long DEFAULT_CONNECTIONS_ALLOWED = -1L;
public static final String STOMP_MAX_FRAME_PAYLOAD_LENGTH = "stompMaxFramePayloadLength";
public static final int DEFAULT_STOMP_MAX_FRAME_PAYLOAD_LENGTH = 65536;
static { static {
Set<String> allowableAcceptorKeys = new HashSet<>(); Set<String> allowableAcceptorKeys = new HashSet<>();
allowableAcceptorKeys.add(TransportConstants.SSL_ENABLED_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.SSL_ENABLED_PROP_NAME);
@ -294,6 +298,7 @@ public class TransportConstants {
allowableAcceptorKeys.add(TransportConstants.HEART_BEAT_TO_CONNECTION_TTL_MODIFIER); allowableAcceptorKeys.add(TransportConstants.HEART_BEAT_TO_CONNECTION_TTL_MODIFIER);
allowableAcceptorKeys.add(TransportConstants.STOMP_ENABLE_MESSAGE_ID); allowableAcceptorKeys.add(TransportConstants.STOMP_ENABLE_MESSAGE_ID);
allowableAcceptorKeys.add(TransportConstants.CONNECTIONS_ALLOWED); allowableAcceptorKeys.add(TransportConstants.CONNECTIONS_ALLOWED);
allowableAcceptorKeys.add(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH);
allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword()); allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword());
allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec()); allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec());
allowableAcceptorKeys.add(TransportConstants.BACKLOG_PROP_NAME); allowableAcceptorKeys.add(TransportConstants.BACKLOG_PROP_NAME);

View File

@ -58,8 +58,6 @@ public class ProtocolHandler {
private NettyAcceptor nettyAcceptor; private NettyAcceptor nettyAcceptor;
private Map<String, Object> configuration;
private ScheduledExecutorService scheduledThreadPool; private ScheduledExecutorService scheduledThreadPool;
private HttpKeepAliveRunnable httpKeepAliveRunnable; private HttpKeepAliveRunnable httpKeepAliveRunnable;
@ -68,11 +66,9 @@ public class ProtocolHandler {
public ProtocolHandler(Map<String, ProtocolManager> protocolMap, public ProtocolHandler(Map<String, ProtocolManager> protocolMap,
NettyAcceptor nettyAcceptor, NettyAcceptor nettyAcceptor,
final Map<String, Object> configuration,
ScheduledExecutorService scheduledThreadPool) { ScheduledExecutorService scheduledThreadPool) {
this.protocolMap = protocolMap; this.protocolMap = protocolMap;
this.nettyAcceptor = nettyAcceptor; this.nettyAcceptor = nettyAcceptor;
this.configuration = configuration;
this.scheduledThreadPool = scheduledThreadPool; this.scheduledThreadPool = scheduledThreadPool;
websocketSubprotocolIds = new ArrayList<>(); websocketSubprotocolIds = new ArrayList<>();
@ -115,7 +111,7 @@ public class ProtocolHandler {
HttpHeaders headers = request.headers(); HttpHeaders headers = request.headers();
String upgrade = headers.get("upgrade"); String upgrade = headers.get("upgrade");
if (upgrade != null && upgrade.equalsIgnoreCase("websocket")) { if (upgrade != null && upgrade.equalsIgnoreCase("websocket")) {
ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds)); ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds, ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MAX_FRAME_PAYLOAD_LENGTH, TransportConstants.DEFAULT_STOMP_MAX_FRAME_PAYLOAD_LENGTH, nettyAcceptor.getConfiguration())));
ctx.pipeline().addLast(new ProtocolDecoder(false, false)); ctx.pipeline().addLast(new ProtocolDecoder(false, false));
ctx.pipeline().remove(this); ctx.pipeline().remove(this);
ctx.pipeline().remove("http-handler"); ctx.pipeline().remove("http-handler");
@ -207,12 +203,12 @@ public class ProtocolHandler {
p.addLast("http-encoder", new HttpResponseEncoder()); p.addLast("http-encoder", new HttpResponseEncoder());
//create it lazily if and when we need it //create it lazily if and when we need it
if (httpKeepAliveRunnable == null) { if (httpKeepAliveRunnable == null) {
long httpServerScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, TransportConstants.DEFAULT_HTTP_SERVER_SCAN_PERIOD, configuration); long httpServerScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, TransportConstants.DEFAULT_HTTP_SERVER_SCAN_PERIOD, nettyAcceptor.getConfiguration());
httpKeepAliveRunnable = new HttpKeepAliveRunnable(); httpKeepAliveRunnable = new HttpKeepAliveRunnable();
Future<?> future = scheduledThreadPool.scheduleAtFixedRate(httpKeepAliveRunnable, httpServerScanPeriod, httpServerScanPeriod, TimeUnit.MILLISECONDS); Future<?> future = scheduledThreadPool.scheduleAtFixedRate(httpKeepAliveRunnable, httpServerScanPeriod, httpServerScanPeriod, TimeUnit.MILLISECONDS);
httpKeepAliveRunnable.setFuture(future); httpKeepAliveRunnable.setFuture(future);
} }
long httpResponseTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_RESPONSE_TIME_PROP_NAME, TransportConstants.DEFAULT_HTTP_RESPONSE_TIME, configuration); long httpResponseTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_RESPONSE_TIME_PROP_NAME, TransportConstants.DEFAULT_HTTP_RESPONSE_TIME, nettyAcceptor.getConfiguration());
HttpAcceptorHandler httpHandler = new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime, ctx.channel()); HttpAcceptorHandler httpHandler = new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime, ctx.channel());
ctx.pipeline().addLast("http-handler", httpHandler); ctx.pipeline().addLast("http-handler", httpHandler);
p.addLast(new ProtocolDecoder(false, true)); p.addLast(new ProtocolDecoder(false, true));

View File

@ -238,7 +238,7 @@ public class NettyAcceptor extends AbstractAcceptor {
backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration); backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration);
useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration); useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration);
this.protocolHandler = new ProtocolHandler(protocolMap, this, configuration, scheduledThreadPool); this.protocolHandler = new ProtocolHandler(protocolMap, this, scheduledThreadPool);
this.protocolsString = getProtocols(protocolMap); this.protocolsString = getProtocols(protocolMap);

View File

@ -54,10 +54,12 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
private HttpRequest httpRequest; private HttpRequest httpRequest;
private WebSocketServerHandshaker handshaker; private WebSocketServerHandshaker handshaker;
private List<String> supportedProtocols; private List<String> supportedProtocols;
private int maxFramePayloadLength;
private static final BinaryWebSocketEncoder BINARY_WEBSOCKET_ENCODER = new BinaryWebSocketEncoder(); private static final BinaryWebSocketEncoder BINARY_WEBSOCKET_ENCODER = new BinaryWebSocketEncoder();
public WebSocketServerHandler(List<String> supportedProtocols) { public WebSocketServerHandler(List<String> supportedProtocols, int maxFramePayloadLength) {
this.supportedProtocols = supportedProtocols; this.supportedProtocols = supportedProtocols;
this.maxFramePayloadLength = maxFramePayloadLength;
} }
@Override @Override
@ -82,7 +84,7 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
// Handshake // Handshake
String supportedProtocolsCSV = StringUtil.joinStringList(supportedProtocols, ","); String supportedProtocolsCSV = StringUtil.joinStringList(supportedProtocols, ",");
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(this.getWebSocketLocation(req), supportedProtocolsCSV, false); WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(this.getWebSocketLocation(req), supportedProtocolsCSV, false, maxFramePayloadLength);
this.httpRequest = req; this.httpRequest = req;
this.handshaker = wsFactory.newHandshaker(req); this.handshaker = wsFactory.newHandshaker(req);
if (this.handshaker == null) { if (this.handshaker == null) {

View File

@ -546,6 +546,11 @@ available from [GitHub](http://github.com/jmesnil/stomp-websocket)
(please see its [documentation](http://jmesnil.net/stomp-websocket/doc/) (please see its [documentation](http://jmesnil.net/stomp-websocket/doc/)
for a complete description). for a complete description).
The payload length of websocket frames can vary between client implementations. By default
Apache ActiveMQ Artemis will accept frames with a payload length of 65,536. If the client
needs to send payloads longer than this in a single frame this length can be adjusted by
using the `stompMaxFramePayloadLength` URL parameter on the acceptor.
The `stomp-websockets` example shows how to configure Apache ActiveMQ Artemis server to The `stomp-websockets` example shows how to configure Apache ActiveMQ Artemis server to
have web browsers and Java applications exchanges messages on a JMS have web browsers and Java applications exchanges messages on a JMS
topic. topic.