Merge pull request #3315 from gtully/ARTEMIS-2964
ARTEMIS-2964 - fire advisory messages via post office, independent of…
This commit is contained in:
commit
7dd50bd58e
|
@ -50,6 +50,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||||
|
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
|
||||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||||
|
@ -165,6 +166,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
|
|
||||||
private final Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>();
|
private final Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
|
||||||
|
|
||||||
private ConnectionState state;
|
private ConnectionState state;
|
||||||
|
|
||||||
private volatile boolean noLocal;
|
private volatile boolean noLocal;
|
||||||
|
@ -177,8 +180,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
*/
|
*/
|
||||||
private final Map<TransactionId, Transaction> txMap = new ConcurrentHashMap<>();
|
private final Map<TransactionId, Transaction> txMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private volatile AMQSession advisorySession;
|
|
||||||
|
|
||||||
private final ActiveMQServer server;
|
private final ActiveMQServer server;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -711,14 +712,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setAdvisorySession(AMQSession amqSession) {
|
|
||||||
this.advisorySession = amqSession;
|
|
||||||
}
|
|
||||||
|
|
||||||
public AMQSession getAdvisorySession() {
|
|
||||||
return this.advisorySession;
|
|
||||||
}
|
|
||||||
|
|
||||||
public AMQConnectionContext getContext() {
|
public AMQConnectionContext getContext() {
|
||||||
return this.context;
|
return this.context;
|
||||||
}
|
}
|
||||||
|
@ -1032,22 +1025,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
|
|
||||||
public void addSessions(Set<SessionId> sessionSet) {
|
public void addSessions(Set<SessionId> sessionSet) {
|
||||||
for (SessionId sid : sessionSet) {
|
for (SessionId sid : sessionSet) {
|
||||||
addSession(getState().getSessionState(sid).getInfo(), true);
|
addSession(getState().getSessionState(sid).getInfo());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public AMQSession addSession(SessionInfo ss) {
|
public AMQSession addSession(SessionInfo ss) {
|
||||||
return addSession(ss, false);
|
AMQSession amqSession = new AMQSession(getState().getInfo(), ss, server, this, protocolManager, coreMessageObjectPools);
|
||||||
}
|
|
||||||
|
|
||||||
public AMQSession addSession(SessionInfo ss, boolean internal) {
|
|
||||||
AMQSession amqSession = new AMQSession(getState().getInfo(), ss, server, this, protocolManager);
|
|
||||||
amqSession.initialize();
|
amqSession.initialize();
|
||||||
|
|
||||||
if (internal) {
|
|
||||||
amqSession.disableSecurity();
|
|
||||||
}
|
|
||||||
|
|
||||||
sessions.put(ss.getSessionId(), amqSession);
|
sessions.put(ss.getSessionId(), amqSession);
|
||||||
sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId());
|
sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId());
|
||||||
return amqSession;
|
return amqSession;
|
||||||
|
@ -1807,4 +1792,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
||||||
return transportConnection.getLocalAddress();
|
return transportConnection.getLocalAddress();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public CoreMessageObjectPools getCoreMessageObjectPools() {
|
||||||
|
return coreMessageObjectPools;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,9 +40,8 @@ import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
|
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
|
||||||
import org.apache.activemq.artemis.api.core.client.TopologyMember;
|
import org.apache.activemq.artemis.api.core.client.TopologyMember;
|
||||||
|
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
|
||||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
|
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
|
||||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
|
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
|
@ -72,13 +71,11 @@ import org.apache.activemq.command.DestinationInfo;
|
||||||
import org.apache.activemq.command.MessageDispatch;
|
import org.apache.activemq.command.MessageDispatch;
|
||||||
import org.apache.activemq.command.MessageId;
|
import org.apache.activemq.command.MessageId;
|
||||||
import org.apache.activemq.command.ProducerId;
|
import org.apache.activemq.command.ProducerId;
|
||||||
import org.apache.activemq.command.ProducerInfo;
|
|
||||||
import org.apache.activemq.command.WireFormatInfo;
|
import org.apache.activemq.command.WireFormatInfo;
|
||||||
import org.apache.activemq.filter.DestinationFilter;
|
import org.apache.activemq.filter.DestinationFilter;
|
||||||
import org.apache.activemq.filter.DestinationPath;
|
import org.apache.activemq.filter.DestinationPath;
|
||||||
import org.apache.activemq.openwire.OpenWireFormat;
|
import org.apache.activemq.openwire.OpenWireFormat;
|
||||||
import org.apache.activemq.openwire.OpenWireFormatFactory;
|
import org.apache.activemq.openwire.OpenWireFormatFactory;
|
||||||
import org.apache.activemq.state.ProducerState;
|
|
||||||
import org.apache.activemq.util.IdGenerator;
|
import org.apache.activemq.util.IdGenerator;
|
||||||
import org.apache.activemq.util.InetAddressUtil;
|
import org.apache.activemq.util.InetAddressUtil;
|
||||||
import org.apache.activemq.util.LongSequenceGenerator;
|
import org.apache.activemq.util.LongSequenceGenerator;
|
||||||
|
@ -376,10 +373,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
||||||
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
|
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
|
||||||
String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
|
String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
|
||||||
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
|
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
|
||||||
|
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, context.getConnection().getLocalAddress());
|
||||||
String url = context.getConnection().getLocalAddress();
|
|
||||||
|
|
||||||
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
|
|
||||||
|
|
||||||
// set the data structure
|
// set the data structure
|
||||||
advisoryMessage.setDataStructure(command);
|
advisoryMessage.setDataStructure(command);
|
||||||
|
@ -390,19 +384,16 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
||||||
advisoryMessage.setDestination(topic);
|
advisoryMessage.setDestination(topic);
|
||||||
advisoryMessage.setResponseRequired(false);
|
advisoryMessage.setResponseRequired(false);
|
||||||
advisoryMessage.setProducerId(advisoryProducerId);
|
advisoryMessage.setProducerId(advisoryProducerId);
|
||||||
boolean originalFlowControl = context.isProducerFlowControl();
|
advisoryMessage.setTimestamp(System.currentTimeMillis());
|
||||||
final AMQProducerBrokerExchange producerExchange = new AMQProducerBrokerExchange();
|
|
||||||
producerExchange.setConnectionContext(context);
|
final CoreMessageObjectPools objectPools = context.getConnection().getCoreMessageObjectPools();
|
||||||
producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
|
final org.apache.activemq.artemis.api.core.Message coreMessage = OpenWireMessageConverter.inbound(advisoryMessage, wireFormat, objectPools);
|
||||||
try {
|
|
||||||
context.setProducerFlowControl(false);
|
final SimpleString address = SimpleString.toSimpleString(topic.getPhysicalName(), objectPools.getAddressStringSimpleStringPool());
|
||||||
AMQSession sess = context.getConnection().getAdvisorySession();
|
coreMessage.setAddress(address);
|
||||||
if (sess != null) {
|
coreMessage.setRoutingType(RoutingType.MULTICAST);
|
||||||
sess.send(producerExchange.getProducerState().getInfo(), advisoryMessage, false);
|
// follow pattern from management notification to route directly
|
||||||
}
|
server.getPostOffice().route(coreMessage, false);
|
||||||
} finally {
|
|
||||||
context.setProducerFlowControl(originalFlowControl);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getBrokerName() {
|
public String getBrokerName() {
|
||||||
|
|
|
@ -90,7 +90,7 @@ public class AMQSession implements SessionCallback {
|
||||||
|
|
||||||
private final Runnable enableAutoReadAndTtl;
|
private final Runnable enableAutoReadAndTtl;
|
||||||
|
|
||||||
private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
|
private final CoreMessageObjectPools coreMessageObjectPools;
|
||||||
|
|
||||||
private String[] existingQueuesCache;
|
private String[] existingQueuesCache;
|
||||||
|
|
||||||
|
@ -100,7 +100,7 @@ public class AMQSession implements SessionCallback {
|
||||||
SessionInfo sessInfo,
|
SessionInfo sessInfo,
|
||||||
ActiveMQServer server,
|
ActiveMQServer server,
|
||||||
OpenWireConnection connection,
|
OpenWireConnection connection,
|
||||||
OpenWireProtocolManager protocolManager) {
|
OpenWireProtocolManager protocolManager, CoreMessageObjectPools coreMessageObjectPools) {
|
||||||
this.connInfo = connInfo;
|
this.connInfo = connInfo;
|
||||||
this.sessInfo = sessInfo;
|
this.sessInfo = sessInfo;
|
||||||
this.clientId = SimpleString.toSimpleString(connInfo.getClientId());
|
this.clientId = SimpleString.toSimpleString(connInfo.getClientId());
|
||||||
|
@ -111,6 +111,7 @@ public class AMQSession implements SessionCallback {
|
||||||
this.protocolManagerWireFormat = protocolManager.wireFormat().copy();
|
this.protocolManagerWireFormat = protocolManager.wireFormat().copy();
|
||||||
this.enableAutoReadAndTtl = this::enableAutoReadAndTtl;
|
this.enableAutoReadAndTtl = this::enableAutoReadAndTtl;
|
||||||
this.existingQueuesCache = null;
|
this.existingQueuesCache = null;
|
||||||
|
this.coreMessageObjectPools = coreMessageObjectPools;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isClosed() {
|
public boolean isClosed() {
|
||||||
|
@ -132,11 +133,6 @@ public class AMQSession implements SessionCallback {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext(), protocolManager.getPrefixes(), protocolManager.getSecurityDomain());
|
coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext(), protocolManager.getPrefixes(), protocolManager.getSecurityDomain());
|
||||||
|
|
||||||
long sessionId = sessInfo.getSessionId().getValue();
|
|
||||||
if (sessionId == -1) {
|
|
||||||
this.connection.setAdvisorySession(this);
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
ActiveMQServerLogger.LOGGER.error("error init session", e);
|
ActiveMQServerLogger.LOGGER.error("error init session", e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.tests.integration.openwire;
|
package org.apache.activemq.artemis.tests.integration.openwire;
|
||||||
|
|
||||||
|
import org.apache.activemq.advisory.AdvisorySupport;
|
||||||
import org.apache.activemq.artemis.api.core.management.AddressControl;
|
import org.apache.activemq.artemis.api.core.management.AddressControl;
|
||||||
import org.apache.activemq.artemis.api.core.management.QueueControl;
|
import org.apache.activemq.artemis.api.core.management.QueueControl;
|
||||||
import org.apache.activemq.artemis.tests.util.Wait;
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
|
@ -26,6 +27,8 @@ import javax.jms.Connection;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TemporaryQueue;
|
import javax.jms.TemporaryQueue;
|
||||||
import javax.jms.TemporaryTopic;
|
import javax.jms.TemporaryTopic;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
public class AdvisoryOpenWireTest extends BasicOpenWireTest {
|
public class AdvisoryOpenWireTest extends BasicOpenWireTest {
|
||||||
|
|
||||||
|
@ -147,4 +150,37 @@ public class AdvisoryOpenWireTest extends BasicOpenWireTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConnectionAdvisory() throws Exception {
|
||||||
|
final Connection[] connections = new Connection[20];
|
||||||
|
|
||||||
|
connections[0] = factory.createConnection();
|
||||||
|
connections[0].start();
|
||||||
|
|
||||||
|
final CountDownLatch numConnectionsCreatedViaAdvisoryNotificationsLatch = new CountDownLatch(19);
|
||||||
|
connections[0].createSession(false, Session.AUTO_ACKNOWLEDGE)
|
||||||
|
.createConsumer(AdvisorySupport.getConnectionAdvisoryTopic()).setMessageListener(message -> numConnectionsCreatedViaAdvisoryNotificationsLatch.countDown());
|
||||||
|
|
||||||
|
try {
|
||||||
|
for (int i = 1; i < connections.length; i++) {
|
||||||
|
connections[i] = factory.createConnection();
|
||||||
|
connections[i].start();
|
||||||
|
}
|
||||||
|
|
||||||
|
Session session = connections[0].createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
session.close();
|
||||||
|
|
||||||
|
assertTrue("Got all the advisories on time", numConnectionsCreatedViaAdvisoryNotificationsLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
for (Connection conn : connections) {
|
||||||
|
if (conn != null) {
|
||||||
|
conn.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,14 +16,30 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.tests.integration.openwire;
|
package org.apache.activemq.artemis.tests.integration.openwire;
|
||||||
|
|
||||||
|
import javax.jms.BytesMessage;
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.JMSException;
|
||||||
import javax.jms.JMSSecurityException;
|
import javax.jms.JMSSecurityException;
|
||||||
|
import javax.jms.MapMessage;
|
||||||
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageListener;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.ObjectMessage;
|
||||||
|
import javax.jms.Queue;
|
||||||
|
import javax.jms.QueueBrowser;
|
||||||
|
import javax.jms.ServerSession;
|
||||||
|
import javax.jms.ServerSessionPool;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
import javax.jms.StreamMessage;
|
||||||
import javax.jms.TemporaryQueue;
|
import javax.jms.TemporaryQueue;
|
||||||
|
import javax.jms.TemporaryTopic;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
|
import javax.jms.Topic;
|
||||||
|
import javax.jms.TopicSubscriber;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -173,7 +189,6 @@ public class BasicSecurityTest extends BasicOpenWireTest {
|
||||||
Connection conn1 = null;
|
Connection conn1 = null;
|
||||||
Connection conn2 = null;
|
Connection conn2 = null;
|
||||||
|
|
||||||
//Sender
|
|
||||||
try {
|
try {
|
||||||
conn1 = factory.createConnection("openwireGuest", "GuEsT");
|
conn1 = factory.createConnection("openwireGuest", "GuEsT");
|
||||||
conn1.start();
|
conn1.start();
|
||||||
|
@ -205,4 +220,233 @@ public class BasicSecurityTest extends BasicOpenWireTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConnectionConsumer() throws Exception {
|
||||||
|
Connection conn1 = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
conn1 = factory.createConnection("openwireGuest", "GuEsT");
|
||||||
|
conn1.start();
|
||||||
|
|
||||||
|
try {
|
||||||
|
Destination dest = new ActiveMQQueue(queueName);
|
||||||
|
|
||||||
|
conn1.createConnectionConsumer(dest, null, new ServerSessionPool() {
|
||||||
|
@Override
|
||||||
|
public ServerSession getServerSession() throws JMSException {
|
||||||
|
return new ServerSession() {
|
||||||
|
@Override
|
||||||
|
public Session getSession() throws JMSException {
|
||||||
|
return new Session() {
|
||||||
|
@Override
|
||||||
|
public BytesMessage createBytesMessage() throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MapMessage createMapMessage() throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Message createMessage() throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ObjectMessage createObjectMessage() throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public StreamMessage createStreamMessage() throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TextMessage createTextMessage() throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TextMessage createTextMessage(String text) throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean getTransacted() throws JMSException {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getAcknowledgeMode() throws JMSException {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void commit() throws JMSException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void rollback() throws JMSException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws JMSException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void recover() throws JMSException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageListener getMessageListener() throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setMessageListener(MessageListener listener) throws JMSException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageProducer createProducer(Destination destination) throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageConsumer createConsumer(Destination destination) throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageConsumer createConsumer(Destination destination,
|
||||||
|
String messageSelector,
|
||||||
|
boolean NoLocal) throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Queue createQueue(String queueName) throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Topic createTopic(String topicName) throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TopicSubscriber createDurableSubscriber(Topic topic,
|
||||||
|
String name,
|
||||||
|
String messageSelector,
|
||||||
|
boolean noLocal) throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public QueueBrowser createBrowser(Queue queue) throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TemporaryQueue createTemporaryQueue() throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TemporaryTopic createTemporaryTopic() throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void unsubscribe(String name) throws JMSException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageConsumer createSharedConsumer(Topic topic,
|
||||||
|
String sharedSubscriptionName,
|
||||||
|
String messageSelector) throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageConsumer createDurableConsumer(Topic topic,
|
||||||
|
String name,
|
||||||
|
String messageSelector,
|
||||||
|
boolean noLocal) throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector) throws JMSException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start() throws JMSException {
|
||||||
|
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}, 100);
|
||||||
|
} catch (JMSSecurityException e) {
|
||||||
|
//expected
|
||||||
|
}
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (conn1 != null) {
|
||||||
|
conn1.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,6 +99,19 @@ public class OpenWireTestBase extends ActiveMQTestBase {
|
||||||
roles.add(destRole);
|
roles.add(destRole);
|
||||||
|
|
||||||
server.getConfiguration().putSecurityRoles("#", roles);
|
server.getConfiguration().putSecurityRoles("#", roles);
|
||||||
|
|
||||||
|
// advisory addresses, anyone can create/consume
|
||||||
|
// broker can produce
|
||||||
|
Role advisoryReceiverRole = new Role("advisoryReceiver", false, true, false, false, true, true, false, true, true, false);
|
||||||
|
|
||||||
|
roles = new HashSet<>();
|
||||||
|
roles.add(advisoryReceiverRole);
|
||||||
|
server.getConfiguration().putSecurityRoles("ActiveMQ.Advisory.#", roles);
|
||||||
|
|
||||||
|
securityManager.getConfiguration().addRole("openwireReceiver", "advisoryReceiver");
|
||||||
|
securityManager.getConfiguration().addRole("openwireSender", "advisoryReceiver");
|
||||||
|
securityManager.getConfiguration().addRole("openwireGuest", "advisoryReceiver");
|
||||||
|
securityManager.getConfiguration().addRole("openwireDestinationManager", "advisoryReceiver");
|
||||||
}
|
}
|
||||||
|
|
||||||
mbeanServer = MBeanServerFactory.createMBeanServer();
|
mbeanServer = MBeanServerFactory.createMBeanServer();
|
||||||
|
|
|
@ -56,6 +56,7 @@ public class SecurityOpenWireTest extends BasicOpenWireTest {
|
||||||
ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
|
ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
|
||||||
securityManager.getConfiguration().addUser("denyQ", "denyQ");
|
securityManager.getConfiguration().addUser("denyQ", "denyQ");
|
||||||
securityManager.getConfiguration().addRole("denyQ", "denyQ");
|
securityManager.getConfiguration().addRole("denyQ", "denyQ");
|
||||||
|
securityManager.getConfiguration().addRole("denyQ", "advisoryReceiver");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -117,6 +117,12 @@ public class SecurityPerAcceptorJmsTest extends ActiveMQTestBase {
|
||||||
Set<Role> roles = new HashSet<>();
|
Set<Role> roles = new HashSet<>();
|
||||||
roles.add(new Role("programmers", false, false, false, false, false, false, false, false, false, false));
|
roles.add(new Role("programmers", false, false, false, false, false, false, false, false, false, false));
|
||||||
server.getConfiguration().putSecurityRoles("#", roles);
|
server.getConfiguration().putSecurityRoles("#", roles);
|
||||||
|
|
||||||
|
// ensure advisory permission is still set for openwire to allow connection to succeed, alternative is url param jms.watchTopicAdvisories=false on the client connection factory
|
||||||
|
roles = new HashSet<>();
|
||||||
|
roles.add(new Role("programmers", false, true, false, false, true, true, false, false, true, false));
|
||||||
|
server.getConfiguration().putSecurityRoles("ActiveMQ.Advisory.#", roles);
|
||||||
|
|
||||||
server.start();
|
server.start();
|
||||||
server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
|
server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
|
||||||
server.createQueue(new QueueConfiguration(ADDRESS).setAddress(ADDRESS).setRoutingType(RoutingType.ANYCAST));
|
server.createQueue(new QueueConfiguration(ADDRESS).setAddress(ADDRESS).setRoutingType(RoutingType.ANYCAST));
|
||||||
|
|
|
@ -226,6 +226,11 @@ public class SecurityTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
server.getConfiguration().addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params));
|
server.getConfiguration().addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params));
|
||||||
|
|
||||||
|
// ensure advisory permission is still set for openwire to allow connection to succeed, alternative is url param jms.watchTopicAdvisories=false on the client connection factory
|
||||||
|
HashSet<Role> roles = new HashSet<>();
|
||||||
|
roles.add(new Role("programmers", false, true, false, false, true, true, false, false, true, false));
|
||||||
|
server.getConfiguration().putSecurityRoles("ActiveMQ.Advisory.#", roles);
|
||||||
|
|
||||||
server.start();
|
server.start();
|
||||||
|
|
||||||
ActiveMQSslConnectionFactory factory = new ActiveMQSslConnectionFactory("ssl://localhost:61616");
|
ActiveMQSslConnectionFactory factory = new ActiveMQSslConnectionFactory("ssl://localhost:61616");
|
||||||
|
@ -274,6 +279,7 @@ public class SecurityTest extends ActiveMQTestBase {
|
||||||
factory.setTrustStorePassword("secureexample");
|
factory.setTrustStorePassword("secureexample");
|
||||||
factory.setKeyStore("client-side-keystore.jks");
|
factory.setKeyStore("client-side-keystore.jks");
|
||||||
factory.setKeyStorePassword("secureexample");
|
factory.setKeyStorePassword("secureexample");
|
||||||
|
factory.setWatchTopicAdvisories(false);
|
||||||
|
|
||||||
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
|
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
|
||||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
|
@ -263,6 +263,8 @@ public class SecureConfigurationTest extends ActiveMQTestBase {
|
||||||
org.apache.activemq.ActiveMQConnectionFactory activeMQConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616");
|
org.apache.activemq.ActiveMQConnectionFactory activeMQConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616");
|
||||||
activeMQConnectionFactory.setUserName(user);
|
activeMQConnectionFactory.setUserName(user);
|
||||||
activeMQConnectionFactory.setPassword(password);
|
activeMQConnectionFactory.setPassword(password);
|
||||||
|
// don't listen for advisories to avoid the need for advisory permissions
|
||||||
|
activeMQConnectionFactory.setWatchTopicAdvisories(false);
|
||||||
return activeMQConnectionFactory;
|
return activeMQConnectionFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue