mirror of https://github.com/apache/activemq.git
Return the right code when a client connects with bad credentials.
This commit is contained in:
parent
0ebbd5d974
commit
3653f81b5b
|
@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.zip.DataFormatException;
|
import java.util.zip.DataFormatException;
|
||||||
import java.util.zip.Inflater;
|
import java.util.zip.Inflater;
|
||||||
|
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
|
@ -37,7 +38,31 @@ import org.apache.activemq.broker.region.RegionBroker;
|
||||||
import org.apache.activemq.broker.region.Subscription;
|
import org.apache.activemq.broker.region.Subscription;
|
||||||
import org.apache.activemq.broker.region.TopicRegion;
|
import org.apache.activemq.broker.region.TopicRegion;
|
||||||
import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
|
import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
|
||||||
import org.apache.activemq.command.*;
|
import org.apache.activemq.command.ActiveMQBytesMessage;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.command.ActiveMQMapMessage;
|
||||||
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
|
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||||
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
import org.apache.activemq.command.Command;
|
||||||
|
import org.apache.activemq.command.ConnectionError;
|
||||||
|
import org.apache.activemq.command.ConnectionId;
|
||||||
|
import org.apache.activemq.command.ConnectionInfo;
|
||||||
|
import org.apache.activemq.command.ConsumerId;
|
||||||
|
import org.apache.activemq.command.ConsumerInfo;
|
||||||
|
import org.apache.activemq.command.ExceptionResponse;
|
||||||
|
import org.apache.activemq.command.MessageAck;
|
||||||
|
import org.apache.activemq.command.MessageDispatch;
|
||||||
|
import org.apache.activemq.command.MessageId;
|
||||||
|
import org.apache.activemq.command.ProducerId;
|
||||||
|
import org.apache.activemq.command.ProducerInfo;
|
||||||
|
import org.apache.activemq.command.RemoveInfo;
|
||||||
|
import org.apache.activemq.command.RemoveSubscriptionInfo;
|
||||||
|
import org.apache.activemq.command.Response;
|
||||||
|
import org.apache.activemq.command.SessionId;
|
||||||
|
import org.apache.activemq.command.SessionInfo;
|
||||||
|
import org.apache.activemq.command.ShutdownInfo;
|
||||||
|
import org.apache.activemq.command.SubscriptionInfo;
|
||||||
import org.apache.activemq.store.PersistenceAdapterSupport;
|
import org.apache.activemq.store.PersistenceAdapterSupport;
|
||||||
import org.apache.activemq.util.ByteArrayOutputStream;
|
import org.apache.activemq.util.ByteArrayOutputStream;
|
||||||
import org.apache.activemq.util.ByteSequence;
|
import org.apache.activemq.util.ByteSequence;
|
||||||
|
@ -49,7 +74,21 @@ import org.fusesource.hawtbuf.Buffer;
|
||||||
import org.fusesource.hawtbuf.UTF8Buffer;
|
import org.fusesource.hawtbuf.UTF8Buffer;
|
||||||
import org.fusesource.mqtt.client.QoS;
|
import org.fusesource.mqtt.client.QoS;
|
||||||
import org.fusesource.mqtt.client.Topic;
|
import org.fusesource.mqtt.client.Topic;
|
||||||
import org.fusesource.mqtt.codec.*;
|
import org.fusesource.mqtt.codec.CONNACK;
|
||||||
|
import org.fusesource.mqtt.codec.CONNECT;
|
||||||
|
import org.fusesource.mqtt.codec.DISCONNECT;
|
||||||
|
import org.fusesource.mqtt.codec.MQTTFrame;
|
||||||
|
import org.fusesource.mqtt.codec.PINGREQ;
|
||||||
|
import org.fusesource.mqtt.codec.PINGRESP;
|
||||||
|
import org.fusesource.mqtt.codec.PUBACK;
|
||||||
|
import org.fusesource.mqtt.codec.PUBCOMP;
|
||||||
|
import org.fusesource.mqtt.codec.PUBLISH;
|
||||||
|
import org.fusesource.mqtt.codec.PUBREC;
|
||||||
|
import org.fusesource.mqtt.codec.PUBREL;
|
||||||
|
import org.fusesource.mqtt.codec.SUBACK;
|
||||||
|
import org.fusesource.mqtt.codec.SUBSCRIBE;
|
||||||
|
import org.fusesource.mqtt.codec.UNSUBACK;
|
||||||
|
import org.fusesource.mqtt.codec.UNSUBSCRIBE;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -253,7 +292,11 @@ public class MQTTProtocolConverter {
|
||||||
Throwable exception = ((ExceptionResponse) response).getException();
|
Throwable exception = ((ExceptionResponse) response).getException();
|
||||||
//let the client know
|
//let the client know
|
||||||
CONNACK ack = new CONNACK();
|
CONNACK ack = new CONNACK();
|
||||||
|
if (exception instanceof SecurityException) {
|
||||||
|
ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
|
||||||
|
} else {
|
||||||
ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
|
ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
|
||||||
|
}
|
||||||
getMQTTTransport().sendToMQTT(ack.encode());
|
getMQTTTransport().sendToMQTT(ack.encode());
|
||||||
getMQTTTransport().onException(IOExceptionSupport.create(exception));
|
getMQTTTransport().onException(IOExceptionSupport.create(exception));
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -0,0 +1,388 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.transport.mqtt;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.security.ProtectionDomain;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.management.MalformedObjectNameException;
|
||||||
|
import javax.management.ObjectName;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerPlugin;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
|
import org.apache.activemq.broker.jmx.BrokerViewMBean;
|
||||||
|
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||||
|
import org.apache.activemq.broker.jmx.TopicViewMBean;
|
||||||
|
import org.apache.activemq.filter.DestinationMapEntry;
|
||||||
|
import org.apache.activemq.security.AuthenticationUser;
|
||||||
|
import org.apache.activemq.security.AuthorizationEntry;
|
||||||
|
import org.apache.activemq.security.AuthorizationPlugin;
|
||||||
|
import org.apache.activemq.security.DefaultAuthorizationMap;
|
||||||
|
import org.apache.activemq.security.SimpleAuthenticationPlugin;
|
||||||
|
import org.apache.activemq.security.TempDestinationAuthorizationEntry;
|
||||||
|
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
|
||||||
|
import org.apache.activemq.transport.mqtt.util.ResourceLoadingSslContext;
|
||||||
|
import org.fusesource.mqtt.client.MQTT;
|
||||||
|
import org.fusesource.mqtt.client.Tracer;
|
||||||
|
import org.fusesource.mqtt.codec.MQTTFrame;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public class MQTTTestSupport {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(MQTTTestSupport.class);
|
||||||
|
|
||||||
|
protected BrokerService brokerService;
|
||||||
|
protected int port;
|
||||||
|
protected int sslPort;
|
||||||
|
protected int nioPort;
|
||||||
|
protected int nioSslPort;
|
||||||
|
protected String jmsUri = "vm://localhost";
|
||||||
|
protected ActiveMQConnectionFactory cf;
|
||||||
|
protected LinkedList<Throwable> exceptions = new LinkedList<Throwable>();
|
||||||
|
protected int numberOfMessages;
|
||||||
|
|
||||||
|
public static final int AT_MOST_ONCE = 0;
|
||||||
|
public static final int AT_LEAST_ONCE = 1;
|
||||||
|
public static final int EXACTLY_ONCE = 2;
|
||||||
|
|
||||||
|
@Rule public TestName name = new TestName();
|
||||||
|
|
||||||
|
public File basedir() throws IOException {
|
||||||
|
ProtectionDomain protectionDomain = getClass().getProtectionDomain();
|
||||||
|
return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
final MQTTTestSupport s = new MQTTTestSupport();
|
||||||
|
|
||||||
|
s.sslPort = 5675;
|
||||||
|
s.port = 5676;
|
||||||
|
s.nioPort = 5677;
|
||||||
|
s.nioSslPort = 5678;
|
||||||
|
|
||||||
|
s.startBroker();
|
||||||
|
while(true) {
|
||||||
|
Thread.sleep(100000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getName() {
|
||||||
|
return name.getMethodName();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
exceptions.clear();
|
||||||
|
numberOfMessages = 1000;
|
||||||
|
startBroker();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
stopBroker();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void startBroker() throws Exception {
|
||||||
|
|
||||||
|
createBroker();
|
||||||
|
|
||||||
|
applyBrokerPolicies();
|
||||||
|
applyMemoryLimitPolicy();
|
||||||
|
|
||||||
|
// Setup SSL context...
|
||||||
|
File keyStore = new File(basedir(), "src/test/resources/server.keystore");
|
||||||
|
File trustStore = new File(basedir(), "src/test/resources/client.keystore");
|
||||||
|
|
||||||
|
final ResourceLoadingSslContext sslContext = new ResourceLoadingSslContext();
|
||||||
|
sslContext.setKeyStore(keyStore.getCanonicalPath());
|
||||||
|
sslContext.setKeyStorePassword("password");
|
||||||
|
sslContext.setTrustStore(trustStore.getCanonicalPath());
|
||||||
|
sslContext.setTrustStorePassword("password");
|
||||||
|
sslContext.afterPropertiesSet();
|
||||||
|
brokerService.setSslContext(sslContext);
|
||||||
|
|
||||||
|
ArrayList<BrokerPlugin> plugins = new ArrayList<BrokerPlugin>();
|
||||||
|
|
||||||
|
addMQTTConnector();
|
||||||
|
addOpenWireConnector();
|
||||||
|
|
||||||
|
cf = new ActiveMQConnectionFactory(jmsUri);
|
||||||
|
|
||||||
|
BrokerPlugin authenticationPlugin = configureAuthentication();
|
||||||
|
if (authenticationPlugin != null) {
|
||||||
|
plugins.add(configureAuthorization());
|
||||||
|
}
|
||||||
|
|
||||||
|
BrokerPlugin authorizationPlugin = configureAuthorization();
|
||||||
|
if (authorizationPlugin != null) {
|
||||||
|
plugins.add(configureAuthentication());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!plugins.isEmpty()) {
|
||||||
|
BrokerPlugin[] array = new BrokerPlugin[plugins.size()];
|
||||||
|
brokerService.setPlugins(plugins.toArray(array));
|
||||||
|
}
|
||||||
|
|
||||||
|
brokerService.start();
|
||||||
|
brokerService.waitUntilStarted();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void applyMemoryLimitPolicy() throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void createBroker() throws Exception {
|
||||||
|
brokerService = new BrokerService();
|
||||||
|
brokerService.setPersistent(isPersistent());
|
||||||
|
brokerService.setAdvisorySupport(false);
|
||||||
|
brokerService.setSchedulerSupport(true);
|
||||||
|
brokerService.setPopulateJMSXUserID(true);
|
||||||
|
brokerService.setSchedulerSupport(true);
|
||||||
|
|
||||||
|
JobSchedulerStoreImpl jobStore = new JobSchedulerStoreImpl();
|
||||||
|
jobStore.setDirectory(new File("activemq-data"));
|
||||||
|
|
||||||
|
brokerService.setJobSchedulerStore(jobStore);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected BrokerPlugin configureAuthentication() throws Exception {
|
||||||
|
List<AuthenticationUser> users = new ArrayList<AuthenticationUser>();
|
||||||
|
users.add(new AuthenticationUser("system", "manager", "users,admins"));
|
||||||
|
users.add(new AuthenticationUser("user", "password", "users"));
|
||||||
|
users.add(new AuthenticationUser("guest", "password", "guests"));
|
||||||
|
SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users);
|
||||||
|
|
||||||
|
return authenticationPlugin;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected BrokerPlugin configureAuthorization() throws Exception {
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
List<DestinationMapEntry> authorizationEntries = new ArrayList<DestinationMapEntry>();
|
||||||
|
|
||||||
|
AuthorizationEntry entry = new AuthorizationEntry();
|
||||||
|
entry.setQueue(">");
|
||||||
|
entry.setRead("admins");
|
||||||
|
entry.setWrite("admins");
|
||||||
|
entry.setAdmin("admins");
|
||||||
|
authorizationEntries.add(entry);
|
||||||
|
entry = new AuthorizationEntry();
|
||||||
|
entry.setQueue("USERS.>");
|
||||||
|
entry.setRead("users");
|
||||||
|
entry.setWrite("users");
|
||||||
|
entry.setAdmin("users");
|
||||||
|
authorizationEntries.add(entry);
|
||||||
|
entry = new AuthorizationEntry();
|
||||||
|
entry.setQueue("GUEST.>");
|
||||||
|
entry.setRead("guests");
|
||||||
|
entry.setWrite("guests,users");
|
||||||
|
entry.setAdmin("guests,users");
|
||||||
|
authorizationEntries.add(entry);
|
||||||
|
entry = new AuthorizationEntry();
|
||||||
|
entry.setTopic(">");
|
||||||
|
entry.setRead("admins");
|
||||||
|
entry.setWrite("admins");
|
||||||
|
entry.setAdmin("admins");
|
||||||
|
authorizationEntries.add(entry);
|
||||||
|
entry = new AuthorizationEntry();
|
||||||
|
entry.setTopic("USERS.>");
|
||||||
|
entry.setRead("users");
|
||||||
|
entry.setWrite("users");
|
||||||
|
entry.setAdmin("users");
|
||||||
|
authorizationEntries.add(entry);
|
||||||
|
entry = new AuthorizationEntry();
|
||||||
|
entry.setTopic("GUEST.>");
|
||||||
|
entry.setRead("guests");
|
||||||
|
entry.setWrite("guests,users");
|
||||||
|
entry.setAdmin("guests,users");
|
||||||
|
authorizationEntries.add(entry);
|
||||||
|
entry = new AuthorizationEntry();
|
||||||
|
entry.setTopic("ActiveMQ.Advisory.>");
|
||||||
|
entry.setRead("guests,users");
|
||||||
|
entry.setWrite("guests,users");
|
||||||
|
entry.setAdmin("guests,users");
|
||||||
|
authorizationEntries.add(entry);
|
||||||
|
|
||||||
|
TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry();
|
||||||
|
tempEntry.setRead("admins");
|
||||||
|
tempEntry.setWrite("admins");
|
||||||
|
tempEntry.setAdmin("admins");
|
||||||
|
|
||||||
|
DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap(authorizationEntries);
|
||||||
|
authorizationMap.setTempDestinationAuthorizationEntry(tempEntry);
|
||||||
|
AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(authorizationMap);
|
||||||
|
|
||||||
|
return authorizationPlugin;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void applyBrokerPolicies() throws Exception {
|
||||||
|
// NOOP here
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void addOpenWireConnector() throws Exception {
|
||||||
|
TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:0");
|
||||||
|
jmsUri = connector.getPublishableConnectString();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void addMQTTConnector() throws Exception {
|
||||||
|
// Overrides of this method can add additional configuration options or add multiple
|
||||||
|
// MQTT transport connectors as needed, the port variable is always supposed to be
|
||||||
|
// assigned the primary MQTT connector's port.
|
||||||
|
TransportConnector connector = brokerService.addConnector(getProtocolScheme() + "://0.0.0.0:" + port);
|
||||||
|
port = connector.getConnectUri().getPort();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stopBroker() throws Exception {
|
||||||
|
if (brokerService != null) {
|
||||||
|
brokerService.stop();
|
||||||
|
brokerService.waitUntilStopped();
|
||||||
|
brokerService = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getQueueName() {
|
||||||
|
return getClass().getName() + "." + name.getMethodName();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getTopicName() {
|
||||||
|
return getClass().getName() + "." + name.getMethodName();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException {
|
||||||
|
ObjectName brokerViewMBean = new ObjectName(
|
||||||
|
"org.apache.activemq:type=Broker,brokerName=localhost");
|
||||||
|
BrokerViewMBean proxy = (BrokerViewMBean) brokerService.getManagementContext()
|
||||||
|
.newProxyInstance(brokerViewMBean, BrokerViewMBean.class, true);
|
||||||
|
return proxy;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
|
||||||
|
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
|
||||||
|
QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
|
||||||
|
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
|
||||||
|
return proxy;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected TopicViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException {
|
||||||
|
ObjectName topicViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name);
|
||||||
|
TopicViewMBean proxy = (TopicViewMBean) brokerService.getManagementContext()
|
||||||
|
.newProxyInstance(topicViewMBeanName, TopicViewMBean.class, true);
|
||||||
|
return proxy;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize an MQTTClientProvider instance. By default this method uses the port that's
|
||||||
|
* assigned to be the TCP based port using the base version of addMQTTConnector. A sbuclass
|
||||||
|
* can either change the value of port or override this method to assign the correct port.
|
||||||
|
*
|
||||||
|
* @param provider
|
||||||
|
* the MQTTClientProvider instance to initialize.
|
||||||
|
*
|
||||||
|
* @throws Exception if an error occurs during initialization.
|
||||||
|
*/
|
||||||
|
protected void initializeConnection(MQTTClientProvider provider) throws Exception {
|
||||||
|
provider.connect("tcp://localhost:" + port);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getProtocolScheme() {
|
||||||
|
return "mqtt";
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean isPersistent() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static interface Task {
|
||||||
|
public void run() throws Exception;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void within(int time, TimeUnit unit, Task task) throws InterruptedException {
|
||||||
|
long timeMS = unit.toMillis(time);
|
||||||
|
long deadline = System.currentTimeMillis() + timeMS;
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
task.run();
|
||||||
|
return;
|
||||||
|
} catch (Throwable e) {
|
||||||
|
long remaining = deadline - System.currentTimeMillis();
|
||||||
|
if( remaining <=0 ) {
|
||||||
|
if( e instanceof RuntimeException ) {
|
||||||
|
throw (RuntimeException)e;
|
||||||
|
}
|
||||||
|
if( e instanceof Error ) {
|
||||||
|
throw (Error)e;
|
||||||
|
}
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
Thread.sleep(Math.min(timeMS/10, remaining));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected MQTTClientProvider getMQTTClientProvider() {
|
||||||
|
return new FuseMQQTTClientProvider();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected MQTT createMQTTConnection() throws Exception {
|
||||||
|
return createMQTTConnection(null, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected MQTT createMQTTConnection(String clientId, boolean clean) throws Exception {
|
||||||
|
MQTT mqtt = new MQTT();
|
||||||
|
mqtt.setConnectAttemptsMax(1);
|
||||||
|
mqtt.setReconnectAttemptsMax(0);
|
||||||
|
mqtt.setTracer(createTracer());
|
||||||
|
if (clientId != null) {
|
||||||
|
mqtt.setClientId(clientId);
|
||||||
|
}
|
||||||
|
mqtt.setCleanSession(clean);
|
||||||
|
mqtt.setHost("localhost", port);
|
||||||
|
return mqtt;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Tracer createTracer() {
|
||||||
|
return new Tracer() {
|
||||||
|
@Override
|
||||||
|
public void onReceive(MQTTFrame frame) {
|
||||||
|
LOG.info("Client Received:\n" + frame);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSend(MQTTFrame frame) {
|
||||||
|
LOG.info("Client Sent:\n" + frame);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void debug(String message, Object... args) {
|
||||||
|
LOG.info(String.format(message, args));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,71 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.transport.mqtt;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.net.ProtocolException;
|
||||||
|
|
||||||
|
import org.fusesource.mqtt.client.BlockingConnection;
|
||||||
|
import org.fusesource.mqtt.client.MQTT;
|
||||||
|
import org.fusesource.mqtt.client.Tracer;
|
||||||
|
import org.fusesource.mqtt.codec.CONNACK;
|
||||||
|
import org.fusesource.mqtt.codec.MQTTFrame;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public class MQTTTests extends MQTTTestSupport {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(MQTTTests.class);
|
||||||
|
|
||||||
|
@Test(timeout = 60 * 1000)
|
||||||
|
public void testBadUserNameOrPasswordGetsConnAckWithErrorCode() throws Exception {
|
||||||
|
MQTT mqttPub = createMQTTConnection("pub", true);
|
||||||
|
mqttPub.setUserName("admin");
|
||||||
|
mqttPub.setPassword("admin");
|
||||||
|
|
||||||
|
mqttPub.setTracer(new Tracer() {
|
||||||
|
@Override
|
||||||
|
public void onReceive(MQTTFrame frame) {
|
||||||
|
LOG.info("Client received: {}", frame);
|
||||||
|
if (frame.messageType() == CONNACK.TYPE) {
|
||||||
|
CONNACK connAck = new CONNACK();
|
||||||
|
try {
|
||||||
|
connAck.decode(frame);
|
||||||
|
LOG.info("{}", connAck);
|
||||||
|
assertEquals(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD, connAck.code());
|
||||||
|
} catch (ProtocolException e) {
|
||||||
|
fail("Error decoding publish " + e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSend(MQTTFrame frame) {
|
||||||
|
LOG.info("Client sent: {}", frame);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
BlockingConnection connectionPub = mqttPub.blockingConnection();
|
||||||
|
try {
|
||||||
|
connectionPub.connect();
|
||||||
|
fail("Should not be able to connect.");
|
||||||
|
} catch (Exception e) {}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,237 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.transport.mqtt.util;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.net.MalformedURLException;
|
||||||
|
import java.security.KeyStore;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
import java.security.SecureRandom;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
import javax.net.ssl.KeyManager;
|
||||||
|
import javax.net.ssl.KeyManagerFactory;
|
||||||
|
import javax.net.ssl.TrustManager;
|
||||||
|
import javax.net.ssl.TrustManagerFactory;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.SslContext;
|
||||||
|
import org.springframework.core.io.ClassPathResource;
|
||||||
|
import org.springframework.core.io.FileSystemResource;
|
||||||
|
import org.springframework.core.io.Resource;
|
||||||
|
import org.springframework.core.io.UrlResource;
|
||||||
|
import org.springframework.util.ResourceUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extends the SslContext so that it's easier to configure from spring.
|
||||||
|
*/
|
||||||
|
public class ResourceLoadingSslContext extends SslContext {
|
||||||
|
|
||||||
|
private String keyStoreType = "jks";
|
||||||
|
private String trustStoreType = "jks";
|
||||||
|
|
||||||
|
private String secureRandomAlgorithm = "SHA1PRNG";
|
||||||
|
private String keyStoreAlgorithm = KeyManagerFactory.getDefaultAlgorithm();
|
||||||
|
private String trustStoreAlgorithm = TrustManagerFactory.getDefaultAlgorithm();
|
||||||
|
|
||||||
|
private String keyStore;
|
||||||
|
private String trustStore;
|
||||||
|
|
||||||
|
private String keyStoreKeyPassword;
|
||||||
|
private String keyStorePassword;
|
||||||
|
private String trustStorePassword;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
|
||||||
|
*
|
||||||
|
* delegates to afterPropertiesSet, done to prevent backwards incompatible
|
||||||
|
* signature change.
|
||||||
|
*/
|
||||||
|
@PostConstruct
|
||||||
|
private void postConstruct() {
|
||||||
|
try {
|
||||||
|
afterPropertiesSet();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
throw new RuntimeException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
* @org.apache.xbean.InitMethod
|
||||||
|
*/
|
||||||
|
public void afterPropertiesSet() throws Exception {
|
||||||
|
keyManagers.addAll(createKeyManagers());
|
||||||
|
trustManagers.addAll(createTrustManagers());
|
||||||
|
if (secureRandom == null) {
|
||||||
|
secureRandom = createSecureRandom();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private SecureRandom createSecureRandom() throws NoSuchAlgorithmException {
|
||||||
|
return SecureRandom.getInstance(secureRandomAlgorithm);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Collection<TrustManager> createTrustManagers() throws Exception {
|
||||||
|
KeyStore ks = createTrustManagerKeyStore();
|
||||||
|
if (ks == null) {
|
||||||
|
return new ArrayList<TrustManager>(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
TrustManagerFactory tmf = TrustManagerFactory.getInstance(trustStoreAlgorithm);
|
||||||
|
tmf.init(ks);
|
||||||
|
return Arrays.asList(tmf.getTrustManagers());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Collection<KeyManager> createKeyManagers() throws Exception {
|
||||||
|
KeyStore ks = createKeyManagerKeyStore();
|
||||||
|
if (ks == null) {
|
||||||
|
return new ArrayList<KeyManager>(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
KeyManagerFactory tmf = KeyManagerFactory.getInstance(keyStoreAlgorithm);
|
||||||
|
tmf.init(ks, keyStoreKeyPassword == null ? (keyStorePassword == null ? null : keyStorePassword.toCharArray()) : keyStoreKeyPassword.toCharArray());
|
||||||
|
return Arrays.asList(tmf.getKeyManagers());
|
||||||
|
}
|
||||||
|
|
||||||
|
private KeyStore createTrustManagerKeyStore() throws Exception {
|
||||||
|
if (trustStore == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
KeyStore ks = KeyStore.getInstance(trustStoreType);
|
||||||
|
InputStream is = resourceFromString(trustStore).getInputStream();
|
||||||
|
try {
|
||||||
|
ks.load(is, trustStorePassword == null ? null : trustStorePassword.toCharArray());
|
||||||
|
} finally {
|
||||||
|
is.close();
|
||||||
|
}
|
||||||
|
return ks;
|
||||||
|
}
|
||||||
|
|
||||||
|
private KeyStore createKeyManagerKeyStore() throws Exception {
|
||||||
|
if (keyStore == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
KeyStore ks = KeyStore.getInstance(keyStoreType);
|
||||||
|
InputStream is = resourceFromString(keyStore).getInputStream();
|
||||||
|
try {
|
||||||
|
ks.load(is, keyStorePassword == null ? null : keyStorePassword.toCharArray());
|
||||||
|
} finally {
|
||||||
|
is.close();
|
||||||
|
}
|
||||||
|
return ks;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTrustStoreType() {
|
||||||
|
return trustStoreType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getKeyStoreType() {
|
||||||
|
return keyStoreType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getKeyStore() {
|
||||||
|
return keyStore;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setKeyStore(String keyStore) throws MalformedURLException {
|
||||||
|
this.keyStore = keyStore;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTrustStore() {
|
||||||
|
return trustStore;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTrustStore(String trustStore) throws MalformedURLException {
|
||||||
|
this.trustStore = trustStore;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getKeyStoreAlgorithm() {
|
||||||
|
return keyStoreAlgorithm;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setKeyStoreAlgorithm(String keyAlgorithm) {
|
||||||
|
this.keyStoreAlgorithm = keyAlgorithm;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTrustStoreAlgorithm() {
|
||||||
|
return trustStoreAlgorithm;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTrustStoreAlgorithm(String trustAlgorithm) {
|
||||||
|
this.trustStoreAlgorithm = trustAlgorithm;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getKeyStoreKeyPassword() {
|
||||||
|
return keyStoreKeyPassword;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setKeyStoreKeyPassword(String keyPassword) {
|
||||||
|
this.keyStoreKeyPassword = keyPassword;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getKeyStorePassword() {
|
||||||
|
return keyStorePassword;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setKeyStorePassword(String keyPassword) {
|
||||||
|
this.keyStorePassword = keyPassword;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTrustStorePassword() {
|
||||||
|
return trustStorePassword;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTrustStorePassword(String trustPassword) {
|
||||||
|
this.trustStorePassword = trustPassword;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setKeyStoreType(String keyType) {
|
||||||
|
this.keyStoreType = keyType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTrustStoreType(String trustType) {
|
||||||
|
this.trustStoreType = trustType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSecureRandomAlgorithm() {
|
||||||
|
return secureRandomAlgorithm;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSecureRandomAlgorithm(String secureRandomAlgorithm) {
|
||||||
|
this.secureRandomAlgorithm = secureRandomAlgorithm;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Resource resourceFromString(String uri) throws MalformedURLException {
|
||||||
|
Resource resource;
|
||||||
|
File file = new File(uri);
|
||||||
|
if (file.exists()) {
|
||||||
|
resource = new FileSystemResource(uri);
|
||||||
|
} else if (ResourceUtils.isUrl(uri)) {
|
||||||
|
resource = new UrlResource(uri);
|
||||||
|
} else {
|
||||||
|
resource = new ClassPathResource(uri);
|
||||||
|
}
|
||||||
|
return resource;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue