mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@630253 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b34f730231
commit
230255a479
|
@ -85,7 +85,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(AMQPersistenceAdapter.class);
|
private static final Log LOG = LogFactory.getLog(AMQPersistenceAdapter.class);
|
||||||
private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
|
private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
|
||||||
private final ConcurrentHashMap<ActiveMQTopic, AMQMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQMessageStore>();
|
private final ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore>();
|
||||||
private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq";
|
private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq";
|
||||||
private static final boolean BROKEN_FILE_LOCK;
|
private static final boolean BROKEN_FILE_LOCK;
|
||||||
private static final boolean DISABLE_LOCKING;
|
private static final boolean DISABLE_LOCKING;
|
||||||
|
@ -266,14 +266,14 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
||||||
Scheduler.cancel(periodicCheckpointTask);
|
Scheduler.cancel(periodicCheckpointTask);
|
||||||
Scheduler.cancel(periodicCleanupTask);
|
Scheduler.cancel(periodicCleanupTask);
|
||||||
}
|
}
|
||||||
Iterator<AMQMessageStore> iterator = queues.values().iterator();
|
Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
|
||||||
while (iterator.hasNext()) {
|
while (queueIterator.hasNext()) {
|
||||||
AMQMessageStore ms = iterator.next();
|
AMQMessageStore ms = queueIterator.next();
|
||||||
ms.stop();
|
ms.stop();
|
||||||
}
|
}
|
||||||
iterator = topics.values().iterator();
|
Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
|
||||||
while (iterator.hasNext()) {
|
while (topicIterator.hasNext()) {
|
||||||
final AMQTopicMessageStore ms = (AMQTopicMessageStore)iterator.next();
|
final AMQTopicMessageStore ms = topicIterator.next();
|
||||||
ms.stop();
|
ms.stop();
|
||||||
}
|
}
|
||||||
// Take one final checkpoint and stop checkpoint processing.
|
// Take one final checkpoint and stop checkpoint processing.
|
||||||
|
@ -344,17 +344,17 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
||||||
}
|
}
|
||||||
|
|
||||||
Location newMark = null;
|
Location newMark = null;
|
||||||
Iterator<AMQMessageStore> iterator = queues.values().iterator();
|
Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
|
||||||
while (iterator.hasNext()) {
|
while (queueIterator.hasNext()) {
|
||||||
final AMQMessageStore ms = iterator.next();
|
final AMQMessageStore ms = queueIterator.next();
|
||||||
Location mark = (Location)ms.getMark();
|
Location mark = (Location)ms.getMark();
|
||||||
if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
|
if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
|
||||||
newMark = mark;
|
newMark = mark;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
iterator = topics.values().iterator();
|
Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
|
||||||
while (iterator.hasNext()) {
|
while (topicIterator.hasNext()) {
|
||||||
final AMQTopicMessageStore ms = (AMQTopicMessageStore)iterator.next();
|
final AMQTopicMessageStore ms = topicIterator.next();
|
||||||
Location mark = (Location)ms.getMark();
|
Location mark = (Location)ms.getMark();
|
||||||
if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
|
if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
|
||||||
newMark = mark;
|
newMark = mark;
|
||||||
|
@ -430,7 +430,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
||||||
}
|
}
|
||||||
|
|
||||||
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
|
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
|
||||||
AMQTopicMessageStore store = (AMQTopicMessageStore)topics.get(destinationName.getPhysicalName());
|
AMQTopicMessageStore store = topics.get(destinationName);
|
||||||
if (store == null) {
|
if (store == null) {
|
||||||
TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
|
TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
|
||||||
store = new AMQTopicMessageStore(this,checkpointStore, destinationName);
|
store = new AMQTopicMessageStore(this,checkpointStore, destinationName);
|
||||||
|
|
Loading…
Reference in New Issue