ARTEMIS-3293 System test on Paging and DLQ processing

This commit is contained in:
Clebert Suconic 2021-05-07 11:56:22 -04:00 committed by clebertsuconic
parent 924a7fe7b6
commit a30b3a81b9
5 changed files with 730 additions and 1 deletions

View File

@ -803,7 +803,29 @@
<configuration>${basedir}/target/classes/servers/infinite-redelivery</configuration>
</configuration>
</execution>
<execution>
<phase>test-compile</phase>
<id>create-mmfactory</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<!-- this makes it easier in certain envs -->
<configuration>${basedir}/target/classes/servers/mmfactory</configuration>
<allowAnonymous>true</allowAnonymous>
<user>admin</user>
<password>admin</password>
<instance>${basedir}/target/mmfactory</instance>
<noWeb>false</noWeb>
<args>
<arg>--java-options</arg>
<arg>-Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote=true
-Dcom.sun.management.jmxremote.port=11099 -Dcom.sun.management.jmxremote.rmi.port=11098
-Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false
</arg>
</args>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>

View File

@ -0,0 +1,244 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<persistence-enabled>true</persistence-enabled>
<!-- this could be ASYNCIO, MAPPED, NIO
ASYNCIO: Linux Libaio
MAPPED: mmap files
NIO: Plain Java Files
-->
<journal-type>NIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
<journal-file-size>10M</journal-file-size>
<!--
This value was determined through a calculation.
Your system could perform 0.14 writes per millisecond
on the current journal configuration.
That translates as a sync write every 7056000 nanoseconds.
Note: If you specify 0 the system will perform writes directly to the disk.
We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false.
-->
<journal-buffer-timeout>10000</journal-buffer-timeout>
<journal-sync-non-transactional>true</journal-sync-non-transactional>
<!--
When using ASYNCIO, this will determine the writing queue depth for libaio.
-->
<journal-max-io>1</journal-max-io>
<!--
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
<network-check-NIC>theNicName</network-check-NIC>
-->
<!--
Use this to use an HTTP server to validate the network
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
<!-- <network-check-period>10000</network-check-period> -->
<!-- <network-check-timeout>1000</network-check-timeout> -->
<!-- this is a comma separated list, no spaces, just DNS or IPs
it should accept IPV6
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
<!-- <network-check-list>10.0.0.1</network-check-list> -->
<!-- use this to customize the ping used for ipv4 addresses -->
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
<!-- use this to customize the ping used for ipv6 addresses -->
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
<amqp-use-core-subscription-naming>true</amqp-use-core-subscription-naming>
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<page-sync-timeout>70560</page-sync-timeout>
<!-- the system will enter into page mode once you hit this limit.
This is an estimate in bytes of how much the messages are using in memory
The system will use half of the available memory (-Xmx) by default for the global-max-size.
You may specify a different value here if you need to customize it to your needs.
<global-max-size>10Mb</global-max-size>
-->
<acceptors>
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
<!-- amqpCredits: The number of credits sent to AMQP producers -->
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
<!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false
as duplicate detection requires applicationProperties to be parsed on the server. -->
<!-- amqpMinLargeMessageSize: Determines how many bytes are considered large, so we start using files to hold their data.
default: 102400, -1 would mean to disable large mesasge control -->
<!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
"anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
<acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createAddress" roles="guest"/>
<permission type="deleteAddress" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="browse" roles="guest"/>
<permission type="send" roles="guest"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="guest"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-delivery-attempts>2</max-delivery-attempts>
<max-size-bytes>524680</max-size-bytes>
<page-size-bytes>124680</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</anycast>
</address>
<address name="MMControl">
<multicast>
</multicast>
</address>
<address name="MMFactory">
<multicast>
<queue name="MMConsumer" group-buckets="64" group-rebalance="true">
<filter string="(color='red' OR color='blue')" />
<durable>true</durable>
</queue>
</multicast>
</address>
</addresses>
</core>
</configuration>

View File

@ -17,6 +17,9 @@
package org.apache.activemq.artemis.tests.smoke.common;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
@ -86,4 +89,24 @@ public class SmokeTestBase extends ActiveMQTestBase {
return process;
}
protected static JMXConnector getJmxConnector(String hostname, int port) throws Exception {
// Without this, the RMI server would bind to the default interface IP (the user's local IP mostly)
System.setProperty("java.rmi.server.hostname", hostname);
// I don't specify both ports here manually on purpose. See actual RMI registry connection port extraction below.
String urlString = "service:jmx:rmi:///jndi/rmi://" + hostname + ":" + port + "/jmxrmi";
JMXServiceURL url = new JMXServiceURL(urlString);
JMXConnector jmxConnector = null;
try {
jmxConnector = JMXConnectorFactory.connect(url);
System.out.println("Successfully connected to: " + urlString);
} catch (Exception e) {
jmxConnector = null;
e.printStackTrace();
Assert.fail(e.getMessage());
}
return jmxConnector;
}
}

