From e2c83d6fb6355dcf65821268cd1843ff603dc5f9 Mon Sep 17 00:00:00 2001 From: David Jencks Date: Tue, 6 Jan 2009 23:48:44 +0000 Subject: [PATCH] Remove some dangerous method calls from the constructor (that did nothing), make some fields final, remove duplicate start tracking, and finish generic conversion git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@732182 13f79535-47bb-0310-9956-ffa450edef68 --- .../cursors/StoreDurableSubscriberCursor.java | 71 ++++++++----------- 1 file changed, 29 insertions(+), 42 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index c84d826da6..d972aec04e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -16,13 +16,13 @@ */ package org.apache.activemq.broker.region.cursors; -import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; + import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; @@ -44,22 +44,19 @@ import org.apache.commons.logging.LogFactory; public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { private static final Log LOG = LogFactory.getLog(StoreDurableSubscriberCursor.class); - private String clientId; - private String subscriberName; - private Map topics = new HashMap(); - private List storePrefetches = new CopyOnWriteArrayList(); - private boolean started; - private PendingMessageCursor nonPersistent; + private final String clientId; + private final String subscriberName; + private final Map topics = new HashMap(); + private final List storePrefetches = new CopyOnWriteArrayList(); + private final PendingMessageCursor nonPersistent; private PendingMessageCursor currentCursor; - private Subscription subscription; + private final Subscription subscription; /** - * @param broker - * @param topic - * @param clientId - * @param subscriberName - * @param maxBatchSize - * @param subscription - * @throws IOException + * @param broker Broker for this cursor + * @param clientId clientId for this cursor + * @param subscriberName subscriber name for this cursor + * @param maxBatchSize currently ignored + * @param subscription subscription for this cursor */ public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, Subscription subscription) { this.subscription=subscription; @@ -70,17 +67,14 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { }else { this.nonPersistent = new VMPendingMessageCursor(); } - this.nonPersistent.setMaxBatchSize(getMaxBatchSize()); + //TODO is this correct? we are ignoring the constructor parameter matchBatchSize +// this.nonPersistent.setMaxBatchSize(getMaxBatchSize()); this.nonPersistent.setSystemUsage(systemUsage); - this.nonPersistent.setEnableAudit(isEnableAudit()); - this.nonPersistent.setMaxAuditDepth(getMaxAuditDepth()); - this.nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit()); this.storePrefetches.add(this.nonPersistent); } public synchronized void start() throws Exception { - if (!started) { - started = true; + if (!isStarted()) { super.start(); for (PendingMessageCursor tsp : storePrefetches) { tsp.setMessageAudit(getMessageAudit()); @@ -90,8 +84,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } public synchronized void stop() throws Exception { - if (started) { - started = false; + if (isStarted()) { super.stop(); for (PendingMessageCursor tsp : storePrefetches) { tsp.stop(); @@ -116,7 +109,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { tsp.setMaxProducersToAudit(getMaxProducersToAudit()); topics.put(destination, tsp); storePrefetches.add(tsp); - if (started) { + if (isStarted()) { tsp.start(); } } @@ -130,7 +123,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { * @throws Exception */ public synchronized List remove(ConnectionContext context, Destination destination) throws Exception { - Object tsp = topics.remove(destination); + PendingMessageCursor tsp = topics.remove(destination); if (tsp != null) { storePrefetches.remove(tsp); } @@ -161,7 +154,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { * Informs the Broker if the subscription needs to intervention to recover * it's state e.g. DurableTopicSubscriber may do * - * @see org.apache.activemq.region.cursors.PendingMessageCursor + * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor * @return true if recovery required */ public boolean isRecoveryRequired() { @@ -171,7 +164,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { public synchronized void addMessageLast(MessageReference node) throws Exception { if (node != null) { Message msg = node.getMessage(); - if (started) { + if (isStarted()) { if (!msg.isPersistent()) { nonPersistent.addMessageLast(node); } @@ -228,16 +221,14 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } public synchronized void reset() { - for (Iterator i = storePrefetches.iterator(); i.hasNext();) { - AbstractPendingMessageCursor tsp = (AbstractPendingMessageCursor)i.next(); - tsp.reset(); + for (PendingMessageCursor storePrefetch : storePrefetches) { + storePrefetch.reset(); } } public synchronized void release() { - for (Iterator i = storePrefetches.iterator(); i.hasNext();) { - AbstractPendingMessageCursor tsp = (AbstractPendingMessageCursor)i.next(); - tsp.release(); + for (PendingMessageCursor storePrefetch : storePrefetches) { + storePrefetch.release(); } } @@ -250,24 +241,21 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } public void setMaxBatchSize(int maxBatchSize) { - for (Iterator i = storePrefetches.iterator(); i.hasNext();) { - AbstractPendingMessageCursor tsp = (AbstractPendingMessageCursor)i.next(); - tsp.setMaxBatchSize(maxBatchSize); + for (PendingMessageCursor storePrefetch : storePrefetches) { + storePrefetch.setMaxBatchSize(maxBatchSize); } super.setMaxBatchSize(maxBatchSize); } public synchronized void gc() { - for (Iterator i = storePrefetches.iterator(); i.hasNext();) { - PendingMessageCursor tsp = i.next(); + for (PendingMessageCursor tsp : storePrefetches) { tsp.gc(); } } public void setSystemUsage(SystemUsage usageManager) { super.setSystemUsage(usageManager); - for (Iterator i = storePrefetches.iterator(); i.hasNext();) { - PendingMessageCursor tsp = i.next(); + for (PendingMessageCursor tsp : storePrefetches) { tsp.setSystemUsage(usageManager); } } @@ -303,8 +291,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { protected synchronized PendingMessageCursor getNextCursor() throws Exception { if (currentCursor == null || currentCursor.isEmpty()) { currentCursor = null; - for (Iterator i = storePrefetches.iterator(); i.hasNext();) { - AbstractPendingMessageCursor tsp = (AbstractPendingMessageCursor)i.next(); + for (PendingMessageCursor tsp : storePrefetches) { if (tsp.hasNext()) { currentCursor = tsp; break;