Introduce retention lease background sync (#38262)

This commit introduces a background sync for retention leases. The idea
here is that we do a heavyweight sync when adding a new retention lease,
and then periodically we want to background sync any retention lease
renewals to the replicas. As long as the background sync interval is
significantly lower than the extended lifetime of a retention lease, it
is okay if from time to time a replica misses a sync (it will still have
an older version of the lease that is retaining more data as we assume
that renewals do not decrease the retaining sequence number). There are
two follow-ups that will come after this commit. The first is to address
the fact that we have not adapted the should periodically flush logic to
possibly flush the retention leases. We want to do something like flush
if we have not flushed in the last five minutes and there are renewed
retention leases since the last time that we flushed. An additional
follow-up will remove the syncing of retention leases when a retention
lease expires. Today this sync could be invoked in the background by a
merge operation. Rather, we will move the syncing of retention lease
expiration to be done under the background sync. The background sync
will use the heavyweight sync (write action) if a lease has expired, and
will use the lightweight background sync (replication action) otherwise.
This commit is contained in:
Jason Tedor 2019-02-04 10:35:29 -05:00 committed by GitHub
parent 5ee7232379
commit 625d37a26a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 673 additions and 30 deletions

View File

@ -525,7 +525,7 @@ public abstract class TransportReplicationAction<
}
}
protected static class ReplicaResult {
public static class ReplicaResult {
final Exception finalFailure;
public ReplicaResult(Exception finalFailure) {

View File

@ -121,6 +121,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private volatile AsyncRefreshTask refreshTask;
private volatile AsyncTranslogFSync fsyncTask;
private volatile AsyncGlobalCheckpointTask globalCheckpointTask;
private volatile AsyncRetentionLeaseBackgroundSyncTask retentionLeaseBackgroundSyncTask;
// don't convert to Setting<> and register... we only set this in tests and register via a plugin
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
@ -197,6 +198,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
this.refreshTask = new AsyncRefreshTask(this);
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseBackgroundSyncTask = new AsyncRetentionLeaseBackgroundSyncTask(this);
rescheduleFsyncTask(indexSettings.getTranslogDurability());
}
@ -286,7 +288,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
refreshTask,
fsyncTask,
trimTranslogTask,
globalCheckpointTask);
globalCheckpointTask,
retentionLeaseBackgroundSyncTask);
}
}
}
@ -403,7 +406,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
searchOperationListeners,
indexingOperationListeners,
() -> globalCheckpointSyncer.accept(shardId),
(retentionLeases, listener) -> retentionLeaseSyncer.syncRetentionLeasesForShard(shardId, retentionLeases, listener),
retentionLeaseSyncer,
circuitBreakerService);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
@ -782,6 +785,14 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
}
private void maybeSyncGlobalCheckpoints() {
sync(is -> is.maybeSyncGlobalCheckpoint("background"), "global checkpoint");
}
private void backgroundSyncRetentionLeases() {
sync(IndexShard::backgroundSyncRetentionLeases, "retention lease");
}
private void sync(final Consumer<IndexShard> sync, final String source) {
for (final IndexShard shard : this.shards.values()) {
if (shard.routingEntry().active() && shard.routingEntry().primary()) {
switch (shard.state()) {
@ -795,17 +806,17 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
case STARTED:
try {
shard.runUnderPrimaryPermit(
() -> shard.maybeSyncGlobalCheckpoint("background"),
() -> sync.accept(shard),
e -> {
if (e instanceof AlreadyClosedException == false
&& e instanceof IndexShardClosedException == false) {
logger.warn(
new ParameterizedMessage(
"{} failed to execute background global checkpoint sync", shard.shardId()), e);
"{} failed to execute background {} sync", shard.shardId(), source), e);
}
},
ThreadPool.Names.SAME,
"background global checkpoint sync");
"background " + source + " sync");
} catch (final AlreadyClosedException | IndexShardClosedException e) {
// the shard was closed concurrently, continue
}
@ -911,6 +922,15 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
Property.Dynamic,
Property.IndexScope);
// this setting is intentionally not registered, it is only used in tests
public static final Setting<TimeValue> RETENTION_LEASE_SYNC_INTERVAL_SETTING =
Setting.timeSetting(
"index.soft_deletes.retention_lease.sync_interval",
new TimeValue(5, TimeUnit.MINUTES),
new TimeValue(0, TimeUnit.MILLISECONDS),
Property.Dynamic,
Property.IndexScope);
/**
* Background task that syncs the global checkpoint to replicas.
*/
@ -937,6 +957,29 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
}
}
final class AsyncRetentionLeaseBackgroundSyncTask extends BaseAsyncTask {
AsyncRetentionLeaseBackgroundSyncTask(final IndexService indexService) {
super(indexService, RETENTION_LEASE_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings()));
}
@Override
protected void runInternal() {
indexService.backgroundSyncRetentionLeases();
}
@Override
protected String getThreadPool() {
return ThreadPool.Names.MANAGEMENT;
}
@Override
public String toString() {
return "retention_lease_background_sync";
}
}
AsyncRefreshTask getRefreshTask() { // for tests
return refreshTask;
}

View File

@ -177,8 +177,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
private RetentionLeases retentionLeases = RetentionLeases.EMPTY;
/**
* Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned. Note that only
* the primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas.
* Get all non-expired retention leases tracked on this shard. Note that only the primary shard calculates which leases are expired,
* and if any have expired, syncs the retention leases to any replicas.
*
* @return the retention leases
*/

View File

@ -0,0 +1,185 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.seqno;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Objects;
/**
* Replication action responsible for background syncing retention leases to replicas. This action is deliberately a replication action so
* that if a replica misses a background retention lease sync then that shard will not be marked as stale. We have some tolerance for a
* shard copy missing renewals of retention leases since the background sync interval is much smaller than the expected lifetime of
* retention leases.
*/
public class RetentionLeaseBackgroundSyncAction extends TransportReplicationAction<
RetentionLeaseBackgroundSyncAction.Request,
RetentionLeaseBackgroundSyncAction.Request,
ReplicationResponse> {
public static String ACTION_NAME = "indices:admin/seq_no/retention_lease_background_sync";
private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class);
protected Logger getLogger() {
return LOGGER;
}
@Inject
public RetentionLeaseBackgroundSyncAction(
final Settings settings,
final TransportService transportService,
final ClusterService clusterService,
final IndicesService indicesService,
final ThreadPool threadPool,
final ShardStateAction shardStateAction,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver) {
super(
settings,
ACTION_NAME,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver,
Request::new,
Request::new,
ThreadPool.Names.MANAGEMENT);
}
/**
* Background sync the specified retention leases for the specified shard.
*
* @param shardId the shard to sync
* @param retentionLeases the retention leases to sync
*/
public void backgroundSync(
final ShardId shardId,
final RetentionLeases retentionLeases) {
Objects.requireNonNull(shardId);
Objects.requireNonNull(retentionLeases);
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
threadContext.markAsSystemContext();
execute(
new Request(shardId, retentionLeases),
ActionListener.wrap(
r -> {},
e -> {
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e);
}
}));
}
}
@Override
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(final Request request, final IndexShard primary) {
Objects.requireNonNull(request);
Objects.requireNonNull(primary);
primary.afterWriteOperation();
return new PrimaryResult<>(request, new ReplicationResponse());
}
@Override
protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica){
Objects.requireNonNull(request);
Objects.requireNonNull(replica);
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
replica.afterWriteOperation();
return new ReplicaResult();
}
public static final class Request extends ReplicationRequest<Request> {
private RetentionLeases retentionLeases;
public RetentionLeases getRetentionLeases() {
return retentionLeases;
}
public Request() {
}
public Request(final ShardId shardId, final RetentionLeases retentionLeases) {
super(Objects.requireNonNull(shardId));
this.retentionLeases = Objects.requireNonNull(retentionLeases);
}
@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
retentionLeases = new RetentionLeases(in);
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(Objects.requireNonNull(out));
retentionLeases.writeTo(out);
}
@Override
public String toString() {
return "Request{" +
"retentionLeases=" + retentionLeases +
", shardId=" + shardId +
", timeout=" + timeout +
", index='" + index + '\'' +
", waitForActiveShards=" + waitForActiveShards +
'}';
}
}
@Override
protected ReplicationResponse newResponseInstance() {
return new ReplicationResponse();
}
}

View File

