mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-02-07 18:49:14 +00:00
ARTEMIS-1514 Large message fix
I'm doing an overal improvement on large message support for AMQP However this commit is just about a Bug on the converter. It will be moot after all the changes I'm making, but I would rather keep this separate as a way to cherry-pick on previous versions eventually.
This commit is contained in:
parent
5cea228dbc
commit
9daa0321b6
@ -209,7 +209,7 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||||||
|
|
||||||
filter = SelectorTranslator.convertToActiveMQFilterString(filter);
|
filter = SelectorTranslator.convertToActiveMQFilterString(filter);
|
||||||
|
|
||||||
ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filter), browserOnly);
|
ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filter), browserOnly, false, null);
|
||||||
|
|
||||||
// AMQP handles its own flow control for when it's started
|
// AMQP handles its own flow control for when it's started
|
||||||
consumer.setStarted(true);
|
consumer.setStarted(true);
|
||||||
|
@ -186,6 +186,7 @@ public class AmqpCoreConverter {
|
|||||||
result.getInnerMessage().setReplyTo(message.getReplyTo());
|
result.getInnerMessage().setReplyTo(message.getReplyTo());
|
||||||
result.getInnerMessage().setDurable(message.isDurable());
|
result.getInnerMessage().setDurable(message.isDurable());
|
||||||
result.getInnerMessage().setPriority(message.getPriority());
|
result.getInnerMessage().setPriority(message.getPriority());
|
||||||
|
result.getInnerMessage().setAddress(message.getAddress());
|
||||||
|
|
||||||
result.encode();
|
result.encode();
|
||||||
|
|
||||||
|
@ -200,8 +200,10 @@ public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMess
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reset() throws JMSException {
|
public void reset() throws JMSException {
|
||||||
bytesMessageReset(getReadBodyBuffer());
|
if (!message.isLargeMessage()) {
|
||||||
bytesMessageReset(getWriteBodyBuffer());
|
bytesMessageReset(getReadBodyBuffer());
|
||||||
|
bytesMessageReset(getWriteBodyBuffer());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -369,11 +369,15 @@ public class ServerJMSMessage implements Message {
|
|||||||
* Encode the body into the internal message
|
* Encode the body into the internal message
|
||||||
*/
|
*/
|
||||||
public void encode() throws Exception {
|
public void encode() throws Exception {
|
||||||
message.getBodyBuffer().resetReaderIndex();
|
if (!message.isLargeMessage()) {
|
||||||
|
message.getBodyBuffer().resetReaderIndex();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void decode() throws Exception {
|
public void decode() throws Exception {
|
||||||
message.getBodyBuffer().resetReaderIndex();
|
if (!message.isLargeMessage()) {
|
||||||
|
message.getBodyBuffer().resetReaderIndex();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -20,6 +20,7 @@ import java.nio.ByteBuffer;
|
|||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
|
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
|
||||||
@ -197,6 +198,27 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
|
|||||||
return currentRefCount;
|
return currentRefCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Even though not recommended, in certain instances
|
||||||
|
// we may need to convert a large message back to a whole buffer
|
||||||
|
// in a way you can convert
|
||||||
|
@Override
|
||||||
|
public ActiveMQBuffer getReadOnlyBodyBuffer() {
|
||||||
|
try {
|
||||||
|
file.open();
|
||||||
|
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer((int) file.size());
|
||||||
|
file.read(buffer.toByteBuffer());
|
||||||
|
return buffer;
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
file.close();
|
||||||
|
} catch (Exception ignored) {
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isLargeMessage() {
|
public boolean isLargeMessage() {
|
||||||
return true;
|
return true;
|
||||||
|
@ -0,0 +1,189 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.ConnectionFactory;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||||
|
import org.apache.qpid.proton.amqp.messaging.Data;
|
||||||
|
import org.apache.qpid.proton.message.impl.MessageImpl;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class AmqpLargeMessageTest extends AmqpClientTestSupport {
|
||||||
|
|
||||||
|
private static final int FRAME_SIZE = 10024;
|
||||||
|
private static final int PAYLOAD = 110 * 1024;
|
||||||
|
|
||||||
|
String testQueueName = "ConnectionFrameSize";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
|
||||||
|
params.put("maxFrameSize", FRAME_SIZE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception {
|
||||||
|
//server.getConfiguration().addAcceptorConfiguration("tcp", "tcp://localhost:5445");
|
||||||
|
server.getConfiguration().addAcceptorConfiguration("tcp", "tcp://localhost:61616");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testSendAMQPReceiveCore() throws Exception {
|
||||||
|
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
|
||||||
|
|
||||||
|
int nMsgs = 200;
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
try {
|
||||||
|
sendMessages(nMsgs, connection);
|
||||||
|
|
||||||
|
int count = getMessageCount(server.getPostOffice(), testQueueName);
|
||||||
|
assertEquals(nMsgs, count);
|
||||||
|
|
||||||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||||
|
receiveJMS(nMsgs, factory);
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testSendAMQPReceiveOpenWire() throws Exception {
|
||||||
|
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
|
||||||
|
|
||||||
|
int nMsgs = 200;
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
try {
|
||||||
|
sendMessages(nMsgs, connection);
|
||||||
|
|
||||||
|
int count = getMessageCount(server.getPostOffice(), testQueueName);
|
||||||
|
assertEquals(nMsgs, count);
|
||||||
|
|
||||||
|
ConnectionFactory factory = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||||
|
receiveJMS(nMsgs, factory);
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendMessages(int nMsgs, AmqpConnection connection) throws Exception {
|
||||||
|
connection.connect();
|
||||||
|
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
AmqpSender sender = session.createSender(testQueueName);
|
||||||
|
|
||||||
|
for (int i = 0; i < nMsgs; ++i) {
|
||||||
|
AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD);
|
||||||
|
message.setApplicationProperty("i", (Integer) i);
|
||||||
|
message.setDurable(true);
|
||||||
|
sender.send(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
session.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void receiveJMS(int nMsgs,
|
||||||
|
ConnectionFactory factory) throws JMSException {
|
||||||
|
Connection connection2 = factory.createConnection();
|
||||||
|
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
connection2.start();
|
||||||
|
MessageConsumer consumer = session2.createConsumer(session2.createQueue(testQueueName));
|
||||||
|
|
||||||
|
for (int i = 0; i < nMsgs; i++) {
|
||||||
|
Message message = consumer.receive(5000);
|
||||||
|
Assert.assertNotNull(message);
|
||||||
|
Assert.assertEquals(i, message.getIntProperty("i"));
|
||||||
|
}
|
||||||
|
|
||||||
|
connection2.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testSendAMQPReceiveAMQP() throws Exception {
|
||||||
|
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
|
||||||
|
|
||||||
|
String testQueueName = "ConnectionFrameSize";
|
||||||
|
int nMsgs = 200;
|
||||||
|
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
try {
|
||||||
|
sendMessages(nMsgs, connection);
|
||||||
|
|
||||||
|
int count = getMessageCount(server.getPostOffice(), testQueueName);
|
||||||
|
assertEquals(nMsgs, count);
|
||||||
|
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
AmqpReceiver receiver = session.createReceiver(testQueueName);
|
||||||
|
receiver.flow(nMsgs);
|
||||||
|
|
||||||
|
for (int i = 0; i < nMsgs; ++i) {
|
||||||
|
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull("failed at " + i, message);
|
||||||
|
MessageImpl wrapped = (MessageImpl) message.getWrappedMessage();
|
||||||
|
if (wrapped.getBody() instanceof Data) {
|
||||||
|
// converters can change this to AmqValue
|
||||||
|
Data data = (Data) wrapped.getBody();
|
||||||
|
System.out.println("received : message: " + data.getValue().getLength());
|
||||||
|
assertEquals(PAYLOAD, data.getValue().getLength());
|
||||||
|
}
|
||||||
|
message.accept();
|
||||||
|
}
|
||||||
|
session.close();
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private AmqpMessage createAmqpMessage(byte value, int payloadSize) {
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
byte[] payload = new byte[payloadSize];
|
||||||
|
for (int i = 0; i < payload.length; i++) {
|
||||||
|
payload[i] = value;
|
||||||
|
}
|
||||||
|
message.setBytes(payload);
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user