git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@934961 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-04-16 15:27:54 +00:00
parent db9533b631
commit 789789f06b
6 changed files with 285 additions and 6 deletions

View File

@ -20,6 +20,7 @@ import javax.jms.JMSException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageDispatchNotification;
@ -46,6 +47,8 @@ public class TempQueueRegion extends AbstractTempRegion {
protected Destination doCreateDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
TempQueue result = new TempQueue(brokerService, destination, null, destinationStatistics, taskRunnerFactory);
brokerService.getDestinationPolicy();
configureQueue(result, destination);
result.initialize();
return result;
}
@ -85,4 +88,16 @@ public class TempQueueRegion extends AbstractTempRegion {
processDispatchNotificationViaDestination(messageDispatchNotification);
}
protected void configureQueue(Queue queue, ActiveMQDestination destination) {
if (broker == null) {
throw new IllegalStateException("broker property is not set");
}
if (broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) {
entry.configure(broker,queue);
}
}
}
}

View File

@ -79,6 +79,14 @@ public class DestinationDotFileInterceptor extends DotFileInterceptorSupport {
writer.println("}");
writer.println();
writer.println("subgraph temp queues {");
writer.println(" node [fillcolor=red]; ");
writer.println(" label = \"TempQueues\"");
writer.println();
printNodeLinks(writer, map.getTempQueueRootNode(), "tempqueue");
writer.println("}");
writer.println();
writer.println("subgraph topics {");
writer.println(" node [fillcolor=green]; ");
writer.println(" label = \"Topics\"");
@ -87,12 +95,26 @@ public class DestinationDotFileInterceptor extends DotFileInterceptorSupport {
writer.println("}");
writer.println();
writer.println("subgraph temp topics {");
writer.println(" node [fillcolor=green]; ");
writer.println(" label = \"TempTopics\"");
writer.println();
printNodeLinks(writer, map.getTempTopicRootNode(), "temptopic");
writer.println("}");
writer.println();
printNodes(writer, map.getQueueRootNode(), "queue");
writer.println();
printNodes(writer, map.getTempQueueRootNode(), "tempqueue");
writer.println();
printNodes(writer, map.getTopicRootNode(), "topic");
writer.println();
printNodes(writer, map.getTempTopicRootNode(), "temptopic");
writer.println();
writer.println("}");
}

View File

@ -43,7 +43,9 @@ public class DestinationMap {
protected static final String ANY_CHILD = DestinationFilter.ANY_CHILD;
private DestinationMapNode queueRootNode = new DestinationMapNode(null);
private DestinationMapNode tempQueueRootNode = new DestinationMapNode(null);
private DestinationMapNode topicRootNode = new DestinationMapNode(null);
private DestinationMapNode tempTopicRootNode = new DestinationMapNode(null);
/**
* Looks up the value(s) matching the given Destination key. For simple
@ -119,6 +121,14 @@ public class DestinationMap {
return topicRootNode;
}
public DestinationMapNode getTempQueueRootNode() {
return tempQueueRootNode;
}
public DestinationMapNode getTempTopicRootNode() {
return tempTopicRootNode;
}
// Implementation methods
// -------------------------------------------------------------------------
@ -195,10 +205,18 @@ public class DestinationMap {
* Returns the root node for the given destination type
*/
protected DestinationMapNode getRootNode(ActiveMQDestination key) {
if (key.isQueue()) {
return queueRootNode;
if (key.isTemporary()){
if (key.isQueue()) {
return tempQueueRootNode;
} else {
return tempTopicRootNode;
}
} else {
return topicRootNode;
if (key.isQueue()) {
return queueRootNode;
} else {
return topicRootNode;
}
}
}
}

View File

@ -17,9 +17,8 @@
package org.apache.activemq.filter;
import javax.annotation.PostConstruct;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.*;
/**
* A base class for entry objects used to construct a destination based policy
@ -57,6 +56,14 @@ public abstract class DestinationMapEntry implements Comparable {
setDestination(new ActiveMQTopic(name));
}
public void setTempTopic(boolean flag){
setDestination(new ActiveMQTempTopic(">"));
}
public void setTempQueue(boolean flag){
setDestination(new ActiveMQTempQueue(">"));
}
public ActiveMQDestination getDestination() {
return destination;
}

View File

@ -0,0 +1,214 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.advisory;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import javax.jms.*;
import java.util.ArrayList;
import java.util.List;
public class AdvisoryTempDestinationTests extends TestCase {
protected static final int MESSAGE_COUNT = 2000;
protected BrokerService broker;
protected Connection connection;
protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
protected int topicCount;
public void testNoSlowConsumerAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue queue = s.createTemporaryQueue();
MessageConsumer consumer = s.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
}
});
Topic advisoryTopic = AdvisorySupport
.getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue);
s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
// start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue);
for (int i = 0; i < MESSAGE_COUNT; i++) {
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
}
Message msg = advisoryConsumer.receive(1000);
assertNull(msg);
}
public void testSlowConsumerAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue queue = s.createTemporaryQueue();
MessageConsumer consumer = s.createConsumer(queue);
Topic advisoryTopic = AdvisorySupport
.getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue);
s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
// start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue);
for (int i = 0; i < MESSAGE_COUNT; i++) {
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
}
Message msg = advisoryConsumer.receive(1000);
assertNotNull(msg);
}
public void testMessageDeliveryAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue queue = s.createTemporaryQueue();
MessageConsumer consumer = s.createConsumer(queue);
Topic advisoryTopic = AdvisorySupport.getMessageDeliveredAdvisoryTopic((ActiveMQDestination) queue);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
//start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue);
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
Message msg = advisoryConsumer.receive(1000);
assertNotNull(msg);
}
public void testTempMessageConsumedAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue queue = s.createTemporaryQueue();
MessageConsumer consumer = s.createConsumer(queue);
Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
//start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue);
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
String id = m.getJMSMessageID();
Message msg = consumer.receive(1000);
assertNotNull(msg);
msg = advisoryConsumer.receive(1000);
assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
String originalId = payload.getJMSMessageID();
assertEquals(originalId, id);
}
public void testMessageExpiredAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName());
MessageConsumer consumer = s.createConsumer(queue);
Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic((ActiveMQDestination) queue);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
//start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue);
producer.setTimeToLive(1);
for (int i = 0; i < MESSAGE_COUNT; i++) {
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
}
Message msg = advisoryConsumer.receive(2000);
assertNotNull(msg);
}
protected void setUp() throws Exception {
if (broker == null) {
broker = createBroker();
}
ConnectionFactory factory = createConnectionFactory();
connection = factory.createConnection();
connection.start();
super.setUp();
}
protected void tearDown() throws Exception {
super.tearDown();
connection.close();
if (broker != null) {
broker.stop();
}
}
protected ActiveMQConnectionFactory createConnectionFactory()
throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_BROKER_URL);
return cf;
}
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
configureBroker(answer);
answer.start();
return answer;
}
protected void configureBroker(BrokerService answer) throws Exception {
answer.setPersistent(false);
ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy();
strategy.setLimit(10);
PolicyEntry tempQueueEntry = createPolicyEntry(strategy);
tempQueueEntry.setTempQueue(true);
PolicyEntry tempTopicEntry = createPolicyEntry(strategy);
tempTopicEntry.setTempTopic(true);
PolicyMap pMap = new PolicyMap();
final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
policyEntries.add(tempQueueEntry);
policyEntries.add(tempTopicEntry);
pMap.setPolicyEntries(policyEntries);
answer.setDestinationPolicy(pMap);
answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true);
}
private PolicyEntry createPolicyEntry(ConstantPendingMessageLimitStrategy strategy) {
PolicyEntry policy = new PolicyEntry();
policy.setAdvisdoryForFastProducers(true);
policy.setAdvisoryForConsumed(true);
policy.setAdvisoryForDelivery(true);
policy.setAdvisoryForDiscardingMessages(true);
policy.setAdvisoryForSlowConsumers(true);
policy.setAdvisoryWhenFull(true);
policy.setProducerFlowControl(false);
policy.setPendingMessageLimitStrategy(strategy);
return policy;
}
}

View File

@ -65,6 +65,9 @@
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry tempTopic="true" advisoryForConsumed="true" />
<policyEntry tempQueue="true" advisoryForConsumed="true" />
</policyEntries>
</policyMap>
</destinationPolicy>