From 9daa0321b668fbb1f45349f9e33937aa5a7c705e Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 13 Nov 2017 16:36:47 -0500 Subject: [PATCH] ARTEMIS-1514 Large message fix I'm doing an overal improvement on large message support for AMQP However this commit is just about a Bug on the converter. It will be moot after all the changes I'm making, but I would rather keep this separate as a way to cherry-pick on previous versions eventually. --- .../amqp/broker/AMQPSessionCallback.java | 2 +- .../amqp/converter/AmqpCoreConverter.java | 1 + .../converter/jms/ServerJMSBytesMessage.java | 6 +- .../amqp/converter/jms/ServerJMSMessage.java | 8 +- .../impl/journal/LargeServerMessageImpl.java | 22 ++ .../amqp/AmqpLargeMessageTest.java | 189 ++++++++++++++++++ 6 files changed, 223 insertions(+), 5 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 667d57aeee..42e9625337 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -209,7 +209,7 @@ public class AMQPSessionCallback implements SessionCallback { filter = SelectorTranslator.convertToActiveMQFilterString(filter); - ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filter), browserOnly); + ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filter), browserOnly, false, null); // AMQP handles its own flow control for when it's started consumer.setStarted(true); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java index acd940bd6d..8d05b2c713 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java @@ -186,6 +186,7 @@ public class AmqpCoreConverter { result.getInnerMessage().setReplyTo(message.getReplyTo()); result.getInnerMessage().setDurable(message.isDurable()); result.getInnerMessage().setPriority(message.getPriority()); + result.getInnerMessage().setAddress(message.getAddress()); result.encode(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java index b6a829d78c..a94cfde25f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java @@ -200,8 +200,10 @@ public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMess @Override public void reset() throws JMSException { - bytesMessageReset(getReadBodyBuffer()); - bytesMessageReset(getWriteBodyBuffer()); + if (!message.isLargeMessage()) { + bytesMessageReset(getReadBodyBuffer()); + bytesMessageReset(getWriteBodyBuffer()); + } } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java index 2a52f7af76..5962e39b63 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java @@ -369,11 +369,15 @@ public class ServerJMSMessage implements Message { * Encode the body into the internal message */ public void encode() throws Exception { - message.getBodyBuffer().resetReaderIndex(); + if (!message.isLargeMessage()) { + message.getBodyBuffer().resetReaderIndex(); + } } public void decode() throws Exception { - message.getBodyBuffer().resetReaderIndex(); + if (!message.isLargeMessage()) { + message.getBodyBuffer().resetReaderIndex(); + } } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 0a2d3b29db..11d1a2148c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; @@ -197,6 +198,27 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe return currentRefCount; } + // Even though not recommended, in certain instances + // we may need to convert a large message back to a whole buffer + // in a way you can convert + @Override + public ActiveMQBuffer getReadOnlyBodyBuffer() { + try { + file.open(); + ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer((int) file.size()); + file.read(buffer.toByteBuffer()); + return buffer; + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + try { + file.close(); + } catch (Exception ignored) { + } + + } + } + @Override public boolean isLargeMessage() { return true; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java new file mode 100644 index 0000000000..07ab5a566d --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.message.impl.MessageImpl; +import org.junit.Assert; +import org.junit.Test; + +public class AmqpLargeMessageTest extends AmqpClientTestSupport { + + private static final int FRAME_SIZE = 10024; + private static final int PAYLOAD = 110 * 1024; + + String testQueueName = "ConnectionFrameSize"; + + @Override + protected void configureAMQPAcceptorParameters(Map params) { + params.put("maxFrameSize", FRAME_SIZE); + } + + @Override + protected void createAddressAndQueues(ActiveMQServer server) throws Exception { + } + + + @Override + protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception { + //server.getConfiguration().addAcceptorConfiguration("tcp", "tcp://localhost:5445"); + server.getConfiguration().addAcceptorConfiguration("tcp", "tcp://localhost:61616"); + } + + + @Test(timeout = 60000) + public void testSendAMQPReceiveCore() throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); + + int nMsgs = 200; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + sendMessages(nMsgs, connection); + + int count = getMessageCount(server.getPostOffice(), testQueueName); + assertEquals(nMsgs, count); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); + receiveJMS(nMsgs, factory); + } finally { + connection.close(); + } + } + + + @Test(timeout = 60000) + public void testSendAMQPReceiveOpenWire() throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); + + int nMsgs = 200; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + sendMessages(nMsgs, connection); + + int count = getMessageCount(server.getPostOffice(), testQueueName); + assertEquals(nMsgs, count); + + ConnectionFactory factory = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616"); + receiveJMS(nMsgs, factory); + } finally { + connection.close(); + } + } + + private void sendMessages(int nMsgs, AmqpConnection connection) throws Exception { + connection.connect(); + + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(testQueueName); + + for (int i = 0; i < nMsgs; ++i) { + AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD); + message.setApplicationProperty("i", (Integer) i); + message.setDurable(true); + sender.send(message); + } + + session.close(); + } + + private void receiveJMS(int nMsgs, + ConnectionFactory factory) throws JMSException { + Connection connection2 = factory.createConnection(); + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection2.start(); + MessageConsumer consumer = session2.createConsumer(session2.createQueue(testQueueName)); + + for (int i = 0; i < nMsgs; i++) { + Message message = consumer.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals(i, message.getIntProperty("i")); + } + + connection2.close(); + } + + @Test(timeout = 60000) + public void testSendAMQPReceiveAMQP() throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST)); + + String testQueueName = "ConnectionFrameSize"; + int nMsgs = 200; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + try { + sendMessages(nMsgs, connection); + + int count = getMessageCount(server.getPostOffice(), testQueueName); + assertEquals(nMsgs, count); + + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createReceiver(testQueueName); + receiver.flow(nMsgs); + + for (int i = 0; i < nMsgs; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("failed at " + i, message); + MessageImpl wrapped = (MessageImpl) message.getWrappedMessage(); + if (wrapped.getBody() instanceof Data) { + // converters can change this to AmqValue + Data data = (Data) wrapped.getBody(); + System.out.println("received : message: " + data.getValue().getLength()); + assertEquals(PAYLOAD, data.getValue().getLength()); + } + message.accept(); + } + session.close(); + + } finally { + connection.close(); + } + } + + private AmqpMessage createAmqpMessage(byte value, int payloadSize) { + AmqpMessage message = new AmqpMessage(); + byte[] payload = new byte[payloadSize]; + for (int i = 0; i < payload.length; i++) { + payload[i] = value; + } + message.setBytes(payload); + return message; + } +}