diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index d860dc068a6..54937ad504e 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -108,6 +108,10 @@ New Features * LUCENE-5530: ComplexPhraseQueryParser throws ParseException for fielded queries. (Erick Erickson via Tomas Fernandez Lobbe and Ahmet Arslan) +* LUCENE-5513: Add IndexWriter.updateBinaryDocValue which lets + you update the value of a BinaryDocValuesField without reindexing the + document(s). (Shai Erera) + API Changes * LUCENE-5454: Add RandomAccessOrds, an optional extension of SortedSetDocValues diff --git a/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java b/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java new file mode 100644 index 00000000000..571e4d37655 --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/index/BinaryDocValuesFieldUpdates.java @@ -0,0 +1,233 @@ +package org.apache.lucene.index; + +import org.apache.lucene.document.BinaryDocValuesField; +import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.InPlaceMergeSorter; +import org.apache.lucene.util.packed.PackedInts; +import org.apache.lucene.util.packed.PagedGrowableWriter; +import org.apache.lucene.util.packed.PagedMutable; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * A {@link DocValuesFieldUpdates} which holds updates of documents, of a single + * {@link BinaryDocValuesField}. + * + * @lucene.experimental + */ +class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates { + + final static class Iterator extends DocValuesFieldUpdates.Iterator { + private final PagedGrowableWriter offsets; + private final int size; + private final PagedGrowableWriter lengths; + private final PagedMutable docs; + private final FixedBitSet docsWithField; + private long idx = 0; // long so we don't overflow if size == Integer.MAX_VALUE + private int doc = -1; + private final BytesRef value; + private int offset, length; + + Iterator(int size, PagedGrowableWriter offsets, PagedGrowableWriter lengths, + PagedMutable docs, BytesRef values, FixedBitSet docsWithField) { + this.offsets = offsets; + this.size = size; + this.lengths = lengths; + this.docs = docs; + this.docsWithField = docsWithField; + value = values.clone(); + } + + @Override + BytesRef value() { + if (offset == -1) { + return null; + } else { + value.offset = offset; + value.length = length; + return value; + } + } + + @Override + int nextDoc() { + if (idx >= size) { + offset = -1; + return doc = DocIdSetIterator.NO_MORE_DOCS; + } + doc = (int) docs.get(idx); + ++idx; + while (idx < size && docs.get(idx) == doc) { + ++idx; + } + // idx points to the "next" element + long prevIdx = idx - 1; + if (!docsWithField.get((int) prevIdx)) { + offset = -1; + } else { + // cannot change 'value' here because nextDoc is called before the + // value is used, and it's a 
waste to clone the BytesRef when we + // obtain the value + offset = (int) offsets.get(prevIdx); + length = (int) lengths.get(prevIdx); + } + return doc; + } + + @Override + int doc() { + return doc; + } + + @Override + void reset() { + doc = -1; + offset = -1; + idx = 0; + } + } + + private FixedBitSet docsWithField; + private PagedMutable docs; + private PagedGrowableWriter offsets, lengths; + private BytesRef values; + private int size; + + public BinaryDocValuesFieldUpdates(String field, int maxDoc) { + super(field, Type.BINARY); + docsWithField = new FixedBitSet(64); + docs = new PagedMutable(1, 1024, PackedInts.bitsRequired(maxDoc - 1), PackedInts.COMPACT); + offsets = new PagedGrowableWriter(1, 1024, 1, PackedInts.FAST); + lengths = new PagedGrowableWriter(1, 1024, 1, PackedInts.FAST); + values = new BytesRef(16); // start small + size = 0; + } + + @Override + public void add(int doc, Object value) { + // TODO: if the Sorter interface changes to take long indexes, we can remove that limitation + if (size == Integer.MAX_VALUE) { + throw new IllegalStateException("cannot support more than Integer.MAX_VALUE doc/value entries"); + } + + BytesRef val = (BytesRef) value; + if (val == null) { + val = BinaryDocValuesUpdate.MISSING; + } + + // grow the structures to have room for more elements + if (docs.size() == size) { + docs = docs.grow(size + 1); + offsets = offsets.grow(size + 1); + lengths = lengths.grow(size + 1); + docsWithField = FixedBitSet.ensureCapacity(docsWithField, (int) docs.size()); + } + + if (val != BinaryDocValuesUpdate.MISSING) { + // only mark the document as having a value in that field if the value wasn't set to null (MISSING) + docsWithField.set(size); + } + + docs.set(size, doc); + offsets.set(size, values.length); + lengths.set(size, val.length); + values.append(val); + ++size; + } + + @Override + public Iterator iterator() { + final PagedMutable docs = this.docs; + final PagedGrowableWriter offsets = this.offsets; + final 
PagedGrowableWriter lengths = this.lengths; + final BytesRef values = this.values; + final FixedBitSet docsWithField = this.docsWithField; + new InPlaceMergeSorter() { + @Override + protected void swap(int i, int j) { + long tmpDoc = docs.get(j); + docs.set(j, docs.get(i)); + docs.set(i, tmpDoc); + + long tmpOffset = offsets.get(j); + offsets.set(j, offsets.get(i)); + offsets.set(i, tmpOffset); + + long tmpLength = lengths.get(j); + lengths.set(j, lengths.get(i)); + lengths.set(i, tmpLength); + + boolean tmpBool = docsWithField.get(j); + if (docsWithField.get(i)) { + docsWithField.set(j); + } else { + docsWithField.clear(j); + } + if (tmpBool) { + docsWithField.set(i); + } else { + docsWithField.clear(i); + } + } + + @Override + protected int compare(int i, int j) { + int x = (int) docs.get(i); + int y = (int) docs.get(j); + return (x < y) ? -1 : ((x == y) ? 0 : 1); + } + }.sort(0, size); + + return new Iterator(size, offsets, lengths, docs, values, docsWithField); + } + + @Override + public void merge(DocValuesFieldUpdates other) { + BinaryDocValuesFieldUpdates otherUpdates = (BinaryDocValuesFieldUpdates) other; + if (otherUpdates.size > Integer.MAX_VALUE - size) { // checked by subtraction because the int sum of both sizes would wrap around + throw new IllegalStateException( + "cannot support more than Integer.MAX_VALUE doc/value entries; size=" + + size + " other.size=" + otherUpdates.size); + } + final int newSize = size + otherUpdates.size; + docs = docs.grow(newSize); + offsets = offsets.grow(newSize); + lengths = lengths.grow(newSize); + docsWithField = FixedBitSet.ensureCapacity(docsWithField, (int) docs.size()); + for (int i = 0; i < otherUpdates.size; i++) { + int doc = (int) otherUpdates.docs.get(i); + if (otherUpdates.docsWithField.get(i)) { + docsWithField.set(size); + } + docs.set(size, doc); + offsets.set(size, values.length + otherUpdates.offsets.get(i)); // correct relative offset + lengths.set(size, otherUpdates.lengths.get(i)); + ++size; + } + values.append(otherUpdates.values); + } + + @Override + public boolean any() { + return size > 0; + } + 
+} diff --git a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java index 245ff8ef22f..d6a3f80501d 100644 --- a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java +++ b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdates.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate; +import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate; import org.apache.lucene.search.Query; import org.apache.lucene.util.RamUsageEstimator; @@ -92,9 +94,39 @@ class BufferedUpdates { * NumericUpdate (val) counts its own size and isn't accounted for here. */ final static int BYTES_PER_NUMERIC_UPDATE_ENTRY = 7*RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + RamUsageEstimator.NUM_BYTES_INT; + + /* Rough logic: BinaryUpdate calculates its actual size, + * including the update Term and DV field (String). The + * per-field map holds a reference to the updated field, and + * therefore we only account for the object reference and + * map space itself. This is incremented when we first see + * an updated field. + * + * HashMap has an array[Entry] w/ varying load + * factor (say 2*POINTER). Entry is an object w/ String key, + * LinkedHashMap val, int hash, Entry next (OBJ_HEADER + 3*POINTER + INT). 
+ * + * LinkedHashMap (val) is counted as OBJ_HEADER, array[Entry] ref + header, 4*INT, 1*FLOAT, + * Set (entrySet) (2*OBJ_HEADER + ARRAY_HEADER + 2*POINTER + 4*INT + FLOAT) + */ + final static int BYTES_PER_BINARY_FIELD_ENTRY = + 7*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 3*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + 5*RamUsageEstimator.NUM_BYTES_INT + RamUsageEstimator.NUM_BYTES_FLOAT; + + /* Rough logic: Incremented when we see another Term for an already updated + * field. + * LinkedHashMap has an array[Entry] w/ varying load factor + * (say 2*POINTER). Entry is an object w/ Term key, BinaryUpdate val, + * int hash, Entry next, Entry before, Entry after (OBJ_HEADER + 5*POINTER + INT). + * + * Term (key) is counted only as POINTER. + * BinaryUpdate (val) counts its own size and isn't accounted for here. + */ + final static int BYTES_PER_BINARY_UPDATE_ENTRY = 7*RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + RamUsageEstimator.NUM_BYTES_INT; final AtomicInteger numTermDeletes = new AtomicInteger(); final AtomicInteger numNumericUpdates = new AtomicInteger(); + final AtomicInteger numBinaryUpdates = new AtomicInteger(); final Map terms = new HashMap<>(); final Map queries = new HashMap<>(); final List docIDs = new ArrayList<>(); @@ -106,7 +138,16 @@ class BufferedUpdates { // one that came in wins), and helps us detect faster if the same Term is // used to update the same field multiple times (so we later traverse it // only once). - final Map> numericUpdates = new HashMap<>(); + final Map> numericUpdates = new HashMap<>(); + + // Map> + // For each field we keep an ordered list of BinaryUpdates, key'd by the + // update Term. 
LinkedHashMap guarantees we will later traverse the map in + // insertion order (so that if two terms affect the same document, the last + // one that came in wins), and helps us detect faster if the same Term is + // used to update the same field multiple times (so we later traverse it + // only once). + final Map> binaryUpdates = new HashMap<>(); public static final Integer MAX_INT = Integer.valueOf(Integer.MAX_VALUE); @@ -125,7 +166,7 @@ class BufferedUpdates { if (VERBOSE_DELETES) { return "gen=" + gen + " numTerms=" + numTermDeletes + ", terms=" + terms + ", queries=" + queries + ", docIDs=" + docIDs + ", numericUpdates=" + numericUpdates - + ", bytesUsed=" + bytesUsed; + + ", binaryUpdates=" + binaryUpdates + ", bytesUsed=" + bytesUsed; } else { String s = "gen=" + gen; if (numTermDeletes.get() != 0) { @@ -140,6 +181,9 @@ class BufferedUpdates { if (numNumericUpdates.get() != 0) { s += " " + numNumericUpdates.get() + " numeric updates (unique count=" + numericUpdates.size() + ")"; } + if (numBinaryUpdates.get() != 0) { + s += " " + numBinaryUpdates.get() + " binary updates (unique count=" + binaryUpdates.size() + ")"; + } if (bytesUsed.get() != 0) { s += " bytesUsed=" + bytesUsed.get(); } @@ -184,14 +228,14 @@ class BufferedUpdates { } } - public void addNumericUpdate(NumericUpdate update, int docIDUpto) { - LinkedHashMap fieldUpdates = numericUpdates.get(update.field); + public void addNumericUpdate(NumericDocValuesUpdate update, int docIDUpto) { + LinkedHashMap fieldUpdates = numericUpdates.get(update.field); if (fieldUpdates == null) { fieldUpdates = new LinkedHashMap<>(); numericUpdates.put(update.field, fieldUpdates); bytesUsed.addAndGet(BYTES_PER_NUMERIC_FIELD_ENTRY); } - final NumericUpdate current = fieldUpdates.get(update.term); + final NumericDocValuesUpdate current = fieldUpdates.get(update.term); if (current != null && docIDUpto < current.docIDUpto) { // Only record the new number if it's greater than or equal to the current // one. 
This is important because if multiple threads are replacing the @@ -213,17 +257,48 @@ class BufferedUpdates { } } + public void addBinaryUpdate(BinaryDocValuesUpdate update, int docIDUpto) { + LinkedHashMap fieldUpdates = binaryUpdates.get(update.field); + if (fieldUpdates == null) { + fieldUpdates = new LinkedHashMap<>(); + binaryUpdates.put(update.field, fieldUpdates); + bytesUsed.addAndGet(BYTES_PER_BINARY_FIELD_ENTRY); + } + final BinaryDocValuesUpdate current = fieldUpdates.get(update.term); + if (current != null && docIDUpto < current.docIDUpto) { + // Only record the new number if it's greater than or equal to the current + // one. This is important because if multiple threads are replacing the + // same doc at nearly the same time, it's possible that one thread that + // got a higher docID is scheduled before the other threads. + return; + } + + update.docIDUpto = docIDUpto; + // since it's a LinkedHashMap, we must first remove the Term entry so that + // it's added last (we're interested in insertion-order). 
+ if (current != null) { + fieldUpdates.remove(update.term); + } + fieldUpdates.put(update.term, update); + numBinaryUpdates.incrementAndGet(); + if (current == null) { + bytesUsed.addAndGet(BYTES_PER_BINARY_UPDATE_ENTRY + update.sizeInBytes()); + } + } + void clear() { terms.clear(); queries.clear(); docIDs.clear(); numericUpdates.clear(); + binaryUpdates.clear(); numTermDeletes.set(0); numNumericUpdates.set(0); + numBinaryUpdates.set(0); bytesUsed.set(0); } boolean any() { - return terms.size() > 0 || docIDs.size() > 0 || queries.size() > 0 || numericUpdates.size() > 0; + return terms.size() > 0 || docIDs.size() > 0 || queries.size() > 0 || numericUpdates.size() > 0 || binaryUpdates.size() > 0; } } diff --git a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java index b20c5a9f559..11f860c2756 100644 --- a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java +++ b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java @@ -22,9 +22,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -214,20 +212,22 @@ class BufferedUpdatesStream { int delCount = 0; final boolean segAllDeletes; try { - Map fieldUpdates = null; + final DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container(); if (coalescedDeletes != null) { //System.out.println(" del coalesced"); delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader); delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader); - fieldUpdates = applyNumericDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, fieldUpdates); + applyDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, 
dvUpdates); + applyDocValuesUpdates(coalescedDeletes.binaryDVUpdates, rld, reader, dvUpdates); } //System.out.println(" del exact"); // Don't delete by Term here; DocumentsWriterPerThread // already did that on flush: delCount += applyQueryDeletes(packet.queriesIterable(), rld, reader); - fieldUpdates = applyNumericDocValuesUpdates(Arrays.asList(packet.updates), rld, reader, fieldUpdates); - if (!fieldUpdates.isEmpty()) { - rld.writeFieldUpdates(info.info.dir, fieldUpdates); + applyDocValuesUpdates(Arrays.asList(packet.numericDVUpdates), rld, reader, dvUpdates); + applyDocValuesUpdates(Arrays.asList(packet.binaryDVUpdates), rld, reader, dvUpdates); + if (dvUpdates.any()) { + rld.writeFieldUpdates(info.info.dir, dvUpdates); } final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount(); assert fullDelCount <= rld.info.info.getDocCount(); @@ -275,9 +275,11 @@ class BufferedUpdatesStream { try { delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader); delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader); - Map fieldUpdates = applyNumericDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, null); - if (!fieldUpdates.isEmpty()) { - rld.writeFieldUpdates(info.info.dir, fieldUpdates); + DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container(); + applyDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, dvUpdates); + applyDocValuesUpdates(coalescedDeletes.binaryDVUpdates, rld, reader, dvUpdates); + if (dvUpdates.any()) { + rld.writeFieldUpdates(info.info.dir, dvUpdates); } final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount(); assert fullDelCount <= rld.info.info.getDocCount(); @@ -436,14 +438,13 @@ class BufferedUpdatesStream { return delCount; } - // NumericDocValues Updates - // If otherFieldUpdates != null, we need to merge the updates into them - private synchronized Map applyNumericDocValuesUpdates(Iterable updates, - 
ReadersAndUpdates rld, SegmentReader reader, Map otherFieldUpdates) throws IOException { + // DocValues updates + private synchronized void applyDocValuesUpdates(Iterable updates, + ReadersAndUpdates rld, SegmentReader reader, DocValuesFieldUpdates.Container dvUpdatesContainer) throws IOException { Fields fields = reader.fields(); if (fields == null) { // This reader has no postings - return Collections.emptyMap(); + return; } // TODO: we can process the updates per DV field, from last to first so that @@ -459,9 +460,9 @@ class BufferedUpdatesStream { String currentField = null; TermsEnum termsEnum = null; DocsEnum docs = null; - final Map result = otherFieldUpdates == null ? new HashMap() : otherFieldUpdates; + //System.out.println(Thread.currentThread().getName() + " numericDVUpdate reader=" + reader); - for (NumericUpdate update : updates) { + for (DocValuesUpdate update : updates) { Term term = update.term; int limit = update.docIDUpto; @@ -499,10 +500,9 @@ class BufferedUpdatesStream { //System.out.println("BDS: got docsEnum=" + docsEnum); - NumericFieldUpdates fieldUpdates = result.get(update.field); - if (fieldUpdates == null) { - fieldUpdates = new NumericFieldUpdates.PackedNumericFieldUpdates(reader.maxDoc()); - result.put(update.field, fieldUpdates); + DocValuesFieldUpdates dvUpdates = dvUpdatesContainer.getUpdates(update.field, update.type); + if (dvUpdates == null) { + dvUpdates = dvUpdatesContainer.newUpdates(update.field, update.type, reader.maxDoc()); } int doc; while ((doc = docsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { @@ -510,13 +510,12 @@ class BufferedUpdatesStream { if (doc >= limit) { break; // no more docs that can be updated for this term } - fieldUpdates.add(doc, update.value); + dvUpdates.add(doc, update.value); } } } - return result; } - + public static class QueryAndLimit { public final Query query; public final int limit; diff --git a/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java 
b/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java index 8a0bbd35b18..61ecf7938d6 100644 --- a/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java +++ b/lucene/core/src/java/org/apache/lucene/index/CoalescedUpdates.java @@ -25,17 +25,23 @@ import java.util.Map; import org.apache.lucene.search.Query; import org.apache.lucene.index.BufferedUpdatesStream.QueryAndLimit; +import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate; +import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.MergedIterator; class CoalescedUpdates { final Map queries = new HashMap<>(); final List> iterables = new ArrayList<>(); - final List numericDVUpdates = new ArrayList<>(); + final List numericDVUpdates = new ArrayList<>(); + final List binaryDVUpdates = new ArrayList<>(); @Override public String toString() { // note: we could add/collect more debugging information - return "CoalescedUpdates(termSets=" + iterables.size() + ",queries=" + queries.size() + ",numericUpdates=" + numericDVUpdates.size() + ")"; + return "CoalescedUpdates(termSets=" + iterables.size() + ",queries=" + + queries.size() + ",numericDVUpdates=" + numericDVUpdates.size() + + ",binaryDVUpdates=" + binaryDVUpdates.size() + ")"; } void update(FrozenBufferedUpdates in) { @@ -46,11 +52,17 @@ class CoalescedUpdates { queries.put(query, BufferedUpdates.MAX_INT); } - for (NumericUpdate nu : in.updates) { - NumericUpdate clone = new NumericUpdate(nu.term, nu.field, nu.value); + for (NumericDocValuesUpdate nu : in.numericDVUpdates) { + NumericDocValuesUpdate clone = new NumericDocValuesUpdate(nu.term, nu.field, (Long) nu.value); clone.docIDUpto = Integer.MAX_VALUE; numericDVUpdates.add(clone); } + + for (BinaryDocValuesUpdate bu : in.binaryDVUpdates) { + BinaryDocValuesUpdate clone = new BinaryDocValuesUpdate(bu.term, bu.field, (BytesRef) bu.value); + clone.docIDUpto = Integer.MAX_VALUE; + 
binaryDVUpdates.add(clone); + } } public Iterable termsIterable() { diff --git a/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java b/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java new file mode 100644 index 00000000000..91c8e0937bd --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/index/DocValuesFieldUpdates.java @@ -0,0 +1,154 @@ +package org.apache.lucene.index; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.lucene.index.NumericDocValuesFieldUpdates; +import org.apache.lucene.search.DocIdSetIterator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Holds updates of a single DocValues field, for a set of documents. + * + * @lucene.experimental + */ +abstract class DocValuesFieldUpdates { + + static enum Type { NUMERIC, BINARY } + + /** + * An iterator over documents and their updated values. Only documents with + * updates are returned by this iterator, and the documents are returned in + * increasing order. + */ + static abstract class Iterator { + + /** + * Returns the next document which has an update, or + * {@link DocIdSetIterator#NO_MORE_DOCS} if there are no more documents to + * return. 
+ */ + abstract int nextDoc(); + + /** Returns the current document this iterator is on. */ + abstract int doc(); + + /** + * Returns the value of the document returned from {@link #nextDoc()}. A + * {@code null} value means that it was unset for this document. + */ + abstract Object value(); + + /** + * Reset the iterator's state. Should be called before {@link #nextDoc()} + * and {@link #value()}. + */ + abstract void reset(); + + } + + static class Container { + + final Map numericDVUpdates = new HashMap<>(); + final Map binaryDVUpdates = new HashMap<>(); + + boolean any() { + for (NumericDocValuesFieldUpdates updates : numericDVUpdates.values()) { + if (updates.any()) { + return true; + } + } + for (BinaryDocValuesFieldUpdates updates : binaryDVUpdates.values()) { + if (updates.any()) { + return true; + } + } + return false; + } + + int size() { + return numericDVUpdates.size() + binaryDVUpdates.size(); + } + + DocValuesFieldUpdates getUpdates(String field, Type type) { + switch (type) { + case NUMERIC: + return numericDVUpdates.get(field); + case BINARY: + return binaryDVUpdates.get(field); + default: + throw new IllegalArgumentException("unsupported type: " + type); + } + } + + DocValuesFieldUpdates newUpdates(String field, Type type, int maxDoc) { + switch (type) { + case NUMERIC: + assert numericDVUpdates.get(field) == null; + NumericDocValuesFieldUpdates numericUpdates = new NumericDocValuesFieldUpdates(field, maxDoc); + numericDVUpdates.put(field, numericUpdates); + return numericUpdates; + case BINARY: + assert binaryDVUpdates.get(field) == null; + BinaryDocValuesFieldUpdates binaryUpdates = new BinaryDocValuesFieldUpdates(field, maxDoc); + binaryDVUpdates.put(field, binaryUpdates); + return binaryUpdates; + default: + throw new IllegalArgumentException("unsupported type: " + type); + } + } + + @Override + public String toString() { + return "numericDVUpdates=" + numericDVUpdates + " binaryDVUpdates=" + binaryDVUpdates; + } + } + + final String field; + 
final Type type; + + protected DocValuesFieldUpdates(String field, Type type) { + this.field = field; + this.type = type; + } + + /** + * Add an update to a document. For unsetting a value you should pass + * {@code null}. + */ + public abstract void add(int doc, Object value); + + /** + * Returns an {@link Iterator} over the updated documents and their + * values. + */ + public abstract Iterator iterator(); + + /** + * Merge with another {@link DocValuesFieldUpdates}. This is called for a + * segment which received updates while it was being merged. The given updates + * should override whatever updates are in that instance. + */ + public abstract void merge(DocValuesFieldUpdates other); + + /** Returns true if this instance contains any updates. + * @return {@code true} if this instance holds at least one update, {@code false} otherwise */ + public abstract boolean any(); + +} diff --git a/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java b/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java new file mode 100644 index 00000000000..f23cca75bc7 --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/index/DocValuesUpdate.java @@ -0,0 +1,113 @@ +package org.apache.lucene.index; + +import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER; +import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_CHAR; +import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_INT; +import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_OBJECT_HEADER; +import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_OBJECT_REF; + +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.RamUsageEstimator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** An in-place update to a DocValues field. */ +abstract class DocValuesUpdate { + + /* Rough logic: OBJ_HEADER + 3*PTR + INT + * Term: OBJ_HEADER + 2*PTR + * Term.field: 2*OBJ_HEADER + 4*INT + PTR + string.length*CHAR + * Term.bytes: 2*OBJ_HEADER + 2*INT + PTR + bytes.length + * String: 2*OBJ_HEADER + 4*INT + PTR + string.length*CHAR + * T: OBJ_HEADER + */ + private static final int RAW_SIZE_IN_BYTES = 8*NUM_BYTES_OBJECT_HEADER + 8*NUM_BYTES_OBJECT_REF + 8*NUM_BYTES_INT; + + final DocValuesFieldUpdates.Type type; + final Term term; + final String field; + final Object value; + int docIDUpto = -1; // unassigned until applied, and confusing that it's here, when it's just used in BufferedUpdates... + + /** + * Constructor. 
+ * + * @param term the {@link Term} which determines the documents that will be updated + * @param field the name of the DocValues field to update (e.g. a {@link NumericDocValuesField}) + * @param value the updated value + */ + protected DocValuesUpdate(DocValuesFieldUpdates.Type type, Term term, String field, Object value) { + this.type = type; + this.term = term; + this.field = field; + this.value = value; + } + + abstract long valueSizeInBytes(); + + final int sizeInBytes() { + int sizeInBytes = RAW_SIZE_IN_BYTES; + sizeInBytes += term.field.length() * NUM_BYTES_CHAR; + sizeInBytes += term.bytes.bytes.length; + sizeInBytes += field.length() * NUM_BYTES_CHAR; + sizeInBytes += valueSizeInBytes(); + return sizeInBytes; + } + + @Override + public String toString() { + return "term=" + term + ",field=" + field + ",value=" + value; + } + + /** An in-place update to a binary DocValues field */ + static final class BinaryDocValuesUpdate extends DocValuesUpdate { + + /* Size of BytesRef: 2*INT + ARRAY_HEADER + PTR */ + private static final long RAW_VALUE_SIZE_IN_BYTES = NUM_BYTES_ARRAY_HEADER + 2*NUM_BYTES_INT + NUM_BYTES_OBJECT_REF; + + static final BytesRef MISSING = new BytesRef(); + + BinaryDocValuesUpdate(Term term, String field, BytesRef value) { + super(DocValuesFieldUpdates.Type.BINARY, term, field, value == null ? MISSING : value); + } + + @Override + long valueSizeInBytes() { + return RAW_VALUE_SIZE_IN_BYTES + ((BytesRef) value).bytes.length; + } + + } + + /** An in-place update to a numeric DocValues field */ + static final class NumericDocValuesUpdate extends DocValuesUpdate { + + static final Long MISSING = new Long(0); + + NumericDocValuesUpdate(Term term, String field, Long value) { + super(DocValuesFieldUpdates.Type.NUMERIC, term, field, value == null ? 
MISSING : value); + } + + @Override + long valueSizeInBytes() { + return RamUsageEstimator.NUM_BYTES_LONG; + } + + } + +} diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java index 4ad6748ca7e..404c430978e 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriter.java @@ -29,10 +29,13 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.index.DocumentsWriterFlushQueue.SegmentFlushTicket; import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment; import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState; +import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate; +import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate; import org.apache.lucene.index.IndexWriter.Event; import org.apache.lucene.search.Query; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.InfoStream; /** @@ -160,7 +163,14 @@ final class DocumentsWriter { synchronized boolean updateNumericDocValue(Term term, String field, Long value) throws IOException { final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; - deleteQueue.addNumericUpdate(new NumericUpdate(term, field, value)); + deleteQueue.addNumericUpdate(new NumericDocValuesUpdate(term, field, value)); + flushControl.doOnDelete(); + return applyAllDeletes(deleteQueue); + } + + synchronized boolean updateBinaryDocValue(Term term, String field, BytesRef value) throws IOException { + final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; + deleteQueue.addBinaryUpdate(new BinaryDocValuesUpdate(term, field, value)); flushControl.doOnDelete(); return applyAllDeletes(deleteQueue); } diff --git 
a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java index f951cdff25d..02a554012e4 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterDeleteQueue.java @@ -21,6 +21,8 @@ import java.util.Arrays; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.ReentrantLock; +import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate; +import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate; import org.apache.lucene.search.Query; /** @@ -107,10 +109,15 @@ final class DocumentsWriterDeleteQueue { tryApplyGlobalSlice(); } - void addNumericUpdate(NumericUpdate update) { + void addNumericUpdate(NumericDocValuesUpdate update) { add(new NumericUpdateNode(update)); tryApplyGlobalSlice(); } + + void addBinaryUpdate(BinaryDocValuesUpdate update) { + add(new BinaryUpdateNode(update)); + tryApplyGlobalSlice(); + } /** * invariant for document update @@ -385,9 +392,9 @@ final class DocumentsWriterDeleteQueue { } } - private static final class NumericUpdateNode extends Node { + private static final class NumericUpdateNode extends Node { - NumericUpdateNode(NumericUpdate update) { + NumericUpdateNode(NumericDocValuesUpdate update) { super(update); } @@ -401,6 +408,23 @@ final class DocumentsWriterDeleteQueue { return "update=" + item; } } + + private static final class BinaryUpdateNode extends Node { + + BinaryUpdateNode(BinaryDocValuesUpdate update) { + super(update); + } + + @Override + void apply(BufferedUpdates bufferedUpdates, int docIDUpto) { + bufferedUpdates.addBinaryUpdate(item, docIDUpto); + } + + @Override + public String toString() { + return "update=" + item; + } + } private boolean forceApplyGlobalSlice() { globalBufferLock.lock(); diff --git 
a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java index 9a108436862..22f4e5358a5 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java @@ -480,7 +480,7 @@ class DocumentsWriterPerThread { } final BufferedUpdates segmentDeletes; - if (pendingUpdates.queries.isEmpty() && pendingUpdates.numericUpdates.isEmpty()) { + if (pendingUpdates.queries.isEmpty() && pendingUpdates.numericUpdates.isEmpty() && pendingUpdates.binaryUpdates.isEmpty()) { pendingUpdates.clear(); segmentDeletes = null; } else { diff --git a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java index d23139e4278..bf4664f5a01 100644 --- a/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java +++ b/lucene/core/src/java/org/apache/lucene/index/FrozenBufferedUpdates.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map; import org.apache.lucene.index.BufferedUpdatesStream.QueryAndLimit; +import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate; +import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate; import org.apache.lucene.search.Query; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.RamUsageEstimator; @@ -48,7 +50,10 @@ class FrozenBufferedUpdates { final int[] queryLimits; // numeric DV update term and their updates - final NumericUpdate[] updates; + final NumericDocValuesUpdate[] numericDVUpdates; + + // binary DV update term and their updates + final BinaryDocValuesUpdate[] binaryDVUpdates; final int bytesUsed; final int numTermDeletes; @@ -83,17 +88,34 @@ class FrozenBufferedUpdates { // so that it maps to all fields it affects, sorted by their docUpto, and traverse // that Term only once, applying the update to all 
fields that still need to be // updated. - List allUpdates = new ArrayList<>(); + List allNumericUpdates = new ArrayList<>(); int numericUpdatesSize = 0; - for (LinkedHashMap fieldUpdates : deletes.numericUpdates.values()) { - for (NumericUpdate update : fieldUpdates.values()) { - allUpdates.add(update); + for (LinkedHashMap numericUpdates : deletes.numericUpdates.values()) { + for (NumericDocValuesUpdate update : numericUpdates.values()) { + allNumericUpdates.add(update); numericUpdatesSize += update.sizeInBytes(); } } - updates = allUpdates.toArray(new NumericUpdate[allUpdates.size()]); + numericDVUpdates = allNumericUpdates.toArray(new NumericDocValuesUpdate[allNumericUpdates.size()]); + + // TODO if a Term affects multiple fields, we could keep the updates key'd by Term + // so that it maps to all fields it affects, sorted by their docUpto, and traverse + // that Term only once, applying the update to all fields that still need to be + // updated. + List allBinaryUpdates = new ArrayList<>(); + int binaryUpdatesSize = 0; + for (LinkedHashMap binaryUpdates : deletes.binaryUpdates.values()) { + for (BinaryDocValuesUpdate update : binaryUpdates.values()) { + allBinaryUpdates.add(update); + binaryUpdatesSize += update.sizeInBytes(); + } + } + binaryDVUpdates = allBinaryUpdates.toArray(new BinaryDocValuesUpdate[allBinaryUpdates.size()]); + + bytesUsed = (int) terms.getSizeInBytes() + queries.length * BYTES_PER_DEL_QUERY + + numericUpdatesSize + numericDVUpdates.length * RamUsageEstimator.NUM_BYTES_OBJECT_REF + + binaryUpdatesSize + binaryDVUpdates.length * RamUsageEstimator.NUM_BYTES_OBJECT_REF; - bytesUsed = (int) terms.getSizeInBytes() + queries.length * BYTES_PER_DEL_QUERY + numericUpdatesSize + updates.length * RamUsageEstimator.NUM_BYTES_OBJECT_REF; numTermDeletes = deletes.numTermDeletes.get(); } @@ -161,6 +183,6 @@ class FrozenBufferedUpdates { } boolean any() { - return termCount > 0 || queries.length > 0 || updates.length > 0; + return termCount > 0 || 
queries.length > 0 || numericDVUpdates.length > 0 || binaryDVUpdates.length > 0; } } diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index 4083a314989..89ac4de8cb5 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -17,28 +17,6 @@ package org.apache.lucene.index; * limitations under the License. */ -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.codecs.Codec; -import org.apache.lucene.index.FieldInfo.DocValuesType; -import org.apache.lucene.index.FieldInfos.FieldNumbers; -import org.apache.lucene.index.IndexWriterConfig.OpenMode; -import org.apache.lucene.index.MergeState.CheckAbort; -import org.apache.lucene.index.NumericFieldUpdates.UpdatesIterator; -import org.apache.lucene.search.Query; -import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.CompoundFileDirectory; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.Lock; -import org.apache.lucene.store.LockObtainFailedException; -import org.apache.lucene.store.MergeInfo; -import org.apache.lucene.store.TrackingDirectoryWrapper; -import org.apache.lucene.util.Bits; -import org.apache.lucene.util.Constants; -import org.apache.lucene.util.IOUtils; -import org.apache.lucene.util.InfoStream; -import org.apache.lucene.util.ThreadInterruptedException; - import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; @@ -57,6 +35,28 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.index.FieldInfo.DocValuesType; +import org.apache.lucene.index.FieldInfos.FieldNumbers; +import org.apache.lucene.index.IndexWriterConfig.OpenMode; +import 
org.apache.lucene.index.MergeState.CheckAbort; +import org.apache.lucene.search.Query; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.CompoundFileDirectory; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.Lock; +import org.apache.lucene.store.LockObtainFailedException; +import org.apache.lucene.store.MergeInfo; +import org.apache.lucene.store.TrackingDirectoryWrapper; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.Constants; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.ThreadInterruptedException; + /** An IndexWriter creates and maintains an index. @@ -1542,10 +1542,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{ } /** - * Updates a document's NumericDocValue for field to the given - * value. This method can be used to 'unset' a document's value - * by passing {@code null} as the new value. Also, you can only update fields - * that already exist in the index, not add new fields through this method. + * Updates a document's {@link NumericDocValues} for field to the + * given value. This method can be used to 'unset' a document's + * value by passing {@code null} as the new value. Also, you can only update + * fields that already exist in the index, not add new fields through this + * method. * *

* NOTE: if this method hits an OutOfMemoryError you should immediately @@ -1555,7 +1556,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{ * @param term * the term to identify the document(s) to be updated * @param field - * field name of the NumericDocValues field + * field name of the {@link NumericDocValues} field * @param value * new value for the field * @throws CorruptIndexException @@ -1577,6 +1578,47 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{ } } + /** + * Updates a document's {@link BinaryDocValues} for field to the + * given value. This method can be used to 'unset' a document's + * value by passing {@code null} as the new value. Also, you can only update + * fields that already exist in the index, not add new fields through this + * method. + * + *

+ * NOTE: this method currently replaces the existing value of all + * affected documents with the new value. + * + *

+ * NOTE: if this method hits an OutOfMemoryError you should immediately + * close the writer. See above for details. + *

+ * + * @param term + * the term to identify the document(s) to be updated + * @param field + * field name of the {@link BinaryDocValues} field + * @param value + * new value for the field + * @throws CorruptIndexException + * if the index is corrupt + * @throws IOException + * if there is a low-level IO error + */ + public void updateBinaryDocValue(Term term, String field, BytesRef value) throws IOException { + ensureOpen(); + if (!globalFieldNumberMap.contains(field, DocValuesType.BINARY)) { + throw new IllegalArgumentException("can only update existing binary-docvalues fields!"); + } + try { + if (docWriter.updateBinaryDocValue(term, field, value)) { + processEvents(true, false); + } + } catch (OutOfMemoryError oom) { + handleOOM(oom, "updateBinaryDocValue"); + } + } + // for test purpose final synchronized int getSegmentCount(){ return segmentInfos.size(); @@ -3162,13 +3204,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{ return docMap; } - private void skipDeletedDoc(UpdatesIterator[] updatesIters, int deletedDoc) { - for (UpdatesIterator iter : updatesIters) { + private void skipDeletedDoc(DocValuesFieldUpdates.Iterator[] updatesIters, int deletedDoc) { + for (DocValuesFieldUpdates.Iterator iter : updatesIters) { if (iter.doc() == deletedDoc) { iter.nextDoc(); } // when entering the method, all iterators must already be beyond the - // deleted document, or right on it, in which case we advance them above + // deleted document, or right on it, in which case we advance them over // and they must be beyond it now. 
assert iter.doc() > deletedDoc : "updateDoc=" + iter.doc() + " deletedDoc=" + deletedDoc; } @@ -3203,7 +3245,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{ ReadersAndUpdates mergedDeletesAndUpdates = null; boolean initWritableLiveDocs = false; MergePolicy.DocMap docMap = null; - final Map mergedFieldUpdates = new HashMap<>(); + final DocValuesFieldUpdates.Container mergedDVUpdates = new DocValuesFieldUpdates.Container(); for (int i = 0; i < sourceSegments.size(); i++) { SegmentCommitInfo info = sourceSegments.get(i); @@ -3214,19 +3256,28 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{ // We hold a ref so it should still be in the pool: assert rld != null: "seg=" + info.info.name; final Bits currentLiveDocs = rld.getLiveDocs(); - final Map mergingFieldUpdates = rld.getMergingFieldUpdates(); + final Map mergingFieldUpdates = rld.getMergingFieldUpdates(); final String[] mergingFields; - final UpdatesIterator[] updatesIters; + final DocValuesFieldUpdates[] dvFieldUpdates; + final DocValuesFieldUpdates.Iterator[] updatesIters; if (mergingFieldUpdates.isEmpty()) { mergingFields = null; updatesIters = null; + dvFieldUpdates = null; } else { mergingFields = new String[mergingFieldUpdates.size()]; - updatesIters = new UpdatesIterator[mergingFieldUpdates.size()]; + dvFieldUpdates = new DocValuesFieldUpdates[mergingFieldUpdates.size()]; + updatesIters = new DocValuesFieldUpdates.Iterator[mergingFieldUpdates.size()]; int idx = 0; - for (Entry e : mergingFieldUpdates.entrySet()) { - mergingFields[idx] = e.getKey(); - updatesIters[idx] = e.getValue().getUpdates(); + for (Entry e : mergingFieldUpdates.entrySet()) { + String field = e.getKey(); + DocValuesFieldUpdates updates = e.getValue(); + mergingFields[idx] = field; + dvFieldUpdates[idx] = mergedDVUpdates.getUpdates(field, updates.type); + if (dvFieldUpdates[idx] == null) { + dvFieldUpdates[idx] = mergedDVUpdates.newUpdates(field, updates.type, mergeState.segmentInfo.getDocCount()); + } 
+ updatesIters[idx] = updates.iterator(); updatesIters[idx].nextDoc(); // advance to first update doc ++idx; } @@ -3279,7 +3330,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{ // document isn't deleted, check if any of the fields have an update to it int newDoc = -1; for (int idx = 0; idx < mergingFields.length; idx++) { - UpdatesIterator updatesIter = updatesIters[idx]; + DocValuesFieldUpdates.Iterator updatesIter = updatesIters[idx]; if (updatesIter.doc() == j) { // document has an update if (mergedDeletesAndUpdates == null) { mergedDeletesAndUpdates = readerPool.get(merge.info, true); @@ -3288,14 +3339,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{ if (newDoc == -1) { // map once per all field updates, but only if there are any updates newDoc = docMap.map(docUpto); } - String field = mergingFields[idx]; - NumericFieldUpdates fieldUpdates = mergedFieldUpdates.get(field); - if (fieldUpdates == null) { - // an approximantion of maxDoc, used to compute best bitsPerValue - fieldUpdates = new NumericFieldUpdates.PackedNumericFieldUpdates(mergeState.segmentInfo.getDocCount()); - mergedFieldUpdates.put(field, fieldUpdates); - } - fieldUpdates.add(newDoc, updatesIter.value() == null ? 
NumericUpdate.MISSING : updatesIter.value()); + DocValuesFieldUpdates dvUpdates = dvFieldUpdates[idx]; + dvUpdates.add(newDoc, updatesIter.value()); updatesIter.nextDoc(); // advance to next document } else { assert updatesIter.doc() > j : "updateDoc=" + updatesIter.doc() + " curDoc=" + j; @@ -3312,7 +3357,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{ // document isn't deleted, check if any of the fields have an update to it int newDoc = -1; for (int idx = 0; idx < mergingFields.length; idx++) { - UpdatesIterator updatesIter = updatesIters[idx]; + DocValuesFieldUpdates.Iterator updatesIter = updatesIters[idx]; if (updatesIter.doc() == j) { // document has an update if (mergedDeletesAndUpdates == null) { mergedDeletesAndUpdates = readerPool.get(merge.info, true); @@ -3321,14 +3366,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{ if (newDoc == -1) { // map once per all field updates, but only if there are any updates newDoc = docMap.map(docUpto); } - String field = mergingFields[idx]; - NumericFieldUpdates fieldUpdates = mergedFieldUpdates.get(field); - if (fieldUpdates == null) { - // an approximantion of maxDoc, used to compute best bitsPerValue - fieldUpdates = new NumericFieldUpdates.PackedNumericFieldUpdates(mergeState.segmentInfo.getDocCount()); - mergedFieldUpdates.put(field, fieldUpdates); - } - fieldUpdates.add(newDoc, updatesIter.value() == null ? 
NumericUpdate.MISSING : updatesIter.value()); + DocValuesFieldUpdates dvUpdates = dvFieldUpdates[idx]; + dvUpdates.add(newDoc, updatesIter.value()); updatesIter.nextDoc(); // advance to next document } else { assert updatesIter.doc() > j : "updateDoc=" + updatesIter.doc() + " curDoc=" + j; @@ -3367,7 +3406,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{ // document isn't deleted, check if any of the fields have an update to it int newDoc = -1; for (int idx = 0; idx < mergingFields.length; idx++) { - UpdatesIterator updatesIter = updatesIters[idx]; + DocValuesFieldUpdates.Iterator updatesIter = updatesIters[idx]; if (updatesIter.doc() == j) { // document has an update if (mergedDeletesAndUpdates == null) { mergedDeletesAndUpdates = readerPool.get(merge.info, true); @@ -3376,14 +3415,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{ if (newDoc == -1) { // map once per all field updates, but only if there are any updates newDoc = docMap.map(docUpto); } - String field = mergingFields[idx]; - NumericFieldUpdates fieldUpdates = mergedFieldUpdates.get(field); - if (fieldUpdates == null) { - // an approximantion of maxDoc, used to compute best bitsPerValue - fieldUpdates = new NumericFieldUpdates.PackedNumericFieldUpdates(mergeState.segmentInfo.getDocCount()); - mergedFieldUpdates.put(field, fieldUpdates); - } - fieldUpdates.add(newDoc, updatesIter.value() == null ? 
NumericUpdate.MISSING : updatesIter.value()); + DocValuesFieldUpdates dvUpdates = dvFieldUpdates[idx]; + dvUpdates.add(newDoc, updatesIter.value()); updatesIter.nextDoc(); // advance to next document } else { assert updatesIter.doc() > j : "field=" + mergingFields[idx] + " updateDoc=" + updatesIter.doc() + " curDoc=" + j; @@ -3397,7 +3430,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{ for (int j = 0; j < docCount; j++) { int newDoc = -1; for (int idx = 0; idx < mergingFields.length; idx++) { - UpdatesIterator updatesIter = updatesIters[idx]; + DocValuesFieldUpdates.Iterator updatesIter = updatesIters[idx]; if (updatesIter.doc() == j) { // document has an update if (mergedDeletesAndUpdates == null) { mergedDeletesAndUpdates = readerPool.get(merge.info, true); @@ -3406,14 +3439,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{ if (newDoc == -1) { // map once per all field updates, but only if there are any updates newDoc = docMap.map(docUpto); } - String field = mergingFields[idx]; - NumericFieldUpdates fieldUpdates = mergedFieldUpdates.get(field); - if (fieldUpdates == null) { - // an approximantion of maxDoc, used to compute best bitsPerValue - fieldUpdates = new NumericFieldUpdates.PackedNumericFieldUpdates(mergeState.segmentInfo.getDocCount()); - mergedFieldUpdates.put(field, fieldUpdates); - } - fieldUpdates.add(newDoc, updatesIter.value() == null ? 
NumericUpdate.MISSING : updatesIter.value()); + DocValuesFieldUpdates dvUpdates = dvFieldUpdates[idx]; + dvUpdates.add(newDoc, updatesIter.value()); updatesIter.nextDoc(); // advance to next document } else { assert updatesIter.doc() > j : "updateDoc=" + updatesIter.doc() + " curDoc=" + j; @@ -3430,7 +3457,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{ assert docUpto == merge.info.info.getDocCount(); - if (!mergedFieldUpdates.isEmpty()) { + if (mergedDVUpdates.any()) { // System.out.println("[" + Thread.currentThread().getName() + "] IW.commitMergedDeletes: mergedDeletes.info=" + mergedDeletes.info + ", mergedFieldUpdates=" + mergedFieldUpdates); boolean success = false; try { @@ -3440,7 +3467,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{ // NOTE: currently this is the only place which throws a true // IOException. If this ever changes, we need to extend that try/finally // block to the rest of the method too. - mergedDeletesAndUpdates.writeFieldUpdates(directory, mergedFieldUpdates); + mergedDeletesAndUpdates.writeFieldUpdates(directory, mergedDVUpdates); success = true; } finally { if (!success) { @@ -3455,8 +3482,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{ infoStream.message("IW", "no new deletes or field updates since merge started"); } else { String msg = mergedDeletesAndUpdates.getPendingDeleteCount() + " new deletes"; - if (!mergedFieldUpdates.isEmpty()) { - msg += " and " + mergedFieldUpdates.size() + " new field updates"; + if (mergedDVUpdates.any()) { + msg += " and " + mergedDVUpdates.size() + " new field updates"; } msg += " since merge started"; infoStream.message("IW", msg); diff --git a/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java b/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java new file mode 100644 index 00000000000..0957679acf7 --- /dev/null +++ 
b/lucene/core/src/java/org/apache/lucene/index/NumericDocValuesFieldUpdates.java @@ -0,0 +1,201 @@ +package org.apache.lucene.index; + +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.InPlaceMergeSorter; +import org.apache.lucene.util.packed.PackedInts; +import org.apache.lucene.util.packed.PagedGrowableWriter; +import org.apache.lucene.util.packed.PagedMutable; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * A {@link DocValuesFieldUpdates} which holds updates of documents, of a single + * {@link NumericDocValuesField}. 
+ * + * @lucene.experimental + */ +class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates { + + final static class Iterator extends DocValuesFieldUpdates.Iterator { + private final int size; + private final PagedGrowableWriter values; + private final FixedBitSet docsWithField; + private final PagedMutable docs; + private long idx = 0; // long so we don't overflow if size == Integer.MAX_VALUE + private int doc = -1; + private Long value = null; + + Iterator(int size, PagedGrowableWriter values, FixedBitSet docsWithField, PagedMutable docs) { + this.size = size; + this.values = values; + this.docsWithField = docsWithField; + this.docs = docs; + } + + @Override + Long value() { + return value; + } + + @Override + int nextDoc() { + if (idx >= size) { + value = null; + return doc = DocIdSetIterator.NO_MORE_DOCS; + } + doc = (int) docs.get(idx); + ++idx; + while (idx < size && docs.get(idx) == doc) { + ++idx; + } + if (!docsWithField.get((int) (idx - 1))) { + value = null; + } else { + // idx points to the "next" element + value = Long.valueOf(values.get(idx - 1)); + } + return doc; + } + + @Override + int doc() { + return doc; + } + + @Override + void reset() { + doc = -1; + value = null; + idx = 0; + } + } + + private FixedBitSet docsWithField; + private PagedMutable docs; + private PagedGrowableWriter values; + private int size; + + public NumericDocValuesFieldUpdates(String field, int maxDoc) { + super(field, Type.NUMERIC); + docsWithField = new FixedBitSet(64); + docs = new PagedMutable(1, 1024, PackedInts.bitsRequired(maxDoc - 1), PackedInts.COMPACT); + values = new PagedGrowableWriter(1, 1024, 1, PackedInts.FAST); + size = 0; + } + + @Override + public void add(int doc, Object value) { + // TODO: if the Sorter interface changes to take long indexes, we can remove that limitation + if (size == Integer.MAX_VALUE) { + throw new IllegalStateException("cannot support more than Integer.MAX_VALUE doc/value entries"); + } + + Long val = (Long) value; + if (val 
== null) { + val = NumericDocValuesUpdate.MISSING; + } + + // grow the structures to have room for more elements + if (docs.size() == size) { + docs = docs.grow(size + 1); + values = values.grow(size + 1); + docsWithField = FixedBitSet.ensureCapacity(docsWithField, (int) docs.size()); + } + + if (val != NumericDocValuesUpdate.MISSING) { + // only mark the document as having a value in that field if the value wasn't set to null (MISSING) + docsWithField.set(size); + } + + docs.set(size, doc); + values.set(size, val.longValue()); + ++size; + } + + @Override + public Iterator iterator() { + final PagedMutable docs = this.docs; + final PagedGrowableWriter values = this.values; + final FixedBitSet docsWithField = this.docsWithField; + new InPlaceMergeSorter() { + @Override + protected void swap(int i, int j) { + long tmpDoc = docs.get(j); + docs.set(j, docs.get(i)); + docs.set(i, tmpDoc); + + long tmpVal = values.get(j); + values.set(j, values.get(i)); + values.set(i, tmpVal); + + boolean tmpBool = docsWithField.get(j); + if (docsWithField.get(i)) { + docsWithField.set(j); + } else { + docsWithField.clear(j); + } + if (tmpBool) { + docsWithField.set(i); + } else { + docsWithField.clear(i); + } + } + + @Override + protected int compare(int i, int j) { + int x = (int) docs.get(i); + int y = (int) docs.get(j); + return (x < y) ? -1 : ((x == y) ? 
0 : 1); + } + }.sort(0, size); + + return new Iterator(size, values, docsWithField, docs); + } + + @Override + public void merge(DocValuesFieldUpdates other) { + assert other instanceof NumericDocValuesFieldUpdates; + NumericDocValuesFieldUpdates otherUpdates = (NumericDocValuesFieldUpdates) other; + if (size + otherUpdates.size > Integer.MAX_VALUE) { + throw new IllegalStateException( + "cannot support more than Integer.MAX_VALUE doc/value entries; size=" + + size + " other.size=" + otherUpdates.size); + } + docs = docs.grow(size + otherUpdates.size); + values = values.grow(size + otherUpdates.size); + docsWithField = FixedBitSet.ensureCapacity(docsWithField, (int) docs.size()); + for (int i = 0; i < otherUpdates.size; i++) { + int doc = (int) otherUpdates.docs.get(i); + if (otherUpdates.docsWithField.get(i)) { + docsWithField.set(size); + } + docs.set(size, doc); + values.set(size, otherUpdates.values.get(i)); + ++size; + } + } + + @Override + public boolean any() { + return size > 0; + } + +} diff --git a/lucene/core/src/java/org/apache/lucene/index/NumericFieldUpdates.java b/lucene/core/src/java/org/apache/lucene/index/NumericFieldUpdates.java deleted file mode 100755 index 2935365d029..00000000000 --- a/lucene/core/src/java/org/apache/lucene/index/NumericFieldUpdates.java +++ /dev/null @@ -1,249 +0,0 @@ -package org.apache.lucene.index; - -import org.apache.lucene.document.NumericDocValuesField; -import org.apache.lucene.search.DocIdSetIterator; -import org.apache.lucene.util.FixedBitSet; -import org.apache.lucene.util.InPlaceMergeSorter; -import org.apache.lucene.util.packed.PackedInts; -import org.apache.lucene.util.packed.PagedGrowableWriter; -import org.apache.lucene.util.packed.PagedMutable; - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Holds numeric values updates of documents, of a single - * {@link NumericDocValuesField}. - * - * @lucene.experimental - */ -interface NumericFieldUpdates { - - /** - * An iterator over documents and their updated values. Only documents with - * updates are returned by this iterator, and the documents are returned in - * increasing order. - */ - static abstract class UpdatesIterator { - - /** - * Returns the next document which has an update, or - * {@link DocIdSetIterator#NO_MORE_DOCS} if there are no more documents to - * return. - */ - abstract int nextDoc(); - - /** Returns the current document this iterator is on. */ - abstract int doc(); - - /** - * Returns the value of the document returned from {@link #nextDoc()}. A - * {@code null} value means that it was unset for this document. - */ - abstract Long value(); - - /** - * Reset the iterator's state. Should be called before {@link #nextDoc()} - * and {@link #value()}. - */ - abstract void reset(); - - } - - /** - * A {@link NumericFieldUpdates} which holds the updated documents and values - * in packed structures. Only supports up to 2B entries (docs and values) - * since we need to sort the docs/values and the Sorter interfaces currently - * only take integer indexes. 
- */ - static final class PackedNumericFieldUpdates implements NumericFieldUpdates { - - private FixedBitSet docsWithField; - private PagedMutable docs; - private PagedGrowableWriter values; - private int size; - - public PackedNumericFieldUpdates(int maxDoc) { - docsWithField = new FixedBitSet(64); - docs = new PagedMutable(1, 1024, PackedInts.bitsRequired(maxDoc - 1), PackedInts.COMPACT); - values = new PagedGrowableWriter(1, 1024, 1, PackedInts.FAST); - size = 0; - } - - @Override - public void add(int doc, Long value) { - assert value != null; - // TODO: if the Sorter interface changes to take long indexes, we can remove that limitation - if (size == Integer.MAX_VALUE) { - throw new IllegalStateException("cannot support more than Integer.MAX_VALUE doc/value entries"); - } - - // grow the structures to have room for more elements - if (docs.size() == size) { - docs = docs.grow(size + 1); - values = values.grow(size + 1); - docsWithField = FixedBitSet.ensureCapacity(docsWithField, (int) docs.size()); - } - - if (value != NumericUpdate.MISSING) { - // only mark the document as having a value in that field if the value wasn't set to null (MISSING) - docsWithField.set(size); - } - - docs.set(size, doc); - values.set(size, value.longValue()); - ++size; - } - - @Override - public UpdatesIterator getUpdates() { - final PagedMutable docs = this.docs; - final PagedGrowableWriter values = this.values; - final FixedBitSet docsWithField = this.docsWithField; - new InPlaceMergeSorter() { - @Override - protected void swap(int i, int j) { - long tmpDoc = docs.get(j); - docs.set(j, docs.get(i)); - docs.set(i, tmpDoc); - - long tmpVal = values.get(j); - values.set(j, values.get(i)); - values.set(i, tmpVal); - - boolean tmpBool = docsWithField.get(j); - if (docsWithField.get(i)) { - docsWithField.set(j); - } else { - docsWithField.clear(j); - } - if (tmpBool) { - docsWithField.set(i); - } else { - docsWithField.clear(i); - } - } - - @Override - protected int compare(int i, int j) 
{ - int x = (int) docs.get(i); - int y = (int) docs.get(j); - return (x < y) ? -1 : ((x == y) ? 0 : 1); - } - }.sort(0, size); - - final int size = this.size; - return new UpdatesIterator() { - private long idx = 0; // long so we don't overflow if size == Integer.MAX_VALUE - private int doc = -1; - private Long value = null; - - @Override - Long value() { - return value; - } - - @Override - int nextDoc() { - if (idx >= size) { - value = null; - return doc = DocIdSetIterator.NO_MORE_DOCS; - } - doc = (int) docs.get(idx); - ++idx; - while (idx < size && docs.get(idx) == doc) { - ++idx; - } - if (!docsWithField.get((int) (idx - 1))) { - value = null; - } else { - // idx points to the "next" element - value = Long.valueOf(values.get(idx - 1)); - } - return doc; - } - - @Override - int doc() { - return doc; - } - - @Override - void reset() { - doc = -1; - value = null; - idx = 0; - } - }; - } - - @Override - public void merge(NumericFieldUpdates other) { - if (other instanceof PackedNumericFieldUpdates) { - PackedNumericFieldUpdates packedOther = (PackedNumericFieldUpdates) other; - if (size + packedOther.size > Integer.MAX_VALUE) { - throw new IllegalStateException( - "cannot support more than Integer.MAX_VALUE doc/value entries; size=" - + size + " other.size=" + packedOther.size); - } - docs = docs.grow(size + packedOther.size); - values = values.grow(size + packedOther.size); - docsWithField = FixedBitSet.ensureCapacity(docsWithField, (int) docs.size()); - for (int i = 0; i < packedOther.size; i++) { - int doc = (int) packedOther.docs.get(i); - if (packedOther.docsWithField.get(i)) { - docsWithField.set(size); - } - docs.set(size, doc); - values.set(size, packedOther.values.get(i)); - ++size; - } - } else { - UpdatesIterator iter = other.getUpdates(); - int doc; - while ((doc = iter.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { - Long value = iter.value(); - if (value == null) { - value = NumericUpdate.MISSING; - } - add(doc, value); - } - } - } - - } - - /** - * 
Add an update to a document. For unsetting a value you should pass - * {@link NumericUpdate#MISSING} instead of {@code null}. - */ - public void add(int doc, Long value); - - /** - * Returns an {@link UpdatesIterator} over the updated documents and their - * values. - */ - public UpdatesIterator getUpdates(); - - /** - * Merge with another {@link NumericFieldUpdates}. This is called for a - * segment which received updates while it was being merged. The given updates - * should override whatever numeric updates are in that instance. - */ - public void merge(NumericFieldUpdates other); - -} diff --git a/lucene/core/src/java/org/apache/lucene/index/NumericUpdate.java b/lucene/core/src/java/org/apache/lucene/index/NumericUpdate.java deleted file mode 100644 index bf6265d9b1a..00000000000 --- a/lucene/core/src/java/org/apache/lucene/index/NumericUpdate.java +++ /dev/null @@ -1,68 +0,0 @@ -package org.apache.lucene.index; - -import static org.apache.lucene.util.RamUsageEstimator.*; - -import org.apache.lucene.document.NumericDocValuesField; - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -/** An in-place update to a numeric docvalues field */ -final class NumericUpdate { - - /* Rough logic: OBJ_HEADER + 3*PTR + INT - * Term: OBJ_HEADER + 2*PTR - * Term.field: 2*OBJ_HEADER + 4*INT + PTR + string.length*CHAR - * Term.bytes: 2*OBJ_HEADER + 2*INT + PTR + bytes.length - * String: 2*OBJ_HEADER + 4*INT + PTR + string.length*CHAR - * Long: OBJ_HEADER + LONG - */ - private static final int RAW_SIZE_IN_BYTES = 9*NUM_BYTES_OBJECT_HEADER + 8*NUM_BYTES_OBJECT_REF + 8*NUM_BYTES_INT + NUM_BYTES_LONG; - - static final Long MISSING = new Long(0); - - Term term; - String field; - Long value; - int docIDUpto = -1; // unassigned until applied, and confusing that it's here, when it's just used in BufferedDeletes... - - /** - * Constructor. - * - * @param term the {@link Term} which determines the documents that will be updated - * @param field the {@link NumericDocValuesField} to update - * @param value the updated value - */ - NumericUpdate(Term term, String field, Long value) { - this.term = term; - this.field = field; - this.value = value == null ? 
MISSING : value; - } - - int sizeInBytes() { - int sizeInBytes = RAW_SIZE_IN_BYTES; - sizeInBytes += term.field.length() * NUM_BYTES_CHAR; - sizeInBytes += term.bytes.bytes.length; - sizeInBytes += field.length() * NUM_BYTES_CHAR; - return sizeInBytes; - } - - @Override - public String toString() { - return "term=" + term + ",field=" + field + ",value=" + value; - } -} diff --git a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java index 23b849ea53d..76b2d6629f1 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java +++ b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java @@ -30,12 +30,13 @@ import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.codecs.LiveDocsFormat; +import org.apache.lucene.document.BinaryDocValuesField; import org.apache.lucene.document.NumericDocValuesField; -import org.apache.lucene.index.NumericFieldUpdates.UpdatesIterator; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.TrackingDirectoryWrapper; import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.MutableBits; @@ -78,7 +79,7 @@ class ReadersAndUpdates { // updates on the merged segment too. 
private boolean isMerging = false; - private final Map mergingNumericUpdates = new HashMap<>(); + private final Map mergingDVUpdates = new HashMap<>(); public ReadersAndUpdates(IndexWriter writer, SegmentCommitInfo info) { this.info = info; @@ -294,11 +295,11 @@ class ReadersAndUpdates { } // Writes field updates (new _X_N updates files) to the directory - public synchronized void writeFieldUpdates(Directory dir, Map numericFieldUpdates) throws IOException { + public synchronized void writeFieldUpdates(Directory dir, DocValuesFieldUpdates.Container dvUpdates) throws IOException { assert Thread.holdsLock(writer); //System.out.println("rld.writeFieldUpdates: seg=" + info + " numericFieldUpdates=" + numericFieldUpdates); - assert numericFieldUpdates != null && !numericFieldUpdates.isEmpty(); + assert dvUpdates.any(); // Do this so we can delete any created files on // exception; this saves all codecs from having to do @@ -330,9 +331,13 @@ class ReadersAndUpdates { clone.setDocValuesGen(fi.getDocValuesGen()); } // create new fields or update existing ones to have NumericDV type - for (String f : numericFieldUpdates.keySet()) { + for (String f : dvUpdates.numericDVUpdates.keySet()) { builder.addOrUpdate(f, NumericDocValuesField.TYPE); } + // create new fields or update existing ones to have BinaryDV type + for (String f : dvUpdates.binaryDVUpdates.keySet()) { + builder.addOrUpdate(f, BinaryDocValuesField.TYPE); + } fieldInfos = builder.finish(); final long nextFieldInfosGen = info.getNextFieldInfosGen(); @@ -342,10 +347,10 @@ class ReadersAndUpdates { final DocValuesConsumer fieldsConsumer = docValuesFormat.fieldsConsumer(state); boolean fieldsConsumerSuccess = false; try { -// System.out.println("[" + Thread.currentThread().getName() + "] RLD.writeLiveDocs: applying updates; seg=" + info + " updates=" + numericUpdates); - for (Entry e : numericFieldUpdates.entrySet()) { +// System.out.println("[" + Thread.currentThread().getName() + "] RLD.writeFieldUpdates: applying 
numeric updates; seg=" + info + " updates=" + numericFieldUpdates); + for (Entry e : dvUpdates.numericDVUpdates.entrySet()) { final String field = e.getKey(); - final NumericFieldUpdates fieldUpdates = e.getValue(); + final NumericDocValuesFieldUpdates fieldUpdates = e.getValue(); final FieldInfo fieldInfo = fieldInfos.fieldInfo(field); assert fieldInfo != null; @@ -355,7 +360,7 @@ class ReadersAndUpdates { final NumericDocValues currentValues = reader.getNumericDocValues(field); final Bits docsWithField = reader.getDocsWithField(field); final int maxDoc = reader.maxDoc(); - final UpdatesIterator updatesIter = fieldUpdates.getUpdates(); + final NumericDocValuesFieldUpdates.Iterator updatesIter = fieldUpdates.iterator(); @Override public Iterator iterator() { updatesIter.reset(); @@ -398,7 +403,68 @@ class ReadersAndUpdates { } }); } - + +// System.out.println("[" + Thread.currentThread().getName() + "] RAU.writeFieldUpdates: applying binary updates; seg=" + info + " updates=" + dvUpdates.binaryDVUpdates); + for (Entry e : dvUpdates.binaryDVUpdates.entrySet()) { + final String field = e.getKey(); + final BinaryDocValuesFieldUpdates dvFieldUpdates = e.getValue(); + final FieldInfo fieldInfo = fieldInfos.fieldInfo(field); + assert fieldInfo != null; + +// System.out.println("[" + Thread.currentThread().getName() + "] RAU.writeFieldUpdates: applying binary updates; seg=" + info + " f=" + dvFieldUpdates + ", updates=" + dvFieldUpdates); + + fieldInfo.setDocValuesGen(nextFieldInfosGen); + // write the binary updates to a new gen'd docvalues file + fieldsConsumer.addBinaryField(fieldInfo, new Iterable() { + final BinaryDocValues currentValues = reader.getBinaryDocValues(field); + final Bits docsWithField = reader.getDocsWithField(field); + final int maxDoc = reader.maxDoc(); + final BinaryDocValuesFieldUpdates.Iterator updatesIter = dvFieldUpdates.iterator(); + @Override + public Iterator iterator() { + updatesIter.reset(); + return new Iterator() { + + int curDoc = -1; 
+ int updateDoc = updatesIter.nextDoc(); + BytesRef scratch = new BytesRef(); + + @Override + public boolean hasNext() { + return curDoc < maxDoc - 1; + } + + @Override + public BytesRef next() { + if (++curDoc >= maxDoc) { + throw new NoSuchElementException("no more documents to return values for"); + } + if (curDoc == updateDoc) { // this document has an updated value + BytesRef value = updatesIter.value(); // either null (unset value) or updated value + updateDoc = updatesIter.nextDoc(); // prepare for next round + return value; + } else { + // no update for this document + assert curDoc < updateDoc; + if (currentValues != null && docsWithField.get(curDoc)) { + // only read the current value if the document had a value before + currentValues.get(curDoc, scratch); + return scratch; + } else { + return null; + } + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("this iterator does not support removing elements"); + } + }; + } + }); + } + codec.fieldInfosFormat().getFieldInfosWriter().write(trackingDir, info.info.name, segmentSuffix, fieldInfos, IOContext.DEFAULT); fieldsConsumerSuccess = true; } finally { @@ -436,12 +502,20 @@ class ReadersAndUpdates { info.advanceFieldInfosGen(); // copy all the updates to mergingUpdates, so they can later be applied to the merged segment if (isMerging) { - for (Entry e : numericFieldUpdates.entrySet()) { - NumericFieldUpdates fieldUpdates = mergingNumericUpdates.get(e.getKey()); - if (fieldUpdates == null) { - mergingNumericUpdates.put(e.getKey(), e.getValue()); + for (Entry e : dvUpdates.numericDVUpdates.entrySet()) { + DocValuesFieldUpdates updates = mergingDVUpdates.get(e.getKey()); + if (updates == null) { + mergingDVUpdates.put(e.getKey(), e.getValue()); } else { - fieldUpdates.merge(e.getValue()); + updates.merge(e.getValue()); + } + } + for (Entry e : dvUpdates.binaryDVUpdates.entrySet()) { + DocValuesFieldUpdates updates = mergingDVUpdates.get(e.getKey()); + if (updates == null) { + 
mergingDVUpdates.put(e.getKey(), e.getValue()); + } else { + updates.merge(e.getValue()); } } } @@ -502,13 +576,13 @@ class ReadersAndUpdates { * finished merging (whether successfully or not). */ public synchronized void dropMergingUpdates() { - mergingNumericUpdates.clear(); + mergingDVUpdates.clear(); isMerging = false; } /** Returns updates that came in while this segment was merging. */ - public synchronized Map getMergingFieldUpdates() { - return mergingNumericUpdates; + public synchronized Map getMergingFieldUpdates() { + return mergingDVUpdates; } @Override diff --git a/lucene/core/src/test/org/apache/lucene/index/TestBinaryDocValuesUpdates.java b/lucene/core/src/test/org/apache/lucene/index/TestBinaryDocValuesUpdates.java new file mode 100644 index 00000000000..3c3d8da50c1 --- /dev/null +++ b/lucene/core/src/test/org/apache/lucene/index/TestBinaryDocValuesUpdates.java @@ -0,0 +1,1451 @@ +package org.apache.lucene.index; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.lucene.analysis.MockAnalyzer; +import org.apache.lucene.analysis.MockTokenizer; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.DocValuesFormat; +import org.apache.lucene.codecs.asserting.AssertingDocValuesFormat; +import org.apache.lucene.codecs.lucene40.Lucene40RWCodec; +import org.apache.lucene.codecs.lucene41.Lucene41RWCodec; +import org.apache.lucene.codecs.lucene42.Lucene42RWCodec; +import org.apache.lucene.codecs.lucene45.Lucene45DocValuesFormat; +import org.apache.lucene.codecs.lucene45.Lucene45RWCodec; +import org.apache.lucene.codecs.lucene46.Lucene46Codec; +import org.apache.lucene.document.BinaryDocValuesField; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field.Store; +import org.apache.lucene.document.NumericDocValuesField; 
+import org.apache.lucene.document.SortedDocValuesField; +import org.apache.lucene.document.SortedSetDocValuesField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.LuceneTestCase.SuppressCodecs; +import org.apache.lucene.util.TestUtil; + +import com.carrotsearch.randomizedtesting.generators.RandomPicks; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+@SuppressCodecs({"Lucene40","Lucene41","Lucene42","Lucene45"})
+public class TestBinaryDocValuesUpdates extends LuceneTestCase {
+
+  /**
+   * Loads the value of document {@code idx} from {@code bdv} into {@code scratch} and decodes
+   * it as a variable-length long: seven payload bits per byte, least-significant group first,
+   * with the high bit of a byte signalling that another byte follows.
+   */
+  static long getValue(BinaryDocValues bdv, int idx, BytesRef scratch) {
+    bdv.get(idx, scratch);
+    int pos = scratch.offset;
+    long decoded = 0L;
+    int shift = 0;
+    byte current;
+    do {
+      current = scratch.bytes[pos++];
+      decoded |= (current & 0x7FL) << shift;
+      shift += 7;
+    } while ((current & 0x80L) != 0);
+    return decoded;
+  }
+
+  /**
+   * Encodes {@code value} as a variable-length long, so that updates written by these tests
+   * occupy a varying number of bytes.
+   */
+  static BytesRef toBytes(long value) {
+    final BytesRef encoded = new BytesRef(10); // a negative long needs all 10 bytes
+    long remaining = value;
+    for (; (remaining & ~0x7FL) != 0L; remaining >>>= 7) {
+      // low seven bits plus the continuation flag
+      encoded.bytes[encoded.length++] = (byte) ((remaining & 0x7FL) | 0x80L);
+    }
+    encoded.bytes[encoded.length++] = (byte) remaining;
+    return encoded;
+  }
+
+  /** Builds a document with id "doc-&lt;id&gt;" whose "val" field holds {@code id + 1}. */
+  private Document doc(int id) {
+    final Document document = new Document();
+    final StringField idField = new StringField("id", "doc-" + id, Store.NO);
+    final BinaryDocValuesField valField = new BinaryDocValuesField("val", toBytes(id + 1));
+    document.add(idField);
+    document.add(valField);
+    return document;
+  }
+
+  public void testUpdatesAreFlushed() throws IOException {
+    Directory dir = newDirectory();
+    IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(
+        TEST_VERSION_CURRENT, new MockAnalyzer(random(), MockTokenizer.WHITESPACE, false)).setRAMBufferSizeMB(0.00000001));
+    writer.addDocument(doc(0)); // val=1
+    writer.addDocument(doc(1)); // val=2
+    writer.addDocument(doc(3)); // val=4
+    writer.commit();
+    assertEquals(1, writer.getFlushDeletesCount());
+    writer.updateBinaryDocValue(new Term("id", "doc-0"), "val", toBytes(5));
+    assertEquals(2, writer.getFlushDeletesCount());
+    writer.updateBinaryDocValue(new Term("id", "doc-1"), "val", toBytes(6));
+    assertEquals(3, writer.getFlushDeletesCount());
+    writer.updateBinaryDocValue(new Term("id", "doc-2"), "val", toBytes(7));
+    assertEquals(4, 
writer.getFlushDeletesCount());
+    writer.getConfig().setRAMBufferSizeMB(1000d);
+    writer.updateBinaryDocValue(new Term("id", "doc-2"), "val", toBytes(7));
+    assertEquals(4, writer.getFlushDeletesCount());
+    writer.close();
+    dir.close();
+  }
+
+  /**
+   * Indexes two documents, updates doc-0 so that both documents hold the value 2, and checks
+   * the result through either an NRT reader or a reader opened on the directory after the
+   * writer was closed.
+   */
+  public void testSimple() throws Exception {
+    final Directory directory = newDirectory();
+    final IndexWriterConfig config = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
+    // keep the randomized config from flushing on its own
+    config.setMaxBufferedDocs(10);
+    config.setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH);
+    final IndexWriter indexWriter = new IndexWriter(directory, config);
+    for (int id = 0; id < 2; id++) {
+      indexWriter.addDocument(doc(id)); // val=id+1
+    }
+    final boolean commitBeforeUpdate = random().nextBoolean();
+    if (commitBeforeUpdate) {
+      indexWriter.commit();
+    }
+    // doc-0 goes from 1 to 2; doc-1 already holds 2
+    indexWriter.updateBinaryDocValue(new Term("id", "doc-0"), "val", toBytes(2));
+
+    final boolean nearRealTime = !random().nextBoolean();
+    final DirectoryReader reader;
+    if (nearRealTime) {
+      reader = DirectoryReader.open(indexWriter, true);
+      indexWriter.close();
+    } else {
+      indexWriter.close();
+      reader = DirectoryReader.open(directory);
+    }
+
+    assertEquals(1, reader.leaves().size());
+    final BinaryDocValues values = reader.leaves().get(0).reader().getBinaryDocValues("val");
+    final BytesRef spare = new BytesRef();
+    for (int docID = 0; docID < 2; docID++) {
+      assertEquals(2, getValue(values, docID, spare));
+    }
+    reader.close();
+
+    directory.close();
+  }
+
+  public void testUpdateFewSegments() throws Exception {
+    Directory dir = newDirectory();
+    IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
+    conf.setMaxBufferedDocs(2); // generate few segments
+    conf.setMergePolicy(NoMergePolicy.COMPOUND_FILES); // prevent merges for this test
+    IndexWriter writer = new IndexWriter(dir, conf);
+    int numDocs = 10;
+    long[] expectedValues = new long[numDocs];
+    for (int i = 0; i < numDocs; i++) {
+      writer.addDocument(doc(i));
+      expectedValues[i] = i + 1;
+    }
+    writer.commit();
+
+    
// update few docs + for (int i = 0; i < numDocs; i++) { + if (random().nextDouble() < 0.4) { + long value = (i + 1) * 2; + writer.updateBinaryDocValue(new Term("id", "doc-" + i), "val", toBytes(value)); + expectedValues[i] = value; + } + } + + final DirectoryReader reader; + if (random().nextBoolean()) { // not NRT + writer.close(); + reader = DirectoryReader.open(dir); + } else { // NRT + reader = DirectoryReader.open(writer, true); + writer.close(); + } + + BytesRef scratch = new BytesRef(); + for (AtomicReaderContext context : reader.leaves()) { + AtomicReader r = context.reader(); + BinaryDocValues bdv = r.getBinaryDocValues("val"); + assertNotNull(bdv); + for (int i = 0; i < r.maxDoc(); i++) { + long expected = expectedValues[i + context.docBase]; + long actual = getValue(bdv, i, scratch); + assertEquals(expected, actual); + } + } + + reader.close(); + dir.close(); + } + + public void testReopen() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + writer.addDocument(doc(0)); + writer.addDocument(doc(1)); + + final boolean isNRT = random().nextBoolean(); + final DirectoryReader reader1; + if (isNRT) { + reader1 = DirectoryReader.open(writer, true); + } else { + writer.commit(); + reader1 = DirectoryReader.open(dir); + } + + // update doc + writer.updateBinaryDocValue(new Term("id", "doc-0"), "val", toBytes(10)); // update doc-0's value to 10 + if (!isNRT) { + writer.commit(); + } + + // reopen reader and assert only it sees the update + final DirectoryReader reader2 = DirectoryReader.openIfChanged(reader1); + assertNotNull(reader2); + assertTrue(reader1 != reader2); + + BytesRef scratch = new BytesRef(); + BinaryDocValues bdv1 = reader1.leaves().get(0).reader().getBinaryDocValues("val"); + BinaryDocValues bdv2 = reader2.leaves().get(0).reader().getBinaryDocValues("val"); + assertEquals(1, getValue(bdv1, 0, 
scratch));
+    assertEquals(10, getValue(bdv2, 0, scratch));
+
+    IOUtils.close(writer, reader1, reader2, dir);
+  }
+
+  /**
+   * Commits three 2-document segments, then deletes doc-1 and doc-2 and updates doc-3 and
+   * doc-5, so that one segment receives only a delete, one receives a delete and an update,
+   * and one receives only an update. Live docs and values are checked on a composite view.
+   */
+  public void testUpdatesAndDeletes() throws Exception {
+    final Directory directory = newDirectory();
+    final IndexWriterConfig config = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
+    config.setMaxBufferedDocs(10); // control segment flushing
+    config.setMergePolicy(NoMergePolicy.COMPOUND_FILES); // prevent merges for this test
+    final IndexWriter indexWriter = new IndexWriter(directory, config);
+
+    for (int id = 0; id < 6; id += 2) {
+      indexWriter.addDocument(doc(id));
+      indexWriter.addDocument(doc(id + 1));
+      indexWriter.commit(); // create 2-docs segments
+    }
+
+    // doc-1 sits in the 1st segment, doc-2 in the 2nd
+    indexWriter.deleteDocuments(new Term("id", "doc-1"), new Term("id", "doc-2"));
+
+    // doc-3 sits in the 2nd segment, doc-5 in the 3rd
+    for (String updatedId : new String[] { "doc-3", "doc-5" }) {
+      indexWriter.updateBinaryDocValue(new Term("id", updatedId), "val", toBytes(17L));
+    }
+
+    final boolean nearRealTime = !random().nextBoolean();
+    final DirectoryReader reader;
+    if (nearRealTime) {
+      reader = DirectoryReader.open(indexWriter, true);
+      indexWriter.close();
+    } else {
+      indexWriter.close();
+      reader = DirectoryReader.open(directory);
+    }
+
+    final AtomicReader composite = SlowCompositeReaderWrapper.wrap(reader);
+
+    final Bits live = composite.getLiveDocs();
+    final boolean[] expectedLive = { true, false, false, true, true, true };
+    int docID = 0;
+    for (boolean expected : expectedLive) {
+      assertEquals(expected, live.get(docID++));
+    }
+
+    // deleted documents keep their indexed value; only doc-3 and doc-5 change
+    final long[] expectedVals = { 1, 2, 3, 17, 5, 17 };
+    final BinaryDocValues values = composite.getBinaryDocValues("val");
+    final BytesRef spare = new BytesRef();
+    for (docID = 0; docID < expectedVals.length; docID++) {
+      assertEquals(expectedVals[docID], getValue(values, docID, spare));
+    }
+
+    reader.close();
+    directory.close();
+  }
+
+  public void testUpdatesWithDeletes() throws Exception {
+    // update and delete 
different documents in the same commit session + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMaxBufferedDocs(10); // control segment flushing + IndexWriter writer = new IndexWriter(dir, conf); + + writer.addDocument(doc(0)); + writer.addDocument(doc(1)); + + if (random().nextBoolean()) { + writer.commit(); + } + + writer.deleteDocuments(new Term("id", "doc-0")); + writer.updateBinaryDocValue(new Term("id", "doc-1"), "val", toBytes(17L)); + + final DirectoryReader reader; + if (random().nextBoolean()) { // not NRT + writer.close(); + reader = DirectoryReader.open(dir); + } else { // NRT + reader = DirectoryReader.open(writer, true); + writer.close(); + } + + AtomicReader r = reader.leaves().get(0).reader(); + assertFalse(r.getLiveDocs().get(0)); + assertEquals(17, getValue(r.getBinaryDocValues("val"), 1, new BytesRef())); + + reader.close(); + dir.close(); + } + + public void testUpdateAndDeleteSameDocument() throws Exception { + // update and delete same document in same commit session + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMaxBufferedDocs(10); // control segment flushing + IndexWriter writer = new IndexWriter(dir, conf); + + writer.addDocument(doc(0)); + writer.addDocument(doc(1)); + + if (random().nextBoolean()) { + writer.commit(); + } + + writer.deleteDocuments(new Term("id", "doc-0")); + writer.updateBinaryDocValue(new Term("id", "doc-0"), "val", toBytes(17L)); + + final DirectoryReader reader; + if (random().nextBoolean()) { // not NRT + writer.close(); + reader = DirectoryReader.open(dir); + } else { // NRT + reader = DirectoryReader.open(writer, true); + writer.close(); + } + + AtomicReader r = reader.leaves().get(0).reader(); + assertFalse(r.getLiveDocs().get(0)); + assertEquals(1, getValue(r.getBinaryDocValues("val"), 0, new BytesRef())); // deletes are 
currently applied first + + reader.close(); + dir.close(); + } + + public void testMultipleDocValuesTypes() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMaxBufferedDocs(10); // prevent merges + IndexWriter writer = new IndexWriter(dir, conf); + + for (int i = 0; i < 4; i++) { + Document doc = new Document(); + doc.add(new StringField("dvUpdateKey", "dv", Store.NO)); + doc.add(new NumericDocValuesField("ndv", i)); + doc.add(new BinaryDocValuesField("bdv", new BytesRef(Integer.toString(i)))); + doc.add(new SortedDocValuesField("sdv", new BytesRef(Integer.toString(i)))); + doc.add(new SortedSetDocValuesField("ssdv", new BytesRef(Integer.toString(i)))); + doc.add(new SortedSetDocValuesField("ssdv", new BytesRef(Integer.toString(i * 2)))); + writer.addDocument(doc); + } + writer.commit(); + + // update all docs' bdv field + writer.updateBinaryDocValue(new Term("dvUpdateKey", "dv"), "bdv", toBytes(17L)); + writer.close(); + + final DirectoryReader reader = DirectoryReader.open(dir); + AtomicReader r = reader.leaves().get(0).reader(); + NumericDocValues ndv = r.getNumericDocValues("ndv"); + BinaryDocValues bdv = r.getBinaryDocValues("bdv"); + SortedDocValues sdv = r.getSortedDocValues("sdv"); + SortedSetDocValues ssdv = r.getSortedSetDocValues("ssdv"); + BytesRef scratch = new BytesRef(); + for (int i = 0; i < r.maxDoc(); i++) { + assertEquals(i, ndv.get(i)); + assertEquals(17, getValue(bdv, i, scratch)); + sdv.get(i, scratch); + assertEquals(new BytesRef(Integer.toString(i)), scratch); + ssdv.setDocument(i); + long ord = ssdv.nextOrd(); + ssdv.lookupOrd(ord, scratch); + assertEquals(i, Integer.parseInt(scratch.utf8ToString())); + if (i != 0) { + ord = ssdv.nextOrd(); + ssdv.lookupOrd(ord, scratch); + assertEquals(i * 2, Integer.parseInt(scratch.utf8ToString())); + } + assertEquals(SortedSetDocValues.NO_MORE_ORDS, ssdv.nextOrd()); + } + + reader.close(); + 
dir.close();
+  }
+
+  /**
+   * Indexes two documents that each carry two binary doc-values fields, updates only "bdv1"
+   * for all of them, and checks that "bdv2" still holds its indexed values.
+   */
+  public void testMultipleBinaryDocValues() throws Exception {
+    final Directory directory = newDirectory();
+    final IndexWriterConfig config = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
+    config.setMaxBufferedDocs(10); // the two docs added below stay under this doc-count limit
+    final IndexWriter indexWriter = new IndexWriter(directory, config);
+
+    for (int n = 0; n < 2; n++) {
+      final Document document = new Document();
+      document.add(new StringField("dvUpdateKey", "dv", Store.NO));
+      for (String dvField : new String[] { "bdv1", "bdv2" }) {
+        document.add(new BinaryDocValuesField(dvField, toBytes(n)));
+      }
+      indexWriter.addDocument(document);
+    }
+    indexWriter.commit();
+
+    // both documents carry this term, so both get the new bdv1 value
+    final Term allDocs = new Term("dvUpdateKey", "dv");
+    indexWriter.updateBinaryDocValue(allDocs, "bdv1", toBytes(17L));
+    indexWriter.close();
+
+    final DirectoryReader reader = DirectoryReader.open(directory);
+    final AtomicReader leaf = reader.leaves().get(0).reader();
+
+    final BinaryDocValues updated = leaf.getBinaryDocValues("bdv1");
+    final BinaryDocValues untouched = leaf.getBinaryDocValues("bdv2");
+    final BytesRef spare = new BytesRef();
+    final int maxDoc = leaf.maxDoc();
+    for (int docID = 0; docID < maxDoc; docID++) {
+      assertEquals(17, getValue(updated, docID, spare));
+      assertEquals(docID, getValue(untouched, docID, spare));
+    }
+
+    reader.close();
+    directory.close();
+  }
+
+  public void testDocumentWithNoValue() throws Exception {
+    Directory dir = newDirectory();
+    IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
+    IndexWriter writer = new IndexWriter(dir, conf);
+
+    for (int i = 0; i < 2; i++) {
+      Document doc = new Document();
+      doc.add(new StringField("dvUpdateKey", "dv", Store.NO));
+      if (i == 0) { // index only one document with value
+        doc.add(new BinaryDocValuesField("bdv", toBytes(5L)));
+      }
+      writer.addDocument(doc);
+    }
+    writer.commit();
+
+    // update all docs' bdv field
+    writer.updateBinaryDocValue(new Term("dvUpdateKey", "dv"), "bdv", toBytes(17L));
+    writer.close();
+
+    final DirectoryReader reader = DirectoryReader.open(dir);
+    AtomicReader r = reader.leaves().get(0).reader();
+    BinaryDocValues bdv = 
r.getBinaryDocValues("bdv");
+    BytesRef scratch = new BytesRef();
+    for (int i = 0; i < r.maxDoc(); i++) {
+      assertEquals(17, getValue(bdv, i, scratch));
+    }
+
+    reader.close();
+    dir.close();
+  }
+
+  /**
+   * Indexes two documents with a "bdv" value, passes {@code null} as the update value for
+   * 'doc0', and checks that only that document ends up with an empty value and no
+   * docsWithField bit.
+   */
+  public void testUnsetValue() throws Exception {
+    assumeTrue("codec does not support docsWithField", defaultCodecSupportsDocsWithField());
+    final Directory directory = newDirectory();
+    final IndexWriterConfig config = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
+    final IndexWriter indexWriter = new IndexWriter(directory, config);
+
+    for (int n = 0; n < 2; n++) {
+      final Document document = new Document();
+      document.add(new StringField("id", "doc" + n, Store.NO));
+      document.add(new BinaryDocValuesField("bdv", toBytes(5L)));
+      indexWriter.addDocument(document);
+    }
+    indexWriter.commit();
+
+    // a null update value unsets the field of 'doc0'
+    indexWriter.updateBinaryDocValue(new Term("id", "doc0"), "bdv", null);
+    indexWriter.close();
+
+    final DirectoryReader reader = DirectoryReader.open(directory);
+    final AtomicReader leaf = reader.leaves().get(0).reader();
+    final BinaryDocValues values = leaf.getBinaryDocValues("bdv");
+    final BytesRef spare = new BytesRef();
+    final int maxDoc = leaf.maxDoc();
+    for (int docID = 0; docID < maxDoc; docID++) {
+      if (docID != 0) {
+        assertEquals(5, getValue(values, docID, spare));
+        continue;
+      }
+      // the unset document reads back as an empty value
+      values.get(docID, spare);
+      assertEquals(0, spare.length);
+    }
+
+    final Bits hasValue = leaf.getDocsWithField("bdv");
+    assertFalse(hasValue.get(0));
+    assertTrue(hasValue.get(1));
+
+    reader.close();
+    directory.close();
+  }
+
+  public void testUnsetAllValues() throws Exception {
+    assumeTrue("codec does not support docsWithField", defaultCodecSupportsDocsWithField());
+    Directory dir = newDirectory();
+    IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
+    IndexWriter writer = new IndexWriter(dir, conf);
+
+    for (int i = 0; i < 2; i++) {
+      Document doc = new Document();
+      doc.add(new StringField("id", "doc", Store.NO));
+      doc.add(new BinaryDocValuesField("bdv", toBytes(5L)));
+      writer.addDocument(doc);
+    }
+    writer.commit();
+
+    // unset the value 
of 'doc'
+    writer.updateBinaryDocValue(new Term("id", "doc"), "bdv", null);
+    writer.close();
+
+    final DirectoryReader reader = DirectoryReader.open(dir);
+    AtomicReader r = reader.leaves().get(0).reader();
+    BinaryDocValues bdv = r.getBinaryDocValues("bdv");
+    BytesRef scratch = new BytesRef();
+    for (int i = 0; i < r.maxDoc(); i++) {
+      bdv.get(i, scratch);
+      assertEquals(0, scratch.length);
+    }
+
+    Bits docsWithField = r.getDocsWithField("bdv");
+    assertFalse(docsWithField.get(0));
+    assertFalse(docsWithField.get(1));
+
+    reader.close();
+    dir.close();
+  }
+
+  /**
+   * Checks that a binary update is rejected with an {@link IllegalArgumentException} both for
+   * a field that was never indexed ("bdv") and for an existing field that is not a binary
+   * doc-values field ("foo").
+   */
+  public void testUpdateNonBinaryDocValuesField() throws Exception {
+    final Directory directory = newDirectory();
+    final IndexWriterConfig config = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
+    final IndexWriter indexWriter = new IndexWriter(directory, config);
+
+    final Document document = new Document();
+    document.add(new StringField("key", "doc", Store.NO));
+    document.add(new StringField("foo", "bar", Store.NO));
+    indexWriter.addDocument(document); // flushed document
+    indexWriter.commit();
+    indexWriter.addDocument(document); // in-memory document
+
+    // each row: { field to update, failure message if the update is accepted }
+    final String[][] rejectedUpdates = {
+      { "bdv", "should not have allowed creating new fields through update" },
+      { "foo", "should not have allowed updating an existing field to binary-dv" },
+    };
+    for (String[] rejected : rejectedUpdates) {
+      try {
+        indexWriter.updateBinaryDocValue(new Term("key", "doc"), rejected[0], toBytes(17L));
+        fail(rejected[1]);
+      } catch (IllegalArgumentException expected) {
+        // ok
+      }
+    }
+
+    indexWriter.close();
+    directory.close();
+  }
+
+  public void testDifferentDVFormatPerField() throws Exception {
+    Directory dir = newDirectory();
+    IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
+    conf.setCodec(new Lucene46Codec() {
+      @Override
+      public DocValuesFormat getDocValuesFormatForField(String field) {
+        return new 
Lucene45DocValuesFormat(); + } + }); + IndexWriter writer = new IndexWriter(dir, conf); + + Document doc = new Document(); + doc.add(new StringField("key", "doc", Store.NO)); + doc.add(new BinaryDocValuesField("bdv", toBytes(5L))); + doc.add(new SortedDocValuesField("sorted", new BytesRef("value"))); + writer.addDocument(doc); // flushed document + writer.commit(); + writer.addDocument(doc); // in-memory document + + writer.updateBinaryDocValue(new Term("key", "doc"), "bdv", toBytes(17L)); + writer.close(); + + final DirectoryReader reader = DirectoryReader.open(dir); + + AtomicReader r = SlowCompositeReaderWrapper.wrap(reader); + BinaryDocValues bdv = r.getBinaryDocValues("bdv"); + SortedDocValues sdv = r.getSortedDocValues("sorted"); + BytesRef scratch = new BytesRef(); + for (int i = 0; i < r.maxDoc(); i++) { + assertEquals(17, getValue(bdv, i, scratch)); + sdv.get(i, scratch); + assertEquals(new BytesRef("value"), scratch); + } + + reader.close(); + dir.close(); + } + + public void testUpdateSameDocMultipleTimes() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + Document doc = new Document(); + doc.add(new StringField("key", "doc", Store.NO)); + doc.add(new BinaryDocValuesField("bdv", toBytes(5L))); + writer.addDocument(doc); // flushed document + writer.commit(); + writer.addDocument(doc); // in-memory document + + writer.updateBinaryDocValue(new Term("key", "doc"), "bdv", toBytes(17L)); // update existing field + writer.updateBinaryDocValue(new Term("key", "doc"), "bdv", toBytes(3L)); // update existing field 2nd time in this commit + writer.close(); + + final DirectoryReader reader = DirectoryReader.open(dir); + final AtomicReader r = SlowCompositeReaderWrapper.wrap(reader); + BinaryDocValues bdv = r.getBinaryDocValues("bdv"); + BytesRef scratch = new BytesRef(); + for (int i = 0; i < r.maxDoc(); i++) { 
+ assertEquals(3, getValue(bdv, i, scratch)); + } + reader.close(); + dir.close(); + } + + public void testSegmentMerges() throws Exception { + Directory dir = newDirectory(); + Random random = random(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)); + IndexWriter writer = new IndexWriter(dir, conf.clone()); + + int docid = 0; + int numRounds = atLeast(10); + for (int rnd = 0; rnd < numRounds; rnd++) { + Document doc = new Document(); + doc.add(new StringField("key", "doc", Store.NO)); + doc.add(new BinaryDocValuesField("bdv", toBytes(-1))); + int numDocs = atLeast(30); + for (int i = 0; i < numDocs; i++) { + doc.removeField("id"); + doc.add(new StringField("id", Integer.toString(docid++), Store.NO)); + writer.addDocument(doc); + } + + long value = rnd + 1; + writer.updateBinaryDocValue(new Term("key", "doc"), "bdv", toBytes(value)); + + if (random.nextDouble() < 0.2) { // randomly delete some docs + writer.deleteDocuments(new Term("id", Integer.toString(random.nextInt(docid)))); + } + + // randomly commit or reopen-IW (or nothing), before forceMerge + if (random.nextDouble() < 0.4) { + writer.commit(); + } else if (random.nextDouble() < 0.1) { + writer.close(); + writer = new IndexWriter(dir, conf.clone()); + } + + // add another document with the current value, to be sure forceMerge has + // something to merge (for instance, it could be that CMS finished merging + // all segments down to 1 before the delete was applied, so when + // forceMerge is called, the index will be with one segment and deletes + // and some MPs might not merge it, thereby invalidating test's + // assumption that the reader has no deletes). 
+ doc = new Document(); + doc.add(new StringField("id", Integer.toString(docid++), Store.NO)); + doc.add(new StringField("key", "doc", Store.NO)); + doc.add(new BinaryDocValuesField("bdv", toBytes(value))); + writer.addDocument(doc); + + writer.forceMerge(1, true); + final DirectoryReader reader; + if (random.nextBoolean()) { + writer.commit(); + reader = DirectoryReader.open(dir); + } else { + reader = DirectoryReader.open(writer, true); + } + + assertEquals(1, reader.leaves().size()); + final AtomicReader r = reader.leaves().get(0).reader(); + assertNull("index should have no deletes after forceMerge", r.getLiveDocs()); + BinaryDocValues bdv = r.getBinaryDocValues("bdv"); + assertNotNull(bdv); + BytesRef scratch = new BytesRef(); + for (int i = 0; i < r.maxDoc(); i++) { + assertEquals(value, getValue(bdv, i, scratch)); + } + reader.close(); + } + + writer.close(); + dir.close(); + } + + public void testUpdateDocumentByMultipleTerms() throws Exception { + // make sure the order of updates is respected, even when multiple terms affect same document + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + Document doc = new Document(); + doc.add(new StringField("k1", "v1", Store.NO)); + doc.add(new StringField("k2", "v2", Store.NO)); + doc.add(new BinaryDocValuesField("bdv", toBytes(5L))); + writer.addDocument(doc); // flushed document + writer.commit(); + writer.addDocument(doc); // in-memory document + + writer.updateBinaryDocValue(new Term("k1", "v1"), "bdv", toBytes(17L)); + writer.updateBinaryDocValue(new Term("k2", "v2"), "bdv", toBytes(3L)); + writer.close(); + + final DirectoryReader reader = DirectoryReader.open(dir); + final AtomicReader r = SlowCompositeReaderWrapper.wrap(reader); + BinaryDocValues bdv = r.getBinaryDocValues("bdv"); + BytesRef scratch = new BytesRef(); + for (int i = 0; i < r.maxDoc(); i++) { + assertEquals(3, 
getValue(bdv, i, scratch)); + } + reader.close(); + dir.close(); + } + + public void testManyReopensAndFields() throws Exception { + Directory dir = newDirectory(); + final Random random = random(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)); + LogMergePolicy lmp = newLogMergePolicy(); + lmp.setMergeFactor(3); // merge often + conf.setMergePolicy(lmp); + IndexWriter writer = new IndexWriter(dir, conf); + + final boolean isNRT = random.nextBoolean(); + DirectoryReader reader; + if (isNRT) { + reader = DirectoryReader.open(writer, true); + } else { + writer.commit(); + reader = DirectoryReader.open(dir); + } + + final int numFields = random.nextInt(4) + 3; // 3-6 + final long[] fieldValues = new long[numFields]; + final boolean[] fieldHasValue = new boolean[numFields]; + Arrays.fill(fieldHasValue, true); + for (int i = 0; i < fieldValues.length; i++) { + fieldValues[i] = 1; + } + + int numRounds = atLeast(15); + int docID = 0; + for (int i = 0; i < numRounds; i++) { + int numDocs = atLeast(5); +// System.out.println("[" + Thread.currentThread().getName() + "]: round=" + i + ", numDocs=" + numDocs); + for (int j = 0; j < numDocs; j++) { + Document doc = new Document(); + doc.add(new StringField("id", "doc-" + docID, Store.NO)); + doc.add(new StringField("key", "all", Store.NO)); // update key + // add all fields with their current value + for (int f = 0; f < fieldValues.length; f++) { + doc.add(new BinaryDocValuesField("f" + f, toBytes(fieldValues[f]))); + } + writer.addDocument(doc); + ++docID; + } + + // if field's value was unset before, unset it from all new added documents too + for (int field = 0; field < fieldHasValue.length; field++) { + if (!fieldHasValue[field]) { + writer.updateBinaryDocValue(new Term("key", "all"), "f" + field, null); + } + } + + int fieldIdx = random.nextInt(fieldValues.length); + String updateField = "f" + fieldIdx; + if (random.nextBoolean()) { +// System.out.println("[" + 
Thread.currentThread().getName() + "]: unset field '" + updateField + "'"); + fieldHasValue[fieldIdx] = false; + writer.updateBinaryDocValue(new Term("key", "all"), updateField, null); + } else { + fieldHasValue[fieldIdx] = true; + writer.updateBinaryDocValue(new Term("key", "all"), updateField, toBytes(++fieldValues[fieldIdx])); +// System.out.println("[" + Thread.currentThread().getName() + "]: updated field '" + updateField + "' to value " + fieldValues[fieldIdx]); + } + + if (random.nextDouble() < 0.2) { + int deleteDoc = random.nextInt(docID); // might also delete an already deleted document, ok! + writer.deleteDocuments(new Term("id", "doc-" + deleteDoc)); +// System.out.println("[" + Thread.currentThread().getName() + "]: deleted document: doc-" + deleteDoc); + } + + // verify reader + if (!isNRT) { + writer.commit(); + } + +// System.out.println("[" + Thread.currentThread().getName() + "]: reopen reader: " + reader); + DirectoryReader newReader = DirectoryReader.openIfChanged(reader); + assertNotNull(newReader); + reader.close(); + reader = newReader; +// System.out.println("[" + Thread.currentThread().getName() + "]: reopened reader: " + reader); + assertTrue(reader.numDocs() > 0); // we delete at most one document per round + BytesRef scratch = new BytesRef(); + for (AtomicReaderContext context : reader.leaves()) { + AtomicReader r = context.reader(); +// System.out.println(((SegmentReader) r).getSegmentName()); + Bits liveDocs = r.getLiveDocs(); + for (int field = 0; field < fieldValues.length; field++) { + String f = "f" + field; + BinaryDocValues bdv = r.getBinaryDocValues(f); + Bits docsWithField = r.getDocsWithField(f); + assertNotNull(bdv); + int maxDoc = r.maxDoc(); + for (int doc = 0; doc < maxDoc; doc++) { + if (liveDocs == null || liveDocs.get(doc)) { +// System.out.println("doc=" + (doc + context.docBase) + " f='" + f + "' value=" + getValue(bdv, doc, scratch)); + if (fieldHasValue[field]) { + assertTrue(docsWithField.get(doc)); + 
assertEquals("invalid value for doc=" + doc + ", field=" + f + ", reader=" + r, fieldValues[field], getValue(bdv, doc, scratch)); + } else { + assertFalse(docsWithField.get(doc)); + } + } + } + } + } +// System.out.println(); + } + + IOUtils.close(writer, reader, dir); + } + + public void testUpdateSegmentWithNoDocValues() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + // prevent merges, otherwise by the time updates are applied + // (writer.close()), the segments might have merged and that update becomes + // legit. + conf.setMergePolicy(NoMergePolicy.COMPOUND_FILES); + IndexWriter writer = new IndexWriter(dir, conf); + + // first segment with BDV + Document doc = new Document(); + doc.add(new StringField("id", "doc0", Store.NO)); + doc.add(new BinaryDocValuesField("bdv", toBytes(3L))); + writer.addDocument(doc); + doc = new Document(); + doc.add(new StringField("id", "doc4", Store.NO)); // document without 'bdv' field + writer.addDocument(doc); + writer.commit(); + + // second segment with no BDV + doc = new Document(); + doc.add(new StringField("id", "doc1", Store.NO)); + writer.addDocument(doc); + doc = new Document(); + doc.add(new StringField("id", "doc2", Store.NO)); // document that isn't updated + writer.addDocument(doc); + writer.commit(); + + // update document in the first segment - should not affect docsWithField of + // the document without BDV field + writer.updateBinaryDocValue(new Term("id", "doc0"), "bdv", toBytes(5L)); + + // update document in the second segment - field should be added and we should + // be able to handle the other document correctly (e.g. 
no NPE) + writer.updateBinaryDocValue(new Term("id", "doc1"), "bdv", toBytes(5L)); + writer.close(); + + DirectoryReader reader = DirectoryReader.open(dir); + BytesRef scratch = new BytesRef(); + for (AtomicReaderContext context : reader.leaves()) { + AtomicReader r = context.reader(); + BinaryDocValues bdv = r.getBinaryDocValues("bdv"); + Bits docsWithField = r.getDocsWithField("bdv"); + assertNotNull(docsWithField); + assertTrue(docsWithField.get(0)); + assertEquals(5L, getValue(bdv, 0, scratch)); + assertFalse(docsWithField.get(1)); + bdv.get(1, scratch); + assertEquals(0, scratch.length); + } + reader.close(); + + dir.close(); + } + + public void testUpdateSegmentWithPostingButNoDocValues() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + // prevent merges, otherwise by the time updates are applied + // (writer.close()), the segments might have merged and that update becomes + // legit. 
+ conf.setMergePolicy(NoMergePolicy.COMPOUND_FILES); + IndexWriter writer = new IndexWriter(dir, conf); + + // first segment with BDV + Document doc = new Document(); + doc.add(new StringField("id", "doc0", Store.NO)); + doc.add(new StringField("bdv", "mock-value", Store.NO)); + doc.add(new BinaryDocValuesField("bdv", toBytes(5L))); + writer.addDocument(doc); + writer.commit(); + + // second segment with no BDV + doc = new Document(); + doc.add(new StringField("id", "doc1", Store.NO)); + doc.add(new StringField("bdv", "mock-value", Store.NO)); + writer.addDocument(doc); + writer.commit(); + + // update document in the second segment + writer.updateBinaryDocValue(new Term("id", "doc1"), "bdv", toBytes(5L)); + writer.close(); + + DirectoryReader reader = DirectoryReader.open(dir); + BytesRef scratch = new BytesRef(); + for (AtomicReaderContext context : reader.leaves()) { + AtomicReader r = context.reader(); + BinaryDocValues bdv = r.getBinaryDocValues("bdv"); + for (int i = 0; i < r.maxDoc(); i++) { + assertEquals(5L, getValue(bdv, i, scratch)); + } + } + reader.close(); + + dir.close(); + } + + public void testUpdateBinaryDVFieldWithSameNameAsPostingField() throws Exception { + // this used to fail because FieldInfos.Builder neglected to update + // globalFieldMaps.docValueTypes map + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + Document doc = new Document(); + doc.add(new StringField("f", "mock-value", Store.NO)); + doc.add(new BinaryDocValuesField("f", toBytes(5L))); + writer.addDocument(doc); + writer.commit(); + writer.updateBinaryDocValue(new Term("f", "mock-value"), "f", toBytes(17L)); + writer.close(); + + DirectoryReader r = DirectoryReader.open(dir); + BinaryDocValues bdv = r.leaves().get(0).reader().getBinaryDocValues("f"); + assertEquals(17, getValue(bdv, 0, new BytesRef())); + r.close(); + + dir.close(); + } + + 
public void testUpdateOldSegments() throws Exception { + Codec[] oldCodecs = new Codec[] { new Lucene40RWCodec(), new Lucene41RWCodec(), new Lucene42RWCodec(), new Lucene45RWCodec() }; + Directory dir = newDirectory(); + + boolean oldValue = OLD_FORMAT_IMPERSONATION_IS_ACTIVE; + // create a segment with an old Codec + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setCodec(oldCodecs[random().nextInt(oldCodecs.length)]); + OLD_FORMAT_IMPERSONATION_IS_ACTIVE = true; + IndexWriter writer = new IndexWriter(dir, conf); + Document doc = new Document(); + doc.add(new StringField("id", "doc", Store.NO)); + doc.add(new BinaryDocValuesField("f", toBytes(5L))); + writer.addDocument(doc); + writer.close(); + + conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + writer = new IndexWriter(dir, conf); + writer.updateBinaryDocValue(new Term("id", "doc"), "f", toBytes(4L)); + OLD_FORMAT_IMPERSONATION_IS_ACTIVE = false; + try { + writer.close(); + fail("should not have succeeded to update a segment written with an old Codec"); + } catch (UnsupportedOperationException e) { + writer.rollback(); + } finally { + OLD_FORMAT_IMPERSONATION_IS_ACTIVE = oldValue; + } + + dir.close(); + } + + public void testStressMultiThreading() throws Exception { + final Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + final IndexWriter writer = new IndexWriter(dir, conf); + + // create index + final int numThreads = TestUtil.nextInt(random(), 3, 6); + final int numDocs = atLeast(2000); + for (int i = 0; i < numDocs; i++) { + Document doc = new Document(); + doc.add(new StringField("id", "doc" + i, Store.NO)); + double group = random().nextDouble(); + String g; + if (group < 0.1) g = "g0"; + else if (group < 0.5) g = "g1"; + else if (group < 0.8) g = "g2"; + else g = "g3"; + doc.add(new StringField("updKey", g, Store.NO)); + for (int j = 0; 
j < numThreads; j++) { + long value = random().nextInt(); + doc.add(new BinaryDocValuesField("f" + j, toBytes(value))); + doc.add(new BinaryDocValuesField("cf" + j, toBytes(value * 2))); // control, always updated to f * 2 + } + writer.addDocument(doc); + } + + final CountDownLatch done = new CountDownLatch(numThreads); + final AtomicInteger numUpdates = new AtomicInteger(atLeast(100)); + + // same thread updates a field as well as reopens + Thread[] threads = new Thread[numThreads]; + for (int i = 0; i < threads.length; i++) { + final String f = "f" + i; + final String cf = "cf" + i; + threads[i] = new Thread("UpdateThread-" + i) { + @Override + public void run() { + DirectoryReader reader = null; + boolean success = false; + try { + Random random = random(); + while (numUpdates.getAndDecrement() > 0) { + double group = random.nextDouble(); + Term t; + if (group < 0.1) t = new Term("updKey", "g0"); + else if (group < 0.5) t = new Term("updKey", "g1"); + else if (group < 0.8) t = new Term("updKey", "g2"); + else t = new Term("updKey", "g3"); +// System.out.println("[" + Thread.currentThread().getName() + "] numUpdates=" + numUpdates + " updateTerm=" + t); + if (random.nextBoolean()) { // sometimes unset a value + writer.updateBinaryDocValue(t, f, null); + writer.updateBinaryDocValue(t, cf, null); + } else { + long updValue = random.nextInt(); + writer.updateBinaryDocValue(t, f, toBytes(updValue)); + writer.updateBinaryDocValue(t, cf, toBytes(updValue * 2)); + } + + if (random.nextDouble() < 0.2) { + // delete a random document + int doc = random.nextInt(numDocs); +// System.out.println("[" + Thread.currentThread().getName() + "] deleteDoc=doc" + doc); + writer.deleteDocuments(new Term("id", "doc" + doc)); + } + + if (random.nextDouble() < 0.05) { // commit every 20 updates on average +// System.out.println("[" + Thread.currentThread().getName() + "] commit"); + writer.commit(); + } + + if (random.nextDouble() < 0.1) { // reopen NRT reader (apply updates), on 
average once every 10 updates + if (reader == null) { +// System.out.println("[" + Thread.currentThread().getName() + "] open NRT"); + reader = DirectoryReader.open(writer, true); + } else { +// System.out.println("[" + Thread.currentThread().getName() + "] reopen NRT"); + DirectoryReader r2 = DirectoryReader.openIfChanged(reader, writer, true); + if (r2 != null) { + reader.close(); + reader = r2; + } + } + } + } +// System.out.println("[" + Thread.currentThread().getName() + "] DONE"); + success = true; + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + if (success) { // suppress this exception only if there was another exception + throw new RuntimeException(e); + } + } + } + done.countDown(); + } + } + }; + } + + for (Thread t : threads) t.start(); + done.await(); + writer.close(); + + DirectoryReader reader = DirectoryReader.open(dir); + BytesRef scratch = new BytesRef(); + for (AtomicReaderContext context : reader.leaves()) { + AtomicReader r = context.reader(); + for (int i = 0; i < numThreads; i++) { + BinaryDocValues bdv = r.getBinaryDocValues("f" + i); + BinaryDocValues control = r.getBinaryDocValues("cf" + i); + Bits docsWithBdv = r.getDocsWithField("f" + i); + Bits docsWithControl = r.getDocsWithField("cf" + i); + Bits liveDocs = r.getLiveDocs(); + for (int j = 0; j < r.maxDoc(); j++) { + if (liveDocs == null || liveDocs.get(j)) { + assertEquals(docsWithBdv.get(j), docsWithControl.get(j)); + if (docsWithBdv.get(j)) { + assertEquals(getValue(control, j, scratch), getValue(bdv, j, scratch) * 2); + } + } + } + } + } + reader.close(); + + dir.close(); + } + + public void testUpdateDifferentDocsInDifferentGens() throws Exception { + // update same document multiple times across generations + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMaxBufferedDocs(4); + 
IndexWriter writer = new IndexWriter(dir, conf); + final int numDocs = atLeast(10); + for (int i = 0; i < numDocs; i++) { + Document doc = new Document(); + doc.add(new StringField("id", "doc" + i, Store.NO)); + long value = random().nextInt(); + doc.add(new BinaryDocValuesField("f", toBytes(value))); + doc.add(new BinaryDocValuesField("cf", toBytes(value * 2))); + writer.addDocument(doc); + } + + int numGens = atLeast(5); + BytesRef scratch = new BytesRef(); + for (int i = 0; i < numGens; i++) { + int doc = random().nextInt(numDocs); + Term t = new Term("id", "doc" + doc); + long value = random().nextLong(); + writer.updateBinaryDocValue(t, "f", toBytes(value)); + writer.updateBinaryDocValue(t, "cf", toBytes(value * 2)); + DirectoryReader reader = DirectoryReader.open(writer, true); + for (AtomicReaderContext context : reader.leaves()) { + AtomicReader r = context.reader(); + BinaryDocValues fbdv = r.getBinaryDocValues("f"); + BinaryDocValues cfbdv = r.getBinaryDocValues("cf"); + for (int j = 0; j < r.maxDoc(); j++) { + assertEquals(getValue(cfbdv, j, scratch), getValue(fbdv, j, scratch) * 2); + } + } + reader.close(); + } + writer.close(); + dir.close(); + } + + public void testChangeCodec() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMergePolicy(NoMergePolicy.COMPOUND_FILES); // disable merges to simplify test assertions. 
+ conf.setCodec(new Lucene46Codec() { + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return new Lucene45DocValuesFormat(); + } + }); + IndexWriter writer = new IndexWriter(dir, conf.clone()); + Document doc = new Document(); + doc.add(new StringField("id", "d0", Store.NO)); + doc.add(new BinaryDocValuesField("f1", toBytes(5L))); + doc.add(new BinaryDocValuesField("f2", toBytes(13L))); + writer.addDocument(doc); + writer.close(); + + // change format + conf.setCodec(new Lucene46Codec() { + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return new AssertingDocValuesFormat(); + } + }); + writer = new IndexWriter(dir, conf.clone()); + doc = new Document(); + doc.add(new StringField("id", "d1", Store.NO)); + doc.add(new BinaryDocValuesField("f1", toBytes(17L))); + doc.add(new BinaryDocValuesField("f2", toBytes(2L))); + writer.addDocument(doc); + writer.updateBinaryDocValue(new Term("id", "d0"), "f1", toBytes(12L)); + writer.close(); + + DirectoryReader reader = DirectoryReader.open(dir); + AtomicReader r = SlowCompositeReaderWrapper.wrap(reader); + BinaryDocValues f1 = r.getBinaryDocValues("f1"); + BinaryDocValues f2 = r.getBinaryDocValues("f2"); + BytesRef scratch = new BytesRef(); + assertEquals(12L, getValue(f1, 0, scratch)); + assertEquals(13L, getValue(f2, 0, scratch)); + assertEquals(17L, getValue(f1, 1, scratch)); + assertEquals(2L, getValue(f2, 1, scratch)); + reader.close(); + dir.close(); + } + + public void testAddIndexes() throws Exception { + Directory dir1 = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir1, conf); + + final int numDocs = atLeast(50); + final int numTerms = TestUtil.nextInt(random(), 1, numDocs / 5); + Set randomTerms = new HashSet<>(); + while (randomTerms.size() < numTerms) { + randomTerms.add(TestUtil.randomSimpleString(random())); + } + + // create first index + 
for (int i = 0; i < numDocs; i++) { + Document doc = new Document(); + doc.add(new StringField("id", RandomPicks.randomFrom(random(), randomTerms), Store.NO)); + doc.add(new BinaryDocValuesField("bdv", toBytes(4L))); + doc.add(new BinaryDocValuesField("control", toBytes(8L))); + writer.addDocument(doc); + } + + if (random().nextBoolean()) { + writer.commit(); + } + + // update some docs to a random value + long value = random().nextInt(); + Term term = new Term("id", RandomPicks.randomFrom(random(), randomTerms)); + writer.updateBinaryDocValue(term, "bdv", toBytes(value)); + writer.updateBinaryDocValue(term, "control", toBytes(value * 2)); + writer.close(); + + Directory dir2 = newDirectory(); + conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + writer = new IndexWriter(dir2, conf); + if (random().nextBoolean()) { + writer.addIndexes(dir1); + } else { + DirectoryReader reader = DirectoryReader.open(dir1); + writer.addIndexes(reader); + reader.close(); + } + writer.close(); + + DirectoryReader reader = DirectoryReader.open(dir2); + BytesRef scratch = new BytesRef(); + for (AtomicReaderContext context : reader.leaves()) { + AtomicReader r = context.reader(); + BinaryDocValues bdv = r.getBinaryDocValues("bdv"); + BinaryDocValues control = r.getBinaryDocValues("control"); + for (int i = 0; i < r.maxDoc(); i++) { + assertEquals(getValue(bdv, i, scratch)*2, getValue(control, i, scratch)); + } + } + reader.close(); + + IOUtils.close(dir1, dir2); + } + + public void testDeleteUnusedUpdatesFiles() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + Document doc = new Document(); + doc.add(new StringField("id", "d0", Store.NO)); + doc.add(new BinaryDocValuesField("f", toBytes(1L))); + writer.addDocument(doc); + + // create first gen of update files + writer.updateBinaryDocValue(new Term("id", 
"d0"), "f", toBytes(2L)); + writer.commit(); + int numFiles = dir.listAll().length; + + DirectoryReader r = DirectoryReader.open(dir); + BytesRef scratch = new BytesRef(); + assertEquals(2L, getValue(r.leaves().get(0).reader().getBinaryDocValues("f"), 0, scratch)); + r.close(); + + // create second gen of update files, first gen should be deleted + writer.updateBinaryDocValue(new Term("id", "d0"), "f", toBytes(5L)); + writer.commit(); + assertEquals(numFiles, dir.listAll().length); + + r = DirectoryReader.open(dir); + assertEquals(5L, getValue(r.leaves().get(0).reader().getBinaryDocValues("f"), 0, scratch)); + r.close(); + + writer.close(); + dir.close(); + } + + public void testTonsOfUpdates() throws Exception { + // LUCENE-5248: make sure that when there are many updates, we don't use too much RAM + Directory dir = newDirectory(); + final Random random = random(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)); + conf.setRAMBufferSizeMB(IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB); + conf.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH); // don't flush by doc + IndexWriter writer = new IndexWriter(dir, conf); + + // test data: lots of documents (few 10Ks) and lots of update terms (few hundreds) + final int numDocs = atLeast(20000); + final int numBinaryFields = atLeast(5); + final int numTerms = TestUtil.nextInt(random, 10, 100); // terms should affect many docs + Set updateTerms = new HashSet<>(); + while (updateTerms.size() < numTerms) { + updateTerms.add(TestUtil.randomSimpleString(random)); + } + +// System.out.println("numDocs=" + numDocs + " numBinaryFields=" + numBinaryFields + " numTerms=" + numTerms); + + // build a large index with many BDV fields and update terms + for (int i = 0; i < numDocs; i++) { + Document doc = new Document(); + int numUpdateTerms = TestUtil.nextInt(random, 1, numTerms / 10); + for (int j = 0; j < numUpdateTerms; j++) { + doc.add(new StringField("upd", 
RandomPicks.randomFrom(random, updateTerms), Store.NO)); + } + for (int j = 0; j < numBinaryFields; j++) { + long val = random.nextInt(); + doc.add(new BinaryDocValuesField("f" + j, toBytes(val))); + doc.add(new BinaryDocValuesField("cf" + j, toBytes(val * 2))); + } + writer.addDocument(doc); + } + + writer.commit(); // commit so there's something to apply to + + // set to flush every 2048 bytes (approximately every 12 updates), so we get + // many flushes during binary updates + writer.getConfig().setRAMBufferSizeMB(2048.0 / 1024 / 1024); + final int numUpdates = atLeast(100); +// System.out.println("numUpdates=" + numUpdates); + for (int i = 0; i < numUpdates; i++) { + int field = random.nextInt(numBinaryFields); + Term updateTerm = new Term("upd", RandomPicks.randomFrom(random, updateTerms)); + long value = random.nextInt(); + writer.updateBinaryDocValue(updateTerm, "f" + field, toBytes(value)); + writer.updateBinaryDocValue(updateTerm, "cf" + field, toBytes(value * 2)); + } + + writer.close(); + + DirectoryReader reader = DirectoryReader.open(dir); + BytesRef scratch = new BytesRef(); + for (AtomicReaderContext context : reader.leaves()) { + for (int i = 0; i < numBinaryFields; i++) { + AtomicReader r = context.reader(); + BinaryDocValues f = r.getBinaryDocValues("f" + i); + BinaryDocValues cf = r.getBinaryDocValues("cf" + i); + for (int j = 0; j < r.maxDoc(); j++) { + assertEquals("reader=" + r + ", field=f" + i + ", doc=" + j, getValue(cf, j, scratch), getValue(f, j, scratch) * 2); + } + } + } + reader.close(); + + dir.close(); + } + + public void testUpdatesOrder() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + Document doc = new Document(); + doc.add(new StringField("upd", "t1", Store.NO)); + doc.add(new StringField("upd", "t2", Store.NO)); + doc.add(new BinaryDocValuesField("f1", toBytes(1L))); + 
doc.add(new BinaryDocValuesField("f2", toBytes(1L))); + writer.addDocument(doc); + writer.updateBinaryDocValue(new Term("upd", "t1"), "f1", toBytes(2L)); // update f1 to 2 + writer.updateBinaryDocValue(new Term("upd", "t1"), "f2", toBytes(2L)); // update f2 to 2 + writer.updateBinaryDocValue(new Term("upd", "t2"), "f1", toBytes(3L)); // update f1 to 3 + writer.updateBinaryDocValue(new Term("upd", "t2"), "f2", toBytes(3L)); // update f2 to 3 + writer.updateBinaryDocValue(new Term("upd", "t1"), "f1", toBytes(4L)); // update f1 to 4 (but not f2) + writer.close(); + + DirectoryReader reader = DirectoryReader.open(dir); + BytesRef scratch = new BytesRef(); + assertEquals(4, getValue(reader.leaves().get(0).reader().getBinaryDocValues("f1"), 0, scratch)); + assertEquals(3, getValue(reader.leaves().get(0).reader().getBinaryDocValues("f2"), 0, scratch)); + reader.close(); + + dir.close(); + } + + public void testUpdateAllDeletedSegment() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + Document doc = new Document(); + doc.add(new StringField("id", "doc", Store.NO)); + doc.add(new BinaryDocValuesField("f1", toBytes(1L))); + writer.addDocument(doc); + writer.addDocument(doc); + writer.commit(); + writer.deleteDocuments(new Term("id", "doc")); // delete all docs in the first segment + writer.addDocument(doc); + writer.updateBinaryDocValue(new Term("id", "doc"), "f1", toBytes(2L)); + writer.close(); + + DirectoryReader reader = DirectoryReader.open(dir); + assertEquals(1, reader.leaves().size()); + assertEquals(2L, getValue(reader.leaves().get(0).reader().getBinaryDocValues("f1"), 0, new BytesRef())); + reader.close(); + + dir.close(); + } + + public void testUpdateTwoNonexistingTerms() throws Exception { + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new 
MockAnalyzer(random())); + IndexWriter writer = new IndexWriter(dir, conf); + + Document doc = new Document(); + doc.add(new StringField("id", "doc", Store.NO)); + doc.add(new BinaryDocValuesField("f1", toBytes(1L))); + writer.addDocument(doc); + // update w/ multiple nonexisting terms in same field + writer.updateBinaryDocValue(new Term("c", "foo"), "f1", toBytes(2L)); + writer.updateBinaryDocValue(new Term("c", "bar"), "f1", toBytes(2L)); + writer.close(); + + DirectoryReader reader = DirectoryReader.open(dir); + assertEquals(1, reader.leaves().size()); + assertEquals(1L, getValue(reader.leaves().get(0).reader().getBinaryDocValues("f1"), 0, new BytesRef())); + reader.close(); + + dir.close(); + } + +} diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java index 1689080d31c..821b21a2e72 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java @@ -1778,6 +1778,8 @@ public class TestIndexWriterExceptions extends LuceneTestCase { doc.add(new StringField("id", ""+(docBase+i), Field.Store.NO)); doc.add(new NumericDocValuesField("f", 1L)); doc.add(new NumericDocValuesField("cf", 2L)); + doc.add(new BinaryDocValuesField("bf", TestBinaryDocValuesUpdates.toBytes(1L))); + doc.add(new BinaryDocValuesField("bcf", TestBinaryDocValuesUpdates.toBytes(2L))); w.addDocument(doc); } docCount += numDocs; @@ -1802,8 +1804,18 @@ public class TestIndexWriterExceptions extends LuceneTestCase { if (VERBOSE) { System.out.println(" update id=" + (docBase+i) + " to value " + value); } - w.updateNumericDocValue(new Term("id", Integer.toString(docBase + i)), "f", value); - w.updateNumericDocValue(new Term("id", Integer.toString(docBase + i)), "cf", value * 2); + if (random().nextBoolean()) { // update only numeric field + w.updateNumericDocValue(new Term("id", 
Integer.toString(docBase + i)), "f", value); + w.updateNumericDocValue(new Term("id", Integer.toString(docBase + i)), "cf", value * 2); + } else if (random().nextBoolean()) { + w.updateBinaryDocValue(new Term("id", Integer.toString(docBase + i)), "bf", TestBinaryDocValuesUpdates.toBytes(value)); + w.updateBinaryDocValue(new Term("id", Integer.toString(docBase + i)), "bcf", TestBinaryDocValuesUpdates.toBytes(value * 2)); + } else { + w.updateNumericDocValue(new Term("id", Integer.toString(docBase + i)), "f", value); + w.updateNumericDocValue(new Term("id", Integer.toString(docBase + i)), "cf", value * 2); + w.updateBinaryDocValue(new Term("id", Integer.toString(docBase + i)), "bf", TestBinaryDocValuesUpdates.toBytes(value)); + w.updateBinaryDocValue(new Term("id", Integer.toString(docBase + i)), "bcf", TestBinaryDocValuesUpdates.toBytes(value * 2)); + } } // sometimes do both deletes and updates @@ -1877,13 +1889,18 @@ public class TestIndexWriterExceptions extends LuceneTestCase { r = w.getReader(); } assertEquals(docCount-deleteCount, r.numDocs()); + BytesRef scratch = new BytesRef(); for (AtomicReaderContext context : r.leaves()) { - Bits liveDocs = context.reader().getLiveDocs(); - NumericDocValues f = context.reader().getNumericDocValues("f"); - NumericDocValues cf = context.reader().getNumericDocValues("cf"); - for (int i = 0; i < context.reader().maxDoc(); i++) { + AtomicReader reader = context.reader(); + Bits liveDocs = reader.getLiveDocs(); + NumericDocValues f = reader.getNumericDocValues("f"); + NumericDocValues cf = reader.getNumericDocValues("cf"); + BinaryDocValues bf = reader.getBinaryDocValues("bf"); + BinaryDocValues bcf = reader.getBinaryDocValues("bcf"); + for (int i = 0; i < reader.maxDoc(); i++) { if (liveDocs == null || liveDocs.get(i)) { assertEquals("doc=" + (docBase + i), cf.get(i), f.get(i) * 2); + assertEquals("doc=" + (docBase + i), TestBinaryDocValuesUpdates.getValue(bcf, i, scratch), TestBinaryDocValuesUpdates.getValue(bf, i, scratch) 
* 2); } } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestMixedDocValuesUpdates.java b/lucene/core/src/test/org/apache/lucene/index/TestMixedDocValuesUpdates.java new file mode 100644 index 00000000000..154a3ddb08f --- /dev/null +++ b/lucene/core/src/test/org/apache/lucene/index/TestMixedDocValuesUpdates.java @@ -0,0 +1,433 @@ +package org.apache.lucene.index; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.lucene.analysis.MockAnalyzer; +import org.apache.lucene.document.BinaryDocValuesField; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field.Store; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.LuceneTestCase.SuppressCodecs; +import org.apache.lucene.util.TestUtil; + +import com.carrotsearch.randomizedtesting.generators.RandomPicks; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@SuppressCodecs({"Lucene40","Lucene41","Lucene42","Lucene45"}) +public class TestMixedDocValuesUpdates extends LuceneTestCase { + + public void testManyReopensAndFields() throws Exception { + Directory dir = newDirectory(); + final Random random = random(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)); + LogMergePolicy lmp = newLogMergePolicy(); + lmp.setMergeFactor(3); // merge often + conf.setMergePolicy(lmp); + IndexWriter writer = new IndexWriter(dir, conf); + + final boolean isNRT = random.nextBoolean(); + DirectoryReader reader; + if (isNRT) { + reader = DirectoryReader.open(writer, true); + } else { + writer.commit(); + reader = DirectoryReader.open(dir); + } + + final int numFields = random.nextInt(4) + 3; // 3-6 + final int numNDVFields = random.nextInt(numFields/2) + 1; // 1-3 + final long[] fieldValues = new long[numFields]; + final boolean[] fieldHasValue = new boolean[numFields]; + Arrays.fill(fieldHasValue, true); + for (int i = 0; i < fieldValues.length; i++) { + fieldValues[i] = 1; + } + + int numRounds = atLeast(15); + int docID = 0; + for (int i = 0; i < numRounds; i++) { + int numDocs = atLeast(5); +// System.out.println("[" + Thread.currentThread().getName() + "]: round=" + i + ", numDocs=" + numDocs); + for (int j = 0; j < numDocs; j++) { + Document doc = new Document(); + doc.add(new StringField("id", "doc-" + docID, Store.NO)); + doc.add(new StringField("key", "all", Store.NO)); // update key + // add all fields with their current value + for (int f = 0; f < fieldValues.length; f++) { + if (f < numNDVFields) { + doc.add(new NumericDocValuesField("f" + f, fieldValues[f])); + } else { + doc.add(new BinaryDocValuesField("f" + f, TestBinaryDocValuesUpdates.toBytes(fieldValues[f]))); + } + } + writer.addDocument(doc); + ++docID; + } + + // if field's value was unset before, unset it
from all new added documents too + for (int field = 0; field < fieldHasValue.length; field++) { + if (!fieldHasValue[field]) { + if (field < numNDVFields) { + writer.updateNumericDocValue(new Term("key", "all"), "f" + field, null); + } else { + writer.updateBinaryDocValue(new Term("key", "all"), "f" + field, null); + } + } + } + + int fieldIdx = random.nextInt(fieldValues.length); + String updateField = "f" + fieldIdx; + if (random.nextBoolean()) { +// System.out.println("[" + Thread.currentThread().getName() + "]: unset field '" + updateField + "'"); + fieldHasValue[fieldIdx] = false; + if (fieldIdx < numNDVFields) { + writer.updateNumericDocValue(new Term("key", "all"), updateField, null); + } else { + writer.updateBinaryDocValue(new Term("key", "all"), updateField, null); + } + } else { + fieldHasValue[fieldIdx] = true; + if (fieldIdx < numNDVFields) { + writer.updateNumericDocValue(new Term("key", "all"), updateField, ++fieldValues[fieldIdx]); + } else { + writer.updateBinaryDocValue(new Term("key", "all"), updateField, TestBinaryDocValuesUpdates.toBytes(++fieldValues[fieldIdx])); + } +// System.out.println("[" + Thread.currentThread().getName() + "]: updated field '" + updateField + "' to value " + fieldValues[fieldIdx]); + } + + if (random.nextDouble() < 0.2) { + int deleteDoc = random.nextInt(docID); // might also delete an already deleted document, ok! 
+ writer.deleteDocuments(new Term("id", "doc-" + deleteDoc)); +// System.out.println("[" + Thread.currentThread().getName() + "]: deleted document: doc-" + deleteDoc); + } + + // verify reader + if (!isNRT) { + writer.commit(); + } + +// System.out.println("[" + Thread.currentThread().getName() + "]: reopen reader: " + reader); + DirectoryReader newReader = DirectoryReader.openIfChanged(reader); + assertNotNull(newReader); + reader.close(); + reader = newReader; +// System.out.println("[" + Thread.currentThread().getName() + "]: reopened reader: " + reader); + assertTrue(reader.numDocs() > 0); // we delete at most one document per round + BytesRef scratch = new BytesRef(); + for (AtomicReaderContext context : reader.leaves()) { + AtomicReader r = context.reader(); +// System.out.println(((SegmentReader) r).getSegmentName()); + Bits liveDocs = r.getLiveDocs(); + for (int field = 0; field < fieldValues.length; field++) { + String f = "f" + field; + BinaryDocValues bdv = r.getBinaryDocValues(f); + NumericDocValues ndv = r.getNumericDocValues(f); + Bits docsWithField = r.getDocsWithField(f); + if (field < numNDVFields) { + assertNotNull(ndv); + assertNull(bdv); + } else { + assertNull(ndv); + assertNotNull(bdv); + } + int maxDoc = r.maxDoc(); + for (int doc = 0; doc < maxDoc; doc++) { + if (liveDocs == null || liveDocs.get(doc)) { +// System.out.println("doc=" + (doc + context.docBase) + " f='" + f + "' value=" + getValue(bdv, doc, scratch)); + if (fieldHasValue[field]) { + assertTrue(docsWithField.get(doc)); + if (field < numNDVFields) { + assertEquals("invalid value for doc=" + doc + ", field=" + f + ", reader=" + r, fieldValues[field], ndv.get(doc)); + } else { + assertEquals("invalid value for doc=" + doc + ", field=" + f + ", reader=" + r, fieldValues[field], TestBinaryDocValuesUpdates.getValue(bdv, doc, scratch)); + } + } else { + assertFalse(docsWithField.get(doc)); + } + } + } + } + } +// System.out.println(); + } + + IOUtils.close(writer, reader, dir); + } + +
public void testStressMultiThreading() throws Exception { + final Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + final IndexWriter writer = new IndexWriter(dir, conf); + + // create index + final int numThreads = TestUtil.nextInt(random(), 3, 6); + final int numDocs = atLeast(2000); + for (int i = 0; i < numDocs; i++) { + Document doc = new Document(); + doc.add(new StringField("id", "doc" + i, Store.NO)); + double group = random().nextDouble(); + String g; + if (group < 0.1) g = "g0"; + else if (group < 0.5) g = "g1"; + else if (group < 0.8) g = "g2"; + else g = "g3"; + doc.add(new StringField("updKey", g, Store.NO)); + for (int j = 0; j < numThreads; j++) { + long value = random().nextInt(); + doc.add(new BinaryDocValuesField("f" + j, TestBinaryDocValuesUpdates.toBytes(value))); + doc.add(new NumericDocValuesField("cf" + j, value * 2)); // control, always updated to f * 2 + } + writer.addDocument(doc); + } + + final CountDownLatch done = new CountDownLatch(numThreads); + final AtomicInteger numUpdates = new AtomicInteger(atLeast(100)); + + // same thread updates a field as well as reopens + Thread[] threads = new Thread[numThreads]; + for (int i = 0; i < threads.length; i++) { + final String f = "f" + i; + final String cf = "cf" + i; + threads[i] = new Thread("UpdateThread-" + i) { + @Override + public void run() { + DirectoryReader reader = null; + boolean success = false; + try { + Random random = random(); + while (numUpdates.getAndDecrement() > 0) { + double group = random.nextDouble(); + Term t; + if (group < 0.1) t = new Term("updKey", "g0"); + else if (group < 0.5) t = new Term("updKey", "g1"); + else if (group < 0.8) t = new Term("updKey", "g2"); + else t = new Term("updKey", "g3"); +// System.out.println("[" + Thread.currentThread().getName() + "] numUpdates=" + numUpdates + " updateTerm=" + t); + if (random.nextBoolean()) { // sometimes unset a value +// 
System.err.println("[" + Thread.currentThread().getName() + "] t=" + t + ", f=" + f + ", updValue=UNSET"); + writer.updateBinaryDocValue(t, f, null); + writer.updateNumericDocValue(t, cf, null); + } else { + long updValue = random.nextInt(); +// System.err.println("[" + Thread.currentThread().getName() + "] t=" + t + ", f=" + f + ", updValue=" + updValue); + writer.updateBinaryDocValue(t, f, TestBinaryDocValuesUpdates.toBytes(updValue)); + writer.updateNumericDocValue(t, cf, updValue * 2); + } + + if (random.nextDouble() < 0.2) { + // delete a random document + int doc = random.nextInt(numDocs); +// System.out.println("[" + Thread.currentThread().getName() + "] deleteDoc=doc" + doc); + writer.deleteDocuments(new Term("id", "doc" + doc)); + } + + if (random.nextDouble() < 0.05) { // commit every 20 updates on average +// System.out.println("[" + Thread.currentThread().getName() + "] commit"); + writer.commit(); + } + + if (random.nextDouble() < 0.1) { // reopen NRT reader (apply updates), on average once every 10 updates + if (reader == null) { +// System.out.println("[" + Thread.currentThread().getName() + "] open NRT"); + reader = DirectoryReader.open(writer, true); + } else { +// System.out.println("[" + Thread.currentThread().getName() + "] reopen NRT"); + DirectoryReader r2 = DirectoryReader.openIfChanged(reader, writer, true); + if (r2 != null) { + reader.close(); + reader = r2; + } + } + } + } +// System.out.println("[" + Thread.currentThread().getName() + "] DONE"); + success = true; + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + if (success) { // suppress this exception only if there was another exception + throw new RuntimeException(e); + } + } + } + done.countDown(); + } + } + }; + } + + for (Thread t : threads) t.start(); + done.await(); + writer.close(); + + DirectoryReader reader = DirectoryReader.open(dir); + BytesRef scratch = new BytesRef(); + 
for (AtomicReaderContext context : reader.leaves()) { + AtomicReader r = context.reader(); + for (int i = 0; i < numThreads; i++) { + BinaryDocValues bdv = r.getBinaryDocValues("f" + i); + NumericDocValues control = r.getNumericDocValues("cf" + i); + Bits docsWithBdv = r.getDocsWithField("f" + i); + Bits docsWithControl = r.getDocsWithField("cf" + i); + Bits liveDocs = r.getLiveDocs(); + for (int j = 0; j < r.maxDoc(); j++) { + if (liveDocs == null || liveDocs.get(j)) { + assertEquals(docsWithBdv.get(j), docsWithControl.get(j)); + if (docsWithBdv.get(j)) { + long ctrlValue = control.get(j); + long bdvValue = TestBinaryDocValuesUpdates.getValue(bdv, j, scratch) * 2; +// if (ctrlValue != bdvValue) { +// System.out.println("seg=" + r + ", f=f" + i + ", doc=" + j + ", group=" + r.document(j).get("updKey") + ", ctrlValue=" + ctrlValue + ", bdvBytes=" + scratch); +// } + assertEquals(ctrlValue, bdvValue); + } + } + } + } + } + reader.close(); + + dir.close(); + } + + public void testUpdateDifferentDocsInDifferentGens() throws Exception { + // update same document multiple times across generations + Directory dir = newDirectory(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random())); + conf.setMaxBufferedDocs(4); + IndexWriter writer = new IndexWriter(dir, conf); + final int numDocs = atLeast(10); + for (int i = 0; i < numDocs; i++) { + Document doc = new Document(); + doc.add(new StringField("id", "doc" + i, Store.NO)); + long value = random().nextInt(); + doc.add(new BinaryDocValuesField("f", TestBinaryDocValuesUpdates.toBytes(value))); + doc.add(new NumericDocValuesField("cf", value * 2)); + writer.addDocument(doc); + } + + int numGens = atLeast(5); + BytesRef scratch = new BytesRef(); + for (int i = 0; i < numGens; i++) { + int doc = random().nextInt(numDocs); + Term t = new Term("id", "doc" + doc); + long value = random().nextLong(); + writer.updateBinaryDocValue(t, "f", TestBinaryDocValuesUpdates.toBytes(value)); + 
+ writer.updateNumericDocValue(t, "cf", value * 2); + DirectoryReader reader = DirectoryReader.open(writer, true); + for (AtomicReaderContext context : reader.leaves()) { + AtomicReader r = context.reader(); + BinaryDocValues fbdv = r.getBinaryDocValues("f"); + NumericDocValues cfndv = r.getNumericDocValues("cf"); + for (int j = 0; j < r.maxDoc(); j++) { + assertEquals(cfndv.get(j), TestBinaryDocValuesUpdates.getValue(fbdv, j, scratch) * 2); + } + } + reader.close(); + } + writer.close(); + dir.close(); + } + + public void testTonsOfUpdates() throws Exception { + // LUCENE-5248: make sure that when there are many updates, we don't use too much RAM + Directory dir = newDirectory(); + final Random random = random(); + IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random)); + conf.setRAMBufferSizeMB(IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB); + conf.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH); // don't flush by doc + IndexWriter writer = new IndexWriter(dir, conf); + + // test data: lots of documents (few 10Ks) and lots of update terms (few hundreds) + final int numDocs = atLeast(20000); + final int numBinaryFields = atLeast(5); + final int numTerms = TestUtil.nextInt(random, 10, 100); // terms should affect many docs + Set<String> updateTerms = new HashSet<>(); + while (updateTerms.size() < numTerms) { + updateTerms.add(TestUtil.randomSimpleString(random)); + } + +// System.out.println("numDocs=" + numDocs + " numBinaryFields=" + numBinaryFields + " numTerms=" + numTerms); + + // build a large index with many BDV fields and update terms + for (int i = 0; i < numDocs; i++) { + Document doc = new Document(); + int numUpdateTerms = TestUtil.nextInt(random, 1, numTerms / 10); + for (int j = 0; j < numUpdateTerms; j++) { + doc.add(new StringField("upd", RandomPicks.randomFrom(random, updateTerms), Store.NO)); + } + for (int j = 0; j < numBinaryFields; j++) { + long val = random.nextInt(); + doc.add(new BinaryDocValuesField("f"
+ j, TestBinaryDocValuesUpdates.toBytes(val))); + doc.add(new NumericDocValuesField("cf" + j, val * 2)); + } + writer.addDocument(doc); + } + + writer.commit(); // commit so there's something to apply to + + // set to flush every 2048 bytes (approximately every 12 updates), so we get + // many flushes during binary updates + writer.getConfig().setRAMBufferSizeMB(2048.0 / 1024 / 1024); + final int numUpdates = atLeast(100); +// System.out.println("numUpdates=" + numUpdates); + for (int i = 0; i < numUpdates; i++) { + int field = random.nextInt(numBinaryFields); + Term updateTerm = new Term("upd", RandomPicks.randomFrom(random, updateTerms)); + long value = random.nextInt(); + writer.updateBinaryDocValue(updateTerm, "f" + field, TestBinaryDocValuesUpdates.toBytes(value)); + writer.updateNumericDocValue(updateTerm, "cf" + field, value * 2); + } + + writer.close(); + + DirectoryReader reader = DirectoryReader.open(dir); + BytesRef scratch = new BytesRef(); + for (AtomicReaderContext context : reader.leaves()) { + for (int i = 0; i < numBinaryFields; i++) { + AtomicReader r = context.reader(); + BinaryDocValues f = r.getBinaryDocValues("f" + i); + NumericDocValues cf = r.getNumericDocValues("cf" + i); + for (int j = 0; j < r.maxDoc(); j++) { + assertEquals("reader=" + r + ", field=f" + i + ", doc=" + j, cf.get(j), TestBinaryDocValuesUpdates.getValue(f, j, scratch) * 2); + } + } + } + reader.close(); + + dir.close(); + } + +} diff --git a/lucene/core/src/test/org/apache/lucene/index/TestNumericDocValuesUpdates.java b/lucene/core/src/test/org/apache/lucene/index/TestNumericDocValuesUpdates.java index 1491d37ec6c..7bd768acf4e 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestNumericDocValuesUpdates.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestNumericDocValuesUpdates.java @@ -33,7 +33,6 @@ import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase.SuppressCodecs; import 
org.apache.lucene.util.TestUtil; -import org.apache.lucene.util.TestUtil; import org.junit.Test; import com.carrotsearch.randomizedtesting.generators.RandomPicks; @@ -357,7 +356,7 @@ public class TestNumericDocValuesUpdates extends LuceneTestCase { SortedSetDocValues ssdv = r.getSortedSetDocValues("ssdv"); BytesRef scratch = new BytesRef(); for (int i = 0; i < r.maxDoc(); i++) { - assertEquals(17, ndv.get(0)); + assertEquals(17, ndv.get(i)); bdv.get(i, scratch); assertEquals(new BytesRef(Integer.toString(i)), scratch); sdv.get(i, scratch); diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java b/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java index 4790fa127d6..070e5690352 100644 --- a/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java +++ b/lucene/test-framework/src/java/org/apache/lucene/index/RandomIndexWriter.java @@ -27,6 +27,7 @@ import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.codecs.Codec; import org.apache.lucene.search.Query; import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.NullInfoStream; @@ -224,6 +225,10 @@ public class RandomIndexWriter implements Closeable { w.updateNumericDocValue(term, field, value); } + public void updateBinaryDocValue(Term term, String field, BytesRef value) throws IOException { + w.updateBinaryDocValue(term, field, value); + } + public void deleteDocuments(Term term) throws IOException { w.deleteDocuments(term); }