Add global checkpoint to translog checkpoints

This commit adds the sequence number global checkpoint to translog
checkpoints, and removes them from Lucene commits.

Relates #21254
This commit is contained in:
Jason Tedor 2016-11-08 10:09:06 -05:00 committed by GitHub
parent eaa105951f
commit 9ceb0f2cb4
14 changed files with 393 additions and 153 deletions

View File

@ -46,6 +46,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.lucene.LoggerInfoStream; import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
@ -83,6 +84,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.LongSupplier;
public class InternalEngine extends Engine { public class InternalEngine extends Engine {
@ -116,7 +118,6 @@ public class InternalEngine extends Engine {
private final SequenceNumbersService seqNoService; private final SequenceNumbersService seqNoService;
static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint"; static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
static final String GLOBAL_CHECKPOINT_KEY = "global_checkpoint";
static final String MAX_SEQ_NO = "max_seq_no"; static final String MAX_SEQ_NO = "max_seq_no";
// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges // How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
@ -153,7 +154,7 @@ public class InternalEngine extends Engine {
this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig); this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig);
try { try {
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG); writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
final SeqNoStats seqNoStats = loadSeqNoStatsFromCommit(writer); final SeqNoStats seqNoStats = loadSeqNoStats(engineConfig, writer);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace( logger.trace(
"recovering max sequence number: [{}], local checkpoint: [{}], global checkpoint: [{}]", "recovering max sequence number: [{}], local checkpoint: [{}], global checkpoint: [{}]",
@ -169,7 +170,7 @@ public class InternalEngine extends Engine {
seqNoStats.getLocalCheckpoint(), seqNoStats.getLocalCheckpoint(),
seqNoStats.getGlobalCheckpoint()); seqNoStats.getGlobalCheckpoint());
indexWriter = writer; indexWriter = writer;
translog = openTranslog(engineConfig, writer); translog = openTranslog(engineConfig, writer, seqNoService::getGlobalCheckpoint);
assert translog.getGeneration() != null; assert translog.getGeneration() != null;
} catch (IOException | TranslogCorruptedException e) { } catch (IOException | TranslogCorruptedException e) {
throw new EngineCreationFailureException(shardId, "failed to create engine", e); throw new EngineCreationFailureException(shardId, "failed to create engine", e);
@ -257,7 +258,8 @@ public class InternalEngine extends Engine {
} }
} }
private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) throws IOException { private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, LongSupplier globalCheckpointSupplier) throws IOException {
assert openMode != null;
final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
Translog.TranslogGeneration generation = null; Translog.TranslogGeneration generation = null;
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
@ -266,11 +268,11 @@ public class InternalEngine extends Engine {
if (generation == null) { if (generation == null) {
throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist"); throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist");
} }
if (generation != null && generation.translogUUID == null) { if (generation.translogUUID == null) {
throw new IndexFormatTooOldException("trasnlog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first"); throw new IndexFormatTooOldException("trasnlog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
} }
} }
final Translog translog = new Translog(translogConfig, generation); final Translog translog = new Translog(translogConfig, generation, globalCheckpointSupplier);
if (generation == null || generation.translogUUID == null) { if (generation == null || generation.translogUUID == null) {
assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be " assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be "
+ EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; + EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
@ -322,21 +324,37 @@ public class InternalEngine extends Engine {
return null; return null;
} }
private SeqNoStats loadSeqNoStatsFromCommit(IndexWriter writer) throws IOException { /**
* Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and the Translog
* checkpoint (global checkpoint).
*
* @param engineConfig the engine configuration (for the open mode and the translog path)
* @param writer the index writer (for the Lucene commit point)
* @return the sequence number stats
* @throws IOException if an I/O exception occurred reading the Lucene commit point or the translog checkpoint
*/
private static SeqNoStats loadSeqNoStats(final EngineConfig engineConfig, final IndexWriter writer) throws IOException {
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) { for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
final String key = entry.getKey(); final String key = entry.getKey();
if (key.equals(LOCAL_CHECKPOINT_KEY)) { if (key.equals(LOCAL_CHECKPOINT_KEY)) {
assert localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED;
localCheckpoint = Long.parseLong(entry.getValue()); localCheckpoint = Long.parseLong(entry.getValue());
} else if (key.equals(GLOBAL_CHECKPOINT_KEY)) {
globalCheckpoint = Long.parseLong(entry.getValue());
} else if (key.equals(MAX_SEQ_NO)) { } else if (key.equals(MAX_SEQ_NO)) {
assert maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : localCheckpoint;
maxSeqNo = Long.parseLong(entry.getValue()); maxSeqNo = Long.parseLong(entry.getValue());
} }
} }
// nocommit: reading this should be part of recovery from the translog
final long globalCheckpoint;
if (engineConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
} else {
globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint); return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
} }
@ -1312,25 +1330,21 @@ public class InternalEngine extends Engine {
final String translogFileGen = Long.toString(translogGeneration.translogFileGeneration); final String translogFileGen = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID; final String translogUUID = translogGeneration.translogUUID;
final String localCheckpoint = Long.toString(seqNoService().getLocalCheckpoint()); final String localCheckpoint = Long.toString(seqNoService().getLocalCheckpoint());
final String globalCheckpoint = Long.toString(seqNoService().getGlobalCheckpoint());
writer.setLiveCommitData(() -> { writer.setLiveCommitData(() -> {
/** /*
* The user data captured above (e.g. local/global checkpoints) contains data that must be evaluated * The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes
* *before* Lucene flushes segments, including the local and global checkpoints amongst other values. * segments, including the local checkpoint amongst other values. The maximum sequence number is different - we never want
* The maximum sequence number is different - we never want the maximum sequence number to be * the maximum sequence number to be less than the last sequence number to go into a Lucene commit, otherwise we run the
* less than the last sequence number to go into a Lucene commit, otherwise we run the risk * risk of re-using a sequence number for two different documents when restoring from this commit point and subsequently
* of re-using a sequence number for two different documents when restoring from this commit * writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the
* point and subsequently writing new documents to the index. Since we only know which Lucene * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the max_seq_no to the time of invocation
* documents made it into the final commit after the {@link IndexWriter#commit()} call flushes * of the commit data iterator (which occurs after all documents have been flushed to Lucene).
* all documents, we defer computation of the max_seq_no to the time of invocation of the commit
* data iterator (which occurs after all documents have been flushed to Lucene).
*/ */
final Map<String, String> commitData = new HashMap<>(6); final Map<String, String> commitData = new HashMap<>(6);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen); commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID); commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(LOCAL_CHECKPOINT_KEY, localCheckpoint); commitData.put(LOCAL_CHECKPOINT_KEY, localCheckpoint);
commitData.put(GLOBAL_CHECKPOINT_KEY, globalCheckpoint);
if (syncId != null) { if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId); commitData.put(Engine.SYNC_COMMIT_ID, syncId);
} }

View File

@ -41,6 +41,8 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;
public class GlobalCheckpointSyncAction extends TransportReplicationAction<GlobalCheckpointSyncAction.PrimaryRequest, public class GlobalCheckpointSyncAction extends TransportReplicationAction<GlobalCheckpointSyncAction.PrimaryRequest,
GlobalCheckpointSyncAction.ReplicaRequest, ReplicationResponse> { GlobalCheckpointSyncAction.ReplicaRequest, ReplicationResponse> {
@ -64,10 +66,11 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<Globa
} }
@Override @Override
protected PrimaryResult shardOperationOnPrimary(PrimaryRequest request) { protected PrimaryResult shardOperationOnPrimary(PrimaryRequest request) throws Exception {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().id()); IndexShard indexShard = indexService.getShard(request.shardId().id());
long checkpoint = indexShard.getGlobalCheckpoint(); long checkpoint = indexShard.getGlobalCheckpoint();
syncTranslog(indexShard);
return new PrimaryResult(new ReplicaRequest(request, checkpoint), new ReplicationResponse()); return new PrimaryResult(new ReplicaRequest(request, checkpoint), new ReplicationResponse());
} }
@ -76,9 +79,19 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<Globa
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().id()); IndexShard indexShard = indexService.getShard(request.shardId().id());
indexShard.updateGlobalCheckpointOnReplica(request.checkpoint); indexShard.updateGlobalCheckpointOnReplica(request.checkpoint);
syncTranslog(indexShard);
return new ReplicaResult(); return new ReplicaResult();
} }
private void syncTranslog(final IndexShard indexShard) {
try {
indexShard.getTranslog().sync();
} catch (final IOException e) {
// nocommit: no need to wrap this exception after integrating master into feature/seq_no
throw new UncheckedIOException("failed to sync translog after updating global checkpoint for shard " + indexShard.shardId(), e);
}
}
public void updateCheckpointForShard(ShardId shardId) { public void updateCheckpointForShard(ShardId shardId) {
execute(new PrimaryRequest(shardId), new ActionListener<ReplicationResponse>() { execute(new PrimaryRequest(shardId), new ActionListener<ReplicationResponse>() {
@Override @Override
@ -135,4 +148,5 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<Globa
return checkpoint; return checkpoint;
} }
} }
} }

View File

@ -16,6 +16,7 @@
* specific language governing permissions and limitations * specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package org.elasticsearch.index.seqno; package org.elasticsearch.index.seqno;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -88,4 +89,5 @@ public class SeqNoStats implements ToXContent, Writeable {
", globalCheckpoint=" + globalCheckpoint + ", globalCheckpoint=" + globalCheckpoint +
'}'; '}';
} }
} }

View File

@ -40,26 +40,16 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
final GlobalCheckpointService globalCheckpointService; final GlobalCheckpointService globalCheckpointService;
/** /**
* Initialize the sequence number service. The {@code maxSeqNo} * Initialize the sequence number service. The {@code maxSeqNo} should be set to the last sequence number assigned by this shard, or
* should be set to the last sequence number assigned by this * {@link SequenceNumbersService#NO_OPS_PERFORMED}, {@code localCheckpoint} should be set to the last known local checkpoint for this
* shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}, * shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}, and {@code globalCheckpoint} should be set to the last known global
* {@code localCheckpoint} should be set to the last known local * checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
* checkpoint for this shard, or
* {@link SequenceNumbersService#NO_OPS_PERFORMED}, and
* {@code globalCheckpoint} should be set to the last known global
* checkpoint for this shard, or
* {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}.
* *
* @param shardId the shard this service is providing tracking * @param shardId the shard this service is providing tracking local checkpoints for
* local checkpoints for
* @param indexSettings the index settings * @param indexSettings the index settings
* @param maxSeqNo the last sequence number assigned by this * @param maxSeqNo the last sequence number assigned by this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* shard, or * @param localCheckpoint the last known local checkpoint for this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* {@link SequenceNumbersService#NO_OPS_PERFORMED} * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
* @param localCheckpoint the last known local checkpoint for this shard,
* or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param globalCheckpoint the last known global checkpoint for this shard,
* or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
*/ */
public SequenceNumbersService( public SequenceNumbersService(
final ShardId shardId, final ShardId shardId,
@ -100,8 +90,7 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
* Gets sequence number related stats * Gets sequence number related stats
*/ */
public SeqNoStats stats() { public SeqNoStats stats() {
return new SeqNoStats(localCheckpointService.getMaxSeqNo(), localCheckpointService.getCheckpoint(), return new SeqNoStats(getMaxSeqNo(), getLocalCheckpoint(), getGlobalCheckpoint());
globalCheckpointService.getCheckpoint());
} }
/** /**
@ -130,6 +119,16 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
return globalCheckpointService.getCheckpoint(); return globalCheckpointService.getCheckpoint();
} }
/**
* Scans through the currently known local checkpoint and updates the global checkpoint accordingly.
*
* @return true if the checkpoint has been updated or if it can not be updated since one of the local checkpoints
* of one of the active allocations is not known.
*/
public boolean updateGlobalCheckpointOnPrimary() {
return globalCheckpointService.updateCheckpointOnPrimary();
}
/** /**
* updates the global checkpoint on a replica shard (after it has been updated by the primary). * updates the global checkpoint on a replica shard (after it has been updated by the primary).
*/ */
@ -148,13 +147,4 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
globalCheckpointService.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds); globalCheckpointService.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
} }
/**
* Scans through the currently known local checkpoint and updates the global checkpoint accordingly.
*
* @return true if the checkpoint has been updated or if it can not be updated since one of the local checkpoints
* of one of the active allocations is not known.
*/
public boolean updateGlobalCheckpointOnPrimary() {
return globalCheckpointService.updateCheckpointOnPrimary();
}
} }

View File

