Store Pending Deletions Fix (#40345)

FilterDirectory.getPendingDeletions does not delegate, fixed
temporarily by overriding in StoreDirectory.

This in turn caused duplicate file name use after a trimUnsafeCommits
had been done, since a new IndexWriter would not consider the pending
deletes in IndexFileDeleter. This should only happen on windows (AFAIK).

Reenabled doing index updates for all tests using
IndexShardTests.indexOnReplicaWithGaps (which could fail due to above
when using mocked WindowsFS).

Added getPendingDeletions delegation to all elasticsearch
FilterDirectory subclasses that were not trivial test-only overrides to
minimize the risk of hitting this issue in another case.
This commit is contained in:
Henning Andersen 2019-03-26 15:11:51 +01:00 committed by Henning Andersen
parent b5a8de9a7f
commit bf444b9f02
9 changed files with 81 additions and 18 deletions

View File

@ -19,17 +19,19 @@
package org.elasticsearch.index.store;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Set;
/**
* This class is used to wrap an existing {@link org.apache.lucene.store.FSDirectory} so that
* the new shard segment files will be opened for Read and Write access.
@ -78,4 +80,10 @@ public final class SmbDirectoryWrapper extends FilterDirectory {
CHUNK_SIZE);
}
}
// temporary override until LUCENE-8735 is integrated
@Override
public Set<String> getPendingDeletions() throws IOException {
return in.getPendingDeletions();
}
}

View File

@ -33,6 +33,7 @@ import org.elasticsearch.index.store.Store;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
final class LocalShardSnapshot implements Closeable {
@ -116,6 +117,12 @@ final class LocalShardSnapshot implements Closeable {
public void close() throws IOException {
throw new UnsupportedOperationException("nobody should close this directory wrapper");
}
// temporary override until LUCENE-8735 is integrated
@Override
public Set<String> getPendingDeletions() throws IOException {
return in.getPendingDeletions();
}
};
}

View File

@ -259,6 +259,12 @@ final class StoreRecovery {
assert index.getFileDetails(dest).recovered() == l : index.getFileDetails(dest).toString();
}
}
// temporary override until LUCENE-8735 is integrated
@Override
public Set<String> getPendingDeletions() throws IOException {
return in.getPendingDeletions();
}
}
/**

View File

@ -32,6 +32,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.AccessDeniedException;
import java.nio.file.NoSuchFileException;
import java.util.Set;
final class ByteSizeCachingDirectory extends FilterDirectory {
@ -180,4 +181,9 @@ final class ByteSizeCachingDirectory extends FilterDirectory {
}
}
// temporary override until LUCENE-8735 is integrated
@Override
public Set<String> getPendingDeletions() throws IOException {
return in.getPendingDeletions();
}
}

View File

@ -760,6 +760,13 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
public String toString() {
return "store(" + in.toString() + ")";
}
@Override
public Set<String> getPendingDeletions() throws IOException {
// FilterDirectory.getPendingDeletions does not delegate, working around it here.
// to be removed once fixed in FilterDirectory.
return unwrap(this).getPendingDeletions();
}
}
/**

View File

@ -546,7 +546,7 @@ public class IndexShardTests extends IndexShardTestCase {
// most of the time this is large enough that most of the time there will be at least one gap
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), false);
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED));
final int maxSeqNo = result.maxSeqNo;
@ -1093,7 +1093,7 @@ public class IndexShardTests extends IndexShardTestCase {
public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), true);
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED));
final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
final long globalCheckpointOnReplica = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
@ -1159,9 +1159,7 @@ public class IndexShardTests extends IndexShardTestCase {
// most of the time this is large enough that most of the time there will be at least one gap
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
// todo: all tests should run with allowUpdates=true, but this specific test sometimes fails during lucene commit when updates are
// added (seed = F37E9647ABE5928)
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), false);
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED));
final long globalCheckpointOnReplica = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint());
indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test");
@ -1204,7 +1202,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(newMaxSeqNoOfUpdates));
// ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint()), false);
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint()));
assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint));
closeShard(indexShard, false);
}
@ -1462,6 +1460,12 @@ public class IndexShardTests extends IndexShardTestCase {
return super.listAll();
}
}
// temporary override until LUCENE-8735 is integrated
@Override
public Set<String> getPendingDeletions() throws IOException {
return in.getPendingDeletions();
}
};
try (Store store = createStore(shardId, new IndexSettings(metaData, Settings.EMPTY), directory)) {
@ -3166,15 +3170,13 @@ public class IndexShardTests extends IndexShardTestCase {
* @param indexShard the shard
* @param operations the number of operations
* @param offset the starting sequence number
* @param allowUpdates whether updates should be added.
* @return a pair of the maximum sequence number and whether or not a gap was introduced
* @throws IOException if an I/O exception occurs while indexing on the shard
*/
private Result indexOnReplicaWithGaps(
final IndexShard indexShard,
final int operations,
final int offset,
boolean allowUpdates) throws IOException {
final int offset) throws IOException {
int localCheckpoint = offset;
int max = offset;
boolean gap = false;
@ -3182,7 +3184,7 @@ public class IndexShardTests extends IndexShardTestCase {
for (int i = offset + 1; i < operations; i++) {
if (!rarely() || i == operations - 1) { // last operation can't be a gap as it's not a gap anymore
final String id = ids.isEmpty() || randomBoolean() ? Integer.toString(i) : randomFrom(ids);
if (allowUpdates && ids.add(id) == false) { // this is an update
if (ids.add(id) == false) { // this is an update
indexShard.advanceMaxSeqNoOfUpdatesOrDeletes(i);
}
SourceToParse sourceToParse = new SourceToParse(indexShard.shardId().getIndexName(), "_doc", id,
@ -3639,7 +3641,7 @@ public class IndexShardTests extends IndexShardTestCase {
public void testResetEngine() throws Exception {
IndexShard shard = newStartedShard(false);
indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint()), false);
indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint()));
final long globalCheckpoint = randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint());
shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test");
Set<String> docBelowGlobalCheckpoint = getShardDocUIDs(shard).stream()
@ -3679,7 +3681,7 @@ public class IndexShardTests extends IndexShardTestCase {
public void testConcurrentAcquireAllReplicaOperationsPermitsWithPrimaryTermUpdate() throws Exception {
final IndexShard replica = newStartedShard(false);
indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint()), false);
indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint()));
final int nbTermUpdates = randomIntBetween(1, 5);

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Set;
@LuceneTestCase.SuppressFileSystems("ExtrasFS")
public class ByteSizeCachingDirectoryTests extends ESTestCase {
@ -45,6 +46,12 @@ public class ByteSizeCachingDirectoryTests extends ESTestCase {
numFileLengthCalls++;
return super.fileLength(name);
}
// temporary override until LUCENE-8735 is integrated
@Override
public Set<String> getPendingDeletions() throws IOException {
return in.getPendingDeletions();
}
}
public void testBasics() throws IOException {

View File

@ -43,10 +43,12 @@ import org.apache.lucene.store.BaseDirectoryWrapper;
import org.apache.lucene.store.ByteBufferIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.TestUtil;
@ -1123,4 +1125,16 @@ public class StoreTests extends ESTestCase {
}
}
}
public void testGetPendingFiles() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
final String testfile = "testfile";
try (Store store = new Store(shardId, INDEX_SETTINGS, new NIOFSDirectory(createTempDir()), new DummyShardLock(shardId))) {
store.directory().createOutput(testfile, IOContext.DEFAULT).close();
try (IndexInput input = store.directory().openInput(testfile, IOContext.DEFAULT)) {
store.directory().deleteFile(testfile);
assertEquals(FilterDirectory.unwrap(store.directory()).getPendingDeletions(), store.directory().getPendingDeletions());
}
}
}
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.test.store;
import com.carrotsearch.randomizedtesting.SeedUtils;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.store.BaseDirectoryWrapper;
@ -55,6 +54,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Random;
import java.util.Set;
public class MockFSDirectoryService extends FsDirectoryService {
@ -179,6 +179,12 @@ public class MockFSDirectoryService extends FsDirectoryService {
super.crash();
}
}
// temporary override until LUCENE-8735 is integrated
@Override
public Set<String> getPendingDeletions() throws IOException {
return in.getPendingDeletions();
}
}
static final class CloseableDirectory implements Closeable {