This commit is contained in:
Justin Bertram 2017-04-28 13:08:10 -05:00
commit 27fce00468
7 changed files with 349 additions and 25 deletions

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.exceptions;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.qpid.proton.amqp.transport.AmqpError;
public class ActiveMQAMQPSecurityException extends ActiveMQAMQPException {
public ActiveMQAMQPSecurityException(String message, Throwable e) {
super(AmqpError.UNAUTHORIZED_ACCESS, message, e, ActiveMQExceptionType.SECURITY_EXCEPTION);
}
public ActiveMQAMQPSecurityException(String message) {
super(AmqpError.UNAUTHORIZED_ACCESS, message, ActiveMQExceptionType.SECURITY_EXCEPTION);
}
}

View File

@ -20,6 +20,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalS
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidFieldException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPSecurityException;
import org.jboss.logging.Messages;
import org.jboss.logging.annotations.Message;
import org.jboss.logging.annotations.MessageBundle;
@ -77,4 +78,10 @@ public interface ActiveMQAMQPProtocolMessageBundle {
@Message(id = 219014, value = "Transaction not found: xid={0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQAMQPIllegalStateException txNotFound(String xidToString);
@Message(id = 219015, value = "not authorized to create consumer, {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQAMQPSecurityException securityErrorCreatingConsumer(String message);
@Message(id = 219016, value = "not authorized to create temporary destination, {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQAMQPSecurityException securityErrorCreatingTempDestination(String message);
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
import java.util.Arrays;
import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
@ -30,6 +31,7 @@ import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.Delivery;
@ -96,6 +98,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
try {
sessionSPI.createTemporaryQueue(address, getRoutingType(target.getCapabilities()));
} catch (ActiveMQSecurityException e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingTempDestination(e.getMessage());
} catch (Exception e) {
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
@ -160,10 +164,10 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
if (delivery.isPartial()) {
return;
}
receiver = ((Receiver) delivery.getLink());
Transaction tx = null;
byte[] data;
data = new byte[delivery.available()];
@ -171,7 +175,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
receiver.advance();
if (delivery.getRemoteState() instanceof TransactionalState) {
TransactionalState txState = (TransactionalState) delivery.getRemoteState();
tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
}
@ -183,7 +186,13 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
log.warn(e.getMessage(), e);
Rejected rejected = new Rejected();
ErrorCondition condition = new ErrorCondition();
condition.setCondition(Symbol.valueOf("failed"));
if (e instanceof ActiveMQSecurityException) {
condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
} else {
condition.setCondition(Symbol.valueOf("failed"));
}
condition.setDescription(e.getMessage());
rejected.setError(condition);
connection.lock();

View File

@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -262,7 +263,17 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
boolean clientDefined = hasCapabilities(TOPIC, source) || hasCapabilities(QUEUE, source);
if (clientDefined) {
multicast = hasCapabilities(TOPIC, source);
AddressQueryResult addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
AddressQueryResult addressQueryResult = null;
try {
addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
} catch (ActiveMQSecurityException e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
} catch (ActiveMQAMQPException e) {
throw e;
} catch (Exception e) {
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
if (!addressQueryResult.isExists()) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
}
@ -276,8 +287,18 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for queue support");
}
} else {
//if not we look up the address
AddressQueryResult addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), defaultRoutingType, true);
// if not we look up the address
AddressQueryResult addressQueryResult = null;
try {
addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), defaultRoutingType, true);
} catch (ActiveMQSecurityException e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
} catch (ActiveMQAMQPException e) {
throw e;
} catch (Exception e) {
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
if (!addressQueryResult.isExists()) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
}
@ -407,6 +428,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
brokerConsumer = (Consumer) sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
} catch (ActiveMQAMQPResourceLimitExceededException e1) {
throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
} catch (ActiveMQSecurityException e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
} catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
}

View File

@ -58,6 +58,12 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
protected static final String BROKER_NAME = "localhost";
protected String noprivUser = "noprivs";
protected String noprivPass = "noprivs";
protected String browseUser = "browser";
protected String browsePass = "browser";
protected String guestUser = "guest";
protected String guestPass = "guest";
@ -220,6 +226,10 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
// User additions
securityManager.getConfiguration().addUser(noprivUser, noprivPass);
securityManager.getConfiguration().addRole(noprivUser, "nothing");
securityManager.getConfiguration().addUser(browseUser, browsePass);
securityManager.getConfiguration().addRole(browseUser, "browser");
securityManager.getConfiguration().addUser(guestUser, guestPass);
securityManager.getConfiguration().addRole(guestUser, "guest");
securityManager.getConfiguration().addUser(fullUser, fullPass);
@ -228,7 +238,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
// Configure roles
HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
HashSet<Role> value = new HashSet<>();
value.add(new Role("guest", false, true, true, true, true, true, true, true));
value.add(new Role("nothing", false, false, false, false, false, false, false, false));
value.add(new Role("browser", false, false, false, false, false, false, false, true));
value.add(new Role("guest", false, true, false, false, false, false, false, true));
value.add(new Role("full", true, true, true, true, true, true, true, true));
securityRepository.addMatch(getQueueName(), value);

