This commit is contained in:
Dejan Bosanac 2014-03-20 11:31:44 +01:00
parent 6972d37e63
commit 4ba4aa21d3
5 changed files with 22 additions and 65 deletions

View File

@ -121,7 +121,7 @@ public class MQTTProtocolConverter {
command.setResponseRequired(true);
resposeHandlers.put(command.getCommandId(), handler);
}
mqttTransport.sendToActiveMQ(command);
getMQTTTransport().sendToActiveMQ(command);
}
void sendToMQTT(MQTTFrame frame) {
@ -140,7 +140,7 @@ public class MQTTProtocolConverter {
switch (frame.messageType()) {
case PINGREQ.TYPE: {
LOG.debug("Received a ping from client: " + getClientId());
mqttTransport.sendToMQTT(PING_RESP_FRAME);
sendToMQTT(PING_RESP_FRAME);
LOG.debug("Sent Ping Response to " + getClientId());
break;
}

View File

@ -17,13 +17,10 @@
package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.net.ProtocolException;
import java.security.cert.X509Certificate;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
@ -51,6 +48,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
private final AtomicBoolean stopped = new AtomicBoolean();
private boolean trace;
private final Object sendLock = new Object();
public MQTTTransportFilter(Transport next, WireFormat wireFormat, BrokerService brokerService) {
super(next);
@ -80,7 +78,7 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
}
protocolConverter.onMQTTCommand(frame);
} catch (IOException e) {
handleException(e);
onException(e);
} catch (JMSException e) {
onException(IOExceptionSupport.create(e));
}
@ -102,7 +100,10 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
}
Transport n = next;
if (n != null) {
n.oneway(command);
// sync access to underlying transport buffer
synchronized (sendLock) {
n.oneway(command);
}
}
}
}
@ -174,9 +175,10 @@ public class MQTTTransportFilter extends TransportFilter implements MQTTTranspor
return this.wireFormat;
}
public void handleException(IOException e) {
@Override
public void onException(IOException error) {
protocolConverter.onTransportError();
super.onException(e);
super.onException(error);
}
public long getDefaultKeepAlive() {

View File

@ -30,7 +30,6 @@ import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;
@ -43,20 +42,6 @@ public class MQTTNioTest extends MQTTTest {
return "mqtt+nio";
}
@Ignore("See AMQ-4712")
@Override
@Test
public void testReceiveMessageSentWhileOffline() throws Exception {
super.testReceiveMessageSentWhileOffline();
}
@Ignore("See AMQ-4712")
@Override
@Test
public void testResendMessageId() throws Exception {
super.testResendMessageId();
}
@Test
public void testPingOnMQTTNIO() throws Exception {
addMQTTConnector("maxInactivityDuration=-1");

View File

@ -19,14 +19,12 @@ package org.apache.activemq.transport.mqtt;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.fusesource.mqtt.client.MQTT;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.slf4j.Logger;
@ -53,20 +51,6 @@ public class MQTTSSLTest extends MQTTTest {
return "mqtt+ssl";
}
@Ignore("See AMQ-4712")
@Override
@Test
public void testReceiveMessageSentWhileOffline() throws Exception {
super.testReceiveMessageSentWhileOffline();
}
@Ignore("See AMQ-4712")
@Override
@Test
public void testResendMessageId() throws Exception {
super.testResendMessageId();
}
protected MQTT createMQTTConnection() throws Exception {
MQTT mqtt = new MQTT();
mqtt.setConnectAttemptsMax(1);

View File

@ -46,7 +46,6 @@ import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.security.SimpleAuthorizationMap;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.Wait;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
@ -590,15 +589,7 @@ public class MQTTTest extends AbstractMQTTTest {
if (frame.messageType() == PUBLISH.TYPE) {
PUBLISH publish = new PUBLISH();
try {
// copy the buffers before we decode
Buffer[] buffers = frame.buffers();
Buffer[] copy = new Buffer[buffers.length];
for (int i = 0; i < buffers.length; i++) {
copy[i] = buffers[i].deepCopy();
}
publish.decode(frame);
// reset frame buffers to deep copy
frame.buffers(copy);
} catch (ProtocolException e) {
fail("Error decoding publish " + e.getMessage());
}
@ -684,15 +675,7 @@ public class MQTTTest extends AbstractMQTTTest {
if (frame.messageType() == PUBLISH.TYPE) {
PUBLISH publish = new PUBLISH();
try {
// copy the buffers before we decode
Buffer[] buffers = frame.buffers();
Buffer[] copy = new Buffer[buffers.length];
for (int i = 0; i < buffers.length; i++) {
copy[i] = buffers[i].deepCopy();
}
publish.decode(frame);
// reset frame buffers to deep copy
frame.buffers(copy);
} catch (ProtocolException e) {
fail("Error decoding publish " + e.getMessage());
}
@ -717,25 +700,28 @@ public class MQTTTest extends AbstractMQTTTest {
// publish non-retained message
connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(TOPIC, new String(msg.getPayload()));
msg = connection.receive(5000, TimeUnit.MILLISECONDS);
msg = connection.receive(1000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(TOPIC, new String(msg.getPayload()));
// drop subs without acknowledging messages, then subscribe and receive again
connection.unsubscribe(subs);
Thread.sleep(1000);
connection.subscribe(new Topic[]{new Topic(subs[0], QoS.AT_LEAST_ONCE), new Topic(subs[1], QoS.EXACTLY_ONCE)});
Thread.sleep(1000);
msg = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(TOPIC, new String(msg.getPayload()));
final Message msg2 = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull(msg2);
assertEquals(TOPIC, new String(msg2.getPayload()));
// ack messages after receiving all of them
msg.ack();
msg = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(TOPIC, new String(msg.getPayload()));
msg.ack();
msg2.ack();
// make sure we received duplicate message ids
List<Integer> dups = new ArrayList<Integer>();
@ -1177,4 +1163,4 @@ public class MQTTTest extends AbstractMQTTTest {
connection.disconnect();
}
}
}