Merge pull request #17457 from s1monw/make_translog_config_immutable
Make TranslogConfig immutable and pass TranslogGeneration as a ctor arg to Translog This mutable state is confusing and is easily missed. By default this is null and wipes all translog. This commit makes the TranslogGeneration mandatory on the Translog constructor and removes the mutalbe state.
This commit is contained in:
commit
283eff13aa
|
@ -219,20 +219,18 @@ public class InternalEngine extends Engine {
|
||||||
|
|
||||||
private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) throws IOException {
|
private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) throws IOException {
|
||||||
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
|
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
|
||||||
translogConfig.setTranslogGeneration(null);
|
Translog.TranslogGeneration generation = null;
|
||||||
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
|
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
|
||||||
final Translog.TranslogGeneration generation = loadTranslogIdFromCommit(writer);
|
generation = loadTranslogIdFromCommit(writer);
|
||||||
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
|
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
|
||||||
if (generation == null) {
|
if (generation == null) {
|
||||||
throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist");
|
throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist");
|
||||||
}
|
}
|
||||||
translogConfig.setTranslogGeneration(generation);
|
|
||||||
if (generation != null && generation.translogUUID == null) {
|
if (generation != null && generation.translogUUID == null) {
|
||||||
throw new IndexFormatTooOldException("trasnlog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
|
throw new IndexFormatTooOldException("trasnlog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
final Translog translog = new Translog(translogConfig);
|
final Translog translog = new Translog(translogConfig, generation);
|
||||||
final Translog.TranslogGeneration generation = translogConfig.getTranslogGeneration();
|
|
||||||
if (generation == null || generation.translogUUID == null) {
|
if (generation == null || generation.translogUUID == null) {
|
||||||
assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be "
|
assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be "
|
||||||
+ EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
|
+ EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
|
||||||
|
|
|
@ -83,7 +83,7 @@ import java.util.stream.Stream;
|
||||||
* </p>
|
* </p>
|
||||||
* <p>
|
* <p>
|
||||||
* When a translog is opened the checkpoint is use to retrieve the latest translog file generation and subsequently to open the last written file to recovery operations.
|
* When a translog is opened the checkpoint is use to retrieve the latest translog file generation and subsequently to open the last written file to recovery operations.
|
||||||
* The {@link org.elasticsearch.index.translog.Translog.TranslogGeneration} on {@link TranslogConfig#getTranslogGeneration()} given when the translog is opened is compared against
|
* The {@link org.elasticsearch.index.translog.Translog.TranslogGeneration}, given when the translog is opened / constructed is compared against
|
||||||
* the latest generation and all consecutive translog files singe the given generation and the last generation in the checkpoint will be recovered and preserved until the next
|
* the latest generation and all consecutive translog files singe the given generation and the last generation in the checkpoint will be recovered and preserved until the next
|
||||||
* generation is committed using {@link Translog#commit()}. In the common case the translog file generation in the checkpoint and the generation passed to the translog on creation are
|
* generation is committed using {@link Translog#commit()}. In the common case the translog file generation in the checkpoint and the generation passed to the translog on creation are
|
||||||
* the same. The only situation when they can be different is when an actual translog commit fails in between {@link Translog#prepareCommit()} and {@link Translog#commit()}. In such a case
|
* the same. The only situation when they can be different is when an actual translog commit fails in between {@link Translog#prepareCommit()} and {@link Translog#commit()}. In such a case
|
||||||
|
@ -130,19 +130,23 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
private final TranslogConfig config;
|
private final TranslogConfig config;
|
||||||
private final String translogUUID;
|
private final String translogUUID;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogConfig} has
|
* Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogConfig} has
|
||||||
* a non-null {@link org.elasticsearch.index.translog.Translog.TranslogGeneration}. If the generation is null this method
|
* a non-null {@link org.elasticsearch.index.translog.Translog.TranslogGeneration}. If the generation is null this method
|
||||||
* us destructive and will delete all files in the translog path given.
|
* us destructive and will delete all files in the translog path given.
|
||||||
*
|
*
|
||||||
|
* @param config the configuration of this translog
|
||||||
|
* @param translogGeneration the translog generation to open. If this is <code>null</code> a new translog is created. If non-null
|
||||||
|
* the translog tries to open the given translog generation. The generation is treated as the last generation referenced
|
||||||
|
* form already committed data. This means all operations that have not yet been committed should be in the translog
|
||||||
|
* file referenced by this generation. The translog creation will fail if this generation can't be opened.
|
||||||
|
*
|
||||||
* @see TranslogConfig#getTranslogPath()
|
* @see TranslogConfig#getTranslogPath()
|
||||||
|
*
|
||||||
*/
|
*/
|
||||||
public Translog(TranslogConfig config) throws IOException {
|
public Translog(TranslogConfig config, TranslogGeneration translogGeneration) throws IOException {
|
||||||
super(config.getShardId(), config.getIndexSettings());
|
super(config.getShardId(), config.getIndexSettings());
|
||||||
this.config = config;
|
this.config = config;
|
||||||
TranslogGeneration translogGeneration = config.getTranslogGeneration();
|
|
||||||
|
|
||||||
if (translogGeneration == null || translogGeneration.translogUUID == null) { // legacy case
|
if (translogGeneration == null || translogGeneration.translogUUID == null) { // legacy case
|
||||||
translogUUID = Strings.randomBase64UUID();
|
translogUUID = Strings.randomBase64UUID();
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -40,7 +40,6 @@ public final class TranslogConfig {
|
||||||
|
|
||||||
public static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(8, ByteSizeUnit.KB);
|
public static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(8, ByteSizeUnit.KB);
|
||||||
private final BigArrays bigArrays;
|
private final BigArrays bigArrays;
|
||||||
private volatile TranslogGeneration translogGeneration;
|
|
||||||
private final IndexSettings indexSettings;
|
private final IndexSettings indexSettings;
|
||||||
private final ShardId shardId;
|
private final ShardId shardId;
|
||||||
private final Path translogPath;
|
private final Path translogPath;
|
||||||
|
@ -93,24 +92,6 @@ public final class TranslogConfig {
|
||||||
return translogPath;
|
return translogPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the translog generation to open. If this is <code>null</code> a new translog is created. If non-null
|
|
||||||
* the translog tries to open the given translog generation. The generation is treated as the last generation referenced
|
|
||||||
* form already committed data. This means all operations that have not yet been committed should be in the translog
|
|
||||||
* file referenced by this generation. The translog creation will fail if this generation can't be opened.
|
|
||||||
*/
|
|
||||||
public TranslogGeneration getTranslogGeneration() {
|
|
||||||
return translogGeneration; // TODO make this a ctor argument on the Translog - this mutable state is aweful
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the generation to be opened. Use <code>null</code> to start with a fresh translog.
|
|
||||||
* @see #getTranslogGeneration()
|
|
||||||
*/
|
|
||||||
public void setTranslogGeneration(TranslogGeneration translogGeneration) {
|
|
||||||
this.translogGeneration = translogGeneration;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The translog buffer size. Default is <tt>8kb</tt>
|
* The translog buffer size. Default is <tt>8kb</tt>
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -257,7 +257,7 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
|
|
||||||
protected Translog createTranslog(Path translogPath) throws IOException {
|
protected Translog createTranslog(Path translogPath) throws IOException {
|
||||||
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE);
|
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE);
|
||||||
return new Translog(translogConfig);
|
return new Translog(translogConfig, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected SnapshotDeletionPolicy createSnapshotDeletionPolicy() {
|
protected SnapshotDeletionPolicy createSnapshotDeletionPolicy() {
|
||||||
|
@ -1982,7 +1982,8 @@ public class InternalEngineTests extends ESTestCase {
|
||||||
Translog.TranslogGeneration generation = engine.getTranslog().getGeneration();
|
Translog.TranslogGeneration generation = engine.getTranslog().getGeneration();
|
||||||
engine.close();
|
engine.close();
|
||||||
|
|
||||||
Translog translog = new Translog(new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE));
|
Translog translog = new Translog(new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE)
|
||||||
|
, null);
|
||||||
translog.add(new Translog.Index("test", "SomeBogusId", "{}".getBytes(Charset.forName("UTF-8"))));
|
translog.add(new Translog.Index("test", "SomeBogusId", "{}".getBytes(Charset.forName("UTF-8"))));
|
||||||
assertEquals(generation.translogFileGeneration, translog.currentFileGeneration());
|
assertEquals(generation.translogFileGeneration, translog.currentFileGeneration());
|
||||||
translog.close();
|
translog.close();
|
||||||
|
|
|
@ -133,7 +133,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private Translog create(Path path) throws IOException {
|
private Translog create(Path path) throws IOException {
|
||||||
return new Translog(getTranslogConfig(path));
|
return new Translog(getTranslogConfig(path), null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private TranslogConfig getTranslogConfig(Path path) {
|
private TranslogConfig getTranslogConfig(Path path) {
|
||||||
|
@ -951,8 +951,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
TranslogConfig config = translog.getConfig();
|
TranslogConfig config = translog.getConfig();
|
||||||
|
|
||||||
translog.close();
|
translog.close();
|
||||||
config.setTranslogGeneration(translogGeneration);
|
translog = new Translog(config, translogGeneration);
|
||||||
translog = new Translog(config);
|
|
||||||
if (translogGeneration == null) {
|
if (translogGeneration == null) {
|
||||||
assertEquals(0, translog.stats().estimatedNumberOfOperations());
|
assertEquals(0, translog.stats().estimatedNumberOfOperations());
|
||||||
assertEquals(1, translog.currentFileGeneration());
|
assertEquals(1, translog.currentFileGeneration());
|
||||||
|
@ -993,8 +992,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
// we intentionally don't close the tlog that is in the prepareCommit stage since we try to recovery the uncommitted
|
// we intentionally don't close the tlog that is in the prepareCommit stage since we try to recovery the uncommitted
|
||||||
// translog here as well.
|
// translog here as well.
|
||||||
TranslogConfig config = translog.getConfig();
|
TranslogConfig config = translog.getConfig();
|
||||||
config.setTranslogGeneration(translogGeneration);
|
try (Translog translog = new Translog(config, translogGeneration)) {
|
||||||
try (Translog translog = new Translog(config)) {
|
|
||||||
assertNotNull(translogGeneration);
|
assertNotNull(translogGeneration);
|
||||||
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
|
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
|
||||||
assertFalse(translog.syncNeeded());
|
assertFalse(translog.syncNeeded());
|
||||||
|
@ -1007,7 +1005,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (randomBoolean()) { // recover twice
|
if (randomBoolean()) { // recover twice
|
||||||
try (Translog translog = new Translog(config)) {
|
try (Translog translog = new Translog(config, translogGeneration)) {
|
||||||
assertNotNull(translogGeneration);
|
assertNotNull(translogGeneration);
|
||||||
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
|
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
|
||||||
assertFalse(translog.syncNeeded());
|
assertFalse(translog.syncNeeded());
|
||||||
|
@ -1044,12 +1042,11 @@ public class TranslogTests extends ESTestCase {
|
||||||
// we intentionally don't close the tlog that is in the prepareCommit stage since we try to recovery the uncommitted
|
// we intentionally don't close the tlog that is in the prepareCommit stage since we try to recovery the uncommitted
|
||||||
// translog here as well.
|
// translog here as well.
|
||||||
TranslogConfig config = translog.getConfig();
|
TranslogConfig config = translog.getConfig();
|
||||||
config.setTranslogGeneration(translogGeneration);
|
|
||||||
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
|
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
|
||||||
Checkpoint read = Checkpoint.read(ckp);
|
Checkpoint read = Checkpoint.read(ckp);
|
||||||
Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)));
|
Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)));
|
||||||
|
|
||||||
try (Translog translog = new Translog(config)) {
|
try (Translog translog = new Translog(config, translogGeneration)) {
|
||||||
assertNotNull(translogGeneration);
|
assertNotNull(translogGeneration);
|
||||||
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
|
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
|
||||||
assertFalse(translog.syncNeeded());
|
assertFalse(translog.syncNeeded());
|
||||||
|
@ -1064,7 +1061,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (randomBoolean()) { // recover twice
|
if (randomBoolean()) { // recover twice
|
||||||
try (Translog translog = new Translog(config)) {
|
try (Translog translog = new Translog(config, translogGeneration)) {
|
||||||
assertNotNull(translogGeneration);
|
assertNotNull(translogGeneration);
|
||||||
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
|
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
|
||||||
assertFalse(translog.syncNeeded());
|
assertFalse(translog.syncNeeded());
|
||||||
|
@ -1098,18 +1095,17 @@ public class TranslogTests extends ESTestCase {
|
||||||
// we intentionally don't close the tlog that is in the prepareCommit stage since we try to recovery the uncommitted
|
// we intentionally don't close the tlog that is in the prepareCommit stage since we try to recovery the uncommitted
|
||||||
// translog here as well.
|
// translog here as well.
|
||||||
TranslogConfig config = translog.getConfig();
|
TranslogConfig config = translog.getConfig();
|
||||||
config.setTranslogGeneration(translogGeneration);
|
|
||||||
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
|
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
|
||||||
Checkpoint read = Checkpoint.read(ckp);
|
Checkpoint read = Checkpoint.read(ckp);
|
||||||
Checkpoint corrupted = new Checkpoint(0, 0, 0);
|
Checkpoint corrupted = new Checkpoint(0, 0, 0);
|
||||||
Checkpoint.write(config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
|
Checkpoint.write(config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
|
||||||
try (Translog translog = new Translog(config)) {
|
try (Translog translog = new Translog(config, translogGeneration)) {
|
||||||
fail("corrupted");
|
fail("corrupted");
|
||||||
} catch (IllegalStateException ex) {
|
} catch (IllegalStateException ex) {
|
||||||
assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=2683, numOps=55, translogFileGeneration= 2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration= 0}");
|
assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=2683, numOps=55, translogFileGeneration= 2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration= 0}");
|
||||||
}
|
}
|
||||||
Checkpoint.write(config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
|
Checkpoint.write(config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
|
||||||
try (Translog translog = new Translog(config)) {
|
try (Translog translog = new Translog(config, translogGeneration)) {
|
||||||
assertNotNull(translogGeneration);
|
assertNotNull(translogGeneration);
|
||||||
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
|
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
|
||||||
assertFalse(translog.syncNeeded());
|
assertFalse(translog.syncNeeded());
|
||||||
|
@ -1180,15 +1176,15 @@ public class TranslogTests extends ESTestCase {
|
||||||
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
|
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
|
||||||
translog.close();
|
translog.close();
|
||||||
|
|
||||||
config.setTranslogGeneration(new Translog.TranslogGeneration(randomRealisticUnicodeOfCodepointLengthBetween(1, translogGeneration.translogUUID.length()), translogGeneration.translogFileGeneration));
|
Translog.TranslogGeneration generation = new Translog.TranslogGeneration(randomRealisticUnicodeOfCodepointLengthBetween(1,
|
||||||
|
translogGeneration.translogUUID.length()), translogGeneration.translogFileGeneration);
|
||||||
try {
|
try {
|
||||||
new Translog(config);
|
new Translog(config, generation);
|
||||||
fail("translog doesn't belong to this UUID");
|
fail("translog doesn't belong to this UUID");
|
||||||
} catch (TranslogCorruptedException ex) {
|
} catch (TranslogCorruptedException ex) {
|
||||||
|
|
||||||
}
|
}
|
||||||
config.setTranslogGeneration(translogGeneration);
|
this.translog = new Translog(config, translogGeneration);
|
||||||
this.translog = new Translog(config);
|
|
||||||
Translog.Snapshot snapshot = this.translog.newSnapshot();
|
Translog.Snapshot snapshot = this.translog.newSnapshot();
|
||||||
for (int i = firstUncommitted; i < translogOperations; i++) {
|
for (int i = firstUncommitted; i < translogOperations; i++) {
|
||||||
Translog.Operation next = snapshot.next();
|
Translog.Operation next = snapshot.next();
|
||||||
|
@ -1357,8 +1353,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
|
|
||||||
assertFalse(translog.isOpen());
|
assertFalse(translog.isOpen());
|
||||||
translog.close(); // we are closed
|
translog.close(); // we are closed
|
||||||
config.setTranslogGeneration(translogGeneration);
|
try (Translog tlog = new Translog(config, translogGeneration)) {
|
||||||
try (Translog tlog = new Translog(config)) {
|
|
||||||
assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration());
|
assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration());
|
||||||
assertFalse(tlog.syncNeeded());
|
assertFalse(tlog.syncNeeded());
|
||||||
|
|
||||||
|
@ -1393,7 +1388,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
Path tempDir = createTempDir();
|
Path tempDir = createTempDir();
|
||||||
final FailSwitch fail = new FailSwitch();
|
final FailSwitch fail = new FailSwitch();
|
||||||
TranslogConfig config = getTranslogConfig(tempDir);
|
TranslogConfig config = getTranslogConfig(tempDir);
|
||||||
Translog translog = getFailableTranslog(fail, config, false, true);
|
Translog translog = getFailableTranslog(fail, config, false, true, null);
|
||||||
LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly
|
LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly
|
||||||
translog.add(new Translog.Index("test", "1", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
|
translog.add(new Translog.Index("test", "1", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
|
||||||
fail.failAlways();
|
fail.failAlways();
|
||||||
|
@ -1485,8 +1480,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
iterator.remove();
|
iterator.remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
config.setTranslogGeneration(translog.getGeneration());
|
try (Translog tlog = new Translog(config, translog.getGeneration())) {
|
||||||
try (Translog tlog = new Translog(config)) {
|
|
||||||
Translog.Snapshot snapshot = tlog.newSnapshot();
|
Translog.Snapshot snapshot = tlog.newSnapshot();
|
||||||
if (writtenOperations.size() != snapshot.totalOperations()) {
|
if (writtenOperations.size() != snapshot.totalOperations()) {
|
||||||
for (int i = 0; i < threadCount; i++) {
|
for (int i = 0; i < threadCount; i++) {
|
||||||
|
@ -1507,7 +1501,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private Translog getFailableTranslog(FailSwitch fail, final TranslogConfig config) throws IOException {
|
private Translog getFailableTranslog(FailSwitch fail, final TranslogConfig config) throws IOException {
|
||||||
return getFailableTranslog(fail, config, randomBoolean(), false);
|
return getFailableTranslog(fail, config, randomBoolean(), false, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class FailSwitch {
|
private static class FailSwitch {
|
||||||
|
@ -1540,8 +1534,8 @@ public class TranslogTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean paritalWrites, final boolean throwUnknownException) throws IOException {
|
private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean paritalWrites, final boolean throwUnknownException, Translog.TranslogGeneration generation) throws IOException {
|
||||||
return new Translog(config) {
|
return new Translog(config, generation) {
|
||||||
@Override
|
@Override
|
||||||
TranslogWriter.ChannelFactory getChannelFactory() {
|
TranslogWriter.ChannelFactory getChannelFactory() {
|
||||||
final TranslogWriter.ChannelFactory factory = super.getChannelFactory();
|
final TranslogWriter.ChannelFactory factory = super.getChannelFactory();
|
||||||
|
@ -1660,13 +1654,12 @@ public class TranslogTests extends ESTestCase {
|
||||||
public void testFailWhileCreateWriteWithRecoveredTLogs() throws IOException {
|
public void testFailWhileCreateWriteWithRecoveredTLogs() throws IOException {
|
||||||
Path tempDir = createTempDir();
|
Path tempDir = createTempDir();
|
||||||
TranslogConfig config = getTranslogConfig(tempDir);
|
TranslogConfig config = getTranslogConfig(tempDir);
|
||||||
Translog translog = new Translog(config);
|
Translog translog = new Translog(config, null);
|
||||||
translog.add(new Translog.Index("test", "boom", "boom".getBytes(Charset.forName("UTF-8"))));
|
translog.add(new Translog.Index("test", "boom", "boom".getBytes(Charset.forName("UTF-8"))));
|
||||||
Translog.TranslogGeneration generation = translog.getGeneration();
|
Translog.TranslogGeneration generation = translog.getGeneration();
|
||||||
translog.close();
|
translog.close();
|
||||||
config.setTranslogGeneration(generation);
|
|
||||||
try {
|
try {
|
||||||
new Translog(config) {
|
new Translog(config, generation) {
|
||||||
@Override
|
@Override
|
||||||
protected TranslogWriter createWriter(long fileGeneration) throws IOException {
|
protected TranslogWriter createWriter(long fileGeneration) throws IOException {
|
||||||
throw new MockDirectoryWrapper.FakeIOException();
|
throw new MockDirectoryWrapper.FakeIOException();
|
||||||
|
@ -1689,8 +1682,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
Checkpoint read = Checkpoint.read(ckp);
|
Checkpoint read = Checkpoint.read(ckp);
|
||||||
Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)));
|
Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)));
|
||||||
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
|
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
|
||||||
config.setTranslogGeneration(translogGeneration);
|
try (Translog tlog = new Translog(config, translogGeneration)) {
|
||||||
try (Translog tlog = new Translog(config)) {
|
|
||||||
assertNotNull(translogGeneration);
|
assertNotNull(translogGeneration);
|
||||||
assertFalse(tlog.syncNeeded());
|
assertFalse(tlog.syncNeeded());
|
||||||
Translog.Snapshot snapshot = tlog.newSnapshot();
|
Translog.Snapshot snapshot = tlog.newSnapshot();
|
||||||
|
@ -1701,7 +1693,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
|
tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
|
||||||
}
|
}
|
||||||
try (Translog tlog = new Translog(config)) {
|
try (Translog tlog = new Translog(config, translogGeneration)) {
|
||||||
assertNotNull(translogGeneration);
|
assertNotNull(translogGeneration);
|
||||||
assertFalse(tlog.syncNeeded());
|
assertFalse(tlog.syncNeeded());
|
||||||
Translog.Snapshot snapshot = tlog.newSnapshot();
|
Translog.Snapshot snapshot = tlog.newSnapshot();
|
||||||
|
@ -1722,10 +1714,9 @@ public class TranslogTests extends ESTestCase {
|
||||||
Checkpoint read = Checkpoint.read(ckp);
|
Checkpoint read = Checkpoint.read(ckp);
|
||||||
// don't copy the new file
|
// don't copy the new file
|
||||||
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
|
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
|
||||||
config.setTranslogGeneration(translogGeneration);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Translog tlog = new Translog(config);
|
Translog tlog = new Translog(config, translogGeneration);
|
||||||
fail("file already exists?");
|
fail("file already exists?");
|
||||||
} catch (TranslogException ex) {
|
} catch (TranslogException ex) {
|
||||||
// all is well
|
// all is well
|
||||||
|
@ -1746,8 +1737,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
|
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
|
||||||
// we add N+1 and N+2 to ensure we only delete the N+1 file and never jump ahead and wipe without the right condition
|
// we add N+1 and N+2 to ensure we only delete the N+1 file and never jump ahead and wipe without the right condition
|
||||||
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 2) + ".tlog"));
|
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 2) + ".tlog"));
|
||||||
config.setTranslogGeneration(translogGeneration);
|
try (Translog tlog = new Translog(config, translogGeneration)) {
|
||||||
try (Translog tlog = new Translog(config)) {
|
|
||||||
assertNotNull(translogGeneration);
|
assertNotNull(translogGeneration);
|
||||||
assertFalse(tlog.syncNeeded());
|
assertFalse(tlog.syncNeeded());
|
||||||
Translog.Snapshot snapshot = tlog.newSnapshot();
|
Translog.Snapshot snapshot = tlog.newSnapshot();
|
||||||
|
@ -1760,7 +1750,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Translog tlog = new Translog(config);
|
Translog tlog = new Translog(config, translogGeneration);
|
||||||
fail("file already exists?");
|
fail("file already exists?");
|
||||||
} catch (TranslogException ex) {
|
} catch (TranslogException ex) {
|
||||||
// all is well
|
// all is well
|
||||||
|
@ -1787,8 +1777,9 @@ public class TranslogTests extends ESTestCase {
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
fail.onceFailedFailAlways();
|
fail.onceFailedFailAlways();
|
||||||
}
|
}
|
||||||
|
Translog.TranslogGeneration generation = null;
|
||||||
try {
|
try {
|
||||||
final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false);
|
final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false, generation);
|
||||||
try {
|
try {
|
||||||
LineFileDocs lineFileDocs = new LineFileDocs(random()); //writes pretty big docs so we cross buffer boarders regularly
|
LineFileDocs lineFileDocs = new LineFileDocs(random()); //writes pretty big docs so we cross buffer boarders regularly
|
||||||
for (int opsAdded = 0; opsAdded < numOps; opsAdded++) {
|
for (int opsAdded = 0; opsAdded < numOps; opsAdded++) {
|
||||||
|
@ -1822,7 +1813,7 @@ public class TranslogTests extends ESTestCase {
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
assertEquals(ex.getMessage(), "__FAKE__ no space left on device");
|
assertEquals(ex.getMessage(), "__FAKE__ no space left on device");
|
||||||
} finally {
|
} finally {
|
||||||
config.setTranslogGeneration(failableTLog.getGeneration());
|
generation = failableTLog.getGeneration();
|
||||||
IOUtils.closeWhileHandlingException(failableTLog);
|
IOUtils.closeWhileHandlingException(failableTLog);
|
||||||
}
|
}
|
||||||
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
|
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
|
||||||
|
@ -1831,13 +1822,13 @@ public class TranslogTests extends ESTestCase {
|
||||||
// now randomly open this failing tlog again just to make sure we can also recover from failing during recovery
|
// now randomly open this failing tlog again just to make sure we can also recover from failing during recovery
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
try {
|
try {
|
||||||
IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false));
|
IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false, generation));
|
||||||
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
|
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
|
||||||
// failed - that's ok, we didn't even create it
|
// failed - that's ok, we didn't even create it
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
try (Translog translog = new Translog(config)) {
|
try (Translog translog = new Translog(config, generation)) {
|
||||||
Translog.Snapshot snapshot = translog.newSnapshot();
|
Translog.Snapshot snapshot = translog.newSnapshot();
|
||||||
assertEquals(syncedDocs.size(), snapshot.totalOperations());
|
assertEquals(syncedDocs.size(), snapshot.totalOperations());
|
||||||
for (int i = 0; i < syncedDocs.size(); i++) {
|
for (int i = 0; i < syncedDocs.size(); i++) {
|
||||||
|
|
Loading…
Reference in New Issue