added test for unsubscribe

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1500057 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2013-07-05 16:21:28 +00:00
parent 2f8381cbe4
commit 8335dfcf77
4 changed files with 74 additions and 45 deletions

View File

@ -277,6 +277,7 @@ public class MQTTProtocolConverter {
void onMQTTDisconnect() throws MQTTProtocolException {
if (connected.get()) {
connected.set(false);
sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
sendToActiveMQ(new ShutdownInfo(), null);
}
@ -542,7 +543,7 @@ public class MQTTProtocolConverter {
public void onTransportError() {
if (connect != null) {
if (connect.willTopic() != null && connect.willMessage() != null) {
if (connected.get() && connect.willTopic() != null && connect.willMessage() != null) {
try {
PUBLISH publish = new PUBLISH();
publish.topicName(connect.willTopic());

View File

@ -22,7 +22,6 @@ import java.security.ProtectionDomain;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@ -77,49 +76,6 @@ public abstract class AbstractMQTTTest extends AutoFailTestSupport {
super.tearDown();
}
@Test(timeout=300000)
public void testWillNotSentOnClose() throws Exception {
addMQTTConnector();
brokerService.start();
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
initializeConnection(subscriptionProvider);
String willTopic = "lastWillAndTestament";
subscriptionProvider.subscribe(willTopic,AT_MOST_ONCE);
final AtomicInteger count = new AtomicInteger();
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1; i++){
try {
byte[] payload = subscriptionProvider.receive(10000);
assertNull("Should get a message", payload);
count.incrementAndGet();
} catch (Exception e) {
e.printStackTrace();
break;
}
}
}
});
thread.start();
final MQTTClientProvider publishProvider = getMQTTClientProvider();
publishProvider.setWillTopic(willTopic);
publishProvider.setWillMessage("EverythingGoesToRob");
initializeConnection(publishProvider);
Thread.sleep(1000);
publishProvider.disconnect();
assertEquals(0, count.get());
subscriptionProvider.disconnect();
publishProvider.disconnect();
}
@Test(timeout=300000)
public void testSendAndReceiveMQTT() throws Exception {
@ -165,6 +121,54 @@ public abstract class AbstractMQTTTest extends AutoFailTestSupport {
publishProvider.disconnect();
}
@Test(timeout=300000)
public void testUnsubscribeMQTT() throws Exception {
addMQTTConnector();
brokerService.start();
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
initializeConnection(subscriptionProvider);
String topic = "foo/bah";
subscriptionProvider.subscribe(topic,AT_MOST_ONCE);
final CountDownLatch latch = new CountDownLatch(numberOfMessages/2);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < numberOfMessages; i++){
try {
byte[] payload = subscriptionProvider.receive(10000);
assertNotNull("Should get a message", payload);
latch.countDown();
} catch (Exception e) {
e.printStackTrace();
break;
}
}
}
});
thread.start();
final MQTTClientProvider publishProvider = getMQTTClientProvider();
initializeConnection(publishProvider);
for (int i = 0; i < numberOfMessages; i++){
String payload = "Message " + i;
if (i == numberOfMessages/2){
subscriptionProvider.unsubscribe(topic);
}
publishProvider.publish(topic,payload.getBytes(),AT_LEAST_ONCE);
}
latch.await(10, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());
subscriptionProvider.disconnect();
publishProvider.disconnect();
}
@Test(timeout=300000)
public void testSendAtMostOnceReceiveExactlyOnce() throws Exception {
/**

View File

@ -57,6 +57,11 @@ class FuseMQQTTClientProvider implements MQTTClientProvider {
connection.subscribe(topics);
}
@Override
public void unsubscribe(String topic) throws Exception {
connection.unsubscribe(new String[]{topic});
}
@Override
public byte[] receive(int timeout) throws Exception {
byte[] result = null;
@ -82,4 +87,19 @@ class FuseMQQTTClientProvider implements MQTTClientProvider {
public void setWillTopic(String topic) {
mqtt.setWillTopic(topic);
}
@Override
public void setClientId(String clientId) {
mqtt.setClientId(clientId);
}
@Override
public void kill() throws Exception {
connection.kill();
}
@Override
public void setKeepAlive(int keepAlive) throws Exception {
mqtt.setKeepAlive((short) keepAlive);
}
}

View File

@ -21,9 +21,13 @@ public interface MQTTClientProvider {
void disconnect() throws Exception;
void publish(String topic,byte[] payload,int qos) throws Exception;
void subscribe(String topic,int qos) throws Exception;
void unsubscribe(String topic) throws Exception;
byte[] receive(int timeout) throws Exception;
void setSslContext(javax.net.ssl.SSLContext sslContext);
void setWillMessage(String string);
void setWillTopic(String topic);
void setClientId(String clientId);
void kill() throws Exception;
void setKeepAlive(int keepAlive) throws Exception;
}