From 0a6c74b7d3ddb5f9e31d2dd5f2ac2d6469ff196c Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Fri, 24 Apr 2020 09:37:58 +0200 Subject: [PATCH] AsyncSearchMaintenanceService should stop when closing a node (#55651) This change turns the AsyncSearchMaintenanceService into an AbstractLifecycleComponent and ensures that the service is stopped when a node is closing. Closes #55646 --- .../xpack/search/AsyncSearch.java | 3 +- .../search/AsyncSearchMaintenanceService.java | 7 ++- .../search/AsyncSearchIntegTestCase.java | 2 +- .../async/AsyncTaskMaintenanceService.java | 46 ++++++++++++------- 4 files changed, 36 insertions(+), 22 deletions(-) diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java index 41a25f2ce26..eda9c84b9a7 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java @@ -91,8 +91,7 @@ public final class AsyncSearch extends Plugin implements ActionPlugin { new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, namedWriteableRegistry); AsyncSearchMaintenanceService maintenanceService = - new AsyncSearchMaintenanceService(nodeEnvironment.nodeId(), settings, threadPool, indexService); - clusterService.addListener(maintenanceService); + new AsyncSearchMaintenanceService(clusterService, nodeEnvironment.nodeId(), settings, threadPool, indexService); return Collections.singletonList(maintenanceService); } else { return Collections.emptyList(); diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchMaintenanceService.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchMaintenanceService.java index 462ad15a14b..65be6b6ba14 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchMaintenanceService.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchMaintenanceService.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.search; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -24,10 +25,12 @@ public class AsyncSearchMaintenanceService extends AsyncTaskMaintenanceService { public static final Setting ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING = Setting.timeSetting("async_search.index_cleanup_interval", TimeValue.timeValueHours(1), Setting.Property.NodeScope); - AsyncSearchMaintenanceService(String localNodeId, + AsyncSearchMaintenanceService(ClusterService clusterService, + String localNodeId, Settings nodeSettings, ThreadPool threadPool, AsyncTaskIndexService indexService) { - super(AsyncSearch.INDEX, localNodeId, threadPool, indexService, ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings)); + super(clusterService, AsyncSearch.INDEX, localNodeId, threadPool, indexService, + ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings)); } } diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java index 6794a07ce61..9407651990c 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java @@ -79,7 +79,7 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase { protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() .put(super.nodeSettings(0)) - .put(ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(100)) + .put(ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(1)) .build(); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskMaintenanceService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskMaintenanceService.java index c700729243c..e1dac915a6c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskMaintenanceService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskMaintenanceService.java @@ -13,7 +13,8 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.gateway.GatewayService; @@ -23,7 +24,7 @@ import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; -import java.util.concurrent.atomic.AtomicBoolean; +import java.io.IOException; import static org.elasticsearch.xpack.core.async.AsyncTaskIndexService.EXPIRATION_TIME_FIELD; @@ -33,9 +34,10 @@ import static org.elasticsearch.xpack.core.async.AsyncTaskIndexService.EXPIRATIO * Since we will have several injected implementation of this class injected into different transports, and we bind components created * by {@linkplain org.elasticsearch.plugins.Plugin#createComponents} to their classes, we need to implement one class per binding. */ -public abstract class AsyncTaskMaintenanceService implements Releasable, ClusterStateListener { +public abstract class AsyncTaskMaintenanceService extends AbstractLifecycleComponent implements ClusterStateListener { private static final Logger logger = LogManager.getLogger(AsyncTaskMaintenanceService.class); + private final ClusterService clusterService; private final String index; private final String localNodeId; private final ThreadPool threadPool; @@ -43,14 +45,15 @@ public abstract class AsyncTaskMaintenanceService implements Releasable, Cluster private final TimeValue delay; private boolean isCleanupRunning; - private final AtomicBoolean isClosed = new AtomicBoolean(false); private volatile Scheduler.Cancellable cancellable; - public AsyncTaskMaintenanceService(String index, + public AsyncTaskMaintenanceService(ClusterService clusterService, + String index, String localNodeId, ThreadPool threadPool, AsyncTaskIndexService indexService, TimeValue delay) { + this.clusterService = clusterService; this.index = index; this.localNodeId = localNodeId; this.threadPool = threadPool; @@ -58,6 +61,21 @@ public abstract class AsyncTaskMaintenanceService implements Releasable, Cluster this.delay = delay; } + + @Override + protected void doStart() { + clusterService.addListener(this); + } + + @Override + protected void doStop() { + stopCleanup(); + } + + @Override + protected final void doClose() throws IOException { + } + @Override public void clusterChanged(ClusterChangedEvent event) { final ClusterState state = event.state(); @@ -69,12 +87,12 @@ public abstract class AsyncTaskMaintenanceService implements Releasable, Cluster } synchronized void tryStartCleanup(ClusterState state) { - if (isClosed.get()) { + if (lifecycle.stoppedOrClosed()) { return; } IndexRoutingTable indexRouting = state.routingTable().index(index); if (indexRouting == null) { - stop(); + stopCleanup(); return; } String primaryNodeId = indexRouting.shard(0).primaryShard().currentNodeId(); @@ -84,12 +102,12 @@ public abstract class AsyncTaskMaintenanceService implements Releasable, Cluster executeNextCleanup(); } } else { - stop(); + stopCleanup(); } } synchronized void executeNextCleanup() { - if (isClosed.get() == false && isCleanupRunning) { + if (lifecycle.stoppedOrClosed() == false && isCleanupRunning) { long nowInMillis = System.currentTimeMillis(); DeleteByQueryRequest toDelete = new DeleteByQueryRequest(index) .setQuery(QueryBuilders.rangeQuery(EXPIRATION_TIME_FIELD).lte(nowInMillis)); @@ -99,7 +117,7 @@ public abstract class AsyncTaskMaintenanceService implements Releasable, Cluster } synchronized void scheduleNextCleanup() { - if (isClosed.get() == false && isCleanupRunning) { + if (lifecycle.stoppedOrClosed() == false && isCleanupRunning) { try { cancellable = threadPool.schedule(this::executeNextCleanup, delay, ThreadPool.Names.GENERIC); } catch (EsRejectedExecutionException e) { @@ -112,7 +130,7 @@ public abstract class AsyncTaskMaintenanceService implements Releasable, Cluster } } - synchronized void stop() { + synchronized void stopCleanup() { if (isCleanupRunning) { if (cancellable != null && cancellable.isCancelled() == false) { cancellable.cancel(); @@ -120,10 +138,4 @@ public abstract class AsyncTaskMaintenanceService implements Releasable, Cluster isCleanupRunning = false; } } - - @Override - public void close() { - stop(); - isClosed.compareAndSet(false, true); - } }