Introduce retention lease state file (#39004)
This commit moves retention leases from being persisted in the Lucene commit point to being persisted in a dedicated state file.
This commit is contained in:
parent
d43ac8fe11
commit
2d8f6b6501
|
@ -103,8 +103,7 @@ which returns something similar to:
|
|||
"max_seq_no" : "-1",
|
||||
"sync_id" : "AVvFY-071siAOuFGEO9P", <1>
|
||||
"max_unsafe_auto_id_timestamp" : "-1",
|
||||
"min_retained_seq_no" : "0",
|
||||
"retention_leases" : "primary_term:1;version:1;id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"
|
||||
"min_retained_seq_no" : "0"
|
||||
},
|
||||
"num_docs" : 0
|
||||
}
|
||||
|
@ -119,7 +118,6 @@ which returns something similar to:
|
|||
// TESTRESPONSE[s/"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA"/"translog_uuid": $body.indices.twitter.shards.0.0.commit.user_data.translog_uuid/]
|
||||
// TESTRESPONSE[s/"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ"/"history_uuid": $body.indices.twitter.shards.0.0.commit.user_data.history_uuid/]
|
||||
// TESTRESPONSE[s/"sync_id" : "AVvFY-071siAOuFGEO9P"/"sync_id": $body.indices.twitter.shards.0.0.commit.user_data.sync_id/]
|
||||
// TESTRESPONSE[s/"retention_leases" : "primary_term:1;version:1;id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"/"retention_leases": $body.indices.twitter.shards.0.0.commit.user_data.retention_leases/]
|
||||
<1> the `sync id` marker
|
||||
|
||||
[float]
|
||||
|
|
|
@ -114,7 +114,6 @@ public abstract class Engine implements Closeable {
|
|||
public static final String SYNC_COMMIT_ID = "sync_id";
|
||||
public static final String HISTORY_UUID_KEY = "history_uuid";
|
||||
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
|
||||
public static final String RETENTION_LEASES = "retention_leases";
|
||||
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
|
||||
|
||||
protected final ShardId shardId;
|
||||
|
|
|
@ -51,7 +51,6 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.lucene.LoggerInfoStream;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
|
@ -75,7 +74,6 @@ import org.elasticsearch.index.mapper.SourceFieldMapper;
|
|||
import org.elasticsearch.index.merge.MergeStats;
|
||||
import org.elasticsearch.index.merge.OnGoingMerge;
|
||||
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
|
||||
import org.elasticsearch.index.seqno.RetentionLeases;
|
||||
import org.elasticsearch.index.seqno.SeqNoStats;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
|
||||
|
@ -2347,13 +2345,7 @@ public class InternalEngine extends Engine {
|
|||
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
|
||||
commitData.put(HISTORY_UUID_KEY, historyUUID);
|
||||
if (softDeleteEnabled) {
|
||||
/*
|
||||
* We sample these from the policy (which occurs under a lock) to ensure that we have a consistent view of the minimum
|
||||
* retained sequence number, and the retention leases.
|
||||
*/
|
||||
final Tuple<Long, RetentionLeases> retentionPolicy = softDeletesPolicy.getRetentionPolicy();
|
||||
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(retentionPolicy.v1()));
|
||||
commitData.put(Engine.RETENTION_LEASES, RetentionLeases.encodeRetentionLeases(retentionPolicy.v2()));
|
||||
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
|
||||
}
|
||||
logger.trace("committing writer with commit data [{}]", commitData);
|
||||
return commitData.entrySet().iterator();
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.index.engine;
|
|||
|
||||
import org.apache.lucene.document.LongPoint;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
|
||||
import org.elasticsearch.index.seqno.RetentionLease;
|
||||
|
@ -107,10 +106,6 @@ final class SoftDeletesPolicy {
|
|||
* Operations whose seq# is least this value should exist in the Lucene index.
|
||||
*/
|
||||
synchronized long getMinRetainedSeqNo() {
|
||||
return getRetentionPolicy().v1();
|
||||
}
|
||||
|
||||
public synchronized Tuple<Long, RetentionLeases> getRetentionPolicy() {
|
||||
/*
|
||||
* When an engine is flushed, we need to provide it the latest collection of retention leases even when the soft deletes policy is
|
||||
* locked for peer recovery.
|
||||
|
@ -151,7 +146,7 @@ final class SoftDeletesPolicy {
|
|||
*/
|
||||
minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain);
|
||||
}
|
||||
return Tuple.tuple(minRetainedSeqNo, retentionLeases);
|
||||
return minRetainedSeqNo;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -32,6 +32,8 @@ import org.elasticsearch.common.collect.Tuple;
|
|||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.gateway.WriteStateException;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
|
@ -39,6 +41,7 @@ import org.elasticsearch.index.shard.ReplicationGroup;
|
|||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -318,6 +321,40 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads the latest retention leases from their dedicated state file.
|
||||
*
|
||||
* @param path the path to the directory containing the state file
|
||||
* @return the retention leases
|
||||
* @throws IOException if an I/O exception occurs reading the retention leases
|
||||
*/
|
||||
public RetentionLeases loadRetentionLeases(final Path path) throws IOException {
|
||||
final RetentionLeases retentionLeases = RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
|
||||
if (retentionLeases == null) {
|
||||
return RetentionLeases.EMPTY;
|
||||
}
|
||||
return retentionLeases;
|
||||
}
|
||||
|
||||
private final Object retentionLeasePersistenceLock = new Object();
|
||||
|
||||
/**
|
||||
* Persists the current retention leases to their dedicated state file.
|
||||
*
|
||||
* @param path the path to the directory containing the state file
|
||||
* @throws WriteStateException if an exception occurs writing the state file
|
||||
*/
|
||||
public void persistRetentionLeases(final Path path) throws WriteStateException {
|
||||
synchronized (retentionLeasePersistenceLock) {
|
||||
final RetentionLeases currentRetentionLeases;
|
||||
synchronized (this) {
|
||||
currentRetentionLeases = retentionLeases;
|
||||
}
|
||||
logger.trace("persisting retention leases [{}]", currentRetentionLeases);
|
||||
RetentionLeases.FORMAT.writeAndCleanup(currentRetentionLeases, path);
|
||||
}
|
||||
}
|
||||
|
||||
public static class CheckpointState implements Writeable {
|
||||
|
||||
/**
|
||||
|
|
|
@ -19,13 +19,16 @@
|
|||
|
||||
package org.elasticsearch.index.seqno;
|
||||
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Locale;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
|
@ -34,7 +37,7 @@ import java.util.Objects;
|
|||
* otherwise merge away operations that have been soft deleted). Each retention lease contains a unique identifier, the retaining sequence
|
||||
* number, the timestamp of when the lease was created or renewed, and the source of the retention lease (e.g., "ccr").
|
||||
*/
|
||||
public final class RetentionLease implements Writeable {
|
||||
public final class RetentionLease implements ToXContent, Writeable {
|
||||
|
||||
private final String id;
|
||||
|
||||
|
@ -94,10 +97,6 @@ public final class RetentionLease implements Writeable {
|
|||
if (id.isEmpty()) {
|
||||
throw new IllegalArgumentException("retention lease ID can not be empty");
|
||||
}
|
||||
if (id.contains(":") || id.contains(";") || id.contains(",")) {
|
||||
// retention lease IDs can not contain these characters because they are used in encoding retention leases
|
||||
throw new IllegalArgumentException("retention lease ID can not contain any of [:;,] but was [" + id + "]");
|
||||
}
|
||||
if (retainingSequenceNumber < 0) {
|
||||
throw new IllegalArgumentException("retention lease retaining sequence number [" + retainingSequenceNumber + "] out of range");
|
||||
}
|
||||
|
@ -108,10 +107,6 @@ public final class RetentionLease implements Writeable {
|
|||
if (source.isEmpty()) {
|
||||
throw new IllegalArgumentException("retention lease source can not be empty");
|
||||
}
|
||||
if (source.contains(":") || source.contains(";") || source.contains(",")) {
|
||||
// retention lease sources can not contain these characters because they are used in encoding retention leases
|
||||
throw new IllegalArgumentException("retention lease source can not contain any of [:;,] but was [" + source + "]");
|
||||
}
|
||||
this.id = id;
|
||||
this.retainingSequenceNumber = retainingSequenceNumber;
|
||||
this.timestamp = timestamp;
|
||||
|
@ -145,43 +140,49 @@ public final class RetentionLease implements Writeable {
|
|||
out.writeString(source);
|
||||
}
|
||||
|
||||
/**
|
||||
* Encodes a retention lease as a string. This encoding can be decoded by {@link #decodeRetentionLease(String)}. The retention lease is
|
||||
* encoded in the format <code>id:{id};retaining_seq_no:{retainingSequenecNumber};timestamp:{timestamp};source:{source}</code>.
|
||||
*
|
||||
* @param retentionLease the retention lease
|
||||
* @return the encoding of the retention lease
|
||||
*/
|
||||
static String encodeRetentionLease(final RetentionLease retentionLease) {
|
||||
Objects.requireNonNull(retentionLease);
|
||||
return String.format(
|
||||
Locale.ROOT,
|
||||
"id:%s;retaining_seq_no:%d;timestamp:%d;source:%s",
|
||||
retentionLease.id,
|
||||
retentionLease.retainingSequenceNumber,
|
||||
retentionLease.timestamp,
|
||||
retentionLease.source);
|
||||
private static final ParseField ID_FIELD = new ParseField("id");
|
||||
private static final ParseField RETAINING_SEQUENCE_NUMBER_FIELD = new ParseField("retaining_sequence_number");
|
||||
private static final ParseField TIMESTAMP_FIELD = new ParseField("timestamp");
|
||||
private static final ParseField SOURCE_FIELD = new ParseField("source");
|
||||
|
||||
private static ConstructingObjectParser<RetentionLease, Void> PARSER = new ConstructingObjectParser<>(
|
||||
"retention_leases",
|
||||
(a) -> new RetentionLease((String) a[0], (Long) a[1], (Long) a[2], (String) a[3]));
|
||||
|
||||
static {
|
||||
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD);
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), RETAINING_SEQUENCE_NUMBER_FIELD);
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIMESTAMP_FIELD);
|
||||
PARSER.declareString(ConstructingObjectParser.constructorArg(), SOURCE_FIELD);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
|
||||
builder.startObject();
|
||||
{
|
||||
builder.field(ID_FIELD.getPreferredName(), id);
|
||||
builder.field(RETAINING_SEQUENCE_NUMBER_FIELD.getPreferredName(), retainingSequenceNumber);
|
||||
builder.field(TIMESTAMP_FIELD.getPreferredName(), timestamp);
|
||||
builder.field(SOURCE_FIELD.getPreferredName(), source);
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFragment() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Decodes a retention lease encoded by {@link #encodeRetentionLease(RetentionLease)}.
|
||||
* Parses a retention lease from {@link org.elasticsearch.common.xcontent.XContent}. This method assumes that the retention lease was
|
||||
* converted to {@link org.elasticsearch.common.xcontent.XContent} via {@link #toXContent(XContentBuilder, Params)}.
|
||||
*
|
||||
* @param encodedRetentionLease an encoded retention lease
|
||||
* @return the decoded retention lease
|
||||
* @param parser the parser
|
||||
* @return a retention lease
|
||||
*/
|
||||
static RetentionLease decodeRetentionLease(final String encodedRetentionLease) {
|
||||
Objects.requireNonNull(encodedRetentionLease);
|
||||
final String[] fields = encodedRetentionLease.split(";");
|
||||
assert fields.length == 4 : Arrays.toString(fields);
|
||||
assert fields[0].matches("id:[^:;,]+") : fields[0];
|
||||
final String id = fields[0].substring("id:".length());
|
||||
assert fields[1].matches("retaining_seq_no:\\d+") : fields[1];
|
||||
final long retainingSequenceNumber = Long.parseLong(fields[1].substring("retaining_seq_no:".length()));
|
||||
assert fields[2].matches("timestamp:\\d+") : fields[2];
|
||||
final long timestamp = Long.parseLong(fields[2].substring("timestamp:".length()));
|
||||
assert fields[3].matches("source:[^:;,]+") : fields[3];
|
||||
final String source = fields[3].substring("source:".length());
|
||||
return new RetentionLease(id, retainingSequenceNumber, timestamp, source);
|
||||
public static RetentionLease fromXContent(final XContentParser parser) {
|
||||
return PARSER.apply(parser, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.gateway.WriteStateException;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardClosedException;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
@ -119,19 +120,21 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
|
|||
}
|
||||
|
||||
@Override
|
||||
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(final Request request, final IndexShard primary) {
|
||||
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
|
||||
final Request request,
|
||||
final IndexShard primary) throws WriteStateException {
|
||||
Objects.requireNonNull(request);
|
||||
Objects.requireNonNull(primary);
|
||||
primary.afterWriteOperation();
|
||||
primary.persistRetentionLeases();
|
||||
return new PrimaryResult<>(request, new ReplicationResponse());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica){
|
||||
protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica) throws WriteStateException {
|
||||
Objects.requireNonNull(request);
|
||||
Objects.requireNonNull(replica);
|
||||
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
|
||||
replica.afterWriteOperation();
|
||||
replica.persistRetentionLeases();
|
||||
return new ReplicaResult();
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
|||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.WriteResponse;
|
||||
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
|
||||
|
@ -39,6 +38,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.gateway.WriteStateException;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardClosedException;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
@ -121,31 +121,26 @@ public class RetentionLeaseSyncAction extends
|
|||
}
|
||||
|
||||
@Override
|
||||
protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(final Request request, final IndexShard primary) {
|
||||
protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(
|
||||
final Request request,
|
||||
final IndexShard primary) throws WriteStateException {
|
||||
Objects.requireNonNull(request);
|
||||
Objects.requireNonNull(primary);
|
||||
// we flush to ensure that retention leases are committed
|
||||
flush(primary);
|
||||
primary.persistRetentionLeases();
|
||||
return new WritePrimaryResult<>(request, new Response(), null, null, primary, logger);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WriteReplicaResult<Request> shardOperationOnReplica(final Request request, final IndexShard replica) {
|
||||
protected WriteReplicaResult<Request> shardOperationOnReplica(
|
||||
final Request request,
|
||||
final IndexShard replica) throws WriteStateException {
|
||||
Objects.requireNonNull(request);
|
||||
Objects.requireNonNull(replica);
|
||||
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
|
||||
// we flush to ensure that retention leases are committed
|
||||
flush(replica);
|
||||
replica.persistRetentionLeases();
|
||||
return new WriteReplicaResult<>(request, null, null, replica, logger);
|
||||
}
|
||||
|
||||
private void flush(final IndexShard indexShard) {
|
||||
final FlushRequest flushRequest = new FlushRequest();
|
||||
flushRequest.force(true);
|
||||
flushRequest.waitIfOngoing(true);
|
||||
indexShard.flush(flushRequest);
|
||||
}
|
||||
|
||||
public static final class Request extends ReplicatedWriteRequest<Request> {
|
||||
|
||||
private RetentionLeases retentionLeases;
|
||||
|
|
|
@ -19,15 +19,20 @@
|
|||
|
||||
package org.elasticsearch.index.seqno;
|
||||
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.gateway.MetaDataStateFormat;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Locale;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Function;
|
||||
|
@ -37,7 +42,7 @@ import java.util.stream.Collectors;
|
|||
* Represents a versioned collection of retention leases. We version the collection of retention leases to ensure that sync requests that
|
||||
* arrive out of order on the replica, using the version to ensure that older sync requests are rejected.
|
||||
*/
|
||||
public class RetentionLeases implements Writeable {
|
||||
public class RetentionLeases implements ToXContent, Writeable {
|
||||
|
||||
private final long primaryTerm;
|
||||
|
||||
|
@ -157,54 +162,59 @@ public class RetentionLeases implements Writeable {
|
|||
out.writeCollection(leases.values());
|
||||
}
|
||||
|
||||
/**
|
||||
* Encodes a retention lease collection as a string. This encoding can be decoded by
|
||||
* {@link RetentionLeases#decodeRetentionLeases(String)}. The encoding is a comma-separated encoding of each retention lease as encoded
|
||||
* by {@link RetentionLease#encodeRetentionLease(RetentionLease)}, prefixed by the version of the retention lease collection.
|
||||
*
|
||||
* @param retentionLeases the retention lease collection
|
||||
* @return the encoding of the retention lease collection
|
||||
*/
|
||||
public static String encodeRetentionLeases(final RetentionLeases retentionLeases) {
|
||||
Objects.requireNonNull(retentionLeases);
|
||||
return String.format(
|
||||
Locale.ROOT,
|
||||
"primary_term:%d;version:%d;%s",
|
||||
retentionLeases.primaryTerm,
|
||||
retentionLeases.version,
|
||||
retentionLeases.leases.values().stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(",")));
|
||||
private static final ParseField PRIMARY_TERM_FIELD = new ParseField("primary_term");
|
||||
private static final ParseField VERSION_FIELD = new ParseField("version");
|
||||
private static final ParseField LEASES_FIELD = new ParseField("leases");
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static ConstructingObjectParser<RetentionLeases, Void> PARSER = new ConstructingObjectParser<>(
|
||||
"retention_leases",
|
||||
(a) -> new RetentionLeases((Long) a[0], (Long) a[1], (Collection<RetentionLease>) a[2]));
|
||||
|
||||
static {
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), PRIMARY_TERM_FIELD);
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), VERSION_FIELD);
|
||||
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> RetentionLease.fromXContent(p), LEASES_FIELD);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
|
||||
builder.field(PRIMARY_TERM_FIELD.getPreferredName(), primaryTerm);
|
||||
builder.field(VERSION_FIELD.getPreferredName(), version);
|
||||
builder.startArray(LEASES_FIELD.getPreferredName());
|
||||
{
|
||||
for (final RetentionLease retentionLease : leases.values()) {
|
||||
retentionLease.toXContent(builder, params);
|
||||
}
|
||||
}
|
||||
builder.endArray();
|
||||
return builder;
|
||||
}
|
||||
|
||||
/**
|
||||
* Decodes retention leases encoded by {@link #encodeRetentionLeases(RetentionLeases)}.
|
||||
* Parses a retention leases collection from {@link org.elasticsearch.common.xcontent.XContent}. This method assumes that the retention
|
||||
* leases were converted to {@link org.elasticsearch.common.xcontent.XContent} via {@link #toXContent(XContentBuilder, Params)}.
|
||||
*
|
||||
* @param encodedRetentionLeases an encoded retention lease collection
|
||||
* @return the decoded retention lease collection
|
||||
* @param parser the parser
|
||||
* @return a retention leases collection
|
||||
*/
|
||||
public static RetentionLeases decodeRetentionLeases(final String encodedRetentionLeases) {
|
||||
Objects.requireNonNull(encodedRetentionLeases);
|
||||
if (encodedRetentionLeases.isEmpty()) {
|
||||
return EMPTY;
|
||||
}
|
||||
assert encodedRetentionLeases.matches("primary_term:\\d+;version:\\d+;.*") : encodedRetentionLeases;
|
||||
final int firstSemicolon = encodedRetentionLeases.indexOf(";");
|
||||
final long primaryTerm = Long.parseLong(encodedRetentionLeases.substring("primary_term:".length(), firstSemicolon));
|
||||
final int secondSemicolon = encodedRetentionLeases.indexOf(";", firstSemicolon + 1);
|
||||
final long version = Long.parseLong(encodedRetentionLeases.substring(firstSemicolon + 1 + "version:".length(), secondSemicolon));
|
||||
final Collection<RetentionLease> leases;
|
||||
if (secondSemicolon + 1 == encodedRetentionLeases.length()) {
|
||||
leases = Collections.emptyList();
|
||||
} else {
|
||||
assert Arrays.stream(encodedRetentionLeases.substring(secondSemicolon + 1).split(","))
|
||||
.allMatch(s -> s.matches("id:[^:;,]+;retaining_seq_no:\\d+;timestamp:\\d+;source:[^:;,]+"))
|
||||
: encodedRetentionLeases;
|
||||
leases = Arrays.stream(encodedRetentionLeases.substring(secondSemicolon + 1).split(","))
|
||||
.map(RetentionLease::decodeRetentionLease)
|
||||
.collect(Collectors.toList());
|
||||
public static RetentionLeases fromXContent(final XContentParser parser) {
|
||||
return PARSER.apply(parser, null);
|
||||
}
|
||||
|
||||
static final MetaDataStateFormat<RetentionLeases> FORMAT = new MetaDataStateFormat<RetentionLeases>("retention-leases-") {
|
||||
|
||||
@Override
|
||||
public void toXContent(final XContentBuilder builder, final RetentionLeases retentionLeases) throws IOException {
|
||||
retentionLeases.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||
}
|
||||
|
||||
return new RetentionLeases(primaryTerm, version, leases);
|
||||
}
|
||||
@Override
|
||||
public RetentionLeases fromXContent(final XContentParser parser) {
|
||||
return RetentionLeases.fromXContent(parser);
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
|
@ -237,7 +247,16 @@ public class RetentionLeases implements Writeable {
|
|||
* @return the map from retention lease ID to retention lease
|
||||
*/
|
||||
private static Map<String, RetentionLease> toMap(final Collection<RetentionLease> leases) {
|
||||
return leases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity()));
|
||||
// use a linked hash map to preserve order
|
||||
return leases.stream()
|
||||
.collect(Collectors.toMap(
|
||||
RetentionLease::id,
|
||||
Function.identity(),
|
||||
(left, right) -> {
|
||||
assert left.id().equals(right.id()) : "expected [" + left.id() + "] to equal [" + right.id() + "]";
|
||||
throw new IllegalStateException("duplicate retention lease ID [" + left.id() + "]");
|
||||
},
|
||||
LinkedHashMap::new));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -67,6 +67,7 @@ import org.elasticsearch.common.util.concurrent.AsyncIOProcessor;
|
|||
import org.elasticsearch.common.util.concurrent.RunOnce;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.gateway.WriteStateException;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexModule;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
|
@ -1432,7 +1433,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
|
||||
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
|
||||
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
|
||||
replicationTracker.updateRetentionLeasesOnReplica(getRetentionLeases(store.readLastCommittedSegmentsInfo()));
|
||||
updateRetentionLeasesOnReplica(loadRetentionLeases());
|
||||
trimUnsafeCommits();
|
||||
synchronized (mutex) {
|
||||
verifyNotClosed();
|
||||
|
@ -1452,14 +1453,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
|
||||
}
|
||||
|
||||
static RetentionLeases getRetentionLeases(final SegmentInfos segmentInfos) {
|
||||
final String committedRetentionLeases = segmentInfos.getUserData().get(Engine.RETENTION_LEASES);
|
||||
if (committedRetentionLeases == null) {
|
||||
return RetentionLeases.EMPTY;
|
||||
}
|
||||
return RetentionLeases.decodeRetentionLeases(committedRetentionLeases);
|
||||
}
|
||||
|
||||
private void trimUnsafeCommits() throws IOException {
|
||||
assert currentEngineReference.get() == null || currentEngineReference.get() instanceof ReadOnlyEngine : "a write engine is running";
|
||||
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
|
||||
|
@ -2004,6 +1997,27 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
replicationTracker.updateRetentionLeasesOnReplica(retentionLeases);
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads the latest retention leases from their dedicated state file.
|
||||
*
|
||||
* @return the retention leases
|
||||
* @throws IOException if an I/O exception occurs reading the retention leases
|
||||
*/
|
||||
public RetentionLeases loadRetentionLeases() throws IOException {
|
||||
verifyNotClosed();
|
||||
return replicationTracker.loadRetentionLeases(path.getShardStatePath());
|
||||
}
|
||||
|
||||
/**
|
||||
* Persists the current retention leases to their dedicated state file.
|
||||
*
|
||||
* @throws WriteStateException if an exception occurs writing the state file
|
||||
*/
|
||||
public void persistRetentionLeases() throws WriteStateException {
|
||||
verifyNotClosed();
|
||||
replicationTracker.persistRetentionLeases(path.getShardStatePath());
|
||||
}
|
||||
|
||||
/**
|
||||
* Syncs the current retention leases to all replicas.
|
||||
*/
|
||||
|
|
|
@ -1548,13 +1548,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
+ translogUUID + "]");
|
||||
}
|
||||
if (startingIndexCommit.equals(lastIndexCommitCommit) == false) {
|
||||
/*
|
||||
* Unlike other commit tags, the retention-leases tag is not restored when an engine is
|
||||
* recovered from translog. We need to manually copy it from the last commit to the safe commit;
|
||||
* otherwise we might lose the latest committed retention leases when re-opening an engine.
|
||||
*/
|
||||
final Map<String, String> userData = new HashMap<>(startingIndexCommit.getUserData());
|
||||
userData.put(Engine.RETENTION_LEASES, lastIndexCommitCommit.getUserData().getOrDefault(Engine.RETENTION_LEASES, ""));
|
||||
try (IndexWriter writer = newAppendingIndexWriter(directory, startingIndexCommit)) {
|
||||
// this achieves two things:
|
||||
// - by committing a new commit based on the starting commit, it make sure the starting commit will be opened
|
||||
|
@ -1565,7 +1558,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
|
||||
// The new commit will use segment files from the starting commit but userData from the last commit by default.
|
||||
// Thus, we need to manually set the userData from the starting commit to the new commit.
|
||||
writer.setLiveCommitData(userData.entrySet());
|
||||
writer.setLiveCommitData(startingIndexCommit.getUserData().entrySet());
|
||||
writer.commit();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -307,6 +307,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
|
|||
indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery");
|
||||
// Persist the global checkpoint.
|
||||
indexShard.sync();
|
||||
indexShard.persistRetentionLeases();
|
||||
indexShard.finalizeRecovery();
|
||||
return null;
|
||||
});
|
||||
|
|
|
@ -5361,16 +5361,6 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
engine.flush(true, true);
|
||||
assertThat(Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(Engine.MIN_RETAINED_SEQNO)),
|
||||
equalTo(engine.getMinRetainedSeqNo()));
|
||||
final RetentionLeases leases = retentionLeasesHolder.get();
|
||||
if (leases.leases().isEmpty()) {
|
||||
assertThat(
|
||||
engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES),
|
||||
equalTo("primary_term:" + primaryTerm + ";version:" + retentionLeasesVersion.get() + ";"));
|
||||
} else {
|
||||
assertThat(
|
||||
engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES),
|
||||
equalTo(RetentionLeases.encodeRetentionLeases(leases)));
|
||||
}
|
||||
}
|
||||
if (rarely()) {
|
||||
engine.forceMerge(randomBoolean());
|
||||
|
|
|
@ -37,8 +37,6 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
import java.util.function.Supplier;
|
||||
|
||||
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
||||
|
@ -121,30 +119,6 @@ public class SoftDeletesPolicyTests extends ESTestCase {
|
|||
assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo));
|
||||
}
|
||||
|
||||
public void testAlwaysFetchLatestRetentionLeases() {
|
||||
final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED);
|
||||
final Collection<RetentionLease> leases = new ArrayList<>();
|
||||
final int numLeases = randomIntBetween(0, 10);
|
||||
for (int i = 0; i < numLeases; i++) {
|
||||
leases.add(new RetentionLease(Integer.toString(i), randomLongBetween(0, 1000), randomNonNegativeLong(), "test"));
|
||||
}
|
||||
final Supplier<RetentionLeases> leasesSupplier =
|
||||
() -> new RetentionLeases(
|
||||
randomNonNegativeLong(),
|
||||
randomNonNegativeLong(),
|
||||
Collections.unmodifiableCollection(new ArrayList<>(leases)));
|
||||
final SoftDeletesPolicy policy =
|
||||
new SoftDeletesPolicy(globalCheckpoint::get, randomIntBetween(1, 1000), randomIntBetween(0, 1000), leasesSupplier);
|
||||
if (randomBoolean()) {
|
||||
policy.acquireRetentionLock();
|
||||
}
|
||||
if (numLeases == 0) {
|
||||
assertThat(policy.getRetentionPolicy().v2().leases(), empty());
|
||||
} else {
|
||||
assertThat(policy.getRetentionPolicy().v2().leases(), contains(leases.toArray(new RetentionLease[0])));
|
||||
}
|
||||
}
|
||||
|
||||
public void testWhenGlobalCheckpointDictatesThePolicy() {
|
||||
final int retentionOperations = randomIntBetween(0, 1024);
|
||||
final AtomicLong globalCheckpoint = new AtomicLong(randomLongBetween(0, Long.MAX_VALUE - 2));
|
||||
|
|
|
@ -24,16 +24,21 @@ import org.elasticsearch.cluster.routing.AllocationId;
|
|||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.gateway.WriteStateException;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.test.IndexSettingsModule;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
@ -460,6 +465,109 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
|
|||
}
|
||||
}
|
||||
|
||||
public void testLoadAndPersistRetentionLeases() throws IOException {
|
||||
final AllocationId allocationId = AllocationId.newInitializing();
|
||||
long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
|
||||
final ReplicationTracker replicationTracker = new ReplicationTracker(
|
||||
new ShardId("test", "_na", 0),
|
||||
allocationId.getId(),
|
||||
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
|
||||
primaryTerm,
|
||||
UNASSIGNED_SEQ_NO,
|
||||
value -> {},
|
||||
() -> 0L,
|
||||
(leases, listener) -> {});
|
||||
replicationTracker.updateFromMaster(
|
||||
randomNonNegativeLong(),
|
||||
Collections.singleton(allocationId.getId()),
|
||||
routingTable(Collections.emptySet(), allocationId),
|
||||
Collections.emptySet());
|
||||
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
|
||||
final int length = randomIntBetween(0, 8);
|
||||
for (int i = 0; i < length; i++) {
|
||||
if (rarely() && primaryTerm < Long.MAX_VALUE) {
|
||||
primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
|
||||
replicationTracker.setOperationPrimaryTerm(primaryTerm);
|
||||
}
|
||||
final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
|
||||
replicationTracker.addRetentionLease(
|
||||
Integer.toString(i), retainingSequenceNumber, "test-" + i, ActionListener.wrap(() -> {}));
|
||||
}
|
||||
|
||||
final Path path = createTempDir();
|
||||
replicationTracker.persistRetentionLeases(path);
|
||||
assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that we correctly synchronize writing the retention lease state file in {@link ReplicationTracker#persistRetentionLeases(Path)}.
|
||||
* This test can fail without the synchronization block in that method.
|
||||
*
|
||||
* @throws IOException if an I/O exception occurs loading the retention lease state file
|
||||
*/
|
||||
public void testPersistRetentionLeasesUnderConcurrency() throws IOException {
|
||||
final AllocationId allocationId = AllocationId.newInitializing();
|
||||
long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
|
||||
final ReplicationTracker replicationTracker = new ReplicationTracker(
|
||||
new ShardId("test", "_na", 0),
|
||||
allocationId.getId(),
|
||||
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
|
||||
primaryTerm,
|
||||
UNASSIGNED_SEQ_NO,
|
||||
value -> {},
|
||||
() -> 0L,
|
||||
(leases, listener) -> {});
|
||||
replicationTracker.updateFromMaster(
|
||||
randomNonNegativeLong(),
|
||||
Collections.singleton(allocationId.getId()),
|
||||
routingTable(Collections.emptySet(), allocationId),
|
||||
Collections.emptySet());
|
||||
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
|
||||
final int length = randomIntBetween(0, 8);
|
||||
for (int i = 0; i < length; i++) {
|
||||
if (rarely() && primaryTerm < Long.MAX_VALUE) {
|
||||
primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
|
||||
replicationTracker.setOperationPrimaryTerm(primaryTerm);
|
||||
}
|
||||
final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
|
||||
replicationTracker.addRetentionLease(
|
||||
Integer.toString(i), retainingSequenceNumber, "test-" + i, ActionListener.wrap(() -> {}));
|
||||
}
|
||||
|
||||
final Path path = createTempDir();
|
||||
final int numberOfThreads = randomIntBetween(1, 2 * Runtime.getRuntime().availableProcessors());
|
||||
final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
|
||||
final Thread[] threads = new Thread[numberOfThreads];
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
final String id = Integer.toString(length + i);
|
||||
threads[i] = new Thread(() -> {
|
||||
try {
|
||||
barrier.await();
|
||||
final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
|
||||
replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test-" + id, ActionListener.wrap(() -> {}));
|
||||
replicationTracker.persistRetentionLeases(path);
|
||||
barrier.await();
|
||||
} catch (final BrokenBarrierException | InterruptedException | WriteStateException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
});
|
||||
threads[i].start();
|
||||
}
|
||||
|
||||
try {
|
||||
// synchronize the threads invoking ReplicationTracker#persistRetentionLeases(Path path)
|
||||
barrier.await();
|
||||
// wait for all the threads to finish
|
||||
barrier.await();
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
threads[i].join();
|
||||
}
|
||||
} catch (final BrokenBarrierException | InterruptedException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases()));
|
||||
}
|
||||
|
||||
private void assertRetentionLeases(
|
||||
final ReplicationTracker replicationTracker,
|
||||
final int size,
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
|||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.gateway.WriteStateException;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
|
@ -91,7 +92,7 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
|
|||
super.tearDown();
|
||||
}
|
||||
|
||||
public void testRetentionLeaseBackgroundSyncActionOnPrimary() {
|
||||
public void testRetentionLeaseBackgroundSyncActionOnPrimary() throws WriteStateException {
|
||||
final IndicesService indicesService = mock(IndicesService.class);
|
||||
|
||||
final Index index = new Index("index", "uuid");
|
||||
|
@ -120,13 +121,13 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
|
|||
|
||||
final ReplicationOperation.PrimaryResult<RetentionLeaseBackgroundSyncAction.Request> result =
|
||||
action.shardOperationOnPrimary(request, indexShard);
|
||||
// the retention leases on the shard should be periodically flushed
|
||||
verify(indexShard).afterWriteOperation();
|
||||
// the retention leases on the shard should be persisted
|
||||
verify(indexShard).persistRetentionLeases();
|
||||
// we should forward the request containing the current retention leases to the replica
|
||||
assertThat(result.replicaRequest(), sameInstance(request));
|
||||
}
|
||||
|
||||
public void testRetentionLeaseBackgroundSyncActionOnReplica() {
|
||||
public void testRetentionLeaseBackgroundSyncActionOnReplica() throws WriteStateException {
|
||||
final IndicesService indicesService = mock(IndicesService.class);
|
||||
|
||||
final Index index = new Index("index", "uuid");
|
||||
|
@ -156,8 +157,8 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
|
|||
final TransportReplicationAction.ReplicaResult result = action.shardOperationOnReplica(request, indexShard);
|
||||
// the retention leases on the shard should be updated
|
||||
verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases);
|
||||
// the retention leases on the shard should be periodically flushed
|
||||
verify(indexShard).afterWriteOperation();
|
||||
// the retention leases on the shard should be persisted
|
||||
verify(indexShard).persistRetentionLeases();
|
||||
// the result should indicate success
|
||||
final AtomicBoolean success = new AtomicBoolean();
|
||||
result.respond(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString())));
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
|
@ -103,10 +102,8 @@ public class RetentionLeaseIT extends ESIntegTestCase {
|
|||
latch.await();
|
||||
retentionLock.close();
|
||||
|
||||
// check retention leases have been committed on the primary
|
||||
final RetentionLeases primaryCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
|
||||
primary.commitStats().getUserData().get(Engine.RETENTION_LEASES));
|
||||
assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primaryCommittedRetentionLeases)));
|
||||
// check retention leases have been written on the primary
|
||||
assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primary.loadRetentionLeases())));
|
||||
|
||||
// check current retention leases have been synced to all replicas
|
||||
for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) {
|
||||
|
@ -118,10 +115,8 @@ public class RetentionLeaseIT extends ESIntegTestCase {
|
|||
final Map<String, RetentionLease> retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases());
|
||||
assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases));
|
||||
|
||||
// check retention leases have been committed on the replica
|
||||
final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
|
||||
replica.commitStats().getUserData().get(Engine.RETENTION_LEASES));
|
||||
assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replicaCommittedRetentionLeases)));
|
||||
// check retention leases have been written on the replica
|
||||
assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replica.loadRetentionLeases())));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -165,10 +160,8 @@ public class RetentionLeaseIT extends ESIntegTestCase {
|
|||
latch.await();
|
||||
retentionLock.close();
|
||||
|
||||
// check retention leases have been committed on the primary
|
||||
final RetentionLeases primaryCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
|
||||
primary.commitStats().getUserData().get(Engine.RETENTION_LEASES));
|
||||
assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primaryCommittedRetentionLeases)));
|
||||
// check retention leases have been written on the primary
|
||||
assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primary.loadRetentionLeases())));
|
||||
|
||||
// check current retention leases have been synced to all replicas
|
||||
for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) {
|
||||
|
@ -180,10 +173,8 @@ public class RetentionLeaseIT extends ESIntegTestCase {
|
|||
final Map<String, RetentionLease> retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases());
|
||||
assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases));
|
||||
|
||||
// check retention leases have been committed on the replica
|
||||
final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
|
||||
replica.commitStats().getUserData().get(Engine.RETENTION_LEASES));
|
||||
assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replicaCommittedRetentionLeases)));
|
||||
// check retention leases have been written on the replica
|
||||
assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replica.loadRetentionLeases())));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -322,7 +313,6 @@ public class RetentionLeaseIT extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38588")
|
||||
public void testRetentionLeasesSyncOnRecovery() throws Exception {
|
||||
final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2);
|
||||
internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);
|
||||
|
@ -378,6 +368,9 @@ public class RetentionLeaseIT extends ESIntegTestCase {
|
|||
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
|
||||
final Map<String, RetentionLease> retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases());
|
||||
assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases));
|
||||
|
||||
// check retention leases have been written on the replica; see RecoveryTarget#finalizeRecovery
|
||||
assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replica.loadRetentionLeases())));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -21,7 +21,6 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.replication.TransportWriteAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
|
@ -29,6 +28,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
|||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.gateway.WriteStateException;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
|
@ -90,7 +90,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
|
|||
super.tearDown();
|
||||
}
|
||||
|
||||
public void testRetentionLeaseSyncActionOnPrimary() {
|
||||
public void testRetentionLeaseSyncActionOnPrimary() throws WriteStateException {
|
||||
final IndicesService indicesService = mock(IndicesService.class);
|
||||
|
||||
final Index index = new Index("index", "uuid");
|
||||
|
@ -118,18 +118,15 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
|
|||
|
||||
final TransportWriteAction.WritePrimaryResult<RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Response> result =
|
||||
action.shardOperationOnPrimary(request, indexShard);
|
||||
// the retention leases on the shard should be flushed
|
||||
final ArgumentCaptor<FlushRequest> flushRequest = ArgumentCaptor.forClass(FlushRequest.class);
|
||||
verify(indexShard).flush(flushRequest.capture());
|
||||
assertTrue(flushRequest.getValue().force());
|
||||
assertTrue(flushRequest.getValue().waitIfOngoing());
|
||||
// the retention leases on the shard should be persisted
|
||||
verify(indexShard).persistRetentionLeases();
|
||||
// we should forward the request containing the current retention leases to the replica
|
||||
assertThat(result.replicaRequest(), sameInstance(request));
|
||||
// we should start with an empty replication response
|
||||
assertNull(result.finalResponseIfSuccessful.getShardInfo());
|
||||
}
|
||||
|
||||
public void testRetentionLeaseSyncActionOnReplica() {
|
||||
public void testRetentionLeaseSyncActionOnReplica() throws WriteStateException {
|
||||
final IndicesService indicesService = mock(IndicesService.class);
|
||||
|
||||
final Index index = new Index("index", "uuid");
|
||||
|
@ -159,11 +156,8 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
|
|||
action.shardOperationOnReplica(request, indexShard);
|
||||
// the retention leases on the shard should be updated
|
||||
verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases);
|
||||
// the retention leases on the shard should be flushed
|
||||
final ArgumentCaptor<FlushRequest> flushRequest = ArgumentCaptor.forClass(FlushRequest.class);
|
||||
verify(indexShard).flush(flushRequest.capture());
|
||||
assertTrue(flushRequest.getValue().force());
|
||||
assertTrue(flushRequest.getValue().waitIfOngoing());
|
||||
// the retention leases on the shard should be persisteed
|
||||
verify(indexShard).persistRetentionLeases();
|
||||
// the result should indicate success
|
||||
final AtomicBoolean success = new AtomicBoolean();
|
||||
result.respond(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString())));
|
||||
|
|
|
@ -31,14 +31,6 @@ import static org.hamcrest.Matchers.hasToString;
|
|||
|
||||
public class RetentionLeaseTests extends ESTestCase {
|
||||
|
||||
public void testInvalidId() {
|
||||
final String id = "id" + randomFrom(":", ";", ",");
|
||||
final IllegalArgumentException e = expectThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> new RetentionLease(id, randomNonNegativeLong(), randomNonNegativeLong(), "source"));
|
||||
assertThat(e, hasToString(containsString("retention lease ID can not contain any of [:;,] but was [" + id + "]")));
|
||||
}
|
||||
|
||||
public void testEmptyId() {
|
||||
final IllegalArgumentException e = expectThrows(
|
||||
IllegalArgumentException.class,
|
||||
|
@ -64,14 +56,6 @@ public class RetentionLeaseTests extends ESTestCase {
|
|||
assertThat(e, hasToString(containsString("retention lease timestamp [" + timestamp + "] out of range")));
|
||||
}
|
||||
|
||||
public void testInvalidSource() {
|
||||
final String source = "source" + randomFrom(":", ";", ",");
|
||||
final IllegalArgumentException e = expectThrows(
|
||||
IllegalArgumentException.class,
|
||||
() -> new RetentionLease("id", randomNonNegativeLong(), randomNonNegativeLong(), source));
|
||||
assertThat(e, hasToString(containsString("retention lease source can not contain any of [:;,] but was [" + source + "]")));
|
||||
}
|
||||
|
||||
public void testEmptySource() {
|
||||
final IllegalArgumentException e = expectThrows(
|
||||
IllegalArgumentException.class,
|
||||
|
@ -93,13 +77,4 @@ public class RetentionLeaseTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testRetentionLeaseEncoding() {
|
||||
final String id = randomAlphaOfLength(8);
|
||||
final long retainingSequenceNumber = randomNonNegativeLong();
|
||||
final long timestamp = randomNonNegativeLong();
|
||||
final String source = randomAlphaOfLength(8);
|
||||
final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source);
|
||||
assertThat(RetentionLease.decodeRetentionLease(RetentionLease.encodeRetentionLease(retentionLease)), equalTo(retentionLease));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.seqno;
|
||||
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractXContentTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class RetentionLeaseXContentTests extends AbstractXContentTestCase<RetentionLease> {
|
||||
|
||||
@Override
|
||||
protected RetentionLease createTestInstance() {
|
||||
final String id = randomAlphaOfLength(8);
|
||||
final long retainingSequenceNumber = randomNonNegativeLong();
|
||||
final long timestamp = randomNonNegativeLong();
|
||||
final String source = randomAlphaOfLength(8);
|
||||
return new RetentionLease(id, retainingSequenceNumber, timestamp, source);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RetentionLease doParseInstance(final XContentParser parser) throws IOException {
|
||||
return RetentionLease.fromXContent(parser);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean supportsUnknownFields() {
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
|
@ -19,13 +19,18 @@
|
|||
|
||||
package org.elasticsearch.index.seqno;
|
||||
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
@ -49,30 +54,6 @@ public class RetentionLeasesTests extends ESTestCase {
|
|||
assertThat(e, hasToString(containsString("version must be non-negative but was [" + version + "]")));
|
||||
}
|
||||
|
||||
public void testRetentionLeasesEncoding() {
|
||||
final long primaryTerm = randomNonNegativeLong();
|
||||
final long version = randomNonNegativeLong();
|
||||
final int length = randomIntBetween(0, 8);
|
||||
final List<RetentionLease> retentionLeases = new ArrayList<>(length);
|
||||
for (int i = 0; i < length; i++) {
|
||||
final String id = randomAlphaOfLength(8);
|
||||
final long retainingSequenceNumber = randomNonNegativeLong();
|
||||
final long timestamp = randomNonNegativeLong();
|
||||
final String source = randomAlphaOfLength(8);
|
||||
final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source);
|
||||
retentionLeases.add(retentionLease);
|
||||
}
|
||||
final RetentionLeases decodedRetentionLeases =
|
||||
RetentionLeases.decodeRetentionLeases(
|
||||
RetentionLeases.encodeRetentionLeases(new RetentionLeases(primaryTerm, version, retentionLeases)));
|
||||
assertThat(decodedRetentionLeases.version(), equalTo(version));
|
||||
if (length == 0) {
|
||||
assertThat(decodedRetentionLeases.leases(), empty());
|
||||
} else {
|
||||
assertThat(decodedRetentionLeases.leases(), containsInAnyOrder(retentionLeases.toArray(new RetentionLease[0])));
|
||||
}
|
||||
}
|
||||
|
||||
public void testSupersedesByPrimaryTerm() {
|
||||
final long lowerPrimaryTerm = randomLongBetween(1, Long.MAX_VALUE);
|
||||
final RetentionLeases left = new RetentionLeases(lowerPrimaryTerm, randomLongBetween(1, Long.MAX_VALUE), Collections.emptyList());
|
||||
|
@ -92,4 +73,48 @@ public class RetentionLeasesTests extends ESTestCase {
|
|||
assertFalse(left.supersedes(right));
|
||||
}
|
||||
|
||||
public void testRetentionLeasesRejectsDuplicates() {
|
||||
final RetentionLeases retentionLeases = randomRetentionLeases(false);
|
||||
final RetentionLease retentionLease = randomFrom(retentionLeases.leases());
|
||||
final IllegalStateException e = expectThrows(
|
||||
IllegalStateException.class,
|
||||
() -> new RetentionLeases(
|
||||
retentionLeases.primaryTerm(),
|
||||
retentionLeases.version(),
|
||||
Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList())));
|
||||
assertThat(e, hasToString(containsString("duplicate retention lease ID [" + retentionLease.id() + "]")));
|
||||
}
|
||||
|
||||
public void testLeasesPreservesIterationOrder() {
|
||||
final RetentionLeases retentionLeases = randomRetentionLeases(true);
|
||||
if (retentionLeases.leases().isEmpty()) {
|
||||
assertThat(retentionLeases.leases(), empty());
|
||||
} else {
|
||||
assertThat(retentionLeases.leases(), contains(retentionLeases.leases().toArray(new RetentionLease[0])));
|
||||
}
|
||||
}
|
||||
|
||||
public void testRetentionLeasesMetaDataStateFormat() throws IOException {
|
||||
final Path path = createTempDir();
|
||||
final RetentionLeases retentionLeases = randomRetentionLeases(true);
|
||||
RetentionLeases.FORMAT.writeAndCleanup(retentionLeases, path);
|
||||
assertThat(RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path), equalTo(retentionLeases));
|
||||
}
|
||||
|
||||
private RetentionLeases randomRetentionLeases(boolean allowEmpty) {
|
||||
final long primaryTerm = randomNonNegativeLong();
|
||||
final long version = randomNonNegativeLong();
|
||||
final int length = randomIntBetween(allowEmpty ? 0 : 1, 8);
|
||||
final List<RetentionLease> leases = new ArrayList<>(length);
|
||||
for (int i = 0; i < length; i++) {
|
||||
final String id = randomAlphaOfLength(8);
|
||||
final long retainingSequenceNumber = randomNonNegativeLong();
|
||||
final long timestamp = randomNonNegativeLong();
|
||||
final String source = randomAlphaOfLength(8);
|
||||
final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source);
|
||||
leases.add(retentionLease);
|
||||
}
|
||||
return new RetentionLeases(primaryTerm, version, leases);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.seqno;
|
||||
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.test.AbstractXContentTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class RetentionLeasesXContentTests extends AbstractXContentTestCase<RetentionLeases> {
|
||||
|
||||
@Override
|
||||
protected RetentionLeases createTestInstance() {
|
||||
final long primaryTerm = randomNonNegativeLong();
|
||||
final long version = randomNonNegativeLong();
|
||||
final int length = randomIntBetween(0, 8);
|
||||
final List<RetentionLease> leases = new ArrayList<>(length);
|
||||
for (int i = 0; i < length; i++) {
|
||||
final String id = randomAlphaOfLength(8);
|
||||
final long retainingSequenceNumber = randomNonNegativeLong();
|
||||
final long timestamp = randomNonNegativeLong();
|
||||
final String source = randomAlphaOfLength(8);
|
||||
final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source);
|
||||
leases.add(retentionLease);
|
||||
}
|
||||
return new RetentionLeases(primaryTerm, version, leases);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RetentionLeases doParseInstance(final XContentParser parser) throws IOException {
|
||||
return RetentionLeases.fromXContent(parser);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean supportsUnknownFields() {
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
|
@ -19,43 +19,30 @@
|
|||
|
||||
package org.elasticsearch.index.shard;
|
||||
|
||||
import org.apache.lucene.index.SegmentInfos;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.RecoverySource;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.InternalEngine;
|
||||
import org.elasticsearch.index.engine.InternalEngineFactory;
|
||||
import org.elasticsearch.index.seqno.RetentionLease;
|
||||
import org.elasticsearch.index.seqno.RetentionLeaseStats;
|
||||
import org.elasticsearch.index.seqno.RetentionLeases;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
|
@ -221,7 +208,7 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testCommit() throws IOException {
|
||||
public void testPersistence() throws IOException {
|
||||
final Settings settings = Settings.builder()
|
||||
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
|
||||
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), Long.MAX_VALUE, TimeUnit.NANOSECONDS)
|
||||
|
@ -242,19 +229,17 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
|
|||
|
||||
currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(Long.MAX_VALUE));
|
||||
|
||||
// force a commit
|
||||
indexShard.flush(new FlushRequest().force(true));
|
||||
// force the retention leases to persist
|
||||
indexShard.persistRetentionLeases();
|
||||
|
||||
// the committed retention leases should equal our current retention leases
|
||||
final SegmentInfos segmentCommitInfos = indexShard.store().readLastCommittedSegmentsInfo();
|
||||
assertTrue(segmentCommitInfos.getUserData().containsKey(Engine.RETENTION_LEASES));
|
||||
// the written retention leases should equal our current retention leases
|
||||
final RetentionLeases retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
|
||||
final RetentionLeases committedRetentionLeases = IndexShard.getRetentionLeases(segmentCommitInfos);
|
||||
final RetentionLeases writtenRetentionLeases = indexShard.loadRetentionLeases();
|
||||
if (retentionLeases.leases().isEmpty()) {
|
||||
assertThat(committedRetentionLeases.version(), equalTo(0L));
|
||||
assertThat(committedRetentionLeases.leases(), empty());
|
||||
assertThat(writtenRetentionLeases.version(), equalTo(0L));
|
||||
assertThat(writtenRetentionLeases.leases(), empty());
|
||||
} else {
|
||||
assertThat(committedRetentionLeases.version(), equalTo((long) length));
|
||||
assertThat(writtenRetentionLeases.version(), equalTo((long) length));
|
||||
assertThat(retentionLeases.leases(), contains(retentionLeases.leases().toArray(new RetentionLease[0])));
|
||||
}
|
||||
|
||||
|
@ -304,76 +289,6 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testRecoverFromStoreReserveRetentionLeases() throws Exception {
|
||||
final AtomicBoolean throwDuringRecoverFromTranslog = new AtomicBoolean();
|
||||
final IndexShard shard = newStartedShard(false, Settings.builder().put("index.soft_deletes.enabled", true).build(),
|
||||
config -> new InternalEngine(config) {
|
||||
@Override
|
||||
public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner,
|
||||
long recoverUpToSeqNo) throws IOException {
|
||||
if (throwDuringRecoverFromTranslog.get()) {
|
||||
throw new RuntimeException("crashed before recover from translog is completed");
|
||||
}
|
||||
return super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
|
||||
}
|
||||
});
|
||||
final List<RetentionLease> leases = new ArrayList<>();
|
||||
long version = randomLongBetween(0, 100);
|
||||
long primaryTerm = randomLongBetween(1, 100);
|
||||
final int iterations = randomIntBetween(1, 10);
|
||||
for (int i = 0; i < iterations; i++) {
|
||||
if (randomBoolean()) {
|
||||
indexDoc(shard, "_doc", Integer.toString(i));
|
||||
} else {
|
||||
leases.add(new RetentionLease(Integer.toString(i), randomNonNegativeLong(),
|
||||
randomLongBetween(Integer.MAX_VALUE, Long.MAX_VALUE), "test"));
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
if (randomBoolean()) {
|
||||
version += randomLongBetween(1, 100);
|
||||
primaryTerm += randomLongBetween(0, 100);
|
||||
shard.updateRetentionLeasesOnReplica(new RetentionLeases(primaryTerm, version, leases));
|
||||
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
|
||||
}
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
shard.updateGlobalCheckpointOnReplica(randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint()), "test");
|
||||
flushShard(shard);
|
||||
}
|
||||
}
|
||||
version += randomLongBetween(1, 100);
|
||||
primaryTerm += randomLongBetween(0, 100);
|
||||
shard.updateRetentionLeasesOnReplica(new RetentionLeases(primaryTerm, version, leases));
|
||||
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
|
||||
closeShard(shard, false);
|
||||
|
||||
final IndexShard failedShard = reinitShard(shard, newShardRouting(shard.routingEntry().shardId(),
|
||||
shard.routingEntry().currentNodeId(), true, ShardRoutingState.INITIALIZING,
|
||||
RecoverySource.ExistingStoreRecoverySource.INSTANCE));
|
||||
final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
|
||||
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
|
||||
failedShard.markAsRecovering("store", new RecoveryState(failedShard.routingEntry(), localNode, null));
|
||||
throwDuringRecoverFromTranslog.set(true);
|
||||
expectThrows(IndexShardRecoveryException.class, failedShard::recoverFromStore);
|
||||
closeShards(failedShard);
|
||||
|
||||
final IndexShard newShard = reinitShard(shard, newShardRouting(shard.routingEntry().shardId(),
|
||||
shard.routingEntry().currentNodeId(), true, ShardRoutingState.INITIALIZING,
|
||||
RecoverySource.ExistingStoreRecoverySource.INSTANCE));
|
||||
newShard.markAsRecovering("store", new RecoveryState(failedShard.routingEntry(), localNode, null));
|
||||
throwDuringRecoverFromTranslog.set(false);
|
||||
assertTrue(newShard.recoverFromStore());
|
||||
final RetentionLeases retentionLeases = newShard.getRetentionLeases();
|
||||
assertThat(retentionLeases.version(), equalTo(version));
|
||||
assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm));
|
||||
if (leases.isEmpty()) {
|
||||
assertThat(retentionLeases.leases(), empty());
|
||||
} else {
|
||||
assertThat(retentionLeases.leases(), containsInAnyOrder(leases.toArray(new RetentionLease[0])));
|
||||
}
|
||||
closeShards(newShard);
|
||||
}
|
||||
|
||||
private void assertRetentionLeases(
|
||||
final IndexShard indexShard,
|
||||
final int size,
|
||||
|
|
|
@ -278,7 +278,6 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37165")
|
||||
public void testUnfollowInjectedBeforeShrink() throws Exception {
|
||||
final String indexName = "shrink-test";
|
||||
final String shrunkenIndexName = "shrink-" + indexName;
|
||||
|
|
Loading…
Reference in New Issue