This closes #714

This commit is contained in:
Clebert Suconic 2016-08-10 11:07:47 -04:00
commit 79d3b93195
10 changed files with 74 additions and 11 deletions

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.core.protocol.proton; package org.apache.activemq.artemis.core.protocol.proton;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -49,6 +50,8 @@ import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME
*/ */
public class ProtonProtocolManager implements ProtocolManager<Interceptor>, NotificationListener { public class ProtonProtocolManager implements ProtocolManager<Interceptor>, NotificationListener {
private static final List<String> websocketRegistryNames = Arrays.asList("amqp");
private final ActiveMQServer server; private final ActiveMQServer server;
private MessageConverter protonConverter; private MessageConverter protonConverter;
@ -147,6 +150,11 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) { public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
} }
@Override
public List<String> websocketSubprotocolIdentifiers() {
return websocketRegistryNames;
}
public String getPubSubPrefix() { public String getPubSubPrefix() {
return pubSubPrefix; return pubSubPrefix;
} }

View File

@ -16,6 +16,10 @@
*/ */
package org.apache.activemq.artemis.core.protocol.mqtt; package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.MqttDecoder; import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder; import io.netty.handler.codec.mqtt.MqttEncoder;
@ -34,15 +38,14 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import java.util.ArrayList;
import java.util.List;
/** /**
* MQTTProtocolManager * MQTTProtocolManager
*/ */
class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage,MQTTInterceptor,MQTTConnection> class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage,MQTTInterceptor,MQTTConnection>
implements NotificationListener { implements NotificationListener {
private static final List<String> websocketRegistryNames = Arrays.asList("mqtt", "mqttv3.1");
private ActiveMQServer server; private ActiveMQServer server;
private MQTTLogger log = MQTTLogger.LOGGER; private MQTTLogger log = MQTTLogger.LOGGER;
@ -138,6 +141,11 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage,MQTTInterc
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) { public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
} }
@Override
public List<String> websocketSubprotocolIdentifiers() {
return websocketRegistryNames;
}
public void invokeIncoming(MqttMessage mqttMessage, MQTTConnection connection) { public void invokeIncoming(MqttMessage mqttMessage, MQTTConnection connection) {
super.invokeInterceptors(this.incomingInterceptors, mqttMessage, connection); super.invokeInterceptors(this.incomingInterceptors, mqttMessage, connection);
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.protocol.openwire; package org.apache.activemq.artemis.core.protocol.openwire;
import javax.jms.InvalidClientIDException; import javax.jms.InvalidClientIDException;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -74,6 +75,8 @@ import org.apache.activemq.util.LongSequenceGenerator;
public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, ClusterTopologyListener { public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, ClusterTopologyListener {
private static final List<String> websocketRegistryNames = Collections.EMPTY_LIST;
private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator(); private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
private static final IdGenerator ID_GENERATOR = new IdGenerator(); private static final IdGenerator ID_GENERATOR = new IdGenerator();
@ -269,6 +272,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) { public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
} }
@Override
public List<String> websocketSubprotocolIdentifiers() {
return websocketRegistryNames;
}
public void addConnection(OpenWireConnection connection, ConnectionInfo info) throws Exception { public void addConnection(OpenWireConnection connection, ConnectionInfo info) throws Exception {
String username = info.getUserName(); String username = info.getUserName();
String password = info.getPassword(); String password = info.getPassword();

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp;
import javax.security.cert.X509Certificate; import javax.security.cert.X509Certificate;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -56,9 +57,8 @@ import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProto
* StompProtocolManager * StompProtocolManager
*/ */
public class StompProtocolManager extends AbstractProtocolManager<StompFrame,StompFrameInterceptor,StompConnection> { public class StompProtocolManager extends AbstractProtocolManager<StompFrame,StompFrameInterceptor,StompConnection> {
// Constants -----------------------------------------------------
// Attributes ---------------------------------------------------- private static final List<String> websocketRegistryNames = Arrays.asList("v10.stomp", "v11.stomp", "v12.stomp");
private final ActiveMQServer server; private final ActiveMQServer server;
@ -192,6 +192,11 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame,Sto
//Todo move handshake to here //Todo move handshake to here
} }
@Override
public List<String> websocketSubprotocolIdentifiers() {
return websocketRegistryNames;
}
// Public -------------------------------------------------------- // Public --------------------------------------------------------
public boolean send(final StompConnection connection, final StompFrame frame) { public boolean send(final StompConnection connection, final StompFrame frame) {

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.core.protocol; package org.apache.activemq.artemis.core.protocol;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -45,7 +46,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.PartialPooledByteBufAllocator; import org.apache.activemq.artemis.core.remoting.impl.netty.PartialPooledByteBufAllocator;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.protocol.stomp.WebSocketServerHandler; import org.apache.activemq.artemis.core.server.protocol.websocket.WebSocketServerHandler;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.ConfigurationHelper;
@ -64,6 +65,8 @@ public class ProtocolHandler {
private HttpKeepAliveRunnable httpKeepAliveRunnable; private HttpKeepAliveRunnable httpKeepAliveRunnable;
private final List<String> websocketSubprotocolIds;
public ProtocolHandler(Map<String, ProtocolManager> protocolMap, public ProtocolHandler(Map<String, ProtocolManager> protocolMap,
NettyAcceptor nettyAcceptor, NettyAcceptor nettyAcceptor,
final Map<String, Object> configuration, final Map<String, Object> configuration,
@ -72,6 +75,13 @@ public class ProtocolHandler {
this.nettyAcceptor = nettyAcceptor; this.nettyAcceptor = nettyAcceptor;
this.configuration = configuration; this.configuration = configuration;
this.scheduledThreadPool = scheduledThreadPool; this.scheduledThreadPool = scheduledThreadPool;
websocketSubprotocolIds = new ArrayList<>();
for (ProtocolManager pm : protocolMap.values()) {
if (pm.websocketSubprotocolIdentifiers() != null) {
websocketSubprotocolIds.addAll(pm.websocketSubprotocolIdentifiers());
}
}
} }
public ChannelHandler getProtocolDecoder() { public ChannelHandler getProtocolDecoder() {
@ -106,7 +116,7 @@ public class ProtocolHandler {
HttpHeaders headers = request.headers(); HttpHeaders headers = request.headers();
String upgrade = headers.get("upgrade"); String upgrade = headers.get("upgrade");
if (upgrade != null && upgrade.equalsIgnoreCase("websocket")) { if (upgrade != null && upgrade.equalsIgnoreCase("websocket")) {
ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler()); ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds));
ctx.pipeline().addLast(new ProtocolDecoder(false, false)); ctx.pipeline().addLast(new ProtocolDecoder(false, false));
ctx.pipeline().remove(this); ctx.pipeline().remove(this);
ctx.pipeline().remove("http-handler"); ctx.pipeline().remove("http-handler");

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.core.protocol.core.impl; package org.apache.activemq.artemis.core.protocol.core.impl;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -63,6 +64,8 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
private static final Logger logger = Logger.getLogger(CoreProtocolManager.class); private static final Logger logger = Logger.getLogger(CoreProtocolManager.class);
private static final List<String> websocketRegistryNames = Collections.EMPTY_LIST;
private final ActiveMQServer server; private final ActiveMQServer server;
private final List<Interceptor> incomingInterceptors; private final List<Interceptor> incomingInterceptors;
@ -181,6 +184,11 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
} }
} }
@Override
public List<String> websocketSubprotocolIdentifiers() {
return websocketRegistryNames;
}
private boolean isArtemis(ActiveMQBuffer buffer) { private boolean isArtemis(ActiveMQBuffer buffer) {
return buffer.getByte(0) == 'A' && return buffer.getByte(0) == 'A' &&
buffer.getByte(1) == 'R' && buffer.getByte(1) == 'R' &&

View File

@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.artemis.core.server.protocol.stomp; package org.apache.activemq.artemis.core.server.protocol.websocket;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.List;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -38,6 +39,7 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import org.apache.activemq.artemis.utils.StringUtil;
import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive; import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
import static io.netty.handler.codec.http.HttpHeaders.setContentLength; import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
@ -51,8 +53,13 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
private HttpRequest httpRequest; private HttpRequest httpRequest;
private WebSocketServerHandshaker handshaker; private WebSocketServerHandshaker handshaker;
private List<String> supportedProtocols;
private static final BinaryWebSocketEncoder BINARY_WEBSOCKET_ENCODER = new BinaryWebSocketEncoder(); private static final BinaryWebSocketEncoder BINARY_WEBSOCKET_ENCODER = new BinaryWebSocketEncoder();
public WebSocketServerHandler(List<String> supportedProtocols) {
this.supportedProtocols = supportedProtocols;
}
@Override @Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) { if (msg instanceof FullHttpRequest) {
@ -75,7 +82,8 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
} }
// Handshake // Handshake
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(this.getWebSocketLocation(req), "v10.stomp,v11.stomp", false); String supportedProtocolsCSV = StringUtil.joinStringList(supportedProtocols, ",");
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(this.getWebSocketLocation(req), supportedProtocolsCSV,false);
this.httpRequest = req; this.httpRequest = req;
this.handshaker = wsFactory.newHandshaker(req); this.handshaker = wsFactory.newHandshaker(req);
if (this.handshaker == null) { if (this.handshaker == null) {

View File

@ -61,4 +61,12 @@ public interface ProtocolManager<P extends BaseInterceptor> {
boolean acceptsNoHandshake(); boolean acceptsNoHandshake();
void handshake(NettyServerConnection connection, ActiveMQBuffer buffer); void handshake(NettyServerConnection connection, ActiveMQBuffer buffer);
/**
* A list of the IANA websocket subprotocol identifiers supported by this protocol manager. These are used
* during the websocket subprotocol handshake.
*
* @return A list of subprotocol ids
*/
List<String> websocketSubprotocolIdentifiers();
} }

View File

@ -72,7 +72,7 @@ public class NettyAcceptorFactoryTest extends ActiveMQTestBase {
}; };
Acceptor acceptor = factory.createAcceptor("netty", null, params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory()), null); Acceptor acceptor = factory.createAcceptor("netty", null, params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory()), new HashMap<String, ProtocolManager>());
Assert.assertTrue(acceptor instanceof NettyAcceptor); Assert.assertTrue(acceptor instanceof NettyAcceptor);
} }

View File

@ -95,7 +95,7 @@ public class NettyAcceptorTest extends ActiveMQTestBase {
} }
}; };
pool2 = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory()); pool2 = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory());
NettyAcceptor acceptor = new NettyAcceptor("netty", null, params, handler, listener, pool2, null); NettyAcceptor acceptor = new NettyAcceptor("netty", null, params, handler, listener, pool2, new HashMap<String, ProtocolManager>());
addActiveMQComponent(acceptor); addActiveMQComponent(acceptor);
acceptor.start(); acceptor.start();