ARTEMIS-1132 return security errors instead of generic failed
When creating some AMQP resources (senders, receivers, etc) the broker can return an error of 'failed' instead of the security error that is expected in these cases. In the case of a receiver being created and a security error happening the broker fails to send back a response causing the client to hang waiting for an attach response.
This commit is contained in:
parent
4970d99fd0
commit
19a640db3a
|
@ -0,0 +1,31 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.protocol.amqp.exceptions;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
|
import org.apache.qpid.proton.amqp.transport.AmqpError;
|
||||||
|
|
||||||
|
public class ActiveMQAMQPSecurityException extends ActiveMQAMQPException {
|
||||||
|
|
||||||
|
public ActiveMQAMQPSecurityException(String message, Throwable e) {
|
||||||
|
super(AmqpError.UNAUTHORIZED_ACCESS, message, e, ActiveMQExceptionType.SECURITY_EXCEPTION);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ActiveMQAMQPSecurityException(String message) {
|
||||||
|
super(AmqpError.UNAUTHORIZED_ACCESS, message, ActiveMQExceptionType.SECURITY_EXCEPTION);
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,6 +20,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalS
|
||||||
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
|
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidFieldException;
|
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidFieldException;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
|
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPSecurityException;
|
||||||
import org.jboss.logging.Messages;
|
import org.jboss.logging.Messages;
|
||||||
import org.jboss.logging.annotations.Message;
|
import org.jboss.logging.annotations.Message;
|
||||||
import org.jboss.logging.annotations.MessageBundle;
|
import org.jboss.logging.annotations.MessageBundle;
|
||||||
|
@ -77,4 +78,10 @@ public interface ActiveMQAMQPProtocolMessageBundle {
|
||||||
@Message(id = 219014, value = "Transaction not found: xid={0}", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 219014, value = "Transaction not found: xid={0}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
ActiveMQAMQPIllegalStateException txNotFound(String xidToString);
|
ActiveMQAMQPIllegalStateException txNotFound(String xidToString);
|
||||||
|
|
||||||
|
@Message(id = 219015, value = "not authorized to create consumer, {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
ActiveMQAMQPSecurityException securityErrorCreatingConsumer(String message);
|
||||||
|
|
||||||
|
@Message(id = 219016, value = "not authorized to create temporary destination, {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
ActiveMQAMQPSecurityException securityErrorCreatingTempDestination(String message);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
|
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
|
||||||
|
@ -30,6 +31,7 @@ import org.apache.qpid.proton.amqp.Symbol;
|
||||||
import org.apache.qpid.proton.amqp.messaging.Rejected;
|
import org.apache.qpid.proton.amqp.messaging.Rejected;
|
||||||
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
|
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
|
||||||
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
|
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
|
||||||
|
import org.apache.qpid.proton.amqp.transport.AmqpError;
|
||||||
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
||||||
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
|
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
|
||||||
import org.apache.qpid.proton.engine.Delivery;
|
import org.apache.qpid.proton.engine.Delivery;
|
||||||
|
@ -96,6 +98,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
||||||
|
|
||||||
try {
|
try {
|
||||||
sessionSPI.createTemporaryQueue(address, getRoutingType(target.getCapabilities()));
|
sessionSPI.createTemporaryQueue(address, getRoutingType(target.getCapabilities()));
|
||||||
|
} catch (ActiveMQSecurityException e) {
|
||||||
|
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingTempDestination(e.getMessage());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
|
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
|
||||||
}
|
}
|
||||||
|
@ -160,10 +164,10 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
||||||
if (delivery.isPartial()) {
|
if (delivery.isPartial()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
receiver = ((Receiver) delivery.getLink());
|
receiver = ((Receiver) delivery.getLink());
|
||||||
|
|
||||||
Transaction tx = null;
|
Transaction tx = null;
|
||||||
|
|
||||||
byte[] data;
|
byte[] data;
|
||||||
|
|
||||||
data = new byte[delivery.available()];
|
data = new byte[delivery.available()];
|
||||||
|
@ -171,7 +175,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
||||||
receiver.advance();
|
receiver.advance();
|
||||||
|
|
||||||
if (delivery.getRemoteState() instanceof TransactionalState) {
|
if (delivery.getRemoteState() instanceof TransactionalState) {
|
||||||
|
|
||||||
TransactionalState txState = (TransactionalState) delivery.getRemoteState();
|
TransactionalState txState = (TransactionalState) delivery.getRemoteState();
|
||||||
tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
|
tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
|
||||||
}
|
}
|
||||||
|
@ -183,7 +186,13 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
|
||||||
log.warn(e.getMessage(), e);
|
log.warn(e.getMessage(), e);
|
||||||
Rejected rejected = new Rejected();
|
Rejected rejected = new Rejected();
|
||||||
ErrorCondition condition = new ErrorCondition();
|
ErrorCondition condition = new ErrorCondition();
|
||||||
condition.setCondition(Symbol.valueOf("failed"));
|
|
||||||
|
if (e instanceof ActiveMQSecurityException) {
|
||||||
|
condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
|
||||||
|
} else {
|
||||||
|
condition.setCondition(Symbol.valueOf("failed"));
|
||||||
|
}
|
||||||
|
|
||||||
condition.setDescription(e.getMessage());
|
condition.setDescription(e.getMessage());
|
||||||
rejected.setError(condition);
|
rejected.setError(condition);
|
||||||
connection.lock();
|
connection.lock();
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
@ -262,7 +263,17 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
boolean clientDefined = hasCapabilities(TOPIC, source) || hasCapabilities(QUEUE, source);
|
boolean clientDefined = hasCapabilities(TOPIC, source) || hasCapabilities(QUEUE, source);
|
||||||
if (clientDefined) {
|
if (clientDefined) {
|
||||||
multicast = hasCapabilities(TOPIC, source);
|
multicast = hasCapabilities(TOPIC, source);
|
||||||
AddressQueryResult addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
|
AddressQueryResult addressQueryResult = null;
|
||||||
|
try {
|
||||||
|
addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
|
||||||
|
} catch (ActiveMQSecurityException e) {
|
||||||
|
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
|
||||||
|
} catch (ActiveMQAMQPException e) {
|
||||||
|
throw e;
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
|
||||||
|
}
|
||||||
|
|
||||||
if (!addressQueryResult.isExists()) {
|
if (!addressQueryResult.isExists()) {
|
||||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
|
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
|
||||||
}
|
}
|
||||||
|
@ -276,8 +287,18 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for queue support");
|
throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for queue support");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
//if not we look up the address
|
// if not we look up the address
|
||||||
AddressQueryResult addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), defaultRoutingType, true);
|
AddressQueryResult addressQueryResult = null;
|
||||||
|
try {
|
||||||
|
addressQueryResult = sessionSPI.addressQuery(addressToUse.toString(), defaultRoutingType, true);
|
||||||
|
} catch (ActiveMQSecurityException e) {
|
||||||
|
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
|
||||||
|
} catch (ActiveMQAMQPException e) {
|
||||||
|
throw e;
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
|
||||||
|
}
|
||||||
|
|
||||||
if (!addressQueryResult.isExists()) {
|
if (!addressQueryResult.isExists()) {
|
||||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
|
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
|
||||||
}
|
}
|
||||||
|
@ -407,6 +428,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
|
||||||
brokerConsumer = (Consumer) sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
|
brokerConsumer = (Consumer) sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
|
||||||
} catch (ActiveMQAMQPResourceLimitExceededException e1) {
|
} catch (ActiveMQAMQPResourceLimitExceededException e1) {
|
||||||
throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
|
throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
|
||||||
|
} catch (ActiveMQSecurityException e) {
|
||||||
|
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
|
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,6 +58,12 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
|
||||||
|
|
||||||
protected static final String BROKER_NAME = "localhost";
|
protected static final String BROKER_NAME = "localhost";
|
||||||
|
|
||||||
|
protected String noprivUser = "noprivs";
|
||||||
|
protected String noprivPass = "noprivs";
|
||||||
|
|
||||||
|
protected String browseUser = "browser";
|
||||||
|
protected String browsePass = "browser";
|
||||||
|
|
||||||
protected String guestUser = "guest";
|
protected String guestUser = "guest";
|
||||||
protected String guestPass = "guest";
|
protected String guestPass = "guest";
|
||||||
|
|
||||||
|
@ -220,6 +226,10 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
|
||||||
ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
|
ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
|
||||||
|
|
||||||
// User additions
|
// User additions
|
||||||
|
securityManager.getConfiguration().addUser(noprivUser, noprivPass);
|
||||||
|
securityManager.getConfiguration().addRole(noprivUser, "nothing");
|
||||||
|
securityManager.getConfiguration().addUser(browseUser, browsePass);
|
||||||
|
securityManager.getConfiguration().addRole(browseUser, "browser");
|
||||||
securityManager.getConfiguration().addUser(guestUser, guestPass);
|
securityManager.getConfiguration().addUser(guestUser, guestPass);
|
||||||
securityManager.getConfiguration().addRole(guestUser, "guest");
|
securityManager.getConfiguration().addRole(guestUser, "guest");
|
||||||
securityManager.getConfiguration().addUser(fullUser, fullPass);
|
securityManager.getConfiguration().addUser(fullUser, fullPass);
|
||||||
|
@ -228,7 +238,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
|
||||||
// Configure roles
|
// Configure roles
|
||||||
HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
|
HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
|
||||||
HashSet<Role> value = new HashSet<>();
|
HashSet<Role> value = new HashSet<>();
|
||||||
value.add(new Role("guest", false, true, true, true, true, true, true, true));
|
value.add(new Role("nothing", false, false, false, false, false, false, false, false));
|
||||||
|
value.add(new Role("browser", false, false, false, false, false, false, false, true));
|
||||||
|
value.add(new Role("guest", false, true, false, false, false, false, false, true));
|
||||||
value.add(new Role("full", true, true, true, true, true, true, true, true));
|
value.add(new Role("full", true, true, true, true, true, true, true, true));
|
||||||
securityRepository.addMatch(getQueueName(), value);
|
securityRepository.addMatch(getQueueName(), value);
|
||||||
|
|
||||||
|
|
|
@ -20,13 +20,19 @@ import java.io.IOException;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||||
import org.apache.activemq.transport.amqp.client.AmqpValidator;
|
import org.apache.activemq.transport.amqp.client.AmqpValidator;
|
||||||
|
import org.apache.qpid.proton.amqp.messaging.Rejected;
|
||||||
|
import org.apache.qpid.proton.amqp.transport.AmqpError;
|
||||||
|
import org.apache.qpid.proton.amqp.transport.DeliveryState;
|
||||||
|
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
||||||
import org.apache.qpid.proton.engine.Delivery;
|
import org.apache.qpid.proton.engine.Delivery;
|
||||||
|
import org.apache.qpid.proton.engine.Receiver;
|
||||||
import org.apache.qpid.proton.engine.Sender;
|
import org.apache.qpid.proton.engine.Sender;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -96,10 +102,26 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void inspectDeliveryUpdate(Sender sender, Delivery delivery) {
|
public void inspectDeliveryUpdate(Sender sender, Delivery delivery) {
|
||||||
|
DeliveryState state = delivery.getRemoteState();
|
||||||
|
|
||||||
if (!delivery.remotelySettled()) {
|
if (!delivery.remotelySettled()) {
|
||||||
markAsInvalid("delivery is not remotely settled");
|
markAsInvalid("delivery is not remotely settled");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (state instanceof Rejected) {
|
||||||
|
Rejected rejected = (Rejected) state;
|
||||||
|
if (rejected.getError() == null || rejected.getError().getCondition() == null) {
|
||||||
|
markAsInvalid("Delivery should have been Rejected with an error condition");
|
||||||
|
} else {
|
||||||
|
ErrorCondition error = rejected.getError();
|
||||||
|
if (!error.getCondition().equals(AmqpError.UNAUTHORIZED_ACCESS)) {
|
||||||
|
markAsInvalid("Should have been tagged with unauthorized access error");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
markAsInvalid("Delivery should have been Rejected");
|
||||||
|
}
|
||||||
|
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -107,26 +129,59 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
|
||||||
AmqpConnection connection = addConnection(client.connect());
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
AmqpSession session = connection.createSession();
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
AmqpSender sender = session.createSender(getQueueName());
|
|
||||||
AmqpMessage message = new AmqpMessage();
|
|
||||||
|
|
||||||
message.setMessageId("msg" + 1);
|
|
||||||
message.setMessageAnnotation("serialNo", 1);
|
|
||||||
message.setText("Test-Message");
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
sender.send(message);
|
AmqpSender sender = session.createSender(getQueueName());
|
||||||
} catch (IOException e) {
|
AmqpMessage message = new AmqpMessage();
|
||||||
}
|
|
||||||
|
|
||||||
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
|
message.setMessageId("msg" + 1);
|
||||||
connection.getStateInspector().assertValid();
|
message.setMessageAnnotation("serialNo", 1);
|
||||||
connection.close();
|
message.setText("Test-Message");
|
||||||
|
|
||||||
|
try {
|
||||||
|
sender.send(message);
|
||||||
|
} catch (IOException e) {
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
|
||||||
|
connection.getStateInspector().assertValid();
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testSendMessageFailsOnAnonymousRelayWhenNotAuthorizedToSendToAddress() throws Exception {
|
public void testSendMessageFailsOnAnonymousRelayWhenNotAuthorizedToSendToAddress() throws Exception {
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
AmqpClient client = createAmqpClient(guestUser, guestPass);
|
AmqpClient client = createAmqpClient(guestUser, guestPass);
|
||||||
|
client.setValidator(new AmqpValidator() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void inspectDeliveryUpdate(Sender sender, Delivery delivery) {
|
||||||
|
DeliveryState state = delivery.getRemoteState();
|
||||||
|
|
||||||
|
if (!delivery.remotelySettled()) {
|
||||||
|
markAsInvalid("delivery is not remotely settled");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (state instanceof Rejected) {
|
||||||
|
Rejected rejected = (Rejected) state;
|
||||||
|
if (rejected.getError() == null || rejected.getError().getCondition() == null) {
|
||||||
|
markAsInvalid("Delivery should have been Rejected with an error condition");
|
||||||
|
} else {
|
||||||
|
ErrorCondition error = rejected.getError();
|
||||||
|
if (!error.getCondition().equals(AmqpError.UNAUTHORIZED_ACCESS)) {
|
||||||
|
markAsInvalid("Should have been tagged with unauthorized access error");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
markAsInvalid("Delivery should have been Rejected");
|
||||||
|
}
|
||||||
|
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
AmqpConnection connection = client.connect();
|
AmqpConnection connection = client.connect();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -147,6 +202,83 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
|
||||||
} finally {
|
} finally {
|
||||||
sender.close();
|
sender.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
|
||||||
|
connection.getStateInspector().assertValid();
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testReceiverNotAuthorized() throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient(noprivUser, noprivPass);
|
||||||
|
client.setValidator(new AmqpValidator() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void inspectOpenedResource(Receiver receiver) {
|
||||||
|
ErrorCondition condition = receiver.getRemoteCondition();
|
||||||
|
|
||||||
|
if (condition != null && condition.getCondition() != null) {
|
||||||
|
if (!condition.getCondition().equals(AmqpError.UNAUTHORIZED_ACCESS)) {
|
||||||
|
markAsInvalid("Should have been tagged with unauthorized access error");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
markAsInvalid("Receiver should have been opened with an error");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
AmqpConnection connection = client.connect();
|
||||||
|
|
||||||
|
try {
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
try {
|
||||||
|
session.createReceiver(getQueueName());
|
||||||
|
fail("Should not be able to consume here.");
|
||||||
|
} catch (Exception ex) {
|
||||||
|
IntegrationTestLogger.LOGGER.info("Caught expected exception");
|
||||||
|
}
|
||||||
|
|
||||||
|
connection.getStateInspector().assertValid();
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testConsumerNotAuthorizedToCreateQueues() throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient(noprivUser, noprivPass);
|
||||||
|
client.setValidator(new AmqpValidator() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void inspectOpenedResource(Sender sender) {
|
||||||
|
ErrorCondition condition = sender.getRemoteCondition();
|
||||||
|
|
||||||
|
if (condition != null && condition.getCondition() != null) {
|
||||||
|
if (!condition.getCondition().equals(AmqpError.UNAUTHORIZED_ACCESS)) {
|
||||||
|
markAsInvalid("Should have been tagged with unauthorized access error");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
markAsInvalid("Sender should have been opened with an error");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
AmqpConnection connection = client.connect();
|
||||||
|
|
||||||
|
try {
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
try {
|
||||||
|
session.createReceiver(getQueueName(getPrecreatedQueueSize() + 1));
|
||||||
|
fail("Should not be able to consume here.");
|
||||||
|
} catch (Exception ex) {
|
||||||
|
IntegrationTestLogger.LOGGER.info("Caught expected exception");
|
||||||
|
}
|
||||||
|
|
||||||
|
connection.getStateInspector().assertValid();
|
||||||
} finally {
|
} finally {
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import javax.jms.JMSSecurityException;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.QueueBrowser;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
|
@ -37,34 +38,49 @@ public class JMSConnectionWithSecurityTest extends JMSClientTestSupport {
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void testNoUserOrPassword() throws Exception {
|
public void testNoUserOrPassword() throws Exception {
|
||||||
|
Connection connection = null;
|
||||||
try {
|
try {
|
||||||
Connection connection = createConnection("", "", null, false);
|
connection = createConnection("", "", null, false);
|
||||||
connection.start();
|
connection.start();
|
||||||
fail("Expected JMSException");
|
fail("Expected JMSException");
|
||||||
} catch (JMSSecurityException ex) {
|
} catch (JMSSecurityException ex) {
|
||||||
IntegrationTestLogger.LOGGER.debug("Failed to authenticate connection with no user / password.");
|
IntegrationTestLogger.LOGGER.debug("Failed to authenticate connection with no user / password.");
|
||||||
|
} finally {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void testUnknownUser() throws Exception {
|
public void testUnknownUser() throws Exception {
|
||||||
|
Connection connection = null;
|
||||||
try {
|
try {
|
||||||
Connection connection = createConnection("nosuchuser", "blah", null, false);
|
connection = createConnection("nosuchuser", "blah", null, false);
|
||||||
connection.start();
|
connection.start();
|
||||||
fail("Expected JMSException");
|
fail("Expected JMSException");
|
||||||
} catch (JMSSecurityException ex) {
|
} catch (JMSSecurityException ex) {
|
||||||
IntegrationTestLogger.LOGGER.debug("Failed to authenticate connection with unknown user ID");
|
IntegrationTestLogger.LOGGER.debug("Failed to authenticate connection with unknown user ID");
|
||||||
|
} finally {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
@Test(timeout = 10000)
|
||||||
public void testKnownUserWrongPassword() throws Exception {
|
public void testKnownUserWrongPassword() throws Exception {
|
||||||
|
Connection connection = null;
|
||||||
try {
|
try {
|
||||||
Connection connection = createConnection(fullUser, "wrongPassword", null, false);
|
connection = createConnection(fullUser, "wrongPassword", null, false);
|
||||||
connection.start();
|
connection.start();
|
||||||
fail("Expected JMSException");
|
fail("Expected JMSException");
|
||||||
} catch (JMSSecurityException ex) {
|
} catch (JMSSecurityException ex) {
|
||||||
IntegrationTestLogger.LOGGER.debug("Failed to authenticate connection with incorrect password.");
|
IntegrationTestLogger.LOGGER.debug("Failed to authenticate connection with incorrect password.");
|
||||||
|
} finally {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,6 +129,102 @@ public class JMSConnectionWithSecurityTest extends JMSClientTestSupport {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testConsumerNotAuthorized() throws Exception {
|
||||||
|
Connection connection = createConnection(noprivUser, noprivPass);
|
||||||
|
|
||||||
|
try {
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
javax.jms.Queue queue = session.createQueue(getQueueName());
|
||||||
|
try {
|
||||||
|
session.createConsumer(queue);
|
||||||
|
fail("Should not be able to consume here.");
|
||||||
|
} catch (JMSSecurityException jmsSE) {
|
||||||
|
IntegrationTestLogger.LOGGER.info("Caught expected exception");
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testBrowserNotAuthorized() throws Exception {
|
||||||
|
Connection connection = createConnection(noprivUser, noprivPass);
|
||||||
|
|
||||||
|
try {
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
javax.jms.Queue queue = session.createQueue(getQueueName());
|
||||||
|
try {
|
||||||
|
QueueBrowser browser = session.createBrowser(queue);
|
||||||
|
// Browser is not created until an enumeration is requesteda
|
||||||
|
browser.getEnumeration();
|
||||||
|
fail("Should not be able to consume here.");
|
||||||
|
} catch (JMSSecurityException jmsSE) {
|
||||||
|
IntegrationTestLogger.LOGGER.info("Caught expected exception");
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testConsumerNotAuthorizedToCreateQueues() throws Exception {
|
||||||
|
Connection connection = createConnection(noprivUser, noprivPass);
|
||||||
|
|
||||||
|
try {
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
javax.jms.Queue queue = session.createQueue(getQueueName(getPrecreatedQueueSize() + 1));
|
||||||
|
try {
|
||||||
|
session.createConsumer(queue);
|
||||||
|
fail("Should not be able to consume here.");
|
||||||
|
} catch (JMSSecurityException jmsSE) {
|
||||||
|
IntegrationTestLogger.LOGGER.info("Caught expected exception");
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testProducerNotAuthorized() throws Exception {
|
||||||
|
Connection connection = createConnection(guestUser, guestPass);
|
||||||
|
|
||||||
|
try {
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
javax.jms.Queue queue = session.createQueue(getQueueName());
|
||||||
|
try {
|
||||||
|
// TODO - This seems a bit odd, can attach but not send
|
||||||
|
MessageProducer producer = session.createProducer(queue);
|
||||||
|
producer.send(session.createMessage());
|
||||||
|
fail("Should not be able to produce here.");
|
||||||
|
} catch (JMSSecurityException jmsSE) {
|
||||||
|
IntegrationTestLogger.LOGGER.info("Caught expected exception");
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testAnonymousProducerNotAuthorized() throws Exception {
|
||||||
|
Connection connection = createConnection(guestUser, guestPass);
|
||||||
|
|
||||||
|
try {
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
javax.jms.Queue queue = session.createQueue(getQueueName());
|
||||||
|
MessageProducer producer = session.createProducer(null);
|
||||||
|
|
||||||
|
try {
|
||||||
|
producer.send(queue, session.createTextMessage());
|
||||||
|
fail("Should not be able to produce here.");
|
||||||
|
} catch (JMSSecurityException jmsSE) {
|
||||||
|
IntegrationTestLogger.LOGGER.info("Caught expected exception");
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 30000)
|
@Test(timeout = 30000)
|
||||||
public void testCreateTemporaryQueueNotAuthorized() throws JMSException {
|
public void testCreateTemporaryQueueNotAuthorized() throws JMSException {
|
||||||
Connection connection = createConnection(guestUser, guestPass);
|
Connection connection = createConnection(guestUser, guestPass);
|
||||||
|
@ -123,7 +235,6 @@ public class JMSConnectionWithSecurityTest extends JMSClientTestSupport {
|
||||||
try {
|
try {
|
||||||
session.createTemporaryQueue();
|
session.createTemporaryQueue();
|
||||||
} catch (JMSSecurityException jmsse) {
|
} catch (JMSSecurityException jmsse) {
|
||||||
} catch (JMSException jmse) {
|
|
||||||
IntegrationTestLogger.LOGGER.info("Client should have thrown a JMSSecurityException but only threw JMSException");
|
IntegrationTestLogger.LOGGER.info("Client should have thrown a JMSSecurityException but only threw JMSException");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -144,7 +255,6 @@ public class JMSConnectionWithSecurityTest extends JMSClientTestSupport {
|
||||||
try {
|
try {
|
||||||
session.createTemporaryTopic();
|
session.createTemporaryTopic();
|
||||||
} catch (JMSSecurityException jmsse) {
|
} catch (JMSSecurityException jmsse) {
|
||||||
} catch (JMSException jmse) {
|
|
||||||
IntegrationTestLogger.LOGGER.info("Client should have thrown a JMSSecurityException but only threw JMSException");
|
IntegrationTestLogger.LOGGER.info("Client should have thrown a JMSSecurityException but only threw JMSException");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue