Use MultiFileTransfer in CCR remote recovery (#44514)

Relates #44468
Nhat Nguyen 2019-10-21 23:19:23 -04:00
parent 547e399dbf
commit d0a4bad95b
4 changed files with 103 additions and 90 deletions
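
For context: MultiFileTransfer is the template that peer recovery already uses to send file chunks with bounded concurrency. This commit makes the class and its ChunkRequest interface public, renames sendChunkRequest to executeChunkRequest, and rewrites the CCR restore path on top of it instead of the hand-rolled LocalCheckpointTracker loop. The snippet below is an illustration only, not code from this commit: the subclass name, chunk size, and no-op send are invented, but the constructor arguments and abstract hooks mirror the ones exercised in the diffs that follow.

    import java.util.List;
    import org.apache.logging.log4j.Logger;
    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.common.util.concurrent.ThreadContext;
    import org.elasticsearch.index.store.StoreFileMetaData;
    import org.elasticsearch.indices.recovery.MultiFileTransfer;

    // Hypothetical subclass sketch: implement the hooks, call start(), and the listener passed
    // to the constructor completes once every chunk of every file has been sent and acknowledged.
    class NoopFileTransfer extends MultiFileTransfer<NoopFileTransfer.Chunk> {
        private static final long CHUNK_SIZE = 1 << 20; // assumed 1 MiB chunks, for illustration
        private long offset;

        NoopFileTransfer(Logger logger, ThreadContext threadContext, ActionListener<Void> listener,
                         int maxConcurrentFileChunks, List<StoreFileMetaData> files) {
            super(logger, threadContext, listener, maxConcurrentFileChunks, files);
        }

        @Override
        protected void onNewFile(StoreFileMetaData md) {
            offset = 0;                              // a new file starts from offset zero
        }

        @Override
        protected Chunk nextChunkRequest(StoreFileMetaData md) {
            final long length = Math.min(CHUNK_SIZE, md.length() - offset);
            offset += length;
            return new Chunk(offset == md.length()); // flag the last chunk of the current file
        }

        @Override
        protected void executeChunkRequest(Chunk request, ActionListener<Void> listener) {
            listener.onResponse(null);               // a real implementation ships the chunk somewhere
        }

        @Override
        protected void handleError(StoreFileMetaData md, Exception e) throws Exception {
            throw e;                                 // surface the failure to the transfer's listener
        }

        @Override
        public void close() {
        }

        static final class Chunk implements MultiFileTransfer.ChunkRequest {
            private final boolean last;

            Chunk(boolean last) {
                this.last = last;
            }

            @Override
            public boolean lastChunk() {
                return last;
            }
        }
    }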

MultiFileTransfer.java

@@ -57,7 +57,7 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
  * one of the networking threads which receive/handle the responses of the current pending file chunk requests. This process will continue
  * until all chunk requests are sent/responded.
  */
-abstract class MultiFileTransfer<Request extends MultiFileTransfer.ChunkRequest> implements Closeable {
+public abstract class MultiFileTransfer<Request extends MultiFileTransfer.ChunkRequest> implements Closeable {
     private Status status = Status.PROCESSING;
     private final Logger logger;
     private final ActionListener<Void> listener;
@@ -121,7 +121,7 @@ abstract class MultiFileTransfer<Request extends MultiFileTransfer.ChunkRequest>
                 return;
             }
             final long requestSeqId = requestSeqIdTracker.generateSeqNo();
-            sendChunkRequest(request.v2(), ActionListener.wrap(
+            executeChunkRequest(request.v2(), ActionListener.wrap(
                 r -> addItem(requestSeqId, request.v1(), null),
                 e -> addItem(requestSeqId, request.v1(), e)));
         }
@@ -179,7 +179,7 @@ abstract class MultiFileTransfer<Request extends MultiFileTransfer.ChunkRequest>
     protected abstract Request nextChunkRequest(StoreFileMetaData md) throws IOException;

-    protected abstract void sendChunkRequest(Request request, ActionListener<Void> listener);
+    protected abstract void executeChunkRequest(Request request, ActionListener<Void> listener);

     protected abstract void handleError(StoreFileMetaData md, Exception e) throws Exception;
@@ -195,7 +195,7 @@ abstract class MultiFileTransfer<Request extends MultiFileTransfer.ChunkRequest>
         }
     }

-    protected interface ChunkRequest {
+    public interface ChunkRequest {
         /**
          * @return {@code true} if this chunk request is the last chunk of the current file
          */
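
The class Javadoc above describes the flow control: at most a fixed number of chunk requests are outstanding, and each response, handled on a networking thread, frees a slot so the next request can go out, until every chunk is sent and answered. The standalone sketch below is not MultiFileTransfer's actual implementation (which is non-blocking and tracks requests with sequence numbers); it is a deliberately simplified, blocking analogue using a semaphore, just to make the bounded in-flight idea concrete.

    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Semaphore;
    import java.util.function.BiConsumer;

    // Simplified analogue of a bounded in-flight chunk pipeline (error handling omitted).
    final class BoundedChunkPipeline<C> {
        private final Semaphore inFlight;

        BoundedChunkPipeline(int maxConcurrentChunks) {
            this.inFlight = new Semaphore(maxConcurrentChunks);
        }

        // send.accept(chunk, onResponded): the completion callback runs on whichever thread
        // receives the response, releasing a slot for the next chunk.
        void run(List<C> chunks, BiConsumer<C, Runnable> send) throws InterruptedException {
            final CountDownLatch allResponded = new CountDownLatch(chunks.size());
            for (C chunk : chunks) {
                inFlight.acquire();              // block while the maximum number of chunks is pending
                send.accept(chunk, () -> {
                    inFlight.release();          // response handled: free the slot
                    allResponded.countDown();
                });
            }
            allResponded.await();                // every chunk request sent and responded
        }
    }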

MultiFileWriter.java

@@ -27,9 +27,11 @@ import org.apache.lucene.util.BytesRefIterator;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetaData;
+import org.elasticsearch.transport.Transports;

 import java.io.IOException;
 import java.util.Arrays;
@@ -39,10 +41,12 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;

-public class MultiFileWriter implements Releasable {
+public class MultiFileWriter extends AbstractRefCounted implements Releasable {

     public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) {
+        super("multi_file_writer");
         this.store = store;
         this.indexState = indexState;
         this.tempFilePrefix = tempFilePrefix;
@@ -51,6 +55,7 @@ public class MultiFileWriter implements Releasable {
     }

     private final Runnable ensureOpen;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
     private final Logger logger;
     private final Store store;
     private final RecoveryState.Index indexState;
@@ -64,6 +69,7 @@ public class MultiFileWriter implements Releasable {

     public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean lastChunk)
         throws IOException {
+        assert Transports.assertNotTransportThread("multi_file_writer");
         final FileChunkWriter writer = fileChunkWriters.computeIfAbsent(fileMetaData.name(), name -> new FileChunkWriter());
         writer.writeChunk(new FileChunk(fileMetaData, content, position, lastChunk));
     }
@@ -138,6 +144,13 @@ public class MultiFileWriter implements Releasable {

     @Override
     public void close() {
+        if (closed.compareAndSet(false, true)) {
+            decRef();
+        }
+    }
+
+    @Override
+    protected void closeInternal() {
         fileChunkWriters.clear();
         // clean open index outputs
         Iterator<Map.Entry<String, IndexOutput>> iterator = openIndexOutputs.entrySet().iterator();
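
With this change MultiFileWriter is reference counted: close() only flips the closed flag and releases the reference taken in the constructor, while the old cleanup body now lives in closeInternal() and runs once the last reference is dropped. A writer thread therefore pins the instance around each write, which is exactly what the CCR code further down does with incRef()/decRef(). A minimal sketch of that protocol (the helper class and method are hypothetical):

    import java.io.IOException;
    import org.elasticsearch.common.bytes.BytesReference;
    import org.elasticsearch.common.lease.Releasable;
    import org.elasticsearch.index.store.StoreFileMetaData;
    import org.elasticsearch.indices.recovery.MultiFileWriter;

    // Hypothetical helper: pin the writer while writing so a concurrent close() cannot trigger
    // closeInternal()'s cleanup until the in-flight write has finished.
    final class RefCountedWriteExample {
        static void writeOneChunk(MultiFileWriter writer, StoreFileMetaData md, long position,
                                  BytesReference chunk, boolean lastChunk) throws IOException {
            writer.incRef();                              // take a reference for the duration of the write
            try (Releasable ignored = writer::decRef) {   // always released, even if the write throws
                writer.writeFileChunk(md, position, chunk, lastChunk);
            }
        }
    }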

RecoverySourceHandler.java

@@ -893,7 +893,7 @@ public class RecoverySourceHandler {
             }

             @Override
-            protected void sendChunkRequest(FileChunk request, ActionListener<Void> listener) {
+            protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
                 cancellableThreads.checkForCancel();
                 recoveryTarget.writeFileChunk(
                     request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), listener);

CcrRepository.java

@@ -12,7 +12,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.index.IndexCommit;
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
@@ -21,6 +20,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.action.support.ListenerTimeouts;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -31,18 +31,16 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.engine.EngineException;
 import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.index.seqno.LocalCheckpointTracker;
 import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
 import org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException;
 import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
@@ -54,6 +52,7 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.F
 import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetaData;
+import org.elasticsearch.indices.recovery.MultiFileTransfer;
 import org.elasticsearch.indices.recovery.MultiFileWriter;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.repositories.IndexId;
@@ -87,12 +86,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.LongConsumer;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;

 import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
-import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
 import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId;
 import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncAddRetentionLease;
 import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncRenewRetentionLease;
@@ -477,104 +475,106 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
         }

         @Override
-        protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws IOException {
+        protected void restoreFiles(List<FileInfo> filesToRecover, Store store) {
             logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
-
-            try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {
-            })) {
-                final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
-                final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
-
-                for (FileInfo fileInfo : filesToRecover) {
-                    final long fileLength = fileInfo.length();
-                    long offset = 0;
-                    while (offset < fileLength && error.get() == null) {
-                        final long requestSeqId = requestSeqIdTracker.generateSeqNo();
-                        try {
-                            requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqId - ccrSettings.getMaxConcurrentFileChunks());
-
-                            if (error.get() != null) {
-                                requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
-                                break;
-                            }
-
-                            final int bytesRequested = Math.toIntExact(
-                                Math.min(ccrSettings.getChunkSize().getBytes(), fileLength - offset));
-                            offset += bytesRequested;
-
-                            final GetCcrRestoreFileChunkRequest request =
-                                new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileInfo.name(), bytesRequested);
-                            logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId,
-                                fileInfo.name(), offset, bytesRequested);
-
-                            TimeValue timeout = ccrSettings.getRecoveryActionTimeout();
-                            ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> listener =
-                                ListenerTimeouts.wrapWithTimeout(threadPool, ActionListener.wrap(
-                                    r -> threadPool.generic().execute(new AbstractRunnable() {
-                                        @Override
-                                        public void onFailure(Exception e) {
-                                            error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
-                                            requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
-                                        }
-
-                                        @Override
-                                        protected void doRun() throws Exception {
-                                            final int actualChunkSize = r.getChunk().length();
-                                            logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}", shardId,
-                                                snapshotId, fileInfo.name(), r.getOffset(), actualChunkSize);
-                                            final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
-                                            throttleListener.accept(nanosPaused);
-                                            final boolean lastChunk = r.getOffset() + actualChunkSize >= fileLength;
-                                            multiFileWriter.writeFileChunk(fileInfo.metadata(), r.getOffset(), r.getChunk(), lastChunk);
-                                            requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
-                                        }
-                                    }),
-                                    e -> {
-                                        error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
-                                        requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
-                                    }
-                                ), timeout, ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME);
-                            remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, listener);
-                        } catch (Exception e) {
-                            error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
-                            requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
-                        }
-                    }
-                }
-
-                try {
-                    requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqIdTracker.getMaxSeqNo());
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    throw new ElasticsearchException(e);
-                }
-                if (error.get() != null) {
-                    handleError(store, error.get().v2());
-                }
-            }
-
+            final PlainActionFuture<Void> restoreFilesFuture = new PlainActionFuture<>();
+            final List<StoreFileMetaData> mds = filesToRecover.stream().map(FileInfo::metadata).collect(Collectors.toList());
+            final MultiFileTransfer<FileChunk> multiFileTransfer = new MultiFileTransfer<FileChunk>(
+                logger, threadPool.getThreadContext(), restoreFilesFuture, ccrSettings.getMaxConcurrentFileChunks(), mds) {
+
+                final MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {});
+                long offset = 0;
+
+                @Override
+                protected void onNewFile(StoreFileMetaData md) {
+                    offset = 0;
+                }
+
+                @Override
+                protected FileChunk nextChunkRequest(StoreFileMetaData md) {
+                    final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(), md.length() - offset));
+                    offset += bytesRequested;
+                    return new FileChunk(md, bytesRequested, offset == md.length());
+                }
+
+                @Override
+                protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
+                    final ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> threadedListener
+                        = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, ActionListener.wrap(
+                            r -> {
+                                writeFileChunk(request.md, r);
+                                listener.onResponse(null);
+                            }, listener::onFailure), false);
+
+                    remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE,
+                        new GetCcrRestoreFileChunkRequest(node, sessionUUID, request.md.name(), request.bytesRequested),
+                        ListenerTimeouts.wrapWithTimeout(threadPool, threadedListener, ccrSettings.getRecoveryActionTimeout(),
+                            ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME));
+                }
+
+                private void writeFileChunk(StoreFileMetaData md,
+                                            GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse r) throws Exception {
+                    final int actualChunkSize = r.getChunk().length();
+                    logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}",
+                        shardId, snapshotId, md.name(), r.getOffset(), actualChunkSize);
+                    final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
+                    throttleListener.accept(nanosPaused);
+                    multiFileWriter.incRef();
+                    try (Releasable ignored = multiFileWriter::decRef) {
+                        final boolean lastChunk = r.getOffset() + actualChunkSize >= md.length();
+                        multiFileWriter.writeFileChunk(md, r.getOffset(), r.getChunk(), lastChunk);
+                    } catch (Exception e) {
+                        handleError(md, e);
+                        throw e;
+                    }
+                }
+
+                @Override
+                protected void handleError(StoreFileMetaData md, Exception e) throws Exception {
+                    final IOException corruptIndexException;
+                    if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
+                        try {
+                            store.markStoreCorrupted(corruptIndexException);
+                        } catch (IOException ioe) {
+                            logger.warn("store cannot be marked as corrupted", e);
+                        }
+                        throw corruptIndexException;
+                    }
+                    throw e;
+                }
+
+                @Override
+                public void close() {
+                    multiFileWriter.close();
+                }
+            };
+            multiFileTransfer.start();
+            restoreFilesFuture.actionGet();
             logger.trace("[{}] completed CCR restore", shardId);
         }

-        private void handleError(Store store, Exception e) throws IOException {
-            final IOException corruptIndexException;
-            if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
-                try {
-                    store.markStoreCorrupted(corruptIndexException);
-                } catch (IOException ioe) {
-                    logger.warn("store cannot be marked as corrupted", e);
-                }
-                throw corruptIndexException;
-            } else {
-                ExceptionsHelper.reThrowIfNotNull(e);
-            }
-        }
-
         @Override
         public void close() {
             ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node);
             ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response =
                 remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
         }
+
+        private static class FileChunk implements MultiFileTransfer.ChunkRequest {
+            final StoreFileMetaData md;
+            final int bytesRequested;
+            final boolean lastChunk;
+
+            FileChunk(StoreFileMetaData md, int bytesRequested, boolean lastChunk) {
+                this.md = md;
+                this.bytesRequested = bytesRequested;
+                this.lastChunk = lastChunk;
+            }
+
+            @Override
+            public boolean lastChunk() {
+                return lastChunk;
+            }
+        }
     }
 }