This commit is contained in:
Clebert Suconic 2019-08-25 23:29:04 -04:00
commit ede3902a4c
4 changed files with 160 additions and 42 deletions

View File

@ -208,8 +208,10 @@ public class AMQPMessage extends RefCountMessage {
* @return a MessageImpl that wraps the AMQP message data in this {@link AMQPMessage}
*/
public MessageImpl getProtonMessage() {
ensureMessageDataScanned();
ensureDataIsValid();
if (data == null) {
throw new NullPointerException("Data is not initialized");
}
ensureScanning();
MessageImpl protonMessage = null;
if (data != null) {
@ -228,11 +230,15 @@ public class AMQPMessage extends RefCountMessage {
* @return a copy of the Message Header if one exists or null if none present.
*/
public Header getHeader() {
ensureMessageDataScanned();
ensureDataIsValid();
ensureScanning();
return scanForMessageSection(headerPosition, Header.class);
}
private void ensureScanning() {
ensureDataIsValid();
ensureMessageDataScanned();
}
/**
* Returns a copy of the MessageAnnotations in the message if present or null. Changes to the
* returned DeliveryAnnotations instance do not affect the original Message.
@ -240,8 +246,7 @@ public class AMQPMessage extends RefCountMessage {
* @return a copy of the {@link DeliveryAnnotations} present in the message or null if non present.
*/
public DeliveryAnnotations getDeliveryAnnotations() {
ensureMessageDataScanned();
ensureDataIsValid();
ensureScanning();
return scanForMessageSection(deliveryAnnotationsPosition, DeliveryAnnotations.class);
}
@ -267,8 +272,7 @@ public class AMQPMessage extends RefCountMessage {
* @return a copy of the {@link MessageAnnotations} present in the message or null if non present.
*/
public MessageAnnotations getMessageAnnotations() {
ensureMessageDataScanned();
ensureDataIsValid();
ensureScanning();
return scanForMessageSection(messageAnnotationsPosition, MessageAnnotations.class);
}
@ -279,8 +283,7 @@ public class AMQPMessage extends RefCountMessage {
* @return a copy of the Message Properties if one exists or null if none present.
*/
public Properties getProperties() {
ensureMessageDataScanned();
ensureDataIsValid();
ensureScanning();
return scanForMessageSection(propertiesPosition, Properties.class);
}
@ -291,8 +294,7 @@ public class AMQPMessage extends RefCountMessage {
* @return a copy of the {@link ApplicationProperties} present in the message or null if non present.
*/
public ApplicationProperties getApplicationProperties() {
ensureMessageDataScanned();
ensureDataIsValid();
ensureScanning();
return scanForMessageSection(applicationPropertiesPosition, ApplicationProperties.class);
}
@ -310,8 +312,7 @@ public class AMQPMessage extends RefCountMessage {
* @return the Section that makes up the body of this message.
*/
public Section getBody() {
ensureMessageDataScanned();
ensureDataIsValid();
ensureScanning();
// We only handle Sections of AmqpSequence, AmqpValue and Data types so we filter on those.
// There could also be a Footer and no body so this will prevent a faulty return type in case
@ -327,8 +328,7 @@ public class AMQPMessage extends RefCountMessage {
* @return the Footer that was encoded into this AMQP Message.
*/
public Footer getFooter() {
ensureMessageDataScanned();
ensureDataIsValid();
ensureScanning();
return scanForMessageSection(Math.max(0, remainingBodyPosition), Footer.class);
}
@ -445,11 +445,11 @@ public class AMQPMessage extends RefCountMessage {
private synchronized void ensureMessageDataScanned() {
if (!messageDataScanned) {
scanMessageData();
messageDataScanned = true;
}
}
private synchronized void scanMessageData() {
this.messageDataScanned = true;
DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setBuffer(data.rewind());
@ -781,21 +781,17 @@ public class AMQPMessage extends RefCountMessage {
encodeMessage();
scanMessageData();
messageDataScanned = true;
modified = false;
}
private synchronized void ensureDataIsValid() {
assert data != null;
if (modified) {
encodeMessage();
modified = false;
}
}
private synchronized void encodeMessage() {
this.modified = false;
this.messageDataScanned = false;
int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
EncoderImpl encoder = TLSEncode.getEncoder();
@ -1542,14 +1538,15 @@ public class AMQPMessage extends RefCountMessage {
@Override
public String toString() {
return "AMQPMessage [durable=" + isDurable() +
/* return "AMQPMessage [durable=" + isDurable() +
", messageID=" + getMessageID() +
", address=" + getAddress() +
", size=" + getEncodeSize() +
", applicationProperties=" + applicationProperties +
", properties=" + properties +
", extraProperties = " + getExtraProperties() +
"]";
"]"; */
return super.toString();
}
@Override

View File

@ -17,18 +17,14 @@
package org.apache.activemq.artemis.protocol.amqp.converter;
import java.nio.ByteBuffer;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_FOOTER_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
@ -59,9 +55,11 @@ import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_FOOTER_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
public class TestConversions extends Assert {
@ -421,8 +419,8 @@ public class TestConversions extends Assert {
}
encodedMessage.messageChanged();
AmqpValue value = (AmqpValue)encodedMessage.getProtonMessage().getBody();
Assert.assertEquals(text, (String)value.getValue());
AmqpValue value = (AmqpValue) encodedMessage.getProtonMessage().getBody();
Assert.assertEquals(text, (String) value.getValue());
// this line is needed to force a failure
ICoreMessage coreMessage = encodedMessage.toCore();
@ -452,11 +450,11 @@ public class TestConversions extends Assert {
encodedMessage.setAddress(SimpleString.toSimpleString("xxxx.v1.queue"));
for (int i = 0; i < 100; i++) {
encodedMessage.getApplicationProperties().getValue().put("another" + i, "value" + i);
encodedMessage.putStringProperty("another" + i, "value" + i);
encodedMessage.messageChanged();
encodedMessage.reencode();
AmqpValue value = (AmqpValue)encodedMessage.getProtonMessage().getBody();
Assert.assertEquals(text, (String)value.getValue());
AmqpValue value = (AmqpValue) encodedMessage.getProtonMessage().getBody();
Assert.assertEquals(text, (String) value.getValue());
ICoreMessage coreMessage = encodedMessage.toCore();
if (logger.isDebugEnabled()) {
logger.debug("Converted message: " + coreMessage);
@ -476,7 +474,43 @@ public class TestConversions extends Assert {
encodedMessage = encodeAndCreateAMQPMessage(message);
}
}
}
@Test
public void testExpandNoReencode() throws Exception {
Map<String, Object> mapprop = createPropertiesMap();
ApplicationProperties properties = new ApplicationProperties(mapprop);
properties.getValue().put("hello", "hello");
MessageImpl message = (MessageImpl) Message.Factory.create();
MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
message.setMessageAnnotations(annotations);
message.setApplicationProperties(properties);
String text = "someText";
message.setBody(new AmqpValue(text));
AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
TypedProperties extraProperties = new TypedProperties();
encodedMessage.setAddress(SimpleString.toSimpleString("xxxx.v1.queue"));
for (int i = 0; i < 100; i++) {
encodedMessage.setMessageID(333L);
if (i % 3 == 0) {
encodedMessage.referenceOriginalMessage(encodedMessage, "SOME-OTHER-QUEUE-DOES-NOT-MATTER-WHAT");
} else {
encodedMessage.referenceOriginalMessage(encodedMessage, "XXX");
}
encodedMessage.putStringProperty("another " + i, "value " + i);
encodedMessage.messageChanged();
if (i % 2 == 0) {
encodedMessage.setAddress("THIS-IS-A-BIG-THIS-IS-A-BIG-ADDRESS-THIS-IS-A-BIG-ADDRESS-RIGHT");
} else {
encodedMessage.setAddress("A"); // small address
}
encodedMessage.messageChanged();
ICoreMessage coreMessage = encodedMessage.toCore();
}
}

View File

@ -106,8 +106,6 @@ public class DivertImpl implements Divert {
copy.setExpiration(message.getExpiration());
copy.reencode();
switch (routingType) {
case ANYCAST:
copy.setRoutingType(RoutingType.ANYCAST);
@ -126,7 +124,8 @@ public class DivertImpl implements Divert {
copy = transformer.transform(copy);
}
copy.messageChanged();
// We call reencode at the end only, in a single call.
copy.reencode();
} else {
copy = message;
}

View File

@ -16,6 +16,13 @@
*/
package org.apache.activemq.artemis.tests.integration.divert;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
@ -34,6 +41,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -42,6 +50,7 @@ import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.junit.Assert;
import org.junit.Test;
@ -128,6 +137,85 @@ public class DivertTest extends ActiveMQTestBase {
Assert.assertNull(consumer2.receiveImmediate());
}
@Test
public void testCrossProtocol() throws Exception {
final String testForConvert = "testConvert";
final String testAddress = "testAddress";
final String forwardAddress = "forwardAddress";
DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress).
setRoutingType(ComponentConfigurationRoutingType.ANYCAST);
Configuration config = createDefaultNettyConfig().addDivertConfiguration(divertConf);
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false));
server.start();
final SimpleString queueName1 = SimpleString.toSimpleString(testAddress);
final SimpleString queueName2 = SimpleString.toSimpleString(forwardAddress);
{ // this is setting up the queues
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, true, true);
session.createQueue(new SimpleString(testAddress), RoutingType.ANYCAST, queueName1, null, true);
session.createQueue(new SimpleString(testForConvert), RoutingType.ANYCAST, SimpleString.toSimpleString(testForConvert), null, true);
session.createQueue(new SimpleString(forwardAddress), RoutingType.MULTICAST, queueName2, null, true);
}
ConnectionFactory coreCF = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616");
Connection coreConnection = coreCF.createConnection();
Session coreSession = coreConnection.createSession(Session.AUTO_ACKNOWLEDGE);
MessageProducer producerCore = coreSession.createProducer(coreSession.createQueue(testForConvert));
for (int i = 0; i < 10; i++) {
TextMessage textMessage = coreSession.createTextMessage("text" + i);
//if (i % 2 == 0) textMessage.setIntProperty("key", i);
producerCore.send(textMessage);
}
producerCore.close();
ConnectionFactory amqpCF = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61616");
Connection amqpConnection = amqpCF.createConnection();
Session amqpSession = amqpConnection.createSession(Session.AUTO_ACKNOWLEDGE);
Queue amqpQueue = amqpSession.createQueue(testAddress);
MessageProducer producer = amqpSession.createProducer(amqpQueue);
MessageConsumer consumerFromConvert = amqpSession.createConsumer(amqpSession.createQueue(testForConvert));
amqpConnection.start();
for (int i = 0; i < 10; i++) {
javax.jms.Message received = consumerFromConvert.receive(5000);
Assert.assertNotNull(received);
producer.send(received);
}
Queue outQueue = coreSession.createQueue(queueName2.toString());
MessageConsumer consumer = coreSession.createConsumer(outQueue);
coreConnection.start();
for (int i = 0; i < 10; i++) {
TextMessage textMessage = (TextMessage)consumer.receive(5000);
Assert.assertNotNull(textMessage);
Assert.assertEquals("text" + i, textMessage.getText());
//if (i % 2 == 0) Assert.assertEquals(i, textMessage.getIntProperty("key"));
}
Assert.assertNull(consumer.receiveNoWait());
}
@Test
public void testSingleNonExclusiveDivertWithRoutingType() throws Exception {
final String testAddress = "testAddress";