ARTEMIS-4378 ignore address federation config if connection is configured as pull, consumerWindowSize=0
This commit is contained in:
parent
29fafb5fed
commit
84c16f1a0d
|
@ -36,6 +36,7 @@ public class FederationConnection {
|
||||||
private volatile ClientSessionFactory clientSessionFactory;
|
private volatile ClientSessionFactory clientSessionFactory;
|
||||||
private volatile boolean started;
|
private volatile boolean started;
|
||||||
private volatile boolean sharedConnection;
|
private volatile boolean sharedConnection;
|
||||||
|
private boolean isPull;
|
||||||
|
|
||||||
public FederationConnection(Configuration configuration, String name, FederationConnectionConfiguration config) {
|
public FederationConnection(Configuration configuration, String name, FederationConnectionConfiguration config) {
|
||||||
this.config = config;
|
this.config = config;
|
||||||
|
@ -95,8 +96,8 @@ public class FederationConnection {
|
||||||
BeanSupport.setData(serverLocator, possibleLocatorParameters);
|
BeanSupport.setData(serverLocator, possibleLocatorParameters);
|
||||||
} catch (Exception ignoredAsErrorsVisibleViaBeanUtilsLogging) {
|
} catch (Exception ignoredAsErrorsVisibleViaBeanUtilsLogging) {
|
||||||
}
|
}
|
||||||
|
isPull = ("0".equals(possibleLocatorParameters.get("consumerWindowSize")));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void start() {
|
public synchronized void start() {
|
||||||
|
@ -117,6 +118,10 @@ public class FederationConnection {
|
||||||
return started;
|
return started;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isPull() {
|
||||||
|
return isPull;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isSharedConnection() {
|
public boolean isSharedConnection() {
|
||||||
return sharedConnection;
|
return sharedConnection;
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.activemq.artemis.core.server.federation.address;
|
package org.apache.activemq.artemis.core.server.federation.address;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -52,6 +53,8 @@ import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.Match;
|
import org.apache.activemq.artemis.core.settings.impl.Match;
|
||||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
import org.apache.activemq.artemis.utils.ByteUtil;
|
import org.apache.activemq.artemis.utils.ByteUtil;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Federated Address, replicate messages from the remote brokers address to itself.
|
* Federated Address, replicate messages from the remote brokers address to itself.
|
||||||
|
@ -65,6 +68,8 @@ import org.apache.activemq.artemis.utils.ByteUtil;
|
||||||
*/
|
*/
|
||||||
public class FederatedAddress extends FederatedAbstract implements ActiveMQServerBindingPlugin, ActiveMQServerAddressPlugin, Serializable {
|
public class FederatedAddress extends FederatedAbstract implements ActiveMQServerBindingPlugin, ActiveMQServerAddressPlugin, Serializable {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
|
||||||
public static final String FEDERATED_QUEUE_PREFIX = "federated";
|
public static final String FEDERATED_QUEUE_PREFIX = "federated";
|
||||||
|
|
||||||
public static final SimpleString HDR_HOPS = new SimpleString("_AMQ_Hops");
|
public static final SimpleString HDR_HOPS = new SimpleString("_AMQ_Hops");
|
||||||
|
@ -74,6 +79,7 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
|
||||||
private final Set<Matcher> excludes;
|
private final Set<Matcher> excludes;
|
||||||
private final FederationAddressPolicyConfiguration config;
|
private final FederationAddressPolicyConfiguration config;
|
||||||
private final Map<DivertBinding, Set<SimpleString>> matchingDiverts = new HashMap<>();
|
private final Map<DivertBinding, Set<SimpleString>> matchingDiverts = new HashMap<>();
|
||||||
|
private final boolean hasPullConnectionConfig;
|
||||||
|
|
||||||
public FederatedAddress(Federation federation, FederationAddressPolicyConfiguration config, ActiveMQServer server, FederationUpstream upstream) {
|
public FederatedAddress(Federation federation, FederationAddressPolicyConfiguration config, ActiveMQServer server, FederationUpstream upstream) {
|
||||||
super(federation, server, upstream);
|
super(federation, server, upstream);
|
||||||
|
@ -102,6 +108,7 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
|
||||||
excludes.add(new Matcher(exclude, wildcardConfiguration));
|
excludes.add(new Matcher(exclude, wildcardConfiguration));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
hasPullConnectionConfig = upstream.getConnection().isPull();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -310,8 +317,14 @@ public class FederatedAddress extends FederatedAbstract implements ActiveMQServe
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean match(SimpleString address, RoutingType routingType) {
|
private boolean match(SimpleString address, RoutingType routingType) {
|
||||||
//Currently only supporting Multicast currently.
|
|
||||||
if (RoutingType.ANYCAST.equals(routingType)) {
|
if (RoutingType.ANYCAST.equals(routingType)) {
|
||||||
|
logger.debug("ignoring unsupported ANYCAST address {}", address);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (hasPullConnectionConfig) {
|
||||||
|
// multicast address federation has no local queue to trigger batch pull requests, a regular fast consumer with credit window is necessary
|
||||||
|
// otherwise the upstream would fill up and block.
|
||||||
|
logger.debug("ignoring MULTICAST address {} on unsupported pull connection, consumerWindowSize=0 ", address);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
for (Matcher exclude : excludes) {
|
for (Matcher exclude : excludes) {
|
||||||
|
|
|
@ -22,14 +22,18 @@ import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Queue;
|
import javax.jms.Queue;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
import javax.jms.Topic;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.config.FederationConfiguration;
|
import org.apache.activemq.artemis.core.config.FederationConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.config.federation.FederationAddressPolicyConfiguration;
|
||||||
import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
|
import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
|
||||||
import org.apache.activemq.artemis.core.config.federation.FederationUpstreamConfiguration;
|
import org.apache.activemq.artemis.core.config.federation.FederationUpstreamConfiguration;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.artemis.tests.util.Wait;
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
|
@ -65,6 +69,44 @@ public class FederatedQueuePullConsumerTest extends FederatedTestBase {
|
||||||
return factory;
|
return factory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddressFederatedConfiguredWithPullQueueConsumerEnabledNotAnOption() throws Exception {
|
||||||
|
String connector = "server-pull-1";
|
||||||
|
|
||||||
|
getServer(0).getAddressSettingsRepository().getMatch("#").setAutoCreateAddresses(true).setAutoCreateQueues(true);
|
||||||
|
getServer(1).getAddressSettingsRepository().getMatch("#").setAutoCreateAddresses(true).setAutoCreateQueues(true);
|
||||||
|
|
||||||
|
getServer(0).addAddressInfo(new AddressInfo(SimpleString.toSimpleString("source"), RoutingType.MULTICAST));
|
||||||
|
getServer(1).addAddressInfo(new AddressInfo(SimpleString.toSimpleString("source"), RoutingType.MULTICAST));
|
||||||
|
|
||||||
|
getServer(0).getConfiguration().getFederationConfigurations().add(new FederationConfiguration().setName("default").addFederationPolicy(new FederationAddressPolicyConfiguration().setName("myAddressPolicy").addInclude(new FederationAddressPolicyConfiguration.Matcher().setAddressMatch("#"))).addUpstreamConfiguration(new FederationUpstreamConfiguration().setName("server1-upstream").addPolicyRef("myAddressPolicy").setStaticConnectors(Collections.singletonList(connector))));
|
||||||
|
|
||||||
|
getServer(0).getFederationManager().deploy();
|
||||||
|
|
||||||
|
final ConnectionFactory cf1 = getCF(0);
|
||||||
|
final ConnectionFactory cf2 = getCF(1);
|
||||||
|
|
||||||
|
try (Connection consumer1Connection = cf1.createConnection(); Connection producerConnection = cf2.createConnection()) {
|
||||||
|
consumer1Connection.start();
|
||||||
|
final Session session1 = consumer1Connection.createSession();
|
||||||
|
final Topic topic1 = session1.createTopic("source");
|
||||||
|
final MessageConsumer consumer1 = session1.createConsumer(topic1);
|
||||||
|
|
||||||
|
// Remote
|
||||||
|
final Session session2 = producerConnection.createSession();
|
||||||
|
final Topic topic2 = session2.createTopic("source");
|
||||||
|
final MessageProducer producer = session2.createProducer(topic2);
|
||||||
|
|
||||||
|
producer.send(session2.createTextMessage("hello"));
|
||||||
|
|
||||||
|
// no federation of this address
|
||||||
|
// consumer visible on local
|
||||||
|
assertTrue(waitForBindings(getServer(0), "source", true, 1, 1, 1000));
|
||||||
|
// federation consumer not visible on remote
|
||||||
|
assertFalse(waitForBindings(getServer(1), "source", true, 1, 1, 100));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFederatedQueuePullFromUpstream() throws Exception {
|
public void testFederatedQueuePullFromUpstream() throws Exception {
|
||||||
String queueName = getName();
|
String queueName = getName();
|
||||||
|
|
Loading…
Reference in New Issue