From 51180af91dc26a92ba9b41df88565eee88b8ba35 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 17 Jul 2019 08:17:35 -0400 Subject: [PATCH] Make peer recovery send file chunks async (#44468) Relates #44040 Relates #36195 --- .../elasticsearch/action/StepListener.java | 6 +- .../indices/recovery/MultiFileTransfer.java | 210 ++++++++++++++ .../recovery/PeerRecoverySourceService.java | 2 +- .../recovery/RecoverySourceHandler.java | 151 ++++++---- .../recovery/RemoteRecoveryTargetHandler.java | 2 +- .../recovery/RecoverySourceHandlerTests.java | 269 ++++++++++-------- .../index/shard/IndexShardTestCase.java | 2 +- .../indices/recovery/AsyncRecoveryTarget.java | 1 - 8 files changed, 456 insertions(+), 187 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/indices/recovery/MultiFileTransfer.java diff --git a/server/src/main/java/org/elasticsearch/action/StepListener.java b/server/src/main/java/org/elasticsearch/action/StepListener.java index 160ba23da24..43338d94de8 100644 --- a/server/src/main/java/org/elasticsearch/action/StepListener.java +++ b/server/src/main/java/org/elasticsearch/action/StepListener.java @@ -50,7 +50,7 @@ import java.util.function.Consumer; * } */ -public final class StepListener implements ActionListener { +public final class StepListener extends NotifyOnceListener { private final ListenableFuture delegate; public StepListener() { @@ -58,12 +58,12 @@ public final class StepListener implements ActionListener { } @Override - public void onResponse(Response response) { + protected void innerOnResponse(Response response) { delegate.onResponse(response); } @Override - public void onFailure(Exception e) { + protected void innerOnFailure(Exception e) { delegate.onFailure(e); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileTransfer.java b/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileTransfer.java new file mode 100644 index 00000000000..6c51a337ecd --- /dev/null +++ b/server/src/main/java/org/elasticsearch/indices/recovery/MultiFileTransfer.java @@ -0,0 +1,210 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.recovery; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.Assertions; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.util.concurrent.AsyncIOProcessor; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.seqno.LocalCheckpointTracker; +import org.elasticsearch.index.store.StoreFileMetaData; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.function.Consumer; + +import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; + +/** + * File chunks are sent/requested sequentially by at most one thread at any time. However, the sender/requestor won't wait for the response + * before processing the next file chunk request to reduce the recovery time especially on secure/compressed or high latency communication. + *

+ * The sender/requestor can send up to {@code maxConcurrentFileChunks} file chunk requests without waiting for responses. Since the recovery + * target can receive file chunks out of order, it has to buffer those file chunks in memory and only flush to disk when there's no gap. + * To ensure the recover target never buffers more than {@code maxConcurrentFileChunks} file chunks, we allow the sender/requestor to send + * only up to {@code maxConcurrentFileChunks} file chunk requests from the last flushed (and acknowledged) file chunk. We leverage the local + * checkpoint tracker for this purpose. We generate a new sequence number and assign it to each file chunk request before sending; then mark + * that sequence number as processed when we receive a response for the corresponding file chunk request. With the local checkpoint tracker, + * we know the last acknowledged-flushed file-chunk is a file chunk whose {@code requestSeqId} equals to the local checkpoint because the + * recover target can flush all file chunks up to the local checkpoint. + *

+ * When the number of un-replied file chunk requests reaches the limit (i.e. the gap between the max_seq_no and the local checkpoint is + * greater than {@code maxConcurrentFileChunks}), the sending/requesting thread will abort its execution. That process will be resumed by + * one of the networking threads which receive/handle the responses of the current pending file chunk requests. This process will continue + * until all chunk requests are sent/responded. + */ +abstract class MultiFileTransfer implements Closeable { + private Status status = Status.PROCESSING; + private final Logger logger; + private final ActionListener listener; + private final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); + private final AsyncIOProcessor processor; + private final int maxConcurrentFileChunks; + private StoreFileMetaData currentFile = null; + private final Iterator remainingFiles; + private Tuple readAheadRequest = null; + + protected MultiFileTransfer(Logger logger, ThreadContext threadContext, ActionListener listener, + int maxConcurrentFileChunks, List files) { + this.logger = logger; + this.maxConcurrentFileChunks = maxConcurrentFileChunks; + this.listener = listener; + this.processor = new AsyncIOProcessor(logger, maxConcurrentFileChunks, threadContext) { + @Override + protected void write(List>> items) { + handleItems(items); + } + }; + this.remainingFiles = files.iterator(); + } + + public final void start() { + addItem(UNASSIGNED_SEQ_NO, null, null); // put a dummy item to start the processor + } + + private void addItem(long requestSeqId, StoreFileMetaData md, Exception failure) { + processor.put(new FileChunkResponseItem(requestSeqId, md, failure), e -> { assert e == null : e; }); + } + + private void handleItems(List>> items) { + if (status != Status.PROCESSING) { + assert status == Status.FAILED : "must not receive any response after the transfer was completed"; + // These exceptions will be ignored as we record only the first failure, log them for debugging purpose. + items.stream().filter(item -> item.v1().failure != null).forEach(item -> + logger.debug(new ParameterizedMessage("failed to transfer a file chunk request {}", item.v1().md), item.v1().failure)); + return; + } + try { + for (Tuple> item : items) { + final FileChunkResponseItem resp = item.v1(); + if (resp.requestSeqId == UNASSIGNED_SEQ_NO) { + continue; // not an actual item + } + requestSeqIdTracker.markSeqNoAsProcessed(resp.requestSeqId); + if (resp.failure != null) { + handleError(resp.md, resp.failure); + throw resp.failure; + } + } + while (requestSeqIdTracker.getMaxSeqNo() - requestSeqIdTracker.getProcessedCheckpoint() < maxConcurrentFileChunks) { + final Tuple request = readAheadRequest != null ? readAheadRequest : getNextRequest(); + readAheadRequest = null; + if (request == null) { + assert currentFile == null && remainingFiles.hasNext() == false; + if (requestSeqIdTracker.getMaxSeqNo() == requestSeqIdTracker.getProcessedCheckpoint()) { + onCompleted(null); + } + return; + } + final long requestSeqId = requestSeqIdTracker.generateSeqNo(); + sendChunkRequest(request.v2(), ActionListener.wrap( + r -> addItem(requestSeqId, request.v1(), null), + e -> addItem(requestSeqId, request.v1(), e))); + } + // While we are waiting for the responses, we can prepare the next request in advance + // so we can send it immediately when the responses arrive to reduce the transfer time. + if (readAheadRequest == null) { + readAheadRequest = getNextRequest(); + } + } catch (Exception e) { + onCompleted(e); + } + } + + private void onCompleted(Exception failure) { + if (Assertions.ENABLED && status != Status.PROCESSING) { + throw new AssertionError("invalid status: expected [" + Status.PROCESSING + "] actual [" + status + "]", failure); + } + status = failure == null ? Status.SUCCESS : Status.FAILED; + try { + IOUtils.close(failure, this); + } catch (Exception e) { + listener.onFailure(e); + return; + } + listener.onResponse(null); + } + + private Tuple getNextRequest() throws Exception { + try { + if (currentFile == null) { + if (remainingFiles.hasNext()) { + currentFile = remainingFiles.next(); + onNewFile(currentFile); + } else { + return null; + } + } + final StoreFileMetaData md = currentFile; + final Request request = nextChunkRequest(md); + if (request.lastChunk()) { + currentFile = null; + } + return Tuple.tuple(md, request); + } catch (Exception e) { + handleError(currentFile, e); + throw e; + } + } + + /** + * This method is called when starting sending/requesting a new file. Subclasses should override + * this method to reset the file offset or close the previous file and open a new file if needed. + */ + protected abstract void onNewFile(StoreFileMetaData md) throws IOException; + + protected abstract Request nextChunkRequest(StoreFileMetaData md) throws IOException; + + protected abstract void sendChunkRequest(Request request, ActionListener listener); + + protected abstract void handleError(StoreFileMetaData md, Exception e) throws Exception; + + private static class FileChunkResponseItem { + final long requestSeqId; + final StoreFileMetaData md; + final Exception failure; + + FileChunkResponseItem(long requestSeqId, StoreFileMetaData md, Exception failure) { + this.requestSeqId = requestSeqId; + this.md = md; + this.failure = failure; + } + } + + protected interface ChunkRequest { + /** + * @return {@code true} if this chunk request is the last chunk of the current file + */ + boolean lastChunk(); + } + + private enum Status { + PROCESSING, + SUCCESS, + FAILED + } +} diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java index f53e8edecd9..686ccc799ee 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java @@ -176,7 +176,7 @@ public class PeerRecoverySourceService implements IndexEventListener { final RemoteRecoveryTargetHandler recoveryTarget = new RemoteRecoveryTargetHandler(request.recoveryId(), request.shardId(), transportService, request.targetNode(), recoverySettings, throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime)); - handler = new RecoverySourceHandler(shard, recoveryTarget, request, + handler = new RecoverySourceHandler(shard, recoveryTarget, shard.getThreadPool(), request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), recoverySettings.getMaxConcurrentFileChunks()); return handler; } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 9f5148f0232..839e12d87fd 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -38,7 +38,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.Loggers; @@ -47,10 +47,10 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.RecoveryEngineException; -import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; @@ -62,11 +62,12 @@ import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.transport.Transports; import java.io.Closeable; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -74,13 +75,10 @@ import java.util.Locale; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.IntSupplier; import java.util.stream.StreamSupport; -import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; - /** * RecoverySourceHandler handles the three phases of shard recovery, which is * everything relating to copying the segment files as well as sending translog @@ -103,12 +101,15 @@ public class RecoverySourceHandler { private final int chunkSizeInBytes; private final RecoveryTargetHandler recoveryTarget; private final int maxConcurrentFileChunks; + private final ThreadPool threadPool; private final CancellableThreads cancellableThreads = new CancellableThreads(); + private final List resources = new CopyOnWriteArrayList<>(); - public RecoverySourceHandler(final IndexShard shard, RecoveryTargetHandler recoveryTarget, final StartRecoveryRequest request, - final int fileChunkSizeInBytes, final int maxConcurrentFileChunks) { + public RecoverySourceHandler(IndexShard shard, RecoveryTargetHandler recoveryTarget, ThreadPool threadPool, + StartRecoveryRequest request, int fileChunkSizeInBytes, int maxConcurrentFileChunks) { this.shard = shard; this.recoveryTarget = recoveryTarget; + this.threadPool = threadPool; this.request = request; this.shardId = this.request.shardId().id(); this.logger = Loggers.getLogger(getClass(), request.shardId(), "recover to " + request.targetNode().getName()); @@ -125,7 +126,6 @@ public class RecoverySourceHandler { * performs the recovery from the local engine to the target */ public void recoverToTarget(ActionListener listener) { - final List resources = new CopyOnWriteArrayList<>(); final Closeable releaseResources = () -> IOUtils.close(resources); final ActionListener wrappedListener = ActionListener.notifyOnce(listener); try { @@ -407,15 +407,17 @@ public class RecoverySourceHandler { phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes), phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes)); final StepListener sendFileInfoStep = new StepListener<>(); + final StepListener sendFilesStep = new StepListener<>(); final StepListener cleanFilesStep = new StepListener<>(); cancellableThreads.execute(() -> recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep)); - sendFileInfoStep.whenComplete(r -> { - sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps); - cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, cleanFilesStep); - }, listener::onFailure); + sendFileInfoStep.whenComplete(r -> + sendFiles(store, phase1Files.toArray(new StoreFileMetaData[0]), translogOps, sendFilesStep), listener::onFailure); + + sendFilesStep.whenComplete(r -> + cleanFiles(store, recoverySourceMetadata, translogOps, globalCheckpoint, cleanFilesStep), listener::onFailure); final long totalSize = totalSizeInBytes; final long existingTotalSize = existingTotalSizeInBytes; @@ -574,6 +576,7 @@ public class RecoverySourceHandler { final long mappingVersionOnPrimary, final ActionListener listener) throws IOException { assert ThreadPool.assertCurrentMethodIsNotCalledRecursively(); + assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[send translog]"); final List operations = nextBatch.get(); // send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint if (operations.isEmpty() == false || firstBatch) { @@ -672,54 +675,80 @@ public class RecoverySourceHandler { '}'; } - void sendFiles(Store store, StoreFileMetaData[] files, IntSupplier translogOps) throws Exception { + private static class FileChunk implements MultiFileTransfer.ChunkRequest { + final StoreFileMetaData md; + final BytesReference content; + final long position; + final boolean lastChunk; + + FileChunk(StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) { + this.md = md; + this.content = content; + this.position = position; + this.lastChunk = lastChunk; + } + + @Override + public boolean lastChunk() { + return lastChunk; + } + } + + void sendFiles(Store store, StoreFileMetaData[] files, IntSupplier translogOps, ActionListener listener) { ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first - final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); - final AtomicReference> error = new AtomicReference<>(); - final byte[] buffer = new byte[chunkSizeInBytes]; - for (final StoreFileMetaData md : files) { - if (error.get() != null) { - break; - } - try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE); - InputStream in = new InputStreamIndexInput(indexInput, md.length())) { - long position = 0; - int bytesRead; - while ((bytesRead = in.read(buffer, 0, buffer.length)) != -1) { - final BytesArray content = new BytesArray(buffer, 0, bytesRead); - final boolean lastChunk = position + content.length() == md.length(); - final long requestSeqId = requestSeqIdTracker.generateSeqNo(); - cancellableThreads.execute( - () -> requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqId - maxConcurrentFileChunks)); - cancellableThreads.checkForCancel(); - if (error.get() != null) { - break; - } - final long requestFilePosition = position; - cancellableThreads.executeIO(() -> - recoveryTarget.writeFileChunk(md, requestFilePosition, content, lastChunk, translogOps.getAsInt(), - ActionListener.wrap( - r -> requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId), - e -> { - error.compareAndSet(null, Tuple.tuple(md, e)); - requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId); - } - ))); - position += content.length(); + final ThreadContext threadContext = threadPool.getThreadContext(); + final MultiFileTransfer multiFileSender = + new MultiFileTransfer(logger, threadContext, listener, maxConcurrentFileChunks, Arrays.asList(files)) { + + final byte[] buffer = new byte[chunkSizeInBytes]; + InputStreamIndexInput currentInput = null; + long offset = 0; + + @Override + protected void onNewFile(StoreFileMetaData md) throws IOException { + offset = 0; + IOUtils.close(currentInput, () -> currentInput = null); + final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE); + currentInput = new InputStreamIndexInput(indexInput, md.length()) { + @Override + public void close() throws IOException { + IOUtils.close(indexInput, super::close); // InputStreamIndexInput's close is a noop + } + }; } - } catch (Exception e) { - error.compareAndSet(null, Tuple.tuple(md, e)); - break; - } - } - // When we terminate exceptionally, we don't wait for the outstanding requests as we don't use their results anyway. - // This allows us to end quickly and eliminate the complexity of handling requestSeqIds in case of error. - if (error.get() == null) { - cancellableThreads.execute(() -> requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqIdTracker.getMaxSeqNo())); - } - if (error.get() != null) { - handleErrorOnSendFiles(store, error.get().v2(), new StoreFileMetaData[]{error.get().v1()}); - } + + @Override + protected FileChunk nextChunkRequest(StoreFileMetaData md) throws IOException { + assert Transports.assertNotTransportThread("read file chunk"); + cancellableThreads.checkForCancel(); + final int bytesRead = currentInput.read(buffer); + if (bytesRead == -1) { + throw new CorruptIndexException("file truncated; length=" + md.length() + " offset=" + offset, md.name()); + } + final boolean lastChunk = offset + bytesRead == md.length(); + final FileChunk chunk = new FileChunk(md, new BytesArray(buffer, 0, bytesRead), offset, lastChunk); + offset += bytesRead; + return chunk; + } + + @Override + protected void sendChunkRequest(FileChunk request, ActionListener listener) { + cancellableThreads.execute(() -> recoveryTarget.writeFileChunk( + request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), listener)); + } + + @Override + protected void handleError(StoreFileMetaData md, Exception e) throws Exception { + handleErrorOnSendFiles(store, e, new StoreFileMetaData[]{md}); + } + + @Override + public void close() throws IOException { + IOUtils.close(currentInput, () -> currentInput = null); + } + }; + resources.add(multiFileSender); + multiFileSender.start(); } private void cleanFiles(Store store, Store.MetadataSnapshot sourceMetadata, IntSupplier translogOps, @@ -743,6 +772,7 @@ public class RecoverySourceHandler { private void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetaData[] mds) throws Exception { final IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(e); + assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[handle error on send/clean files]"); if (corruptIndexException != null) { Exception localException = null; for (StoreFileMetaData md : mds) { @@ -766,9 +796,8 @@ public class RecoverySourceHandler { shardId, request.targetNode(), mds), corruptIndexException); throw remoteException; } - } else { - throw e; } + throw e; } protected void failEngine(IOException cause) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index bb5457c1a3d..f6f8baedd9d 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -181,7 +181,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. */ throttleTimeInNanos), fileChunkRequestOptions, new ActionListenerResponseHandler<>( - ActionListener.map(listener, r -> null), in -> TransportResponse.Empty.INSTANCE)); + ActionListener.map(listener, r -> null), in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC)); } } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 215bf475a0c..f6e1de0233b 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -32,11 +32,12 @@ import org.apache.lucene.index.Term; import org.apache.lucene.store.BaseDirectoryWrapper; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; -import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.BytesRefIterator; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -77,6 +78,7 @@ import org.elasticsearch.test.CorruptionUtils; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; @@ -93,10 +95,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.IntSupplier; import java.util.zip.CRC32; @@ -105,7 +108,7 @@ import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.core.IsNull.notNullValue; +import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyObject; @@ -121,10 +124,19 @@ public class RecoverySourceHandlerTests extends ESTestCase { private final ClusterSettings service = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); private ThreadPool threadPool; + private Executor recoveryExecutor; @Before public void setUpThreadPool() { - threadPool = new TestThreadPool(getTestName()); + if (randomBoolean()) { + threadPool = new TestThreadPool(getTestName()); + recoveryExecutor = threadPool.generic(); + } else { + // verify that both sending and receiving files can be completed with a single thread + threadPool = new TestThreadPool(getTestName(), + new FixedExecutorBuilder(Settings.EMPTY, "recovery_executor", between(1, 16), between(16, 128), "recovery_executor")); + recoveryExecutor = threadPool.executor("recovery_executor"); + } } @After @@ -133,9 +145,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { } public void testSendFiles() throws Throwable { - Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1). - put("indices.recovery.concurrent_small_file_streams", 1).build(); - final RecoverySettings recoverySettings = new RecoverySettings(settings, service); + final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service); final StartRecoveryRequest request = getStartRecoveryRequest(); Store store = newStore(createTempDir()); Directory dir = store.directory(); @@ -156,38 +166,22 @@ public class RecoverySourceHandlerTests extends ESTestCase { metas.add(md); } Store targetStore = newStore(createTempDir()); + MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, mock(RecoveryState.Index.class), "", logger, () -> {}); RecoveryTargetHandler target = new TestRecoveryTargetHandler() { - IndexOutputOutputStream out; @Override public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk, int totalTranslogOps, ActionListener listener) { - try { - if (position == 0) { - out = new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)) { - @Override - public void close() throws IOException { - super.close(); - targetStore.directory().sync(Collections.singleton(md.name())); // sync otherwise MDW will mess with it - } - }; - } - final BytesRefIterator iterator = content.iterator(); - BytesRef scratch; - while ((scratch = iterator.next()) != null) { - out.write(scratch.bytes, scratch.offset, scratch.length); - } - if (lastChunk) { - out.close(); - } - listener.onResponse(null); - } catch (Exception e) { - listener.onFailure(e); - } + ActionListener.completeWith(listener, () -> { + multiFileWriter.writeFileChunk(md, position, content, lastChunk); + return null; + }); } }; - RecoverySourceHandler handler = new RecoverySourceHandler(null, target, request, - Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 5)); - handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0); + RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(target, recoveryExecutor), + threadPool, request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 5)); + PlainActionFuture sendFilesFuture = new PlainActionFuture<>(); + handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0, sendFilesFuture); + sendFilesFuture.actionGet(); Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata(null); Store.RecoveryDiff recoveryDiff = targetStoreMetadata.recoveryDiff(metadata); assertEquals(metas.size(), recoveryDiff.identical.size()); @@ -195,7 +189,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { assertEquals(0, recoveryDiff.missing.size()); IndexReader reader = DirectoryReader.open(targetStore.directory()); assertEquals(numDocs, reader.maxDoc()); - IOUtils.close(reader, store, targetStore); + IOUtils.close(reader, store, multiFileWriter, targetStore); } public StartRecoveryRequest getStartRecoveryRequest() throws IOException { @@ -241,10 +235,11 @@ public class RecoverySourceHandlerTests extends ESTestCase { RetentionLeases retentionLeases, long mappingVersion, ActionListener listener) { shippedOps.addAll(operations); checkpointOnTarget.set(randomLongBetween(checkpointOnTarget.get(), Long.MAX_VALUE)); - listener.onResponse(checkpointOnTarget.get()); } + listener.onResponse(checkpointOnTarget.get()); + } }; - RecoverySourceHandler handler = new RecoverySourceHandler( - shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), request, fileChunkSizeInBytes, between(1, 10)); + RecoverySourceHandler handler = new RecoverySourceHandler(shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), + threadPool, request, fileChunkSizeInBytes, between(1, 10)); PlainActionFuture future = new PlainActionFuture<>(); handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()), randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, randomNonNegativeLong(), future); @@ -283,8 +278,8 @@ public class RecoverySourceHandlerTests extends ESTestCase { } } }; - RecoverySourceHandler handler = new RecoverySourceHandler( - shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), request, fileChunkSizeInBytes, between(1, 10)); + RecoverySourceHandler handler = new RecoverySourceHandler(shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), + threadPool, request, fileChunkSizeInBytes, between(1, 10)); PlainActionFuture future = new PlainActionFuture<>(); final long startingSeqNo = randomLongBetween(0, ops.size() - 1L); final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L); @@ -343,52 +338,36 @@ public class RecoverySourceHandlerTests extends ESTestCase { (p.getFileName().toString().equals("write.lock") || p.getFileName().toString().startsWith("extra")) == false)); Store targetStore = newStore(createTempDir(), false); + MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, mock(RecoveryState.Index.class), "", logger, () -> {}); RecoveryTargetHandler target = new TestRecoveryTargetHandler() { - IndexOutputOutputStream out; @Override public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, boolean lastChunk, int totalTranslogOps, ActionListener listener) { - try { - if (position == 0) { - out = new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT)) { - @Override - public void close() throws IOException { - super.close(); - targetStore.directory().sync(Collections.singleton(md.name())); // sync otherwise MDW will mess with it - } - }; - } - final BytesRefIterator iterator = content.iterator(); - BytesRef scratch; - while ((scratch = iterator.next()) != null) { - out.write(scratch.bytes, scratch.offset, scratch.length); - } - if (lastChunk) { - out.close(); - } - listener.onResponse(null); - } catch (Exception e) { - IOUtils.closeWhileHandlingException(out, () -> listener.onFailure(e)); - } + ActionListener.completeWith(listener, () -> { + multiFileWriter.writeFileChunk(md, position, content, lastChunk); + return null; + }); } }; - RecoverySourceHandler handler = new RecoverySourceHandler(null, target, request, - Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 8)) { + RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, + request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 8)) { @Override protected void failEngine(IOException cause) { assertFalse(failedEngine.get()); failedEngine.set(true); } }; - - try { - handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0); - fail("corrupted index"); - } catch (IOException ex) { - assertNotNull(ExceptionsHelper.unwrapCorruption(ex)); - } + SetOnce sendFilesError = new SetOnce<>(); + CountDownLatch latch = new CountDownLatch(1); + handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0, + new LatchedActionListener<>(ActionListener.wrap(r -> sendFilesError.set(null), e -> sendFilesError.set(e)), latch)); + latch.await(); + assertThat(sendFilesError.get(), instanceOf(IOException.class)); + assertNotNull(ExceptionsHelper.unwrapCorruption(sendFilesError.get())); assertTrue(failedEngine.get()); - IOUtils.close(store, targetStore); + // ensure all chunk requests have been completed; otherwise some files on the target are left open. + IOUtils.close(() -> terminate(threadPool), () -> threadPool = null); + IOUtils.close(store, multiFileWriter, targetStore); } @@ -427,28 +406,24 @@ public class RecoverySourceHandlerTests extends ESTestCase { } } }; - RecoverySourceHandler handler = new RecoverySourceHandler(null, target, request, - Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 10)) { + RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, + request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 10)) { @Override protected void failEngine(IOException cause) { assertFalse(failedEngine.get()); failedEngine.set(true); } }; - try { - handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0); - fail("exception index"); - } catch (RuntimeException ex) { - final IOException unwrappedCorruption = ExceptionsHelper.unwrapCorruption(ex); - if (throwCorruptedIndexException) { - assertNotNull(unwrappedCorruption); - assertEquals(ex.getMessage(), "[File corruption occurred on recovery but checksums are ok]"); - } else { - assertNull(unwrappedCorruption); - assertEquals(ex.getMessage(), "boom"); - } - } catch (CorruptIndexException ex) { - fail("not expected here"); + PlainActionFuture sendFilesFuture = new PlainActionFuture<>(); + handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), () -> 0, sendFilesFuture); + Exception ex = expectThrows(Exception.class, sendFilesFuture::actionGet); + final IOException unwrappedCorruption = ExceptionsHelper.unwrapCorruption(ex); + if (throwCorruptedIndexException) { + assertNotNull(unwrappedCorruption); + assertEquals(ex.getMessage(), "[File corruption occurred on recovery but checksums are ok]"); + } else { + assertNull(unwrappedCorruption); + assertEquals(ex.getMessage(), "boom"); } assertFalse(failedEngine.get()); IOUtils.close(store); @@ -472,6 +447,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { final RecoverySourceHandler handler = new RecoverySourceHandler( shard, mock(RecoveryTargetHandler.class), + threadPool, request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 8)) { @@ -550,19 +526,13 @@ public class RecoverySourceHandlerTests extends ESTestCase { }; final int maxConcurrentChunks = between(1, 8); final int chunkSize = between(1, 32); - final RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, getStartRecoveryRequest(), + final RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, threadPool, getStartRecoveryRequest(), chunkSize, maxConcurrentChunks); Store store = newStore(createTempDir(), false); List files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20)); int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum(); - Thread sender = new Thread(() -> { - try { - handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0); - } catch (Exception ex) { - throw new AssertionError(ex); - } - }); - sender.start(); + PlainActionFuture sendFilesFuture = new PlainActionFuture<>(); + handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0, sendFilesFuture); assertBusy(() -> { assertThat(sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks))); assertThat(unrepliedChunks, hasSize(sentChunks.get())); @@ -594,13 +564,11 @@ public class RecoverySourceHandlerTests extends ESTestCase { assertThat(unrepliedChunks, hasSize(expectedUnrepliedChunks)); }); } - sender.join(); + sendFilesFuture.actionGet(); store.close(); } public void testSendFileChunksStopOnError() throws Exception { - final IndexShard shard = mock(IndexShard.class); - when(shard.state()).thenReturn(IndexShardState.STARTED); final List unrepliedChunks = new CopyOnWriteArrayList<>(); final AtomicInteger sentChunks = new AtomicInteger(); final TestRecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() { @@ -616,23 +584,23 @@ public class RecoverySourceHandlerTests extends ESTestCase { }; final int maxConcurrentChunks = between(1, 4); final int chunkSize = between(1, 16); - final RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, getStartRecoveryRequest(), - chunkSize, maxConcurrentChunks); + final RecoverySourceHandler handler = new RecoverySourceHandler(null, new AsyncRecoveryTarget(recoveryTarget, recoveryExecutor), + threadPool, getStartRecoveryRequest(), chunkSize, maxConcurrentChunks); Store store = newStore(createTempDir(), false); List files = generateFiles(store, between(1, 10), () -> between(1, chunkSize * 20)); int totalChunks = files.stream().mapToInt(md -> ((int) md.length() + chunkSize - 1) / chunkSize).sum(); - AtomicReference error = new AtomicReference<>(); - Thread sender = new Thread(() -> { - try { - handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0); - } catch (Exception ex) { - error.set(ex); - } - }); - sender.start(); + SetOnce sendFilesError = new SetOnce<>(); + CountDownLatch sendFilesLatch = new CountDownLatch(1); + handler.sendFiles(store, files.toArray(new StoreFileMetaData[0]), () -> 0, + new LatchedActionListener<>(ActionListener.wrap(r -> sendFilesError.set(null), e -> sendFilesError.set(e)), sendFilesLatch)); assertBusy(() -> assertThat(sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks)))); List failedChunks = randomSubsetOf(between(1, unrepliedChunks.size()), unrepliedChunks); - failedChunks.forEach(c -> c.listener.onFailure(new RuntimeException("test chunk exception"))); + CountDownLatch replyLatch = new CountDownLatch(failedChunks.size()); + failedChunks.forEach(c -> { + c.listener.onFailure(new IllegalStateException("test chunk exception")); + replyLatch.countDown(); + }); + replyLatch.await(); unrepliedChunks.removeAll(failedChunks); unrepliedChunks.forEach(c -> { if (randomBoolean()) { @@ -641,12 +609,75 @@ public class RecoverySourceHandlerTests extends ESTestCase { c.listener.onResponse(null); } }); - assertBusy(() -> { - assertThat(error.get(), notNullValue()); - assertThat(error.get().getMessage(), containsString("test chunk exception")); - }); + sendFilesLatch.await(); + assertThat(sendFilesError.get(), instanceOf(IllegalStateException.class)); + assertThat(sendFilesError.get().getMessage(), containsString("test chunk exception")); assertThat("no more chunks should be sent", sentChunks.get(), equalTo(Math.min(totalChunks, maxConcurrentChunks))); - sender.join(); + store.close(); + } + + public void testCancelRecoveryDuringPhase1() throws Exception { + Store store = newStore(createTempDir("source"), false); + IndexShard shard = mock(IndexShard.class); + when(shard.store()).thenReturn(store); + Directory dir = store.directory(); + RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig()); + int numDocs = randomIntBetween(10, 100); + for (int i = 0; i < numDocs; i++) { + Document document = new Document(); + document.add(new StringField("id", Integer.toString(i), Field.Store.YES)); + document.add(newField("field", randomUnicodeOfCodepointLengthBetween(1, 10), TextField.TYPE_STORED)); + writer.addDocument(document); + } + writer.commit(); + writer.close(); + AtomicBoolean wasCancelled = new AtomicBoolean(); + SetOnce cancelRecovery = new SetOnce<>(); + final TestRecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() { + @Override + public void receiveFileInfo(List phase1FileNames, List phase1FileSizes, List phase1ExistingFileNames, + List phase1ExistingFileSizes, int totalTranslogOps, ActionListener listener) { + recoveryExecutor.execute(() -> listener.onResponse(null)); + if (randomBoolean()) { + wasCancelled.set(true); + cancelRecovery.get().run(); + } + } + + @Override + public void writeFileChunk(StoreFileMetaData md, long position, BytesReference content, + boolean lastChunk, int totalTranslogOps, ActionListener listener) { + recoveryExecutor.execute(() -> listener.onResponse(null)); + if (rarely()) { + wasCancelled.set(true); + cancelRecovery.get().run(); + } + } + + @Override + public void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetaData, + ActionListener listener) { + recoveryExecutor.execute(() -> listener.onResponse(null)); + if (randomBoolean()) { + wasCancelled.set(true); + cancelRecovery.get().run(); + } + } + }; + final RecoverySourceHandler handler = new RecoverySourceHandler( + shard, recoveryTarget, threadPool, getStartRecoveryRequest(), between(1, 16), between(1, 4)); + cancelRecovery.set(() -> handler.cancel("test")); + final StepListener phase1Listener = new StepListener<>(); + try { + final CountDownLatch latch = new CountDownLatch(1); + handler.phase1(DirectoryReader.listCommits(dir).get(0), randomNonNegativeLong(), () -> 0, + new LatchedActionListener<>(phase1Listener, latch)); + latch.await(); + phase1Listener.result(); + } catch (Exception e) { + assertTrue(wasCancelled.get()); + assertNotNull(ExceptionsHelper.unwrap(e, CancellableThreads.ExecutionCancelledException.class)); + } store.close(); } @@ -654,7 +685,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { IndexShard shard = mock(IndexShard.class); when(shard.state()).thenReturn(IndexShardState.STARTED); RecoverySourceHandler handler = new RecoverySourceHandler( - shard, new TestRecoveryTargetHandler(), getStartRecoveryRequest(), between(1, 16), between(1, 4)); + shard, new TestRecoveryTargetHandler(), threadPool, getStartRecoveryRequest(), between(1, 16), between(1, 4)); String syncId = UUIDs.randomBase64UUID(); int numDocs = between(0, 1000); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 47a8f73ef62..e36f5e39990 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -635,7 +635,7 @@ public abstract class IndexShardTestCase extends ESTestCase { final StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), targetAllocationId, pNode, rNode, snapshot, replica.routingEntry().primary(), 0, startingSeqNo); final RecoverySourceHandler recovery = new RecoverySourceHandler(primary, - new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), + new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), threadPool, request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8)); primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null, currentClusterStateVersion.incrementAndGet(), inSyncIds, routingTable); diff --git a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java index afd2aa4e858..1f4198d00a4 100644 --- a/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java +++ b/test/framework/src/main/java/org/elasticsearch/indices/recovery/AsyncRecoveryTarget.java @@ -83,7 +83,6 @@ public class AsyncRecoveryTarget implements RecoveryTargetHandler { @Override public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean lastChunk, int totalTranslogOps, ActionListener listener) { - // TODO: remove this clone once we send file chunk async final BytesReference copy = new BytesArray(BytesRef.deepCopyOf(content.toBytesRef())); executor.execute(() -> target.writeFileChunk(fileMetaData, position, copy, lastChunk, totalTranslogOps, listener)); }