ARTEMIS-696 Broker fails when client sends messages in multiple transfer frames

This commit is contained in:
Howard Gao 2016-08-24 21:54:08 +08:00 committed by Clebert Suconic
parent b51142ae0c
commit 8fccd5df42
6 changed files with 195 additions and 45 deletions

View File

@ -64,6 +64,8 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
* */
private String pubSubPrefix = ActiveMQDestination.JMS_TOPIC_ADDRESS_PREFIX;
private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) {
this.factory = factory;
this.server = server;
@ -111,7 +113,7 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
String id = server.getConfiguration().getName();
AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory().
createConnection(connectionCallback, id, (int) ttl, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
createConnection(connectionCallback, id, (int) ttl, getMaxFrameSize(), DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool());
Executor executor = server.getExecutorFactory().getExecutor();
@ -164,4 +166,11 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
}
public int getMaxFrameSize() {
return maxFrameSize;
}
public void setMaxFrameSize(int maxFrameSize) {
this.maxFrameSize = maxFrameSize;
}
}

View File

@ -116,7 +116,10 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext {
receiver = ((Receiver) delivery.getLink());
if (!delivery.isReadable()) {
System.err.println("!!!!! Readable!!!!!!!");
return;
}
if (delivery.isPartial()) {
return;
}

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.proton;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Test;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class ProtonMaxFrameSizeTest extends ProtonTestBase {
private static final int FRAME_SIZE = 512;
protected void configureAmqp(Map<String, Object> params) {
params.put("maxFrameSize", FRAME_SIZE);
}
@Test
public void testMultipleTransfers() throws Exception {
String testQueueName = "ConnectionFrameSize";
int nMsgs = 200;
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.createConnection();
try {
amqpConnection.connect();
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender("jms.queue." + testQueueName);
final int payload = FRAME_SIZE * 16;
for (int i = 0; i < nMsgs; ++i) {
AmqpMessage message = createAmqpMessage((byte) 'A', payload);
sender.send(message);
}
int count = getMessageCount(server.getPostOffice(), "jms.queue." + testQueueName);
assertEquals(nMsgs, count);
AmqpReceiver receiver = session.createReceiver("jms.queue." + testQueueName);
receiver.flow(nMsgs);
for (int i = 0; i < nMsgs; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull("failed at " + i, message);
MessageImpl wrapped = (MessageImpl) message.getWrappedMessage();
Data data = (Data) wrapped.getBody();
System.out.println("received : message: " + data.getValue().getLength());
assertEquals(payload, data.getValue().getLength());
message.accept();
}
}
finally {
amqpConnection.close();
}
}
private AmqpMessage createAmqpMessage(byte value, int payloadSize) {
AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[payloadSize];
for (int i = 0; i < payload.length; i++) {
payload[i] = value;
}
message.setBytes(payload);
return message;
}
}

View File

@ -17,10 +17,6 @@
package org.apache.activemq.artemis.tests.integration.proton;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.message.ProtonJMessage;
@ -45,36 +41,27 @@ import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class ProtonPubSubTest extends ActiveMQTestBase {
public class ProtonPubSubTest extends ProtonTestBase {
private final String prefix = "foo.bar.";
private final String pubAddress = "pubAddress";
private final String prefixedPubAddress = prefix + "pubAddress";
private final SimpleString ssPubAddress = new SimpleString(pubAddress);
private final SimpleString ssprefixedPubAddress = new SimpleString(prefixedPubAddress);
private ActiveMQServer server;
private Connection connection;
private JmsConnectionFactory factory;
protected void configureAmqp(Map<String, Object> params) {
params.put("pubSubPrefix", prefix);
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
disableCheckThread();
server = this.createServer(true, true);
HashMap<String, Object> params = new HashMap<>();
params.put(TransportConstants.PORT_PROP_NAME, "5672");
params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
HashMap<String, Object> extraParams = new HashMap<>();
extraParams.put("pubSubPrefix", prefix);
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "foo", extraParams);
server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
server.start();
server.createQueue(ssPubAddress, ssPubAddress, new SimpleString("foo=bar"), false, true);
server.createQueue(ssprefixedPubAddress, ssprefixedPubAddress, new SimpleString("foo=bar"), false, true);
factory = new JmsConnectionFactory("amqp://localhost:5672");
@ -97,8 +84,6 @@ public class ProtonPubSubTest extends ActiveMQTestBase {
if (connection != null) {
connection.close();
}
server.stop();
}
finally {
super.tearDown();

View File

@ -51,13 +51,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.activemq.transport.amqp.client.AmqpClient;
@ -88,7 +85,7 @@ import org.proton.plug.test.Constants;
import org.proton.plug.test.minimalclient.SimpleAMQPConnector;
@RunWith(Parameterized.class)
public class ProtonTest extends ActiveMQTestBase {
public class ProtonTest extends ProtonTestBase {
private static final String amqpConnectionUri = "amqp://localhost:5672";
@ -131,7 +128,6 @@ public class ProtonTest extends ActiveMQTestBase {
}
}
private ActiveMQServer server;
private final String coreAddress;
private final String address;
private Connection connection;
@ -140,23 +136,7 @@ public class ProtonTest extends ActiveMQTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
disableCheckThread();
server = this.createServer(true, true);
HashMap<String, Object> params = new HashMap<>();
params.put(TransportConstants.PORT_PROP_NAME, "5672");
params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
server.getConfiguration().setName(brokerName);
// Default Page
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
server.getConfiguration().getAddressesSettings().put("#", addressSettings);
server.start();
server.createQueue(new SimpleString(coreAddress), new SimpleString(coreAddress), null, true, false);
server.createQueue(new SimpleString(coreAddress + "1"), new SimpleString(coreAddress + "1"), null, true, false);
server.createQueue(new SimpleString(coreAddress + "2"), new SimpleString(coreAddress + "2"), null, true, false);
@ -191,8 +171,6 @@ public class ProtonTest extends ActiveMQTestBase {
if (connection != null) {
connection.close();
}
server.stop();
}
finally {
super.tearDown();

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.proton;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.After;
import org.junit.Before;
import java.util.HashMap;
import java.util.Map;
public class ProtonTestBase extends ActiveMQTestBase {
protected String brokerName = "my-broker";
protected ActiveMQServer server;
protected String tcpAmqpConnectionUri = "tcp://localhost:5672";
protected String userName = "guest";
protected String password = "guest";
@Override
@Before
public void setUp() throws Exception {
super.setUp();
disableCheckThread();
server = this.createServer(true, true);
HashMap<String, Object> params = new HashMap<>();
params.put(TransportConstants.PORT_PROP_NAME, "5672");
params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
HashMap<String, Object> amqpParams = new HashMap<>();
configureAmqp(amqpParams);
TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "amqp-acceptor", amqpParams);
server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
server.getConfiguration().setName(brokerName);
// Default Page
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
server.getConfiguration().getAddressesSettings().put("#", addressSettings);
server.start();
}
protected void configureAmqp(Map<String, Object> params) {
}
@Override
@After
public void tearDown() throws Exception {
try {
server.stop();
}
finally {
super.tearDown();
}
}
}