This commit is contained in:
Clebert Suconic 2018-04-06 13:58:27 -04:00
commit 619b2cb2a4
2 changed files with 157 additions and 0 deletions

View File

@ -220,6 +220,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
this.server = server;
}
/** For tests mainly */
public boolean isBlockedOnFlowControl() {
return blockedOnFlowControl;
}
public static final byte[] getDuplicateBytes(final UUID nodeUUID, final long messageID) {
byte[] bytes = new byte[24];
@ -924,6 +929,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
}
// need to reset blockedOnFlowControl after creating a new producer
// otherwise in case the bridge was blocked before a previous failure
// this would never resume
blockedOnFlowControl = false;
producer = session.createProducer();
session.addFailureListener(BridgeImpl.this);

View File

@ -75,10 +75,13 @@ import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
@ -251,6 +254,151 @@ public class BridgeTest extends ActiveMQTestBase {
System.out.println(timeTaken + "ms");
}
@Test
public void testBlockedBridgeAndReconnect() throws Exception {
long time = System.currentTimeMillis();
Map<String, Object> server0Params = new HashMap<>();
server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
Map<String, Object> server1Params = new HashMap<>();
addTargetParameters(server1Params);
server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
server1.getAddressSettingsRepository().clear();
server1.getAddressSettingsRepository().addMatch("#", new AddressSettings().setMaxSizeBytes(10124 * 10).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK));
server0.getAddressSettingsRepository().clear();
server0.getAddressSettingsRepository().addMatch("#", new AddressSettings().setMaxSizeBytes(Long.MAX_VALUE).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK));
final String testAddress = "testAddress";
final String queueName0 = "queue0";
final String forwardAddress = "forwardAddress";
final String queueName1 = "queue1";
TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
HashMap<String, TransportConfiguration> connectors = new HashMap<>();
connectors.put(server1tc.getName(), server1tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
final int messageSize = 1024;
final int numMessages = 1000;
ArrayList<String> connectorConfig = new ArrayList<>();
connectorConfig.add(server1tc.getName());
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(100).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(true).setConfirmationWindowSize(numMessages * messageSize / 2).setStaticConnectors(connectorConfig).setProducerWindowSize(1024);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
bridgeConfigs.add(bridgeConfiguration);
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1);
List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
queueConfigs1.add(queueConfig1);
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
server1.start();
server0.start();
locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
ClientSession session0 = sf0.createSession(false, true, 0);
ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
ClientSession session1 = sf1.createSession(true, true, 0);
ClientConsumer consumer1 = session1.createConsumer(queueName1);
session1.start();
final byte[] bytes = new byte[messageSize];
final SimpleString propKey = new SimpleString("testkey");
for (int i = 0; i < numMessages; i++) {
ClientMessage message = session0.createMessage(true);
message.putIntProperty(propKey, i);
message.getBodyBuffer().writeBytes(bytes);
producer0.send(message);
if (i % 100 == 0) {
session0.commit();
}
}
session0.commit();
for (int i = 0; i < numMessages / 2; i++) {
ClientMessage message = consumer1.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(i, message.getObjectProperty(propKey));
message.acknowledge();
}
session1.commit();
BridgeImpl bridge = (BridgeImpl)server0.getClusterManager().getBridges().get("bridge1");
// stop in the middle. wait the bridge to block
Wait.assertTrue("bridge is never blocked", bridge::isBlockedOnFlowControl);
session1.close();
sf1.close();
// now restart the server.. the bridge should be reconnecting now
server1.stop();
server1.start();
sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
session1 = sf1.createSession(true, true, 0);
consumer1 = session1.createConsumer(queueName1);
session1.start();
// consume the rest of the messages
for (int i = numMessages / 2; i < numMessages; i++) {
ClientMessage message = consumer1.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(i, message.getObjectProperty(propKey));
message.acknowledge();
}
Wait.assertEquals(0, server0.locateQueue(SimpleString.toSimpleString("queue0"))::getMessageCount);
Assert.assertNull(consumer1.receiveImmediate());
session0.close();
session1.close();
sf0.close();
sf1.close();
closeFields();
if (server0.getConfiguration().isPersistenceEnabled()) {
assertEquals(0, loadQueues(server0).size());
}
long timeTaken = System.currentTimeMillis() - time;
System.out.println(timeTaken + "ms");
}
public void internaltestSimpleBridge(final boolean largeMessage, final boolean useFiles) throws Exception {
Map<String, Object> server0Params = new HashMap<>();
server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, server0Params);