This closes #1512 ARTEMIS-1387 Fix AMQPtoMQTT and Link route issues
This commit is contained in:
commit
f8ccb6d31d
|
@ -77,7 +77,7 @@ public interface ICoreMessage extends Message {
|
|||
map.put("userID", "ID:" + userID.toString());
|
||||
}
|
||||
|
||||
map.put("address", getAddress());
|
||||
map.put("address", getAddress() == null ? "" : getAddress());
|
||||
map.put("type", getType());
|
||||
map.put("durable", isDurable());
|
||||
map.put("expiration", getExpiration());
|
||||
|
|
|
@ -616,7 +616,7 @@ public interface Message {
|
|||
map.put("userID", "ID:" + userID.toString());
|
||||
}
|
||||
|
||||
map.put("address", getAddress());
|
||||
map.put("address", getAddress() == null ? "" : getAddress());
|
||||
map.put("durable", isDurable());
|
||||
map.put("expiration", getExpiration());
|
||||
map.put("timestamp", getTimestamp());
|
||||
|
|
|
@ -24,12 +24,12 @@ import io.netty.buffer.ByteBufAllocator;
|
|||
import io.netty.buffer.EmptyByteBuf;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
|
@ -112,7 +112,7 @@ public class MQTTPublishManager {
|
|||
* to original ID and consumer in the Session state. This way we can look up the consumer Id and the message Id from
|
||||
* the PubAck or PubRec message id. *
|
||||
*/
|
||||
protected void sendMessage(CoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
|
||||
protected void sendMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
|
||||
// This is to allow retries of PubRel.
|
||||
if (isManagementConsumer(consumer)) {
|
||||
sendPubRelMessage(message);
|
||||
|
@ -257,8 +257,8 @@ public class MQTTPublishManager {
|
|||
}
|
||||
}
|
||||
|
||||
private void sendServerMessage(int messageId, CoreMessage message, int deliveryCount, int qos) {
|
||||
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString(), session.getWildcardConfiguration());
|
||||
private void sendServerMessage(int messageId, ICoreMessage message, int deliveryCount, int qos) {
|
||||
String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress() == null ? "" : message.getAddress().toString(), session.getWildcardConfiguration());
|
||||
boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY);
|
||||
|
||||
ByteBuf payload;
|
||||
|
|
|
@ -17,10 +17,8 @@
|
|||
|
||||
package org.apache.activemq.artemis.core.protocol.mqtt;
|
||||
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
|
||||
|
@ -49,7 +47,7 @@ public class MQTTSessionCallback implements SessionCallback {
|
|||
ServerConsumer consumer,
|
||||
int deliveryCount) {
|
||||
try {
|
||||
session.getMqttPublishManager().sendMessage((CoreMessage)message, consumer, deliveryCount);
|
||||
session.getMqttPublishManager().sendMessage(message.toCore(), consumer, deliveryCount);
|
||||
} catch (Exception e) {
|
||||
log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
|
||||
}
|
||||
|
|
|
@ -84,6 +84,10 @@ public class MQTTUtil {
|
|||
private static final MQTTLogger logger = MQTTLogger.LOGGER;
|
||||
|
||||
public static String convertCoreAddressFilterToMQTT(String filter, WildcardConfiguration wildcardConfiguration) {
|
||||
if (filter == null) {
|
||||
return "";
|
||||
}
|
||||
|
||||
if (filter.startsWith(MQTT_RETAIN_ADDRESS_PREFIX)) {
|
||||
filter = filter.substring(MQTT_RETAIN_ADDRESS_PREFIX.length(), filter.length());
|
||||
}
|
||||
|
|
|
@ -137,7 +137,7 @@ public final class OpenTypeSupport {
|
|||
} else {
|
||||
rc.put(CompositeDataConstants.USER_ID, "");
|
||||
}
|
||||
rc.put(CompositeDataConstants.ADDRESS, m.getAddress().toString());
|
||||
rc.put(CompositeDataConstants.ADDRESS, m.getAddress() == null ? "" : m.getAddress().toString());
|
||||
rc.put(CompositeDataConstants.TYPE, m.getType());
|
||||
rc.put(CompositeDataConstants.DURABLE, m.isDurable());
|
||||
rc.put(CompositeDataConstants.EXPIRATION, m.getExpiration());
|
||||
|
|
|
@ -25,6 +25,7 @@ import javax.jms.Session;
|
|||
import java.io.EOFException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.net.ProtocolException;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
|
@ -48,6 +49,11 @@ import org.apache.activemq.artemis.core.server.Queue;
|
|||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||
import org.fusesource.mqtt.client.BlockingConnection;
|
||||
import org.fusesource.mqtt.client.MQTT;
|
||||
import org.fusesource.mqtt.client.MQTTException;
|
||||
|
@ -71,6 +77,8 @@ public class MQTTTest extends MQTTTestSupport {
|
|||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
|
||||
|
||||
private static final String AMQP_URI = "tcp://localhost:61616";
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
@ -1162,6 +1170,35 @@ public class MQTTTest extends MQTTTestSupport {
|
|||
doTestSendMQTTReceiveJMS("foo.*", "foo/bar");
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testLinkRouteAmqpReceiveMQTT() throws Exception {
|
||||
AmqpClient client = new AmqpClient(new URI(AMQP_URI), null, null);
|
||||
AmqpConnection connection = client.connect();
|
||||
|
||||
try {
|
||||
AmqpSession session = connection.createSession();
|
||||
AmqpSender sender = session.createSender("test", true);
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setText("Test-Message");
|
||||
sender.send(message);
|
||||
sender.close();
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
|
||||
MQTT mqtt = createMQTTConnection();
|
||||
mqtt.setClientId("TestClient");
|
||||
BlockingConnection blockingConnection = mqtt.blockingConnection();
|
||||
try {
|
||||
blockingConnection.connect();
|
||||
Topic t = new Topic("test", QoS.AT_LEAST_ONCE);
|
||||
blockingConnection.subscribe(new Topic[] {t});
|
||||
assertNotNull(blockingConnection.receive(5, TimeUnit.SECONDS));
|
||||
} finally {
|
||||
blockingConnection.kill();
|
||||
}
|
||||
}
|
||||
|
||||
public void doTestSendMQTTReceiveJMS(String jmsTopicAddress, String mqttAddress) throws Exception {
|
||||
final MQTTClientProvider provider = getMQTTClientProvider();
|
||||
initializeConnection(provider);
|
||||
|
|
Loading…
Reference in New Issue