From f99a9d7ab0497b88f5a49dbe7b7bbb958756aa1a Mon Sep 17 00:00:00 2001 From: Shai Erera Date: Thu, 29 May 2014 12:24:48 +0000 Subject: [PATCH] LUCENE-5680: add aotmic DocValues updates git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1598272 13f79535-47bb-0310-9956-ffa450edef68 --- lucene/CHANGES.txt | 3 + .../index/BinaryDocValuesFieldUpdates.java | 2 +- .../lucene/index/DocValuesFieldUpdates.java | 10 ++- .../apache/lucene/index/DocValuesUpdate.java | 10 +-- .../apache/lucene/index/DocumentsWriter.java | 14 +---- .../index/DocumentsWriterDeleteQueue.java | 57 +++++++++-------- .../org/apache/lucene/index/IndexWriter.java | 61 +++++++++++++++++-- .../index/NumericDocValuesFieldUpdates.java | 2 +- .../index/TestBinaryDocValuesUpdates.java | 28 ++++----- .../index/TestIndexWriterExceptions.java | 41 +++---------- .../index/TestMixedDocValuesUpdates.java | 33 +++++----- .../index/TestNumericDocValuesUpdates.java | 29 +++++---- .../lucene/index/RandomIndexWriter.java | 6 ++ 13 files changed, 160 insertions(+), 136 deletions(-) diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 93806e1b064..3a1743438c7 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -118,6 +118,9 @@ New Features * LUCENE-5675: Add IDVersionPostingsFormat, a postings format optimized for primary-key (ID) fields that also record a version (long) for each ID. (Robert Muir, Mike McCandless) + +* LUCENE-5680: Add ability to atomically update a set of DocValues + fields. (Shai Erera) Changes in Backwards Compatibility Policy diff --git a/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java b/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java index 43211a94f27..53bf5a18c86 100644 --- a/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java +++ b/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java @@ -101,7 +101,7 @@ class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates { private final int bitsPerValue; public BinaryDocValuesFieldUpdates(String field, int maxDoc) { - super(field, Type.BINARY); + super(field, FieldInfo.DocValuesType.BINARY); bitsPerValue = PackedInts.bitsRequired(maxDoc - 1); docs = new PagedMutable(1, PAGE_SIZE, bitsPerValue, PackedInts.COMPACT); offsets = new PagedGrowableWriter(1, PAGE_SIZE, 1, PackedInts.FAST); diff --git a/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java b/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java index 6593d8a8a5c..7db6a7a4fa7 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java @@ -33,8 +33,6 @@ abstract class DocValuesFieldUpdates { protected static final int PAGE_SIZE = 1024; - static enum Type { NUMERIC, BINARY } - /** * An iterator over documents and their updated values. Only documents with * updates are returned by this iterator, and the documents are returned in @@ -100,7 +98,7 @@ abstract class DocValuesFieldUpdates { return ramBytesPerDoc; } - DocValuesFieldUpdates getUpdates(String field, Type type) { + DocValuesFieldUpdates getUpdates(String field, FieldInfo.DocValuesType type) { switch (type) { case NUMERIC: return numericDVUpdates.get(field); @@ -111,7 +109,7 @@ abstract class DocValuesFieldUpdates { } } - DocValuesFieldUpdates newUpdates(String field, Type type, int maxDoc) { + DocValuesFieldUpdates newUpdates(String field, FieldInfo.DocValuesType type, int maxDoc) { switch (type) { case NUMERIC: assert numericDVUpdates.get(field) == null; @@ -135,9 +133,9 @@ abstract class DocValuesFieldUpdates { } final String field; - final Type type; + final FieldInfo.DocValuesType type; - protected DocValuesFieldUpdates(String field, Type type) { + protected DocValuesFieldUpdates(String field, FieldInfo.DocValuesType type) { this.field = field; this.type = type; } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java b/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java index 336a878b3e3..57ba1b60c66 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java @@ -39,7 +39,7 @@ abstract class DocValuesUpdate { */ private static final int RAW_SIZE_IN_BYTES = 8*NUM_BYTES_OBJECT_HEADER + 8*NUM_BYTES_OBJECT_REF + 8*NUM_BYTES_INT; - final DocValuesFieldUpdates.Type type; + final FieldInfo.DocValuesType type; final Term term; final String field; final Object value; @@ -52,7 +52,7 @@ abstract class DocValuesUpdate { * @param field the {@link NumericDocValuesField} to update * @param value the updated value */ - protected DocValuesUpdate(DocValuesFieldUpdates.Type type, Term term, String field, Object value) { + protected DocValuesUpdate(FieldInfo.DocValuesType type, Term term, String field, Object value) { this.type = type; this.term = term; this.field = field; @@ -82,7 +82,7 @@ abstract class DocValuesUpdate { private static final long RAW_VALUE_SIZE_IN_BYTES = NUM_BYTES_ARRAY_HEADER + 2*NUM_BYTES_INT + NUM_BYTES_OBJECT_REF; BinaryDocValuesUpdate(Term term, String field, BytesRef value) { - super(DocValuesFieldUpdates.Type.BINARY, term, field, value); + super(FieldInfo.DocValuesType.BINARY, term, field, value); } @Override @@ -94,9 +94,9 @@ abstract class DocValuesUpdate { /** An in-place update to a numeric DocValues field */ static final class NumericDocValuesUpdate extends DocValuesUpdate { - + NumericDocValuesUpdate(Term term, String field, Long value) { - super(DocValuesFieldUpdates.Type.NUMERIC, term, field, value); + super(FieldInfo.DocValuesType.NUMERIC, term, field, value); } @Override diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java index 30aec59deb3..62af0773775 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -30,13 +30,10 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.index.DocumentsWriterFlushQueue.SegmentFlushTicket; import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment; import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; -import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate; -import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate; import org.apache.lucene.index.IndexWriter.Event; import org.apache.lucene.search.Query; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; -import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.InfoStream; /** @@ -156,20 +153,13 @@ final class DocumentsWriter implements Closeable { return applyAllDeletes( deleteQueue); } - synchronized boolean updateNumericDocValue(Term term, String field, long value) throws IOException { + synchronized boolean updateDocValues(DocValuesUpdate... updates) throws IOException { final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; - deleteQueue.addNumericUpdate(new NumericDocValuesUpdate(term, field, Long.valueOf(value))); + deleteQueue.addDocValuesUpdates(updates); flushControl.doOnDelete(); return applyAllDeletes(deleteQueue); } - synchronized boolean updateBinaryDocValue(Term term, String field, BytesRef value) throws IOException { - final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; - deleteQueue.addBinaryUpdate(new BinaryDocValuesUpdate(term, field, value)); - flushControl.doOnDelete(); - return applyAllDeletes(deleteQueue); - } - DocumentsWriterDeleteQueue currentDeleteSession() { return deleteQueue; } diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java index 02a554012e4..95ebd780479 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java @@ -24,6 +24,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate; import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate; import org.apache.lucene.search.Query; +import org.apache.lucene.util.BytesRef; /** * {@link DocumentsWriterDeleteQueue} is a non-blocking linked pending deletes @@ -109,16 +110,11 @@ final class DocumentsWriterDeleteQueue { tryApplyGlobalSlice(); } - void addNumericUpdate(NumericDocValuesUpdate update) { - add(new NumericUpdateNode(update)); + void addDocValuesUpdates(DocValuesUpdate... updates) { + add(new DocValuesUpdatesNode(updates)); tryApplyGlobalSlice(); } - void addBinaryUpdate(BinaryDocValuesUpdate update) { - add(new BinaryUpdateNode(update)); - tryApplyGlobalSlice(); - } - /** * invariant for document update */ @@ -392,40 +388,43 @@ final class DocumentsWriterDeleteQueue { } } - private static final class NumericUpdateNode extends Node { + private static final class DocValuesUpdatesNode extends Node { - NumericUpdateNode(NumericDocValuesUpdate update) { - super(update); + DocValuesUpdatesNode(DocValuesUpdate... updates) { + super(updates); } @Override void apply(BufferedUpdates bufferedUpdates, int docIDUpto) { - bufferedUpdates.addNumericUpdate(item, docIDUpto); + for (DocValuesUpdate update : item) { + switch (update.type) { + case NUMERIC: + bufferedUpdates.addNumericUpdate(new NumericDocValuesUpdate(update.term, update.field, (Long) update.value), docIDUpto); + break; + case BINARY: + bufferedUpdates.addBinaryUpdate(new BinaryDocValuesUpdate(update.term, update.field, (BytesRef) update.value), docIDUpto); + break; + default: + throw new IllegalArgumentException(update.type + " DocValues updates not supported yet!"); + } + } } @Override public String toString() { - return "update=" + item; + StringBuilder sb = new StringBuilder(); + sb.append("docValuesUpdates: "); + if (item.length > 0) { + sb.append("term=").append(item[0].term).append("; updates: ["); + for (DocValuesUpdate update : item) { + sb.append(update.field).append(':').append(update.value).append(','); + } + sb.setCharAt(sb.length()-1, ']'); + } + return sb.toString(); } } - private static final class BinaryUpdateNode extends Node { - - BinaryUpdateNode(BinaryDocValuesUpdate update) { - super(update); - } - - @Override - void apply(BufferedUpdates bufferedUpdates, int docIDUpto) { - bufferedUpdates.addBinaryUpdate(item, docIDUpto); - } - - @Override - public String toString() { - return "update=" + item; - } - } - private boolean forceApplyGlobalSlice() { globalBufferLock.lock(); final Node currentTail = tail; diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index 5f15abbec49..d27a8c7acc0 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -32,14 +32,17 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Locale; -import java.util.Map.Entry; import java.util.Map; +import java.util.Map.Entry; import java.util.Queue; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.codecs.Codec; +import org.apache.lucene.document.Field; +import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate; +import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate; import org.apache.lucene.index.FieldInfo.DocValuesType; import org.apache.lucene.index.FieldInfos.FieldNumbers; import org.apache.lucene.index.IndexWriterConfig.OpenMode; @@ -1429,7 +1432,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{ throw new IllegalArgumentException("can only update existing numeric-docvalues fields!"); } try { - if (docWriter.updateNumericDocValue(term, field, value)) { + if (docWriter.updateDocValues(new NumericDocValuesUpdate(term, field, value))) { processEvents(true, false); } } catch (OutOfMemoryError oom) { @@ -1471,14 +1474,64 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{ throw new IllegalArgumentException("can only update existing binary-docvalues fields!"); } try { - if (docWriter.updateBinaryDocValue(term, field, value)) { + if (docWriter.updateDocValues(new BinaryDocValuesUpdate(term, field, value))) { processEvents(true, false); } } catch (OutOfMemoryError oom) { handleOOM(oom, "updateBinaryDocValue"); } } - + + /** + * Updates documents' DocValues fields to the given values. Each field update + * is applied to the set of documents that are associated with the + * {@link Term} to the same value. All updates are atomically applied and + * flushed together. + * + *

+ * NOTE: if this method hits an OutOfMemoryError you should immediately + * close the writer. See above for details. + *

+ * + * @param updates + * the updates to apply + * @throws CorruptIndexException + * if the index is corrupt + * @throws IOException + * if there is a low-level IO error + */ + public void updateDocValues(Term term, Field... updates) throws IOException { + ensureOpen(); + DocValuesUpdate[] dvUpdates = new DocValuesUpdate[updates.length]; + for (int i = 0; i < updates.length; i++) { + final Field f = updates[i]; + final DocValuesType dvType = f.fieldType().docValueType(); + if (dvType == null) { + throw new IllegalArgumentException("can only update NUMERIC or BINARY fields! field=" + f.name()); + } + if (!globalFieldNumberMap.contains(f.name(), dvType)) { + throw new IllegalArgumentException("can only update existing docvalues fields! field=" + f.name() + ", type=" + dvType); + } + switch (dvType) { + case NUMERIC: + dvUpdates[i] = new NumericDocValuesUpdate(term, f.name(), (Long) f.numericValue()); + break; + case BINARY: + dvUpdates[i] = new BinaryDocValuesUpdate(term, f.name(), f.binaryValue()); + break; + default: + throw new IllegalArgumentException("can only update NUMERIC or BINARY fields: field=" + f.name() + ", type=" + dvType); + } + } + try { + if (docWriter.updateDocValues(dvUpdates)) { + processEvents(true, false); + } + } catch (OutOfMemoryError oom) { + handleOOM(oom, "updateDocValues"); + } + } + // for test purpose final synchronized int getSegmentCount(){ return segmentInfos.size(); diff --git a/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java b/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java index 16ad698251c..583b8592864 100644 --- a/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java +++ b/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java @@ -86,7 +86,7 @@ class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates { private int size; public NumericDocValuesFieldUpdates(String field, int maxDoc) { - super(field, Type.NUMERIC); + super(field, FieldInfo.DocValuesType.NUMERIC); bitsPerValue = PackedInts.bitsRequired(maxDoc - 1); docs = new PagedMutable(1, PAGE_SIZE, bitsPerValue, PackedInts.COMPACT); values = new PagedGrowableWriter(1, PAGE_SIZE, 1, PackedInts.FAST); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestBinaryDocValuesUpdates.java b/lucene/core/src/test/org/apache/lucene/index/TestBinaryDocValuesUpdates.java index 1902481c92f..f1b1191dc46 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestBinaryDocValuesUpdates.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestBinaryDocValuesUpdates.java @@ -911,7 +911,7 @@ public class TestBinaryDocValuesUpdates extends LuceneTestCase { final IndexWriter writer = new IndexWriter(dir, conf); // create index - final int numThreads = TestUtil.nextInt(random(), 3, 6); + final int numFields = TestUtil.nextInt(random(), 1, 4); final int numDocs = atLeast(2000); for (int i = 0; i < numDocs; i++) { Document doc = new Document(); @@ -923,7 +923,7 @@ public class TestBinaryDocValuesUpdates extends LuceneTestCase { else if (group < 0.8) g = "g2"; else g = "g3"; doc.add(new StringField("updKey", g, Store.NO)); - for (int j = 0; j < numThreads; j++) { + for (int j = 0; j < numFields; j++) { long value = random().nextInt(); doc.add(new BinaryDocValuesField("f" + j, toBytes(value))); doc.add(new BinaryDocValuesField("cf" + j, toBytes(value * 2))); // control, always updated to f * 2 @@ -931,14 +931,13 @@ public class TestBinaryDocValuesUpdates extends LuceneTestCase { writer.addDocument(doc); } + final int numThreads = TestUtil.nextInt(random(), 3, 6); final CountDownLatch done = new CountDownLatch(numThreads); final AtomicInteger numUpdates = new AtomicInteger(atLeast(100)); // same thread updates a field as well as reopens Thread[] threads = new Thread[numThreads]; for (int i = 0; i < threads.length; i++) { - final String f = "f" + i; - final String cf = "cf" + i; threads[i] = new Thread("UpdateThread-" + i) { @Override public void run() { @@ -953,10 +952,13 @@ public class TestBinaryDocValuesUpdates extends LuceneTestCase { else if (group < 0.5) t = new Term("updKey", "g1"); else if (group < 0.8) t = new Term("updKey", "g2"); else t = new Term("updKey", "g3"); -// System.out.println("[" + Thread.currentThread().getName() + "] numUpdates=" + numUpdates + " updateTerm=" + t); + + final int field = random().nextInt(numFields); + final String f = "f" + field; + final String cf = "cf" + field; +// System.out.println("[" + Thread.currentThread().getName() + "] numUpdates=" + numUpdates + " updateTerm=" + t + " field=" + field); long updValue = random.nextInt(); - writer.updateBinaryDocValue(t, f, toBytes(updValue)); - writer.updateBinaryDocValue(t, cf, toBytes(updValue * 2)); + writer.updateDocValues(t, new BinaryDocValuesField(f, toBytes(updValue)), new BinaryDocValuesField(cf, toBytes(updValue*2))); if (random.nextDouble() < 0.2) { // delete a random document @@ -1012,7 +1014,7 @@ public class TestBinaryDocValuesUpdates extends LuceneTestCase { BytesRef scratch = new BytesRef(); for (AtomicReaderContext context : reader.leaves()) { AtomicReader r = context.reader(); - for (int i = 0; i < numThreads; i++) { + for (int i = 0; i < numFields; i++) { BinaryDocValues bdv = r.getBinaryDocValues("f" + i); BinaryDocValues control = r.getBinaryDocValues("cf" + i); Bits docsWithBdv = r.getDocsWithField("f" + i); @@ -1054,8 +1056,7 @@ public class TestBinaryDocValuesUpdates extends LuceneTestCase { int doc = random().nextInt(numDocs); Term t = new Term("id", "doc" + doc); long value = random().nextLong(); - writer.updateBinaryDocValue(t, "f", toBytes(value)); - writer.updateBinaryDocValue(t, "cf", toBytes(value * 2)); + writer.updateDocValues(t, new BinaryDocValuesField("f", toBytes(value)), new BinaryDocValuesField("cf", toBytes(value*2))); DirectoryReader reader = DirectoryReader.open(writer, true); for (AtomicReaderContext context : reader.leaves()) { AtomicReader r = context.reader(); @@ -1146,8 +1147,7 @@ public class TestBinaryDocValuesUpdates extends LuceneTestCase { // update some docs to a random value long value = random().nextInt(); Term term = new Term("id", RandomPicks.randomFrom(random(), randomTerms)); - writer.updateBinaryDocValue(term, "bdv", toBytes(value)); - writer.updateBinaryDocValue(term, "control", toBytes(value * 2)); + writer.updateDocValues(term, new BinaryDocValuesField("bdv", toBytes(value)), new BinaryDocValuesField("control", toBytes(value * 2))); writer.shutdown(); Directory dir2 = newDirectory(); @@ -1252,8 +1252,8 @@ public class TestBinaryDocValuesUpdates extends LuceneTestCase { int field = random.nextInt(numBinaryFields); Term updateTerm = new Term("upd", RandomPicks.randomFrom(random, updateTerms)); long value = random.nextInt(); - writer.updateBinaryDocValue(updateTerm, "f" + field, toBytes(value)); - writer.updateBinaryDocValue(updateTerm, "cf" + field, toBytes(value * 2)); + writer.updateDocValues(updateTerm, new BinaryDocValuesField("f" + field, toBytes(value)), + new BinaryDocValuesField("cf" + field, toBytes(value * 2))); } writer.shutdown(); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java index 8d78d5d2217..f8a12d30f61 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java @@ -35,7 +35,6 @@ import org.apache.lucene.analysis.MockTokenizer; import org.apache.lucene.analysis.Token; import org.apache.lucene.analysis.TokenFilter; import org.apache.lucene.analysis.TokenStream; -import org.apache.lucene.analysis.Analyzer.TokenStreamComponents; import org.apache.lucene.document.BinaryDocValuesField; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; @@ -1979,8 +1978,6 @@ public class TestIndexWriterExceptions extends LuceneTestCase { shouldFail.set(true); boolean doClose = false; - int updatingDocID = -1; - long updatingValue = -1; try { boolean defaultCodecSupportsFieldUpdates = defaultCodecSupportsFieldUpdates(); for(int i=0;i