@ -96,7 +96,7 @@ public class RetentionLeaseSyncAction extends
* @param retentionLeases the retention leases to sync
* @param listener the callback to invoke when the sync completes normally or abnormally
*/
public void syncRetentionLeasesForShard(
public void sync(
final ShardId shardId,
final RetentionLeases retentionLeases,
final ActionListener<ReplicationResponse> listener) {

View File

@ -27,7 +27,6 @@ import org.elasticsearch.index.shard.ShardId;
* A functional interface that represents a method for syncing retention leases to replica shards after a new retention lease is added on
* the primary.
*/
@FunctionalInterface
public interface RetentionLeaseSyncer {
/**
@ -38,9 +37,20 @@ public interface RetentionLeaseSyncer {
* @param retentionLeases the retention leases to sync
* @param listener the callback when sync completes
*/
void syncRetentionLeasesForShard(
ShardId shardId,
RetentionLeases retentionLeases,
ActionListener<ReplicationResponse> listener);
void sync(ShardId shardId, RetentionLeases retentionLeases, ActionListener<ReplicationResponse> listener);
void backgroundSync(ShardId shardId, RetentionLeases retentionLeases);
RetentionLeaseSyncer EMPTY = new RetentionLeaseSyncer() {
@Override
public void sync(final ShardId shardId, final RetentionLeases retentionLeases, final ActionListener<ReplicationResponse> listener) {
}
@Override
public void backgroundSync(final ShardId shardId, final RetentionLeases retentionLeases) {
}
};
}

View File

@ -109,6 +109,7 @@ import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.RetentionLeaseStats;
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
@ -210,6 +211,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return globalCheckpointSyncer;
}
private final RetentionLeaseSyncer retentionLeaseSyncer;
@Nullable
private RecoveryState recoveryState;
@ -267,7 +270,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
final List<SearchOperationListener> searchOperationListener,
final List<IndexingOperationListener> listeners,
final Runnable globalCheckpointSyncer,
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> retentionLeaseSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
@ -289,6 +292,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
listenersList.add(internalIndexingStats);
this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger);
this.globalCheckpointSyncer = globalCheckpointSyncer;
this.retentionLeaseSyncer = Objects.requireNonNull(retentionLeaseSyncer);
final List<SearchOperationListener> searchListenersList = new ArrayList<>(searchOperationListener);
searchListenersList.add(searchStats);
this.searchOperationListener = new SearchOperationListener.CompositeListener(searchListenersList, logger);
@ -319,7 +323,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
UNASSIGNED_SEQ_NO,
globalCheckpointListeners::globalCheckpointUpdated,
threadPool::absoluteTimeInMillis,
retentionLeaseSyncer);
(retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, retentionLeases, listener));
this.replicationTracker = replicationTracker;
// the query cache is a node-level thing, however we want the most popular filters
@ -1888,7 +1892,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
/**
* Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned.
* Get all non-expired retention leases tracked on this shard.
*
* @return the retention leases
*/
@ -1949,6 +1953,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
replicationTracker.updateRetentionLeasesOnReplica(retentionLeases);
}
/**
* Syncs the current retention leases to all replicas.
*/
public void backgroundSyncRetentionLeases() {
assert assertPrimaryMode();
verifyNotClosed();
retentionLeaseSyncer.backgroundSync(shardId, getRetentionLeases());
}
/**
* Waits for all operations up to the provided sequence number to complete.
*

View File

@ -26,6 +26,7 @@ import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
@ -56,8 +57,10 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction;
import org.elasticsearch.index.seqno.RetentionLeaseSyncAction;
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardRelocatedException;
@ -141,7 +144,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
final SnapshotShardsService snapshotShardsService,
final PrimaryReplicaSyncer primaryReplicaSyncer,
final GlobalCheckpointSyncAction globalCheckpointSyncAction,
final RetentionLeaseSyncAction retentionLeaseSyncAction) {
final RetentionLeaseSyncAction retentionLeaseSyncAction,
final RetentionLeaseBackgroundSyncAction retentionLeaseBackgroundSyncAction) {
this(
settings,
(AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>>) indicesService,
@ -157,7 +161,20 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
snapshotShardsService,
primaryReplicaSyncer,
globalCheckpointSyncAction::updateGlobalCheckpointForShard,
Objects.requireNonNull(retentionLeaseSyncAction)::syncRetentionLeasesForShard);
new RetentionLeaseSyncer() {
@Override
public void sync(
final ShardId shardId,
final RetentionLeases retentionLeases,
final ActionListener<ReplicationResponse> listener) {
Objects.requireNonNull(retentionLeaseSyncAction).sync(shardId, retentionLeases, listener);
}
@Override
public void backgroundSync(final ShardId shardId, final RetentionLeases retentionLeases) {
Objects.requireNonNull(retentionLeaseBackgroundSyncAction).backgroundSync(shardId, retentionLeases);
}
});
}
// for tests

View File

@ -153,7 +153,9 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis());
final Settings settings = Settings
.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis))
.put(
IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(),
TimeValue.timeValueMillis(retentionLeaseMillis))
.build();
final long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
final ReplicationTracker replicationTracker = new ReplicationTracker(
@ -232,7 +234,9 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis());
final Settings settings = Settings
.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis))
.put(
IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(),
TimeValue.timeValueMillis(retentionLeaseMillis))
.build();
final Map<String, Tuple<Long, Long>> retentionLeases = new HashMap<>();
final AtomicBoolean invoked = new AtomicBoolean();

View File

@ -0,0 +1,230 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache license, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the license for the specific language governing permissions and
* limitations under the license.
*/
package org.elasticsearch.index.seqno;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.mockito.ArgumentCaptor;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions;
import static org.elasticsearch.mock.orig.Mockito.when;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase {
private ThreadPool threadPool;
private CapturingTransport transport;
private ClusterService clusterService;
private TransportService transportService;
private ShardStateAction shardStateAction;
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(getClass().getName());
transport = new CapturingTransport();
clusterService = createClusterService(threadPool);
transportService = transport.createTransportService(
clusterService.getSettings(),
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> clusterService.localNode(),
null,
Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();
shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool);
}
public void tearDown() throws Exception {
try {
IOUtils.close(transportService, clusterService, transport);
} finally {
terminate(threadPool);
}
super.tearDown();
}
public void testRetentionLeaseBackgroundSyncActionOnPrimary() {
final IndicesService indicesService = mock(IndicesService.class);
final Index index = new Index("index", "uuid");
final IndexService indexService = mock(IndexService.class);
when(indicesService.indexServiceSafe(index)).thenReturn(indexService);
final int id = randomIntBetween(0, 4);
final IndexShard indexShard = mock(IndexShard.class);
when(indexService.getShard(id)).thenReturn(indexShard);
final ShardId shardId = new ShardId(index, id);
when(indexShard.shardId()).thenReturn(shardId);
final RetentionLeaseBackgroundSyncAction action = new RetentionLeaseBackgroundSyncAction(
Settings.EMPTY,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver());
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
final RetentionLeaseBackgroundSyncAction.Request request =
new RetentionLeaseBackgroundSyncAction.Request(indexShard.shardId(), retentionLeases);
final ReplicationOperation.PrimaryResult<RetentionLeaseBackgroundSyncAction.Request> result =
action.shardOperationOnPrimary(request, indexShard);
// the retention leases on the shard should be periodically flushed
verify(indexShard).afterWriteOperation();
// we should forward the request containing the current retention leases to the replica
assertThat(result.replicaRequest(), sameInstance(request));
}
public void testRetentionLeaseBackgroundSyncActionOnReplica() {
final IndicesService indicesService = mock(IndicesService.class);
final Index index = new Index("index", "uuid");
final IndexService indexService = mock(IndexService.class);
when(indicesService.indexServiceSafe(index)).thenReturn(indexService);
final int id = randomIntBetween(0, 4);
final IndexShard indexShard = mock(IndexShard.class);
when(indexService.getShard(id)).thenReturn(indexShard);
final ShardId shardId = new ShardId(index, id);
when(indexShard.shardId()).thenReturn(shardId);
final RetentionLeaseBackgroundSyncAction action = new RetentionLeaseBackgroundSyncAction(
Settings.EMPTY,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver());
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
final RetentionLeaseBackgroundSyncAction.Request request =
new RetentionLeaseBackgroundSyncAction.Request(indexShard.shardId(), retentionLeases);
final TransportReplicationAction.ReplicaResult result = action.shardOperationOnReplica(request, indexShard);
// the retention leases on the shard should be updated
verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases);
// the retention leases on the shard should be periodically flushed
verify(indexShard).afterWriteOperation();
// the result should indicate success
final AtomicBoolean success = new AtomicBoolean();
result.respond(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString())));
assertTrue(success.get());
}
public void testRetentionLeaseSyncExecution() {
final IndicesService indicesService = mock(IndicesService.class);
final Index index = new Index("index", "uuid");
final IndexService indexService = mock(IndexService.class);
when(indicesService.indexServiceSafe(index)).thenReturn(indexService);
final int id = randomIntBetween(0, 4);
final IndexShard indexShard = mock(IndexShard.class);
when(indexService.getShard(id)).thenReturn(indexShard);
final ShardId shardId = new ShardId(index, id);
when(indexShard.shardId()).thenReturn(shardId);
final Logger retentionLeaseSyncActionLogger = mock(Logger.class);
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
final AtomicBoolean invoked = new AtomicBoolean();
final RetentionLeaseBackgroundSyncAction action = new RetentionLeaseBackgroundSyncAction(
Settings.EMPTY,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexNameExpressionResolver()) {
@Override
protected void doExecute(Task task, Request request, ActionListener<ReplicationResponse> listener) {
assertTrue(threadPool.getThreadContext().isSystemContext());
assertThat(request.shardId(), sameInstance(indexShard.shardId()));
assertThat(request.getRetentionLeases(), sameInstance(retentionLeases));
if (randomBoolean()) {
listener.onResponse(new ReplicationResponse());
} else {
final Exception e = randomFrom(
new AlreadyClosedException("closed"),
new IndexShardClosedException(indexShard.shardId()),
new RuntimeException("failed"));
listener.onFailure(e);
if (e instanceof AlreadyClosedException == false && e instanceof IndexShardClosedException == false) {
final ArgumentCaptor<ParameterizedMessage> captor = ArgumentCaptor.forClass(ParameterizedMessage.class);
verify(retentionLeaseSyncActionLogger).warn(captor.capture(), same(e));
final ParameterizedMessage message = captor.getValue();
assertThat(message.getFormat(), equalTo("{} retention lease background sync failed"));
assertThat(message.getParameters(), arrayContaining(indexShard.shardId()));
}
verifyNoMoreInteractions(retentionLeaseSyncActionLogger);
}
invoked.set(true);
}
@Override
protected Logger getLogger() {
return retentionLeaseSyncActionLogger;
}
};
action.backgroundSync(indexShard.shardId(), retentionLeases);
assertTrue(invoked.get());
}
}

View File

@ -0,0 +1,117 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.seqno;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.hamcrest.Matchers.equalTo;
public class RetentionLeaseBackgroundSyncIT extends ESIntegTestCase {
public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin {
@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING);
}
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Stream.concat(
super.nodePlugins().stream(),
Stream.of(RetentionLeaseSyncIntervalSettingPlugin.class))
.collect(Collectors.toList());
}
public void testBackgroundRetentionLeaseSync() throws Exception {
final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2);
internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", numberOfReplicas)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "1s")
.build();
createIndex("index", settings);
ensureGreen("index");
final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId();
final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName();
final IndexShard primary = internalCluster()
.getInstance(IndicesService.class, primaryShardNodeName)
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
// we will add multiple retention leases and expect to see them synced to all replicas
final int length = randomIntBetween(1, 8);
final Map<String, RetentionLease> currentRetentionLeases = new HashMap<>(length);
final List<String> ids = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
final String id = randomValueOtherThanMany(currentRetentionLeases.keySet()::contains, () -> randomAlphaOfLength(8));
ids.add(id);
final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
final String source = randomAlphaOfLength(8);
final CountDownLatch latch = new CountDownLatch(1);
// put a new lease
currentRetentionLeases.put(
id,
primary.addRetentionLease(id, retainingSequenceNumber, source, ActionListener.wrap(latch::countDown)));
latch.await();
// now renew all existing leases; we expect to see these synced to the replicas
for (int j = 0; j <= i; j++) {
currentRetentionLeases.put(
ids.get(j),
primary.renewRetentionLease(
ids.get(j),
randomLongBetween(currentRetentionLeases.get(ids.get(j)).retainingSequenceNumber(), Long.MAX_VALUE),
source));
}
assertBusy(() -> {
// check all retention leases have been synced to all replicas
for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) {
final String replicaShardNodeId = replicaShard.currentNodeId();
final String replicaShardNodeName = clusterService().state().nodes().get(replicaShardNodeId).getName();
final IndexShard replica = internalCluster()
.getInstance(IndicesService.class, replicaShardNodeName)
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
assertThat(replica.getRetentionLeases(), equalTo(primary.getRetentionLeases()));
}
});
}
}
}

