XPack Usage should run on MANAGEMENT threads (#64160)

XPack usage starts out on management threads, but depending on the
implementation of the usage plugin, they could end up running on
transport threads instead. Fixed to always reschedule on a management
thread.
This commit is contained in:
Henning Andersen 2020-10-27 16:02:46 +01:00 committed by Henning Andersen
parent 50d806dc9f
commit 0cba23e08f
1 changed files with 5 additions and 1 deletions

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.action; package org.elasticsearch.xpack.core.action;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -17,6 +18,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.protocol.xpack.XPackUsageRequest; import org.elasticsearch.protocol.xpack.XPackUsageRequest;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.xpack.core.XPackFeatureSet; import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.XPackFeatureSet.Usage; import org.elasticsearch.xpack.core.XPackFeatureSet.Usage;
import org.elasticsearch.xpack.core.common.IteratingActionListener; import org.elasticsearch.xpack.core.common.IteratingActionListener;
@ -69,12 +71,14 @@ public class TransportXPackUsageAction extends TransportMasterNodeAction<XPackUs
final AtomicReferenceArray<Usage> featureSetUsages = new AtomicReferenceArray<>(featureSets.size()); final AtomicReferenceArray<Usage> featureSetUsages = new AtomicReferenceArray<>(featureSets.size());
final AtomicInteger position = new AtomicInteger(0); final AtomicInteger position = new AtomicInteger(0);
final BiConsumer<XPackFeatureSet, ActionListener<List<Usage>>> consumer = (featureSet, iteratingListener) -> { final BiConsumer<XPackFeatureSet, ActionListener<List<Usage>>> consumer = (featureSet, iteratingListener) -> {
assert Transports.assertNotTransportThread("calculating usage can be more expensive than we allow on transport threads");
featureSet.usage(new ActionListener<Usage>() { featureSet.usage(new ActionListener<Usage>() {
@Override @Override
public void onResponse(Usage usage) { public void onResponse(Usage usage) {
featureSetUsages.set(position.getAndIncrement(), usage); featureSetUsages.set(position.getAndIncrement(), usage);
// the value sent back doesn't matter since our predicate keeps iterating // the value sent back doesn't matter since our predicate keeps iterating
iteratingListener.onResponse(Collections.emptyList()); ActionRunnable<?> invokeListener = ActionRunnable.supply(iteratingListener, Collections::emptyList);
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(invokeListener);
} }
@Override @Override