https://issues.apache.org/jira/browse/AMQ-5630 - add rejectDurableConsumers boolen attribute - when true, requests to create durable subscriptions will fail with a JMSException - not allowed

This commit is contained in:
gtully 2015-03-03 13:29:36 +00:00
parent 4fe2bd534a
commit 741e3aad3e
3 changed files with 136 additions and 0 deletions

View File

@ -249,6 +249,7 @@ public class BrokerService implements Service {
private boolean restartAllowed = true;
private boolean restartRequested = false;
private boolean rejectDurableConsumers = false;
private int storeOpenWireVersion = OpenWireFormat.DEFAULT_VERSION;
@ -3053,4 +3054,12 @@ public class BrokerService implements Service {
public void incrementTotalConnections() {
this.totalConnections.incrementAndGet();
}
public boolean isRejectDurableConsumers() {
return rejectDurableConsumers;
}
public void setRejectDurableConsumers(boolean rejectDurableConsumers) {
this.rejectDurableConsumers = rejectDurableConsumers;
}
}

View File

@ -109,6 +109,9 @@ public class TopicRegion extends AbstractRegion {
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
if (info.isDurable()) {
if (broker.getBrokerService().isRejectDurableConsumers()) {
throw new JMSException("Durable Consumers are not allowed");
}
ActiveMQDestination destination = info.getDestination();
if (!destination.isPattern()) {
// Make sure the destination is created.

View File

@ -0,0 +1,124 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompConnection;
public class BrokerDurableRejectedTest extends TestSupport {
protected Connection connection;
protected Session consumeSession;
protected Destination consumerDestination;
@Override
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory("vm:(broker:(stomp://localhost:0)?persistent=false&rejectDurableConsumers=true)");
}
protected void setUp() throws Exception {
super.setUp();
connectionFactory = createConnectionFactory();
connection = createConnection();
connection.setClientID(getClass().getName());
consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
consumerDestination = consumeSession.createTopic("TestDurableRejected");
connection.start();
}
public void testDurableTopicConsumerJms() throws Exception {
consumeSession.createConsumer(consumerDestination);
try {
consumeSession.createDurableSubscriber((Topic)consumerDestination, getName());
fail("Expect not allowed jms exception on durable creation");
} catch (JMSException expected) {
assertTrue("expected exception", expected.getMessage().contains("not allowed"));
}
}
public void testDurableTopicConsumerStomp() throws Exception {
// verify stomp ok in this case
StompConnection stompConnection = new StompConnection();
stompConnection.open("localhost", BrokerRegistry.getInstance().findFirst().getTransportConnectorByScheme("stomp").getPublishableConnectURI().getPort());
// connect
String frame = "CONNECT\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
// subscribe
frame = "SUBSCRIBE\n" + "destination:/topic/" + ((Topic) consumerDestination).getTopicName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("ERROR"));
assertTrue("contains expected message -" + frame, frame.contains("not allowed"));
frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
public void testDurableTopicConsumerStompWithReceipt() throws Exception {
// verify stomp ok in this case
StompConnection stompConnection = new StompConnection();
stompConnection.open("localhost", BrokerRegistry.getInstance().findFirst().getTransportConnectorByScheme("stomp").getPublishableConnectURI().getPort());
// connect
String frame = "CONNECT\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
// subscribe
frame = "SUBSCRIBE\n" + "destination:/topic/" + ((Topic) consumerDestination).getTopicName() + "\nreceipt:1\n"
+ "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("ERROR"));
assertTrue("contains expected message -" + frame, frame.contains("not allowed"));
frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
protected void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
super.tearDown();
}
}