https://issues.apache.org/jira/browse/AMQ-6029 - wss transport with certificate authentication

This commit is contained in:
Dejan Bosanac 2015-10-30 11:42:17 +01:00
parent 8d63083dff
commit f8bfff0bc8
19 changed files with 335 additions and 7 deletions

View File

@ -83,6 +83,11 @@
<type>test-jar</type> <type>test-jar</type>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-jaas</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>

View File

@ -41,6 +41,7 @@ public abstract class AbstractMQTTSocket extends TransportSupport implements MQT
protected BrokerService brokerService; protected BrokerService brokerService;
protected volatile int receiveCounter; protected volatile int receiveCounter;
protected final String remoteAddress; protected final String remoteAddress;
protected X509Certificate[] peerCertificates;
public AbstractMQTTSocket(String remoteAddress) { public AbstractMQTTSocket(String remoteAddress) {
super(); super();
@ -111,7 +112,12 @@ public abstract class AbstractMQTTSocket extends TransportSupport implements MQT
@Override @Override
public X509Certificate[] getPeerCertificates() { public X509Certificate[] getPeerCertificates() {
return new X509Certificate[0]; return peerCertificates;
}
@Override
public void setPeerCertificates(X509Certificate[] certificates) {
this.peerCertificates = certificates;
} }
@Override @Override

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.ws; package org.apache.activemq.transport.ws;
import java.io.IOException; import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import org.apache.activemq.command.Command; import org.apache.activemq.command.Command;
@ -46,6 +47,7 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
protected final StompInactivityMonitor stompInactivityMonitor = new StompInactivityMonitor(this, wireFormat); protected final StompInactivityMonitor stompInactivityMonitor = new StompInactivityMonitor(this, wireFormat);
protected volatile int receiveCounter; protected volatile int receiveCounter;
protected final String remoteAddress; protected final String remoteAddress;
protected X509Certificate[] certificates;
public AbstractStompSocket(String remoteAddress) { public AbstractStompSocket(String remoteAddress) {
@ -118,7 +120,6 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
//----- Internal implementation ------------------------------------------// //----- Internal implementation ------------------------------------------//
protected void processStompFrame(String data) { protected void processStompFrame(String data) {
if (!transportStartedAtLeastOnce()) { if (!transportStartedAtLeastOnce()) {
LOG.debug("Waiting for StompSocket to be properly started..."); LOG.debug("Waiting for StompSocket to be properly started...");
try { try {
@ -135,7 +136,9 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
if (data.equals("\n")) { if (data.equals("\n")) {
stompInactivityMonitor.onCommand(new KeepAliveInfo()); stompInactivityMonitor.onCommand(new KeepAliveInfo());
} else { } else {
protocolConverter.onStompCommand((StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes("UTF-8")))); StompFrame frame = (StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes("UTF-8")));
frame.setTransportContext(getCertificates());
protocolConverter.onStompCommand(frame);
} }
} }
} catch (Exception e) { } catch (Exception e) {
@ -146,4 +149,12 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
private boolean transportStartedAtLeastOnce() { private boolean transportStartedAtLeastOnce() {
return socketTransportStarted.getCount() == 0; return socketTransportStarted.getCount() == 0;
} }
public X509Certificate[] getCertificates() {
return certificates;
}
public void setCertificates(X509Certificate[] certificates) {
this.certificates = certificates;
}
} }

View File

@ -64,14 +64,17 @@ public class WSServlet extends WebSocketServlet {
WebSocketListener socket; WebSocketListener socket;
boolean isMqtt = false; boolean isMqtt = false;
for (String subProtocol : req.getSubProtocols()) { for (String subProtocol : req.getSubProtocols()) {
subProtocol.startsWith("mqtt"); if (subProtocol.startsWith("mqtt")) {
isMqtt = true; isMqtt = true;
} }
}
if (isMqtt) { if (isMqtt) {
socket = new MQTTSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest())); socket = new MQTTSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest()));
resp.setAcceptedSubProtocol("mqtt"); resp.setAcceptedSubProtocol("mqtt");
((MQTTSocket)socket).setPeerCertificates(req.getCertificates());
} else { } else {
socket = new StompSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest())); socket = new StompSocket(HttpTransportUtils.generateWsRemoteAddress(req.getHttpServletRequest()));
((StompSocket)socket).setCertificates(req.getCertificates());
resp.setAcceptedSubProtocol("stomp"); resp.setAcceptedSubProtocol("stomp");
} }
listener.onAccept((Transport) socket); listener.onAccept((Transport) socket);

View File

@ -29,4 +29,8 @@ public class WSSTransportServer extends WSTransportServer {
this.socketConnectorFactory = new SecureSocketConnectorFactory(context); this.socketConnectorFactory = new SecureSocketConnectorFactory(context);
} }
@Override
public boolean isSslServer() {
return true;
}
} }

View File

@ -94,8 +94,14 @@ public class MQTTWSConnection extends WebSocketAdapter implements WebSocketListe
connection.getRemote().sendBytes(ByteBuffer.wrap(payload.data)); connection.getRemote().sendBytes(ByteBuffer.wrap(payload.data));
MQTTFrame incoming = receive(15, TimeUnit.SECONDS); MQTTFrame incoming = receive(15, TimeUnit.SECONDS);
if (incoming == null || incoming.messageType() != CONNACK.TYPE) { if (incoming == null || incoming.messageType() != CONNACK.TYPE) {
throw new IOException("Failed to connect to remote service."); throw new IOException("Failed to connect to remote service.");
} else {
CONNACK connack = new CONNACK().decode(incoming);
if (!connack.code().equals(CONNACK.Code.CONNECTION_ACCEPTED)) {
throw new IOException("Failed to connect to remote service: " + connack.code());
}
} }
} }

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.ws;
public class MQTTWSSTransportTest extends MQTTWSTransportTest {
@Override
protected String getWSConnectorURI() {
return "wss://localhost:61623";
}
}

View File

