https://issues.apache.org/jira/browse/AMQ-5141 - send messages to dlq using broker security context

This commit is contained in:
Dejan Bosanac 2014-04-14 17:03:54 +02:00
parent 81141b03e2
commit 7646526c0a
4 changed files with 141 additions and 9 deletions

View File

@ -742,10 +742,11 @@ public class RegionBroker extends EmptyBroker {
// it is only populated if the message is routed to
// another destination like the DLQ
ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(message, subscription);
if (context.getBroker() == null) {
context.setBroker(getRoot());
ConnectionContext adminContext = context;
if (context.getSecurityContext() == null || !context.getSecurityContext().isBrokerContext()) {
adminContext = BrokerSupport.getConnectionContext(this);
}
BrokerSupport.resendNoCopy(context, message, deadLetterDestination);
BrokerSupport.resendNoCopy(adminContext, message, deadLetterDestination);
return true;
}
} else {

View File

@ -111,7 +111,7 @@ public abstract class DeadLetterTestSupport extends TestSupport {
}
}
protected void makeDlqConsumer() throws JMSException {
protected void makeDlqConsumer() throws Exception {
dlqDestination = createDlqDestination();
LOG.info("Consuming from dead letter on: " + dlqDestination);

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.policy;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.security.*;
import javax.jms.*;
import static org.apache.activemq.security.SimpleSecurityBrokerSystemTest.*;
public class SecureDLQTest extends DeadLetterTestSupport {
Connection dlqConnection;
Session dlqSession;
public static AuthorizationMap createAuthorizationMap() {
DestinationMap readAccess = new DefaultAuthorizationMap();
readAccess.put(new ActiveMQQueue("TEST"), ADMINS);
readAccess.put(new ActiveMQQueue("TEST"), USERS);
readAccess.put(new ActiveMQQueue("ActiveMQ.DLQ"), ADMINS);
DestinationMap writeAccess = new DefaultAuthorizationMap();
writeAccess.put(new ActiveMQQueue("TEST"), ADMINS);
writeAccess.put(new ActiveMQQueue("TEST"), USERS);
writeAccess.put(new ActiveMQQueue("ActiveMQ.DLQ"), ADMINS);
readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD);
writeAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD);
DestinationMap adminAccess = new DefaultAuthorizationMap();
adminAccess.put(new ActiveMQQueue("TEST"), ADMINS);
adminAccess.put(new ActiveMQQueue("TEST"), USERS);
adminAccess.put(new ActiveMQQueue("ActiveMQ.DLQ"), ADMINS);
adminAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD);
return new SimpleAuthorizationMap(writeAccess, readAccess, adminAccess);
}
@Override
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();
AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(createAuthorizationMap());
broker.setPlugins(new BrokerPlugin[] {authorizationPlugin, new SimpleSecurityBrokerSystemTest.SimpleAuthenticationFactory()});
return broker;
}
// lets disable the inapplicable tests
public void testTransientTopicMessage() throws Exception {
}
public void testDurableTopicMessage() throws Exception {
}
@Override
protected void doTest() throws Exception {
timeToLive = 1000;
acknowledgeMode = Session.CLIENT_ACKNOWLEDGE;
makeConsumer();
sendMessages();
Thread.sleep(1000);
consumer.close();
Thread.sleep(1000);
// this should try to send expired messages to dlq
makeConsumer();
makeDlqConsumer();
for (int i = 0; i < messageCount; i++) {
Message msg = dlqConsumer.receive(1000);
assertMessage(msg, i);
assertNotNull("Should be a DLQ message for loop: " + i, msg);
}
}
@Override
public void tearDown() throws Exception {
super.tearDown();
if (dlqSession != null) {
dlqSession.close();
}
if (dlqConsumer != null) {
dlqConsumer.close();
}
}
@Override
protected Connection createConnection() throws Exception {
return getConnectionFactory().createConnection("user", "password");
}
@Override
protected void makeDlqConsumer() throws Exception {
dlqDestination = createDlqDestination();
dlqConnection = getConnectionFactory().createConnection("system", "manager");
dlqConnection.start();
dlqSession = dlqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
dlqConsumer = dlqSession.createConsumer(dlqDestination);
}
@Override
protected Destination createDlqDestination() {
return new ActiveMQQueue("ActiveMQ.DLQ");
}
@Override
protected String getDestinationString() {
return "TEST";
}
}

View File

@ -52,10 +52,10 @@ import javax.management.openmbean.CompositeData;
public class SimpleSecurityBrokerSystemTest extends SecurityTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(SimpleSecurityBrokerSystemTest.class);
static final GroupPrincipal GUESTS = new GroupPrincipal("guests");
static final GroupPrincipal USERS = new GroupPrincipal("users");
static final GroupPrincipal ADMINS = new GroupPrincipal("admins");
static Principal WILDCARD;
public static final GroupPrincipal GUESTS = new GroupPrincipal("guests");
public static final GroupPrincipal USERS = new GroupPrincipal("users");
public static final GroupPrincipal ADMINS = new GroupPrincipal("admins");
public static Principal WILDCARD;
static {
try {
WILDCARD = (Principal) DefaultAuthorizationMap.createGroupPrincipal("*", GroupPrincipal.class.getName());
@ -144,7 +144,7 @@ public class SimpleSecurityBrokerSystemTest extends SecurityTestSupport {
return new SimpleAuthorizationMap(writeAccess, readAccess, adminAccess);
}
static class SimpleAuthenticationFactory implements BrokerPlugin {
public static class SimpleAuthenticationFactory implements BrokerPlugin {
public Broker installPlugin(Broker broker) {
HashMap<String, String> u = new HashMap<String, String>();