ARTEMIS-2964 - fire advisory messages via post office, independent of connection state. consume via regular auth

This commit is contained in:
gtully 2020-10-28 12:23:06 +00:00
parent 4f8afd5dd1
commit e5566d5211
10 changed files with 332 additions and 49 deletions

View File

@ -50,6 +50,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@ -165,6 +166,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private final Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>();
private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
private ConnectionState state;
private volatile boolean noLocal;
@ -177,8 +180,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
*/
private final Map<TransactionId, Transaction> txMap = new ConcurrentHashMap<>();
private volatile AMQSession advisorySession;
private final ActiveMQServer server;
/**
@ -711,14 +712,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
public void setAdvisorySession(AMQSession amqSession) {
this.advisorySession = amqSession;
}
public AMQSession getAdvisorySession() {
return this.advisorySession;
}
public AMQConnectionContext getContext() {
return this.context;
}
@ -1032,22 +1025,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public void addSessions(Set<SessionId> sessionSet) {
for (SessionId sid : sessionSet) {
addSession(getState().getSessionState(sid).getInfo(), true);
addSession(getState().getSessionState(sid).getInfo());
}
}
public AMQSession addSession(SessionInfo ss) {
return addSession(ss, false);
}
public AMQSession addSession(SessionInfo ss, boolean internal) {
AMQSession amqSession = new AMQSession(getState().getInfo(), ss, server, this, protocolManager);
AMQSession amqSession = new AMQSession(getState().getInfo(), ss, server, this, protocolManager, coreMessageObjectPools);
amqSession.initialize();
if (internal) {
amqSession.disableSecurity();
}
sessions.put(ss.getSessionId(), amqSession);
sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId());
return amqSession;
@ -1807,4 +1792,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
return transportConnection.getLocalAddress();
}
public CoreMessageObjectPools getCoreMessageObjectPools() {
return coreMessageObjectPools;
}
}

View File

@ -40,9 +40,8 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -72,13 +71,11 @@ import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.DestinationPath;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.LongSequenceGenerator;
@ -376,10 +373,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
String url = context.getConnection().getLocalAddress();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, context.getConnection().getLocalAddress());
// set the data structure
advisoryMessage.setDataStructure(command);
@ -390,19 +384,16 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
advisoryMessage.setDestination(topic);
advisoryMessage.setResponseRequired(false);
advisoryMessage.setProducerId(advisoryProducerId);
boolean originalFlowControl = context.isProducerFlowControl();
final AMQProducerBrokerExchange producerExchange = new AMQProducerBrokerExchange();
producerExchange.setConnectionContext(context);
producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
try {
context.setProducerFlowControl(false);
AMQSession sess = context.getConnection().getAdvisorySession();
if (sess != null) {
sess.send(producerExchange.getProducerState().getInfo(), advisoryMessage, false);
}
} finally {
context.setProducerFlowControl(originalFlowControl);
}
advisoryMessage.setTimestamp(System.currentTimeMillis());
final CoreMessageObjectPools objectPools = context.getConnection().getCoreMessageObjectPools();
final org.apache.activemq.artemis.api.core.Message coreMessage = OpenWireMessageConverter.inbound(advisoryMessage, wireFormat, objectPools);
final SimpleString address = SimpleString.toSimpleString(topic.getPhysicalName(), objectPools.getAddressStringSimpleStringPool());
coreMessage.setAddress(address);
coreMessage.setRoutingType(RoutingType.MULTICAST);
// follow pattern from management notification to route directly
server.getPostOffice().route(coreMessage, false);
}
public String getBrokerName() {

View File

@ -90,7 +90,7 @@ public class AMQSession implements SessionCallback {
private final Runnable enableAutoReadAndTtl;
private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
private final CoreMessageObjectPools coreMessageObjectPools;
private String[] existingQueuesCache;
@ -100,7 +100,7 @@ public class AMQSession implements SessionCallback {
SessionInfo sessInfo,
ActiveMQServer server,
OpenWireConnection connection,
OpenWireProtocolManager protocolManager) {
OpenWireProtocolManager protocolManager, CoreMessageObjectPools coreMessageObjectPools) {
this.connInfo = connInfo;
this.sessInfo = sessInfo;
this.clientId = SimpleString.toSimpleString(connInfo.getClientId());
@ -111,6 +111,7 @@ public class AMQSession implements SessionCallback {
this.protocolManagerWireFormat = protocolManager.wireFormat().copy();
this.enableAutoReadAndTtl = this::enableAutoReadAndTtl;
this.existingQueuesCache = null;
this.coreMessageObjectPools = coreMessageObjectPools;
}
public boolean isClosed() {
@ -132,11 +133,6 @@ public class AMQSession implements SessionCallback {
try {
coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext(), protocolManager.getPrefixes(), protocolManager.getSecurityDomain());
long sessionId = sessInfo.getSessionId().getValue();
if (sessionId == -1) {
this.connection.setAdvisorySession(this);
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.error("error init session", e);
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.openwire;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.tests.util.Wait;
@ -26,6 +27,8 @@ import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class AdvisoryOpenWireTest extends BasicOpenWireTest {
@ -147,4 +150,37 @@ public class AdvisoryOpenWireTest extends BasicOpenWireTest {
}
}
@Test
public void testConnectionAdvisory() throws Exception {
final Connection[] connections = new Connection[20];
connections[0] = factory.createConnection();
connections[0].start();
final CountDownLatch numConnectionsCreatedViaAdvisoryNotificationsLatch = new CountDownLatch(19);
connections[0].createSession(false, Session.AUTO_ACKNOWLEDGE)
.createConsumer(AdvisorySupport.getConnectionAdvisoryTopic()).setMessageListener(message -> numConnectionsCreatedViaAdvisoryNotificationsLatch.countDown());
try {
for (int i = 1; i < connections.length; i++) {
connections[i] = factory.createConnection();
connections[i].start();
}
Session session = connections[0].createSession(false, Session.AUTO_ACKNOWLEDGE);
session.close();
assertTrue("Got all the advisories on time", numConnectionsCreatedViaAdvisoryNotificationsLatch.await(5, TimeUnit.SECONDS));
} finally {
for (Connection conn : connections) {
if (conn != null) {
conn.close();
}
}
}
}
}

View File

@ -16,14 +16,30 @@
*/
package org.apache.activemq.artemis.tests.integration.openwire;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import java.io.Serializable;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Assert;
@ -173,7 +189,6 @@ public class BasicSecurityTest extends BasicOpenWireTest {
Connection conn1 = null;
Connection conn2 = null;
//Sender
try {
conn1 = factory.createConnection("openwireGuest", "GuEsT");
conn1.start();
@ -205,4 +220,233 @@ public class BasicSecurityTest extends BasicOpenWireTest {
}
}
@Test
public void testConnectionConsumer() throws Exception {
Connection conn1 = null;
try {
conn1 = factory.createConnection("openwireGuest", "GuEsT");
conn1.start();
try {
Destination dest = new ActiveMQQueue(queueName);
conn1.createConnectionConsumer(dest, null, new ServerSessionPool() {
@Override
public ServerSession getServerSession() throws JMSException {
return new ServerSession() {
@Override
public Session getSession() throws JMSException {
return new Session() {
@Override
public BytesMessage createBytesMessage() throws JMSException {
return null;
}
@Override
public MapMessage createMapMessage() throws JMSException {
return null;
}
@Override
public Message createMessage() throws JMSException {
return null;
}
@Override
public ObjectMessage createObjectMessage() throws JMSException {
return null;
}
@Override
public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
return null;
}
@Override
public StreamMessage createStreamMessage() throws JMSException {
return null;
}
@Override
public TextMessage createTextMessage() throws JMSException {
return null;
}
@Override
public TextMessage createTextMessage(String text) throws JMSException {
return null;
}
@Override
public boolean getTransacted() throws JMSException {
return false;
}
@Override
public int getAcknowledgeMode() throws JMSException {
return 0;
}
@Override
public void commit() throws JMSException {
}
@Override
public void rollback() throws JMSException {
}
@Override
public void close() throws JMSException {
}
@Override
public void recover() throws JMSException {
}
@Override
public MessageListener getMessageListener() throws JMSException {
return null;
}
@Override
public void setMessageListener(MessageListener listener) throws JMSException {
}
@Override
public void run() {
}
@Override
public MessageProducer createProducer(Destination destination) throws JMSException {
return null;
}
@Override
public MessageConsumer createConsumer(Destination destination) throws JMSException {
return null;
}
@Override
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
return null;
}
@Override
public MessageConsumer createConsumer(Destination destination,
String messageSelector,
boolean NoLocal) throws JMSException {
return null;
}
@Override
public Queue createQueue(String queueName) throws JMSException {
return null;
}
@Override
public Topic createTopic(String topicName) throws JMSException {
return null;
}
@Override
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
return null;
}
@Override
public TopicSubscriber createDurableSubscriber(Topic topic,
String name,
String messageSelector,
boolean noLocal) throws JMSException {
return null;
}
@Override
public QueueBrowser createBrowser(Queue queue) throws JMSException {
return null;
}
@Override
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
return null;
}
@Override
public TemporaryQueue createTemporaryQueue() throws JMSException {
return null;
}
@Override
public TemporaryTopic createTemporaryTopic() throws JMSException {
return null;
}
@Override
public void unsubscribe(String name) throws JMSException {
}
@Override
public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) throws JMSException {
return null;
}
@Override
public MessageConsumer createSharedConsumer(Topic topic,
String sharedSubscriptionName,
String messageSelector) throws JMSException {
return null;
}
@Override
public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException {
return null;
}
@Override
public MessageConsumer createDurableConsumer(Topic topic,
String name,
String messageSelector,
boolean noLocal) throws JMSException {
return null;
}
@Override
public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException {
return null;
}
@Override
public MessageConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector) throws JMSException {
return null;
}
};
}
@Override
public void start() throws JMSException {
}
};
}
}, 100);
} catch (JMSSecurityException e) {
//expected
}
} finally {
if (conn1 != null) {
conn1.close();
}
}
}
}

