clean and refactor the way fs index gateway work, should work nicer with NFS

This commit is contained in:
kimchy 2010-03-31 17:23:16 +03:00
parent 0586bcd003
commit 38d8fad8d0
12 changed files with 212 additions and 74 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.gateway;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.translog.Translog;
@ -36,7 +37,7 @@ public interface IndexShardGateway extends IndexShardComponent {
/**
* Snapshots the given shard into the gateway.
*/
void snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot);
void snapshot(Snapshot snapshot);
/**
* Returns <tt>true</tt> if this gateway requires scheduling management for snapshot
@ -45,4 +46,68 @@ public interface IndexShardGateway extends IndexShardComponent {
boolean requiresSnapshotScheduling();
void close();
public static class Snapshot {
private final SnapshotIndexCommit indexCommit;
private final Translog.Snapshot translogSnapshot;
private final long lastIndexVersion;
private final long lastTranslogId;
private final int lastTranslogSize;
public Snapshot(SnapshotIndexCommit indexCommit, Translog.Snapshot translogSnapshot, long lastIndexVersion, long lastTranslogId, int lastTranslogSize) {
this.indexCommit = indexCommit;
this.translogSnapshot = translogSnapshot;
this.lastIndexVersion = lastIndexVersion;
this.lastTranslogId = lastTranslogId;
this.lastTranslogSize = lastTranslogSize;
}
/**
* Indicates that the index has changed from the latest snapshot.
*/
public boolean indexChanged() {
return lastIndexVersion != indexCommit.getVersion();
}
/**
* Indicates that a new transaction log has been created. Note check this <b>before</b> you
* check {@link #sameTranslogNewOperations()}.
*/
public boolean newTranslogCreated() {
return translogSnapshot.translogId() != lastTranslogId;
}
/**
* Indicates that the same translog exists, but new operations have been appended to it. Throws
* {@link ElasticSearchIllegalStateException} if {@link #newTranslogCreated()} is <tt>true</tt>, so
* always check that first.
*/
public boolean sameTranslogNewOperations() {
if (newTranslogCreated()) {
throw new ElasticSearchIllegalStateException("Should not be called when there is a new translog");
}
return translogSnapshot.size() > lastTranslogSize;
}
public SnapshotIndexCommit indexCommit() {
return indexCommit;
}
public Translog.Snapshot translogSnapshot() {
return translogSnapshot;
}
public long lastIndexVersion() {
return lastIndexVersion;
}
public long lastTranslogId() {
return lastTranslogId;
}
public int lastTranslogSize() {
return lastTranslogSize;
}
}
}

View File

