ARTEMIS-3498 Bridge reconnects will not clear delivering count statistics on internal queues

This commit is contained in:
Clebert Suconic 2021-09-24 10:43:35 -04:00
parent e4276e8cd0
commit 116545c589
5 changed files with 370 additions and 11 deletions

View File

@ -103,7 +103,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
private final SimpleString forwardingAddress;
private final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
final java.util.Map<Long, MessageReference> refs = new LinkedHashMap<>();
private final Transformer transformer;

View File

@ -3307,10 +3307,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
final boolean ignoreRedeliveryDelay) throws Exception {
if (internalQueue) {
if (logger.isTraceEnabled()) {
logger.trace("Queue " + this.getName() + " is an internal queue, no checkRedelivery");
}
// no DLQ check on internal queues
// we just need to return statistics on the delivering
decDelivering(reference);
return new Pair<>(true, false);
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.cluster.impl;
import java.util.Map;
import java.util.function.Function;
import org.apache.activemq.artemis.core.server.MessageReference;
/** it will provide accessors for Bridge during testing.
* Do not use this outside of the context of UnitTesting. */
public class BridgeTestAccessor {
public static Map<Long, MessageReference> getRefs(BridgeImpl bridge) {
return bridge.refs;
}
public static boolean withinRefs(BridgeImpl bridge, Function<Map<Long, MessageReference>, Boolean> function) {
Map<Long, MessageReference> refs = getRefs(bridge);
return function.apply(refs);
}
}

View File

@ -18,6 +18,10 @@ package org.apache.activemq.artemis.tests.integration.cluster.bridge;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -27,6 +31,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeTestAccessor;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
@ -79,6 +84,7 @@ public class ClusteredBridgeReconnectTest extends ClusterTestBase {
ClientProducer producer = session0.createProducer("queues.testaddress");
int NUMBER_OF_MESSAGES = 100;
int REPEATS = 5;
Assert.assertEquals(1, servers[0].getClusterManager().getClusterConnections().size());
@ -91,16 +97,55 @@ public class ClusteredBridgeReconnectTest extends ClusterTestBase {
Wait.assertEquals(2, () -> bridge.getSessionFactory().getServerLocator().getTopology().getMembers().size());
ArrayList<TopologyMemberImpl> originalmembers = new ArrayList<>(bridge.getSessionFactory().getServerLocator().getTopology().getMembers());
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
ClientMessage msg = session0.createMessage(true);
producer.send(msg);
session0.commit();
AtomicInteger errors = new AtomicInteger(0);
if (i == 17) {
bridge.getSessionFactory().getConnection().fail(new ActiveMQException("failed once!"));
}
// running the loop a couple of times
for (int repeat = 0; repeat < REPEATS; repeat++) {
CountDownLatch latchSent = new CountDownLatch(1);
Thread t = new Thread(() -> {
try {
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
ClientMessage msg = session0.createMessage(true);
producer.send(msg);
latchSent.countDown();
if (i % 10 == 0) {
session0.commit();
}
}
session0.commit();
} catch (Exception e) {
errors.incrementAndGet();
// not really supposed to happen
e.printStackTrace();
}
});
t.start();
Executor executorFail = servers[0].getExecutorFactory().getExecutor();
Assert.assertTrue(latchSent.await(10, TimeUnit.SECONDS));
Wait.waitFor(() -> BridgeTestAccessor.withinRefs(bridge, (refs) -> {
synchronized (refs) {
if (refs.size() > 0) {
executorFail.execute(() -> {
bridge.connectionFailed(new ActiveMQException("bye"), false);
});
return true;
} else {
return false;
}
}
}), 500, 1);
Wait.assertEquals(0L, bridge.getQueue()::getMessageCount, 5000, 1);
Wait.assertEquals(0, bridge.getQueue()::getDeliveringCount, 5000, 1);
t.join(5000);
}
Assert.assertEquals(0, errors.get());
Wait.assertEquals(2, () -> bridge.getSessionFactory().getServerLocator().getTopology().getMembers().size());
ArrayList<TopologyMemberImpl> afterReconnectedMembers = new ArrayList<>(bridge.getSessionFactory().getServerLocator().getTopology().getMembers());
@ -126,7 +171,7 @@ public class ClusteredBridgeReconnectTest extends ClusterTestBase {
int cons0Count = 0, cons1Count = 0;
while (true) {
ClientMessage msg = consumers[0].getConsumer().receive(1000);
ClientMessage msg = consumers[0].getConsumer().receiveImmediate();
if (msg == null) {
break;
}
@ -136,7 +181,7 @@ public class ClusteredBridgeReconnectTest extends ClusterTestBase {
}
while (true) {
ClientMessage msg = consumers[1].getConsumer().receive(1000);
ClientMessage msg = consumers[1].getConsumer().receiveImmediate();
if (msg == null) {
break;
}
@ -145,7 +190,7 @@ public class ClusteredBridgeReconnectTest extends ClusterTestBase {
session1.commit();
}
Assert.assertEquals("cons0 = " + cons0Count + ", cons1 = " + cons1Count, NUMBER_OF_MESSAGES, cons0Count + cons1Count);
Assert.assertEquals("cons0 = " + cons0Count + ", cons1 = " + cons1Count, NUMBER_OF_MESSAGES * REPEATS, cons0Count + cons1Count);
session0.commit();
session1.commit();

View File

@ -0,0 +1,273 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.bridge;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeTestAccessor;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class NettyBridgeReconnectTest extends BridgeTestBase {
private static final Logger log = Logger.getLogger(NettyBridgeReconnectTest.class);
final String bridgeName = "bridge1";
final String testAddress = "testAddress";
final int confirmationWindowSize = 100 * 1024;
Map<String, Object> server0Params;
Map<String, Object> server1Params;
ActiveMQServer server0;
ActiveMQServer server1;
private TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
private TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params, "server1tc");
private Map<String, TransportConfiguration> connectors;
private ArrayList<String> staticConnectors;
@After
public void destroyServer() throws Exception {
if (server1 != null) {
server1.stop();
}
if (server0 != null) {
server0.stop();
}
}
private void server1Start() throws Exception {
server1.start();
}
public void server0Start() throws Exception {
server0 = createActiveMQServer(0, server0Params, isNetty(), null);
server0.getConfiguration().setConnectorConfigurations(connectors);
server0.start();
}
@Before
public void setServer() throws Exception {
server0Params = new HashMap<>();
server1Params = new HashMap<>();
connectors = new HashMap<>();
server1 = createActiveMQServer(1, isNetty(), server1Params);
connectors.put(server1tc.getName(), server1tc);
connectors.put(server0tc.getName(), server0tc);
connectors.put(server1tc.getName(), server1tc);
staticConnectors = new ArrayList<>();
staticConnectors.add(server1tc.getName());
}
protected boolean isNetty() {
return true;
}
/**
* @return
*/
private String getConnector() {
return NETTY_CONNECTOR_FACTORY;
}
@Test
public void testFailoverWhileSending() throws Exception {
internalFailoverWhileSending(false);
}
@Test
public void testFailoverWhileSendingInternalQueue() throws Exception {
internalFailoverWhileSending(true);
}
private void internalFailoverWhileSending(boolean forceInternal) throws Exception {
connectors.put(server0tc.getName(), server0tc);
connectors.put(server1tc.getName(), server1tc);
server1.getConfiguration().setConnectorConfigurations(connectors);
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName(bridgeName).setQueueName(testAddress).setForwardingAddress(testAddress).setRetryInterval(10).setReconnectAttempts(-1).setReconnectAttemptsOnSameNode(-1).setConfirmationWindowSize(confirmationWindowSize).setPassword(CLUSTER_PASSWORD);
List<String> bridgeConnectors = new ArrayList<>();
bridgeConnectors.add(server0tc.getName());
bridgeConfiguration.setStaticConnectors(bridgeConnectors);
bridgeConfiguration.setQueueName(testAddress);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
bridgeConfigs.add(bridgeConfiguration);
server1.getConfiguration().setBridgeConfigurations(bridgeConfigs);
server0Start();
server1Start();
org.apache.activemq.artemis.core.server.Queue serverQueue1 = server1.locateQueue(testAddress);
if (forceInternal) {
// pretending this is an internal queue
// as the check acks will play it differently
serverQueue1.setInternalQueue(true);
}
int TRANSACTIONS = 10;
int NUM_MESSAGES = 1000;
try (ActiveMQConnectionFactory cf = (ActiveMQConnectionFactory) CFUtil.createConnectionFactory("core", "tcp://localhost:61617");
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) {
Queue queue = session.createQueue(testAddress);
MessageProducer producer = session.createProducer(queue);
int sequenceID = 0;
for (int j = 0; j < TRANSACTIONS; j++) {
for (int i = 0; i < NUM_MESSAGES; i++) {
producer.send(session.createTextMessage("" + (sequenceID++)));
}
session.commit();
}
}
//remoteServer0 = SpawnedVMSupport.spawnVM(NettyBridgeReconnectTest.class.getName(), getTestDir());
Wait.waitFor(() -> serverQueue1.getDeliveringCount() > 10, 300_000);
BridgeImpl bridge = null;
for (Consumer c : serverQueue1.getConsumers()) {
System.out.println("Consumer " + c);
if (c instanceof BridgeImpl) {
bridge = (BridgeImpl) c;
}
}
Assert.assertNotNull(bridge);
Executor executorFail = server1.getExecutorFactory().getExecutor();
{
BridgeImpl finalBridge = bridge;
Wait.assertTrue(() -> BridgeTestAccessor.withinRefs(finalBridge, (refs) -> {
synchronized (refs) {
if (refs.size() > 100) {
executorFail.execute(() -> {
finalBridge.connectionFailed(new ActiveMQException("bye"), false);
});
return true;
} else {
return false;
}
}
}));
}
AtomicInteger queuesTested = new AtomicInteger(0);
// Everything is transferred, so everything should be zeroed on the addressses
server1.getPostOffice().getAllBindings().filter(b -> b instanceof LocalQueueBinding).forEach((b) -> {
queuesTested.incrementAndGet();
try {
Wait.assertEquals(0, ((LocalQueueBinding) b).getQueue().getPagingStore()::getAddressSize);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
});
Assert.assertTrue(queuesTested.get() > 0);
Wait.assertEquals(0, serverQueue1::getDeliveringCount);
Wait.assertEquals(0, serverQueue1::getMessageCount);
try (ActiveMQConnectionFactory cf = (ActiveMQConnectionFactory) CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
Queue queue = session.createQueue(testAddress);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
HashSet<String> received = new HashSet<>();
for (int j = 0; j < TRANSACTIONS * NUM_MESSAGES; j++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
received.add(message.getText());
//Assert.assertEquals("" + j, message.getText());
}
Assert.assertEquals(TRANSACTIONS * NUM_MESSAGES, received.size());
Assert.assertNull(consumer.receiveNoWait());
}
queuesTested.set(0);
// After everything is consumed.. we will check the sizes being 0
server0.getPostOffice().getAllBindings().filter(b -> b instanceof LocalQueueBinding).forEach((b) -> {
queuesTested.incrementAndGet();
try {
Wait.assertEquals(0, ((LocalQueueBinding) b).getQueue().getPagingStore()::getAddressSize);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
});
Assert.assertTrue(queuesTested.get() > 0);
}
@Override
protected ActiveMQServer createActiveMQServer(final int id,
final Map<String, Object> params,
final boolean netty,
final NodeManager nodeManager) throws Exception {
ActiveMQServer server = super.createActiveMQServer(id, params, netty, nodeManager);
QueueConfiguration queueConfig0 = new QueueConfiguration(testAddress).setAddress(testAddress).setRoutingType(RoutingType.ANYCAST);
List<QueueConfiguration> queueConfigs0 = new ArrayList<>();
queueConfigs0.add(queueConfig0);
CoreAddressConfiguration addressConfiguration = new CoreAddressConfiguration();
addressConfiguration.setName(testAddress).addRoutingType(RoutingType.ANYCAST);
addressConfiguration.addQueueConfiguration(new QueueConfiguration(testAddress).setAddress(testAddress).setRoutingType(RoutingType.ANYCAST));
server.getConfiguration().addAddressConfiguration(addressConfiguration);
server.getConfiguration().setPersistIDCache(true);
return server;
}
}