We now optimize out persisting persistent messages to a topic if it does not have any durable consumers.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@366274 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-01-05 20:39:41 +00:00
parent 013f37294e
commit 04968b6811
4 changed files with 172 additions and 28 deletions

View File

@ -120,7 +120,7 @@ public class DurableTopicSubscription extends PrefetchSubscription {
recovered=true;
for (Iterator iter = destinations.iterator(); iter.hasNext();) {
Topic topic = (Topic) iter.next();
topic.recover(this, false);
topic.recover(context, this, false);
}
} else {
if( !isFull() ) {

View File

@ -21,9 +21,9 @@ import java.io.IOException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.FixedCountSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.command.ActiveMQDestination;
@ -43,6 +43,7 @@ import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.SubscriptionKey;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
/**
* The Topic is a destination that sends a copy of a message to every active
@ -60,10 +61,11 @@ public class Topic implements Destination {
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new FixedCountSubscriptionRecoveryPolicy();
private boolean sendAdvisoryIfNoConsumers;
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
private AtomicInteger durableSubscriberCounter = new AtomicInteger();
public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) {
@ -90,19 +92,54 @@ public class Topic implements Destination {
destinationStatistics.getConsumers().increment();
sub.add(context, this);
if (sub.getConsumerInfo().isDurable()) {
recover((DurableTopicSubscription) sub, true);
recover(context, (DurableTopicSubscription) sub, true);
}
else {
if (sub.getConsumerInfo().isRetroactive()) {
recover(context, sub);
}
}
/**
* Used to recover the message list non durable subscriptions. Recovery only happens if the consumer is
* retroactive.
*
* @param context
* @param sub
* @throws Throwable
*/
private void recover(ConnectionContext context, final Subscription sub) throws Throwable {
if (sub.getConsumerInfo().isRetroactive()) {
// synchronize with dispatch method so that no new messages are sent
// while we are recovering a subscription to avoid out of order messages.
dispatchValve.turnOff();
try {
synchronized(consumers) {
consumers.add(sub);
}
subscriptionRecoveryPolicy.recover(context, this, sub);
} finally {
dispatchValve.turnOn();
}
} else {
synchronized(consumers) {
consumers.add(sub);
}
}
}
public void recover(final DurableTopicSubscription sub, boolean initialActivation) throws Throwable {
/**
* Used to recover the message list for a durable subscription.
*
* @param context
* @param sub
* @param initialActivation
* @throws Throwable
*/
public void recover(ConnectionContext context, final DurableTopicSubscription sub, boolean initialActivation) throws Throwable {
// synchronize with dispatch method so that no new messages are sent
// while
@ -110,9 +147,11 @@ public class Topic implements Destination {
dispatchValve.turnOff();
try {
boolean persistenceWasOptimized = canOptimizeOutPersistence();
if (initialActivation) {
synchronized(consumers) {
synchronized(consumers) {
consumers.add(sub);
durableSubscriberCounter.incrementAndGet();
}
}
@ -160,6 +199,16 @@ public class Topic implements Destination {
throw new RuntimeException("Should not be called.");
}
});
if( initialActivation && sub.getConsumerInfo().isRetroactive() ) {
// Then use the subscriptionRecoveryPolicy since there will not be any messages in the persistent store.
if( persistenceWasOptimized ) {
subscriptionRecoveryPolicy.recover(context, this, sub);
} else {
// TODO: implement something like
// subscriptionRecoveryPolicy.recoverNonPersistent(context, this, sub);
}
}
}
}
@ -173,6 +222,9 @@ public class Topic implements Destination {
destinationStatistics.getConsumers().decrement();
synchronized(consumers) {
consumers.remove(sub);
if( sub.getConsumerInfo().isDurable() ) {
durableSubscriberCounter.decrementAndGet();
}
}
sub.remove(context, this);
}
@ -184,7 +236,7 @@ public class Topic implements Destination {
message.setRegionDestination(this);
if (store != null && message.isPersistent())
if (store != null && message.isPersistent() && !canOptimizeOutPersistence() )
store.addMessage(context, message);
message.incrementReferenceCount();
@ -209,6 +261,10 @@ public class Topic implements Destination {
}
private boolean canOptimizeOutPersistence() {
return durableSubscriberCounter.get()==0;
}
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
if (store != null) {
store.deleteSubscription(key.clientId, key.subscriptionName);

View File

@ -0,0 +1,96 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.filter.MessageEvaluationContext;
/**
* This implementation of {@link SubscriptionRecoveryPolicy} will only keep
* the last message.
*
* @org.xbean.XBean
*
* @version $Revision$
*/
public class FixedCountSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
volatile private MessageReference messages[];
private int maximumSize=100;
private int tail=0;
synchronized public boolean add(ConnectionContext context, MessageReference node) throws Throwable {
messages[tail++] = node;
if( tail >= messages.length )
tail = 0;
return true;
}
synchronized public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Throwable {
// Re-dispatch the last message seen.
int t = tail;
// The buffer may not have rolled over yet..., start from the front
if( messages[t]==null )
t=0;
// Well the buffer is really empty then.
if( messages[t]==null )
return;
// Keep dispatching until t hit's tail again.
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
do {
MessageReference node = messages[t];
try {
msgContext.setDestination(node.getRegionDestination().getActiveMQDestination());
msgContext.setMessageReference(node);
if (sub.matches(node, msgContext)) {
sub.add(node);
}
} finally {
msgContext.clear();
}
t++;
if( t >= messages.length )
t = 0;
} while( t!=tail );
}
public void start() throws Exception {
messages = new MessageReference[maximumSize];
}
public void stop() throws Exception {
messages = null;
}
public int getMaximumSize() {
return maximumSize;
}
/**
* Sets the maximum number of messages that this destination will hold around in RAM
*/
public void setMaximumSize(int maximumSize) {
this.maximumSize = maximumSize;
}
}

View File

@ -518,27 +518,19 @@ public class BrokerTest extends BrokerTestSupport {
connection1.send(createMessage(producerInfo1, destination, deliveryMode));
connection1.send(createMessage(producerInfo1, destination, deliveryMode));
// the behavior is VERY dependent on the recovery policy used.
// But the default broker settings try to make it as consistent as possible
if( deliveryMode == DeliveryMode.NON_PERSISTENT && durableConsumer ) {
// Durable subs don't keep non persistent messages around!
for( int i=0; i < 2 ; i++ ) {
Message m2 = receiveMessage(connection1);
assertNotNull(m2);
}
} else {
// Subscription should see all messages sent.
Message m2 = receiveMessage(connection1);
// Subscription should see all messages sent.
Message m2 = receiveMessage(connection1);
assertNotNull(m2);
assertEquals(m.getMessageId(), m2.getMessageId());
for( int i=0; i < 2 ; i++ ) {
m2 = receiveMessage(connection1);
assertNotNull(m2);
assertEquals(m.getMessageId(), m2.getMessageId());
for( int i=0; i < 2 ; i++ ) {
m2 = receiveMessage(connection1);
assertNotNull(m2);
}
}
assertNoMessagesLeft(connection1);
}