mirror of https://github.com/apache/activemq.git
Prevent usage of stale cache data. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1300723 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f5b69f8783
commit
0bbb73529d
|
@ -17,18 +17,19 @@
|
||||||
package org.apache.kahadb.index;
|
package org.apache.kahadb.index;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.ref.WeakReference;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.apache.kahadb.index.ListNode.ListIterator;
|
import org.apache.kahadb.index.ListNode.ListIterator;
|
||||||
import org.apache.kahadb.page.Page;
|
import org.apache.kahadb.page.Page;
|
||||||
import org.apache.kahadb.page.PageFile;
|
import org.apache.kahadb.page.PageFile;
|
||||||
import org.apache.kahadb.page.Transaction;
|
import org.apache.kahadb.page.Transaction;
|
||||||
import org.apache.kahadb.util.Marshaller;
|
import org.apache.kahadb.util.Marshaller;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
public class ListIndex<Key,Value> implements Index<Key,Value> {
|
public class ListIndex<Key,Value> implements Index<Key,Value> {
|
||||||
|
|
||||||
|
@ -120,6 +121,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
|
||||||
|
|
||||||
private ListNode<Key, Value> lastGetNodeCache = null;
|
private ListNode<Key, Value> lastGetNodeCache = null;
|
||||||
private Map.Entry<Key, Value> lastGetEntryCache = null;
|
private Map.Entry<Key, Value> lastGetEntryCache = null;
|
||||||
|
private WeakReference<Transaction> lastCacheTxSrc = new WeakReference<Transaction>(null);
|
||||||
|
|
||||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||||
synchronized public Value get(Transaction tx, Key key) throws IOException {
|
synchronized public Value get(Transaction tx, Key key) throws IOException {
|
||||||
|
@ -129,6 +131,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
|
||||||
if (key.equals(candidate.getKey())) {
|
if (key.equals(candidate.getKey())) {
|
||||||
this.lastGetNodeCache = ((ListIterator) iterator).getCurrent();
|
this.lastGetNodeCache = ((ListIterator) iterator).getCurrent();
|
||||||
this.lastGetEntryCache = candidate;
|
this.lastGetEntryCache = candidate;
|
||||||
|
this.lastCacheTxSrc = new WeakReference<Transaction>(tx);
|
||||||
return candidate.getValue();
|
return candidate.getValue();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -146,7 +149,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
|
||||||
|
|
||||||
Value oldValue = null;
|
Value oldValue = null;
|
||||||
|
|
||||||
if (lastGetNodeCache != null) {
|
if (lastGetNodeCache != null && tx.equals(lastCacheTxSrc.get())) {
|
||||||
|
|
||||||
if(lastGetEntryCache.getKey().equals(key)) {
|
if(lastGetEntryCache.getKey().equals(key)) {
|
||||||
oldValue = lastGetEntryCache.setValue(value);
|
oldValue = lastGetEntryCache.setValue(value);
|
||||||
|
@ -166,6 +169,8 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
|
||||||
return oldValue;
|
return oldValue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
flushCache();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Not found because the cache wasn't set or its not at the end of the list so we
|
// Not found because the cache wasn't set or its not at the end of the list so we
|
||||||
|
@ -207,7 +212,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (lastGetNodeCache != null) {
|
if (lastGetNodeCache != null && tx.equals(lastCacheTxSrc.get())) {
|
||||||
|
|
||||||
// This searches from the last location of a call to get for the element to remove
|
// This searches from the last location of a call to get for the element to remove
|
||||||
// all the way to the end of the ListIndex.
|
// all the way to the end of the ListIndex.
|
||||||
|
@ -216,9 +221,12 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
|
||||||
Map.Entry<Key, Value> entry = iterator.next();
|
Map.Entry<Key, Value> entry = iterator.next();
|
||||||
if (entry.getKey().equals(key)) {
|
if (entry.getKey().equals(key)) {
|
||||||
iterator.remove();
|
iterator.remove();
|
||||||
|
flushCache();
|
||||||
return entry.getValue();
|
return entry.getValue();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
flushCache();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Not found because the cache wasn't set or its not at the end of the list so we
|
// Not found because the cache wasn't set or its not at the end of the list so we
|
||||||
|
@ -229,6 +237,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
|
||||||
Map.Entry<Key, Value> entry = iterator.next();
|
Map.Entry<Key, Value> entry = iterator.next();
|
||||||
if (entry.getKey().equals(key)) {
|
if (entry.getKey().equals(key)) {
|
||||||
iterator.remove();
|
iterator.remove();
|
||||||
|
flushCache();
|
||||||
return entry.getValue();
|
return entry.getValue();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -238,6 +247,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
|
||||||
|
|
||||||
public void onRemove() {
|
public void onRemove() {
|
||||||
size.decrementAndGet();
|
size.decrementAndGet();
|
||||||
|
flushCache();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isTransient() {
|
public boolean isTransient() {
|
||||||
|
@ -251,6 +261,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
|
||||||
// break up the transaction
|
// break up the transaction
|
||||||
tx.commit();
|
tx.commit();
|
||||||
}
|
}
|
||||||
|
flushCache();
|
||||||
size.set(0);
|
size.set(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -286,10 +297,14 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
|
||||||
|
|
||||||
ListNode<Key,Value> loadNode(Transaction tx, long pageId) throws IOException {
|
ListNode<Key,Value> loadNode(Transaction tx, long pageId) throws IOException {
|
||||||
Page<ListNode<Key,Value>> page = tx.load(pageId, marshaller);
|
Page<ListNode<Key,Value>> page = tx.load(pageId, marshaller);
|
||||||
ListNode<Key, Value> node = page.get();
|
try {
|
||||||
node.setPage(page);
|
ListNode<Key, Value> node = page.get();
|
||||||
node.setContainingList(this);
|
node.setPage(page);
|
||||||
return node;
|
node.setContainingList(this);
|
||||||
|
return node;
|
||||||
|
} catch (ClassCastException e) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> page) throws IOException {
|
ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> page) throws IOException {
|
||||||
|
@ -354,5 +369,6 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
|
||||||
private void flushCache() {
|
private void flushCache() {
|
||||||
this.lastGetEntryCache = null;
|
this.lastGetEntryCache = null;
|
||||||
this.lastGetNodeCache = null;
|
this.lastGetNodeCache = null;
|
||||||
|
this.lastCacheTxSrc.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue