ARTEMIS-3652 - tighten up test assertions and cleanup, track errors via connection listener and consider missing send reply

This commit is contained in:
gtully 2022-01-25 14:33:22 +00:00 committed by Gary Tully
parent c05177d723
commit 90535a2401
1 changed files with 75 additions and 25 deletions

View File

@ -34,6 +34,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
@ -73,6 +74,7 @@ public class ElasticQueueTest extends ActiveMQTestBase {
static final SimpleString qNameSimple = SimpleString.toSimpleString(qName);
final int base_port = 61616;
final Stack<Worker> workers = new Stack<>();
final Stack<EmbeddedActiveMQ> nodes = new Stack<>();
private final String balancerConfigName = "role_name_sharder";
@ -86,6 +88,10 @@ public class ElasticQueueTest extends ActiveMQTestBase {
}
}
nodes.clear();
for (Worker worker : workers) {
worker.done.set(true);
}
workers.clear();
executorService.shutdownNow();
}
@ -98,8 +104,8 @@ public class ElasticQueueTest extends ActiveMQTestBase {
}
builder.append("amqp://localhost:").append(port_start++);
}
// fast reconnect, randomize to get to all brokers and timeout sends that block on no credit
builder.append(")?failover.randomize=true&failover.maxReconnectAttempts=1&jms.sendTimeout=" + 1000);
// fast reconnect, randomize to get to all brokers and timeout sends that block on no credit once connected
builder.append(")?failover.randomize=true&failover.maxReconnectAttempts=0&jms.sendTimeout=" + 1000);
return builder.toString();
}
@ -107,9 +113,11 @@ public class ElasticQueueTest extends ActiveMQTestBase {
static class ConnectionListener implements JmsConnectionListener {
AtomicInteger connectionCount;
final AtomicReference<JMSException> failureReason;
ConnectionListener(AtomicInteger connectionCount) {
ConnectionListener(AtomicInteger connectionCount, AtomicReference<JMSException> failureReason) {
this.connectionCount = connectionCount;
this.failureReason = failureReason;
}
@Override
@ -118,6 +126,11 @@ public class ElasticQueueTest extends ActiveMQTestBase {
@Override
public void onConnectionFailure(Throwable throwable) {
if (failureReason != null) {
JMSException wrapper = new JMSException("ConnectionFailureViaListener");
wrapper.setLinkedException(new RuntimeException(throwable));
failureReason.set(wrapper);
}
}
@Override
@ -146,12 +159,18 @@ public class ElasticQueueTest extends ActiveMQTestBase {
}
}
abstract class Worker implements Runnable {
final AtomicBoolean done = new AtomicBoolean();
Worker() {
workers.push(this);
}
}
// slow consumer
class EQConsumer implements Runnable {
class EQConsumer extends Worker {
final AtomicInteger consumedCount = new AtomicInteger();
final AtomicInteger connectionCount = new AtomicInteger();
final AtomicBoolean done = new AtomicBoolean();
final AtomicInteger delayMillis;
private final String url;
long lastConsumed = 0;
@ -171,18 +190,18 @@ public class ElasticQueueTest extends ActiveMQTestBase {
try {
while (!done.get()) {
JmsConnectionFactory factory = new JmsConnectionFactory("CONSUMER", "PASSWORD", url);
try (JmsConnection connection = (JmsConnection) factory.createConnection()) {
// track disconnects via faiover listener
connectionCount.incrementAndGet();
connection.addConnectionListener(new ConnectionListener(connectionCount));
final AtomicReference<JMSException> fatalError = new AtomicReference<>();
connection.addConnectionListener(new ConnectionListener(connectionCount, fatalError));
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(session.createQueue(qName));
while (!done.get()) {
while (!done.get() && fatalError.get() == null) {
Message receivedMessage = messageConsumer.receiveNoWait();
if (receivedMessage != null) {
consumedCount.incrementAndGet();
@ -205,11 +224,10 @@ public class ElasticQueueTest extends ActiveMQTestBase {
}
// regular producer
static class EQProducer implements Runnable {
class EQProducer extends Worker {
final AtomicInteger producedCount = new AtomicInteger();
final AtomicInteger connectionCount = new AtomicInteger();
final AtomicBoolean done = new AtomicBoolean();
private final String url;
EQProducer(String url) {
@ -227,7 +245,8 @@ public class ElasticQueueTest extends ActiveMQTestBase {
// track disconnects via faiover listener
connectionCount.incrementAndGet();
connection.addConnectionListener(new ConnectionListener(connectionCount));
final AtomicReference<JMSException> fatalError = new AtomicReference<>();
connection.addConnectionListener(new ConnectionListener(connectionCount, fatalError));
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -236,7 +255,7 @@ public class ElasticQueueTest extends ActiveMQTestBase {
BytesMessage message = session.createBytesMessage();
message.writeBytes(new byte[1024]);
while (!done.get()) {
while (!done.get() && fatalError.get() == null) {
connectedToUri = connection.getConnectedURI();
message.setLongProperty("PID", producedCount.get() + 1);
messageProducer.send(message);
@ -254,17 +273,17 @@ public class ElasticQueueTest extends ActiveMQTestBase {
}
// combined producer/ async consumer
static class EQProducerAsyncConsumer implements Runnable {
class EQProducerAsyncConsumer extends Worker {
final AtomicInteger producedCount = new AtomicInteger();
final AtomicInteger connectionCount = new AtomicInteger();
final AtomicBoolean done = new AtomicBoolean();
final AtomicBoolean producerDone = new AtomicBoolean();
final AtomicInteger consumerSleepMillis = new AtomicInteger(1000);
private final String url;
final AtomicInteger consumedCount = new AtomicInteger();
private final String user;
private long lastConsumed;
private AtomicReference<JmsConnection> currentConnection = new AtomicReference<>();
EQProducerAsyncConsumer(String url, String user) {
this.url = url;
@ -277,11 +296,13 @@ public class ElasticQueueTest extends ActiveMQTestBase {
while (!done.get()) {
JmsConnectionFactory factory = new JmsConnectionFactory(user, "PASSWORD", url);
final AtomicReference<JMSException> fatalError = new AtomicReference<>();
try (JmsConnection connection = (JmsConnection) factory.createConnection()) {
currentConnection.set(connection);
// track disconnects via faiover listener
connectionCount.incrementAndGet();
connection.addConnectionListener(new ConnectionListener(connectionCount));
connection.addConnectionListener(new ConnectionListener(connectionCount, fatalError));
connection.start();
Session clientSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@ -295,7 +316,11 @@ public class ElasticQueueTest extends ActiveMQTestBase {
TimeUnit.MILLISECONDS.sleep(consumerSleepMillis.get());
}
message.acknowledge();
} catch (JMSException | InterruptedException ignored) {
} catch (JMSException | InterruptedException treatAsFatal) {
JMSException errorWrapper = new JMSException("ERROR from onMessage");
errorWrapper.setLinkedException(treatAsFatal);
fatalError.set(errorWrapper);
System.out.println("OnMessage Got: " + treatAsFatal + ", lastConsumed:" + lastConsumed + ", connectionCount:" + connectionCount.get());
}
});
@ -304,16 +329,20 @@ public class ElasticQueueTest extends ActiveMQTestBase {
BytesMessage message = session.createBytesMessage();
message.writeBytes(new byte[1024]);
while (!done.get()) {
if (fatalError.get() != null) {
throw fatalError.get();
}
if (!producerDone.get()) {
message.setLongProperty("PID", producedCount.get() + 1);
messageProducer.send(message);
producedCount.incrementAndGet();
} else {
// just hang about and let the consumer listener work
TimeUnit.SECONDS.sleep(5);
TimeUnit.SECONDS.sleep(1);
}
}
} catch (JMSException | InterruptedException ignored) {
System.out.println("Exception: " + ignored.toString() + ", PC=" + producedCount.get());
}
}
}
@ -325,6 +354,16 @@ public class ElasticQueueTest extends ActiveMQTestBase {
public long getLastConsumed() {
return lastConsumed;
}
@Override
public String toString() {
JmsConnection connectedTo = currentConnection.get();
if (connectedTo != null) {
return "EQProducerAsyncConsumer on:" + connectedTo.getConnectedURI();
} else {
return super.toString();
}
}
}
MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
@ -491,8 +530,7 @@ public class ElasticQueueTest extends ActiveMQTestBase {
}, 10000, 200));
assertTrue("Got all produced", Wait.waitFor(() -> {
System.out.println("consumed pid: " + eqConsumer.getLastConsumed() + ", produced: " + eqProducer.getLastProduced());
return (eqProducer.getLastProduced() == eqConsumer.getLastConsumed());
return pidInRange("head&tail", eqProducer.getLastProduced(), eqConsumer.getLastConsumed());
}, 4000, 100));
eqConsumer.done.set(true);
@ -584,13 +622,12 @@ public class ElasticQueueTest extends ActiveMQTestBase {
// head should drain
assertTrue(Wait.waitFor(() -> {
int usage = addressControl1.getAddressLimitPercent();
System.out.println("Node1 usage % " + usage);
System.out.println("Node1 usage % " + usage + ", eqProducerConsumer: " + eqProducerConsumer);
return usage == 0;
},10000, 200));
},10000, 500));
assertTrue(Wait.waitFor(() -> {
System.out.println("current head&tail lastProduced: " + eqProducerConsumer.getLastProduced() + ", consumed: " + eqProducerConsumer.getLastConsumed());
return eqProducerConsumer.getLastProduced() == eqProducerConsumer.getLastConsumed();
return pidInRange("head&tail", eqProducerConsumer.getLastProduced(), eqProducerConsumer.getLastConsumed());
},5000, 100));
eqProducerConsumer.done.set(true);
@ -689,12 +726,25 @@ public class ElasticQueueTest extends ActiveMQTestBase {
}, 20000, 200));
assertTrue(Wait.waitFor(() -> {
System.out.println("current head&tail lastProduced: " + eqProducerConsumer.getLastProduced() + ", consumed: " + eqProducerConsumer.getLastConsumed());
return eqProducerConsumer.getLastProduced() == eqProducerConsumer.getLastConsumed();
return pidInRange("head&tail", eqProducerConsumer.getLastProduced(), eqProducerConsumer.getLastConsumed());
}, 20000, 200));
eqProducerConsumer.done.set(true);
nodes.get(1).stop();
}
private boolean pidInRange(String s, long lastProduced, long lastConsumed) {
System.out.println(String.format("pidInRange - %s, lastProduced: %d, lastConsumed: %d", s, lastProduced, lastConsumed));
if (lastConsumed == lastProduced) {
return true;
}
// in case of a send timeout or just a missed reply to send (the send disposition is lost), produced pid does not get
// incremented to reflect the send completion on the broker. An off by one error is expected sometimes.
// the connection can be whacked by the broker in the case of a slow consumer
if (lastConsumed == lastProduced + 1) {
return true;
}
return false;
}
}