Switch MetaDataStateFormat to Lucene directory abstraction (#33989)

Switch MetaDataStateFormat to Lucene directory abstraction

This commit switches the MetaDataStateFormat class to the Lucene Directory abstraction to make it easier to test MetaDataStateFormat against different IO failures.
This commit also adds tests for different IO failures to MetaDataStateFormatTests.
This commit is contained in:
Andrey Ershov 2018-10-17 18:17:17 +02:00 committed by GitHub
parent 93bb24e1f8
commit 51f38ddc0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 299 additions and 128 deletions

View File

@ -27,7 +27,7 @@ import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.SimpleFSDirectory;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.logging.Loggers;
@ -48,9 +48,9 @@ import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -69,7 +69,6 @@ public abstract class MetaDataStateFormat<T> {
private static final String STATE_FILE_CODEC = "state";
private static final int MIN_COMPATIBLE_STATE_FILE_VERSION = 1;
private static final int STATE_FILE_VERSION = 1;
private static final int BUFFER_SIZE = 4096;
private final String prefix;
private final Pattern stateFilePattern;
@ -81,7 +80,64 @@ public abstract class MetaDataStateFormat<T> {
protected MetaDataStateFormat(String prefix) {
this.prefix = prefix;
this.stateFilePattern = Pattern.compile(Pattern.quote(prefix) + "(\\d+)(" + MetaDataStateFormat.STATE_FILE_EXTENSION + ")?");
}
/**
 * Removes {@code fileName} from {@code directory}; a file that is not there is not treated as an error.
 *
 * @param stateLocation path of the directory, used only to build the path shown in the trace message
 * @param directory     directory to delete the file from
 * @param fileName      name of the file to delete
 * @throws IOException if the deletion fails for any reason other than the file being absent
 */
private static void deleteFileIfExists(Path stateLocation, Directory directory, String fileName) throws IOException {
    try {
        directory.deleteFile(fileName);
    } catch (FileNotFoundException | NoSuchFileException alreadyGone) {
        // nothing to delete: the file is absent, which is the outcome we want anyway
    }
    final Path cleanedUp = stateLocation.resolve(fileName);
    logger.trace("cleaned up {}", cleanedUp);
}
/**
 * Serializes {@code state} into {@code stateDir} under {@code fileName}, going through {@code tmpFileName}.
 * <p>
 * The state is written to the temporary file (codec header, format index, XContent body, codec footer), the
 * temporary file is synced, renamed to its final name, and the directory metadata is synced afterwards.
 * The temporary file is deleted before writing and again in the finally block.
 *
 * @param state         the state object to serialize
 * @param stateLocation path of the state directory; used here only to build trace log messages
 * @param stateDir      directory the files are created in
 * @param fileName      final name of the state file
 * @param tmpFileName   name of the temporary file that is written first
 * @throws IOException if a directory operation or the serialization fails
 */
private void writeStateToFirstLocation(final T state, Path stateLocation, Directory stateDir, String fileName, String tmpFileName)
throws IOException {
try {
// make sure no file with the temporary name exists before it is created
deleteFileIfExists(stateLocation, stateDir, tmpFileName);
try (IndexOutput out = stateDir.createOutput(tmpFileName, IOContext.DEFAULT)) {
CodecUtil.writeHeader(out, STATE_FILE_CODEC, STATE_FILE_VERSION);
out.writeInt(FORMAT.index());
try (XContentBuilder builder = newXContentBuilder(FORMAT, new IndexOutputOutputStream(out) {
@Override
public void close() throws IOException {
// this is important since some of the XContentBuilders write bytes on close.
// in order to write the footer we need to prevent closing the actual index output.
}
})) {
builder.startObject();
{
toXContent(builder, state);
}
builder.endObject();
}
// the footer is written only after the builder has been closed, see the close() override above
CodecUtil.writeFooter(out);
}
// order matters: sync the temporary file, rename it to the final name, then sync the directory metadata
stateDir.sync(Collections.singleton(tmpFileName));
stateDir.rename(tmpFileName, fileName);
stateDir.syncMetaData();
logger.trace("written state to {}", stateLocation.resolve(fileName));
} finally {
// NOTE(review): an IOException thrown by this cleanup replaces any exception thrown inside the try block
deleteFileIfExists(stateLocation, stateDir, tmpFileName);
}
}
/**
 * Copies the state file {@code fileName} from {@code srcStateDir} into the state directory at
 * {@code extraStateLocation}.
 * <p>
 * The file is copied to {@code tmpFileName} first, the temporary file is synced, renamed to {@code fileName},
 * and the directory metadata is synced afterwards. The temporary file is deleted before the copy and again in
 * the finally block. The directory opened for {@code extraStateLocation} is closed when the method exits.
 *
 * @param srcStateDir        directory that already contains {@code fileName}
 * @param extraStateLocation path of the state directory to copy into
 * @param fileName           name of the state file, identical in source and target
 * @param tmpFileName        name of the temporary file used in the target directory
 * @throws IOException if opening the target directory or any directory operation fails
 */
private void copyStateToExtraLocation(Directory srcStateDir, Path extraStateLocation, String fileName, String tmpFileName)
throws IOException {
try (Directory extraStateDir = newDirectory(extraStateLocation)) {
try {
// make sure no file with the temporary name exists before copying into it
deleteFileIfExists(extraStateLocation, extraStateDir, tmpFileName);
extraStateDir.copyFrom(srcStateDir, fileName, tmpFileName, IOContext.DEFAULT);
// order matters: sync the temporary file, rename it to the final name, then sync the directory metadata
extraStateDir.sync(Collections.singleton(tmpFileName));
extraStateDir.rename(tmpFileName, fileName);
extraStateDir.syncMetaData();
logger.trace("copied state to {}", extraStateLocation.resolve(fileName));
} finally {
// NOTE(review): an IOException thrown by this cleanup replaces any exception thrown inside the try block
deleteFileIfExists(extraStateLocation, extraStateDir, tmpFileName);
}
}
}
/**
@ -89,8 +145,10 @@ public abstract class MetaDataStateFormat<T> {
* state directory ({@value #STATE_DIR_NAME}) underneath each of the given file locations and is created if it
* doesn't exist. The state is serialized to a temporary file in that directory and is then atomically moved to
* its target filename of the pattern {@code {prefix}{version}.st}.
* If this method returns without exception there is a guarantee that state is persisted to the disk and loadLatestState will return it.
* But if this method throws an exception, loadLatestState could return this state or some previous state.
*
* @param state the state object to write
* @param state the state object to write
* @param locations the locations where the state should be written to.
* @throws IOException if an IOException occurs
*/
@ -101,60 +159,22 @@ public abstract class MetaDataStateFormat<T> {
if (locations.length <= 0) {
throw new IllegalArgumentException("One or more locations required");
}
final long maxStateId = findMaxStateId(prefix, locations)+1;
final long maxStateId = findMaxStateId(prefix, locations) + 1;
assert maxStateId >= 0 : "maxStateId must be positive but was: [" + maxStateId + "]";
final String fileName = prefix + maxStateId + STATE_FILE_EXTENSION;
Path stateLocation = locations[0].resolve(STATE_DIR_NAME);
Files.createDirectories(stateLocation);
final Path tmpStatePath = stateLocation.resolve(fileName + ".tmp");
final Path finalStatePath = stateLocation.resolve(fileName);
try {
final String resourceDesc = "MetaDataStateFormat.write(path=\"" + tmpStatePath + "\")";
try (OutputStreamIndexOutput out =
new OutputStreamIndexOutput(resourceDesc, fileName, Files.newOutputStream(tmpStatePath), BUFFER_SIZE)) {
CodecUtil.writeHeader(out, STATE_FILE_CODEC, STATE_FILE_VERSION);
out.writeInt(FORMAT.index());
try (XContentBuilder builder = newXContentBuilder(FORMAT, new IndexOutputOutputStream(out) {
@Override
public void close() throws IOException {
// this is important since some of the XContentBuilders write bytes on close.
// in order to write the footer we need to prevent closing the actual index input.
} })) {
builder.startObject();
{
toXContent(builder, state);
}
builder.endObject();
}
CodecUtil.writeFooter(out);
}
IOUtils.fsync(tmpStatePath, false); // fsync the state file
Files.move(tmpStatePath, finalStatePath, StandardCopyOption.ATOMIC_MOVE);
IOUtils.fsync(stateLocation, true);
logger.trace("written state to {}", finalStatePath);
final String fileName = prefix + maxStateId + STATE_FILE_EXTENSION;
final String tmpFileName = fileName + ".tmp";
final Path firstStateLocation = locations[0].resolve(STATE_DIR_NAME);
try (Directory stateDir = newDirectory(firstStateLocation)) {
writeStateToFirstLocation(state, firstStateLocation, stateDir, fileName, tmpFileName);
for (int i = 1; i < locations.length; i++) {
stateLocation = locations[i].resolve(STATE_DIR_NAME);
Files.createDirectories(stateLocation);
Path tmpPath = stateLocation.resolve(fileName + ".tmp");
Path finalPath = stateLocation.resolve(fileName);
try {
Files.copy(finalStatePath, tmpPath);
IOUtils.fsync(tmpPath, false); // fsync the state file
// we are on the same FileSystem / Partition here we can do an atomic move
Files.move(tmpPath, finalPath, StandardCopyOption.ATOMIC_MOVE);
IOUtils.fsync(stateLocation, true);
logger.trace("copied state to {}", finalPath);
} finally {
Files.deleteIfExists(tmpPath);
logger.trace("cleaned up {}", tmpPath);
}
final Path extraStateLocation = locations[i].resolve(STATE_DIR_NAME);
copyStateToExtraLocation(stateDir, extraStateLocation, fileName, tmpFileName);
}
} finally {
Files.deleteIfExists(tmpStatePath);
logger.trace("cleaned up {}", tmpStatePath);
}
cleanupOldFiles(prefix, fileName, locations);
cleanupOldFiles(fileName, locations);
}
protected XContentBuilder newXContentBuilder(XContentType type, OutputStream stream ) throws IOException {
@ -207,26 +227,29 @@ public abstract class MetaDataStateFormat<T> {
return new SimpleFSDirectory(dir);
}
private void cleanupOldFiles(final String prefix, final String currentStateFile, Path[] locations) throws IOException {
final DirectoryStream.Filter<Path> filter = entry -> {
final String entryFileName = entry.getFileName().toString();
return Files.isRegularFile(entry)
&& entryFileName.startsWith(prefix) // only state files
&& currentStateFile.equals(entryFileName) == false; // keep the current state file around
};
// now clean up the old files
for (Path dataLocation : locations) {
logger.trace("cleanupOldFiles: cleaning up {}", dataLocation);
try (DirectoryStream<Path> stream = Files.newDirectoryStream(dataLocation.resolve(STATE_DIR_NAME), filter)) {
for (Path stateFile : stream) {
Files.deleteIfExists(stateFile);
logger.trace("cleanupOldFiles: cleaned up {}", stateFile);
/**
 * Deletes every file whose name starts with this format's prefix, except {@code currentStateFile}, from the
 * state directory of each given location.
 *
 * @param currentStateFile name of the state file that must be kept
 * @param locations        data locations whose state directories are cleaned
 * @throws IOException if a state directory cannot be opened or listed, or a deletion fails
 */
private void cleanupOldFiles(final String currentStateFile, Path[] locations) throws IOException {
    for (final Path dataPath : locations) {
        logger.trace("cleanupOldFiles: cleaning up {}", dataPath);
        final Path stateDirPath = dataPath.resolve(STATE_DIR_NAME);
        try (Directory directory = newDirectory(stateDirPath)) {
            final String[] entries = directory.listAll();
            for (final String entry : entries) {
                // skip anything that is not one of our state files, and the file that was just written
                if (entry.startsWith(prefix) == false || entry.equals(currentStateFile)) {
                    continue;
                }
                deleteFileIfExists(stateDirPath, directory, entry);
            }
        }
    }
}
long findMaxStateId(final String prefix, Path... locations) throws IOException {
/**
* Finds state file with maximum id.
*
* @param prefix - filename prefix
* @param locations - paths to directories with state folder
* @return maximum id of state file or -1 if no such files are found
* @throws IOException if IOException occurs
*/
private long findMaxStateId(final String prefix, Path... locations) throws IOException {
long maxId = -1;
for (Path dataLocation : locations) {
final Path resolve = dataLocation.resolve(STATE_DIR_NAME);
@ -245,6 +268,24 @@ public abstract class MetaDataStateFormat<T> {
return maxId;
}
/**
 * Collects the state files of the given generation that exist under the given data locations.
 *
 * @param generation state id to look for; {@code -1} yields an empty list without touching the file system
 * @param locations  data locations whose state directories are checked
 * @return paths of the existing state files named {@code prefix + generation + extension}, possibly empty
 */
private List<Path> findStateFilesByGeneration(final long generation, Path... locations) {
    final List<Path> matches = new ArrayList<>();
    if (generation != -1) {
        final String expectedName = prefix + generation + STATE_FILE_EXTENSION;
        for (final Path location : locations) {
            final Path candidate = location.resolve(STATE_DIR_NAME).resolve(expectedName);
            if (Files.exists(candidate) == false) {
                continue;
            }
            logger.trace("found state file: {}", candidate);
            matches.add(candidate);
        }
    }
    return matches;
}
/**
* Tries to load the latest state from the given data-locations. It tries to load the latest state determined by
* the states version from one or more data directories and if none of the latest states can be loaded an exception
@ -255,78 +296,39 @@ public abstract class MetaDataStateFormat<T> {
* @return the latest state or <code>null</code> if no state was found.
*/
public T loadLatestState(Logger logger, NamedXContentRegistry namedXContentRegistry, Path... dataLocations) throws IOException {
List<PathAndStateId> files = new ArrayList<>();
long maxStateId = -1;
if (dataLocations != null) { // select all eligible files first
for (Path dataLocation : dataLocations) {
final Path stateDir = dataLocation.resolve(STATE_DIR_NAME);
// now, iterate over the current versions, and find latest one
// we don't check if the stateDir is present since it could be deleted
// after the check. Also if there is a _state file and it's not a dir something is really wrong
// we don't pass a glob since we need the group part for parsing
try (DirectoryStream<Path> paths = Files.newDirectoryStream(stateDir)) {
for (Path stateFile : paths) {
final Matcher matcher = stateFilePattern.matcher(stateFile.getFileName().toString());
if (matcher.matches()) {
final long stateId = Long.parseLong(matcher.group(1));
maxStateId = Math.max(maxStateId, stateId);
PathAndStateId pav = new PathAndStateId(stateFile, stateId);
logger.trace("found state file: {}", pav);
files.add(pav);
}
}
} catch (NoSuchFileException | FileNotFoundException ex) {
// no _state directory -- move on
}
}
long maxStateId = findMaxStateId(prefix, dataLocations);
List<Path> stateFiles = findStateFilesByGeneration(maxStateId, dataLocations);
if (maxStateId > -1 && stateFiles.isEmpty()) {
throw new IllegalStateException("unable to find state files with state id " + maxStateId +
" returned by findMaxStateId function, in data folders [" +
Arrays.stream(dataLocations).map(Path::toAbsolutePath).
map(Object::toString).collect(Collectors.joining(", ")) +
"], concurrent writes?");
}
// NOTE: we might have multiple version of the latest state if there are multiple data dirs.. for this case
// we iterate only over the ones with the max version.
long finalMaxStateId = maxStateId;
Collection<PathAndStateId> pathAndStateIds = files
.stream()
.filter(pathAndStateId -> pathAndStateId.id == finalMaxStateId)
.collect(Collectors.toCollection(ArrayList::new));
final List<Throwable> exceptions = new ArrayList<>();
for (PathAndStateId pathAndStateId : pathAndStateIds) {
for (Path stateFile : stateFiles) {
try {
T state = read(namedXContentRegistry, pathAndStateId.file);
logger.trace("state id [{}] read from [{}]", pathAndStateId.id, pathAndStateId.file.getFileName());
T state = read(namedXContentRegistry, stateFile);
logger.trace("state id [{}] read from [{}]", maxStateId, stateFile.getFileName());
return state;
} catch (Exception e) {
exceptions.add(new IOException("failed to read " + pathAndStateId.toString(), e));
exceptions.add(new IOException("failed to read " + stateFile.toAbsolutePath(), e));
logger.debug(() -> new ParameterizedMessage(
"{}: failed to read [{}], ignoring...", pathAndStateId.file.toAbsolutePath(), prefix), e);
"{}: failed to read [{}], ignoring...", stateFile.toAbsolutePath(), prefix), e);
}
}
// if we reach this something went wrong
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions);
if (files.size() > 0) {
if (stateFiles.size() > 0) {
// We have some state files but none of them gave us a usable state
throw new IllegalStateException("Could not find a state file to recover from among " + files);
throw new IllegalStateException("Could not find a state file to recover from among " +
stateFiles.stream().map(Path::toAbsolutePath).map(Object::toString).collect(Collectors.joining(", ")));
}
return null;
}
/**
 * Immutable pairing of a state file path with the state id parsed from its file name.
 */
private static class PathAndStateId {
    final Path file;
    final long id;

    private PathAndStateId(Path file, long id) {
        this.file = file;
        this.id = id;
    }

    /**
     * Renders the pair as {@code [id:<id>, file:<absolute path>]}.
     */
    @Override
    public String toString() {
        final StringBuilder description = new StringBuilder("[id:");
        description.append(id);
        description.append(", file:").append(file.toAbsolutePath());
        description.append("]");
        return description.toString();
    }
}
/**
* Deletes all meta state directories recursively for the given data locations
* @param dataLocations the data location to delete

View File

@ -288,6 +288,113 @@ public class MetaDataStateFormatTests extends ESTestCase {
}
}
/**
 * Writes a freshly generated random state with failure injection switched off, then checks that loading returns
 * the same state and that every state directory holds exactly one file.
 *
 * @param format the format under test; its failure injection is switched off by this method
 * @param paths  data locations to write to and read from
 * @return the state that was written
 */
private DummyState writeAndReadStateSuccessfully(Format format, Path... paths) throws IOException {
    format.noFailures();

    // the random values are drawn in the same order as the constructor arguments
    final String text = randomRealisticUnicodeOfCodepointLengthBetween(1, 100);
    final int intValue = randomInt();
    final long longValue = randomLong();
    final double doubleValue = randomDouble();
    final boolean flag = randomBoolean();
    final DummyState written = new DummyState(text, intValue, longValue, doubleValue, flag);

    format.write(written, paths);
    final DummyState loaded = format.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths);
    assertEquals(written, loaded);
    ensureOnlyOneStateFile(paths);
    return written;
}
/**
 * Asserts that the state directory under each of the given data paths lists exactly one file.
 *
 * @param paths data paths whose state directories are inspected
 * @throws IOException if a state directory cannot be opened or listed
 */
private static void ensureOnlyOneStateFile(Path[] paths) throws IOException {
    for (final Path dataPath : paths) {
        final Path stateDirPath = dataPath.resolve(MetaDataStateFormat.STATE_DIR_NAME);
        try (Directory stateDir = new SimpleFSDirectory(stateDirPath)) {
            final String[] entries = stateDir.listAll();
            assertThat(entries.length, equalTo(1));
        }
    }
}
/**
 * Checks that a write which fails while the temporary state file is being deleted, created, written, synced or
 * renamed throws an {@link IOException}, and that the previously written state is still the one that is loaded.
 */
public void testFailWriteAndReadPreviousState() throws IOException {
    Path path = createTempDir();
    Format format = new Format("foo-");

    DummyState initialState = writeAndReadStateSuccessfully(format, path);

    // Draw the iteration count once: a random call in the loop condition is re-evaluated before every pass,
    // so the bound would keep changing while the loop runs.
    final int iterations = randomIntBetween(1, 5);
    for (int i = 0; i < iterations; i++) {
        format.failOnMethods(Format.FAIL_DELETE_TMP_FILE, Format.FAIL_CREATE_OUTPUT_FILE, Format.FAIL_WRITE_TO_OUTPUT_FILE,
                Format.FAIL_FSYNC_TMP_FILE, Format.FAIL_RENAME_TMP_FILE);
        DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(),
                randomDouble(), randomBoolean());
        expectThrows(IOException.class, () -> format.write(newState, path));
        format.noFailures();
        assertEquals(initialState, format.loadLatestState(logger, NamedXContentRegistry.EMPTY, path));
    }

    // a write without injected failures must succeed again afterwards
    writeAndReadStateSuccessfully(format, path);
}
/**
 * Checks that when syncing the state directory metadata fails, the write throws an {@link IOException} and the
 * state loaded afterwards is one of the states that were passed to {@code write} so far.
 */
public void testFailWriteAndReadAnyState() throws IOException {
    Path path = createTempDir();
    Format format = new Format("foo-");
    Set<DummyState> possibleStates = new HashSet<>();

    DummyState initialState = writeAndReadStateSuccessfully(format, path);
    possibleStates.add(initialState);

    // Draw the iteration count once: a random call in the loop condition is re-evaluated before every pass,
    // so the bound would keep changing while the loop runs.
    final int iterations = randomIntBetween(1, 5);
    for (int i = 0; i < iterations; i++) {
        format.failOnMethods(Format.FAIL_FSYNC_STATE_DIRECTORY);
        DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(),
                randomDouble(), randomBoolean());
        // the failed write may or may not have become visible, so the new state is an acceptable result too
        possibleStates.add(newState);
        expectThrows(IOException.class, () -> format.write(newState, path));
        format.noFailures();
        assertTrue(possibleStates.contains(format.loadLatestState(logger, NamedXContentRegistry.EMPTY, path)));
    }

    // a write without injected failures must succeed again afterwards
    writeAndReadStateSuccessfully(format, path);
}
/**
 * Checks that when opening the state file for copying to an extra location fails, the write throws an
 * {@link IOException} but the new state is nevertheless the one that is loaded afterwards.
 */
public void testFailCopyStateToExtraLocation() throws IOException {
    // at least two locations are needed so that there is an extra location to copy to
    Path[] paths = new Path[randomIntBetween(2, 5)];
    for (int i = 0; i < paths.length; i++) {
        paths[i] = createTempDir();
    }
    Format format = new Format("foo-");

    writeAndReadStateSuccessfully(format, paths);

    // Draw the iteration count once: a random call in the loop condition is re-evaluated before every pass,
    // so the bound would keep changing while the loop runs.
    final int iterations = randomIntBetween(1, 5);
    for (int i = 0; i < iterations; i++) {
        format.failOnMethods(Format.FAIL_OPEN_STATE_FILE_WHEN_COPYING);
        DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(),
                randomDouble(), randomBoolean());
        expectThrows(IOException.class, () -> format.write(newState, paths));
        format.noFailures();
        assertEquals(newState, format.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths));
    }

    // a write without injected failures must succeed again afterwards
    writeAndReadStateSuccessfully(format, paths);
}
/**
 * Injects failures at random while writing to one or more locations and checks that the state loaded afterwards
 * is always one of the states that were passed to {@code write} so far.
 */
public void testFailRandomlyAndReadAnyState() throws IOException {
    Path[] paths = new Path[randomIntBetween(1, 5)];
    for (int i = 0; i < paths.length; i++) {
        paths[i] = createTempDir();
    }
    Format format = new Format("foo-");
    Set<DummyState> possibleStates = new HashSet<>();

    DummyState initialState = writeAndReadStateSuccessfully(format, paths);
    possibleStates.add(initialState);

    // Draw the iteration count once: a random call in the loop condition is re-evaluated before every pass,
    // so the bound would keep changing while the loop runs.
    final int iterations = randomIntBetween(1, 5);
    for (int i = 0; i < iterations; i++) {
        format.failRandomly();
        DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(),
                randomDouble(), randomBoolean());
        possibleStates.add(newState);
        try {
            format.write(newState, paths);
        } catch (Exception ignored) {
            // since we're injecting failures at random it's ok if exception is thrown, it's also ok if exception is not thrown
        }
        format.noFailures();
        assertTrue(possibleStates.contains(format.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths)));
    }

    // a write without injected failures must succeed again afterwards
    writeAndReadStateSuccessfully(format, paths);
}
private static MetaDataStateFormat<MetaData> metaDataFormat() {
return new MetaDataStateFormat<MetaData>(MetaData.GLOBAL_STATE_FILE_PREFIX) {
@Override
@ -324,12 +431,36 @@ public class MetaDataStateFormatTests extends ESTestCase {
}
private class Format extends MetaDataStateFormat<DummyState> {
private static class Format extends MetaDataStateFormat<DummyState> {
/**
 * Selects which failure injection, if any, newDirectory installs on the mock directories it creates.
 */
private enum FailureMode {
// no Failure is registered on the mock directory
NO_FAILURES,
// a Failure is registered that throws when a method name picked at random from failureMethods is on the call stack
FAIL_ON_METHOD,
// a Failure is registered that throws whenever randomIntBetween(0, 20) returns 0
FAIL_RANDOMLY
}
private FailureMode failureMode;
private String[] failureMethods;
static final String FAIL_CREATE_OUTPUT_FILE = "createOutput";
static final String FAIL_WRITE_TO_OUTPUT_FILE = "writeBytes";
static final String FAIL_FSYNC_TMP_FILE = "sync";
static final String FAIL_RENAME_TMP_FILE = "rename";
static final String FAIL_FSYNC_STATE_DIRECTORY = "syncMetaData";
static final String FAIL_DELETE_TMP_FILE = "deleteFile";
static final String FAIL_OPEN_STATE_FILE_WHEN_COPYING = "openInput";
/**
 * Constructs a MetaDataStateFormat object for storing/retrieving DummyState.
 * By default no I/O failures are injected.
 * I/O failure behaviour can be controlled by {@link #noFailures()}, {@link #failOnMethods(String...)} and
 * {@link #failRandomly()} method calls.
 *
 * @param prefix the state file name prefix, passed through to the MetaDataStateFormat constructor
 */
Format(String prefix) {
super(prefix);
// start without failure injection; failOnMethods/failRandomly switch it on later
this.failureMode = FailureMode.NO_FAILURES;
}
@Override
public void toXContent(XContentBuilder builder, DummyState state) throws IOException {
state.toXContent(builder, null);
@ -340,9 +471,47 @@ public class MetaDataStateFormatTests extends ESTestCase {
return new DummyState().parse(parser);
}
/**
 * Switches failure injection off: directories created by newDirectory after this call get no Failure registered.
 */
public void noFailures() {
this.failureMode = FailureMode.NO_FAILURES;
}
/**
 * Switches to method-based failure injection for directories created by newDirectory after this call.
 * Each time the registered Failure is evaluated it picks one of the given names at random and throws if a
 * method with exactly that name is on the current call stack.
 *
 * @param failureMethods method names to fail on, typically the FAIL_* constants of this class; the array is
 *                       stored as passed, without copying
 */
public void failOnMethods(String... failureMethods) {
this.failureMode = FailureMode.FAIL_ON_METHOD;
this.failureMethods = failureMethods;
}
/**
 * Switches to random failure injection for directories created by newDirectory after this call: the registered
 * Failure throws whenever randomIntBetween(0, 20) returns 0.
 */
public void failRandomly() {
this.failureMode = FailureMode.FAIL_RANDOMLY;
}
@Override
protected Directory newDirectory(Path dir) throws IOException {
MockDirectoryWrapper mock = new MockDirectoryWrapper(random(), super.newDirectory(dir));
protected Directory newDirectory(Path dir) {
// Creates a mock directory for dir and registers a Failure on it that matches the failure mode set at call time.
// The mode is read once here, so a later mode change does not affect directories that were already created.
MockDirectoryWrapper mock = newMockFSDirectory(dir);
if (failureMode == FailureMode.FAIL_ON_METHOD) {
MockDirectoryWrapper.Failure fail = new MockDirectoryWrapper.Failure() {
@Override
public void eval(MockDirectoryWrapper dir) throws IOException {
// a fresh candidate is picked on every evaluation, so a listed method is not guaranteed to fail on a given call
String failMethod = randomFrom(failureMethods);
// fail only if a method with exactly this name is on the current call stack ("sync" does not match "syncMetaData")
for (StackTraceElement e : Thread.currentThread().getStackTrace()) {
if (failMethod.equals(e.getMethodName())) {
throw new MockDirectoryWrapper.FakeIOException();
}
}
}
};
mock.failOn(fail);
} else if (failureMode == FailureMode.FAIL_RANDOMLY) {
MockDirectoryWrapper.Failure fail = new MockDirectoryWrapper.Failure() {
@Override
public void eval(MockDirectoryWrapper dir) throws IOException {
// fail independently of the operation, whenever the random draw hits 0
if (randomIntBetween(0, 20) == 0) {
throw new MockDirectoryWrapper.FakeIOException();
}
}
};
mock.failOn(fail);
}
// NO_FAILURES: nothing is registered on the mock
// presumably hands the mock to the test framework for closing at the end of the suite; confirm against closeAfterSuite
closeAfterSuite(mock);
return mock;
}