capture current state of peer level recovery

This commit is contained in:
kimchy 2010-08-16 09:00:34 +03:00
parent 57ee1bdc55
commit 1bdce4c7ef
4 changed files with 216 additions and 4 deletions

View File

@ -0,0 +1,51 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard.recovery;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
/**
* @author kimchy (shay.banon)
*/
public class OnGoingRecovery {
public static enum Stage {
INIT,
FILES,
TRANSLOG,
FINALIZE
}
ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
List<String> phase1FileNames;
List<Long> phase1FileSizes;
List<String> phase1ExistingFileNames;
List<Long> phase1ExistingFileSizes;
long phase1TotalSize;
long phase1ExistingTotalSize;
volatile Stage stage = Stage.INIT;
volatile long currentTranslogOperations = 0;
}

View File

@ -0,0 +1,113 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard.recovery;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* @author kimchy (shay.banon)
*/
class RecoveryFilesInfoRequest implements Streamable {
ShardId shardId;
List<String> phase1FileNames;
List<Long> phase1FileSizes;
List<String> phase1ExistingFileNames;
List<Long> phase1ExistingFileSizes;
long phase1TotalSize;
long phase1ExistingTotalSize;
RecoveryFilesInfoRequest() {
}
RecoveryFilesInfoRequest(ShardId shardId, List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes, long phase1TotalSize, long phase1ExistingTotalSize) {
this.shardId = shardId;
this.phase1FileNames = phase1FileNames;
this.phase1FileSizes = phase1FileSizes;
this.phase1ExistingFileNames = phase1ExistingFileNames;
this.phase1ExistingFileSizes = phase1ExistingFileSizes;
this.phase1TotalSize = phase1TotalSize;
this.phase1ExistingTotalSize = phase1ExistingTotalSize;
}
@Override public void readFrom(StreamInput in) throws IOException {
shardId = ShardId.readShardId(in);
int size = in.readVInt();
phase1FileNames = new ArrayList<String>(size);
for (int i = 0; i < size; i++) {
phase1FileNames.add(in.readUTF());
}
size = in.readVInt();
phase1FileSizes = new ArrayList<Long>(size);
for (int i = 0; i < size; i++) {
phase1FileSizes.add(in.readVLong());
}
size = in.readVInt();
phase1ExistingFileNames = new ArrayList<String>(size);
for (int i = 0; i < size; i++) {
phase1ExistingFileNames.add(in.readUTF());
}
size = in.readVInt();
for (int i = 0; i < size; i++) {
phase1ExistingFileSizes.add(in.readVLong());
}
phase1TotalSize = in.readVLong();
phase1ExistingTotalSize = in.readVLong();
}
@Override public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
out.writeVInt(phase1FileNames.size());
for (String phase1FileName : phase1FileNames) {
out.writeUTF(phase1FileName);
}
out.writeVInt(phase1FileSizes.size());
for (Long phase1FileSize : phase1FileSizes) {
out.writeVLong(phase1FileSize);
}
out.writeVInt(phase1ExistingFileNames.size());
for (String phase1ExistingFileName : phase1ExistingFileNames) {
out.writeUTF(phase1ExistingFileName);
}
out.writeVInt(phase1ExistingFileSizes.size());
for (Long phase1ExistingFileSize : phase1ExistingFileSizes) {
out.writeVLong(phase1ExistingFileSize);
}
out.writeVLong(phase1TotalSize);
out.writeVLong(phase1ExistingTotalSize);
}
}

View File

@ -139,6 +139,10 @@ public class RecoverySource extends AbstractComponent {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", request.shardId().index().name(), request.shardId().id(), request.targetNode(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.shardId(), response.phase1FileNames, response.phase1FileSizes,
response.phase1ExistingFileNames, response.phase1ExistingFileSizes, response.phase1TotalSize, response.phase1ExistingTotalSize);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, VoidTransportResponseHandler.INSTANCE).txGet();
final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
for (final String name : response.phase1FileNames) {

View File

@ -62,6 +62,7 @@ import static org.elasticsearch.common.unit.TimeValue.*;
public class RecoveryTarget extends AbstractComponent {
public static class Actions {
public static final String FILES_INFO = "index/shard/recovery/filesInfo";
public static final String FILE_CHUNK = "index/shard/recovery/fileChunk";
public static final String CLEAN_FILES = "index/shard/recovery/cleanFiles";
public static final String TRANSLOG_OPS = "index/shard/recovery/translogOps";
@ -87,6 +88,7 @@ public class RecoveryTarget extends AbstractComponent {
this.indicesService = indicesService;
this.recoveryThrottler = recoveryThrottler;
transportService.registerHandler(Actions.FILES_INFO, new FilesInfoRequestHandler());
transportService.registerHandler(Actions.FILE_CHUNK, new FileChunkTransportRequestHandler());
transportService.registerHandler(Actions.CLEAN_FILES, new CleanFilesRequestHandler());
transportService.registerHandler(Actions.PREPARE_TRANSLOG, new PrepareForTranslogOperationsRequestHandler());
@ -252,10 +254,6 @@ public class RecoveryTarget extends AbstractComponent {
}
}
static class OnGoingRecovery {
ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
}
class PrepareForTranslogOperationsRequestHandler extends BaseTransportRequestHandler<RecoveryPrepareForTranslogOperationsRequest> {
@Override public RecoveryPrepareForTranslogOperationsRequest newInstance() {
@ -264,6 +262,14 @@ public class RecoveryTarget extends AbstractComponent {
@Override public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
OnGoingRecovery onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());
}
onGoingRecovery.stage = OnGoingRecovery.Stage.TRANSLOG;
shard.performRecoveryPrepareForTranslog();
channel.sendResponse(VoidStreamable.INSTANCE);
}
@ -277,6 +283,12 @@ public class RecoveryTarget extends AbstractComponent {
@Override public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
OnGoingRecovery onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());
}
onGoingRecovery.stage = OnGoingRecovery.Stage.FINALIZE;
shard.performRecoveryFinalization(false);
channel.sendResponse(VoidStreamable.INSTANCE);
}
@ -294,6 +306,38 @@ public class RecoveryTarget extends AbstractComponent {
for (Translog.Operation operation : request.operations()) {
shard.performRecoveryOperation(operation);
}
OnGoingRecovery onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());
}
onGoingRecovery.currentTranslogOperations += request.operations().size();
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
class FilesInfoRequestHandler extends BaseTransportRequestHandler<RecoveryFilesInfoRequest> {
@Override public RecoveryFilesInfoRequest newInstance() {
return new RecoveryFilesInfoRequest();
}
@Override public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel) throws Exception {
InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId.index().name()).shardSafe(request.shardId.id());
OnGoingRecovery onGoingRecovery = onGoingRecoveries.get(shard.shardId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(shard.shardId());
}
onGoingRecovery.phase1FileNames = request.phase1FileNames;
onGoingRecovery.phase1FileSizes = request.phase1FileSizes;
onGoingRecovery.phase1ExistingFileNames = request.phase1ExistingFileNames;
onGoingRecovery.phase1ExistingFileSizes = request.phase1ExistingFileSizes;
onGoingRecovery.phase1TotalSize = request.phase1TotalSize;
onGoingRecovery.phase1ExistingTotalSize = request.phase1ExistingTotalSize;
onGoingRecovery.stage = OnGoingRecovery.Stage.FILES;
channel.sendResponse(VoidStreamable.INSTANCE);
}
}