Small fixes:
- rename one method - move "generation == null" check under existing "if (createNew == false)" - fix typo/whitespace - add a TODO
This commit is contained in:
parent
50c771be29
commit
a42d92df21
|
@ -131,6 +131,8 @@ public class InternalEngine extends Engine {
|
||||||
this.searcherFactory = new SearchFactory(engineConfig);
|
this.searcherFactory = new SearchFactory(engineConfig);
|
||||||
final Translog.TranslogGeneration translogGeneration;
|
final Translog.TranslogGeneration translogGeneration;
|
||||||
try {
|
try {
|
||||||
|
// TODO: would be better if ES could tell us "from above" whether this shard was already here, instead of using Lucene's API
|
||||||
|
// (which relies on IO ops, directory listing, and has had scary bugs in the past):
|
||||||
boolean create = !Lucene.indexExists(store.directory());
|
boolean create = !Lucene.indexExists(store.directory());
|
||||||
writer = createWriter(create);
|
writer = createWriter(create);
|
||||||
indexWriter = writer;
|
indexWriter = writer;
|
||||||
|
@ -175,7 +177,12 @@ public class InternalEngine extends Engine {
|
||||||
private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, boolean createNew) throws IOException {
|
private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, boolean createNew) throws IOException {
|
||||||
final Translog.TranslogGeneration generation = loadTranslogIdFromCommit(writer);
|
final Translog.TranslogGeneration generation = loadTranslogIdFromCommit(writer);
|
||||||
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
|
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
|
||||||
|
|
||||||
if (createNew == false) {
|
if (createNew == false) {
|
||||||
|
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
|
||||||
|
if (generation == null) {
|
||||||
|
throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist");
|
||||||
|
}
|
||||||
translogConfig.setTranslogGeneration(generation);
|
translogConfig.setTranslogGeneration(generation);
|
||||||
if (generation != null && generation.translogUUID == null) {
|
if (generation != null && generation.translogUUID == null) {
|
||||||
// only upgrade on pre-2.0 indices...
|
// only upgrade on pre-2.0 indices...
|
||||||
|
@ -184,9 +191,6 @@ public class InternalEngine extends Engine {
|
||||||
}
|
}
|
||||||
final Translog translog = new Translog(translogConfig);
|
final Translog translog = new Translog(translogConfig);
|
||||||
if (generation == null) {
|
if (generation == null) {
|
||||||
if (createNew == false) {
|
|
||||||
throw new IllegalStateException("no tranlog generation present in commit data but translog is expected to exist");
|
|
||||||
}
|
|
||||||
logger.debug("no translog ID present in the current generation - creating one");
|
logger.debug("no translog ID present in the current generation - creating one");
|
||||||
commitIndexWriter(writer, translog);
|
commitIndexWriter(writer, translog);
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
* specific language governing permissions and limitations
|
* specific language governing permissions and limitations
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.index.engine;
|
package org.elasticsearch.index.engine;
|
||||||
|
|
||||||
public class InternalEngineFactory implements EngineFactory {
|
public class InternalEngineFactory implements EngineFactory {
|
||||||
|
|
|
@ -251,12 +251,10 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
Checkpoint.write(translogPath.resolve(CHECKPOINT_FILE_NAME), checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
|
Checkpoint.write(translogPath.resolve(CHECKPOINT_FILE_NAME), checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
|
||||||
} else {
|
} else {
|
||||||
Checkpoint checkpoint = new Checkpoint(Files.size(target), -1, generation);
|
Checkpoint checkpoint = new Checkpoint(Files.size(target), -1, generation);
|
||||||
Checkpoint.write(translogPath.resolve(getCommitFileName(generation)), checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
|
Checkpoint.write(translogPath.resolve(getCommitCheckpointFileName(generation)), checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
IOUtils.fsync(translogPath, true);
|
IOUtils.fsync(translogPath, true);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -275,12 +273,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
if (Files.exists(committedTranslogFile) == false) {
|
if (Files.exists(committedTranslogFile) == false) {
|
||||||
throw new IllegalStateException("translog file doesn't exist with generation: " + i + " lastCommitted: " + lastCommittedTranslogFileGeneration + " checkpoint: " + checkpoint.generation + " - translog ids must be consecutive");
|
throw new IllegalStateException("translog file doesn't exist with generation: " + i + " lastCommitted: " + lastCommittedTranslogFileGeneration + " checkpoint: " + checkpoint.generation + " - translog ids must be consecutive");
|
||||||
}
|
}
|
||||||
final ImmutableTranslogReader reader = openReader(committedTranslogFile, Checkpoint.read(location.resolve(getCommitFileName(i))));
|
final ImmutableTranslogReader reader = openReader(committedTranslogFile, Checkpoint.read(location.resolve(getCommitCheckpointFileName(i))));
|
||||||
foundTranslogs.add(reader);
|
foundTranslogs.add(reader);
|
||||||
logger.debug("recovered local translog from checkpoint {}", checkpoint);
|
logger.debug("recovered local translog from checkpoint {}", checkpoint);
|
||||||
}
|
}
|
||||||
foundTranslogs.add(openReader(location.resolve(checkpointTranslogFile), checkpoint));
|
foundTranslogs.add(openReader(location.resolve(checkpointTranslogFile), checkpoint));
|
||||||
Path commitCheckpoint = location.resolve(getCommitFileName(checkpoint.generation));
|
Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(checkpoint.generation));
|
||||||
Files.copy(location.resolve(CHECKPOINT_FILE_NAME), commitCheckpoint);
|
Files.copy(location.resolve(CHECKPOINT_FILE_NAME), commitCheckpoint);
|
||||||
IOUtils.fsync(commitCheckpoint, false);
|
IOUtils.fsync(commitCheckpoint, false);
|
||||||
IOUtils.fsync(commitCheckpoint.getParent(), true);
|
IOUtils.fsync(commitCheckpoint.getParent(), true);
|
||||||
|
@ -544,7 +542,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
return TRANSLOG_FILE_PREFIX + generation + TRANSLOG_FILE_SUFFIX;
|
return TRANSLOG_FILE_PREFIX + generation + TRANSLOG_FILE_SUFFIX;
|
||||||
}
|
}
|
||||||
|
|
||||||
static String getCommitFileName(long generation) {
|
static String getCommitCheckpointFileName(long generation) {
|
||||||
return TRANSLOG_FILE_PREFIX + generation + CHECKPOINT_SUFFIX;
|
return TRANSLOG_FILE_PREFIX + generation + CHECKPOINT_SUFFIX;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -591,7 +589,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
// if the given translogPath is not the current we can safely delete the file since all references are released
|
// if the given translogPath is not the current we can safely delete the file since all references are released
|
||||||
logger.trace("delete translog file - not referenced and not current anymore {}", translogPath);
|
logger.trace("delete translog file - not referenced and not current anymore {}", translogPath);
|
||||||
IOUtils.deleteFilesIgnoringExceptions(translogPath);
|
IOUtils.deleteFilesIgnoringExceptions(translogPath);
|
||||||
IOUtils.deleteFilesIgnoringExceptions(translogPath.resolveSibling(getCommitFileName(channelReference.getGeneration())));
|
IOUtils.deleteFilesIgnoringExceptions(translogPath.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration())));
|
||||||
|
|
||||||
}
|
}
|
||||||
try (DirectoryStream<Path> stream = Files.newDirectoryStream(location)) {
|
try (DirectoryStream<Path> stream = Files.newDirectoryStream(location)) {
|
||||||
|
@ -602,7 +600,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
if (isReferencedGeneration(generation) == false) {
|
if (isReferencedGeneration(generation) == false) {
|
||||||
logger.trace("delete translog file - not referenced and not current anymore {}", path);
|
logger.trace("delete translog file - not referenced and not current anymore {}", path);
|
||||||
IOUtils.deleteFilesIgnoringExceptions(path);
|
IOUtils.deleteFilesIgnoringExceptions(path);
|
||||||
IOUtils.deleteFilesIgnoringExceptions(path.resolveSibling(getCommitFileName(channelReference.getGeneration())));
|
IOUtils.deleteFilesIgnoringExceptions(path.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration())));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1660,7 +1658,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
currentCommittingTranslog = current.immutableReader();
|
currentCommittingTranslog = current.immutableReader();
|
||||||
Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
|
Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
|
||||||
assert Checkpoint.read(checkpoint).generation == currentCommittingTranslog.getGeneration();
|
assert Checkpoint.read(checkpoint).generation == currentCommittingTranslog.getGeneration();
|
||||||
Path commitCheckpoint = location.resolve(getCommitFileName(currentCommittingTranslog.getGeneration()));
|
Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(currentCommittingTranslog.getGeneration()));
|
||||||
Files.copy(checkpoint, commitCheckpoint);
|
Files.copy(checkpoint, commitCheckpoint);
|
||||||
IOUtils.fsync(commitCheckpoint, false);
|
IOUtils.fsync(commitCheckpoint, false);
|
||||||
IOUtils.fsync(commitCheckpoint.getParent(), true);
|
IOUtils.fsync(commitCheckpoint.getParent(), true);
|
||||||
|
|
Loading…
Reference in New Issue