ARTEMIS-1387 Fix AMQPtoMQTT and Link route issues

This commit is contained in:
Martyn Taylor 2017-09-05 17:27:27 +01:00
parent 125bd41f9d
commit 16dfd777b8
7 changed files with 49 additions and 10 deletions

View File

@ -77,7 +77,7 @@ public interface ICoreMessage extends Message {
map.put("userID", "ID:" + userID.toString());
}
map.put("address", getAddress());
map.put("address", getAddress() == null ? "" : getAddress());
map.put("type", getType());
map.put("durable", isDurable());
map.put("expiration", getExpiration());

View File

@ -616,7 +616,7 @@ public interface Message {
map.put("userID", "ID:" + userID.toString());
}
map.put("address", getAddress());
map.put("address", getAddress() == null ? "" : getAddress());
map.put("durable", isDurable());
map.put("expiration", getExpiration());
map.put("timestamp", getTimestamp());

View File

@ -24,12 +24,12 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.EmptyByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -112,7 +112,7 @@ public class MQTTPublishManager {
* to original ID and consumer in the Session state. This way we can look up the consumer Id and the message Id from
* the PubAck or PubRec message id. *
*/
protected void sendMessage(CoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
protected void sendMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
// This is to allow retries of PubRel.
if (isManagementConsumer(consumer)) {
sendPubRelMessage(message);
@ -257,8 +257,8 @@ public class MQTTPublishManager {
}
}
private void sendServerMessage(int messageId, CoreMessage message, int deliveryCount, int qos) {
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString(), session.getWildcardConfiguration());
private void sendServerMessage(int messageId, ICoreMessage message, int deliveryCount, int qos) {
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress() == null ? "" : message.getAddress().toString(), session.getWildcardConfiguration());
boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY);
ByteBuf payload;

View File

@ -17,10 +17,8 @@
package org.apache.activemq.artemis.core.protocol.mqtt;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@ -49,7 +47,7 @@ public class MQTTSessionCallback implements SessionCallback {
ServerConsumer consumer,
int deliveryCount) {
try {
session.getMqttPublishManager().sendMessage((CoreMessage)message, consumer, deliveryCount);
session.getMqttPublishManager().sendMessage(message.toCore(), consumer, deliveryCount);
} catch (Exception e) {
log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
}

View File

@ -84,6 +84,10 @@ public class MQTTUtil {
private static final MQTTLogger logger = MQTTLogger.LOGGER;
public static String convertCoreAddressFilterToMQTT(String filter, WildcardConfiguration wildcardConfiguration) {
if (filter == null) {
return "";
}
if (filter.startsWith(MQTT_RETAIN_ADDRESS_PREFIX)) {
filter = filter.substring(MQTT_RETAIN_ADDRESS_PREFIX.length(), filter.length());
}

View File

@ -137,7 +137,7 @@ public final class OpenTypeSupport {
} else {
rc.put(CompositeDataConstants.USER_ID, "");
}
rc.put(CompositeDataConstants.ADDRESS, m.getAddress().toString());
rc.put(CompositeDataConstants.ADDRESS, m.getAddress() == null ? "" : m.getAddress().toString());
rc.put(CompositeDataConstants.TYPE, m.getType());
rc.put(CompositeDataConstants.DURABLE, m.isDurable());
rc.put(CompositeDataConstants.EXPIRATION, m.getExpiration());

View File

@ -25,6 +25,7 @@ import javax.jms.Session;
import java.io.EOFException;
import java.lang.reflect.Field;
import java.net.ProtocolException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@ -48,6 +49,11 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.MQTTException;
@ -71,6 +77,8 @@ public class MQTTTest extends MQTTTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
private static final String AMQP_URI = "tcp://localhost:61616";
@Override
@Before
public void setUp() throws Exception {
@ -1162,6 +1170,35 @@ public class MQTTTest extends MQTTTestSupport {
doTestSendMQTTReceiveJMS("foo.*", "foo/bar");
}
@Test(timeout = 60 * 1000)
public void testLinkRouteAmqpReceiveMQTT() throws Exception {
AmqpClient client = new AmqpClient(new URI(AMQP_URI), null, null);
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("test", true);
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
sender.close();
} finally {
connection.close();
}
MQTT mqtt = createMQTTConnection();
mqtt.setClientId("TestClient");
BlockingConnection blockingConnection = mqtt.blockingConnection();
try {
blockingConnection.connect();
Topic t = new Topic("test", QoS.AT_LEAST_ONCE);
blockingConnection.subscribe(new Topic[] {t});
assertNotNull(blockingConnection.receive(5, TimeUnit.SECONDS));
} finally {
blockingConnection.kill();
}
}
public void doTestSendMQTTReceiveJMS(String jmsTopicAddress, String mqttAddress) throws Exception {
final MQTTClientProvider provider = getMQTTClientProvider();
initializeConnection(provider);