ARTEMIS-1858 Expiry messages are not transversing clustering with AMQP
This commit is contained in:
parent
9b7ebef9fd
commit
1ae2784dc6
|
@ -355,10 +355,28 @@ public interface Message {
|
||||||
|
|
||||||
String getAddress();
|
String getAddress();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Look at {@link #setAddress(SimpleString)} for the doc.
|
||||||
|
* @param address
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
Message setAddress(String address);
|
Message setAddress(String address);
|
||||||
|
|
||||||
SimpleString getAddressSimpleString();
|
SimpleString getAddressSimpleString();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This will set the address on CoreMessage.
|
||||||
|
*
|
||||||
|
* Note for AMQPMessages:
|
||||||
|
* in AMQPMessages this will not really change the address on the message. Instead it will add a property
|
||||||
|
* on extraProperties which only transverse internally at the broker.
|
||||||
|
* Whatever you change here it won't affect anything towards the received message.
|
||||||
|
*
|
||||||
|
* If you wish to change AMQPMessages address you will have to do it directly at the AMQP Message, however beware
|
||||||
|
* that AMQPMessages are not supposed to be changed at the broker, so only do it if you know what you are doing.
|
||||||
|
* @param address
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
Message setAddress(SimpleString address);
|
Message setAddress(SimpleString address);
|
||||||
|
|
||||||
long getTimestamp();
|
long getTimestamp();
|
||||||
|
|
|
@ -54,4 +54,8 @@ public class CoreMessageObjectPools {
|
||||||
public TypedProperties.TypedPropertiesStringSimpleStringPools getPropertiesStringSimpleStringPools() {
|
public TypedProperties.TypedPropertiesStringSimpleStringPools getPropertiesStringSimpleStringPools() {
|
||||||
return propertiesStringSimpleStringPools.get();
|
return propertiesStringSimpleStringPools.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static SimpleString cachedAddressSimpleString(String address, CoreMessageObjectPools coreMessageObjectPools) {
|
||||||
|
return SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,6 +67,8 @@ import io.netty.buffer.Unpooled;
|
||||||
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
|
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
|
||||||
public class AMQPMessage extends RefCountMessage {
|
public class AMQPMessage extends RefCountMessage {
|
||||||
|
|
||||||
|
public static final SimpleString ADDRESS_PROPERTY = SimpleString.toSimpleString("_AMQ_AD");
|
||||||
|
|
||||||
public static final int DEFAULT_MESSAGE_PRIORITY = 4;
|
public static final int DEFAULT_MESSAGE_PRIORITY = 4;
|
||||||
public static final int MAX_MESSAGE_PRIORITY = 9;
|
public static final int MAX_MESSAGE_PRIORITY = 9;
|
||||||
|
|
||||||
|
@ -103,19 +105,20 @@ public class AMQPMessage extends RefCountMessage {
|
||||||
* these are properties created by the broker only */
|
* these are properties created by the broker only */
|
||||||
private volatile TypedProperties extraProperties;
|
private volatile TypedProperties extraProperties;
|
||||||
|
|
||||||
public AMQPMessage(long messageFormat, byte[] data) {
|
public AMQPMessage(long messageFormat, byte[] data, TypedProperties extraProperties) {
|
||||||
this(messageFormat, data, null);
|
this(messageFormat, data, extraProperties, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public AMQPMessage(long messageFormat, byte[] data, CoreMessageObjectPools coreMessageObjectPools) {
|
public AMQPMessage(long messageFormat, byte[] data, TypedProperties extraProperties, CoreMessageObjectPools coreMessageObjectPools) {
|
||||||
this(messageFormat, ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(data)), coreMessageObjectPools);
|
this(messageFormat, ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(data)), extraProperties, coreMessageObjectPools);
|
||||||
}
|
}
|
||||||
|
|
||||||
public AMQPMessage(long messageFormat, ReadableBuffer data, CoreMessageObjectPools coreMessageObjectPools) {
|
public AMQPMessage(long messageFormat, ReadableBuffer data, TypedProperties extraProperties, CoreMessageObjectPools coreMessageObjectPools) {
|
||||||
this.data = data;
|
this.data = data;
|
||||||
this.messageFormat = messageFormat;
|
this.messageFormat = messageFormat;
|
||||||
this.bufferValid = true;
|
this.bufferValid = true;
|
||||||
this.coreMessageObjectPools = coreMessageObjectPools;
|
this.coreMessageObjectPools = coreMessageObjectPools;
|
||||||
|
this.extraProperties = extraProperties == null ? null : new TypedProperties(extraProperties);
|
||||||
parseHeaders();
|
parseHeaders();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -496,7 +499,7 @@ public class AMQPMessage extends RefCountMessage {
|
||||||
view.position(messagePaylodStart);
|
view.position(messagePaylodStart);
|
||||||
view.get(newData, headerEnds, view.remaining());
|
view.get(newData, headerEnds, view.remaining());
|
||||||
|
|
||||||
AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData);
|
AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData, extraProperties, coreMessageObjectPools);
|
||||||
newEncode.setDurable(isDurable()).setMessageID(this.getMessageID());
|
newEncode.setDurable(isDurable()).setMessageID(this.getMessageID());
|
||||||
return newEncode;
|
return newEncode;
|
||||||
}
|
}
|
||||||
|
@ -604,26 +607,40 @@ public class AMQPMessage extends RefCountMessage {
|
||||||
return addressSimpleString == null ? null : addressSimpleString.toString();
|
return addressSimpleString == null ? null : addressSimpleString.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public SimpleString cachedAddressSimpleString(String address) {
|
||||||
|
return CoreMessageObjectPools.cachedAddressSimpleString(address, coreMessageObjectPools);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AMQPMessage setAddress(String address) {
|
public AMQPMessage setAddress(String address) {
|
||||||
this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool());
|
setAddress(cachedAddressSimpleString(address));
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AMQPMessage setAddress(SimpleString address) {
|
public AMQPMessage setAddress(SimpleString address) {
|
||||||
this.address = address;
|
this.address = address;
|
||||||
|
createExtraProperties().putSimpleStringProperty(ADDRESS_PROPERTY, address);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SimpleString getAddressSimpleString() {
|
public SimpleString getAddressSimpleString() {
|
||||||
if (address == null) {
|
if (address == null) {
|
||||||
Properties properties = getProtonMessage().getProperties();
|
TypedProperties extraProperties = getExtraProperties();
|
||||||
if (properties != null) {
|
|
||||||
setAddress(properties.getTo());
|
// we first check if extraProperties is not null, no need to create it just to check it here
|
||||||
} else {
|
if (extraProperties != null) {
|
||||||
return null;
|
address = extraProperties.getSimpleStringProperty(ADDRESS_PROPERTY);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (address == null) {
|
||||||
|
// if it still null, it will look for the address on the getTo();
|
||||||
|
Properties properties = getProperties();
|
||||||
|
if (properties != null && properties.getTo() != null) {
|
||||||
|
address = cachedAddressSimpleString(properties.getTo());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return address;
|
return address;
|
||||||
|
@ -1261,6 +1278,9 @@ public class AMQPMessage extends RefCountMessage {
|
||||||
", messageID=" + getMessageID() +
|
", messageID=" + getMessageID() +
|
||||||
", address=" + getAddress() +
|
", address=" + getAddress() +
|
||||||
", size=" + getEncodeSize() +
|
", size=" + getEncodeSize() +
|
||||||
|
", applicationProperties=" + getApplicationProperties() +
|
||||||
|
", properties=" + getProperties() +
|
||||||
|
", extraProperties = " + getExtraProperties() +
|
||||||
"]";
|
"]";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -460,7 +460,7 @@ public class AMQPSessionCallback implements SessionCallback {
|
||||||
SimpleString address,
|
SimpleString address,
|
||||||
int messageFormat,
|
int messageFormat,
|
||||||
ReadableBuffer data) throws Exception {
|
ReadableBuffer data) throws Exception {
|
||||||
AMQPMessage message = new AMQPMessage(messageFormat, data, coreMessageObjectPools);
|
AMQPMessage message = new AMQPMessage(messageFormat, data, null, coreMessageObjectPools);
|
||||||
if (address != null) {
|
if (address != null) {
|
||||||
message.setAddress(address);
|
message.setAddress(address);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -179,6 +179,9 @@ public class AmqpCoreConverter {
|
||||||
TypedProperties properties = message.getExtraProperties();
|
TypedProperties properties = message.getExtraProperties();
|
||||||
if (properties != null) {
|
if (properties != null) {
|
||||||
for (SimpleString str : properties.getPropertyNames()) {
|
for (SimpleString str : properties.getPropertyNames()) {
|
||||||
|
if (str.equals(AMQPMessage.ADDRESS_PROPERTY)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
result.getInnerMessage().putBytesProperty(str, properties.getBytesProperty(str));
|
result.getInnerMessage().putBytesProperty(str, properties.getBytesProperty(str));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -316,7 +316,7 @@ public class CoreAmqpConverter {
|
||||||
byte[] data = new byte[buffer.writerIndex()];
|
byte[] data = new byte[buffer.writerIndex()];
|
||||||
buffer.readBytes(data);
|
buffer.readBytes(data);
|
||||||
|
|
||||||
AMQPMessage amqpMessage = new AMQPMessage(messageFormat, data);
|
AMQPMessage amqpMessage = new AMQPMessage(messageFormat, data, null);
|
||||||
amqpMessage.setMessageID(message.getInnerMessage().getMessageID());
|
amqpMessage.setMessageID(message.getInnerMessage().getMessageID());
|
||||||
amqpMessage.setReplyTo(coreMessage.getReplyTo());
|
amqpMessage.setReplyTo(coreMessage.getReplyTo());
|
||||||
return amqpMessage;
|
return amqpMessage;
|
||||||
|
|
|
@ -291,7 +291,7 @@ public class AMQPMessageTest {
|
||||||
byte[] bytes = new byte[nettyBuffer.writerIndex()];
|
byte[] bytes = new byte[nettyBuffer.writerIndex()];
|
||||||
nettyBuffer.readBytes(bytes);
|
nettyBuffer.readBytes(bytes);
|
||||||
|
|
||||||
return new AMQPMessage(0, bytes);
|
return new AMQPMessage(0, bytes, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private AMQPMessage encodeDelivery(AMQPMessage message, int deliveryCount) {
|
private AMQPMessage encodeDelivery(AMQPMessage message, int deliveryCount) {
|
||||||
|
@ -302,6 +302,6 @@ public class AMQPMessageTest {
|
||||||
byte[] bytes = new byte[nettyBuffer.writerIndex()];
|
byte[] bytes = new byte[nettyBuffer.writerIndex()];
|
||||||
nettyBuffer.readBytes(bytes);
|
nettyBuffer.readBytes(bytes);
|
||||||
|
|
||||||
return new AMQPMessage(0, bytes);
|
return new AMQPMessage(0, bytes, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -977,9 +977,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
// arrived the target node
|
// arrived the target node
|
||||||
// as described on https://issues.jboss.org/browse/JBPAPP-6130
|
// as described on https://issues.jboss.org/browse/JBPAPP-6130
|
||||||
Message copyRedistribute = message.copy(storageManager.generateID());
|
Message copyRedistribute = message.copy(storageManager.generateID());
|
||||||
if (copyRedistribute.getAddress() == null) {
|
copyRedistribute.setAddress(originatingQueue.getAddress());
|
||||||
copyRedistribute.setAddress(originatingQueue.getAddress());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tx != null) {
|
if (tx != null) {
|
||||||
tx.addOperation(new TransactionOperationAbstract() {
|
tx.addOperation(new TransactionOperationAbstract() {
|
||||||
|
|
|
@ -0,0 +1,284 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.cluster.crossprotocol;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.ConnectionFactory;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
|
||||||
|
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
|
||||||
|
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
|
||||||
|
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
|
||||||
|
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
|
||||||
|
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
@RunWith(value = Parameterized.class)
|
||||||
|
public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
|
||||||
|
|
||||||
|
private static final int NUMBER_OF_SERVERS = 2;
|
||||||
|
private static final SimpleString queueName = SimpleString.toSimpleString("queues.0");
|
||||||
|
|
||||||
|
|
||||||
|
// I'm taking any number that /2 = Odd
|
||||||
|
// to avoid perfect roundings and making sure messages are evenly distributed
|
||||||
|
private static final int NUMBER_OF_MESSAGES = 77 * 2;
|
||||||
|
|
||||||
|
|
||||||
|
@Parameterized.Parameters(name = "protocol={0}")
|
||||||
|
public static Collection getParameters() {
|
||||||
|
return Arrays.asList(new Object[][]{{"AMQP"}, {"CORE"}});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Parameterized.Parameter(0)
|
||||||
|
public String protocol;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startServers(MessageLoadBalancingType loadBalancingType) throws Exception {
|
||||||
|
setupServers();
|
||||||
|
|
||||||
|
setRedistributionDelay(0);
|
||||||
|
|
||||||
|
setupCluster(loadBalancingType);
|
||||||
|
|
||||||
|
AddressSettings as = new AddressSettings().setRedistributionDelay(0).setExpiryAddress(SimpleString.toSimpleString("queues.expiry"));
|
||||||
|
|
||||||
|
getServer(0).getAddressSettingsRepository().addMatch("queues.*", as);
|
||||||
|
getServer(1).getAddressSettingsRepository().addMatch("queues.*", as);
|
||||||
|
|
||||||
|
startServers(0);
|
||||||
|
startServers(1);
|
||||||
|
|
||||||
|
createQueue(SimpleString.toSimpleString("queues.expiry"));
|
||||||
|
createQueue(queueName);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createQueue(SimpleString queueName) throws Exception {
|
||||||
|
servers[0].createQueue(queueName, RoutingType.ANYCAST, queueName, (SimpleString) null, (SimpleString) null, true, false, false, false, false, -1, false, false, false, true);
|
||||||
|
servers[1].createQueue(queueName, RoutingType.ANYCAST, queueName, (SimpleString) null, (SimpleString) null, true, false, false, false, false, -1, false, false, false, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean isNetty() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ConnectionFactory getJmsConnectionFactory(int node) {
|
||||||
|
if (protocol.equals("AMQP")) {
|
||||||
|
return new JmsConnectionFactory("amqp://localhost:" + (61616 + node));
|
||||||
|
} else {
|
||||||
|
return new ActiveMQConnectionFactory("tcp://localhost:" + (61616 + node));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void pauseClusteringBridges(ActiveMQServer server) throws Exception {
|
||||||
|
for (ClusterConnection clusterConnection : server.getClusterManager().getClusterConnections()) {
|
||||||
|
for (MessageFlowRecord record : ((ClusterConnectionImpl)clusterConnection).getRecords().values()) {
|
||||||
|
record.getBridge().pause();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLoadBalancing() throws Exception {
|
||||||
|
|
||||||
|
startServers(MessageLoadBalancingType.STRICT);
|
||||||
|
|
||||||
|
ConnectionFactory[] factory = new ConnectionFactory[NUMBER_OF_SERVERS];
|
||||||
|
Connection[] connection = new Connection[NUMBER_OF_SERVERS];
|
||||||
|
Session[] session = new Session[NUMBER_OF_SERVERS];
|
||||||
|
MessageConsumer[] consumer = new MessageConsumer[NUMBER_OF_SERVERS];
|
||||||
|
|
||||||
|
// this will pre create consumers to make sure messages are distributed evenly without redistribution
|
||||||
|
for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
|
||||||
|
factory[node] = getJmsConnectionFactory(node);
|
||||||
|
connection[node] = factory[node].createConnection();
|
||||||
|
session[node] = connection[node].createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
consumer[node] = session[node].createConsumer(session[node].createQueue(queueName.toString()));
|
||||||
|
}
|
||||||
|
|
||||||
|
waitForBindings(0, "queues.0", 1, 1, true);
|
||||||
|
waitForBindings(1, "queues.0", 1, 1, true);
|
||||||
|
|
||||||
|
waitForBindings(0, "queues.0", 1, 1, false);
|
||||||
|
waitForBindings(1, "queues.0", 1, 1, false);
|
||||||
|
|
||||||
|
pauseClusteringBridges(servers[0]);
|
||||||
|
|
||||||
|
|
||||||
|
// sending Messages.. they should be load balanced
|
||||||
|
{
|
||||||
|
ConnectionFactory cf = getJmsConnectionFactory(0);
|
||||||
|
Connection cn = cf.createConnection();
|
||||||
|
Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));
|
||||||
|
|
||||||
|
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
|
||||||
|
pd.send(sn.createTextMessage("hello " + i));
|
||||||
|
}
|
||||||
|
|
||||||
|
cn.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
receiveMessages(connection[0], consumer[0], NUMBER_OF_MESSAGES / 2, true);
|
||||||
|
connection[1].start();
|
||||||
|
Assert.assertNull(consumer[1].receiveNoWait());
|
||||||
|
connection[1].stop();
|
||||||
|
|
||||||
|
servers[0].stop();
|
||||||
|
clearServer(0);
|
||||||
|
|
||||||
|
setupServer(0, isFileStorage(), isNetty());
|
||||||
|
servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
|
||||||
|
|
||||||
|
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.STRICT, 1, isNetty(), 0, 1);
|
||||||
|
|
||||||
|
servers[0].start();
|
||||||
|
|
||||||
|
receiveMessages(connection[1], consumer[1], NUMBER_OF_MESSAGES / 2, true);
|
||||||
|
for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
|
||||||
|
connection[node].close();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExpireRedistributed() throws Exception {
|
||||||
|
|
||||||
|
startServers(MessageLoadBalancingType.ON_DEMAND);
|
||||||
|
|
||||||
|
ConnectionFactory factory = getJmsConnectionFactory(1);
|
||||||
|
|
||||||
|
|
||||||
|
waitForBindings(0, "queues.0", 1, 0, true);
|
||||||
|
waitForBindings(1, "queues.0", 1, 0, true);
|
||||||
|
|
||||||
|
waitForBindings(0, "queues.0", 1, 0, false);
|
||||||
|
waitForBindings(1, "queues.0", 1, 0, false);
|
||||||
|
|
||||||
|
|
||||||
|
// sending Messages..
|
||||||
|
{
|
||||||
|
ConnectionFactory cf = getJmsConnectionFactory(0);
|
||||||
|
Connection cn = cf.createConnection();
|
||||||
|
Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));
|
||||||
|
pd.setTimeToLive(200);
|
||||||
|
|
||||||
|
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
|
||||||
|
pd.send(sn.createTextMessage("hello " + i));
|
||||||
|
}
|
||||||
|
|
||||||
|
cn.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// time to let stuff expire
|
||||||
|
Thread.sleep(200);
|
||||||
|
|
||||||
|
|
||||||
|
Connection connection = factory.createConnection();
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageConsumer consumer = session.createConsumer(session.createQueue("queues.expiry"));
|
||||||
|
|
||||||
|
receiveMessages(connection, consumer, NUMBER_OF_MESSAGES, true);
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void receiveMessages(Connection connection,
|
||||||
|
MessageConsumer messageConsumer,
|
||||||
|
int messageCount,
|
||||||
|
boolean exactCount) throws JMSException {
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
for (int i = 0; i < messageCount; i++) {
|
||||||
|
Message msg = messageConsumer.receive(5000);
|
||||||
|
Assert.assertNotNull(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
// this means no more messages received
|
||||||
|
if (exactCount) {
|
||||||
|
Assert.assertNull(messageConsumer.receiveNoWait());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
|
||||||
|
setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1);
|
||||||
|
|
||||||
|
setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setRedistributionDelay(final long delay) {
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setupServers() throws Exception {
|
||||||
|
setupServer(0, isFileStorage(), isNetty());
|
||||||
|
setupServer(1, isFileStorage(), isNetty());
|
||||||
|
|
||||||
|
servers[0].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
|
||||||
|
servers[1].addProtocolManagerFactory(new ProtonProtocolManagerFactory());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void stopServers() throws Exception {
|
||||||
|
closeAllConsumers();
|
||||||
|
|
||||||
|
closeAllSessionFactories();
|
||||||
|
|
||||||
|
closeAllServerLocatorsFactories();
|
||||||
|
|
||||||
|
stopServers(0, 1);
|
||||||
|
|
||||||
|
clearServer(0, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param serverID
|
||||||
|
* @return
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected ConfigurationImpl createBasicConfig(final int serverID) {
|
||||||
|
ConfigurationImpl configuration = super.createBasicConfig(serverID);
|
||||||
|
configuration.setMessageExpiryScanPeriod(100);
|
||||||
|
|
||||||
|
return configuration;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,20 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
/**
|
||||||
|
* This package contains tests about messages crossing over protocols
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.cluster.crossprotocol;
|
Loading…
Reference in New Issue