View File

@ -155,7 +155,8 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
final TransportWriteAction.WriteReplicaResult result = action.shardOperationOnReplica(request, indexShard);
final TransportWriteAction.WriteReplicaResult<RetentionLeaseSyncAction.Request> result =
action.shardOperationOnReplica(request, indexShard);
// the retention leases on the shard should be updated
verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases);
// the retention leases on the shard should be flushed
@ -229,7 +230,7 @@ public class RetentionLeaseSyncActionTests extends ESTestCase {
};
// execution happens on the test thread, so no need to register an actual listener to callback
action.syncRetentionLeasesForShard(indexShard.shardId(), retentionLeases, ActionListener.wrap(() -> {}));
action.sync(indexShard.shardId(), retentionLeases, ActionListener.wrap(() -> {}));
assertTrue(invoked.get());
}

View File

@ -65,6 +65,7 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.TestTranslog;
import org.elasticsearch.index.translog.Translog;
@ -669,7 +670,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
Collections.emptyList(),
Arrays.asList(listeners),
() -> {},
(leases, listener) -> {},
RetentionLeaseSyncer.EMPTY,
cbs);
}

View File

@ -110,7 +110,9 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis());
final Settings settings = Settings
.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis))
.put(
IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(),
TimeValue.timeValueMillis(retentionLeaseMillis))
.build();
// current time is mocked through the thread pool
final IndexShard indexShard = newStartedShard(primary, settings, new InternalEngineFactory());

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
@ -130,7 +131,7 @@ public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCas
newRouting = newRouting.moveToUnassigned(unassignedInfo)
.updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
newRouting = ShardRoutingHelper.initialize(newRouting, nodeId);
IndexShard shard = index.createShard(newRouting, s -> {}, (s, leases, listener) -> {});
IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY);
IndexShardTestCase.updateRoutingEntry(shard, newRouting);
assertEquals(5, counter.get());
final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),

