From 043f18d5ff14f3d3262ec9613ed55a0873c9eab6 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 19 Nov 2014 17:22:57 +0100 Subject: [PATCH] [RECOVERY] Allow to cancle recovery sources when shards are closed Today recovery sources are not cancled if a shard is closed. The recovery target is already cancled when shards are closed but we should also cleanup and cancel the sources side since it holds on to shard locks / references until it's closed. --- .../util/concurrent/AbstractRunnable.java | 10 + .../util/concurrent/EsThreadPoolExecutor.java | 7 +- .../indices/recovery/RecoverySource.java | 68 ++++- .../recovery/ShardRecoveryHandler.java | 262 +++++++++++++----- 4 files changed, 277 insertions(+), 70 deletions(-) diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRunnable.java b/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRunnable.java index ef996bd38f6..1bca7401f16 100644 --- a/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRunnable.java +++ b/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRunnable.java @@ -39,9 +39,19 @@ public abstract class AbstractRunnable implements Runnable { onFailure(ex); } catch (Throwable t) { onFailure(t); + } finally { + onAfter(); } } + /** + * This method is called in a finally block after successful execution + * or on a rejection. + */ + public void onAfter() { + // nothing by default + } + /** * This method is invoked for all exception thrown by {@link #doRun()} */ diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index e9ad347404f..e727d35304d 100644 --- a/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -81,7 +81,12 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { if (command instanceof AbstractRunnable) { // If we are an abstract runnable we can handle the rejection // directly and don't need to rethrow it. - ((AbstractRunnable)command).onRejection(ex); + try { + ((AbstractRunnable) command).onRejection(ex); + } finally { + ((AbstractRunnable) command).onAfter(); + + } } else { throw ex; } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java index 55bbe7ed10d..a13ef4e1779 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java @@ -19,22 +19,31 @@ package org.elasticsearch.indices.recovery; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.service.IndexService; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard; +import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BaseTransportRequestHandler; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportService; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + /** * The source recovery accepts recovery requests from other peer shards and start the recovery process from this * source shard to the target shard. @@ -54,6 +63,7 @@ public class RecoverySource extends AbstractComponent { private final TimeValue internalActionTimeout; private final TimeValue internalActionLongTimeout; + private final OngoingRecoveres ongoingRecoveries = new OngoingRecoveres(); @Inject @@ -64,6 +74,14 @@ public class RecoverySource extends AbstractComponent { this.indicesService = indicesService; this.mappingUpdatedAction = mappingUpdatedAction; this.clusterService = clusterService; + this.indicesService.indicesLifecycle().addListener(new IndicesLifecycle.Listener() { + @Override + public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard) { + if (indexShard != null) { + ongoingRecoveries.cancel(indexShard, "shard is closed"); + } + } + }); this.recoverySettings = recoverySettings; @@ -102,10 +120,14 @@ public class RecoverySource extends AbstractComponent { logger.trace("[{}][{}] starting recovery to {}, mark_as_relocated {}", request.shardId().index().name(), request.shardId().id(), request.targetNode(), request.markAsRelocated()); - ShardRecoveryHandler handler = new ShardRecoveryHandler(shard, request, recoverySettings, transportService, internalActionTimeout, + final ShardRecoveryHandler handler = new ShardRecoveryHandler(shard, request, recoverySettings, transportService, internalActionTimeout, internalActionLongTimeout, clusterService, indicesService, mappingUpdatedAction, logger); - - shard.recover(handler); + ongoingRecoveries.add(shard, handler); + try { + shard.recover(handler); + } finally { + ongoingRecoveries.remove(shard, handler); + } return handler.getResponse(); } @@ -127,5 +149,45 @@ public class RecoverySource extends AbstractComponent { channel.sendResponse(response); } } + + + private static final class OngoingRecoveres { + private final Map> ongoingRecoveries = new HashMap<>(); + + synchronized void add(IndexShard shard, ShardRecoveryHandler handler) { + Set shardRecoveryHandlers = ongoingRecoveries.get(shard); + if (shardRecoveryHandlers == null) { + shardRecoveryHandlers = new HashSet<>(); + ongoingRecoveries.put(shard, shardRecoveryHandlers); + } + assert shardRecoveryHandlers.contains(handler) == false : "Handler was already registered [" + handler + "]"; + shardRecoveryHandlers.add(handler); + } + + synchronized void remove(IndexShard shard, ShardRecoveryHandler handler) { + final Set shardRecoveryHandlers = ongoingRecoveries.get(shard); + assert shardRecoveryHandlers != null : "Shard was not registered [" + shard + "]"; + boolean remove = shardRecoveryHandlers.remove(handler); + assert remove : "Handler was not registered [" + handler + "]"; + if (shardRecoveryHandlers.isEmpty()) { + ongoingRecoveries.remove(shard); + } + } + + synchronized void cancel(IndexShard shard, String reason) { + final Set shardRecoveryHandlers = ongoingRecoveries.get(shard); + if (shardRecoveryHandlers != null) { + final List failures = new ArrayList<>(); + for (ShardRecoveryHandler handlers : shardRecoveryHandlers) { + try { + handlers.cancel(reason); + } catch (Exception ex) { + failures.add(ex); + } + } + ExceptionsHelper.maybeThrowRuntimeAndSuppress(failures); + } + } + } } diff --git a/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java b/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java index acc6ef917a2..53f3b366084 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java +++ b/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.engine.Engine; @@ -59,6 +60,7 @@ import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.*; @@ -87,6 +89,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { private final MappingUpdatedAction mappingUpdatedAction; private final RecoveryResponse response; + private final CancelableThreads cancelableThreads = new CancelableThreads(); public ShardRecoveryHandler(final InternalIndexShard shard, final StartRecoveryRequest request, final RecoverySettings recoverySettings, final TransportService transportService, final TimeValue internalActionTimeout, @@ -129,6 +132,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { */ @Override public void phase1(final SnapshotIndexCommit snapshot) throws ElasticsearchException { + cancelableThreads.failIfCanceled(); // Total size of segment files that are recovered long totalSize = 0; // Total size of segment files that were able to be re-used @@ -178,13 +182,18 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", indexName, shardId, request.targetNode(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize)); + cancelableThreads.run(new Interruptable() { + @Override + public void run() throws InterruptedException { + RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(), + response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes, + response.phase1TotalSize, response.phase1ExistingTotalSize); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, + TransportRequestOptions.options().withTimeout(internalActionTimeout), + EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + } + }); - RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(), - response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes, - response.phase1TotalSize, response.phase1ExistingTotalSize); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, - TransportRequestOptions.options().withTimeout(internalActionTimeout), - EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); // This latch will be used to wait until all files have been transferred to the target node final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size()); @@ -213,9 +222,21 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { pool = recoverySettings.concurrentSmallFileStreamPool(); } - pool.execute(new Runnable() { + pool.execute(new AbstractRunnable() { @Override - public void run() { + public void onFailure(Throwable t) { + // we either got rejected or the store can't be incremented / we are canceled + logger.debug("Failed to transfer file [" + name + "] on recovery"); + } + + public void onAfter() { + // Signify this file has completed by decrementing the latch + latch.countDown(); + } + + @Override + protected void doRun() { + cancelableThreads.failIfCanceled(); store.incRef(); final StoreFileMetaData md = recoverySourceMetadata.get(name); try (final IndexInput indexInput = store.directory().openInput(name, IOContext.READONCE)) { @@ -226,9 +247,9 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { shouldCompressRequest = false; } - long len = indexInput.length(); + final long len = indexInput.length(); long readCount = 0; - TransportRequestOptions requestOptions = TransportRequestOptions.options() + final TransportRequestOptions requestOptions = TransportRequestOptions.options() .withCompress(shouldCompressRequest) .withType(TransportRequestOptions.Type.RECOVERY) .withTimeout(internalActionTimeout); @@ -238,7 +259,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { throw new IndexShardClosedException(shard.shardId()); } int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE; - long position = indexInput.getFilePointer(); + final long position = indexInput.getFilePointer(); // Pause using the rate limiter, if desired, to throttle the recovery if (recoverySettings.rateLimiter() != null) { @@ -246,12 +267,19 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { } indexInput.readBytes(buf, 0, toRead, false); - BytesArray content = new BytesArray(buf, 0, toRead); + final BytesArray content = new BytesArray(buf, 0, toRead); readCount += toRead; - // Actually send the file chunk to the target node, waiting for it to complete - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, - new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position, content, readCount == len), - requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + final boolean lastChunk = readCount == len; + cancelableThreads.run(new Interruptable() { + @Override + public void run() throws InterruptedException { + // Actually send the file chunk to the target node, waiting for it to complete + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, + new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position, content, lastChunk), + requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + } + }); + } } catch (Throwable e) { final Throwable corruptIndexException; @@ -274,39 +302,46 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { exceptions.add(0, e); // last exceptions first } } finally { - try { - store.decRef(); - } finally { - // Signify this file has completed by decrementing the latch - latch.countDown(); - } + store.decRef(); + } } }); fileIndex++; } - // Wait for all files that need to be transferred to finish transferring - latch.await(); + cancelableThreads.run(new Interruptable() { + @Override + public void run() throws InterruptedException { + // Wait for all files that need to be transferred to finish transferring + latch.await(); + } + }); + if (corruptedEngine.get() != null) { throw corruptedEngine.get(); } else { ExceptionsHelper.rethrowAndSuppress(exceptions); } - Set snapshotFiles = Sets.newHashSet(snapshot.getFiles()); - // Send the CLEAN_FILES request, which takes all of the files that - // were transferred and renames them from their temporary file - // names to the actual file names. It also writes checksums for - // the files after they have been renamed. - // - // Once the files have been renamed, any other files that are not - // related to this recovery (out of date segments, for example) - // are deleted - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, - new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), snapshotFiles), - TransportRequestOptions.options().withTimeout(internalActionTimeout), - EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + cancelableThreads.run(new Interruptable() { + @Override + public void run() throws InterruptedException { + final Set snapshotFiles = Sets.newHashSet(snapshot.getFiles()); + // Send the CLEAN_FILES request, which takes all of the files that + // were transferred and renames them from their temporary file + // names to the actual file names. It also writes checksums for + // the files after they have been renamed. + // + // Once the files have been renamed, any other files that are not + // related to this recovery (out of date segments, for example) + // are deleted + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, + new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), snapshotFiles), + TransportRequestOptions.options().withTimeout(internalActionTimeout), + EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + } + }); stopWatch.stop(); logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", indexName, shardId, request.targetNode(), stopWatch.totalTime()); @@ -334,14 +369,21 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); } + cancelableThreads.failIfCanceled(); logger.trace("{} recovery [phase2] to {}: start", request.shardId(), request.targetNode()); StopWatch stopWatch = new StopWatch().start(); - // Send a request preparing the new shard's translog to receive - // operations. This ensures the shard engine is started and disables - // garbage collection (not the JVM's GC!) of tombstone deletes - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, - new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId()), - TransportRequestOptions.options().withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + cancelableThreads.run(new Interruptable() { + @Override + public void run() throws InterruptedException { + // Send a request preparing the new shard's translog to receive + // operations. This ensures the shard engine is started and disables + // garbage collection (not the JVM's GC!) of tombstone deletes + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, + new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId()), + TransportRequestOptions.options().withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + } + }); + stopWatch.stop(); response.startTime = stopWatch.totalTime().millis(); logger.trace("{} recovery [phase2] to {}: start took [{}]", @@ -378,21 +420,28 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); } + cancelableThreads.failIfCanceled(); logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", indexName, shardId, request.targetNode()); StopWatch stopWatch = new StopWatch().start(); // Send the translog operations to the target node int totalOperations = sendSnapshot(snapshot); - // Send the FINALIZE request to the target node. The finalize request - // clears unreferenced translog files, refreshes the engine now that - // new segments are available, and enables garbage collection of - // tombstone files. The shard is also moved to the POST_RECOVERY phase - // during this time - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, - new RecoveryFinalizeRecoveryRequest(request.recoveryId(), request.shardId()), - TransportRequestOptions.options().withTimeout(internalActionLongTimeout), - EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + cancelableThreads.run(new Interruptable() { + @Override + public void run() throws InterruptedException { + // Send the FINALIZE request to the target node. The finalize request + // clears unreferenced translog files, refreshes the engine now that + // new segments are available, and enables garbage collection of + // tombstone files. The shard is also moved to the POST_RECOVERY phase + // during this time + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, + new RecoveryFinalizeRecoveryRequest(request.recoveryId(), request.shardId()), + TransportRequestOptions.options().withTimeout(internalActionLongTimeout), + EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + } + }); + if (request.markAsRelocated()) { // TODO what happens if the recovery process fails afterwards, we need to mark this back to started @@ -455,11 +504,12 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { latch.countDown(); } }); - try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + cancelableThreads.run(new Interruptable() { + @Override + public void run() throws InterruptedException { + latch.await(); + } + }); if (documentMappersToUpdate.isEmpty()) { return; } @@ -503,10 +553,10 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { int ops = 0; long size = 0; int totalOperations = 0; - List operations = Lists.newArrayList(); + final List operations = Lists.newArrayList(); Translog.Operation operation = snapshot.next(); - TransportRequestOptions recoveryOptions = TransportRequestOptions.options() + final TransportRequestOptions recoveryOptions = TransportRequestOptions.options() .withCompress(recoverySettings.compress()) .withType(TransportRequestOptions.Type.RECOVERY) .withTimeout(internalActionLongTimeout); @@ -515,7 +565,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); } - + cancelableThreads.failIfCanceled(); operations.add(operation); ops += 1; size += operation.estimateSize(); @@ -534,9 +584,15 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { // recoverySettings.rateLimiter().pause(size); // } - RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, - recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + cancelableThreads.run(new Interruptable() { + @Override + public void run() throws InterruptedException { + final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, + recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + } + }); + ops = 0; size = 0; operations.clear(); @@ -545,10 +601,84 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { } // send the leftover if (!operations.isEmpty()) { - RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, - recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + cancelableThreads.run(new Interruptable() { + @Override + public void run() throws InterruptedException { + RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, + recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + } + }); + } return totalOperations; } + + /** + * Cancels the recovery and interrupts all eligible threads. + */ + public void cancel(String reason) { + cancelableThreads.cancel(reason); + } + + private static final class CancelableThreads { + private final Set threads = new HashSet<>(); + private boolean canceled = false; + private String reason; + + public synchronized boolean isCanceled() { + return canceled; + } + + + public synchronized void failIfCanceled() { + if (isCanceled()) { + throw new ElasticsearchException("recovery was canceled reason [" + reason + "]"); + } + } + private synchronized void add() { + failIfCanceled(); + threads.add(Thread.currentThread()); + } + + public void run(Interruptable interruptable) { + add(); + try { + interruptable.run(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + remove(); + } + } + + private synchronized void remove() { + threads.remove(Thread.currentThread()); + failIfCanceled(); + } + + public synchronized void cancel(String reason) { + canceled = true; + this.reason = reason; + for (Thread thread : threads) { + thread.interrupt(); + } + threads.clear(); + } + + + } + + interface Interruptable { + public void run() throws InterruptedException; + } + + @Override + public String toString() { + return "ShardRecoveryHandler{" + + "shardId=" + request.shardId() + + ", sourceNode=" + request.sourceNode() + + ", targetNode=" + request.targetNode() + + '}'; + } }