diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index fd305669d4..9927aaf57e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -609,6 +609,9 @@ public abstract class BaseDestination implements Destination { public void setPrioritizedMessages(boolean prioritizedMessages) { this.prioritizedMessages = prioritizedMessages; + if (store != null) { + store.setPrioritizedMessages(prioritizedMessages); + } } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java index cbdeb8a129..45b6ffbd48 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java @@ -30,6 +30,7 @@ import org.apache.activemq.usage.MemoryUsage; abstract public class AbstractMessageStore implements MessageStore { public static final FutureTask FUTURE; protected final ActiveMQDestination destination; + protected boolean prioritizedMessages; public AbstractMessageStore(ActiveMQDestination destination) { this.destination = destination; @@ -63,6 +64,14 @@ abstract public class AbstractMessageStore implements MessageStore { public boolean isEmpty() throws Exception { return getMessageCount() == 0; } + + public void setPrioritizedMessages(boolean prioritizedMessages) { + this.prioritizedMessages = prioritizedMessages; + } + + public boolean isPrioritizedMessages() { + return this.prioritizedMessages; + } public Future asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException { addMessage(context, message); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java index 3fbbd1f180..66fa5d6ed5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java @@ -148,4 +148,16 @@ public interface MessageStore extends Service { */ boolean isEmpty() throws Exception; + /** + * A hint to the store to try recover messages according to priority + * @param prioritizedMessages + */ + public void setPrioritizedMessages(boolean prioritizedMessages); + + /** + * + * @return true if store is trying to recover messages according to priority + */ + public boolean isPrioritizedMessages(); + } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java index 729396bdb3..10d4723364 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java @@ -113,4 +113,12 @@ public class ProxyMessageStore implements MessageStore { public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { delegate.removeAsyncMessage(context, ack); } + + public void setPrioritizedMessages(boolean prioritizedMessages) { + delegate.setPrioritizedMessages(prioritizedMessages); + } + + public boolean isPrioritizedMessages() { + return delegate.isPrioritizedMessages(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java index 7ea7880c9e..68831309e8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java @@ -153,4 +153,12 @@ public class ProxyTopicMessageStore implements TopicMessageStore { public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { delegate.removeAsyncMessage(context, ack); } + + public void setPrioritizedMessages(boolean prioritizedMessages) { + delegate.setPrioritizedMessages(prioritizedMessages); + } + + public boolean isPrioritizedMessages() { + return delegate.isPrioritizedMessages(); + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java b/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java new file mode 100644 index 0000000000..735a47d047 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.store; + +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; + +abstract public class MessagePriorityTest extends TestCase { + + BrokerService broker; + PersistenceAdapter adapter; + + ActiveMQConnectionFactory factory; + Connection conn; + Session sess; + + abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception; + + protected void setUp() throws Exception { + broker = new BrokerService(); + broker.setBrokerName("priorityTest"); + adapter = createPersistenceAdapter(true); + broker.setPersistenceAdapter(adapter); + PolicyEntry policy = new PolicyEntry(); + policy.setPrioritizedMessages(true); + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(policy); + broker.setDestinationPolicy(policyMap); + broker.start(); + broker.waitUntilStarted(); + + factory = new ActiveMQConnectionFactory("vm://priorityTest"); + conn = factory.createConnection(); + sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + protected void tearDown() throws Exception { + sess.close(); + conn.close(); + + broker.stop(); + broker.waitUntilStopped(); + } + + public void testStoreConfigured() throws Exception { + Queue queue = sess.createQueue("TEST"); + Topic topic = sess.createTopic("TEST"); + + MessageProducer queueProducer = sess.createProducer(queue); + MessageProducer topicProducer = sess.createProducer(topic); + + + Thread.sleep(100); // get it all propagated + + assertTrue(broker.getRegionBroker().getDestinationMap().get(queue).getMessageStore().isPrioritizedMessages()); + assertTrue(broker.getRegionBroker().getDestinationMap().get(topic).getMessageStore().isPrioritizedMessages()); + + queueProducer.close(); + topicProducer.close(); + + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java new file mode 100644 index 0000000000..6dceb760f5 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.store.jdbc; + +import org.apache.activemq.store.MessagePriorityTest; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.derby.jdbc.EmbeddedDataSource; + +public class JDBCMessagePriorityTest extends MessagePriorityTest { + + @Override + protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception { + JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); + EmbeddedDataSource dataSource = new EmbeddedDataSource(); + dataSource.setDatabaseName("derbyDb"); + dataSource.setCreateDatabase("create"); + jdbc.setDataSource(dataSource); + return jdbc; + } + +}