Introduce retention lease actions (#38756)

This commit introduces actions for some common retention lease
operations that clients need to be able to perform remotely. These
actions include add/renew/remove.
This commit is contained in:
Jason Tedor 2019-02-12 07:37:12 -05:00
parent 745d0c1bba
commit bbc9aa9979
No known key found for this signature in database
GPG Key ID: FA89F05560F16BC5
10 changed files with 993 additions and 17 deletions

View File

@ -19,8 +19,8 @@
package org.elasticsearch.action;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainAction;
import org.elasticsearch.action.admin.cluster.allocation.TransportClusterAllocationExplainAction;
import org.elasticsearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
@ -209,6 +209,7 @@ import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.persistent.RemovePersistentTaskAction;
@ -220,6 +221,7 @@ import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.action.RestFieldCapabilitiesAction;
import org.elasticsearch.rest.action.RestMainAction;
import org.elasticsearch.rest.action.admin.cluster.RestAddVotingConfigExclusionAction;
import org.elasticsearch.rest.action.admin.cluster.RestCancelTasksAction;
import org.elasticsearch.rest.action.admin.cluster.RestClearVotingConfigExclusionsAction;
import org.elasticsearch.rest.action.admin.cluster.RestClusterAllocationExplainAction;
@ -251,7 +253,6 @@ import org.elasticsearch.rest.action.admin.cluster.RestRemoteClusterInfoAction;
import org.elasticsearch.rest.action.admin.cluster.RestRestoreSnapshotAction;
import org.elasticsearch.rest.action.admin.cluster.RestSnapshotsStatusAction;
import org.elasticsearch.rest.action.admin.cluster.RestVerifyRepositoryAction;
import org.elasticsearch.rest.action.admin.cluster.RestAddVotingConfigExclusionAction;
import org.elasticsearch.rest.action.admin.indices.RestAnalyzeAction;
import org.elasticsearch.rest.action.admin.indices.RestClearIndicesCacheAction;
import org.elasticsearch.rest.action.admin.indices.RestCloseIndexAction;
@ -529,6 +530,11 @@ public class ActionModule extends AbstractModule {
actions.register(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class);
actions.register(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class);
// retention leases
actions.register(RetentionLeaseActions.Add.INSTANCE, RetentionLeaseActions.Add.TransportAction.class);
actions.register(RetentionLeaseActions.Renew.INSTANCE, RetentionLeaseActions.Renew.TransportAction.class);
actions.register(RetentionLeaseActions.Remove.INSTANCE, RetentionLeaseActions.Remove.TransportAction.class);
return unmodifiableMap(actions.getRegistry());
}

View File

@ -744,7 +744,7 @@ public abstract class Engine implements Closeable {
/**
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
*/
public abstract Closeable acquireRetentionLockForPeerRecovery();
public abstract Closeable acquireRetentionLock();
/**
* Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive).
@ -771,6 +771,13 @@ public abstract class Engine implements Closeable {
*/
public abstract boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) throws IOException;
/**
* Gets the minimum retained sequence number for this engine.
*
* @return the minimum retained sequence number
*/
public abstract long getMinRetainedSeqNo();
public abstract TranslogStats getTranslogStats();
/**

View File

@ -2567,13 +2567,13 @@ public class InternalEngine extends Engine {
* Returns the minimum seqno that is retained in the Lucene index.
* Operations whose seq# are at least this value should exist in the Lucene index.
*/
final long getMinRetainedSeqNo() {
public final long getMinRetainedSeqNo() {
assert softDeleteEnabled : Thread.currentThread().getName();
return softDeletesPolicy.getMinRetainedSeqNo();
}
@Override
public Closeable acquireRetentionLockForPeerRecovery() {
public Closeable acquireRetentionLock() {
if (softDeleteEnabled) {
return softDeletesPolicy.acquireRetentionLock();
} else {

View File

@ -272,7 +272,7 @@ public class ReadOnlyEngine extends Engine {
}
@Override
public Closeable acquireRetentionLockForPeerRecovery() {
public Closeable acquireRetentionLock() {
return () -> {};
}
@ -311,6 +311,11 @@ public class ReadOnlyEngine extends Engine {
return false;
}
@Override
public long getMinRetainedSeqNo() {
throw new UnsupportedOperationException();
}
@Override
public TranslogStats getTranslogStats() {
return translogStats;

View File

@ -0,0 +1,404 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.seqno;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Objects;
import java.util.function.Supplier;
/**
* This class holds all actions related to retention leases. Note carefully that these actions are executed under a primary permit. Care is
* taken to thread the listener through the invocations so that for the sync APIs we do not notify the listener until these APIs have
* responded with success. Additionally, note the use of
* {@link TransportSingleShardAction#asyncShardOperation(SingleShardRequest, ShardId, ActionListener)} to handle the case when acquiring
* permits goes asynchronous because acquiring permits is blocked
*/
public class RetentionLeaseActions {
public static final long RETAIN_ALL = -1;
abstract static class TransportRetentionLeaseAction<T extends Request<T>> extends TransportSingleShardAction<T, Response> {
private final IndicesService indicesService;
@Inject
TransportRetentionLeaseAction(
final String name,
final ThreadPool threadPool,
final ClusterService clusterService,
final TransportService transportService,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver,
final IndicesService indicesService,
final Supplier<T> requestSupplier) {
super(
name,
threadPool,
clusterService,
transportService,
actionFilters,
indexNameExpressionResolver,
requestSupplier,
ThreadPool.Names.MANAGEMENT);
this.indicesService = Objects.requireNonNull(indicesService);
}
@Override
protected ShardsIterator shards(final ClusterState state, final InternalRequest request) {
return state
.routingTable()
.shardRoutingTable(request.concreteIndex(), request.request().getShardId().id())
.primaryShardIt();
}
@Override
protected void asyncShardOperation(T request, ShardId shardId, final ActionListener<Response> listener) {
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.id());
indexShard.acquirePrimaryOperationPermit(
new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
try (Releasable ignore = releasable) {
doRetentionLeaseAction(indexShard, request, listener);
}
}
@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
},
ThreadPool.Names.SAME,
request);
}
@Override
protected Response shardOperation(final T request, final ShardId shardId) throws IOException {
throw new UnsupportedOperationException();
}
abstract void doRetentionLeaseAction(IndexShard indexShard, T request, ActionListener<Response> listener);
@Override
protected Response newResponse() {
return new Response();
}
@Override
protected boolean resolveIndex(final T request) {
return false;
}
}
public static class Add extends Action<Response> {
public static final Add INSTANCE = new Add();
public static final String NAME = "indices:admin/seq_no/add_retention_lease";
private Add() {
super(NAME);
}
public static class TransportAction extends TransportRetentionLeaseAction<AddRequest> {
@Inject
public TransportAction(
final ThreadPool threadPool,
final ClusterService clusterService,
final TransportService transportService,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver,
final IndicesService indicesService) {
super(
NAME,
threadPool,
clusterService,
transportService,
actionFilters,
indexNameExpressionResolver,
indicesService,
AddRequest::new);
}
@Override
void doRetentionLeaseAction(final IndexShard indexShard, final AddRequest request, final ActionListener<Response> listener) {
indexShard.addRetentionLease(
request.getId(),
request.getRetainingSequenceNumber(),
request.getSource(),
ActionListener.wrap(
r -> listener.onResponse(new Response()),
listener::onFailure));
}
}
@Override
public Response newResponse() {
return new Response();
}
}
public static class Renew extends Action<Response> {
public static final Renew INSTANCE = new Renew();
public static final String NAME = "indices:admin/seq_no/renew_retention_lease";
private Renew() {
super(NAME);
}
public static class TransportAction extends TransportRetentionLeaseAction<RenewRequest> {
@Inject
public TransportAction(
final ThreadPool threadPool,
final ClusterService clusterService,
final TransportService transportService,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver,
final IndicesService indicesService) {
super(
NAME,
threadPool,
clusterService,
transportService,
actionFilters,
indexNameExpressionResolver,
indicesService,
RenewRequest::new);
}
@Override
void doRetentionLeaseAction(final IndexShard indexShard, final RenewRequest request, final ActionListener<Response> listener) {
indexShard.renewRetentionLease(request.getId(), request.getRetainingSequenceNumber(), request.getSource());
listener.onResponse(new Response());
}
}
@Override
public Response newResponse() {
return new Response();
}
}
public static class Remove extends Action<Response> {
public static final Remove INSTANCE = new Remove();
public static final String NAME = "indices:admin/seq_no/remove_retention_lease";
private Remove() {
super(NAME);
}
public static class TransportAction extends TransportRetentionLeaseAction<RemoveRequest> {
@Inject
public TransportAction(
final ThreadPool threadPool,
final ClusterService clusterService,
final TransportService transportService,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver,
final IndicesService indicesService) {
super(
NAME,
threadPool,
clusterService,
transportService,
actionFilters,
indexNameExpressionResolver,
indicesService,
RemoveRequest::new);
}
@Override
void doRetentionLeaseAction(final IndexShard indexShard, final RemoveRequest request, final ActionListener<Response> listener) {
indexShard.removeRetentionLease(
request.getId(),
ActionListener.wrap(
r -> listener.onResponse(new Response()),
listener::onFailure));
}
}
@Override
public Response newResponse() {
return new Response();
}
}
private abstract static class Request<T extends SingleShardRequest<T>> extends SingleShardRequest<T> {
private ShardId shardId;
public ShardId getShardId() {
return shardId;
}
private String id;
public String getId() {
return id;
}
Request() {
}
Request(final ShardId shardId, final String id) {
super(Objects.requireNonNull(shardId).getIndexName());
this.shardId = shardId;
this.id = Objects.requireNonNull(id);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
id = in.readString();
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
out.writeString(id);
}
}
private abstract static class AddOrRenewRequest<T extends SingleShardRequest<T>> extends Request<T> {
private long retainingSequenceNumber;
public long getRetainingSequenceNumber() {
return retainingSequenceNumber;
}
private String source;
public String getSource() {
return source;
}
AddOrRenewRequest() {
}
AddOrRenewRequest(final ShardId shardId, final String id, final long retainingSequenceNumber, final String source) {
super(shardId, id);
if (retainingSequenceNumber < 0 && retainingSequenceNumber != RETAIN_ALL) {
throw new IllegalArgumentException("retaining sequence number [" + retainingSequenceNumber + "] out of range");
}
this.retainingSequenceNumber = retainingSequenceNumber;
this.source = Objects.requireNonNull(source);
}
@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
retainingSequenceNumber = in.readZLong();
source = in.readString();
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeZLong(retainingSequenceNumber);
out.writeString(source);
}
}
public static class AddRequest extends AddOrRenewRequest<AddRequest> {
public AddRequest() {
}
public AddRequest(final ShardId shardId, final String id, final long retainingSequenceNumber, final String source) {
super(shardId, id, retainingSequenceNumber, source);
}
}
public static class RenewRequest extends AddOrRenewRequest<RenewRequest> {
public RenewRequest() {
}
public RenewRequest(final ShardId shardId, final String id, final long retainingSequenceNumber, final String source) {
super(shardId, id, retainingSequenceNumber, source);
}
}
public static class RemoveRequest extends Request<RemoveRequest> {
public RemoveRequest() {
}
public RemoveRequest(final ShardId shardId, final String id) {
super(shardId, id);
}
}
public static class Response extends ActionResponse {
}
}

View File

@ -167,6 +167,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {
@ -1739,8 +1740,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
/**
* Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed
*/
public Closeable acquireRetentionLockForPeerRecovery() {
return getEngine().acquireRetentionLockForPeerRecovery();
public Closeable acquireRetentionLock() {
return getEngine().acquireRetentionLock();
}
/**
@ -1760,12 +1761,21 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
/**
* Checks if we have a completed history of operations since the given starting seqno (inclusive).
* This method should be called after acquiring the retention lock; See {@link #acquireRetentionLockForPeerRecovery()}
* This method should be called after acquiring the retention lock; See {@link #acquireRetentionLock()}
*/
public boolean hasCompleteHistoryOperations(String source, long startingSeqNo) throws IOException {
return getEngine().hasCompleteOperationHistory(source, mapperService, startingSeqNo);
}
/**
* Gets the minimum retained sequence number for this engine.
*
* @return the minimum retained sequence number
*/
public long getMinRetainedSeqNo() {
return getEngine().getMinRetainedSeqNo();
}
/**
* Creates a new changes snapshot for reading operations whose seq_no are between {@code fromSeqNo}(inclusive)
* and {@code toSeqNo}(inclusive). The caller has to close the returned snapshot after finishing the reading.
@ -1938,7 +1948,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
Objects.requireNonNull(listener);
assert assertPrimaryMode();
verifyNotClosed();
return replicationTracker.addRetentionLease(id, retainingSequenceNumber, source, listener);
try (Closeable ignore = acquireRetentionLock()) {
final long actualRetainingSequenceNumber =
retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber;
return replicationTracker.addRetentionLease(id, actualRetainingSequenceNumber, source, listener);
} catch (final IOException e) {
throw new AssertionError(e);
}
}
/**
@ -1953,7 +1969,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
public RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
assert assertPrimaryMode();
verifyNotClosed();
return replicationTracker.renewRetentionLease(id, retainingSequenceNumber, source);
try (Closeable ignore = acquireRetentionLock()) {
final long actualRetainingSequenceNumber =
retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber;
return replicationTracker.renewRetentionLease(id, actualRetainingSequenceNumber, source);
} catch (final IOException e) {
throw new AssertionError(e);
}
}
/**

View File

@ -155,7 +155,7 @@ public class RecoverySourceHandler {
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ",
shard, cancellableThreads, logger);
final Closeable retentionLock = shard.acquireRetentionLockForPeerRecovery();
final Closeable retentionLock = shard.acquireRetentionLock();
resources.add(retentionLock);
final long startingSeqNo;
final long requiredSeqNoRangeStart;

View File

@ -21,7 +21,6 @@ package org.elasticsearch.index.engine;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -5375,7 +5374,7 @@ public class InternalEngineTests extends EngineTestCase {
if (rarely()) {
engine.forceMerge(randomBoolean());
}
try (Closeable ignored = engine.acquireRetentionLockForPeerRecovery()) {
try (Closeable ignored = engine.acquireRetentionLock()) {
long minRetainSeqNos = engine.getMinRetainedSeqNo();
assertThat(minRetainSeqNos, lessThanOrEqualTo(globalCheckpoint.get() + 1));
Long[] expectedOps = existingSeqNos.stream().filter(seqno -> seqno >= minRetainSeqNos).toArray(Long[]::new);

View File

@ -0,0 +1,533 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.seqno;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
public class RetentionLeaseActionsTests extends ESSingleNodeTestCase {
public void testAddAction() {
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build();
final IndexService indexService = createIndex("index", settings);
ensureGreen("index");
final String id = randomAlphaOfLength(8);
final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong();
final String source = randomAlphaOfLength(8);
client()
.execute(
RetentionLeaseActions.Add.INSTANCE,
new RetentionLeaseActions.AddRequest(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source))
.actionGet();
final IndicesStatsResponse stats = client()
.execute(
IndicesStatsAction.INSTANCE,
new IndicesStatsRequest().indices("index"))
.actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(1));
assertNotNull(stats.getShards()[0].getRetentionLeaseStats());
assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1));
final RetentionLease retentionLease = stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next();
assertThat(retentionLease.id(), equalTo(id));
assertThat(retentionLease.retainingSequenceNumber(), equalTo(retainingSequenceNumber == RETAIN_ALL ? 0L : retainingSequenceNumber));
assertThat(retentionLease.source(), equalTo(source));
}
public void testAddAlreadyExists() {
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build();
final IndexService indexService = createIndex("index", settings);
ensureGreen("index");
final String id = randomAlphaOfLength(8);
final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong();
final String source = randomAlphaOfLength(8);
client()
.execute(
RetentionLeaseActions.Add.INSTANCE,
new RetentionLeaseActions.AddRequest(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source))
.actionGet();
final long nextRetainingSequenceNumber =
retainingSequenceNumber == RETAIN_ALL && randomBoolean() ? RETAIN_ALL
: randomLongBetween(Math.max(retainingSequenceNumber, 0L), Long.MAX_VALUE);
final RetentionLeaseAlreadyExistsException e = expectThrows(
RetentionLeaseAlreadyExistsException.class,
() -> client()
.execute(
RetentionLeaseActions.Add.INSTANCE,
new RetentionLeaseActions.AddRequest(
indexService.getShard(0).shardId(),
id,
nextRetainingSequenceNumber,
source))
.actionGet());
assertThat(e, hasToString(containsString("retention lease with ID [" + id + "] already exists")));
}
public void testRenewAction() throws InterruptedException {
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build();
final IndexService indexService = createIndex("index", settings);
ensureGreen("index");
final String id = randomAlphaOfLength(8);
final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong();
final String source = randomAlphaOfLength(8);
/*
* When we renew the lease, we want to ensure that the timestamp on the thread pool clock has advanced. To do this, we sample how
* often the thread pool clock advances based on the following setting. After we add the initial lease we sample the relative time.
* Immediately before the renewal of the lease, we sleep long enough to ensure that an estimated time interval has elapsed, and
* sample the thread pool to ensure the clock has in fact advanced.
*/
final TimeValue estimatedTimeInterval = ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.get(getInstanceFromNode(Node.class).settings());
client()
.execute(
RetentionLeaseActions.Add.INSTANCE,
new RetentionLeaseActions.AddRequest(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source))
.actionGet();
/*
* Sample these after adding the retention lease so that advancement here guarantees we have advanced past the timestamp on the
* lease.
*/
final ThreadPool threadPool = getInstanceFromNode(ThreadPool.class);
final long timestampUpperBound = threadPool.absoluteTimeInMillis();
final long start = System.nanoTime();
final IndicesStatsResponse initialStats = client()
.execute(
IndicesStatsAction.INSTANCE,
new IndicesStatsRequest().indices("index"))
.actionGet();
assertNotNull(initialStats.getShards());
assertThat(initialStats.getShards(), arrayWithSize(1));
assertNotNull(initialStats.getShards()[0].getRetentionLeaseStats());
assertThat(initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1));
final RetentionLease initialRetentionLease =
initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next();
final long nextRetainingSequenceNumber =
retainingSequenceNumber == RETAIN_ALL && randomBoolean() ? RETAIN_ALL
: randomLongBetween(Math.max(retainingSequenceNumber, 0L), Long.MAX_VALUE);
/*
* Wait until the thread pool clock advances. Note that this will fail on a system when the system clock goes backwards during
* execution of the test. The specific circumstances under which this can fail is if the system clock goes backwards more than the
* suite timeout. It seems there is nothing simple that we can do to avoid this?
*/
do {
final long end = System.nanoTime();
if (end - start < estimatedTimeInterval.nanos()) {
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(estimatedTimeInterval.nanos() - (end - start)));
}
} while (threadPool.absoluteTimeInMillis() <= timestampUpperBound);
client()
.execute(
RetentionLeaseActions.Renew.INSTANCE,
new RetentionLeaseActions.RenewRequest(indexService.getShard(0).shardId(), id, nextRetainingSequenceNumber, source))
.actionGet();
final IndicesStatsResponse renewedStats = client()
.execute(
IndicesStatsAction.INSTANCE,
new IndicesStatsRequest().indices("index"))
.actionGet();
assertNotNull(renewedStats.getShards());
assertThat(renewedStats.getShards(), arrayWithSize(1));
assertNotNull(renewedStats.getShards()[0].getRetentionLeaseStats());
assertThat(renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1));
final RetentionLease renewedRetentionLease =
renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next();
assertThat(renewedRetentionLease.id(), equalTo(id));
assertThat(
renewedRetentionLease.retainingSequenceNumber(),
equalTo(nextRetainingSequenceNumber == RETAIN_ALL ? 0L : nextRetainingSequenceNumber));
assertThat(renewedRetentionLease.timestamp(), greaterThan(initialRetentionLease.timestamp()));
assertThat(renewedRetentionLease.source(), equalTo(source));
}
public void testRenewNotFound() {
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build();
final IndexService indexService = createIndex("index", settings);
ensureGreen("index");
final String id = randomAlphaOfLength(8);
final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong();
final String source = randomAlphaOfLength(8);
final RetentionLeaseNotFoundException e = expectThrows(
RetentionLeaseNotFoundException.class,
() -> client()
.execute(
RetentionLeaseActions.Renew.INSTANCE,
new RetentionLeaseActions.RenewRequest(
indexService.getShard(0).shardId(),
id,
retainingSequenceNumber,
source))
.actionGet());
assertThat(e, hasToString(containsString("retention lease with ID [" + id + "] not found")));
}
public void testRemoveAction() {
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build();
final IndexService indexService = createIndex("index", settings);
ensureGreen("index");
final String id = randomAlphaOfLength(8);
final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong();
final String source = randomAlphaOfLength(8);
client()
.execute(
RetentionLeaseActions.Add.INSTANCE,
new RetentionLeaseActions.AddRequest(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source))
.actionGet();
client()
.execute(
RetentionLeaseActions.Remove.INSTANCE,
new RetentionLeaseActions.RemoveRequest(indexService.getShard(0).shardId(), id))
.actionGet();
final IndicesStatsResponse stats = client()
.execute(
IndicesStatsAction.INSTANCE,
new IndicesStatsRequest().indices("index"))
.actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(1));
assertNotNull(stats.getShards()[0].getRetentionLeaseStats());
assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(0));
}
public void testRemoveNotFound() {
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build();
final IndexService indexService = createIndex("index", settings);
ensureGreen("index");
final String id = randomAlphaOfLength(8);
final RetentionLeaseNotFoundException e = expectThrows(
RetentionLeaseNotFoundException.class,
() -> client()
.execute(
RetentionLeaseActions.Remove.INSTANCE,
new RetentionLeaseActions.RemoveRequest(indexService.getShard(0).shardId(), id))
.actionGet());
assertThat(e, hasToString(containsString("retention lease with ID [" + id + "] not found")));
}
public void testAddUnderBlock() throws InterruptedException {
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build();
final IndexService indexService = createIndex("index", settings);
ensureGreen("index");
final String id = randomAlphaOfLength(8);
final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong();
final String source = randomAlphaOfLength(8);
runActionUnderBlockTest(
indexService,
(shardId, actionLatch) ->
client().execute(
RetentionLeaseActions.Add.INSTANCE,
new RetentionLeaseActions.AddRequest(shardId, id, retainingSequenceNumber, source),
new ActionListener<RetentionLeaseActions.Response>() {
@Override
public void onResponse(final RetentionLeaseActions.Response response) {
actionLatch.countDown();
}
@Override
public void onFailure(final Exception e) {
fail(e.toString());
}
}));
final IndicesStatsResponse stats = client()
.execute(
IndicesStatsAction.INSTANCE,
new IndicesStatsRequest().indices("index"))
.actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(1));
assertNotNull(stats.getShards()[0].getRetentionLeaseStats());
assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1));
final RetentionLease retentionLease = stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next();
assertThat(retentionLease.id(), equalTo(id));
assertThat(retentionLease.retainingSequenceNumber(), equalTo(retainingSequenceNumber == RETAIN_ALL ? 0L : retainingSequenceNumber));
assertThat(retentionLease.source(), equalTo(source));
}
public void testRenewUnderBlock() throws InterruptedException {
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build();
final IndexService indexService = createIndex("index", settings);
ensureGreen("index");
final String id = randomAlphaOfLength(8);
final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong();
final String source = randomAlphaOfLength(8);
/*
* When we renew the lease, we want to ensure that the timestamp on the thread pool clock has advanced. To do this, we sample how
* often the thread pool clock advances based on the following setting. After we add the initial lease we sample the relative time.
* Immediately before the renewal of the lease, we sleep long enough to ensure that an estimated time interval has elapsed, and
* sample the thread pool to ensure the clock has in fact advanced.
*/
final TimeValue estimatedTimeInterval = ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.get(getInstanceFromNode(Node.class).settings());
client()
.execute(
RetentionLeaseActions.Add.INSTANCE,
new RetentionLeaseActions.AddRequest(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source))
.actionGet();
/*
* Sample these after adding the retention lease so that advancement here guarantees we have advanced past the timestamp on the
* lease.
*/
final ThreadPool threadPool = getInstanceFromNode(ThreadPool.class);
final long timestampUpperBound = threadPool.absoluteTimeInMillis();
final long start = System.nanoTime();
final IndicesStatsResponse initialStats = client()
.execute(
IndicesStatsAction.INSTANCE,
new IndicesStatsRequest().indices("index"))
.actionGet();
assertNotNull(initialStats.getShards());
assertThat(initialStats.getShards(), arrayWithSize(1));
assertNotNull(initialStats.getShards()[0].getRetentionLeaseStats());
assertThat(initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1));
final RetentionLease initialRetentionLease =
initialStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next();
final long nextRetainingSequenceNumber =
retainingSequenceNumber == RETAIN_ALL && randomBoolean() ? RETAIN_ALL
: randomLongBetween(Math.max(retainingSequenceNumber, 0L), Long.MAX_VALUE);
/*
* Wait until the thread pool clock advances. Note that this will fail on a system when the system clock goes backwards during
* execution of the test. The specific circumstances under which this can fail is if the system clock goes backwards more than the
* suite timeout. It seems there is nothing simple that we can do to avoid this?
*/
do {
final long end = System.nanoTime();
if (end - start < estimatedTimeInterval.nanos()) {
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(estimatedTimeInterval.nanos() - (end - start)));
}
} while (threadPool.absoluteTimeInMillis() <= timestampUpperBound);
runActionUnderBlockTest(
indexService,
(shardId, actionLatch) ->
client().execute(
RetentionLeaseActions.Renew.INSTANCE,
new RetentionLeaseActions.RenewRequest(shardId, id, nextRetainingSequenceNumber, source),
new ActionListener<RetentionLeaseActions.Response>() {
@Override
public void onResponse(final RetentionLeaseActions.Response response) {
actionLatch.countDown();
}
@Override
public void onFailure(final Exception e) {
fail(e.toString());
}
}));
final IndicesStatsResponse renewedStats = client()
.execute(
IndicesStatsAction.INSTANCE,
new IndicesStatsRequest().indices("index"))
.actionGet();
assertNotNull(renewedStats.getShards());
assertThat(renewedStats.getShards(), arrayWithSize(1));
assertNotNull(renewedStats.getShards()[0].getRetentionLeaseStats());
assertThat(renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(1));
final RetentionLease renewedRetentionLease =
renewedStats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases().iterator().next();
assertThat(renewedRetentionLease.id(), equalTo(id));
assertThat(
renewedRetentionLease.retainingSequenceNumber(),
equalTo(nextRetainingSequenceNumber == RETAIN_ALL ? 0L : nextRetainingSequenceNumber));
assertThat(renewedRetentionLease.timestamp(), greaterThan(initialRetentionLease.timestamp()));
assertThat(renewedRetentionLease.source(), equalTo(source));
}
public void testRemoveUnderBlock() throws InterruptedException {
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build();
final IndexService indexService = createIndex("index", settings);
ensureGreen("index");
final String id = randomAlphaOfLength(8);
final long retainingSequenceNumber = randomBoolean() ? RETAIN_ALL : randomNonNegativeLong();
final String source = randomAlphaOfLength(8);
client()
.execute(
RetentionLeaseActions.Add.INSTANCE,
new RetentionLeaseActions.AddRequest(indexService.getShard(0).shardId(), id, retainingSequenceNumber, source))
.actionGet();
runActionUnderBlockTest(
indexService,
(shardId, actionLatch) ->
client().execute(
RetentionLeaseActions.Remove.INSTANCE,
new RetentionLeaseActions.RemoveRequest(shardId, id),
new ActionListener<RetentionLeaseActions.Response>() {
@Override
public void onResponse(final RetentionLeaseActions.Response response) {
actionLatch.countDown();
}
@Override
public void onFailure(final Exception e) {
fail(e.toString());
}
}));
final IndicesStatsResponse stats = client()
.execute(
IndicesStatsAction.INSTANCE,
new IndicesStatsRequest().indices("index"))
.actionGet();
assertNotNull(stats.getShards());
assertThat(stats.getShards(), arrayWithSize(1));
assertNotNull(stats.getShards()[0].getRetentionLeaseStats());
assertThat(stats.getShards()[0].getRetentionLeaseStats().retentionLeases().leases(), hasSize(0));
}
/*
* Tests that use this method are ensuring that the asynchronous usage of the permits API when permit acquisition is blocked is
* correctly handler. In these scenarios, we first acquire all permits. Then we submit a request to one of the retention lease actions
* (via the consumer callback). That invocation will go asynchronous and be queued, since all permits are blocked. Then we release the
* permit block and except that the callbacks occur correctly. These assertions happen after returning from this method.
*/
private void runActionUnderBlockTest(
final IndexService indexService,
final BiConsumer<ShardId, CountDownLatch> consumer) throws InterruptedException {
final CountDownLatch blockedLatch = new CountDownLatch(1);
final CountDownLatch unblockLatch = new CountDownLatch(1);
indexService.getShard(0).acquireAllPrimaryOperationsPermits(
new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
try (Releasable ignore = releasable) {
blockedLatch.countDown();
unblockLatch.await();
} catch (final InterruptedException e) {
onFailure(e);
}
}
@Override
public void onFailure(final Exception e) {
fail(e.toString());
}
},
TimeValue.timeValueHours(1));
blockedLatch.await();
final CountDownLatch actionLatch = new CountDownLatch(1);
consumer.accept(indexService.getShard(0).shardId(), actionLatch);
unblockLatch.countDown();
actionLatch.await();
}
}

View File

@ -98,7 +98,7 @@ public class RetentionLeaseIT extends ESIntegTestCase {
final CountDownLatch latch = new CountDownLatch(1);
final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
// simulate a peer recovery which locks the soft deletes policy on the primary
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {};
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLock() : () -> {};
currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener));
latch.await();
retentionLock.close();
@ -149,7 +149,7 @@ public class RetentionLeaseIT extends ESIntegTestCase {
final CountDownLatch latch = new CountDownLatch(1);
final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
// simulate a peer recovery which locks the soft deletes policy on the primary
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {};
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLock() : () -> {};
currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener));
latch.await();
retentionLock.close();
@ -160,7 +160,7 @@ public class RetentionLeaseIT extends ESIntegTestCase {
final CountDownLatch latch = new CountDownLatch(1);
primary.removeRetentionLease(id, ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())));
// simulate a peer recovery which locks the soft deletes policy on the primary
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {};
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLock() : () -> {};
currentRetentionLeases.remove(id);
latch.await();
retentionLock.close();