This commit is contained in:
Martyn Taylor 2017-11-16 11:28:47 +00:00
commit 3c04de3abb
6 changed files with 223 additions and 5 deletions

View File

@ -209,7 +209,7 @@ public class AMQPSessionCallback implements SessionCallback {
filter = SelectorTranslator.convertToActiveMQFilterString(filter);
ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filter), browserOnly);
ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filter), browserOnly, false, null);
// AMQP handles its own flow control for when it's started
consumer.setStarted(true);

View File

@ -186,6 +186,7 @@ public class AmqpCoreConverter {
result.getInnerMessage().setReplyTo(message.getReplyTo());
result.getInnerMessage().setDurable(message.isDurable());
result.getInnerMessage().setPriority(message.getPriority());
result.getInnerMessage().setAddress(message.getAddress());
result.encode();

View File

@ -200,8 +200,10 @@ public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMess
@Override
public void reset() throws JMSException {
bytesMessageReset(getReadBodyBuffer());
bytesMessageReset(getWriteBodyBuffer());
if (!message.isLargeMessage()) {
bytesMessageReset(getReadBodyBuffer());
bytesMessageReset(getWriteBodyBuffer());
}
}
}

View File

@ -369,11 +369,15 @@ public class ServerJMSMessage implements Message {
* Encode the body into the internal message
*/
public void encode() throws Exception {
message.getBodyBuffer().resetReaderIndex();
if (!message.isLargeMessage()) {
message.getBodyBuffer().resetReaderIndex();
}
}
public void decode() throws Exception {
message.getBodyBuffer().resetReaderIndex();
if (!message.isLargeMessage()) {
message.getBodyBuffer().resetReaderIndex();
}
}
@Override

View File

@ -20,6 +20,7 @@ import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
@ -197,6 +198,27 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
return currentRefCount;
}
// Even though not recommended, in certain instances
// we may need to convert a large message back to a whole buffer
// in a way you can convert
@Override
public ActiveMQBuffer getReadOnlyBodyBuffer() {
try {
file.open();
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer((int) file.size());
file.read(buffer.toByteBuffer());
return buffer;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
try {
file.close();
} catch (Exception ignored) {
}
}
}
@Override
public boolean isLargeMessage() {
return true;

View File

@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Test;
public class AmqpLargeMessageTest extends AmqpClientTestSupport {
private static final int FRAME_SIZE = 10024;
private static final int PAYLOAD = 110 * 1024;
String testQueueName = "ConnectionFrameSize";
@Override
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
params.put("maxFrameSize", FRAME_SIZE);
}
@Override
protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
}
@Override
protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception {
//server.getConfiguration().addAcceptorConfiguration("tcp", "tcp://localhost:5445");
server.getConfiguration().addAcceptorConfiguration("tcp", "tcp://localhost:61616");
}
@Test(timeout = 60000)
public void testSendAMQPReceiveCore() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
int nMsgs = 200;
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
sendMessages(nMsgs, connection);
int count = getMessageCount(server.getPostOffice(), testQueueName);
assertEquals(nMsgs, count);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
receiveJMS(nMsgs, factory);
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendAMQPReceiveOpenWire() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
int nMsgs = 200;
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
sendMessages(nMsgs, connection);
int count = getMessageCount(server.getPostOffice(), testQueueName);
assertEquals(nMsgs, count);
ConnectionFactory factory = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616");
receiveJMS(nMsgs, factory);
} finally {
connection.close();
}
}
private void sendMessages(int nMsgs, AmqpConnection connection) throws Exception {
connection.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(testQueueName);
for (int i = 0; i < nMsgs; ++i) {
AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD);
message.setApplicationProperty("i", (Integer) i);
message.setDurable(true);
sender.send(message);
}
session.close();
}
private void receiveJMS(int nMsgs,
ConnectionFactory factory) throws JMSException {
Connection connection2 = factory.createConnection();
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection2.start();
MessageConsumer consumer = session2.createConsumer(session2.createQueue(testQueueName));
for (int i = 0; i < nMsgs; i++) {
Message message = consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(i, message.getIntProperty("i"));
}
connection2.close();
}
@Test(timeout = 60000)
public void testSendAMQPReceiveAMQP() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
String testQueueName = "ConnectionFrameSize";
int nMsgs = 200;
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
sendMessages(nMsgs, connection);
int count = getMessageCount(server.getPostOffice(), testQueueName);
assertEquals(nMsgs, count);
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(testQueueName);
receiver.flow(nMsgs);
for (int i = 0; i < nMsgs; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull("failed at " + i, message);
MessageImpl wrapped = (MessageImpl) message.getWrappedMessage();
if (wrapped.getBody() instanceof Data) {
// converters can change this to AmqValue
Data data = (Data) wrapped.getBody();
System.out.println("received : message: " + data.getValue().getLength());
assertEquals(PAYLOAD, data.getValue().getLength());
}
message.accept();
}
session.close();
} finally {
connection.close();
}
}
private AmqpMessage createAmqpMessage(byte value, int payloadSize) {
AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[payloadSize];
for (int i = 0; i < payload.length; i++) {
payload[i] = value;
}
message.setBytes(payload);
return message;
}
}