ARTEMIS-2352 - Add the ability to reject messages without a validated user set
https://issues.apache.org/jira/browse/ARTEMIS-2352
This commit is contained in:
parent
500aa09360
commit
959c38bd8d
|
@ -421,6 +421,9 @@ public final class ActiveMQDefaultConfiguration {
|
|||
// Will the broker populate the message with the name of the validated user
|
||||
private static boolean DEFAULT_POPULATE_VALIDATED_USER = false;
|
||||
|
||||
// Will the broker allow messages with no validated user
|
||||
private static boolean DEFAULT_REJECT_EMPTY_VALIDATED_USER = false;
|
||||
|
||||
// its possible that you only want a server to partake in scale down as a receiver, via a group. In this case set scale-down to false
|
||||
private static boolean DEFAULT_SCALE_DOWN_ENABLED = true;
|
||||
|
||||
|
@ -1252,6 +1255,10 @@ public final class ActiveMQDefaultConfiguration {
|
|||
return DEFAULT_POPULATE_VALIDATED_USER;
|
||||
}
|
||||
|
||||
public static boolean isDefaultRejectEmptyValidatedUser() {
|
||||
return DEFAULT_REJECT_EMPTY_VALIDATED_USER;
|
||||
}
|
||||
|
||||
/**
|
||||
* its possible that you only want a server to partake in scale down as a receiver, via a group. In this case set scale-down to false
|
||||
*/
|
||||
|
|
|
@ -1090,6 +1090,10 @@ public interface Configuration {
|
|||
|
||||
Configuration setPopulateValidatedUser(boolean populateValidatedUser);
|
||||
|
||||
boolean isRejectEmptyValidatedUser();
|
||||
|
||||
Configuration setRejectEmptyValidatedUser(boolean rejectEmptyValidatedUser);
|
||||
|
||||
/**
|
||||
* It will return all the connectors in a toString manner for debug purposes.
|
||||
*/
|
||||
|
|
|
@ -289,6 +289,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
|||
|
||||
protected boolean populateValidatedUser = ActiveMQDefaultConfiguration.isDefaultPopulateValidatedUser();
|
||||
|
||||
protected boolean rejectEmptyValidatedUser = ActiveMQDefaultConfiguration.isDefaultRejectEmptyValidatedUser();
|
||||
|
||||
private long connectionTtlCheckInterval = ActiveMQDefaultConfiguration.getDefaultConnectionTtlCheckInterval();
|
||||
|
||||
private URL configurationUrl;
|
||||
|
@ -1734,6 +1736,17 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRejectEmptyValidatedUser() {
|
||||
return rejectEmptyValidatedUser;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration setRejectEmptyValidatedUser(boolean rejectEmptyValidatedUser) {
|
||||
this.rejectEmptyValidatedUser = rejectEmptyValidatedUser;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getConnectionTtlCheckInterval() {
|
||||
return connectionTtlCheckInterval;
|
||||
|
|
|
@ -373,6 +373,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
config.setPopulateValidatedUser(getBoolean(e, "populate-validated-user", config.isPopulateValidatedUser()));
|
||||
|
||||
config.setRejectEmptyValidatedUser(getBoolean(e, "reject-empty-validated-user", config.isRejectEmptyValidatedUser()));
|
||||
|
||||
config.setConnectionTtlCheckInterval(getLong(e, "connection-ttl-check-interval", config.getConnectionTtlCheckInterval(), Validators.GT_ZERO));
|
||||
|
||||
config.setConfigurationFileRefreshPeriod(getLong(e, "configuration-file-refresh-period", config.getConfigurationFileRefreshPeriod(), Validators.GT_ZERO));
|
||||
|
|
|
@ -470,4 +470,7 @@ public interface ActiveMQMessageBundle {
|
|||
|
||||
@Message(id = 229224, value = "User {0} does not exist", format = Message.Format.MESSAGE_FORMAT)
|
||||
IllegalArgumentException userDoesNotExist(String user);
|
||||
|
||||
@Message(id = 229225, value = "Validated User is not set", format = Message.Format.MESSAGE_FORMAT)
|
||||
ActiveMQIllegalStateException rejectEmptyValidatedUser();
|
||||
}
|
||||
|
|
|
@ -2023,6 +2023,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
msg.setValidatedUserID(validatedUser);
|
||||
}
|
||||
|
||||
if (server.getConfiguration().isRejectEmptyValidatedUser() && msg.getValidatedUserID() == null) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.rejectEmptyValidatedUser();
|
||||
}
|
||||
|
||||
if (tx == null || autoCommitSends) {
|
||||
routingContext.setTransaction(null);
|
||||
} else {
|
||||
|
|
|
@ -396,6 +396,14 @@
|
|||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="reject-empty-validated-user" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
true means that the server will not allow any message that doesn't have a validated user, in JMS this is JMSXUserID
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="connectors" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
|
|
|
@ -139,6 +139,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
Assert.assertEquals(true, conf.isGracefulShutdownEnabled());
|
||||
Assert.assertEquals(12345, conf.getGracefulShutdownTimeout());
|
||||
Assert.assertEquals(true, conf.isPopulateValidatedUser());
|
||||
Assert.assertEquals(false, conf.isRejectEmptyValidatedUser());
|
||||
Assert.assertEquals(98765, conf.getConnectionTtlCheckInterval());
|
||||
Assert.assertEquals(1234567, conf.getConfigurationFileRefreshPeriod());
|
||||
|
||||
|
|
|
@ -53,6 +53,7 @@
|
|||
<journal-datasync>false</journal-datasync>
|
||||
<persist-id-cache>true</persist-id-cache>
|
||||
<populate-validated-user>true</populate-validated-user>
|
||||
<reject-empty-validated-user>false</reject-empty-validated-user>
|
||||
<connection-ttl-check-interval>98765</connection-ttl-check-interval>
|
||||
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
|
||||
<global-max-size>1234567</global-max-size>
|
||||
|
|
|
@ -54,6 +54,7 @@
|
|||
<journal-datasync>false</journal-datasync>
|
||||
<persist-id-cache>true</persist-id-cache>
|
||||
<populate-validated-user>true</populate-validated-user>
|
||||
<reject-empty-validated-user>false</reject-empty-validated-user>
|
||||
<connection-ttl-check-interval>98765</connection-ttl-check-interval>
|
||||
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
|
||||
<global-max-size>1234567</global-max-size>
|
||||
|
|
|
@ -0,0 +1,138 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.security;
|
||||
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import java.util.Map;
|
||||
|
||||
public class JMSXUserIDPluginTest extends ActiveMQTestBase {
|
||||
|
||||
private ActiveMQServer server;
|
||||
private SimpleString ADDRESS = new SimpleString("TestQueue");
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
server = addServer(ActiveMQServers.newActiveMQServer(createDefaultNettyConfig(), true));
|
||||
|
||||
JMSXUserIDPlugin plugin = new JMSXUserIDPlugin();
|
||||
plugin.setPopulateValidatedUser("testuser");
|
||||
|
||||
server.registerBrokerPlugin(plugin);
|
||||
server.start();
|
||||
server.createQueue(ADDRESS, RoutingType.ANYCAST, ADDRESS, null, true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddValidatedUserCore() throws Exception {
|
||||
ServerLocator locator = createNettyNonHALocator();
|
||||
ClientSessionFactory sf = createSessionFactory(locator);
|
||||
ClientSession session = sf.createSession(false, true, true);
|
||||
ClientProducer producer = session.createProducer(ADDRESS.toString());
|
||||
producer.send(session.createMessage(true));
|
||||
ClientConsumer consumer = session.createConsumer(ADDRESS.toString());
|
||||
session.start();
|
||||
ClientMessage clientMessage = consumer.receiveImmediate();
|
||||
Assert.assertNotNull(clientMessage);
|
||||
Assert.assertEquals(clientMessage.getValidatedUserID(), "testuser");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddValidatedUserAMQP() throws Exception {
|
||||
JmsConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:61616");
|
||||
Connection connection = factory.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
javax.jms.Queue queue = session.createQueue(ADDRESS.toString());
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
producer.send(session.createMessage());
|
||||
connection.close();
|
||||
|
||||
server.stop();
|
||||
server.start();
|
||||
|
||||
connection = factory.createConnection();
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
connection.start();
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
Message message = consumer.receive(5000);
|
||||
Assert.assertNotNull(message);
|
||||
Assert.assertEquals(message.getStringProperty("_AMQ_VALIDATED_USER"), "testuser");
|
||||
connection.close();
|
||||
}
|
||||
|
||||
|
||||
private static class JMSXUserIDPlugin implements ActiveMQServerPlugin {
|
||||
|
||||
private static String POPULATE_VALIDATED_USER = "POPULATE_VALIDATED_USER";
|
||||
|
||||
private String populateValidatedUser;
|
||||
|
||||
/**
|
||||
* used to pass configured properties to Plugin
|
||||
*
|
||||
* @param properties
|
||||
*/
|
||||
@Override
|
||||
public void init(Map<String, String> properties) {
|
||||
populateValidatedUser = properties.getOrDefault(POPULATE_VALIDATED_USER, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeSend(ServerSession session, Transaction tx, org.apache.activemq.artemis.api.core.Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException {
|
||||
if (populateValidatedUser != null && !message.containsProperty(org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) {
|
||||
message.messageChanged();
|
||||
message.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER, populateValidatedUser);
|
||||
}
|
||||
}
|
||||
|
||||
public String getPopulateValidatedUser() {
|
||||
return populateValidatedUser;
|
||||
}
|
||||
|
||||
public void setPopulateValidatedUser(String populateValidatedUser) {
|
||||
this.populateValidatedUser = populateValidatedUser;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,90 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.security;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
|
||||
public class RejectValidatedUserTest extends ActiveMQTestBase {
|
||||
|
||||
private static final String ADDRESS = "TestQueue";
|
||||
private ActiveMQServer server;
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(), false));
|
||||
server.getConfiguration().setRejectEmptyValidatedUser(true);
|
||||
server.start();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRejectException() throws Exception {
|
||||
ServerLocator locator = createInVMNonHALocator();
|
||||
ClientSessionFactory sessionFactory = locator.createSessionFactory();
|
||||
ClientSession session = sessionFactory.createSession();
|
||||
ClientProducer producer = session.createProducer(ADDRESS);
|
||||
try {
|
||||
producer.send(session.createMessage(true));
|
||||
Assert.fail("Should throw exception");
|
||||
} catch (ActiveMQIllegalStateException e) {
|
||||
//pass
|
||||
}
|
||||
locator.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAcceptException() throws Exception {
|
||||
ServerLocator locator = createInVMNonHALocator();
|
||||
ClientSessionFactory sessionFactory = locator.createSessionFactory();
|
||||
ClientSession session = sessionFactory.createSession();
|
||||
ClientProducer producer = session.createProducer(ADDRESS);
|
||||
ClientMessage message = session.createMessage(true);
|
||||
message.setValidatedUserID("testuser");
|
||||
producer.send(message);
|
||||
locator.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAcceptJMSException() throws Exception {
|
||||
ActiveMQConnectionFactory connectionFactory = ActiveMQJMSClient.createConnectionFactory("vm://0", "0");
|
||||
Connection connection = connectionFactory.createConnection();
|
||||
Session session = connection.createSession();
|
||||
Queue queue = session.createQueue(ADDRESS.toString());
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
Message message = session.createMessage();
|
||||
message.setStringProperty("JMSXUserID", "testuser");
|
||||
producer.send(message);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue