diff --git a/activemq-http/pom.xml b/activemq-http/pom.xml index ee8c480ec3..9e179a1a3a 100755 --- a/activemq-http/pom.xml +++ b/activemq-http/pom.xml @@ -83,6 +83,11 @@ test-jar test + + ${project.groupId} + activemq-jaas + test + junit junit diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java index 21c8f5e95f..65d12c216f 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractMQTTSocket.java @@ -41,6 +41,7 @@ public abstract class AbstractMQTTSocket extends TransportSupport implements MQT protected BrokerService brokerService; protected volatile int receiveCounter; protected final String remoteAddress; + protected X509Certificate[] peerCertificates; public AbstractMQTTSocket(String remoteAddress) { super(); @@ -111,7 +112,12 @@ public abstract class AbstractMQTTSocket extends TransportSupport implements MQT @Override public X509Certificate[] getPeerCertificates() { - return new X509Certificate[0]; + return peerCertificates; + } + + @Override + public void setPeerCertificates(X509Certificate[] certificates) { + this.peerCertificates = certificates; } @Override diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java index 4ffa6c9ad3..ebd949d6bf 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.ws; import java.io.IOException; +import java.security.cert.X509Certificate; import java.util.concurrent.CountDownLatch; import org.apache.activemq.command.Command; @@ -46,6 +47,7 @@ public abstract class AbstractStompSocket extends TransportSupport implements St protected final StompInactivityMonitor stompInactivityMonitor = new StompInactivityMonitor(this, wireFormat); protected volatile int receiveCounter; protected final String remoteAddress; + protected X509Certificate[] certificates; public AbstractStompSocket(String remoteAddress) { @@ -118,7 +120,6 @@ public abstract class AbstractStompSocket extends TransportSupport implements St //----- Internal implementation ------------------------------------------// protected void processStompFrame(String data) { - if (!transportStartedAtLeastOnce()) { LOG.debug("Waiting for StompSocket to be properly started..."); try { @@ -135,7 +136,9 @@ public abstract class AbstractStompSocket extends TransportSupport implements St if (data.equals("\n")) { stompInactivityMonitor.onCommand(new KeepAliveInfo()); } else { - protocolConverter.onStompCommand((StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes("UTF-8")))); + StompFrame frame = (StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes("UTF-8"))); + frame.setTransportContext(getCertificates()); + protocolConverter.onStompCommand(frame); } } } catch (Exception e) { @@ -146,4 +149,12 @@ public abstract class AbstractStompSocket extends TransportSupport implements St private boolean transportStartedAtLeastOnce() { return socketTransportStarted.getCount() == 0; } + + public X509Certificate[] getCertificates() { + return certificates; + } + + public void setCertificates(X509Certificate[] certificates) { + this.certificates = certificates; + } } diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnection.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java similarity index 100% rename from activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnection.java rename to activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java index f84f02c359..981c2ff80f 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/WSServlet.java @@ -64,14 +64,17 @@ public class WSServlet extends WebSocketServlet { WebSocketListener socket; boolean isMqtt = false; for (String subProtocol : req.getSubProtocols()) { - subProtocol.startsWith("mqtt"); - isMqtt = true; + if (subProtocol.startsWith("mqtt")) { + isMqtt = true; + } } if (isMqtt) { socket = new MQTTSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest())); resp.setAcceptedSubProtocol("mqtt"); + ((MQTTSocket)socket).setPeerCertificates(req.getCertificates()); } else { socket = new StompSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest())); + ((StompSocket)socket).setCertificates(req.getCertificates()); resp.setAcceptedSubProtocol("stomp"); } listener.onAccept((Transport) socket); diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/wss/WSSTransportServer.java b/activemq-http/src/main/java/org/apache/activemq/transport/wss/WSSTransportServer.java index d73fd05eaa..2c19577f43 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/wss/WSSTransportServer.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/wss/WSSTransportServer.java @@ -29,4 +29,8 @@ public class WSSTransportServer extends WSTransportServer { this.socketConnectorFactory = new SecureSocketConnectorFactory(context); } + @Override + public boolean isSslServer() { + return true; + } } diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java index 30fc0a6480..bd5377cf1b 100644 --- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java +++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java @@ -94,8 +94,14 @@ public class MQTTWSConnection extends WebSocketAdapter implements WebSocketListe connection.getRemote().sendBytes(ByteBuffer.wrap(payload.data)); MQTTFrame incoming = receive(15, TimeUnit.SECONDS); + if (incoming == null || incoming.messageType() != CONNACK.TYPE) { throw new IOException("Failed to connect to remote service."); + } else { + CONNACK connack = new CONNACK().decode(incoming); + if (!connack.code().equals(CONNACK.Code.CONNECTION_ACCEPTED)) { + throw new IOException("Failed to connect to remote service: " + connack.code()); + } } } diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSSTransportTest.java new file mode 100644 index 0000000000..bf19259b85 --- /dev/null +++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSSTransportTest.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.ws; + +public class MQTTWSSTransportTest extends MQTTWSTransportTest { + + @Override + protected String getWSConnectorURI() { + return "wss://localhost:61623"; + } + +} diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java index a74a160e57..f48a110a62 100644 --- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java +++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.util.Wait; +import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.WebSocketClient; import org.fusesource.hawtbuf.UTF8Buffer; @@ -47,7 +48,7 @@ public class MQTTWSTransportTest extends WSTransportTestSupport { public void setUp() throws Exception { super.setUp(); - wsClient = new WebSocketClient(); + wsClient = new WebSocketClient(new SslContextFactory(true)); wsClient.start(); request = new ClientUpgradeRequest(); diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSSTransportTest.java new file mode 100644 index 0000000000..a00eac1a5d --- /dev/null +++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSSTransportTest.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.ws; + +public class StompWSSTransportTest extends StompWSTransportTest { + + @Override + protected String getWSConnectorURI() { + return "wss://localhost:61623"; + } + +} diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java index 45bfa7f5f4..f84d7842db 100644 --- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java +++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.transport.stomp.Stomp; import org.apache.activemq.transport.stomp.StompFrame; import org.apache.activemq.util.Wait; +import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.websocket.client.WebSocketClient; import org.junit.After; import org.junit.Before; @@ -50,7 +51,7 @@ public class StompWSTransportTest extends WSTransportTestSupport { super.setUp(); wsStompConnection = new StompWSConnection(); - wsClient = new WebSocketClient(); + wsClient = new WebSocketClient(new SslContextFactory(true)); wsClient.start(); wsClient.connect(wsStompConnection, wsConnectUri); diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportNeedClientAuthTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportNeedClientAuthTest.java new file mode 100644 index 0000000000..0849361bd7 --- /dev/null +++ b/activemq-http/src/test/java/org/apache/activemq/transport/wss/WSSTransportNeedClientAuthTest.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.wss; + +import junit.framework.Assert; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.activemq.transport.stomp.StompFrame; +import org.apache.activemq.transport.ws.MQTTWSConnection; +import org.apache.activemq.transport.ws.StompWSConnection; +import org.apache.activemq.util.Wait; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.client.io.ConnectPromise; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static junit.framework.TestCase.assertNotNull; +import static junit.framework.TestCase.assertTrue; + +public class WSSTransportNeedClientAuthTest { + + public static final String KEYSTORE_TYPE = "jks"; + public static final String PASSWORD = "password"; + public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore"; + public static final String KEYSTORE = "src/test/resources/server.keystore"; + + + private BrokerService broker; + + @Before + public void setUp() throws Exception { + broker = BrokerFactory.createBroker("xbean:activemq-https-need-client-auth.xml"); + broker.setPersistent(false); + broker.start(); + broker.waitUntilStarted(); + + // these are used for the client side... for the server side, the SSL context + // will be configured through the spring beans + System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE); + System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD); + System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE); + System.setProperty("javax.net.ssl.keyStore", KEYSTORE); + System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD); + System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + @Test + public void testStompNeedClientAuth() throws Exception { + StompWSConnection wsStompConnection = new StompWSConnection(); + System.out.println("starting connection"); + SslContextFactory factory = new SslContextFactory(); + factory.setKeyStorePath(KEYSTORE); + factory.setKeyStorePassword(PASSWORD); + factory.setKeyStoreType(KEYSTORE_TYPE); + factory.setTrustStorePath(TRUST_KEYSTORE); + factory.setTrustStorePassword(PASSWORD); + factory.setTrustStoreType(KEYSTORE_TYPE); + WebSocketClient wsClient = new WebSocketClient(factory); + wsClient.start(); + + Future connected = wsClient.connect(wsStompConnection, new URI("wss://localhost:61618")); + Session sess = connected.get(30, TimeUnit.SECONDS); + + String connectFrame = "STOMP\n" + + "login:system\n" + + "passcode:manager\n" + + "accept-version:1.2\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + + wsStompConnection.sendRawFrame(connectFrame); + + String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS); + assertNotNull(incoming); + assertTrue(incoming.startsWith("CONNECTED")); + + wsStompConnection.sendFrame(new StompFrame(Stomp.Commands.DISCONNECT)); + wsStompConnection.close(); + + } + + @Test + public void testMQTTNeedClientAuth() throws Exception { + SslContextFactory factory = new SslContextFactory(); + factory.setKeyStorePath(KEYSTORE); + factory.setKeyStorePassword(PASSWORD); + factory.setKeyStoreType(KEYSTORE_TYPE); + factory.setTrustStorePath(TRUST_KEYSTORE); + factory.setTrustStorePassword(PASSWORD); + factory.setTrustStoreType(KEYSTORE_TYPE); + WebSocketClient wsClient = new WebSocketClient(factory); + wsClient.start(); + + ClientUpgradeRequest request = new ClientUpgradeRequest(); + request.setSubProtocols("mqttv3.1"); + + MQTTWSConnection wsMQTTConnection = new MQTTWSConnection(); + + wsClient.connect(wsMQTTConnection, new URI("wss://localhost:61618"), request); + if (!wsMQTTConnection.awaitConnection(30, TimeUnit.SECONDS)) { + throw new IOException("Could not connect to MQTT WS endpoint"); + } + + wsMQTTConnection.connect(); + + assertTrue("Client not connected", wsMQTTConnection.isConnected()); + + wsMQTTConnection.disconnect(); + wsMQTTConnection.close(); + + } + +} diff --git a/activemq-http/src/test/resources/activemq-https-need-client-auth.xml b/activemq-http/src/test/resources/activemq-https-need-client-auth.xml index 899385be63..ddc7b8b8f0 100644 --- a/activemq-http/src/test/resources/activemq-https-need-client-auth.xml +++ b/activemq-http/src/test/resources/activemq-https-need-client-auth.xml @@ -32,9 +32,15 @@ trustStorePassword="password" /> + + + + + diff --git a/activemq-http/src/test/resources/dns.properties b/activemq-http/src/test/resources/dns.properties new file mode 100644 index 0000000000..d8c484da51 --- /dev/null +++ b/activemq-http/src/test/resources/dns.properties @@ -0,0 +1,17 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +client=CN=localhost, OU=activemq.org, O=activemq.org, L=Unknown, ST=Unknown, C=Unknown diff --git a/activemq-http/src/test/resources/groups.properties b/activemq-http/src/test/resources/groups.properties new file mode 100644 index 0000000000..4171c5ffe4 --- /dev/null +++ b/activemq-http/src/test/resources/groups.properties @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +admins=system,client +guests=guest diff --git a/activemq-http/src/test/resources/login.config b/activemq-http/src/test/resources/login.config new file mode 100644 index 0000000000..c3d87c1769 --- /dev/null +++ b/activemq-http/src/test/resources/login.config @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +activemq-domain { + + org.apache.activemq.jaas.PropertiesLoginModule requisite + debug=true + org.apache.activemq.jaas.properties.user="users.properties" + org.apache.activemq.jaas.properties.group="groups.properties"; +}; + +activemq-ssl-domain { + org.apache.activemq.jaas.TextFileCertificateLoginModule required + debug=true + org.apache.activemq.jaas.textfiledn.user="dns.properties" + org.apache.activemq.jaas.textfiledn.group="groups.properties"; +}; diff --git a/activemq-http/src/test/resources/users.properties b/activemq-http/src/test/resources/users.properties new file mode 100644 index 0000000000..2915bdb06a --- /dev/null +++ b/activemq-http/src/test/resources/users.properties @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +system=manager +guest=password \ No newline at end of file diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransport.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransport.java index bed558533d..c69c5936dd 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransport.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransport.java @@ -33,6 +33,8 @@ public interface MQTTTransport { public X509Certificate[] getPeerCertificates(); + public void setPeerCertificates(X509Certificate[] certificates); + public void onException(IOException error); public MQTTInactivityMonitor getInactivityMonitor(); diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java index b15ff8b381..ee868b9b69 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTTransportFilter.java @@ -279,4 +279,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor public void setMaxFrameSize(int maxFrameSize) { wireFormat.setMaxFrameSize(maxFrameSize); } + + @Override + public void setPeerCertificates(X509Certificate[] certificates) {} }