View File

@ -20,13 +20,19 @@ import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.junit.Test;
@ -96,10 +102,26 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
@Override
public void inspectDeliveryUpdate(Sender sender, Delivery delivery) {
DeliveryState state = delivery.getRemoteState();
if (!delivery.remotelySettled()) {
markAsInvalid("delivery is not remotely settled");
}
if (state instanceof Rejected) {
Rejected rejected = (Rejected) state;
if (rejected.getError() == null || rejected.getError().getCondition() == null) {
markAsInvalid("Delivery should have been Rejected with an error condition");
} else {
ErrorCondition error = rejected.getError();
if (!error.getCondition().equals(AmqpError.UNAUTHORIZED_ACCESS)) {
markAsInvalid("Should have been tagged with unauthorized access error");
}
}
} else {
markAsInvalid("Delivery should have been Rejected");
}
latch.countDown();
}
});
@ -107,26 +129,59 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg" + 1);
message.setMessageAnnotation("serialNo", 1);
message.setText("Test-Message");
try {
sender.send(message);
} catch (IOException e) {
}
AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
connection.getStateInspector().assertValid();
connection.close();
message.setMessageId("msg" + 1);
message.setMessageAnnotation("serialNo", 1);
message.setText("Test-Message");
try {
sender.send(message);
} catch (IOException e) {
}
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
connection.getStateInspector().assertValid();
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testSendMessageFailsOnAnonymousRelayWhenNotAuthorizedToSendToAddress() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AmqpClient client = createAmqpClient(guestUser, guestPass);
client.setValidator(new AmqpValidator() {
@Override
public void inspectDeliveryUpdate(Sender sender, Delivery delivery) {
DeliveryState state = delivery.getRemoteState();
if (!delivery.remotelySettled()) {
markAsInvalid("delivery is not remotely settled");
}
if (state instanceof Rejected) {
Rejected rejected = (Rejected) state;
if (rejected.getError() == null || rejected.getError().getCondition() == null) {
markAsInvalid("Delivery should have been Rejected with an error condition");
} else {
ErrorCondition error = rejected.getError();
if (!error.getCondition().equals(AmqpError.UNAUTHORIZED_ACCESS)) {
markAsInvalid("Should have been tagged with unauthorized access error");
}
}
} else {
markAsInvalid("Delivery should have been Rejected");
}
latch.countDown();
}
});
AmqpConnection connection = client.connect();
try {
@ -147,6 +202,83 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
} finally {
sender.close();
}
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
connection.getStateInspector().assertValid();
} finally {
connection.close();
}
}
@Test(timeout = 30000)
public void testReceiverNotAuthorized() throws Exception {
AmqpClient client = createAmqpClient(noprivUser, noprivPass);
client.setValidator(new AmqpValidator() {
@Override
public void inspectOpenedResource(Receiver receiver) {
ErrorCondition condition = receiver.getRemoteCondition();
if (condition != null && condition.getCondition() != null) {
if (!condition.getCondition().equals(AmqpError.UNAUTHORIZED_ACCESS)) {
markAsInvalid("Should have been tagged with unauthorized access error");
}
} else {
markAsInvalid("Receiver should have been opened with an error");
}
}
});
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
try {
session.createReceiver(getQueueName());
fail("Should not be able to consume here.");
} catch (Exception ex) {
IntegrationTestLogger.LOGGER.info("Caught expected exception");
}
connection.getStateInspector().assertValid();
} finally {
connection.close();
}
}
@Test(timeout = 30000)
public void testConsumerNotAuthorizedToCreateQueues() throws Exception {
AmqpClient client = createAmqpClient(noprivUser, noprivPass);
client.setValidator(new AmqpValidator() {
@Override
public void inspectOpenedResource(Sender sender) {
ErrorCondition condition = sender.getRemoteCondition();
if (condition != null && condition.getCondition() != null) {
if (!condition.getCondition().equals(AmqpError.UNAUTHORIZED_ACCESS)) {
markAsInvalid("Should have been tagged with unauthorized access error");
}
} else {
markAsInvalid("Sender should have been opened with an error");
}
}
});
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
try {
session.createReceiver(getQueueName(getPrecreatedQueueSize() + 1));
fail("Should not be able to consume here.");
} catch (Exception ex) {
IntegrationTestLogger.LOGGER.info("Caught expected exception");
}
connection.getStateInspector().assertValid();
} finally {
connection.close();
}

View File