View File

@ -99,6 +99,19 @@ public class OpenWireTestBase extends ActiveMQTestBase {
roles.add(destRole);
server.getConfiguration().putSecurityRoles("#", roles);
// advisory addresses, anyone can create/consume
// broker can produce
Role advisoryReceiverRole = new Role("advisoryReceiver", false, true, false, false, true, true, false, true, true, false);
roles = new HashSet<>();
roles.add(advisoryReceiverRole);
server.getConfiguration().putSecurityRoles("ActiveMQ.Advisory.#", roles);
securityManager.getConfiguration().addRole("openwireReceiver", "advisoryReceiver");
securityManager.getConfiguration().addRole("openwireSender", "advisoryReceiver");
securityManager.getConfiguration().addRole("openwireGuest", "advisoryReceiver");
securityManager.getConfiguration().addRole("openwireDestinationManager", "advisoryReceiver");
}
mbeanServer = MBeanServerFactory.createMBeanServer();

View File

@ -56,6 +56,7 @@ public class SecurityOpenWireTest extends BasicOpenWireTest {
ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
securityManager.getConfiguration().addUser("denyQ", "denyQ");
securityManager.getConfiguration().addRole("denyQ", "denyQ");
securityManager.getConfiguration().addRole("denyQ", "advisoryReceiver");
}
@Test

