AsyncSearchMaintenanceService should stop when closing a node (#55651)

This change turns the AsyncSearchMaintenanceService into an
AbstractLifecycleComponent and ensures that the service is
stopped when a node is closing.

Closes #55646
This commit is contained in:
Jim Ferenczi 2020-04-24 09:37:58 +02:00 committed by jimczi
parent b213209f0c
commit 0a6c74b7d3
4 changed files with 36 additions and 22 deletions

View File

@ -91,8 +91,7 @@ public final class AsyncSearch extends Plugin implements ActionPlugin {
new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN,
AsyncSearchResponse::new, namedWriteableRegistry); AsyncSearchResponse::new, namedWriteableRegistry);
AsyncSearchMaintenanceService maintenanceService = AsyncSearchMaintenanceService maintenanceService =
new AsyncSearchMaintenanceService(nodeEnvironment.nodeId(), settings, threadPool, indexService); new AsyncSearchMaintenanceService(clusterService, nodeEnvironment.nodeId(), settings, threadPool, indexService);
clusterService.addListener(maintenanceService);
return Collections.singletonList(maintenanceService); return Collections.singletonList(maintenanceService);
} else { } else {
return Collections.emptyList(); return Collections.emptyList();

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.search; package org.elasticsearch.xpack.search;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
@ -24,10 +25,12 @@ public class AsyncSearchMaintenanceService extends AsyncTaskMaintenanceService {
public static final Setting<TimeValue> ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING = public static final Setting<TimeValue> ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING =
Setting.timeSetting("async_search.index_cleanup_interval", TimeValue.timeValueHours(1), Setting.Property.NodeScope); Setting.timeSetting("async_search.index_cleanup_interval", TimeValue.timeValueHours(1), Setting.Property.NodeScope);
AsyncSearchMaintenanceService(String localNodeId, AsyncSearchMaintenanceService(ClusterService clusterService,
String localNodeId,
Settings nodeSettings, Settings nodeSettings,
ThreadPool threadPool, ThreadPool threadPool,
AsyncTaskIndexService<?> indexService) { AsyncTaskIndexService<?> indexService) {
super(AsyncSearch.INDEX, localNodeId, threadPool, indexService, ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings)); super(clusterService, AsyncSearch.INDEX, localNodeId, threadPool, indexService,
ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings));
} }
} }

View File

@ -79,7 +79,7 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
protected Settings nodeSettings(int nodeOrdinal) { protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder() return Settings.builder()
.put(super.nodeSettings(0)) .put(super.nodeSettings(0))
.put(ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(100)) .put(ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(1))
.build(); .build();
} }

View File

@ -13,7 +13,8 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
@ -23,7 +24,7 @@ import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.atomic.AtomicBoolean; import java.io.IOException;
import static org.elasticsearch.xpack.core.async.AsyncTaskIndexService.EXPIRATION_TIME_FIELD; import static org.elasticsearch.xpack.core.async.AsyncTaskIndexService.EXPIRATION_TIME_FIELD;
@ -33,9 +34,10 @@ import static org.elasticsearch.xpack.core.async.AsyncTaskIndexService.EXPIRATIO
* Since we will have several injected implementation of this class injected into different transports, and we bind components created * Since we will have several injected implementation of this class injected into different transports, and we bind components created
* by {@linkplain org.elasticsearch.plugins.Plugin#createComponents} to their classes, we need to implement one class per binding. * by {@linkplain org.elasticsearch.plugins.Plugin#createComponents} to their classes, we need to implement one class per binding.
*/ */
public abstract class AsyncTaskMaintenanceService implements Releasable, ClusterStateListener { public abstract class AsyncTaskMaintenanceService extends AbstractLifecycleComponent implements ClusterStateListener {
private static final Logger logger = LogManager.getLogger(AsyncTaskMaintenanceService.class); private static final Logger logger = LogManager.getLogger(AsyncTaskMaintenanceService.class);
private final ClusterService clusterService;
private final String index; private final String index;
private final String localNodeId; private final String localNodeId;
private final ThreadPool threadPool; private final ThreadPool threadPool;
@ -43,14 +45,15 @@ public abstract class AsyncTaskMaintenanceService implements Releasable, Cluster
private final TimeValue delay; private final TimeValue delay;
private boolean isCleanupRunning; private boolean isCleanupRunning;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private volatile Scheduler.Cancellable cancellable; private volatile Scheduler.Cancellable cancellable;
public AsyncTaskMaintenanceService(String index, public AsyncTaskMaintenanceService(ClusterService clusterService,
String index,
String localNodeId, String localNodeId,
ThreadPool threadPool, ThreadPool threadPool,
AsyncTaskIndexService<?> indexService, AsyncTaskIndexService<?> indexService,
TimeValue delay) { TimeValue delay) {
this.clusterService = clusterService;
this.index = index; this.index = index;
this.localNodeId = localNodeId; this.localNodeId = localNodeId;
this.threadPool = threadPool; this.threadPool = threadPool;
@ -58,6 +61,21 @@ public abstract class AsyncTaskMaintenanceService implements Releasable, Cluster
this.delay = delay; this.delay = delay;
} }
@Override
protected void doStart() {
clusterService.addListener(this);
}
@Override
protected void doStop() {
stopCleanup();
}
@Override
protected final void doClose() throws IOException {
}
@Override @Override
public void clusterChanged(ClusterChangedEvent event) { public void clusterChanged(ClusterChangedEvent event) {
final ClusterState state = event.state(); final ClusterState state = event.state();
@ -69,12 +87,12 @@ public abstract class AsyncTaskMaintenanceService implements Releasable, Cluster
} }
synchronized void tryStartCleanup(ClusterState state) { synchronized void tryStartCleanup(ClusterState state) {
if (isClosed.get()) { if (lifecycle.stoppedOrClosed()) {
return; return;
} }
IndexRoutingTable indexRouting = state.routingTable().index(index); IndexRoutingTable indexRouting = state.routingTable().index(index);
if (indexRouting == null) { if (indexRouting == null) {
stop(); stopCleanup();
return; return;
} }
String primaryNodeId = indexRouting.shard(0).primaryShard().currentNodeId(); String primaryNodeId = indexRouting.shard(0).primaryShard().currentNodeId();
@ -84,12 +102,12 @@ public abstract class AsyncTaskMaintenanceService implements Releasable, Cluster
executeNextCleanup(); executeNextCleanup();
} }
} else { } else {
stop(); stopCleanup();
} }
} }
synchronized void executeNextCleanup() { synchronized void executeNextCleanup() {
if (isClosed.get() == false && isCleanupRunning) { if (lifecycle.stoppedOrClosed() == false && isCleanupRunning) {
long nowInMillis = System.currentTimeMillis(); long nowInMillis = System.currentTimeMillis();
DeleteByQueryRequest toDelete = new DeleteByQueryRequest(index) DeleteByQueryRequest toDelete = new DeleteByQueryRequest(index)
.setQuery(QueryBuilders.rangeQuery(EXPIRATION_TIME_FIELD).lte(nowInMillis)); .setQuery(QueryBuilders.rangeQuery(EXPIRATION_TIME_FIELD).lte(nowInMillis));
@ -99,7 +117,7 @@ public abstract class AsyncTaskMaintenanceService implements Releasable, Cluster
} }
synchronized void scheduleNextCleanup() { synchronized void scheduleNextCleanup() {
if (isClosed.get() == false && isCleanupRunning) { if (lifecycle.stoppedOrClosed() == false && isCleanupRunning) {
try { try {
cancellable = threadPool.schedule(this::executeNextCleanup, delay, ThreadPool.Names.GENERIC); cancellable = threadPool.schedule(this::executeNextCleanup, delay, ThreadPool.Names.GENERIC);
} catch (EsRejectedExecutionException e) { } catch (EsRejectedExecutionException e) {
@ -112,7 +130,7 @@ public abstract class AsyncTaskMaintenanceService implements Releasable, Cluster
} }
} }
synchronized void stop() { synchronized void stopCleanup() {
if (isCleanupRunning) { if (isCleanupRunning) {
if (cancellable != null && cancellable.isCancelled() == false) { if (cancellable != null && cancellable.isCancelled() == false) {
cancellable.cancel(); cancellable.cancel();
@ -120,10 +138,4 @@ public abstract class AsyncTaskMaintenanceService implements Releasable, Cluster
isCleanupRunning = false; isCleanupRunning = false;
} }
} }
@Override
public void close() {
stop();
isClosed.compareAndSet(false, true);
}
} }