@ -110,14 +110,9 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
StopWatch stopWatch = new StopWatch().start();
RecoveryStatus recoveryStatus = shardGateway.recover();
// update the last up to date values
indexShard.snapshot(new Engine.SnapshotHandler() {
@Override public void snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException {
lastIndexVersion = snapshotIndexCommit.getVersion();
lastTranslogId = translogSnapshot.translogId();
lastTranslogSize = translogSnapshot.size();
}
});
lastIndexVersion = recoveryStatus.index().version();
lastTranslogId = recoveryStatus.translog().translogId();
lastTranslogSize = recoveryStatus.translog().numberOfOperations();
// start the shard if the gateway has not started it already
if (indexShard.state() != IndexShardState.STARTED) {
@ -127,8 +122,8 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("]\n");
sb.append(" Index : numberOfFiles [").append(recoveryStatus.index().numberOfFiles()).append("] with totalSize [").append(recoveryStatus.index().totalSize()).append("]\n");
sb.append(" Translog : numberOfOperations [").append(recoveryStatus.translog().numberOfOperations()).append("] with totalSize [").append(recoveryStatus.translog().totalSize()).append("]");
sb.append(" Index : numberOfFiles [").append(recoveryStatus.index().numberOfFiles()).append("] with totalSize [").append(recoveryStatus.index().totalSize()).append("]\n");
sb.append(" Translog : translogId [").append(recoveryStatus.translog().translogId()).append(", numberOfOperations [").append(recoveryStatus.translog().numberOfOperations()).append("] with totalSize [").append(recoveryStatus.translog().totalSize()).append("]");
logger.debug(sb.toString());
}
// refresh the shard
@ -156,7 +151,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
@Override public void snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException {
if (lastIndexVersion != snapshotIndexCommit.getVersion() || lastTranslogId != translogSnapshot.translogId() || lastTranslogSize != translogSnapshot.size()) {
shardGateway.snapshot(snapshotIndexCommit, translogSnapshot);
shardGateway.snapshot(new IndexShardGateway.Snapshot(snapshotIndexCommit, translogSnapshot, lastIndexVersion, lastTranslogId, lastTranslogSize));
lastIndexVersion = snapshotIndexCommit.getVersion();
lastTranslogId = translogSnapshot.translogId();

View File

@ -44,14 +44,23 @@ public class RecoveryStatus {
}
public static class Translog {
private long translogId;
private int numberOfOperations;
private SizeValue totalSize;
public Translog(int numberOfOperations, SizeValue totalSize) {
public Translog(long translogId, int numberOfOperations, SizeValue totalSize) {
this.translogId = translogId;
this.numberOfOperations = numberOfOperations;
this.totalSize = totalSize;
}
/**
* The translog id recovered, <tt>-1</tt> indicating no translog.
*/
public long translogId() {
return translogId;
}
public int numberOfOperations() {
return numberOfOperations;
}
@ -62,14 +71,19 @@ public class RecoveryStatus {
}
public static class Index {
private long version;
private int numberOfFiles;
private SizeValue totalSize;
public Index(int numberOfFiles, SizeValue totalSize) {
public Index(long version, int numberOfFiles, SizeValue totalSize) {
this.numberOfFiles = numberOfFiles;
this.totalSize = totalSize;
}
public long version() {
return this.version;
}
public int numberOfFiles() {
return numberOfFiles;
}

View File

@ -20,11 +20,9 @@
package org.elasticsearch.index.gateway.fs;
import com.google.inject.Inject;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.gateway.IndexShardGatewaySnapshotFailedException;
@ -74,12 +72,6 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
private final File locationTranslog;
private volatile long lastIndexVersion;
private volatile long lastTranslogId = -1;
private volatile int lastTranslogSize;
@Inject public FsIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, FsIndexGateway fsIndexGateway, IndexShard indexShard, Store store) {
super(shardId, indexSettings);
this.threadPool = threadPool;
@ -107,27 +99,24 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
@Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
RecoveryStatus.Index recoveryStatusIndex = recoverIndex();
RecoveryStatus.Translog recoveryStatusTranslog = recoverTranslog();
// update the last up to date values
indexShard.snapshot(new Engine.SnapshotHandler() {
@Override public void snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException {
lastIndexVersion = snapshotIndexCommit.getVersion();
lastTranslogId = translogSnapshot.translogId();
lastTranslogSize = translogSnapshot.size();
}
});
return new RecoveryStatus(recoveryStatusIndex, recoveryStatusTranslog);
}
@Override public void snapshot(final SnapshotIndexCommit snapshotIndexCommit, final Translog.Snapshot translogSnapshot) {
@Override public void snapshot(Snapshot snapshot) {
boolean indexDirty = false;
boolean translogDirty = false;
if (lastIndexVersion != snapshotIndexCommit.getVersion()) {
final SnapshotIndexCommit snapshotIndexCommit = snapshot.indexCommit();
final Translog.Snapshot translogSnapshot = snapshot.translogSnapshot();
if (snapshot.indexChanged()) {
indexDirty = true;
// snapshot into the index
final CountDownLatch latch = new CountDownLatch(snapshotIndexCommit.getFiles().length);
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
for (final String fileName : snapshotIndexCommit.getFiles()) {
// don't copy over the segments file, it will be copied over later on as part of the
// final snapshot phase
if (fileName.equals(snapshotIndexCommit.getSegmentsFileName())) {
latch.countDown();
continue;
@ -170,21 +159,14 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
File translogFile = new File(locationTranslog, "translog-" + translogSnapshot.translogId());
RandomAccessFile translogRaf = null;
// if we have a different trnaslogId (or the file does not exists at all), we want to flush the full
// translog to a new file (based on the translogId). If we still work on existing translog, just
// append the latest translog operations
if (translogSnapshot.translogId() != lastTranslogId || !translogFile.exists()) {
// if we have a different trnaslogId we want to flush the full translog to a new file (based on the translogId).
// If we still work on existing translog, just append the latest translog operations
if (snapshot.newTranslogCreated()) {
translogDirty = true;
try {
translogRaf = new RandomAccessFile(translogFile, "rw");
StreamOutput out = new DataOutputStreamOutput(translogRaf);
out.writeInt(-1); // write the number of operations header with -1 currently
// double check that we managed to read/write correctly
translogRaf.seek(0);
if (translogRaf.readInt() != -1) {
throw new ElasticSearchIllegalStateException("Wrote to snapshot file [" + translogFile + "] but did not read...");
}
for (Translog.Operation operation : translogSnapshot) {
writeTranslogOperation(out, operation);
}
@ -194,25 +176,27 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
} catch (IOException e1) {
// ignore
}
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to snapshot translog", e);
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to snapshot translog into [" + translogFile + "]", e);
}
} else if (translogSnapshot.size() > lastTranslogSize) {
} else if (snapshot.sameTranslogNewOperations()) {
translogDirty = true;
try {
translogRaf = new RandomAccessFile(translogFile, "rw");
// seek to the end, since we append
translogRaf.seek(translogRaf.length());
StreamOutput out = new DataOutputStreamOutput(translogRaf);
for (Translog.Operation operation : translogSnapshot.skipTo(lastTranslogSize)) {
for (Translog.Operation operation : translogSnapshot.skipTo(snapshot.lastTranslogSize())) {
writeTranslogOperation(out, operation);
}
} catch (Exception e) {
try {
translogRaf.close();
} catch (IOException e1) {
if (translogRaf != null) {
translogRaf.close();
}
} catch (Exception e1) {
// ignore
}
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to snapshot translog", e);
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to append snapshot translog into [" + translogFile + "]", e);
}
}
@ -222,6 +206,18 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
copyFromDirectory(snapshotIndexCommit.getDirectory(), snapshotIndexCommit.getSegmentsFileName(),
new File(locationIndex, snapshotIndexCommit.getSegmentsFileName()));
}
} catch (Exception e) {
try {
if (translogRaf != null) {
translogRaf.close();
}
} catch (Exception e1) {
// ignore
}
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to finalize index snapshot into [" + new File(locationIndex, snapshotIndexCommit.getSegmentsFileName()) + "]", e);
}
try {
if (translogDirty) {
translogRaf.seek(0);
translogRaf.writeInt(translogSnapshot.size());
@ -232,16 +228,18 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
}
} catch (Exception e) {
try {
translogRaf.close();
} catch (IOException e1) {
if (translogRaf != null) {
translogRaf.close();
}
} catch (Exception e1) {
// ignore
}
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to finalize snapshot", e);
throw new IndexShardGatewaySnapshotFailedException(shardId(), "Failed to finalize snapshot into [" + translogFile + "]", e);
}
// delete the old translog
if (lastTranslogId != translogSnapshot.translogId()) {
new File(locationTranslog, "translog-" + lastTranslogId).delete();
if (snapshot.newTranslogCreated()) {
new File(locationTranslog, "translog-" + snapshot.lastTranslogId()).delete();
}
// delete files that no longer exists in the index
@ -260,11 +258,6 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
}
}
}
lastIndexVersion = snapshotIndexCommit.getVersion();
lastTranslogId = translogSnapshot.translogId();
lastTranslogSize = translogSnapshot.size();
}
private RecoveryStatus.Index recoverIndex() throws IndexShardGatewayRecoveryException {
@ -297,7 +290,17 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
for (File file : files) {
totalSize += file.length();
}
return new RecoveryStatus.Index(files.length, new SizeValue(totalSize, SizeUnit.BYTES));
long version = -1;
try {
if (IndexReader.indexExists(store.directory())) {
version = IndexReader.getCurrentVersion(store.directory());
}
} catch (IOException e) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to fetch index version after copying it over", e);
}
return new RecoveryStatus.Index(version, files.length, new SizeValue(totalSize, SizeUnit.BYTES));
}
private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecoveryException {
@ -307,7 +310,7 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
if (recoveryTranslogId == -1) {
// no recovery file found, start the shard and bail
indexShard.start();
return new RecoveryStatus.Translog(0, new SizeValue(0, SizeUnit.BYTES));
return new RecoveryStatus.Translog(-1, 0, new SizeValue(0, SizeUnit.BYTES));
}
File recoveryTranslogFile = new File(locationTranslog, "translog-" + recoveryTranslogId);
raf = new RandomAccessFile(recoveryTranslogFile, "r");
@ -317,7 +320,7 @@ public class FsIndexShardGateway extends AbstractIndexShardComponent implements
operations.add(readTranslogOperation(new DataInputStreamInput(raf)));
}
indexShard.performRecovery(operations);
return new RecoveryStatus.Translog(operations.size(), new SizeValue(recoveryTranslogFile.length(), SizeUnit.BYTES));
return new RecoveryStatus.Translog(recoveryTranslogId, operations.size(), new SizeValue(recoveryTranslogFile.length(), SizeUnit.BYTES));
} catch (Exception e) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to perform recovery of translog", e);
} finally {

View File

@ -20,7 +20,6 @@
package org.elasticsearch.index.gateway.none;
import com.google.inject.Inject;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.gateway.RecoveryStatus;
@ -29,7 +28,6 @@ import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.util.SizeUnit;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.settings.Settings;
@ -49,10 +47,10 @@ public class NoneIndexShardGateway extends AbstractIndexShardComponent implement
@Override public RecoveryStatus recover() throws IndexShardGatewayRecoveryException {
// in the none case, we simply start the shard
indexShard.start();
return new RecoveryStatus(new RecoveryStatus.Index(0, new SizeValue(0, SizeUnit.BYTES)), new RecoveryStatus.Translog(0, new SizeValue(0, SizeUnit.BYTES)));
return new RecoveryStatus(new RecoveryStatus.Index(-1, 0, new SizeValue(0, SizeUnit.BYTES)), new RecoveryStatus.Translog(-1, 0, new SizeValue(0, SizeUnit.BYTES)));
}
@Override public void snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) {
@Override public void snapshot(Snapshot snapshot) {
// nothing to do here
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.index.LocalNodeId;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.IndexShardLifecycle;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.support.ForceSyncDirectory;
import org.elasticsearch.util.lucene.store.SwitchDirectory;
import org.elasticsearch.util.settings.Settings;
@ -80,7 +81,7 @@ public class MmapFsStore extends AbstractFsStore<Directory> {
return suggestUseCompoundFile;
}
private static class CustomMMapDirectory extends MMapDirectory {
private static class CustomMMapDirectory extends MMapDirectory implements ForceSyncDirectory {
private final boolean syncToDisk;
@ -95,5 +96,9 @@ public class MmapFsStore extends AbstractFsStore<Directory> {
}
super.sync(name);
}
@Override public void forceSync(String name) throws IOException {
super.sync(name);
}
}
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.index.LocalNodeId;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.support.ForceSyncDirectory;
import org.elasticsearch.util.lucene.store.SwitchDirectory;
import org.elasticsearch.util.settings.Settings;
@ -78,7 +79,7 @@ public class NioFsStore extends AbstractFsStore<Directory> {
return suggestUseCompoundFile;
}
private static class CustomNioFSDirectory extends NIOFSDirectory {
private static class CustomNioFSDirectory extends NIOFSDirectory implements ForceSyncDirectory {
private final boolean syncToDisk;
@ -93,5 +94,9 @@ public class NioFsStore extends AbstractFsStore<Directory> {
}
super.sync(name);
}
@Override public void forceSync(String name) throws IOException {
super.sync(name);
}
}
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.index.LocalNodeId;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.support.ForceSyncDirectory;
import org.elasticsearch.util.lucene.store.SwitchDirectory;
import org.elasticsearch.util.settings.Settings;
@ -78,7 +79,7 @@ public class SimpleFsStore extends AbstractFsStore<Directory> {
return suggestUseCompoundFile;
}
private static class CustomSimpleFSDirectory extends SimpleFSDirectory {
private static class CustomSimpleFSDirectory extends SimpleFSDirectory implements ForceSyncDirectory {
private final boolean syncToDisk;
@ -93,5 +94,9 @@ public class SimpleFsStore extends AbstractFsStore<Directory> {
}
super.sync(name);
}
@Override public void forceSync(String name) throws IOException {
super.sync(name);
}
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store.support;
import java.io.IOException;
/**
* A custom directory that allows to forceSync (since the actual directory might disable it)
*
* @author kimchy (shay.banon)
*/
public interface ForceSyncDirectory {
/**
* Similar to {@link org.apache.lucene.store.Directory#sync(String)} but forces it even if its
* disabled.
*/
void forceSync(String name) throws IOException;
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.util.lucene;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.*;
import org.elasticsearch.index.store.support.ForceSyncDirectory;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.io.FileSystemUtils;
@ -158,7 +159,11 @@ public class Directories {
} else {
copyToDirectory(new FileInputStream(copyFrom), dir.createOutput(fileName));
}
dir.sync(fileName);
if (dir instanceof ForceSyncDirectory) {
((ForceSyncDirectory) dir).forceSync(fileName);
} else {
dir.sync(fileName);
}
}
public static void copyToDirectory(InputStream is, IndexOutput io) throws IOException {

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.index.store.support.ForceSyncDirectory;
import java.io.IOException;
import java.util.Set;
@ -37,7 +38,7 @@ import java.util.Set;
*
* @author kimchy (shay.banon)
*/
public class SwitchDirectory extends Directory {
public class SwitchDirectory extends Directory implements ForceSyncDirectory {
private final Directory secondaryDir;
@ -141,6 +142,15 @@ public class SwitchDirectory extends Directory {
getDirectory(name).sync(name);
}
@Override public void forceSync(String name) throws IOException {
Directory dir = getDirectory(name);
if (dir instanceof ForceSyncDirectory) {
((ForceSyncDirectory) dir).forceSync(name);
} else {
dir.sync(name);
}
}
@Override public IndexInput openInput(String name) throws IOException {
return getDirectory(name).openInput(name);
}

View File

@ -48,7 +48,6 @@ public class FsMetaDataGatewayTests extends AbstractServersTests {
}
@Test public void testIndexActions() throws Exception {
buildServer("server1");
((InternalServer) server("server1")).injector().getInstance(Gateway.class).reset();
server("server1").start();
@ -67,7 +66,5 @@ public class FsMetaDataGatewayTests extends AbstractServersTests {
} catch (IndexAlreadyExistsException e) {
// all is well
}
((InternalServer) server("server1")).injector().getInstance(Gateway.class).reset();
}
}