Enable a long translog retention policy by default (#25294)
#25147 added the translog deletion policy but didn't enable it by default. This PR enables a default retention of 512MB (the same as the current maximum translog size) and an age of 12 hours (i.e., after 12 hours all translog files will be deleted). This increases the chance of an ops-based recovery, even if the primary flushed or the replica was offline for a few hours. In order to see which parts of the translog are committed into lucene, the translog stats are extended to include information about uncommitted operations. Views now include all translog ops and guarantee, as before, that those will not go away. Snapshotting a view allows filtering out generations that are not relevant based on a specific sequence number. Relates to #10708
This commit is contained in:
parent 29e80eea40
commit d963882053
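The two settings changed below are dynamic index settings, so the new defaults can be overridden per index at any time. A minimal sketch of restoring the old behaviour (no extra retention) with the Java admin client; `client` is assumed to be an existing `Client` instance, and the pattern mirrors the one used in RecoveryFromGatewayIT further down:

    import org.elasticsearch.common.settings.Settings;

    // Disable both retention dimensions again (same values the tests below use).
    Settings noRetention = Settings.builder()
        .put("index.translog.retention.age", "-1")   // age-based retention off
        .put("index.translog.retention.size", "-1")  // size-based retention off
        .build();
    client.admin().indices().prepareUpdateSettings("my-index").setSettings(noRetention).get();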
IndexSettings.java

@@ -118,7 +118,7 @@ public final class IndexSettings {
      * the chance of ops based recoveries.
      **/
     public static final Setting<TimeValue> INDEX_TRANSLOG_RETENTION_AGE_SETTING =
-        Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueMillis(-1), TimeValue.timeValueMillis(-1), Property.Dynamic,
+        Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueHours(12), TimeValue.timeValueMillis(-1), Property.Dynamic,
             Property.IndexScope);

     /**

@@ -127,7 +127,7 @@ public final class IndexSettings {
      * the chance of ops based recoveries.
      **/
     public static final Setting<ByteSizeValue> INDEX_TRANSLOG_RETENTION_SIZE_SETTING =
-        Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(-1, ByteSizeUnit.MB), Property.Dynamic,
+        Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic,
             Property.IndexScope);

     /**
Engine.java

@@ -803,6 +803,12 @@ public abstract class Engine implements Closeable {
      */
     public abstract CommitId flush() throws EngineException;

+    /**
+     * Rolls the translog generation and cleans unneeded files.
+     */
+    public abstract void rollTranslogGeneration() throws EngineException;
+
     /**
      * Force merges to 1 segment
      */
InternalEngine.java

@@ -1215,7 +1215,7 @@ public class InternalEngine extends Engine {
             ensureOpen();
             ensureCanFlush();
             String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID);
-            if (syncId != null && translog.totalOperations() == 0 && indexWriter.hasUncommittedChanges()) {
+            if (syncId != null && translog.uncommittedOperations() == 0 && indexWriter.hasUncommittedChanges()) {
                 logger.trace("start renewing sync commit [{}]", syncId);
                 commitIndexWriter(indexWriter, translog, syncId);
                 logger.debug("successfully sync committed. sync id [{}].", syncId);
@@ -1317,6 +1317,25 @@ public class InternalEngine extends Engine {
         return new CommitId(newCommitId);
     }

+    @Override
+    public void rollTranslogGeneration() throws EngineException {
+        try (ReleasableLock ignored = readLock.acquire()) {
+            ensureOpen();
+            translog.rollGeneration();
+            translog.trimUnreferencedReaders();
+        } catch (AlreadyClosedException e) {
+            failOnTragicEvent(e);
+            throw e;
+        } catch (Exception e) {
+            try {
+                failEngine("translog trimming failed", e);
+            } catch (Exception inner) {
+                e.addSuppressed(inner);
+            }
+            throw new EngineException(shardId, "failed to roll translog", e);
+        }
+    }
+
     private void pruneDeletedTombstones() {
         long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
IndexShard.java

@@ -921,13 +921,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     }

     /**
-     * Rolls the tranlog generation.
-     *
-     * @throws IOException if any file operations on the translog throw an I/O exception
+     * Rolls the translog generation and cleans unneeded files.
      */
-    private void rollTranslogGeneration() throws IOException {
+    private void rollTranslogGeneration() {
         final Engine engine = getEngine();
-        engine.getTranslog().rollGeneration();
+        engine.rollTranslogGeneration();
     }

     public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
@@ -2142,7 +2140,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         }

         @Override
-        protected void doRun() throws IOException {
+        protected void doRun() throws Exception {
             rollTranslogGeneration();
         }
PrimaryReplicaSyncer.java

@@ -80,7 +80,8 @@ public class PrimaryReplicaSyncer extends AbstractComponent {

     public void resync(IndexShard indexShard, ActionListener<ResyncTask> listener) throws IOException {
         try (Translog.View view = indexShard.acquireTranslogView()) {
-            Translog.Snapshot snapshot = view.snapshot();
+            final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1;
+            Translog.Snapshot snapshot = view.snapshot(startingSeqNo);
             ShardId shardId = indexShard.shardId();

             // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.

@@ -104,7 +105,7 @@ public class PrimaryReplicaSyncer extends AbstractComponent {
             };

             resync(shardId, indexShard.routingEntry().allocationId().getId(), wrappedSnapshot,
-                indexShard.getGlobalCheckpoint() + 1, listener);
+                startingSeqNo, listener);
         }
     }
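The hoisted `startingSeqNo` is the first operation a resync may need to replay: everything up to and including the global checkpoint is already present on all in-sync copies, hence the `+ 1`. A toy illustration (the numbers are invented):

    // Toy illustration of the resync starting point; numbers are invented.
    final long globalCheckpoint = 41;                // ops 0..41 exist on every in-sync copy
    final long startingSeqNo = globalCheckpoint + 1; // 42 is the first op that may be missing somewhere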
Translog.java

@@ -367,17 +367,31 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC


     /**
-     * Returns the number of operations in the transaction files that aren't committed to lucene..
+     * Returns the number of operations in the translog files that aren't committed to lucene.
      */
-    public int totalOperations() {
+    public int uncommittedOperations() {
         return totalOperations(deletionPolicy.getMinTranslogGenerationForRecovery());
     }

+    /**
+     * Returns the size in bytes of the translog files that aren't committed to lucene.
+     */
+    public long uncommittedSizeInBytes() {
+        return sizeInBytesByMinGen(deletionPolicy.getMinTranslogGenerationForRecovery());
+    }
+
+    /**
+     * Returns the number of operations in the translog files
+     */
+    public int totalOperations() {
+        return totalOperations(-1);
+    }
+
+    /**
+     * Returns the size in bytes of the translog files
+     */
     public long sizeInBytes() {
-        return sizeInBytes(deletionPolicy.getMinTranslogGenerationForRecovery());
+        return sizeInBytesByMinGen(-1);
     }

     /**
@@ -394,9 +408,19 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
     }

     /**
-     * Returns the size in bytes of the translog files that aren't committed to lucene.
+     * Returns the number of operations in the translog files that aren't committed to lucene.
      */
-    private long sizeInBytes(long minGeneration) {
+    private int totalOperationsInGensAboveSeqNo(long minSeqNo) {
+        try (ReleasableLock ignored = readLock.acquire()) {
+            ensureOpen();
+            return readersAboveMinSeqNo(minSeqNo).mapToInt(BaseTranslogReader::totalOperations).sum();
+        }
+    }
+
+    /**
+     * Returns the size in bytes of the translog files above the given generation
+     */
+    private long sizeInBytesByMinGen(long minGeneration) {
         try (ReleasableLock ignored = readLock.acquire()) {
             ensureOpen();
             return Stream.concat(readers.stream(), Stream.of(current))
@@ -406,6 +430,16 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         }
     }

+    /**
+     * Returns the size in bytes of the translog files with ops above the given seqNo
+     */
+    private long sizeOfGensAboveSeqNoInBytes(long minSeqNo) {
+        try (ReleasableLock ignored = readLock.acquire()) {
+            ensureOpen();
+            return readersAboveMinSeqNo(minSeqNo).mapToLong(BaseTranslogReader::sizeInBytes).sum();
+        }
+    }
+
     /**
      * Creates a new translog for the specified generation.
      *
@@ -493,7 +527,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
      * @return {@code true} if the translog should be flushed
      */
     public boolean shouldFlush() {
-        final long size = this.sizeInBytes();
+        final long size = this.uncommittedSizeInBytes();
         return size > this.indexSettings.getFlushThresholdSize().getBytes();
     }
@@ -560,6 +594,25 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         }
     }

+    private Stream<? extends BaseTranslogReader> readersAboveMinSeqNo(long minSeqNo) {
+        assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() :
+            "callers of readersAboveMinSeqNo must hold a lock: readLock ["
+                + readLock.isHeldByCurrentThread() + "], writeLock [" + writeLock.isHeldByCurrentThread() + "]";
+        return Stream.concat(readers.stream(), Stream.of(current))
+            .filter(reader -> {
+                final long maxSeqNo = reader.getCheckpoint().maxSeqNo;
+                return maxSeqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO || maxSeqNo >= minSeqNo;
+            });
+    }
+
+    private Snapshot createSnapshotFromMinSeqNo(long minSeqNo) {
+        try (ReleasableLock ignored = readLock.acquire()) {
+            ensureOpen();
+            Snapshot[] snapshots = readersAboveMinSeqNo(minSeqNo).map(BaseTranslogReader::newSnapshot).toArray(Snapshot[]::new);
+            return new MultiSnapshot(snapshots);
+        }
+    }
+
     /**
      * Returns a view into the current translog that is guaranteed to retain all current operations
      * while receiving future ones as well
@@ -567,7 +620,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
     public Translog.View newView() {
         try (ReleasableLock lock = readLock.acquire()) {
             ensureOpen();
-            final long viewGen = deletionPolicy.acquireTranslogGenForView();
+            final long viewGen = getMinFileGeneration();
+            deletionPolicy.acquireTranslogGenForView(viewGen);
             try {
                 return new View(viewGen);
             } catch (Exception e) {
@@ -674,7 +728,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
     public TranslogStats stats() {
         // acquire lock to make the two numbers roughly consistent (no file change half way)
         try (ReleasableLock lock = readLock.acquire()) {
-            return new TranslogStats(totalOperations(), sizeInBytes());
+            return new TranslogStats(totalOperations(), sizeInBytes(), uncommittedOperations(), uncommittedSizeInBytes());
         }
     }
@@ -698,35 +752,36 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
     public class View implements Closeable {

         AtomicBoolean closed = new AtomicBoolean();
-        final long minGeneration;
+        final long viewGenToRelease;

-        View(long minGeneration) {
-            this.minGeneration = minGeneration;
-        }
-
-        /** this smallest translog generation in this view */
-        public long minTranslogGeneration() {
-            return minGeneration;
+        View(long viewGenToRelease) {
+            this.viewGenToRelease = viewGenToRelease;
         }

         /**
-         * The total number of operations in the view.
+         * The total number of operations in the view files which contain an operation with a sequence number
+         * above the given min sequence number. This will be the number of operations in the snapshot taken
+         * by calling {@link #snapshot(long)} with the same parameter.
          */
-        public int totalOperations() {
-            return Translog.this.totalOperations(minGeneration);
+        public int estimateTotalOperations(long minSequenceNumber) {
+            return Translog.this.totalOperationsInGensAboveSeqNo(minSequenceNumber);
         }

         /**
-         * Returns the size in bytes of the files behind the view.
+         * The total size of the view files which contain an operation with a sequence number
+         * above the given min sequence number. These are the files that would need to be read by a snapshot
+         * acquired via {@link #snapshot(long)} with the same parameter.
          */
-        public long sizeInBytes() {
-            return Translog.this.sizeInBytes(minGeneration);
+        public long estimateSizeInBytes(long minSequenceNumber) {
+            return Translog.this.sizeOfGensAboveSeqNoInBytes(minSequenceNumber);
         }

-        /** create a snapshot from this view */
-        public Snapshot snapshot() {
+        /**
+         * Creates a snapshot from this view, containing all operations
+         * from the given sequence number and up (with potentially some more).
+         */
+        public Snapshot snapshot(long minSequenceNumber) {
             ensureOpen();
-            return Translog.this.newSnapshot(minGeneration);
+            return Translog.this.createSnapshotFromMinSeqNo(minSequenceNumber);
         }

         void ensureOpen() {
@@ -738,8 +793,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         @Override
         public void close() throws IOException {
             if (closed.getAndSet(true) == false) {
-                logger.trace("closing view starting at translog [{}]", minGeneration);
-                deletionPolicy.releaseTranslogGenView(minGeneration);
+                logger.trace("closing view starting at translog [{}]", viewGenToRelease);
+                deletionPolicy.releaseTranslogGenView(viewGenToRelease);
                 trimUnreferencedReaders();
                 closeFilesIfNoPendingViews();
             }
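Taken together, the View changes replace generation-based bookkeeping with sequence-number-based filtering. A minimal usage sketch against the API introduced above (exception handling elided; `indexShard` is assumed to be an open IndexShard):

    // Sketch: replay translog ops from a sequence number onwards.
    try (Translog.View view = indexShard.acquireTranslogView()) {
        final long fromSeqNo = indexShard.getGlobalCheckpoint() + 1;
        final int maxOps = view.estimateTotalOperations(fromSeqNo); // upper bound, may overestimate
        final Translog.Snapshot snapshot = view.snapshot(fromSeqNo);
        Translog.Operation op;
        while ((op = snapshot.next()) != null) {
            if (op.seqNo() >= fromSeqNo) { // the snapshot may still contain older ops; callers filter them
                // replay op ...
            }
        }
    }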
@@ -1663,4 +1718,12 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
         return translogUUID;
     }

+    TranslogWriter getCurrent() {
+        return current;
+    }
+
+    List<TranslogReader> getReaders() {
+        return readers;
+    }
+
 }
TranslogDeletionPolicy.java

@@ -69,9 +69,8 @@ public class TranslogDeletionPolicy {
      * acquires the basis generation for a new view. Any translog generation above, and including, the returned generation
      * will not be deleted until a corresponding call to {@link #releaseTranslogGenView(long)} is called.
      */
-    synchronized long acquireTranslogGenForView() {
-        translogRefCounts.computeIfAbsent(minTranslogGenerationForRecovery, l -> Counter.newCounter(false)).addAndGet(1);
-        return minTranslogGenerationForRecovery;
+    synchronized void acquireTranslogGenForView(final long genForView) {
+        translogRefCounts.computeIfAbsent(genForView, l -> Counter.newCounter(false)).addAndGet(1);
     }

     /** returns the number of generations that were acquired for views */

@@ -80,7 +79,7 @@ public class TranslogDeletionPolicy {
     }

     /**
-     * releases a generation that was acquired by {@link #acquireTranslogGenForView()}
+     * releases a generation that was acquired by {@link #acquireTranslogGenForView(long)}
      */
     synchronized void releaseTranslogGenView(long translogGen) {
         Counter current = translogRefCounts.get(translogGen);

@@ -154,4 +153,9 @@ public class TranslogDeletionPolicy {
     public synchronized long getMinTranslogGenerationForRecovery() {
         return minTranslogGenerationForRecovery;
     }

+    synchronized long getViewCount(long viewGen) {
+        final Counter counter = translogRefCounts.get(viewGen);
+        return counter == null ? 0 : counter.get();
+    }
+
 }
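The deletion policy treats each acquired generation as a reference count, so overlapping views can pin different generations independently. An illustrative lifecycle using only the methods shown above (a sketch, not a real test):

    // Illustrative lifecycle of a view pinning a translog generation.
    long viewGen = translog.getMinFileGeneration();    // basis generation for the new view
    deletionPolicy.acquireTranslogGenForView(viewGen); // refcount++: gens >= viewGen are now kept
    assert deletionPolicy.getViewCount(viewGen) == 1;
    // ... read from the view ...
    deletionPolicy.releaseTranslogGenView(viewGen);    // refcount--: gens may become trimmable again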
TranslogSnapshot.java

@@ -99,7 +99,7 @@ final class TranslogSnapshot extends BaseTranslogReader implements Translog.Snap
         return "TranslogSnapshot{" +
             "readOperations=" + readOperations +
             ", position=" + position +
-            ", totalOperations=" + totalOperations +
+            ", estimateTotalOperations=" + totalOperations +
             ", length=" + length +
             ", reusableBuffer=" + reusableBuffer +
             '}';
TranslogStats.java

@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.index.translog;

+import org.elasticsearch.Version;
 import org.elasticsearch.action.support.ToXContentToBytes;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;

@@ -30,20 +31,29 @@ public class TranslogStats extends ToXContentToBytes implements Streamable {

     private long translogSizeInBytes;
     private int numberOfOperations;
+    private long uncommittedSizeInBytes;
+    private int uncommittedOperations;

     public TranslogStats() {
     }

-    public TranslogStats(int numberOfOperations, long translogSizeInBytes) {
+    public TranslogStats(int numberOfOperations, long translogSizeInBytes, int uncommittedOperations, long uncommittedSizeInBytes) {
         if (numberOfOperations < 0) {
             throw new IllegalArgumentException("numberOfOperations must be >= 0");
         }
-        assert translogSizeInBytes >= 0 : "translogSizeInBytes must be >= 0, got [" + translogSizeInBytes + "]";
+        if (translogSizeInBytes < 0) {
+            throw new IllegalArgumentException("translogSizeInBytes must be >= 0");
+        }
+        if (uncommittedOperations < 0) {
+            throw new IllegalArgumentException("uncommittedOperations must be >= 0");
+        }
+        if (uncommittedSizeInBytes < 0) {
+            throw new IllegalArgumentException("uncommittedSizeInBytes must be >= 0");
+        }
         this.numberOfOperations = numberOfOperations;
         this.translogSizeInBytes = translogSizeInBytes;
+        this.uncommittedSizeInBytes = uncommittedSizeInBytes;
+        this.uncommittedOperations = uncommittedOperations;
     }

     public void add(TranslogStats translogStats) {

@@ -53,41 +63,59 @@ public class TranslogStats extends ToXContentToBytes implements Streamable {

         this.numberOfOperations += translogStats.numberOfOperations;
         this.translogSizeInBytes += translogStats.translogSizeInBytes;
+        this.uncommittedOperations += translogStats.uncommittedOperations;
+        this.uncommittedSizeInBytes += translogStats.uncommittedSizeInBytes;
     }

     public long getTranslogSizeInBytes() {
         return translogSizeInBytes;
     }

-    public long estimatedNumberOfOperations() {
+    public int estimatedNumberOfOperations() {
         return numberOfOperations;
     }

+    /** the size of the generations in the translog that weren't yet committed to lucene */
+    public long getUncommittedSizeInBytes() {
+        return uncommittedSizeInBytes;
+    }
+
+    /** the number of operations in generations of the translog that weren't yet committed to lucene */
+    public int getUncommittedOperations() {
+        return uncommittedOperations;
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-        builder.startObject(Fields.TRANSLOG);
-        builder.field(Fields.OPERATIONS, numberOfOperations);
-        builder.byteSizeField(Fields.SIZE_IN_BYTES, Fields.SIZE, translogSizeInBytes);
+        builder.startObject("translog");
+        builder.field("operations", numberOfOperations);
+        builder.byteSizeField("size_in_bytes", "size", translogSizeInBytes);
+        builder.field("uncommitted_operations", uncommittedOperations);
+        builder.byteSizeField("uncommitted_size_in_bytes", "uncommitted_size", uncommittedSizeInBytes);
         builder.endObject();
         return builder;
     }

-    static final class Fields {
-        static final String TRANSLOG = "translog";
-        static final String OPERATIONS = "operations";
-        static final String SIZE = "size";
-        static final String SIZE_IN_BYTES = "size_in_bytes";
-    }
-
     @Override
     public void readFrom(StreamInput in) throws IOException {
         numberOfOperations = in.readVInt();
         translogSizeInBytes = in.readVLong();
+        if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha3)) {
+            uncommittedOperations = in.readVInt();
+            uncommittedSizeInBytes = in.readVLong();
+        } else {
+            uncommittedOperations = numberOfOperations;
+            uncommittedSizeInBytes = translogSizeInBytes;
+        }
     }

     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeVInt(numberOfOperations);
         out.writeVLong(translogSizeInBytes);
+        if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha3)) {
+            out.writeVInt(uncommittedOperations);
+            out.writeVLong(uncommittedSizeInBytes);
+        }
     }
 }
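With these stats changes, the `translog` section of the stats output reports both the full translog (`operations`, `size_in_bytes`) and its uncommitted tail (`uncommitted_operations`, `uncommitted_size_in_bytes`). A sketch of reading them programmatically via the accessors added above; `indexShard` is assumed to be available, as in the tests further down:

    TranslogStats stats = indexShard.translogStats();
    int total = stats.estimatedNumberOfOperations(); // all ops still held in translog files
    int kept  = stats.getUncommittedOperations();    // ops not yet committed to lucene
    long tail = stats.getUncommittedSizeInBytes();   // the bytes that shouldFlush() considers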
TranslogWriter.java

@@ -255,8 +255,9 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
     }

     @Override
-    Checkpoint getCheckpoint() {
-        return getLastSyncedCheckpoint();
+    synchronized Checkpoint getCheckpoint() {
+        return new Checkpoint(totalOffset, operationCounter, generation, minSeqNo, maxSeqNo,
+            globalCheckpointSupplier.getAsLong(), minTranslogGenerationSupplier.getAsLong());
     }

     @Override

@@ -329,22 +330,12 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
             if (lastSyncedCheckpoint.offset < offset && syncNeeded()) {
                 // double checked locking - we don't want to fsync unless we have to and now that we have
                 // the lock we should check again since if this code is busy we might have fsynced enough already
-                final long offsetToSync;
-                final int opsCounter;
-                final long currentMinSeqNo;
-                final long currentMaxSeqNo;
-                final long currentGlobalCheckpoint;
-                final long currentMinTranslogGeneration;
+                final Checkpoint checkpointToSync;
                 synchronized (this) {
                     ensureOpen();
                     try {
                         outputStream.flush();
-                        offsetToSync = totalOffset;
-                        opsCounter = operationCounter;
-                        currentMinSeqNo = minSeqNo;
-                        currentMaxSeqNo = maxSeqNo;
-                        currentGlobalCheckpoint = globalCheckpointSupplier.getAsLong();
-                        currentMinTranslogGeneration = minTranslogGenerationSupplier.getAsLong();
+                        checkpointToSync = getCheckpoint();
                     } catch (Exception ex) {
                         try {
                             closeWithTragicEvent(ex);

@@ -356,12 +347,9 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
             }
             // now do the actual fsync outside of the synchronized block such that
             // we can continue writing to the buffer etc.
-            final Checkpoint checkpoint;
             try {
                 channel.force(false);
-                checkpoint =
-                    writeCheckpoint(channelFactory, offsetToSync, opsCounter, currentMinSeqNo, currentMaxSeqNo,
-                        currentGlobalCheckpoint, currentMinTranslogGeneration, path.getParent(), generation);
+                writeCheckpoint(channelFactory, path.getParent(), checkpointToSync);
             } catch (Exception ex) {
                 try {
                     closeWithTragicEvent(ex);

@@ -370,9 +358,9 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
                 }
                 throw ex;
             }
-            assert lastSyncedCheckpoint.offset <= offsetToSync :
-                "illegal state: " + lastSyncedCheckpoint.offset + " <= " + offsetToSync;
-            lastSyncedCheckpoint = checkpoint; // write protected by syncLock
+            assert lastSyncedCheckpoint.offset <= checkpointToSync.offset :
+                "illegal state: " + lastSyncedCheckpoint.offset + " <= " + checkpointToSync.offset;
+            lastSyncedCheckpoint = checkpointToSync; // write protected by syncLock
             return true;
         }
     }
RecoverySourceHandler.java

@@ -58,6 +58,7 @@ import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;

@@ -128,13 +129,14 @@ public class RecoverySourceHandler {
      */
     public RecoveryResponse recoverToTarget() throws IOException {
         try (Translog.View translogView = shard.acquireTranslogView()) {
-            logger.trace("captured translog id [{}] for recovery", translogView.minTranslogGeneration());
-
+            final long startingSeqNo;
             boolean isSequenceNumberBasedRecoveryPossible = request.startingSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO &&
                 isTranslogReadyForSequenceNumberBasedRecovery(translogView);

             if (isSequenceNumberBasedRecoveryPossible) {
                 logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());
+                startingSeqNo = request.startingSeqNo();
             } else {
                 final Engine.IndexCommitRef phase1Snapshot;
                 try {

@@ -143,8 +145,12 @@ public class RecoverySourceHandler {
                     IOUtils.closeWhileHandlingException(translogView);
                     throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
                 }
+                // we set this to unassigned to create a translog roughly according to the retention policy
+                // on the target
+                startingSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
+
                 try {
-                    phase1(phase1Snapshot.getIndexCommit(), translogView);
+                    phase1(phase1Snapshot.getIndexCommit(), translogView, startingSeqNo);
                 } catch (final Exception e) {
                     throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
                 } finally {

@@ -157,7 +163,7 @@ public class RecoverySourceHandler {
             }

             try {
-                prepareTargetForTranslog(translogView.totalOperations());
+                prepareTargetForTranslog(translogView.estimateTotalOperations(startingSeqNo));
             } catch (final Exception e) {
                 throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
             }

@@ -180,12 +186,10 @@ public class RecoverySourceHandler {
                 throw new IndexShardRelocatedException(request.shardId());
             }

-            logger.trace("snapshot translog for recovery; current size is [{}]", translogView.totalOperations());
+            logger.trace("snapshot translog for recovery; current size is [{}]", translogView.estimateTotalOperations(startingSeqNo));
             final long targetLocalCheckpoint;
             try {
-                final long startingSeqNo =
-                    isSequenceNumberBasedRecoveryPossible ? request.startingSeqNo() : SequenceNumbersService.UNASSIGNED_SEQ_NO;
-                targetLocalCheckpoint = phase2(startingSeqNo, translogView.snapshot());
+                targetLocalCheckpoint = phase2(startingSeqNo, translogView.snapshot(startingSeqNo));
             } catch (Exception e) {
                 throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
             }

@@ -219,7 +223,7 @@ public class RecoverySourceHandler {
         logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo);

         final LocalCheckpointTracker tracker = new LocalCheckpointTracker(shard.indexSettings(), startingSeqNo, startingSeqNo - 1);
-        final Translog.Snapshot snapshot = translogView.snapshot();
+        final Translog.Snapshot snapshot = translogView.snapshot(startingSeqNo);
         Translog.Operation operation;
         while ((operation = snapshot.next()) != null) {
             if (operation.seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {

@@ -244,7 +248,7 @@ public class RecoverySourceHandler {
      * segments that are missing. Only segments that have the same size and
      * checksum can be reused
      */
-    public void phase1(final IndexCommit snapshot, final Translog.View translogView) {
+    public void phase1(final IndexCommit snapshot, final Translog.View translogView, final long startSeqNo) {
         cancellableThreads.checkForCancel();
         // Total size of segment files that are recovered
         long totalSize = 0;

@@ -322,10 +326,10 @@ public class RecoverySourceHandler {
                 new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
             cancellableThreads.execute(() ->
                 recoveryTarget.receiveFileInfo(response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames,
-                    response.phase1ExistingFileSizes, translogView.totalOperations()));
+                    response.phase1ExistingFileSizes, translogView.estimateTotalOperations(startSeqNo)));
             // How many bytes we've copied since we last called RateLimiter.pause
             final Function<StoreFileMetaData, OutputStream> outputStreamFactories =
-                md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogView), chunkSizeInBytes);
+                md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogView, startSeqNo), chunkSizeInBytes);
             sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories);
             // Send the CLEAN_FILES request, which takes all of the files that
             // were transferred and renames them from their temporary file

@@ -336,7 +340,8 @@ public class RecoverySourceHandler {
             // related to this recovery (out of date segments, for example)
             // are deleted
             try {
-                cancellableThreads.executeIO(() -> recoveryTarget.cleanFiles(translogView.totalOperations(), recoverySourceMetadata));
+                cancellableThreads.executeIO(() ->
+                    recoveryTarget.cleanFiles(translogView.estimateTotalOperations(startSeqNo), recoverySourceMetadata));
             } catch (RemoteTransportException | IOException targetException) {
                 final IOException corruptIndexException;
                 // we realized that after the index was copied and we wanted to finalize the recovery

@@ -347,11 +352,8 @@ public class RecoverySourceHandler {
                     try {
                         final Store.MetadataSnapshot recoverySourceMetadata1 = store.getMetadata(snapshot);
                         StoreFileMetaData[] metadata =
-                            StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(size -> new
-                                StoreFileMetaData[size]);
-                        ArrayUtil.timSort(metadata, (o1, o2) -> {
-                            return Long.compare(o1.length(), o2.length()); // check small files first
-                        });
+                            StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(StoreFileMetaData[]::new);
+                        ArrayUtil.timSort(metadata, Comparator.comparingLong(StoreFileMetaData::length)); // check small files first
                         for (StoreFileMetaData md : metadata) {
                             cancellableThreads.checkForCancel();
                             logger.debug("checking integrity for file {} after remove corruption exception", md);

@@ -577,11 +579,13 @@ public class RecoverySourceHandler {
     final class RecoveryOutputStream extends OutputStream {
         private final StoreFileMetaData md;
         private final Translog.View translogView;
+        private final long startSeqNo;
         private long position = 0;

-        RecoveryOutputStream(StoreFileMetaData md, Translog.View translogView) {
+        RecoveryOutputStream(StoreFileMetaData md, Translog.View translogView, long startSeqNo) {
             this.md = md;
             this.translogView = translogView;
+            this.startSeqNo = startSeqNo;
         }

         @Override

@@ -599,7 +603,7 @@ public class RecoverySourceHandler {
         private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException {
             // Actually send the file chunk to the target node, waiting for it to complete
             cancellableThreads.executeIO(() ->
-                recoveryTarget.writeFileChunk(md, position, content, lastChunk, translogView.totalOperations())
+                recoveryTarget.writeFileChunk(md, position, content, lastChunk, translogView.estimateTotalOperations(startSeqNo))
             );
             if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
                 throw new IndexShardClosedException(request.shardId());

@@ -610,7 +614,7 @@ public class RecoverySourceHandler {
     void sendFiles(Store store, StoreFileMetaData[] files, Function<StoreFileMetaData, OutputStream> outputStreamFactory) throws Exception {
         store.incRef();
         try {
-            ArrayUtil.timSort(files, (a, b) -> Long.compare(a.length(), b.length())); // send smallest first
+            ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetaData::length)); // send smallest first
             for (int i = 0; i < files.length; i++) {
                 final StoreFileMetaData md = files[i];
                 try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) {
RecoveryTarget.java

@@ -62,8 +62,8 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
 import java.util.function.LongConsumer;
+import java.util.stream.Collectors;

 /**
  * Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of

@@ -397,6 +397,8 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
         // update stats only after all operations completed (to ensure that mapping updates don't mess with stats)
         translog.incrementRecoveredOperations(operations.size());
         indexShard().sync();
+        // roll over / flush / trim if needed
+        indexShard().afterWriteOperation();
         return indexShard().getLocalCheckpoint();
     }
RemoteRecoveryTargetHandler.java

@@ -28,10 +28,8 @@ import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.StoreFileMetaData;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.transport.EmptyTransportResponseHandler;
-import org.elasticsearch.transport.FutureTransportResponseHandler;
-import org.elasticsearch.transport.TransportFuture;
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportResponse;
 import org.elasticsearch.transport.TransportService;

 import java.io.IOException;

@@ -159,13 +157,13 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
         }

         transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FILE_CHUNK,
-            new RecoveryFileChunkRequest(recoveryId, shardId, fileMetaData, position, content, lastChunk,
-                totalTranslogOps,
-                /* we send totalOperations with every request since we collect stats on the target and that way we can
-                 * see how many translog ops we accumulate while copying files across the network. A future optimization
-                 * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up.
-                 */
-                throttleTimeInNanos), fileChunkRequestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+            new RecoveryFileChunkRequest(recoveryId, shardId, fileMetaData, position, content, lastChunk,
+                totalTranslogOps,
+                /* we send estimateTotalOperations with every request since we collect stats on the target and that way we can
+                 * see how many translog ops we accumulate while copying files across the network. A future optimization
+                 * would be to restart the file copy again (new deltas) if too many translog ops pile up.
+                 */
+                throttleTimeInNanos), fileChunkRequestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
     }
 }
RecoveryFromGatewayIT.java

@@ -33,6 +33,7 @@ import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.shard.ShardId;

@@ -428,6 +429,10 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
                 }

+                // prevent a sequence-number-based recovery from being possible
+                client(primaryNode).admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder()
+                    .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
+                    .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
+                ).get();
                 client(primaryNode).admin().indices().prepareFlush("test").setForce(true).get();
                 return super.onNodeStopped(nodeName);
             }
InternalEngineTests.java

@@ -127,7 +127,6 @@ import org.elasticsearch.index.store.DirectoryUtils;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogConfig;
-import org.elasticsearch.index.translog.TranslogDeletionPolicy;
 import org.elasticsearch.indices.IndicesModule;
 import org.elasticsearch.indices.mapper.MapperRegistry;
 import org.elasticsearch.test.DummyShardLock;

@@ -866,14 +865,14 @@ public class InternalEngineTests extends ESTestCase {
             recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) {
                 @Override
                 public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
-                    assertThat(getTranslog().totalOperations(), equalTo(docs));
+                    assertThat(getTranslog().uncommittedOperations(), equalTo(docs));
                     final CommitId commitId = super.flush(force, waitIfOngoing);
                     flushed.set(true);
                     return commitId;
                 }
             };

-            assertThat(recoveringEngine.getTranslog().totalOperations(), equalTo(docs));
+            assertThat(recoveringEngine.getTranslog().uncommittedOperations(), equalTo(docs));
             recoveringEngine.recoverFromTranslog();
             assertTrue(flushed.get());
         } finally {

@@ -2491,10 +2490,19 @@ public class InternalEngineTests extends ESTestCase {
     }

     public void testTranslogCleanUpPostCommitCrash() throws Exception {
+        IndexSettings indexSettings = new IndexSettings(defaultSettings.getIndexMetaData(), defaultSettings.getNodeSettings(),
+            defaultSettings.getScopedSettings());
+        IndexMetaData.Builder builder = IndexMetaData.builder(indexSettings.getIndexMetaData());
+        builder.settings(Settings.builder().put(indexSettings.getSettings())
+            .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
+            .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
+        );
+        indexSettings.updateIndexMetaData(builder.build());
+
         try (Store store = createStore()) {
             AtomicBoolean throwErrorOnCommit = new AtomicBoolean();
             final Path translogPath = createTempDir();
-            try (InternalEngine engine = new InternalEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null)) {
+            try (InternalEngine engine = new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null)) {
                 @Override
                 protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
                     super.commitIndexWriter(writer, translog, syncId);

@@ -2509,7 +2517,7 @@ public class InternalEngineTests extends ESTestCase {
             FlushFailedEngineException e = expectThrows(FlushFailedEngineException.class, engine::flush);
             assertThat(e.getCause().getMessage(), equalTo("power's out"));
         }
-        try (InternalEngine engine = new InternalEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null))) {
+        try (InternalEngine engine = new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null))) {
             engine.recoverFromTranslog();
             assertVisibleCount(engine, 1);
             final long committedGen = Long.valueOf(

@@ -2933,7 +2941,7 @@ public class InternalEngineTests extends ESTestCase {
             assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
             expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
             assertEquals(1, engine.getTranslog().currentFileGeneration());
-            assertEquals(0L, engine.getTranslog().totalOperations());
+            assertEquals(0L, engine.getTranslog().uncommittedOperations());
         }
     }

@@ -3850,7 +3858,7 @@ public class InternalEngineTests extends ESTestCase {
                 System.nanoTime(),
                 reason));
             assertThat(noOpEngine.seqNoService().getLocalCheckpoint(), equalTo((long) (maxSeqNo + 1)));
-            assertThat(noOpEngine.getTranslog().totalOperations(), equalTo(1 + gapsFilled));
+            assertThat(noOpEngine.getTranslog().uncommittedOperations(), equalTo(1 + gapsFilled));
             // skip to the op that we added to the translog
             Translog.Operation op;
             Translog.Operation last = null;

@@ -3996,7 +4004,7 @@ public class InternalEngineTests extends ESTestCase {
         assertEquals(maxSeqIDOnReplica, replicaEngine.seqNoService().getMaxSeqNo());
         assertEquals(checkpointOnReplica, replicaEngine.seqNoService().getLocalCheckpoint());
         recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
-        assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().totalOperations());
+        assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().uncommittedOperations());
         recoveringEngine.recoverFromTranslog();
         assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo());
         assertEquals(checkpointOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint());

@@ -4027,7 +4035,7 @@ public class InternalEngineTests extends ESTestCase {
         try {
             recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
             if (flushed) {
-                assertEquals(0, recoveringEngine.getTranslog().totalOperations());
+                assertEquals(0, recoveringEngine.getTranslog().uncommittedOperations());
             }
             recoveringEngine.recoverFromTranslog();
             assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo());
IndexLevelReplicationTests.java

@@ -18,17 +18,15 @@
  */
 package org.elasticsearch.index.replication;

+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexableField;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineConfig;

@@ -44,7 +42,6 @@ import org.elasticsearch.index.shard.IndexShardTests;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.recovery.RecoveryTarget;
-import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.hamcrest.Matcher;

 import java.io.IOException;

@@ -272,9 +269,8 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
             assertThat(response.getFailure().getCause(), instanceOf(VersionConflictEngineException.class));
             shards.assertAllEqual(0);
             for (IndexShard indexShard : shards) {
-                try(Translog.View view = indexShard.acquireTranslogView()) {
-                    assertThat(view.totalOperations(), equalTo(0));
-                }
+                assertThat(indexShard.routingEntry() + " has the wrong number of ops in the translog",
+                    indexShard.translogStats().estimatedNumberOfOperations(), equalTo(0));
             }

             // add some replicas

@@ -292,9 +288,8 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
             assertThat(response.getFailure().getCause(), instanceOf(VersionConflictEngineException.class));
             shards.assertAllEqual(0);
             for (IndexShard indexShard : shards) {
-                try(Translog.View view = indexShard.acquireTranslogView()) {
-                    assertThat(view.totalOperations(), equalTo(0));
-                }
+                assertThat(indexShard.routingEntry() + " has the wrong number of ops in the translog",
+                    indexShard.translogStats().estimatedNumberOfOperations(), equalTo(0));
             }
         }
     }

@@ -327,8 +322,8 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
                                 String failureMessage) throws IOException {
         for (IndexShard indexShard : replicationGroup) {
             try(Translog.View view = indexShard.acquireTranslogView()) {
-                assertThat(view.totalOperations(), equalTo(expectedOperation));
-                final Translog.Snapshot snapshot = view.snapshot();
+                assertThat(view.estimateTotalOperations(SequenceNumbersService.NO_OPS_PERFORMED), equalTo(expectedOperation));
+                final Translog.Snapshot snapshot = view.snapshot(SequenceNumbersService.NO_OPS_PERFORMED);
                 long expectedSeqNo = 0L;
                 Translog.Operation op = snapshot.next();
                 do {
RecoveryDuringReplicationTests.java

@@ -29,7 +29,6 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineConfig;

@@ -115,18 +114,25 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
             docs += missingOnReplica;
             replicaHasDocsSinceLastFlushedCheckpoint |= missingOnReplica > 0;

-            final boolean flushPrimary = randomBoolean();
-            if (flushPrimary) {
+            final boolean translogTrimmed;
+            if (randomBoolean()) {
                 shards.flush();
+                translogTrimmed = randomBoolean();
+                if (translogTrimmed) {
+                    final Translog translog = shards.getPrimary().getTranslog();
+                    translog.getDeletionPolicy().setRetentionAgeInMillis(0);
+                    translog.trimUnreferencedReaders();
+                }
+            } else {
+                translogTrimmed = false;
             }

             originalReplica.close("disconnected", false);
             IOUtils.close(originalReplica.store());
             final IndexShard recoveredReplica =
                 shards.addReplicaWithExistingPath(originalReplica.shardPath(), originalReplica.routingEntry().currentNodeId());
             shards.recoverReplica(recoveredReplica);
-            if (flushPrimary && replicaHasDocsSinceLastFlushedCheckpoint) {
-                // replica has something to catch up with, but since we flushed the primary, we should fall back to full recovery
+            if (translogTrimmed && replicaHasDocsSinceLastFlushedCheckpoint) {
+                // replica has something to catch up with, but since we trimmed the primary translog, we should fall back to full recovery
                 assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), not(empty()));
             } else {
                 assertThat(recoveredReplica.recoveryState().getIndex().fileDetails(), empty());

@@ -179,6 +185,10 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
             // index some more
             totalDocs += shards.indexDocs(randomIntBetween(0, 5));

+            if (randomBoolean()) {
+                shards.flush();
+            }
+
             oldPrimary.close("demoted", false);
             oldPrimary.store().close();

@@ -190,9 +200,10 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
                 assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs));
             } else {
                 assertThat(newReplica.recoveryState().getIndex().fileDetails(), not(empty()));
-                assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs - committedDocs));
+                assertThat(newReplica.recoveryState().getTranslog().recoveredOperations(), equalTo(totalDocs));
             }

+            // roll back the extra ops in the replica
             shards.removeReplica(replica);
             replica.close("resync", false);
             replica.store().close();

@@ -444,7 +455,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
         }
     }

-    private static class BlockingTarget extends RecoveryTarget {
+    public static class BlockingTarget extends RecoveryTarget {

         private final CountDownLatch recoveryBlocked;
         private final CountDownLatch releaseRecovery;

@@ -453,8 +464,9 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
             EnumSet.of(RecoveryState.Stage.INDEX, RecoveryState.Stage.TRANSLOG, RecoveryState.Stage.FINALIZE);
         private final Logger logger;

-        BlockingTarget(RecoveryState.Stage stageToBlock, CountDownLatch recoveryBlocked, CountDownLatch releaseRecovery, IndexShard shard,
-                       DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener, Logger logger) {
+        public BlockingTarget(RecoveryState.Stage stageToBlock, CountDownLatch recoveryBlocked, CountDownLatch releaseRecovery,
+                              IndexShard shard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener,
+                              Logger logger) {
             super(shard, sourceNode, listener, version -> {});
             this.recoveryBlocked = recoveryBlocked;
             this.releaseRecovery = releaseRecovery;
LocalCheckpointTrackerTests.java

@@ -45,16 +45,9 @@ public class LocalCheckpointTrackerTests extends ESTestCase {

     private LocalCheckpointTracker tracker;

-    private final int SMALL_CHUNK_SIZE = 4;
+    private static final int SMALL_CHUNK_SIZE = 4;

-    @Override
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-        tracker = getTracker();
-    }
-
-    private LocalCheckpointTracker getTracker() {
+    public static LocalCheckpointTracker createEmptyTracker() {
         return new LocalCheckpointTracker(
             IndexSettingsModule.newIndexSettings(
                 "test",

@@ -67,6 +60,13 @@ public class LocalCheckpointTrackerTests extends ESTestCase {
         );
     }

+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        tracker = createEmptyTracker();
+    }
+
     public void testSimplePrimary() {
         long seqNo1, seqNo2;
         assertThat(tracker.getCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED));

@@ -236,5 +236,4 @@ public class LocalCheckpointTrackerTests extends ESTestCase {

         thread.join();
     }
-
 }
IndexShardIT.java

@@ -349,29 +349,30 @@ public class IndexShardIT extends ESSingleNodeTestCase {
             SourceToParse.source("test", "test", "1", new BytesArray("{}"), XContentType.JSON),
             IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, update -> {});
         assertTrue(shard.shouldFlush());
-        assertEquals(2, shard.getEngine().getTranslog().totalOperations());
+        final Translog translog = shard.getEngine().getTranslog();
+        assertEquals(2, translog.uncommittedOperations());
         client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON)
             .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
         assertBusy(() -> { // this is async
             assertFalse(shard.shouldFlush());
         });
-        assertEquals(0, shard.getEngine().getTranslog().totalOperations());
-        shard.getEngine().getTranslog().sync();
-        long size = shard.getEngine().getTranslog().sizeInBytes();
-        logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(),
-            shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration());
+        assertEquals(0, translog.uncommittedOperations());
+        translog.sync();
+        long size = translog.uncommittedSizeInBytes();
+        logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(),
+            translog.uncommittedOperations(), translog.getGeneration());
         client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(
             IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES))
             .build()).get();
         client().prepareDelete("test", "test", "2").get();
-        logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(),
-            shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration());
+        logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(),
+            translog.uncommittedOperations(), translog.getGeneration());
         assertBusy(() -> { // this is async
-            logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(),
-                shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration());
+            logger.info("--> translog size on iter : [{}] num_ops [{}] generation [{}]", translog.uncommittedSizeInBytes(),
+                translog.uncommittedOperations(), translog.getGeneration());
             assertFalse(shard.shouldFlush());
         });
-        assertEquals(0, shard.getEngine().getTranslog().totalOperations());
+        assertEquals(0, translog.uncommittedOperations());
     }

     public void testMaybeRollTranslogGeneration() throws Exception {
PrimaryReplicaSyncerTests.java

@@ -75,7 +75,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
         if (syncNeeded) {
             assertTrue("Sync action was not called", syncActionCalled.get());
         }
-        assertEquals(numDocs, fut.get().getTotalOperations());
+        assertEquals(globalCheckPoint == numDocs - 1 ? 0 : numDocs, fut.get().getTotalOperations());
         if (syncNeeded) {
             long skippedOps = globalCheckPoint + 1; // everything up to global checkpoint included
             assertEquals(skippedOps, fut.get().getSkippedOperations());
TranslogDeletionPolicyTests.java

@@ -126,20 +126,17 @@ public class TranslogDeletionPolicyTests extends ESTestCase {
             selectedReader = randomIntBetween(0, allGens.size() - 1);
             final long selectedGenerationBySize = allGens.get(selectedReader).generation;
             long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get();
-            selectedReader = randomIntBetween(0, allGens.size() - 1);
-            long committedGen = allGens.get(selectedReader).generation;
             deletionPolicy.setRetentionAgeInMillis(maxAge);
             deletionPolicy.setRetentionSizeInBytes(size);
             assertMinGenRequired(deletionPolicy, readersAndWriter, Math.max(selectedGenerationByAge, selectedGenerationBySize));
             // make a new policy as committed gen can't go backwards (for now)
             deletionPolicy = new MockDeletionPolicy(now, size, maxAge);
+            long committedGen = randomFrom(allGens).generation;
             deletionPolicy.setMinTranslogGenerationForRecovery(committedGen);
             assertMinGenRequired(deletionPolicy, readersAndWriter,
                 Math.min(committedGen, Math.max(selectedGenerationByAge, selectedGenerationBySize)));
-            long viewGen = deletionPolicy.acquireTranslogGenForView();
-            selectedReader = randomIntBetween(selectedReader, allGens.size() - 1);
-            committedGen = allGens.get(selectedReader).generation;
-            deletionPolicy.setMinTranslogGenerationForRecovery(committedGen);
+            long viewGen = randomFrom(allGens).generation;
+            deletionPolicy.acquireTranslogGenForView(viewGen);
             assertMinGenRequired(deletionPolicy, readersAndWriter,
                 Math.min(
                     Math.min(committedGen, viewGen),
|
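The assertion above pins down the deletion policy's contract: retention (age/size) may only keep extra generations, while the committed generation and any generation still held by a view pull the required minimum further back. A minimal sketch of that combination, using the same Math.min/Math.max structure the test asserts; minByAge and minBySize are hypothetical stand-ins for the policy's age- and size-based calculations:

    // Sketch of the contract asserted by the test above, not the real implementation.
    long minTranslogGenRequired(long committedGen, long minViewGen, long minByAge, long minBySize) {
        // retention keeps additional generations but never allows deleting needed ones
        long byRetention = Math.max(minByAge, minBySize);
        // a commit or an open view can only lower the minimum generation we must keep
        return Math.min(Math.min(committedGen, minViewGen), byRetention);
    }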
@@ -43,7 +43,6 @@ import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -63,6 +62,8 @@ import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.LocalCheckpointTrackerTests;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog.Location;
@@ -158,7 +159,10 @@ public class TranslogTests extends ESTestCase {
    private void commit(Translog translog, long genToCommit) throws IOException {
        final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
        deletionPolicy.setMinTranslogGenerationForRecovery(genToCommit);
        long minGenRequired = deletionPolicy.minTranslogGenRequired(translog.getReaders(), translog.getCurrent());
        translog.trimUnreferencedReaders();
        assertThat(minGenRequired, equalTo(translog.getMinFileGeneration()));
        assertFilePresences(translog);
    }

    @Override
@@ -190,9 +194,12 @@ public class TranslogTests extends ESTestCase {

    private TranslogConfig getTranslogConfig(final Path path) {
        final Settings settings = Settings
            .builder()
            .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
            .build();
            .builder()
            .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
            // only randomize between no age retention and a long one, so failures will have a chance of reproducing
            .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomBoolean() ? "-1ms" : "1h")
            .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), randomIntBetween(-1, 2048) + "b")
            .build();
        return getTranslogConfig(path, settings);
    }

@@ -317,7 +324,7 @@ public class TranslogTests extends ESTestCase {
        assertThat(snapshot.totalOperations(), equalTo(ops.size()));

        markCurrentGenAsCommitted(translog);
        snapshot = translog.newSnapshot();
        snapshot = translog.newSnapshot(firstId + 1);
        assertThat(snapshot, SnapshotMatchers.size(0));
        assertThat(snapshot.totalOperations(), equalTo(0));
    }
@@ -337,49 +344,60 @@ public class TranslogTests extends ESTestCase {
    }

    public void testStats() throws IOException {
        // the test controls translog cleaning itself
        translog.getDeletionPolicy().setRetentionSizeInBytes(1024 * 1024);
        translog.getDeletionPolicy().setRetentionAgeInMillis(3600 * 1000);
        final long firstOperationPosition = translog.getFirstOperationPosition();
        {
            final TranslogStats stats = stats();
            assertThat(stats.estimatedNumberOfOperations(), equalTo(0L));
            assertThat(stats.estimatedNumberOfOperations(), equalTo(0));
        }
        assertThat((int) firstOperationPosition, greaterThan(CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC)));
        translog.add(new Translog.Index("test", "1", 0, new byte[]{1}));

        {
            final TranslogStats stats = stats();
            assertThat(stats.estimatedNumberOfOperations(), equalTo(1L));
            assertThat(stats.estimatedNumberOfOperations(), equalTo(1));
            assertThat(stats.getTranslogSizeInBytes(), equalTo(97L));
            assertThat(stats.getUncommittedOperations(), equalTo(1));
            assertThat(stats.getUncommittedSizeInBytes(), equalTo(97L));
        }

        translog.add(new Translog.Delete("test", "2", 1, newUid("2")));
        {
            final TranslogStats stats = stats();
            assertThat(stats.estimatedNumberOfOperations(), equalTo(2L));
            assertThat(stats.estimatedNumberOfOperations(), equalTo(2));
            assertThat(stats.getTranslogSizeInBytes(), equalTo(146L));
            assertThat(stats.getUncommittedOperations(), equalTo(2));
            assertThat(stats.getUncommittedSizeInBytes(), equalTo(146L));
        }

        translog.add(new Translog.Delete("test", "3", 2, newUid("3")));
        {
            final TranslogStats stats = stats();
            assertThat(stats.estimatedNumberOfOperations(), equalTo(3L));
            assertThat(stats.estimatedNumberOfOperations(), equalTo(3));
            assertThat(stats.getTranslogSizeInBytes(), equalTo(195L));
            assertThat(stats.getUncommittedOperations(), equalTo(3));
            assertThat(stats.getUncommittedSizeInBytes(), equalTo(195L));
        }

        translog.add(new Translog.NoOp(3, 1, randomAlphaOfLength(16)));
        {
            final TranslogStats stats = stats();
            assertThat(stats.estimatedNumberOfOperations(), equalTo(4L));
            assertThat(stats.estimatedNumberOfOperations(), equalTo(4));
            assertThat(stats.getTranslogSizeInBytes(), equalTo(237L));
            assertThat(stats.getUncommittedOperations(), equalTo(4));
            assertThat(stats.getUncommittedSizeInBytes(), equalTo(237L));
        }

        final long expectedSizeInBytes = 280L;
        translog.rollGeneration();
        {
            final TranslogStats stats = stats();
            assertThat(stats.estimatedNumberOfOperations(), equalTo(4L));
            assertThat(
                stats.getTranslogSizeInBytes(),
                equalTo(expectedSizeInBytes));
            assertThat(stats.estimatedNumberOfOperations(), equalTo(4));
            assertThat(stats.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes));
            assertThat(stats.getUncommittedOperations(), equalTo(4));
            assertThat(stats.getUncommittedSizeInBytes(), equalTo(expectedSizeInBytes));
        }

        {
@@ -389,22 +407,25 @@ public class TranslogTests extends ESTestCase {
            final TranslogStats copy = new TranslogStats();
            copy.readFrom(out.bytes().streamInput());

            assertThat(copy.estimatedNumberOfOperations(), equalTo(4L));
            assertThat(copy.estimatedNumberOfOperations(), equalTo(4));
            assertThat(copy.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes));

            try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
                builder.startObject();
                copy.toXContent(builder, ToXContent.EMPTY_PARAMS);
                builder.endObject();
                assertThat(builder.string(), equalTo("{\"translog\":{\"operations\":4,\"size_in_bytes\":" + expectedSizeInBytes + "}}"));
                assertThat(builder.string(), equalTo("{\"translog\":{\"operations\":4,\"size_in_bytes\":" + expectedSizeInBytes
                    + ",\"uncommitted_operations\":4,\"uncommitted_size_in_bytes\":" + expectedSizeInBytes + "}}"));
            }
        }

        markCurrentGenAsCommitted(translog);
        {
            final TranslogStats stats = stats();
            assertThat(stats.estimatedNumberOfOperations(), equalTo(0L));
            assertThat(stats.getTranslogSizeInBytes(), equalTo(firstOperationPosition));
            assertThat(stats.estimatedNumberOfOperations(), equalTo(4));
            assertThat(stats.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes));
            assertThat(stats.getUncommittedOperations(), equalTo(0));
            assertThat(stats.getUncommittedSizeInBytes(), equalTo(firstOperationPosition));
        }
    }

@@ -413,27 +434,38 @@ public class TranslogTests extends ESTestCase {
        final int n = randomIntBetween(0, 16);
        final List<TranslogStats> statsList = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            final TranslogStats stats = new TranslogStats(randomIntBetween(1, 4096), randomIntBetween(1, 1 << 20));
            final TranslogStats stats = new TranslogStats(randomIntBetween(1, 4096), randomIntBetween(1, 1 << 20),
                randomIntBetween(1, 1 << 20), randomIntBetween(1, 4096));
            statsList.add(stats);
            total.add(stats);
        }

        assertThat(
            total.estimatedNumberOfOperations(),
            equalTo(statsList.stream().mapToLong(TranslogStats::estimatedNumberOfOperations).sum()));
            equalTo(statsList.stream().mapToInt(TranslogStats::estimatedNumberOfOperations).sum()));
        assertThat(
            total.getTranslogSizeInBytes(),
            equalTo(statsList.stream().mapToLong(TranslogStats::getTranslogSizeInBytes).sum()));
        assertThat(
            total.getUncommittedOperations(),
            equalTo(statsList.stream().mapToInt(TranslogStats::getUncommittedOperations).sum()));
        assertThat(
            total.getUncommittedSizeInBytes(),
            equalTo(statsList.stream().mapToLong(TranslogStats::getUncommittedSizeInBytes).sum()));
    }

    public void testNegativeNumberOfOperations() {
        final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(-1, 1));
        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(-1, 1, 1, 1));
        assertThat(e, hasToString(containsString("numberOfOperations must be >= 0")));
        e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, 1, -1, 1));
        assertThat(e, hasToString(containsString("uncommittedOperations must be >= 0")));
    }

    public void testNegativeSizeInBytes() {
        final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, -1));
        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, -1, 1, 1));
        assertThat(e, hasToString(containsString("translogSizeInBytes must be >= 0")));
        e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, 1, 1, -1));
        assertThat(e, hasToString(containsString("uncommittedSizeInBytes must be >= 0")));
    }

    public void testSnapshot() throws IOException {
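Taken together, these tests pin down the shape of the extended stats: four non-negative fields, where the uncommitted counters track the part of the translog not yet contained in a Lucene commit. A minimal sketch of the constructor contract they assert (argument order and messages follow the tests; the real class also carries serialization and XContent code):

    // Sketch of the constructor contract asserted above, not the full class.
    private final int numberOfOperations;
    private final long translogSizeInBytes;
    private final int uncommittedOperations;
    private final long uncommittedSizeInBytes;

    public TranslogStats(int numberOfOperations, long translogSizeInBytes,
                         int uncommittedOperations, long uncommittedSizeInBytes) {
        if (numberOfOperations < 0) {
            throw new IllegalArgumentException("numberOfOperations must be >= 0");
        }
        if (translogSizeInBytes < 0) {
            throw new IllegalArgumentException("translogSizeInBytes must be >= 0");
        }
        if (uncommittedOperations < 0) {
            throw new IllegalArgumentException("uncommittedOperations must be >= 0");
        }
        if (uncommittedSizeInBytes < 0) {
            throw new IllegalArgumentException("uncommittedSizeInBytes must be >= 0");
        }
        this.numberOfOperations = numberOfOperations;
        this.translogSizeInBytes = translogSizeInBytes;
        this.uncommittedOperations = uncommittedOperations;
        this.uncommittedSizeInBytes = uncommittedSizeInBytes;
    }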
@@ -719,7 +751,9 @@ public class TranslogTests extends ESTestCase {
        final AtomicBoolean run = new AtomicBoolean(true);

        final Object flushMutex = new Object();

        final AtomicLong lastCommittedLocalCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED);
        final LocalCheckpointTracker tracker = LocalCheckpointTrackerTests.createEmptyTracker();
        final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
        // any errors on threads
        final List<Exception> errors = new CopyOnWriteArrayList<>();
        logger.debug("using [{}] readers. [{}] writers. flushing every ~[{}] ops.", readers.length, writers.length, flushEveryOps);
@@ -732,7 +766,7 @@ public class TranslogTests extends ESTestCase {
                    barrier.await();
                    int counter = 0;
                    while (run.get() && idGenerator.get() < maxOps) {
                        long id = idGenerator.incrementAndGet();
                        long id = idGenerator.getAndIncrement();
                        final Translog.Operation op;
                        final Translog.Operation.Type type =
                            Translog.Operation.Type.values()[((int) (id % Translog.Operation.Type.values().length))];
@@ -751,6 +785,7 @@ public class TranslogTests extends ESTestCase {
                                throw new AssertionError("unsupported operation type [" + type + "]");
                        }
                        Translog.Location location = translog.add(op);
                        tracker.markSeqNoAsCompleted(id);
                        Translog.Location existing = writtenOps.put(op, location);
                        if (existing != null) {
                            fail("duplicate op [" + op + "], old entry at " + location);
@@ -762,7 +797,12 @@ public class TranslogTests extends ESTestCase {
                            synchronized (flushMutex) {
                                // we need not do this concurrently as we need to make sure that the generation
                                // we're committing is still present when we're committing
                                rollAndCommit(translog);
                                long localCheckpoint = tracker.getCheckpoint() + 1;
                                translog.rollGeneration();
                                deletionPolicy.setMinTranslogGenerationForRecovery(
                                    translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration);
                                translog.trimUnreferencedReaders();
                                lastCommittedLocalCheckpoint.set(localCheckpoint);
                            }
                        }
                        if (id % 7 == 0) {
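The block above is the new commit protocol in miniature: roll to a fresh generation, ask the translog which file generation still holds the first operation that must stay recoverable, hand that generation to the deletion policy, and only then trim. A hedged sketch of the same steps as a standalone helper, using only calls that appear in this diff (seqNo is the first sequence number that must remain replayable):

    // Sketch of the seq-no driven commit flow used above.
    void commitUpTo(Translog translog, TranslogDeletionPolicy deletionPolicy, long seqNo) throws IOException {
        translog.rollGeneration(); // future ops land in a new file
        // the oldest generation containing ops at or above seqNo must survive
        long minGen = translog.getMinGenerationForSeqNo(seqNo).translogFileGeneration;
        deletionPolicy.setMinTranslogGenerationForRecovery(minGen);
        translog.trimUnreferencedReaders(); // now older files may be deleted
    }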
@@ -788,7 +828,7 @@ public class TranslogTests extends ESTestCase {
            final String threadId = "reader_" + i;
            readers[i] = new Thread(new AbstractRunnable() {
                Translog.View view = null;
                Set<Translog.Operation> writtenOpsAtView;
                long committedLocalCheckpointAtView;

                @Override
                public void onFailure(Exception e) {
@@ -811,9 +851,10 @@ public class TranslogTests extends ESTestCase {
                void newView() throws IOException {
                    closeView();
                    view = translog.newView();
                    // captures the currently written ops so we know what to expect from the view
                    writtenOpsAtView = new HashSet<>(writtenOps.keySet());
                    logger.debug("--> [{}] opened view from [{}]", threadId, view.minTranslogGeneration());
                    // captures the last committed checkpoint, while holding the view, simulating
                    // recovery logic which captures a view and gets a lucene commit
                    committedLocalCheckpointAtView = lastCommittedLocalCheckpoint.get();
                    logger.debug("--> [{}] opened view from [{}]", threadId, view.viewGenToRelease);
                }

                @Override
@@ -828,23 +869,18 @@ public class TranslogTests extends ESTestCase {
                    // captures all ops that are written since the view was created (with a small caveat, see below)
                    // these are what we expect the snapshot to return (and potentially some more).
                    Set<Translog.Operation> expectedOps = new HashSet<>(writtenOps.keySet());
                    expectedOps.removeAll(writtenOpsAtView);
                    Translog.Snapshot snapshot = view.snapshot();
                    expectedOps.removeIf(op -> op.seqNo() <= committedLocalCheckpointAtView);
                    Translog.Snapshot snapshot = view.snapshot(committedLocalCheckpointAtView + 1L);
                    Translog.Operation op;
                    while ((op = snapshot.next()) != null) {
                        expectedOps.remove(op);
                    }
                    if (expectedOps.isEmpty() == false) {
                        StringBuilder missed = new StringBuilder("missed ").append(expectedOps.size()).append(" operations");
                        StringBuilder missed = new StringBuilder("missed ").append(expectedOps.size())
                            .append(" operations from [").append(committedLocalCheckpointAtView + 1L).append("]");
                        boolean failed = false;
                        for (Translog.Operation expectedOp : expectedOps) {
                            final Translog.Location loc = writtenOps.get(expectedOp);
                            if (loc.generation < view.minTranslogGeneration()) {
                                // writtenOps is only updated after the op was written to the translog. This means
                                // that ops written to the translog before the view was taken (and will be missing from the view)
                                // may yet be available in writtenOpsAtView, meaning we will erroneously expect them
                                continue;
                            }
                            failed = true;
                            missed.append("\n --> [").append(expectedOp).append("] written at ").append(loc);
                        }
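This reader thread exercises the headline change from the commit message: a view still retains every generation it was opened over, but snapshotting the view can now skip everything at or below a known sequence number. A hedged sketch of how a recovery-style caller would consume such a filtered snapshot (the View/Snapshot calls follow the test above; applyOperation is a hypothetical consumer):

    // Sketch: replay only ops above a checkpoint from a view's snapshot.
    long replayAbove(Translog.View view, long localCheckpoint) throws IOException {
        Translog.Snapshot snapshot = view.snapshot(localCheckpoint + 1); // filters committed ops
        long replayed = 0;
        Translog.Operation op;
        while ((op = snapshot.next()) != null) {
            // ops at or below the checkpoint are already safe in Lucene and were skipped
            applyOperation(op); // hypothetical consumer
            replayed++;
        }
        return replayed;
    }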
@@ -854,7 +890,7 @@ public class TranslogTests extends ESTestCase {
                    }
                    // slow down things a bit and spread out testing..
                    synchronized (signalReaderSomeDataWasIndexed) {
                        if (idGenerator.get() < maxOps){
                        if (idGenerator.get() < maxOps) {
                            signalReaderSomeDataWasIndexed.wait();
                        }
                    }
@@ -1154,7 +1190,7 @@ public class TranslogTests extends ESTestCase {
        translog = new Translog(config, translogGeneration.translogUUID, translog.getDeletionPolicy(), () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
        assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, translog.currentFileGeneration());
        assertFalse(translog.syncNeeded());
        Translog.Snapshot snapshot = translog.newSnapshot();
        Translog.Snapshot snapshot = translog.newSnapshot(translogGeneration.translogFileGeneration);
        for (int i = minUncommittedOp; i < translogOperations; i++) {
            assertEquals("expected operation" + i + " to be in the previous translog but wasn't", translog.currentFileGeneration() - 1, locations.get(i).generation);
            Translog.Operation next = snapshot.next();
@@ -1388,7 +1424,7 @@ public class TranslogTests extends ESTestCase {

        }
        this.translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO);
        Translog.Snapshot snapshot = this.translog.newSnapshot();
        Translog.Snapshot snapshot = this.translog.newSnapshot(translogGeneration.translogFileGeneration);
        for (int i = firstUncommitted; i < translogOperations; i++) {
            Translog.Operation next = snapshot.next();
            assertNotNull("" + i, next);
@@ -1772,6 +1808,10 @@ public class TranslogTests extends ESTestCase {
        final long comittedGeneration;
        final String translogUUID;
        try (Translog translog = getFailableTranslog(fail, config)) {
            final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
            // disable retention so we trim things
            deletionPolicy.setRetentionSizeInBytes(-1);
            deletionPolicy.setRetentionAgeInMillis(-1);
            translogUUID = translog.getTranslogUUID();
            int translogOperations = randomIntBetween(10, 100);
            for (int op = 0; op < translogOperations / 2; op++) {
@@ -1788,9 +1828,10 @@ public class TranslogTests extends ESTestCase {
                translog.rollGeneration();
            }
        }
        deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration);
        fail.failRandomly();
        try {
            commit(translog, comittedGeneration);
            translog.trimUnreferencedReaders();
        } catch (Exception e) {
            // expected...
        }
@@ -2162,7 +2203,7 @@ public class TranslogTests extends ESTestCase {
        TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy();
        deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery);
        try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) {
            Translog.Snapshot snapshot = translog.newSnapshot();
            Translog.Snapshot snapshot = translog.newSnapshot(minGenForRecovery);
            assertEquals(syncedDocs.size(), snapshot.totalOperations());
            for (int i = 0; i < syncedDocs.size(); i++) {
                Translog.Operation next = snapshot.next();
@@ -2302,11 +2343,14 @@ public class TranslogTests extends ESTestCase {
    public void testRollGeneration() throws Exception {
        // make sure we keep some files around
        final boolean longRetention = randomBoolean();
        final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
        if (longRetention) {
            translog.getDeletionPolicy().setRetentionAgeInMillis(3600 * 1000);
            deletionPolicy.setRetentionAgeInMillis(3600 * 1000);
        } else {
            translog.getDeletionPolicy().setRetentionAgeInMillis(-1);
            deletionPolicy.setRetentionAgeInMillis(-1);
        }
        // we control retention via time, disable size-based calculations for simplicity
        deletionPolicy.setRetentionSizeInBytes(-1);
        final long generation = translog.currentFileGeneration();
        final int rolls = randomIntBetween(1, 16);
        int totalOperations = 0;
@@ -2328,12 +2372,12 @@ public class TranslogTests extends ESTestCase {
        }
        commit(translog, generation + rolls);
        assertThat(translog.currentFileGeneration(), equalTo(generation + rolls));
        assertThat(translog.totalOperations(), equalTo(0));
        assertThat(translog.uncommittedOperations(), equalTo(0));
        if (longRetention) {
            for (int i = 0; i <= rolls; i++) {
                assertFileIsPresent(translog, generation + i);
            }
            translog.getDeletionPolicy().setRetentionAgeInMillis(randomBoolean() ? 100 : -1);
            deletionPolicy.setRetentionAgeInMillis(randomBoolean() ? 100 : -1);
            assertBusy(() -> {
                translog.trimUnreferencedReaders();
                for (int i = 0; i < rolls; i++) {
@@ -2349,65 +2393,6 @@ public class TranslogTests extends ESTestCase {
            assertFileIsPresent(translog, generation + rolls);
    }

    public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException {
        final long generation = translog.currentFileGeneration();
        int seqNo = 0;

        final int rollsBefore = randomIntBetween(0, 16);
        for (int r = 1; r <= rollsBefore; r++) {
            final int operationsBefore = randomIntBetween(1, 256);
            for (int i = 0; i < operationsBefore; i++) {
                translog.add(new Translog.NoOp(seqNo++, 0, "test"));
            }

            try (Releasable ignored = translog.writeLock.acquire()) {
                translog.rollGeneration();
            }

            assertThat(translog.currentFileGeneration(), equalTo(generation + r));
            for (int i = 0; i <= r; i++) {
                assertFileIsPresent(translog, generation + r);
            }
        }

        assertThat(translog.currentFileGeneration(), equalTo(generation + rollsBefore));
        translog.rollGeneration();
        assertThat(translog.currentFileGeneration(), equalTo(generation + rollsBefore + 1));

        for (int i = 0; i <= rollsBefore + 1; i++) {
            assertFileIsPresent(translog, generation + i);
        }

        final int rollsBetween = randomIntBetween(0, 16);
        for (int r = 1; r <= rollsBetween; r++) {
            final int operationsBetween = randomIntBetween(1, 256);
            for (int i = 0; i < operationsBetween; i++) {
                translog.add(new Translog.NoOp(seqNo++, 0, "test"));
            }

            try (Releasable ignored = translog.writeLock.acquire()) {
                translog.rollGeneration();
            }

            assertThat(
                translog.currentFileGeneration(),
                equalTo(generation + rollsBefore + 1 + r));
            for (int i = 0; i <= rollsBefore + 1 + r; i++) {
                assertFileIsPresent(translog, generation + i);
            }
        }

        commit(translog, generation + rollsBefore + 1);

        for (int i = 0; i <= rollsBefore; i++) {
            assertFileDeleted(translog, generation + i);
        }
        for (int i = rollsBefore + 1; i <= rollsBefore + 1 + rollsBetween; i++) {
            assertFileIsPresent(translog, generation + i);
        }

    }

    public void testMinGenerationForSeqNo() throws IOException {
        final int operations = randomIntBetween(1, 4096);
        final List<Long> shuffledSeqNos = LongStream.range(0, operations).boxed().collect(Collectors.toList());
@@ -2471,65 +2456,26 @@ public class TranslogTests extends ESTestCase {
        final long generation =
            randomIntBetween(1, Math.toIntExact(translog.currentFileGeneration()));
        commit(translog, generation);
        for (long g = 0; g < generation; g++) {
            assertFileDeleted(translog, g);
        }
        for (long g = generation; g <= translog.currentFileGeneration(); g++) {
            assertFileIsPresent(translog, g);
        }
    }

    public void testPrepareCommitAndCommit() throws IOException {
    public void testOpenViewIsPassToDeletionPolicy() throws IOException {
        final int operations = randomIntBetween(1, 4096);
        long seqNo = 0;
        long last = -1;
        final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
        for (int i = 0; i < operations; i++) {
            translog.add(new Translog.NoOp(seqNo++, 0, "test"));
            translog.add(new Translog.NoOp(i, 0, "test"));
            if (rarely()) {
                final long generation = translog.currentFileGeneration();
                translog.rollGeneration();
                if (rarely()) {
                    // simulate generation filling up and rolling between preparing the commit and committing
                    translog.rollGeneration();
                }
                final int committedGeneration = randomIntBetween(Math.max(1, Math.toIntExact(last)), Math.toIntExact(generation));
                commit(translog, committedGeneration);
                last = committedGeneration;
                for (long g = 0; g < committedGeneration; g++) {
                    assertFileDeleted(translog, g);
                }
                for (long g = committedGeneration; g <= translog.currentFileGeneration(); g++) {
                    assertFileIsPresent(translog, g);
                }
            }
        }
    }

    public void testCommitWithOpenView() throws IOException {
        final int operations = randomIntBetween(1, 4096);
        long seqNo = 0;
        long lastCommittedGeneration = -1;
        for (int i = 0; i < operations; i++) {
            translog.add(new Translog.NoOp(seqNo++, 0, "test"));
            if (rarely()) {
                try (Translog.View ignored = translog.newView()) {
                    final long viewGeneration = lastCommittedGeneration;
                    translog.rollGeneration();
                    final long committedGeneration = randomIntBetween(
                        Math.max(1, Math.toIntExact(lastCommittedGeneration)),
                        Math.toIntExact(translog.currentFileGeneration()));
                    commit(translog, committedGeneration);
                    lastCommittedGeneration = committedGeneration;
                    // with an open view, committing should preserve generations back to the last committed generation
                    for (long g = 1; g < Math.min(lastCommittedGeneration, viewGeneration); g++) {
                        assertFileDeleted(translog, g);
                    }
                    // the view generation could be -1 if no commit has been performed
                    final long max = Math.max(1, Math.min(lastCommittedGeneration, viewGeneration));
                    for (long g = max; g <= translog.currentFileGeneration(); g++) {
                        assertFileIsPresent(translog, g);
                    }
                commit(translog, randomLongBetween(deletionPolicy.getMinTranslogGenerationForRecovery(), translog.currentFileGeneration()));
            }
            if (frequently()) {
                long viewGen;
                try (Translog.View view = translog.newView()) {
                    viewGen = view.viewGenToRelease;
                    assertThat(deletionPolicy.getViewCount(view.viewGenToRelease), equalTo(1L));
                }
                assertThat(deletionPolicy.getViewCount(viewGen), equalTo(0L));
            }
        }
    }
@@ -377,6 +377,11 @@ public class RecoverySourceHandlerTests extends ESTestCase {
        when(shard.state()).thenReturn(IndexShardState.RELOCATED);
        when(shard.acquireIndexCommit(anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class));
//        final Engine.IndexCommitRef indexCommitRef = mock(Engine.IndexCommitRef.class);
//        when(shard.acquireIndexCommit(anyBoolean())).thenReturn(indexCommitRef);
//        final IndexCommit indexCommit = mock(IndexCommit.class);
//        when(indexCommitRef.getIndexCommit()).thenReturn(indexCommit);
//        when(indexCommit.getUserData()).thenReturn(Collections.emptyMap());
        final AtomicBoolean phase1Called = new AtomicBoolean();
        final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean();
        final AtomicBoolean phase2Called = new AtomicBoolean();
        final RecoverySourceHandler handler = new RecoverySourceHandler(
@@ -394,7 +399,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
            }

            @Override
            public void phase1(final IndexCommit snapshot, final Translog.View translogView) {
            public void phase1(final IndexCommit snapshot, final Translog.View translogView, final long startSeqNo) {
                phase1Called.set(true);
            }

@@ -451,6 +456,11 @@ public class RecoverySourceHandlerTests extends ESTestCase {
        }).when(shard).relocated(any(String.class));
        when(shard.acquireIndexCommit(anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class));

//        final Engine.IndexCommitRef indexCommitRef = mock(Engine.IndexCommitRef.class);
//        when(shard.acquireIndexCommit(anyBoolean())).thenReturn(indexCommitRef);
//        final IndexCommit indexCommit = mock(IndexCommit.class);
//        when(indexCommitRef.getIndexCommit()).thenReturn(indexCommit);
//        when(indexCommit.getUserData()).thenReturn(Collections.emptyMap());
        final Supplier<Long> currentClusterStateVersionSupplier = () -> {
            assertFalse(ensureClusterStateVersionCalled.get());
            assertTrue(recoveriesDelayed.get());
@@ -487,7 +497,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
            }

            @Override
            public void phase1(final IndexCommit snapshot, final Translog.View translogView) {
            public void phase1(final IndexCommit snapshot, final Translog.View translogView, final long startSeqNo) {
                phase1Called.set(true);
            }

@@ -0,0 +1,82 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.indices.recovery;

import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
import org.elasticsearch.index.replication.RecoveryDuringReplicationTests;
import org.elasticsearch.index.shard.IndexShard;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;

import static org.hamcrest.Matchers.equalTo;

public class RecoveryTests extends ESIndexLevelReplicationTestCase {

    public void testTranslogHistoryTransferred() throws Exception {
        try (ReplicationGroup shards = createGroup(0)) {
            shards.startPrimary();
            int docs = shards.indexDocs(10);
            shards.getPrimary().getTranslog().rollGeneration();
            shards.flush();
            if (randomBoolean()) {
                docs += shards.indexDocs(10);
            }
            shards.addReplica();
            shards.startAll();
            final IndexShard replica = shards.getReplicas().get(0);
            assertThat(replica.getTranslog().totalOperations(), equalTo(docs));
        }
    }

    public void testRetentionPolicyChangeDuringRecovery() throws Exception {
        try (ReplicationGroup shards = createGroup(0)) {
            shards.startPrimary();
            shards.indexDocs(10);
            shards.getPrimary().getTranslog().rollGeneration();
            shards.flush();
            shards.indexDocs(10);
            final IndexShard replica = shards.addReplica();
            final CountDownLatch recoveryBlocked = new CountDownLatch(1);
            final CountDownLatch releaseRecovery = new CountDownLatch(1);
            Future<Void> future = shards.asyncRecoverReplica(replica,
                (indexShard, node) -> new RecoveryDuringReplicationTests.BlockingTarget(RecoveryState.Stage.TRANSLOG,
                    recoveryBlocked, releaseRecovery, indexShard, node, recoveryListener, logger));
            recoveryBlocked.await();
            IndexMetaData.Builder builder = IndexMetaData.builder(replica.indexSettings().getIndexMetaData());
            builder.settings(Settings.builder().put(replica.indexSettings().getSettings())
                .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
                .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
                // force a roll and flush
                .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), "100b")
            );
            replica.indexSettings().updateIndexMetaData(builder.build());
            replica.onSettingsChanged();
            releaseRecovery.countDown();
            future.get();
            // rolling/flushing is async
            assertBusy(() -> assertThat(replica.getTranslog().totalOperations(), equalTo(0)));
        }
    }
}
@@ -20,16 +20,6 @@ replaying its operations takes a considerable amount of time during recovery.
It is also exposed through an API, though it rarely needs to be performed
manually.

[float]
=== Flush settings

The following <<indices-update-settings,dynamically updatable>> settings
control how often the in-memory buffer is flushed to disk:

`index.translog.flush_threshold_size`::

Once the translog hits this size, a flush will happen. Defaults to `512mb`.

[float]
=== Translog settings

@@ -72,6 +62,26 @@ update, or bulk request. This setting accepts the following parameters:
automatic commit will be discarded.
--

`index.translog.flush_threshold_size`::

The translog stores all operations that are not yet safely persisted in Lucene (i.e., are
not part of a Lucene commit point). Although these operations are available for reads, they will
need to be reindexed if the shard were to shut down and had to be recovered. This setting controls
the maximum total size of these operations, to prevent recoveries from taking too long. Once the
maximum size has been reached, a flush will happen, generating a new Lucene commit. Defaults to `512mb`.

`index.translog.retention.size`::

The total size of translog files to keep. Keeping more translog files increases the chance of performing
an operation-based sync when recovering replicas. If the translog files are not sufficient, replica recovery
will fall back to a file-based sync. Defaults to `512mb`.

`index.translog.retention.age`::

The maximum duration for which translog files will be kept. Defaults to `12h`.

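All three settings are dynamic, so they can be changed on a live index. A hedged sketch using the Java client update-settings pattern that appears in the tests earlier in this diff (the index name "test" and the values, which happen to be the defaults, are illustrative):

    // Sketch: dynamically setting translog retention on an existing index.
    client().admin().indices().prepareUpdateSettings("test")
        .setSettings(Settings.builder()
            .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "12h")    // default
            .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "512mb") // default
            .build())
        .get();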
[float]
[[corrupt-translog-truncation]]
=== What to do if the translog becomes corrupted?

@@ -68,3 +68,8 @@ matching indices).
Omitting the `+` has the same effect as specifying it, hence support for `+`
has been removed in index expressions.

==== Translog retention

Translog files are now kept for up to 12 hours (by default), with a maximum size of `512mb` (default), and
are no longer deleted on `flush`. This is to increase the chance of doing an operation-based recovery when
bringing replicas up to speed.

@@ -0,0 +1,62 @@
---
setup:
  - do:
      indices.create:
        index: test

---
"Translog retention":
  - skip:
      version: " - 5.99.0"
      reason: translog retention was added in 6.0.0
  - do:
      indices.stats:
        metric: [ translog ]
  - set: { indices.test.primaries.translog.size_in_bytes: empty_size }

  - do:
      index:
        index: test
        type: bar
        id: 1
        body: { "foo": "bar" }

  - do:
      indices.stats:
        metric: [ translog ]
  - gt: { indices.test.primaries.translog.size_in_bytes: $empty_size }
  - match: { indices.test.primaries.translog.operations: 1 }
  - gt: { indices.test.primaries.translog.uncommitted_size_in_bytes: $empty_size }
  - match: { indices.test.primaries.translog.uncommitted_operations: 1 }

  - do:
      indices.flush:
        index: test

  - do:
      indices.stats:
        metric: [ translog ]
  - gt: { indices.test.primaries.translog.size_in_bytes: $empty_size }
  - match: { indices.test.primaries.translog.operations: 1 }
  - match: { indices.test.primaries.translog.uncommitted_size_in_bytes: $empty_size }
  - match: { indices.test.primaries.translog.uncommitted_operations: 0 }

  - do:
      indices.put_settings:
        index: test
        body:
          index.translog.retention.size: -1
          index.translog.retention.age: -1

  - do:
      indices.flush:
        index: test
        force: true # force flush as we don't have pending ops

  - do:
      indices.stats:
        metric: [ translog ]
  - match: { indices.test.primaries.translog.size_in_bytes: $empty_size }
  - match: { indices.test.primaries.translog.operations: 0 }
  - match: { indices.test.primaries.translog.uncommitted_size_in_bytes: $empty_size }
  - match: { indices.test.primaries.translog.uncommitted_operations: 0 }
@@ -633,7 +633,7 @@ public abstract class ESTestCase extends LuceneTestCase {

    private static final String[] TIME_SUFFIXES = new String[]{"d", "h", "ms", "s", "m", "micros", "nanos"};

    public static String randomTimeValue(int lower, int upper, String[] suffixes) {
    public static String randomTimeValue(int lower, int upper, String... suffixes) {
        return randomIntBetween(lower, upper) + randomFrom(suffixes);
    }

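Switching the suffix parameter to varargs lets callers restrict the generated units inline; a hypothetical call site (arguments are illustrative):

    // Illustrative only: random time value limited to hour/minute suffixes.
    String retentionAge = randomTimeValue(1, 24, "h", "m"); // e.g. "12h"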