mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-02-23 10:52:56 +00:00
ARTEMIS-2327 Removing Bridge Test after fix
This test was playing with an ignore packet, which does not make any more sense after the last change. After a packet loss the bridge will reconnect, and this test makes no more sense.
This commit is contained in:
parent
415ff9b26a
commit
c4238e154f
@ -29,7 +29,6 @@ import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
@ -62,25 +61,22 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordId
|
||||
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Bridge;
|
||||
import org.apache.activemq.artemis.core.server.transformer.AddHeadersTransformer;
|
||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
|
||||
import org.apache.activemq.artemis.core.server.transformer.AddHeadersTransformer;
|
||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
@ -318,7 +314,6 @@ public class BridgeTest extends ActiveMQTestBase {
|
||||
ClientSession session1 = sf1.createSession(true, true, 0);
|
||||
ClientConsumer consumer1 = session1.createConsumer(queueName1);
|
||||
|
||||
|
||||
session1.start();
|
||||
|
||||
final byte[] bytes = new byte[messageSize];
|
||||
@ -351,7 +346,7 @@ public class BridgeTest extends ActiveMQTestBase {
|
||||
}
|
||||
session1.commit();
|
||||
|
||||
BridgeImpl bridge = (BridgeImpl)server0.getClusterManager().getBridges().get("bridge1");
|
||||
BridgeImpl bridge = (BridgeImpl) server0.getClusterManager().getBridges().get("bridge1");
|
||||
|
||||
// stop in the middle. wait the bridge to block
|
||||
Wait.assertTrue("bridge is never blocked", bridge::isBlockedOnFlowControl);
|
||||
@ -379,7 +374,6 @@ public class BridgeTest extends ActiveMQTestBase {
|
||||
message.acknowledge();
|
||||
}
|
||||
|
||||
|
||||
Wait.assertEquals(0, server0.locateQueue(SimpleString.toSimpleString("queue0"))::getMessageCount);
|
||||
|
||||
Assert.assertNull(consumer1.receiveImmediate());
|
||||
@ -514,160 +508,6 @@ public class BridgeTest extends ActiveMQTestBase {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLostMessageSimpleMessage() throws Exception {
|
||||
internalTestMessageLoss(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLostMessageLargeMessage() throws Exception {
|
||||
internalTestMessageLoss(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test will ignore messages
|
||||
* What will cause the bridge to fail with a timeout
|
||||
* The bridge should still recover the failure and reconnect on that case
|
||||
*/
|
||||
public void internalTestMessageLoss(final boolean largeMessage) throws Exception {
|
||||
class MyInterceptor implements Interceptor {
|
||||
|
||||
public boolean ignoreSends = true;
|
||||
public CountDownLatch latch;
|
||||
|
||||
MyInterceptor(int numberOfIgnores) {
|
||||
latch = new CountDownLatch(numberOfIgnores);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
|
||||
if (ignoreSends && packet instanceof SessionSendMessage ||
|
||||
ignoreSends && packet instanceof SessionSendLargeMessage ||
|
||||
ignoreSends && packet instanceof SessionSendContinuationMessage && !((SessionSendContinuationMessage) packet).isContinues()) {
|
||||
IntegrationTestLogger.LOGGER.info("IGNORED: " + packet);
|
||||
latch.countDown();
|
||||
return false;
|
||||
} else {
|
||||
IntegrationTestLogger.LOGGER.info(packet);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
MyInterceptor myInterceptor = new MyInterceptor(3);
|
||||
|
||||
Map<String, Object> server0Params = new HashMap<>();
|
||||
server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
|
||||
|
||||
Map<String, Object> server1Params = new HashMap<>();
|
||||
addTargetParameters(server1Params);
|
||||
server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
|
||||
|
||||
final String testAddress = "testAddress";
|
||||
final String queueName0 = "queue0";
|
||||
final String forwardAddress = "forwardAddress";
|
||||
final String queueName1 = "queue1";
|
||||
|
||||
TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
|
||||
TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
|
||||
|
||||
HashMap<String, TransportConfiguration> connectors = new HashMap<>();
|
||||
connectors.put(server1tc.getName(), server1tc);
|
||||
server0.getConfiguration().setConnectorConfigurations(connectors);
|
||||
|
||||
final int messageSize = 1024;
|
||||
|
||||
final int numMessages = 1;
|
||||
|
||||
ArrayList<String> connectorConfig = new ArrayList<>();
|
||||
connectorConfig.add(server1tc.getName());
|
||||
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(100).setReconnectAttempts(-1).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(numMessages * messageSize / 2).setStaticConnectors(connectorConfig).setCallTimeout(500);
|
||||
|
||||
List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
|
||||
bridgeConfigs.add(bridgeConfiguration);
|
||||
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
|
||||
|
||||
CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
|
||||
List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
|
||||
queueConfigs0.add(queueConfig0);
|
||||
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
|
||||
|
||||
CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1);
|
||||
List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
|
||||
queueConfigs1.add(queueConfig1);
|
||||
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
|
||||
|
||||
server1.start();
|
||||
|
||||
server1.getRemotingService().addIncomingInterceptor(myInterceptor);
|
||||
|
||||
server0.start();
|
||||
locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
|
||||
ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
|
||||
|
||||
ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
|
||||
|
||||
ClientSession session0 = sf0.createSession(false, true, true);
|
||||
|
||||
ClientSession session1 = sf1.createSession(false, true, true);
|
||||
|
||||
ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
|
||||
|
||||
ClientConsumer consumer1 = session1.createConsumer(queueName1);
|
||||
|
||||
session1.start();
|
||||
|
||||
final byte[] bytes = new byte[messageSize];
|
||||
|
||||
final SimpleString propKey = new SimpleString("testkey");
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ClientMessage message = session0.createMessage(true);
|
||||
|
||||
if (largeMessage) {
|
||||
message.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10 * 1024));
|
||||
}
|
||||
|
||||
message.putIntProperty(propKey, i);
|
||||
|
||||
message.getBodyBuffer().writeBytes(bytes);
|
||||
|
||||
producer0.send(message);
|
||||
}
|
||||
|
||||
assertTrue("where is the countDown?", myInterceptor.latch.await(30, TimeUnit.SECONDS));
|
||||
myInterceptor.ignoreSends = false;
|
||||
server1.getRemotingService().removeIncomingInterceptor(myInterceptor);
|
||||
IntegrationTestLogger.LOGGER.info("No longer ignoring packets.");
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ClientMessage message = consumer1.receive(30000);
|
||||
|
||||
Assert.assertNotNull(message);
|
||||
|
||||
Assert.assertEquals(i, message.getObjectProperty(propKey));
|
||||
|
||||
if (largeMessage) {
|
||||
readLargeMessages(message, 10);
|
||||
}
|
||||
|
||||
message.acknowledge();
|
||||
}
|
||||
|
||||
Assert.assertNull(consumer1.receiveImmediate());
|
||||
|
||||
session0.close();
|
||||
|
||||
session1.close();
|
||||
|
||||
sf0.close();
|
||||
|
||||
sf1.close();
|
||||
closeFields();
|
||||
assertEquals("there should be no queues", 0, loadQueues(server0).size());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param server1Params
|
||||
*/
|
||||
@ -1216,7 +1056,6 @@ public class BridgeTest extends ActiveMQTestBase {
|
||||
final String propKey = "bridged";
|
||||
final String propValue = "true";
|
||||
|
||||
|
||||
TransformerConfiguration transformerConfiguration = new TransformerConfiguration(AddHeadersTransformer.class.getName());
|
||||
transformerConfiguration.getProperties().put(propKey, propValue);
|
||||
|
||||
@ -1278,7 +1117,6 @@ public class BridgeTest extends ActiveMQTestBase {
|
||||
|
||||
final int numMessages = 10;
|
||||
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ClientMessage message = session0.createMessage(true);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user