diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index d4807abeb23..9f7bc41add9 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -219,20 +219,18 @@ public class InternalEngine extends Engine { private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) throws IOException { final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); - translogConfig.setTranslogGeneration(null); + Translog.TranslogGeneration generation = null; if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { - final Translog.TranslogGeneration generation = loadTranslogIdFromCommit(writer); + generation = loadTranslogIdFromCommit(writer); // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong! if (generation == null) { throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist"); } - translogConfig.setTranslogGeneration(generation); if (generation != null && generation.translogUUID == null) { throw new IndexFormatTooOldException("trasnlog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first"); } } - final Translog translog = new Translog(translogConfig); - final Translog.TranslogGeneration generation = translogConfig.getTranslogGeneration(); + final Translog translog = new Translog(translogConfig, generation); if (generation == null || generation.translogUUID == null) { assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be " + EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index a0fda01d286..781bcc99c21 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -83,7 +83,7 @@ import java.util.stream.Stream; *
*
* When a translog is opened the checkpoint is use to retrieve the latest translog file generation and subsequently to open the last written file to recovery operations.
- * The {@link org.elasticsearch.index.translog.Translog.TranslogGeneration} on {@link TranslogConfig#getTranslogGeneration()} given when the translog is opened is compared against
+ * The {@link org.elasticsearch.index.translog.Translog.TranslogGeneration}, given when the translog is opened / constructed is compared against
* the latest generation and all consecutive translog files singe the given generation and the last generation in the checkpoint will be recovered and preserved until the next
* generation is committed using {@link Translog#commit()}. In the common case the translog file generation in the checkpoint and the generation passed to the translog on creation are
* the same. The only situation when they can be different is when an actual translog commit fails in between {@link Translog#prepareCommit()} and {@link Translog#commit()}. In such a case
@@ -130,19 +130,23 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
private final TranslogConfig config;
private final String translogUUID;
-
/**
* Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogConfig} has
* a non-null {@link org.elasticsearch.index.translog.Translog.TranslogGeneration}. If the generation is null this method
* us destructive and will delete all files in the translog path given.
*
+ * @param config the configuration of this translog
+ * @param translogGeneration the translog generation to open. If this is null
a new translog is created. If non-null
+ * the translog tries to open the given translog generation. The generation is treated as the last generation referenced
+ * form already committed data. This means all operations that have not yet been committed should be in the translog
+ * file referenced by this generation. The translog creation will fail if this generation can't be opened.
+ *
* @see TranslogConfig#getTranslogPath()
+ *
*/
- public Translog(TranslogConfig config) throws IOException {
+ public Translog(TranslogConfig config, TranslogGeneration translogGeneration) throws IOException {
super(config.getShardId(), config.getIndexSettings());
this.config = config;
- TranslogGeneration translogGeneration = config.getTranslogGeneration();
-
if (translogGeneration == null || translogGeneration.translogUUID == null) { // legacy case
translogUUID = Strings.randomBase64UUID();
} else {
diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java
index 5f6e1912661..6367761c143 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java
@@ -40,7 +40,6 @@ public final class TranslogConfig {
public static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(8, ByteSizeUnit.KB);
private final BigArrays bigArrays;
- private volatile TranslogGeneration translogGeneration;
private final IndexSettings indexSettings;
private final ShardId shardId;
private final Path translogPath;
@@ -93,24 +92,6 @@ public final class TranslogConfig {
return translogPath;
}
- /**
- * Returns the translog generation to open. If this is null
a new translog is created. If non-null
- * the translog tries to open the given translog generation. The generation is treated as the last generation referenced
- * form already committed data. This means all operations that have not yet been committed should be in the translog
- * file referenced by this generation. The translog creation will fail if this generation can't be opened.
- */
- public TranslogGeneration getTranslogGeneration() {
- return translogGeneration; // TODO make this a ctor argument on the Translog - this mutable state is aweful
- }
-
- /**
- * Set the generation to be opened. Use null
to start with a fresh translog.
- * @see #getTranslogGeneration()
- */
- public void setTranslogGeneration(TranslogGeneration translogGeneration) {
- this.translogGeneration = translogGeneration;
- }
-
/**
* The translog buffer size. Default is 8kb
*/
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index e8208720015..f7e07c1d773 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -257,7 +257,7 @@ public class InternalEngineTests extends ESTestCase {
protected Translog createTranslog(Path translogPath) throws IOException {
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE);
- return new Translog(translogConfig);
+ return new Translog(translogConfig, null);
}
protected SnapshotDeletionPolicy createSnapshotDeletionPolicy() {
@@ -1982,7 +1982,8 @@ public class InternalEngineTests extends ESTestCase {
Translog.TranslogGeneration generation = engine.getTranslog().getGeneration();
engine.close();
- Translog translog = new Translog(new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE));
+ Translog translog = new Translog(new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE)
+ , null);
translog.add(new Translog.Index("test", "SomeBogusId", "{}".getBytes(Charset.forName("UTF-8"))));
assertEquals(generation.translogFileGeneration, translog.currentFileGeneration());
translog.close();
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index 984908ad9d6..ed0fd3f6958 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -133,7 +133,7 @@ public class TranslogTests extends ESTestCase {
}
private Translog create(Path path) throws IOException {
- return new Translog(getTranslogConfig(path));
+ return new Translog(getTranslogConfig(path), null);
}
private TranslogConfig getTranslogConfig(Path path) {
@@ -951,8 +951,7 @@ public class TranslogTests extends ESTestCase {
TranslogConfig config = translog.getConfig();
translog.close();
- config.setTranslogGeneration(translogGeneration);
- translog = new Translog(config);
+ translog = new Translog(config, translogGeneration);
if (translogGeneration == null) {
assertEquals(0, translog.stats().estimatedNumberOfOperations());
assertEquals(1, translog.currentFileGeneration());
@@ -993,8 +992,7 @@ public class TranslogTests extends ESTestCase {
// we intentionally don't close the tlog that is in the prepareCommit stage since we try to recovery the uncommitted
// translog here as well.
TranslogConfig config = translog.getConfig();
- config.setTranslogGeneration(translogGeneration);
- try (Translog translog = new Translog(config)) {
+ try (Translog translog = new Translog(config, translogGeneration)) {
assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
@@ -1007,7 +1005,7 @@ public class TranslogTests extends ESTestCase {
}
}
if (randomBoolean()) { // recover twice
- try (Translog translog = new Translog(config)) {
+ try (Translog translog = new Translog(config, translogGeneration)) {
assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
@@ -1044,12 +1042,11 @@ public class TranslogTests extends ESTestCase {
// we intentionally don't close the tlog that is in the prepareCommit stage since we try to recovery the uncommitted
// translog here as well.
TranslogConfig config = translog.getConfig();
- config.setTranslogGeneration(translogGeneration);
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
Checkpoint read = Checkpoint.read(ckp);
Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)));
- try (Translog translog = new Translog(config)) {
+ try (Translog translog = new Translog(config, translogGeneration)) {
assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
@@ -1064,7 +1061,7 @@ public class TranslogTests extends ESTestCase {
}
if (randomBoolean()) { // recover twice
- try (Translog translog = new Translog(config)) {
+ try (Translog translog = new Translog(config, translogGeneration)) {
assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
@@ -1098,18 +1095,17 @@ public class TranslogTests extends ESTestCase {
// we intentionally don't close the tlog that is in the prepareCommit stage since we try to recovery the uncommitted
// translog here as well.
TranslogConfig config = translog.getConfig();
- config.setTranslogGeneration(translogGeneration);
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
Checkpoint read = Checkpoint.read(ckp);
Checkpoint corrupted = new Checkpoint(0, 0, 0);
Checkpoint.write(config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
- try (Translog translog = new Translog(config)) {
+ try (Translog translog = new Translog(config, translogGeneration)) {
fail("corrupted");
} catch (IllegalStateException ex) {
assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=2683, numOps=55, translogFileGeneration= 2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration= 0}");
}
Checkpoint.write(config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
- try (Translog translog = new Translog(config)) {
+ try (Translog translog = new Translog(config, translogGeneration)) {
assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
@@ -1180,15 +1176,15 @@ public class TranslogTests extends ESTestCase {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
translog.close();
- config.setTranslogGeneration(new Translog.TranslogGeneration(randomRealisticUnicodeOfCodepointLengthBetween(1, translogGeneration.translogUUID.length()), translogGeneration.translogFileGeneration));
+ Translog.TranslogGeneration generation = new Translog.TranslogGeneration(randomRealisticUnicodeOfCodepointLengthBetween(1,
+ translogGeneration.translogUUID.length()), translogGeneration.translogFileGeneration);
try {
- new Translog(config);
+ new Translog(config, generation);
fail("translog doesn't belong to this UUID");
} catch (TranslogCorruptedException ex) {
}
- config.setTranslogGeneration(translogGeneration);
- this.translog = new Translog(config);
+ this.translog = new Translog(config, translogGeneration);
Translog.Snapshot snapshot = this.translog.newSnapshot();
for (int i = firstUncommitted; i < translogOperations; i++) {
Translog.Operation next = snapshot.next();
@@ -1357,8 +1353,7 @@ public class TranslogTests extends ESTestCase {
assertFalse(translog.isOpen());
translog.close(); // we are closed
- config.setTranslogGeneration(translogGeneration);
- try (Translog tlog = new Translog(config)) {
+ try (Translog tlog = new Translog(config, translogGeneration)) {
assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration());
assertFalse(tlog.syncNeeded());
@@ -1393,7 +1388,7 @@ public class TranslogTests extends ESTestCase {
Path tempDir = createTempDir();
final FailSwitch fail = new FailSwitch();
TranslogConfig config = getTranslogConfig(tempDir);
- Translog translog = getFailableTranslog(fail, config, false, true);
+ Translog translog = getFailableTranslog(fail, config, false, true, null);
LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly
translog.add(new Translog.Index("test", "1", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
fail.failAlways();
@@ -1485,8 +1480,7 @@ public class TranslogTests extends ESTestCase {
iterator.remove();
}
}
- config.setTranslogGeneration(translog.getGeneration());
- try (Translog tlog = new Translog(config)) {
+ try (Translog tlog = new Translog(config, translog.getGeneration())) {
Translog.Snapshot snapshot = tlog.newSnapshot();
if (writtenOperations.size() != snapshot.totalOperations()) {
for (int i = 0; i < threadCount; i++) {
@@ -1507,7 +1501,7 @@ public class TranslogTests extends ESTestCase {
}
private Translog getFailableTranslog(FailSwitch fail, final TranslogConfig config) throws IOException {
- return getFailableTranslog(fail, config, randomBoolean(), false);
+ return getFailableTranslog(fail, config, randomBoolean(), false, null);
}
private static class FailSwitch {
@@ -1540,8 +1534,8 @@ public class TranslogTests extends ESTestCase {
}
- private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean paritalWrites, final boolean throwUnknownException) throws IOException {
- return new Translog(config) {
+ private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean paritalWrites, final boolean throwUnknownException, Translog.TranslogGeneration generation) throws IOException {
+ return new Translog(config, generation) {
@Override
TranslogWriter.ChannelFactory getChannelFactory() {
final TranslogWriter.ChannelFactory factory = super.getChannelFactory();
@@ -1660,13 +1654,12 @@ public class TranslogTests extends ESTestCase {
public void testFailWhileCreateWriteWithRecoveredTLogs() throws IOException {
Path tempDir = createTempDir();
TranslogConfig config = getTranslogConfig(tempDir);
- Translog translog = new Translog(config);
+ Translog translog = new Translog(config, null);
translog.add(new Translog.Index("test", "boom", "boom".getBytes(Charset.forName("UTF-8"))));
Translog.TranslogGeneration generation = translog.getGeneration();
translog.close();
- config.setTranslogGeneration(generation);
try {
- new Translog(config) {
+ new Translog(config, generation) {
@Override
protected TranslogWriter createWriter(long fileGeneration) throws IOException {
throw new MockDirectoryWrapper.FakeIOException();
@@ -1689,8 +1682,7 @@ public class TranslogTests extends ESTestCase {
Checkpoint read = Checkpoint.read(ckp);
Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)));
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
- config.setTranslogGeneration(translogGeneration);
- try (Translog tlog = new Translog(config)) {
+ try (Translog tlog = new Translog(config, translogGeneration)) {
assertNotNull(translogGeneration);
assertFalse(tlog.syncNeeded());
Translog.Snapshot snapshot = tlog.newSnapshot();
@@ -1701,7 +1693,7 @@ public class TranslogTests extends ESTestCase {
}
tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
}
- try (Translog tlog = new Translog(config)) {
+ try (Translog tlog = new Translog(config, translogGeneration)) {
assertNotNull(translogGeneration);
assertFalse(tlog.syncNeeded());
Translog.Snapshot snapshot = tlog.newSnapshot();
@@ -1722,10 +1714,9 @@ public class TranslogTests extends ESTestCase {
Checkpoint read = Checkpoint.read(ckp);
// don't copy the new file
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
- config.setTranslogGeneration(translogGeneration);
try {
- Translog tlog = new Translog(config);
+ Translog tlog = new Translog(config, translogGeneration);
fail("file already exists?");
} catch (TranslogException ex) {
// all is well
@@ -1746,8 +1737,7 @@ public class TranslogTests extends ESTestCase {
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
// we add N+1 and N+2 to ensure we only delete the N+1 file and never jump ahead and wipe without the right condition
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 2) + ".tlog"));
- config.setTranslogGeneration(translogGeneration);
- try (Translog tlog = new Translog(config)) {
+ try (Translog tlog = new Translog(config, translogGeneration)) {
assertNotNull(translogGeneration);
assertFalse(tlog.syncNeeded());
Translog.Snapshot snapshot = tlog.newSnapshot();
@@ -1760,7 +1750,7 @@ public class TranslogTests extends ESTestCase {
}
try {
- Translog tlog = new Translog(config);
+ Translog tlog = new Translog(config, translogGeneration);
fail("file already exists?");
} catch (TranslogException ex) {
// all is well
@@ -1787,8 +1777,9 @@ public class TranslogTests extends ESTestCase {
if (randomBoolean()) {
fail.onceFailedFailAlways();
}
+ Translog.TranslogGeneration generation = null;
try {
- final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false);
+ final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false, generation);
try {
LineFileDocs lineFileDocs = new LineFileDocs(random()); //writes pretty big docs so we cross buffer boarders regularly
for (int opsAdded = 0; opsAdded < numOps; opsAdded++) {
@@ -1822,7 +1813,7 @@ public class TranslogTests extends ESTestCase {
} catch (IOException ex) {
assertEquals(ex.getMessage(), "__FAKE__ no space left on device");
} finally {
- config.setTranslogGeneration(failableTLog.getGeneration());
+ generation = failableTLog.getGeneration();
IOUtils.closeWhileHandlingException(failableTLog);
}
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
@@ -1831,13 +1822,13 @@ public class TranslogTests extends ESTestCase {
// now randomly open this failing tlog again just to make sure we can also recover from failing during recovery
if (randomBoolean()) {
try {
- IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false));
+ IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false, generation));
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
// failed - that's ok, we didn't even create it
}
}
- try (Translog translog = new Translog(config)) {
+ try (Translog translog = new Translog(config, generation)) {
Translog.Snapshot snapshot = translog.newSnapshot();
assertEquals(syncedDocs.size(), snapshot.totalOperations());
for (int i = 0; i < syncedDocs.size(); i++) {