From e26d01e71f2687e74caf13d73a8fdd3ceb3cf400 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Fri, 1 Nov 2019 21:02:47 +0100
Subject: [PATCH] Make CcrRepository#restore non-Blocking (#48814) (#48823)

With the changes in #48110 there is no more need to block a generic thread
when waiting for the multi file transfer in `CcrRepository`.
---
 .../xpack/ccr/repository/CcrRepository.java | 175 +++++++++---------
 1 file changed, 87 insertions(+), 88 deletions(-)

diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
index 52f3be9b743..021e1a28204 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
@@ -38,6 +38,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.engine.EngineException;
 import org.elasticsearch.index.mapper.MapperService;
@@ -83,6 +84,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -301,11 +303,14 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
     @Override
     public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState,
                              ActionListener<Void> listener) {
-        // TODO: Instead of blocking in the restore logic and synchronously completing the listener we should just make below logic async
-        ActionListener.completeWith(listener, () -> {
+        final ShardId shardId = store.shardId();
+        final LinkedList<Closeable> toClose = new LinkedList<>();
+        final ActionListener<Void> restoreListener = ActionListener.runBefore(ActionListener.delegateResponse(listener,
+            (l, e) -> l.onFailure(new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e))),
+            () -> IOUtils.close(toClose));
+        try {
             // TODO: Add timeouts to network calls / the restore process.
             createEmptyStore(store);
-            ShardId shardId = store.shardId();
 
             final Map<String, String> ccrMetaData = store.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
             final String leaderIndexName = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
@@ -348,21 +353,23 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
             }, CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(store.indexSettings().getNodeSettings()),
                 Ccr.CCR_THREAD_POOL_NAME);
-
+            toClose.add(() -> {
+                logger.trace(
+                    "{} canceling background renewal of retention lease [{}] at the end of restore", shardId, retentionLeaseId);
+                renewable.cancel();
+            });
             // TODO: There should be some local timeout. And if the remote cluster returns an unknown session
             // response, we should be able to retry by creating a new session.
-            try (RestoreSession restoreSession = openSession(metadata.name(), remoteClient, leaderShardId, shardId, recoveryState)) {
-                restoreSession.restoreFiles(store);
+            final RestoreSession restoreSession = openSession(metadata.name(), remoteClient, leaderShardId, shardId, recoveryState);
+            toClose.addFirst(restoreSession); // Some tests depend on closing session before cancelling retention lease renewal
+            restoreSession.restoreFiles(store, ActionListener.wrap(v -> {
+                logger.trace("[{}] completed CCR restore", shardId);
                 updateMappings(remoteClient, leaderIndex, restoreSession.mappingVersion, client, shardId.getIndex());
-            } catch (Exception e) {
-                throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e);
-            } finally {
-                logger.trace("{} canceling background renewal of retention lease [{}] at the end of restore", shardId,
-                    retentionLeaseId);
-                renewable.cancel();
-            }
-            return null;
-        });
+                restoreListener.onResponse(null);
+            }, restoreListener::onFailure));
+        } catch (Exception e) {
+            restoreListener.onFailure(e);
+        }
     }
 
     private void createEmptyStore(Store store) {
@@ -471,98 +478,90 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
             this.throttleListener = throttleListener;
         }
 
-        void restoreFiles(Store store) {
+        void restoreFiles(Store store, ActionListener<Void> listener) {
             ArrayList<FileInfo> fileInfos = new ArrayList<>();
             for (StoreFileMetaData fileMetaData : sourceMetaData) {
                 ByteSizeValue fileSize = new ByteSizeValue(fileMetaData.length());
                 fileInfos.add(new FileInfo(fileMetaData.name(), fileMetaData, fileSize));
             }
             SnapshotFiles snapshotFiles = new SnapshotFiles(LATEST, fileInfos);
-            final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
-            restore(snapshotFiles, store, future);
-            future.actionGet();
+            restore(snapshotFiles, store, listener);
         }
 
         @Override
-        protected void restoreFiles(List<FileInfo> filesToRecover, Store store, ActionListener<Void> listener) {
-            ActionListener.completeWith(listener, () -> {
-                logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
-                final PlainActionFuture<Void> restoreFilesFuture = new PlainActionFuture<>();
-                final List<StoreFileMetaData> mds = filesToRecover.stream().map(FileInfo::metadata).collect(Collectors.toList());
-                final MultiFileTransfer<FileChunk> multiFileTransfer = new MultiFileTransfer<FileChunk>(
-                    logger, threadPool.getThreadContext(), restoreFilesFuture, ccrSettings.getMaxConcurrentFileChunks(), mds) {
+        protected void restoreFiles(List<FileInfo> filesToRecover, Store store, ActionListener<Void> allFilesListener) {
+            logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
+            final List<StoreFileMetaData> mds = filesToRecover.stream().map(FileInfo::metadata).collect(Collectors.toList());
+            final MultiFileTransfer<FileChunk> multiFileTransfer = new MultiFileTransfer<FileChunk>(
+                logger, threadPool.getThreadContext(), allFilesListener, ccrSettings.getMaxConcurrentFileChunks(), mds) {
 
-                    final MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {});
-                    long offset = 0;
+                final MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {
+                });
+                long offset = 0;
 
-                    @Override
-                    protected void onNewFile(StoreFileMetaData md) {
-                        offset = 0;
-                    }
+                @Override
+                protected void onNewFile(StoreFileMetaData md) {
+                    offset = 0;
+                }
 
-                    @Override
-                    protected FileChunk nextChunkRequest(StoreFileMetaData md) {
-                        final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(), md.length() - offset));
-                        offset += bytesRequested;
-                        return new FileChunk(md, bytesRequested, offset == md.length());
-                    }
+                @Override
+                protected FileChunk nextChunkRequest(StoreFileMetaData md) {
+                    final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(), md.length() - offset));
+                    offset += bytesRequested;
+                    return new FileChunk(md, bytesRequested, offset == md.length());
+                }
 
-                    @Override
-                    protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
-                        final ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> threadedListener
-                            = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, ActionListener.wrap(
-                                r -> {
-                                    writeFileChunk(request.md, r);
-                                    listener.onResponse(null);
-                                }, listener::onFailure), false);
+                @Override
+                protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
+                    final ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> threadedListener
+                        = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, ActionListener.wrap(r -> {
+                            writeFileChunk(request.md, r);
+                            listener.onResponse(null);
+                        }, listener::onFailure), false);
 
-                        remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE,
-                            new GetCcrRestoreFileChunkRequest(node, sessionUUID, request.md.name(), request.bytesRequested),
-                            ListenerTimeouts.wrapWithTimeout(threadPool, threadedListener, ccrSettings.getRecoveryActionTimeout(),
-                                ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME));
-                    }
+                    remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE,
+                        new GetCcrRestoreFileChunkRequest(node, sessionUUID, request.md.name(), request.bytesRequested),
+                        ListenerTimeouts.wrapWithTimeout(threadPool, threadedListener, ccrSettings.getRecoveryActionTimeout(),
+                            ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME));
+                }
 
-                    private void writeFileChunk(StoreFileMetaData md,
-                                                GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse r) throws Exception {
-                        final int actualChunkSize = r.getChunk().length();
-                        logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}",
-                            shardId, snapshotId, md.name(), r.getOffset(), actualChunkSize);
-                        final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
-                        throttleListener.accept(nanosPaused);
-                        multiFileWriter.incRef();
-                        try (Releasable ignored = multiFileWriter::decRef) {
-                            final boolean lastChunk = r.getOffset() + actualChunkSize >= md.length();
-                            multiFileWriter.writeFileChunk(md, r.getOffset(), r.getChunk(), lastChunk);
-                        } catch (Exception e) {
-                            handleError(md, e);
-                            throw e;
-                        }
-                    }
-
-                    @Override
-                    protected void handleError(StoreFileMetaData md, Exception e) throws Exception {
-                        final IOException corruptIndexException;
-                        if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
-                            try {
-                                store.markStoreCorrupted(corruptIndexException);
-                            } catch (IOException ioe) {
-                                logger.warn("store cannot be marked as corrupted", e);
-                            }
-                            throw corruptIndexException;
-                        }
+                private void writeFileChunk(StoreFileMetaData md,
+                                            GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse r) throws Exception {
+                    final int actualChunkSize = r.getChunk().length();
+                    logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}",
+                        shardId, snapshotId, md.name(), r.getOffset(), actualChunkSize);
+                    final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
+                    throttleListener.accept(nanosPaused);
+                    multiFileWriter.incRef();
+                    try (Releasable ignored = multiFileWriter::decRef) {
+                        final boolean lastChunk = r.getOffset() + actualChunkSize >= md.length();
+                        multiFileWriter.writeFileChunk(md, r.getOffset(), r.getChunk(), lastChunk);
+                    } catch (Exception e) {
+                        handleError(md, e);
                         throw e;
                     }
+                }
 
-                    @Override
-                    public void close() {
-                        multiFileWriter.close();
+                @Override
+                protected void handleError(StoreFileMetaData md, Exception e) throws Exception {
+                    final IOException corruptIndexException;
+                    if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
+                        try {
+                            store.markStoreCorrupted(corruptIndexException);
+                        } catch (IOException ioe) {
+                            logger.warn("store cannot be marked as corrupted", e);
+                        }
+                        throw corruptIndexException;
                     }
-                };
-                multiFileTransfer.start();
-                restoreFilesFuture.actionGet();
-                logger.trace("[{}] completed CCR restore", shardId);
-                return null;
-            });
+                    throw e;
+                }
+
+                @Override
+                public void close() {
+                    multiFileWriter.close();
+                }
+            };
+            multiFileTransfer.start();
         }
 
         @Override
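
The essence of the patch, for readers skimming the diff: restoreShard no longer parks a generic-pool thread on PlainActionFuture#actionGet() while the multi file transfer runs. Instead, the caller's ActionListener is handed through to MultiFileTransfer, and the cleanup that previously lived in the finally block (closing the restore session, cancelling retention lease renewal) is attached to the completion path via ActionListener.runBefore over a list of closeables. The sketch below illustrates the same blocking-to-async conversion using only JDK types (CompletableFuture standing in for PlainActionFuture, a Runnable standing in for the cleanup steps); it is a simplified illustration, not Elasticsearch code, and every name in it is hypothetical.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Hypothetical illustration of the blocking vs. non-blocking restore pattern.
    public class NonBlockingRestoreSketch {

        // Before: the calling (generic pool) thread parks on get() until the transfer finishes.
        static void restoreBlocking(CompletableFuture<Void> multiFileTransfer, Runnable cleanup) throws Exception {
            try {
                multiFileTransfer.get();          // thread is blocked for the whole transfer
            } finally {
                cleanup.run();                    // e.g. close session, cancel lease renewal
            }
        }

        // After: cleanup and completion are chained onto the transfer; the caller returns immediately.
        static CompletionStage<Void> restoreAsync(CompletableFuture<Void> multiFileTransfer, Runnable cleanup) {
            return multiFileTransfer.whenComplete((ignored, error) -> cleanup.run());
        }

        public static void main(String[] args) throws Exception {
            ExecutorService pool = Executors.newFixedThreadPool(1);
            CompletableFuture<Void> transfer = CompletableFuture.runAsync(() -> { /* copy file chunks */ }, pool);
            restoreAsync(transfer, () -> System.out.println("cleanup ran"))
                .thenRun(() -> System.out.println("restore completed"));
            pool.shutdown();
        }
    }

The trade-off is the same as in the patch: with no try/finally left on the calling thread, failure handling and cleanup must be wired into the callback chain explicitly, which is what the delegateResponse wrapper and the list of closeables passed to ActionListener.runBefore do in the real change.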