View File

@ -48,6 +48,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
@ -481,7 +482,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
null,
primaryReplicaSyncer,
s -> {},
(s, leases, listener) -> {});
RetentionLeaseSyncer.EMPTY);
}
private class RecordingIndicesService extends MockIndicesService {

View File

@ -97,6 +97,7 @@ import org.elasticsearch.gateway.MetaStateService;
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction;
import org.elasticsearch.index.seqno.RetentionLeaseSyncAction;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.indices.IndicesService;
@ -896,7 +897,16 @@ public class SnapshotResiliencyTests extends ESTestCase {
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver));
indexNameExpressionResolver),
new RetentionLeaseBackgroundSyncAction(
settings,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver));
Map<Action, TransportAction> actions = new HashMap<>();
actions.put(CreateIndexAction.INSTANCE,
new TransportCreateIndexAction(

View File

@ -59,6 +59,7 @@ import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
@ -385,7 +386,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
Collections.emptyList(),
Arrays.asList(listeners),
globalCheckpointSyncer,
(leases, listener) -> {},
RetentionLeaseSyncer.EMPTY,
breakerService);
indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
success = true;

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.core.security.authz.privilege;
import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction;
import org.elasticsearch.index.seqno.RetentionLeaseSyncAction;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.xpack.core.security.support.Automatons;
@ -25,7 +27,8 @@ public final class SystemPrivilege extends Privilege {
"indices:admin/template/put", // needed for the TemplateUpgradeService
"indices:admin/template/delete", // needed for the TemplateUpgradeService
"indices:admin/seq_no/global_checkpoint_sync*", // needed for global checkpoint syncs
"indices:admin/seq_no/retention_lease_sync*", // needed for retention lease syncs
RetentionLeaseSyncAction.ACTION_NAME + "*", // needed for retention lease syncs
RetentionLeaseBackgroundSyncAction.ACTION_NAME + "*", // needed for background retention lease syncs
"indices:admin/settings/update" // needed for DiskThresholdMonitor.markIndicesReadOnly
);

View File

@ -130,6 +130,9 @@ public class PrivilegeTests extends ESTestCase {
assertThat(predicate.test("indices:admin/seq_no/retention_lease_sync"), is(true));
assertThat(predicate.test("indices:admin/seq_no/retention_lease_sync[p]"), is(true));
assertThat(predicate.test("indices:admin/seq_no/retention_lease_sync[r]"), is(true));
assertThat(predicate.test("indices:admin/seq_no/retention_lease_background_sync"), is(true));
assertThat(predicate.test("indices:admin/seq_no/retention_lease_background_sync[p]"), is(true));
assertThat(predicate.test("indices:admin/seq_no/retention_lease_background_sync[r]"), is(true));
assertThat(predicate.test("indices:admin/settings/update"), is(true));
assertThat(predicate.test("indices:admin/settings/foo"), is(false));
}

View File

@ -258,6 +258,7 @@ public class AuthorizationServiceTests extends ESTestCase {
"indices:admin/template/put",
"indices:admin/seq_no/global_checkpoint_sync",
"indices:admin/seq_no/retention_lease_sync",
"indices:admin/seq_no/retention_lease_background_sync",
"indices:admin/settings/update" };
for (String action : actions) {
authorize(authentication, action, request);