Karl Wright 2018-03-31 10:04:01 -04:00
commit dc9c60322a
14 changed files with 484 additions and 176 deletions

View File

@ -333,8 +333,8 @@ class BufferedUpdatesStream implements Accountable {
if (success) {
totDelCount += segState.rld.getPendingDeleteCount() - segState.startDelCount;
int fullDelCount = segState.rld.info.getDelCount() + segState.rld.getPendingDeleteCount();
assert fullDelCount <= segState.rld.info.info.maxDoc();
if (fullDelCount == segState.rld.info.info.maxDoc()) {
assert fullDelCount <= segState.rld.info.info.maxDoc() : fullDelCount + " > " + segState.rld.info.info.maxDoc();
if (segState.rld.isFullyDeleted()) {
if (allDeleted == null) {
allDeleted = new ArrayList<>();
}
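This hunk replaces the inline delete-count arithmetic with the new isFullyDeleted() helper and gives the assertion a message. A minimal sketch of the underlying check, using hypothetical names (FullyDeletedCheck is not an actual Lucene class):

// Sketch only: a segment is "fully deleted" once its committed delete count plus
// the deletes still pending in memory account for every document in the segment.
final class FullyDeletedCheck {
  static boolean isFullyDeleted(int committedDelCount, int pendingDeleteCount, int maxDoc) {
    assert committedDelCount + pendingDeleteCount <= maxDoc
        : (committedDelCount + pendingDeleteCount) + " > " + maxDoc;
    return committedDelCount + pendingDeleteCount == maxDoc;
  }
}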

View File

@ -616,9 +616,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// did was move the state to disk:
checkpointNoSIS();
}
rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
if (rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream)) {
checkpointNoSIS();
}
if (rld.getNumDVUpdates() == 0) {
rld.dropReaders();
readerMap.remove(rld.info);
@ -635,13 +635,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
void writeAllDocValuesUpdates() throws IOException {
assert Thread.holdsLock(IndexWriter.this);
Collection<ReadersAndUpdates> copy;
synchronized (this) {
// this needs to be protected by the reader pool lock otherwise we hit ConcurrentModificationException
copy = new HashSet<>(readerMap.values());
}
boolean any = false;
for (ReadersAndUpdates rld : copy) {
any |= rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
any |= rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
}
if (any) {
checkpoint();
@ -649,11 +651,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
}
void writeDocValuesUpdatesForMerge(List<SegmentCommitInfo> infos) throws IOException {
assert Thread.holdsLock(IndexWriter.this);
boolean any = false;
for (SegmentCommitInfo info : infos) {
ReadersAndUpdates rld = get(info, false);
if (rld != null) {
any |= rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
any |= rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
rld.setIsMerging();
}
}
@ -706,7 +709,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
// Only acquire IW lock on each write, since this is a time consuming operation. This way
// other threads get a chance to run in between our writes.
synchronized (IndexWriter.this) {
rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
if (rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream)) {
checkpointNoSIS();
}
}
long bytesUsedAfter = rld.ramBytesUsed.get();
ramBytesUsed -= bytesUsedBefore - bytesUsedAfter;
@ -789,8 +794,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (rld != null) {
assert rld.info == info;
boolean changed = rld.writeLiveDocs(directory);
changed |= rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
changed |= rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream);
if (changed) {
// Make sure we only write del docs for a live segment:
@ -838,7 +842,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (create == false) {
return null;
}
rld = new ReadersAndUpdates(IndexWriter.this, info);
rld = new ReadersAndUpdates(segmentInfos.getIndexCreatedVersionMajor(), info, null, new PendingDeletes(null, info));
// Steal initial reference:
readerMap.put(info, rld);
} else {
@ -1147,7 +1151,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
LeafReaderContext leaf = leaves.get(i);
SegmentReader segReader = (SegmentReader) leaf.reader();
SegmentReader newReader = new SegmentReader(segmentInfos.info(i), segReader, segReader.getLiveDocs(), segReader.numDocs());
readerPool.readerMap.put(newReader.getSegmentInfo(), new ReadersAndUpdates(this, newReader));
readerPool.readerMap.put(newReader.getSegmentInfo(), new ReadersAndUpdates(segmentInfos.getIndexCreatedVersionMajor(), newReader, new PendingDeletes(newReader, newReader.getSegmentInfo())));
}
// We always assume we are carrying over incoming changes when opening from reader:
@ -1637,8 +1641,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
if (rld != null) {
synchronized(bufferedUpdatesStream) {
if (rld.delete(docID)) {
final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount();
if (fullDelCount == rld.info.info.maxDoc()) {
if (rld.isFullyDeleted()) {
dropDeletedSegment(rld.info);
checkpoint();
}
@ -4000,8 +4003,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable {
final boolean allDeleted = merge.segments.size() == 0 ||
merge.info.info.maxDoc() == 0 ||
(mergedUpdates != null &&
mergedUpdates.getPendingDeleteCount() == merge.info.info.maxDoc());
(mergedUpdates != null && mergedUpdates.isFullyDeleted());
if (infoStream.isEnabled("IW")) {
if (allDeleted) {
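Across these IndexWriter hunks, writeFieldUpdates now takes the global field-number map explicitly and reports via its boolean return whether it wrote any files, so checkpointing moves to the caller. A hedged sketch of that caller-side pattern follows; DvUpdatesWriter and writeAndCheckpoint are hypothetical stand-ins, not the real IndexWriter code:

// Hypothetical sketch, not the actual IndexWriter code: checkpoint only when the
// per-segment writer reports that it actually wrote new files.
import java.io.IOException;

final class CheckpointOnWrite {
  /** Stand-in for rld.writeFieldUpdates(dir, fieldNumbers, maxDelGen, infoStream). */
  interface DvUpdatesWriter {
    boolean writeFieldUpdates() throws IOException;
  }

  static void writeAndCheckpoint(DvUpdatesWriter rld, Runnable checkpointNoSIS) throws IOException {
    if (rld.writeFieldUpdates()) {   // true => new _X_N doc-values/fieldInfos files exist
      checkpointNoSIS.run();         // so the caller must record a checkpoint
    }
  }
}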

View File

@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.io.IOException;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.LiveDocsFormat;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.MutableBits;
/**
* This class handles accounting and applying pending deletes for live segment readers
*/
final class PendingDeletes {
private final SegmentCommitInfo info;
// True if the current liveDocs is referenced by an
// external NRT reader:
private boolean liveDocsShared;
// Holds the current shared (readable and writable)
// liveDocs. This is null when there are no deleted
// docs, and it's copy-on-write (cloned whenever we need
// to change it but it's been shared to an external NRT
// reader).
private Bits liveDocs;
private int pendingDeleteCount;
PendingDeletes(SegmentReader reader, SegmentCommitInfo info) {
this.info = info;
liveDocsShared = true;
liveDocs = reader != null ? reader.getLiveDocs() : null;
if (reader != null) {
pendingDeleteCount = reader.numDeletedDocs() - info.getDelCount();
} else {
pendingDeleteCount = 0;
}
}
/**
* Marks a document as deleted in this segment and returns true if the document was actually
* deleted by this call, i.e. it was not already marked as deleted.
*/
boolean delete(int docID) throws IOException {
assert info.info.maxDoc() > 0;
if (liveDocsShared) {
// Copy on write: this means we've cloned a
// SegmentReader sharing the current liveDocs
// instance; must now make a private clone so we can
// change it:
LiveDocsFormat liveDocsFormat = info.info.getCodec().liveDocsFormat();
if (liveDocs == null) {
liveDocs = liveDocsFormat.newLiveDocs(info.info.maxDoc());
} else {
liveDocs = liveDocsFormat.newLiveDocs(liveDocs);
}
liveDocsShared = false;
}
assert liveDocs != null;
assert docID >= 0 && docID < liveDocs.length() : "out of bounds: docid=" + docID + " liveDocsLength=" + liveDocs.length() + " seg=" + info.info.name + " maxDoc=" + info.info.maxDoc();
assert !liveDocsShared;
final boolean didDelete = liveDocs.get(docID);
if (didDelete) {
((MutableBits) liveDocs).clear(docID);
pendingDeleteCount++;
}
return didDelete;
}
/**
* Should be called if the live docs returned from {@link #getLiveDocs()} are shared outside of the
* {@link ReadersAndUpdates}
*/
void liveDocsShared() {
liveDocsShared = true;
}
/**
* Returns the current live docs, or null if all docs are live. The returned instance may be mutable
* and can be mutated behind the scenes. If the returned live docs are shared outside of the
* ReadersAndUpdates, {@link #liveDocsShared()} must be called first.
*/
Bits getLiveDocs() {
return liveDocs;
}
/**
* Returns the number of pending deletes that have not yet been written to disk.
*/
int numPendingDeletes() {
return pendingDeleteCount;
}
/**
* Called once a new reader is opened for this segment, i.e. when deletes or updates are applied.
*/
void onNewReader(SegmentReader reader, SegmentCommitInfo info) {
if (liveDocs == null) {
liveDocs = reader.getLiveDocs();
}
}
/**
* Resets the pending delete count.
*/
void reset() {
pendingDeleteCount = 0;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("PendingDeletes(seg=").append(info);
sb.append(" numPendingDeletes=").append(pendingDeleteCount);
sb.append(" liveDocsShared=").append(liveDocsShared);
return sb.toString();
}
/**
* Writes the live docs to disk and returns <code>true</code> if any new deletes were written.
*/
boolean writeLiveDocs(Directory dir) throws IOException {
if (pendingDeleteCount == 0) {
return false;
}
Bits liveDocs = this.liveDocs;
assert liveDocs != null;
// We have new deletes
assert liveDocs.length() == info.info.maxDoc();
// Do this so we can delete any created files on
// exception; this saves all codecs from having to do
// it:
TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
// We can write directly to the actual name (vs to a
// .tmp & renaming it) because the file is not live
// until segments file is written:
boolean success = false;
try {
Codec codec = info.info.getCodec();
codec.liveDocsFormat().writeLiveDocs((MutableBits)liveDocs, trackingDir, info, pendingDeleteCount, IOContext.DEFAULT);
success = true;
} finally {
if (!success) {
// Advance only the nextWriteDelGen so that a 2nd
// attempt to write will write to a new file
info.advanceNextWriteDelGen();
// Delete any partially created file(s):
for (String fileName : trackingDir.getCreatedFiles()) {
IOUtils.deleteFilesIgnoringExceptions(dir, fileName);
}
}
}
// If we hit an exc in the line above (eg disk full)
// then info's delGen remains pointing to the previous
// (successfully written) del docs:
info.advanceDelGen();
info.setDelCount(info.getDelCount() + pendingDeleteCount);
reset();
return true;
}
/**
* Returns <code>true</code> iff the segment represented by this {@link PendingDeletes} is fully deleted
*/
boolean isFullyDeleted() {
return info.getDelCount() + pendingDeleteCount == info.info.maxDoc();
}
}
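A minimal usage sketch of the class above, modeled on the TestPendingDeletes test added in this same commit; the class name PendingDeletesSketch, the segment name "demo", and the document count are illustrative assumptions, and the sketch lives in org.apache.lucene.index because PendingDeletes is package-private:

package org.apache.lucene.index;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;

import org.apache.lucene.codecs.Codec;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.Version;

// Sketch only: exercise the PendingDeletes lifecycle without an IndexWriter.
public class PendingDeletesSketch {
  public static void main(String[] args) throws IOException {
    try (RAMDirectory dir = new RAMDirectory()) {
      SegmentInfo si = new SegmentInfo(dir, Version.LATEST, Version.LATEST, "demo", 4, false,
          Codec.getDefault(), Collections.emptyMap(), StringHelper.randomId(), new HashMap<>(), null);
      SegmentCommitInfo commitInfo = new SegmentCommitInfo(si, 0, 0, 0, 0);
      PendingDeletes deletes = new PendingDeletes(null, commitInfo); // no SegmentReader yet

      deletes.delete(1);                                  // buffered in memory only
      System.out.println(deletes.numPendingDeletes());    // 1
      System.out.println(deletes.writeLiveDocs(dir));     // true: writes a live-docs file, advances delGen
      System.out.println(deletes.isFullyDeleted());       // false: 3 of 4 docs are still live
    }
  }
}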

View File

@ -20,6 +20,7 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -34,7 +35,6 @@ import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.FieldInfosFormat;
import org.apache.lucene.codecs.LiveDocsFormat;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FlushInfo;
import org.apache.lucene.store.IOContext;
@ -43,38 +43,27 @@ import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.MutableBits;
// Used by IndexWriter to hold open SegmentReaders (for
// searching or merging), plus pending deletes and updates,
// for a given segment
class ReadersAndUpdates {
final class ReadersAndUpdates {
// Not final because we replace (clone) when we need to
// change it and it's been shared:
public final SegmentCommitInfo info;
final SegmentCommitInfo info;
// Tracks how many consumers are using this instance:
private final AtomicInteger refCount = new AtomicInteger(1);
private final IndexWriter writer;
// Set once (null, and then maybe set, and never set again):
private SegmentReader reader;
// Holds the current shared (readable and writable)
// liveDocs. This is null when there are no deleted
// docs, and it's copy-on-write (cloned whenever we need
// to change it but it's been shared to an external NRT
// reader).
private Bits liveDocs;
// How many further deletions we've done against
// liveDocs vs when we loaded it or last wrote it:
private int pendingDeleteCount;
private final PendingDeletes pendingDeletes;
// True if the current liveDocs is referenced by an
// external NRT reader:
private boolean liveDocsShared;
// the major version this index was created with
private final int indexCreatedVersionMajor;
// Indicates whether this segment is currently being merged. While a segment
// is merging, all field updates are also registered in the
@ -96,25 +85,23 @@ class ReadersAndUpdates {
// Only set if there are doc values updates against this segment, and the index is sorted:
Sorter.DocMap sortMap;
public final AtomicLong ramBytesUsed = new AtomicLong();
public ReadersAndUpdates(IndexWriter writer, SegmentCommitInfo info) {
this.writer = writer;
final AtomicLong ramBytesUsed = new AtomicLong();
ReadersAndUpdates(int indexCreatedVersionMajor, SegmentCommitInfo info, SegmentReader reader,
PendingDeletes pendingDeletes) {
this.info = info;
liveDocsShared = true;
this.pendingDeletes = pendingDeletes;
this.indexCreatedVersionMajor = indexCreatedVersionMajor;
this.reader = reader;
}
/** Init from a previously opened SegmentReader.
*
* <p>NOTE: steals incoming ref from reader. */
public ReadersAndUpdates(IndexWriter writer, SegmentReader reader) {
this.writer = writer;
this.reader = reader;
info = reader.getSegmentInfo();
liveDocs = reader.getLiveDocs();
liveDocsShared = true;
pendingDeleteCount = reader.numDeletedDocs() - info.getDelCount();
assert pendingDeleteCount >= 0: "got " + pendingDeleteCount + " reader.numDeletedDocs()=" + reader.numDeletedDocs() + " info.getDelCount()=" + info.getDelCount() + " maxDoc=" + reader.maxDoc() + " numDocs=" + reader.numDocs();
ReadersAndUpdates(int indexCreatedVersionMajor, SegmentReader reader, PendingDeletes pendingDeletes) {
this(indexCreatedVersionMajor, reader.getSegmentInfo(), reader, pendingDeletes);
assert pendingDeletes.numPendingDeletes() >= 0
: "got " + pendingDeletes.numPendingDeletes() + " reader.numDeletedDocs()=" + reader.numDeletedDocs() + " info.getDelCount()=" + info.getDelCount() + " maxDoc=" + reader.maxDoc() + " numDocs=" + reader.numDocs();
}
public void incRef() {
@ -134,7 +121,7 @@ class ReadersAndUpdates {
}
public synchronized int getPendingDeleteCount() {
return pendingDeleteCount;
return pendingDeletes.numPendingDeletes();
}
private synchronized boolean assertNoDupGen(List<DocValuesFieldUpdates> fieldUpdates, DocValuesFieldUpdates update) {
@ -186,6 +173,7 @@ class ReadersAndUpdates {
// Call only from assert!
public synchronized boolean verifyDocCounts() {
int count;
Bits liveDocs = pendingDeletes.getLiveDocs();
if (liveDocs != null) {
count = 0;
for(int docID=0;docID<info.info.maxDoc();docID++) {
@ -197,7 +185,7 @@ class ReadersAndUpdates {
count = info.info.maxDoc();
}
assert info.info.maxDoc() - info.getDelCount() - pendingDeleteCount == count: "info.maxDoc=" + info.info.maxDoc() + " info.getDelCount()=" + info.getDelCount() + " pendingDeleteCount=" + pendingDeleteCount + " count=" + count;
assert info.info.maxDoc() - info.getDelCount() - pendingDeletes.numPendingDeletes() == count: "info.maxDoc=" + info.info.maxDoc() + " info.getDelCount()=" + info.getDelCount() + " pendingDeletes=" + pendingDeletes.numPendingDeletes() + " count=" + count;
return true;
}
@ -205,12 +193,9 @@ class ReadersAndUpdates {
public synchronized SegmentReader getReader(IOContext context) throws IOException {
if (reader == null) {
// We steal returned ref:
reader = new SegmentReader(info, writer.segmentInfos.getIndexCreatedVersionMajor(), context);
if (liveDocs == null) {
liveDocs = reader.getLiveDocs();
}
reader = new SegmentReader(info, indexCreatedVersionMajor, context);
pendingDeletes.onNewReader(reader, info);
}
// Ref for caller
reader.incRef();
return reader;
@ -222,16 +207,7 @@ class ReadersAndUpdates {
}
public synchronized boolean delete(int docID) throws IOException {
initWritableLiveDocs();
assert liveDocs != null;
assert docID >= 0 && docID < liveDocs.length() : "out of bounds: docid=" + docID + " liveDocsLength=" + liveDocs.length() + " seg=" + info.info.name + " maxDoc=" + info.info.maxDoc();
assert !liveDocsShared;
final boolean didDelete = liveDocs.get(docID);
if (didDelete) {
((MutableBits) liveDocs).clear(docID);
pendingDeleteCount++;
}
return didDelete;
return pendingDeletes.delete(docID);
}
// NOTE: removes callers ref
@ -258,10 +234,11 @@ class ReadersAndUpdates {
getReader(context).decRef();
assert reader != null;
}
// force new liveDocs in initWritableLiveDocs even if it's null
liveDocsShared = true;
// force new liveDocs
Bits liveDocs = pendingDeletes.getLiveDocs();
pendingDeletes.liveDocsShared();
if (liveDocs != null) {
return new SegmentReader(reader.getSegmentInfo(), reader, liveDocs, info.info.maxDoc() - info.getDelCount() - pendingDeleteCount);
return new SegmentReader(reader.getSegmentInfo(), reader, liveDocs, info.info.maxDoc() - info.getDelCount() - pendingDeletes.numPendingDeletes());
} else {
// liveDocs == null and reader != null. That can only be if there are no deletes
assert reader.getLiveDocs() == null;
@ -270,29 +247,12 @@ class ReadersAndUpdates {
}
}
private synchronized void initWritableLiveDocs() throws IOException {
assert info.info.maxDoc() > 0;
if (liveDocsShared) {
// Copy on write: this means we've cloned a
// SegmentReader sharing the current liveDocs
// instance; must now make a private clone so we can
// change it:
LiveDocsFormat liveDocsFormat = info.info.getCodec().liveDocsFormat();
if (liveDocs == null) {
liveDocs = liveDocsFormat.newLiveDocs(info.info.maxDoc());
} else {
liveDocs = liveDocsFormat.newLiveDocs(liveDocs);
}
liveDocsShared = false;
}
}
public synchronized Bits getLiveDocs() {
return liveDocs;
return pendingDeletes.getLiveDocs();
}
public synchronized void dropChanges() {
assert Thread.holdsLock(writer);
// Discard (don't save) changes when we are dropping
// the reader; this is used only on the sub-readers
// after a successful merge. If deletes had
@ -300,7 +260,7 @@ class ReadersAndUpdates {
// is running, by now we have carried forward those
// deletes onto the newly merged segment, so we can
// discard them on the sub-readers:
pendingDeleteCount = 0;
pendingDeletes.reset();
dropMergingUpdates();
}
@ -308,47 +268,7 @@ class ReadersAndUpdates {
// _X_N updates files) to the directory; returns true if it wrote any file
// and false if there were no new deletes or updates to write:
public synchronized boolean writeLiveDocs(Directory dir) throws IOException {
if (pendingDeleteCount == 0) {
return false;
}
// We have new deletes
assert liveDocs.length() == info.info.maxDoc();
// Do this so we can delete any created files on
// exception; this saves all codecs from having to do
// it:
TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir);
// We can write directly to the actual name (vs to a
// .tmp & renaming it) because the file is not live
// until segments file is written:
boolean success = false;
try {
Codec codec = info.info.getCodec();
codec.liveDocsFormat().writeLiveDocs((MutableBits)liveDocs, trackingDir, info, pendingDeleteCount, IOContext.DEFAULT);
success = true;
} finally {
if (!success) {
// Advance only the nextWriteDelGen so that a 2nd
// attempt to write will write to a new file
info.advanceNextWriteDelGen();
// Delete any partially created file(s):
for (String fileName : trackingDir.getCreatedFiles()) {
IOUtils.deleteFilesIgnoringExceptions(dir, fileName);
}
}
}
// If we hit an exc in the line above (eg disk full)
// then info's delGen remains pointing to the previous
// (successfully written) del docs:
info.advanceDelGen();
info.setDelCount(info.getDelCount() + pendingDeleteCount);
pendingDeleteCount = 0;
return true;
return pendingDeletes.writeLiveDocs(dir);
}
@SuppressWarnings("synthetic-access")
@ -404,7 +324,6 @@ class ReadersAndUpdates {
if (fieldInfoIn != fieldInfo) {
throw new IllegalArgumentException("wrong fieldInfo");
}
final int maxDoc = reader.maxDoc();
DocValuesFieldUpdates.Iterator[] subs = new DocValuesFieldUpdates.Iterator[updatesToApply.size()];
for(int i=0;i<subs.length;i++) {
subs[i] = updatesToApply.get(i).iterator();
@ -623,8 +542,8 @@ class ReadersAndUpdates {
}
}
private synchronized Set<String> writeFieldInfosGen(FieldInfos fieldInfos, Directory dir, DocValuesFormat dvFormat,
FieldInfosFormat infosFormat) throws IOException {
private synchronized Set<String> writeFieldInfosGen(FieldInfos fieldInfos, Directory dir,
FieldInfosFormat infosFormat) throws IOException {
final long nextFieldInfosGen = info.getNextFieldInfosGen();
final String segmentSuffix = Long.toString(nextFieldInfosGen, Character.MAX_RADIX);
// we write approximately that many bytes (based on Lucene46DVF):
@ -639,22 +558,15 @@ class ReadersAndUpdates {
return trackingDir.getCreatedFiles();
}
public synchronized boolean writeFieldUpdates(Directory dir, long maxDelGen, InfoStream infoStream) throws IOException {
public synchronized boolean writeFieldUpdates(Directory dir, FieldInfos.FieldNumbers fieldNumbers, long maxDelGen, InfoStream infoStream) throws IOException {
long startTimeNS = System.nanoTime();
assert Thread.holdsLock(writer);
final Map<Integer,Set<String>> newDVFiles = new HashMap<>();
Set<String> fieldInfosFiles = null;
FieldInfos fieldInfos = null;
boolean any = false;
int count = 0;
for (List<DocValuesFieldUpdates> updates : pendingDVUpdates.values()) {
// Sort by increasing delGen:
Collections.sort(updates, (a, b) -> Long.compare(a.delGen, b.delGen));
count += updates.size();
Collections.sort(updates, Comparator.comparingLong(a -> a.delGen));
for (DocValuesFieldUpdates update : updates) {
if (update.delGen <= maxDelGen && update.any()) {
any = true;
@ -680,7 +592,7 @@ class ReadersAndUpdates {
// IndexWriter.commitMergedDeletes).
final SegmentReader reader;
if (this.reader == null) {
reader = new SegmentReader(info, writer.segmentInfos.getIndexCreatedVersionMajor(), IOContext.READONCE);
reader = new SegmentReader(info, indexCreatedVersionMajor, IOContext.READONCE);
} else {
reader = this.reader;
}
@ -688,7 +600,7 @@ class ReadersAndUpdates {
try {
// clone FieldInfos so that we can update their dvGen separately from
// the reader's infos and write them to a new fieldInfos_gen file
FieldInfos.Builder builder = new FieldInfos.Builder(writer.globalFieldNumberMap);
FieldInfos.Builder builder = new FieldInfos.Builder(fieldNumbers);
// cannot use builder.add(reader.getFieldInfos()) because it does not
// clone FI.attributes as well FI.dvGen
for (FieldInfo fi : reader.getFieldInfos()) {
@ -713,7 +625,7 @@ class ReadersAndUpdates {
handleNumericDVUpdates(fieldInfos, trackingDir, docValuesFormat, reader, newDVFiles, maxDelGen, infoStream);
handleBinaryDVUpdates(fieldInfos, trackingDir, docValuesFormat, reader, newDVFiles, maxDelGen, infoStream);
fieldInfosFiles = writeFieldInfosGen(fieldInfos, trackingDir, docValuesFormat, codec.fieldInfosFormat());
fieldInfosFiles = writeFieldInfosGen(fieldInfos, trackingDir, codec.fieldInfosFormat());
} finally {
if (reader != this.reader) {
reader.close();
@ -763,11 +675,12 @@ class ReadersAndUpdates {
// if there is a reader open, reopen it to reflect the updates
if (reader != null) {
SegmentReader newReader = new SegmentReader(info, reader, liveDocs, info.info.maxDoc() - info.getDelCount() - pendingDeleteCount);
SegmentReader newReader = new SegmentReader(info, reader, pendingDeletes.getLiveDocs(), info.info.maxDoc() - info.getDelCount() - pendingDeletes.numPendingDeletes());
boolean success2 = false;
try {
reader.decRef();
reader = newReader;
pendingDeletes.onNewReader(reader, info);
success2 = true;
} finally {
if (success2 == false) {
@ -792,14 +705,10 @@ class ReadersAndUpdates {
}
info.setDocValuesUpdatesFiles(newDVFiles);
// wrote new files, should checkpoint()
writer.checkpointNoSIS();
if (infoStream.isEnabled("BD")) {
infoStream.message("BD", String.format(Locale.ROOT, "done write field updates for seg=%s; took %.3fs; new files: %s",
info, (System.nanoTime() - startTimeNS)/1000000000.0, newDVFiles));
}
return true;
}
@ -829,12 +738,11 @@ class ReadersAndUpdates {
}
SegmentReader reader = getReader(context);
int delCount = pendingDeleteCount + info.getDelCount();
int delCount = pendingDeletes.numPendingDeletes() + info.getDelCount();
if (delCount != reader.numDeletedDocs()) {
// beware of zombies:
assert delCount > reader.numDeletedDocs(): "delCount=" + delCount + " reader.numDeletedDocs()=" + reader.numDeletedDocs();
Bits liveDocs = pendingDeletes.getLiveDocs();
assert liveDocs != null;
// Create a new reader with the latest live docs:
@ -842,6 +750,7 @@ class ReadersAndUpdates {
boolean success = false;
try {
reader.decRef();
pendingDeletes.onNewReader(newReader, info);
success = true;
} finally {
if (success == false) {
@ -851,7 +760,7 @@ class ReadersAndUpdates {
reader = newReader;
}
liveDocsShared = true;
pendingDeletes.liveDocsShared();
assert verifyDocCounts();
@ -877,8 +786,12 @@ class ReadersAndUpdates {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("ReadersAndLiveDocs(seg=").append(info);
sb.append(" pendingDeleteCount=").append(pendingDeleteCount);
sb.append(" liveDocsShared=").append(liveDocsShared);
sb.append(" pendingDeletes=").append(pendingDeletes);
return sb.toString();
}
public synchronized boolean isFullyDeleted() {
return pendingDeletes.isFullyDeleted();
}
}
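The net effect of this refactoring is that ReadersAndUpdates no longer tracks liveDocs, pendingDeleteCount, or the copy-on-write flag itself; it delegates all of that to the PendingDeletes instance it is constructed with. A compressed sketch of that delegation, using a simplified stand-in class (ReadersAndUpdatesShape), not the real one:

package org.apache.lucene.index;

import java.io.IOException;

import org.apache.lucene.util.Bits;

// Sketch only: the shape ReadersAndUpdates takes after this change. All
// live-docs state lives in PendingDeletes; this class merely forwards.
final class ReadersAndUpdatesShape {
  private final PendingDeletes pendingDeletes;

  ReadersAndUpdatesShape(PendingDeletes pendingDeletes) {
    this.pendingDeletes = pendingDeletes;
  }

  synchronized boolean delete(int docID) throws IOException {
    return pendingDeletes.delete(docID);           // copy-on-write happens inside PendingDeletes
  }

  synchronized Bits getLiveDocs() {
    return pendingDeletes.getLiveDocs();
  }

  synchronized int getPendingDeleteCount() {
    return pendingDeletes.numPendingDeletes();
  }

  synchronized boolean isFullyDeleted() {
    return pendingDeletes.isFullyDeleted();
  }
}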

View File

@ -22,7 +22,7 @@ package org.apache.lucene.util;
* @lucene.experimental
*/
public interface Bits {
/**
* Returns the value of the bit with the specified <code>index</code>.
* @param index index, should be non-negative and &lt; {@link #length()}.

View File

@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.Version;
public class TestPendingDeletes extends LuceneTestCase {
public void testDeleteDoc() throws IOException {
RAMDirectory dir = new RAMDirectory();
SegmentInfo si = new SegmentInfo(dir, Version.LATEST, Version.LATEST, "test", 10, false, Codec.getDefault(),
Collections.emptyMap(), StringHelper.randomId(), new HashMap<>(), null);
SegmentCommitInfo commitInfo = new SegmentCommitInfo(si, 0, 0, 0, 0);
PendingDeletes deletes = new PendingDeletes(null, commitInfo);
assertNull(deletes.getLiveDocs());
int docToDelete = TestUtil.nextInt(random(), 0, 7);
assertTrue(deletes.delete(docToDelete));
assertNotNull(deletes.getLiveDocs());
assertEquals(1, deletes.numPendingDeletes());
Bits liveDocs = deletes.getLiveDocs();
assertFalse(liveDocs.get(docToDelete));
assertFalse(deletes.delete(docToDelete)); // delete again
// make sure we are live, i.e. mutable
assertTrue(liveDocs.get(8));
assertTrue(deletes.delete(8));
assertFalse(liveDocs.get(8));
assertEquals(2, deletes.numPendingDeletes());
deletes.liveDocsShared();
// make sure we are live, i.e. mutable
assertTrue(liveDocs.get(9));
assertTrue(deletes.delete(9));
assertTrue(liveDocs.get(9));
liveDocs = deletes.getLiveDocs();
assertFalse(liveDocs.get(9));
assertFalse(liveDocs.get(8));
assertFalse(liveDocs.get(docToDelete));
assertEquals(3, deletes.numPendingDeletes());
dir.close();
}
public void testWriteLiveDocs() throws IOException {
RAMDirectory dir = new RAMDirectory();
SegmentInfo si = new SegmentInfo(dir, Version.LATEST, Version.LATEST, "test", 6, false, Codec.getDefault(),
Collections.emptyMap(), StringHelper.randomId(), new HashMap<>(), null);
SegmentCommitInfo commitInfo = new SegmentCommitInfo(si, 0, 0, 0, 0);
PendingDeletes deletes = new PendingDeletes(null, commitInfo);
assertFalse(deletes.writeLiveDocs(dir));
assertEquals(0, dir.listAll().length);
boolean secondDocDeletes = random().nextBoolean();
deletes.delete(5);
if (secondDocDeletes) {
deletes.liveDocsShared();
deletes.delete(2);
}
assertEquals(0, commitInfo.getDelGen());
assertEquals(0, commitInfo.getDelCount());
assertEquals(secondDocDeletes ? 2 : 1, deletes.numPendingDeletes());
assertTrue(deletes.writeLiveDocs(dir));
assertEquals(1, dir.listAll().length);
Bits liveDocs = Codec.getDefault().liveDocsFormat().readLiveDocs(dir, commitInfo, IOContext.DEFAULT);
assertFalse(liveDocs.get(5));
if (secondDocDeletes) {
assertFalse(liveDocs.get(2));
} else {
assertTrue(liveDocs.get(2));
}
assertTrue(liveDocs.get(0));
assertTrue(liveDocs.get(1));
assertTrue(liveDocs.get(3));
assertTrue(liveDocs.get(4));
assertEquals(0, deletes.numPendingDeletes());
assertEquals(secondDocDeletes ? 2 : 1, commitInfo.getDelCount());
assertEquals(1, commitInfo.getDelGen());
deletes.delete(0);
assertTrue(deletes.writeLiveDocs(dir));
assertEquals(2, dir.listAll().length);
liveDocs = Codec.getDefault().liveDocsFormat().readLiveDocs(dir, commitInfo, IOContext.DEFAULT);
assertFalse(liveDocs.get(5));
if (secondDocDeletes) {
assertFalse(liveDocs.get(2));
} else {
assertTrue(liveDocs.get(2));
}
assertFalse(liveDocs.get(0));
assertTrue(liveDocs.get(1));
assertTrue(liveDocs.get(3));
assertTrue(liveDocs.get(4));
assertEquals(0, deletes.numPendingDeletes());
assertEquals(secondDocDeletes ? 3 : 2, commitInfo.getDelCount());
assertEquals(2, commitInfo.getDelGen());
dir.close();
}
public void testIsFullyDeleted() throws IOException {
RAMDirectory dir = new RAMDirectory();
SegmentInfo si = new SegmentInfo(dir, Version.LATEST, Version.LATEST, "test", 3, false, Codec.getDefault(),
Collections.emptyMap(), StringHelper.randomId(), new HashMap<>(), null);
SegmentCommitInfo commitInfo = new SegmentCommitInfo(si, 0, 0, 0, 0);
PendingDeletes deletes = new PendingDeletes(null, commitInfo);
for (int i = 0; i < 3; i++) {
assertTrue(deletes.delete(i));
if (random().nextBoolean()) {
assertTrue(deletes.writeLiveDocs(dir));
}
assertEquals(i == 2, deletes.isFullyDeleted());
}
}
}

View File

@ -110,6 +110,8 @@ Optimizations
* SOLR-12146: LIR should skip deleted replicas (Cao Manh Dat)
* SOLR-12066: Clean up deleted cores when a node starts (Cao Manh Dat)
Other Changes
----------------------
@ -133,6 +135,11 @@ Other Changes
* SOLR-12152: Split up TriggerIntegrationTest into multiple tests to isolate and increase reliability. (shalin)
* SOLR-12133: Fix race conditions that caused TriggerIntegrationTest.testEventQueue to fail. (Mark Miller, shalin)
* SOLR-12169: Fix ComputePlanActionTest.testSelectedCollections failures on Jenkins by aggressively cleaning up
trigger state left by other test methods in the test setup. (shalin)
================== 7.3.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

View File

@ -1661,6 +1661,9 @@ public class ZkController {
Thread.currentThread().interrupt();
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
} catch (NotInClusterStateException e) {
// make the stack trace less verbose
throw e;
} catch (Exception e) {
log.error("", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", e);
@ -1688,7 +1691,7 @@ public class ZkController {
return true;
}
private void checkStateInZk(CoreDescriptor cd) throws InterruptedException {
private void checkStateInZk(CoreDescriptor cd) throws InterruptedException, NotInClusterStateException {
if (!Overseer.isLegacy(zkStateReader)) {
CloudDescriptor cloudDesc = cd.getCloudDescriptor();
String nodeName = cloudDesc.getCoreNodeName();
@ -1722,7 +1725,8 @@ public class ZkController {
}
Replica replica = slice.getReplica(coreNodeName);
if (replica == null) {
errorMessage.set("coreNodeName " + coreNodeName + " does not exist in shard " + cloudDesc.getShardId());
errorMessage.set("coreNodeName " + coreNodeName + " does not exist in shard " + cloudDesc.getShardId() +
", ignore the exception if the replica was deleted");
return false;
}
return true;
@ -1730,8 +1734,9 @@ public class ZkController {
} catch (TimeoutException e) {
String error = errorMessage.get();
if (error == null)
error = "Replica " + coreNodeName + " is not present in cluster state";
throw new SolrException(ErrorCode.SERVER_ERROR, error + ": " + collectionState.get());
error = "coreNodeName " + coreNodeName + " does not exist in shard " + cloudDesc.getShardId() +
", ignore the exception if the replica was deleted";
throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error);
}
}
}
@ -2711,6 +2716,15 @@ public class ZkController {
}
}
/**
* Thrown during the pre-register process if the replica is not present in the cluster state
*/
public static class NotInClusterStateException extends SolrException {
public NotInClusterStateException(ErrorCode code, String msg) {
super(code, msg);
}
}
public boolean checkIfCoreNodeNameAlreadyExists(CoreDescriptor dcore) {
DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(dcore.getCollectionName());
if (collection != null) {

View File

@ -53,7 +53,7 @@ public class NodeAddedTrigger extends TriggerBase {
SolrCloudManager cloudManager) {
super(TriggerEventType.NODEADDED, name, properties, loader, cloudManager);
lastLiveNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
log.debug("Initial livenodes: {}", lastLiveNodes);
log.debug("NodeAddedTrigger {} - Initial livenodes: {}", name, lastLiveNodes);
log.debug("NodeAddedTrigger {} instantiated with properties: {}", name, properties);
}

View File

@ -52,7 +52,7 @@ public class NodeLostTrigger extends TriggerBase {
SolrCloudManager dataProvider) {
super(TriggerEventType.NODELOST, name, properties, loader, dataProvider);
lastLiveNodes = new HashSet<>(dataProvider.getClusterStateProvider().getLiveNodes());
log.debug("Initial livenodes: {}", lastLiveNodes);
log.debug("NodeLostTrigger {} - Initial livenodes: {}", name, lastLiveNodes);
}
@Override

View File

@ -677,7 +677,7 @@ public class CoreContainer {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("Error waiting for SolrCore to be created", e);
log.error("Error waiting for SolrCore to be loaded on startup", e.getCause());
}
}
} finally {
@ -1063,6 +1063,11 @@ public class CoreContainer {
return core;
} catch (Exception e) {
coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, e));
if (e instanceof ZkController.NotInClusterStateException && !newCollection) {
// this mostly happens when the core was deleted while this node was down
unload(dcore.getName(), true, true, true);
throw e;
}
solrCores.removeCoreDescriptor(dcore);
final SolrException solrException = new SolrException(ErrorCode.SERVER_ERROR, "Unable to create core [" + dcore.getName() + "]", e);
if(core != null && !core.isClosed())
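The condition added above can be read as a small policy: only a core whose replica is no longer in the cluster state, and which does not belong to a freshly created collection, is unloaded together with its data. A hedged sketch of that decision as a standalone helper (StaleCoreCleanupPolicy is hypothetical, not part of Solr):

package org.apache.solr.core;

import org.apache.solr.cloud.ZkController;

// Hypothetical helper, not part of Solr: isolates the decision made above.
final class StaleCoreCleanupPolicy {
  static boolean shouldUnloadStaleCore(Exception coreCreateFailure, boolean newCollection) {
    // A replica deleted while its node was down surfaces as NotInClusterStateException
    // during pre-register; only then is it safe to unload the core and its data.
    return coreCreateFailure instanceof ZkController.NotInClusterStateException
        && newCollection == false;
  }
}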

View File

@ -17,6 +17,8 @@
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
@ -26,7 +28,11 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.util.FileUtils;
import org.apache.solr.util.TimeOut;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
@ -64,6 +70,10 @@ public class DeleteInactiveReplicaTest extends SolrCloudTestCase {
Slice shard = getRandomShard(collectionState);
Replica replica = getRandomReplica(shard);
JettySolrRunner jetty = cluster.getReplicaJetty(replica);
CoreDescriptor replicaCd;
try (SolrCore core = jetty.getCoreContainer().getCore(replica.getCoreName())) {
replicaCd = core.getCoreDescriptor();
}
cluster.stopJettySolrRunner(jetty);
waitForState("Expected replica " + replica.getName() + " on down node to be removed from cluster state", collectionName, (n, c) -> {
@ -80,13 +90,9 @@ public class DeleteInactiveReplicaTest extends SolrCloudTestCase {
cluster.startJettySolrRunner(jetty);
log.info("restarted jetty");
CoreContainer cc = jetty.getCoreContainer();
CoreContainer.CoreLoadFailure loadFailure = cc.getCoreInitFailures().get(replica.getCoreName());
assertNotNull("Deleted core was still loaded!", loadFailure);
assertNotNull(loadFailure.exception.getCause());
assertTrue("Unexpected load failure message: " + loadFailure.exception.getCause().getMessage(),
loadFailure.exception.getCause().getMessage().contains("does not exist in shard"));
TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME);
timeOut.waitFor("Expected data dir and instance dir of " + replica.getName() + " to be deleted", ()
-> !Files.exists(replicaCd.getInstanceDir()) && !FileUtils.fileExists(replicaCd.getDataDir()));
// Check that we can't create a core with no coreNodeName
try (SolrClient queryClient = getHttpSolrClient(jetty.getBaseUrl().toString())) {

View File

@ -17,6 +17,7 @@
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -45,12 +46,16 @@ import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@LuceneTestCase.Nightly
@LuceneTestCase.Slow
@Deprecated
public class LIROnShardRestartTest extends SolrCloudTestCase {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@BeforeClass
public static void setupCluster() throws Exception {
System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
@ -132,6 +137,9 @@ public class LIROnShardRestartTest extends SolrCloudTestCase {
// now expire each node
for (Replica replica : docCollection.getReplicas()) {
try {
// todo remove the condition for skipping leader after SOLR-12166 is fixed
if (newLeader.getName().equals(replica.getName())) continue;
cluster.getZkClient().makePath("/collections/" + collection + "/leader_initiated_recovery/shard1/" + replica.getName(),
znodeData, true);
} catch (KeeperException.NodeExistsException e) {
@ -153,7 +161,14 @@ public class LIROnShardRestartTest extends SolrCloudTestCase {
if (electionNodes.isEmpty()) break;
}
assertFalse("Timeout waiting for replicas rejoin election", timeOut.hasTimedOut());
waitForState("Timeout waiting for active replicas", collection, clusterShape(1, 3));
try {
waitForState("Timeout waiting for active replicas", collection, clusterShape(1, 3));
} catch (Throwable th) {
String electionPath = "/collections/allReplicasInLIR/leader_elect/shard1/election/";
List<String> children = zkClient().getChildren(electionPath, null, true);
LOG.info("Election queue {}", children);
throw th;
}
assertEquals(103, cluster.getSolrClient().query(collection, new SolrQuery("*:*")).getResults().getNumFound());

View File

@ -71,6 +71,7 @@ public class ComputePlanActionTest extends SolrCloudTestCase {
private static CountDownLatch triggerFiredLatch = new CountDownLatch(1);
private static final AtomicReference<Map> actionContextPropsRef = new AtomicReference<>();
private static final AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
private static SolrCloudManager cloudManager;
@BeforeClass
public static void setupCluster() throws Exception {
@ -83,10 +84,6 @@ public class ComputePlanActionTest extends SolrCloudTestCase {
public void setUp() throws Exception {
super.setUp();
fired.set(false);
triggerFiredLatch = new CountDownLatch(1);
actionContextPropsRef.set(null);
// remove everything from autoscaling.json in ZK
zkClient().setData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, "{}".getBytes(UTF_8), true);
@ -129,6 +126,20 @@ public class ComputePlanActionTest extends SolrCloudTestCase {
req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPreferencesCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
fired.set(false);
triggerFiredLatch = new CountDownLatch(1);
actionContextPropsRef.set(null);
}
private void deleteChildrenRecursively(String path) throws Exception {
cloudManager.getDistribStateManager().removeRecursively(path, true, false);
}
@After
@ -365,7 +376,7 @@ public class ComputePlanActionTest extends SolrCloudTestCase {
}
public static class AssertingTriggerAction implements TriggerAction {
static String expectedNode;
static volatile String expectedNode;
@Override
public String getName() {
@ -397,8 +408,8 @@ public class ComputePlanActionTest extends SolrCloudTestCase {
}
@Test
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
public void testSelectedCollections() throws Exception {
log.info("Found number of jetties: {}", cluster.getJettySolrRunners().size());
AssertingTriggerAction.expectedNode = null;
// start 3 more nodes
@ -467,7 +478,7 @@ public class ComputePlanActionTest extends SolrCloudTestCase {
Map context = actionContextPropsRef.get();
assertNotNull(context);
List<SolrRequest> operations = (List<SolrRequest>) context.get("operations");
assertNotNull("The operations computed by ComputePlanAction should not be null" + getNodeStateProviderState() + context, operations);
assertNotNull("The operations computed by ComputePlanAction should not be null. " + getNodeStateProviderState() + context, operations);
assertEquals("ComputePlanAction should have computed exactly 2 operations", 2, operations.size());
SolrRequest request = operations.get(0);
SolrParams params = request.getParams();