diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/OnGoingRecovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/OnGoingRecovery.java new file mode 100644 index 00000000000..eb6487b2a9f --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/OnGoingRecovery.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard.recovery; + +import org.apache.lucene.store.IndexOutput; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; + +import java.util.List; +import java.util.concurrent.ConcurrentMap; + +/** + * @author kimchy (shay.banon) + */ +public class OnGoingRecovery { + + public static enum Stage { + INIT, + FILES, + TRANSLOG, + FINALIZE + } + + ConcurrentMap openIndexOutputs = ConcurrentCollections.newConcurrentMap(); + + List phase1FileNames; + List phase1FileSizes; + List phase1ExistingFileNames; + List phase1ExistingFileSizes; + long phase1TotalSize; + long phase1ExistingTotalSize; + + volatile Stage stage = Stage.INIT; + volatile long currentTranslogOperations = 0; +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryFilesInfoRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryFilesInfoRequest.java new file mode 100644 index 00000000000..ae0080a47cf --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryFilesInfoRequest.java @@ -0,0 +1,113 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard.recovery; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * @author kimchy (shay.banon) + */ +class RecoveryFilesInfoRequest implements Streamable { + + ShardId shardId; + + List phase1FileNames; + List phase1FileSizes; + List phase1ExistingFileNames; + List phase1ExistingFileSizes; + long phase1TotalSize; + long phase1ExistingTotalSize; + + RecoveryFilesInfoRequest() { + } + + RecoveryFilesInfoRequest(ShardId shardId, List phase1FileNames, List phase1FileSizes, List phase1ExistingFileNames, List phase1ExistingFileSizes, long phase1TotalSize, long phase1ExistingTotalSize) { + this.shardId = shardId; + this.phase1FileNames = phase1FileNames; + this.phase1FileSizes = phase1FileSizes; + this.phase1ExistingFileNames = phase1ExistingFileNames; + this.phase1ExistingFileSizes = phase1ExistingFileSizes; + this.phase1TotalSize = phase1TotalSize; + this.phase1ExistingTotalSize = phase1ExistingTotalSize; + } + + @Override public void readFrom(StreamInput in) throws IOException { + shardId = ShardId.readShardId(in); + int size = in.readVInt(); + phase1FileNames = new ArrayList(size); + for (int i = 0; i < size; i++) { + phase1FileNames.add(in.readUTF()); + } + + size = in.readVInt(); + phase1FileSizes = new ArrayList(size); + for (int i = 0; i < size; i++) { + phase1FileSizes.add(in.readVLong()); + } + + size = in.readVInt(); + phase1ExistingFileNames = new ArrayList(size); + for (int i = 0; i < size; i++) { + phase1ExistingFileNames.add(in.readUTF()); + } + + size = in.readVInt(); + for (int i = 0; i < size; i++) { + phase1ExistingFileSizes.add(in.readVLong()); + } + + phase1TotalSize = in.readVLong(); + phase1ExistingTotalSize = in.readVLong(); + } + + @Override public void writeTo(StreamOutput out) throws IOException { + shardId.writeTo(out); + + out.writeVInt(phase1FileNames.size()); + for (String phase1FileName : phase1FileNames) { + out.writeUTF(phase1FileName); + } + + out.writeVInt(phase1FileSizes.size()); + for (Long phase1FileSize : phase1FileSizes) { + out.writeVLong(phase1FileSize); + } + + out.writeVInt(phase1ExistingFileNames.size()); + for (String phase1ExistingFileName : phase1ExistingFileNames) { + out.writeUTF(phase1ExistingFileName); + } + + out.writeVInt(phase1ExistingFileSizes.size()); + for (Long phase1ExistingFileSize : phase1ExistingFileSizes) { + out.writeVLong(phase1ExistingFileSize); + } + + out.writeVLong(phase1TotalSize); + out.writeVLong(phase1ExistingTotalSize); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java index d1356624014..45fe59a5920 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java @@ -139,6 +139,10 @@ public class RecoverySource extends AbstractComponent { logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize)); + RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.shardId(), response.phase1FileNames, response.phase1FileSizes, + response.phase1ExistingFileNames, response.phase1ExistingFileSizes, response.phase1TotalSize, response.phase1ExistingTotalSize); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, VoidTransportResponseHandler.INSTANCE).txGet(); + final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size()); final AtomicReference lastException = new AtomicReference(); for (final String name : response.phase1FileNames) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java index 9262f7433eb..f0cbb6cd37d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoveryTarget.java @@ -62,6 +62,7 @@ import static org.elasticsearch.common.unit.TimeValue.*; public class RecoveryTarget extends AbstractComponent { public static class Actions { + public static final String FILES_INFO = "index/shard/recovery/filesInfo"; public static final String FILE_CHUNK = "index/shard/recovery/fileChunk"; public static final String CLEAN_FILES = "index/shard/recovery/cleanFiles"; public static final String TRANSLOG_OPS = "index/shard/recovery/translogOps"; @@ -87,6 +88,7 @@ public class RecoveryTarget extends AbstractComponent { this.indicesService = indicesService; this.recoveryThrottler = recoveryThrottler; + transportService.registerHandler(Actions.FILES_INFO, new FilesInfoRequestHandler()); transportService.registerHandler(Actions.FILE_CHUNK, new FileChunkTransportRequestHandler()); transportService.registerHandler(Actions.CLEAN_FILES, new CleanFilesRequestHandler()); transportService.registerHandler(Actions.PREPARE_TRANSLOG, new PrepareForTranslogOperationsRequestHandler()); @@ -252,10 +254,6 @@ public class RecoveryTarget extends AbstractComponent { } } - static class OnGoingRecovery { - ConcurrentMap openIndexOutputs = ConcurrentCollections.newConcurrentMap(); - } - class PrepareForTranslogOperationsRequestHandler extends BaseTransportRequestHandler { @Override public RecoveryPrepareForTranslogOperationsRequest newInstance() { @@ -264,6 +262,14 @@ public class RecoveryTarget extends AbstractComponent { @Override public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception { InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id()); + + OnGoingRecovery onGoingRecovery = onGoingRecoveries.get(shard.shardId()); + if (onGoingRecovery == null) { + // shard is getting closed on us + throw new IndexShardClosedException(shard.shardId()); + } + onGoingRecovery.stage = OnGoingRecovery.Stage.TRANSLOG; + shard.performRecoveryPrepareForTranslog(); channel.sendResponse(VoidStreamable.INSTANCE); } @@ -277,6 +283,12 @@ public class RecoveryTarget extends AbstractComponent { @Override public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception { InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id()); + OnGoingRecovery onGoingRecovery = onGoingRecoveries.get(shard.shardId()); + if (onGoingRecovery == null) { + // shard is getting closed on us + throw new IndexShardClosedException(shard.shardId()); + } + onGoingRecovery.stage = OnGoingRecovery.Stage.FINALIZE; shard.performRecoveryFinalization(false); channel.sendResponse(VoidStreamable.INSTANCE); } @@ -294,6 +306,38 @@ public class RecoveryTarget extends AbstractComponent { for (Translog.Operation operation : request.operations()) { shard.performRecoveryOperation(operation); } + + OnGoingRecovery onGoingRecovery = onGoingRecoveries.get(shard.shardId()); + if (onGoingRecovery == null) { + // shard is getting closed on us + throw new IndexShardClosedException(shard.shardId()); + } + onGoingRecovery.currentTranslogOperations += request.operations().size(); + + channel.sendResponse(VoidStreamable.INSTANCE); + } + } + + class FilesInfoRequestHandler extends BaseTransportRequestHandler { + + @Override public RecoveryFilesInfoRequest newInstance() { + return new RecoveryFilesInfoRequest(); + } + + @Override public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel) throws Exception { + InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId.index().name()).shardSafe(request.shardId.id()); + OnGoingRecovery onGoingRecovery = onGoingRecoveries.get(shard.shardId()); + if (onGoingRecovery == null) { + // shard is getting closed on us + throw new IndexShardClosedException(shard.shardId()); + } + onGoingRecovery.phase1FileNames = request.phase1FileNames; + onGoingRecovery.phase1FileSizes = request.phase1FileSizes; + onGoingRecovery.phase1ExistingFileNames = request.phase1ExistingFileNames; + onGoingRecovery.phase1ExistingFileSizes = request.phase1ExistingFileSizes; + onGoingRecovery.phase1TotalSize = request.phase1TotalSize; + onGoingRecovery.phase1ExistingTotalSize = request.phase1ExistingTotalSize; + onGoingRecovery.stage = OnGoingRecovery.Stage.FILES; channel.sendResponse(VoidStreamable.INSTANCE); } }