@ -27,6 +27,7 @@ import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.OutputStreamIndexOutput; import org.apache.lucene.store.OutputStreamIndexOutput;
import org.apache.lucene.store.SimpleFSDirectory; import org.apache.lucene.store.SimpleFSDirectory;
import org.elasticsearch.common.io.Channels; import org.elasticsearch.common.io.Channels;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
@ -41,41 +42,60 @@ class Checkpoint {
final long offset; final long offset;
final int numOps; final int numOps;
final long generation; final long generation;
final long globalCheckpoint;
private static final int INITIAL_VERSION = 1; // start with 1, just to recognize there was some magic serialization logic before private static final int INITIAL_VERSION = 1; // start with 1, just to recognize there was some magic serialization logic before
private static final int CURRENT_VERSION = 2; // introduction of global checkpoints
private static final String CHECKPOINT_CODEC = "ckp"; private static final String CHECKPOINT_CODEC = "ckp";
// size of 6.0.0 checkpoint
static final int FILE_SIZE = CodecUtil.headerLength(CHECKPOINT_CODEC) static final int FILE_SIZE = CodecUtil.headerLength(CHECKPOINT_CODEC)
+ Integer.BYTES // ops
+ Long.BYTES // offset
+ Long.BYTES // generation
+ Long.BYTES // global checkpoint, introduced in 6.0.0
+ CodecUtil.footerLength();
// size of 5.0.0 checkpoint
static final int V1_FILE_SIZE = CodecUtil.headerLength(CHECKPOINT_CODEC)
+ Integer.BYTES // ops + Integer.BYTES // ops
+ Long.BYTES // offset + Long.BYTES // offset
+ Long.BYTES // generation + Long.BYTES // generation
+ CodecUtil.footerLength(); + CodecUtil.footerLength();
// nocommit: remove legacy support, not needed in 6.0.0
static final int LEGACY_NON_CHECKSUMMED_FILE_LENGTH = Integer.BYTES // ops static final int LEGACY_NON_CHECKSUMMED_FILE_LENGTH = Integer.BYTES // ops
+ Long.BYTES // offset + Long.BYTES // offset
+ Long.BYTES; // generation + Long.BYTES; // generation
Checkpoint(long offset, int numOps, long generation) { Checkpoint(long offset, int numOps, long generation, long globalCheckpoint) {
this.offset = offset; this.offset = offset;
this.numOps = numOps; this.numOps = numOps;
this.generation = generation; this.generation = generation;
this.globalCheckpoint = globalCheckpoint;
} }
private void write(DataOutput out) throws IOException { private void write(DataOutput out) throws IOException {
out.writeLong(offset); out.writeLong(offset);
out.writeInt(numOps); out.writeInt(numOps);
out.writeLong(generation); out.writeLong(generation);
out.writeLong(globalCheckpoint);
}
static Checkpoint readChecksummedV2(DataInput in) throws IOException {
return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), in.readLong());
} }
// reads a checksummed checkpoint introduced in ES 5.0.0 // reads a checksummed checkpoint introduced in ES 5.0.0
static Checkpoint readChecksummedV1(DataInput in) throws IOException { static Checkpoint readChecksummedV1(DataInput in) throws IOException {
return new Checkpoint(in.readLong(), in.readInt(), in.readLong()); return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), SequenceNumbersService.UNASSIGNED_SEQ_NO);
} }
// nocommit: remove legacy support, not needed in 6.0.0
// reads checkpoint from ES < 5.0.0 // reads checkpoint from ES < 5.0.0
static Checkpoint readNonChecksummed(DataInput in) throws IOException { static Checkpoint readNonChecksummed(DataInput in) throws IOException {
return new Checkpoint(in.readLong(), in.readInt(), in.readLong()); return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), SequenceNumbersService.UNASSIGNED_SEQ_NO);
} }
@Override @Override
@ -84,6 +104,7 @@ class Checkpoint {
"offset=" + offset + "offset=" + offset +
", numOps=" + numOps + ", numOps=" + numOps +
", translogFileGeneration=" + generation + ", translogFileGeneration=" + generation +
", globalCheckpoint=" + globalCheckpoint +
'}'; '}';
} }
@ -93,11 +114,19 @@ class Checkpoint {
if (indexInput.length() == LEGACY_NON_CHECKSUMMED_FILE_LENGTH) { if (indexInput.length() == LEGACY_NON_CHECKSUMMED_FILE_LENGTH) {
// OLD unchecksummed file that was written < ES 5.0.0 // OLD unchecksummed file that was written < ES 5.0.0
return Checkpoint.readNonChecksummed(indexInput); return Checkpoint.readNonChecksummed(indexInput);
} } else {
// We checksum the entire file before we even go and parse it. If it's corrupted we barf right here. // We checksum the entire file before we even go and parse it. If it's corrupted we barf right here.
CodecUtil.checksumEntireFile(indexInput); CodecUtil.checksumEntireFile(indexInput);
final int fileVersion = CodecUtil.checkHeader(indexInput, CHECKPOINT_CODEC, INITIAL_VERSION, INITIAL_VERSION); final int fileVersion = CodecUtil.checkHeader(indexInput, CHECKPOINT_CODEC, INITIAL_VERSION, CURRENT_VERSION);
if (fileVersion == INITIAL_VERSION) {
assert indexInput.length() == V1_FILE_SIZE;
return Checkpoint.readChecksummedV1(indexInput); return Checkpoint.readChecksummedV1(indexInput);
} else {
assert fileVersion == CURRENT_VERSION;
assert indexInput.length() == FILE_SIZE;
return Checkpoint.readChecksummedV2(indexInput);
}
}
} }
} }
} }
@ -113,14 +142,14 @@ class Checkpoint {
final String resourceDesc = "checkpoint(path=\"" + checkpointFile + "\", gen=" + checkpoint + ")"; final String resourceDesc = "checkpoint(path=\"" + checkpointFile + "\", gen=" + checkpoint + ")";
try (final OutputStreamIndexOutput indexOutput = try (final OutputStreamIndexOutput indexOutput =
new OutputStreamIndexOutput(resourceDesc, checkpointFile.toString(), byteOutputStream, FILE_SIZE)) { new OutputStreamIndexOutput(resourceDesc, checkpointFile.toString(), byteOutputStream, FILE_SIZE)) {
CodecUtil.writeHeader(indexOutput, CHECKPOINT_CODEC, INITIAL_VERSION); CodecUtil.writeHeader(indexOutput, CHECKPOINT_CODEC, CURRENT_VERSION);
checkpoint.write(indexOutput); checkpoint.write(indexOutput);
CodecUtil.writeFooter(indexOutput); CodecUtil.writeFooter(indexOutput);
assert indexOutput.getFilePointer() == FILE_SIZE : assert indexOutput.getFilePointer() == FILE_SIZE :
"get you number straights. Bytes written: " + indexOutput.getFilePointer() + " buffer size: " + FILE_SIZE; "get you numbers straight; bytes written: " + indexOutput.getFilePointer() + ", buffer size: " + FILE_SIZE;
assert indexOutput.getFilePointer() < 512 : assert indexOutput.getFilePointer() < 512 :
"checkpoint files have to be smaller 512b for atomic writes. size: " + indexOutput.getFilePointer(); "checkpoint files have to be smaller than 512 bytes for atomic writes; size: " + indexOutput.getFilePointer();
} }
// now go and write to the channel, in one go. // now go and write to the channel, in one go.

View File

@ -60,6 +60,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.LongSupplier;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -124,25 +125,27 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
private volatile long lastCommittedTranslogFileGeneration = NOT_SET_GENERATION; private volatile long lastCommittedTranslogFileGeneration = NOT_SET_GENERATION;
private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean();
private final TranslogConfig config; private final TranslogConfig config;
private final LongSupplier globalCheckpointSupplier;
private final String translogUUID; private final String translogUUID;
/** /**
* Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogConfig} has * Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogGeneration} is
* a non-null {@link org.elasticsearch.index.translog.Translog.TranslogGeneration}. If the generation is null this method * {@code null}. If the generation is {@code null} this method is destructive and will delete all files in the translog path given. If
* us destructive and will delete all files in the translog path given. * the generation is not {@code null}, this method tries to open the given translog generation. The generation is treated as the last
* generation referenced from already committed data. This means all operations that have not yet been committed should be in the
* translog file referenced by this generation. The translog creation will fail if this generation can't be opened.
* *
* @param config the configuration of this translog * @param config the configuration of this translog
* @param translogGeneration the translog generation to open. If this is <code>null</code> a new translog is created. If non-null * @param translogGeneration the translog generation to open
* the translog tries to open the given translog generation. The generation is treated as the last generation referenced * @param globalCheckpointSupplier a supplier for the global checkpoint
* form already committed data. This means all operations that have not yet been committed should be in the translog
* file referenced by this generation. The translog creation will fail if this generation can't be opened.
*
* @see TranslogConfig#getTranslogPath()
*
*/ */
public Translog(TranslogConfig config, TranslogGeneration translogGeneration) throws IOException { public Translog(
final TranslogConfig config,
final TranslogGeneration translogGeneration,
final LongSupplier globalCheckpointSupplier) throws IOException {
super(config.getShardId(), config.getIndexSettings()); super(config.getShardId(), config.getIndexSettings());
this.config = config; this.config = config;
this.globalCheckpointSupplier = globalCheckpointSupplier;
if (translogGeneration == null || translogGeneration.translogUUID == null) { // legacy case if (translogGeneration == null || translogGeneration.translogUUID == null) { // legacy case
translogUUID = UUIDs.randomBase64UUID(); translogUUID = UUIDs.randomBase64UUID();
} else { } else {
@ -157,7 +160,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
try { try {
if (translogGeneration != null) { if (translogGeneration != null) {
final Checkpoint checkpoint = readCheckpoint(); final Checkpoint checkpoint = readCheckpoint(location);
final Path nextTranslogFile = location.resolve(getFilename(checkpoint.generation + 1)); final Path nextTranslogFile = location.resolve(getFilename(checkpoint.generation + 1));
final Path currentCheckpointFile = location.resolve(getCommitCheckpointFileName(checkpoint.generation)); final Path currentCheckpointFile = location.resolve(getCommitCheckpointFileName(checkpoint.generation));
// this is special handling for error condition when we create a new writer but we fail to bake // this is special handling for error condition when we create a new writer but we fail to bake
@ -195,7 +198,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
logger.debug("wipe translog location - creating new translog"); logger.debug("wipe translog location - creating new translog");
Files.createDirectories(location); Files.createDirectories(location);
final long generation = 1; final long generation = 1;
Checkpoint checkpoint = new Checkpoint(0, 0, generation); Checkpoint checkpoint = new Checkpoint(0, 0, generation, globalCheckpointSupplier.getAsLong());
final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME); final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME);
Checkpoint.write(getChannelFactory(), checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); Checkpoint.write(getChannelFactory(), checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
IOUtils.fsync(checkpointFile, false); IOUtils.fsync(checkpointFile, false);
@ -372,11 +375,25 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
} }
/**
* Creates a new translog for the specified generation.
*
* @param fileGeneration the translog generation
* @return a writer for the new translog
* @throws IOException if creating the translog failed
*/
TranslogWriter createWriter(long fileGeneration) throws IOException { TranslogWriter createWriter(long fileGeneration) throws IOException {
TranslogWriter newFile; final TranslogWriter newFile;
try { try {
newFile = TranslogWriter.create(shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), getChannelFactory(), config.getBufferSize()); newFile = TranslogWriter.create(
} catch (IOException e) { shardId,
translogUUID,
fileGeneration,
location.resolve(getFilename(fileGeneration)),
getChannelFactory(),
config.getBufferSize(),
globalCheckpointSupplier);
} catch (final IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e); throw new TranslogException(shardId, "failed to create new translog file", e);
} }
return newFile; return newFile;
@ -441,6 +458,16 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
} }
} }
/**
* The last synced checkpoint for this translog.
*
* @return the last synced checkpoint
*/
public long getLastSyncedGlobalCheckpoint() {
try (final ReleasableLock ignored = readLock.acquire()) {
return current.getLastSyncedCheckpoint().globalCheckpoint;
}
}
/** /**
* Snapshots the current transaction log allowing to safely iterate over the snapshot. * Snapshots the current transaction log allowing to safely iterate over the snapshot.
@ -531,7 +558,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
/** /**
* Ensures that all locations in the given stream have been synced / written to the underlying storage. * Ensures that all locations in the given stream have been synced / written to the underlying storage.
* This method allows for internal optimization to minimize the amout of fsync operations if multiple * This method allows for internal optimization to minimize the amount of fsync operations if multiple
* locations must be synced. * locations must be synced.
* *
* @return Returns <code>true</code> iff this call caused an actual sync operation otherwise <code>false</code> * @return Returns <code>true</code> iff this call caused an actual sync operation otherwise <code>false</code>
@ -1356,10 +1383,21 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
} }
/** Reads and returns the current checkpoint */ /** Reads and returns the current checkpoint */
final Checkpoint readCheckpoint() throws IOException { static final Checkpoint readCheckpoint(final Path location) throws IOException {
return Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME)); return Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME));
} }
/**
* Reads the sequence numbers global checkpoint from the translog checkpoint.
*
* @param location the location of the translog
* @return the global checkpoint
* @throws IOException if an I/O exception occurred reading the checkpoint
*/
public static final long readGlobalCheckpoint(final Path location) throws IOException {
return readCheckpoint(location).globalCheckpoint;
}
/** /**
* Returns the translog uuid used to associate a lucene index with a translog. * Returns the translog uuid used to associate a lucene index with a translog.
*/ */