View File

@ -110,4 +110,5 @@ public class JmxServerControlTest extends SmokeTestBase {
jmxConnector.close();
}
}
}

View File

@ -0,0 +1,439 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.smoke.mmfactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.jboss.logging.Logger;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class MMSFactoryTest extends SmokeTestBase {
private static final Logger logger = Logger.getLogger(MMSFactoryTest.class);
public static final String SERVER_NAME_0 = "mmfactory";
private static final String JMX_SERVER_HOSTNAME = "localhost";
private static final int JMX_SERVER_PORT = 11099;
final String theprotocol;
final int BATCH_SIZE;
// how many times the server will be restarted
final int restarts;
// how many times the clients will run per restart
final int clientRuns;
// As the produces sends messages, a client will be killed every X messages. This is it!
final int killClientEveryX;
@Parameterized.Parameters(name = "protocol={0}, batchSize={1}, restarts={2}, clientRuns={3}, killEveryX={4}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][]{{"CORE", 2000, 2, 2, 500}, {"AMQP", 2000, 2, 2, 500}});
}
public MMSFactoryTest(String protocol, int batchSize, int restarts, int clientRuns, int killClientEveryX) {
this.theprotocol = protocol;
this.BATCH_SIZE = batchSize;
this.restarts = restarts;
this.clientRuns = clientRuns;
this.killClientEveryX = killClientEveryX;
}
public static void main(String[] arg) {
try {
Consumer consumer = new Consumer(arg[0], Integer.parseInt(arg[1]), arg[2], Integer.parseInt(arg[3]), arg[4], Integer.parseInt(arg[5]));
//consumer.run();
consumer.runListener();
while (true) {
Thread.sleep(10_000);
}
} catch (Throwable e) {
System.exit(1);
}
}
public static String getConsumerLog(int id) {
return getServerLocation(SERVER_NAME_0) + "/data/" + "Consumer" + id + ".log";
}
public static ConnectionFactory createConnectionFactory(String protocol, String uri) {
if (protocol.toUpperCase().equals("OPENWIRE")) {
return new org.apache.activemq.ActiveMQConnectionFactory(uri);
} else if (protocol.toUpperCase().equals("AMQP")) {
if (uri.startsWith("tcp://")) {
// replacing tcp:// by amqp://
uri = "amqp" + uri.substring(3);
}
return new JmsConnectionFactory(uri);
} else if (protocol.toUpperCase().equals("CORE") || protocol.toUpperCase().equals("ARTEMIS")) {
return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(uri);
} else {
throw new IllegalStateException("Unkown:" + protocol);
}
}
Process startConsumerProcess(String protocol,
int slowTime,
String queueName,
int credits,
int consumerID) throws Exception {
return SpawnedVMSupport.spawnVM(MMSFactoryTest.class.getName(), protocol, "" + slowTime, queueName, "" + credits, getConsumerLog(consumerID), "" + consumerID);
}
Process serverProcess;
@Before
public void before() throws Exception {
cleanupData(SERVER_NAME_0);
disableCheckThread();
serverProcess = startServer(SERVER_NAME_0, 0, 30000);
}
@Test
public void testMMSorting() throws Exception {
for (int i = 0; i < restarts; i++) {
logger.debug("*******************************************************************************************************************************");
logger.debug("Starting " + clientRuns);
logger.debug("*******************************************************************************************************************************");
testMMSorting(clientRuns * i, clientRuns * (i + 1));
stopServerWithFile(getServerLocation(SERVER_NAME_0));
Thread.sleep(1000);
try {
serverProcess.destroyForcibly();
} catch (Throwable ignored) {
}
serverProcess = startServer(SERVER_NAME_0, 0, 30000);
}
}
public void testMMSorting(int countStart, int countEnd) throws Exception {
JMXConnector jmxConnector = getJmxConnector(JMX_SERVER_HOSTNAME, JMX_SERVER_PORT);
MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
String brokerName = "0.0.0.0"; // configured e.g. in broker.xml <broker-name> element
ObjectNameBuilder objectNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), brokerName, true);
//ActiveMQServerControl activeMQServerControl = MBeanServerInvocationHandler.newProxyInstance(mBeanServerConnection, objectNameBuilder.getActiveMQServerObjectName(), ActiveMQServerControl.class, false);
ObjectName queueObjectName = objectNameBuilder.getQueueObjectName(SimpleString.toSimpleString("MMFactory"), SimpleString.toSimpleString("MMConsumer"), RoutingType.MULTICAST);
QueueControl queueControl = MBeanServerInvocationHandler.newProxyInstance(mBeanServerConnection, queueObjectName, QueueControl.class, false);
final int NUMBER_OF_CONSUMERS = 6;
final int NUMBER_OF_FASTCONSUMERS = 0; // not using at the moment
Process[] consumers = new Process[NUMBER_OF_CONSUMERS + NUMBER_OF_FASTCONSUMERS];
int[] timeForConsumers = new int[NUMBER_OF_CONSUMERS + NUMBER_OF_FASTCONSUMERS];
for (int i = 0; i < NUMBER_OF_CONSUMERS; i++) {
timeForConsumers[i] = (i % 2 == 0 ? 200 : 300);
}
timeForConsumers[1] = 100;
timeForConsumers[5] = 500;
for (int i = NUMBER_OF_CONSUMERS; i < NUMBER_OF_CONSUMERS + NUMBER_OF_FASTCONSUMERS; i++) {
timeForConsumers[i] = 0;
}
for (int i = 0; i < consumers.length; i++) {
consumers[i] = startConsumerProcess(theprotocol, timeForConsumers[i], "MMFactory::MMConsumer", 100, i);
}
Process dlqProcess = startConsumerProcess(theprotocol, 0, "DLQ", 100, 1000);
AtomicInteger retryNumber = new AtomicInteger(0);
int expectedTotalSize = 0;
ConnectionFactory factory = createConnectionFactory(theprotocol, "tcp://localhost:61616");
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic queue = session.createTopic("MMFactory");
MessageProducer mmsFactory = session.createProducer(queue);
Topic controlTopic = session.createTopic("MMControl");
String largeString;
{
StringBuffer largeStringBuffer = new StringBuffer();
while (largeStringBuffer.length() < 10) {
largeStringBuffer.append(RandomUtil.randomString());
}
largeString = largeStringBuffer.toString();
}
try {
for (int run = countStart; run <= countEnd; run++) {
AtomicInteger lastTime = new AtomicInteger((int)queueControl.getMessagesAcknowledged());
for (int i = 0; i < BATCH_SIZE; i++) {
if (i > 0 && i % killClientEveryX == 0) {
System.out.println("REconnecting...");
logger.debug("Reconnecting...");
consumers[0].destroyForcibly();
consumers[0] = startConsumerProcess(theprotocol, timeForConsumers[0], "MMFactory::MMConsumer", 100, 0);
consumers[1].destroyForcibly();
consumers[1] = startConsumerProcess(theprotocol, timeForConsumers[1], "MMFactory::MMConsumer", 100, 1);
logger.debug("...Reconnected");
logger.debug("retry=" + retryNumber + ",sent=" + i + ", acked on this batch = " + (queueControl.getMessagesAcknowledged() - (retryNumber.get() * BATCH_SIZE * 2)) + ", total acked = " + queueControl.getMessagesAcknowledged());
}
TextMessage message = session.createTextMessage("This is blue " + largeString);
message.setStringProperty("color", "blue");
message.setIntProperty("i", i);
mmsFactory.send(message);
message = session.createTextMessage("This is red " + largeString);
message.setStringProperty("color", "red");
message.setIntProperty("i", i);
mmsFactory.send(message);
if (i % 10 == 0) {
logger.debug("Sending " + i + " run = " + run);
}
if (i == 0) {
waitForAckChange(queueControl, lastTime);
}
}
Session sessionControl = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producerControl = sessionControl.createProducer(controlTopic);
Message controlmessage = sessionControl.createMessage();
controlmessage.setStringProperty("control", "flush");
producerControl.send(controlmessage);
sessionControl.commit();
sessionControl.close();
Wait.assertTrue(() -> {
// We will wait a bit here until it's a least a bit closer to the whole Batch
if ((queueControl.getMessagesAcknowledged() + queueControl.getMessagesKilled() + queueControl.getMessagesExpired()) - (retryNumber.get() * BATCH_SIZE * 2) > (BATCH_SIZE * 2 - 500)) {
return true;
} else {
logger.debug("Received " + queueControl.getMessagesAcknowledged());
return false;
}
}, 45_000, 1_000);
expectedTotalSize += BATCH_SIZE * 2;
Wait.assertEquals(expectedTotalSize, queueControl::getMessagesAdded);
Wait.assertEquals(expectedTotalSize, () -> queueControl.getMessagesAcknowledged() + queueControl.getMessagesKilled());
retryNumber.incrementAndGet();
for (Process c : consumers) {
c.destroyForcibly();
}
for (int i = 0; i < consumers.length; i++) {
File file = new File(getConsumerLog(i));
if (!file.delete()) {
logger.debug("not possible to remove " + file);
}
}
for (int r = 0; r < consumers.length; r++) {
consumers[r] = startConsumerProcess(theprotocol, timeForConsumers[r], "MMFactory::MMConsumer", 100, r);
}
}
Thread.sleep(1000);
} finally {
for (Process c : consumers) {
c.destroyForcibly();
}
dlqProcess.destroyForcibly();
for (int i = 0; i < consumers.length; i++) {
File file = new File(getConsumerLog(i));
if (!file.delete()) {
logger.warn("not possible to remove " + file);
}
}
File file = new File(getConsumerLog(1000)); //the DLQ processing ID used
if (!file.delete()) {
logger.warn("not possible to remove " + file);
}
}
}
}
private void waitForAckChange(QueueControl queueControl, AtomicInteger lastTime) throws Exception {
Wait.waitFor(() -> {
if (lastTime.get() == queueControl.getMessagesAcknowledged()) {
logger.debug("Waiting some change on " + queueControl.getMessagesAcknowledged() + " with messages Added = " + queueControl.getMessagesAdded() + " and killed = " + queueControl.getMessagesKilled());
return false;
} else {
logger.debug("Condition met! with " + queueControl.getMessagesAcknowledged() + " with messages Added = " + queueControl.getMessagesAdded() + " and killed = " + queueControl.getMessagesKilled());
lastTime.set((int)queueControl.getMessagesAcknowledged());
return true;
}
}, 5_000);
}
static class Consumer implements MessageListener {
boolean clientAck = false;
volatile int slowTime;
final String queuename;
final int credits;
final String protocol;
final int id;
ConnectionFactory factory;
Connection connection;
Session session;
MessageConsumer consumer;
MessageConsumer controlConsumer;
Queue queue;
Session sessionControl;
PrintStream fileStream;
Consumer(String protocol, int slowTime, String queueName, int credits, String logOutput, int id) throws Exception {
this.slowTime = slowTime;
this.queuename = queueName;
this.credits = credits;
this.protocol = protocol;
fileStream = new PrintStream(new FileOutputStream(logOutput, true), true);
this.id = id;
}
@Override
public void onMessage(Message message) {
try {
String color = message.getStringProperty("color");
int messageSequence = message.getIntProperty("i");
if (queuename.equals("DLQ")) {
logger.debug("Processing DLQ on color=" + color + ", sequence=" + messageSequence);
} else if (slowTime > 0) {
Thread.sleep(slowTime);
}
if (clientAck) {
message.acknowledge();
}
fileStream.println(color + ";" + messageSequence);
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
}
class ControlListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
// This is received at the client, on a remote machine, System.out is the best option to log here
System.out.println("Received control message");
if (message.getStringProperty("control").equals("flush")) {
Consumer.this.slowTime = 0;
System.out.println("Setting slow time to 0");
}
sessionControl.commit();
} catch (Throwable e) {
e.printStackTrace();
}
}
}
public void runListener() {
//factory = createConnectionFactory(protocol, "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=" + credits);
factory = createConnectionFactory(protocol, "tcp://localhost:61616");
System.out.println("Starting");
connect();
try {
consumer.setMessageListener(Consumer.this);
} catch (Exception e) {
e.printStackTrace();
System.exit(-1);
}
}
private void connect() {
try {
if (connection != null) {
connection.close();
}
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, clientAck ? Session.CLIENT_ACKNOWLEDGE : Session.AUTO_ACKNOWLEDGE);
queue = session.createQueue(queuename);
consumer = session.createConsumer(queue);
sessionControl = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = sessionControl.createTopic("MMControl");
controlConsumer = sessionControl.createSharedDurableConsumer(topic, "consumer" + id);
controlConsumer.setMessageListener(new ControlListener());
} catch (Exception e) {
e.printStackTrace();
System.exit(-1);
}
}
}
}