more work on reusing node-level work data, now also when recovering from peers

kimchy 2010-06-24 15:34:02 +03:00
parent 57169d4233
commit edf0075025
7 changed files with 255 additions and 95 deletions

View File

@@ -1,7 +1,14 @@
rootLogger: INFO, console, file
logger:
# log action execution errors for easier debugging
action : DEBUG
action: DEBUG
# gateway
#gateway: DEBUG
#index.gateway: DEBUG
# peer shard recovery
#index.shard.recovery: DEBUG
appender:
console:
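
The commented-out loggers above give the new peer shard recovery path its own debug switch. To trace it, the relevant line can be uncommented (a sketch, assuming this is the standard logging.yml shipped with the distribution):

logger:
  index.shard.recovery: DEBUG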

View File

@@ -137,8 +137,8 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("], throttling_wait [").append(throttlingWaitTime.totalTime()).append("]\n");
sb.append(" index : number_of_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(recoveryStatus.index().totalSize()).append("], throttling_wait [").append(recoveryStatus.index().throttlingWaitTime()).append("]\n");
sb.append(" : reusing_files [").append(recoveryStatus.index().numberOfExistingFiles()).append("] with total_size [").append(recoveryStatus.index().existingTotalSize()).append("]\n");
sb.append(" index : recovered_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(recoveryStatus.index().totalSize()).append("], throttling_wait [").append(recoveryStatus.index().throttlingWaitTime()).append("]\n");
sb.append(" : reusing_files [").append(recoveryStatus.index().numberOfExistingFiles()).append("] with total_size [").append(recoveryStatus.index().existingTotalSize()).append("]\n");
sb.append(" translog : translog_id [").append(recoveryStatus.translog().translogId()).append("], number_of_operations [").append(recoveryStatus.translog().numberOfOperations()).append("] with total_size[").append(recoveryStatus.translog().totalSize()).append("]");
logger.debug(sb.toString());
}
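
With the reworded labels, a completed gateway recovery would log something along these lines (hypothetical values, reconstructed from the StringBuilder format above; exact indentation may differ):

recovery completed from [gateway], took [1.3s], throttling_wait [0s]
    index    : recovered_files [12] with total_size [4.2mb], throttling_wait [0s]
             : reusing_files [3] with total_size [1.1mb]
    translog : translog_id [2], number_of_operations [57] with total_size[3.4kb]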

View File

@@ -381,9 +381,11 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
break;
}
}
indexShard.performRecovery(operations);
indexShard.performRecoveryPrepareForTranslog();
indexShard.performRecoveryOperations(operations);
indexShard.performRecoveryFinalization();
// clean all the other translogs
// clean all the other translog
for (Long translogIdToDelete : translogIds) {
if (!translogId.equals(translogIdToDelete)) {
try {
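
The single performRecovery(operations) call is replaced by three explicit steps, which lets peer recovery (below) drive the same shard lifecycle remotely, one phase at a time. A sketch of the contract implied by the InternalIndexShard changes later in this commit:

// all three calls require the shard to be in RECOVERING state
indexShard.performRecoveryPrepareForTranslog(); // starts the engine, shard stays RECOVERING
indexShard.performRecoveryOperations(operations); // replays translog operations; can be called repeatedly
indexShard.performRecoveryFinalization(); // moves the shard to STARTED and refreshes the engine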

View File

@@ -26,6 +26,9 @@ import org.elasticsearch.ElasticSearchInterruptedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.CloseableComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -44,14 +47,16 @@ import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.memory.MemorySnapshot;
import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.indices.recovery.throttler.RecoveryThrottler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@@ -86,7 +91,13 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
private final String fileChunkTransportAction;
private final String snapshotTransportAction;
private final String cleanFilesTransportAction;
private final String prepareForTranslogOperationsTransportAction;
private final String translogOperationsTransportAction;
private final String finalizeRecoveryTransportAction;
private volatile boolean closed = false;
@@ -109,20 +120,34 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
startTransportAction = shardId.index().name() + "/" + shardId.id() + "/recovery/start";
transportService.registerHandler(startTransportAction, new StartRecoveryTransportRequestHandler());
fileChunkTransportAction = shardId.index().name() + "/" + shardId.id() + "/recovery/fileChunk";
transportService.registerHandler(fileChunkTransportAction, new FileChunkTransportRequestHandler());
snapshotTransportAction = shardId.index().name() + "/" + shardId.id() + "/recovery/snapshot";
transportService.registerHandler(snapshotTransportAction, new SnapshotTransportRequestHandler());
cleanFilesTransportAction = shardId.index().name() + "/" + shardId.id() + "/recovery/cleanFiles";
transportService.registerHandler(cleanFilesTransportAction, new CleanFilesRequestHandler());
prepareForTranslogOperationsTransportAction = shardId.index().name() + "/" + shardId.id() + "/recovery/prepareForTranslog";
transportService.registerHandler(prepareForTranslogOperationsTransportAction, new PrepareForTranslogOperationsRequestHandler());
translogOperationsTransportAction = shardId.index().name() + "/" + shardId.id() + "/recovery/translogOperations";
transportService.registerHandler(translogOperationsTransportAction, new TranslogOperationsRequestHandler());
finalizeRecoveryTransportAction = shardId.index().name() + "/" + shardId.id() + "/recovery/finalizeRecovery";
transportService.registerHandler(finalizeRecoveryTransportAction, new FinalizeRecoveryRequestHandler());
this.fileChunkSize = componentSettings.getAsBytesSize("file_chunk_size", new ByteSizeValue(100, ByteSizeUnit.KB));
logger.trace("Recovery Action registered, using file_chunk_size[{}]", fileChunkSize);
logger.trace("recovery action registered, using file_chunk_size[{}]", fileChunkSize);
}
public void close() {
closed = true;
transportService.removeHandler(startTransportAction);
transportService.removeHandler(fileChunkTransportAction);
transportService.removeHandler(snapshotTransportAction);
transportService.removeHandler(cleanFilesTransportAction);
transportService.removeHandler(prepareForTranslogOperationsTransportAction);
transportService.removeHandler(translogOperationsTransportAction);
transportService.removeHandler(finalizeRecoveryTransportAction);
cleanOpenIndex();
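
Taken together, the registered handlers spell out the reworked peer recovery protocol; the old catch-all snapshot action is replaced by three finer-grained ones. One reading of the message flow, where the recovering shard contacts the node it recovers from (the source):

recovering shard -> source : .../recovery/start (StartRecoveryRequest, now carrying a file name -> length map)
source -> recovering shard : .../recovery/fileChunk (one per chunk, only for files that are not reused)
source -> recovering shard : .../recovery/cleanFiles (full snapshot file list; everything else is deleted)
source -> recovering shard : .../recovery/prepareForTranslog (the recovering shard starts its engine)
source -> recovering shard : .../recovery/translogOperations (batches of operations, phase2 and phase3)
source -> recovering shard : .../recovery/finalizeRecovery (the recovering shard moves to STARTED)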
@@ -176,17 +201,24 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
}
throttlingWaitTime.stop();
logger.debug("Starting recovery from {}", targetNode);
logger.debug("starting recovery from {}", targetNode);
try {
if (closed) {
throw new IgnoreRecoveryException("Recovery closed");
}
// build a list of the current files located locally, maybe we don't need to recover them...
StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(node, markAsRelocated);
for (String storeFile : store.directory().listAll()) {
startRecoveryRequest.existingFiles.put(storeFile, store.directory().fileLength(storeFile));
}
StopWatch stopWatch = null;
RecoveryStatus recoveryStatus = null;
boolean retry = true;
while (retry) {
stopWatch = new StopWatch().start();
recoveryStatus = transportService.submitRequest(targetNode, startTransportAction, new StartRecoveryRequest(node, markAsRelocated), new FutureTransportResponseHandler<RecoveryStatus>() {
recoveryStatus = transportService.submitRequest(targetNode, startTransportAction, startRecoveryRequest, new FutureTransportResponseHandler<RecoveryStatus>() {
@Override public RecoveryStatus newInstance() {
return new RecoveryStatus();
}
@@ -207,15 +239,15 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
stopWatch.stop();
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Recovery completed from ").append(targetNode).append(", took[").append(stopWatch.totalTime()).append("], throttling_wait [").append(throttlingWaitTime.totalTime()).append("]\n");
sb.append(" Phase1: recovered [").append(recoveryStatus.phase1FileNames.size()).append("]")
.append(" files with total size of [").append(new ByteSizeValue(recoveryStatus.phase1TotalSize)).append("]")
sb.append("recovery completed from ").append(targetNode).append(", took[").append(stopWatch.totalTime()).append("], throttling_wait [").append(throttlingWaitTime.totalTime()).append("]\n");
sb.append(" phase1: recovered_files [").append(recoveryStatus.phase1FileNames.size()).append("]").append(" with total_size of [").append(new ByteSizeValue(recoveryStatus.phase1TotalSize)).append("]")
.append(", took [").append(timeValueMillis(recoveryStatus.phase1Time)).append("], throttling_wait [").append(timeValueMillis(recoveryStatus.phase1ThrottlingWaitTime)).append(']')
.append("\n");
sb.append(" Phase2: recovered [").append(recoveryStatus.phase2Operations).append("]").append(" transaction log operations")
sb.append(" : reusing_files [").append(recoveryStatus.phase1ExistingFileNames.size()).append("] with total_size of [").append(new ByteSizeValue(recoveryStatus.phase1ExistingTotalSize)).append("]\n");
sb.append(" phase2: recovered [").append(recoveryStatus.phase2Operations).append("]").append(" transaction log operations")
.append(", took [").append(timeValueMillis(recoveryStatus.phase2Time)).append("]")
.append("\n");
sb.append(" Phase3: recovered [").append(recoveryStatus.phase3Operations).append("]").append(" transaction log operations")
sb.append(" phase3: recovered [").append(recoveryStatus.phase3Operations).append("]").append(" transaction log operations")
.append(", took [").append(timeValueMillis(recoveryStatus.phase3Time)).append("]");
logger.debug(sb.toString());
}
@@ -263,11 +295,13 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
openIndexOutputs.clear();
}
private static class StartRecoveryRequest implements Streamable {
static class StartRecoveryRequest implements Streamable {
private DiscoveryNode node;
DiscoveryNode node;
private boolean markAsRelocated;
boolean markAsRelocated;
Map<String, Long> existingFiles = Maps.newHashMap();
private StartRecoveryRequest() {
}
@@ -280,11 +314,20 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
@Override public void readFrom(StreamInput in) throws IOException {
node = DiscoveryNode.readNode(in);
markAsRelocated = in.readBoolean();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
existingFiles.put(in.readUTF(), in.readVLong());
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
out.writeBoolean(markAsRelocated);
out.writeVInt(existingFiles.size());
for (Map.Entry<String, Long> entry : existingFiles.entrySet()) {
out.writeUTF(entry.getKey());
out.writeVLong(entry.getValue());
}
}
}
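
The new existingFiles map is what enables reuse: before asking the source to start, the recovering shard lists every file it already holds together with its byte length, and the source makes a per-file reuse decision during phase1. On the wire this is a VInt count followed by UTF name / VLong length pairs, mirroring the readFrom side above. A hypothetical entry:

// hypothetical: the recovering shard already holds a 4,214-byte compound segment file
startRecoveryRequest.existingFiles.put("_0.cfs", 4214L);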
@@ -302,32 +345,44 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
return;
}
try {
logger.trace("Starting recovery to {}, markAsRelocated {}", startRecoveryRequest.node, startRecoveryRequest.markAsRelocated);
logger.trace("starting recovery to {}, mark_as_relocated {}", startRecoveryRequest.node, startRecoveryRequest.markAsRelocated);
final DiscoveryNode node = startRecoveryRequest.node;
cleanOpenIndex();
final RecoveryStatus recoveryStatus = new RecoveryStatus();
indexShard.recover(new Engine.RecoveryHandler() {
@Override public void phase1(SnapshotIndexCommit snapshot) throws ElasticSearchException {
long totalSize = 0;
long existingTotalSize = 0;
try {
StopWatch stopWatch = new StopWatch().start();
for (String name : snapshot.getFiles()) {
IndexInput indexInput = store.directory().openInput(name);
recoveryStatus.phase1FileNames.add(name);
recoveryStatus.phase1FileSizes.add(indexInput.length());
totalSize += indexInput.length();
indexInput.close();
long length = indexInput.length();
try {
if (startRecoveryRequest.existingFiles.containsKey(name) && startRecoveryRequest.existingFiles.get(name) == length) {
recoveryStatus.phase1ExistingFileNames.add(name);
recoveryStatus.phase1ExistingFileSizes.add(length);
existingTotalSize += length;
} else {
recoveryStatus.phase1FileNames.add(name);
recoveryStatus.phase1FileSizes.add(length);
totalSize += length;
}
} finally {
indexInput.close();
}
}
recoveryStatus.phase1TotalSize = totalSize;
recoveryStatus.phase1ExistingTotalSize = existingTotalSize;
final AtomicLong throttlingWaitTime = new AtomicLong();
logger.trace("Recovery [phase1] to {}: recovering [{}] files with total size of [{}]", node, snapshot.getFiles().length, new ByteSizeValue(totalSize));
logger.trace("recovery [phase1] to {}: recovering [{}] files with total size of [{}]", node, recoveryStatus.phase1FileNames.size(), new ByteSizeValue(totalSize));
final CountDownLatch latch = new CountDownLatch(snapshot.getFiles().length);
final CountDownLatch latch = new CountDownLatch(recoveryStatus.phase1FileNames.size());
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
for (final String name : snapshot.getFiles()) {
for (final String name : recoveryStatus.phase1FileNames) {
sendFileChunksRecoveryFutures.add(threadPool.submit(new Runnable() {
@Override public void run() {
IndexInput indexInput = null;
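
The reuse decision above hinges on a name-plus-length match: a snapshot file whose name and exact byte length both appear in the request's existingFiles map is only counted (reusing_files), never transferred. A condensed sketch of the same check:

long length = indexInput.length();
Long existing = startRecoveryRequest.existingFiles.get(name); // null when the target lacks the file
boolean reuse = existing != null && existing == length;
// note: length equality stands in for content equality here; this commit adds no checksum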
@@ -374,14 +429,19 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
throw lastException.get();
}
// now, set the clean files request
CleanFilesRequest cleanFilesRequest = new CleanFilesRequest();
cleanFilesRequest.snapshotFiles.addAll(Arrays.asList(snapshot.getFiles()));
transportService.submitRequest(node, cleanFilesTransportAction, cleanFilesRequest, VoidTransportResponseHandler.INSTANCE).txGet();
stopWatch.stop();
logger.trace("Recovery [phase1] to {}: took [{}], throttling_wait [{}]", node, stopWatch.totalTime(), timeValueMillis(throttlingWaitTime.get()));
logger.trace("recovery [phase1] to {}: took [{}], throttling_wait [{}]", node, stopWatch.totalTime(), timeValueMillis(throttlingWaitTime.get()));
recoveryStatus.phase1Time = stopWatch.totalTime().millis();
} catch (ElasticSearchInterruptedException e) {
// we got interrupted since we are closing, ignore the recovery
throw new IgnoreRecoveryException("Interrupted while recovering files");
} catch (Throwable e) {
throw new RecoverFilesRecoveryException(shardId, snapshot.getFiles().length, new ByteSizeValue(totalSize), e);
throw new RecoverFilesRecoveryException(shardId, recoveryStatus.phase1FileNames.size(), new ByteSizeValue(totalSize), e);
} finally {
sendFileChunksRecoveryFutures.clear();
}
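
Reuse is also why the new cleanFiles step exists: the recovering shard may hold files that are not part of the snapshot at all, for instance segments left over from an older copy of the shard. Once every chunk has been acknowledged, the source sends the complete snapshot file list and the CleanFilesRequestHandler (further down) deletes any local file not on that list, so stale data cannot survive a recovery.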
@@ -393,11 +453,15 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
if (closed) {
throw new IndexShardClosedException(shardId);
}
logger.trace("Recovery [phase2] to {}: sending [{}] transaction log operations", node, snapshot.size());
logger.trace("recovery [phase2] to {}: sending [{}] transaction log operations", node, snapshot.size());
StopWatch stopWatch = new StopWatch().start();
sendSnapshot(snapshot, false);
transportService.submitRequest(node, prepareForTranslogOperationsTransportAction, VoidStreamable.INSTANCE, VoidTransportResponseHandler.INSTANCE).txGet();
sendSnapshot(snapshot);
stopWatch.stop();
logger.trace("Recovery [phase2] to {}: took [{}]", node, stopWatch.totalTime());
logger.trace("recovery [phase2] to {}: took [{}]", node, stopWatch.totalTime());
recoveryStatus.phase2Time = stopWatch.totalTime().millis();
recoveryStatus.phase2Operations = snapshot.size();
} catch (ElasticSearchInterruptedException e) {
@@ -414,9 +478,10 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
if (closed) {
throw new IndexShardClosedException(shardId);
}
logger.trace("Recovery [phase3] to {}: sending [{}] transaction log operations", node, snapshot.size());
logger.trace("recovery [phase3] to {}: sending [{}] transaction log operations", node, snapshot.size());
StopWatch stopWatch = new StopWatch().start();
sendSnapshot(snapshot, true);
sendSnapshot(snapshot);
transportService.submitRequest(node, finalizeRecoveryTransportAction, VoidStreamable.INSTANCE, VoidTransportResponseHandler.INSTANCE).txGet();
if (startRecoveryRequest.markAsRelocated) {
// TODO what happens if the recovery process fails afterwards, we need to mark this back to started
try {
@@ -428,7 +493,7 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
}
}
stopWatch.stop();
logger.trace("Recovery [phase3] to {}: took [{}]", node, stopWatch.totalTime());
logger.trace("recovery [phase3] to {}: took [{}]", node, stopWatch.totalTime());
recoveryStatus.phase3Time = stopWatch.totalTime().millis();
recoveryStatus.phase3Operations = snapshot.size();
} catch (ElasticSearchInterruptedException e) {
@@ -439,14 +504,12 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
}
}
private void sendSnapshot(Translog.Snapshot snapshot, boolean phase3) throws ElasticSearchException {
MemorySnapshot memorySnapshot;
if (snapshot instanceof MemorySnapshot) {
memorySnapshot = (MemorySnapshot) snapshot;
} else {
memorySnapshot = new MemorySnapshot(snapshot);
private void sendSnapshot(Translog.Snapshot snapshot) throws ElasticSearchException {
TranslogOperationsRequest request = new TranslogOperationsRequest();
for (Translog.Operation operation : snapshot) {
request.operations.add(operation);
}
transportService.submitRequest(node, snapshotTransportAction, new SnapshotWrapper(memorySnapshot, phase3), VoidTransportResponseHandler.INSTANCE).txGet();
transportService.submitRequest(node, translogOperationsTransportAction, request, VoidTransportResponseHandler.INSTANCE).txGet();
}
});
channel.sendResponse(recoveryStatus);
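
sendSnapshot no longer funnels everything through a MemorySnapshot: operations are copied into a plain TranslogOperationsRequest and serialized one by one via TranslogStreams, so a single request type serves both phase2 and phase3 and the receiving side simply replays it with performRecoveryOperations. The phase3 flag disappears because the explicit prepareForTranslog and finalizeRecovery actions now carry that phase information instead. The wire format, mirroring the readFrom/writeTo pair further down:

out.writeVInt(operations.size());
for (Translog.Operation operation : operations) {
    TranslogStreams.writeTranslogOperation(out, operation); // shared translog stream format
}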
@@ -459,9 +522,12 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
private static class RecoveryStatus implements Streamable {
boolean retry = false;
List<String> phase1FileNames = new ArrayList<String>();
List<Long> phase1FileSizes = new ArrayList<Long>();
List<String> phase1FileNames = Lists.newArrayList();
List<Long> phase1FileSizes = Lists.newArrayList();
List<String> phase1ExistingFileNames = Lists.newArrayList();
List<Long> phase1ExistingFileSizes = Lists.newArrayList();
long phase1TotalSize;
long phase1ExistingTotalSize;
long phase1Time;
long phase1ThrottlingWaitTime;
@@ -477,16 +543,29 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
@Override public void readFrom(StreamInput in) throws IOException {
retry = in.readBoolean();
int size = in.readVInt();
phase1FileNames = new ArrayList<String>(size);
phase1FileNames = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
phase1FileNames.add(in.readUTF());
}
size = in.readVInt();
phase1FileSizes = new ArrayList<Long>(size);
phase1FileSizes = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
phase1FileSizes.add(in.readVLong());
}
size = in.readVInt();
phase1ExistingFileNames = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
phase1ExistingFileNames.add(in.readUTF());
}
size = in.readVInt();
phase1ExistingFileSizes = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
phase1ExistingFileSizes.add(in.readVLong());
}
phase1TotalSize = in.readVLong();
phase1ExistingTotalSize = in.readVLong();
phase1Time = in.readVLong();
phase1ThrottlingWaitTime = in.readVLong();
phase2Operations = in.readVInt();
@@ -505,7 +584,18 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
for (long size : phase1FileSizes) {
out.writeVLong(size);
}
out.writeVInt(phase1ExistingFileNames.size());
for (String name : phase1ExistingFileNames) {
out.writeUTF(name);
}
out.writeVInt(phase1ExistingFileSizes.size());
for (long size : phase1ExistingFileSizes) {
out.writeVLong(size);
}
out.writeVLong(phase1TotalSize);
out.writeVLong(phase1ExistingTotalSize);
out.writeVLong(phase1Time);
out.writeVLong(phase1ThrottlingWaitTime);
out.writeVInt(phase2Operations);
@@ -515,26 +605,38 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
}
}
private class SnapshotTransportRequestHandler extends BaseTransportRequestHandler<SnapshotWrapper> {
static class CleanFilesRequest implements Streamable {
@Override public SnapshotWrapper newInstance() {
return new SnapshotWrapper();
Set<String> snapshotFiles = Sets.newHashSet();
@Override public void readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
for (int i = 0; i < size; i++) {
snapshotFiles.add(in.readUTF());
}
}
@Override public void messageReceived(SnapshotWrapper snapshot, TransportChannel channel) throws Exception {
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(snapshotFiles.size());
for (String snapshotFile : snapshotFiles) {
out.writeUTF(snapshotFile);
}
}
}
class CleanFilesRequestHandler extends BaseTransportRequestHandler<CleanFilesRequest> {
@Override public CleanFilesRequest newInstance() {
return new CleanFilesRequest();
}
@Override public void messageReceived(CleanFilesRequest request, TransportChannel channel) throws Exception {
receiveSnapshotRecoveryThread = Thread.currentThread();
try {
if (closed) {
throw new IndexShardClosedException(shardId);
}
if (!snapshot.phase3) {
// clean open index outputs in any case (there should not be any open, we close then in the chunk)
cleanOpenIndex();
}
indexShard.performRecovery(snapshot.snapshot, snapshot.phase3);
if (snapshot.phase3) {
indexShard.refresh(new Engine.Refresh(true));
// probably need to do more here...
for (String existingFile : store.directory().listAll()) {
if (!request.snapshotFiles.contains(existingFile)) {
store.directory().deleteFile(existingFile);
}
}
channel.sendResponse(VoidStreamable.INSTANCE);
} finally {
@@ -543,29 +645,80 @@ public class RecoveryAction extends AbstractIndexShardComponent implements Close
}
}
private static class SnapshotWrapper implements Streamable {
private MemorySnapshot snapshot;
class PrepareForTranslogOperationsRequestHandler extends BaseTransportRequestHandler<VoidStreamable> {
private boolean phase3;
private SnapshotWrapper() {
@Override public VoidStreamable newInstance() {
return VoidStreamable.INSTANCE;
}
private SnapshotWrapper(MemorySnapshot snapshot, boolean phase3) {
this.snapshot = snapshot;
this.phase3 = phase3;
@Override public void messageReceived(VoidStreamable stream, TransportChannel channel) throws Exception {
receiveSnapshotRecoveryThread = Thread.currentThread();
try {
indexShard.performRecoveryPrepareForTranslog();
channel.sendResponse(VoidStreamable.INSTANCE);
} finally {
receiveSnapshotRecoveryThread = null;
}
}
}
class FinalizeRecoveryRequestHandler extends BaseTransportRequestHandler<VoidStreamable> {
@Override public VoidStreamable newInstance() {
return VoidStreamable.INSTANCE;
}
@Override public void messageReceived(VoidStreamable stream, TransportChannel channel) throws Exception {
receiveSnapshotRecoveryThread = Thread.currentThread();
try {
indexShard.performRecoveryFinalization();
channel.sendResponse(VoidStreamable.INSTANCE);
} finally {
receiveSnapshotRecoveryThread = null;
}
}
}
class TranslogOperationsRequestHandler extends BaseTransportRequestHandler<TranslogOperationsRequest> {
@Override public TranslogOperationsRequest newInstance() {
return new TranslogOperationsRequest();
}
@Override public void messageReceived(TranslogOperationsRequest snapshot, TransportChannel channel) throws Exception {
receiveSnapshotRecoveryThread = Thread.currentThread();
try {
if (closed) {
throw new IndexShardClosedException(shardId);
}
indexShard.performRecoveryOperations(snapshot.operations);
channel.sendResponse(VoidStreamable.INSTANCE);
} finally {
receiveSnapshotRecoveryThread = null;
}
}
}
static class TranslogOperationsRequest implements Streamable {
List<Translog.Operation> operations = Lists.newArrayList();
TranslogOperationsRequest() {
}
@Override public void readFrom(StreamInput in) throws IOException {
snapshot = new MemorySnapshot();
snapshot.readFrom(in);
phase3 = in.readBoolean();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
operations.add(TranslogStreams.readTranslogOperation(in));
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
snapshot.writeTo(out);
out.writeBoolean(phase3);
out.writeVInt(operations.size());
for (Translog.Operation operation : operations) {
TranslogStreams.writeTranslogOperation(out, operation);
}
}
}

View File

@@ -400,35 +400,30 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
}
public void performRecovery(Iterable<Translog.Operation> operations) throws ElasticSearchException {
/**
* After the store has been recovered, we need to start the engine in order to apply operations
*/
public void performRecoveryPrepareForTranslog() throws ElasticSearchException {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
engine.start();
applyTranslogOperations(operations);
}
public void performRecoveryFinalization() throws ElasticSearchException {
synchronized (mutex) {
logger.debug("Moved to state [STARTED] post recovery (from gateway)");
logger.debug("Moved to state [STARTED] post recovery (from another shard)");
state = IndexShardState.STARTED;
}
scheduleRefresherIfNeeded();
engine.refresh(new Engine.Refresh(true));
}
public void performRecovery(Translog.Snapshot snapshot, boolean phase3) throws ElasticSearchException {
public void performRecoveryOperations(Iterable<Translog.Operation> operations) throws ElasticSearchException {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
if (!phase3) {
// start the engine, but the shard is not started yet...
engine.start();
}
applyTranslogOperations(snapshot);
if (phase3) {
synchronized (mutex) {
logger.debug("Moved to state [STARTED] post recovery (from another shard)");
state = IndexShardState.STARTED;
}
scheduleRefresherIfNeeded();
}
applyTranslogOperations(operations);
}
private void applyTranslogOperations(Iterable<Translog.Operation> snapshot) {

View File

@@ -25,7 +25,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class TranslogStreams {

View File

@@ -461,7 +461,10 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
}
currentTranslogPartToWrite = index;
indexShard.performRecovery(operations);
indexShard.performRecoveryPrepareForTranslog();
indexShard.performRecoveryOperations(operations);
indexShard.performRecoveryFinalization();
return new RecoveryStatus.Translog(latestTranslogId, operations.size(), new ByteSizeValue(size, ByteSizeUnit.BYTES));
} catch (Exception e) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to perform recovery of translog", e);