This closes #3281
This commit is contained in:
commit
0845ff2353
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.security;
|
||||
|
||||
import javax.security.auth.Subject;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
|
||||
|
@ -34,4 +35,6 @@ public interface SecurityStore {
|
|||
void setSecurityEnabled(boolean securityEnabled);
|
||||
|
||||
void stop();
|
||||
|
||||
Subject getSessionSubject(SecurityAuth session);
|
||||
}
|
||||
|
|
|
@ -349,6 +349,22 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
|
|||
return validatedUser;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the cached Subject. If the Subject is not in the cache then authenticate again to retrieve
|
||||
* it.
|
||||
*
|
||||
* @param session contains the authentication data
|
||||
* @return the authenticated Subject with all associated role principals or null if not
|
||||
* authenticated or JAAS is not supported by the SecurityManager.
|
||||
*/
|
||||
@Override
|
||||
public Subject getSessionSubject(SecurityAuth session) {
|
||||
if (securityManager instanceof ActiveMQSecurityManager5) {
|
||||
return getSubjectForAuthorization(session, (ActiveMQSecurityManager5) securityManager);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the cached Subject. If the Subject is not in the cache then authenticate again to retrieve it.
|
||||
*
|
||||
|
|
|
@ -289,6 +289,9 @@ public interface ActiveMQServer extends ServiceComponent {
|
|||
|
||||
void callBrokerMessagePlugins(ActiveMQPluginRunnable<ActiveMQServerMessagePlugin> pluginRun) throws ActiveMQException;
|
||||
|
||||
boolean callBrokerMessagePluginsCanAccept(ServerConsumer serverConsumer,
|
||||
MessageReference messageReference) throws ActiveMQException;
|
||||
|
||||
void callBrokerBridgePlugins(ActiveMQPluginRunnable<ActiveMQServerBridgePlugin> pluginRun) throws ActiveMQException;
|
||||
|
||||
void callBrokerCriticalPlugins(ActiveMQPluginRunnable<ActiveMQServerCriticalPlugin> pluginRun) throws ActiveMQException;
|
||||
|
|
|
@ -131,6 +131,7 @@ import org.apache.activemq.artemis.core.server.JournalType;
|
|||
import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||
import org.apache.activemq.artemis.core.server.LoggingConfigurationFileReloader;
|
||||
import org.apache.activemq.artemis.core.server.MemoryManager;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
|
||||
import org.apache.activemq.artemis.core.server.NodeManager;
|
||||
import org.apache.activemq.artemis.core.server.PostQueueCreationCallback;
|
||||
|
@ -139,6 +140,7 @@ import org.apache.activemq.artemis.core.server.Queue;
|
|||
import org.apache.activemq.artemis.core.server.QueueFactory;
|
||||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
|
||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||
import org.apache.activemq.artemis.core.server.ServiceComponent;
|
||||
import org.apache.activemq.artemis.core.server.ServiceRegistry;
|
||||
|
@ -2559,6 +2561,27 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
callBrokerPlugins(getBrokerMessagePlugins(), pluginRun);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean callBrokerMessagePluginsCanAccept(ServerConsumer serverConsumer, MessageReference messageReference) throws ActiveMQException {
|
||||
for (ActiveMQServerMessagePlugin plugin : getBrokerMessagePlugins()) {
|
||||
try {
|
||||
//if ANY plugin returned false the message will not be accepted for that consumer
|
||||
if (!plugin.canAccept(serverConsumer, messageReference)) {
|
||||
return false;
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
if (e instanceof ActiveMQException) {
|
||||
logger.debug("plugin " + plugin + " is throwing ActiveMQException");
|
||||
throw (ActiveMQException) e;
|
||||
} else {
|
||||
logger.warn("Internal error on plugin " + plugin, e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
//if ALL plugins have returned true consumer can accept message
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void callBrokerBridgePlugins(final ActiveMQPluginRunnable<ActiveMQServerBridgePlugin> pluginRun) throws ActiveMQException {
|
||||
callBrokerPlugins(getBrokerBridgePlugins(), pluginRun);
|
||||
|
|
|
@ -404,6 +404,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
|
||||
return HandleStatus.BUSY;
|
||||
}
|
||||
if (server.hasBrokerMessagePlugins() && !server.callBrokerMessagePluginsCanAccept(this, ref)) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Reference " + ref + " is not allowed to be consumed by " + this + " due to message plugin filter.");
|
||||
}
|
||||
return HandleStatus.NO_MATCH;
|
||||
}
|
||||
|
||||
synchronized (lock) {
|
||||
// If the consumer is stopped then we don't accept the message, it
|
||||
|
|
|
@ -158,6 +158,17 @@ public interface ActiveMQServerMessagePlugin extends ActiveMQServerBasePlugin {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a message is delivered to a client consumer
|
||||
*
|
||||
* @param consumer the consumer the message will be delivered to
|
||||
* @param reference message reference
|
||||
* @throws ActiveMQException
|
||||
*/
|
||||
default boolean canAccept(ServerConsumer consumer, MessageReference reference) throws ActiveMQException {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Before a message is delivered to a client consumer
|
||||
*
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.core.server.plugin.impl;
|
||||
|
||||
import javax.security.auth.Subject;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.core.security.SecurityStore;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ConsumerInfo;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
|
||||
import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
public class BrokerMessageAuthorizationPlugin implements ActiveMQServerPlugin {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(BrokerMessageAuthorizationPlugin.class);
|
||||
|
||||
private static final String ROLE_PROPERTY = "ROLE_PROPERTY";
|
||||
private final AtomicReference<ActiveMQServer> server = new AtomicReference<>();
|
||||
private String roleProperty = "requiredRole";
|
||||
|
||||
@Override
|
||||
public void init(Map<String, String> properties) {
|
||||
roleProperty = properties.getOrDefault(ROLE_PROPERTY, "requiredRole");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registered(ActiveMQServer server) {
|
||||
this.server.set(server);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregistered(ActiveMQServer server) {
|
||||
this.server.set(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canAccept(ServerConsumer consumer, MessageReference reference) throws ActiveMQException {
|
||||
|
||||
String requiredRole = reference.getMessage().getStringProperty(roleProperty);
|
||||
if (requiredRole == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
Subject subject = getSubject(consumer);
|
||||
if (subject == null) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Subject not found for consumer: " + consumer.getID());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
boolean permitted = new RolePrincipal(requiredRole).implies(subject);
|
||||
if (!permitted && logger.isDebugEnabled()) {
|
||||
logger.debug("Message consumer: " + consumer.getID() + " does not have required role `" + requiredRole + "` needed to receive message: " + reference.getMessageID());
|
||||
}
|
||||
return permitted;
|
||||
}
|
||||
|
||||
private Subject getSubject(ConsumerInfo consumer) {
|
||||
final ActiveMQServer activeMQServer = server.get();
|
||||
final SecurityStore securityStore = activeMQServer.getSecurityStore();
|
||||
ServerSession session = activeMQServer.getSessionByID(consumer.getSessionName());
|
||||
return securityStore.getSessionSubject(session);
|
||||
}
|
||||
|
||||
}
|
|
@ -126,3 +126,26 @@ In the example below both `SEND_CONNECTION_NOTIFICATIONS` and
|
|||
</broker-plugins>
|
||||
```
|
||||
|
||||
## Using the BrokerMessageAuthorizationPlugin
|
||||
|
||||
The `BrokerMessageAuthorizationPlugin` filters messages sent to consumers based on if they have a role that matches the value specified in a message property.
|
||||
|
||||
You can select which property will be used to specify the required role for consuming a message by setting the following configuration.
|
||||
|
||||
Property|Property Description|Default Value
|
||||
---|---|---
|
||||
`ROLE_PROPERTY`|Property name used to determine the role required to consume a message.|`requiredRole`.
|
||||
|
||||
|
||||
If the message does not have a property matching the configured `ROLE_PROPERTY` then the message will be sent to any consumer.
|
||||
|
||||
To configure the plugin, you can add the following configuration to the broker.
|
||||
In the example below `ROLE_PROPERTY` is set to `permissions` when that property is present messages will only be sent to consumers with a role matching its value.
|
||||
|
||||
```xml
|
||||
<broker-plugins>
|
||||
<broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.BrokerMessageAuthorizationPlugin">
|
||||
<property key="ROLE_PROPERTY" value="permissions" />
|
||||
</broker-plugin>
|
||||
</broker-plugins>
|
||||
```
|
||||
|
|
|
@ -0,0 +1,134 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.activemq.examples.broker</groupId>
|
||||
<artifactId>jms-examples</artifactId>
|
||||
<version>2.17.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>broker-msg-auth-plugin</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>ActiveMQ Artemis Broker Auth Plugin Example</name>
|
||||
|
||||
<properties>
|
||||
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-jms-client-all</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-server</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-amqp-protocol</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.qpid</groupId>
|
||||
<artifactId>qpid-jms-client</artifactId>
|
||||
<version>${qpid.jms.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-jms-client-all</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
<artifactId>artemis-maven-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>create</id>
|
||||
<phase>verify</phase>
|
||||
<configuration>
|
||||
<!-- The broker plugin will install this library on the server's classpath -->
|
||||
<libList><arg>org.apache.activemq.examples.broker:broker-msg-auth-plugin:${project.version}</arg></libList>
|
||||
<ignore>${noServer}</ignore>
|
||||
</configuration>
|
||||
<goals>
|
||||
<goal>create</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>start</id>
|
||||
<goals>
|
||||
<goal>cli</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<spawn>true</spawn>
|
||||
<ignore>${noServer}</ignore>
|
||||
<testURI>tcp://localhost:61616</testURI>
|
||||
<args>
|
||||
<param>run</param>
|
||||
</args>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>runClient</id>
|
||||
<goals>
|
||||
<goal>runClient</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<clientClass>org.apache.activemq.artemis.jms.example.BrokerAuthPluginExample</clientClass>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>stop</id>
|
||||
<goals>
|
||||
<goal>cli</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<ignore>${noServer}</ignore>
|
||||
<args>
|
||||
<param>stop</param>
|
||||
</args>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq.examples.broker</groupId>
|
||||
<artifactId>broker-msg-auth-plugin</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-clean-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
|
@ -0,0 +1,5 @@
|
|||
# Broker Plugin Example
|
||||
|
||||
To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to start and create the broker manually.
|
||||
|
||||
This example shows how a message plugin can be used to filter message sent to a consumer depending on that consumers roles. Credentials for a user are by default invalidated every 10 seconds so this plugin may cause excessive authentication if used without configuring the security-invalidation-interval limit appropriately.
|
|
@ -0,0 +1,161 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.jms.example;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
|
||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||
|
||||
/**
|
||||
* A simple example which shows how to use the BrokerMessageAuthorizationPlugin to filter messages given to user based on there role
|
||||
*/
|
||||
public class BrokerAuthPluginExample {
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
|
||||
// This example will send and receive an AMQP message
|
||||
sendConsumeAMQP();
|
||||
|
||||
// And it will also send and receive a Core message
|
||||
sendConsumeCore();
|
||||
}
|
||||
|
||||
private static void sendConsumeAMQP() throws JMSException {
|
||||
Connection adminConn = null;
|
||||
Connection guestConn = null;
|
||||
ConnectionFactory connectionFactory = new JmsConnectionFactory("amqp://localhost:5672");
|
||||
|
||||
try {
|
||||
|
||||
// Create an amqp qpid 1.0 connection
|
||||
adminConn = connectionFactory.createConnection("admin", "admin");
|
||||
guestConn = connectionFactory.createConnection();
|
||||
|
||||
// Create a session
|
||||
Session adminSession = adminConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Session guestSession = guestConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
// Create a sender
|
||||
// Topic destination = adminSession.createTopic("exampleTopic");
|
||||
Queue destination = adminSession.createQueue("exampleQueue");
|
||||
MessageProducer sender = adminSession.createProducer(destination);
|
||||
|
||||
TextMessage textMessage = adminSession.createTextMessage("Hello world ");
|
||||
textMessage.setStringProperty("requiredRole", "admin");
|
||||
|
||||
// create a moving receiver, this means the message will be removed from the queue
|
||||
MessageConsumer guestConsumer = guestSession.createConsumer(destination);
|
||||
MessageConsumer adminConsumer = adminSession.createConsumer(destination);
|
||||
|
||||
// send a simple message
|
||||
sender.send(textMessage);
|
||||
|
||||
guestConn.start();
|
||||
adminConn.start();
|
||||
|
||||
// receive the simple message
|
||||
TextMessage guestMessage = (TextMessage) guestConsumer.receive(5000);
|
||||
TextMessage adminMessage = (TextMessage) adminConsumer.receive(5000);
|
||||
|
||||
if (adminMessage == null) {
|
||||
throw new RuntimeException(("admin did not receive message"));
|
||||
}
|
||||
if (guestMessage != null) {
|
||||
throw new RuntimeException(("guest received a message that should have been filtered."));
|
||||
}
|
||||
|
||||
} finally {
|
||||
if (adminConn != null) {
|
||||
// close the connection
|
||||
adminConn.close();
|
||||
}
|
||||
if (guestConn != null) {
|
||||
// close the connection
|
||||
guestConn.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void sendConsumeCore() throws JMSException {
|
||||
Connection adminConn = null;
|
||||
Connection guestConn = null;
|
||||
|
||||
try {
|
||||
// Perform a lookup on the Connection Factory
|
||||
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||
|
||||
// Topic destination = new ActiveMQTopic("exampleTopic");
|
||||
Queue destination = new ActiveMQQueue("exampleQueue");
|
||||
|
||||
// Create a JMS Connection
|
||||
adminConn = connectionFactory.createConnection("admin", "admin");
|
||||
guestConn = connectionFactory.createConnection();
|
||||
|
||||
// Create a JMS Session
|
||||
Session adminSession = adminConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Session guestSession = guestConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
// Create a JMS Message Producer
|
||||
MessageProducer sender = adminSession.createProducer(destination);
|
||||
|
||||
// Create a Text Message
|
||||
TextMessage textMessage = adminSession.createTextMessage("Hello world ");
|
||||
textMessage.setStringProperty("requiredRole", "admin");
|
||||
|
||||
// create a moving receiver, this means the message will be removed from the queue
|
||||
MessageConsumer guestConsumer = guestSession.createConsumer(destination);
|
||||
MessageConsumer adminConsumer = adminSession.createConsumer(destination);
|
||||
|
||||
// send a simple message
|
||||
sender.send(textMessage);
|
||||
|
||||
guestConn.start();
|
||||
adminConn.start();
|
||||
|
||||
// receive the simple message
|
||||
TextMessage guestMessage = (TextMessage) guestConsumer.receive(5000);
|
||||
TextMessage adminMessage = (TextMessage) adminConsumer.receive(5000);
|
||||
|
||||
if (adminMessage == null) {
|
||||
throw new RuntimeException(("admin did not receive message"));
|
||||
}
|
||||
if (guestMessage != null) {
|
||||
throw new RuntimeException(("guest received a message that should have been filtered."));
|
||||
}
|
||||
|
||||
} finally {
|
||||
if (adminConn != null) {
|
||||
// close the connection
|
||||
adminConn.close();
|
||||
}
|
||||
if (guestConn != null) {
|
||||
// close the connection
|
||||
guestConn.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
## ---------------------------------------------------------------------------
|
||||
## Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
## contributor license agreements. See the NOTICE file distributed with
|
||||
## this work for additional information regarding copyright ownership.
|
||||
## The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
## (the "License"); you may not use this file except in compliance with
|
||||
## the License. You may obtain a copy of the License at
|
||||
##
|
||||
## http://www.apache.org/licenses/LICENSE-2.0
|
||||
##
|
||||
## Unless required by applicable law or agreed to in writing, software
|
||||
## distributed under the License is distributed on an "AS IS" BASIS,
|
||||
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
## See the License for the specific language governing permissions and
|
||||
## limitations under the License.
|
||||
## ---------------------------------------------------------------------------
|
||||
|
||||
guest=guest,admin
|
||||
admin=admin
|
|
@ -0,0 +1,19 @@
|
|||
## ---------------------------------------------------------------------------
|
||||
## Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
## contributor license agreements. See the NOTICE file distributed with
|
||||
## this work for additional information regarding copyright ownership.
|
||||
## The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
## (the "License"); you may not use this file except in compliance with
|
||||
## the License. You may obtain a copy of the License at
|
||||
##
|
||||
## http://www.apache.org/licenses/LICENSE-2.0
|
||||
##
|
||||
## Unless required by applicable law or agreed to in writing, software
|
||||
## distributed under the License is distributed on an "AS IS" BASIS,
|
||||
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
## See the License for the specific language governing permissions and
|
||||
## limitations under the License.
|
||||
## ---------------------------------------------------------------------------
|
||||
|
||||
guest = ENC(1024:81BF1AEC3990E160673D0E1708E7DD9B47DF022A227CEDA94D2D50D856D1DDC6:49747BC4EF76C4DF4FD0CC42BA7F938458E5C465783A0E9498827FB904875C692CE4139753D8929ED3CA3D0B2CF50412252430FC853586CFDBFB42EDB8A9C3C0)
|
||||
admin = admin
|
|
@ -0,0 +1,202 @@
|
|||
<?xml version='1.0'?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
<configuration xmlns="urn:activemq"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
|
||||
|
||||
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="urn:activemq:core ">
|
||||
|
||||
<name>0.0.0.0</name>
|
||||
|
||||
<persistence-enabled>true</persistence-enabled>
|
||||
|
||||
<!-- this could be ASYNCIO, MAPPED, NIO
|
||||
ASYNCIO: Linux Libaio
|
||||
MAPPED: mmap files
|
||||
NIO: Plain Java Files
|
||||
-->
|
||||
<journal-type>NIO</journal-type>
|
||||
|
||||
<paging-directory>./data/paging</paging-directory>
|
||||
|
||||
<bindings-directory>./data/bindings</bindings-directory>
|
||||
|
||||
<journal-directory>./data/journal</journal-directory>
|
||||
|
||||
<large-messages-directory>./data/large-messages</large-messages-directory>
|
||||
|
||||
<journal-datasync>true</journal-datasync>
|
||||
|
||||
<journal-min-files>2</journal-min-files>
|
||||
|
||||
<journal-pool-files>-1</journal-pool-files>
|
||||
|
||||
<journal-file-size>10M</journal-file-size>
|
||||
<!--
|
||||
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
|
||||
<network-check-NIC>theNicName</network-check-NIC>
|
||||
-->
|
||||
|
||||
<!--
|
||||
Use this to use an HTTP server to validate the network
|
||||
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
|
||||
|
||||
<!-- <network-check-period>10000</network-check-period> -->
|
||||
<!-- <network-check-timeout>1000</network-check-timeout> -->
|
||||
|
||||
<!-- this is a comma separated list, no spaces, just DNS or IPs
|
||||
it should accept IPV6
|
||||
|
||||
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
|
||||
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
|
||||
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
|
||||
<!-- <network-check-list>10.0.0.1</network-check-list> -->
|
||||
|
||||
<!-- use this to customize the ping used for ipv4 addresses -->
|
||||
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
|
||||
|
||||
<!-- use this to customize the ping used for ipv6 addresses -->
|
||||
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
|
||||
|
||||
|
||||
|
||||
|
||||
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
|
||||
<disk-scan-period>5000</disk-scan-period>
|
||||
|
||||
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
|
||||
that won't support flow control. -->
|
||||
<max-disk-usage>90</max-disk-usage>
|
||||
|
||||
<!-- should the broker detect dead locks and other issues -->
|
||||
<critical-analyzer>true</critical-analyzer>
|
||||
|
||||
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
|
||||
|
||||
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
|
||||
|
||||
<critical-analyzer-policy>HALT</critical-analyzer-policy>
|
||||
|
||||
<!-- the system will enter into page mode once you hit this limit.
|
||||
This is an estimate in bytes of how much the messages are using in memory
|
||||
|
||||
The system will use half of the available memory (-Xmx) by default for the global-max-size.
|
||||
You may specify a different value here if you need to customize it to your needs.
|
||||
|
||||
<global-max-size>100Mb</global-max-size>
|
||||
|
||||
-->
|
||||
<security-invalidation-interval>10800000</security-invalidation-interval>
|
||||
|
||||
<broker-plugins>
|
||||
<broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.BrokerMessageAuthorizationPlugin">
|
||||
<property key="ROLE_PROPERTY" value="requiredRole"/>
|
||||
</broker-plugin>
|
||||
</broker-plugins>
|
||||
|
||||
<acceptors>
|
||||
|
||||
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
|
||||
<!-- amqpCredits: The number of credits sent to AMQP producers -->
|
||||
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
|
||||
|
||||
<!-- Acceptor for every supported protocol -->
|
||||
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
|
||||
|
||||
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
|
||||
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
|
||||
|
||||
<!-- STOMP Acceptor. -->
|
||||
<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
|
||||
|
||||
<!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
|
||||
<acceptor name="hornetq">tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
|
||||
|
||||
<!-- MQTT Acceptor -->
|
||||
<acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
|
||||
|
||||
</acceptors>
|
||||
|
||||
|
||||
<security-settings>
|
||||
<security-setting match="#">
|
||||
<permission type="createNonDurableQueue" roles="guest,admin"/>
|
||||
<permission type="deleteNonDurableQueue" roles="guest,admin"/>
|
||||
<permission type="createDurableQueue" roles="guest,admin"/>
|
||||
<permission type="deleteDurableQueue" roles="guest,admin"/>
|
||||
<permission type="createAddress" roles="guest,admin"/>
|
||||
<permission type="deleteAddress" roles="guest,admin"/>
|
||||
<permission type="consume" roles="guest,admin"/>
|
||||
<permission type="browse" roles="guest,admin"/>
|
||||
<permission type="send" roles="guest,admin"/>
|
||||
<!-- we need this otherwise ./artemis data imp wouldn't work -->
|
||||
<permission type="manage" roles="guest,admin,role1"/>
|
||||
</security-setting>
|
||||
</security-settings>
|
||||
|
||||
<address-settings>
|
||||
<!-- if you define auto-create on certain queues, management has to be auto-create -->
|
||||
<address-setting match="activemq.management#">
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<!-- with -1 only the global-max-size is in use for limiting -->
|
||||
<max-size-bytes>-1</max-size-bytes>
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
<auto-create-jms-queues>true</auto-create-jms-queues>
|
||||
<auto-create-jms-topics>true</auto-create-jms-topics>
|
||||
</address-setting>
|
||||
<!--default for catch all-->
|
||||
<address-setting match="#">
|
||||
<dead-letter-address>DLQ</dead-letter-address>
|
||||
<expiry-address>ExpiryQueue</expiry-address>
|
||||
<redelivery-delay>0</redelivery-delay>
|
||||
<!-- with -1 only the global-max-size is in use for limiting -->
|
||||
<max-size-bytes>-1</max-size-bytes>
|
||||
<message-counter-history-day-limit>10</message-counter-history-day-limit>
|
||||
<address-full-policy>PAGE</address-full-policy>
|
||||
<auto-create-queues>true</auto-create-queues>
|
||||
<auto-create-addresses>true</auto-create-addresses>
|
||||
<auto-create-jms-queues>true</auto-create-jms-queues>
|
||||
<auto-create-jms-topics>true</auto-create-jms-topics>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
|
||||
<addresses>
|
||||
<address name="DLQ">
|
||||
<anycast>
|
||||
<queue name="DLQ" />
|
||||
</anycast>
|
||||
</address>
|
||||
<address name="ExpiryQueue">
|
||||
<anycast>
|
||||
<queue name="ExpiryQueue" />
|
||||
</anycast>
|
||||
</address>
|
||||
|
||||
</addresses>
|
||||
|
||||
</core>
|
||||
</configuration>
|
|
@ -43,6 +43,7 @@ under the License.
|
|||
<modules>
|
||||
<module>auto-closeable</module>
|
||||
<module>browser</module>
|
||||
<module>broker-msg-auth-plugin</module>
|
||||
<module>broker-plugin</module>
|
||||
<module>camel</module>
|
||||
<module>cdi</module>
|
||||
|
|
|
@ -0,0 +1,207 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.management;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.net.URL;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.security.Role;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.core.server.plugin.impl.BrokerMessageAuthorizationPlugin;
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
|
||||
import org.apache.activemq.artemis.tests.integration.security.SecurityTest;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class MessageAuthorizationTest extends ActiveMQTestBase {
|
||||
|
||||
static {
|
||||
String path = System.getProperty("java.security.auth.login.config");
|
||||
if (path == null) {
|
||||
URL resource = SecurityTest.class.getClassLoader().getResource("login.config");
|
||||
if (resource != null) {
|
||||
path = resource.getFile();
|
||||
System.setProperty("java.security.auth.login.config", path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private ActiveMQServer server;
|
||||
private SimpleString QUEUE = new SimpleString("TestQueue");
|
||||
private SimpleString TOPIC = new SimpleString("TestTopic");
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
ActiveMQJAASSecurityManager securityManager = new ActiveMQJAASSecurityManager("PropertiesLogin");
|
||||
server = addServer(ActiveMQServers.newActiveMQServer(createDefaultNettyConfig().setSecurityEnabled(true), ManagementFactory.getPlatformMBeanServer(), securityManager, true));
|
||||
server.getConfiguration().setPopulateValidatedUser(true);
|
||||
Set<Role> roles = new HashSet<>();
|
||||
roles.add(new Role("programmers", true, true, true, true, true, true, true, true, true, true));
|
||||
roles.add(new Role("a", false, true, true, true, true, false, false, false, true, true));
|
||||
roles.add(new Role("b", false, true, true, true, true, false, false, false, true, true));
|
||||
server.getConfiguration().putSecurityRoles("#", roles);
|
||||
|
||||
BrokerMessageAuthorizationPlugin plugin = new BrokerMessageAuthorizationPlugin();
|
||||
plugin.init(Collections.emptyMap());
|
||||
server.registerBrokerPlugin(plugin);
|
||||
server.start();
|
||||
server.createQueue(new QueueConfiguration(QUEUE).setRoutingType(RoutingType.ANYCAST).setDurable(true));
|
||||
server.createQueue(new QueueConfiguration(TOPIC).setRoutingType(RoutingType.MULTICAST).setDurable(true));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMessageAuthorizationQueue() throws Exception {
|
||||
JmsConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:61616");
|
||||
Connection connection = factory.createConnection("first", "secret");
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
javax.jms.Queue queue = session.createQueue(QUEUE.toString());
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
|
||||
TextMessage aMessage = session.createTextMessage();
|
||||
aMessage.setStringProperty("requiredRole", "a");
|
||||
TextMessage bMessage = session.createTextMessage();
|
||||
bMessage.setStringProperty("requiredRole", "b");
|
||||
Connection aConnection = factory.createConnection("a", "a");
|
||||
Session aSession = aConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
aConnection.start();
|
||||
Connection bConnection = factory.createConnection("b", "b");
|
||||
Session bSession = bConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
bConnection.start();
|
||||
MessageConsumer aConsumer = aSession.createConsumer(queue);
|
||||
MessageConsumer bConsumer = bSession.createConsumer(queue);
|
||||
|
||||
producer.send(aMessage);
|
||||
producer.send(bMessage);
|
||||
connection.close();
|
||||
|
||||
Message aMsg = aConsumer.receiveNoWait();
|
||||
Assert.assertNotNull(aMsg);
|
||||
Assert.assertEquals("a", aMsg.getStringProperty("requiredRole"));
|
||||
|
||||
Message bMsg = bConsumer.receiveNoWait();
|
||||
Assert.assertNotNull(bMsg);
|
||||
Assert.assertEquals("b", bMsg.getStringProperty("requiredRole"));
|
||||
|
||||
aConnection.close();
|
||||
bConnection.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMessageAuthorizationQueueNotAuthorized() throws Exception {
|
||||
JmsConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:61616");
|
||||
Connection connection = factory.createConnection("first", "secret");
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
javax.jms.Queue queue = session.createQueue("TestQueueNotAuth");
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
|
||||
TextMessage bMessage = session.createTextMessage();
|
||||
bMessage.setStringProperty("requiredRole", "b");
|
||||
Connection aConnection = factory.createConnection("a", "a");
|
||||
Session aSession = aConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
aConnection.start();
|
||||
MessageConsumer aConsumer = aSession.createConsumer(queue);
|
||||
|
||||
producer.send(bMessage);
|
||||
connection.close();
|
||||
|
||||
Assert.assertNull(aConsumer.receiveNoWait());
|
||||
|
||||
aConnection.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMessageAuthorizationTopic() throws Exception {
|
||||
JmsConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:61616");
|
||||
Connection connection = factory.createConnection("first", "secret");
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
javax.jms.Topic topic = session.createTopic(TOPIC.toString());
|
||||
MessageProducer producer = session.createProducer(topic);
|
||||
TextMessage aMessage = session.createTextMessage();
|
||||
aMessage.setStringProperty("requiredRole", "a");
|
||||
TextMessage bMessage = session.createTextMessage();
|
||||
bMessage.setStringProperty("requiredRole", "b");
|
||||
|
||||
Connection aConnection = factory.createConnection("a", "a");
|
||||
Session aSession = aConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
aConnection.start();
|
||||
Connection bConnection = factory.createConnection("b", "b");
|
||||
Session bSession = bConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
bConnection.start();
|
||||
MessageConsumer aConsumer = aSession.createConsumer(topic);
|
||||
MessageConsumer bConsumer = bSession.createConsumer(topic);
|
||||
|
||||
producer.send(aMessage);
|
||||
producer.send(bMessage);
|
||||
connection.close();
|
||||
|
||||
Message bMsg = bConsumer.receiveNoWait();
|
||||
Assert.assertNotNull(bMsg);
|
||||
Assert.assertEquals("b", bMsg.getStringProperty("requiredRole"));
|
||||
Assert.assertNull(bConsumer.receiveNoWait());
|
||||
|
||||
Message aMsg = aConsumer.receiveNoWait();
|
||||
Assert.assertNotNull(aMsg);
|
||||
Assert.assertEquals("a", aMsg.getStringProperty("requiredRole"));
|
||||
Assert.assertNull(aConsumer.receiveNoWait());
|
||||
|
||||
aConnection.close();
|
||||
bConnection.close();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testMessageAuthorizationTopicNotAuthorized() throws Exception {
|
||||
JmsConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:61616");
|
||||
Connection connection = factory.createConnection("first", "secret");
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
javax.jms.Topic topic = session.createTopic("TestTopicNotAuth");
|
||||
MessageProducer producer = session.createProducer(topic);
|
||||
TextMessage bMessage = session.createTextMessage();
|
||||
bMessage.setStringProperty("requiredRole", "b");
|
||||
|
||||
Connection aConnection = factory.createConnection("a", "a");
|
||||
Session aSession = aConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
aConnection.start();
|
||||
MessageConsumer aConsumer = aSession.createConsumer(topic);
|
||||
|
||||
producer.send(bMessage);
|
||||
connection.close();
|
||||
|
||||
Assert.assertNull(aConsumer.receiveNoWait());
|
||||
|
||||
aConnection.close();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue