diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java index 163fc6b826..3567307888 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java @@ -64,6 +64,8 @@ public class ProtonProtocolManager implements ProtocolManager, Noti * */ private String pubSubPrefix = ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX; + private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE; + public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) { this.factory = factory; this.server = server; @@ -111,7 +113,7 @@ public class ProtonProtocolManager implements ProtocolManager, Noti String id = server.getConfiguration().getName(); AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory(). - createConnection(connectionCallback, id, (int) ttl, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool()); + createConnection(connectionCallback, id, (int) ttl, getMaxFrameSize(), DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool()); Executor executor = server.getExecutorFactory().getExecutor(); @@ -164,4 +166,11 @@ public class ProtonProtocolManager implements ProtocolManager, Noti } + public int getMaxFrameSize() { + return maxFrameSize; + } + + public void setMaxFrameSize(int maxFrameSize) { + this.maxFrameSize = maxFrameSize; + } } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java index 0bbe8ca3a3..c564a9e610 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java @@ -116,7 +116,10 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext { receiver = ((Receiver) delivery.getLink()); if (!delivery.isReadable()) { - System.err.println("!!!!! Readable!!!!!!!"); + return; + } + + if (delivery.isPartial()) { return; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonMaxFrameSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonMaxFrameSizeTest.java new file mode 100644 index 0000000000..658ea8102a --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonMaxFrameSizeTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.proton; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.message.impl.MessageImpl; +import org.junit.Test; + +import java.net.URI; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class ProtonMaxFrameSizeTest extends ProtonTestBase { + + private static final int FRAME_SIZE = 512; + + protected void configureAmqp(Map params) { + params.put("maxFrameSize", FRAME_SIZE); + } + + @Test + public void testMultipleTransfers() throws Exception { + + String testQueueName = "ConnectionFrameSize"; + int nMsgs = 200; + + AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + + + AmqpConnection amqpConnection = client.createConnection(); + + try { + amqpConnection.connect(); + + AmqpSession session = amqpConnection.createSession(); + AmqpSender sender = session.createSender("jms.queue." + testQueueName); + + final int payload = FRAME_SIZE * 16; + + for (int i = 0; i < nMsgs; ++i) { + AmqpMessage message = createAmqpMessage((byte) 'A', payload); + sender.send(message); + } + + int count = getMessageCount(server.getPostOffice(), "jms.queue." + testQueueName); + assertEquals(nMsgs, count); + + AmqpReceiver receiver = session.createReceiver("jms.queue." + testQueueName); + receiver.flow(nMsgs); + + for (int i = 0; i < nMsgs; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("failed at " + i, message); + MessageImpl wrapped = (MessageImpl) message.getWrappedMessage(); + Data data = (Data) wrapped.getBody(); + System.out.println("received : message: " + data.getValue().getLength()); + assertEquals(payload, data.getValue().getLength()); + message.accept(); + } + + } + finally { + amqpConnection.close(); + } + } + + private AmqpMessage createAmqpMessage(byte value, int payloadSize) { + AmqpMessage message = new AmqpMessage(); + byte[] payload = new byte[payloadSize]; + for (int i = 0; i < payload.length; i++) { + payload[i] = value; + } + message.setBytes(payload); + return message; + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java index bf4e38cbee..a0fe626b5d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonPubSubTest.java @@ -17,10 +17,6 @@ package org.apache.activemq.artemis.tests.integration.proton; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.TransportConfiguration; -import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; -import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.message.ProtonJMessage; @@ -45,36 +41,27 @@ import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; -import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; -public class ProtonPubSubTest extends ActiveMQTestBase { +public class ProtonPubSubTest extends ProtonTestBase { private final String prefix = "foo.bar."; private final String pubAddress = "pubAddress"; private final String prefixedPubAddress = prefix + "pubAddress"; private final SimpleString ssPubAddress = new SimpleString(pubAddress); private final SimpleString ssprefixedPubAddress = new SimpleString(prefixedPubAddress); - private ActiveMQServer server; private Connection connection; private JmsConnectionFactory factory; + protected void configureAmqp(Map params) { + params.put("pubSubPrefix", prefix); + } @Override @Before public void setUp() throws Exception { super.setUp(); - disableCheckThread(); - server = this.createServer(true, true); - HashMap params = new HashMap<>(); - params.put(TransportConstants.PORT_PROP_NAME, "5672"); - params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP"); - HashMap extraParams = new HashMap<>(); - extraParams.put("pubSubPrefix", prefix); - TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "foo", extraParams); - - server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration); - server.start(); server.createQueue(ssPubAddress, ssPubAddress, new SimpleString("foo=bar"), false, true); server.createQueue(ssprefixedPubAddress, ssprefixedPubAddress, new SimpleString("foo=bar"), false, true); factory = new JmsConnectionFactory("amqp://localhost:5672"); @@ -97,8 +84,6 @@ public class ProtonPubSubTest extends ActiveMQTestBase { if (connection != null) { connection.close(); } - - server.stop(); } finally { super.tearDown(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java index b197bc6428..711f6ff34e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java @@ -51,13 +51,10 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; -import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; -import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.VersionLoader; import org.apache.activemq.transport.amqp.client.AmqpClient; @@ -88,7 +85,7 @@ import org.proton.plug.test.Constants; import org.proton.plug.test.minimalclient.SimpleAMQPConnector; @RunWith(Parameterized.class) -public class ProtonTest extends ActiveMQTestBase { +public class ProtonTest extends ProtonTestBase { private static final String amqpConnectionUri = "amqp://localhost:5672"; @@ -131,7 +128,6 @@ public class ProtonTest extends ActiveMQTestBase { } } - private ActiveMQServer server; private final String coreAddress; private final String address; private Connection connection; @@ -140,23 +136,7 @@ public class ProtonTest extends ActiveMQTestBase { @Before public void setUp() throws Exception { super.setUp(); - disableCheckThread(); - server = this.createServer(true, true); - HashMap params = new HashMap<>(); - params.put(TransportConstants.PORT_PROP_NAME, "5672"); - params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP"); - TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); - - server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration); - server.getConfiguration().setName(brokerName); - - // Default Page - AddressSettings addressSettings = new AddressSettings(); - addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); - server.getConfiguration().getAddressesSettings().put("#", addressSettings); - - server.start(); server.createQueue(new SimpleString(coreAddress), new SimpleString(coreAddress), null, true, false); server.createQueue(new SimpleString(coreAddress + "1"), new SimpleString(coreAddress + "1"), null, true, false); server.createQueue(new SimpleString(coreAddress + "2"), new SimpleString(coreAddress + "2"), null, true, false); @@ -191,8 +171,6 @@ public class ProtonTest extends ActiveMQTestBase { if (connection != null) { connection.close(); } - - server.stop(); } finally { super.tearDown(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestBase.java new file mode 100644 index 0000000000..0acd4aef94 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestBase.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.proton; + +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.After; +import org.junit.Before; + +import java.util.HashMap; +import java.util.Map; + +public class ProtonTestBase extends ActiveMQTestBase { + + protected String brokerName = "my-broker"; + protected ActiveMQServer server; + + protected String tcpAmqpConnectionUri = "tcp://localhost:5672"; + protected String userName = "guest"; + protected String password = "guest"; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + disableCheckThread(); + + server = this.createServer(true, true); + HashMap params = new HashMap<>(); + params.put(TransportConstants.PORT_PROP_NAME, "5672"); + params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP"); + HashMap amqpParams = new HashMap<>(); + configureAmqp(amqpParams); + TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "amqp-acceptor", amqpParams); + + server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration); + server.getConfiguration().setName(brokerName); + + // Default Page + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); + server.getConfiguration().getAddressesSettings().put("#", addressSettings); + + server.start(); + } + + protected void configureAmqp(Map params) { + } + + @Override + @After + public void tearDown() throws Exception { + try { + server.stop(); + } + finally { + super.tearDown(); + } + } +}