ARTEMIS-1928 Fixing body conversion of LargeMessages to AMQP

This commit is contained in:
Clebert Suconic 2018-06-22 15:37:42 -04:00
parent 532317ceff
commit efd966d88d
2 changed files with 39 additions and 7 deletions

View File

@ -60,6 +60,7 @@ import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
@ -381,17 +382,17 @@ public class CoreAmqpConverter {
// will be unknown so we check for special cases of messages with special data
// encoded into the server message body.
ICoreMessage internalMessage = message.getInnerMessage();
int readerIndex = internalMessage.getBodyBuffer().readerIndex();
// this will represent a readOnly buffer for the message
ActiveMQBuffer buffer = internalMessage.getDataBuffer();
try {
Object s = internalMessage.getBodyBuffer().readNullableSimpleString();
Object s = buffer.readNullableSimpleString();
if (s != null) {
body = new AmqpValue(s.toString());
}
} catch (Throwable ignored) {
logger.debug("Exception ignored during conversion", ignored.getMessage(), ignored);
body = new AmqpValue("Conversion to AMQP error!");
} finally {
internalMessage.getBodyBuffer().readerIndex(readerIndex);
}
}

View File

@ -29,6 +29,12 @@ import java.util.Collection;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -392,9 +398,34 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
waitForBindings(1, "queues.0", 1, 1, false);
// sending Messages.. they should be load balanced
{
ConnectionFactory cf = getJmsConnectionFactory(0);
if (protocol.equals("AMQP")) {
ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616");
locator.setMinLargeMessageSize(1024);
ClientSessionFactory coreFactory = locator.createSessionFactory();
ClientSession clientSession = coreFactory.createSession();
ClientProducer producer = clientSession.createProducer(queueName);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
ClientMessage message = clientSession.createMessage((byte)0, true);
StringBuffer stringbuffer = new StringBuffer();
stringbuffer.append("hello");
if (i % 3 == 0) {
// making 1/3 of the messages to be large message
for (int j = 0; j < 10 * 1024; j++) {
stringbuffer.append(" ");
}
}
message.getBodyBuffer().writeUTF(stringbuffer.toString());
producer.send(message);
}
coreFactory.close();
} else {
// sending Messages.. they should be load balanced
ConnectionFactory cf = getJmsConnectionFactory(0);
Connection cn = cf.createConnection();
Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));