Concurrent file chunk fetching for CCR restore (#38656)

Adds the ability to fetch chunks from different files in parallel, configurable using the new `ccr.indices.recovery.max_concurrent_file_chunks` setting, which defaults to 5 in this PR.

The implementation uses the parallel file writer functionality that is also used by peer recoveries.
Tim Brooks, 2019-02-09 21:19:57 -07:00 (committed by GitHub)
parent c202900915
commit 023e3c207a
15 changed files with 476 additions and 301 deletions
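
The new setting is dynamic, so it can be adjusted on a live cluster without a node restart. As an illustration only (not part of this commit), a Java client could raise the limit as sketched below; the class and method names are hypothetical, and values outside the setting's [1, 10] bounds are rejected:

import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

final class CcrRestoreConcurrencyExample {
    // Sketch: raise the CCR restore concurrency from the default of 5 to 8.
    static void raiseMaxConcurrentFileChunks(Client client) {
        ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
        request.persistentSettings(Settings.builder()
                .put("ccr.indices.recovery.max_concurrent_file_chunks", 8)
                .build());
        client.admin().cluster().updateSettings(request).actionGet();
    }
}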

View File

@@ -191,6 +191,13 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
return 0;
}
/**
* Checks whether the deletion policy is holding on to snapshotted commits
*/
synchronized boolean hasSnapshottedCommits() {
return snapshottedCommits.isEmpty() == false;
}
/**
* Checks if the deletion policy can release some index commits with the latest global checkpoint.
*/

View File

@@ -476,6 +476,11 @@ public class InternalEngine extends Engine {
return translog;
}
// Package private for testing purposes only
boolean hasSnapshottedCommits() {
return combinedDeletionPolicy.hasSnapshottedCommits();
}
@Override
public boolean isTranslogSyncNeeded() {
return getTranslog().syncNeeded();

View File

@@ -0,0 +1,213 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentMap;
public class MultiFileWriter implements Releasable {
public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) {
this.store = store;
this.indexState = indexState;
this.tempFilePrefix = tempFilePrefix;
this.logger = logger;
this.ensureOpen = ensureOpen;
}
private final Runnable ensureOpen;
private final Logger logger;
private final Store store;
private final RecoveryState.Index indexState;
private final String tempFilePrefix;
private final ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<String, FileChunkWriter> fileChunkWriters = ConcurrentCollections.newConcurrentMap();
final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap();
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean lastChunk)
throws IOException {
final FileChunkWriter writer = fileChunkWriters.computeIfAbsent(fileMetaData.name(), name -> new FileChunkWriter());
writer.writeChunk(new FileChunk(fileMetaData, content, position, lastChunk));
}
/** Get a temporary name for the provided file name. */
String getTempNameForFile(String origFile) {
return tempFilePrefix + origFile;
}
public IndexOutput getOpenIndexOutput(String key) {
ensureOpen.run();
return openIndexOutputs.get(key);
}
/** remove an {@link IndexOutput} for a given file. It is the caller's responsibility to close it */
public IndexOutput removeOpenIndexOutputs(String name) {
ensureOpen.run();
return openIndexOutputs.remove(name);
}
/**
* Creates an {@link IndexOutput} for the given file name. Note that the
* IndexOutput actually points at a temporary file.
* <p>
* Note: You can use {@link #getOpenIndexOutput(String)} with the same filename to retrieve the same IndexOutput
* at a later stage
*/
public IndexOutput openAndPutIndexOutput(String fileName, StoreFileMetaData metaData, Store store) throws IOException {
ensureOpen.run();
String tempFileName = getTempNameForFile(fileName);
if (tempFileNames.containsKey(tempFileName)) {
throw new IllegalStateException("output for file [" + fileName + "] has already been created");
}
// add first, before it's created
tempFileNames.put(tempFileName, fileName);
IndexOutput indexOutput = store.createVerifyingOutput(tempFileName, metaData, IOContext.DEFAULT);
openIndexOutputs.put(fileName, indexOutput);
return indexOutput;
}
private void innerWriteFileChunk(StoreFileMetaData fileMetaData, long position,
BytesReference content, boolean lastChunk) throws IOException {
final String name = fileMetaData.name();
IndexOutput indexOutput;
if (position == 0) {
indexOutput = openAndPutIndexOutput(name, fileMetaData, store);
} else {
indexOutput = getOpenIndexOutput(name);
}
assert indexOutput.getFilePointer() == position : "file-pointer " + indexOutput.getFilePointer() + " != " + position;
BytesRefIterator iterator = content.iterator();
BytesRef scratch;
while((scratch = iterator.next()) != null) { // we iterate over all pages - this is a 0-copy for all core impls
indexOutput.writeBytes(scratch.bytes, scratch.offset, scratch.length);
}
indexState.addRecoveredBytesToFile(name, content.length());
if (indexOutput.getFilePointer() >= fileMetaData.length() || lastChunk) {
try {
Store.verify(indexOutput);
} finally {
// we are done
indexOutput.close();
}
final String temporaryFileName = getTempNameForFile(name);
assert Arrays.asList(store.directory().listAll()).contains(temporaryFileName) :
"expected: [" + temporaryFileName + "] in " + Arrays.toString(store.directory().listAll());
store.directory().sync(Collections.singleton(temporaryFileName));
IndexOutput remove = removeOpenIndexOutputs(name);
assert remove == null || remove == indexOutput; // remove may be null if we already finished
}
}
@Override
public void close() {
fileChunkWriters.clear();
// clean open index outputs
Iterator<Map.Entry<String, IndexOutput>> iterator = openIndexOutputs.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, IndexOutput> entry = iterator.next();
logger.trace("closing IndexOutput file [{}]", entry.getValue());
try {
entry.getValue().close();
} catch (Exception e) {
logger.debug(() -> new ParameterizedMessage("error while closing recovery output [{}]", entry.getValue()), e);
}
iterator.remove();
}
if (Strings.hasText(tempFilePrefix)) {
// trash temporary files
for (String file : tempFileNames.keySet()) {
logger.trace("cleaning temporary file [{}]", file);
store.deleteQuiet(file);
}
}
}
/** renames all temporary files to their true name, potentially overwriting existing files */
public void renameAllTempFiles() throws IOException {
ensureOpen.run();
store.renameTempFilesSafe(tempFileNames);
}
static final class FileChunk {
final StoreFileMetaData md;
final BytesReference content;
final long position;
final boolean lastChunk;
FileChunk(StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) {
this.md = md;
this.content = content;
this.position = position;
this.lastChunk = lastChunk;
}
}
private final class FileChunkWriter {
// chunks can be delivered out of order, so we need to buffer chunks if there's a gap between them.
final PriorityQueue<FileChunk> pendingChunks = new PriorityQueue<>(Comparator.comparing(fc -> fc.position));
long lastPosition = 0;
void writeChunk(FileChunk newChunk) throws IOException {
synchronized (this) {
pendingChunks.add(newChunk);
}
while (true) {
final FileChunk chunk;
synchronized (this) {
chunk = pendingChunks.peek();
if (chunk == null || chunk.position != lastPosition) {
return;
}
pendingChunks.remove();
}
innerWriteFileChunk(chunk.md, chunk.position, chunk.content, chunk.lastChunk);
synchronized (this) {
assert lastPosition == chunk.position : "last_position " + lastPosition + " != chunk_position " + chunk.position;
lastPosition += chunk.content.length();
if (chunk.lastChunk) {
assert pendingChunks.isEmpty() == true : "still have pending chunks [" + pendingChunks + "]";
fileChunkWriters.remove(chunk.md.name());
assert fileChunkWriters.containsValue(this) == false : "chunk writer [" + newChunk.md + "] was not removed";
}
}
}
}
}
}
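
The core of the new class is FileChunkWriter: chunks for a file can arrive out of order, so each one is parked in a position-ordered PriorityQueue, and the queue is drained only while its head matches the next expected offset. The real code above additionally releases its lock around the actual disk write; the self-contained sketch below keeps the lock for brevity, and its names (Chunk, ChunkSink) are illustrative stand-ins rather than the classes above:

import java.io.IOException;
import java.util.Comparator;
import java.util.PriorityQueue;

final class InOrderChunkWriter {
    // stand-in for FileChunk: a payload plus its byte offset within the file
    static final class Chunk {
        final long position;
        final byte[] bytes;
        Chunk(long position, byte[] bytes) { this.position = position; this.bytes = bytes; }
    }

    // stand-in for the IndexOutput the real writer appends to
    interface ChunkSink {
        void write(byte[] bytes) throws IOException;
    }

    private final PriorityQueue<Chunk> pending =
        new PriorityQueue<>(Comparator.comparingLong((Chunk c) -> c.position));
    private long nextPosition = 0;

    // Chunks may be delivered in any order; bytes reach the sink strictly by offset.
    synchronized void accept(Chunk chunk, ChunkSink sink) throws IOException {
        pending.add(chunk);
        while (pending.isEmpty() == false && pending.peek().position == nextPosition) {
            Chunk next = pending.poll();
            sink.write(next.bytes);
            nextPosition += next.bytes.length;
        }
    }
}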

View File

@@ -20,14 +20,9 @@
package org.elasticsearch.indices.recovery;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
@@ -39,7 +34,6 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.seqno.ReplicationTracker;
@@ -55,15 +49,7 @@ import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -85,15 +71,13 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
private final long recoveryId;
private final IndexShard indexShard;
private final DiscoveryNode sourceNode;
private final String tempFilePrefix;
private final MultiFileWriter multiFileWriter;
private final Store store;
private final PeerRecoveryTargetService.RecoveryListener listener;
private final LongConsumer ensureClusterStateVersionCallback;
private final AtomicBoolean finished = new AtomicBoolean();
private final ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<String, FileChunkWriter> fileChunkWriters = ConcurrentCollections.newConcurrentMap();
private final CancellableThreads cancellableThreads;
// last time this status was accessed
@@ -102,8 +86,6 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
// latch that can be used to blockingly wait for RecoveryTarget to be closed
private final CountDownLatch closedLatch = new CountDownLatch(1);
private final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap();
/**
* Creates a new recovery target object that represents a recovery to the provided shard.
*
@@ -126,7 +108,9 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
this.indexShard = indexShard;
this.sourceNode = sourceNode;
this.shardId = indexShard.shardId();
this.tempFilePrefix = RECOVERY_PREFIX + UUIDs.randomBase64UUID() + ".";
final String tempFilePrefix = RECOVERY_PREFIX + UUIDs.randomBase64UUID() + ".";
this.multiFileWriter = new MultiFileWriter(indexShard.store(), indexShard.recoveryState().getIndex(), tempFilePrefix, logger,
this::ensureRefCount);
this.store = indexShard.store();
this.ensureClusterStateVersionCallback = ensureClusterStateVersionCallback;
// make sure the store is not released until we are done.
@@ -187,12 +171,6 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
return state().getStage();
}
/** renames all temporary files to their true name, potentially overwriting existing files */
public void renameAllTempFiles() throws IOException {
ensureRefCount();
store.renameTempFilesSafe(tempFileNames);
}
/**
* Closes the current recovery target and waits up to a certain timeout for resources to be freed.
* Returns true if resetting the recovery was successful, false if the recovery target is already cancelled / failed or marked as done.
@@ -274,7 +252,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
/** mark the current recovery as done */
public void markAsDone() {
if (finished.compareAndSet(false, true)) {
assert tempFileNames.isEmpty() : "not all temporary files are renamed";
assert multiFileWriter.tempFileNames.isEmpty() : "not all temporary files are renamed";
try {
// this might still throw an exception ie. if the shard is CLOSED due to some other event.
// it's safer to decrement the reference in a try finally here.
@@ -287,65 +265,12 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
}
}
/** Get a temporary name for the provided file name. */
public String getTempNameForFile(String origFile) {
return tempFilePrefix + origFile;
}
public IndexOutput getOpenIndexOutput(String key) {
ensureRefCount();
return openIndexOutputs.get(key);
}
/** remove an {@link org.apache.lucene.store.IndexOutput} for a given file. It is the caller's responsibility to close it */
public IndexOutput removeOpenIndexOutputs(String name) {
ensureRefCount();
return openIndexOutputs.remove(name);
}
/**
* Creates an {@link org.apache.lucene.store.IndexOutput} for the given file name. Note that the
* IndexOutput actually points at a temporary file.
* <p>
* Note: You can use {@link #getOpenIndexOutput(String)} with the same filename to retrieve the same IndexOutput
* at a later stage
*/
public IndexOutput openAndPutIndexOutput(String fileName, StoreFileMetaData metaData, Store store) throws IOException {
ensureRefCount();
String tempFileName = getTempNameForFile(fileName);
if (tempFileNames.containsKey(tempFileName)) {
throw new IllegalStateException("output for file [" + fileName + "] has already been created");
}
// add first, before it's created
tempFileNames.put(tempFileName, fileName);
IndexOutput indexOutput = store.createVerifyingOutput(tempFileName, metaData, IOContext.DEFAULT);
openIndexOutputs.put(fileName, indexOutput);
return indexOutput;
}
@Override
protected void closeInternal() {
try {
// clean open index outputs
Iterator<Entry<String, IndexOutput>> iterator = openIndexOutputs.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, IndexOutput> entry = iterator.next();
logger.trace("closing IndexOutput file [{}]", entry.getValue());
try {
entry.getValue().close();
} catch (Exception e) {
logger.debug(() -> new ParameterizedMessage("error while closing recovery output [{}]", entry.getValue()), e);
}
iterator.remove();
}
// trash temporary files
for (String file : tempFileNames.keySet()) {
logger.trace("cleaning temporary file [{}]", file);
store.deleteQuiet(file);
}
multiFileWriter.close();
} finally {
// free store. increment happens in constructor
fileChunkWriters.clear();
store.decRef();
indexShard.recoveryStats().decCurrentAsTarget();
closedLatch.countDown();
@@ -470,7 +395,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
// first, we go and move files that were created with the recovery id suffix to
// the actual names, it's OK if we have a corrupted index here, since we have replicas
// to recover from in case of a full cluster shutdown just when this code executes...
renameAllTempFiles();
multiFileWriter.renameAllTempFiles();
final Store store = store();
store.incRef();
try {
@@ -511,96 +436,21 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
}
}
private void innerWriteFileChunk(StoreFileMetaData fileMetaData, long position,
BytesReference content, boolean lastChunk) throws IOException {
final Store store = store();
final String name = fileMetaData.name();
final RecoveryState.Index indexState = state().getIndex();
IndexOutput indexOutput;
if (position == 0) {
indexOutput = openAndPutIndexOutput(name, fileMetaData, store);
} else {
indexOutput = getOpenIndexOutput(name);
}
assert indexOutput.getFilePointer() == position : "file-pointer " + indexOutput.getFilePointer() + " != " + position;
BytesRefIterator iterator = content.iterator();
BytesRef scratch;
while((scratch = iterator.next()) != null) { // we iterate over all pages - this is a 0-copy for all core impls
indexOutput.writeBytes(scratch.bytes, scratch.offset, scratch.length);
}
indexState.addRecoveredBytesToFile(name, content.length());
if (indexOutput.getFilePointer() >= fileMetaData.length() || lastChunk) {
try {
Store.verify(indexOutput);
} finally {
// we are done
indexOutput.close();
}
final String temporaryFileName = getTempNameForFile(name);
assert Arrays.asList(store.directory().listAll()).contains(temporaryFileName) :
"expected: [" + temporaryFileName + "] in " + Arrays.toString(store.directory().listAll());
store.directory().sync(Collections.singleton(temporaryFileName));
IndexOutput remove = removeOpenIndexOutputs(name);
assert remove == null || remove == indexOutput; // remove may be null if we already finished
}
}
@Override
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
boolean lastChunk, int totalTranslogOps, ActionListener<Void> listener) {
try {
state().getTranslog().totalOperations(totalTranslogOps);
final FileChunkWriter writer = fileChunkWriters.computeIfAbsent(fileMetaData.name(), name -> new FileChunkWriter());
writer.writeChunk(new FileChunk(fileMetaData, content, position, lastChunk));
multiFileWriter.writeFileChunk(fileMetaData, position, content, lastChunk);
listener.onResponse(null);
} catch (Exception e) {
listener.onFailure(e);
}
}
private static final class FileChunk {
final StoreFileMetaData md;
final BytesReference content;
final long position;
final boolean lastChunk;
FileChunk(StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) {
this.md = md;
this.content = content;
this.position = position;
this.lastChunk = lastChunk;
}
}
private final class FileChunkWriter {
// chunks can be delivered out of order, so we need to buffer chunks if there's a gap between them.
final PriorityQueue<FileChunk> pendingChunks = new PriorityQueue<>(Comparator.comparing(fc -> fc.position));
long lastPosition = 0;
void writeChunk(FileChunk newChunk) throws IOException {
synchronized (this) {
pendingChunks.add(newChunk);
}
while (true) {
final FileChunk chunk;
synchronized (this) {
chunk = pendingChunks.peek();
if (chunk == null || chunk.position != lastPosition) {
return;
}
pendingChunks.remove();
}
innerWriteFileChunk(chunk.md, chunk.position, chunk.content, chunk.lastChunk);
synchronized (this) {
assert lastPosition == chunk.position : "last_position " + lastPosition + " != chunk_position " + chunk.position;
lastPosition += chunk.content.length();
if (chunk.lastChunk) {
assert pendingChunks.isEmpty() == true : "still have pending chunks [" + pendingChunks + "]";
fileChunkWriters.remove(chunk.md.name());
assert fileChunkWriters.containsValue(this) == false : "chunk writer [" + newChunk.md + "] was not removed";
}
}
}
}
/** Get a temporary name for the provided file name. */
public String getTempNameForFile(String origFile) {
return multiFileWriter.getTempNameForFile(origFile);
}
Path translogLocation() {

View File

@@ -62,14 +62,14 @@ import static java.util.Collections.unmodifiableMap;
*/
public abstract class FileRestoreContext {
private static final Logger logger = LogManager.getLogger(FileRestoreContext.class);
protected static final Logger logger = LogManager.getLogger(FileRestoreContext.class);
private final String repositoryName;
private final IndexShard indexShard;
private final RecoveryState recoveryState;
private final SnapshotId snapshotId;
private final ShardId shardId;
private final int bufferSize;
protected final String repositoryName;
protected final IndexShard indexShard;
protected final RecoveryState recoveryState;
protected final SnapshotId snapshotId;
protected final ShardId shardId;
protected final int bufferSize;
/**
* Constructs new restore context
@@ -183,7 +183,6 @@ public abstract class FileRestoreContext {
// list of all existing store files
final List<String> deleteIfExistFiles = Arrays.asList(store.directory().listAll());
// restore the files from the snapshot to the Lucene store
for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) {
// if a file with a same physical name already exist in the store we need to delete it
// before restoring it from the snapshot. We could be lenient and try to reuse the existing
@@ -196,10 +195,9 @@
logger.trace("[{}] [{}] deleting pre-existing file [{}]", shardId, snapshotId, physicalName);
store.directory().deleteFile(physicalName);
}
logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
restoreFile(fileToRecover, store);
}
restoreFiles(filesToRecover, store);
} catch (IOException ex) {
throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex);
}
@@ -234,6 +232,14 @@
}
}
protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store) throws IOException {
// restore the files from the snapshot to the Lucene store
for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) {
logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
restoreFile(fileToRecover, store);
}
}
protected abstract InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo);
@SuppressWarnings("unchecked")
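
Widening these fields from private to protected and extracting the sequential loop into an overridable restoreFiles hook is what lets a repository substitute its own restore strategy; the CCR repository below overrides it to fetch chunks concurrently. A minimal sketch of the extension point, with a hypothetical subclass name:

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.blobstore.FileRestoreContext;
import org.elasticsearch.snapshots.SnapshotId;

class ParallelRestoreContext extends FileRestoreContext {

    ParallelRestoreContext(String repositoryName, IndexShard shard, SnapshotId snapshotId,
                           RecoveryState recoveryState, int bufferSize) {
        super(repositoryName, shard, snapshotId, recoveryState, bufferSize);
    }

    @Override
    protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover,
                                Store store) throws IOException {
        // instead of restoring one file at a time as the base class does,
        // dispatch chunk requests for several files and write them as they arrive
    }

    @Override
    protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
        // unused once restoreFiles no longer streams files sequentially;
        // the CCR implementation below throws here as well
        throw new UnsupportedOperationException();
    }
}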

View File

@@ -189,7 +189,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
for (Thread sender : senders) {
sender.join();
}
recoveryTarget.renameAllTempFiles();
recoveryTarget.cleanFiles(0, sourceSnapshot);
recoveryTarget.decRef();
Store.MetadataSnapshot targetSnapshot = targetShard.snapshotStoreMetadata();
Store.RecoveryDiff diff = sourceSnapshot.recoveryDiff(targetSnapshot);

View File

@@ -20,8 +20,6 @@ package org.elasticsearch.indices.recovery;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
@@ -32,9 +30,6 @@ import java.io.IOException;
import java.util.Set;
import java.util.regex.Pattern;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
public class RecoveryStatusTests extends ESSingleNodeTestCase {
private static final org.apache.lucene.util.Version MIN_SUPPORTED_LUCENE_VERSION = org.elasticsearch.Version.CURRENT
.minimumIndexCompatibilityVersion().luceneVersion;
@@ -42,35 +37,27 @@ public class RecoveryStatusTests extends ESSingleNodeTestCase {
IndexService service = createIndex("foo");
IndexShard indexShard = service.getShardOrNull(0);
DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
RecoveryTarget status = new RecoveryTarget(indexShard, node, new PeerRecoveryTargetService.RecoveryListener() {
@Override
public void onRecoveryDone(RecoveryState state) {
}
@Override
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
}
}, version -> {});
try (IndexOutput indexOutput = status.openAndPutIndexOutput("foo.bar", new StoreFileMetaData("foo.bar", 8 + CodecUtil.footerLength()
, "9z51nw", MIN_SUPPORTED_LUCENE_VERSION), status.store())) {
MultiFileWriter multiFileWriter = new MultiFileWriter(indexShard.store(),
indexShard.recoveryState().getIndex(), "recovery.test.", logger, () -> {});
try (IndexOutput indexOutput = multiFileWriter.openAndPutIndexOutput("foo.bar",
new StoreFileMetaData("foo.bar", 8 + CodecUtil.footerLength(), "9z51nw", MIN_SUPPORTED_LUCENE_VERSION), indexShard.store())) {
indexOutput.writeInt(1);
IndexOutput openIndexOutput = status.getOpenIndexOutput("foo.bar");
IndexOutput openIndexOutput = multiFileWriter.getOpenIndexOutput("foo.bar");
assertSame(openIndexOutput, indexOutput);
openIndexOutput.writeInt(1);
CodecUtil.writeFooter(indexOutput);
}
try {
status.openAndPutIndexOutput("foo.bar", new StoreFileMetaData("foo.bar", 8 + CodecUtil.footerLength(), "9z51nw",
MIN_SUPPORTED_LUCENE_VERSION), status.store());
multiFileWriter.openAndPutIndexOutput("foo.bar", new StoreFileMetaData("foo.bar", 8 + CodecUtil.footerLength(), "9z51nw",
MIN_SUPPORTED_LUCENE_VERSION), indexShard.store());
fail("file foo.bar is already opened and registered");
} catch (IllegalStateException ex) {
assertEquals("output for file [foo.bar] has already been created", ex.getMessage());
// all is well: it's already registered
}
status.removeOpenIndexOutputs("foo.bar");
Set<String> strings = Sets.newHashSet(status.store().directory().listAll());
multiFileWriter.removeOpenIndexOutputs("foo.bar");
Set<String> strings = Sets.newHashSet(indexShard.store().directory().listAll());
String expectedFile = null;
for (String file : strings) {
if (Pattern.compile("recovery[.][\\w-]+[.]foo[.]bar").matcher(file).matches()) {
@@ -80,12 +67,10 @@ public class RecoveryStatusTests extends ESSingleNodeTestCase {
}
assertNotNull(expectedFile);
indexShard.close("foo", false);// we have to close it here otherwise rename fails since the write.lock is held by the engine
status.renameAllTempFiles();
strings = Sets.newHashSet(status.store().directory().listAll());
multiFileWriter.renameAllTempFiles();
strings = Sets.newHashSet(indexShard.store().directory().listAll());
assertTrue(strings.toString(), strings.contains("foo.bar"));
assertFalse(strings.toString(), strings.contains(expectedFile));
// we must fail the recovery because marking it as done will try to move the shard to POST_RECOVERY,
// which will fail because it's started
status.fail(new RecoveryFailedException(status.state(), "end of test. OK.", null), false);
multiFileWriter.close();
}
}

View File

@@ -1088,6 +1088,12 @@ public abstract class EngineTestCase extends ESTestCase {
return internalEngine.getTranslog();
}
public static boolean hasSnapshottedCommits(Engine engine) {
assert engine instanceof InternalEngine : "only InternalEngines have snapshotted commits, got: " + engine.getClass();
InternalEngine internalEngine = (InternalEngine) engine;
return internalEngine.hasSnapshottedCommits();
}
public static final class PrimaryTermSupplier implements LongSupplier {
private final AtomicLong term;

View File

@@ -127,6 +127,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
private final CcrLicenseChecker ccrLicenseChecker;
private final SetOnce<CcrRestoreSourceService> restoreSourceService = new SetOnce<>();
private final SetOnce<CcrSettings> ccrSettings = new SetOnce<>();
private final SetOnce<ThreadPool> threadPool = new SetOnce<>();
private Client client;
private final boolean transportClientMode;
@@ -171,6 +172,7 @@
CcrSettings ccrSettings = new CcrSettings(settings, clusterService.getClusterSettings());
this.ccrSettings.set(ccrSettings);
this.threadPool.set(threadPool);
CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(threadPool, ccrSettings);
this.restoreSourceService.set(restoreSourceService);
return Arrays.asList(
@@ -307,7 +309,7 @@
@Override
public Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
Repository.Factory repositoryFactory =
(metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get());
(metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), threadPool.get());
return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory);
}

View File

@@ -57,6 +57,12 @@ public final class CcrSettings {
new ByteSizeValue(1, ByteSizeUnit.KB), new ByteSizeValue(1, ByteSizeUnit.GB), Setting.Property.Dynamic,
Setting.Property.NodeScope);
/**
* Controls the maximum number of file chunk requests that are sent concurrently per recovery to the leader.
*/
public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING =
Setting.intSetting("ccr.indices.recovery.max_concurrent_file_chunks", 5, 1, 10, Property.Dynamic, Property.NodeScope);
/**
* The leader must open resources for a ccr recovery. If there is no activity for this interval of time,
* the leader will close the restore session.
@@ -77,7 +83,7 @@
*
* @return the settings
*/
static List<Setting<?>> getSettings() {
public static List<Setting<?>> getSettings() {
return Arrays.asList(
XPackSettings.CCR_ENABLED_SETTING,
CCR_FOLLOWING_INDEX_SETTING,
@@ -86,6 +92,7 @@
INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT,
RECOVERY_CHUNK_SIZE,
INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING,
CCR_WAIT_FOR_METADATA_TIMEOUT);
}
@@ -93,14 +100,17 @@
private volatile TimeValue recoveryActivityTimeout;
private volatile TimeValue recoveryActionTimeout;
private volatile ByteSizeValue chunkSize;
private volatile int maxConcurrentFileChunks;
public CcrSettings(Settings settings, ClusterSettings clusterSettings) {
this.recoveryActivityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
this.recoveryActionTimeout = INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.get(settings);
this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings));
this.chunkSize = RECOVERY_CHUNK_SIZE.get(settings);
this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(RECOVERY_CHUNK_SIZE, this::setChunkSize);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setRecoveryActivityTimeout);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTION_TIMEOUT_SETTING, this::setRecoveryActionTimeout);
}
@@ -109,6 +119,10 @@
this.chunkSize = chunkSize;
}
private void setMaxConcurrentFileChunks(int maxConcurrentFileChunks) {
this.maxConcurrentFileChunks = maxConcurrentFileChunks;
}
private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
ccrRateLimiter.setMBPerSec(maxBytesPerSec);
}
@@ -125,6 +139,10 @@
return chunkSize;
}
public int getMaxConcurrentFileChunks() {
return maxConcurrentFileChunks;
}
public CombinedRateLimiter getRateLimiter() {
return ccrRateLimiter;
}
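
CcrSettings follows the standard Elasticsearch pattern for dynamic settings: a volatile field, an update consumer registered with ClusterSettings, and a plain getter that callers re-read on every use; the restore loop in CcrRepository below reads getMaxConcurrentFileChunks() on each iteration, so an update takes effect even for a restore already in flight. A minimal standalone sketch of the pattern, using a hypothetical setting name:

import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

final class ExampleDynamicSettings {
    // hypothetical dynamic, node-scoped integer setting clamped to [1, 10]
    static final Setting<Integer> MAX_WORKERS =
        Setting.intSetting("example.max_workers", 5, 1, 10,
            Setting.Property.Dynamic, Setting.Property.NodeScope);

    private volatile int maxWorkers;

    ExampleDynamicSettings(Settings settings, ClusterSettings clusterSettings) {
        this.maxWorkers = MAX_WORKERS.get(settings);
        // invoked whenever a cluster settings update changes the value
        clusterSettings.addSettingsUpdateConsumer(MAX_WORKERS, value -> this.maxWorkers = value);
    }

    int getMaxWorkers() {
        return maxWorkers; // callers re-read this, so updates apply to work in progress
    }
}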

View File

@@ -51,7 +51,6 @@ public class GetCcrRestoreFileChunkAction extends Action<GetCcrRestoreFileChunkA
extends HandledTransportAction<GetCcrRestoreFileChunkRequest, GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> {
private final CcrRestoreSourceService restoreSourceService;
private final ThreadPool threadPool;
private final BigArrays bigArrays;
@Inject
@@ -59,7 +58,6 @@
CcrRestoreSourceService restoreSourceService) {
super(NAME, transportService, actionFilters, GetCcrRestoreFileChunkRequest::new, ThreadPool.Names.GENERIC);
TransportActionProxy.registerProxyAction(transportService, NAME, GetCcrRestoreFileChunkResponse::new);
this.threadPool = transportService.getThreadPool();
this.restoreSourceService = restoreSourceService;
this.bigArrays = bigArrays;
}

View File

@@ -9,7 +9,10 @@ package org.elasticsearch.xpack.ccr.repository;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
@@ -22,26 +25,27 @@ import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CombinedRateLimiter;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardRecoveryException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.recovery.MultiFileWriter;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
@@ -51,6 +55,7 @@ import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrSettings;
@@ -65,15 +70,20 @@ import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionReque
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
/**
* This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to
@@ -92,17 +102,19 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
private final String remoteClusterAlias;
private final Client client;
private final CcrLicenseChecker ccrLicenseChecker;
private final ThreadPool threadPool;
private final CounterMetric throttledTime = new CounterMetric();
public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings,
CcrSettings ccrSettings) {
CcrSettings ccrSettings, ThreadPool threadPool) {
this.metadata = metadata;
this.ccrSettings = ccrSettings;
assert metadata.name().startsWith(NAME_PREFIX) : "CcrRepository metadata.name() must start with: " + NAME_PREFIX;
this.remoteClusterAlias = Strings.split(metadata.name(), NAME_PREFIX)[1];
this.ccrLicenseChecker = ccrLicenseChecker;
this.client = client;
this.threadPool = threadPool;
}
@Override
@@ -325,7 +337,7 @@
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet(ccrSettings.getRecoveryActionTimeout());
return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
response.getStoreFileMetaData(), response.getMappingVersion(), ccrSettings, throttledTime::inc);
response.getStoreFileMetaData(), response.getMappingVersion(), threadPool, ccrSettings, throttledTime::inc);
}
private static class RestoreSession extends FileRestoreContext implements Closeable {
@@ -337,33 +349,179 @@
private final long mappingVersion;
private final CcrSettings ccrSettings;
private final LongConsumer throttleListener;
private final ThreadPool threadPool;
RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard,
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, long mappingVersion,
CcrSettings ccrSettings, LongConsumer throttleListener) {
ThreadPool threadPool, CcrSettings ccrSettings, LongConsumer throttleListener) {
super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, Math.toIntExact(ccrSettings.getChunkSize().getBytes()));
this.remoteClient = remoteClient;
this.sessionUUID = sessionUUID;
this.node = node;
this.sourceMetaData = sourceMetaData;
this.mappingVersion = mappingVersion;
this.threadPool = threadPool;
this.ccrSettings = ccrSettings;
this.throttleListener = throttleListener;
}
void restoreFiles() throws IOException {
ArrayList<BlobStoreIndexShardSnapshot.FileInfo> fileInfos = new ArrayList<>();
ArrayList<FileInfo> fileInfos = new ArrayList<>();
for (StoreFileMetaData fileMetaData : sourceMetaData) {
ByteSizeValue fileSize = new ByteSizeValue(fileMetaData.length());
fileInfos.add(new BlobStoreIndexShardSnapshot.FileInfo(fileMetaData.name(), fileMetaData, fileSize));
fileInfos.add(new FileInfo(fileMetaData.name(), fileMetaData, fileSize));
}
SnapshotFiles snapshotFiles = new SnapshotFiles(LATEST, fileInfos);
restore(snapshotFiles);
}
private static class FileSession {
FileSession(long lastTrackedSeqNo, long lastOffset) {
this.lastTrackedSeqNo = lastTrackedSeqNo;
this.lastOffset = lastOffset;
}
final long lastTrackedSeqNo;
final long lastOffset;
}
@Override
protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata(), ccrSettings, throttleListener);
protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws IOException {
logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {})) {
final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
final ArrayDeque<FileInfo> remainingFiles = new ArrayDeque<>(filesToRecover);
final Map<FileInfo, FileSession> inFlightRequests = new HashMap<>();
final Object mutex = new Object();
while (true) {
if (error.get() != null) {
break;
}
final FileInfo fileToRecover;
final FileSession fileSession;
synchronized (mutex) {
if (inFlightRequests.isEmpty() && remainingFiles.isEmpty()) {
break;
}
final long maxConcurrentFileChunks = ccrSettings.getMaxConcurrentFileChunks();
if (remainingFiles.isEmpty() == false && inFlightRequests.size() < maxConcurrentFileChunks) {
for (int i = 0; i < maxConcurrentFileChunks; i++) {
if (remainingFiles.isEmpty()) {
break;
}
inFlightRequests.put(remainingFiles.pop(), new FileSession(NO_OPS_PERFORMED, 0));
}
}
final Map.Entry<FileInfo, FileSession> minEntry =
inFlightRequests.entrySet().stream().min(Comparator.comparingLong(e -> e.getValue().lastTrackedSeqNo)).get();
fileSession = minEntry.getValue();
fileToRecover = minEntry.getKey();
}
try {
requestSeqIdTracker.waitForOpsToComplete(fileSession.lastTrackedSeqNo);
synchronized (mutex) {
// if the file has been removed in the meantime, it means that restore of this file completed, so start working
// on the next one
if (inFlightRequests.containsKey(fileToRecover) == false) {
continue;
}
}
final long requestSeqId = requestSeqIdTracker.generateSeqNo();
try {
synchronized (mutex) {
inFlightRequests.put(fileToRecover, new FileSession(requestSeqId, fileSession.lastOffset));
}
final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(),
fileToRecover.length() - fileSession.lastOffset));
final GetCcrRestoreFileChunkRequest request =
new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileToRecover.name(), bytesRequested);
logger.trace("[{}] [{}] fetching chunk for file [{}]", shardId, snapshotId, fileToRecover.name());
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request,
ActionListener.wrap(
r -> threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
}
@Override
protected void doRun() throws Exception {
final int actualChunkSize = r.getChunk().length();
final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
throttleListener.accept(nanosPaused);
final long newOffset = r.getOffset() + actualChunkSize;
assert newOffset <= fileToRecover.length();
final boolean lastChunk = newOffset >= fileToRecover.length();
multiFileWriter.writeFileChunk(fileToRecover.metadata(), r.getOffset(), r.getChunk(),
lastChunk);
if (lastChunk) {
synchronized (mutex) {
final FileSession session = inFlightRequests.remove(fileToRecover);
assert session != null : "session disappeared for " + fileToRecover.name();
}
} else {
synchronized (mutex) {
final FileSession replaced = inFlightRequests.replace(fileToRecover,
new FileSession(requestSeqId, newOffset));
assert replaced != null : "session disappeared for " + fileToRecover.name();
}
}
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
}
}),
e -> {
error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
}
));
} catch (Exception e) {
error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
throw e;
}
} catch (Exception e) {
error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
break;
}
}
try {
requestSeqIdTracker.waitForOpsToComplete(requestSeqIdTracker.getMaxSeqNo());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ElasticsearchException(e);
}
if (error.get() != null) {
handleError(store, error.get().v2());
}
}
logger.trace("[{}] completed CCR restore", shardId);
}
private void handleError(Store store, Exception e) throws IOException {
final IOException corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
try {
store.markStoreCorrupted(corruptIndexException);
} catch (IOException ioe) {
logger.warn("store cannot be marked as corrupted", e);
}
throw corruptIndexException;
} else {
ExceptionsHelper.reThrowIfNotNull(e);
}
}
@Override
protected InputStream fileInputStream(FileInfo fileInfo) {
throw new UnsupportedOperationException();
}
@Override
@@ -373,71 +531,4 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
}
}
private static class RestoreFileInputStream extends InputStream {
private final Client remoteClient;
private final String sessionUUID;
private final DiscoveryNode node;
private final StoreFileMetaData fileToRecover;
private final CombinedRateLimiter rateLimiter;
private final CcrSettings ccrSettings;
private final LongConsumer throttleListener;
private long pos = 0;
private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover,
CcrSettings ccrSettings, LongConsumer throttleListener) {
this.remoteClient = remoteClient;
this.sessionUUID = sessionUUID;
this.node = node;
this.fileToRecover = fileToRecover;
this.ccrSettings = ccrSettings;
this.rateLimiter = ccrSettings.getRateLimiter();
this.throttleListener = throttleListener;
}
@Override
public int read() throws IOException {
throw new UnsupportedOperationException();
}
@Override
public int read(byte[] bytes, int off, int len) throws IOException {
long remainingBytes = fileToRecover.length() - pos;
if (remainingBytes <= 0) {
return 0;
}
int bytesRequested = (int) Math.min(remainingBytes, len);
long nanosPaused = rateLimiter.maybePause(bytesRequested);
throttleListener.accept(nanosPaused);
String fileName = fileToRecover.name();
GetCcrRestoreFileChunkRequest request = new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileName, bytesRequested);
GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse response =
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request).actionGet(ccrSettings.getRecoveryActionTimeout());
BytesReference fileChunk = response.getChunk();
int bytesReceived = fileChunk.length();
if (bytesReceived > bytesRequested) {
throw new IOException("More bytes [" + bytesReceived + "] received than requested [" + bytesRequested + "]");
}
long leaderOffset = response.getOffset();
assert pos == leaderOffset : "Position [" + pos + "] should be equal to the leader file offset [" + leaderOffset + "].";
try (StreamInput streamInput = fileChunk.streamInput()) {
int bytesRead = streamInput.read(bytes, 0, bytesReceived);
assert bytesRead == bytesReceived : "Did not read the correct number of bytes";
}
pos += bytesReceived;
return bytesReceived;
}
}
}
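
Conceptually, the restoreFiles override earlier in this file replaces the blocking, one-chunk-at-a-time RestoreFileInputStream above with a window of in-flight requests: up to ccr.indices.recovery.max_concurrent_file_chunks files have one outstanding chunk request each, and within a file the next request is sent only after the previous response arrives, since that response determines the next offset. The commit's version is callback-driven and bounds the window with a LocalCheckpointTracker; the sketch below conveys the same scheduling idea with one thread per concurrency slot, and every name in it is illustrative:

import java.util.ArrayDeque;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ConcurrentFileFetcher {
    // stand-in for the remote chunk API backed by GetCcrRestoreFileChunkAction
    interface ChunkClient {
        byte[] fetchChunk(String file, long offset, int maxLen) throws Exception;
    }

    private final ArrayDeque<String> remaining = new ArrayDeque<>();

    void fetchAll(Map<String, Long> fileLengths, int chunkSize, int maxConcurrentFileChunks,
                  ChunkClient client) throws InterruptedException {
        remaining.addAll(fileLengths.keySet());
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrentFileChunks);
        for (int slot = 0; slot < maxConcurrentFileChunks; slot++) {
            pool.execute(() -> {
                String file;
                while ((file = nextFile()) != null) {
                    long length = fileLengths.get(file);
                    long offset = 0;
                    while (offset < length) {
                        try {
                            // within a file requests are sequential: the next offset is
                            // only known once the previous chunk has been received
                            byte[] chunk = client.fetchChunk(file, offset, chunkSize);
                            if (chunk.length == 0) break; // defensive: avoid spinning
                            offset += chunk.length;
                        } catch (Exception e) {
                            return; // the real code records the first error and fails the restore
                        }
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.MINUTES);
    }

    private synchronized String nextFile() {
        return remaining.poll();
    }
}

The tracker-based implementation achieves the same bound without a dedicated thread per slot: responses are handled on the generic thread pool, and the scheduler always resumes the file whose last request is the oldest outstanding one.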

View File

@@ -42,8 +42,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
public class CcrRestoreSourceService extends AbstractLifecycleComponent implements IndexEventListener {
@@ -52,7 +50,6 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
private final Map<String, RestoreSession> onGoingRestores = ConcurrentCollections.newConcurrentMap();
private final Map<IndexShard, HashSet<String>> sessionsForShard = new HashMap<>();
private final CopyOnWriteArrayList<Consumer<String>> closeSessionListeners = new CopyOnWriteArrayList<>();
private final ThreadPool threadPool;
private final CcrSettings ccrSettings;
private final CounterMetric throttleTime = new CounterMetric();
@@ -93,12 +90,6 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
onGoingRestores.clear();
}
// TODO: The listeners are for testing. Once end-to-end file restore is implemented and can be tested,
// these should be removed.
public void addCloseSessionListener(Consumer<String> listener) {
closeSessionListeners.add(listener);
}
public synchronized Store.MetadataSnapshot openSession(String sessionUUID, IndexShard indexShard) throws IOException {
boolean success = false;
RestoreSession restore = null;
@@ -165,9 +156,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
}
}
}
closeSessionListeners.forEach(c -> c.accept(sessionUUID));
restore.decRef();
}
private Scheduler.Cancellable scheduleTimeout(String sessionUUID) {
@@ -255,6 +244,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
assert keyedLock.hasLockedKeys() == false : "Should not hold any file locks when closing";
timeoutTask.cancel();
IOUtils.closeWhileHandlingException(cachedInputs.values());
IOUtils.closeWhileHandlingException(commitRef);
}
}

View File

@@ -114,7 +114,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
final int firstBatchNumDocs;
// Sometimes we want to index a lot of documents to ensure that the recovery works with larger files
if (rarely()) {
firstBatchNumDocs = randomIntBetween(1800, 2000);
firstBatchNumDocs = randomIntBetween(1800, 10000);
} else {
firstBatchNumDocs = randomIntBetween(10, 64);
}
@@ -127,6 +127,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
waitForDocs(firstBatchNumDocs, indexer);
indexer.assertNoFailures();
logger.info("Executing put follow");
boolean waitOnAll = randomBoolean();
final PutFollowAction.Request followRequest;
@@ -176,6 +177,8 @@ public class IndexFollowingIT extends CcrIntegTestCase {
logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs);
indexer.continueIndexing(secondBatchNumDocs);
waitForDocs(firstBatchNumDocs + secondBatchNumDocs, indexer);
final Map<ShardId, Long> secondBatchNumDocsPerShard = new HashMap<>();
final ShardStats[] secondBatchShardStats =
leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards();
@@ -194,6 +197,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
assertTrue("Doc with id [" + docId + "] is missing", getResponse.isExists());
});
}
pauseFollow("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfPrimaryShards);
}

View File

@@ -12,10 +12,9 @@ import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
@@ -25,7 +24,7 @@ import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Set;
import java.util.stream.Collectors;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
@@ -39,10 +38,8 @@ public class CcrRestoreSourceServiceTests extends IndexShardTestCase {
super.setUp();
Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
taskQueue = new DeterministicTaskQueue(settings, random());
Set<Setting<?>> registeredSettings = Sets.newHashSet(CcrSettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND, CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING,
CcrSettings.RECOVERY_CHUNK_SIZE);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, registeredSettings);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, CcrSettings.getSettings()
.stream().filter(s -> s.hasNodeScope()).collect(Collectors.toSet()));
restoreSourceService = new CcrRestoreSourceService(taskQueue.getThreadPool(), new CcrSettings(Settings.EMPTY, clusterSettings));
}
@@ -202,7 +199,10 @@ public class CcrRestoreSourceServiceTests extends IndexShardTestCase {
sessionReader.readFileBytes(files.get(1).name(), new BytesArray(new byte[10]));
}
assertTrue(EngineTestCase.hasSnapshottedCommits(IndexShardTestCase.getEngine(indexShard)));
restoreSourceService.closeSession(sessionUUID);
assertFalse(EngineTestCase.hasSnapshottedCommits(IndexShardTestCase.getEngine(indexShard)));
closeShards(indexShard);
// Exception will be thrown if file is not closed.
}