diff --git a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java index e887e4dbcce..63001d4d0f5 100644 --- a/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java +++ b/lucene/core/src/java/org/apache/lucene/index/BufferedUpdatesStream.java @@ -333,8 +333,8 @@ class BufferedUpdatesStream implements Accountable { if (success) { totDelCount += segState.rld.getPendingDeleteCount() - segState.startDelCount; int fullDelCount = segState.rld.info.getDelCount() + segState.rld.getPendingDeleteCount(); - assert fullDelCount <= segState.rld.info.info.maxDoc(); - if (fullDelCount == segState.rld.info.info.maxDoc()) { + assert fullDelCount <= segState.rld.info.info.maxDoc() : fullDelCount + " > " + segState.rld.info.info.maxDoc(); + if (segState.rld.isFullyDeleted()) { if (allDeleted == null) { allDeleted = new ArrayList<>(); } diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index 40a53e0f324..2e141667a20 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -616,9 +616,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { // did was move the state to disk: checkpointNoSIS(); } - - rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream); - + if (rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream)) { + checkpointNoSIS(); + } if (rld.getNumDVUpdates() == 0) { rld.dropReaders(); readerMap.remove(rld.info); @@ -635,13 +635,15 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { } void writeAllDocValuesUpdates() throws IOException { + assert Thread.holdsLock(IndexWriter.this); Collection copy; synchronized (this) { + // this needs to be protected by the reader pool lock otherwise we hit ConcurrentModificationException copy = new HashSet<>(readerMap.values()); } boolean any = false; for (ReadersAndUpdates rld : copy) { - any |= rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream); + any |= rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream); } if (any) { checkpoint(); @@ -649,11 +651,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { } void writeDocValuesUpdatesForMerge(List infos) throws IOException { + assert Thread.holdsLock(IndexWriter.this); boolean any = false; for (SegmentCommitInfo info : infos) { ReadersAndUpdates rld = get(info, false); if (rld != null) { - any |= rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream); + any |= rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream); rld.setIsMerging(); } } @@ -706,7 +709,9 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { // Only acquire IW lock on each write, since this is a time consuming operation. This way // other threads get a chance to run in between our writes. 
synchronized (IndexWriter.this) { - rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream); + if (rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream)) { + checkpointNoSIS(); + } } long bytesUsedAfter = rld.ramBytesUsed.get(); ramBytesUsed -= bytesUsedBefore - bytesUsedAfter; @@ -789,8 +794,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { if (rld != null) { assert rld.info == info; boolean changed = rld.writeLiveDocs(directory); - - changed |= rld.writeFieldUpdates(directory, bufferedUpdatesStream.getCompletedDelGen(), infoStream); + changed |= rld.writeFieldUpdates(directory, globalFieldNumberMap, bufferedUpdatesStream.getCompletedDelGen(), infoStream); if (changed) { // Make sure we only write del docs for a live segment: @@ -838,7 +842,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { if (create == false) { return null; } - rld = new ReadersAndUpdates(IndexWriter.this, info); + rld = new ReadersAndUpdates(segmentInfos.getIndexCreatedVersionMajor(), info, null, new PendingDeletes(null, info)); // Steal initial reference: readerMap.put(info, rld); } else { @@ -1147,7 +1151,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { LeafReaderContext leaf = leaves.get(i); SegmentReader segReader = (SegmentReader) leaf.reader(); SegmentReader newReader = new SegmentReader(segmentInfos.info(i), segReader, segReader.getLiveDocs(), segReader.numDocs()); - readerPool.readerMap.put(newReader.getSegmentInfo(), new ReadersAndUpdates(this, newReader)); + readerPool.readerMap.put(newReader.getSegmentInfo(), new ReadersAndUpdates(segmentInfos.getIndexCreatedVersionMajor(), newReader, new PendingDeletes(newReader, newReader.getSegmentInfo()))); } // We always assume we are carrying over incoming changes when opening from reader: @@ -1637,8 +1641,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { if (rld != null) { synchronized(bufferedUpdatesStream) { if (rld.delete(docID)) { - final int fullDelCount = rld.info.getDelCount() + rld.getPendingDeleteCount(); - if (fullDelCount == rld.info.info.maxDoc()) { + if (rld.isFullyDeleted()) { dropDeletedSegment(rld.info); checkpoint(); } @@ -4000,8 +4003,7 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable { final boolean allDeleted = merge.segments.size() == 0 || merge.info.info.maxDoc() == 0 || - (mergedUpdates != null && - mergedUpdates.getPendingDeleteCount() == merge.info.info.maxDoc()); + (mergedUpdates != null && mergedUpdates.isFullyDeleted()); if (infoStream.isEnabled("IW")) { if (allDeleted) { diff --git a/lucene/core/src/java/org/apache/lucene/index/PendingDeletes.java b/lucene/core/src/java/org/apache/lucene/index/PendingDeletes.java new file mode 100644 index 00000000000..74043f3f44a --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/index/PendingDeletes.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.index; + +import java.io.IOException; + +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.LiveDocsFormat; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.TrackingDirectoryWrapper; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.MutableBits; + +/** + * This class handles accounting and applying pending deletes for live segment readers + */ +final class PendingDeletes { + private final SegmentCommitInfo info; + // True if the current liveDocs is referenced by an + // external NRT reader: + private boolean liveDocsShared; + // Holds the current shared (readable and writable) + // liveDocs. This is null when there are no deleted + // docs, and it's copy-on-write (cloned whenever we need + // to change it but it's been shared to an external NRT + // reader). + private Bits liveDocs; + private int pendingDeleteCount; + + PendingDeletes(SegmentReader reader, SegmentCommitInfo info) { + this.info = info; + liveDocsShared = true; + liveDocs = reader != null ? reader.getLiveDocs() : null; + if (reader != null) { + pendingDeleteCount = reader.numDeletedDocs() - info.getDelCount(); + } else { + pendingDeleteCount = 0; + } + } + + + /** + * Marks a document as deleted in this segment and return true if a document got actually deleted or + * if the document was already deleted. + */ + boolean delete(int docID) throws IOException { + assert info.info.maxDoc() > 0; + if (liveDocsShared) { + // Copy on write: this means we've cloned a + // SegmentReader sharing the current liveDocs + // instance; must now make a private clone so we can + // change it: + LiveDocsFormat liveDocsFormat = info.info.getCodec().liveDocsFormat(); + if (liveDocs == null) { + liveDocs = liveDocsFormat.newLiveDocs(info.info.maxDoc()); + } else { + liveDocs = liveDocsFormat.newLiveDocs(liveDocs); + } + liveDocsShared = false; + } + + assert liveDocs != null; + assert docID >= 0 && docID < liveDocs.length() : "out of bounds: docid=" + docID + " liveDocsLength=" + liveDocs.length() + " seg=" + info.info.name + " maxDoc=" + info.info.maxDoc(); + assert !liveDocsShared; + final boolean didDelete = liveDocs.get(docID); + if (didDelete) { + ((MutableBits) liveDocs).clear(docID); + pendingDeleteCount++; + } + return didDelete; + } + + /** + * Should be called if the live docs returned from {@link #getLiveDocs()} are shared outside of the + * {@link ReadersAndUpdates} + */ + void liveDocsShared() { + liveDocsShared = true; + } + + /** + * Returns the current live docs or null if all docs are live. The returned instance might be mutable or is mutated behind the scenes. + * If the returned live docs are shared outside of the ReadersAndUpdates {@link #liveDocsShared()} should be called + * first. + */ + Bits getLiveDocs() { + return liveDocs; + } + + /** + * Returns the number of pending deletes that are not written to disk. 
+ */ + int numPendingDeletes() { + return pendingDeleteCount; + } + + /** + * Called once a new reader is opened for this segment ie. when deletes or updates are applied. + */ + void onNewReader(SegmentReader reader, SegmentCommitInfo info) { + if (liveDocs == null) { + liveDocs = reader.getLiveDocs(); + } + } + + /** + * Resets the pending docs + */ + void reset() { + pendingDeleteCount = 0; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("PendingDeletes(seg=").append(info); + sb.append(" numPendingDeletes=").append(pendingDeleteCount); + sb.append(" liveDocsShared=").append(liveDocsShared); + return sb.toString(); + } + + /** + * Writes the live docs to disk and returns true if any new docs were written. + */ + boolean writeLiveDocs(Directory dir) throws IOException { + if (pendingDeleteCount == 0) { + return false; + } + + Bits liveDocs = this.liveDocs; + assert liveDocs != null; + // We have new deletes + assert liveDocs.length() == info.info.maxDoc(); + + // Do this so we can delete any created files on + // exception; this saves all codecs from having to do + // it: + TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(dir); + + // We can write directly to the actual name (vs to a + // .tmp & renaming it) because the file is not live + // until segments file is written: + boolean success = false; + try { + Codec codec = info.info.getCodec(); + codec.liveDocsFormat().writeLiveDocs((MutableBits)liveDocs, trackingDir, info, pendingDeleteCount, IOContext.DEFAULT); + success = true; + } finally { + if (!success) { + // Advance only the nextWriteDelGen so that a 2nd + // attempt to write will write to a new file + info.advanceNextWriteDelGen(); + + // Delete any partially created file(s): + for (String fileName : trackingDir.getCreatedFiles()) { + IOUtils.deleteFilesIgnoringExceptions(dir, fileName); + } + } + } + + // If we hit an exc in the line above (eg disk full) + // then info's delGen remains pointing to the previous + // (successfully written) del docs: + info.advanceDelGen(); + info.setDelCount(info.getDelCount() + pendingDeleteCount); + reset(); + return true; + } + + /** + * Returns true iff the segment represented by this {@link PendingDeletes} is fully deleted + */ + boolean isFullyDeleted() { + return info.getDelCount() + pendingDeleteCount == info.info.maxDoc(); + } +} diff --git a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java index 16ea1e5b9d9..8a0e17e2b71 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java +++ b/lucene/core/src/java/org/apache/lucene/index/ReadersAndUpdates.java @@ -20,6 +20,7 @@ package org.apache.lucene.index; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -34,7 +35,6 @@ import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.codecs.FieldInfosFormat; -import org.apache.lucene.codecs.LiveDocsFormat; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FlushInfo; import org.apache.lucene.store.IOContext; @@ -43,38 +43,27 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.InfoStream; -import 
org.apache.lucene.util.MutableBits; // Used by IndexWriter to hold open SegmentReaders (for // searching or merging), plus pending deletes and updates, // for a given segment -class ReadersAndUpdates { +final class ReadersAndUpdates { // Not final because we replace (clone) when we need to // change it and it's been shared: - public final SegmentCommitInfo info; + final SegmentCommitInfo info; // Tracks how many consumers are using this instance: private final AtomicInteger refCount = new AtomicInteger(1); - private final IndexWriter writer; - // Set once (null, and then maybe set, and never set again): private SegmentReader reader; - // Holds the current shared (readable and writable) - // liveDocs. This is null when there are no deleted - // docs, and it's copy-on-write (cloned whenever we need - // to change it but it's been shared to an external NRT - // reader). - private Bits liveDocs; - // How many further deletions we've done against // liveDocs vs when we loaded it or last wrote it: - private int pendingDeleteCount; + private final PendingDeletes pendingDeletes; - // True if the current liveDocs is referenced by an - // external NRT reader: - private boolean liveDocsShared; + // the major version this index was created with + private final int indexCreatedVersionMajor; // Indicates whether this segment is currently being merged. While a segment // is merging, all field updates are also registered in the @@ -96,25 +85,23 @@ class ReadersAndUpdates { // Only set if there are doc values updates against this segment, and the index is sorted: Sorter.DocMap sortMap; - public final AtomicLong ramBytesUsed = new AtomicLong(); - - public ReadersAndUpdates(IndexWriter writer, SegmentCommitInfo info) { - this.writer = writer; + final AtomicLong ramBytesUsed = new AtomicLong(); + + ReadersAndUpdates(int indexCreatedVersionMajor, SegmentCommitInfo info, SegmentReader reader, + PendingDeletes pendingDeletes) { this.info = info; - liveDocsShared = true; + this.pendingDeletes = pendingDeletes; + this.indexCreatedVersionMajor = indexCreatedVersionMajor; + this.reader = reader; } /** Init from a previously opened SegmentReader. * *
<p>
NOTE: steals incoming ref from reader. */ - public ReadersAndUpdates(IndexWriter writer, SegmentReader reader) { - this.writer = writer; - this.reader = reader; - info = reader.getSegmentInfo(); - liveDocs = reader.getLiveDocs(); - liveDocsShared = true; - pendingDeleteCount = reader.numDeletedDocs() - info.getDelCount(); - assert pendingDeleteCount >= 0: "got " + pendingDeleteCount + " reader.numDeletedDocs()=" + reader.numDeletedDocs() + " info.getDelCount()=" + info.getDelCount() + " maxDoc=" + reader.maxDoc() + " numDocs=" + reader.numDocs(); + ReadersAndUpdates(int indexCreatedVersionMajor, SegmentReader reader, PendingDeletes pendingDeletes) { + this(indexCreatedVersionMajor, reader.getSegmentInfo(), reader, pendingDeletes); + assert pendingDeletes.numPendingDeletes() >= 0 + : "got " + pendingDeletes.numPendingDeletes() + " reader.numDeletedDocs()=" + reader.numDeletedDocs() + " info.getDelCount()=" + info.getDelCount() + " maxDoc=" + reader.maxDoc() + " numDocs=" + reader.numDocs(); } public void incRef() { @@ -134,7 +121,7 @@ class ReadersAndUpdates { } public synchronized int getPendingDeleteCount() { - return pendingDeleteCount; + return pendingDeletes.numPendingDeletes(); } private synchronized boolean assertNoDupGen(List fieldUpdates, DocValuesFieldUpdates update) { @@ -186,6 +173,7 @@ class ReadersAndUpdates { // Call only from assert! public synchronized boolean verifyDocCounts() { int count; + Bits liveDocs = pendingDeletes.getLiveDocs(); if (liveDocs != null) { count = 0; for(int docID=0;docID writeFieldInfosGen(FieldInfos fieldInfos, Directory dir, DocValuesFormat dvFormat, - FieldInfosFormat infosFormat) throws IOException { + private synchronized Set writeFieldInfosGen(FieldInfos fieldInfos, Directory dir, + FieldInfosFormat infosFormat) throws IOException { final long nextFieldInfosGen = info.getNextFieldInfosGen(); final String segmentSuffix = Long.toString(nextFieldInfosGen, Character.MAX_RADIX); // we write approximately that many bytes (based on Lucene46DVF): @@ -639,22 +558,15 @@ class ReadersAndUpdates { return trackingDir.getCreatedFiles(); } - public synchronized boolean writeFieldUpdates(Directory dir, long maxDelGen, InfoStream infoStream) throws IOException { - + public synchronized boolean writeFieldUpdates(Directory dir, FieldInfos.FieldNumbers fieldNumbers, long maxDelGen, InfoStream infoStream) throws IOException { long startTimeNS = System.nanoTime(); - - assert Thread.holdsLock(writer); - final Map> newDVFiles = new HashMap<>(); Set fieldInfosFiles = null; FieldInfos fieldInfos = null; - boolean any = false; - int count = 0; for (List updates : pendingDVUpdates.values()) { // Sort by increasing delGen: - Collections.sort(updates, (a, b) -> Long.compare(a.delGen, b.delGen)); - count += updates.size(); + Collections.sort(updates, Comparator.comparingLong(a -> a.delGen)); for (DocValuesFieldUpdates update : updates) { if (update.delGen <= maxDelGen && update.any()) { any = true; @@ -680,7 +592,7 @@ class ReadersAndUpdates { // IndexWriter.commitMergedDeletes). 
final SegmentReader reader; if (this.reader == null) { - reader = new SegmentReader(info, writer.segmentInfos.getIndexCreatedVersionMajor(), IOContext.READONCE); + reader = new SegmentReader(info, indexCreatedVersionMajor, IOContext.READONCE); } else { reader = this.reader; } @@ -688,7 +600,7 @@ class ReadersAndUpdates { try { // clone FieldInfos so that we can update their dvGen separately from // the reader's infos and write them to a new fieldInfos_gen file - FieldInfos.Builder builder = new FieldInfos.Builder(writer.globalFieldNumberMap); + FieldInfos.Builder builder = new FieldInfos.Builder(fieldNumbers); // cannot use builder.add(reader.getFieldInfos()) because it does not // clone FI.attributes as well FI.dvGen for (FieldInfo fi : reader.getFieldInfos()) { @@ -713,7 +625,7 @@ class ReadersAndUpdates { handleNumericDVUpdates(fieldInfos, trackingDir, docValuesFormat, reader, newDVFiles, maxDelGen, infoStream); handleBinaryDVUpdates(fieldInfos, trackingDir, docValuesFormat, reader, newDVFiles, maxDelGen, infoStream); - fieldInfosFiles = writeFieldInfosGen(fieldInfos, trackingDir, docValuesFormat, codec.fieldInfosFormat()); + fieldInfosFiles = writeFieldInfosGen(fieldInfos, trackingDir, codec.fieldInfosFormat()); } finally { if (reader != this.reader) { reader.close(); @@ -763,11 +675,12 @@ class ReadersAndUpdates { // if there is a reader open, reopen it to reflect the updates if (reader != null) { - SegmentReader newReader = new SegmentReader(info, reader, liveDocs, info.info.maxDoc() - info.getDelCount() - pendingDeleteCount); + SegmentReader newReader = new SegmentReader(info, reader, pendingDeletes.getLiveDocs(), info.info.maxDoc() - info.getDelCount() - pendingDeletes.numPendingDeletes()); boolean success2 = false; try { reader.decRef(); reader = newReader; + pendingDeletes.onNewReader(reader, info); success2 = true; } finally { if (success2 == false) { @@ -792,14 +705,10 @@ class ReadersAndUpdates { } info.setDocValuesUpdatesFiles(newDVFiles); - // wrote new files, should checkpoint() - writer.checkpointNoSIS(); - if (infoStream.isEnabled("BD")) { infoStream.message("BD", String.format(Locale.ROOT, "done write field updates for seg=%s; took %.3fs; new files: %s", info, (System.nanoTime() - startTimeNS)/1000000000.0, newDVFiles)); } - return true; } @@ -829,12 +738,11 @@ class ReadersAndUpdates { } SegmentReader reader = getReader(context); - int delCount = pendingDeleteCount + info.getDelCount(); + int delCount = pendingDeletes.numPendingDeletes() + info.getDelCount(); if (delCount != reader.numDeletedDocs()) { - // beware of zombies: assert delCount > reader.numDeletedDocs(): "delCount=" + delCount + " reader.numDeletedDocs()=" + reader.numDeletedDocs(); - + Bits liveDocs = pendingDeletes.getLiveDocs(); assert liveDocs != null; // Create a new reader with the latest live docs: @@ -842,6 +750,7 @@ class ReadersAndUpdates { boolean success = false; try { reader.decRef(); + pendingDeletes.onNewReader(newReader, info); success = true; } finally { if (success == false) { @@ -851,7 +760,7 @@ class ReadersAndUpdates { reader = newReader; } - liveDocsShared = true; + pendingDeletes.liveDocsShared(); assert verifyDocCounts(); @@ -877,8 +786,12 @@ class ReadersAndUpdates { public String toString() { StringBuilder sb = new StringBuilder(); sb.append("ReadersAndLiveDocs(seg=").append(info); - sb.append(" pendingDeleteCount=").append(pendingDeleteCount); - sb.append(" liveDocsShared=").append(liveDocsShared); + sb.append(" pendingDeletes=").append(pendingDeletes); return sb.toString(); } + + 
public synchronized boolean isFullyDeleted() { + return pendingDeletes.isFullyDeleted(); + } + } diff --git a/lucene/core/src/java/org/apache/lucene/util/Bits.java b/lucene/core/src/java/org/apache/lucene/util/Bits.java index 101122e628f..29935e737b8 100644 --- a/lucene/core/src/java/org/apache/lucene/util/Bits.java +++ b/lucene/core/src/java/org/apache/lucene/util/Bits.java @@ -22,7 +22,7 @@ package org.apache.lucene.util; * @lucene.experimental */ -public interface Bits { +public interface Bits { /** * Returns the value of the bit with the specified index. * @param index index, should be non-negative and < {@link #length()}. diff --git a/lucene/core/src/test/org/apache/lucene/index/TestPendingDeletes.java b/lucene/core/src/test/org/apache/lucene/index/TestPendingDeletes.java new file mode 100644 index 00000000000..39f5680a74f --- /dev/null +++ b/lucene/core/src/test/org/apache/lucene/index/TestPendingDeletes.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.index; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; + +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.RAMDirectory; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.StringHelper; +import org.apache.lucene.util.TestUtil; +import org.apache.lucene.util.Version; + +public class TestPendingDeletes extends LuceneTestCase { + + public void testDeleteDoc() throws IOException { + RAMDirectory dir = new RAMDirectory(); + SegmentInfo si = new SegmentInfo(dir, Version.LATEST, Version.LATEST, "test", 10, false, Codec.getDefault(), + Collections.emptyMap(), StringHelper.randomId(), new HashMap<>(), null); + SegmentCommitInfo commitInfo = new SegmentCommitInfo(si, 0, 0, 0, 0); + PendingDeletes deletes = new PendingDeletes(null, commitInfo); + assertNull(deletes.getLiveDocs()); + int docToDelete = TestUtil.nextInt(random(), 0, 7); + assertTrue(deletes.delete(docToDelete)); + assertNotNull(deletes.getLiveDocs()); + assertEquals(1, deletes.numPendingDeletes()); + + Bits liveDocs = deletes.getLiveDocs(); + assertFalse(liveDocs.get(docToDelete)); + assertFalse(deletes.delete(docToDelete)); // delete again + + // make sure we are live ie. mutable + assertTrue(liveDocs.get(8)); + assertTrue(deletes.delete(8)); + assertFalse(liveDocs.get(8)); + assertEquals(2, deletes.numPendingDeletes()); + + deletes.liveDocsShared(); + + // make sure we are live ie. 
mutable + assertTrue(liveDocs.get(9)); + assertTrue(deletes.delete(9)); + assertTrue(liveDocs.get(9)); + liveDocs = deletes.getLiveDocs(); + assertFalse(liveDocs.get(9)); + assertFalse(liveDocs.get(8)); + assertFalse(liveDocs.get(docToDelete)); + assertEquals(3, deletes.numPendingDeletes()); + dir.close(); + } + + public void testWriteLiveDocs() throws IOException { + RAMDirectory dir = new RAMDirectory(); + SegmentInfo si = new SegmentInfo(dir, Version.LATEST, Version.LATEST, "test", 6, false, Codec.getDefault(), + Collections.emptyMap(), StringHelper.randomId(), new HashMap<>(), null); + SegmentCommitInfo commitInfo = new SegmentCommitInfo(si, 0, 0, 0, 0); + PendingDeletes deletes = new PendingDeletes(null, commitInfo); + assertFalse(deletes.writeLiveDocs(dir)); + assertEquals(0, dir.listAll().length); + boolean secondDocDeletes = random().nextBoolean(); + deletes.delete(5); + if (secondDocDeletes) { + deletes.liveDocsShared(); + deletes.delete(2); + } + assertEquals(0, commitInfo.getDelGen()); + assertEquals(0, commitInfo.getDelCount()); + + assertEquals(secondDocDeletes ? 2 : 1, deletes.numPendingDeletes()); + assertTrue(deletes.writeLiveDocs(dir)); + assertEquals(1, dir.listAll().length); + Bits liveDocs = Codec.getDefault().liveDocsFormat().readLiveDocs(dir, commitInfo, IOContext.DEFAULT); + assertFalse(liveDocs.get(5)); + if (secondDocDeletes) { + assertFalse(liveDocs.get(2)); + } else { + assertTrue(liveDocs.get(2)); + } + assertTrue(liveDocs.get(0)); + assertTrue(liveDocs.get(1)); + assertTrue(liveDocs.get(3)); + assertTrue(liveDocs.get(4)); + + assertEquals(0, deletes.numPendingDeletes()); + assertEquals(secondDocDeletes ? 2 : 1, commitInfo.getDelCount()); + assertEquals(1, commitInfo.getDelGen()); + + deletes.delete(0); + assertTrue(deletes.writeLiveDocs(dir)); + assertEquals(2, dir.listAll().length); + liveDocs = Codec.getDefault().liveDocsFormat().readLiveDocs(dir, commitInfo, IOContext.DEFAULT); + assertFalse(liveDocs.get(5)); + if (secondDocDeletes) { + assertFalse(liveDocs.get(2)); + } else { + assertTrue(liveDocs.get(2)); + } + assertFalse(liveDocs.get(0)); + assertTrue(liveDocs.get(1)); + assertTrue(liveDocs.get(3)); + assertTrue(liveDocs.get(4)); + + assertEquals(0, deletes.numPendingDeletes()); + assertEquals(secondDocDeletes ? 3 : 2, commitInfo.getDelCount()); + assertEquals(2, commitInfo.getDelGen()); + dir.close(); + } + + public void testIsFullyDeleted() throws IOException { + RAMDirectory dir = new RAMDirectory(); + SegmentInfo si = new SegmentInfo(dir, Version.LATEST, Version.LATEST, "test", 3, false, Codec.getDefault(), + Collections.emptyMap(), StringHelper.randomId(), new HashMap<>(), null); + SegmentCommitInfo commitInfo = new SegmentCommitInfo(si, 0, 0, 0, 0); + PendingDeletes deletes = new PendingDeletes(null, commitInfo); + for (int i = 0; i < 3; i++) { + assertTrue(deletes.delete(i)); + if (random().nextBoolean()) { + assertTrue(deletes.writeLiveDocs(dir)); + } + assertEquals(i == 2, deletes.isFullyDeleted()); + } + } +} diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 5854e0ff7fb..e7349cf5588 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -110,6 +110,8 @@ Optimizations * SOLR-12146: LIR should skip deleted replicas (Cao Manh Dat) +* SOLR-12066: Cleanup deleted core when node start (Cao Manh Dat) + Other Changes ---------------------- @@ -133,6 +135,11 @@ Other Changes * SOLR-12152: Split up TriggerIntegrationTest into multiple tests to isolate and increase reliability. 
(shalin) +* SOLR-12133: Fix race conditions that caused TriggerIntegrationTest.testEventQueue to fail. (Mark Miller, shalin) + +* SOLR-12169: Fix ComputePlanActionTest.testSelectedCollections fails on jenkins by aggressively cleaning up + trigger state left by other test methods in the test setup. (shalin) + ================== 7.3.0 ================== Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release. diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index c0ddd260adf..872a8b9d7e1 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -1661,6 +1661,9 @@ public class ZkController { Thread.currentThread().interrupt(); log.error("", e); throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); + } catch (NotInClusterStateException e) { + // make the stack trace less verbose + throw e; } catch (Exception e) { log.error("", e); throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", e); @@ -1688,7 +1691,7 @@ public class ZkController { return true; } - private void checkStateInZk(CoreDescriptor cd) throws InterruptedException { + private void checkStateInZk(CoreDescriptor cd) throws InterruptedException, NotInClusterStateException { if (!Overseer.isLegacy(zkStateReader)) { CloudDescriptor cloudDesc = cd.getCloudDescriptor(); String nodeName = cloudDesc.getCoreNodeName(); @@ -1722,7 +1725,8 @@ public class ZkController { } Replica replica = slice.getReplica(coreNodeName); if (replica == null) { - errorMessage.set("coreNodeName " + coreNodeName + " does not exist in shard " + cloudDesc.getShardId()); + errorMessage.set("coreNodeName " + coreNodeName + " does not exist in shard " + cloudDesc.getShardId() + + ", ignore the exception if the replica was deleted"); return false; } return true; @@ -1730,8 +1734,9 @@ public class ZkController { } catch (TimeoutException e) { String error = errorMessage.get(); if (error == null) - error = "Replica " + coreNodeName + " is not present in cluster state"; - throw new SolrException(ErrorCode.SERVER_ERROR, error + ": " + collectionState.get()); + error = "coreNodeName " + coreNodeName + " does not exist in shard " + cloudDesc.getShardId() + + ", ignore the exception if the replica was deleted"; + throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error); } } } @@ -2711,6 +2716,15 @@ public class ZkController { } } + /** + * Thrown during pre register process if the replica is not present in clusterstate + */ + public static class NotInClusterStateException extends SolrException { + public NotInClusterStateException(ErrorCode code, String msg) { + super(code, msg); + } + } + public boolean checkIfCoreNodeNameAlreadyExists(CoreDescriptor dcore) { DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(dcore.getCollectionName()); if (collection != null) { diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java index ad89f2a6798..6190a499175 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java @@ -53,7 +53,7 @@ public class NodeAddedTrigger extends TriggerBase { SolrCloudManager cloudManager) { super(TriggerEventType.NODEADDED, name, properties, loader, cloudManager); lastLiveNodes = 
new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes()); - log.debug("Initial livenodes: {}", lastLiveNodes); + log.debug("NodeAddedTrigger {} - Initial livenodes: {}", name, lastLiveNodes); log.debug("NodeAddedTrigger {} instantiated with properties: {}", name, properties); } diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java index 1e7aec590a5..2981a485b67 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java @@ -52,7 +52,7 @@ public class NodeLostTrigger extends TriggerBase { SolrCloudManager dataProvider) { super(TriggerEventType.NODELOST, name, properties, loader, dataProvider); lastLiveNodes = new HashSet<>(dataProvider.getClusterStateProvider().getLiveNodes()); - log.debug("Initial livenodes: {}", lastLiveNodes); + log.debug("NodeLostTrigger {} - Initial livenodes: {}", name, lastLiveNodes); } @Override diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index b667bc06209..74b718cdca5 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -677,7 +677,7 @@ public class CoreContainer { } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { - log.error("Error waiting for SolrCore to be created", e); + log.error("Error waiting for SolrCore to be loaded on startup", e.getCause()); } } } finally { @@ -1063,6 +1063,11 @@ public class CoreContainer { return core; } catch (Exception e) { coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, e)); + if (e instanceof ZkController.NotInClusterStateException && !newCollection) { + // this mostly happen when the core is deleted when this node is down + unload(dcore.getName(), true, true, true); + throw e; + } solrCores.removeCoreDescriptor(dcore); final SolrException solrException = new SolrException(ErrorCode.SERVER_ERROR, "Unable to create core [" + dcore.getName() + "]", e); if(core != null && !core.isClosed()) diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java index 0f4ff48d61d..33a1a55955d 100644 --- a/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java @@ -17,6 +17,8 @@ package org.apache.solr.cloud; import java.lang.invoke.MethodHandles; +import java.nio.file.Files; +import java.util.concurrent.TimeUnit; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.embedded.JettySolrRunner; @@ -26,7 +28,11 @@ import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkStateReader; -import org.apache.solr.core.CoreContainer; +import org.apache.solr.common.util.TimeSource; +import org.apache.solr.core.CoreDescriptor; +import org.apache.solr.core.SolrCore; +import org.apache.solr.util.FileUtils; +import org.apache.solr.util.TimeOut; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; @@ -64,6 +70,10 @@ public class DeleteInactiveReplicaTest extends SolrCloudTestCase { Slice shard = getRandomShard(collectionState); Replica 
replica = getRandomReplica(shard); JettySolrRunner jetty = cluster.getReplicaJetty(replica); + CoreDescriptor replicaCd; + try (SolrCore core = jetty.getCoreContainer().getCore(replica.getCoreName())) { + replicaCd = core.getCoreDescriptor(); + } cluster.stopJettySolrRunner(jetty); waitForState("Expected replica " + replica.getName() + " on down node to be removed from cluster state", collectionName, (n, c) -> { @@ -80,13 +90,9 @@ public class DeleteInactiveReplicaTest extends SolrCloudTestCase { cluster.startJettySolrRunner(jetty); log.info("restarted jetty"); - - CoreContainer cc = jetty.getCoreContainer(); - CoreContainer.CoreLoadFailure loadFailure = cc.getCoreInitFailures().get(replica.getCoreName()); - assertNotNull("Deleted core was still loaded!", loadFailure); - assertNotNull(loadFailure.exception.getCause()); - assertTrue("Unexpected load failure message: " + loadFailure.exception.getCause().getMessage(), - loadFailure.exception.getCause().getMessage().contains("does not exist in shard")); + TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME); + timeOut.waitFor("Expected data dir and instance dir of " + replica.getName() + " is deleted", () + -> !Files.exists(replicaCd.getInstanceDir()) && !FileUtils.fileExists(replicaCd.getDataDir())); // Check that we can't create a core with no coreNodeName try (SolrClient queryClient = getHttpSolrClient(jetty.getBaseUrl().toString())) { diff --git a/solr/core/src/test/org/apache/solr/cloud/LIROnShardRestartTest.java b/solr/core/src/test/org/apache/solr/cloud/LIROnShardRestartTest.java index c83739efde3..31947be67be 100644 --- a/solr/core/src/test/org/apache/solr/cloud/LIROnShardRestartTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/LIROnShardRestartTest.java @@ -17,6 +17,7 @@ package org.apache.solr.cloud; +import java.lang.invoke.MethodHandles; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -45,12 +46,16 @@ import org.apache.solr.util.TimeOut; import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @LuceneTestCase.Nightly @LuceneTestCase.Slow @Deprecated public class LIROnShardRestartTest extends SolrCloudTestCase { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + @BeforeClass public static void setupCluster() throws Exception { System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory"); @@ -132,6 +137,9 @@ public class LIROnShardRestartTest extends SolrCloudTestCase { // now expire each node for (Replica replica : docCollection.getReplicas()) { try { + // todo remove the condition for skipping leader after SOLR-12166 is fixed + if (newLeader.getName().equals(replica.getName())) continue; + cluster.getZkClient().makePath("/collections/" + collection + "/leader_initiated_recovery/shard1/" + replica.getName(), znodeData, true); } catch (KeeperException.NodeExistsException e) { @@ -153,7 +161,14 @@ public class LIROnShardRestartTest extends SolrCloudTestCase { if (electionNodes.isEmpty()) break; } assertFalse("Timeout waiting for replicas rejoin election", timeOut.hasTimedOut()); - waitForState("Timeout waiting for active replicas", collection, clusterShape(1, 3)); + try { + waitForState("Timeout waiting for active replicas", collection, clusterShape(1, 3)); + } catch (Throwable th) { + String electionPath = "/collections/allReplicasInLIR/leader_elect/shard1/election/"; + List children = 
zkClient().getChildren(electionPath, null, true); + LOG.info("Election queue {}", children); + throw th; + } assertEquals(103, cluster.getSolrClient().query(collection, new SolrQuery("*:*")).getResults().getNumFound()); diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java index 943e8fcdffa..67b5fa02c93 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java @@ -71,6 +71,7 @@ public class ComputePlanActionTest extends SolrCloudTestCase { private static CountDownLatch triggerFiredLatch = new CountDownLatch(1); private static final AtomicReference actionContextPropsRef = new AtomicReference<>(); private static final AtomicReference eventRef = new AtomicReference<>(); + private static SolrCloudManager cloudManager; @BeforeClass public static void setupCluster() throws Exception { @@ -83,10 +84,6 @@ public class ComputePlanActionTest extends SolrCloudTestCase { public void setUp() throws Exception { super.setUp(); - fired.set(false); - triggerFiredLatch = new CountDownLatch(1); - actionContextPropsRef.set(null); - // remove everything from autoscaling.json in ZK zkClient().setData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, "{}".getBytes(UTF_8), true); @@ -129,6 +126,20 @@ public class ComputePlanActionTest extends SolrCloudTestCase { req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPreferencesCommand); response = solrClient.request(req); assertEquals(response.get("result").toString(), "success"); + + cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager(); + deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH); + deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH); + deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH); + deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH); + + fired.set(false); + triggerFiredLatch = new CountDownLatch(1); + actionContextPropsRef.set(null); + } + + private void deleteChildrenRecursively(String path) throws Exception { + cloudManager.getDistribStateManager().removeRecursively(path, true, false); } @After @@ -365,7 +376,7 @@ public class ComputePlanActionTest extends SolrCloudTestCase { } public static class AssertingTriggerAction implements TriggerAction { - static String expectedNode; + static volatile String expectedNode; @Override public String getName() { @@ -397,8 +408,8 @@ public class ComputePlanActionTest extends SolrCloudTestCase { } @Test - @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") public void testSelectedCollections() throws Exception { + log.info("Found number of jetties: {}", cluster.getJettySolrRunners().size()); AssertingTriggerAction.expectedNode = null; // start 3 more nodes @@ -467,7 +478,7 @@ public class ComputePlanActionTest extends SolrCloudTestCase { Map context = actionContextPropsRef.get(); assertNotNull(context); List operations = (List) context.get("operations"); - assertNotNull("The operations computed by ComputePlanAction should not be null" + getNodeStateProviderState() + context, operations); + assertNotNull("The operations computed by ComputePlanAction should not be null. 
" + getNodeStateProviderState() + context, operations); assertEquals("ComputePlanAction should have computed exactly 2 operations", 2, operations.size()); SolrRequest request = operations.get(0); SolrParams params = request.getParams();
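
Note on the Lucene part of this patch: ReadersAndUpdates no longer tracks deleted docs itself; the new PendingDeletes class owns the copy-on-write liveDocs bits, the pending-delete count, and writeLiveDocs(), and IndexWriter now asks rld.isFullyDeleted() instead of comparing delete counts against maxDoc(). The sketch below illustrates only the copy-on-write bookkeeping in isolation. It is not part of the patch: it uses java.util.BitSet instead of Lucene's MutableBits/LiveDocsFormat, ignores codecs, directories and the already-committed delete count, and its class and method names are purely illustrative.

import java.util.BitSet;

/**
 * Illustrative sketch (not Lucene code) of the copy-on-write live-docs
 * bookkeeping that PendingDeletes encapsulates.
 */
final class CopyOnWriteLiveDocs {
  private final int maxDoc;
  private BitSet liveDocs;         // null means no deletes yet
  private boolean shared = true;   // true while an external reader may still hold liveDocs
  private int pendingDeleteCount;  // deletes accumulated since the last "write"

  CopyOnWriteLiveDocs(int maxDoc) {
    this.maxDoc = maxDoc;
  }

  /** Current live docs, or null if every doc is live. Callers keeping the reference must call markShared(). */
  BitSet getLiveDocs() {
    return liveDocs;
  }

  /** Call once the current liveDocs instance has been handed to an external reader. */
  void markShared() {
    shared = true;
  }

  /** Deletes docID; returns true if the doc was still live, false if it was already deleted. */
  boolean delete(int docID) {
    if (docID < 0 || docID >= maxDoc) {
      throw new IllegalArgumentException("docID out of bounds: " + docID);
    }
    if (shared) {
      // Copy on write: never mutate bits that an open reader may still be using.
      BitSet copy = new BitSet(maxDoc);
      if (liveDocs == null) {
        copy.set(0, maxDoc);   // all docs start out live
      } else {
        copy.or(liveDocs);     // clone the current state
      }
      liveDocs = copy;
      shared = false;
    }
    boolean wasLive = liveDocs.get(docID);
    if (wasLive) {
      liveDocs.clear(docID);
      pendingDeleteCount++;
    }
    return wasLive;
  }

  /** Number of deletes not yet flushed. */
  int numPendingDeletes() {
    return pendingDeleteCount;
  }

  /** True once every document in the segment is deleted. */
  boolean isFullyDeleted() {
    int numDeleted = liveDocs == null ? 0 : maxDoc - liveDocs.cardinality();
    return numDeleted == maxDoc;
  }
}

The invariant TestPendingDeletes exercises is visible here: once the bits have been handed out (markShared(), the analogue of liveDocsShared() in the patch), the next delete() clones them first, so a reader that kept the old instance never observes the newer deletion; only a fresh getLiveDocs() call sees it.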