https://issues.apache.org/jira/browse/AMQ-5092 - apply patch from Dhiraj Bokde with thanks

This commit is contained in:
gtully 2014-03-12 13:00:38 +00:00
parent 6aaf859d22
commit 67f151fe0b
3 changed files with 258 additions and 7 deletions

View File

@ -57,6 +57,7 @@ public class MQTTProtocolConverter {
private final SessionId sessionId = new SessionId(connectionId, -1);
private final ProducerId producerId = new ProducerId(sessionId, 1);
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator publisherIdGenerator = new LongSequenceGenerator();
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
@ -66,6 +67,9 @@ public class MQTTProtocolConverter {
private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination, UTF8Buffer>(DEFAULT_CACHE_SIZE);
private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE);
private final Map<String, Short> activemqToPacketIds = new LRUCache<String, Short>(DEFAULT_CACHE_SIZE);
private final Map<Short, String> packetIdsToActivemq = new LRUCache<Short, String>(DEFAULT_CACHE_SIZE);
private final MQTTTransport mqttTransport;
private final BrokerService brokerService;
@ -348,10 +352,15 @@ public class MQTTProtocolConverter {
PUBLISH retainedCopy = new PUBLISH();
retainedCopy.topicName(msg.topicName());
retainedCopy.retain(msg.retain());
retainedCopy.messageId(msg.messageId());
retainedCopy.payload(msg.payload());
// set QoS of retained message to maximum of subscription QoS
retainedCopy.qos(msg.qos().ordinal() > qos[i] ? QoS.values()[qos[i]] : msg.qos());
switch (retainedCopy.qos()) {
case AT_LEAST_ONCE:
case EXACTLY_ONCE:
retainedCopy.messageId(getNextSequenceId());
case AT_MOST_ONCE:
}
getMQTTTransport().sendToMQTT(retainedCopy.encode());
} catch (IOException e) {
LOG.warn("Couldn't send retained message " + msg, e);
@ -446,6 +455,12 @@ public class MQTTProtocolConverter {
if (sub != null) {
MessageAck ack = sub.createMessageAck(md);
PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage());
switch (publish.qos()) {
case AT_LEAST_ONCE:
case EXACTLY_ONCE:
publish.dup(publish.dup() ? true : md.getMessage().isRedelivered());
case AT_MOST_ONCE:
}
if (ack != null && sub.expectAck(publish)) {
synchronized (consumerAcks) {
consumerAcks.put(publish.messageId(), ack);
@ -480,6 +495,7 @@ public class MQTTProtocolConverter {
void onMQTTPubAck(PUBACK command) {
short messageId = command.messageId();
ackPacketId(messageId);
MessageAck ack;
synchronized (consumerAcks) {
ack = consumerAcks.remove(messageId);
@ -511,6 +527,7 @@ public class MQTTProtocolConverter {
void onMQTTPubComp(PUBCOMP command) {
short messageId = command.messageId();
ackPacketId(messageId);
MessageAck ack;
synchronized (consumerAcks) {
ack = consumerAcks.remove(messageId);
@ -524,7 +541,7 @@ public class MQTTProtocolConverter {
ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
msg.setProducerId(producerId);
MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
MessageId id = new MessageId(producerId, publisherIdGenerator.getNextSequenceId());
msg.setMessageId(id);
msg.setTimestamp(System.currentTimeMillis());
msg.setPriority((byte) Message.DEFAULT_PRIORITY);
@ -547,8 +564,7 @@ public class MQTTProtocolConverter {
public PUBLISH convertMessage(ActiveMQMessage message) throws IOException, JMSException, DataFormatException {
PUBLISH result = new PUBLISH();
short id = (short) message.getMessageId().getProducerSequenceId();
result.messageId(id);
// packet id is set in MQTTSubscription
QoS qoS;
if (message.propertyExists(QOS_PROPERTY_NAME)) {
int ordinal = message.getIntProperty(QOS_PROPERTY_NAME);
@ -623,7 +639,7 @@ public class MQTTProtocolConverter {
PUBLISH publish = new PUBLISH();
publish.topicName(connect.willTopic());
publish.qos(connect.willQos());
publish.messageId((short) messageIdGenerator.getNextSequenceId());
publish.messageId(getNextSequenceId());
publish.payload(connect.willMessage());
ActiveMQMessage message = convertMessage(publish);
message.setProducerId(producerId);
@ -815,4 +831,39 @@ public class MQTTProtocolConverter {
public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch;
}
short setPacketId(MQTTSubscription subscription, ActiveMQMessage message, PUBLISH publish) {
// subscription key
final StringBuilder subscriptionKey = new StringBuilder();
subscriptionKey.append(subscription.getConsumerInfo().getDestination().getPhysicalName())
.append(':').append(message.getJMSMessageID());
final String keyStr = subscriptionKey.toString();
Short packetId;
synchronized (activemqToPacketIds) {
packetId = activemqToPacketIds.get(keyStr);
if (packetId == null) {
packetId = getNextSequenceId();
activemqToPacketIds.put(keyStr, packetId);
packetIdsToActivemq.put(packetId, keyStr);
} else {
// mark publish as duplicate!
publish.dup(true);
}
}
publish.messageId(packetId);
return packetId;
}
void ackPacketId(short packetId) {
synchronized (activemqToPacketIds) {
final String subscriptionKey = packetIdsToActivemq.remove(packetId);
if (subscriptionKey != null) {
activemqToPacketIds.remove(subscriptionKey);
}
}
}
short getNextSequenceId() {
return (short) messageIdGenerator.getNextSequenceId();
}
}

View File

@ -18,8 +18,8 @@ package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.util.zip.DataFormatException;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerInfo;
@ -53,6 +53,13 @@ class MQTTSubscription {
if (publish.qos().ordinal() > this.qos.ordinal()) {
publish.qos(this.qos);
}
switch (publish.qos()) {
case AT_LEAST_ONCE:
case EXACTLY_ONCE:
// set packet id, and optionally dup flag
protocolConverter.setPacketId(this, message, publish);
case AT_MOST_ONCE:
}
return publish;
}
@ -71,4 +78,5 @@ class MQTTSubscription {
public QoS qos() {
return qos;
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.mqtt;
import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@ -30,12 +31,14 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNotEquals;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.Wait;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
@ -512,6 +515,195 @@ public class MQTTTest extends AbstractMQTTTest {
}
@Test(timeout = 60 * 1000)
public void testUniqueMessageIds() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo");
mqtt.setKeepAlive((short)2);
mqtt.setCleanSession(true);
final List<PUBLISH> publishList = new ArrayList<PUBLISH>();
mqtt.setTracer(new Tracer() {
@Override
public void onReceive(MQTTFrame frame) {
LOG.info("Client received:\n" + frame);
if (frame.messageType() == PUBLISH.TYPE) {
PUBLISH publish = new PUBLISH();
try {
// copy the buffers before we decode
Buffer[] buffers = frame.buffers();
Buffer[] copy = new Buffer[buffers.length];
for (int i = 0; i < buffers.length; i++) {
copy[i] = buffers[i].deepCopy();
}
publish.decode(frame);
// reset frame buffers to deep copy
frame.buffers(copy);
} catch (ProtocolException e) {
fail("Error decoding publish " + e.getMessage());
}
publishList.add(publish);
}
}
@Override
public void onSend(MQTTFrame frame) {
LOG.info("Client sent:\n" + frame);
}
});
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
// create overlapping subscriptions with different QoSs
QoS[] qoss = { QoS.AT_MOST_ONCE, QoS.AT_LEAST_ONCE, QoS.EXACTLY_ONCE };
final String TOPIC = "TopicA/";
// publish retained message
connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, true);
String[] subs = {TOPIC, "TopicA/#", "TopicA/+"};
for (int i = 0; i < qoss.length; i++) {
connection.subscribe(new Topic[]{ new Topic(subs[i], qoss[i]) });
}
// publish non-retained message
connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
int received = 0;
Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
do {
assertNotNull(msg);
assertEquals(TOPIC, new String(msg.getPayload()));
msg.ack();
int waitCount = 0;
while (publishList.size() <= received && waitCount < 10) {
Thread.sleep(1000);
waitCount++;
}
msg = connection.receive(5000, TimeUnit.MILLISECONDS);
} while (msg != null && received++ < subs.length * 2);
assertEquals("Unexpected number of messages", subs.length * 2, received + 1);
// make sure we received distinct ids for QoS != AT_MOST_ONCE, and 0 for AT_MOST_ONCE
for (int i = 0; i < publishList.size(); i++) {
for (int j = i + 1; j < publishList.size(); j++) {
final PUBLISH publish1 = publishList.get(i);
final PUBLISH publish2 = publishList.get(j);
boolean qos0 = false;
if (publish1.qos() == QoS.AT_MOST_ONCE) {
qos0 = true;
assertEquals(0, publish1.messageId());
}
if (publish2.qos() == QoS.AT_MOST_ONCE) {
qos0 = true;
assertEquals(0, publish2.messageId());
}
if (!qos0) {
assertNotEquals(publish1.messageId(), publish2.messageId());
}
}
}
connection.unsubscribe(subs);
connection.disconnect();
}
@Test(timeout = 600 * 1000)
public void testResendMessageId() throws Exception {
addMQTTConnector();
brokerService.start();
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("foo");
mqtt.setKeepAlive((short)2);
mqtt.setCleanSession(true);
final List<PUBLISH> publishList = new ArrayList<PUBLISH>();
mqtt.setTracer(new Tracer() {
@Override
public void onReceive(MQTTFrame frame) {
LOG.info("Client received:\n" + frame);
if (frame.messageType() == PUBLISH.TYPE) {
PUBLISH publish = new PUBLISH();
try {
// copy the buffers before we decode
Buffer[] buffers = frame.buffers();
Buffer[] copy = new Buffer[buffers.length];
for (int i = 0; i < buffers.length; i++) {
copy[i] = buffers[i].deepCopy();
}
publish.decode(frame);
// reset frame buffers to deep copy
frame.buffers(copy);
} catch (ProtocolException e) {
fail("Error decoding publish " + e.getMessage());
}
publishList.add(publish);
}
}
@Override
public void onSend(MQTTFrame frame) {
LOG.info("Client sent:\n" + frame);
}
});
final BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
// create overlapping subscriptions with different QoSs
final String TOPIC = "TopicA/";
final String[] subs = { TOPIC, "+/"};
connection.subscribe(new Topic[]{new Topic(subs[0], QoS.AT_LEAST_ONCE), new Topic(subs[1], QoS.EXACTLY_ONCE)});
// publish non-retained message
connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(TOPIC, new String(msg.getPayload()));
msg = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(TOPIC, new String(msg.getPayload()));
// drop subs without acknowledging messages, then subscribe and receive again
connection.unsubscribe(subs);
connection.subscribe(new Topic[]{new Topic(subs[0], QoS.AT_LEAST_ONCE), new Topic(subs[1], QoS.EXACTLY_ONCE)});
// wait for all acks to be processed
Thread.sleep(1000);
msg = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(TOPIC, new String(msg.getPayload()));
msg.ack();
msg = connection.receive(5000, TimeUnit.MILLISECONDS);
assertNotNull(msg);
assertEquals(TOPIC, new String(msg.getPayload()));
msg.ack();
// make sure we received duplicate message ids
for (int i = 0; i < publishList.size(); i++) {
boolean found = false;
for (int j = 0; j < publishList.size(); j++) {
if (i != j) {
if (publishList.get(i).messageId() == publishList.get(j).messageId()) {
// one of them is a duplicate
assertTrue(publishList.get(i).dup() || publishList.get(j).dup());
found = true;
}
}
}
assertTrue("Dup Not found " + publishList.get(i), found);
}
connection.unsubscribe(subs);
connection.disconnect();
}
@Test(timeout=60 * 1000)
public void testSendMQTTReceiveJMS() throws Exception {
addMQTTConnector();
@ -691,7 +883,7 @@ public class MQTTTest extends AbstractMQTTTest {
payload = message.getPayload();
String messageContent = new String(payload);
LOG.info("Received message from topic: " + message.getTopic() +
" Message content: " + messageContent);
" Message content: " + messageContent);
message.ack();
}