improve handling of translog snapshotting

This commit is contained in:
kimchy 2010-07-08 20:51:37 +03:00
parent 4429a61528
commit 84af9f5681
10 changed files with 50 additions and 66 deletions

View File

@ -43,10 +43,5 @@ public interface AppendableBlobContainer extends BlobContainer {
void close();
}
/**
* Returns of an appended blob can be opened on an existing blob.
*/
boolean canAppendToExistingBlob();
AppendableBlob appendBlob(String blobName) throws IOException;
}

View File

@ -41,10 +41,6 @@ public class FsAppendableBlobContainer extends AbstractFsBlobContainer implement
return new FsAppendableBlob(new File(path, blobName));
}
@Override public boolean canAppendToExistingBlob() {
return true;
}
private class FsAppendableBlob implements AppendableBlob {
private final File file;

View File

@ -44,10 +44,6 @@ public class ImmutableAppendableBlobContainer extends AbstractBlobContainer impl
this.container = container;
}
@Override public boolean canAppendToExistingBlob() {
return false;
}
@Override public AppendableBlob appendBlob(final String blobName) throws IOException {
return new AppendableBlob() {
int part = 0;

View File

@ -58,13 +58,15 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
private final long lastIndexVersion;
private final long lastTranslogId;
private final long lastTranslogPosition;
private final long lastTranslogLength;
public Snapshot(SnapshotIndexCommit indexCommit, Translog.Snapshot translogSnapshot, long lastIndexVersion, long lastTranslogId, long lastTranslogLength) {
public Snapshot(SnapshotIndexCommit indexCommit, Translog.Snapshot translogSnapshot, long lastIndexVersion, long lastTranslogId, long lastTranslogPosition, long lastTranslogLength) {
this.indexCommit = indexCommit;
this.translogSnapshot = translogSnapshot;
this.lastIndexVersion = lastIndexVersion;
this.lastTranslogId = lastTranslogId;
this.lastTranslogPosition = lastTranslogPosition;
this.lastTranslogLength = lastTranslogLength;
}
@ -112,6 +114,10 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
}
public long lastTranslogPosition() {
return lastTranslogPosition;
}
public long lastTranslogLength() {
return lastTranslogLength;
}
}
@ -207,36 +213,15 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo
}
public static class Translog {
private long translogId;
private int numberOfOperations;
private ByteSizeValue totalSize;
private long translogLength;
public Translog(long translogId, long translogLength, int numberOfOperations, ByteSizeValue totalSize) {
this.translogId = translogId;
this.translogLength = translogLength;
public Translog(int numberOfOperations) {
this.numberOfOperations = numberOfOperations;
this.totalSize = totalSize;
}
/**
* The translog id recovered, <tt>-1</tt> indicating no translog.
*/
public long translogId() {
return translogId;
}
public long translogLength() {
return translogLength;
}
public int numberOfOperations() {
return numberOfOperations;
}
public ByteSizeValue totalSize() {
return totalSize;
}
}
public static class Index {

View File

@ -63,6 +63,8 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
private volatile long lastTranslogId = -1;
private volatile long lastTranslogPosition;
private volatile long lastTranslogLength;
private final AtomicBoolean recovered = new AtomicBoolean();
@ -130,8 +132,9 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
IndexShardGateway.RecoveryStatus recoveryStatus = shardGateway.recover();
lastIndexVersion = recoveryStatus.index().version();
lastTranslogId = recoveryStatus.translog().translogId();
lastTranslogLength = recoveryStatus.translog().translogLength();
lastTranslogId = -1;
lastTranslogPosition = 0;
lastTranslogLength = 0;
// start the shard if the gateway has not started it already
if (indexShard.state() != IndexShardState.STARTED) {
@ -143,7 +146,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
sb.append("recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("], throttling_wait [").append(throttlingWaitTime.totalTime()).append("]\n");
sb.append(" index : recovered_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(recoveryStatus.index().totalSize()).append("], throttling_wait [").append(recoveryStatus.index().throttlingWaitTime()).append("]\n");
sb.append(" : reusing_files [").append(recoveryStatus.index().numberOfExistingFiles()).append("] with total_size [").append(recoveryStatus.index().existingTotalSize()).append("]\n");
sb.append(" translog : translog_id [").append(recoveryStatus.translog().translogId()).append("], number_of_operations [").append(recoveryStatus.translog().numberOfOperations()).append("] with total_size[").append(recoveryStatus.translog().totalSize()).append("]");
sb.append(" translog : number_of_operations [").append(recoveryStatus.translog().numberOfOperations()).append("]");
logger.debug(sb.toString());
}
// refresh the shard
@ -185,13 +188,14 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
try {
IndexShardGateway.SnapshotStatus snapshotStatus = indexShard.snapshot(new Engine.SnapshotHandler<IndexShardGateway.SnapshotStatus>() {
@Override public IndexShardGateway.SnapshotStatus snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException {
if (lastIndexVersion != snapshotIndexCommit.getVersion() || lastTranslogId != translogSnapshot.translogId() || lastTranslogLength != translogSnapshot.length()) {
if (lastIndexVersion != snapshotIndexCommit.getVersion() || lastTranslogId != translogSnapshot.translogId() || lastTranslogLength < translogSnapshot.length()) {
IndexShardGateway.SnapshotStatus snapshotStatus =
shardGateway.snapshot(new IndexShardGateway.Snapshot(snapshotIndexCommit, translogSnapshot, lastIndexVersion, lastTranslogId, lastTranslogLength));
shardGateway.snapshot(new IndexShardGateway.Snapshot(snapshotIndexCommit, translogSnapshot, lastIndexVersion, lastTranslogId, lastTranslogPosition, lastTranslogLength));
lastIndexVersion = snapshotIndexCommit.getVersion();
lastTranslogId = translogSnapshot.translogId();
lastTranslogPosition = translogSnapshot.position();
lastTranslogLength = translogSnapshot.length();
return snapshotStatus;
}

View File

@ -38,7 +38,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.lucene.store.ThreadSafeInputStreamIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@ -369,14 +368,14 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
} catch (FileNotFoundException e) {
// no index, that fine
indexShard.start();
return new RecoveryStatus.Translog(-1, 0, 0, new ByteSizeValue(0));
return new RecoveryStatus.Translog(0);
} catch (IOException e) {
throw new IndexShardGatewayRecoveryException(shardId, "Failed to recovery translog, can't read current index version", e);
}
if (!translogContainer.blobExists("translog-" + translogId)) {
// no recovery file found, start the shard and bail
indexShard.start();
return new RecoveryStatus.Translog(-1, 0, 0, new ByteSizeValue(0));
return new RecoveryStatus.Translog(0);
}
try {
@ -452,19 +451,10 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
indexShard.performRecoveryFinalization();
// only if we can append to an existing translog we should use the current id and continue to append to it
long lastTranslogId = indexShard.translog().currentId();
Translog.Snapshot translogSnapshot = indexShard.translog().snapshot();
long lastTranslogLength = translogSnapshot.length();
translogSnapshot.release();
if (!translogContainer.canAppendToExistingBlob()) {
// flush the index, so we generate a new translog based on a new index version
indexShard.flush(new Engine.Flush());
lastTranslogId = -1;
lastTranslogLength = 0;
}
// flush the index, so we create a new transaction log
indexShard.flush(new Engine.Flush());
return new RecoveryStatus.Translog(lastTranslogId, lastTranslogLength, totalOperations.get(), new ByteSizeValue(totalSize.get(), ByteSizeUnit.BYTES));
return new RecoveryStatus.Translog(totalOperations.get());
} catch (Throwable e) {
throw new IndexShardGatewayRecoveryException(shardId, "Failed to recovery translog", e);
}

View File

@ -21,8 +21,6 @@ package org.elasticsearch.index.gateway.none;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.gateway.none.NoneGateway;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
@ -55,7 +53,7 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
logger.warn("failed to clean store before starting shard", e);
}
indexShard.start();
return new RecoveryStatus(RecoveryStatus.Index.EMPTY, new RecoveryStatus.Translog(-1, 0, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES)));
return new RecoveryStatus(RecoveryStatus.Index.EMPTY, new RecoveryStatus.Translog(0));
}
@Override public String type() {

View File

@ -167,6 +167,31 @@ public abstract class AbstractSimpleTranslogTests {
snapshot.release();
}
@Test public void testSnapshotWithSeekForward() {
Translog.Snapshot snapshot = translog.snapshot();
assertThat(snapshot, translogSize(0));
snapshot.release();
translog.add(new Translog.Create("test", "1", new byte[]{1}));
snapshot = translog.snapshot();
assertThat(snapshot, translogSize(1));
long lastPosition = snapshot.position();
snapshot.release();
translog.add(new Translog.Create("test", "2", new byte[]{1}));
snapshot = translog.snapshot();
snapshot.seekForward(lastPosition);
assertThat(snapshot, translogSize(1));
snapshot.release();
snapshot = translog.snapshot();
snapshot.seekForward(lastPosition);
assertThat(snapshot.hasNext(), equalTo(true));
Translog.Create create = (Translog.Create) snapshot.next();
assertThat(create.id(), equalTo("2"));
snapshot.release();
}
private Term newUid(String id) {
return new Term("_uid", id);
}

View File

@ -34,7 +34,6 @@ import org.elasticsearch.common.lucene.Directories;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.lucene.store.ThreadSafeInputStreamIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
@ -416,7 +415,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
if (latestTranslogId == -1) {
// no recovery file found, start the shard and bail
indexShard.start();
return new RecoveryStatus.Translog(-1, 0, 0, new ByteSizeValue(0, ByteSizeUnit.BYTES));
return new RecoveryStatus.Translog(0);
}
@ -447,7 +446,7 @@ public class CloudIndexShardGateway extends AbstractIndexShardComponent implemen
indexShard.performRecoveryPrepareForTranslog();
indexShard.performRecoveryFinalization();
return new RecoveryStatus.Translog(latestTranslogId, operations.size(), 0, new ByteSizeValue(size, ByteSizeUnit.BYTES));
return new RecoveryStatus.Translog(operations.size());
} catch (Exception e) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to perform recovery of translog", e);
}

View File

@ -40,10 +40,6 @@ public class HdfsAppendableBlobContainer extends AbstractHdfsBlobContainer imple
return new HdfsAppendableBlob(new Path(path, blobName));
}
@Override public boolean canAppendToExistingBlob() {
return false;
}
private class HdfsAppendableBlob implements AppendableBlob {
private final Path file;