apply review comments
This commit is contained in:
parent
91cfba3485
commit
08d7638ed1
|
@ -200,7 +200,7 @@ public final class EngineConfig {
|
|||
}
|
||||
|
||||
/** if true the engine will start even if the translog id in the commit point can not be found */
|
||||
public boolean forceNewTranlog() {
|
||||
public boolean forceNewTranslog() {
|
||||
return forceNewTranslog;
|
||||
}
|
||||
|
||||
|
|
|
@ -35,7 +35,6 @@ import org.apache.lucene.util.BytesRef;
|
|||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.lucene.util.InfoStream;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.routing.DjbHashFunction;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
|
@ -134,7 +133,7 @@ public class InternalEngine extends Engine {
|
|||
try {
|
||||
writer = createWriter();
|
||||
indexWriter = writer;
|
||||
translog = openTranslog(engineConfig, writer, skipInitialTranslogRecovery || engineConfig.forceNewTranlog());
|
||||
translog = openTranslog(engineConfig, writer, skipInitialTranslogRecovery || engineConfig.forceNewTranslog());
|
||||
translogGeneration = translog.getGeneration();
|
||||
assert translogGeneration != null;
|
||||
} catch (IOException | TranslogCorruptedException e) {
|
||||
|
@ -151,7 +150,7 @@ public class InternalEngine extends Engine {
|
|||
try {
|
||||
if (skipInitialTranslogRecovery) {
|
||||
// make sure we point at the latest translog from now on..
|
||||
commitIndexWriter(writer, translog.getGeneration());
|
||||
commitIndexWriter(writer, translog);
|
||||
} else {
|
||||
recoverFromTranslog(engineConfig, translogGeneration);
|
||||
}
|
||||
|
@ -174,22 +173,21 @@ public class InternalEngine extends Engine {
|
|||
|
||||
private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, boolean createNew) throws IOException {
|
||||
final Translog.TranslogGeneration generation = loadTranslogIdFromCommit(writer);
|
||||
Translog translog;
|
||||
TranslogConfig translogConfig = engineConfig.getTranslogConfig();
|
||||
if (createNew) {
|
||||
translog = new Translog(translogConfig);
|
||||
} else {
|
||||
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
|
||||
if (createNew == false) {
|
||||
translogConfig.setTranslogGeneration(generation);
|
||||
if (generation != null && generation.translogUUID == null) {
|
||||
// only upgrade on pre-2.0 indices...
|
||||
Translog.upgradeLegacyTranslog(logger, translogConfig);
|
||||
}
|
||||
translog = new Translog(translogConfig);
|
||||
}
|
||||
|
||||
final Translog translog = new Translog(translogConfig);
|
||||
if (generation == null) {
|
||||
if (createNew) {
|
||||
throw new IllegalStateException("no tranlog generation present in commit data but tranlog is expected to exists");
|
||||
}
|
||||
logger.debug("no translog ID present in the current generation - creating one");
|
||||
commitIndexWriter(writer, translog.getGeneration());
|
||||
commitIndexWriter(writer, translog);
|
||||
}
|
||||
return translog;
|
||||
}
|
||||
|
@ -244,7 +242,9 @@ public class InternalEngine extends Engine {
|
|||
assert commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY) == false : "legacy commit contains translog UUID";
|
||||
return new Translog.TranslogGeneration(null, Long.parseLong(commitUserData.get("translog_id")));
|
||||
} else if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY)) {
|
||||
assert commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY) : "commit doesn't contain translog UUID";
|
||||
if (commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY) == false) {
|
||||
throw new IllegalStateException("commit doesn't contain translog UUID");
|
||||
}
|
||||
final String translogUUID = commitUserData.get(Translog.TRANSLOG_UUID_KEY);
|
||||
final long translogGen = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY));
|
||||
return new Translog.TranslogGeneration(translogUUID, translogGen);
|
||||
|
@ -702,12 +702,10 @@ public class InternalEngine extends Engine {
|
|||
if (commitTranslog) {
|
||||
if (flushNeeded || force) {
|
||||
flushNeeded = false;
|
||||
final Translog.TranslogGeneration translogGeneration;
|
||||
try {
|
||||
translog.prepareCommit();
|
||||
translogGeneration = translog.getGeneration();
|
||||
logger.trace("starting commit for flush; commitTranslog=true");
|
||||
commitIndexWriter(indexWriter, translogGeneration);
|
||||
commitIndexWriter(indexWriter, translog);
|
||||
logger.trace("finished commit for flush");
|
||||
translog.commit();
|
||||
// we need to refresh in order to clear older version values
|
||||
|
@ -726,7 +724,7 @@ public class InternalEngine extends Engine {
|
|||
// other flushes use flushLock
|
||||
try {
|
||||
logger.trace("starting commit for flush; commitTranslog=false");
|
||||
commitIndexWriter(indexWriter, translog.getGeneration());
|
||||
commitIndexWriter(indexWriter, translog);
|
||||
logger.trace("finished commit for flush");
|
||||
} catch (Throwable e) {
|
||||
throw new FlushFailedEngineException(shardId, e);
|
||||
|
@ -1176,12 +1174,13 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
|
||||
|
||||
private void commitIndexWriter(IndexWriter writer, Translog.TranslogGeneration commit) throws IOException {
|
||||
private void commitIndexWriter(IndexWriter writer, Translog translog) throws IOException {
|
||||
try {
|
||||
logger.trace("committing writer with translog id [{}] ", commit.translogFileGeneration);
|
||||
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
|
||||
logger.trace("committing writer with translog id [{}] ", translogGeneration.translogFileGeneration);
|
||||
Map<String, String> commitData = new HashMap<>(2);
|
||||
commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(commit.translogFileGeneration));
|
||||
commitData.put(Translog.TRANSLOG_UUID_KEY, commit.translogUUID);
|
||||
commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration));
|
||||
commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID);
|
||||
indexWriter.setCommitData(commitData);
|
||||
writer.commit();
|
||||
} catch (Throwable ex) {
|
||||
|
|
|
@ -380,6 +380,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
if (currentCommittingTranslog != null) {
|
||||
int tops = currentCommittingTranslog.totalOperations();
|
||||
assert tops != TranslogReader.UNKNOWN_OP_COUNT;
|
||||
assert tops >= 0;
|
||||
ops += tops;
|
||||
}
|
||||
}
|
||||
|
@ -499,7 +500,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
* while receiving future ones as well
|
||||
*/
|
||||
public Translog.View newView() {
|
||||
// we need to acquire the read lock to make sure new translog is created
|
||||
// we need to acquire the read lock to make sure no new translog is created
|
||||
// and will be missed by the view we're making
|
||||
try (ReleasableLock lock = readLock.acquire()) {
|
||||
ArrayList<TranslogReader> translogs = new ArrayList<>();
|
||||
|
@ -571,7 +572,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
}
|
||||
}
|
||||
|
||||
private boolean isReferencedGeneration(long generation) { // pkg private for testing
|
||||
private boolean isReferencedGeneration(long generation) { // used to make decisions if a file can be deleted
|
||||
return generation >= lastCommittedTranslogFileGeneration;
|
||||
}
|
||||
|
||||
|
@ -662,6 +663,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
if (tops == TranslogReader.UNKNOWN_OP_COUNT) {
|
||||
return -1;
|
||||
}
|
||||
assert tops >= 0;
|
||||
ops += tops;
|
||||
}
|
||||
return ops;
|
||||
|
@ -812,7 +814,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
/**
|
||||
* Returns the next operation in the snapshot or <code>null</code> if we reached the end.
|
||||
*/
|
||||
public Translog.Operation next() throws IOException;
|
||||
Translog.Operation next() throws IOException;
|
||||
|
||||
}
|
||||
|
||||
|
@ -1653,8 +1655,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
if (currentCommittingTranslog != null) {
|
||||
throw new IllegalStateException("already committing a translog with generation: " + currentCommittingTranslog.getGeneration());
|
||||
}
|
||||
final TranslogWriter writer = current;
|
||||
writer.sync();
|
||||
final TranslogWriter oldCurrent = current;
|
||||
oldCurrent.sync();
|
||||
currentCommittingTranslog = current.immutableReader();
|
||||
Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
|
||||
assert Checkpoint.read(checkpoint).generation == currentCommittingTranslog.getGeneration();
|
||||
|
@ -1663,16 +1665,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
IOUtils.fsync(commitCheckpoint, false);
|
||||
IOUtils.fsync(commitCheckpoint.getParent(), true);
|
||||
// create a new translog file - this will sync it and update the checkpoint data;
|
||||
final TranslogWriter newFile = createWriter(current.getGeneration() + 1);
|
||||
current = newFile;
|
||||
current = createWriter(current.getGeneration() + 1);
|
||||
// notify all outstanding views of the new translog (no views are created now as
|
||||
// we hold a write lock).
|
||||
for (FsView view : outstandingViews) {
|
||||
view.onNewTranslog(currentCommittingTranslog.clone(), newFile.newReaderFromWriter());
|
||||
view.onNewTranslog(currentCommittingTranslog.clone(), current.newReaderFromWriter());
|
||||
}
|
||||
IOUtils.close(writer);
|
||||
IOUtils.close(oldCurrent);
|
||||
logger.trace("current translog set to [{}]", current.getGeneration());
|
||||
assert writer.syncNeeded() == false : "old translog writer must not need a sync";
|
||||
assert oldCurrent.syncNeeded() == false : "old translog oldCurrent must not need a sync";
|
||||
|
||||
} catch (Throwable t) {
|
||||
IOUtils.closeWhileHandlingException(this); // tragic event
|
||||
|
@ -1688,7 +1689,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
if (currentCommittingTranslog == null) {
|
||||
prepareCommit();
|
||||
}
|
||||
current.sync();
|
||||
lastCommittedTranslogFileGeneration = current.getGeneration(); // this is important - otherwise old files will not be cleaned up
|
||||
if (recoveredTranslogs.isEmpty() == false) {
|
||||
IOUtils.close(recoveredTranslogs);
|
||||
|
|
Loading…
Reference in New Issue