Remove some dangerous method calls from the constructor (that did nothing), make some fields final, remove duplicate start tracking, and finish generic conversion

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@732182 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
David Jencks 2009-01-06 23:48:44 +00:00
parent d8c14e075f
commit e2c83d6fb6
1 changed files with 29 additions and 42 deletions

View File

@ -16,13 +16,13 @@
*/ */
package org.apache.activemq.broker.region.cursors; package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
@ -44,22 +44,19 @@ import org.apache.commons.logging.LogFactory;
public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
private static final Log LOG = LogFactory.getLog(StoreDurableSubscriberCursor.class); private static final Log LOG = LogFactory.getLog(StoreDurableSubscriberCursor.class);
private String clientId; private final String clientId;
private String subscriberName; private final String subscriberName;
private Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>(); private final Map<Destination, TopicStorePrefetch> topics = new HashMap<Destination, TopicStorePrefetch>();
private List<PendingMessageCursor> storePrefetches = new CopyOnWriteArrayList<PendingMessageCursor>(); private final List<PendingMessageCursor> storePrefetches = new CopyOnWriteArrayList<PendingMessageCursor>();
private boolean started; private final PendingMessageCursor nonPersistent;
private PendingMessageCursor nonPersistent;
private PendingMessageCursor currentCursor; private PendingMessageCursor currentCursor;
private Subscription subscription; private final Subscription subscription;
/** /**
* @param broker * @param broker Broker for this cursor
* @param topic * @param clientId clientId for this cursor
* @param clientId * @param subscriberName subscriber name for this cursor
* @param subscriberName * @param maxBatchSize currently ignored
* @param maxBatchSize * @param subscription subscription for this cursor
* @param subscription
* @throws IOException
*/ */
public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, Subscription subscription) { public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, Subscription subscription) {
this.subscription=subscription; this.subscription=subscription;
@ -70,17 +67,14 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
}else { }else {
this.nonPersistent = new VMPendingMessageCursor(); this.nonPersistent = new VMPendingMessageCursor();
} }
this.nonPersistent.setMaxBatchSize(getMaxBatchSize()); //TODO is this correct? we are ignoring the constructor parameter matchBatchSize
// this.nonPersistent.setMaxBatchSize(getMaxBatchSize());
this.nonPersistent.setSystemUsage(systemUsage); this.nonPersistent.setSystemUsage(systemUsage);
this.nonPersistent.setEnableAudit(isEnableAudit());
this.nonPersistent.setMaxAuditDepth(getMaxAuditDepth());
this.nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit());
this.storePrefetches.add(this.nonPersistent); this.storePrefetches.add(this.nonPersistent);
} }
public synchronized void start() throws Exception { public synchronized void start() throws Exception {
if (!started) { if (!isStarted()) {
started = true;
super.start(); super.start();
for (PendingMessageCursor tsp : storePrefetches) { for (PendingMessageCursor tsp : storePrefetches) {
tsp.setMessageAudit(getMessageAudit()); tsp.setMessageAudit(getMessageAudit());
@ -90,8 +84,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
} }
public synchronized void stop() throws Exception { public synchronized void stop() throws Exception {
if (started) { if (isStarted()) {
started = false;
super.stop(); super.stop();
for (PendingMessageCursor tsp : storePrefetches) { for (PendingMessageCursor tsp : storePrefetches) {
tsp.stop(); tsp.stop();
@ -116,7 +109,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
tsp.setMaxProducersToAudit(getMaxProducersToAudit()); tsp.setMaxProducersToAudit(getMaxProducersToAudit());
topics.put(destination, tsp); topics.put(destination, tsp);
storePrefetches.add(tsp); storePrefetches.add(tsp);
if (started) { if (isStarted()) {
tsp.start(); tsp.start();
} }
} }
@ -130,7 +123,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
* @throws Exception * @throws Exception
*/ */
public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
Object tsp = topics.remove(destination); PendingMessageCursor tsp = topics.remove(destination);
if (tsp != null) { if (tsp != null) {
storePrefetches.remove(tsp); storePrefetches.remove(tsp);
} }
@ -161,7 +154,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
* Informs the Broker if the subscription needs to intervention to recover * Informs the Broker if the subscription needs to intervention to recover
* it's state e.g. DurableTopicSubscriber may do * it's state e.g. DurableTopicSubscriber may do
* *
* @see org.apache.activemq.region.cursors.PendingMessageCursor * @see org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor
* @return true if recovery required * @return true if recovery required
*/ */
public boolean isRecoveryRequired() { public boolean isRecoveryRequired() {
@ -171,7 +164,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
public synchronized void addMessageLast(MessageReference node) throws Exception { public synchronized void addMessageLast(MessageReference node) throws Exception {
if (node != null) { if (node != null) {
Message msg = node.getMessage(); Message msg = node.getMessage();
if (started) { if (isStarted()) {
if (!msg.isPersistent()) { if (!msg.isPersistent()) {
nonPersistent.addMessageLast(node); nonPersistent.addMessageLast(node);
} }
@ -228,16 +221,14 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
} }
public synchronized void reset() { public synchronized void reset() {
for (Iterator<PendingMessageCursor> i = storePrefetches.iterator(); i.hasNext();) { for (PendingMessageCursor storePrefetch : storePrefetches) {
AbstractPendingMessageCursor tsp = (AbstractPendingMessageCursor)i.next(); storePrefetch.reset();
tsp.reset();
} }
} }
public synchronized void release() { public synchronized void release() {
for (Iterator<PendingMessageCursor> i = storePrefetches.iterator(); i.hasNext();) { for (PendingMessageCursor storePrefetch : storePrefetches) {
AbstractPendingMessageCursor tsp = (AbstractPendingMessageCursor)i.next(); storePrefetch.release();
tsp.release();
} }
} }
@ -250,24 +241,21 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
} }
public void setMaxBatchSize(int maxBatchSize) { public void setMaxBatchSize(int maxBatchSize) {
for (Iterator<PendingMessageCursor> i = storePrefetches.iterator(); i.hasNext();) { for (PendingMessageCursor storePrefetch : storePrefetches) {
AbstractPendingMessageCursor tsp = (AbstractPendingMessageCursor)i.next(); storePrefetch.setMaxBatchSize(maxBatchSize);
tsp.setMaxBatchSize(maxBatchSize);
} }
super.setMaxBatchSize(maxBatchSize); super.setMaxBatchSize(maxBatchSize);
} }
public synchronized void gc() { public synchronized void gc() {
for (Iterator<PendingMessageCursor> i = storePrefetches.iterator(); i.hasNext();) { for (PendingMessageCursor tsp : storePrefetches) {
PendingMessageCursor tsp = i.next();
tsp.gc(); tsp.gc();
} }
} }
public void setSystemUsage(SystemUsage usageManager) { public void setSystemUsage(SystemUsage usageManager) {
super.setSystemUsage(usageManager); super.setSystemUsage(usageManager);
for (Iterator<PendingMessageCursor> i = storePrefetches.iterator(); i.hasNext();) { for (PendingMessageCursor tsp : storePrefetches) {
PendingMessageCursor tsp = i.next();
tsp.setSystemUsage(usageManager); tsp.setSystemUsage(usageManager);
} }
} }
@ -303,8 +291,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
protected synchronized PendingMessageCursor getNextCursor() throws Exception { protected synchronized PendingMessageCursor getNextCursor() throws Exception {
if (currentCursor == null || currentCursor.isEmpty()) { if (currentCursor == null || currentCursor.isEmpty()) {
currentCursor = null; currentCursor = null;
for (Iterator<PendingMessageCursor> i = storePrefetches.iterator(); i.hasNext();) { for (PendingMessageCursor tsp : storePrefetches) {
AbstractPendingMessageCursor tsp = (AbstractPendingMessageCursor)i.next();
if (tsp.hasNext()) { if (tsp.hasNext()) {
currentCursor = tsp; currentCursor = tsp;
break; break;