This commit is contained in:
Martyn Taylor 2017-12-15 14:36:55 +00:00
commit 3815d2c37c
7 changed files with 518 additions and 0 deletions

View File

@ -88,6 +88,7 @@ under the License.
<module>security</module>
<module>security-ldap</module>
<module>send-acknowledgements</module>
<module>slow-consumer</module>
<module>spring-integration</module>
<module>ssl-enabled</module>
<module>ssl-enabled-dual-authentication</module>

View File

@ -0,0 +1,117 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq.examples.broker</groupId>
<artifactId>jms-examples</artifactId>
<version>2.5.0-SNAPSHOT</version>
</parent>
<artifactId>slow-consumer</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ Artemis JMS Slow Consumer Example</name>
<properties>
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client-all</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-maven-plugin</artifactId>
<executions>
<execution>
<id>create</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
</configuration>
</execution>
<execution>
<id>start</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<spawn>true</spawn>
<ignore>${noServer}</ignore>
<testURI>tcp://localhost:61616</testURI>
<args>
<param>run</param>
</args>
</configuration>
</execution>
<execution>
<id>runClientKill</id>
<goals>
<goal>runClient</goal>
</goals>
<configuration>
<clientClass>org.apache.activemq.artemis.jms.example.KillSlowConsumerExample</clientClass>
</configuration>
</execution>
<execution>
<id>runClientNotify</id>
<goals>
<goal>runClient</goal>
</goals>
<configuration>
<clientClass>org.apache.activemq.artemis.jms.example.NotifySlowConsumerExample</clientClass>
</configuration>
</execution>
<execution>
<id>stop</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<args>
<param>stop</param>
</args>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.activemq.examples.broker</groupId>
<artifactId>slow-consumer</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,50 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<html>
<head>
<title>ActiveMQ Artemis JMS Slow Consumer Example</title>
<link rel="stylesheet" type="text/css" href="../../../common/common.css" />
<link rel="stylesheet" type="text/css" href="../../../common/prettify.css" />
<script type="text/javascript" src="../../../common/prettify.js"></script>
</head>
<body onload="prettyPrint()">
<h1>JMS Slow Consumer Example</h1>
<pre>To run the example, simply type <b>mvn verify</b> from this directory, <br>or <b>mvn -PnoServer verify</b> if you want to start and create the server manually.</pre>
<p>This example demonstrates Slow Consumer policy KILL and NOTIFY and the associated address-settings.</p>
<p>How often the broker checks for Slow Consumers is configurable by <b>slow-consumer-check-period</b> in the brokers address-settings. The default value is for
<b>slow-consumer-check-period</b> is 5 seconds. A broker considers a consumer slow if the <b>slow-consumer-threshold</b> is not been met. The
<b>slow-consumer-threshold</b> is the number of messages consumed by the consumer within a second. When a slow consumer is detected, the broker action depends
on which <b>slow-consumer-policy</b> is configured.</p>
<p>The <b>slow-consumer-policy</b> <b>KILL</b> will kill the consumers connection</p>
<p>The <b>slow-consumer-policy</b> <b>NOTIFY</b> will send a CONSUMER_SLOW management notification that an application can receive</p>
<p> There are 2 example clients:<p>
<p><b>KillSlowConsumerExample</b> sends messages to a queue "slow.consumer.kill". It then starts a consumer BUT does not consume any messages. It waits for 8 seconds and tries
to consume a message. It expects to receive an exception as the connection should already be closed.</p>
<p><b>NotifySlowConsumerExample</b> sends messages to a queue "slow.consumer.notify". It creates a consumer on the topic "notify.topic" that has been configured as the broker's
<b>management-notification-address</b>. It then starts a consumer on "slow.consumer.notify" BUT does not consume any messages. The consumer on "notify.topic" will receive a CONSUMER_SLOW management notification
and then stop the client.</p>
</body>
</html>

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.example;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException;
/**
* A simple JMS example that demonstrates the KILL slow consumer policy:
*
* - sends messages to a queue "slow.consumer.kill".
* - starts a consumer BUT does not consume any messages.
* - waits for 8 seconds and tries to consume a message.
* - receive an exception as the connection should already be closed.
*
*/
public class KillSlowConsumerExample {
public static final int WAIT_TIME = 7;
public static void main(final String[] args) throws Exception {
// Step 1. Create an initial context to perform the JNDI lookup.
InitialContext initialContext = new InitialContext();
// Step 2. Perform a lookup on the queue
Queue slowConsumerKillQueue = (Queue) initialContext.lookup("queue/slow.consumer.kill");
// Step 3. Perform a lookup on the Connection Factory
ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup("ConnectionFactory");
// Step 4.Create a JMS Connection
try (Connection connection = connectionFactory.createConnection()) {
// Step 5. Create a JMS Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Step 6. Create a JMS Message Producer
MessageProducer producer = session.createProducer(slowConsumerKillQueue);
// Step 7. Create a Text Message
TextMessage message = session.createTextMessage("This is a text message");
System.out.println("Sending messages to queue ... ");
// Step 8. Send messages to the queue
for (int i = 0; i < 50; i++) {
producer.send(message);
}
// Step 9. Create a JMS Message Consumer
MessageConsumer messageConsumer = session.createConsumer(slowConsumerKillQueue);
// Step 10. Start the Connection
connection.start();
System.out.println("About to wait for " + WAIT_TIME + " seconds");
// Step 11. Wait for slow consumer to be detected
Thread.sleep(TimeUnit.SECONDS.toMillis(WAIT_TIME));
try {
//step 12. Try to us the connection - expect it to be closed already
messageConsumer.receive(TimeUnit.SECONDS.toMillis(1));
//messageConsumer.receive() should throw exception - we should not get to here.
throw new RuntimeException("SlowConsumerExample.slowConsumerKill() FAILED - expected " +
"connection to be shutdown by Slow Consumer policy");
} catch (JMSException ex) {
if (ex.getCause() instanceof ActiveMQObjectClosedException) {
//received exception - as expected
System.out.println("SUCCESS! Received EXPECTED exception: " + ex );
} else {
throw new RuntimeException("SlowConsumerExample.slowConsumerKill() FAILED - expected " +
"ActiveMQObjectClosedException BUT got " + ex.getCause());
}
}
}
}
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.example;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.InitialContext;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
/**
* A simple JMS example that demonstrates the NOTIFY slow consumer policy
*
* - send messages to a queue "slow.consumer.notify".
* - create a consumer on the topic "notify.topic" that has been configured as the broker's <b>management-notification-address</b>.
* - start a consumer on "slow.consumer.notify" BUT does not consume any messages.
* - consumer on "notify.topic" will receive a CONSUMER_SLOW management notification and signal to main thread.
*/
public class NotifySlowConsumerExample {
public static final int WAIT_TIME = 10;
public static void main(final String[] args) throws Exception {
// Step 1. Create an initial context to perform the JNDI lookup
InitialContext initialContext = new InitialContext();
// Step 2. Perform a lookup on the queue and topic
Queue slowConsumerNotifyQueue = (Queue) initialContext.lookup("queue/slow.consumer.notify");
Topic notificationTopic = (Topic) initialContext.lookup("topic/notify.topic");
// Step 3. Perform a lookup on the Connection Factory
ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup("ConnectionFactory");
// Step 4.Create a JMS Connection
try (Connection connection = connectionFactory.createConnection()) {
// Step 5. Create a JMS Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Step 6. Create a JMS Message Producer
MessageProducer producer = session.createProducer(slowConsumerNotifyQueue);
CountDownLatch waitForSlowConsumerNotif = new CountDownLatch(1);
// Step 7. Start the Connection
connection.start();
// Step 8. create a consumer on the broker's management-notification-address
Session notifSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer notificationConsumer = notifSession.createConsumer(notificationTopic,
ManagementHelper.HDR_NOTIFICATION_TYPE + " = '" +
CoreNotificationType.CONSUMER_SLOW + "'");
// Step 9. add a message listener to consumer listening to broker's management-notification-address,
// when it receives notification it signals main thread.
notificationConsumer.setMessageListener(message -> {
System.out.println("SUCCESS! Received CONSUMER_SLOW notification as expected: " + message);
//signal CONSUMER_SLOW notification received.
waitForSlowConsumerNotif.countDown();
});
// Step 10. Create a Text Message
TextMessage message = session.createTextMessage("This is a text message");
// Step 11. Send the Message
System.out.println("Sending messages to queue ... ");
for (int i = 0; i < 50; i++) {
producer.send(message);
}
// Step 12. Create a JMS Message Consumer
MessageConsumer messageConsumer = session.createConsumer(slowConsumerNotifyQueue);
System.out.println("About to wait for CONSUMER_SLOW notification, will timeout after " + WAIT_TIME + " seconds ...");
// Step 13. wait for CONSUMER_SLOW notification
boolean isNotified = waitForSlowConsumerNotif.await(WAIT_TIME, TimeUnit.SECONDS);
// Step 14. ensure CONSUMER_SLOW notification was received and "waitForSlowConsumerNotif" did not timeout
if (!isNotified) {
throw new RuntimeException("SlowConsumerExample.demoSlowConsumerNotify() FAILED; timeout occurred before" +
" - slow consumer notification was received. ");
}
}
}
}

View File

@ -0,0 +1,103 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
--><configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
<core xmlns="urn:activemq:core">
<bindings-directory>./data/messaging/bindings</bindings-directory>
<journal-directory>./data/messaging/journal</journal-directory>
<large-messages-directory>./data/messaging/largemessages</large-messages-directory>
<paging-directory>./data/messaging/paging</paging-directory>
<management-notification-address>notify.topic</management-notification-address>
<!-- Acceptors -->
<acceptors>
<acceptor name="netty-acceptor">tcp://localhost:61616</acceptor>
</acceptors>
<!-- Other config -->
<security-settings>
<!--security for example queue-->
<security-setting match="slow.consumer.kill">
<permission roles="guest" type="createDurableQueue"/>
<permission roles="guest" type="deleteDurableQueue"/>
<permission roles="guest" type="createNonDurableQueue"/>
<permission roles="guest" type="deleteNonDurableQueue"/>
<permission roles="guest" type="consume"/>
<permission roles="guest" type="send"/>
</security-setting>
<security-setting match="slow.consumer.notify">
<permission roles="guest" type="createDurableQueue"/>
<permission roles="guest" type="deleteDurableQueue"/>
<permission roles="guest" type="createNonDurableQueue"/>
<permission roles="guest" type="deleteNonDurableQueue"/>
<permission roles="guest" type="consume"/>
<permission roles="guest" type="send"/>
</security-setting>
<security-setting match="notify.topic">
<permission roles="guest" type="createDurableQueue"/>
<permission roles="guest" type="deleteDurableQueue"/>
<permission roles="guest" type="createNonDurableQueue"/>
<permission roles="guest" type="deleteNonDurableQueue"/>
<permission roles="guest" type="consume"/>
<permission roles="guest" type="send"/>
</security-setting>
</security-settings>
<address-settings>
<address-setting match="slow.consumer.kill">
<slow-consumer-threshold>10</slow-consumer-threshold>
<slow-consumer-policy>KILL</slow-consumer-policy>
<slow-consumer-check-period>5</slow-consumer-check-period>
</address-setting>
<address-setting match="slow.consumer.notify">
<slow-consumer-threshold>10</slow-consumer-threshold>
<slow-consumer-policy>NOTIFY</slow-consumer-policy>
<slow-consumer-check-period>5</slow-consumer-check-period>
</address-setting>
</address-settings>
<addresses>
<address name="notify.topic">
<multicast/>
</address>
<address name="slow.consumer.kill">
<anycast>
<queue name="slow.consumer.kill"/>
</anycast>
</address>
<address name="slow.consumer.notify">
<anycast>
<queue name="slow.consumer.notify"/>
</anycast>
</address>
</addresses>
</core>
</configuration>

View File

@ -0,0 +1,24 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
connectionFactory.ConnectionFactory=tcp://localhost:61616
queue.queue/slow.consumer.notify=slow.consumer.notify
queue.queue/slow.consumer.kill=slow.consumer.kill
topic.topic/notify.topic=notify.topic