@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient; import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.fusesource.hawtbuf.UTF8Buffer; import org.fusesource.hawtbuf.UTF8Buffer;
@ -47,7 +48,7 @@ public class MQTTWSTransportTest extends WSTransportTestSupport {
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
wsClient = new WebSocketClient(); wsClient = new WebSocketClient(new SslContextFactory(true));
wsClient.start(); wsClient.start();
request = new ClientUpgradeRequest(); request = new ClientUpgradeRequest();

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.ws;
public class StompWSSTransportTest extends StompWSTransportTest {
@Override
protected String getWSConnectorURI() {
return "wss://localhost:61623";
}
}

View File

@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.stomp.Stomp; import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompFrame; import org.apache.activemq.transport.stomp.StompFrame;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.client.WebSocketClient; import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -50,7 +51,7 @@ public class StompWSTransportTest extends WSTransportTestSupport {
super.setUp(); super.setUp();
wsStompConnection = new StompWSConnection(); wsStompConnection = new StompWSConnection();
wsClient = new WebSocketClient(); wsClient = new WebSocketClient(new SslContextFactory(true));
wsClient.start(); wsClient.start();
wsClient.connect(wsStompConnection, wsConnectUri); wsClient.connect(wsStompConnection, wsConnectUri);

View File

@ -0,0 +1,145 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.wss;
import junit.framework.Assert;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompFrame;
import org.apache.activemq.transport.ws.MQTTWSConnection;
import org.apache.activemq.transport.ws.StompWSConnection;
import org.apache.activemq.util.Wait;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.io.ConnectPromise;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static junit.framework.TestCase.assertNotNull;
import static junit.framework.TestCase.assertTrue;
public class WSSTransportNeedClientAuthTest {
public static final String KEYSTORE_TYPE = "jks";
public static final String PASSWORD = "password";
public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";
public static final String KEYSTORE = "src/test/resources/server.keystore";
private BrokerService broker;
@Before
public void setUp() throws Exception {
broker = BrokerFactory.createBroker("xbean:activemq-https-need-client-auth.xml");
broker.setPersistent(false);
broker.start();
broker.waitUntilStarted();
// these are used for the client side... for the server side, the SSL context
// will be configured through the <sslContext> spring beans
System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
System.setProperty("javax.net.ssl.keyStore", KEYSTORE);
System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
}
@After
public void tearDown() throws Exception {
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
}
}
@Test
public void testStompNeedClientAuth() throws Exception {
StompWSConnection wsStompConnection = new StompWSConnection();
System.out.println("starting connection");
SslContextFactory factory = new SslContextFactory();
factory.setKeyStorePath(KEYSTORE);
factory.setKeyStorePassword(PASSWORD);
factory.setKeyStoreType(KEYSTORE_TYPE);
factory.setTrustStorePath(TRUST_KEYSTORE);
factory.setTrustStorePassword(PASSWORD);
factory.setTrustStoreType(KEYSTORE_TYPE);
WebSocketClient wsClient = new WebSocketClient(factory);
wsClient.start();
Future<Session> connected = wsClient.connect(wsStompConnection, new URI("wss://localhost:61618"));
Session sess = connected.get(30, TimeUnit.SECONDS);
String connectFrame = "STOMP\n" +
"login:system\n" +
"passcode:manager\n" +
"accept-version:1.2\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
wsStompConnection.sendRawFrame(connectFrame);
String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
assertNotNull(incoming);
assertTrue(incoming.startsWith("CONNECTED"));
wsStompConnection.sendFrame(new StompFrame(Stomp.Commands.DISCONNECT));
wsStompConnection.close();
}
@Test
public void testMQTTNeedClientAuth() throws Exception {
SslContextFactory factory = new SslContextFactory();
factory.setKeyStorePath(KEYSTORE);
factory.setKeyStorePassword(PASSWORD);
factory.setKeyStoreType(KEYSTORE_TYPE);
factory.setTrustStorePath(TRUST_KEYSTORE);
factory.setTrustStorePassword(PASSWORD);
factory.setTrustStoreType(KEYSTORE_TYPE);
WebSocketClient wsClient = new WebSocketClient(factory);
wsClient.start();
ClientUpgradeRequest request = new ClientUpgradeRequest();
request.setSubProtocols("mqttv3.1");
MQTTWSConnection wsMQTTConnection = new MQTTWSConnection();
wsClient.connect(wsMQTTConnection, new URI("wss://localhost:61618"), request);
if (!wsMQTTConnection.awaitConnection(30, TimeUnit.SECONDS)) {
throw new IOException("Could not connect to MQTT WS endpoint");
}
wsMQTTConnection.connect();
assertTrue("Client not connected", wsMQTTConnection.isConnected());
wsMQTTConnection.disconnect();
wsMQTTConnection.close();
}
}

View File

@ -32,9 +32,15 @@
trustStorePassword="password" /> trustStorePassword="password" />
</amq:sslContext> </amq:sslContext>
<amq:plugins>
<amq:jaasDualAuthenticationPlugin configuration="activemq-domain" sslConfiguration="activemq-ssl-domain"/>
</amq:plugins>
<amq:transportConnectors> <amq:transportConnectors>
<amq:transportConnector name="https" <amq:transportConnector name="https"
uri="https://localhost:8161?transport.needClientAuth=true" /> uri="https://localhost:8161?transport.needClientAuth=true" />
<amq:transportConnector name="wss"
uri="wss://localhost:61618?transport.needClientAuth=true"/>
</amq:transportConnectors> </amq:transportConnectors>
</amq:broker> </amq:broker>

View File

@ -0,0 +1,17 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
client=CN=localhost, OU=activemq.org, O=activemq.org, L=Unknown, ST=Unknown, C=Unknown

View File

@ -0,0 +1,18 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
admins=system,client
guests=guest

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
activemq-domain {
org.apache.activemq.jaas.PropertiesLoginModule requisite
debug=true
org.apache.activemq.jaas.properties.user="users.properties"
org.apache.activemq.jaas.properties.group="groups.properties";
};
activemq-ssl-domain {
org.apache.activemq.jaas.TextFileCertificateLoginModule required
debug=true
org.apache.activemq.jaas.textfiledn.user="dns.properties"
org.apache.activemq.jaas.textfiledn.group="groups.properties";
};

View File

@ -0,0 +1,18 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
system=manager
guest=password

View File

@ -33,6 +33,8 @@ public interface MQTTTransport {
public X509Certificate[] getPeerCertificates(); public X509Certificate[] getPeerCertificates();
public void setPeerCertificates(X509Certificate[] certificates);
public void onException(IOException error); public void onException(IOException error);
public MQTTInactivityMonitor getInactivityMonitor(); public MQTTInactivityMonitor getInactivityMonitor();

View File

@ -279,4 +279,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
public void setMaxFrameSize(int maxFrameSize) { public void setMaxFrameSize(int maxFrameSize) {
wireFormat.setMaxFrameSize(maxFrameSize); wireFormat.setMaxFrameSize(maxFrameSize);
} }
@Override
public void setPeerCertificates(X509Certificate[] certificates) {}
} }