Only link fd* files during source-only snapshot (#53463)
Source-only snapshots currently create a second full source-only copy of the shard on disk to support incrementality during upload. Given that stored fields are occupying a substantial part of a shard's storage, this means that clusters with source-only snapshots can require up to 50% more local storage. Ideally we would only generate source-only parts of the shard for the things that need to be uploaded (i.e. do incrementality checks on original file instead of trimmed-down source-only versions), but that requires much bigger changes to the snapshot infrastructure. This here is an attempt to dramatically cut down on the storage used by the source-only copy of the shard by soft-linking the stored-fields files (fd*) instead of copying them. Relates #50231
This commit is contained in:
parent
534e734ad1
commit
060c72c799
|
@ -30,25 +30,34 @@ import org.apache.lucene.search.ScoreMode;
|
||||||
import org.apache.lucene.search.Scorer;
|
import org.apache.lucene.search.Scorer;
|
||||||
import org.apache.lucene.search.Weight;
|
import org.apache.lucene.search.Weight;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
|
import org.apache.lucene.store.FilterDirectory;
|
||||||
import org.apache.lucene.store.IOContext;
|
import org.apache.lucene.store.IOContext;
|
||||||
|
import org.apache.lucene.store.IndexInput;
|
||||||
import org.apache.lucene.store.IndexOutput;
|
import org.apache.lucene.store.IndexOutput;
|
||||||
import org.apache.lucene.store.Lock;
|
import org.apache.lucene.store.Lock;
|
||||||
import org.apache.lucene.store.TrackingDirectoryWrapper;
|
import org.apache.lucene.store.TrackingDirectoryWrapper;
|
||||||
import org.apache.lucene.util.Bits;
|
import org.apache.lucene.util.Bits;
|
||||||
import org.apache.lucene.util.BytesRef;
|
import org.apache.lucene.util.BytesRef;
|
||||||
import org.apache.lucene.util.FixedBitSet;
|
import org.apache.lucene.util.FixedBitSet;
|
||||||
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.lucene.Lucene;
|
import org.elasticsearch.common.lucene.Lucene;
|
||||||
import org.elasticsearch.core.internal.io.IOUtils;
|
import org.elasticsearch.core.internal.io.IOUtils;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
|
import java.nio.file.NoSuchFileException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.FIELDS_EXTENSION;
|
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.FIELDS_EXTENSION;
|
||||||
|
@ -60,23 +69,23 @@ public class SourceOnlySnapshot {
|
||||||
|
|
||||||
private static final String FIELDS_INDEX_EXTENSION = INDEX_EXTENSION_PREFIX + FIELDS_INDEX_EXTENSION_SUFFIX;
|
private static final String FIELDS_INDEX_EXTENSION = INDEX_EXTENSION_PREFIX + FIELDS_INDEX_EXTENSION_SUFFIX;
|
||||||
private static final String FIELDS_META_EXTENSION = INDEX_EXTENSION_PREFIX + FIELDS_META_EXTENSION_SUFFIX;
|
private static final String FIELDS_META_EXTENSION = INDEX_EXTENSION_PREFIX + FIELDS_META_EXTENSION_SUFFIX;
|
||||||
private final Directory targetDirectory;
|
private final LinkedFilesDirectory targetDirectory;
|
||||||
private final Supplier<Query> deleteByQuerySupplier;
|
private final Supplier<Query> deleteByQuerySupplier;
|
||||||
|
|
||||||
public SourceOnlySnapshot(Directory targetDirectory, Supplier<Query> deleteByQuerySupplier) {
|
public SourceOnlySnapshot(LinkedFilesDirectory targetDirectory, Supplier<Query> deleteByQuerySupplier) {
|
||||||
this.targetDirectory = targetDirectory;
|
this.targetDirectory = targetDirectory;
|
||||||
this.deleteByQuerySupplier = deleteByQuerySupplier;
|
this.deleteByQuerySupplier = deleteByQuerySupplier;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SourceOnlySnapshot(Directory targetDirectory) {
|
public SourceOnlySnapshot(LinkedFilesDirectory targetDirectory) {
|
||||||
this(targetDirectory, null);
|
this(targetDirectory, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized List<String> syncSnapshot(IndexCommit commit) throws IOException {
|
public synchronized List<String> syncSnapshot(IndexCommit commit) throws IOException {
|
||||||
long generation;
|
long generation;
|
||||||
Map<BytesRef, SegmentCommitInfo> existingSegments = new HashMap<>();
|
Map<BytesRef, SegmentCommitInfo> existingSegments = new HashMap<>();
|
||||||
if (Lucene.indexExists(targetDirectory)) {
|
if (Lucene.indexExists(targetDirectory.getWrapped())) {
|
||||||
SegmentInfos existingsSegmentInfos = Lucene.readSegmentInfos(targetDirectory);
|
SegmentInfos existingsSegmentInfos = Lucene.readSegmentInfos(targetDirectory.getWrapped());
|
||||||
for (SegmentCommitInfo info : existingsSegmentInfos) {
|
for (SegmentCommitInfo info : existingsSegmentInfos) {
|
||||||
existingSegments.put(new BytesRef(info.info.getId()), info);
|
existingSegments.put(new BytesRef(info.info.getId()), info);
|
||||||
}
|
}
|
||||||
|
@ -191,63 +200,78 @@ public class SourceOnlySnapshot {
|
||||||
|
|
||||||
private SegmentCommitInfo syncSegment(SegmentCommitInfo segmentCommitInfo, LiveDocs liveDocs, FieldInfos fieldInfos,
|
private SegmentCommitInfo syncSegment(SegmentCommitInfo segmentCommitInfo, LiveDocs liveDocs, FieldInfos fieldInfos,
|
||||||
Map<BytesRef, SegmentCommitInfo> existingSegments, List<String> createdFiles) throws IOException {
|
Map<BytesRef, SegmentCommitInfo> existingSegments, List<String> createdFiles) throws IOException {
|
||||||
SegmentInfo si = segmentCommitInfo.info;
|
Directory toClose = null;
|
||||||
Codec codec = si.getCodec();
|
try {
|
||||||
final String segmentSuffix = "";
|
SegmentInfo si = segmentCommitInfo.info;
|
||||||
SegmentCommitInfo newInfo;
|
Codec codec = si.getCodec();
|
||||||
final TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(targetDirectory);
|
Directory sourceDir = si.dir;
|
||||||
BytesRef segmentId = new BytesRef(si.getId());
|
|
||||||
boolean exists = existingSegments.containsKey(segmentId);
|
|
||||||
if (exists == false) {
|
|
||||||
SegmentInfo newSegmentInfo = new SegmentInfo(si.dir, si.getVersion(), si.getMinVersion(), si.name, si.maxDoc(), false,
|
|
||||||
si.getCodec(), si.getDiagnostics(), si.getId(), si.getAttributes(), null);
|
|
||||||
// we drop the sort on purpose since the field we sorted on doesn't exist in the target index anymore.
|
|
||||||
newInfo = new SegmentCommitInfo(newSegmentInfo, 0, 0, -1, -1, -1);
|
|
||||||
List<FieldInfo> fieldInfoCopy = new ArrayList<>(fieldInfos.size());
|
|
||||||
for (FieldInfo fieldInfo : fieldInfos) {
|
|
||||||
fieldInfoCopy.add(new FieldInfo(fieldInfo.name, fieldInfo.number,
|
|
||||||
false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, fieldInfo.attributes(), 0, 0, 0,
|
|
||||||
fieldInfo.isSoftDeletesField()));
|
|
||||||
}
|
|
||||||
FieldInfos newFieldInfos = new FieldInfos(fieldInfoCopy.toArray(new FieldInfo[0]));
|
|
||||||
codec.fieldInfosFormat().write(trackingDir, newSegmentInfo, segmentSuffix, newFieldInfos, IOContext.DEFAULT);
|
|
||||||
newInfo.setFieldInfosFiles(trackingDir.getCreatedFiles());
|
|
||||||
String idxFile = IndexFileNames.segmentFileName(newSegmentInfo.name, segmentSuffix, FIELDS_INDEX_EXTENSION);
|
|
||||||
String dataFile = IndexFileNames.segmentFileName(newSegmentInfo.name, segmentSuffix, FIELDS_EXTENSION);
|
|
||||||
String metaFile = IndexFileNames.segmentFileName(newSegmentInfo.name, segmentSuffix, FIELDS_META_EXTENSION);
|
|
||||||
Directory sourceDir = newSegmentInfo.dir;
|
|
||||||
if (si.getUseCompoundFile()) {
|
if (si.getUseCompoundFile()) {
|
||||||
sourceDir = codec.compoundFormat().getCompoundReader(sourceDir, si, IOContext.DEFAULT);
|
sourceDir = new LinkedFilesDirectory.CloseMePleaseWrapper(
|
||||||
|
codec.compoundFormat().getCompoundReader(sourceDir, si, IOContext.DEFAULT));
|
||||||
|
toClose = sourceDir;
|
||||||
}
|
}
|
||||||
|
final String segmentSuffix = "";
|
||||||
|
SegmentCommitInfo newInfo;
|
||||||
|
final TrackingDirectoryWrapper trackingDir = new TrackingDirectoryWrapper(targetDirectory);
|
||||||
|
BytesRef segmentId = new BytesRef(si.getId());
|
||||||
|
boolean exists = existingSegments.containsKey(segmentId);
|
||||||
|
if (exists == false) {
|
||||||
|
SegmentInfo newSegmentInfo = new SegmentInfo(targetDirectory, si.getVersion(), si.getMinVersion(), si.name, si.maxDoc(),
|
||||||
|
false, si.getCodec(), si.getDiagnostics(), si.getId(), si.getAttributes(), null);
|
||||||
|
// we drop the sort on purpose since the field we sorted on doesn't exist in the target index anymore.
|
||||||
|
newInfo = new SegmentCommitInfo(newSegmentInfo, 0, 0, -1, -1, -1);
|
||||||
|
List<FieldInfo> fieldInfoCopy = new ArrayList<>(fieldInfos.size());
|
||||||
|
for (FieldInfo fieldInfo : fieldInfos) {
|
||||||
|
fieldInfoCopy.add(new FieldInfo(fieldInfo.name, fieldInfo.number,
|
||||||
|
false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, fieldInfo.attributes(), 0, 0, 0,
|
||||||
|
fieldInfo.isSoftDeletesField()));
|
||||||
|
}
|
||||||
|
FieldInfos newFieldInfos = new FieldInfos(fieldInfoCopy.toArray(new FieldInfo[0]));
|
||||||
|
codec.fieldInfosFormat().write(trackingDir, newSegmentInfo, segmentSuffix, newFieldInfos, IOContext.DEFAULT);
|
||||||
|
newInfo.setFieldInfosFiles(trackingDir.getCreatedFiles());
|
||||||
|
} else {
|
||||||
|
newInfo = existingSegments.get(segmentId);
|
||||||
|
assert newInfo.info.getUseCompoundFile() == false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// link files for stored fields to target directory
|
||||||
|
final String idxFile = IndexFileNames.segmentFileName(newInfo.info.name, segmentSuffix, FIELDS_INDEX_EXTENSION);
|
||||||
|
final String dataFile = IndexFileNames.segmentFileName(newInfo.info.name, segmentSuffix, FIELDS_EXTENSION);
|
||||||
|
final String metaFile = IndexFileNames.segmentFileName(newInfo.info.name, segmentSuffix, FIELDS_META_EXTENSION);
|
||||||
trackingDir.copyFrom(sourceDir, idxFile, idxFile, IOContext.DEFAULT);
|
trackingDir.copyFrom(sourceDir, idxFile, idxFile, IOContext.DEFAULT);
|
||||||
|
assert targetDirectory.linkedFiles.containsKey(idxFile);
|
||||||
|
assert trackingDir.getCreatedFiles().contains(idxFile);
|
||||||
trackingDir.copyFrom(sourceDir, dataFile, dataFile, IOContext.DEFAULT);
|
trackingDir.copyFrom(sourceDir, dataFile, dataFile, IOContext.DEFAULT);
|
||||||
if (Arrays.asList(sourceDir.listAll()).contains(metaFile)) { // only exists for Lucene 8.5+ indices
|
assert targetDirectory.linkedFiles.containsKey(dataFile);
|
||||||
trackingDir.copyFrom(sourceDir, metaFile, metaFile, IOContext.DEFAULT);
|
assert trackingDir.getCreatedFiles().contains(dataFile);
|
||||||
}
|
if (Arrays.asList(sourceDir.listAll()).contains(metaFile)) { // only exists for Lucene 8.5+ indices
|
||||||
if (sourceDir != newSegmentInfo.dir) {
|
trackingDir.copyFrom(sourceDir, metaFile, metaFile, IOContext.DEFAULT);
|
||||||
sourceDir.close();
|
assert targetDirectory.linkedFiles.containsKey(metaFile);
|
||||||
|
assert trackingDir.getCreatedFiles().contains(metaFile);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
newInfo = existingSegments.get(segmentId);
|
if (liveDocs.bits != null && liveDocs.numDeletes != 0 && liveDocs.numDeletes != newInfo.getDelCount()) {
|
||||||
assert newInfo.info.getUseCompoundFile() == false;
|
assert newInfo.getDelCount() == 0 || assertLiveDocs(liveDocs.bits, liveDocs.numDeletes);
|
||||||
}
|
codec.liveDocsFormat().writeLiveDocs(liveDocs.bits, trackingDir, newInfo, liveDocs.numDeletes - newInfo.getDelCount(),
|
||||||
if (liveDocs.bits != null && liveDocs.numDeletes != 0 && liveDocs.numDeletes != newInfo.getDelCount()) {
|
IOContext.DEFAULT);
|
||||||
if (newInfo.getDelCount() != 0) {
|
SegmentCommitInfo info = new SegmentCommitInfo(newInfo.info, liveDocs.numDeletes, 0, newInfo.getNextDelGen(), -1, -1);
|
||||||
assert assertLiveDocs(liveDocs.bits, liveDocs.numDeletes);
|
info.setFieldInfosFiles(newInfo.getFieldInfosFiles());
|
||||||
|
info.info.setFiles(trackingDir.getCreatedFiles());
|
||||||
|
newInfo = info;
|
||||||
}
|
}
|
||||||
codec.liveDocsFormat().writeLiveDocs(liveDocs.bits, trackingDir, newInfo, liveDocs.numDeletes - newInfo.getDelCount(),
|
if (exists == false) {
|
||||||
IOContext.DEFAULT);
|
newInfo.info.setFiles(trackingDir.getCreatedFiles());
|
||||||
SegmentCommitInfo info = new SegmentCommitInfo(newInfo.info, liveDocs.numDeletes, 0, newInfo.getNextDelGen(), -1, -1);
|
codec.segmentInfoFormat().write(trackingDir, newInfo.info, IOContext.DEFAULT);
|
||||||
info.setFieldInfosFiles(newInfo.getFieldInfosFiles());
|
}
|
||||||
info.info.setFiles(trackingDir.getCreatedFiles());
|
final Set<String> createdFilesForThisSegment = trackingDir.getCreatedFiles();
|
||||||
newInfo = info;
|
createdFilesForThisSegment.remove(idxFile);
|
||||||
|
createdFilesForThisSegment.remove(dataFile);
|
||||||
|
createdFilesForThisSegment.remove(metaFile);
|
||||||
|
createdFiles.addAll(createdFilesForThisSegment);
|
||||||
|
return newInfo;
|
||||||
|
} finally {
|
||||||
|
IOUtils.close(toClose);
|
||||||
}
|
}
|
||||||
if (exists == false) {
|
|
||||||
newInfo.info.setFiles(trackingDir.getCreatedFiles());
|
|
||||||
codec.segmentInfoFormat().write(trackingDir, newInfo.info, IOContext.DEFAULT);
|
|
||||||
}
|
|
||||||
createdFiles.addAll(trackingDir.getCreatedFiles());
|
|
||||||
return newInfo;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean assertLiveDocs(Bits liveDocs, int deletes) {
|
private boolean assertLiveDocs(Bits liveDocs, int deletes) {
|
||||||
|
@ -270,4 +294,165 @@ public class SourceOnlySnapshot {
|
||||||
this.bits = bits;
|
this.bits = bits;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class LinkedFilesDirectory extends Directory {
|
||||||
|
|
||||||
|
private final Directory wrapped;
|
||||||
|
private final Map<String, Directory> linkedFiles = new HashMap<>();
|
||||||
|
|
||||||
|
public LinkedFilesDirectory(Directory wrapped) {
|
||||||
|
this.wrapped = wrapped;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Directory getWrapped() {
|
||||||
|
return wrapped;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String[] listAll() throws IOException {
|
||||||
|
Set<String> files = new HashSet<>();
|
||||||
|
Collections.addAll(files, wrapped.listAll());
|
||||||
|
files.addAll(linkedFiles.keySet());
|
||||||
|
String[] result = files.toArray(Strings.EMPTY_ARRAY);
|
||||||
|
Arrays.sort(result);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteFile(String name) throws IOException {
|
||||||
|
final Directory directory = linkedFiles.remove(name);
|
||||||
|
if (directory == null) {
|
||||||
|
wrapped.deleteFile(name);
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
wrapped.deleteFile(name);
|
||||||
|
} catch (NoSuchFileException | FileNotFoundException e) {
|
||||||
|
// ignore
|
||||||
|
} finally {
|
||||||
|
directory.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long fileLength(String name) throws IOException {
|
||||||
|
final Directory linkedDir = linkedFiles.get(name);
|
||||||
|
if (linkedDir != null) {
|
||||||
|
return linkedDir.fileLength(name);
|
||||||
|
} else {
|
||||||
|
return wrapped.fileLength(name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IndexOutput createOutput(String name, IOContext context) throws IOException {
|
||||||
|
if (linkedFiles.containsKey(name)) {
|
||||||
|
throw new IllegalArgumentException("file cannot be created as linked file with name " + name + " already exists");
|
||||||
|
} else {
|
||||||
|
return wrapped.createOutput(name, context);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
|
||||||
|
return wrapped.createTempOutput(prefix, suffix, context);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sync(Collection<String> names) throws IOException {
|
||||||
|
final List<String> primaryNames = new ArrayList<>();
|
||||||
|
|
||||||
|
for (String name : names) {
|
||||||
|
if (linkedFiles.containsKey(name) == false) {
|
||||||
|
primaryNames.add(name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (primaryNames.isEmpty() == false) {
|
||||||
|
wrapped.sync(primaryNames);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void syncMetaData() throws IOException {
|
||||||
|
wrapped.syncMetaData();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void rename(String source, String dest) throws IOException {
|
||||||
|
if (linkedFiles.containsKey(source) || linkedFiles.containsKey(dest)) {
|
||||||
|
throw new IllegalArgumentException("file cannot be renamed as linked file with name " + source + " or " + dest +
|
||||||
|
" already exists");
|
||||||
|
} else {
|
||||||
|
wrapped.rename(source, dest);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IndexInput openInput(String name, IOContext context) throws IOException {
|
||||||
|
final Directory linkedDir = linkedFiles.get(name);
|
||||||
|
if (linkedDir != null) {
|
||||||
|
return linkedDir.openInput(name, context);
|
||||||
|
} else {
|
||||||
|
return wrapped.openInput(name, context);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Lock obtainLock(String name) throws IOException {
|
||||||
|
return wrapped.obtainLock(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
IOUtils.close(() -> IOUtils.close(linkedFiles.values()), linkedFiles::clear, wrapped);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException {
|
||||||
|
if (src.equals(dest) == false) {
|
||||||
|
throw new IllegalArgumentException();
|
||||||
|
} else {
|
||||||
|
final Directory previous;
|
||||||
|
if (from instanceof CloseMePleaseWrapper) {
|
||||||
|
((CloseMePleaseWrapper) from).incRef();
|
||||||
|
previous = linkedFiles.put(src, from);
|
||||||
|
} else {
|
||||||
|
previous = linkedFiles.put(src, new FilterDirectory(from) {
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
IOUtils.close(previous);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class CloseMePleaseWrapper extends FilterDirectory {
|
||||||
|
|
||||||
|
private final AtomicInteger refCount = new AtomicInteger(1);
|
||||||
|
|
||||||
|
CloseMePleaseWrapper(Directory in) {
|
||||||
|
super(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incRef() {
|
||||||
|
int ref = refCount.incrementAndGet();
|
||||||
|
assert ref > 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
if (refCount.decrementAndGet() == 0) {
|
||||||
|
in.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<String> getPendingDeletions() throws IOException {
|
||||||
|
return wrapped.getPendingDeletions();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -156,9 +156,10 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
|
||||||
Path snapPath = dataPath.resolve(SNAPSHOT_DIR_NAME);
|
Path snapPath = dataPath.resolve(SNAPSHOT_DIR_NAME);
|
||||||
final List<Closeable> toClose = new ArrayList<>(3);
|
final List<Closeable> toClose = new ArrayList<>(3);
|
||||||
try {
|
try {
|
||||||
FSDirectory directory = new SimpleFSDirectory(snapPath);
|
SourceOnlySnapshot.LinkedFilesDirectory overlayDir = new SourceOnlySnapshot.LinkedFilesDirectory(
|
||||||
toClose.add(directory);
|
new SimpleFSDirectory(snapPath));
|
||||||
Store tempStore = new Store(store.shardId(), store.indexSettings(), directory, new ShardLock(store.shardId()) {
|
toClose.add(overlayDir);
|
||||||
|
Store tempStore = new Store(store.shardId(), store.indexSettings(), overlayDir, new ShardLock(store.shardId()) {
|
||||||
@Override
|
@Override
|
||||||
protected void closeInternal() {
|
protected void closeInternal() {
|
||||||
// do nothing;
|
// do nothing;
|
||||||
|
@ -166,7 +167,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
|
||||||
}, Store.OnClose.EMPTY);
|
}, Store.OnClose.EMPTY);
|
||||||
Supplier<Query> querySupplier = mapperService.hasNested() ? Queries::newNestedFilter : null;
|
Supplier<Query> querySupplier = mapperService.hasNested() ? Queries::newNestedFilter : null;
|
||||||
// SourceOnlySnapshot will take care of soft- and hard-deletes no special casing needed here
|
// SourceOnlySnapshot will take care of soft- and hard-deletes no special casing needed here
|
||||||
SourceOnlySnapshot snapshot = new SourceOnlySnapshot(tempStore.directory(), querySupplier);
|
SourceOnlySnapshot snapshot = new SourceOnlySnapshot(overlayDir, querySupplier);
|
||||||
snapshot.syncSnapshot(snapshotIndexCommit);
|
snapshot.syncSnapshot(snapshotIndexCommit);
|
||||||
// we will use the lucene doc ID as the seq ID so we set the local checkpoint to maxDoc with a new index UUID
|
// we will use the lucene doc ID as the seq ID so we set the local checkpoint to maxDoc with a new index UUID
|
||||||
SegmentInfos segmentInfos = tempStore.readLastCommittedSegmentsInfo();
|
SegmentInfos segmentInfos = tempStore.readLastCommittedSegmentsInfo();
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.lucene.search.DocValuesFieldExistsQuery;
|
||||||
import org.apache.lucene.search.IndexSearcher;
|
import org.apache.lucene.search.IndexSearcher;
|
||||||
import org.apache.lucene.search.TermQuery;
|
import org.apache.lucene.search.TermQuery;
|
||||||
import org.apache.lucene.search.TopDocs;
|
import org.apache.lucene.search.TopDocs;
|
||||||
|
import org.apache.lucene.store.BaseDirectoryWrapper;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
import org.apache.lucene.util.IOSupplier;
|
import org.apache.lucene.util.IOSupplier;
|
||||||
import org.elasticsearch.common.lucene.Lucene;
|
import org.elasticsearch.common.lucene.Lucene;
|
||||||
|
@ -48,7 +49,7 @@ import java.util.List;
|
||||||
|
|
||||||
public class SourceOnlySnapshotTests extends ESTestCase {
|
public class SourceOnlySnapshotTests extends ESTestCase {
|
||||||
public void testSourceOnlyRandom() throws IOException {
|
public void testSourceOnlyRandom() throws IOException {
|
||||||
try (Directory dir = newDirectory(); Directory targetDir = newDirectory()) {
|
try (Directory dir = newDirectory(); BaseDirectoryWrapper targetDir = newDirectory()) {
|
||||||
SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
|
SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
|
||||||
IndexWriterConfig indexWriterConfig = newIndexWriterConfig().setIndexDeletionPolicy
|
IndexWriterConfig indexWriterConfig = newIndexWriterConfig().setIndexDeletionPolicy
|
||||||
(deletionPolicy).setSoftDeletesField(random().nextBoolean() ? null : Lucene.SOFT_DELETES_FIELD);
|
(deletionPolicy).setSoftDeletesField(random().nextBoolean() ? null : Lucene.SOFT_DELETES_FIELD);
|
||||||
|
@ -56,7 +57,9 @@ public class SourceOnlySnapshotTests extends ESTestCase {
|
||||||
final String softDeletesField = writer.w.getConfig().getSoftDeletesField();
|
final String softDeletesField = writer.w.getConfig().getSoftDeletesField();
|
||||||
// we either use the soft deletes directly or manually delete them to test the additional delete functionality
|
// we either use the soft deletes directly or manually delete them to test the additional delete functionality
|
||||||
boolean modifyDeletedDocs = softDeletesField != null && randomBoolean();
|
boolean modifyDeletedDocs = softDeletesField != null && randomBoolean();
|
||||||
SourceOnlySnapshot snapshoter = new SourceOnlySnapshot(targetDir,
|
targetDir.setCheckIndexOnClose(false);
|
||||||
|
final SourceOnlySnapshot.LinkedFilesDirectory wrappedDir = new SourceOnlySnapshot.LinkedFilesDirectory(targetDir);
|
||||||
|
SourceOnlySnapshot snapshoter = new SourceOnlySnapshot(wrappedDir,
|
||||||
modifyDeletedDocs ? () -> new DocValuesFieldExistsQuery(softDeletesField) : null) {
|
modifyDeletedDocs ? () -> new DocValuesFieldExistsQuery(softDeletesField) : null) {
|
||||||
@Override
|
@Override
|
||||||
DirectoryReader wrapReader(DirectoryReader reader) throws IOException {
|
DirectoryReader wrapReader(DirectoryReader reader) throws IOException {
|
||||||
|
@ -92,7 +95,7 @@ public class SourceOnlySnapshotTests extends ESTestCase {
|
||||||
IndexCommit snapshot = deletionPolicy.snapshot();
|
IndexCommit snapshot = deletionPolicy.snapshot();
|
||||||
try {
|
try {
|
||||||
snapshoter.syncSnapshot(snapshot);
|
snapshoter.syncSnapshot(snapshot);
|
||||||
try (DirectoryReader snapReader = snapshoter.wrapReader(DirectoryReader.open(targetDir));
|
try (DirectoryReader snapReader = snapshoter.wrapReader(DirectoryReader.open(wrappedDir));
|
||||||
DirectoryReader wrappedReader = snapshoter.wrapReader(DirectoryReader.open(snapshot))) {
|
DirectoryReader wrappedReader = snapshoter.wrapReader(DirectoryReader.open(snapshot))) {
|
||||||
DirectoryReader reader = modifyDeletedDocs
|
DirectoryReader reader = modifyDeletedDocs
|
||||||
? new SoftDeletesDirectoryReaderWrapper(wrappedReader, softDeletesField) :
|
? new SoftDeletesDirectoryReaderWrapper(wrappedReader, softDeletesField) :
|
||||||
|
@ -111,6 +114,7 @@ public class SourceOnlySnapshotTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
deletionPolicy.release(snapshot);
|
deletionPolicy.release(snapshot);
|
||||||
|
wrappedDir.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -165,13 +169,15 @@ public class SourceOnlySnapshotTests extends ESTestCase {
|
||||||
doc.add(new StoredField("src", "the quick brown fox"));
|
doc.add(new StoredField("src", "the quick brown fox"));
|
||||||
writer.softUpdateDocument(new Term("id", "1"), doc, new NumericDocValuesField(Lucene.SOFT_DELETES_FIELD, 1));
|
writer.softUpdateDocument(new Term("id", "1"), doc, new NumericDocValuesField(Lucene.SOFT_DELETES_FIELD, 1));
|
||||||
writer.commit();
|
writer.commit();
|
||||||
Directory targetDir = newDirectory();
|
BaseDirectoryWrapper targetDir = newDirectory();
|
||||||
|
targetDir.setCheckIndexOnClose(false);
|
||||||
IndexCommit snapshot = deletionPolicy.snapshot();
|
IndexCommit snapshot = deletionPolicy.snapshot();
|
||||||
SourceOnlySnapshot snapshoter = new SourceOnlySnapshot(targetDir);
|
SourceOnlySnapshot.LinkedFilesDirectory wrappedDir = new SourceOnlySnapshot.LinkedFilesDirectory(targetDir);
|
||||||
|
SourceOnlySnapshot snapshoter = new SourceOnlySnapshot(wrappedDir);
|
||||||
snapshoter.syncSnapshot(snapshot);
|
snapshoter.syncSnapshot(snapshot);
|
||||||
|
|
||||||
StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(snapshot);
|
StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(snapshot);
|
||||||
try (DirectoryReader snapReader = DirectoryReader.open(targetDir)) {
|
try (DirectoryReader snapReader = DirectoryReader.open(wrappedDir)) {
|
||||||
assertEquals(snapReader.maxDoc(), 3);
|
assertEquals(snapReader.maxDoc(), 3);
|
||||||
assertEquals(snapReader.numDocs(), 2);
|
assertEquals(snapReader.numDocs(), 2);
|
||||||
for (int i = 0; i < 3; i++) {
|
for (int i = 0; i < 3; i++) {
|
||||||
|
@ -182,7 +188,12 @@ public class SourceOnlySnapshotTests extends ESTestCase {
|
||||||
assertEquals(0, id.totalHits.value);
|
assertEquals(0, id.totalHits.value);
|
||||||
}
|
}
|
||||||
|
|
||||||
snapshoter = new SourceOnlySnapshot(targetDir);
|
targetDir = newDirectory(targetDir);
|
||||||
|
targetDir.setCheckIndexOnClose(false);
|
||||||
|
wrappedDir.close();
|
||||||
|
wrappedDir = new SourceOnlySnapshot.LinkedFilesDirectory(targetDir);
|
||||||
|
|
||||||
|
snapshoter = new SourceOnlySnapshot(wrappedDir);
|
||||||
List<String> createdFiles = snapshoter.syncSnapshot(snapshot);
|
List<String> createdFiles = snapshoter.syncSnapshot(snapshot);
|
||||||
assertEquals(0, createdFiles.size());
|
assertEquals(0, createdFiles.size());
|
||||||
deletionPolicy.release(snapshot);
|
deletionPolicy.release(snapshot);
|
||||||
|
@ -200,17 +211,18 @@ public class SourceOnlySnapshotTests extends ESTestCase {
|
||||||
doc.add(new StoredField("src", "the quick blue fox"));
|
doc.add(new StoredField("src", "the quick blue fox"));
|
||||||
writer.addDocument(doc);
|
writer.addDocument(doc);
|
||||||
writer.commit();
|
writer.commit();
|
||||||
|
targetDir = newDirectory(targetDir);
|
||||||
|
targetDir.setCheckIndexOnClose(false);
|
||||||
|
wrappedDir.close();
|
||||||
|
wrappedDir = new SourceOnlySnapshot.LinkedFilesDirectory(targetDir);
|
||||||
{
|
{
|
||||||
snapshot = deletionPolicy.snapshot();
|
snapshot = deletionPolicy.snapshot();
|
||||||
snapshoter = new SourceOnlySnapshot(targetDir);
|
snapshoter = new SourceOnlySnapshot(wrappedDir);
|
||||||
createdFiles = snapshoter.syncSnapshot(snapshot);
|
createdFiles = snapshoter.syncSnapshot(snapshot);
|
||||||
assertEquals(5, createdFiles.size());
|
assertEquals(2, createdFiles.size());
|
||||||
for (String file : createdFiles) {
|
for (String file : createdFiles) {
|
||||||
String extension = IndexFileNames.getExtension(file);
|
String extension = IndexFileNames.getExtension(file);
|
||||||
switch (extension) {
|
switch (extension) {
|
||||||
case "fdt":
|
|
||||||
case "fdx":
|
|
||||||
case "fdm":
|
|
||||||
case "fnm":
|
case "fnm":
|
||||||
case "si":
|
case "si":
|
||||||
break;
|
break;
|
||||||
|
@ -218,7 +230,7 @@ public class SourceOnlySnapshotTests extends ESTestCase {
|
||||||
fail("unexpected extension: " + extension);
|
fail("unexpected extension: " + extension);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
try(DirectoryReader snapReader = DirectoryReader.open(targetDir)) {
|
try(DirectoryReader snapReader = DirectoryReader.open(wrappedDir)) {
|
||||||
assertEquals(snapReader.maxDoc(), 5);
|
assertEquals(snapReader.maxDoc(), 5);
|
||||||
assertEquals(snapReader.numDocs(), 4);
|
assertEquals(snapReader.numDocs(), 4);
|
||||||
}
|
}
|
||||||
|
@ -226,9 +238,13 @@ public class SourceOnlySnapshotTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
writer.deleteDocuments(new Term("id", "5"));
|
writer.deleteDocuments(new Term("id", "5"));
|
||||||
writer.commit();
|
writer.commit();
|
||||||
|
targetDir = newDirectory(targetDir);
|
||||||
|
targetDir.setCheckIndexOnClose(false);
|
||||||
|
wrappedDir.close();
|
||||||
|
wrappedDir = new SourceOnlySnapshot.LinkedFilesDirectory(targetDir);
|
||||||
{
|
{
|
||||||
snapshot = deletionPolicy.snapshot();
|
snapshot = deletionPolicy.snapshot();
|
||||||
snapshoter = new SourceOnlySnapshot(targetDir);
|
snapshoter = new SourceOnlySnapshot(wrappedDir);
|
||||||
createdFiles = snapshoter.syncSnapshot(snapshot);
|
createdFiles = snapshoter.syncSnapshot(snapshot);
|
||||||
assertEquals(1, createdFiles.size());
|
assertEquals(1, createdFiles.size());
|
||||||
for (String file : createdFiles) {
|
for (String file : createdFiles) {
|
||||||
|
@ -240,15 +256,15 @@ public class SourceOnlySnapshotTests extends ESTestCase {
|
||||||
fail("unexpected extension: " + extension);
|
fail("unexpected extension: " + extension);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
try(DirectoryReader snapReader = DirectoryReader.open(targetDir)) {
|
try(DirectoryReader snapReader = DirectoryReader.open(wrappedDir)) {
|
||||||
assertEquals(snapReader.maxDoc(), 5);
|
assertEquals(snapReader.maxDoc(), 5);
|
||||||
assertEquals(snapReader.numDocs(), 3);
|
assertEquals(snapReader.numDocs(), 3);
|
||||||
}
|
}
|
||||||
deletionPolicy.release(snapshot);
|
deletionPolicy.release(snapshot);
|
||||||
}
|
}
|
||||||
writer.close();
|
writer.close();
|
||||||
targetDir.close();
|
|
||||||
reader.close();
|
reader.close();
|
||||||
|
wrappedDir.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -284,12 +300,14 @@ public class SourceOnlySnapshotTests extends ESTestCase {
|
||||||
doc.add(new StoredField("src", "the quick brown fox"));
|
doc.add(new StoredField("src", "the quick brown fox"));
|
||||||
writer.softUpdateDocument(new Term("id", "1"), doc, new NumericDocValuesField(Lucene.SOFT_DELETES_FIELD, 1));
|
writer.softUpdateDocument(new Term("id", "1"), doc, new NumericDocValuesField(Lucene.SOFT_DELETES_FIELD, 1));
|
||||||
writer.commit();
|
writer.commit();
|
||||||
try (Directory targetDir = newDirectory()) {
|
try (BaseDirectoryWrapper targetDir = newDirectory()) {
|
||||||
|
targetDir.setCheckIndexOnClose(false);
|
||||||
IndexCommit snapshot = deletionPolicy.snapshot();
|
IndexCommit snapshot = deletionPolicy.snapshot();
|
||||||
SourceOnlySnapshot snapshoter = new SourceOnlySnapshot(targetDir);
|
SourceOnlySnapshot.LinkedFilesDirectory wrappedDir = new SourceOnlySnapshot.LinkedFilesDirectory(targetDir);
|
||||||
|
SourceOnlySnapshot snapshoter = new SourceOnlySnapshot(wrappedDir);
|
||||||
snapshoter.syncSnapshot(snapshot);
|
snapshoter.syncSnapshot(snapshot);
|
||||||
|
|
||||||
try (DirectoryReader snapReader = DirectoryReader.open(targetDir)) {
|
try (DirectoryReader snapReader = DirectoryReader.open(wrappedDir)) {
|
||||||
assertEquals(snapReader.maxDoc(), 1);
|
assertEquals(snapReader.maxDoc(), 1);
|
||||||
assertEquals(snapReader.numDocs(), 1);
|
assertEquals(snapReader.numDocs(), 1);
|
||||||
assertEquals("3", snapReader.document(0).getField("rank").stringValue());
|
assertEquals("3", snapReader.document(0).getField("rank").stringValue());
|
||||||
|
@ -298,6 +316,7 @@ public class SourceOnlySnapshotTests extends ESTestCase {
|
||||||
assertEquals(writerReader.maxDoc(), 2);
|
assertEquals(writerReader.maxDoc(), 2);
|
||||||
assertEquals(writerReader.numDocs(), 1);
|
assertEquals(writerReader.numDocs(), 1);
|
||||||
}
|
}
|
||||||
|
wrappedDir.close();
|
||||||
}
|
}
|
||||||
writer.close();
|
writer.close();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue