This commit is contained in:
Justin Bertram 2022-10-13 10:36:38 -05:00
commit a7bbe3c1fb
No known key found for this signature in database
GPG Key ID: F41830B875BB8633
3 changed files with 61 additions and 2 deletions

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.config.federation;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@ -98,4 +99,9 @@ public abstract class FederationStreamConfiguration <T extends FederationStreamC
policyRefs.add(buffer.readString());
}
}
public T setStaticConnectors(List<String> connectors) {
connectionConfiguration.setStaticConnectors(connectors);
return (T) this;
}
}

View File

@ -167,8 +167,7 @@ public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, Sessi
clientConsumer = null;
clientSession = null;
if (clientSessionFactory != null && (!upstream.getConnection().isSharedConnection() ||
clientSessionFactory.numSessions() == 0)) {
if (clientSessionFactory != null && clientSessionFactory.numSessions() == 0 && !upstream.getConnection().isSharedConnection()) {
clientSessionFactory.close();
clientSessionFactory = null;
}

View File

@ -25,7 +25,10 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Collections;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
@ -80,6 +83,57 @@ public class FederatedQueueTest extends FederatedTestBase {
testFederatedQueueRemoteConsume(queueName);
}
@Test
public void testMultipleFederatedQueueRemoteConsumersUpstream() throws Exception {
String connector = "server1";
getServer(0).getAddressSettingsRepository().getMatch("#").setAutoCreateAddresses(true).setAutoCreateQueues(true);
getServer(1).getAddressSettingsRepository().getMatch("#").setAutoCreateAddresses(true).setAutoCreateQueues(true);
getServer(1).createQueue(new QueueConfiguration("Test.Q.1").setRoutingType(RoutingType.ANYCAST));
getServer(1).createQueue(new QueueConfiguration("Test.Q.2").setRoutingType(RoutingType.ANYCAST));
getServer(0).getConfiguration().getFederationConfigurations().add(new FederationConfiguration()
.setName("default")
.addFederationPolicy(new FederationQueuePolicyConfiguration()
.setName("myQueuePolicy")
.addInclude(new FederationQueuePolicyConfiguration.Matcher()
.setQueueMatch("#")
.setAddressMatch("Test.#")))
.addUpstreamConfiguration(new FederationUpstreamConfiguration()
.setName("server1-upstream")
.addPolicyRef("myQueuePolicy")
.setStaticConnectors(Collections.singletonList(connector))));
getServer(0).getFederationManager().deploy();
ConnectionFactory cf1 = getCF(0);
ConnectionFactory cf2 = getCF(0);
ConnectionFactory cf3 = getCF(1);
try (Connection consumer1Connection = cf1.createConnection(); Connection consumer2Connection = cf2.createConnection(); Connection producerConnection = cf3.createConnection()) {
consumer1Connection.start();
Session session1 = consumer1Connection.createSession();
Queue queue1 = session1.createQueue("Test.Q.1");
MessageConsumer consumer1 = session1.createConsumer(queue1);
consumer2Connection.start();
Session session2 = consumer2Connection.createSession();
Queue queue2 = session2.createQueue("Test.Q.2");
MessageConsumer consumer2 = session2.createConsumer(queue2);
Session session3 = producerConnection.createSession();
MessageProducer producer = session3.createProducer(queue2);
producer.send(session3.createTextMessage("hello"));
assertNotNull(consumer2.receive(1000));
consumer1Connection.close();
producer.send(session3.createTextMessage("hello"));
assertNotNull(consumer2.receive(1000));
}
}
@Test
public void testFederatedQueueRemoteConsumeUpstreamPriorityAdjustment() throws Exception {
String queueName = getName();