This closes #2906
This commit is contained in:
commit
1ba1de4248
|
@ -20,6 +20,7 @@ import java.io.InputStream;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
|
||||
|
@ -77,6 +78,11 @@ public interface Message {
|
|||
// The value is somewhat higher on 64 bit architectures, probably due to different alignment
|
||||
int memoryOffset = 352;
|
||||
|
||||
// We use properties to establish routing context on clustering.
|
||||
// However if the client resends the message after receiving, it needs to be removed, so we mark these internal
|
||||
Predicate<SimpleString> INTERNAL_PROPERTY_NAMES_PREDICATE =
|
||||
name -> (name.startsWith(Message.HDR_ROUTE_TO_IDS) && !name.equals(Message.HDR_ROUTE_TO_IDS)) ||
|
||||
(name.startsWith(Message.HDR_ROUTE_TO_ACK_IDS) && !name.equals(Message.HDR_ROUTE_TO_ACK_IDS));
|
||||
|
||||
SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_AMQ_ROUTE_TO");
|
||||
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.activemq.artemis.core.message.impl;
|
|||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Set;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.zip.DataFormatException;
|
||||
import java.util.zip.Inflater;
|
||||
|
||||
|
@ -52,11 +51,6 @@ import org.jboss.logging.Logger;
|
|||
* consumers */
|
||||
public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
||||
|
||||
// We use properties to establish routing context on clustering.
|
||||
// However if the client resends the message after receiving, it needs to be removed, so we mark these internal
|
||||
private static final Predicate<SimpleString> INTERNAL_PROPERTY_NAMES_PREDICATE =
|
||||
name -> (name.startsWith(Message.HDR_ROUTE_TO_IDS) && !name.equals(Message.HDR_ROUTE_TO_IDS)) ||
|
||||
(name.startsWith(Message.HDR_ROUTE_TO_ACK_IDS) && !name.equals(Message.HDR_ROUTE_TO_ACK_IDS));
|
||||
public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE;
|
||||
|
||||
private volatile int memoryEstimate = -1;
|
||||
|
|
|
@ -849,7 +849,7 @@ public class AMQPMessage extends RefCountMessage {
|
|||
|
||||
public TypedProperties createExtraProperties() {
|
||||
if (extraProperties == null) {
|
||||
extraProperties = new TypedProperties();
|
||||
extraProperties = new TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE);
|
||||
}
|
||||
return extraProperties;
|
||||
}
|
||||
|
@ -878,6 +878,13 @@ public class AMQPMessage extends RefCountMessage {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearInternalProperties() {
|
||||
if (extraProperties != null) {
|
||||
extraProperties.clearInternalProperties();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] removeExtraBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException {
|
||||
if (extraProperties == null) {
|
||||
|
|
|
@ -75,7 +75,7 @@ public class AMQPMessagePersisterV2 extends AMQPMessagePersister {
|
|||
int size = buffer.readInt();
|
||||
|
||||
if (size != 0) {
|
||||
TypedProperties properties = new TypedProperties();
|
||||
TypedProperties properties = new TypedProperties(Message.INTERNAL_PROPERTY_NAMES_PREDICATE);
|
||||
properties.decode(buffer.byteBuf());
|
||||
message.setExtraProperties(properties);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,158 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
|
||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
public class AMQPMessageRedistributionTest extends ClusterTestBase {
|
||||
|
||||
final String queue = "exampleQueue";
|
||||
final String broker0 = "amqp://localhost:61616";
|
||||
final String broker1 = "amqp://localhost:61617";
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
start();
|
||||
}
|
||||
|
||||
private void start() throws Exception {
|
||||
setupServers();
|
||||
|
||||
setRedistributionDelay(0);
|
||||
}
|
||||
|
||||
protected boolean isNetty() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMessageRedistributionWithoutDupAMQP() throws Exception {
|
||||
setupCluster(MessageLoadBalancingType.ON_DEMAND);
|
||||
|
||||
startServers(0, 1);
|
||||
|
||||
setupSessionFactory(0, isNetty());
|
||||
setupSessionFactory(1, isNetty());
|
||||
|
||||
createQueue(0, queue, queue, null, true, null, null, RoutingType.ANYCAST);
|
||||
createQueue(1, queue, queue, null, true, null, null, RoutingType.ANYCAST);
|
||||
|
||||
waitForBindings(0, queue, 1, 0, true);
|
||||
waitForBindings(1, queue, 1, 0, true);
|
||||
|
||||
waitForBindings(0, queue, 1, 0, false);
|
||||
waitForBindings(1, queue, 1, 0, false);
|
||||
|
||||
final int NUMBER_OF_MESSAGES = 20;
|
||||
|
||||
JmsConnectionFactory factory = new JmsConnectionFactory(broker0);
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(session.createQueue(queue));
|
||||
|
||||
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
|
||||
producer.send(session.createTextMessage("hello " + i));
|
||||
}
|
||||
connection.close();
|
||||
|
||||
receiveOnBothNodes(NUMBER_OF_MESSAGES);
|
||||
}
|
||||
|
||||
private void receiveOnBothNodes(int NUMBER_OF_MESSAGES) throws Exception {
|
||||
receiveBroker(broker1, 1);
|
||||
receiveBroker(broker0, 1);
|
||||
receiveBrokerAll(broker1, NUMBER_OF_MESSAGES - 2);
|
||||
}
|
||||
|
||||
private void receiveBrokerAll(String brokerurl, int num) throws Exception {
|
||||
JmsConnectionFactory factory = new JmsConnectionFactory(brokerurl);
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = session.createConsumer(session.createQueue(queue));
|
||||
connection.start();
|
||||
|
||||
for (int i = 0; i < num; i++) {
|
||||
TextMessage msg = (TextMessage) consumer.receive(5000);
|
||||
assertNotNull(msg);
|
||||
}
|
||||
Message msg = consumer.receiveNoWait();
|
||||
assertNull(msg);
|
||||
connection.close();
|
||||
}
|
||||
|
||||
private void receiveBroker(String brokerurl, int num) throws Exception {
|
||||
JmsConnectionFactory factory = new JmsConnectionFactory(brokerurl);
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = session.createConsumer(session.createQueue(queue));
|
||||
connection.start();
|
||||
|
||||
for (int i = 0; i < num; i++) {
|
||||
Message msg = consumer.receive(5000);
|
||||
assertNotNull(msg);
|
||||
}
|
||||
connection.close();
|
||||
}
|
||||
|
||||
protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
|
||||
setupClusterConnection("cluster0", queue, messageLoadBalancingType, 1, isNetty(), 0, 1);
|
||||
|
||||
setupClusterConnection("cluster1", queue, messageLoadBalancingType, 1, isNetty(), 1, 0);
|
||||
}
|
||||
|
||||
protected void setRedistributionDelay(final long delay) {
|
||||
AddressSettings as = new AddressSettings().setRedistributionDelay(delay);
|
||||
|
||||
getServer(0).getAddressSettingsRepository().addMatch("*", as);
|
||||
getServer(1).getAddressSettingsRepository().addMatch("*", as);
|
||||
}
|
||||
|
||||
protected void setupServers() throws Exception {
|
||||
setupServer(0, isFileStorage(), isNetty());
|
||||
setupServer(1, isFileStorage(), isNetty());
|
||||
|
||||
servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
|
||||
servers[1].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
|
||||
}
|
||||
|
||||
protected void stopServers() throws Exception {
|
||||
closeAllConsumers();
|
||||
|
||||
closeAllSessionFactories();
|
||||
|
||||
closeAllServerLocatorsFactories();
|
||||
|
||||
stopServers(0, 1);
|
||||
|
||||
clearServer(0, 1);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue