ARTEMIS-677 Support websocket subprotocol handshakes

This commit is contained in:
Martyn Taylor 2016-08-10 12:16:39 +01:00 committed by Clebert Suconic
parent 2b710229e2
commit 7afd0fb028
10 changed files with 74 additions and 11 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.proton;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
@ -49,6 +50,8 @@ import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME
*/
public class ProtonProtocolManager implements ProtocolManager<Interceptor>, NotificationListener {
private static final List<String> websocketRegistryNames = Arrays.asList("amqp");
private final ActiveMQServer server;
private MessageConverter protonConverter;
@ -147,6 +150,11 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
}
@Override
public List<String> websocketSubprotocolIdentifiers() {
return websocketRegistryNames;
}
public String getPubSubPrefix() {
return pubSubPrefix;
}

View File

@ -16,6 +16,10 @@
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
@ -34,15 +38,14 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import java.util.ArrayList;
import java.util.List;
/**
* MQTTProtocolManager
*/
class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage,MQTTInterceptor,MQTTConnection>
implements NotificationListener {
private static final List<String> websocketRegistryNames = Arrays.asList("mqtt", "mqttv3.1");
private ActiveMQServer server;
private MQTTLogger log = MQTTLogger.LOGGER;
@ -138,6 +141,11 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage,MQTTInterc
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
}
@Override
public List<String> websocketSubprotocolIdentifiers() {
return websocketRegistryNames;
}
public void invokeIncoming(MqttMessage mqttMessage, MQTTConnection connection) {
super.invokeInterceptors(this.incomingInterceptors, mqttMessage, connection);
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.protocol.openwire;
import javax.jms.InvalidClientIDException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@ -74,6 +75,8 @@ import org.apache.activemq.util.LongSequenceGenerator;
public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, ClusterTopologyListener {
private static final List<String> websocketRegistryNames = Collections.EMPTY_LIST;
private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
private static final IdGenerator ID_GENERATOR = new IdGenerator();
@ -269,6 +272,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
}
@Override
public List<String> websocketSubprotocolIdentifiers() {
return websocketRegistryNames;
}
public void addConnection(OpenWireConnection connection, ConnectionInfo info) throws Exception {
String username = info.getUserName();
String password = info.getPassword();

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp;
import javax.security.cert.X509Certificate;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -56,9 +57,8 @@ import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProto
* StompProtocolManager
*/
public class StompProtocolManager extends AbstractProtocolManager<StompFrame,StompFrameInterceptor,StompConnection> {
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
private static final List<String> websocketRegistryNames = Arrays.asList("v10.stomp", "v11.stomp", "v12.stomp");
private final ActiveMQServer server;
@ -192,6 +192,11 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame,Sto
//Todo move handshake to here
}
@Override
public List<String> websocketSubprotocolIdentifiers() {
return websocketRegistryNames;
}
// Public --------------------------------------------------------
public boolean send(final StompConnection connection, final StompFrame frame) {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -45,7 +46,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.PartialPooledByteBufAllocator;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.protocol.stomp.WebSocketServerHandler;
import org.apache.activemq.artemis.core.server.protocol.websocket.WebSocketServerHandler;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
@ -64,6 +65,8 @@ public class ProtocolHandler {
private HttpKeepAliveRunnable httpKeepAliveRunnable;
private final List<String> websocketSubprotocolIds;
public ProtocolHandler(Map<String, ProtocolManager> protocolMap,
NettyAcceptor nettyAcceptor,
final Map<String, Object> configuration,
@ -72,6 +75,13 @@ public class ProtocolHandler {
this.nettyAcceptor = nettyAcceptor;
this.configuration = configuration;
this.scheduledThreadPool = scheduledThreadPool;
websocketSubprotocolIds = new ArrayList<>();
for (ProtocolManager pm : protocolMap.values()) {
if (pm.websocketSubprotocolIdentifiers() != null) {
websocketSubprotocolIds.addAll(pm.websocketSubprotocolIdentifiers());
}
}
}
public ChannelHandler getProtocolDecoder() {
@ -106,7 +116,7 @@ public class ProtocolHandler {
HttpHeaders headers = request.headers();
String upgrade = headers.get("upgrade");
if (upgrade != null && upgrade.equalsIgnoreCase("websocket")) {
ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler());
ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds));
ctx.pipeline().addLast(new ProtocolDecoder(false, false));
ctx.pipeline().remove(this);
ctx.pipeline().remove("http-handler");

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -63,6 +64,8 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
private static final Logger logger = Logger.getLogger(CoreProtocolManager.class);
private static final List<String> websocketRegistryNames = Collections.EMPTY_LIST;
private final ActiveMQServer server;
private final List<Interceptor> incomingInterceptors;
@ -181,6 +184,11 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
}
}
@Override
public List<String> websocketSubprotocolIdentifiers() {
return websocketRegistryNames;
}
private boolean isArtemis(ActiveMQBuffer buffer) {
return buffer.getByte(0) == 'A' &&
buffer.getByte(1) == 'R' &&

View File

@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.protocol.stomp;
package org.apache.activemq.artemis.core.server.protocol.websocket;
import java.nio.charset.StandardCharsets;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
@ -38,6 +39,7 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import org.apache.activemq.artemis.utils.StringUtil;
import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
@ -51,8 +53,13 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
private HttpRequest httpRequest;
private WebSocketServerHandshaker handshaker;
private List<String> supportedProtocols;
private static final BinaryWebSocketEncoder BINARY_WEBSOCKET_ENCODER = new BinaryWebSocketEncoder();
public WebSocketServerHandler(List<String> supportedProtocols) {
this.supportedProtocols = supportedProtocols;
}
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
@ -75,7 +82,8 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
}
// Handshake
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(this.getWebSocketLocation(req), "v10.stomp,v11.stomp", false);
String supportedProtocolsCSV = StringUtil.joinStringList(supportedProtocols, ",");
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(this.getWebSocketLocation(req), supportedProtocolsCSV,false);
this.httpRequest = req;
this.handshaker = wsFactory.newHandshaker(req);
if (this.handshaker == null) {

View File

@ -61,4 +61,12 @@ public interface ProtocolManager<P extends BaseInterceptor> {
boolean acceptsNoHandshake();
void handshake(NettyServerConnection connection, ActiveMQBuffer buffer);
/**
* A list of the IANA websocket subprotocol identifiers supported by this protocol manager. These are used
* during the websocket subprotocol handshake.
*
* @return A list of subprotocol ids
*/
List<String> websocketSubprotocolIdentifiers();
}

View File

@ -72,7 +72,7 @@ public class NettyAcceptorFactoryTest extends ActiveMQTestBase {
};
Acceptor acceptor = factory.createAcceptor("netty", null, params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory()), null);
Acceptor acceptor = factory.createAcceptor("netty", null, params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory()), new HashMap<String, ProtocolManager>());
Assert.assertTrue(acceptor instanceof NettyAcceptor);
}

View File

@ -95,7 +95,7 @@ public class NettyAcceptorTest extends ActiveMQTestBase {
}
};
pool2 = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory());
NettyAcceptor acceptor = new NettyAcceptor("netty", null, params, handler, listener, pool2, null);
NettyAcceptor acceptor = new NettyAcceptor("netty", null, params, handler, listener, pool2, new HashMap<String, ProtocolManager>());
addActiveMQComponent(acceptor);
acceptor.start();