@ -22,6 +22,7 @@ import javax.jms.JMSSecurityException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
@ -37,34 +38,49 @@ public class JMSConnectionWithSecurityTest extends JMSClientTestSupport {
@Test(timeout = 10000)
public void testNoUserOrPassword() throws Exception {
Connection connection = null;
try {
Connection connection = createConnection("", "", null, false);
connection = createConnection("", "", null, false);
connection.start();
fail("Expected JMSException");
} catch (JMSSecurityException ex) {
IntegrationTestLogger.LOGGER.debug("Failed to authenticate connection with no user / password.");
} finally {
if (connection != null) {
connection.close();
}
}
}
@Test(timeout = 10000)
public void testUnknownUser() throws Exception {
Connection connection = null;
try {
Connection connection = createConnection("nosuchuser", "blah", null, false);
connection = createConnection("nosuchuser", "blah", null, false);
connection.start();
fail("Expected JMSException");
} catch (JMSSecurityException ex) {
IntegrationTestLogger.LOGGER.debug("Failed to authenticate connection with unknown user ID");
} finally {
if (connection != null) {
connection.close();
}
}
}
@Test(timeout = 10000)
public void testKnownUserWrongPassword() throws Exception {
Connection connection = null;
try {
Connection connection = createConnection(fullUser, "wrongPassword", null, false);
connection = createConnection(fullUser, "wrongPassword", null, false);
connection.start();
fail("Expected JMSException");
} catch (JMSSecurityException ex) {
IntegrationTestLogger.LOGGER.debug("Failed to authenticate connection with incorrect password.");
} finally {
if (connection != null) {
connection.close();
}
}
}
@ -113,6 +129,102 @@ public class JMSConnectionWithSecurityTest extends JMSClientTestSupport {
}
}
@Test(timeout = 30000)
public void testConsumerNotAuthorized() throws Exception {
Connection connection = createConnection(noprivUser, noprivPass);
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(getQueueName());
try {
session.createConsumer(queue);
fail("Should not be able to consume here.");
} catch (JMSSecurityException jmsSE) {
IntegrationTestLogger.LOGGER.info("Caught expected exception");
}
} finally {
connection.close();
}
}
@Test(timeout = 30000)
public void testBrowserNotAuthorized() throws Exception {
Connection connection = createConnection(noprivUser, noprivPass);
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(getQueueName());
try {
QueueBrowser browser = session.createBrowser(queue);
// Browser is not created until an enumeration is requesteda
browser.getEnumeration();
fail("Should not be able to consume here.");
} catch (JMSSecurityException jmsSE) {
IntegrationTestLogger.LOGGER.info("Caught expected exception");
}
} finally {
connection.close();
}
}
@Test(timeout = 30000)
public void testConsumerNotAuthorizedToCreateQueues() throws Exception {
Connection connection = createConnection(noprivUser, noprivPass);
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(getQueueName(getPrecreatedQueueSize() + 1));
try {
session.createConsumer(queue);
fail("Should not be able to consume here.");
} catch (JMSSecurityException jmsSE) {
IntegrationTestLogger.LOGGER.info("Caught expected exception");
}
} finally {
connection.close();
}
}
@Test(timeout = 30000)
public void testProducerNotAuthorized() throws Exception {
Connection connection = createConnection(guestUser, guestPass);
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(getQueueName());
try {
// TODO - This seems a bit odd, can attach but not send
MessageProducer producer = session.createProducer(queue);
producer.send(session.createMessage());
fail("Should not be able to produce here.");
} catch (JMSSecurityException jmsSE) {
IntegrationTestLogger.LOGGER.info("Caught expected exception");
}
} finally {
connection.close();
}
}
@Test(timeout = 30000)
public void testAnonymousProducerNotAuthorized() throws Exception {
Connection connection = createConnection(guestUser, guestPass);
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(null);
try {
producer.send(queue, session.createTextMessage());
fail("Should not be able to produce here.");
} catch (JMSSecurityException jmsSE) {
IntegrationTestLogger.LOGGER.info("Caught expected exception");
}
} finally {
connection.close();
}
}
@Test(timeout = 30000)
public void testCreateTemporaryQueueNotAuthorized() throws JMSException {
Connection connection = createConnection(guestUser, guestPass);
@ -123,7 +235,6 @@ public class JMSConnectionWithSecurityTest extends JMSClientTestSupport {
try {
session.createTemporaryQueue();
} catch (JMSSecurityException jmsse) {
} catch (JMSException jmse) {
IntegrationTestLogger.LOGGER.info("Client should have thrown a JMSSecurityException but only threw JMSException");
}
@ -144,7 +255,6 @@ public class JMSConnectionWithSecurityTest extends JMSClientTestSupport {
try {
session.createTemporaryTopic();
} catch (JMSSecurityException jmsse) {
} catch (JMSException jmse) {
IntegrationTestLogger.LOGGER.info("Client should have thrown a JMSSecurityException but only threw JMSException");
}