With the changes in #48110 there is no longer any need to block a generic thread while waiting for the multi-file transfer in `CcrRepository` to complete.
This commit is contained in:
parent 6c290ecaf7
commit e26d01e71f
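The diff below boils down to the standard blocking-to-asynchronous listener conversion: rather than parking a thread on a future until the multi-file transfer finishes and only then completing the caller's ActionListener, the caller's listener is now handed to the transfer and completed from its callback. The following is a minimal sketch of that pattern under simplified, hypothetical types (Listener, FileTransfer and Restorer are stand-ins invented for illustration, not the Elasticsearch classes touched by this commit):

import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for ActionListener / MultiFileTransfer; illustration only.
interface Listener<T> {
    void onResponse(T value);
    void onFailure(Exception e);
}

final class FileTransfer {
    private final List<String> files;

    FileTransfer(List<String> files) {
        this.files = files;
    }

    // Completes the supplied listener once every file has been copied.
    void start(Listener<Void> listener) {
        CompletableFuture
            .runAsync(() -> files.forEach(f -> System.out.println("copied " + f)))
            .whenComplete((v, t) -> {
                if (t != null) {
                    listener.onFailure(new Exception(t));
                } else {
                    listener.onResponse(null);
                }
            });
    }
}

final class Restorer {
    // Before: park the calling thread until the transfer finishes.
    void restoreFilesBlocking(List<String> files) throws Exception {
        CompletableFuture<Void> done = new CompletableFuture<>();
        new FileTransfer(files).start(new Listener<Void>() {
            @Override
            public void onResponse(Void value) {
                done.complete(null);
            }

            @Override
            public void onFailure(Exception e) {
                done.completeExceptionally(e);
            }
        });
        done.get(); // a generic thread sits here doing nothing until the copy ends
    }

    // After: hand the caller's listener to the transfer and return immediately.
    void restoreFiles(List<String> files, Listener<Void> listener) {
        new FileTransfer(files).start(listener);
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical demo file names.
        List<String> files = List.of("_0.cfs", "_0.si", "segments_1");
        new Restorer().restoreFiles(files, new Listener<Void>() {
            @Override
            public void onResponse(Void value) {
                System.out.println("restore finished");
            }

            @Override
            public void onFailure(Exception e) {
                e.printStackTrace();
            }
        });
        Thread.sleep(200); // demo only: give the async copy time to finish before the JVM exits
    }
}

Beyond forwarding the listener, the real change also has to move the cleanup that previously lived in a finally block and a try-with-resources (closing the restore session, cancelling the retention-lease renewal) onto the listener path; that is what the toClose list wired through ActionListener.runBefore(..., () -> IOUtils.close(toClose)) in the diff accomplishes.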
@@ -38,6 +38,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.mapper.MapperService;
@@ -83,6 +84,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -301,11 +303,14 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState,
ActionListener<Void> listener) {
// TODO: Instead of blocking in the restore logic and synchronously completing the listener we should just make below logic async
ActionListener.completeWith(listener, () -> {
final ShardId shardId = store.shardId();
final LinkedList<Closeable> toClose = new LinkedList<>();
final ActionListener<Void> restoreListener = ActionListener.runBefore(ActionListener.delegateResponse(listener,
(l, e) -> l.onFailure(new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e))),
() -> IOUtils.close(toClose));
try {
// TODO: Add timeouts to network calls / the restore process.
createEmptyStore(store);
ShardId shardId = store.shardId();

final Map<String, String> ccrMetaData = store.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
final String leaderIndexName = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
@@ -348,21 +353,23 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
},
CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(store.indexSettings().getNodeSettings()),
Ccr.CCR_THREAD_POOL_NAME);

toClose.add(() -> {
logger.trace(
"{} canceling background renewal of retention lease [{}] at the end of restore", shardId, retentionLeaseId);
renewable.cancel();
});
// TODO: There should be some local timeout. And if the remote cluster returns an unknown session
// response, we should be able to retry by creating a new session.
try (RestoreSession restoreSession = openSession(metadata.name(), remoteClient, leaderShardId, shardId, recoveryState)) {
restoreSession.restoreFiles(store);
final RestoreSession restoreSession = openSession(metadata.name(), remoteClient, leaderShardId, shardId, recoveryState);
toClose.addFirst(restoreSession); // Some tests depend on closing session before cancelling retention lease renewal
restoreSession.restoreFiles(store, ActionListener.wrap(v -> {
logger.trace("[{}] completed CCR restore", shardId);
updateMappings(remoteClient, leaderIndex, restoreSession.mappingVersion, client, shardId.getIndex());
} catch (Exception e) {
throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e);
} finally {
logger.trace("{} canceling background renewal of retention lease [{}] at the end of restore", shardId,
retentionLeaseId);
renewable.cancel();
}
return null;
});
restoreListener.onResponse(null);
}, restoreListener::onFailure));
} catch (Exception e) {
restoreListener.onFailure(e);
}
}

private void createEmptyStore(Store store) {
@@ -471,98 +478,90 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
this.throttleListener = throttleListener;
}

void restoreFiles(Store store) {
void restoreFiles(Store store, ActionListener<Void> listener) {
ArrayList<FileInfo> fileInfos = new ArrayList<>();
for (StoreFileMetaData fileMetaData : sourceMetaData) {
ByteSizeValue fileSize = new ByteSizeValue(fileMetaData.length());
fileInfos.add(new FileInfo(fileMetaData.name(), fileMetaData, fileSize));
}
SnapshotFiles snapshotFiles = new SnapshotFiles(LATEST, fileInfos);
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
restore(snapshotFiles, store, future);
future.actionGet();
restore(snapshotFiles, store, listener);
}

@Override
protected void restoreFiles(List<FileInfo> filesToRecover, Store store, ActionListener<Void> listener) {
ActionListener.completeWith(listener, () -> {
logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
final PlainActionFuture<Void> restoreFilesFuture = new PlainActionFuture<>();
final List<StoreFileMetaData> mds = filesToRecover.stream().map(FileInfo::metadata).collect(Collectors.toList());
final MultiFileTransfer<FileChunk> multiFileTransfer = new MultiFileTransfer<FileChunk>(
logger, threadPool.getThreadContext(), restoreFilesFuture, ccrSettings.getMaxConcurrentFileChunks(), mds) {
protected void restoreFiles(List<FileInfo> filesToRecover, Store store, ActionListener<Void> allFilesListener) {
logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
final List<StoreFileMetaData> mds = filesToRecover.stream().map(FileInfo::metadata).collect(Collectors.toList());
final MultiFileTransfer<FileChunk> multiFileTransfer = new MultiFileTransfer<FileChunk>(
logger, threadPool.getThreadContext(), allFilesListener, ccrSettings.getMaxConcurrentFileChunks(), mds) {

final MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {});
long offset = 0;
final MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {
});
long offset = 0;

@Override
protected void onNewFile(StoreFileMetaData md) {
offset = 0;
}
@Override
protected void onNewFile(StoreFileMetaData md) {
offset = 0;
}

@Override
protected FileChunk nextChunkRequest(StoreFileMetaData md) {
final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(), md.length() - offset));
offset += bytesRequested;
return new FileChunk(md, bytesRequested, offset == md.length());
}
@Override
protected FileChunk nextChunkRequest(StoreFileMetaData md) {
final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(), md.length() - offset));
offset += bytesRequested;
return new FileChunk(md, bytesRequested, offset == md.length());
}

@Override
protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
final ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> threadedListener
= new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, ActionListener.wrap(
r -> {
writeFileChunk(request.md, r);
listener.onResponse(null);
}, listener::onFailure), false);
@Override
protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
final ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> threadedListener
= new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, ActionListener.wrap(r -> {
writeFileChunk(request.md, r);
listener.onResponse(null);
}, listener::onFailure), false);

remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE,
new GetCcrRestoreFileChunkRequest(node, sessionUUID, request.md.name(), request.bytesRequested),
ListenerTimeouts.wrapWithTimeout(threadPool, threadedListener, ccrSettings.getRecoveryActionTimeout(),
ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME));
}
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE,
new GetCcrRestoreFileChunkRequest(node, sessionUUID, request.md.name(), request.bytesRequested),
ListenerTimeouts.wrapWithTimeout(threadPool, threadedListener, ccrSettings.getRecoveryActionTimeout(),
ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME));
}

private void writeFileChunk(StoreFileMetaData md,
GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse r) throws Exception {
final int actualChunkSize = r.getChunk().length();
logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}",
shardId, snapshotId, md.name(), r.getOffset(), actualChunkSize);
final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
throttleListener.accept(nanosPaused);
multiFileWriter.incRef();
try (Releasable ignored = multiFileWriter::decRef) {
final boolean lastChunk = r.getOffset() + actualChunkSize >= md.length();
multiFileWriter.writeFileChunk(md, r.getOffset(), r.getChunk(), lastChunk);
} catch (Exception e) {
handleError(md, e);
throw e;
}
}

@Override
protected void handleError(StoreFileMetaData md, Exception e) throws Exception {
final IOException corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
try {
store.markStoreCorrupted(corruptIndexException);
} catch (IOException ioe) {
logger.warn("store cannot be marked as corrupted", e);
}
throw corruptIndexException;
}
private void writeFileChunk(StoreFileMetaData md,
GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse r) throws Exception {
final int actualChunkSize = r.getChunk().length();
logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}",
shardId, snapshotId, md.name(), r.getOffset(), actualChunkSize);
final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
throttleListener.accept(nanosPaused);
multiFileWriter.incRef();
try (Releasable ignored = multiFileWriter::decRef) {
final boolean lastChunk = r.getOffset() + actualChunkSize >= md.length();
multiFileWriter.writeFileChunk(md, r.getOffset(), r.getChunk(), lastChunk);
} catch (Exception e) {
handleError(md, e);
throw e;
}
}

@Override
public void close() {
multiFileWriter.close();
@Override
protected void handleError(StoreFileMetaData md, Exception e) throws Exception {
final IOException corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
try {
store.markStoreCorrupted(corruptIndexException);
} catch (IOException ioe) {
logger.warn("store cannot be marked as corrupted", e);
}
throw corruptIndexException;
}
};
multiFileTransfer.start();
restoreFilesFuture.actionGet();
logger.trace("[{}] completed CCR restore", shardId);
return null;
});
throw e;
}

@Override
public void close() {
multiFileWriter.close();
}
};
multiFileTransfer.start();
}

@Override