diff --git a/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java b/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java
index 47bb01d1c9..ecc28483d2 100644
--- a/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java
+++ b/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationBroker.java
@@ -45,25 +45,21 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
super(next);
this.authorizationMap = authorizationMap;
}
-
- @Override
- public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
- addDestination(context, info.getDestination(),true);
- super.addDestinationInfo(context, info);
- }
- @Override
- public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
+ protected SecurityContext checkSecurityContext(ConnectionContext context) throws SecurityException {
final SecurityContext securityContext = context.getSecurityContext();
if (securityContext == null) {
throw new SecurityException("User is not authenticated.");
}
-
+ return securityContext;
+ }
+
+ protected boolean checkDestinationAdmin(SecurityContext securityContext, ActiveMQDestination destination) {
Destination existing = this.getDestinationMap().get(destination);
if (existing != null) {
- return super.addDestination(context, destination,create);
+ return true;
}
-
+
if (!securityContext.isBrokerContext()) {
Set> allowedACLs = null;
if (!destination.isTemporary()) {
@@ -73,9 +69,29 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
}
if (allowedACLs != null && !securityContext.isInOneOf(allowedACLs)) {
- throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to create: " + destination);
+ return false;
}
+ }
+ return true;
+ }
+
+ @Override
+ public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
+ final SecurityContext securityContext = checkSecurityContext(context);
+ if (!checkDestinationAdmin(securityContext, info.getDestination())) {
+ throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to create: " + info.getDestination());
+ }
+
+ super.addDestinationInfo(context, info);
+ }
+
+ @Override
+ public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
+ final SecurityContext securityContext = checkSecurityContext(context);
+
+ if (!checkDestinationAdmin(securityContext, destination)) {
+ throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to create: " + destination);
}
return super.addDestination(context, destination,create);
@@ -83,31 +99,30 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
@Override
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
+ final SecurityContext securityContext = checkSecurityContext(context);
- final SecurityContext securityContext = context.getSecurityContext();
- if (securityContext == null) {
- throw new SecurityException("User is not authenticated.");
- }
- Set> allowedACLs = null;
- if (!destination.isTemporary()) {
- allowedACLs = authorizationMap.getAdminACLs(destination);
- } else {
- allowedACLs = authorizationMap.getTempDestinationAdminACLs();
- }
-
- if (!securityContext.isBrokerContext() && allowedACLs != null && !securityContext.isInOneOf(allowedACLs)) {
+ if (!checkDestinationAdmin(securityContext, destination)) {
throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to remove: " + destination);
}
+
super.removeDestination(context, destination, timeout);
}
@Override
- public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
+ public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
+ final SecurityContext securityContext = checkSecurityContext(context);
- final SecurityContext subject = context.getSecurityContext();
- if (subject == null) {
- throw new SecurityException("User is not authenticated.");
+ if (!checkDestinationAdmin(securityContext, info.getDestination())) {
+ throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to remove: " + info.getDestination());
}
+
+ super.removeDestinationInfo(context, info);
+ }
+
+ @Override
+ public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
+ final SecurityContext securityContext = checkSecurityContext(context);
+
Set> allowedACLs = null;
if (!info.getDestination().isTemporary()) {
allowedACLs = authorizationMap.getReadACLs(info.getDestination());
@@ -115,10 +130,10 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
allowedACLs = authorizationMap.getTempDestinationReadACLs();
}
- if (!subject.isBrokerContext() && allowedACLs != null && !subject.isInOneOf(allowedACLs)) {
- throw new SecurityException("User " + subject.getUserName() + " is not authorized to read from: " + info.getDestination());
+ if (!securityContext.isBrokerContext() && allowedACLs != null && !securityContext.isInOneOf(allowedACLs)) {
+ throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to read from: " + info.getDestination());
}
- subject.getAuthorizedReadDests().put(info.getDestination(), info.getDestination());
+ securityContext.getAuthorizedReadDests().put(info.getDestination(), info.getDestination());
/*
* Need to think about this a little more. We could do per message
@@ -146,12 +161,9 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
@Override
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
+ final SecurityContext securityContext = checkSecurityContext(context);
- SecurityContext subject = context.getSecurityContext();
- if (subject == null) {
- throw new SecurityException("User is not authenticated.");
- }
- if (!subject.isBrokerContext() && info.getDestination() != null) {
+ if (!securityContext.isBrokerContext() && info.getDestination() != null) {
Set> allowedACLs = null;
if (!info.getDestination().isTemporary()) {
@@ -159,10 +171,10 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
} else {
allowedACLs = authorizationMap.getTempDestinationWriteACLs();
}
- if (allowedACLs != null && !subject.isInOneOf(allowedACLs)) {
- throw new SecurityException("User " + subject.getUserName() + " is not authorized to write to: " + info.getDestination());
+ if (allowedACLs != null && !securityContext.isInOneOf(allowedACLs)) {
+ throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to write to: " + info.getDestination());
}
- subject.getAuthorizedWriteDests().put(info.getDestination(), info.getDestination());
+ securityContext.getAuthorizedWriteDests().put(info.getDestination(), info.getDestination());
}
super.addProducer(context, info);
@@ -170,11 +182,9 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
@Override
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
- SecurityContext subject = producerExchange.getConnectionContext().getSecurityContext();
- if (subject == null) {
- throw new SecurityException("User is not authenticated.");
- }
- if (!subject.isBrokerContext() && !subject.getAuthorizedWriteDests().contains(messageSend.getDestination())) {
+ final SecurityContext securityContext = checkSecurityContext(producerExchange.getConnectionContext());
+
+ if (!securityContext.isBrokerContext() && !securityContext.getAuthorizedWriteDests().contains(messageSend.getDestination())) {
Set> allowedACLs = null;
if (!messageSend.getDestination().isTemporary()) {
@@ -183,10 +193,10 @@ public class AuthorizationBroker extends BrokerFilter implements SecurityAdminMB
allowedACLs = authorizationMap.getTempDestinationWriteACLs();
}
- if (allowedACLs != null && !subject.isInOneOf(allowedACLs)) {
- throw new SecurityException("User " + subject.getUserName() + " is not authorized to write to: " + messageSend.getDestination());
+ if (allowedACLs != null && !securityContext.isInOneOf(allowedACLs)) {
+ throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to write to: " + messageSend.getDestination());
}
- subject.getAuthorizedWriteDests().put(messageSend.getDestination(), messageSend.getDestination());
+ securityContext.getAuthorizedWriteDests().put(messageSend.getDestination(), messageSend.getDestination());
}
super.send(producerExchange, messageSend);
diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/TwoSecureBrokerRequestReplyTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/TwoSecureBrokerRequestReplyTest.java
new file mode 100644
index 0000000000..dca3b08914
--- /dev/null
+++ b/activemq-core/src/test/java/org/apache/activemq/usecases/TwoSecureBrokerRequestReplyTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.ClassPathResource;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
+public class TwoSecureBrokerRequestReplyTest extends JmsMultipleBrokersTestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(TwoSecureBrokerRequestReplyTest.class);
+
+ public void setUp() throws Exception {
+ super.setAutoFail(true);
+ super.setUp();
+
+ createBroker(new ClassPathResource("org/apache/activemq/usecases/sender-secured.xml"));
+ createBroker(new ClassPathResource("org/apache/activemq/usecases/receiver-secured.xml"));
+ }
+
+ public void testRequestReply() throws Exception {
+ ActiveMQQueue requestReplyDest = new ActiveMQQueue("RequestReply");
+
+ startAllBrokers();
+ waitForBridgeFormation();
+ waitForMinTopicRegionConsumerCount("sender", 1);
+ waitForMinTopicRegionConsumerCount("receiver", 1);
+
+
+ ConnectionFactory factory = getConnectionFactory("sender");
+ ActiveMQConnection conn = (ActiveMQConnection) factory.createConnection("system", "manager");
+ conn.setWatchTopicAdvisories(false);
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ ConnectionFactory replyFactory = getConnectionFactory("receiver");
+ for (int i = 0; i < 2000; i++) {
+ TemporaryQueue tempDest = session.createTemporaryQueue();
+ MessageProducer producer = session.createProducer(requestReplyDest);
+ javax.jms.Message message = session.createTextMessage("req-" + i);
+ message.setJMSReplyTo(tempDest);
+
+ ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(tempDest);
+ producer.send(message);
+
+ ActiveMQConnection replyConnection = (ActiveMQConnection) replyFactory.createConnection("system", "manager");
+ replyConnection.setWatchTopicAdvisories(false);
+ replyConnection.start();
+ Session replySession = replyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ ActiveMQMessageConsumer replyConsumer = (ActiveMQMessageConsumer) replySession.createConsumer(requestReplyDest);
+ javax.jms.Message msg = replyConsumer.receive(10000);
+ assertNotNull("request message not null: " + i, msg);
+ MessageProducer replyProducer = replySession.createProducer(msg.getJMSReplyTo());
+ replyProducer.send(session.createTextMessage("reply-" + i));
+ replyConnection.close();
+
+ javax.jms.Message reply = consumer.receive(10000);
+ assertNotNull("reply message : " + i + ", to: " + tempDest + ", by consumer:" + consumer.getConsumerId(), reply);
+ consumer.close();
+ tempDest.delete();
+ LOG.info("message #" + i + " processed");
+ }
+
+ }
+
+
+}
diff --git a/activemq-core/src/test/resources/org/apache/activemq/security/simple-auth-broker.xml b/activemq-core/src/test/resources/org/apache/activemq/security/simple-auth-broker.xml
index e525ee39d1..a21d13657f 100644
--- a/activemq-core/src/test/resources/org/apache/activemq/security/simple-auth-broker.xml
+++ b/activemq-core/src/test/resources/org/apache/activemq/security/simple-auth-broker.xml
@@ -76,7 +76,7 @@
-
+
diff --git a/activemq-core/src/test/resources/org/apache/activemq/usecases/receiver-secured.xml b/activemq-core/src/test/resources/org/apache/activemq/usecases/receiver-secured.xml
new file mode 100755
index 0000000000..33b5a2ee1a
--- /dev/null
+++ b/activemq-core/src/test/resources/org/apache/activemq/usecases/receiver-secured.xml
@@ -0,0 +1,78 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/activemq-core/src/test/resources/org/apache/activemq/usecases/sender-secured.xml b/activemq-core/src/test/resources/org/apache/activemq/usecases/sender-secured.xml
new file mode 100755
index 0000000000..0eb9562d33
--- /dev/null
+++ b/activemq-core/src/test/resources/org/apache/activemq/usecases/sender-secured.xml
@@ -0,0 +1,82 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+