From 4e6bbf6e3c5c01b1a74395de2faa486cc21989ad Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 20 Mar 2020 07:34:37 -0400 Subject: [PATCH] Execute retention lease syncs under system context (#53838) The retention lease syncs need to occur under the system context, because they are internal actions executed on behalf of the user. Today we are relying on this happening for background syncs by virtue of the fact that the context the syncs are created under is the system context. This is due to these occurring on the cluster state applier thread. However, there are situations where this does not hold such as when a timed out cluster state publication occurs, and the node where the shard is allocated is the elected master node. In that case, the context will be empty due to the fact that we do not reschedule publication under the system context. Currently, doing so runs us into some troubles with losing the existing context, possibly dropping deprecation headers. We could copy that context over when marking the current context as the system context, but the implications of that require some more investigation. For now, we explicitly mark the retention lease syncs as executing under the system context, as this is situation that we can reason about. --- .../util/concurrent/AbstractAsyncTask.java | 2 +- .../RetentionLeaseBackgroundSyncAction.java | 78 ++++++++++--------- .../index/seqno/RetentionLeaseSyncAction.java | 72 +++++++++-------- 3 files changed, 82 insertions(+), 70 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java index 58e48f02155..1ef9a484a27 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java @@ -91,7 +91,7 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable { if (logger.isTraceEnabled()) { logger.trace("scheduling {} every {}", toString(), interval); } - cancellable = threadPool.schedule(threadPool.preserveContext(this), interval, getThreadPool()); + cancellable = threadPool.schedule(this, interval, getThreadPool()); isScheduledOrRunning = true; } else { logger.trace("scheduled {} disabled", toString()); diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java index 1f6dd80a9f8..e4b82b34175 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -101,44 +102,49 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi } final void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases) { - final Request request = new Request(shardId, retentionLeases); - final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_background_sync", request); - transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction, - new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm), - task, - transportOptions, - new TransportResponseHandler() { - @Override - public ReplicationResponse read(StreamInput in) throws IOException { - return newResponseInstance(in); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - - @Override - public void handleResponse(ReplicationResponse response) { - task.setPhase("finished"); - taskManager.unregister(task); - } - - @Override - public void handleException(TransportException e) { - task.setPhase("finished"); - taskManager.unregister(task); - if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) { - // node shutting down - return; + final ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + // we have to execute under the system context so that if security is enabled the sync is authorized + threadContext.markAsSystemContext(); + final Request request = new Request(shardId, retentionLeases); + final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_background_sync", request); + transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction, + new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm), + task, + transportOptions, + new TransportResponseHandler() { + @Override + public ReplicationResponse read(StreamInput in) throws IOException { + return newResponseInstance(in); } - if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) { - // the shard is closed - return; + + @Override + public String executor() { + return ThreadPool.Names.SAME; } - getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e); - } - }); + + @Override + public void handleResponse(ReplicationResponse response) { + task.setPhase("finished"); + taskManager.unregister(task); + } + + @Override + public void handleException(TransportException e) { + task.setPhase("finished"); + taskManager.unregister(task); + if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) { + // node shutting down + return; + } + if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) { + // the shard is closed + return; + } + getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e); + } + }); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index 44da3318698..6b906792afc 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -39,6 +39,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -99,40 +100,45 @@ public class RetentionLeaseSyncAction extends final void sync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases, ActionListener listener) { - final Request request = new Request(shardId, retentionLeases); - final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_sync", request); - transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction, - new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm), - task, - transportOptions, - new TransportResponseHandler() { - @Override - public ReplicationResponse read(StreamInput in) throws IOException { - return newResponseInstance(in); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - - @Override - public void handleResponse(ReplicationResponse response) { - task.setPhase("finished"); - taskManager.unregister(task); - listener.onResponse(response); - } - - @Override - public void handleException(TransportException e) { - if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { - getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e); + final ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + // we have to execute under the system context so that if security is enabled the sync is authorized + threadContext.markAsSystemContext(); + final Request request = new Request(shardId, retentionLeases); + final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_sync", request); + transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction, + new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm), + task, + transportOptions, + new TransportResponseHandler() { + @Override + public ReplicationResponse read(StreamInput in) throws IOException { + return newResponseInstance(in); } - task.setPhase("finished"); - taskManager.unregister(task); - listener.onFailure(e); - } - }); + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public void handleResponse(ReplicationResponse response) { + task.setPhase("finished"); + taskManager.unregister(task); + listener.onResponse(response); + } + + @Override + public void handleException(TransportException e) { + if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { + getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e); + } + task.setPhase("finished"); + taskManager.unregister(task); + listener.onFailure(e); + } + }); + } } @Override