diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java index 6eb6146aff..9e3d63e204 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java @@ -89,7 +89,7 @@ public final class BridgeConfiguration implements Serializable { private boolean useDuplicateDetection = ActiveMQDefaultConfiguration.isDefaultBridgeDuplicateDetection(); - private int confirmationWindowSize = ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE; + private int confirmationWindowSize = ActiveMQDefaultConfiguration.getDefaultBridgeConfirmationWindowSize(); // disable flow control private int producerWindowSize = ActiveMQDefaultConfiguration.getDefaultBridgeProducerWindowSize(); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/BridgeConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/BridgeConfigurationTest.java index 17a8050781..91e6f5cd87 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/BridgeConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/BridgeConfigurationTest.java @@ -97,7 +97,7 @@ public class BridgeConfigurationTest { Assert.assertEquals("-1", jsonObject.get(BridgeConfiguration.RECONNECT_ATTEMPTS).toString()); Assert.assertEquals("10", jsonObject.get(BridgeConfiguration.RECONNECT_ATTEMPTS_ON_SAME_NODE).toString()); Assert.assertEquals("true", jsonObject.get(BridgeConfiguration.USE_DUPLICATE_DETECTION).toString()); - Assert.assertEquals("-1", jsonObject.get(BridgeConfiguration.CONFIRMATION_WINDOW_SIZE).toString()); + Assert.assertEquals("10485760", jsonObject.get(BridgeConfiguration.CONFIRMATION_WINDOW_SIZE).toString()); Assert.assertEquals("-1", jsonObject.get(BridgeConfiguration.PRODUCER_WINDOW_SIZE).toString()); Assert.assertEquals("30000", jsonObject.get(BridgeConfiguration.CLIENT_FAILURE_CHECK_PERIOD).toString()); Assert.assertEquals("2000", jsonObject.get(BridgeConfiguration.MAX_RETRY_INTERVAL).toString()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java index 4ad7df8894..de8f4f823a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java @@ -2011,6 +2011,37 @@ public class BridgeTest extends ActiveMQTestBase { assertEquals(transformer, ((BridgeImpl) bridge).getTransformer()); } + @Test + public void testDefaultConfirmationWindowSize() throws Exception { + final SimpleString ADDRESS = new SimpleString("myAddress"); + final SimpleString QUEUE = new SimpleString("myQueue"); + final SimpleString FORWARDING_ADDRESS = new SimpleString("myForwardingAddress"); + final SimpleString FORWARDING_QUEUE = new SimpleString("myForwardingQueue"); + final String BRIDGE = "myBridge"; + + Configuration config = createDefaultConfig(0, isNetty()).addConnectorConfiguration("myConnector", new TransportConfiguration(getConnector())); + ActiveMQServer server = addServer(new ActiveMQServerImpl(config)); + server.start(); + server.waitForActivation(100, TimeUnit.MILLISECONDS); + server.createQueue(new QueueConfiguration(QUEUE).setAddress(ADDRESS).setRoutingType(RoutingType.ANYCAST).setDurable(false)); + server.createQueue(new QueueConfiguration(FORWARDING_QUEUE).setAddress(FORWARDING_ADDRESS).setRoutingType(RoutingType.ANYCAST).setDurable(false)); + server.deployBridge(new BridgeConfiguration() + .setName(BRIDGE) + .setQueueName(QUEUE.toString()) + .setForwardingAddress(FORWARDING_ADDRESS.toString()) + .setStaticConnectors(List.of("myConnector"))); + + // now we actually have to use the bridge to make sure it connected correctly + locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(getConnector()))); + ClientSessionFactory sf = addSessionFactory(locator.createSessionFactory()); + ClientSession session = addClientSession(sf.createSession(false, true, true)); + ClientProducer producer = addClientProducer(session.createProducer(ADDRESS)); + ClientConsumer consumer = addClientConsumer(session.createConsumer(FORWARDING_QUEUE)); + session.start(); + producer.send(session.createMessage(true)); + Assert.assertNotNull(consumer.receive(200)); + } + /** * It will inspect the journal directly and determine if there are queues on this journal, *