View File

@ -117,6 +117,12 @@ public class SecurityPerAcceptorJmsTest extends ActiveMQTestBase {
Set<Role> roles = new HashSet<>();
roles.add(new Role("programmers", false, false, false, false, false, false, false, false, false, false));
server.getConfiguration().putSecurityRoles("#", roles);
// ensure advisory permission is still set for openwire to allow connection to succeed, alternative is url param jms.watchTopicAdvisories=false on the client connection factory
roles = new HashSet<>();
roles.add(new Role("programmers", false, true, false, false, true, true, false, false, true, false));
server.getConfiguration().putSecurityRoles("ActiveMQ.Advisory.#", roles);
server.start();
server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(ADDRESS).setAddress(ADDRESS).setRoutingType(RoutingType.ANYCAST));

View File

@ -226,6 +226,11 @@ public class SecurityTest extends ActiveMQTestBase {
server.getConfiguration().addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params));
// ensure advisory permission is still set for openwire to allow connection to succeed, alternative is url param jms.watchTopicAdvisories=false on the client connection factory
HashSet<Role> roles = new HashSet<>();
roles.add(new Role("programmers", false, true, false, false, true, true, false, false, true, false));
server.getConfiguration().putSecurityRoles("ActiveMQ.Advisory.#", roles);
server.start();
ActiveMQSslConnectionFactory factory = new ActiveMQSslConnectionFactory("ssl://localhost:61616");
@ -274,6 +279,7 @@ public class SecurityTest extends ActiveMQTestBase {
factory.setTrustStorePassword("secureexample");
factory.setKeyStore("client-side-keystore.jks");
factory.setKeyStorePassword("secureexample");
factory.setWatchTopicAdvisories(false);
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

View File

@ -263,6 +263,8 @@ public class SecureConfigurationTest extends ActiveMQTestBase {
org.apache.activemq.ActiveMQConnectionFactory activeMQConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616");
activeMQConnectionFactory.setUserName(user);
activeMQConnectionFactory.setPassword(password);
// don't listen for advisories to avoid the need for advisory permissions
activeMQConnectionFactory.setWatchTopicAdvisories(false);
return activeMQConnectionFactory;
}