Merge pull request #15788 from s1monw/dont_delete_tlog_file
Never delete translog-N.tlog file when creation fails
This commit is contained in:
commit
8a90c8085d
|
@ -163,6 +163,21 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
try {
|
||||
if (translogGeneration != null) {
|
||||
final Checkpoint checkpoint = readCheckpoint();
|
||||
final Path nextTranslogFile = location.resolve(getFilename(checkpoint.generation + 1));
|
||||
final Path currentCheckpointFile = location.resolve(getCommitCheckpointFileName(checkpoint.generation));
|
||||
// this is special handling for error condition when we create a new writer but we fail to bake
|
||||
// the newly written file (generation+1) into the checkpoint. This is still a valid state
|
||||
// we just need to cleanup before we continue
|
||||
// we hit this before and then blindly deleted the new generation even though we managed to bake it in and then hit this:
|
||||
// https://discuss.elastic.co/t/cannot-recover-index-because-of-missing-tanslog-files/38336 as an example
|
||||
//
|
||||
// For this to happen we must have already copied the translog.ckp file into translog-gen.ckp so we first check if that file exists
|
||||
// if not we don't even try to clean it up and wait until we fail creating it
|
||||
assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogWriter.getHeaderLength(translogUUID) : "unexpected translog file: [" + nextTranslogFile + "]";
|
||||
if (Files.exists(currentCheckpointFile) // current checkpoint is already copied
|
||||
&& Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning
|
||||
logger.warn("deleted previously created, but not yet committed, next generation [{}]. This can happen due to a tragic exception when creating a new generation", nextTranslogFile.getFileName());
|
||||
}
|
||||
this.recoveredTranslogs = recoverFromFiles(translogGeneration, checkpoint);
|
||||
if (recoveredTranslogs.isEmpty()) {
|
||||
throw new IllegalStateException("at least one reader must be recovered");
|
||||
|
|
|
@ -69,9 +69,17 @@ public class TranslogWriter extends TranslogReader {
|
|||
totalOffset = lastSyncedOffset;
|
||||
}
|
||||
|
||||
static int getHeaderLength(String translogUUID) {
|
||||
return getHeaderLength(new BytesRef(translogUUID).length);
|
||||
}
|
||||
|
||||
private static int getHeaderLength(int uuidLength) {
|
||||
return CodecUtil.headerLength(TRANSLOG_CODEC) + uuidLength + RamUsageEstimator.NUM_BYTES_INT;
|
||||
}
|
||||
|
||||
public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback<ChannelReference> onClose, ChannelFactory channelFactory, ByteSizeValue bufferSize) throws IOException {
|
||||
final BytesRef ref = new BytesRef(translogUUID);
|
||||
final int headerLength = CodecUtil.headerLength(TRANSLOG_CODEC) + ref.length + RamUsageEstimator.NUM_BYTES_INT;
|
||||
final int headerLength = getHeaderLength(ref.length);
|
||||
final FileChannel channel = channelFactory.open(file);
|
||||
try {
|
||||
// This OutputStreamDataOutput is intentionally not closed because
|
||||
|
@ -80,17 +88,14 @@ public class TranslogWriter extends TranslogReader {
|
|||
CodecUtil.writeHeader(out, TRANSLOG_CODEC, VERSION);
|
||||
out.writeInt(ref.length);
|
||||
out.writeBytes(ref.bytes, ref.offset, ref.length);
|
||||
channel.force(false);
|
||||
channel.force(true);
|
||||
writeCheckpoint(headerLength, 0, file.getParent(), fileGeneration, StandardOpenOption.WRITE);
|
||||
final TranslogWriter writer = new TranslogWriter(shardId, fileGeneration, new ChannelReference(file, fileGeneration, channel, onClose), bufferSize);
|
||||
return writer;
|
||||
} catch (Throwable throwable){
|
||||
// if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
|
||||
// file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition
|
||||
IOUtils.closeWhileHandlingException(channel);
|
||||
try {
|
||||
Files.delete(file); // remove the file as well
|
||||
} catch (IOException ex) {
|
||||
throwable.addSuppressed(ex);
|
||||
}
|
||||
throw throwable;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,6 +55,7 @@ import java.io.IOException;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.file.FileAlreadyExistsException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.InvalidPathException;
|
||||
import java.nio.file.Path;
|
||||
|
@ -136,8 +137,8 @@ public class TranslogTests extends ESTestCase {
|
|||
|
||||
private TranslogConfig getTranslogConfig(Path path) {
|
||||
Settings build = Settings.settingsBuilder()
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
|
||||
.build();
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
|
||||
.build();
|
||||
ByteSizeValue bufferSize = randomBoolean() ? TranslogConfig.DEFAULT_BUFFER_SIZE : new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES);
|
||||
return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), BigArrays.NON_RECYCLING_INSTANCE, bufferSize);
|
||||
}
|
||||
|
@ -335,9 +336,9 @@ public class TranslogTests extends ESTestCase {
|
|||
assertEquals(6, copy.estimatedNumberOfOperations());
|
||||
assertEquals(431, copy.getTranslogSizeInBytes());
|
||||
assertEquals("\"translog\"{\n" +
|
||||
" \"operations\" : 6,\n" +
|
||||
" \"size_in_bytes\" : 431\n" +
|
||||
"}", copy.toString().trim());
|
||||
" \"operations\" : 6,\n" +
|
||||
" \"size_in_bytes\" : 431\n" +
|
||||
"}", copy.toString().trim());
|
||||
|
||||
try {
|
||||
new TranslogStats(1, -1);
|
||||
|
@ -634,7 +635,9 @@ public class TranslogTests extends ESTestCase {
|
|||
assertFileIsPresent(translog, 1);
|
||||
}
|
||||
|
||||
/** Tests that concurrent readers and writes maintain view and snapshot semantics */
|
||||
/**
|
||||
* Tests that concurrent readers and writes maintain view and snapshot semantics
|
||||
*/
|
||||
public void testConcurrentWriteViewsAndSnapshot() throws Throwable {
|
||||
final Thread[] writers = new Thread[randomIntBetween(1, 10)];
|
||||
final Thread[] readers = new Thread[randomIntBetween(1, 10)];
|
||||
|
@ -833,7 +836,7 @@ public class TranslogTests extends ESTestCase {
|
|||
int count = 0;
|
||||
for (int op = 0; op < translogOperations; op++) {
|
||||
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))));
|
||||
if (rarely() && translogOperations > op+1) {
|
||||
if (rarely() && translogOperations > op + 1) {
|
||||
translog.commit();
|
||||
}
|
||||
}
|
||||
|
@ -912,7 +915,7 @@ public class TranslogTests extends ESTestCase {
|
|||
final TranslogReader reader = randomBoolean() ? writer : translog.openReader(writer.path(), Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME)));
|
||||
for (int i = 0; i < numOps; i++) {
|
||||
ByteBuffer buffer = ByteBuffer.allocate(4);
|
||||
reader.readBytes(buffer, reader.getFirstOperationOffset() + 4*i);
|
||||
reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i);
|
||||
buffer.flip();
|
||||
final int value = buffer.getInt();
|
||||
assertEquals(i, value);
|
||||
|
@ -951,9 +954,9 @@ public class TranslogTests extends ESTestCase {
|
|||
for (int op = 0; op < translogOperations; op++) {
|
||||
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
||||
final boolean commit = commitOften ? frequently() : rarely();
|
||||
if (commit && op < translogOperations-1) {
|
||||
if (commit && op < translogOperations - 1) {
|
||||
translog.commit();
|
||||
minUncommittedOp = op+1;
|
||||
minUncommittedOp = op + 1;
|
||||
translogGeneration = translog.getGeneration();
|
||||
}
|
||||
}
|
||||
|
@ -987,7 +990,7 @@ public class TranslogTests extends ESTestCase {
|
|||
public void testRecoveryUncommitted() throws IOException {
|
||||
List<Translog.Location> locations = new ArrayList<>();
|
||||
int translogOperations = randomIntBetween(10, 100);
|
||||
final int prepareOp = randomIntBetween(0, translogOperations-1);
|
||||
final int prepareOp = randomIntBetween(0, translogOperations - 1);
|
||||
Translog.TranslogGeneration translogGeneration = null;
|
||||
final boolean sync = randomBoolean();
|
||||
for (int op = 0; op < translogOperations; op++) {
|
||||
|
@ -1040,7 +1043,7 @@ public class TranslogTests extends ESTestCase {
|
|||
public void testRecoveryUncommittedFileExists() throws IOException {
|
||||
List<Translog.Location> locations = new ArrayList<>();
|
||||
int translogOperations = randomIntBetween(10, 100);
|
||||
final int prepareOp = randomIntBetween(0, translogOperations-1);
|
||||
final int prepareOp = randomIntBetween(0, translogOperations - 1);
|
||||
Translog.TranslogGeneration translogGeneration = null;
|
||||
final boolean sync = randomBoolean();
|
||||
for (int op = 0; op < translogOperations; op++) {
|
||||
|
@ -1094,7 +1097,7 @@ public class TranslogTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testRecoveryUncommittedCorryptedCheckpoint() throws IOException {
|
||||
public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException {
|
||||
List<Translog.Location> locations = new ArrayList<>();
|
||||
int translogOperations = 100;
|
||||
final int prepareOp = 44;
|
||||
|
@ -1116,10 +1119,10 @@ public class TranslogTests extends ESTestCase {
|
|||
config.setTranslogGeneration(translogGeneration);
|
||||
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
|
||||
Checkpoint read = Checkpoint.read(ckp);
|
||||
Checkpoint corrupted = new Checkpoint(0,0,0);
|
||||
Checkpoint corrupted = new Checkpoint(0, 0, 0);
|
||||
Checkpoint.write(config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
|
||||
try (Translog translog = new Translog(config)) {
|
||||
fail("corrupted");
|
||||
fail("corrupted");
|
||||
} catch (IllegalStateException ex) {
|
||||
assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=2683, numOps=55, translogFileGeneration= 2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration= 0}");
|
||||
}
|
||||
|
@ -1157,7 +1160,7 @@ public class TranslogTests extends ESTestCase {
|
|||
List<Translog.Location> locations = new ArrayList<>();
|
||||
List<Translog.Location> locations2 = new ArrayList<>();
|
||||
int translogOperations = randomIntBetween(10, 100);
|
||||
try(Translog translog2 = create(createTempDir())) {
|
||||
try (Translog translog2 = create(createTempDir())) {
|
||||
for (int op = 0; op < translogOperations; op++) {
|
||||
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
||||
locations2.add(translog2.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
||||
|
@ -1196,7 +1199,7 @@ public class TranslogTests extends ESTestCase {
|
|||
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
|
||||
translog.close();
|
||||
|
||||
config.setTranslogGeneration(new Translog.TranslogGeneration(randomRealisticUnicodeOfCodepointLengthBetween(1, translogGeneration.translogUUID.length()),translogGeneration.translogFileGeneration));
|
||||
config.setTranslogGeneration(new Translog.TranslogGeneration(randomRealisticUnicodeOfCodepointLengthBetween(1, translogGeneration.translogUUID.length()), translogGeneration.translogFileGeneration));
|
||||
try {
|
||||
new Translog(config);
|
||||
fail("translog doesn't belong to this UUID");
|
||||
|
@ -1283,12 +1286,12 @@ public class TranslogTests extends ESTestCase {
|
|||
case CREATE:
|
||||
case INDEX:
|
||||
op = new Translog.Index("test", threadId + "_" + opCount,
|
||||
randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
|
||||
randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
|
||||
break;
|
||||
case DELETE:
|
||||
op = new Translog.Delete(new Term("_uid", threadId + "_" + opCount),
|
||||
1 + randomInt(100000),
|
||||
randomFrom(VersionType.values()));
|
||||
1 + randomInt(100000),
|
||||
randomFrom(VersionType.values()));
|
||||
break;
|
||||
default:
|
||||
throw new ElasticsearchException("not supported op type");
|
||||
|
@ -1307,7 +1310,8 @@ public class TranslogTests extends ESTestCase {
|
|||
return translog.add(op);
|
||||
}
|
||||
|
||||
protected void afterAdd() throws IOException {}
|
||||
protected void afterAdd() throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
public void testFailFlush() throws IOException {
|
||||
|
@ -1319,7 +1323,7 @@ public class TranslogTests extends ESTestCase {
|
|||
List<Translog.Location> locations = new ArrayList<>();
|
||||
int opsSynced = 0;
|
||||
boolean failed = false;
|
||||
while(failed == false) {
|
||||
while (failed == false) {
|
||||
try {
|
||||
locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
|
||||
translog.sync();
|
||||
|
@ -1331,7 +1335,7 @@ public class TranslogTests extends ESTestCase {
|
|||
failed = true;
|
||||
assertFalse(translog.isOpen());
|
||||
assertEquals("__FAKE__ no space left on device", ex.getMessage());
|
||||
}
|
||||
}
|
||||
fail.set(randomBoolean());
|
||||
}
|
||||
fail.set(false);
|
||||
|
@ -1370,7 +1374,7 @@ public class TranslogTests extends ESTestCase {
|
|||
assertFalse(translog.isOpen());
|
||||
translog.close(); // we are closed
|
||||
config.setTranslogGeneration(translogGeneration);
|
||||
try (Translog tlog = new Translog(config)){
|
||||
try (Translog tlog = new Translog(config)) {
|
||||
assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration());
|
||||
assertFalse(tlog.syncNeeded());
|
||||
|
||||
|
@ -1393,7 +1397,7 @@ public class TranslogTests extends ESTestCase {
|
|||
for (int opsAdded = 0; opsAdded < numOps; opsAdded++) {
|
||||
locations.add(translog.add(new Translog.Index("test", "" + opsAdded, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))));
|
||||
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
|
||||
assertEquals(opsAdded+1, snapshot.estimatedTotalOperations());
|
||||
assertEquals(opsAdded + 1, snapshot.estimatedTotalOperations());
|
||||
for (int i = 0; i < opsAdded; i++) {
|
||||
assertEquals("expected operation" + i + " to be in the current translog but wasn't", translog.currentFileGeneration(), locations.get(i).generation);
|
||||
Translog.Operation next = snapshot.next();
|
||||
|
@ -1407,7 +1411,7 @@ public class TranslogTests extends ESTestCase {
|
|||
Path tempDir = createTempDir();
|
||||
final AtomicBoolean fail = new AtomicBoolean();
|
||||
TranslogConfig config = getTranslogConfig(tempDir);
|
||||
assumeFalse("this won't work if we sync on any op",config.isSyncOnEachOperation());
|
||||
assumeFalse("this won't work if we sync on any op", config.isSyncOnEachOperation());
|
||||
Translog translog = getFailableTranslog(fail, config, false, true);
|
||||
LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly
|
||||
translog.add(new Translog.Index("test", "1", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
|
||||
|
@ -1427,7 +1431,7 @@ public class TranslogTests extends ESTestCase {
|
|||
assertTrue(ex.getCause() instanceof UnknownException);
|
||||
}
|
||||
assertFalse(translog.isOpen());
|
||||
assertTrue(translog.getTragicException() instanceof UnknownException);
|
||||
assertTrue(translog.getTragicException() instanceof UnknownException);
|
||||
}
|
||||
|
||||
public void testFatalIOExceptionsWhileWritingConcurrently() throws IOException, InterruptedException {
|
||||
|
@ -1520,6 +1524,7 @@ public class TranslogTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config) throws IOException {
|
||||
return getFailableTranslog(fail, config, randomBoolean(), false);
|
||||
}
|
||||
|
@ -1613,4 +1618,97 @@ public class TranslogTests extends ESTestCase {
|
|||
// all is well
|
||||
}
|
||||
}
|
||||
|
||||
public void testRecoverWithUnbackedNextGen() throws IOException {
|
||||
translog.add(new Translog.Index("test", "" + 0, Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
|
||||
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
|
||||
translog.close();
|
||||
TranslogConfig config = translog.getConfig();
|
||||
|
||||
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
|
||||
Checkpoint read = Checkpoint.read(ckp);
|
||||
Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)));
|
||||
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
|
||||
config.setTranslogGeneration(translogGeneration);
|
||||
try (Translog tlog = new Translog(config)) {
|
||||
assertNotNull(translogGeneration);
|
||||
assertFalse(tlog.syncNeeded());
|
||||
try (Translog.Snapshot snapshot = tlog.newSnapshot()) {
|
||||
for (int i = 0; i < 1; i++) {
|
||||
Translog.Operation next = snapshot.next();
|
||||
assertNotNull("operation " + i + " must be non-null", next);
|
||||
assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8()));
|
||||
}
|
||||
}
|
||||
tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
|
||||
}
|
||||
try (Translog tlog = new Translog(config)) {
|
||||
assertNotNull(translogGeneration);
|
||||
assertFalse(tlog.syncNeeded());
|
||||
try (Translog.Snapshot snapshot = tlog.newSnapshot()) {
|
||||
for (int i = 0; i < 2; i++) {
|
||||
Translog.Operation next = snapshot.next();
|
||||
assertNotNull("operation " + i + " must be non-null", next);
|
||||
assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testRecoverWithUnbackedNextGenInIllegalState() throws IOException {
|
||||
translog.add(new Translog.Index("test", "" + 0, Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
|
||||
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
|
||||
translog.close();
|
||||
TranslogConfig config = translog.getConfig();
|
||||
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
|
||||
Checkpoint read = Checkpoint.read(ckp);
|
||||
// don't copy the new file
|
||||
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
|
||||
config.setTranslogGeneration(translogGeneration);
|
||||
|
||||
try {
|
||||
Translog tlog = new Translog(config);
|
||||
fail("file already exists?");
|
||||
} catch (TranslogException ex) {
|
||||
// all is well
|
||||
assertEquals(ex.getMessage(), "failed to create new translog file");
|
||||
assertEquals(ex.getCause().getClass(), FileAlreadyExistsException.class);
|
||||
}
|
||||
}
|
||||
public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException {
|
||||
translog.add(new Translog.Index("test", "" + 0, Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
|
||||
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
|
||||
translog.close();
|
||||
TranslogConfig config = translog.getConfig();
|
||||
|
||||
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
|
||||
Checkpoint read = Checkpoint.read(ckp);
|
||||
Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)));
|
||||
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
|
||||
// we add N+1 and N+2 to ensure we only delete the N+1 file and never jump ahead and wipe without the right condition
|
||||
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 2) + ".tlog"));
|
||||
config.setTranslogGeneration(translogGeneration);
|
||||
try (Translog tlog = new Translog(config)) {
|
||||
assertNotNull(translogGeneration);
|
||||
assertFalse(tlog.syncNeeded());
|
||||
try (Translog.Snapshot snapshot = tlog.newSnapshot()) {
|
||||
for (int i = 0; i < 1; i++) {
|
||||
Translog.Operation next = snapshot.next();
|
||||
assertNotNull("operation " + i + " must be non-null", next);
|
||||
assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8()));
|
||||
}
|
||||
}
|
||||
tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
|
||||
}
|
||||
|
||||
try {
|
||||
Translog tlog = new Translog(config);
|
||||
fail("file already exists?");
|
||||
} catch (TranslogException ex) {
|
||||
// all is well
|
||||
assertEquals(ex.getMessage(), "failed to create new translog file");
|
||||
assertEquals(ex.getCause().getClass(), FileAlreadyExistsException.class);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue