mirror of https://github.com/apache/lucene.git
LUCENE-5513: add IndexWriter.updateBinaryDocValue
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1578784 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6ea3907374
commit
c5263086b4
lucene
CHANGES.txt
core/src
java/org/apache/lucene/index
BinaryDocValuesFieldUpdates.javaBufferedUpdates.javaBufferedUpdatesStream.javaCoalescedUpdates.javaDocValuesFieldUpdates.javaDocValuesUpdate.javaDocumentsWriter.javaDocumentsWriterDeleteQueue.javaDocumentsWriterPerThread.javaFrozenBufferedUpdates.javaIndexWriter.javaNumericDocValuesFieldUpdates.javaNumericFieldUpdates.javaNumericUpdate.javaReadersAndUpdates.java
test/org/apache/lucene/index
test-framework/src/java/org/apache/lucene/index
|
@ -108,6 +108,10 @@ New Features
|
|||
* LUCENE-5530: ComplexPhraseQueryParser throws ParseException for fielded queries.
|
||||
(Erick Erickson via Tomas Fernandez Lobbe and Ahmet Arslan)
|
||||
|
||||
* LUCENE-5513: Add IndexWriter.updateBinaryDocValue which lets
|
||||
you update the value of a BinaryDocValuesField without reindexing the
|
||||
document(s). (Shai Erera)
|
||||
|
||||
API Changes
|
||||
|
||||
* LUCENE-5454: Add RandomAccessOrds, an optional extension of SortedSetDocValues
|
||||
|
|
|
@ -0,0 +1,233 @@
|
|||
package org.apache.lucene.index;
|
||||
|
||||
import org.apache.lucene.document.BinaryDocValuesField;
|
||||
import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
|
||||
import org.apache.lucene.search.DocIdSetIterator;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.FixedBitSet;
|
||||
import org.apache.lucene.util.InPlaceMergeSorter;
|
||||
import org.apache.lucene.util.packed.PackedInts;
|
||||
import org.apache.lucene.util.packed.PagedGrowableWriter;
|
||||
import org.apache.lucene.util.packed.PagedMutable;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* A {@link DocValuesFieldUpdates} which holds updates of documents, of a single
|
||||
* {@link BinaryDocValuesField}.
|
||||
*
|
||||
* @lucene.experimental
|
||||
*/
|
||||
class BinaryDocValuesFieldUpdates extends DocValuesFieldUpdates {
|
||||
|
||||
final static class Iterator extends DocValuesFieldUpdates.Iterator {
|
||||
private final PagedGrowableWriter offsets;
|
||||
private final int size;
|
||||
private final PagedGrowableWriter lengths;
|
||||
private final PagedMutable docs;
|
||||
private final FixedBitSet docsWithField;
|
||||
private long idx = 0; // long so we don't overflow if size == Integer.MAX_VALUE
|
||||
private int doc = -1;
|
||||
private final BytesRef value;
|
||||
private int offset, length;
|
||||
|
||||
Iterator(int size, PagedGrowableWriter offsets, PagedGrowableWriter lengths,
|
||||
PagedMutable docs, BytesRef values, FixedBitSet docsWithField) {
|
||||
this.offsets = offsets;
|
||||
this.size = size;
|
||||
this.lengths = lengths;
|
||||
this.docs = docs;
|
||||
this.docsWithField = docsWithField;
|
||||
value = values.clone();
|
||||
}
|
||||
|
||||
@Override
|
||||
BytesRef value() {
|
||||
if (offset == -1) {
|
||||
return null;
|
||||
} else {
|
||||
value.offset = offset;
|
||||
value.length = length;
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
int nextDoc() {
|
||||
if (idx >= size) {
|
||||
offset = -1;
|
||||
return doc = DocIdSetIterator.NO_MORE_DOCS;
|
||||
}
|
||||
doc = (int) docs.get(idx);
|
||||
++idx;
|
||||
while (idx < size && docs.get(idx) == doc) {
|
||||
++idx;
|
||||
}
|
||||
// idx points to the "next" element
|
||||
long prevIdx = idx - 1;
|
||||
if (!docsWithField.get((int) prevIdx)) {
|
||||
offset = -1;
|
||||
} else {
|
||||
// cannot change 'value' here because nextDoc is called before the
|
||||
// value is used, and it's a waste to clone the BytesRef when we
|
||||
// obtain the value
|
||||
offset = (int) offsets.get(prevIdx);
|
||||
length = (int) lengths.get(prevIdx);
|
||||
}
|
||||
return doc;
|
||||
}
|
||||
|
||||
@Override
|
||||
int doc() {
|
||||
return doc;
|
||||
}
|
||||
|
||||
@Override
|
||||
void reset() {
|
||||
doc = -1;
|
||||
offset = -1;
|
||||
idx = 0;
|
||||
}
|
||||
}
|
||||
|
||||
private FixedBitSet docsWithField;
|
||||
private PagedMutable docs;
|
||||
private PagedGrowableWriter offsets, lengths;
|
||||
private BytesRef values;
|
||||
private int size;
|
||||
|
||||
public BinaryDocValuesFieldUpdates(String field, int maxDoc) {
|
||||
super(field, Type.BINARY);
|
||||
docsWithField = new FixedBitSet(64);
|
||||
docs = new PagedMutable(1, 1024, PackedInts.bitsRequired(maxDoc - 1), PackedInts.COMPACT);
|
||||
offsets = new PagedGrowableWriter(1, 1024, 1, PackedInts.FAST);
|
||||
lengths = new PagedGrowableWriter(1, 1024, 1, PackedInts.FAST);
|
||||
values = new BytesRef(16); // start small
|
||||
size = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void add(int doc, Object value) {
|
||||
// TODO: if the Sorter interface changes to take long indexes, we can remove that limitation
|
||||
if (size == Integer.MAX_VALUE) {
|
||||
throw new IllegalStateException("cannot support more than Integer.MAX_VALUE doc/value entries");
|
||||
}
|
||||
|
||||
BytesRef val = (BytesRef) value;
|
||||
if (val == null) {
|
||||
val = BinaryDocValuesUpdate.MISSING;
|
||||
}
|
||||
|
||||
// grow the structures to have room for more elements
|
||||
if (docs.size() == size) {
|
||||
docs = docs.grow(size + 1);
|
||||
offsets = offsets.grow(size + 1);
|
||||
lengths = lengths.grow(size + 1);
|
||||
docsWithField = FixedBitSet.ensureCapacity(docsWithField, (int) docs.size());
|
||||
}
|
||||
|
||||
if (val != BinaryDocValuesUpdate.MISSING) {
|
||||
// only mark the document as having a value in that field if the value wasn't set to null (MISSING)
|
||||
docsWithField.set(size);
|
||||
}
|
||||
|
||||
docs.set(size, doc);
|
||||
offsets.set(size, values.length);
|
||||
lengths.set(size, val.length);
|
||||
values.append(val);
|
||||
++size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator iterator() {
|
||||
final PagedMutable docs = this.docs;
|
||||
final PagedGrowableWriter offsets = this.offsets;
|
||||
final PagedGrowableWriter lengths = this.lengths;
|
||||
final BytesRef values = this.values;
|
||||
final FixedBitSet docsWithField = this.docsWithField;
|
||||
new InPlaceMergeSorter() {
|
||||
@Override
|
||||
protected void swap(int i, int j) {
|
||||
long tmpDoc = docs.get(j);
|
||||
docs.set(j, docs.get(i));
|
||||
docs.set(i, tmpDoc);
|
||||
|
||||
long tmpOffset = offsets.get(j);
|
||||
offsets.set(j, offsets.get(i));
|
||||
offsets.set(i, tmpOffset);
|
||||
|
||||
long tmpLength = lengths.get(j);
|
||||
lengths.set(j, lengths.get(i));
|
||||
lengths.set(i, tmpLength);
|
||||
|
||||
boolean tmpBool = docsWithField.get(j);
|
||||
if (docsWithField.get(i)) {
|
||||
docsWithField.set(j);
|
||||
} else {
|
||||
docsWithField.clear(j);
|
||||
}
|
||||
if (tmpBool) {
|
||||
docsWithField.set(i);
|
||||
} else {
|
||||
docsWithField.clear(i);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int compare(int i, int j) {
|
||||
int x = (int) docs.get(i);
|
||||
int y = (int) docs.get(j);
|
||||
return (x < y) ? -1 : ((x == y) ? 0 : 1);
|
||||
}
|
||||
}.sort(0, size);
|
||||
|
||||
return new Iterator(size, offsets, lengths, docs, values, docsWithField);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void merge(DocValuesFieldUpdates other) {
|
||||
BinaryDocValuesFieldUpdates otherUpdates = (BinaryDocValuesFieldUpdates) other;
|
||||
int newSize = size + otherUpdates.size;
|
||||
if (newSize > Integer.MAX_VALUE) {
|
||||
throw new IllegalStateException(
|
||||
"cannot support more than Integer.MAX_VALUE doc/value entries; size="
|
||||
+ size + " other.size=" + otherUpdates.size);
|
||||
}
|
||||
docs = docs.grow(newSize);
|
||||
offsets = offsets.grow(newSize);
|
||||
lengths = lengths.grow(newSize);
|
||||
docsWithField = FixedBitSet.ensureCapacity(docsWithField, (int) docs.size());
|
||||
for (int i = 0; i < otherUpdates.size; i++) {
|
||||
int doc = (int) otherUpdates.docs.get(i);
|
||||
if (otherUpdates.docsWithField.get(i)) {
|
||||
docsWithField.set(size);
|
||||
}
|
||||
docs.set(size, doc);
|
||||
offsets.set(size, values.length + otherUpdates.offsets.get(i)); // correct relative offset
|
||||
lengths.set(size, otherUpdates.lengths.get(i));
|
||||
++size;
|
||||
}
|
||||
values.append(otherUpdates.values);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean any() {
|
||||
return size > 0;
|
||||
}
|
||||
|
||||
}
|
|
@ -25,6 +25,8 @@ import java.util.Map;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
|
||||
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.util.RamUsageEstimator;
|
||||
|
||||
|
@ -92,9 +94,39 @@ class BufferedUpdates {
|
|||
* NumericUpdate (val) counts its own size and isn't accounted for here.
|
||||
*/
|
||||
final static int BYTES_PER_NUMERIC_UPDATE_ENTRY = 7*RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + RamUsageEstimator.NUM_BYTES_INT;
|
||||
|
||||
/* Rough logic: BinaryUpdate calculates its actual size,
|
||||
* including the update Term and DV field (String). The
|
||||
* per-field map holds a reference to the updated field, and
|
||||
* therefore we only account for the object reference and
|
||||
* map space itself. This is incremented when we first see
|
||||
* an updated field.
|
||||
*
|
||||
* HashMap has an array[Entry] w/ varying load
|
||||
* factor (say 2*POINTER). Entry is an object w/ String key,
|
||||
* LinkedHashMap val, int hash, Entry next (OBJ_HEADER + 3*POINTER + INT).
|
||||
*
|
||||
* LinkedHashMap (val) is counted as OBJ_HEADER, array[Entry] ref + header, 4*INT, 1*FLOAT,
|
||||
* Set (entrySet) (2*OBJ_HEADER + ARRAY_HEADER + 2*POINTER + 4*INT + FLOAT)
|
||||
*/
|
||||
final static int BYTES_PER_BINARY_FIELD_ENTRY =
|
||||
7*RamUsageEstimator.NUM_BYTES_OBJECT_REF + 3*RamUsageEstimator.NUM_BYTES_OBJECT_HEADER +
|
||||
RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + 5*RamUsageEstimator.NUM_BYTES_INT + RamUsageEstimator.NUM_BYTES_FLOAT;
|
||||
|
||||
/* Rough logic: Incremented when we see another Term for an already updated
|
||||
* field.
|
||||
* LinkedHashMap has an array[Entry] w/ varying load factor
|
||||
* (say 2*POINTER). Entry is an object w/ Term key, BinaryUpdate val,
|
||||
* int hash, Entry next, Entry before, Entry after (OBJ_HEADER + 5*POINTER + INT).
|
||||
*
|
||||
* Term (key) is counted only as POINTER.
|
||||
* BinaryUpdate (val) counts its own size and isn't accounted for here.
|
||||
*/
|
||||
final static int BYTES_PER_BINARY_UPDATE_ENTRY = 7*RamUsageEstimator.NUM_BYTES_OBJECT_REF + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + RamUsageEstimator.NUM_BYTES_INT;
|
||||
|
||||
final AtomicInteger numTermDeletes = new AtomicInteger();
|
||||
final AtomicInteger numNumericUpdates = new AtomicInteger();
|
||||
final AtomicInteger numBinaryUpdates = new AtomicInteger();
|
||||
final Map<Term,Integer> terms = new HashMap<>();
|
||||
final Map<Query,Integer> queries = new HashMap<>();
|
||||
final List<Integer> docIDs = new ArrayList<>();
|
||||
|
@ -106,7 +138,16 @@ class BufferedUpdates {
|
|||
// one that came in wins), and helps us detect faster if the same Term is
|
||||
// used to update the same field multiple times (so we later traverse it
|
||||
// only once).
|
||||
final Map<String,LinkedHashMap<Term,NumericUpdate>> numericUpdates = new HashMap<>();
|
||||
final Map<String,LinkedHashMap<Term,NumericDocValuesUpdate>> numericUpdates = new HashMap<>();
|
||||
|
||||
// Map<dvField,Map<updateTerm,BinaryUpdate>>
|
||||
// For each field we keep an ordered list of BinaryUpdates, key'd by the
|
||||
// update Term. LinkedHashMap guarantees we will later traverse the map in
|
||||
// insertion order (so that if two terms affect the same document, the last
|
||||
// one that came in wins), and helps us detect faster if the same Term is
|
||||
// used to update the same field multiple times (so we later traverse it
|
||||
// only once).
|
||||
final Map<String,LinkedHashMap<Term,BinaryDocValuesUpdate>> binaryUpdates = new HashMap<>();
|
||||
|
||||
public static final Integer MAX_INT = Integer.valueOf(Integer.MAX_VALUE);
|
||||
|
||||
|
@ -125,7 +166,7 @@ class BufferedUpdates {
|
|||
if (VERBOSE_DELETES) {
|
||||
return "gen=" + gen + " numTerms=" + numTermDeletes + ", terms=" + terms
|
||||
+ ", queries=" + queries + ", docIDs=" + docIDs + ", numericUpdates=" + numericUpdates
|
||||
+ ", bytesUsed=" + bytesUsed;
|
||||
+ ", binaryUpdates=" + binaryUpdates + ", bytesUsed=" + bytesUsed;
|
||||
} else {
|
||||
String s = "gen=" + gen;
|
||||
if (numTermDeletes.get() != 0) {
|
||||
|
@ -140,6 +181,9 @@ class BufferedUpdates {
|
|||
if (numNumericUpdates.get() != 0) {
|
||||
s += " " + numNumericUpdates.get() + " numeric updates (unique count=" + numericUpdates.size() + ")";
|
||||
}
|
||||
if (numBinaryUpdates.get() != 0) {
|
||||
s += " " + numBinaryUpdates.get() + " binary updates (unique count=" + binaryUpdates.size() + ")";
|
||||
}
|
||||
if (bytesUsed.get() != 0) {
|
||||
s += " bytesUsed=" + bytesUsed.get();
|
||||
}
|
||||
|
@ -184,14 +228,14 @@ class BufferedUpdates {
|
|||
}
|
||||
}
|
||||
|
||||
public void addNumericUpdate(NumericUpdate update, int docIDUpto) {
|
||||
LinkedHashMap<Term,NumericUpdate> fieldUpdates = numericUpdates.get(update.field);
|
||||
public void addNumericUpdate(NumericDocValuesUpdate update, int docIDUpto) {
|
||||
LinkedHashMap<Term,NumericDocValuesUpdate> fieldUpdates = numericUpdates.get(update.field);
|
||||
if (fieldUpdates == null) {
|
||||
fieldUpdates = new LinkedHashMap<>();
|
||||
numericUpdates.put(update.field, fieldUpdates);
|
||||
bytesUsed.addAndGet(BYTES_PER_NUMERIC_FIELD_ENTRY);
|
||||
}
|
||||
final NumericUpdate current = fieldUpdates.get(update.term);
|
||||
final NumericDocValuesUpdate current = fieldUpdates.get(update.term);
|
||||
if (current != null && docIDUpto < current.docIDUpto) {
|
||||
// Only record the new number if it's greater than or equal to the current
|
||||
// one. This is important because if multiple threads are replacing the
|
||||
|
@ -213,17 +257,48 @@ class BufferedUpdates {
|
|||
}
|
||||
}
|
||||
|
||||
public void addBinaryUpdate(BinaryDocValuesUpdate update, int docIDUpto) {
|
||||
LinkedHashMap<Term,BinaryDocValuesUpdate> fieldUpdates = binaryUpdates.get(update.field);
|
||||
if (fieldUpdates == null) {
|
||||
fieldUpdates = new LinkedHashMap<>();
|
||||
binaryUpdates.put(update.field, fieldUpdates);
|
||||
bytesUsed.addAndGet(BYTES_PER_BINARY_FIELD_ENTRY);
|
||||
}
|
||||
final BinaryDocValuesUpdate current = fieldUpdates.get(update.term);
|
||||
if (current != null && docIDUpto < current.docIDUpto) {
|
||||
// Only record the new number if it's greater than or equal to the current
|
||||
// one. This is important because if multiple threads are replacing the
|
||||
// same doc at nearly the same time, it's possible that one thread that
|
||||
// got a higher docID is scheduled before the other threads.
|
||||
return;
|
||||
}
|
||||
|
||||
update.docIDUpto = docIDUpto;
|
||||
// since it's a LinkedHashMap, we must first remove the Term entry so that
|
||||
// it's added last (we're interested in insertion-order).
|
||||
if (current != null) {
|
||||
fieldUpdates.remove(update.term);
|
||||
}
|
||||
fieldUpdates.put(update.term, update);
|
||||
numBinaryUpdates.incrementAndGet();
|
||||
if (current == null) {
|
||||
bytesUsed.addAndGet(BYTES_PER_BINARY_UPDATE_ENTRY + update.sizeInBytes());
|
||||
}
|
||||
}
|
||||
|
||||
void clear() {
|
||||
terms.clear();
|
||||
queries.clear();
|
||||
docIDs.clear();
|
||||
numericUpdates.clear();
|
||||
binaryUpdates.clear();
|
||||
numTermDeletes.set(0);
|
||||
numNumericUpdates.set(0);
|
||||
numBinaryUpdates.set(0);
|
||||
bytesUsed.set(0);
|
||||
}
|
||||
|
||||
boolean any() {
|
||||
return terms.size() > 0 || docIDs.size() > 0 || queries.size() > 0 || numericUpdates.size() > 0;
|
||||
return terms.size() > 0 || docIDs.size() > 0 || queries.size() > 0 || numericUpdates.size() > 0 || binaryUpdates.size() > 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,9 +22,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
|
@ -214,20 +212,22 @@ class BufferedUpdatesStream {
|
|||
int delCount = 0;
|
||||
final boolean segAllDeletes;
|
||||
try {
|
||||
Map<String,NumericFieldUpdates> fieldUpdates = null;
|
||||
final DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container();
|
||||
if (coalescedDeletes != null) {
|
||||
//System.out.println(" del coalesced");
|
||||
delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader);
|
||||
delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader);
|
||||
fieldUpdates = applyNumericDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, fieldUpdates);
|
||||
applyDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, dvUpdates);
|
||||
applyDocValuesUpdates(coalescedDeletes.binaryDVUpdates, rld, reader, dvUpdates);
|
||||
}
|
||||
//System.out.println(" del exact");
|
||||
// Don't delete by Term here; DocumentsWriterPerThread
|
||||
// already did that on flush:
|
||||
delCount += applyQueryDeletes(packet.queriesIterable(), rld, reader);
|
||||
fieldUpdates = applyNumericDocValuesUpdates(Arrays.asList(packet.updates), rld, reader, fieldUpdates);
|
||||
if (!fieldUpdates.isEmpty()) {
|
||||
rld.writeFieldUpdates(info.info.dir, fieldUpdates);
|
||||
applyDocValuesUpdates(Arrays.asList(packet.numericDVUpdates), rld, reader, dvUpdates);
|
||||
applyDocValuesUpdates(Arrays.asList(packet.binaryDVUpdates), rld, reader, dvUpdates);
|
||||
if (dvUpdates.any()) {
|
||||
rld.writeFieldUpdates(info.info.dir, dvUpdates);
|
||||
}
|
||||
final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount();
|
||||
assert fullDelCount <= rld.info.info.getDocCount();
|
||||
|
@ -275,9 +275,11 @@ class BufferedUpdatesStream {
|
|||
try {
|
||||
delCount += applyTermDeletes(coalescedDeletes.termsIterable(), rld, reader);
|
||||
delCount += applyQueryDeletes(coalescedDeletes.queriesIterable(), rld, reader);
|
||||
Map<String,NumericFieldUpdates> fieldUpdates = applyNumericDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, null);
|
||||
if (!fieldUpdates.isEmpty()) {
|
||||
rld.writeFieldUpdates(info.info.dir, fieldUpdates);
|
||||
DocValuesFieldUpdates.Container dvUpdates = new DocValuesFieldUpdates.Container();
|
||||
applyDocValuesUpdates(coalescedDeletes.numericDVUpdates, rld, reader, dvUpdates);
|
||||
applyDocValuesUpdates(coalescedDeletes.binaryDVUpdates, rld, reader, dvUpdates);
|
||||
if (dvUpdates.any()) {
|
||||
rld.writeFieldUpdates(info.info.dir, dvUpdates);
|
||||
}
|
||||
final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount();
|
||||
assert fullDelCount <= rld.info.info.getDocCount();
|
||||
|
@ -436,14 +438,13 @@ class BufferedUpdatesStream {
|
|||
return delCount;
|
||||
}
|
||||
|
||||
// NumericDocValues Updates
|
||||
// If otherFieldUpdates != null, we need to merge the updates into them
|
||||
private synchronized Map<String,NumericFieldUpdates> applyNumericDocValuesUpdates(Iterable<NumericUpdate> updates,
|
||||
ReadersAndUpdates rld, SegmentReader reader, Map<String,NumericFieldUpdates> otherFieldUpdates) throws IOException {
|
||||
// DocValues updates
|
||||
private synchronized void applyDocValuesUpdates(Iterable<? extends DocValuesUpdate> updates,
|
||||
ReadersAndUpdates rld, SegmentReader reader, DocValuesFieldUpdates.Container dvUpdatesContainer) throws IOException {
|
||||
Fields fields = reader.fields();
|
||||
if (fields == null) {
|
||||
// This reader has no postings
|
||||
return Collections.emptyMap();
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: we can process the updates per DV field, from last to first so that
|
||||
|
@ -459,9 +460,9 @@ class BufferedUpdatesStream {
|
|||
String currentField = null;
|
||||
TermsEnum termsEnum = null;
|
||||
DocsEnum docs = null;
|
||||
final Map<String,NumericFieldUpdates> result = otherFieldUpdates == null ? new HashMap<String,NumericFieldUpdates>() : otherFieldUpdates;
|
||||
|
||||
//System.out.println(Thread.currentThread().getName() + " numericDVUpdate reader=" + reader);
|
||||
for (NumericUpdate update : updates) {
|
||||
for (DocValuesUpdate update : updates) {
|
||||
Term term = update.term;
|
||||
int limit = update.docIDUpto;
|
||||
|
||||
|
@ -499,10 +500,9 @@ class BufferedUpdatesStream {
|
|||
|
||||
//System.out.println("BDS: got docsEnum=" + docsEnum);
|
||||
|
||||
NumericFieldUpdates fieldUpdates = result.get(update.field);
|
||||
if (fieldUpdates == null) {
|
||||
fieldUpdates = new NumericFieldUpdates.PackedNumericFieldUpdates(reader.maxDoc());
|
||||
result.put(update.field, fieldUpdates);
|
||||
DocValuesFieldUpdates dvUpdates = dvUpdatesContainer.getUpdates(update.field, update.type);
|
||||
if (dvUpdates == null) {
|
||||
dvUpdates = dvUpdatesContainer.newUpdates(update.field, update.type, reader.maxDoc());
|
||||
}
|
||||
int doc;
|
||||
while ((doc = docsEnum.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
|
||||
|
@ -510,13 +510,12 @@ class BufferedUpdatesStream {
|
|||
if (doc >= limit) {
|
||||
break; // no more docs that can be updated for this term
|
||||
}
|
||||
fieldUpdates.add(doc, update.value);
|
||||
dvUpdates.add(doc, update.value);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
public static class QueryAndLimit {
|
||||
public final Query query;
|
||||
public final int limit;
|
||||
|
|
|
@ -25,17 +25,23 @@ import java.util.Map;
|
|||
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.index.BufferedUpdatesStream.QueryAndLimit;
|
||||
import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
|
||||
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.MergedIterator;
|
||||
|
||||
class CoalescedUpdates {
|
||||
final Map<Query,Integer> queries = new HashMap<>();
|
||||
final List<Iterable<Term>> iterables = new ArrayList<>();
|
||||
final List<NumericUpdate> numericDVUpdates = new ArrayList<>();
|
||||
final List<NumericDocValuesUpdate> numericDVUpdates = new ArrayList<>();
|
||||
final List<BinaryDocValuesUpdate> binaryDVUpdates = new ArrayList<>();
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
// note: we could add/collect more debugging information
|
||||
return "CoalescedUpdates(termSets=" + iterables.size() + ",queries=" + queries.size() + ",numericUpdates=" + numericDVUpdates.size() + ")";
|
||||
return "CoalescedUpdates(termSets=" + iterables.size() + ",queries="
|
||||
+ queries.size() + ",numericDVUpdates=" + numericDVUpdates.size()
|
||||
+ ",binaryDVUpdates=" + binaryDVUpdates.size() + ")";
|
||||
}
|
||||
|
||||
void update(FrozenBufferedUpdates in) {
|
||||
|
@ -46,11 +52,17 @@ class CoalescedUpdates {
|
|||
queries.put(query, BufferedUpdates.MAX_INT);
|
||||
}
|
||||
|
||||
for (NumericUpdate nu : in.updates) {
|
||||
NumericUpdate clone = new NumericUpdate(nu.term, nu.field, nu.value);
|
||||
for (NumericDocValuesUpdate nu : in.numericDVUpdates) {
|
||||
NumericDocValuesUpdate clone = new NumericDocValuesUpdate(nu.term, nu.field, (Long) nu.value);
|
||||
clone.docIDUpto = Integer.MAX_VALUE;
|
||||
numericDVUpdates.add(clone);
|
||||
}
|
||||
|
||||
for (BinaryDocValuesUpdate bu : in.binaryDVUpdates) {
|
||||
BinaryDocValuesUpdate clone = new BinaryDocValuesUpdate(bu.term, bu.field, (BytesRef) bu.value);
|
||||
clone.docIDUpto = Integer.MAX_VALUE;
|
||||
binaryDVUpdates.add(clone);
|
||||
}
|
||||
}
|
||||
|
||||
public Iterable<Term> termsIterable() {
|
||||
|
|
|
@ -0,0 +1,154 @@
|
|||
package org.apache.lucene.index;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.lucene.index.NumericDocValuesFieldUpdates;
|
||||
import org.apache.lucene.search.DocIdSetIterator;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Holds updates of a single DocValues field, for a set of documents.
|
||||
*
|
||||
* @lucene.experimental
|
||||
*/
|
||||
abstract class DocValuesFieldUpdates {
|
||||
|
||||
static enum Type { NUMERIC, BINARY }
|
||||
|
||||
/**
|
||||
* An iterator over documents and their updated values. Only documents with
|
||||
* updates are returned by this iterator, and the documents are returned in
|
||||
* increasing order.
|
||||
*/
|
||||
static abstract class Iterator {
|
||||
|
||||
/**
|
||||
* Returns the next document which has an update, or
|
||||
* {@link DocIdSetIterator#NO_MORE_DOCS} if there are no more documents to
|
||||
* return.
|
||||
*/
|
||||
abstract int nextDoc();
|
||||
|
||||
/** Returns the current document this iterator is on. */
|
||||
abstract int doc();
|
||||
|
||||
/**
|
||||
* Returns the value of the document returned from {@link #nextDoc()}. A
|
||||
* {@code null} value means that it was unset for this document.
|
||||
*/
|
||||
abstract Object value();
|
||||
|
||||
/**
|
||||
* Reset the iterator's state. Should be called before {@link #nextDoc()}
|
||||
* and {@link #value()}.
|
||||
*/
|
||||
abstract void reset();
|
||||
|
||||
}
|
||||
|
||||
static class Container {
|
||||
|
||||
final Map<String,NumericDocValuesFieldUpdates> numericDVUpdates = new HashMap<>();
|
||||
final Map<String,BinaryDocValuesFieldUpdates> binaryDVUpdates = new HashMap<>();
|
||||
|
||||
boolean any() {
|
||||
for (NumericDocValuesFieldUpdates updates : numericDVUpdates.values()) {
|
||||
if (updates.any()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
for (BinaryDocValuesFieldUpdates updates : binaryDVUpdates.values()) {
|
||||
if (updates.any()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
int size() {
|
||||
return numericDVUpdates.size() + binaryDVUpdates.size();
|
||||
}
|
||||
|
||||
DocValuesFieldUpdates getUpdates(String field, Type type) {
|
||||
switch (type) {
|
||||
case NUMERIC:
|
||||
return numericDVUpdates.get(field);
|
||||
case BINARY:
|
||||
return binaryDVUpdates.get(field);
|
||||
default:
|
||||
throw new IllegalArgumentException("unsupported type: " + type);
|
||||
}
|
||||
}
|
||||
|
||||
DocValuesFieldUpdates newUpdates(String field, Type type, int maxDoc) {
|
||||
switch (type) {
|
||||
case NUMERIC:
|
||||
assert numericDVUpdates.get(field) == null;
|
||||
NumericDocValuesFieldUpdates numericUpdates = new NumericDocValuesFieldUpdates(field, maxDoc);
|
||||
numericDVUpdates.put(field, numericUpdates);
|
||||
return numericUpdates;
|
||||
case BINARY:
|
||||
assert binaryDVUpdates.get(field) == null;
|
||||
BinaryDocValuesFieldUpdates binaryUpdates = new BinaryDocValuesFieldUpdates(field, maxDoc);
|
||||
binaryDVUpdates.put(field, binaryUpdates);
|
||||
return binaryUpdates;
|
||||
default:
|
||||
throw new IllegalArgumentException("unsupported type: " + type);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "numericDVUpdates=" + numericDVUpdates + " binaryDVUpdates=" + binaryDVUpdates;
|
||||
}
|
||||
}
|
||||
|
||||
final String field;
|
||||
final Type type;
|
||||
|
||||
protected DocValuesFieldUpdates(String field, Type type) {
|
||||
this.field = field;
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add an update to a document. For unsetting a value you should pass
|
||||
* {@code null}.
|
||||
*/
|
||||
public abstract void add(int doc, Object value);
|
||||
|
||||
/**
|
||||
* Returns an {@link Iterator} over the updated documents and their
|
||||
* values.
|
||||
*/
|
||||
public abstract Iterator iterator();
|
||||
|
||||
/**
|
||||
* Merge with another {@link DocValuesFieldUpdates}. This is called for a
|
||||
* segment which received updates while it was being merged. The given updates
|
||||
* should override whatever updates are in that instance.
|
||||
*/
|
||||
public abstract void merge(DocValuesFieldUpdates other);
|
||||
|
||||
/** Returns true if this instance contains any updates.
|
||||
* @return TODO*/
|
||||
public abstract boolean any();
|
||||
|
||||
}
|
|
@ -0,0 +1,113 @@
|
|||
package org.apache.lucene.index;
|
||||
|
||||
import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
|
||||
import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_CHAR;
|
||||
import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_INT;
|
||||
import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_OBJECT_HEADER;
|
||||
import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
|
||||
|
||||
import org.apache.lucene.document.NumericDocValuesField;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.RamUsageEstimator;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/** An in-place update to a DocValues field. */
|
||||
abstract class DocValuesUpdate {
|
||||
|
||||
/* Rough logic: OBJ_HEADER + 3*PTR + INT
|
||||
* Term: OBJ_HEADER + 2*PTR
|
||||
* Term.field: 2*OBJ_HEADER + 4*INT + PTR + string.length*CHAR
|
||||
* Term.bytes: 2*OBJ_HEADER + 2*INT + PTR + bytes.length
|
||||
* String: 2*OBJ_HEADER + 4*INT + PTR + string.length*CHAR
|
||||
* T: OBJ_HEADER
|
||||
*/
|
||||
private static final int RAW_SIZE_IN_BYTES = 8*NUM_BYTES_OBJECT_HEADER + 8*NUM_BYTES_OBJECT_REF + 8*NUM_BYTES_INT;
|
||||
|
||||
final DocValuesFieldUpdates.Type type;
|
||||
final Term term;
|
||||
final String field;
|
||||
final Object value;
|
||||
int docIDUpto = -1; // unassigned until applied, and confusing that it's here, when it's just used in BufferedDeletes...
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param term the {@link Term} which determines the documents that will be updated
|
||||
* @param field the {@link NumericDocValuesField} to update
|
||||
* @param value the updated value
|
||||
*/
|
||||
protected DocValuesUpdate(DocValuesFieldUpdates.Type type, Term term, String field, Object value) {
|
||||
this.type = type;
|
||||
this.term = term;
|
||||
this.field = field;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
abstract long valueSizeInBytes();
|
||||
|
||||
final int sizeInBytes() {
|
||||
int sizeInBytes = RAW_SIZE_IN_BYTES;
|
||||
sizeInBytes += term.field.length() * NUM_BYTES_CHAR;
|
||||
sizeInBytes += term.bytes.bytes.length;
|
||||
sizeInBytes += field.length() * NUM_BYTES_CHAR;
|
||||
sizeInBytes += valueSizeInBytes();
|
||||
return sizeInBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "term=" + term + ",field=" + field + ",value=" + value;
|
||||
}
|
||||
|
||||
/** An in-place update to a binary DocValues field */
|
||||
static final class BinaryDocValuesUpdate extends DocValuesUpdate {
|
||||
|
||||
/* Size of BytesRef: 2*INT + ARRAY_HEADER + PTR */
|
||||
private static final long RAW_VALUE_SIZE_IN_BYTES = NUM_BYTES_ARRAY_HEADER + 2*NUM_BYTES_INT + NUM_BYTES_OBJECT_REF;
|
||||
|
||||
static final BytesRef MISSING = new BytesRef();
|
||||
|
||||
BinaryDocValuesUpdate(Term term, String field, BytesRef value) {
|
||||
super(DocValuesFieldUpdates.Type.BINARY, term, field, value == null ? MISSING : value);
|
||||
}
|
||||
|
||||
@Override
|
||||
long valueSizeInBytes() {
|
||||
return RAW_VALUE_SIZE_IN_BYTES + ((BytesRef) value).bytes.length;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/** An in-place update to a numeric DocValues field */
|
||||
static final class NumericDocValuesUpdate extends DocValuesUpdate {
|
||||
|
||||
static final Long MISSING = new Long(0);
|
||||
|
||||
NumericDocValuesUpdate(Term term, String field, Long value) {
|
||||
super(DocValuesFieldUpdates.Type.NUMERIC, term, field, value == null ? MISSING : value);
|
||||
}
|
||||
|
||||
@Override
|
||||
long valueSizeInBytes() {
|
||||
return RamUsageEstimator.NUM_BYTES_LONG;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -29,10 +29,13 @@ import org.apache.lucene.analysis.Analyzer;
|
|||
import org.apache.lucene.index.DocumentsWriterFlushQueue.SegmentFlushTicket;
|
||||
import org.apache.lucene.index.DocumentsWriterPerThread.FlushedSegment;
|
||||
import org.apache.lucene.index.DocumentsWriterPerThreadPool.ThreadState;
|
||||
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
|
||||
import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
|
||||
import org.apache.lucene.index.IndexWriter.Event;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.InfoStream;
|
||||
|
||||
/**
|
||||
|
@ -160,7 +163,14 @@ final class DocumentsWriter {
|
|||
|
||||
synchronized boolean updateNumericDocValue(Term term, String field, Long value) throws IOException {
|
||||
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
|
||||
deleteQueue.addNumericUpdate(new NumericUpdate(term, field, value));
|
||||
deleteQueue.addNumericUpdate(new NumericDocValuesUpdate(term, field, value));
|
||||
flushControl.doOnDelete();
|
||||
return applyAllDeletes(deleteQueue);
|
||||
}
|
||||
|
||||
synchronized boolean updateBinaryDocValue(Term term, String field, BytesRef value) throws IOException {
|
||||
final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
|
||||
deleteQueue.addBinaryUpdate(new BinaryDocValuesUpdate(term, field, value));
|
||||
flushControl.doOnDelete();
|
||||
return applyAllDeletes(deleteQueue);
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ import java.util.Arrays;
|
|||
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
|
||||
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
|
||||
import org.apache.lucene.search.Query;
|
||||
|
||||
/**
|
||||
|
@ -107,10 +109,15 @@ final class DocumentsWriterDeleteQueue {
|
|||
tryApplyGlobalSlice();
|
||||
}
|
||||
|
||||
void addNumericUpdate(NumericUpdate update) {
|
||||
void addNumericUpdate(NumericDocValuesUpdate update) {
|
||||
add(new NumericUpdateNode(update));
|
||||
tryApplyGlobalSlice();
|
||||
}
|
||||
|
||||
void addBinaryUpdate(BinaryDocValuesUpdate update) {
|
||||
add(new BinaryUpdateNode(update));
|
||||
tryApplyGlobalSlice();
|
||||
}
|
||||
|
||||
/**
|
||||
* invariant for document update
|
||||
|
@ -385,9 +392,9 @@ final class DocumentsWriterDeleteQueue {
|
|||
}
|
||||
}
|
||||
|
||||
private static final class NumericUpdateNode extends Node<NumericUpdate> {
|
||||
private static final class NumericUpdateNode extends Node<NumericDocValuesUpdate> {
|
||||
|
||||
NumericUpdateNode(NumericUpdate update) {
|
||||
NumericUpdateNode(NumericDocValuesUpdate update) {
|
||||
super(update);
|
||||
}
|
||||
|
||||
|
@ -401,6 +408,23 @@ final class DocumentsWriterDeleteQueue {
|
|||
return "update=" + item;
|
||||
}
|
||||
}
|
||||
|
||||
private static final class BinaryUpdateNode extends Node<BinaryDocValuesUpdate> {
|
||||
|
||||
BinaryUpdateNode(BinaryDocValuesUpdate update) {
|
||||
super(update);
|
||||
}
|
||||
|
||||
@Override
|
||||
void apply(BufferedUpdates bufferedUpdates, int docIDUpto) {
|
||||
bufferedUpdates.addBinaryUpdate(item, docIDUpto);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "update=" + item;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean forceApplyGlobalSlice() {
|
||||
globalBufferLock.lock();
|
||||
|
|
|
@ -480,7 +480,7 @@ class DocumentsWriterPerThread {
|
|||
}
|
||||
|
||||
final BufferedUpdates segmentDeletes;
|
||||
if (pendingUpdates.queries.isEmpty() && pendingUpdates.numericUpdates.isEmpty()) {
|
||||
if (pendingUpdates.queries.isEmpty() && pendingUpdates.numericUpdates.isEmpty() && pendingUpdates.binaryUpdates.isEmpty()) {
|
||||
pendingUpdates.clear();
|
||||
segmentDeletes = null;
|
||||
} else {
|
||||
|
|
|
@ -24,6 +24,8 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
|
||||
import org.apache.lucene.index.BufferedUpdatesStream.QueryAndLimit;
|
||||
import org.apache.lucene.index.DocValuesUpdate.BinaryDocValuesUpdate;
|
||||
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.util.ArrayUtil;
|
||||
import org.apache.lucene.util.RamUsageEstimator;
|
||||
|
@ -48,7 +50,10 @@ class FrozenBufferedUpdates {
|
|||
final int[] queryLimits;
|
||||
|
||||
// numeric DV update term and their updates
|
||||
final NumericUpdate[] updates;
|
||||
final NumericDocValuesUpdate[] numericDVUpdates;
|
||||
|
||||
// binary DV update term and their updates
|
||||
final BinaryDocValuesUpdate[] binaryDVUpdates;
|
||||
|
||||
final int bytesUsed;
|
||||
final int numTermDeletes;
|
||||
|
@ -83,17 +88,34 @@ class FrozenBufferedUpdates {
|
|||
// so that it maps to all fields it affects, sorted by their docUpto, and traverse
|
||||
// that Term only once, applying the update to all fields that still need to be
|
||||
// updated.
|
||||
List<NumericUpdate> allUpdates = new ArrayList<>();
|
||||
List<NumericDocValuesUpdate> allNumericUpdates = new ArrayList<>();
|
||||
int numericUpdatesSize = 0;
|
||||
for (LinkedHashMap<Term,NumericUpdate> fieldUpdates : deletes.numericUpdates.values()) {
|
||||
for (NumericUpdate update : fieldUpdates.values()) {
|
||||
allUpdates.add(update);
|
||||
for (LinkedHashMap<Term,NumericDocValuesUpdate> numericUpdates : deletes.numericUpdates.values()) {
|
||||
for (NumericDocValuesUpdate update : numericUpdates.values()) {
|
||||
allNumericUpdates.add(update);
|
||||
numericUpdatesSize += update.sizeInBytes();
|
||||
}
|
||||
}
|
||||
updates = allUpdates.toArray(new NumericUpdate[allUpdates.size()]);
|
||||
numericDVUpdates = allNumericUpdates.toArray(new NumericDocValuesUpdate[allNumericUpdates.size()]);
|
||||
|
||||
// TODO if a Term affects multiple fields, we could keep the updates key'd by Term
|
||||
// so that it maps to all fields it affects, sorted by their docUpto, and traverse
|
||||
// that Term only once, applying the update to all fields that still need to be
|
||||
// updated.
|
||||
List<BinaryDocValuesUpdate> allBinaryUpdates = new ArrayList<>();
|
||||
int binaryUpdatesSize = 0;
|
||||
for (LinkedHashMap<Term,BinaryDocValuesUpdate> binaryUpdates : deletes.binaryUpdates.values()) {
|
||||
for (BinaryDocValuesUpdate update : binaryUpdates.values()) {
|
||||
allBinaryUpdates.add(update);
|
||||
binaryUpdatesSize += update.sizeInBytes();
|
||||
}
|
||||
}
|
||||
binaryDVUpdates = allBinaryUpdates.toArray(new BinaryDocValuesUpdate[allBinaryUpdates.size()]);
|
||||
|
||||
bytesUsed = (int) terms.getSizeInBytes() + queries.length * BYTES_PER_DEL_QUERY
|
||||
+ numericUpdatesSize + numericDVUpdates.length * RamUsageEstimator.NUM_BYTES_OBJECT_REF
|
||||
+ binaryUpdatesSize + binaryDVUpdates.length * RamUsageEstimator.NUM_BYTES_OBJECT_REF;
|
||||
|
||||
bytesUsed = (int) terms.getSizeInBytes() + queries.length * BYTES_PER_DEL_QUERY + numericUpdatesSize + updates.length * RamUsageEstimator.NUM_BYTES_OBJECT_REF;
|
||||
numTermDeletes = deletes.numTermDeletes.get();
|
||||
}
|
||||
|
||||
|
@ -161,6 +183,6 @@ class FrozenBufferedUpdates {
|
|||
}
|
||||
|
||||
boolean any() {
|
||||
return termCount > 0 || queries.length > 0 || updates.length > 0;
|
||||
return termCount > 0 || queries.length > 0 || numericDVUpdates.length > 0 || binaryDVUpdates.length > 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,28 +17,6 @@ package org.apache.lucene.index;
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.codecs.Codec;
|
||||
import org.apache.lucene.index.FieldInfo.DocValuesType;
|
||||
import org.apache.lucene.index.FieldInfos.FieldNumbers;
|
||||
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
|
||||
import org.apache.lucene.index.MergeState.CheckAbort;
|
||||
import org.apache.lucene.index.NumericFieldUpdates.UpdatesIterator;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.store.CompoundFileDirectory;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.store.Lock;
|
||||
import org.apache.lucene.store.LockObtainFailedException;
|
||||
import org.apache.lucene.store.MergeInfo;
|
||||
import org.apache.lucene.store.TrackingDirectoryWrapper;
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.lucene.util.InfoStream;
|
||||
import org.apache.lucene.util.ThreadInterruptedException;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -57,6 +35,28 @@ import java.util.Queue;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.codecs.Codec;
|
||||
import org.apache.lucene.index.FieldInfo.DocValuesType;
|
||||
import org.apache.lucene.index.FieldInfos.FieldNumbers;
|
||||
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
|
||||
import org.apache.lucene.index.MergeState.CheckAbort;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.store.CompoundFileDirectory;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.store.Lock;
|
||||
import org.apache.lucene.store.LockObtainFailedException;
|
||||
import org.apache.lucene.store.MergeInfo;
|
||||
import org.apache.lucene.store.TrackingDirectoryWrapper;
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.lucene.util.InfoStream;
|
||||
import org.apache.lucene.util.ThreadInterruptedException;
|
||||
|
||||
/**
|
||||
An <code>IndexWriter</code> creates and maintains an index.
|
||||
|
||||
|
@ -1542,10 +1542,11 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{
|
|||
}
|
||||
|
||||
/**
|
||||
* Updates a document's NumericDocValue for <code>field</code> to the given
|
||||
* <code>value</code>. This method can be used to 'unset' a document's value
|
||||
* by passing {@code null} as the new value. Also, you can only update fields
|
||||
* that already exist in the index, not add new fields through this method.
|
||||
* Updates a document's {@link NumericDocValues} for <code>field</code> to the
|
||||
* given <code>value</code>. This method can be used to 'unset' a document's
|
||||
* value by passing {@code null} as the new value. Also, you can only update
|
||||
* fields that already exist in the index, not add new fields through this
|
||||
* method.
|
||||
*
|
||||
* <p>
|
||||
* <b>NOTE</b>: if this method hits an OutOfMemoryError you should immediately
|
||||
|
@ -1555,7 +1556,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{
|
|||
* @param term
|
||||
* the term to identify the document(s) to be updated
|
||||
* @param field
|
||||
* field name of the NumericDocValues field
|
||||
* field name of the {@link NumericDocValues} field
|
||||
* @param value
|
||||
* new value for the field
|
||||
* @throws CorruptIndexException
|
||||
|
@ -1577,6 +1578,47 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates a document's {@link BinaryDocValues} for <code>field</code> to the
|
||||
* given <code>value</code>. This method can be used to 'unset' a document's
|
||||
* value by passing {@code null} as the new value. Also, you can only update
|
||||
* fields that already exist in the index, not add new fields through this
|
||||
* method.
|
||||
*
|
||||
* <p>
|
||||
* <b>NOTE:</b> this method currently replaces the existing value of all
|
||||
* affected documents with the new value.
|
||||
*
|
||||
* <p>
|
||||
* <b>NOTE:</b> if this method hits an OutOfMemoryError you should immediately
|
||||
* close the writer. See <a href="#OOME">above</a> for details.
|
||||
* </p>
|
||||
*
|
||||
* @param term
|
||||
* the term to identify the document(s) to be updated
|
||||
* @param field
|
||||
* field name of the {@link BinaryDocValues} field
|
||||
* @param value
|
||||
* new value for the field
|
||||
* @throws CorruptIndexException
|
||||
* if the index is corrupt
|
||||
* @throws IOException
|
||||
* if there is a low-level IO error
|
||||
*/
|
||||
public void updateBinaryDocValue(Term term, String field, BytesRef value) throws IOException {
|
||||
ensureOpen();
|
||||
if (!globalFieldNumberMap.contains(field, DocValuesType.BINARY)) {
|
||||
throw new IllegalArgumentException("can only update existing binary-docvalues fields!");
|
||||
}
|
||||
try {
|
||||
if (docWriter.updateBinaryDocValue(term, field, value)) {
|
||||
processEvents(true, false);
|
||||
}
|
||||
} catch (OutOfMemoryError oom) {
|
||||
handleOOM(oom, "updateBinaryDocValue");
|
||||
}
|
||||
}
|
||||
|
||||
// for test purpose
|
||||
final synchronized int getSegmentCount(){
|
||||
return segmentInfos.size();
|
||||
|
@ -3162,13 +3204,13 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{
|
|||
return docMap;
|
||||
}
|
||||
|
||||
private void skipDeletedDoc(UpdatesIterator[] updatesIters, int deletedDoc) {
|
||||
for (UpdatesIterator iter : updatesIters) {
|
||||
private void skipDeletedDoc(DocValuesFieldUpdates.Iterator[] updatesIters, int deletedDoc) {
|
||||
for (DocValuesFieldUpdates.Iterator iter : updatesIters) {
|
||||
if (iter.doc() == deletedDoc) {
|
||||
iter.nextDoc();
|
||||
}
|
||||
// when entering the method, all iterators must already be beyond the
|
||||
// deleted document, or right on it, in which case we advance them above
|
||||
// deleted document, or right on it, in which case we advance them over
|
||||
// and they must be beyond it now.
|
||||
assert iter.doc() > deletedDoc : "updateDoc=" + iter.doc() + " deletedDoc=" + deletedDoc;
|
||||
}
|
||||
|
@ -3203,7 +3245,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{
|
|||
ReadersAndUpdates mergedDeletesAndUpdates = null;
|
||||
boolean initWritableLiveDocs = false;
|
||||
MergePolicy.DocMap docMap = null;
|
||||
final Map<String,NumericFieldUpdates> mergedFieldUpdates = new HashMap<>();
|
||||
final DocValuesFieldUpdates.Container mergedDVUpdates = new DocValuesFieldUpdates.Container();
|
||||
|
||||
for (int i = 0; i < sourceSegments.size(); i++) {
|
||||
SegmentCommitInfo info = sourceSegments.get(i);
|
||||
|
@ -3214,19 +3256,28 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{
|
|||
// We hold a ref so it should still be in the pool:
|
||||
assert rld != null: "seg=" + info.info.name;
|
||||
final Bits currentLiveDocs = rld.getLiveDocs();
|
||||
final Map<String,NumericFieldUpdates> mergingFieldUpdates = rld.getMergingFieldUpdates();
|
||||
final Map<String,DocValuesFieldUpdates> mergingFieldUpdates = rld.getMergingFieldUpdates();
|
||||
final String[] mergingFields;
|
||||
final UpdatesIterator[] updatesIters;
|
||||
final DocValuesFieldUpdates[] dvFieldUpdates;
|
||||
final DocValuesFieldUpdates.Iterator[] updatesIters;
|
||||
if (mergingFieldUpdates.isEmpty()) {
|
||||
mergingFields = null;
|
||||
updatesIters = null;
|
||||
dvFieldUpdates = null;
|
||||
} else {
|
||||
mergingFields = new String[mergingFieldUpdates.size()];
|
||||
updatesIters = new UpdatesIterator[mergingFieldUpdates.size()];
|
||||
dvFieldUpdates = new DocValuesFieldUpdates[mergingFieldUpdates.size()];
|
||||
updatesIters = new DocValuesFieldUpdates.Iterator[mergingFieldUpdates.size()];
|
||||
int idx = 0;
|
||||
for (Entry<String,NumericFieldUpdates> e : mergingFieldUpdates.entrySet()) {
|
||||
mergingFields[idx] = e.getKey();
|
||||
updatesIters[idx] = e.getValue().getUpdates();
|
||||
for (Entry<String,DocValuesFieldUpdates> e : mergingFieldUpdates.entrySet()) {
|
||||
String field = e.getKey();
|
||||
DocValuesFieldUpdates updates = e.getValue();
|
||||
mergingFields[idx] = field;
|
||||
dvFieldUpdates[idx] = mergedDVUpdates.getUpdates(field, updates.type);
|
||||
if (dvFieldUpdates[idx] == null) {
|
||||
dvFieldUpdates[idx] = mergedDVUpdates.newUpdates(field, updates.type, mergeState.segmentInfo.getDocCount());
|
||||
}
|
||||
updatesIters[idx] = updates.iterator();
|
||||
updatesIters[idx].nextDoc(); // advance to first update doc
|
||||
++idx;
|
||||
}
|
||||
|
@ -3279,7 +3330,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{
|
|||
// document isn't deleted, check if any of the fields have an update to it
|
||||
int newDoc = -1;
|
||||
for (int idx = 0; idx < mergingFields.length; idx++) {
|
||||
UpdatesIterator updatesIter = updatesIters[idx];
|
||||
DocValuesFieldUpdates.Iterator updatesIter = updatesIters[idx];
|
||||
if (updatesIter.doc() == j) { // document has an update
|
||||
if (mergedDeletesAndUpdates == null) {
|
||||
mergedDeletesAndUpdates = readerPool.get(merge.info, true);
|
||||
|
@ -3288,14 +3339,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{
|
|||
if (newDoc == -1) { // map once per all field updates, but only if there are any updates
|
||||
newDoc = docMap.map(docUpto);
|
||||
}
|
||||
String field = mergingFields[idx];
|
||||
NumericFieldUpdates fieldUpdates = mergedFieldUpdates.get(field);
|
||||
if (fieldUpdates == null) {
|
||||
// an approximantion of maxDoc, used to compute best bitsPerValue
|
||||
fieldUpdates = new NumericFieldUpdates.PackedNumericFieldUpdates(mergeState.segmentInfo.getDocCount());
|
||||
mergedFieldUpdates.put(field, fieldUpdates);
|
||||
}
|
||||
fieldUpdates.add(newDoc, updatesIter.value() == null ? NumericUpdate.MISSING : updatesIter.value());
|
||||
DocValuesFieldUpdates dvUpdates = dvFieldUpdates[idx];
|
||||
dvUpdates.add(newDoc, updatesIter.value());
|
||||
updatesIter.nextDoc(); // advance to next document
|
||||
} else {
|
||||
assert updatesIter.doc() > j : "updateDoc=" + updatesIter.doc() + " curDoc=" + j;
|
||||
|
@ -3312,7 +3357,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{
|
|||
// document isn't deleted, check if any of the fields have an update to it
|
||||
int newDoc = -1;
|
||||
for (int idx = 0; idx < mergingFields.length; idx++) {
|
||||
UpdatesIterator updatesIter = updatesIters[idx];
|
||||
DocValuesFieldUpdates.Iterator updatesIter = updatesIters[idx];
|
||||
if (updatesIter.doc() == j) { // document has an update
|
||||
if (mergedDeletesAndUpdates == null) {
|
||||
mergedDeletesAndUpdates = readerPool.get(merge.info, true);
|
||||
|
@ -3321,14 +3366,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{
|
|||
if (newDoc == -1) { // map once per all field updates, but only if there are any updates
|
||||
newDoc = docMap.map(docUpto);
|
||||
}
|
||||
String field = mergingFields[idx];
|
||||
NumericFieldUpdates fieldUpdates = mergedFieldUpdates.get(field);
|
||||
if (fieldUpdates == null) {
|
||||
// an approximantion of maxDoc, used to compute best bitsPerValue
|
||||
fieldUpdates = new NumericFieldUpdates.PackedNumericFieldUpdates(mergeState.segmentInfo.getDocCount());
|
||||
mergedFieldUpdates.put(field, fieldUpdates);
|
||||
}
|
||||
fieldUpdates.add(newDoc, updatesIter.value() == null ? NumericUpdate.MISSING : updatesIter.value());
|
||||
DocValuesFieldUpdates dvUpdates = dvFieldUpdates[idx];
|
||||
dvUpdates.add(newDoc, updatesIter.value());
|
||||
updatesIter.nextDoc(); // advance to next document
|
||||
} else {
|
||||
assert updatesIter.doc() > j : "updateDoc=" + updatesIter.doc() + " curDoc=" + j;
|
||||
|
@ -3367,7 +3406,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{
|
|||
// document isn't deleted, check if any of the fields have an update to it
|
||||
int newDoc = -1;
|
||||
for (int idx = 0; idx < mergingFields.length; idx++) {
|
||||
UpdatesIterator updatesIter = updatesIters[idx];
|
||||
DocValuesFieldUpdates.Iterator updatesIter = updatesIters[idx];
|
||||
if (updatesIter.doc() == j) { // document has an update
|
||||
if (mergedDeletesAndUpdates == null) {
|
||||
mergedDeletesAndUpdates = readerPool.get(merge.info, true);
|
||||
|
@ -3376,14 +3415,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{
|
|||
if (newDoc == -1) { // map once per all field updates, but only if there are any updates
|
||||
newDoc = docMap.map(docUpto);
|
||||
}
|
||||
String field = mergingFields[idx];
|
||||
NumericFieldUpdates fieldUpdates = mergedFieldUpdates.get(field);
|
||||
if (fieldUpdates == null) {
|
||||
// an approximantion of maxDoc, used to compute best bitsPerValue
|
||||
fieldUpdates = new NumericFieldUpdates.PackedNumericFieldUpdates(mergeState.segmentInfo.getDocCount());
|
||||
mergedFieldUpdates.put(field, fieldUpdates);
|
||||
}
|
||||
fieldUpdates.add(newDoc, updatesIter.value() == null ? NumericUpdate.MISSING : updatesIter.value());
|
||||
DocValuesFieldUpdates dvUpdates = dvFieldUpdates[idx];
|
||||
dvUpdates.add(newDoc, updatesIter.value());
|
||||
updatesIter.nextDoc(); // advance to next document
|
||||
} else {
|
||||
assert updatesIter.doc() > j : "field=" + mergingFields[idx] + " updateDoc=" + updatesIter.doc() + " curDoc=" + j;
|
||||
|
@ -3397,7 +3430,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{
|
|||
for (int j = 0; j < docCount; j++) {
|
||||
int newDoc = -1;
|
||||
for (int idx = 0; idx < mergingFields.length; idx++) {
|
||||
UpdatesIterator updatesIter = updatesIters[idx];
|
||||
DocValuesFieldUpdates.Iterator updatesIter = updatesIters[idx];
|
||||
if (updatesIter.doc() == j) { // document has an update
|
||||
if (mergedDeletesAndUpdates == null) {
|
||||
mergedDeletesAndUpdates = readerPool.get(merge.info, true);
|
||||
|
@ -3406,14 +3439,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{
|
|||
if (newDoc == -1) { // map once per all field updates, but only if there are any updates
|
||||
newDoc = docMap.map(docUpto);
|
||||
}
|
||||
String field = mergingFields[idx];
|
||||
NumericFieldUpdates fieldUpdates = mergedFieldUpdates.get(field);
|
||||
if (fieldUpdates == null) {
|
||||
// an approximantion of maxDoc, used to compute best bitsPerValue
|
||||
fieldUpdates = new NumericFieldUpdates.PackedNumericFieldUpdates(mergeState.segmentInfo.getDocCount());
|
||||
mergedFieldUpdates.put(field, fieldUpdates);
|
||||
}
|
||||
fieldUpdates.add(newDoc, updatesIter.value() == null ? NumericUpdate.MISSING : updatesIter.value());
|
||||
DocValuesFieldUpdates dvUpdates = dvFieldUpdates[idx];
|
||||
dvUpdates.add(newDoc, updatesIter.value());
|
||||
updatesIter.nextDoc(); // advance to next document
|
||||
} else {
|
||||
assert updatesIter.doc() > j : "updateDoc=" + updatesIter.doc() + " curDoc=" + j;
|
||||
|
@ -3430,7 +3457,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{
|
|||
|
||||
assert docUpto == merge.info.info.getDocCount();
|
||||
|
||||
if (!mergedFieldUpdates.isEmpty()) {
|
||||
if (mergedDVUpdates.any()) {
|
||||
// System.out.println("[" + Thread.currentThread().getName() + "] IW.commitMergedDeletes: mergedDeletes.info=" + mergedDeletes.info + ", mergedFieldUpdates=" + mergedFieldUpdates);
|
||||
boolean success = false;
|
||||
try {
|
||||
|
@ -3440,7 +3467,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{
|
|||
// NOTE: currently this is the only place which throws a true
|
||||
// IOException. If this ever changes, we need to extend that try/finally
|
||||
// block to the rest of the method too.
|
||||
mergedDeletesAndUpdates.writeFieldUpdates(directory, mergedFieldUpdates);
|
||||
mergedDeletesAndUpdates.writeFieldUpdates(directory, mergedDVUpdates);
|
||||
success = true;
|
||||
} finally {
|
||||
if (!success) {
|
||||
|
@ -3455,8 +3482,8 @@ public class IndexWriter implements Closeable, TwoPhaseCommit{
|
|||
infoStream.message("IW", "no new deletes or field updates since merge started");
|
||||
} else {
|
||||
String msg = mergedDeletesAndUpdates.getPendingDeleteCount() + " new deletes";
|
||||
if (!mergedFieldUpdates.isEmpty()) {
|
||||
msg += " and " + mergedFieldUpdates.size() + " new field updates";
|
||||
if (mergedDVUpdates.any()) {
|
||||
msg += " and " + mergedDVUpdates.size() + " new field updates";
|
||||
}
|
||||
msg += " since merge started";
|
||||
infoStream.message("IW", msg);
|
||||
|
|
|
@ -0,0 +1,201 @@
|
|||
package org.apache.lucene.index;
|
||||
|
||||
import org.apache.lucene.document.NumericDocValuesField;
|
||||
import org.apache.lucene.index.DocValuesUpdate.NumericDocValuesUpdate;
|
||||
import org.apache.lucene.search.DocIdSetIterator;
|
||||
import org.apache.lucene.util.FixedBitSet;
|
||||
import org.apache.lucene.util.InPlaceMergeSorter;
|
||||
import org.apache.lucene.util.packed.PackedInts;
|
||||
import org.apache.lucene.util.packed.PagedGrowableWriter;
|
||||
import org.apache.lucene.util.packed.PagedMutable;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* A {@link DocValuesFieldUpdates} which holds updates of documents, of a single
|
||||
* {@link NumericDocValuesField}.
|
||||
*
|
||||
* @lucene.experimental
|
||||
*/
|
||||
class NumericDocValuesFieldUpdates extends DocValuesFieldUpdates {
|
||||
|
||||
final static class Iterator extends DocValuesFieldUpdates.Iterator {
|
||||
private final int size;
|
||||
private final PagedGrowableWriter values;
|
||||
private final FixedBitSet docsWithField;
|
||||
private final PagedMutable docs;
|
||||
private long idx = 0; // long so we don't overflow if size == Integer.MAX_VALUE
|
||||
private int doc = -1;
|
||||
private Long value = null;
|
||||
|
||||
Iterator(int size, PagedGrowableWriter values, FixedBitSet docsWithField, PagedMutable docs) {
|
||||
this.size = size;
|
||||
this.values = values;
|
||||
this.docsWithField = docsWithField;
|
||||
this.docs = docs;
|
||||
}
|
||||
|
||||
@Override
|
||||
Long value() {
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
int nextDoc() {
|
||||
if (idx >= size) {
|
||||
value = null;
|
||||
return doc = DocIdSetIterator.NO_MORE_DOCS;
|
||||
}
|
||||
doc = (int) docs.get(idx);
|
||||
++idx;
|
||||
while (idx < size && docs.get(idx) == doc) {
|
||||
++idx;
|
||||
}
|
||||
if (!docsWithField.get((int) (idx - 1))) {
|
||||
value = null;
|
||||
} else {
|
||||
// idx points to the "next" element
|
||||
value = Long.valueOf(values.get(idx - 1));
|
||||
}
|
||||
return doc;
|
||||
}
|
||||
|
||||
@Override
|
||||
int doc() {
|
||||
return doc;
|
||||
}
|
||||
|
||||
@Override
|
||||
void reset() {
|
||||
doc = -1;
|
||||
value = null;
|
||||
idx = 0;
|
||||
}
|
||||
}
|
||||
|
||||
private FixedBitSet docsWithField;
|
||||
private PagedMutable docs;
|
||||
private PagedGrowableWriter values;
|
||||
private int size;
|
||||
|
||||
public NumericDocValuesFieldUpdates(String field, int maxDoc) {
|
||||
super(field, Type.NUMERIC);
|
||||
docsWithField = new FixedBitSet(64);
|
||||
docs = new PagedMutable(1, 1024, PackedInts.bitsRequired(maxDoc - 1), PackedInts.COMPACT);
|
||||
values = new PagedGrowableWriter(1, 1024, 1, PackedInts.FAST);
|
||||
size = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void add(int doc, Object value) {
|
||||
// TODO: if the Sorter interface changes to take long indexes, we can remove that limitation
|
||||
if (size == Integer.MAX_VALUE) {
|
||||
throw new IllegalStateException("cannot support more than Integer.MAX_VALUE doc/value entries");
|
||||
}
|
||||
|
||||
Long val = (Long) value;
|
||||
if (val == null) {
|
||||
val = NumericDocValuesUpdate.MISSING;
|
||||
}
|
||||
|
||||
// grow the structures to have room for more elements
|
||||
if (docs.size() == size) {
|
||||
docs = docs.grow(size + 1);
|
||||
values = values.grow(size + 1);
|
||||
docsWithField = FixedBitSet.ensureCapacity(docsWithField, (int) docs.size());
|
||||
}
|
||||
|
||||
if (val != NumericDocValuesUpdate.MISSING) {
|
||||
// only mark the document as having a value in that field if the value wasn't set to null (MISSING)
|
||||
docsWithField.set(size);
|
||||
}
|
||||
|
||||
docs.set(size, doc);
|
||||
values.set(size, val.longValue());
|
||||
++size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator iterator() {
|
||||
final PagedMutable docs = this.docs;
|
||||
final PagedGrowableWriter values = this.values;
|
||||
final FixedBitSet docsWithField = this.docsWithField;
|
||||
new InPlaceMergeSorter() {
|
||||
@Override
|
||||
protected void swap(int i, int j) {
|
||||
long tmpDoc = docs.get(j);
|
||||
docs.set(j, docs.get(i));
|
||||
docs.set(i, tmpDoc);
|
||||
|
||||
long tmpVal = values.get(j);
|
||||
values.set(j, values.get(i));
|
||||
values.set(i, tmpVal);
|
||||
|
||||
boolean tmpBool = docsWithField.get(j);
|
||||
if (docsWithField.get(i)) {
|
||||
docsWithField.set(j);
|
||||
} else {
|
||||
docsWithField.clear(j);
|
||||
}
|
||||
if (tmpBool) {
|
||||
docsWithField.set(i);
|
||||
} else {
|
||||
docsWithField.clear(i);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int compare(int i, int j) {
|
||||
int x = (int) docs.get(i);
|
||||
int y = (int) docs.get(j);
|
||||
return (x < y) ? -1 : ((x == y) ? 0 : 1);
|
||||
}
|
||||
}.sort(0, size);
|
||||
|
||||
return new Iterator(size, values, docsWithField, docs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void merge(DocValuesFieldUpdates other) {
|
||||
assert other instanceof NumericDocValuesFieldUpdates;
|
||||
NumericDocValuesFieldUpdates otherUpdates = (NumericDocValuesFieldUpdates) other;
|
||||
if (size + otherUpdates.size > Integer.MAX_VALUE) {
|
||||
throw new IllegalStateException(
|
||||
"cannot support more than Integer.MAX_VALUE doc/value entries; size="
|
||||
+ size + " other.size=" + otherUpdates.size);
|
||||
}
|
||||
docs = docs.grow(size + otherUpdates.size);
|
||||
values = values.grow(size + otherUpdates.size);
|
||||
docsWithField = FixedBitSet.ensureCapacity(docsWithField, (int) docs.size());
|
||||
for (int i = 0; i < otherUpdates.size; i++) {
|
||||
int doc = (int) otherUpdates.docs.get(i);
|
||||
if (otherUpdates.docsWithField.get(i)) {
|
||||
docsWithField.set(size);
|
||||
}
|
||||
docs.set(size, doc);
|
||||
values.set(size, otherUpdates.values.get(i));
|
||||
++size;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean any() {
|
||||
return size > 0;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,249 +0,0 @@
|
|||
package org.apache.lucene.index;
|
||||
|
||||
import org.apache.lucene.document.NumericDocValuesField;
|
||||
import org.apache.lucene.search.DocIdSetIterator;
|
||||
import org.apache.lucene.util.FixedBitSet;
|
||||
import org.apache.lucene.util.InPlaceMergeSorter;
|
||||
import org.apache.lucene.util.packed.PackedInts;
|
||||
import org.apache.lucene.util.packed.PagedGrowableWriter;
|
||||
import org.apache.lucene.util.packed.PagedMutable;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Holds numeric values updates of documents, of a single
|
||||
* {@link NumericDocValuesField}.
|
||||
*
|
||||
* @lucene.experimental
|
||||
*/
|
||||
interface NumericFieldUpdates {
|
||||
|
||||
/**
|
||||
* An iterator over documents and their updated values. Only documents with
|
||||
* updates are returned by this iterator, and the documents are returned in
|
||||
* increasing order.
|
||||
*/
|
||||
static abstract class UpdatesIterator {
|
||||
|
||||
/**
|
||||
* Returns the next document which has an update, or
|
||||
* {@link DocIdSetIterator#NO_MORE_DOCS} if there are no more documents to
|
||||
* return.
|
||||
*/
|
||||
abstract int nextDoc();
|
||||
|
||||
/** Returns the current document this iterator is on. */
|
||||
abstract int doc();
|
||||
|
||||
/**
|
||||
* Returns the value of the document returned from {@link #nextDoc()}. A
|
||||
* {@code null} value means that it was unset for this document.
|
||||
*/
|
||||
abstract Long value();
|
||||
|
||||
/**
|
||||
* Reset the iterator's state. Should be called before {@link #nextDoc()}
|
||||
* and {@link #value()}.
|
||||
*/
|
||||
abstract void reset();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A {@link NumericFieldUpdates} which holds the updated documents and values
|
||||
* in packed structures. Only supports up to 2B entries (docs and values)
|
||||
* since we need to sort the docs/values and the Sorter interfaces currently
|
||||
* only take integer indexes.
|
||||
*/
|
||||
static final class PackedNumericFieldUpdates implements NumericFieldUpdates {
|
||||
|
||||
private FixedBitSet docsWithField;
|
||||
private PagedMutable docs;
|
||||
private PagedGrowableWriter values;
|
||||
private int size;
|
||||
|
||||
public PackedNumericFieldUpdates(int maxDoc) {
|
||||
docsWithField = new FixedBitSet(64);
|
||||
docs = new PagedMutable(1, 1024, PackedInts.bitsRequired(maxDoc - 1), PackedInts.COMPACT);
|
||||
values = new PagedGrowableWriter(1, 1024, 1, PackedInts.FAST);
|
||||
size = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void add(int doc, Long value) {
|
||||
assert value != null;
|
||||
// TODO: if the Sorter interface changes to take long indexes, we can remove that limitation
|
||||
if (size == Integer.MAX_VALUE) {
|
||||
throw new IllegalStateException("cannot support more than Integer.MAX_VALUE doc/value entries");
|
||||
}
|
||||
|
||||
// grow the structures to have room for more elements
|
||||
if (docs.size() == size) {
|
||||
docs = docs.grow(size + 1);
|
||||
values = values.grow(size + 1);
|
||||
docsWithField = FixedBitSet.ensureCapacity(docsWithField, (int) docs.size());
|
||||
}
|
||||
|
||||
if (value != NumericUpdate.MISSING) {
|
||||
// only mark the document as having a value in that field if the value wasn't set to null (MISSING)
|
||||
docsWithField.set(size);
|
||||
}
|
||||
|
||||
docs.set(size, doc);
|
||||
values.set(size, value.longValue());
|
||||
++size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdatesIterator getUpdates() {
|
||||
final PagedMutable docs = this.docs;
|
||||
final PagedGrowableWriter values = this.values;
|
||||
final FixedBitSet docsWithField = this.docsWithField;
|
||||
new InPlaceMergeSorter() {
|
||||
@Override
|
||||
protected void swap(int i, int j) {
|
||||
long tmpDoc = docs.get(j);
|
||||
docs.set(j, docs.get(i));
|
||||
docs.set(i, tmpDoc);
|
||||
|
||||
long tmpVal = values.get(j);
|
||||
values.set(j, values.get(i));
|
||||
values.set(i, tmpVal);
|
||||
|
||||
boolean tmpBool = docsWithField.get(j);
|
||||
if (docsWithField.get(i)) {
|
||||
docsWithField.set(j);
|
||||
} else {
|
||||
docsWithField.clear(j);
|
||||
}
|
||||
if (tmpBool) {
|
||||
docsWithField.set(i);
|
||||
} else {
|
||||
docsWithField.clear(i);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int compare(int i, int j) {
|
||||
int x = (int) docs.get(i);
|
||||
int y = (int) docs.get(j);
|
||||
return (x < y) ? -1 : ((x == y) ? 0 : 1);
|
||||
}
|
||||
}.sort(0, size);
|
||||
|
||||
final int size = this.size;
|
||||
return new UpdatesIterator() {
|
||||
private long idx = 0; // long so we don't overflow if size == Integer.MAX_VALUE
|
||||
private int doc = -1;
|
||||
private Long value = null;
|
||||
|
||||
@Override
|
||||
Long value() {
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
int nextDoc() {
|
||||
if (idx >= size) {
|
||||
value = null;
|
||||
return doc = DocIdSetIterator.NO_MORE_DOCS;
|
||||
}
|
||||
doc = (int) docs.get(idx);
|
||||
++idx;
|
||||
while (idx < size && docs.get(idx) == doc) {
|
||||
++idx;
|
||||
}
|
||||
if (!docsWithField.get((int) (idx - 1))) {
|
||||
value = null;
|
||||
} else {
|
||||
// idx points to the "next" element
|
||||
value = Long.valueOf(values.get(idx - 1));
|
||||
}
|
||||
return doc;
|
||||
}
|
||||
|
||||
@Override
|
||||
int doc() {
|
||||
return doc;
|
||||
}
|
||||
|
||||
@Override
|
||||
void reset() {
|
||||
doc = -1;
|
||||
value = null;
|
||||
idx = 0;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void merge(NumericFieldUpdates other) {
|
||||
if (other instanceof PackedNumericFieldUpdates) {
|
||||
PackedNumericFieldUpdates packedOther = (PackedNumericFieldUpdates) other;
|
||||
if (size + packedOther.size > Integer.MAX_VALUE) {
|
||||
throw new IllegalStateException(
|
||||
"cannot support more than Integer.MAX_VALUE doc/value entries; size="
|
||||
+ size + " other.size=" + packedOther.size);
|
||||
}
|
||||
docs = docs.grow(size + packedOther.size);
|
||||
values = values.grow(size + packedOther.size);
|
||||
docsWithField = FixedBitSet.ensureCapacity(docsWithField, (int) docs.size());
|
||||
for (int i = 0; i < packedOther.size; i++) {
|
||||
int doc = (int) packedOther.docs.get(i);
|
||||
if (packedOther.docsWithField.get(i)) {
|
||||
docsWithField.set(size);
|
||||
}
|
||||
docs.set(size, doc);
|
||||
values.set(size, packedOther.values.get(i));
|
||||
++size;
|
||||
}
|
||||
} else {
|
||||
UpdatesIterator iter = other.getUpdates();
|
||||
int doc;
|
||||
while ((doc = iter.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
|
||||
Long value = iter.value();
|
||||
if (value == null) {
|
||||
value = NumericUpdate.MISSING;
|
||||
}
|
||||
add(doc, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Add an update to a document. For unsetting a value you should pass
|
||||
* {@link NumericUpdate#MISSING} instead of {@code null}.
|
||||
*/
|
||||
public void add(int doc, Long value);
|
||||
|
||||
/**
|
||||
* Returns an {@link UpdatesIterator} over the updated documents and their
|
||||
* values.
|
||||
*/
|
||||
public UpdatesIterator getUpdates();
|
||||
|
||||
/**
|
||||
* Merge with another {@link NumericFieldUpdates}. This is called for a
|
||||
* segment which received updates while it was being merged. The given updates
|
||||
* should override whatever numeric updates are in that instance.
|
||||
*/
|
||||
public void merge(NumericFieldUpdates other);
|
||||
|
||||
}
|
|
@ -1,68 +0,0 @@
|
|||
package org.apache.lucene.index;
|
||||
|
||||
import static org.apache.lucene.util.RamUsageEstimator.*;
|
||||
|
||||
import org.apache.lucene.document.NumericDocValuesField;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/** An in-place update to a numeric docvalues field */
|
||||
final class NumericUpdate {
|
||||
|
||||
/* Rough logic: OBJ_HEADER + 3*PTR + INT
|
||||
* Term: OBJ_HEADER + 2*PTR
|
||||
* Term.field: 2*OBJ_HEADER + 4*INT + PTR + string.length*CHAR
|
||||
* Term.bytes: 2*OBJ_HEADER + 2*INT + PTR + bytes.length
|
||||
* String: 2*OBJ_HEADER + 4*INT + PTR + string.length*CHAR
|
||||
* Long: OBJ_HEADER + LONG
|
||||
*/
|
||||
private static final int RAW_SIZE_IN_BYTES = 9*NUM_BYTES_OBJECT_HEADER + 8*NUM_BYTES_OBJECT_REF + 8*NUM_BYTES_INT + NUM_BYTES_LONG;
|
||||
|
||||
static final Long MISSING = new Long(0);
|
||||
|
||||
Term term;
|
||||
String field;
|
||||
Long value;
|
||||
int docIDUpto = -1; // unassigned until applied, and confusing that it's here, when it's just used in BufferedDeletes...
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param term the {@link Term} which determines the documents that will be updated
|
||||
* @param field the {@link NumericDocValuesField} to update
|
||||
* @param value the updated value
|
||||
*/
|
||||
NumericUpdate(Term term, String field, Long value) {
|
||||
this.term = term;
|
||||
this.field = field;
|
||||
this.value = value == null ? MISSING : value;
|
||||
}
|
||||
|
||||
int sizeInBytes() {
|
||||
int sizeInBytes = RAW_SIZE_IN_BYTES;
|
||||
sizeInBytes += term.field.length() * NUM_BYTES_CHAR;
|
||||
sizeInBytes += term.bytes.bytes.length;
|
||||
sizeInBytes += field.length() * NUM_BYTES_CHAR;
|
||||
return sizeInBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "term=" + term + ",field=" + field + ",value=" + value;
|
||||
}
|
||||
}
|
|
@ -30,12 +30,13 @@ import org.apache.lucene.codecs.Codec;
|
|||
import org.apache.lucene.codecs.DocValuesConsumer;
|
||||
import org.apache.lucene.codecs.DocValuesFormat;
|
||||
import org.apache.lucene.codecs.LiveDocsFormat;
|
||||
import org.apache.lucene.document.BinaryDocValuesField;
|
||||
import org.apache.lucene.document.NumericDocValuesField;
|
||||
import org.apache.lucene.index.NumericFieldUpdates.UpdatesIterator;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.store.TrackingDirectoryWrapper;
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.lucene.util.MutableBits;
|
||||
|
||||
|
@ -78,7 +79,7 @@ class ReadersAndUpdates {
|
|||
// updates on the merged segment too.
|
||||
private boolean isMerging = false;
|
||||
|
||||
private final Map<String,NumericFieldUpdates> mergingNumericUpdates = new HashMap<>();
|
||||
private final Map<String,DocValuesFieldUpdates> mergingDVUpdates = new HashMap<>();
|
||||
|
||||
public ReadersAndUpdates(IndexWriter writer, SegmentCommitInfo info) {
|
||||
this.info = info;
|
||||
|
@ -294,11 +295,11 @@ class ReadersAndUpdates {
|
|||
}
|
||||
|
||||
// Writes field updates (new _X_N updates files) to the directory
|
||||
public synchronized void writeFieldUpdates(Directory dir, Map<String,NumericFieldUpdates> numericFieldUpdates) throws IOException {
|
||||
public synchronized void writeFieldUpdates(Directory dir, DocValuesFieldUpdates.Container dvUpdates) throws IOException {
|
||||
assert Thread.holdsLock(writer);
|
||||
//System.out.println("rld.writeFieldUpdates: seg=" + info + " numericFieldUpdates=" + numericFieldUpdates);
|
||||
|
||||
assert numericFieldUpdates != null && !numericFieldUpdates.isEmpty();
|
||||
assert dvUpdates.any();
|
||||
|
||||
// Do this so we can delete any created files on
|
||||
// exception; this saves all codecs from having to do
|
||||
|
@ -330,9 +331,13 @@ class ReadersAndUpdates {
|
|||
clone.setDocValuesGen(fi.getDocValuesGen());
|
||||
}
|
||||
// create new fields or update existing ones to have NumericDV type
|
||||
for (String f : numericFieldUpdates.keySet()) {
|
||||
for (String f : dvUpdates.numericDVUpdates.keySet()) {
|
||||
builder.addOrUpdate(f, NumericDocValuesField.TYPE);
|
||||
}
|
||||
// create new fields or update existing ones to have BinaryDV type
|
||||
for (String f : dvUpdates.binaryDVUpdates.keySet()) {
|
||||
builder.addOrUpdate(f, BinaryDocValuesField.TYPE);
|
||||
}
|
||||
|
||||
fieldInfos = builder.finish();
|
||||
final long nextFieldInfosGen = info.getNextFieldInfosGen();
|
||||
|
@ -342,10 +347,10 @@ class ReadersAndUpdates {
|
|||
final DocValuesConsumer fieldsConsumer = docValuesFormat.fieldsConsumer(state);
|
||||
boolean fieldsConsumerSuccess = false;
|
||||
try {
|
||||
// System.out.println("[" + Thread.currentThread().getName() + "] RLD.writeLiveDocs: applying updates; seg=" + info + " updates=" + numericUpdates);
|
||||
for (Entry<String,NumericFieldUpdates> e : numericFieldUpdates.entrySet()) {
|
||||
// System.out.println("[" + Thread.currentThread().getName() + "] RLD.writeFieldUpdates: applying numeric updates; seg=" + info + " updates=" + numericFieldUpdates);
|
||||
for (Entry<String,NumericDocValuesFieldUpdates> e : dvUpdates.numericDVUpdates.entrySet()) {
|
||||
final String field = e.getKey();
|
||||
final NumericFieldUpdates fieldUpdates = e.getValue();
|
||||
final NumericDocValuesFieldUpdates fieldUpdates = e.getValue();
|
||||
final FieldInfo fieldInfo = fieldInfos.fieldInfo(field);
|
||||
assert fieldInfo != null;
|
||||
|
||||
|
@ -355,7 +360,7 @@ class ReadersAndUpdates {
|
|||
final NumericDocValues currentValues = reader.getNumericDocValues(field);
|
||||
final Bits docsWithField = reader.getDocsWithField(field);
|
||||
final int maxDoc = reader.maxDoc();
|
||||
final UpdatesIterator updatesIter = fieldUpdates.getUpdates();
|
||||
final NumericDocValuesFieldUpdates.Iterator updatesIter = fieldUpdates.iterator();
|
||||
@Override
|
||||
public Iterator<Number> iterator() {
|
||||
updatesIter.reset();
|
||||
|
@ -398,7 +403,68 @@ class ReadersAndUpdates {
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
// System.out.println("[" + Thread.currentThread().getName() + "] RAU.writeFieldUpdates: applying binary updates; seg=" + info + " updates=" + dvUpdates.binaryDVUpdates);
|
||||
for (Entry<String,BinaryDocValuesFieldUpdates> e : dvUpdates.binaryDVUpdates.entrySet()) {
|
||||
final String field = e.getKey();
|
||||
final BinaryDocValuesFieldUpdates dvFieldUpdates = e.getValue();
|
||||
final FieldInfo fieldInfo = fieldInfos.fieldInfo(field);
|
||||
assert fieldInfo != null;
|
||||
|
||||
// System.out.println("[" + Thread.currentThread().getName() + "] RAU.writeFieldUpdates: applying binary updates; seg=" + info + " f=" + dvFieldUpdates + ", updates=" + dvFieldUpdates);
|
||||
|
||||
fieldInfo.setDocValuesGen(nextFieldInfosGen);
|
||||
// write the numeric updates to a new gen'd docvalues file
|
||||
fieldsConsumer.addBinaryField(fieldInfo, new Iterable<BytesRef>() {
|
||||
final BinaryDocValues currentValues = reader.getBinaryDocValues(field);
|
||||
final Bits docsWithField = reader.getDocsWithField(field);
|
||||
final int maxDoc = reader.maxDoc();
|
||||
final BinaryDocValuesFieldUpdates.Iterator updatesIter = dvFieldUpdates.iterator();
|
||||
@Override
|
||||
public Iterator<BytesRef> iterator() {
|
||||
updatesIter.reset();
|
||||
return new Iterator<BytesRef>() {
|
||||
|
||||
int curDoc = -1;
|
||||
int updateDoc = updatesIter.nextDoc();
|
||||
BytesRef scratch = new BytesRef();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return curDoc < maxDoc - 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BytesRef next() {
|
||||
if (++curDoc >= maxDoc) {
|
||||
throw new NoSuchElementException("no more documents to return values for");
|
||||
}
|
||||
if (curDoc == updateDoc) { // this document has an updated value
|
||||
BytesRef value = updatesIter.value(); // either null (unset value) or updated value
|
||||
updateDoc = updatesIter.nextDoc(); // prepare for next round
|
||||
return value;
|
||||
} else {
|
||||
// no update for this document
|
||||
assert curDoc < updateDoc;
|
||||
if (currentValues != null && docsWithField.get(curDoc)) {
|
||||
// only read the current value if the document had a value before
|
||||
currentValues.get(curDoc, scratch);
|
||||
return scratch;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException("this iterator does not support removing elements");
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
codec.fieldInfosFormat().getFieldInfosWriter().write(trackingDir, info.info.name, segmentSuffix, fieldInfos, IOContext.DEFAULT);
|
||||
fieldsConsumerSuccess = true;
|
||||
} finally {
|
||||
|
@ -436,12 +502,20 @@ class ReadersAndUpdates {
|
|||
info.advanceFieldInfosGen();
|
||||
// copy all the updates to mergingUpdates, so they can later be applied to the merged segment
|
||||
if (isMerging) {
|
||||
for (Entry<String,NumericFieldUpdates> e : numericFieldUpdates.entrySet()) {
|
||||
NumericFieldUpdates fieldUpdates = mergingNumericUpdates.get(e.getKey());
|
||||
if (fieldUpdates == null) {
|
||||
mergingNumericUpdates.put(e.getKey(), e.getValue());
|
||||
for (Entry<String,NumericDocValuesFieldUpdates> e : dvUpdates.numericDVUpdates.entrySet()) {
|
||||
DocValuesFieldUpdates updates = mergingDVUpdates.get(e.getKey());
|
||||
if (updates == null) {
|
||||
mergingDVUpdates.put(e.getKey(), e.getValue());
|
||||
} else {
|
||||
fieldUpdates.merge(e.getValue());
|
||||
updates.merge(e.getValue());
|
||||
}
|
||||
}
|
||||
for (Entry<String,BinaryDocValuesFieldUpdates> e : dvUpdates.binaryDVUpdates.entrySet()) {
|
||||
DocValuesFieldUpdates updates = mergingDVUpdates.get(e.getKey());
|
||||
if (updates == null) {
|
||||
mergingDVUpdates.put(e.getKey(), e.getValue());
|
||||
} else {
|
||||
updates.merge(e.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -502,13 +576,13 @@ class ReadersAndUpdates {
|
|||
* finished merging (whether successfully or not).
|
||||
*/
|
||||
public synchronized void dropMergingUpdates() {
|
||||
mergingNumericUpdates.clear();
|
||||
mergingDVUpdates.clear();
|
||||
isMerging = false;
|
||||
}
|
||||
|
||||
/** Returns updates that came in while this segment was merging. */
|
||||
public synchronized Map<String,NumericFieldUpdates> getMergingFieldUpdates() {
|
||||
return mergingNumericUpdates;
|
||||
public synchronized Map<String,DocValuesFieldUpdates> getMergingFieldUpdates() {
|
||||
return mergingDVUpdates;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1778,6 +1778,8 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
|
|||
doc.add(new StringField("id", ""+(docBase+i), Field.Store.NO));
|
||||
doc.add(new NumericDocValuesField("f", 1L));
|
||||
doc.add(new NumericDocValuesField("cf", 2L));
|
||||
doc.add(new BinaryDocValuesField("bf", TestBinaryDocValuesUpdates.toBytes(1L)));
|
||||
doc.add(new BinaryDocValuesField("bcf", TestBinaryDocValuesUpdates.toBytes(2L)));
|
||||
w.addDocument(doc);
|
||||
}
|
||||
docCount += numDocs;
|
||||
|
@ -1802,8 +1804,18 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
|
|||
if (VERBOSE) {
|
||||
System.out.println(" update id=" + (docBase+i) + " to value " + value);
|
||||
}
|
||||
w.updateNumericDocValue(new Term("id", Integer.toString(docBase + i)), "f", value);
|
||||
w.updateNumericDocValue(new Term("id", Integer.toString(docBase + i)), "cf", value * 2);
|
||||
if (random().nextBoolean()) { // update only numeric field
|
||||
w.updateNumericDocValue(new Term("id", Integer.toString(docBase + i)), "f", value);
|
||||
w.updateNumericDocValue(new Term("id", Integer.toString(docBase + i)), "cf", value * 2);
|
||||
} else if (random().nextBoolean()) {
|
||||
w.updateBinaryDocValue(new Term("id", Integer.toString(docBase + i)), "bf", TestBinaryDocValuesUpdates.toBytes(value));
|
||||
w.updateBinaryDocValue(new Term("id", Integer.toString(docBase + i)), "bcf", TestBinaryDocValuesUpdates.toBytes(value * 2));
|
||||
} else {
|
||||
w.updateNumericDocValue(new Term("id", Integer.toString(docBase + i)), "f", value);
|
||||
w.updateNumericDocValue(new Term("id", Integer.toString(docBase + i)), "cf", value * 2);
|
||||
w.updateBinaryDocValue(new Term("id", Integer.toString(docBase + i)), "bf", TestBinaryDocValuesUpdates.toBytes(value));
|
||||
w.updateBinaryDocValue(new Term("id", Integer.toString(docBase + i)), "bcf", TestBinaryDocValuesUpdates.toBytes(value * 2));
|
||||
}
|
||||
}
|
||||
|
||||
// sometimes do both deletes and updates
|
||||
|
@ -1877,13 +1889,18 @@ public class TestIndexWriterExceptions extends LuceneTestCase {
|
|||
r = w.getReader();
|
||||
}
|
||||
assertEquals(docCount-deleteCount, r.numDocs());
|
||||
BytesRef scratch = new BytesRef();
|
||||
for (AtomicReaderContext context : r.leaves()) {
|
||||
Bits liveDocs = context.reader().getLiveDocs();
|
||||
NumericDocValues f = context.reader().getNumericDocValues("f");
|
||||
NumericDocValues cf = context.reader().getNumericDocValues("cf");
|
||||
for (int i = 0; i < context.reader().maxDoc(); i++) {
|
||||
AtomicReader reader = context.reader();
|
||||
Bits liveDocs = reader.getLiveDocs();
|
||||
NumericDocValues f = reader.getNumericDocValues("f");
|
||||
NumericDocValues cf = reader.getNumericDocValues("cf");
|
||||
BinaryDocValues bf = reader.getBinaryDocValues("bf");
|
||||
BinaryDocValues bcf = reader.getBinaryDocValues("bcf");
|
||||
for (int i = 0; i < reader.maxDoc(); i++) {
|
||||
if (liveDocs == null || liveDocs.get(i)) {
|
||||
assertEquals("doc=" + (docBase + i), cf.get(i), f.get(i) * 2);
|
||||
assertEquals("doc=" + (docBase + i), TestBinaryDocValuesUpdates.getValue(bcf, i, scratch), TestBinaryDocValuesUpdates.getValue(bf, i, scratch) * 2);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,433 @@
|
|||
package org.apache.lucene.index;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.lucene.analysis.MockAnalyzer;
|
||||
import org.apache.lucene.document.BinaryDocValuesField;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.Field.Store;
|
||||
import org.apache.lucene.document.NumericDocValuesField;
|
||||
import org.apache.lucene.document.StringField;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
|
||||
import org.apache.lucene.util.TestUtil;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
@SuppressCodecs({"Lucene40","Lucene41","Lucene42","Lucene45"})
|
||||
public class TestMixedDocValuesUpdates extends LuceneTestCase {
|
||||
|
||||
public void testManyReopensAndFields() throws Exception {
|
||||
Directory dir = newDirectory();
|
||||
final Random random = random();
|
||||
IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random));
|
||||
LogMergePolicy lmp = newLogMergePolicy();
|
||||
lmp.setMergeFactor(3); // merge often
|
||||
conf.setMergePolicy(lmp);
|
||||
IndexWriter writer = new IndexWriter(dir, conf);
|
||||
|
||||
final boolean isNRT = random.nextBoolean();
|
||||
DirectoryReader reader;
|
||||
if (isNRT) {
|
||||
reader = DirectoryReader.open(writer, true);
|
||||
} else {
|
||||
writer.commit();
|
||||
reader = DirectoryReader.open(dir);
|
||||
}
|
||||
|
||||
final int numFields = random.nextInt(4) + 3; // 3-7
|
||||
final int numNDVFields = random.nextInt(numFields/2) + 1; // 1-3
|
||||
final long[] fieldValues = new long[numFields];
|
||||
final boolean[] fieldHasValue = new boolean[numFields];
|
||||
Arrays.fill(fieldHasValue, true);
|
||||
for (int i = 0; i < fieldValues.length; i++) {
|
||||
fieldValues[i] = 1;
|
||||
}
|
||||
|
||||
int numRounds = atLeast(15);
|
||||
int docID = 0;
|
||||
for (int i = 0; i < numRounds; i++) {
|
||||
int numDocs = atLeast(5);
|
||||
// System.out.println("[" + Thread.currentThread().getName() + "]: round=" + i + ", numDocs=" + numDocs);
|
||||
for (int j = 0; j < numDocs; j++) {
|
||||
Document doc = new Document();
|
||||
doc.add(new StringField("id", "doc-" + docID, Store.NO));
|
||||
doc.add(new StringField("key", "all", Store.NO)); // update key
|
||||
// add all fields with their current value
|
||||
for (int f = 0; f < fieldValues.length; f++) {
|
||||
if (f < numNDVFields) {
|
||||
doc.add(new NumericDocValuesField("f" + f, fieldValues[f]));
|
||||
} else {
|
||||
doc.add(new BinaryDocValuesField("f" + f, TestBinaryDocValuesUpdates.toBytes(fieldValues[f])));
|
||||
}
|
||||
}
|
||||
writer.addDocument(doc);
|
||||
++docID;
|
||||
}
|
||||
|
||||
// if field's value was unset before, unset it from all new added documents too
|
||||
for (int field = 0; field < fieldHasValue.length; field++) {
|
||||
if (!fieldHasValue[field]) {
|
||||
if (field < numNDVFields) {
|
||||
writer.updateNumericDocValue(new Term("key", "all"), "f" + field, null);
|
||||
} else {
|
||||
writer.updateBinaryDocValue(new Term("key", "all"), "f" + field, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int fieldIdx = random.nextInt(fieldValues.length);
|
||||
String updateField = "f" + fieldIdx;
|
||||
if (random.nextBoolean()) {
|
||||
// System.out.println("[" + Thread.currentThread().getName() + "]: unset field '" + updateField + "'");
|
||||
fieldHasValue[fieldIdx] = false;
|
||||
if (fieldIdx < numNDVFields) {
|
||||
writer.updateNumericDocValue(new Term("key", "all"), updateField, null);
|
||||
} else {
|
||||
writer.updateBinaryDocValue(new Term("key", "all"), updateField, null);
|
||||
}
|
||||
} else {
|
||||
fieldHasValue[fieldIdx] = true;
|
||||
if (fieldIdx < numNDVFields) {
|
||||
writer.updateNumericDocValue(new Term("key", "all"), updateField, ++fieldValues[fieldIdx]);
|
||||
} else {
|
||||
writer.updateBinaryDocValue(new Term("key", "all"), updateField, TestBinaryDocValuesUpdates.toBytes(++fieldValues[fieldIdx]));
|
||||
}
|
||||
// System.out.println("[" + Thread.currentThread().getName() + "]: updated field '" + updateField + "' to value " + fieldValues[fieldIdx]);
|
||||
}
|
||||
|
||||
if (random.nextDouble() < 0.2) {
|
||||
int deleteDoc = random.nextInt(docID); // might also delete an already deleted document, ok!
|
||||
writer.deleteDocuments(new Term("id", "doc-" + deleteDoc));
|
||||
// System.out.println("[" + Thread.currentThread().getName() + "]: deleted document: doc-" + deleteDoc);
|
||||
}
|
||||
|
||||
// verify reader
|
||||
if (!isNRT) {
|
||||
writer.commit();
|
||||
}
|
||||
|
||||
// System.out.println("[" + Thread.currentThread().getName() + "]: reopen reader: " + reader);
|
||||
DirectoryReader newReader = DirectoryReader.openIfChanged(reader);
|
||||
assertNotNull(newReader);
|
||||
reader.close();
|
||||
reader = newReader;
|
||||
// System.out.println("[" + Thread.currentThread().getName() + "]: reopened reader: " + reader);
|
||||
assertTrue(reader.numDocs() > 0); // we delete at most one document per round
|
||||
BytesRef scratch = new BytesRef();
|
||||
for (AtomicReaderContext context : reader.leaves()) {
|
||||
AtomicReader r = context.reader();
|
||||
// System.out.println(((SegmentReader) r).getSegmentName());
|
||||
Bits liveDocs = r.getLiveDocs();
|
||||
for (int field = 0; field < fieldValues.length; field++) {
|
||||
String f = "f" + field;
|
||||
BinaryDocValues bdv = r.getBinaryDocValues(f);
|
||||
NumericDocValues ndv = r.getNumericDocValues(f);
|
||||
Bits docsWithField = r.getDocsWithField(f);
|
||||
if (field < numNDVFields) {
|
||||
assertNotNull(ndv);
|
||||
assertNull(bdv);
|
||||
} else {
|
||||
assertNull(ndv);
|
||||
assertNotNull(bdv);
|
||||
}
|
||||
int maxDoc = r.maxDoc();
|
||||
for (int doc = 0; doc < maxDoc; doc++) {
|
||||
if (liveDocs == null || liveDocs.get(doc)) {
|
||||
// System.out.println("doc=" + (doc + context.docBase) + " f='" + f + "' vslue=" + getValue(bdv, doc, scratch));
|
||||
if (fieldHasValue[field]) {
|
||||
assertTrue(docsWithField.get(doc));
|
||||
if (field < numNDVFields) {
|
||||
assertEquals("invalid value for doc=" + doc + ", field=" + f + ", reader=" + r, fieldValues[field], ndv.get(doc));
|
||||
} else {
|
||||
assertEquals("invalid value for doc=" + doc + ", field=" + f + ", reader=" + r, fieldValues[field], TestBinaryDocValuesUpdates.getValue(bdv, doc, scratch));
|
||||
}
|
||||
} else {
|
||||
assertFalse(docsWithField.get(doc));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// System.out.println();
|
||||
}
|
||||
|
||||
IOUtils.close(writer, reader, dir);
|
||||
}
|
||||
|
||||
public void testStressMultiThreading() throws Exception {
|
||||
final Directory dir = newDirectory();
|
||||
IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
|
||||
final IndexWriter writer = new IndexWriter(dir, conf);
|
||||
|
||||
// create index
|
||||
final int numThreads = TestUtil.nextInt(random(), 3, 6);
|
||||
final int numDocs = atLeast(2000);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
Document doc = new Document();
|
||||
doc.add(new StringField("id", "doc" + i, Store.NO));
|
||||
double group = random().nextDouble();
|
||||
String g;
|
||||
if (group < 0.1) g = "g0";
|
||||
else if (group < 0.5) g = "g1";
|
||||
else if (group < 0.8) g = "g2";
|
||||
else g = "g3";
|
||||
doc.add(new StringField("updKey", g, Store.NO));
|
||||
for (int j = 0; j < numThreads; j++) {
|
||||
long value = random().nextInt();
|
||||
doc.add(new BinaryDocValuesField("f" + j, TestBinaryDocValuesUpdates.toBytes(value)));
|
||||
doc.add(new NumericDocValuesField("cf" + j, value * 2)); // control, always updated to f * 2
|
||||
}
|
||||
writer.addDocument(doc);
|
||||
}
|
||||
|
||||
final CountDownLatch done = new CountDownLatch(numThreads);
|
||||
final AtomicInteger numUpdates = new AtomicInteger(atLeast(100));
|
||||
|
||||
// same thread updates a field as well as reopens
|
||||
Thread[] threads = new Thread[numThreads];
|
||||
for (int i = 0; i < threads.length; i++) {
|
||||
final String f = "f" + i;
|
||||
final String cf = "cf" + i;
|
||||
threads[i] = new Thread("UpdateThread-" + i) {
|
||||
@Override
|
||||
public void run() {
|
||||
DirectoryReader reader = null;
|
||||
boolean success = false;
|
||||
try {
|
||||
Random random = random();
|
||||
while (numUpdates.getAndDecrement() > 0) {
|
||||
double group = random.nextDouble();
|
||||
Term t;
|
||||
if (group < 0.1) t = new Term("updKey", "g0");
|
||||
else if (group < 0.5) t = new Term("updKey", "g1");
|
||||
else if (group < 0.8) t = new Term("updKey", "g2");
|
||||
else t = new Term("updKey", "g3");
|
||||
// System.out.println("[" + Thread.currentThread().getName() + "] numUpdates=" + numUpdates + " updateTerm=" + t);
|
||||
if (random.nextBoolean()) { // sometimes unset a value
|
||||
// System.err.println("[" + Thread.currentThread().getName() + "] t=" + t + ", f=" + f + ", updValue=UNSET");
|
||||
writer.updateBinaryDocValue(t, f, null);
|
||||
writer.updateNumericDocValue(t, cf, null);
|
||||
} else {
|
||||
long updValue = random.nextInt();
|
||||
// System.err.println("[" + Thread.currentThread().getName() + "] t=" + t + ", f=" + f + ", updValue=" + updValue);
|
||||
writer.updateBinaryDocValue(t, f, TestBinaryDocValuesUpdates.toBytes(updValue));
|
||||
writer.updateNumericDocValue(t, cf, updValue * 2);
|
||||
}
|
||||
|
||||
if (random.nextDouble() < 0.2) {
|
||||
// delete a random document
|
||||
int doc = random.nextInt(numDocs);
|
||||
// System.out.println("[" + Thread.currentThread().getName() + "] deleteDoc=doc" + doc);
|
||||
writer.deleteDocuments(new Term("id", "doc" + doc));
|
||||
}
|
||||
|
||||
if (random.nextDouble() < 0.05) { // commit every 20 updates on average
|
||||
// System.out.println("[" + Thread.currentThread().getName() + "] commit");
|
||||
writer.commit();
|
||||
}
|
||||
|
||||
if (random.nextDouble() < 0.1) { // reopen NRT reader (apply updates), on average once every 10 updates
|
||||
if (reader == null) {
|
||||
// System.out.println("[" + Thread.currentThread().getName() + "] open NRT");
|
||||
reader = DirectoryReader.open(writer, true);
|
||||
} else {
|
||||
// System.out.println("[" + Thread.currentThread().getName() + "] reopen NRT");
|
||||
DirectoryReader r2 = DirectoryReader.openIfChanged(reader, writer, true);
|
||||
if (r2 != null) {
|
||||
reader.close();
|
||||
reader = r2;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// System.out.println("[" + Thread.currentThread().getName() + "] DONE");
|
||||
success = true;
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
if (reader != null) {
|
||||
try {
|
||||
reader.close();
|
||||
} catch (IOException e) {
|
||||
if (success) { // suppress this exception only if there was another exception
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
done.countDown();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
for (Thread t : threads) t.start();
|
||||
done.await();
|
||||
writer.close();
|
||||
|
||||
DirectoryReader reader = DirectoryReader.open(dir);
|
||||
BytesRef scratch = new BytesRef();
|
||||
for (AtomicReaderContext context : reader.leaves()) {
|
||||
AtomicReader r = context.reader();
|
||||
for (int i = 0; i < numThreads; i++) {
|
||||
BinaryDocValues bdv = r.getBinaryDocValues("f" + i);
|
||||
NumericDocValues control = r.getNumericDocValues("cf" + i);
|
||||
Bits docsWithBdv = r.getDocsWithField("f" + i);
|
||||
Bits docsWithControl = r.getDocsWithField("cf" + i);
|
||||
Bits liveDocs = r.getLiveDocs();
|
||||
for (int j = 0; j < r.maxDoc(); j++) {
|
||||
if (liveDocs == null || liveDocs.get(j)) {
|
||||
assertEquals(docsWithBdv.get(j), docsWithControl.get(j));
|
||||
if (docsWithBdv.get(j)) {
|
||||
long ctrlValue = control.get(j);
|
||||
long bdvValue = TestBinaryDocValuesUpdates.getValue(bdv, j, scratch) * 2;
|
||||
// if (ctrlValue != bdvValue) {
|
||||
// System.out.println("seg=" + r + ", f=f" + i + ", doc=" + j + ", group=" + r.document(j).get("updKey") + ", ctrlValue=" + ctrlValue + ", bdvBytes=" + scratch);
|
||||
// }
|
||||
assertEquals(ctrlValue, bdvValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
reader.close();
|
||||
|
||||
dir.close();
|
||||
}
|
||||
|
||||
public void testUpdateDifferentDocsInDifferentGens() throws Exception {
|
||||
// update same document multiple times across generations
|
||||
Directory dir = newDirectory();
|
||||
IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random()));
|
||||
conf.setMaxBufferedDocs(4);
|
||||
IndexWriter writer = new IndexWriter(dir, conf);
|
||||
final int numDocs = atLeast(10);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
Document doc = new Document();
|
||||
doc.add(new StringField("id", "doc" + i, Store.NO));
|
||||
long value = random().nextInt();
|
||||
doc.add(new BinaryDocValuesField("f", TestBinaryDocValuesUpdates.toBytes(value)));
|
||||
doc.add(new NumericDocValuesField("cf", value * 2));
|
||||
writer.addDocument(doc);
|
||||
}
|
||||
|
||||
int numGens = atLeast(5);
|
||||
BytesRef scratch = new BytesRef();
|
||||
for (int i = 0; i < numGens; i++) {
|
||||
int doc = random().nextInt(numDocs);
|
||||
Term t = new Term("id", "doc" + doc);
|
||||
long value = random().nextLong();
|
||||
writer.updateBinaryDocValue(t, "f", TestBinaryDocValuesUpdates.toBytes(value));
|
||||
writer.updateNumericDocValue(t, "cf", value * 2);
|
||||
DirectoryReader reader = DirectoryReader.open(writer, true);
|
||||
for (AtomicReaderContext context : reader.leaves()) {
|
||||
AtomicReader r = context.reader();
|
||||
BinaryDocValues fbdv = r.getBinaryDocValues("f");
|
||||
NumericDocValues cfndv = r.getNumericDocValues("cf");
|
||||
for (int j = 0; j < r.maxDoc(); j++) {
|
||||
assertEquals(cfndv.get(j), TestBinaryDocValuesUpdates.getValue(fbdv, j, scratch) * 2);
|
||||
}
|
||||
}
|
||||
reader.close();
|
||||
}
|
||||
writer.close();
|
||||
dir.close();
|
||||
}
|
||||
|
||||
public void testTonsOfUpdates() throws Exception {
|
||||
// LUCENE-5248: make sure that when there are many updates, we don't use too much RAM
|
||||
Directory dir = newDirectory();
|
||||
final Random random = random();
|
||||
IndexWriterConfig conf = newIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(random));
|
||||
conf.setRAMBufferSizeMB(IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB);
|
||||
conf.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH); // don't flush by doc
|
||||
IndexWriter writer = new IndexWriter(dir, conf);
|
||||
|
||||
// test data: lots of documents (few 10Ks) and lots of update terms (few hundreds)
|
||||
final int numDocs = atLeast(20000);
|
||||
final int numBinaryFields = atLeast(5);
|
||||
final int numTerms = TestUtil.nextInt(random, 10, 100); // terms should affect many docs
|
||||
Set<String> updateTerms = new HashSet<>();
|
||||
while (updateTerms.size() < numTerms) {
|
||||
updateTerms.add(TestUtil.randomSimpleString(random));
|
||||
}
|
||||
|
||||
// System.out.println("numDocs=" + numDocs + " numBinaryFields=" + numBinaryFields + " numTerms=" + numTerms);
|
||||
|
||||
// build a large index with many BDV fields and update terms
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
Document doc = new Document();
|
||||
int numUpdateTerms = TestUtil.nextInt(random, 1, numTerms / 10);
|
||||
for (int j = 0; j < numUpdateTerms; j++) {
|
||||
doc.add(new StringField("upd", RandomPicks.randomFrom(random, updateTerms), Store.NO));
|
||||
}
|
||||
for (int j = 0; j < numBinaryFields; j++) {
|
||||
long val = random.nextInt();
|
||||
doc.add(new BinaryDocValuesField("f" + j, TestBinaryDocValuesUpdates.toBytes(val)));
|
||||
doc.add(new NumericDocValuesField("cf" + j, val * 2));
|
||||
}
|
||||
writer.addDocument(doc);
|
||||
}
|
||||
|
||||
writer.commit(); // commit so there's something to apply to
|
||||
|
||||
// set to flush every 2048 bytes (approximately every 12 updates), so we get
|
||||
// many flushes during binary updates
|
||||
writer.getConfig().setRAMBufferSizeMB(2048.0 / 1024 / 1024);
|
||||
final int numUpdates = atLeast(100);
|
||||
// System.out.println("numUpdates=" + numUpdates);
|
||||
for (int i = 0; i < numUpdates; i++) {
|
||||
int field = random.nextInt(numBinaryFields);
|
||||
Term updateTerm = new Term("upd", RandomPicks.randomFrom(random, updateTerms));
|
||||
long value = random.nextInt();
|
||||
writer.updateBinaryDocValue(updateTerm, "f" + field, TestBinaryDocValuesUpdates.toBytes(value));
|
||||
writer.updateNumericDocValue(updateTerm, "cf" + field, value * 2);
|
||||
}
|
||||
|
||||
writer.close();
|
||||
|
||||
DirectoryReader reader = DirectoryReader.open(dir);
|
||||
BytesRef scratch = new BytesRef();
|
||||
for (AtomicReaderContext context : reader.leaves()) {
|
||||
for (int i = 0; i < numBinaryFields; i++) {
|
||||
AtomicReader r = context.reader();
|
||||
BinaryDocValues f = r.getBinaryDocValues("f" + i);
|
||||
NumericDocValues cf = r.getNumericDocValues("cf" + i);
|
||||
for (int j = 0; j < r.maxDoc(); j++) {
|
||||
assertEquals("reader=" + r + ", field=f" + i + ", doc=" + j, cf.get(j), TestBinaryDocValuesUpdates.getValue(f, j, scratch) * 2);
|
||||
}
|
||||
}
|
||||
}
|
||||
reader.close();
|
||||
|
||||
dir.close();
|
||||
}
|
||||
|
||||
}
|
|
@ -33,7 +33,6 @@ import org.apache.lucene.util.IOUtils;
|
|||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
|
||||
import org.apache.lucene.util.TestUtil;
|
||||
import org.apache.lucene.util.TestUtil;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
||||
|
@ -357,7 +356,7 @@ public class TestNumericDocValuesUpdates extends LuceneTestCase {
|
|||
SortedSetDocValues ssdv = r.getSortedSetDocValues("ssdv");
|
||||
BytesRef scratch = new BytesRef();
|
||||
for (int i = 0; i < r.maxDoc(); i++) {
|
||||
assertEquals(17, ndv.get(0));
|
||||
assertEquals(17, ndv.get(i));
|
||||
bdv.get(i, scratch);
|
||||
assertEquals(new BytesRef(Integer.toString(i)), scratch);
|
||||
sdv.get(i, scratch);
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.lucene.analysis.MockAnalyzer;
|
|||
import org.apache.lucene.codecs.Codec;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.InfoStream;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.apache.lucene.util.NullInfoStream;
|
||||
|
@ -224,6 +225,10 @@ public class RandomIndexWriter implements Closeable {
|
|||
w.updateNumericDocValue(term, field, value);
|
||||
}
|
||||
|
||||
public void updateBinaryDocValue(Term term, String field, BytesRef value) throws IOException {
|
||||
w.updateBinaryDocValue(term, field, value);
|
||||
}
|
||||
|
||||
public void deleteDocuments(Term term) throws IOException {
|
||||
w.deleteDocuments(term);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue