internally store on going state of snapshot to gateway

This commit is contained in:
kimchy 2010-08-17 10:03:19 +03:00
parent 29e981d28d
commit e1f3fee4e4
6 changed files with 430 additions and 245 deletions

View File

@ -21,16 +21,10 @@ package org.elasticsearch.index.gateway;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.common.component.CloseableIndexComponent;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.translog.Translog;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.common.unit.TimeValue.*;
/**
* @author kimchy (shay.banon)
*/
@ -38,8 +32,22 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
String type();
/**
* The last / on going recovery status.
*/
RecoveryStatus recoveryStatus();
/**
* The last snapshot status performed. Can be <tt>null</tt>.
*/
SnapshotStatus lastSnapshotStatus();
/**
* The current snapshot status being performed. Can be <tt>null</tt> indicating that no snapshot
* is being executed currently.
*/
SnapshotStatus currentSnapshotStatus();
/**
* Recovers the state of the shard from the gateway.
*/
@ -125,214 +133,4 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
return lastTranslogLength;
}
}
class SnapshotStatus {
public static SnapshotStatus NA = new SnapshotStatus(timeValueMillis(0), new Index(0, new ByteSizeValue(0), timeValueMillis(0)), new Translog(0, timeValueMillis(0)));
private TimeValue totalTime;
private Index index;
private Translog translog;
public SnapshotStatus(TimeValue totalTime, Index index, Translog translog) {
this.index = index;
this.translog = translog;
this.totalTime = totalTime;
}
public TimeValue totalTime() {
return this.totalTime;
}
public Index index() {
return index;
}
public Translog translog() {
return translog;
}
public static class Translog {
private int numberOfOperations;
private TimeValue time;
public Translog(int numberOfOperations, TimeValue time) {
this.numberOfOperations = numberOfOperations;
this.time = time;
}
public int numberOfOperations() {
return numberOfOperations;
}
public TimeValue time() {
return time;
}
}
public static class Index {
private int numberOfFiles;
private ByteSizeValue totalSize;
private TimeValue time;
public Index(int numberOfFiles, ByteSizeValue totalSize, TimeValue time) {
this.numberOfFiles = numberOfFiles;
this.totalSize = totalSize;
this.time = time;
}
public TimeValue time() {
return this.time;
}
public int numberOfFiles() {
return numberOfFiles;
}
public ByteSizeValue totalSize() {
return totalSize;
}
}
}
class RecoveryStatus {
public static enum Stage {
NONE,
INDEX,
TRANSLOG,
DONE
}
private Stage stage = Stage.NONE;
private Index index = new Index();
private Translog translog = new Translog();
public Stage stage() {
return this.stage;
}
public RecoveryStatus updateStage(Stage stage) {
this.stage = stage;
return this;
}
public Index index() {
return index;
}
public Translog translog() {
return translog;
}
public static class Translog {
volatile long currentTranslogOperations = 0;
private long startTime = -1;
private long took;
public long startTime() {
return this.startTime;
}
public void startTime(long startTime) {
this.startTime = startTime;
}
public TimeValue took() {
return new TimeValue(this.took);
}
public void took(long took) {
this.took = took;
}
public void addTranslogOperations(long count) {
this.currentTranslogOperations += count;
}
public long currentTranslogOperations() {
return this.currentTranslogOperations;
}
}
public static class Index {
private long startTime = -1;
private long took = -1;
private long version = -1;
private int numberOfFiles = 0;
private long totalSize = 0;
private int numberOfExistingFiles = 0;
private long existingTotalSize = 0;
private AtomicLong throttlingWaitTime = new AtomicLong();
private AtomicLong currentFilesSize = new AtomicLong();
public long startTime() {
return this.startTime;
}
public void startTime(long startTime) {
this.startTime = startTime;
}
public TimeValue took() {
return new TimeValue(this.took);
}
public void took(long took) {
this.took = took;
}
public long version() {
return this.version;
}
public void files(int numberOfFiles, long totalSize, int numberOfExistingFiles, long existingTotalSize) {
this.numberOfFiles = numberOfFiles;
this.totalSize = totalSize;
this.numberOfExistingFiles = numberOfExistingFiles;
this.existingTotalSize = existingTotalSize;
}
public int numberOfFiles() {
return numberOfFiles;
}
public ByteSizeValue totalSize() {
return new ByteSizeValue(totalSize);
}
public int numberOfExistingFiles() {
return numberOfExistingFiles;
}
public ByteSizeValue existingTotalSize() {
return new ByteSizeValue(existingTotalSize);
}
public void addThrottlingTime(long delta) {
throttlingWaitTime.addAndGet(delta);
}
public TimeValue throttlingWaitTime() {
return new TimeValue(throttlingWaitTime.get());
}
public void updateVersion(long version) {
this.version = version;
}
public long currentFilesSize() {
return this.currentFilesSize.get();
}
public void addCurrentFilesSize(long updatedSize) {
this.currentFilesSize.addAndGet(updatedSize);
}
}
}
}

View File

@ -142,7 +142,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
try {
logger.debug("starting recovery from {} ...", shardGateway);
StopWatch stopWatch = new StopWatch().start();
IndexShardGateway.RecoveryStatus recoveryStatus = shardGateway.recover();
RecoveryStatus recoveryStatus = shardGateway.recover();
lastIndexVersion = recoveryStatus.index().version();
lastTranslogId = -1;
@ -203,12 +203,12 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
return;
}
try {
IndexShardGateway.SnapshotStatus snapshotStatus = indexShard.snapshot(new Engine.SnapshotHandler<IndexShardGateway.SnapshotStatus>() {
@Override public IndexShardGateway.SnapshotStatus snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException {
SnapshotStatus snapshotStatus = indexShard.snapshot(new Engine.SnapshotHandler<SnapshotStatus>() {
@Override public SnapshotStatus snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException {
if (lastIndexVersion != snapshotIndexCommit.getVersion() || lastTranslogId != translogSnapshot.translogId() || lastTranslogLength < translogSnapshot.length()) {
logger.debug("snapshot ({}) to {} ...", reason, shardGateway);
IndexShardGateway.SnapshotStatus snapshotStatus =
SnapshotStatus snapshotStatus =
shardGateway.snapshot(new IndexShardGateway.Snapshot(snapshotIndexCommit, translogSnapshot, lastIndexVersion, lastTranslogId, lastTranslogPosition, lastTranslogLength));
lastIndexVersion = snapshotIndexCommit.getVersion();
@ -217,15 +217,15 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
lastTranslogLength = translogSnapshot.length();
return snapshotStatus;
}
return IndexShardGateway.SnapshotStatus.NA;
return null;
}
});
if (snapshotStatus != IndexShardGateway.SnapshotStatus.NA) {
if (snapshotStatus != null) {
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("snapshot (").append(reason).append(") completed to ").append(shardGateway).append(", took [").append(snapshotStatus.totalTime()).append("]\n");
sb.append(" index : version [").append(lastIndexVersion).append("], number_of_files [").append(snapshotStatus.index().numberOfFiles()).append("] with total_size [").append(snapshotStatus.index().totalSize()).append("], took [").append(snapshotStatus.index().time()).append("]\n");
sb.append(" translog : id [").append(lastTranslogId).append("], number_of_operations [").append(snapshotStatus.translog().numberOfOperations()).append("], took [").append(snapshotStatus.translog().time()).append("]");
sb.append("snapshot (").append(reason).append(") completed to ").append(shardGateway).append(", took [").append(snapshotStatus.took()).append("]\n");
sb.append(" index : version [").append(lastIndexVersion).append("], number_of_files [").append(snapshotStatus.index().numberOfFiles()).append("] with total_size [").append(snapshotStatus.index().totalSize()).append("], took [").append(snapshotStatus.index().took()).append("]\n");
sb.append(" translog : id [").append(lastTranslogId).append("], number_of_operations [").append(snapshotStatus.translog().currentTranslogOperations()).append("], took [").append(snapshotStatus.translog().took()).append("]");
logger.debug(sb.toString());
}
}

View File

@ -0,0 +1,187 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.gateway;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author kimchy (shay.banon)
*/
public class RecoveryStatus {
public static enum Stage {
NONE,
INDEX,
TRANSLOG,
DONE
}
private Stage stage = Stage.NONE;
private long startTime;
private long took;
private Index index = new Index();
private Translog translog = new Translog();
public Stage stage() {
return this.stage;
}
public RecoveryStatus updateStage(Stage stage) {
this.stage = stage;
return this;
}
public long startTime() {
return this.startTime;
}
public void startTime(long startTime) {
this.startTime = startTime;
}
public TimeValue took() {
return new TimeValue(this.took);
}
public void took(long took) {
this.took = took;
}
public Index index() {
return index;
}
public Translog translog() {
return translog;
}
public static class Translog {
volatile long currentTranslogOperations = 0;
private long startTime = -1;
private long took;
public long startTime() {
return this.startTime;
}
public void startTime(long startTime) {
this.startTime = startTime;
}
public TimeValue took() {
return new TimeValue(this.took);
}
public void took(long took) {
this.took = took;
}
public void addTranslogOperations(long count) {
this.currentTranslogOperations += count;
}
public long currentTranslogOperations() {
return this.currentTranslogOperations;
}
}
public static class Index {
private long startTime = -1;
private long took = -1;
private long version = -1;
private int numberOfFiles = 0;
private long totalSize = 0;
private int numberOfExistingFiles = 0;
private long existingTotalSize = 0;
private AtomicLong throttlingWaitTime = new AtomicLong();
private AtomicLong currentFilesSize = new AtomicLong();
public long startTime() {
return this.startTime;
}
public void startTime(long startTime) {
this.startTime = startTime;
}
public TimeValue took() {
return new TimeValue(this.took);
}
public void took(long took) {
this.took = took;
}
public long version() {
return this.version;
}
public void files(int numberOfFiles, long totalSize, int numberOfExistingFiles, long existingTotalSize) {
this.numberOfFiles = numberOfFiles;
this.totalSize = totalSize;
this.numberOfExistingFiles = numberOfExistingFiles;
this.existingTotalSize = existingTotalSize;
}
public int numberOfFiles() {
return numberOfFiles;
}
public ByteSizeValue totalSize() {
return new ByteSizeValue(totalSize);
}
public int numberOfExistingFiles() {
return numberOfExistingFiles;
}
public ByteSizeValue existingTotalSize() {
return new ByteSizeValue(existingTotalSize);
}
public void addThrottlingTime(long delta) {
throttlingWaitTime.addAndGet(delta);
}
public TimeValue throttlingWaitTime() {
return new TimeValue(throttlingWaitTime.get());
}
public void updateVersion(long version) {
this.version = version;
}
public long currentFilesSize() {
return this.currentFilesSize.get();
}
public void addCurrentFilesSize(long updatedSize) {
this.currentFilesSize.addAndGet(updatedSize);
}
}
}

View File

@ -0,0 +1,155 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.gateway;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
/**
* @author kimchy (shay.banon)
*/
public class SnapshotStatus {
public static enum Stage {
NONE,
INDEX,
TRANSLOG,
FINALIZE,
DONE,
FAILURE
}
private Stage stage = Stage.NONE;
private long startTime;
private long took;
private Index index = new Index();
private Translog translog = new Translog();
private Throwable failure;
public Stage stage() {
return this.stage;
}
public SnapshotStatus updateStage(Stage stage) {
this.stage = stage;
return this;
}
public long startTime() {
return this.startTime;
}
public void startTime(long startTime) {
this.startTime = startTime;
}
public TimeValue took() {
return new TimeValue(this.took);
}
public void took(long took) {
this.took = took;
}
public void failed(Throwable failure) {
this.failure = failure;
}
public Index index() {
return index;
}
public Translog translog() {
return translog;
}
public static class Index {
private long startTime;
private long took;
private int numberOfFiles;
private long totalSize = -1;
public long startTime() {
return this.startTime;
}
public void startTime(long startTime) {
this.startTime = startTime;
}
public TimeValue took() {
return new TimeValue(this.took);
}
public void took(long took) {
this.took = took;
}
public void files(int numberOfFiles, long totalSize) {
this.numberOfFiles = numberOfFiles;
this.totalSize = totalSize;
}
public int numberOfFiles() {
return numberOfFiles;
}
public ByteSizeValue totalSize() {
return new ByteSizeValue(totalSize);
}
}
public static class Translog {
private volatile int currentTranslogOperations;
private long startTime = -1;
private long took;
public long startTime() {
return this.startTime;
}
public void startTime(long startTime) {
this.startTime = startTime;
}
public TimeValue took() {
return new TimeValue(this.took);
}
public void took(long took) {
this.took = took;
}
public void addTranslogOperations(long count) {
this.currentTranslogOperations += count;
}
public long currentTranslogOperations() {
return this.currentTranslogOperations;
}
}
}

View File

@ -39,13 +39,9 @@ import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.lucene.store.ThreadSafeInputStreamIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.gateway.IndexShardGatewaySnapshotFailedException;
import org.elasticsearch.index.gateway.*;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
@ -106,6 +102,10 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
private volatile AppendableBlobContainer.AppendableBlob translogBlob;
private volatile SnapshotStatus lastSnapshotStatus;
private volatile SnapshotStatus currentSnapshotStatus;
protected BlobStoreIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexGateway indexGateway,
IndexShard indexShard, Store store, RecoveryThrottler recoveryThrottler) {
super(shardId, indexSettings);
@ -149,8 +149,42 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
}
@Override public SnapshotStatus lastSnapshotStatus() {
return this.lastSnapshotStatus;
}
@Override public SnapshotStatus currentSnapshotStatus() {
return this.currentSnapshotStatus;
}
@Override public SnapshotStatus snapshot(final Snapshot snapshot) throws IndexShardGatewaySnapshotFailedException {
long totalTimeStart = System.currentTimeMillis();
currentSnapshotStatus = new SnapshotStatus();
currentSnapshotStatus.startTime(System.currentTimeMillis());
try {
doSnapshot(snapshot);
currentSnapshotStatus.took(System.currentTimeMillis() - currentSnapshotStatus.startTime());
currentSnapshotStatus.updateStage(SnapshotStatus.Stage.DONE);
} catch (Exception e) {
currentSnapshotStatus.took(System.currentTimeMillis() - currentSnapshotStatus.startTime());
currentSnapshotStatus.updateStage(SnapshotStatus.Stage.FAILURE);
currentSnapshotStatus.failed(e);
if (e instanceof IndexShardGatewaySnapshotFailedException) {
throw (IndexShardGatewaySnapshotFailedException) e;
} else {
throw new IndexShardGatewaySnapshotFailedException(shardId, e.getMessage(), e);
}
} finally {
this.lastSnapshotStatus = currentSnapshotStatus;
this.currentSnapshotStatus = null;
}
return this.lastSnapshotStatus;
}
private void doSnapshot(final Snapshot snapshot) throws IndexShardGatewaySnapshotFailedException {
currentSnapshotStatus.index().startTime(System.currentTimeMillis());
currentSnapshotStatus.updateStage(SnapshotStatus.Stage.INDEX);
boolean indexDirty = false;
final SnapshotIndexCommit snapshotIndexCommit = snapshot.indexCommit();
@ -218,6 +252,8 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
}
currentSnapshotStatus.index().files(indexNumberOfFiles, indexTotalFilesSize);
try {
latch.await();
} catch (InterruptedException e) {
@ -229,14 +265,12 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
indexTime = System.currentTimeMillis() - time;
}
currentSnapshotStatus.index().took(System.currentTimeMillis() - currentSnapshotStatus.index().startTime());
// handle if snapshot has changed
final AtomicInteger translogNumberOfOperations = new AtomicInteger();
long translogTime = 0;
currentSnapshotStatus.updateStage(SnapshotStatus.Stage.TRANSLOG);
currentSnapshotStatus.translog().startTime(System.currentTimeMillis());
if (snapshot.newTranslogCreated() || snapshot.sameTranslogNewOperations()) {
long time = System.currentTimeMillis();
if (snapshot.newTranslogCreated() && translogBlob != null) {
translogBlob.close();
translogBlob = null;
@ -262,7 +296,6 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
bos = new FastByteArrayOutputStream();
cachedBos = new SoftReference<FastByteArrayOutputStream>(bos);
}
int totalNumberOfOperations = 0;
OutputStreamStreamOutput bosOs = new OutputStreamStreamOutput(bos);
while (translogSnapshot.hasNext()) {
bos.reset();
@ -270,9 +303,8 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
bosOs.flush();
os.writeVInt(bos.size());
os.writeBytes(bos.unsafeByteArray(), bos.size());
totalNumberOfOperations++;
currentSnapshotStatus.translog().addTranslogOperations(1);
}
translogNumberOfOperations.set(totalNumberOfOperations);
}
@Override public void onCompleted() {
@ -295,8 +327,8 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
throw new IndexShardGatewaySnapshotFailedException(shardId, "Failed to snapshot translog", failure.get());
}
translogTime = System.currentTimeMillis() - time;
}
currentSnapshotStatus.translog().took(System.currentTimeMillis() - currentSnapshotStatus.translog().startTime());
// now write the segments file
if (indexDirty) {
@ -327,6 +359,8 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
}
currentSnapshotStatus.updateStage(SnapshotStatus.Stage.FINALIZE);
// delete the old translog
if (snapshot.newTranslogCreated()) {
try {
@ -367,13 +401,11 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
}
}
}
return new SnapshotStatus(new TimeValue(System.currentTimeMillis() - totalTimeStart),
new SnapshotStatus.Index(indexNumberOfFiles, new ByteSizeValue(indexTotalFilesSize), new TimeValue(indexTime)),
new SnapshotStatus.Translog(translogNumberOfOperations.get(), new TimeValue(translogTime)));
}
@Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
recoveryStatus.startTime(System.currentTimeMillis());
recoveryStatus.index().startTime(System.currentTimeMillis());
recoveryStatus.updateStage(RecoveryStatus.Stage.INDEX);
recoverIndex();
@ -384,6 +416,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
recoverTranslog();
recoveryStatus.translog().took(System.currentTimeMillis() - recoveryStatus.index().startTime());
recoveryStatus.took(System.currentTimeMillis() - recoveryStatus.startTime());
recoveryStatus.updateStage(RecoveryStatus.Stage.DONE);
return recoveryStatus;
}

View File

@ -24,6 +24,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.none.NoneGateway;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.gateway.RecoveryStatus;
import org.elasticsearch.index.gateway.SnapshotStatus;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
@ -55,6 +57,7 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
}
@Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
recoveryStatus.startTime(System.currentTimeMillis());
recoveryStatus().index().startTime(System.currentTimeMillis());
recoveryStatus.translog().startTime(System.currentTimeMillis());
// in the none case, we simply start the shard
@ -67,6 +70,7 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
indexShard.start();
recoveryStatus.index().took(System.currentTimeMillis() - recoveryStatus.index().startTime());
recoveryStatus.translog().took(System.currentTimeMillis() - recoveryStatus.index().startTime());
recoveryStatus.took(System.currentTimeMillis() - recoveryStatus.startTime());
return recoveryStatus.updateStage(RecoveryStatus.Stage.DONE);
}
@ -75,7 +79,15 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
}
@Override public SnapshotStatus snapshot(Snapshot snapshot) {
return SnapshotStatus.NA;
return null;
}
@Override public SnapshotStatus lastSnapshotStatus() {
return null;
}
@Override public SnapshotStatus currentSnapshotStatus() {
return null;
}
@Override public boolean requiresSnapshotScheduling() {