View File

@ -19,15 +19,11 @@
package org.elasticsearch.index.translog; package org.elasticsearch.index.translog;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog.TranslogGeneration;
import org.elasticsearch.threadpool.ThreadPool;
import java.nio.file.Path; import java.nio.file.Path;

View File

@ -151,10 +151,6 @@ public class TranslogReader extends BaseTranslogReader implements Closeable {
Channels.readFromFileChannelWithEofException(channel, position, buffer); Channels.readFromFileChannelWithEofException(channel, position, buffer);
} }
public Checkpoint getInfo() {
return new Checkpoint(length, totalOperations, getGeneration());
}
@Override @Override
public final void close() throws IOException { public final void close() throws IOException {
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {

View File

@ -38,6 +38,7 @@ import java.nio.channels.FileChannel;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
public class TranslogWriter extends BaseTranslogReader implements Closeable { public class TranslogWriter extends BaseTranslogReader implements Closeable {
@ -48,8 +49,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
private final ShardId shardId; private final ShardId shardId;
private final ChannelFactory channelFactory; private final ChannelFactory channelFactory;
/* the offset in bytes that was written when the file was last synced*/ // the last checkpoint that was written when the translog was last synced
private volatile long lastSyncedOffset; private volatile Checkpoint lastSyncedCheckpoint;
/* the number of translog operations written to this file */ /* the number of translog operations written to this file */
private volatile int operationCounter; private volatile int operationCounter;
/* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */ /* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */
@ -59,17 +60,27 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
/* the total offset of this file including the bytes written to the file as well as into the buffer */ /* the total offset of this file including the bytes written to the file as well as into the buffer */
private volatile long totalOffset; private volatile long totalOffset;
private final LongSupplier globalCheckpointSupplier;
protected final AtomicBoolean closed = new AtomicBoolean(false); protected final AtomicBoolean closed = new AtomicBoolean(false);
// lock order synchronized(syncLock) -> synchronized(this) // lock order synchronized(syncLock) -> synchronized(this)
private final Object syncLock = new Object(); private final Object syncLock = new Object();
public TranslogWriter(ChannelFactory channelFactory, ShardId shardId, long generation, FileChannel channel, Path path, ByteSizeValue bufferSize) throws IOException { public TranslogWriter(
super(generation, channel, path, channel.position()); final ChannelFactory channelFactory,
final ShardId shardId,
final Checkpoint initialCheckpoint,
final FileChannel channel,
final Path path,
final ByteSizeValue bufferSize,
final LongSupplier globalCheckpointSupplier) throws IOException {
super(initialCheckpoint.generation, channel, path, channel.position());
this.shardId = shardId; this.shardId = shardId;
this.channelFactory = channelFactory; this.channelFactory = channelFactory;
this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channel), bufferSize.bytesAsInt()); this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channel), bufferSize.bytesAsInt());
this.lastSyncedOffset = channel.position(); this.lastSyncedCheckpoint = initialCheckpoint;
totalOffset = lastSyncedOffset; this.totalOffset = initialCheckpoint.offset;
this.globalCheckpointSupplier = globalCheckpointSupplier;
} }
static int getHeaderLength(String translogUUID) { static int getHeaderLength(String translogUUID) {
@ -86,7 +97,14 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
out.writeBytes(ref.bytes, ref.offset, ref.length); out.writeBytes(ref.bytes, ref.offset, ref.length);
} }
public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory, ByteSizeValue bufferSize) throws IOException { public static TranslogWriter create(
ShardId shardId,
String translogUUID,
long fileGeneration,
Path file,
ChannelFactory channelFactory,
ByteSizeValue bufferSize,
final LongSupplier globalCheckpointSupplier) throws IOException {
final BytesRef ref = new BytesRef(translogUUID); final BytesRef ref = new BytesRef(translogUUID);
final int headerLength = getHeaderLength(ref.length); final int headerLength = getHeaderLength(ref.length);
final FileChannel channel = channelFactory.open(file); final FileChannel channel = channelFactory.open(file);
@ -96,8 +114,10 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
final OutputStreamDataOutput out = new OutputStreamDataOutput(java.nio.channels.Channels.newOutputStream(channel)); final OutputStreamDataOutput out = new OutputStreamDataOutput(java.nio.channels.Channels.newOutputStream(channel));
writeHeader(out, ref); writeHeader(out, ref);
channel.force(true); channel.force(true);
writeCheckpoint(channelFactory, headerLength, 0, file.getParent(), fileGeneration); final Checkpoint checkpoint =
final TranslogWriter writer = new TranslogWriter(channelFactory, shardId, fileGeneration, channel, file, bufferSize); writeCheckpoint(channelFactory, headerLength, 0, globalCheckpointSupplier.getAsLong(), file.getParent(), fileGeneration);
final TranslogWriter writer =
new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier);
return writer; return writer;
} catch (Exception exception) { } catch (Exception exception) {
// if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
@ -163,7 +183,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
* returns true if there are buffered ops * returns true if there are buffered ops
*/ */
public boolean syncNeeded() { public boolean syncNeeded() {
return totalOffset != lastSyncedOffset; return totalOffset != lastSyncedCheckpoint.offset || globalCheckpointSupplier.getAsLong() != lastSyncedCheckpoint.globalCheckpoint;
} }
@Override @Override
@ -200,7 +220,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
if (closed.compareAndSet(false, true)) { if (closed.compareAndSet(false, true)) {
boolean success = false; boolean success = false;
try { try {
final TranslogReader reader = new TranslogReader(generation, channel, path, firstOperationOffset, getWrittenOffset(), operationCounter); final TranslogReader reader =
new TranslogReader(generation, channel, path, firstOperationOffset, getWrittenOffset(), operationCounter);
success = true; success = true;
return reader; return reader;
} finally { } finally {
@ -244,19 +265,21 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
* @return <code>true</code> if this call caused an actual sync operation * @return <code>true</code> if this call caused an actual sync operation
*/ */
public boolean syncUpTo(long offset) throws IOException { public boolean syncUpTo(long offset) throws IOException {
if (lastSyncedOffset < offset && syncNeeded()) { if (lastSyncedCheckpoint.offset < offset && syncNeeded()) {
synchronized (syncLock) { // only one sync/checkpoint should happen concurrently but we wait synchronized (syncLock) { // only one sync/checkpoint should happen concurrently but we wait
if (lastSyncedOffset < offset && syncNeeded()) { if (lastSyncedCheckpoint.offset < offset && syncNeeded()) {
// double checked locking - we don't want to fsync unless we have to and now that we have // double checked locking - we don't want to fsync unless we have to and now that we have
// the lock we should check again since if this code is busy we might have fsynced enough already // the lock we should check again since if this code is busy we might have fsynced enough already
final long offsetToSync; final long offsetToSync;
final int opsCounter; final int opsCounter;
final long globalCheckpoint;
synchronized (this) { synchronized (this) {
ensureOpen(); ensureOpen();
try { try {
outputStream.flush(); outputStream.flush();
offsetToSync = totalOffset; offsetToSync = totalOffset;
opsCounter = operationCounter; opsCounter = operationCounter;
globalCheckpoint = globalCheckpointSupplier.getAsLong();
} catch (Exception ex) { } catch (Exception ex) {
try { try {
closeWithTragicEvent(ex); closeWithTragicEvent(ex);
@ -268,9 +291,11 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
} }
// now do the actual fsync outside of the synchronized block such that // now do the actual fsync outside of the synchronized block such that
// we can continue writing to the buffer etc. // we can continue writing to the buffer etc.
final Checkpoint checkpoint;
try { try {
channel.force(false); channel.force(false);
writeCheckpoint(channelFactory, offsetToSync, opsCounter, path.getParent(), generation); checkpoint =
writeCheckpoint(channelFactory, offsetToSync, opsCounter, globalCheckpoint, path.getParent(), generation);
} catch (Exception ex) { } catch (Exception ex) {
try { try {
closeWithTragicEvent(ex); closeWithTragicEvent(ex);
@ -279,8 +304,9 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
} }
throw ex; throw ex;
} }
assert lastSyncedOffset <= offsetToSync : "illegal state: " + lastSyncedOffset + " <= " + offsetToSync; assert lastSyncedCheckpoint.offset <= offsetToSync :
lastSyncedOffset = offsetToSync; // write protected by syncLock "illegal state: " + lastSyncedCheckpoint.offset + " <= " + offsetToSync;
lastSyncedCheckpoint = checkpoint; // write protected by syncLock
return true; return true;
} }
} }
@ -306,10 +332,26 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
Channels.readFromFileChannelWithEofException(channel, position, targetBuffer); Channels.readFromFileChannelWithEofException(channel, position, targetBuffer);
} }
private static void writeCheckpoint(ChannelFactory channelFactory, long syncPosition, int numOperations, Path translogFile, long generation) throws IOException { private static Checkpoint writeCheckpoint(
ChannelFactory channelFactory,
long syncPosition,
int numOperations,
long globalCheckpoint,
Path translogFile,
long generation) throws IOException {
final Path checkpointFile = translogFile.resolve(Translog.CHECKPOINT_FILE_NAME); final Path checkpointFile = translogFile.resolve(Translog.CHECKPOINT_FILE_NAME);
Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation); final Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation, globalCheckpoint);
Checkpoint.write(channelFactory::open, checkpointFile, checkpoint, StandardOpenOption.WRITE); Checkpoint.write(channelFactory::open, checkpointFile, checkpoint, StandardOpenOption.WRITE);
return checkpoint;
}
/**
* The last synced checkpoint for this translog.
*
* @return the last synced checkpoint
*/
public Checkpoint getLastSyncedCheckpoint() {
return lastSyncedCheckpoint;
} }
protected final void ensureOpen() { protected final void ensureOpen() {

View File

@ -39,6 +39,7 @@ import org.elasticsearch.cli.Terminal;
import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.Channels; import java.nio.channels.Channels;
@ -166,7 +167,7 @@ public class TruncateTranslogCommand extends SettingCommand {
/** Write a checkpoint file to the given location with the given generation */ /** Write a checkpoint file to the given location with the given generation */
public static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration) throws IOException { public static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration) throws IOException {
Checkpoint emptyCheckpoint = new Checkpoint(translogLength, 0, translogGeneration); Checkpoint emptyCheckpoint = new Checkpoint(translogLength, 0, translogGeneration, SequenceNumbersService.UNASSIGNED_SEQ_NO);
Checkpoint.write(FileChannel::open, filename, emptyCheckpoint, Checkpoint.write(FileChannel::open, filename, emptyCheckpoint,
StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
// fsync with metadata here to make sure. // fsync with metadata here to make sure.

View File

@ -286,7 +286,7 @@ public class InternalEngineTests extends ESTestCase {
protected Translog createTranslog(Path translogPath) throws IOException { protected Translog createTranslog(Path translogPath) throws IOException {
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE);
return new Translog(translogConfig, null); return new Translog(translogConfig, null, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
} }
protected SnapshotDeletionPolicy createSnapshotDeletionPolicy() { protected SnapshotDeletionPolicy createSnapshotDeletionPolicy() {
@ -599,10 +599,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat( assertThat(
Long.parseLong(stats1.getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), Long.parseLong(stats1.getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)),
equalTo(SequenceNumbersService.NO_OPS_PERFORMED)); equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
assertThat(stats1.getUserData(), hasKey(InternalEngine.GLOBAL_CHECKPOINT_KEY));
assertThat(
Long.parseLong(stats1.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
assertThat(stats1.getUserData(), hasKey(InternalEngine.MAX_SEQ_NO)); assertThat(stats1.getUserData(), hasKey(InternalEngine.MAX_SEQ_NO));
assertThat( assertThat(
Long.parseLong(stats1.getUserData().get(InternalEngine.MAX_SEQ_NO)), Long.parseLong(stats1.getUserData().get(InternalEngine.MAX_SEQ_NO)),
@ -628,10 +625,6 @@ public class InternalEngineTests extends ESTestCase {
not(equalTo(stats1.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); not(equalTo(stats1.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
assertThat(stats2.getUserData().get(Translog.TRANSLOG_UUID_KEY), equalTo(stats1.getUserData().get(Translog.TRANSLOG_UUID_KEY))); assertThat(stats2.getUserData().get(Translog.TRANSLOG_UUID_KEY), equalTo(stats1.getUserData().get(Translog.TRANSLOG_UUID_KEY)));
assertThat(Long.parseLong(stats2.getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint.get())); assertThat(Long.parseLong(stats2.getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint.get()));
assertThat(stats2.getUserData(), hasKey(InternalEngine.GLOBAL_CHECKPOINT_KEY));
assertThat(
Long.parseLong(stats2.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
equalTo(globalCheckpoint.get()));
assertThat(stats2.getUserData(), hasKey(InternalEngine.MAX_SEQ_NO)); assertThat(stats2.getUserData(), hasKey(InternalEngine.MAX_SEQ_NO));
assertThat(Long.parseLong(stats2.getUserData().get(InternalEngine.MAX_SEQ_NO)), equalTo(maxSeqNo.get())); assertThat(Long.parseLong(stats2.getUserData().get(InternalEngine.MAX_SEQ_NO)), equalTo(maxSeqNo.get()));
} finally { } finally {
@ -1702,13 +1695,13 @@ public class InternalEngineTests extends ESTestCase {
if (rarely()) { if (rarely()) {
localCheckpoint = primarySeqNo; localCheckpoint = primarySeqNo;
maxSeqNo = primarySeqNo; maxSeqNo = primarySeqNo;
globalCheckpoint = replicaLocalCheckpoint;
initialEngine.seqNoService().updateGlobalCheckpointOnPrimary(); initialEngine.seqNoService().updateGlobalCheckpointOnPrimary();
initialEngine.flush(true, true); initialEngine.flush(true, true);
} }
} }
initialEngine.seqNoService().updateGlobalCheckpointOnPrimary(); initialEngine.seqNoService().updateGlobalCheckpointOnPrimary();
globalCheckpoint = initialEngine.seqNoService().getGlobalCheckpoint();
assertEquals(primarySeqNo, initialEngine.seqNoService().getMaxSeqNo()); assertEquals(primarySeqNo, initialEngine.seqNoService().getMaxSeqNo());
assertThat(initialEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primarySeqNo)); assertThat(initialEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primarySeqNo));
@ -1718,8 +1711,9 @@ public class InternalEngineTests extends ESTestCase {
assertThat( assertThat(
Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)),
equalTo(localCheckpoint)); equalTo(localCheckpoint));
initialEngine.getTranslog().sync(); // to guarantee the global checkpoint is written to the translog checkpoint
assertThat( assertThat(
Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), initialEngine.getTranslog().getLastSyncedGlobalCheckpoint(),
equalTo(globalCheckpoint)); equalTo(globalCheckpoint));
assertThat( assertThat(
Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)), Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)),
@ -1739,7 +1733,7 @@ public class InternalEngineTests extends ESTestCase {
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)), Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)),
equalTo(primarySeqNo)); equalTo(primarySeqNo));
assertThat( assertThat(
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)), recoveringEngine.getTranslog().getLastSyncedGlobalCheckpoint(),
equalTo(globalCheckpoint)); equalTo(globalCheckpoint));
assertThat( assertThat(
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)), Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)),
@ -2344,8 +2338,10 @@ public class InternalEngineTests extends ESTestCase {
Translog.TranslogGeneration generation = engine.getTranslog().getGeneration(); Translog.TranslogGeneration generation = engine.getTranslog().getGeneration();
engine.close(); engine.close();
Translog translog = new Translog(new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE) Translog translog = new Translog(
, null); new TranslogConfig(shardId, createTempDir(), INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE),
null,
() -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
translog.add(new Translog.Index("test", "SomeBogusId", "{}".getBytes(Charset.forName("UTF-8")))); translog.add(new Translog.Index("test", "SomeBogusId", "{}".getBytes(Charset.forName("UTF-8"))));
assertEquals(generation.translogFileGeneration, translog.currentFileGeneration()); assertEquals(generation.translogFileGeneration, translog.currentFileGeneration());
translog.close(); translog.close();

View File

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache license, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the license for the specific language governing permissions and
* limitations under the license.
*/
package org.elasticsearch.index.seqno;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashSet;
import static org.elasticsearch.mock.orig.Mockito.when;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
public class GlobalCheckpointSyncActionTests extends ESTestCase {
private ThreadPool threadPool;
private Transport transport;
private ClusterService clusterService;
private TransportService transportService;
private ShardStateAction shardStateAction;
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(getClass().getName());
transport = new CapturingTransport();
clusterService = createClusterService(threadPool);
transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR);
transportService.start();
transportService.acceptIncomingRequests();
shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool);
}
public void tearDown() throws Exception {
try {
IOUtils.close(transportService, clusterService, transport);
} finally {
terminate(threadPool);
}
super.tearDown();
}
public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception {
final IndicesService indicesService = mock(IndicesService.class);
final Index index = new Index("index", "uuid");
final IndexService indexService = mock(IndexService.class);
when(indicesService.indexServiceSafe(index)).thenReturn(indexService);
final int id = randomIntBetween(0, 4);
final IndexShard indexShard = mock(IndexShard.class);
when(indexService.getShard(id)).thenReturn(indexShard);
final Translog translog = mock(Translog.class);
when(indexShard.getTranslog()).thenReturn(translog);
final GlobalCheckpointSyncAction action = new GlobalCheckpointSyncAction(
Settings.EMPTY,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver(Settings.EMPTY));
final ShardId shardId = new ShardId(index, id);
final GlobalCheckpointSyncAction.PrimaryRequest primaryRequest = new GlobalCheckpointSyncAction.PrimaryRequest(shardId);
if (randomBoolean()) {
action.shardOperationOnPrimary(primaryRequest);
} else {
action.shardOperationOnReplica(new GlobalCheckpointSyncAction.ReplicaRequest(primaryRequest, randomPositiveLong()));
}
verify(translog).sync();
}
}

View File

@ -48,6 +48,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog.Location; import org.elasticsearch.index.translog.Translog.Location;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -85,6 +86,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
@ -97,6 +99,7 @@ public class TranslogTests extends ESTestCase {
protected final ShardId shardId = new ShardId("index", "_na_", 1); protected final ShardId shardId = new ShardId("index", "_na_", 1);
protected Translog translog; protected Translog translog;
private AtomicLong globalCheckpoint;
protected Path translogDir; protected Path translogDir;
@Override @Override
@ -136,7 +139,8 @@ public class TranslogTests extends ESTestCase {
} }
private Translog create(Path path) throws IOException { private Translog create(Path path) throws IOException {
return new Translog(getTranslogConfig(path), null); globalCheckpoint = new AtomicLong(SequenceNumbersService.UNASSIGNED_SEQ_NO);
return new Translog(getTranslogConfig(path), null, () -> globalCheckpoint.get());
} }
private TranslogConfig getTranslogConfig(Path path) { private TranslogConfig getTranslogConfig(Path path) {
@ -845,11 +849,16 @@ public class TranslogTests extends ESTestCase {
List<Translog.Location> locations = new ArrayList<>(); List<Translog.Location> locations = new ArrayList<>();
int translogOperations = randomIntBetween(10, 100); int translogOperations = randomIntBetween(10, 100);
int lastSynced = -1; int lastSynced = -1;
long lastSyncedGlobalCheckpoint = globalCheckpoint.get();
for (int op = 0; op < translogOperations; op++) { for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
if (randomBoolean()) {
globalCheckpoint.set(globalCheckpoint.get() + randomIntBetween(1, 16));
}
if (frequently()) { if (frequently()) {
translog.sync(); translog.sync();
lastSynced = op; lastSynced = op;
lastSyncedGlobalCheckpoint = globalCheckpoint.get();
} }
} }
assertEquals(translogOperations, translog.totalOperations()); assertEquals(translogOperations, translog.totalOperations());
@ -873,6 +882,7 @@ public class TranslogTests extends ESTestCase {
assertNull(next); assertNull(next);
} }
assertEquals(translogOperations + 1, translog.totalOperations()); assertEquals(translogOperations + 1, translog.totalOperations());
assertThat(checkpoint.globalCheckpoint, equalTo(lastSyncedGlobalCheckpoint));
translog.close(); translog.close();
} }
@ -977,7 +987,7 @@ public class TranslogTests extends ESTestCase {
TranslogConfig config = translog.getConfig(); TranslogConfig config = translog.getConfig();
translog.close(); translog.close();
translog = new Translog(config, translogGeneration); translog = new Translog(config, translogGeneration,() -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
if (translogGeneration == null) { if (translogGeneration == null) {
assertEquals(0, translog.stats().estimatedNumberOfOperations()); assertEquals(0, translog.stats().estimatedNumberOfOperations());
assertEquals(1, translog.currentFileGeneration()); assertEquals(1, translog.currentFileGeneration());
@ -1018,7 +1028,7 @@ public class TranslogTests extends ESTestCase {
// we intentionally don't close the tlog that is in the prepareCommit stage since we try to recovery the uncommitted // we intentionally don't close the tlog that is in the prepareCommit stage since we try to recovery the uncommitted
// translog here as well. // translog here as well.
TranslogConfig config = translog.getConfig(); TranslogConfig config = translog.getConfig();
try (Translog translog = new Translog(config, translogGeneration)) { try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
assertNotNull(translogGeneration); assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
assertFalse(translog.syncNeeded()); assertFalse(translog.syncNeeded());
@ -1031,7 +1041,7 @@ public class TranslogTests extends ESTestCase {
} }
} }
if (randomBoolean()) { // recover twice if (randomBoolean()) { // recover twice
try (Translog translog = new Translog(config, translogGeneration)) { try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
assertNotNull(translogGeneration); assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration()); assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
assertFalse(translog.syncNeeded()); assertFalse(translog.syncNeeded());
@ -1072,7 +1082,7 @@ public class TranslogTests extends ESTestCase {
Checkpoint read = Checkpoint.read(ckp); Checkpoint read = Checkpoint.read(ckp);
Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation))); Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)));
try (Translog translog = new Translog(config, translogGeneration)) { try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
assertNotNull(translogGeneration); assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
assertFalse(translog.syncNeeded()); assertFalse(translog.syncNeeded());
@ -1087,7 +1097,7 @@ public class TranslogTests extends ESTestCase {
} }
if (randomBoolean()) { // recover twice if (randomBoolean()) { // recover twice
try (Translog translog = new Translog(config, translogGeneration)) { try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
assertNotNull(translogGeneration); assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration()); assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
assertFalse(translog.syncNeeded()); assertFalse(translog.syncNeeded());
@ -1123,15 +1133,15 @@ public class TranslogTests extends ESTestCase {
TranslogConfig config = translog.getConfig(); TranslogConfig config = translog.getConfig();
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME); Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
Checkpoint read = Checkpoint.read(ckp); Checkpoint read = Checkpoint.read(ckp);
Checkpoint corrupted = new Checkpoint(0, 0, 0); Checkpoint corrupted = new Checkpoint(0, 0, 0, SequenceNumbersService.UNASSIGNED_SEQ_NO);
Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
try (Translog translog = new Translog(config, translogGeneration)) { try (Translog ignored = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
fail("corrupted"); fail("corrupted");
} catch (IllegalStateException ex) { } catch (IllegalStateException ex) {
assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3178, numOps=55, translogFileGeneration= 2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration= 0}"); assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3178, numOps=55, translogFileGeneration=2, globalCheckpoint=-2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration=0, globalCheckpoint=-2}");
} }
Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING); Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
try (Translog translog = new Translog(config, translogGeneration)) { try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
assertNotNull(translogGeneration); assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration()); assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
assertFalse(translog.syncNeeded()); assertFalse(translog.syncNeeded());
@ -1205,12 +1215,12 @@ public class TranslogTests extends ESTestCase {
Translog.TranslogGeneration generation = new Translog.TranslogGeneration(randomRealisticUnicodeOfCodepointLengthBetween(1, Translog.TranslogGeneration generation = new Translog.TranslogGeneration(randomRealisticUnicodeOfCodepointLengthBetween(1,
translogGeneration.translogUUID.length()), translogGeneration.translogFileGeneration); translogGeneration.translogUUID.length()), translogGeneration.translogFileGeneration);
try { try {
new Translog(config, generation); new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
fail("translog doesn't belong to this UUID"); fail("translog doesn't belong to this UUID");
} catch (TranslogCorruptedException ex) { } catch (TranslogCorruptedException ex) {
} }
this.translog = new Translog(config, translogGeneration); this.translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
Translog.Snapshot snapshot = this.translog.newSnapshot(); Translog.Snapshot snapshot = this.translog.newSnapshot();
for (int i = firstUncommitted; i < translogOperations; i++) { for (int i = firstUncommitted; i < translogOperations; i++) {
Translog.Operation next = snapshot.next(); Translog.Operation next = snapshot.next();
@ -1381,7 +1391,7 @@ public class TranslogTests extends ESTestCase {
assertFalse(translog.isOpen()); assertFalse(translog.isOpen());
translog.close(); // we are closed translog.close(); // we are closed
try (Translog tlog = new Translog(config, translogGeneration)) { try (Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration()); assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration());
assertFalse(tlog.syncNeeded()); assertFalse(tlog.syncNeeded());
@ -1508,7 +1518,7 @@ public class TranslogTests extends ESTestCase {
iterator.remove(); iterator.remove();
} }
} }
try (Translog tlog = new Translog(config, translog.getGeneration())) { try (Translog tlog = new Translog(config, translog.getGeneration(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
Translog.Snapshot snapshot = tlog.newSnapshot(); Translog.Snapshot snapshot = tlog.newSnapshot();
if (writtenOperations.size() != snapshot.totalOperations()) { if (writtenOperations.size() != snapshot.totalOperations()) {
for (int i = 0; i < threadCount; i++) { for (int i = 0; i < threadCount; i++) {
@ -1563,7 +1573,7 @@ public class TranslogTests extends ESTestCase {
private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean paritalWrites, final boolean throwUnknownException, Translog.TranslogGeneration generation) throws IOException { private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean paritalWrites, final boolean throwUnknownException, Translog.TranslogGeneration generation) throws IOException {
return new Translog(config, generation) { return new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) {
@Override @Override
ChannelFactory getChannelFactory() { ChannelFactory getChannelFactory() {
final ChannelFactory factory = super.getChannelFactory(); final ChannelFactory factory = super.getChannelFactory();
@ -1675,12 +1685,12 @@ public class TranslogTests extends ESTestCase {
public void testFailWhileCreateWriteWithRecoveredTLogs() throws IOException { public void testFailWhileCreateWriteWithRecoveredTLogs() throws IOException {
Path tempDir = createTempDir(); Path tempDir = createTempDir();
TranslogConfig config = getTranslogConfig(tempDir); TranslogConfig config = getTranslogConfig(tempDir);
Translog translog = new Translog(config, null); Translog translog = new Translog(config, null, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
translog.add(new Translog.Index("test", "boom", "boom".getBytes(Charset.forName("UTF-8")))); translog.add(new Translog.Index("test", "boom", "boom".getBytes(Charset.forName("UTF-8"))));
Translog.TranslogGeneration generation = translog.getGeneration(); Translog.TranslogGeneration generation = translog.getGeneration();
translog.close(); translog.close();
try { try {
new Translog(config, generation) { new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO) {
@Override @Override
protected TranslogWriter createWriter(long fileGeneration) throws IOException { protected TranslogWriter createWriter(long fileGeneration) throws IOException {
throw new MockDirectoryWrapper.FakeIOException(); throw new MockDirectoryWrapper.FakeIOException();
@ -1703,7 +1713,7 @@ public class TranslogTests extends ESTestCase {
Checkpoint read = Checkpoint.read(ckp); Checkpoint read = Checkpoint.read(ckp);
Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation))); Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)));
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog")); Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
try (Translog tlog = new Translog(config, translogGeneration)) { try (Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
assertNotNull(translogGeneration); assertNotNull(translogGeneration);
assertFalse(tlog.syncNeeded()); assertFalse(tlog.syncNeeded());
Translog.Snapshot snapshot = tlog.newSnapshot(); Translog.Snapshot snapshot = tlog.newSnapshot();
@ -1714,7 +1724,7 @@ public class TranslogTests extends ESTestCase {
} }
tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
} }
try (Translog tlog = new Translog(config, translogGeneration)) { try (Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
assertNotNull(translogGeneration); assertNotNull(translogGeneration);
assertFalse(tlog.syncNeeded()); assertFalse(tlog.syncNeeded());
Translog.Snapshot snapshot = tlog.newSnapshot(); Translog.Snapshot snapshot = tlog.newSnapshot();
@ -1737,7 +1747,7 @@ public class TranslogTests extends ESTestCase {
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog")); Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
try { try {
Translog tlog = new Translog(config, translogGeneration); Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
fail("file already exists?"); fail("file already exists?");
} catch (TranslogException ex) { } catch (TranslogException ex) {
// all is well // all is well
@ -1758,7 +1768,7 @@ public class TranslogTests extends ESTestCase {
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog")); Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
// we add N+1 and N+2 to ensure we only delete the N+1 file and never jump ahead and wipe without the right condition // we add N+1 and N+2 to ensure we only delete the N+1 file and never jump ahead and wipe without the right condition
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 2) + ".tlog")); Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 2) + ".tlog"));
try (Translog tlog = new Translog(config, translogGeneration)) { try (Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
assertNotNull(translogGeneration); assertNotNull(translogGeneration);
assertFalse(tlog.syncNeeded()); assertFalse(tlog.syncNeeded());
Translog.Snapshot snapshot = tlog.newSnapshot(); Translog.Snapshot snapshot = tlog.newSnapshot();
@ -1771,7 +1781,7 @@ public class TranslogTests extends ESTestCase {
} }
try { try {
Translog tlog = new Translog(config, translogGeneration); Translog tlog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
fail("file already exists?"); fail("file already exists?");
} catch (TranslogException ex) { } catch (TranslogException ex) {
// all is well // all is well
@ -1834,7 +1844,7 @@ public class TranslogTests extends ESTestCase {
} catch (IOException ex) { } catch (IOException ex) {
assertEquals(ex.getMessage(), "__FAKE__ no space left on device"); assertEquals(ex.getMessage(), "__FAKE__ no space left on device");
} finally { } finally {
Checkpoint checkpoint = failableTLog.readCheckpoint(); Checkpoint checkpoint = Translog.readCheckpoint(config.getTranslogPath());
if (checkpoint.numOps == unsynced.size() + syncedDocs.size()) { if (checkpoint.numOps == unsynced.size() + syncedDocs.size()) {
syncedDocs.addAll(unsynced); // failed in fsync but got fully written syncedDocs.addAll(unsynced); // failed in fsync but got fully written
unsynced.clear(); unsynced.clear();
@ -1859,7 +1869,7 @@ public class TranslogTests extends ESTestCase {
} }
fail.failNever(); // we don't wanna fail here but we might since we write a new checkpoint and create a new tlog file fail.failNever(); // we don't wanna fail here but we might since we write a new checkpoint and create a new tlog file
try (Translog translog = new Translog(config, generation)) { try (Translog translog = new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
Translog.Snapshot snapshot = translog.newSnapshot(); Translog.Snapshot snapshot = translog.newSnapshot();
assertEquals(syncedDocs.size(), snapshot.totalOperations()); assertEquals(syncedDocs.size(), snapshot.totalOperations());
for (int i = 0; i < syncedDocs.size(); i++) { for (int i = 0; i < syncedDocs.size(); i++) {
@ -1872,10 +1882,10 @@ public class TranslogTests extends ESTestCase {
} }
public void testCheckpointOnDiskFull() throws IOException { public void testCheckpointOnDiskFull() throws IOException {
Checkpoint checkpoint = new Checkpoint(randomLong(), randomInt(), randomLong()); Checkpoint checkpoint = new Checkpoint(randomLong(), randomInt(), randomLong(), randomLong());
Path tempDir = createTempDir(); Path tempDir = createTempDir();
Checkpoint.write(FileChannel::open, tempDir.resolve("foo.cpk"), checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); Checkpoint.write(FileChannel::open, tempDir.resolve("foo.cpk"), checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
Checkpoint checkpoint2 = new Checkpoint(randomLong(), randomInt(), randomLong()); Checkpoint checkpoint2 = new Checkpoint(randomLong(), randomInt(), randomLong(), randomLong());
try { try {
Checkpoint.write((p, o) -> { Checkpoint.write((p, o) -> {
if (randomBoolean()) { if (randomBoolean()) {
@ -1906,14 +1916,14 @@ public class TranslogTests extends ESTestCase {
Translog.TranslogGeneration generation = translog.getGeneration(); Translog.TranslogGeneration generation = translog.getGeneration();
TranslogConfig config = translog.getConfig(); TranslogConfig config = translog.getConfig();
translog.close(); translog.close();
translog = new Translog(config, generation); translog = new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
translog.add(new Translog.Index("test", "2", new byte[]{2})); translog.add(new Translog.Index("test", "2", new byte[]{2}));
translog.prepareCommit(); translog.prepareCommit();
Translog.View view = translog.newView(); Translog.View view = translog.newView();
translog.add(new Translog.Index("test", "3", new byte[]{3})); translog.add(new Translog.Index("test", "3", new byte[]{3}));
translog.close(); translog.close();
IOUtils.close(view); IOUtils.close(view);
translog = new Translog(config, generation); translog = new Translog(config, generation, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
} }
public static Translog.Location randomTranslogLocation() { public static Translog.Location randomTranslogLocation() {

View File

@ -87,7 +87,7 @@ public class TranslogVersionTests extends ESTestCase {
public TranslogReader openReader(Path path, long id) throws IOException { public TranslogReader openReader(Path path, long id) throws IOException {
FileChannel channel = FileChannel.open(path, StandardOpenOption.READ); FileChannel channel = FileChannel.open(path, StandardOpenOption.READ);
try { try {
TranslogReader reader = TranslogReader.open(channel, path, new Checkpoint(Files.size(path), 1, id), null); TranslogReader reader = TranslogReader.open(channel, path, new Checkpoint(Files.size(path), 1, id, 0), null);
channel = null; channel = null;
return reader; return reader;
} finally { } finally {