ARTEMIS-4035 all consumers of federated queue drop if only one consumer drops
This commit is contained in:
parent
8cba446e2b
commit
0ab098e456
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.config.federation;
|
|||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -98,4 +99,9 @@ public abstract class FederationStreamConfiguration <T extends FederationStreamC
|
|||
policyRefs.add(buffer.readString());
|
||||
}
|
||||
}
|
||||
|
||||
public T setStaticConnectors(List<String> connectors) {
|
||||
connectionConfiguration.setStaticConnectors(connectors);
|
||||
return (T) this;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -167,8 +167,7 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
|
|||
clientConsumer = null;
|
||||
clientSession = null;
|
||||
|
||||
if (clientSessionFactory != null && (!upstream.getConnection().isSharedConnection() ||
|
||||
clientSessionFactory.numSessions() == 0)) {
|
||||
if (clientSessionFactory != null && clientSessionFactory.numSessions() == 0 && !upstream.getConnection().isSharedConnection()) {
|
||||
clientSessionFactory.close();
|
||||
clientSessionFactory = null;
|
||||
}
|
||||
|
|
|
@ -25,7 +25,10 @@ import javax.jms.MessageProducer;
|
|||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import java.util.Collections;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.config.FederationConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
|
||||
|
@ -80,6 +83,57 @@ public class FederatedQueueTest extends FederatedTestBase {
|
|||
testFederatedQueueRemoteConsume(queueName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleFederatedQueueRemoteConsumersUpstream() throws Exception {
|
||||
String connector = "server1";
|
||||
|
||||
getServer(0).getAddressSettingsRepository().getMatch("#").setAutoCreateAddresses(true).setAutoCreateQueues(true);
|
||||
getServer(1).getAddressSettingsRepository().getMatch("#").setAutoCreateAddresses(true).setAutoCreateQueues(true);
|
||||
|
||||
getServer(1).createQueue(new QueueConfiguration("Test.Q.1").setRoutingType(RoutingType.ANYCAST));
|
||||
getServer(1).createQueue(new QueueConfiguration("Test.Q.2").setRoutingType(RoutingType.ANYCAST));
|
||||
|
||||
getServer(0).getConfiguration().getFederationConfigurations().add(new FederationConfiguration()
|
||||
.setName("default")
|
||||
.addFederationPolicy(new FederationQueuePolicyConfiguration()
|
||||
.setName("myQueuePolicy")
|
||||
.addInclude(new FederationQueuePolicyConfiguration.Matcher()
|
||||
.setQueueMatch("#")
|
||||
.setAddressMatch("Test.#")))
|
||||
.addUpstreamConfiguration(new FederationUpstreamConfiguration()
|
||||
.setName("server1-upstream")
|
||||
.addPolicyRef("myQueuePolicy")
|
||||
.setStaticConnectors(Collections.singletonList(connector))));
|
||||
getServer(0).getFederationManager().deploy();
|
||||
|
||||
ConnectionFactory cf1 = getCF(0);
|
||||
ConnectionFactory cf2 = getCF(0);
|
||||
ConnectionFactory cf3 = getCF(1);
|
||||
try (Connection consumer1Connection = cf1.createConnection(); Connection consumer2Connection = cf2.createConnection(); Connection producerConnection = cf3.createConnection()) {
|
||||
consumer1Connection.start();
|
||||
Session session1 = consumer1Connection.createSession();
|
||||
Queue queue1 = session1.createQueue("Test.Q.1");
|
||||
MessageConsumer consumer1 = session1.createConsumer(queue1);
|
||||
|
||||
consumer2Connection.start();
|
||||
Session session2 = consumer2Connection.createSession();
|
||||
Queue queue2 = session2.createQueue("Test.Q.2");
|
||||
MessageConsumer consumer2 = session2.createConsumer(queue2);
|
||||
|
||||
Session session3 = producerConnection.createSession();
|
||||
MessageProducer producer = session3.createProducer(queue2);
|
||||
producer.send(session3.createTextMessage("hello"));
|
||||
|
||||
assertNotNull(consumer2.receive(1000));
|
||||
|
||||
consumer1Connection.close();
|
||||
|
||||
producer.send(session3.createTextMessage("hello"));
|
||||
|
||||
assertNotNull(consumer2.receive(1000));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFederatedQueueRemoteConsumeUpstreamPriorityAdjustment() throws Exception {
|
||||
String queueName = getName();
|
||||
|
|
Loading…
Reference in New Issue