mirror of https://github.com/apache/activemq.git
Applied patch for https://issues.apache.org/activemq/browse/AMQ-2795
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@960298 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c14c7e791d
commit
42ee51fe9a
|
@ -116,6 +116,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
private int consumersBeforeDispatchStarts = 0;
|
||||
private CountDownLatch consumersBeforeStartsLatch;
|
||||
private final AtomicLong pendingWakeups = new AtomicLong();
|
||||
private boolean allConsumersExclusiveByDefault = false;
|
||||
|
||||
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
|
||||
public void run() {
|
||||
|
@ -368,7 +369,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
}
|
||||
|
||||
addToConsumerList(sub);
|
||||
if (sub.getConsumerInfo().isExclusive()) {
|
||||
if (sub.getConsumerInfo().isExclusive() || isAllConsumersExclusiveByDefault()) {
|
||||
Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
|
||||
if (exclusiveConsumer == null) {
|
||||
exclusiveConsumer = sub;
|
||||
|
@ -428,6 +429,16 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
}
|
||||
dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
|
||||
}
|
||||
} else if (isAllConsumersExclusiveByDefault()) {
|
||||
Subscription exclusiveConsumer = null;
|
||||
for (Subscription s : consumers) {
|
||||
if (exclusiveConsumer == null
|
||||
|| s.getConsumerInfo().getPriority() > exclusiveConsumer
|
||||
.getConsumerInfo().getPriority()) {
|
||||
exclusiveConsumer = s;
|
||||
}
|
||||
}
|
||||
dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
|
||||
}
|
||||
ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
|
||||
getMessageGroupOwners().removeConsumer(consumerId);
|
||||
|
@ -881,6 +892,15 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
|
||||
}
|
||||
|
||||
public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) {
|
||||
this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault;
|
||||
}
|
||||
|
||||
public boolean isAllConsumersExclusiveByDefault() {
|
||||
return allConsumersExclusiveByDefault;
|
||||
}
|
||||
|
||||
|
||||
// Implementation methods
|
||||
// -------------------------------------------------------------------------
|
||||
private QueueMessageReference createMessageReference(Message message) {
|
||||
|
|
|
@ -88,6 +88,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
private int storeUsageHighWaterMark = 100;
|
||||
private SlowConsumerStrategy slowConsumerStrategy;
|
||||
private boolean prioritizedMessages;
|
||||
private boolean allConsumersExclusiveByDefault;
|
||||
|
||||
|
||||
public void configure(Broker broker,Queue queue) {
|
||||
|
@ -111,6 +112,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
queue.setLazyDispatch(isLazyDispatch());
|
||||
queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts());
|
||||
queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
|
||||
queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
|
||||
}
|
||||
|
||||
public void configure(Broker broker,Topic topic) {
|
||||
|
@ -751,4 +753,12 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
this.prioritizedMessages = prioritizedMessages;
|
||||
}
|
||||
|
||||
public void setAllConsumersExclusiveByDefault(boolean allConsumersExclusiveByDefault) {
|
||||
this.allConsumersExclusiveByDefault = allConsumersExclusiveByDefault;
|
||||
}
|
||||
|
||||
public boolean isAllConsumersExclusiveByDefault() {
|
||||
return allConsumersExclusiveByDefault;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,206 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import junit.framework.Assert;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
|
||||
public class ExclusiveConsumerStartupDestinationTest extends EmbeddedBrokerTestSupport{
|
||||
|
||||
private static final String VM_BROKER_URL = "vm://localhost";
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService answer = new BrokerService();
|
||||
answer.setPersistent(false);
|
||||
PolicyMap map = new PolicyMap();
|
||||
PolicyEntry entry = new PolicyEntry();
|
||||
entry.setAllConsumersExclusiveByDefault(true);
|
||||
map.setDefaultEntry(entry);
|
||||
answer.setDestinationPolicy(map);
|
||||
return answer;
|
||||
}
|
||||
|
||||
protected String getBrokerConfigUri() {
|
||||
return "org/apache/activemq/broker/exclusive-consumer-startup-destination.xml";
|
||||
}
|
||||
|
||||
private Connection createConnection(final boolean start) throws JMSException {
|
||||
ConnectionFactory cf = new ActiveMQConnectionFactory(VM_BROKER_URL);
|
||||
Connection conn = cf.createConnection();
|
||||
if (start) {
|
||||
conn.start();
|
||||
}
|
||||
return conn;
|
||||
}
|
||||
|
||||
public void testExclusiveConsumerSelectedCreatedFirst() throws JMSException, InterruptedException {
|
||||
Connection conn = createConnection(true);
|
||||
|
||||
Session exclusiveSession = null;
|
||||
Session fallbackSession = null;
|
||||
Session senderSession = null;
|
||||
|
||||
try {
|
||||
|
||||
exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE1");
|
||||
MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
|
||||
|
||||
ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE1");
|
||||
MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
|
||||
|
||||
ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE1");
|
||||
|
||||
MessageProducer producer = senderSession.createProducer(senderQueue);
|
||||
|
||||
Message msg = senderSession.createTextMessage("test");
|
||||
producer.send(msg);
|
||||
// TODO need two send a 2nd message - bug AMQ-1024
|
||||
// producer.send(msg);
|
||||
Thread.sleep(100);
|
||||
|
||||
// Verify exclusive consumer receives the message.
|
||||
Assert.assertNotNull(exclusiveConsumer.receive(100));
|
||||
Assert.assertNull(fallbackConsumer.receive(100));
|
||||
|
||||
} finally {
|
||||
fallbackSession.close();
|
||||
senderSession.close();
|
||||
conn.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void testFailoverToAnotherExclusiveConsumerCreatedFirst() throws JMSException,
|
||||
InterruptedException {
|
||||
Connection conn = createConnection(true);
|
||||
|
||||
Session exclusiveSession1 = null;
|
||||
Session exclusiveSession2 = null;
|
||||
Session fallbackSession = null;
|
||||
Session senderSession = null;
|
||||
|
||||
try {
|
||||
|
||||
exclusiveSession1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
exclusiveSession2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
// This creates the exclusive consumer first which avoids AMQ-1024
|
||||
// bug.
|
||||
ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE2");
|
||||
MessageConsumer exclusiveConsumer1 = exclusiveSession1.createConsumer(exclusiveQueue);
|
||||
MessageConsumer exclusiveConsumer2 = exclusiveSession2.createConsumer(exclusiveQueue);
|
||||
|
||||
ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE2");
|
||||
MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
|
||||
|
||||
ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE2");
|
||||
|
||||
MessageProducer producer = senderSession.createProducer(senderQueue);
|
||||
|
||||
Message msg = senderSession.createTextMessage("test");
|
||||
producer.send(msg);
|
||||
Thread.sleep(100);
|
||||
|
||||
// Verify exclusive consumer receives the message.
|
||||
Assert.assertNotNull(exclusiveConsumer1.receive(100));
|
||||
Assert.assertNull(exclusiveConsumer2.receive(100));
|
||||
Assert.assertNull(fallbackConsumer.receive(100));
|
||||
|
||||
// Close the exclusive consumer to verify the non-exclusive consumer
|
||||
// takes over
|
||||
exclusiveConsumer1.close();
|
||||
|
||||
producer.send(msg);
|
||||
producer.send(msg);
|
||||
|
||||
Assert.assertNotNull("Should have received a message", exclusiveConsumer2.receive(100));
|
||||
Assert.assertNull("Should not have received a message", fallbackConsumer.receive(100));
|
||||
|
||||
} finally {
|
||||
fallbackSession.close();
|
||||
senderSession.close();
|
||||
conn.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void testFailoverToNonExclusiveConsumer() throws JMSException, InterruptedException {
|
||||
Connection conn = createConnection(true);
|
||||
|
||||
Session exclusiveSession = null;
|
||||
Session fallbackSession = null;
|
||||
Session senderSession = null;
|
||||
|
||||
try {
|
||||
|
||||
exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
// This creates the exclusive consumer first which avoids AMQ-1024
|
||||
// bug.
|
||||
ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE3");
|
||||
MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
|
||||
|
||||
ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE3");
|
||||
MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
|
||||
|
||||
ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE3");
|
||||
|
||||
MessageProducer producer = senderSession.createProducer(senderQueue);
|
||||
|
||||
Message msg = senderSession.createTextMessage("test");
|
||||
producer.send(msg);
|
||||
Thread.sleep(100);
|
||||
|
||||
// Verify exclusive consumer receives the message.
|
||||
Assert.assertNotNull(exclusiveConsumer.receive(100));
|
||||
Assert.assertNull(fallbackConsumer.receive(100));
|
||||
|
||||
// Close the exclusive consumer to verify the non-exclusive consumer
|
||||
// takes over
|
||||
exclusiveConsumer.close();
|
||||
|
||||
producer.send(msg);
|
||||
|
||||
Assert.assertNotNull(fallbackConsumer.receive(100));
|
||||
|
||||
} finally {
|
||||
fallbackSession.close();
|
||||
senderSession.close();
|
||||
conn.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<!-- this file can only be parsed using the xbean-spring library -->
|
||||
<!-- START SNIPPET: xbean -->
|
||||
<beans
|
||||
xmlns="http://www.springframework.org/schema/beans"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
|
||||
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
|
||||
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
|
||||
|
||||
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
|
||||
|
||||
<broker xmlns="http://activemq.apache.org/schema/core">
|
||||
<destinationPolicy>
|
||||
<policyMap>
|
||||
<policyEntries>
|
||||
<policyEntry queue="TEST.>" allConsumersExclusiveByDefault="true"/>
|
||||
</policyEntries>
|
||||
</policyMap>
|
||||
</destinationPolicy>
|
||||
<destinations>
|
||||
<queue physicalName="TEST.QUEUE1"/>
|
||||
<queue physicalName="TEST.QUEUE2"/>
|
||||
<queue physicalName="TEST.QUEUE3"/>
|
||||
</destinations>
|
||||
</broker>
|
||||
|
||||
</beans>
|
||||
<!-- END SNIPPET: xbean -->
|
Loading…
Reference in New Issue