Hiram R. Chirino 2006-06-12 15:08:33 +00:00
parent 855076cee3
commit c1a0cd6875
8 changed files with 42 additions and 2 deletions

View File

@ -34,6 +34,12 @@ public class FixedCountSubscriptionRecoveryPolicy implements SubscriptionRecover
volatile private MessageReference messages[]; volatile private MessageReference messages[];
private int maximumSize=100; private int maximumSize=100;
private int tail=0; private int tail=0;
public SubscriptionRecoveryPolicy copy() {
FixedCountSubscriptionRecoveryPolicy rc = new FixedCountSubscriptionRecoveryPolicy();
rc.setMaximumSize(maximumSize);
return rc;
}
synchronized public boolean add(ConnectionContext context,MessageReference node) throws Exception{ synchronized public boolean add(ConnectionContext context,MessageReference node) throws Exception{
messages[tail++]=node; messages[tail++]=node;
@ -109,4 +115,5 @@ public class FixedCountSubscriptionRecoveryPolicy implements SubscriptionRecover
} }
return (Message[]) result.toArray(new Message[result.size()]); return (Message[]) result.toArray(new Message[result.size()]);
} }
} }

View File

@ -48,6 +48,13 @@ public class FixedSizedSubscriptionRecoveryPolicy implements SubscriptionRecover
private int maximumSize = 100 * 64 * 1024; private int maximumSize = 100 * 64 * 1024;
private boolean useSharedBuffer = true; private boolean useSharedBuffer = true;
public SubscriptionRecoveryPolicy copy() {
FixedSizedSubscriptionRecoveryPolicy rc = new FixedSizedSubscriptionRecoveryPolicy();
rc.setMaximumSize(maximumSize);
rc.setUseSharedBuffer(useSharedBuffer);
return rc;
}
public boolean add(ConnectionContext context, MessageReference message) throws Exception { public boolean add(ConnectionContext context, MessageReference message) throws Exception {
buffer.add(message); buffer.add(message);
return true; return true;
@ -125,4 +132,5 @@ public class FixedSizedSubscriptionRecoveryPolicy implements SubscriptionRecover
return new DestinationBasedMessageList(maximumSize); return new DestinationBasedMessageList(maximumSize);
} }
} }
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.policy;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
@ -76,4 +77,8 @@ public class LastImageSubscriptionRecoveryPolicy implements SubscriptionRecovery
return (Message[])result.toArray(new Message[result.size()]); return (Message[])result.toArray(new Message[result.size()]);
} }
public SubscriptionRecoveryPolicy copy() {
return new LastImageSubscriptionRecoveryPolicy();
}
} }

View File

@ -32,6 +32,12 @@ import org.apache.activemq.command.Message;
*/ */
public class NoSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy { public class NoSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
public SubscriptionRecoveryPolicy copy() {
// This object is immutable
return this;
}
public boolean add(ConnectionContext context, MessageReference node) throws Exception { public boolean add(ConnectionContext context, MessageReference node) throws Exception {
return true; return true;
} }

View File

@ -65,7 +65,7 @@ public class PolicyEntry extends DestinationMapEntry {
topic.setDeadLetterStrategy(deadLetterStrategy); topic.setDeadLetterStrategy(deadLetterStrategy);
} }
if (subscriptionRecoveryPolicy != null) { if (subscriptionRecoveryPolicy != null) {
topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy); topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy.copy());
} }
topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers); topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
if( memoryLimit>0 ) { if( memoryLimit>0 ) {

View File

@ -55,7 +55,10 @@ public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecover
private IdGenerator idGenerator = new IdGenerator(); private IdGenerator idGenerator = new IdGenerator();
private ProducerId producerId = createProducerId(); private ProducerId producerId = createProducerId();
public QueryBasedSubscriptionRecoveryPolicy() { public SubscriptionRecoveryPolicy copy() {
QueryBasedSubscriptionRecoveryPolicy rc = new QueryBasedSubscriptionRecoveryPolicy();
rc.setQuery(query);
return rc;
} }
public boolean add(ConnectionContext context, MessageReference message) throws Exception { public boolean add(ConnectionContext context, MessageReference message) throws Exception {

View File

@ -64,4 +64,8 @@ public interface SubscriptionRecoveryPolicy extends Service {
*/ */
Message[] browse(ActiveMQDestination dest) throws Exception; Message[] browse(ActiveMQDestination dest) throws Exception;
/**
* Used to copy the policy object.
*/
SubscriptionRecoveryPolicy copy();
} }

View File

@ -21,6 +21,7 @@ import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
@ -67,6 +68,12 @@ public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPoli
} }
}; };
public SubscriptionRecoveryPolicy copy() {
TimedSubscriptionRecoveryPolicy rc = new TimedSubscriptionRecoveryPolicy();
rc.setRecoverDuration(recoverDuration);
return rc;
}
public boolean add(ConnectionContext context, MessageReference message) throws Exception { public boolean add(ConnectionContext context, MessageReference message) throws Exception {
buffer.add(new TimestampWrapper(message, lastGCRun)); buffer.add(new TimestampWrapper(message, lastGCRun));
return true; return true;