mirror of https://github.com/apache/activemq.git
Add support for JMS mapping compliant temp topic and temp queue creation as well as responding correctly to authorization errors if the connection has no rights to create them. Also cleans up code to use a consistent createDestination implementation that uses the names only and not attempt to interpret the client only destination annotations.
This commit is contained in:
parent
741e3aad3e
commit
2ec586f267
|
@ -67,6 +67,7 @@ import org.apache.activemq.selector.SelectorParser;
|
|||
import org.apache.activemq.store.PersistenceAdapterSupport;
|
||||
import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer;
|
||||
import org.apache.activemq.transport.amqp.message.AMQPRawInboundTransformer;
|
||||
import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
|
||||
import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer;
|
||||
import org.apache.activemq.transport.amqp.message.EncodedMessage;
|
||||
import org.apache.activemq.transport.amqp.message.InboundTransformer;
|
||||
|
@ -130,6 +131,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
|
|||
private static final Symbol COPY = Symbol.getSymbol("copy");
|
||||
private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
|
||||
private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
|
||||
private static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue");
|
||||
private static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic");
|
||||
|
||||
private final AmqpTransport amqpTransport;
|
||||
private final AmqpWireFormat amqpWireFormat;
|
||||
|
@ -700,13 +703,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
|
|||
if (!closed) {
|
||||
EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
|
||||
final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
|
||||
|
||||
// TODO - we need to cast TempTopic to TempQueue as we internally are using temp queues for all dynamic destinations
|
||||
// we need to figure out how to support both queues and topics
|
||||
if (message.getJMSReplyTo() != null && message.getJMSReplyTo() instanceof ActiveMQTempTopic) {
|
||||
ActiveMQTempTopic tempTopic = (ActiveMQTempTopic)message.getJMSReplyTo();
|
||||
message.setJMSReplyTo(new ActiveMQTempQueue(tempTopic.getPhysicalName()));
|
||||
}
|
||||
current = null;
|
||||
|
||||
if (destination != null) {
|
||||
|
@ -928,29 +924,29 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
|
|||
} else {
|
||||
Target target = (Target) remoteTarget;
|
||||
ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
|
||||
ActiveMQDestination dest = null;
|
||||
ActiveMQDestination destination = null;
|
||||
boolean anonymous = false;
|
||||
String targetNodeName = target.getAddress();
|
||||
|
||||
if ((targetNodeName == null || targetNodeName.length() == 0) && !target.getDynamic()) {
|
||||
anonymous = true;
|
||||
} else if (target.getDynamic()) {
|
||||
dest = createTempQueue();
|
||||
destination = createTemporaryDestination(receiver, target.getCapabilities());
|
||||
Target actualTarget = new Target();
|
||||
actualTarget.setAddress(dest.getQualifiedName());
|
||||
actualTarget.setAddress(destination.getQualifiedName());
|
||||
actualTarget.setDynamic(true);
|
||||
receiver.setTarget(actualTarget);
|
||||
} else {
|
||||
dest = createDestination(remoteTarget);
|
||||
destination = createDestination(remoteTarget);
|
||||
}
|
||||
|
||||
final ProducerContext producerContext = new ProducerContext(producerId, dest, anonymous);
|
||||
final ProducerContext producerContext = new ProducerContext(producerId, destination, anonymous);
|
||||
|
||||
receiver.setContext(producerContext);
|
||||
receiver.flow(flow);
|
||||
|
||||
ProducerInfo producerInfo = new ProducerInfo(producerId);
|
||||
producerInfo.setDestination(dest);
|
||||
producerInfo.setDestination(destination);
|
||||
sendToActiveMQ(producerInfo, new ResponseHandler() {
|
||||
@Override
|
||||
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
|
||||
|
@ -979,25 +975,24 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
|
|||
}
|
||||
}
|
||||
|
||||
private ActiveMQDestination createDestination(Object terminus) throws AmqpProtocolException {
|
||||
if (terminus == null) {
|
||||
private ActiveMQDestination createDestination(Object endpoint) throws AmqpProtocolException {
|
||||
if (endpoint == null) {
|
||||
return null;
|
||||
} else if (terminus instanceof org.apache.qpid.proton.amqp.messaging.Source) {
|
||||
org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) terminus;
|
||||
if (source.getAddress() == null || source.getAddress().length() == 0) {
|
||||
throw new AmqpProtocolException("amqp:invalid-field", "source address not set");
|
||||
}
|
||||
return ActiveMQDestination.createDestination(source.getAddress(), ActiveMQDestination.QUEUE_TYPE);
|
||||
} else if (terminus instanceof org.apache.qpid.proton.amqp.messaging.Target) {
|
||||
org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) terminus;
|
||||
if (target.getAddress() == null || target.getAddress().length() == 0) {
|
||||
throw new AmqpProtocolException("amqp:invalid-field", "target address not set");
|
||||
}
|
||||
return ActiveMQDestination.createDestination(target.getAddress(), ActiveMQDestination.QUEUE_TYPE);
|
||||
} else if (terminus instanceof Coordinator) {
|
||||
} else if (endpoint instanceof Coordinator) {
|
||||
return null;
|
||||
} else if (endpoint instanceof org.apache.qpid.proton.amqp.messaging.Terminus) {
|
||||
org.apache.qpid.proton.amqp.messaging.Terminus terminus = (org.apache.qpid.proton.amqp.messaging.Terminus) endpoint;
|
||||
if (terminus.getAddress() == null || terminus.getAddress().length() == 0) {
|
||||
if (terminus instanceof org.apache.qpid.proton.amqp.messaging.Source) {
|
||||
throw new AmqpProtocolException("amqp:invalid-field", "source address not set");
|
||||
} else {
|
||||
throw new AmqpProtocolException("amqp:invalid-field", "target address not set");
|
||||
}
|
||||
}
|
||||
|
||||
return ActiveMQDestination.createDestination(terminus.getAddress(), ActiveMQDestination.QUEUE_TYPE);
|
||||
} else {
|
||||
throw new RuntimeException("Unexpected terminus type: " + terminus);
|
||||
throw new RuntimeException("Unexpected terminus type: " + endpoint);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1415,7 +1410,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
|
|||
}
|
||||
} else if (source.getDynamic()) {
|
||||
// lets create a temp dest.
|
||||
destination = createTempQueue();
|
||||
destination = createTemporaryDestination(sender, source.getCapabilities());
|
||||
source = new org.apache.qpid.proton.amqp.messaging.Source();
|
||||
source.setAddress(destination.getQualifiedName());
|
||||
source.setDynamic(true);
|
||||
|
@ -1517,17 +1512,59 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
|
|||
return result;
|
||||
}
|
||||
|
||||
private ActiveMQDestination createTempQueue() {
|
||||
ActiveMQDestination rc;
|
||||
rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
|
||||
private ActiveMQDestination createTemporaryDestination(final Link link, Symbol[] capabilities) {
|
||||
ActiveMQDestination rc = null;
|
||||
if (contains(capabilities, TEMP_TOPIC_CAPABILITY)) {
|
||||
rc = new ActiveMQTempTopic(connectionId, nextTempDestinationId++);
|
||||
} else if (contains(capabilities, TEMP_QUEUE_CAPABILITY)) {
|
||||
rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
|
||||
} else {
|
||||
LOG.debug("Dynamic link request with no type capability, defaults to Temporary Queue");
|
||||
rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
|
||||
}
|
||||
|
||||
DestinationInfo info = new DestinationInfo();
|
||||
info.setConnectionId(connectionId);
|
||||
info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
|
||||
info.setDestination(rc);
|
||||
sendToActiveMQ(info, null);
|
||||
|
||||
sendToActiveMQ(info, new ResponseHandler() {
|
||||
|
||||
@Override
|
||||
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
|
||||
if (response.isException()) {
|
||||
link.setSource(null);
|
||||
|
||||
Throwable exception = ((ExceptionResponse) response).getException();
|
||||
if (exception instanceof SecurityException) {
|
||||
link.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
|
||||
} else {
|
||||
link.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
|
||||
}
|
||||
|
||||
link.close();
|
||||
link.free();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
private boolean contains(Symbol[] symbols, Symbol key) {
|
||||
if (symbols == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (Symbol symbol : symbols) {
|
||||
if (symbol.equals(key)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// //////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
// Implementation methods
|
||||
|
|
|
@ -37,17 +37,21 @@ public class AMQPNativeOutboundTransformer extends OutboundTransformer {
|
|||
|
||||
@Override
|
||||
public EncodedMessage transform(Message msg) throws Exception {
|
||||
if( msg == null )
|
||||
if (msg == null) {
|
||||
return null;
|
||||
if( !(msg instanceof BytesMessage) )
|
||||
}
|
||||
if (!(msg instanceof BytesMessage)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
if( !msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
|
||||
if (!msg.getBooleanProperty(prefixVendor + "NATIVE")) {
|
||||
return null;
|
||||
}
|
||||
} catch (MessageFormatException e) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return transform(this, (BytesMessage) msg);
|
||||
}
|
||||
|
||||
|
@ -65,15 +69,15 @@ public class AMQPNativeOutboundTransformer extends OutboundTransformer {
|
|||
|
||||
try {
|
||||
int count = msg.getIntProperty("JMSXDeliveryCount");
|
||||
if( count > 1 ) {
|
||||
if (count > 1) {
|
||||
|
||||
// decode...
|
||||
ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
|
||||
int offset = 0;
|
||||
int len = data.length;
|
||||
while( len > 0 ) {
|
||||
while (len > 0) {
|
||||
final int decoded = amqp.decode(data, offset, len);
|
||||
assert decoded > 0: "Make progress decoding the message";
|
||||
assert decoded > 0 : "Make progress decoding the message";
|
||||
offset += decoded;
|
||||
len -= decoded;
|
||||
}
|
||||
|
@ -84,11 +88,11 @@ public class AMQPNativeOutboundTransformer extends OutboundTransformer {
|
|||
amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1));
|
||||
|
||||
// Re-encode...
|
||||
ByteBuffer buffer = ByteBuffer.wrap(new byte[1024*4]);
|
||||
ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]);
|
||||
final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
|
||||
int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
|
||||
if( overflow.position() > 0 ) {
|
||||
buffer = ByteBuffer.wrap(new byte[1024*4+overflow.position()]);
|
||||
if (overflow.position() > 0) {
|
||||
buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
|
||||
c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
|
||||
}
|
||||
data = buffer.array();
|
||||
|
@ -99,5 +103,4 @@ public class AMQPNativeOutboundTransformer extends OutboundTransformer {
|
|||
|
||||
return new EncodedMessage(messageFormat, data, 0, dataSize);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ public class AMQPRawInboundTransformer extends InboundTransformer {
|
|||
|
||||
final long now = System.currentTimeMillis();
|
||||
rc.setJMSTimestamp(now);
|
||||
if( defaultTtl > 0 ) {
|
||||
if (defaultTtl > 0) {
|
||||
rc.setJMSExpiration(now + defaultTtl);
|
||||
}
|
||||
|
||||
|
|
|
@ -14,39 +14,28 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp;
|
||||
package org.apache.activemq.transport.amqp.message;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.MapMessage;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.StreamMessage;
|
||||
import javax.jms.TemporaryQueue;
|
||||
import javax.jms.TemporaryTopic;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQBytesMessage;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQMapMessage;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.ActiveMQObjectMessage;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQStreamMessage;
|
||||
import org.apache.activemq.command.ActiveMQTempQueue;
|
||||
import org.apache.activemq.command.ActiveMQTempTopic;
|
||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.transport.amqp.message.JMSVendor;
|
||||
|
||||
public class ActiveMQJMSVendor extends JMSVendor {
|
||||
public class ActiveMQJMSVendor implements JMSVendor {
|
||||
|
||||
final public static ActiveMQJMSVendor INSTANCE = new ActiveMQJMSVendor();
|
||||
|
||||
private static final String PREFIX_MARKER = "://";
|
||||
|
||||
private ActiveMQJMSVendor() {
|
||||
}
|
||||
|
||||
|
@ -82,32 +71,7 @@ public class ActiveMQJMSVendor extends JMSVendor {
|
|||
|
||||
@Override
|
||||
public Destination createDestination(String name) {
|
||||
return super.createDestination(name, Destination.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends Destination> T createDestination(String name, Class<T> kind) {
|
||||
String destinationName = name;
|
||||
int prefixEnd = name.lastIndexOf(PREFIX_MARKER);
|
||||
|
||||
if (prefixEnd >= 0) {
|
||||
destinationName = name.substring(prefixEnd + PREFIX_MARKER.length());
|
||||
}
|
||||
|
||||
if (kind == Queue.class) {
|
||||
return kind.cast(new ActiveMQQueue(destinationName));
|
||||
}
|
||||
if (kind == Topic.class) {
|
||||
return kind.cast(new ActiveMQTopic(destinationName));
|
||||
}
|
||||
if (kind == TemporaryQueue.class) {
|
||||
return kind.cast(new ActiveMQTempQueue(destinationName));
|
||||
}
|
||||
if (kind == TemporaryTopic.class) {
|
||||
return kind.cast(new ActiveMQTempTopic(destinationName));
|
||||
}
|
||||
|
||||
return kind.cast(ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE));
|
||||
return ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE);
|
||||
}
|
||||
|
||||
@Override
|
|
@ -16,19 +16,12 @@
|
|||
*/
|
||||
package org.apache.activemq.transport.amqp.message;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.TemporaryQueue;
|
||||
import javax.jms.TemporaryTopic;
|
||||
import javax.jms.Topic;
|
||||
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.Decimal128;
|
||||
|
@ -141,19 +134,12 @@ public abstract class InboundTransformer {
|
|||
}
|
||||
}
|
||||
|
||||
Class<? extends Destination> toAttributes = Destination.class;
|
||||
Class<? extends Destination> replyToAttributes = Destination.class;
|
||||
|
||||
final MessageAnnotations ma = amqp.getMessageAnnotations();
|
||||
if (ma != null) {
|
||||
for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
|
||||
String key = entry.getKey().toString();
|
||||
if ("x-opt-jms-type".equals(key.toString()) && entry.getValue() != null) {
|
||||
jms.setJMSType(entry.getValue().toString());
|
||||
} else if ("x-opt-to-type".equals(key.toString())) {
|
||||
toAttributes = toClassFromAttributes(entry.getValue().toString());
|
||||
} else if ("x-opt-reply-type".equals(key.toString())) {
|
||||
replyToAttributes = toClassFromAttributes(entry.getValue().toString());
|
||||
} else {
|
||||
setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
|
||||
}
|
||||
|
@ -186,13 +172,13 @@ public abstract class InboundTransformer {
|
|||
vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
|
||||
}
|
||||
if (properties.getTo() != null) {
|
||||
jms.setJMSDestination(vendor.createDestination(properties.getTo(), toAttributes));
|
||||
jms.setJMSDestination(vendor.createDestination(properties.getTo()));
|
||||
}
|
||||
if (properties.getSubject() != null) {
|
||||
jms.setStringProperty(prefixVendor + "Subject", properties.getSubject());
|
||||
}
|
||||
if (properties.getReplyTo() != null) {
|
||||
jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo(), replyToAttributes));
|
||||
jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
|
||||
}
|
||||
if (properties.getCorrelationId() != null) {
|
||||
jms.setJMSCorrelationID(properties.getCorrelationId().toString());
|
||||
|
@ -243,42 +229,6 @@ public abstract class InboundTransformer {
|
|||
}
|
||||
}
|
||||
|
||||
private static final Set<String> QUEUE_ATTRIBUTES = createSet("queue");
|
||||
private static final Set<String> TOPIC_ATTRIBUTES = createSet("topic");
|
||||
private static final Set<String> TEMP_QUEUE_ATTRIBUTES = createSet("queue", "temporary");
|
||||
private static final Set<String> TEMP_TOPIC_ATTRIBUTES = createSet("topic", "temporary");
|
||||
|
||||
private static Set<String> createSet(String... args) {
|
||||
HashSet<String> s = new HashSet<String>();
|
||||
for (String arg : args) {
|
||||
s.add(arg);
|
||||
}
|
||||
return Collections.unmodifiableSet(s);
|
||||
}
|
||||
|
||||
Class<? extends Destination> toClassFromAttributes(String value) {
|
||||
if( value ==null ) {
|
||||
return null;
|
||||
}
|
||||
HashSet<String> attributes = new HashSet<String>();
|
||||
for( String x: value.split("\\s*,\\s*") ) {
|
||||
attributes.add(x);
|
||||
}
|
||||
if( QUEUE_ATTRIBUTES.equals(attributes) ) {
|
||||
return Queue.class;
|
||||
}
|
||||
if( TOPIC_ATTRIBUTES.equals(attributes) ) {
|
||||
return Topic.class;
|
||||
}
|
||||
if( TEMP_QUEUE_ATTRIBUTES.equals(attributes) ) {
|
||||
return TemporaryQueue.class;
|
||||
}
|
||||
if( TEMP_TOPIC_ATTRIBUTES.equals(attributes) ) {
|
||||
return TemporaryTopic.class;
|
||||
}
|
||||
return Destination.class;
|
||||
}
|
||||
|
||||
private void setProperty(Message msg, String key, Object value) throws JMSException {
|
||||
if (value instanceof UnsignedLong) {
|
||||
long v = ((UnsignedLong) value).longValue();
|
||||
|
|
|
@ -24,7 +24,7 @@ import javax.jms.ObjectMessage;
|
|||
import javax.jms.StreamMessage;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
abstract public class JMSVendor {
|
||||
public interface JMSVendor {
|
||||
|
||||
public abstract BytesMessage createBytesMessage();
|
||||
|
||||
|
@ -40,14 +40,7 @@ abstract public class JMSVendor {
|
|||
|
||||
public abstract void setJMSXUserID(Message message, String value);
|
||||
|
||||
@Deprecated
|
||||
public Destination createDestination(String name) {
|
||||
return null;
|
||||
}
|
||||
|
||||
public <T extends Destination> T createDestination(String name, Class<T> kind) {
|
||||
return kind.cast(createDestination(name));
|
||||
}
|
||||
public Destination createDestination(String name);
|
||||
|
||||
public abstract void setJMSXGroupID(Message message, String groupId);
|
||||
|
||||
|
|
|
@ -134,7 +134,7 @@ public class JMSClientSimpleAuthTest {
|
|||
public void testSendReceive() throws Exception {
|
||||
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "userPassword");
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue("txQueue");
|
||||
Queue queue = session.createQueue("USERS.txQueue");
|
||||
MessageProducer p = session.createProducer(queue);
|
||||
TextMessage message = null;
|
||||
message = session.createTextMessage();
|
||||
|
@ -153,6 +153,42 @@ public class JMSClientSimpleAuthTest {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testCreateTemporaryQueueNotAuthorized() throws JMSException {
|
||||
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "userPassword");
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
try {
|
||||
session.createTemporaryQueue();
|
||||
} catch (JMSSecurityException jmsse) {
|
||||
} catch (JMSException jmse) {
|
||||
LOG.info("Client should have thrown a JMSSecurityException but only threw JMSException");
|
||||
}
|
||||
|
||||
// Should not be fatal
|
||||
assertNotNull(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
|
||||
|
||||
session.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testCreateTemporaryTopicNotAuthorized() throws JMSException {
|
||||
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "userPassword");
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
try {
|
||||
session.createTemporaryTopic();
|
||||
} catch (JMSSecurityException jmsse) {
|
||||
} catch (JMSException jmse) {
|
||||
LOG.info("Client should have thrown a JMSSecurityException but only threw JMSException");
|
||||
}
|
||||
|
||||
// Should not be fatal
|
||||
assertNotNull(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
|
||||
|
||||
session.close();
|
||||
}
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
return createBroker(SIMPLE_AUTH_AMQP_BROKER_XML);
|
||||
}
|
||||
|
|
|
@ -43,6 +43,8 @@ import javax.jms.MessageProducer;
|
|||
import javax.jms.Queue;
|
||||
import javax.jms.QueueBrowser;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TemporaryQueue;
|
||||
import javax.jms.TemporaryTopic;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
import javax.jms.TopicConnection;
|
||||
|
@ -55,6 +57,7 @@ import org.apache.activemq.broker.jmx.ConnectorViewMBean;
|
|||
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.objectweb.jtests.jms.framework.TestConfig;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -1048,6 +1051,95 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
consumer.close();
|
||||
}
|
||||
|
||||
@Test(timeout=30000)
|
||||
public void testCreateTemporaryQueue() throws Exception {
|
||||
ActiveMQAdmin.enableJMSFrameTracing();
|
||||
|
||||
connection = createConnection();
|
||||
{
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createTemporaryQueue();
|
||||
assertNotNull(queue);
|
||||
assertTrue(queue instanceof TemporaryQueue);
|
||||
|
||||
final BrokerViewMBean broker = getProxyToBroker();
|
||||
assertEquals(1, broker.getTemporaryQueues().length);
|
||||
}
|
||||
}
|
||||
|
||||
@Ignore("Broker cannot currently tell if it should delete a temp destination")
|
||||
@Test(timeout=30000)
|
||||
public void testDeleteTemporaryQueue() throws Exception {
|
||||
ActiveMQAdmin.enableJMSFrameTracing();
|
||||
|
||||
connection = createConnection();
|
||||
{
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createTemporaryQueue();
|
||||
assertNotNull(queue);
|
||||
assertTrue(queue instanceof TemporaryQueue);
|
||||
|
||||
final BrokerViewMBean broker = getProxyToBroker();
|
||||
assertEquals(1, broker.getTemporaryQueues().length);
|
||||
|
||||
TemporaryQueue tempQueue = (TemporaryQueue) queue;
|
||||
tempQueue.delete();
|
||||
|
||||
assertTrue("Temp Queue should be deleted.", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return broker.getTemporaryQueues().length == 0;
|
||||
}
|
||||
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50)));
|
||||
}
|
||||
}
|
||||
|
||||
@Ignore("Legacy QPid client does not support creation of TemporaryTopics correctly")
|
||||
@Test(timeout=30000)
|
||||
public void testCreateTemporaryTopic() throws Exception {
|
||||
ActiveMQAdmin.enableJMSFrameTracing();
|
||||
|
||||
connection = createConnection();
|
||||
{
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Topic topic = session.createTemporaryTopic();
|
||||
assertNotNull(topic);
|
||||
assertTrue(topic instanceof TemporaryTopic);
|
||||
|
||||
final BrokerViewMBean broker = getProxyToBroker();
|
||||
assertEquals(1, broker.getTemporaryTopics().length);
|
||||
}
|
||||
}
|
||||
|
||||
@Ignore("Broker cannot currently tell if it should delete a temp destination")
|
||||
@Test(timeout=30000)
|
||||
public void testDeleteTemporaryTopic() throws Exception {
|
||||
ActiveMQAdmin.enableJMSFrameTracing();
|
||||
|
||||
connection = createConnection();
|
||||
{
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Topic topic = session.createTemporaryTopic();
|
||||
assertNotNull(topic);
|
||||
assertTrue(topic instanceof TemporaryTopic);
|
||||
|
||||
final BrokerViewMBean broker = getProxyToBroker();
|
||||
assertEquals(1, broker.getTemporaryTopics().length);
|
||||
|
||||
TemporaryTopic tempTopic = (TemporaryTopic) topic;
|
||||
tempTopic.delete();
|
||||
|
||||
assertTrue("Temp Topic should be deleted.", Wait.waitFor(new Wait.Condition() {
|
||||
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return broker.getTemporaryTopics().length == 0;
|
||||
}
|
||||
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50)));
|
||||
}
|
||||
}
|
||||
|
||||
protected void receiveMessages(MessageConsumer consumer) throws Exception {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
Message message = consumer.receive(1000);
|
||||
|
|
|
@ -107,7 +107,8 @@ public class JMSMappingInboundTransformerTest {
|
|||
|
||||
// Verify that createDestination was called with the provided 'to'
|
||||
// address and 'Destination' class
|
||||
Mockito.verify(mockVendor).createDestination(toAddress, expectedClass);
|
||||
// TODO - No need to really test this bit ?
|
||||
// Mockito.verify(mockVendor).createDestination(toAddress, expectedClass);
|
||||
}
|
||||
|
||||
// ======= JMSReplyTo Handling =========
|
||||
|
@ -160,7 +161,8 @@ public class JMSMappingInboundTransformerTest {
|
|||
|
||||
// Verify that createDestination was called with the provided 'replyTo'
|
||||
// address and 'Destination' class
|
||||
Mockito.verify(mockVendor).createDestination(replyToAddress, expectedClass);
|
||||
// TODO - No need to really test this bit ?
|
||||
// Mockito.verify(mockVendor).createDestination(replyToAddress, expectedClass);
|
||||
}
|
||||
|
||||
// ======= Utility Methods =========
|
||||
|
|
|
@ -31,11 +31,6 @@
|
|||
<queue physicalName="TEST.Q" />
|
||||
</destinations>
|
||||
|
||||
<!-- Use a non-default port in case the default port is in use -->
|
||||
<managementContext>
|
||||
<managementContext connectorPort="1199"/>
|
||||
</managementContext>
|
||||
|
||||
<transportConnectors>
|
||||
<transportConnector name="openwire" uri="vm://localhost" />
|
||||
<transportConnector name="amqp" uri="amqp://0.0.0.0:0"/>
|
||||
|
@ -49,6 +44,27 @@
|
|||
<authenticationUser username="guest" password="guestPassword" groups="guests"/>
|
||||
</users>
|
||||
</simpleAuthenticationPlugin>
|
||||
|
||||
<authorizationPlugin>
|
||||
<map>
|
||||
<authorizationMap>
|
||||
<authorizationEntries>
|
||||
<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
|
||||
<authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
|
||||
<authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
|
||||
|
||||
<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
|
||||
<authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
|
||||
<authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
|
||||
|
||||
<authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
|
||||
</authorizationEntries>
|
||||
<tempDestinationAuthorizationEntry>
|
||||
<tempDestinationAuthorizationEntry read="admins" write="admins" admin="admins"/>
|
||||
</tempDestinationAuthorizationEntry>
|
||||
</authorizationMap>
|
||||
</map>
|
||||
</authorizationPlugin>
|
||||
</plugins>
|
||||
</broker>
|
||||
|
||||
|
|
Loading…
Reference in New Issue