LUCENE-4061: improvements to DirectoryTaxonomyWriter

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1339150 13f79535-47bb-0310-9956-ffa450edef68
Shai Erera 2012-05-16 13:11:07 +00:00
parent ad7914c5b2
commit b4c22fcf37
6 changed files with 243 additions and 115 deletions

View File: lucene/CHANGES.txt

@@ -915,6 +915,10 @@ Optimizations
longer accepts empty string match (it did previously, but ignored
it). (Dawid Weiss, Mike McCandless)
* LUCENE-4061: improve synchronization in DirectoryTaxonomyWriter.addCategory
and make a few general improvements to DirectoryTaxonomyWriter.
(Shai Erera, Gilad Barkai)
Bug fixes
* LUCENE-2803: The FieldCache can miss values if an entry for a reader

View File: DirectoryTaxonomyWriter.java

@@ -27,6 +27,7 @@ import org.apache.lucene.facet.taxonomy.TaxonomyWriter;
import org.apache.lucene.facet.taxonomy.writercache.TaxonomyWriterCache;
import org.apache.lucene.facet.taxonomy.writercache.cl2o.Cl2oTaxonomyWriterCache;
import org.apache.lucene.facet.taxonomy.writercache.lru.LruTaxonomyWriterCache;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.DocsEnum;
@@ -40,7 +41,6 @@ import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockObtainFailedException;
@@ -347,18 +347,6 @@ public class DirectoryTaxonomyWriter implements TaxonomyWriter {
closeResources();
}
/**
* Returns the number of memory bytes used by the cache.
* @return Number of cache bytes in memory, for CL2O only; zero otherwise.
*/
public int getCacheMemoryUsage() {
ensureOpen();
if (this.cache == null || !(this.cache instanceof Cl2oTaxonomyWriterCache)) {
return 0;
}
return ((Cl2oTaxonomyWriterCache)this.cache).getMemoryUsage();
}
/**
* A hook for extending classes to close additional resources that were used.
* The default implementation closes the {@link IndexReader} as well as the
@@ -413,21 +401,26 @@ public class DirectoryTaxonomyWriter implements TaxonomyWriter {
reader = openReader();
}
// TODO (Facet): avoid Multi*?
Bits liveDocs = MultiFields.getLiveDocs(reader);
DocsEnum docs = MultiFields.getTermDocsEnum(reader, liveDocs, Consts.FULL,
new BytesRef(categoryPath.toString(delimiter)),
false);
if (docs == null || docs.nextDoc() == DocIdSetIterator.NO_MORE_DOCS) {
return -1; // category does not exist in taxonomy
int base = 0;
int doc = -1;
for (AtomicReader r : reader.getSequentialSubReaders()) {
DocsEnum docs = r.termDocsEnum(null, Consts.FULL,
new BytesRef(categoryPath.toString(delimiter)), false);
if (docs != null) {
doc = docs.nextDoc() + base;
break;
}
base += r.maxDoc(); // we don't have deletions, so it's ok to call maxDoc
}
// Note: we do NOT add to the cache the fact that the category
// does not exist. The reason is that our only use for this
// method is just before we actually add this category. If
// in the future this usage changes, we should consider caching
// the fact that the category is not in the taxonomy.
addToCache(categoryPath, docs.docID());
return docs.docID();
if (doc > 0) {
addToCache(categoryPath, doc);
}
return doc;
}
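The loop above replaces the old MultiFields lookup: each sub-reader is searched on its own, and the running base translates a segment-local doc into a taxonomy-wide one. Here is a minimal self-contained sketch of that doc-base arithmetic, using a hypothetical Segment type with no Lucene dependency, and assuming no deletions, which the taxonomy index guarantees:

class Segment {
  final String[] labels; // labels[i] is the category stored in segment-local doc i
  Segment(String... labels) { this.labels = labels; }
  int maxDoc() { return labels.length; }
  int find(String label) { // segment-local doc, or -1 if absent
    for (int i = 0; i < labels.length; i++) {
      if (labels[i].equals(label)) return i;
    }
    return -1;
  }
}

public class DocBaseDemo {
  static int findGlobal(Segment[] segments, String label) {
    int base = 0;
    for (Segment s : segments) {
      int local = s.find(label);
      if (local != -1) {
        return base + local; // translate segment-local doc to a global doc
      }
      base += s.maxDoc(); // safe only because no doc was ever deleted
    }
    return -1; // category not present in any segment
  }

  public static void main(String[] args) {
    Segment[] segments = { new Segment("a", "a/b"), new Segment("a/c") };
    System.out.println(findGlobal(segments, "a/c")); // prints 2: base 2 + local 0
  }
}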
/**
@@ -451,37 +444,47 @@ public class DirectoryTaxonomyWriter implements TaxonomyWriter {
if (reader == null) {
reader = openReader();
}
Bits liveDocs = MultiFields.getLiveDocs(reader);
DocsEnum docs = MultiFields.getTermDocsEnum(reader, liveDocs, Consts.FULL,
new BytesRef(categoryPath.toString(delimiter, prefixLen)),
false);
if (docs == null || docs.nextDoc() == DocIdSetIterator.NO_MORE_DOCS) {
return -1; // category does not exist in taxonomy
int base = 0;
int doc = -1;
for (AtomicReader r : reader.getSequentialSubReaders()) {
DocsEnum docs = r.termDocsEnum(null, Consts.FULL,
new BytesRef(categoryPath.toString(delimiter, prefixLen)), false);
if (docs != null) {
doc = docs.nextDoc() + base;
break;
}
addToCache(categoryPath, prefixLen, docs.docID());
return docs.docID();
base += r.maxDoc(); // we don't have deletions, so it's ok to call maxDoc
}
if (doc > 0) {
addToCache(categoryPath, prefixLen, doc);
}
return doc;
}
// TODO (Facet): addCategory() is synchronized. This means that if indexing is
// multi-threaded, a new category that needs to be written to disk (and
// potentially even trigger a lengthy merge) locks out other addCategory()
// calls - even those which could immediately return a cached value.
// We definitely need to fix this situation!
@Override
public synchronized int addCategory(CategoryPath categoryPath) throws IOException {
public int addCategory(CategoryPath categoryPath) throws IOException {
ensureOpen();
// If the category is already in the cache and/or the taxonomy, we
// should return its existing ordinal:
// should return its existing ordinal
int res = findCategory(categoryPath);
if (res < 0) {
// the category is neither in the cache nor in the index - following code
// cannot be executed in parallel.
synchronized (this) {
res = findCategory(categoryPath);
if (res < 0) {
// This is a new category, and we need to insert it into the index
// (and the cache). Actually, we might also need to add some of
// the category's ancestors before we can add the category itself
// (while keeping the invariant that a parent is always added to
// the taxonomy before its child). internalAddCategory() does all
// this recursively:
// this recursively
res = internalAddCategory(categoryPath, categoryPath.length());
}
}
}
return res;
}
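The new addCategory above is a check-lock-recheck idiom: the unsynchronized findCategory serves the common case of an already-cached category, and only a miss enters the monitor, where the lookup is repeated before anything is written. A generic sketch of the same idiom, with hypothetical names and a ConcurrentHashMap standing in for the taxonomy's cache and index:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the check-lock-recheck idiom behind the new addCategory(); the
// ordinal table here is a stand-in for the real cache plus index lookup.
public class CheckLockRecheck {
  private final Map<String,Integer> ordinals = new ConcurrentHashMap<String,Integer>();
  private int nextOrdinal = 1; // guarded by 'this'; ordinal 0 is reserved for the root

  public int addCategory(String path) {
    Integer ord = ordinals.get(path); // fast path: cache hits take no lock
    if (ord == null) {
      synchronized (this) {
        ord = ordinals.get(path); // recheck: another thread may have added it
        if (ord == null) {
          ord = nextOrdinal++;    // truly new: allocate under the monitor
          ordinals.put(path, ord);
        }
      }
    }
    return ord;
  }
}

Two threads can miss on the fast path at once, but only the first to enter the monitor allocates; the second finds the entry on its recheck. This is exactly why a cached addCategory() call no longer blocks behind another thread's lengthy merge.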
@@ -496,7 +499,7 @@ public class DirectoryTaxonomyWriter implements TaxonomyWriter {
* recursion.
*/
private int internalAddCategory(CategoryPath categoryPath, int length)
throws CorruptIndexException, IOException {
throws IOException {
// Find our parent's ordinal (recursively adding the parent category
// to the taxonomy if it's not already there). Then add the parent
@@ -528,13 +531,12 @@ public class DirectoryTaxonomyWriter implements TaxonomyWriter {
}
}
// Note that the methods calling addCategoryDocument() are synchronized,
// so this method is effectively synchronized as well, but we'll add
// synchronized to be on the safe side, and we can reuse class-local objects
// instead of allocating them every time
protected synchronized int addCategoryDocument(CategoryPath categoryPath,
int length, int parent)
throws CorruptIndexException, IOException {
/**
* Note that the methods calling addCategoryDocument() are synchronized, so
* this method is effectively synchronized as well.
*/
private int addCategoryDocument(CategoryPath categoryPath, int length,
int parent) throws IOException {
// Before Lucene 2.9, position increments >=0 were supported, so we
// added 1 to parent to allow the parent -1 (the parent of the root).
// Unfortunately, starting with Lucene 2.9, after LUCENE-1542, this is
@@ -601,8 +603,7 @@ public class DirectoryTaxonomyWriter implements TaxonomyWriter {
}
}
private void addToCache(CategoryPath categoryPath, int id)
throws CorruptIndexException, IOException {
private void addToCache(CategoryPath categoryPath, int id) throws IOException {
if (cache.put(categoryPath, id)) {
// If cache.put() returned true, it means the cache was limited in
// size, became full, so parts of it had to be cleared.
@@ -620,7 +621,7 @@ public class DirectoryTaxonomyWriter implements TaxonomyWriter {
}
private void addToCache(CategoryPath categoryPath, int prefixLen, int id)
throws CorruptIndexException, IOException {
throws IOException {
if (cache.put(categoryPath, prefixLen, id)) {
refreshReader();
cacheIsComplete = false;
@@ -766,7 +767,29 @@ public class DirectoryTaxonomyWriter implements TaxonomyWriter {
}
CategoryPath cp = new CategoryPath();
Terms terms = MultiFields.getTerms(reader, Consts.FULL);
TermsEnum termsEnum = null;
DocsEnum docsEnum = null;
int base = 0;
for (AtomicReader r : reader.getSequentialSubReaders()) {
Terms terms = r.terms(Consts.FULL);
if (terms != null) { // cannot really happen, but be on the safe side
termsEnum = terms.iterator(termsEnum);
while (termsEnum.next() != null) {
BytesRef t = termsEnum.term();
// Since we guarantee uniqueness of categories, each term has exactly
// one document. Also, since we do not allow removing categories (and
// hence documents), there are no deletions in the index. Therefore, it
// is sufficient to call next(), and then doc(), exactly once with no
// 'validation' checks.
cp.clear();
cp.add(t.utf8ToString(), delimiter);
docsEnum = termsEnum.docs(null, docsEnum, false);
cache.put(cp, docsEnum.nextDoc() + base);
}
}
base += r.maxDoc(); // we don't have any deletions, so we're ok
}
/*Terms terms = MultiFields.getTerms(reader, Consts.FULL);
// The check is done here to avoid checking it on every iteration of the
// below loop. A null term will be returned if there are no terms in the
// lexicon, or after the Consts.FULL term. However while the loop is
@@ -786,11 +809,10 @@ public class DirectoryTaxonomyWriter implements TaxonomyWriter {
docsEnum = termsEnum.docs(liveDocs, docsEnum, false);
docsEnum.nextDoc();
cp.clear();
// TODO (Facet): avoid String creation/use bytes?
cp.add(t.utf8ToString(), delimiter);
cache.put(cp, docsEnum.docID());
}
}
}*/
cacheIsComplete = true;
// No sense to keep the reader open - we will not need to read from it
@@ -832,17 +854,18 @@ public class DirectoryTaxonomyWriter implements TaxonomyWriter {
*/
public void addTaxonomy(Directory taxoDir, OrdinalMap map) throws IOException {
ensureOpen();
IndexReader r = DirectoryReader.open(taxoDir);
DirectoryReader r = DirectoryReader.open(taxoDir);
try {
final int size = r.numDocs();
final OrdinalMap ordinalMap = map;
ordinalMap.setSize(size);
CategoryPath cp = new CategoryPath();
Terms terms = MultiFields.getTerms(r, Consts.FULL);
TermsEnum te = terms.iterator(null);
Bits liveDocs = MultiFields.getLiveDocs(r);
int base = 0;
TermsEnum te = null;
DocsEnum docs = null;
// we call next() first, to skip the root category which always exists.
for (AtomicReader ar : r.getSequentialSubReaders()) {
Terms terms = ar.terms(Consts.FULL);
te = terms.iterator(te);
while (te.next() != null) {
String value = te.term().utf8ToString();
cp.clear();
@@ -854,13 +877,11 @@ public class DirectoryTaxonomyWriter implements TaxonomyWriter {
// the findCategory() call above failed to find it.
ordinal = addCategory(cp);
}
docs = te.docs(liveDocs, docs, false);
ordinalMap.addMapping(docs.nextDoc(), ordinal);
docs = te.docs(null, docs, false);
ordinalMap.addMapping(docs.nextDoc() + base, ordinal);
}
base += ar.maxDoc(); // no deletions, so we're ok
}
// we must add the root ordinal mapping, so that the map will be complete
// (otherwise e.g. DiskOrdinalMap may fail because it expects more
// categories to exist in the file).
ordinalMap.addMapping(0, 0);
ordinalMap.addDone();
} finally {
r.close();
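As a usage note for the method patched here: a caller merges one taxonomy into another, then uses the filled map to translate ordinals. The following is a sketch of typical caller code, assuming the MemoryOrdinalMap implementation nested in this class (the test file below imports its DiskOrdinalMap sibling) and that OrdinalMap.getMap() returns the old-to-new table:

import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter;
import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.MemoryOrdinalMap;
import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.OrdinalMap;
import org.apache.lucene.store.Directory;

public class AddTaxonomyDemo {
  // Merge srcDir's categories into the taxonomy in destDir; returns a table
  // mapping each source-taxonomy ordinal to its ordinal after the merge.
  static int[] merge(Directory destDir, Directory srcDir) throws Exception {
    DirectoryTaxonomyWriter dest = new DirectoryTaxonomyWriter(destDir);
    OrdinalMap map = new MemoryOrdinalMap(); // in-memory variant of DiskOrdinalMap
    dest.addTaxonomy(srcDir, map); // fills the map via setSize/addMapping/addDone
    dest.close();
    return map.getMap(); // getMap()[srcOrdinal] == ordinal in the merged taxonomy
  }
}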

View File: Cl2oTaxonomyWriterCache.java

@@ -1,5 +1,8 @@
package org.apache.lucene.facet.taxonomy.writercache.cl2o;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.lucene.facet.taxonomy.CategoryPath;
import org.apache.lucene.facet.taxonomy.writercache.TaxonomyWriterCache;
@@ -30,44 +33,71 @@ import org.apache.lucene.facet.taxonomy.writercache.TaxonomyWriterCache;
*/
public class Cl2oTaxonomyWriterCache implements TaxonomyWriterCache {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private CompactLabelToOrdinal cache;
public Cl2oTaxonomyWriterCache(int initialCapacity, float loadFactor, int numHashArrays) {
this.cache = new CompactLabelToOrdinal(initialCapacity, loadFactor, numHashArrays);
}
public void close() {
@Override
public synchronized void close() {
cache = null;
}
@Override
public boolean hasRoom(int n) {
// This cache is unlimited, so we always have room for remembering more:
return true;
}
@Override
public int get(CategoryPath categoryPath) {
lock.readLock().lock();
try {
return cache.getOrdinal(categoryPath);
} finally {
lock.readLock().unlock();
}
}
@Override
public int get(CategoryPath categoryPath, int length) {
if (length < 0 || length > categoryPath.length()) {
length = categoryPath.length();
}
lock.readLock().lock();
try {
return cache.getOrdinal(categoryPath, length);
} finally {
lock.readLock().unlock();
}
}
@Override
public boolean put(CategoryPath categoryPath, int ordinal) {
lock.writeLock().lock();
try {
cache.addLabel(categoryPath, ordinal);
// Tell the caller we didn't clear part of the cache, so it doesn't
// have to flush its on-disk index now
return false;
} finally {
lock.writeLock().unlock();
}
}
@Override
public boolean put(CategoryPath categoryPath, int prefixLen, int ordinal) {
lock.writeLock().lock();
try {
cache.addLabel(categoryPath, prefixLen, ordinal);
// Tell the caller we didn't clear part of the cache, so it doesn't
// have to flush its on-disk index now
return false;
} finally {
lock.writeLock().unlock();
}
}
/**
@@ -75,8 +105,7 @@ public class Cl2oTaxonomyWriterCache implements TaxonomyWriterCache {
* @return Number of bytes in memory used by this object.
*/
public int getMemoryUsage() {
int memoryUsage = (this.cache == null) ? 0 : this.cache.getMemoryUsage();
return memoryUsage;
return cache == null ? 0 : cache.getMemoryUsage();
}
}

View File: LruTaxonomyWriterCache.java

@@ -60,16 +60,19 @@ public class LruTaxonomyWriterCache implements TaxonomyWriterCache {
}
}
public boolean hasRoom(int n) {
@Override
public synchronized boolean hasRoom(int n) {
return n <= (cache.getMaxSize() - cache.getSize());
}
public void close() {
@Override
public synchronized void close() {
cache.clear();
cache = null;
}
public int get(CategoryPath categoryPath) {
@Override
public synchronized int get(CategoryPath categoryPath) {
Integer res = cache.get(categoryPath);
if (res == null) {
return -1;
@@ -78,7 +81,8 @@ public class LruTaxonomyWriterCache implements TaxonomyWriterCache {
return res.intValue();
}
public int get(CategoryPath categoryPath, int length) {
@Override
public synchronized int get(CategoryPath categoryPath, int length) {
if (length < 0 || length > categoryPath.length()) {
length = categoryPath.length();
}
@@ -94,7 +98,8 @@ public class LruTaxonomyWriterCache implements TaxonomyWriterCache {
return res.intValue();
}
public boolean put(CategoryPath categoryPath, int ordinal) {
@Override
public synchronized boolean put(CategoryPath categoryPath, int ordinal) {
boolean ret = cache.put(categoryPath, new Integer(ordinal));
// If the cache is full, we need to clear one or more old entries
// from the cache. However, if we delete from the cache a recent
@@ -109,7 +114,8 @@ public class LruTaxonomyWriterCache implements TaxonomyWriterCache {
return ret;
}
public boolean put(CategoryPath categoryPath, int prefixLen, int ordinal) {
@Override
public synchronized boolean put(CategoryPath categoryPath, int prefixLen, int ordinal) {
boolean ret = cache.put(categoryPath, prefixLen, new Integer(ordinal));
// If the cache is full, we need to clear one or more old entries
// from the cache. However, if we delete from the cache a recent
@@ -125,4 +131,3 @@
}
}
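The design split between the two caches is worth noting: CL2O lookups are side-effect free, so the previous file gives them a shared read lock, while LRU lookups touch the recency bookkeeping, which is presumably why even get() is synchronized here. A minimal illustration of why an LRU get() cannot be treated as a concurrent read, using a hypothetical class built on an access-ordered java.util.LinkedHashMap:

import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: an access-ordered LinkedHashMap mutates its internal linked list on
// every get(), so all operations, reads included, must hold the same monitor.
public class SynchronizedLru<K,V> {
  private final LinkedHashMap<K,V> map;

  public SynchronizedLru(final int maxSize) {
    // accessOrder=true: get() moves the entry to the tail (most recently used)
    this.map = new LinkedHashMap<K,V>(16, 0.75f, true) {
      @Override protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
        return size() > maxSize; // evict the least recently used entry
      }
    };
  }

  public synchronized V get(K key) { return map.get(key); } // a structural write!
  public synchronized void put(K key, V value) { map.put(key, value); }
}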

View File: TestAddTaxonomy.java

@@ -3,6 +3,7 @@ package org.apache.lucene.facet.taxonomy.directory;
import java.io.IOException;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.facet.taxonomy.CategoryPath;
import org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.DiskOrdinalMap;
@@ -32,16 +33,32 @@ import org.apache.lucene.util._TestUtil;
public class TestAddTaxonomy extends LuceneTestCase {
private void dotest(int ncats, int range) throws Exception {
private void dotest(int ncats, final int range) throws Exception {
final AtomicInteger numCats = new AtomicInteger(ncats);
Directory dirs[] = new Directory[2];
Random random = random();
for (int i = 0; i < dirs.length; i++) {
dirs[i] = newDirectory();
DirectoryTaxonomyWriter tw = new DirectoryTaxonomyWriter(dirs[i]);
for (int j = 0; j < ncats; j++) {
final DirectoryTaxonomyWriter tw = new DirectoryTaxonomyWriter(dirs[i]);
Thread[] addThreads = new Thread[4];
for (int j = 0; j < addThreads.length; j++) {
addThreads[j] = new Thread() {
@Override
public void run() {
Random random = random();
while (numCats.decrementAndGet() > 0) {
String cat = Integer.toString(random.nextInt(range));
try {
tw.addCategory(new CategoryPath("a", cat));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
};
}
for (Thread t : addThreads) t.start();
for (Thread t : addThreads) t.join();
tw.close();
}
@@ -133,11 +150,9 @@
}
// A bigger, more comprehensive random test.
@Nightly
public void testBig() throws Exception {
dotest(200, 10000);
dotest(1000, 20000);
// really big
dotest(400000, 1000000);
}

View File: TestDirectoryTaxonomyWriter.java

@@ -3,11 +3,16 @@ package org.apache.lucene.facet.taxonomy.directory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.facet.taxonomy.CategoryPath;
import org.apache.lucene.facet.taxonomy.InconsistentTaxonomyException;
import org.apache.lucene.facet.taxonomy.writercache.TaxonomyWriterCache;
import org.apache.lucene.facet.taxonomy.writercache.cl2o.Cl2oTaxonomyWriterCache;
import org.apache.lucene.facet.taxonomy.writercache.lru.LruTaxonomyWriterCache;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
@@ -42,11 +47,17 @@ public class TestDirectoryTaxonomyWriter extends LuceneTestCase {
NoOpCache() { }
@Override
public void close() {}
@Override
public int get(CategoryPath categoryPath) { return -1; }
@Override
public int get(CategoryPath categoryPath, int length) { return get(categoryPath); }
@Override
public boolean put(CategoryPath categoryPath, int ordinal) { return true; }
@Override
public boolean put(CategoryPath categoryPath, int prefixLen, int ordinal) { return true; }
@Override
public boolean hasRoom(int numberOfEntries) { return false; }
}
@@ -202,4 +213,47 @@ public class TestDirectoryTaxonomyWriter extends LuceneTestCase {
dir.close();
}
public void testConcurrency() throws Exception {
int ncats = atLeast(100000); // add many categories
final int range = ncats * 3; // affects the categories selection
final AtomicInteger numCats = new AtomicInteger(ncats);
Directory dir = newDirectory();
final ConcurrentHashMap<Integer,Integer> values = new ConcurrentHashMap<Integer,Integer>();
TaxonomyWriterCache cache = random().nextBoolean()
? new Cl2oTaxonomyWriterCache(1024, 0.15f, 3)
: new LruTaxonomyWriterCache(ncats / 10);
final DirectoryTaxonomyWriter tw = new DirectoryTaxonomyWriter(dir, OpenMode.CREATE, cache);
Thread[] addThreads = new Thread[atLeast(4)];
for (int z = 0; z < addThreads.length; z++) {
addThreads[z] = new Thread() {
@Override
public void run() {
Random random = random();
while (numCats.decrementAndGet() > 0) {
try {
int value = random.nextInt(range);
tw.addCategory(new CategoryPath("a", Integer.toString(value)));
values.put(value, value);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
};
}
for (Thread t : addThreads) t.start();
for (Thread t : addThreads) t.join();
tw.close();
DirectoryTaxonomyReader dtr = new DirectoryTaxonomyReader(dir);
assertEquals(values.size() + 2, dtr.getSize()); // +2 for root category + "a"
for (Integer value : values.keySet()) {
assertTrue("category not found a/" + value, dtr.getOrdinal(new CategoryPath("a", value.toString())) > 0);
}
dtr.close();
dir.close();
}
}