mirror of https://github.com/apache/activemq.git
Add patch that fixes the missing test case.
This commit is contained in:
parent
afddc1a832
commit
45c0dfb2bc
|
@ -693,41 +693,185 @@ public class MQTTTest extends AbstractMQTTTest {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
final BlockingConnection connection = mqtt.blockingConnection();
|
BlockingConnection connection = mqtt.blockingConnection();
|
||||||
connection.connect();
|
connection.connect();
|
||||||
|
|
||||||
// create overlapping subscriptions with different QoSs
|
|
||||||
final String TOPIC = "TopicA/";
|
final String TOPIC = "TopicA/";
|
||||||
final Topic[] topics = {new Topic(TOPIC, QoS.AT_LEAST_ONCE)};
|
final String[] topics = new String[] {TOPIC, "TopicA/+"};
|
||||||
connection.subscribe(topics);
|
connection.subscribe(new Topic[]{new Topic(topics[0], QoS.AT_LEAST_ONCE), new Topic(topics[1], QoS.EXACTLY_ONCE)});
|
||||||
|
|
||||||
// publish non-retained message
|
// publish non-retained message
|
||||||
connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
|
connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
|
||||||
|
|
||||||
Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
|
Wait.waitFor(new Wait.Condition() {
|
||||||
assertNotNull(msg);
|
@Override
|
||||||
assertEquals(TOPIC, new String(msg.getPayload()));
|
public boolean isSatisified() throws Exception {
|
||||||
|
return publishList.size() == 2;
|
||||||
|
}
|
||||||
|
}, 5000);
|
||||||
|
assertEquals(2, publishList.size());
|
||||||
|
|
||||||
// drop subs without acknowledging messages, then subscribe and receive again
|
connection.disconnect();
|
||||||
connection.unsubscribe(new String[]{ TOPIC });
|
|
||||||
Thread.sleep(1000);
|
|
||||||
connection.subscribe(topics);
|
|
||||||
Thread.sleep(1000);
|
|
||||||
|
|
||||||
msg = connection.receive(30000, TimeUnit.MILLISECONDS);
|
connection = mqtt.blockingConnection();
|
||||||
assertNotNull(msg);
|
connection.connect();
|
||||||
assertEquals(TOPIC, new String(msg.getPayload()));
|
|
||||||
msg.ack();
|
Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return publishList.size() == 4;
|
||||||
|
}
|
||||||
|
}, 5000);
|
||||||
|
assertEquals(4, publishList.size());
|
||||||
|
|
||||||
// make sure we received duplicate message ids
|
// make sure we received duplicate message ids
|
||||||
assertEquals(2, publishList.size());
|
assertTrue(publishList.get(0).messageId() == publishList.get(2).messageId() ||
|
||||||
assertEquals(publishList.get(0).messageId(), publishList.get(1).messageId());
|
publishList.get(0).messageId() == publishList.get(3).messageId());
|
||||||
assertTrue(publishList.get(0).dup() || publishList.get(1).dup());
|
assertTrue(publishList.get(1).messageId() == publishList.get(3).messageId() ||
|
||||||
|
publishList.get(1).messageId() == publishList.get(2).messageId());
|
||||||
|
assertTrue(publishList.get(2).dup() && publishList.get(3).dup());
|
||||||
|
|
||||||
|
connection.unsubscribe(topics);
|
||||||
|
connection.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 90 * 1000)
|
||||||
|
public void testPacketIdGeneratorNonCleanSession() throws Exception {
|
||||||
|
addMQTTConnector("trace=true");
|
||||||
|
brokerService.start();
|
||||||
|
|
||||||
|
final MQTT mqtt = createMQTTConnection("nonclean-packetid", false);
|
||||||
|
mqtt.setKeepAlive((short) 15);
|
||||||
|
|
||||||
|
final Map<Short, PUBLISH> publishMap = new ConcurrentHashMap<Short, PUBLISH>();
|
||||||
|
mqtt.setTracer(new Tracer() {
|
||||||
|
@Override
|
||||||
|
public void onReceive(MQTTFrame frame) {
|
||||||
|
LOG.info("Client received:\n" + frame);
|
||||||
|
if (frame.messageType() == PUBLISH.TYPE) {
|
||||||
|
PUBLISH publish = new PUBLISH();
|
||||||
|
try {
|
||||||
|
publish.decode(frame);
|
||||||
|
LOG.info("PUBLISH " + publish);
|
||||||
|
} catch (ProtocolException e) {
|
||||||
|
fail("Error decoding publish " + e.getMessage());
|
||||||
|
}
|
||||||
|
if (publishMap.get(publish.messageId()) != null) {
|
||||||
|
assertTrue(publish.dup());
|
||||||
|
}
|
||||||
|
publishMap.put(publish.messageId(), publish);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSend(MQTTFrame frame) {
|
||||||
|
LOG.info("Client sent:\n" + frame);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
BlockingConnection connection = mqtt.blockingConnection();
|
||||||
|
connection.connect();
|
||||||
|
final String TOPIC = "TopicA/";
|
||||||
|
connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
|
||||||
|
|
||||||
|
// publish non-retained messages
|
||||||
|
final int TOTAL_MESSAGES = 10;
|
||||||
|
for (int i = 0; i < TOTAL_MESSAGES; i++) {
|
||||||
|
connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// receive half the messages in this session
|
||||||
|
for (int i = 0; i < TOTAL_MESSAGES / 2; i++) {
|
||||||
|
final Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
|
||||||
|
assertNotNull(msg);
|
||||||
|
assertEquals(TOPIC, new String(msg.getPayload()));
|
||||||
|
msg.ack();
|
||||||
|
}
|
||||||
|
|
||||||
|
connection.disconnect();
|
||||||
|
// resume session
|
||||||
|
connection = mqtt.blockingConnection();
|
||||||
|
connection.connect();
|
||||||
|
// receive rest of the messages
|
||||||
|
Message msg = null;
|
||||||
|
do {
|
||||||
|
msg = connection.receive(1000, TimeUnit.MILLISECONDS);
|
||||||
|
if (msg != null) {
|
||||||
|
assertEquals(TOPIC, new String(msg.getPayload()));
|
||||||
|
msg.ack();
|
||||||
|
}
|
||||||
|
} while (msg != null);
|
||||||
|
|
||||||
|
// make sure we received all message ids
|
||||||
|
for (short id = 1; id <= TOTAL_MESSAGES; id++) {
|
||||||
|
assertNotNull("No message for id " + id, publishMap.get(id));
|
||||||
|
}
|
||||||
|
|
||||||
connection.unsubscribe(new String[] { TOPIC });
|
connection.unsubscribe(new String[] { TOPIC });
|
||||||
connection.disconnect();
|
connection.disconnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 90 * 1000)
|
||||||
|
public void testPacketIdGeneratorCleanSession() throws Exception {
|
||||||
|
addMQTTConnector("trace=true");
|
||||||
|
brokerService.start();
|
||||||
|
|
||||||
|
final String[] cleanClientIds = new String[] { "", "clean-packetid", null};
|
||||||
|
final Map<Short, PUBLISH> publishMap = new ConcurrentHashMap<Short, PUBLISH>();
|
||||||
|
MQTT[] mqtts = new MQTT[cleanClientIds.length];
|
||||||
|
for (int i = 0; i < cleanClientIds.length; i++) {
|
||||||
|
mqtts[i] = createMQTTConnection("", true);
|
||||||
|
mqtts[i].setKeepAlive((short) 15);
|
||||||
|
|
||||||
|
mqtts[i].setTracer(new Tracer() {
|
||||||
|
@Override
|
||||||
|
public void onReceive(MQTTFrame frame) {
|
||||||
|
LOG.info("Client received:\n" + frame);
|
||||||
|
if (frame.messageType() == PUBLISH.TYPE) {
|
||||||
|
PUBLISH publish = new PUBLISH();
|
||||||
|
try {
|
||||||
|
publish.decode(frame);
|
||||||
|
LOG.info("PUBLISH " + publish);
|
||||||
|
} catch (ProtocolException e) {
|
||||||
|
fail("Error decoding publish " + e.getMessage());
|
||||||
|
}
|
||||||
|
if (publishMap.get(publish.messageId()) != null) {
|
||||||
|
assertTrue(publish.dup());
|
||||||
|
}
|
||||||
|
publishMap.put(publish.messageId(), publish);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSend(MQTTFrame frame) {
|
||||||
|
LOG.info("Client sent:\n" + frame);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
final Random random = new Random();
|
||||||
|
for (short i = 0; i < 10; i++) {
|
||||||
|
BlockingConnection connection = mqtts[random.nextInt(cleanClientIds.length)].blockingConnection();
|
||||||
|
connection.connect();
|
||||||
|
final String TOPIC = "TopicA/";
|
||||||
|
connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
|
||||||
|
|
||||||
|
// publish non-retained message
|
||||||
|
connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
|
||||||
|
Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
|
||||||
|
assertNotNull(msg);
|
||||||
|
assertEquals(TOPIC, new String(msg.getPayload()));
|
||||||
|
msg.ack();
|
||||||
|
|
||||||
|
assertEquals(1, publishMap.size());
|
||||||
|
final short id = (short) (i + 1);
|
||||||
|
assertNotNull("No message for id " + id, publishMap.get(id));
|
||||||
|
publishMap.clear();
|
||||||
|
|
||||||
|
connection.disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60 * 1000)
|
@Test(timeout = 60 * 1000)
|
||||||
public void testClientConnectionFailure() throws Exception {
|
public void testClientConnectionFailure() throws Exception {
|
||||||
addMQTTConnector();
|
addMQTTConnector();
|
||||||
|
|
Loading…
Reference in New Issue