diff --git a/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java b/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java index 237b36b53d4..fe84fb9e38c 100644 --- a/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java +++ b/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java @@ -27,7 +27,7 @@ import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.OutputStreamIndexOutput; +import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.SimpleFSDirectory; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.logging.Loggers; @@ -48,9 +48,9 @@ import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; -import java.nio.file.StandardCopyOption; import java.util.ArrayList; -import java.util.Collection; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -69,7 +69,6 @@ public abstract class MetaDataStateFormat { private static final String STATE_FILE_CODEC = "state"; private static final int MIN_COMPATIBLE_STATE_FILE_VERSION = 1; private static final int STATE_FILE_VERSION = 1; - private static final int BUFFER_SIZE = 4096; private final String prefix; private final Pattern stateFilePattern; @@ -81,7 +80,64 @@ public abstract class MetaDataStateFormat { protected MetaDataStateFormat(String prefix) { this.prefix = prefix; this.stateFilePattern = Pattern.compile(Pattern.quote(prefix) + "(\\d+)(" + MetaDataStateFormat.STATE_FILE_EXTENSION + ")?"); + } + private static void deleteFileIfExists(Path stateLocation, Directory directory, String fileName) throws IOException { + try { + directory.deleteFile(fileName); + } catch 
(FileNotFoundException | NoSuchFileException ignored) { + + } + logger.trace("cleaned up {}", stateLocation.resolve(fileName)); + } + + private void writeStateToFirstLocation(final T state, Path stateLocation, Directory stateDir, String fileName, String tmpFileName) + throws IOException { + try { + deleteFileIfExists(stateLocation, stateDir, tmpFileName); + try (IndexOutput out = stateDir.createOutput(tmpFileName, IOContext.DEFAULT)) { + CodecUtil.writeHeader(out, STATE_FILE_CODEC, STATE_FILE_VERSION); + out.writeInt(FORMAT.index()); + try (XContentBuilder builder = newXContentBuilder(FORMAT, new IndexOutputOutputStream(out) { + @Override + public void close() throws IOException { + // this is important since some of the XContentBuilders write bytes on close. + // in order to write the footer we need to prevent closing the actual index output. + } + })) { + + builder.startObject(); + { + toXContent(builder, state); + } + builder.endObject(); + } + CodecUtil.writeFooter(out); + } + + stateDir.sync(Collections.singleton(tmpFileName)); + stateDir.rename(tmpFileName, fileName); + stateDir.syncMetaData(); + logger.trace("written state to {}", stateLocation.resolve(fileName)); + } finally { + deleteFileIfExists(stateLocation, stateDir, tmpFileName); + } + } + + private void copyStateToExtraLocation(Directory srcStateDir, Path extraStateLocation, String fileName, String tmpFileName) + throws IOException { + try (Directory extraStateDir = newDirectory(extraStateLocation)) { + try { + deleteFileIfExists(extraStateLocation, extraStateDir, tmpFileName); + extraStateDir.copyFrom(srcStateDir, fileName, tmpFileName, IOContext.DEFAULT); + extraStateDir.sync(Collections.singleton(tmpFileName)); + extraStateDir.rename(tmpFileName, fileName); + extraStateDir.syncMetaData(); + logger.trace("copied state to {}", extraStateLocation.resolve(fileName)); + } finally { + deleteFileIfExists(extraStateLocation, extraStateDir, tmpFileName); + } + } } /** @@ -89,8 +145,10 @@ public abstract
class MetaDataStateFormat { * state directory ({@value #STATE_DIR_NAME}) underneath each of the given file locations and is created if it * doesn't exist. The state is serialized to a temporary file in that directory and is then atomically moved to * it's target filename of the pattern {@code {prefix}{version}.st}. + * If this method returns without exception there is a guarantee that state is persisted to the disk and loadLatestState will return it. + * But if this method throws an exception, loadLatestState could return this state or some previous state. * - * @param state the state object to write + * @param state the state object to write * @param locations the locations where the state should be written to. * @throws IOException if an IOException occurs */ @@ -101,60 +159,22 @@ public abstract class MetaDataStateFormat { if (locations.length <= 0) { throw new IllegalArgumentException("One or more locations required"); } - final long maxStateId = findMaxStateId(prefix, locations)+1; + final long maxStateId = findMaxStateId(prefix, locations) + 1; assert maxStateId >= 0 : "maxStateId must be positive but was: [" + maxStateId + "]"; - final String fileName = prefix + maxStateId + STATE_FILE_EXTENSION; - Path stateLocation = locations[0].resolve(STATE_DIR_NAME); - Files.createDirectories(stateLocation); - final Path tmpStatePath = stateLocation.resolve(fileName + ".tmp"); - final Path finalStatePath = stateLocation.resolve(fileName); - try { - final String resourceDesc = "MetaDataStateFormat.write(path=\"" + tmpStatePath + "\")"; - try (OutputStreamIndexOutput out = - new OutputStreamIndexOutput(resourceDesc, fileName, Files.newOutputStream(tmpStatePath), BUFFER_SIZE)) { - CodecUtil.writeHeader(out, STATE_FILE_CODEC, STATE_FILE_VERSION); - out.writeInt(FORMAT.index()); - try (XContentBuilder builder = newXContentBuilder(FORMAT, new IndexOutputOutputStream(out) { - @Override - public void close() throws IOException { - // this is important since some of the 
XContentBuilders write bytes on close. - // in order to write the footer we need to prevent closing the actual index input. - } })) { - builder.startObject(); - { - toXContent(builder, state); - } - builder.endObject(); - } - CodecUtil.writeFooter(out); - } - IOUtils.fsync(tmpStatePath, false); // fsync the state file - Files.move(tmpStatePath, finalStatePath, StandardCopyOption.ATOMIC_MOVE); - IOUtils.fsync(stateLocation, true); - logger.trace("written state to {}", finalStatePath); + final String fileName = prefix + maxStateId + STATE_FILE_EXTENSION; + final String tmpFileName = fileName + ".tmp"; + final Path firstStateLocation = locations[0].resolve(STATE_DIR_NAME); + try (Directory stateDir = newDirectory(firstStateLocation)) { + writeStateToFirstLocation(state, firstStateLocation, stateDir, fileName, tmpFileName); + for (int i = 1; i < locations.length; i++) { - stateLocation = locations[i].resolve(STATE_DIR_NAME); - Files.createDirectories(stateLocation); - Path tmpPath = stateLocation.resolve(fileName + ".tmp"); - Path finalPath = stateLocation.resolve(fileName); - try { - Files.copy(finalStatePath, tmpPath); - IOUtils.fsync(tmpPath, false); // fsync the state file - // we are on the same FileSystem / Partition here we can do an atomic move - Files.move(tmpPath, finalPath, StandardCopyOption.ATOMIC_MOVE); - IOUtils.fsync(stateLocation, true); - logger.trace("copied state to {}", finalPath); - } finally { - Files.deleteIfExists(tmpPath); - logger.trace("cleaned up {}", tmpPath); - } + final Path extraStateLocation = locations[i].resolve(STATE_DIR_NAME); + copyStateToExtraLocation(stateDir, extraStateLocation, fileName, tmpFileName); } - } finally { - Files.deleteIfExists(tmpStatePath); - logger.trace("cleaned up {}", tmpStatePath); } - cleanupOldFiles(prefix, fileName, locations); + + cleanupOldFiles(fileName, locations); } protected XContentBuilder newXContentBuilder(XContentType type, OutputStream stream ) throws IOException { @@ -207,26 +227,29 @@ public 
abstract class MetaDataStateFormat { return new SimpleFSDirectory(dir); } - private void cleanupOldFiles(final String prefix, final String currentStateFile, Path[] locations) throws IOException { - final DirectoryStream.Filter filter = entry -> { - final String entryFileName = entry.getFileName().toString(); - return Files.isRegularFile(entry) - && entryFileName.startsWith(prefix) // only state files - && currentStateFile.equals(entryFileName) == false; // keep the current state file around - }; - // now clean up the old files - for (Path dataLocation : locations) { - logger.trace("cleanupOldFiles: cleaning up {}", dataLocation); - try (DirectoryStream stream = Files.newDirectoryStream(dataLocation.resolve(STATE_DIR_NAME), filter)) { - for (Path stateFile : stream) { - Files.deleteIfExists(stateFile); - logger.trace("cleanupOldFiles: cleaned up {}", stateFile); + private void cleanupOldFiles(final String currentStateFile, Path[] locations) throws IOException { + for (Path location : locations) { + logger.trace("cleanupOldFiles: cleaning up {}", location); + Path stateLocation = location.resolve(STATE_DIR_NAME); + try (Directory stateDir = newDirectory(stateLocation)) { + for (String file : stateDir.listAll()) { + if (file.startsWith(prefix) && file.equals(currentStateFile) == false) { + deleteFileIfExists(stateLocation, stateDir, file); + } } } } } - long findMaxStateId(final String prefix, Path... locations) throws IOException { + /** + * Finds state file with maximum id. + * + * @param prefix - filename prefix + * @param locations - paths to directories with state folder + * @return maximum id of state file or -1 if no such files are found + * @throws IOException if IOException occurs + */ + private long findMaxStateId(final String prefix, Path... 
locations) throws IOException { long maxId = -1; for (Path dataLocation : locations) { final Path resolve = dataLocation.resolve(STATE_DIR_NAME); @@ -245,6 +268,24 @@ public abstract class MetaDataStateFormat { return maxId; } + private List findStateFilesByGeneration(final long generation, Path... locations) { + List files = new ArrayList<>(); + if (generation == -1) { + return files; + } + + final String fileName = prefix + generation + STATE_FILE_EXTENSION; + for (Path dataLocation : locations) { + final Path stateFilePath = dataLocation.resolve(STATE_DIR_NAME).resolve(fileName); + if (Files.exists(stateFilePath)) { + logger.trace("found state file: {}", stateFilePath); + files.add(stateFilePath); + } + } + + return files; + } + /** * Tries to load the latest state from the given data-locations. It tries to load the latest state determined by * the states version from one or more data directories and if none of the latest states can be loaded an exception @@ -255,78 +296,39 @@ public abstract class MetaDataStateFormat { * @return the latest state or null if no state was found. */ public T loadLatestState(Logger logger, NamedXContentRegistry namedXContentRegistry, Path... dataLocations) throws IOException { - List files = new ArrayList<>(); - long maxStateId = -1; - if (dataLocations != null) { // select all eligible files first - for (Path dataLocation : dataLocations) { - final Path stateDir = dataLocation.resolve(STATE_DIR_NAME); - // now, iterate over the current versions, and find latest one - // we don't check if the stateDir is present since it could be deleted - // after the check. 
Also if there is a _state file and it's not a dir something is really wrong - // we don't pass a glob since we need the group part for parsing - try (DirectoryStream paths = Files.newDirectoryStream(stateDir)) { - for (Path stateFile : paths) { - final Matcher matcher = stateFilePattern.matcher(stateFile.getFileName().toString()); - if (matcher.matches()) { - final long stateId = Long.parseLong(matcher.group(1)); - maxStateId = Math.max(maxStateId, stateId); - PathAndStateId pav = new PathAndStateId(stateFile, stateId); - logger.trace("found state file: {}", pav); - files.add(pav); - } - } - } catch (NoSuchFileException | FileNotFoundException ex) { - // no _state directory -- move on - } - } + long maxStateId = findMaxStateId(prefix, dataLocations); + List stateFiles = findStateFilesByGeneration(maxStateId, dataLocations); + + if (maxStateId > -1 && stateFiles.isEmpty()) { + throw new IllegalStateException("unable to find state files with state id " + maxStateId + + " returned by findMaxStateId function, in data folders [" + + Arrays.stream(dataLocations).map(Path::toAbsolutePath). + map(Object::toString).collect(Collectors.joining(", ")) + + "], concurrent writes?"); } - // NOTE: we might have multiple version of the latest state if there are multiple data dirs.. for this case - // we iterate only over the ones with the max version. 
- long finalMaxStateId = maxStateId; - Collection pathAndStateIds = files - .stream() - .filter(pathAndStateId -> pathAndStateId.id == finalMaxStateId) - .collect(Collectors.toCollection(ArrayList::new)); final List exceptions = new ArrayList<>(); - for (PathAndStateId pathAndStateId : pathAndStateIds) { + for (Path stateFile : stateFiles) { try { - T state = read(namedXContentRegistry, pathAndStateId.file); - logger.trace("state id [{}] read from [{}]", pathAndStateId.id, pathAndStateId.file.getFileName()); + T state = read(namedXContentRegistry, stateFile); + logger.trace("state id [{}] read from [{}]", maxStateId, stateFile.getFileName()); return state; } catch (Exception e) { - exceptions.add(new IOException("failed to read " + pathAndStateId.toString(), e)); + exceptions.add(new IOException("failed to read " + stateFile.toAbsolutePath(), e)); logger.debug(() -> new ParameterizedMessage( - "{}: failed to read [{}], ignoring...", pathAndStateId.file.toAbsolutePath(), prefix), e); + "{}: failed to read [{}], ignoring...", stateFile.toAbsolutePath(), prefix), e); } } // if we reach this something went wrong ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions); - if (files.size() > 0) { + if (stateFiles.size() > 0) { // We have some state files but none of them gave us a usable state - throw new IllegalStateException("Could not find a state file to recover from among " + files); + throw new IllegalStateException("Could not find a state file to recover from among " + + stateFiles.stream().map(Path::toAbsolutePath).map(Object::toString).collect(Collectors.joining(", "))); } return null; } - /** - * Internal struct-like class that holds the parsed state id and the file - */ - private static class PathAndStateId { - final Path file; - final long id; - - private PathAndStateId(Path file, long id) { - this.file = file; - this.id = id; - } - - @Override - public String toString() { - return "[id:" + id + ", file:" + file.toAbsolutePath() + "]"; - } - } - /** * 
Deletes all meta state directories recursively for the given data locations * @param dataLocations the data location to delete diff --git a/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java b/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java index 8f89e59003c..4acb5f428a7 100644 --- a/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java @@ -288,6 +288,113 @@ public class MetaDataStateFormatTests extends ESTestCase { } } + private DummyState writeAndReadStateSuccessfully(Format format, Path... paths) throws IOException { + format.noFailures(); + DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(), + randomDouble(), randomBoolean()); + format.write(state, paths); + assertEquals(state, format.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths)); + ensureOnlyOneStateFile(paths); + return state; + } + + private static void ensureOnlyOneStateFile(Path[] paths) throws IOException { + for (Path path : paths) { + try (Directory dir = new SimpleFSDirectory(path.resolve(MetaDataStateFormat.STATE_DIR_NAME))) { + assertThat(dir.listAll().length, equalTo(1)); + } + } + } + + public void testFailWriteAndReadPreviousState() throws IOException { + Path path = createTempDir(); + Format format = new Format("foo-"); + + DummyState initialState = writeAndReadStateSuccessfully(format, path); + + for (int i = 0; i < randomIntBetween(1, 5); i++) { + format.failOnMethods(Format.FAIL_DELETE_TMP_FILE, Format.FAIL_CREATE_OUTPUT_FILE, Format.FAIL_WRITE_TO_OUTPUT_FILE, + Format.FAIL_FSYNC_TMP_FILE, Format.FAIL_RENAME_TMP_FILE); + DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(), + randomDouble(), randomBoolean()); + expectThrows(IOException.class, () -> format.write(newState, path)); + 
format.noFailures(); + assertEquals(initialState, format.loadLatestState(logger, NamedXContentRegistry.EMPTY, path)); + } + + writeAndReadStateSuccessfully(format, path); + } + + public void testFailWriteAndReadAnyState() throws IOException { + Path path = createTempDir(); + Format format = new Format("foo-"); + Set possibleStates = new HashSet<>(); + + DummyState initialState = writeAndReadStateSuccessfully(format, path); + possibleStates.add(initialState); + + for (int i = 0; i < randomIntBetween(1, 5); i++) { + format.failOnMethods(Format.FAIL_FSYNC_STATE_DIRECTORY); + DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(), + randomDouble(), randomBoolean()); + possibleStates.add(newState); + expectThrows(IOException.class, () -> format.write(newState, path)); + format.noFailures(); + assertTrue(possibleStates.contains(format.loadLatestState(logger, NamedXContentRegistry.EMPTY, path))); + } + + writeAndReadStateSuccessfully(format, path); + } + + public void testFailCopyStateToExtraLocation() throws IOException { + Path paths[] = new Path[randomIntBetween(2, 5)]; + for (int i = 0; i < paths.length; i++) { + paths[i] = createTempDir(); + } + Format format = new Format("foo-"); + + writeAndReadStateSuccessfully(format, paths); + + for (int i = 0; i < randomIntBetween(1, 5); i++) { + format.failOnMethods(Format.FAIL_OPEN_STATE_FILE_WHEN_COPYING); + DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(), + randomDouble(), randomBoolean()); + expectThrows(IOException.class, () -> format.write(newState, paths)); + format.noFailures(); + assertEquals(newState, format.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths)); + } + + writeAndReadStateSuccessfully(format, paths); + } + + public void testFailRandomlyAndReadAnyState() throws IOException { + Path paths[] = new Path[randomIntBetween(1, 5)]; + for (int i = 0; i < paths.length; 
i++) { + paths[i] = createTempDir(); + } + Format format = new Format("foo-"); + Set possibleStates = new HashSet<>(); + + DummyState initialState = writeAndReadStateSuccessfully(format, paths); + possibleStates.add(initialState); + + for (int i = 0; i < randomIntBetween(1, 5); i++) { + format.failRandomly(); + DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(), + randomDouble(), randomBoolean()); + possibleStates.add(newState); + try { + format.write(newState, paths); + } catch (Exception e) { + // since we're injecting failures at random it's ok if exception is thrown, it's also ok if exception is not thrown + } + format.noFailures(); + assertTrue(possibleStates.contains(format.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths))); + } + + writeAndReadStateSuccessfully(format, paths); + } + private static MetaDataStateFormat metaDataFormat() { return new MetaDataStateFormat(MetaData.GLOBAL_STATE_FILE_PREFIX) { @Override @@ -324,12 +431,36 @@ public class MetaDataStateFormatTests extends ESTestCase { } - private class Format extends MetaDataStateFormat { + private static class Format extends MetaDataStateFormat { + private enum FailureMode { + NO_FAILURES, + FAIL_ON_METHOD, + FAIL_RANDOMLY + } + private FailureMode failureMode; + private String[] failureMethods; + + static final String FAIL_CREATE_OUTPUT_FILE = "createOutput"; + static final String FAIL_WRITE_TO_OUTPUT_FILE = "writeBytes"; + static final String FAIL_FSYNC_TMP_FILE = "sync"; + static final String FAIL_RENAME_TMP_FILE = "rename"; + static final String FAIL_FSYNC_STATE_DIRECTORY = "syncMetaData"; + static final String FAIL_DELETE_TMP_FILE = "deleteFile"; + static final String FAIL_OPEN_STATE_FILE_WHEN_COPYING = "openInput"; + + /** + * Constructs a MetaDataStateFormat object for storing/retrieving DummyState. + * By default no I/O failures are injected. 
+ * I/O failure behaviour can be controlled by {@link #noFailures()}, {@link #failOnMethods(String...)} and + * {@link #failRandomly()} method calls. + */ Format(String prefix) { super(prefix); + this.failureMode = FailureMode.NO_FAILURES; } + @Override public void toXContent(XContentBuilder builder, DummyState state) throws IOException { state.toXContent(builder, null); @@ -340,9 +471,47 @@ public class MetaDataStateFormatTests extends ESTestCase { return new DummyState().parse(parser); } + + public void noFailures() { + this.failureMode = FailureMode.NO_FAILURES; + } + + public void failOnMethods(String... failureMethods) { + this.failureMode = FailureMode.FAIL_ON_METHOD; + this.failureMethods = failureMethods; + } + + public void failRandomly() { + this.failureMode = FailureMode.FAIL_RANDOMLY; + } + @Override - protected Directory newDirectory(Path dir) throws IOException { - MockDirectoryWrapper mock = new MockDirectoryWrapper(random(), super.newDirectory(dir)); + protected Directory newDirectory(Path dir) { + MockDirectoryWrapper mock = newMockFSDirectory(dir); + if (failureMode == FailureMode.FAIL_ON_METHOD) { + MockDirectoryWrapper.Failure fail = new MockDirectoryWrapper.Failure() { + @Override + public void eval(MockDirectoryWrapper dir) throws IOException { + String failMethod = randomFrom(failureMethods); + for (StackTraceElement e : Thread.currentThread().getStackTrace()) { + if (failMethod.equals(e.getMethodName())) { + throw new MockDirectoryWrapper.FakeIOException(); + } + } + } + }; + mock.failOn(fail); + } else if (failureMode == FailureMode.FAIL_RANDOMLY) { + MockDirectoryWrapper.Failure fail = new MockDirectoryWrapper.Failure() { + @Override + public void eval(MockDirectoryWrapper dir) throws IOException { + if (randomIntBetween(0, 20) == 0) { + throw new MockDirectoryWrapper.FakeIOException(); + } + } + }; + mock.failOn(fail); + } closeAfterSuite(mock); return mock; }