diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index 8a8add64ab..5c34affde3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -23,13 +23,11 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; - import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; -import org.apache.activemq.broker.region.QueueMessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.Message; diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java index 8e3084e134..e0e1823e03 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java @@ -17,10 +17,6 @@ package org.apache.activemq.kaha.impl.index.hash; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; /** * Bin in a HashIndex @@ -28,12 +24,13 @@ import org.apache.commons.logging.LogFactory; * @version $Revision: 1.1.1.1 $ */ class HashBin { - private static final transient Log LOG = LogFactory.getLog(HashBin.class); private HashIndex hashIndex; private int id; private int maximumEntries; private int size; - private List hashPages = new ArrayList(); + private int numberOfPages =0; + private HashPageInfo root = null; + private HashPageInfo tail = null; /** * Constructor @@ -62,43 +59,49 @@ class HashBin { } public int hashCode() { - return (int)id; + return (int)getId(); } - int getId() { + int getId() { return id; } - void setId(int id) { + void setId(int id) { this.id = id; } - boolean isEmpty() { + boolean isEmpty() { return true; } - int getMaximumEntries() { + int getMaximumEntries() { return this.maximumEntries; } - void setMaximumEntries(int maximumEntries) { + void setMaximumEntries(int maximumEntries) { this.maximumEntries = maximumEntries; } - int size() { + int size() { return size; } - HashPageInfo addHashPageInfo(long id, int size) throws IOException { + HashPageInfo addHashPageInfo(long id, int size) throws IOException { HashPageInfo info = new HashPageInfo(hashIndex); info.setId(id); info.setSize(size); - hashPages.add(info); + if (root == null) { + root=info; + }else { + tail.linkAfter(info); + } + tail=info; + this.numberOfPages++; this.size += size; return info; } - public HashEntry find(HashEntry key) throws IOException { + public HashEntry find(HashEntry key) throws IOException { HashEntry result = null; try { int low = 0; @@ -122,7 +125,7 @@ class HashBin { return result; } - boolean put(HashEntry newEntry) throws IOException { + boolean put(HashEntry newEntry) throws IOException { boolean replace = false; try { int low = 0; @@ -151,7 +154,7 @@ class HashBin { return replace; } - HashEntry remove(HashEntry entry) throws IOException { + HashEntry remove(HashEntry entry) throws IOException { HashEntry result = null; try { int low = 0; @@ -191,8 +194,10 @@ class HashBin { int count = 0; int countSoFar=0; int pageNo = 0; - for (HashPageInfo page : hashPages) { + HashPageInfo page = root; + while (page != null) { count += page.size(); + pageToUse=page; if (index < count ) { offset = index - countSoFar; break; @@ -203,13 +208,12 @@ class HashBin { } countSoFar += page.size(); pageNo++; + page = (HashPageInfo) page.getNext(); } - while(pageNo >= hashPages.size()) { - HashPage hp = hashIndex.createPage(id); - addHashPageInfo(hp.getId(), 0); - } - pageToUse = hashPages.get(pageNo); - + while(pageNo >= this.numberOfPages) { + HashPage hp = hashIndex.createPage(id); + pageToUse = addHashPageInfo(hp.getId(), 0); + } } pageToUse.begin(); pageToUse.addHashEntry(offset, entry); @@ -222,7 +226,14 @@ class HashBin { HashEntry result = page.removeHashEntry(offset); if (page.isEmpty()) { - hashPages.remove(page); + if (root.equals(page)) { + root=(HashPageInfo) root.getNext(); + } + if (tail.equals(page)) { + tail=(HashPageInfo) page.getPrevious(); + } + page.unlink(); + this.numberOfPages--; hashIndex.releasePage(page.getPage()); } doUnderFlow(index); @@ -239,21 +250,22 @@ class HashBin { private int getMaximumBinSize() { - return maximumEntries * hashPages.size(); + return maximumEntries * this.numberOfPages; } private HashPageInfo getRetrievePage(int index) throws IOException { HashPageInfo result = null; int count = 0; - int pageNo = 0; - for (HashPageInfo page : hashPages) { + HashPageInfo page = root; + while (page != null) { count += page.size(); + result = page; if (index < count) { break; } - pageNo++; + page = (HashPageInfo) page.getNext(); } - result = hashPages.get(pageNo); + result.begin(); return result; } @@ -261,12 +273,14 @@ class HashBin { private int getRetrieveOffset(int index) throws IOException { int result = 0; int count = 0; - for (HashPageInfo page : hashPages) { + HashPageInfo page = root; + while (page != null) { if ((index + 1) <= (count + page.size())) { result = index - count; break; } count += page.size(); + page = (HashPageInfo) page.getNext(); } return result; } @@ -277,43 +291,51 @@ class HashBin { // overflowed info.begin(); HashEntry entry = info.removeHashEntry(info.size() - 1); - doOverFlow(hashPages.indexOf(info)+1, entry); + doOverFlow(getNextPage(info), entry); } } - private void doOverFlow(int pageNo, HashEntry entry) throws IOException { + private void doOverFlow(HashPageInfo next, HashEntry entry) throws IOException { HashPageInfo info = null; - if (pageNo >= hashPages.size()) { + if (next == null) { HashPage page = hashIndex.createPage(id); info = addHashPageInfo(page.getId(), 0); info.setPage(page); } else { - info = hashPages.get(pageNo); + info = next; } info.begin(); info.addHashEntry(0, entry); if (info.size() > maximumEntries) { // overflowed HashEntry overflowed = info.removeHashEntry(info.size() - 1); - doOverFlow(pageNo+1, overflowed); + doOverFlow(getNextPage(info), overflowed); } } + + private HashPageInfo getNextPage(HashPageInfo start) { + return (HashPageInfo) start.getNext(); + } private void doUnderFlow(int index) { } String dump() throws IOException { - String str = "[" + hashPages.size()+"]"; - for (HashPageInfo page : hashPages) { + String str = "[" + this.numberOfPages+"]"; + HashPageInfo page = root; + while (page != null) { page.begin(); str +=page.dump(); page.end(); + page = (HashPageInfo) page.getNext(); } return str; } private void end() throws IOException { - for (HashPageInfo info : hashPages) { - info.end(); + HashPageInfo page = root; + while (page != null) { + page.end(); + page = (HashPageInfo) page.getNext(); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java index 02f7388699..068dc09c5e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java @@ -463,6 +463,12 @@ public class HashIndex implements Index, HashIndexMBean { openIndexFile(); doLoad(); } + + + public String toString() { + String str = "HashIndex"+System.identityHashCode(this)+": "+file.getName(); + return str; + } static int hash(Object x) { diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java index 070a0690d0..80dbe5cab4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java @@ -17,13 +17,14 @@ package org.apache.activemq.kaha.impl.index.hash; import java.io.IOException; +import org.apache.activemq.util.LinkedNode; /** * A Page within a HashPageInfo * * @version $Revision: 1.1.1.1 $ */ -class HashPageInfo { +class HashPageInfo extends LinkedNode{ private HashIndex hashIndex; private long id; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java index a802bfb6c9..e89162111e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java @@ -59,7 +59,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements private static final Log LOG = LogFactory.getLog(KahaReferenceStoreAdapter.class); private static final String STORE_STATE = "store-state"; private static final String INDEX_VERSION_NAME = "INDEX_VERSION"; - private static final Integer INDEX_VERSION = new Integer(5); + private static final Integer INDEX_VERSION = new Integer(6); private static final String RECORD_REFERENCES = "record-references"; private static final String TRANSACTIONS = "transactions-state"; private MapContainer stateMap; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java index 5c156f3d47..30bf24a28b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java @@ -352,7 +352,9 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic } private String getSubscriptionContainerName(String subscriptionKey) { - StringBuffer buffer = new StringBuffer(subscriptionKey); - return buffer.append(":").append(destination.getQualifiedName()).append(TOPIC_SUB_NAME).toString(); + StringBuffer result = new StringBuffer(TOPIC_SUB_NAME); + result.append(destination.getQualifiedName()); + result.append(subscriptionKey); + return result.toString(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java b/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java index 4d9a6a50a2..591b42d23a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java @@ -133,6 +133,7 @@ public class LinkedNode { public void unlink() { // If we are allready unlinked... if (prev == this) { + reset(); return; } @@ -145,6 +146,10 @@ public class LinkedNode { prev.next = next; // Update our links.. + reset(); + } + + public void reset() { next = this; prev = this; tail = true;