Unregister the connection on the Broker side so client's with the same client ID can reconnect. 

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1458900 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-03-20 15:56:12 +00:00
parent e80dc81741
commit 7b5f9563cd
4 changed files with 45 additions and 10 deletions

View File

@ -50,6 +50,7 @@ import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
@ -105,7 +106,7 @@ class MQTTProtocolConverter {
private final Object commnadIdMutex = new Object();
private int lastCommandId;
private final AtomicBoolean connected = new AtomicBoolean(false);
private ConnectionInfo connectionInfo = new ConnectionInfo();
private final ConnectionInfo connectionInfo = new ConnectionInfo();
private CONNECT connect;
private String clientId;
private long defaultKeepAlive;
@ -159,7 +160,7 @@ class MQTTProtocolConverter {
}
case DISCONNECT.TYPE: {
LOG.debug("MQTT Client " + getClientId() + " disconnecting");
stopTransport();
onMQTTDisconnect();
break;
}
case SUBSCRIBE.TYPE: {
@ -232,6 +233,7 @@ class MQTTProtocolConverter {
connectionInfo.setTransportContext(mqttTransport.getPeerCertificates());
sendToActiveMQ(connectionInfo, new ResponseHandler() {
@Override
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
@ -250,6 +252,7 @@ class MQTTProtocolConverter {
final ProducerInfo producerInfo = new ProducerInfo(producerId);
sendToActiveMQ(producerInfo, new ResponseHandler() {
@Override
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
@ -272,6 +275,14 @@ class MQTTProtocolConverter {
});
}
void onMQTTDisconnect() throws MQTTProtocolException {
if (connected.get()) {
sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
sendToActiveMQ(new ShutdownInfo(), null);
}
stopTransport();
}
void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException {
checkConnected();
Topic[] topics = command.topics();
@ -516,6 +527,7 @@ class MQTTProtocolConverter {
bytesOut.write(data, 0, read);
}
byteSequence = bytesOut.toByteSequence();
bytesOut.close();
}
result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length));
}
@ -555,7 +567,6 @@ class MQTTProtocolConverter {
return;
}
long keepAliveMS = keepAliveSeconds * 1000;
if (LOG.isDebugEnabled()) {
@ -586,9 +597,6 @@ class MQTTProtocolConverter {
} catch (Exception ex) {
LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
}
}
void handleException(Throwable exception, MQTTFrame command) {
@ -636,6 +644,7 @@ class MQTTProtocolConverter {
switch (command.qos()) {
case AT_LEAST_ONCE:
return new ResponseHandler() {
@Override
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException());
@ -648,6 +657,7 @@ class MQTTProtocolConverter {
};
case EXACTLY_ONCE:
return new ResponseHandler() {
@Override
public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException());

View File

@ -21,6 +21,7 @@ import java.security.cert.X509Certificate;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
@ -58,6 +59,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
}
}
@Override
public void oneway(Object o) throws IOException {
try {
final Command command = (Command) o;
@ -67,6 +69,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
}
}
@Override
public void onCommand(Object command) {
try {
if (trace) {
@ -81,6 +84,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
}
}
@Override
public void sendToActiveMQ(Command command) {
TransportListener l = transportListener;
if (l != null) {
@ -88,6 +92,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
}
}
@Override
public void sendToMQTT(MQTTFrame command) throws IOException {
if( !stopped.get() ) {
if (trace) {
@ -107,6 +112,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
}
}
@Override
public X509Certificate[] getPeerCertificates() {
if (next instanceof SslTransport) {
X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates();
@ -162,10 +168,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
* The default = 1
* @param activeMQSubscriptionPrefetch set the prefetch for the corresponding ActiveMQ subscription
*/
public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
protocolConverter.setActiveMQSubscriptionPrefetch(activeMQSubscriptionPrefetch);
}
}

View File

@ -84,6 +84,28 @@ public class MQTTTest extends AbstractMQTTTest {
}));
}
@Test(timeout=300000)
public void testReuseConnection() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("Test-Client");
{
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
connection.disconnect();
Thread.sleep(1000);
}
{
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
connection.disconnect();
Thread.sleep(1000);
}
}
@Override
protected String getProtocolScheme() {
return "mqtt";

View File

@ -20,9 +20,9 @@
#
log4j.rootLogger=INFO, out, stdout
#log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
#log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
#log4j.logger.org.apache.activemq.transport.failover=TRACE
log4j.logger.org.apache.activemq.transport.mqtt=DEBUG
#log4j.logger.org.apache.activemq.store.jdbc=TRACE
#log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
#log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG