Change the reindex fetch in policy runner from 1000 to 10000 and (#41838)

Reindex uses scroll searches to read the source data. It is more efficient
to read more data in one search scroll round then several. I think 10000
is a good sweet spot.

Relates to #32789
This commit is contained in:
Martijn van Groningen 2019-05-07 11:07:46 +02:00
parent d709b8bb97
commit 1b00e7f834
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
4 changed files with 29 additions and 10 deletions

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -54,6 +55,9 @@ import static org.elasticsearch.xpack.core.XPackSettings.ENRICH_ENABLED_SETTING;
public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
static final Setting<Integer> ENRICH_FETCH_SIZE_SETTING =
Setting.intSetting("index.xpack.enrich.fetch_size", 10000, 1, 1000000, Setting.Property.NodeScope);
private final Settings settings;
private final Boolean enabled;
@ -107,7 +111,7 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, Environment environment,
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
EnrichPolicyExecutor enrichPolicyExecutor = new EnrichPolicyExecutor(clusterService, client, threadPool,
EnrichPolicyExecutor enrichPolicyExecutor = new EnrichPolicyExecutor(settings, clusterService, client, threadPool,
new IndexNameExpressionResolver(), System::currentTimeMillis);
return Collections.singleton(enrichPolicyExecutor);
}
@ -122,4 +126,9 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
return Collections.singletonList(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(EnrichMetadata.TYPE),
EnrichMetadata::fromXContent));
}
@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(ENRICH_FETCH_SIZE_SETTING);
}
}

View File

@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
@ -23,14 +24,20 @@ class EnrichPolicyExecutor {
private final ThreadPool threadPool;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final LongSupplier nowSupplier;
private final int fetchSize;
EnrichPolicyExecutor(ClusterService clusterService, Client client, ThreadPool threadPool,
IndexNameExpressionResolver indexNameExpressionResolver, LongSupplier nowSupplier) {
EnrichPolicyExecutor(Settings settings,
ClusterService clusterService,
Client client,
ThreadPool threadPool,
IndexNameExpressionResolver indexNameExpressionResolver,
LongSupplier nowSupplier) {
this.clusterService = clusterService;
this.client = client;
this.threadPool = threadPool;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.nowSupplier = nowSupplier;
this.fetchSize = EnrichPlugin.ENRICH_FETCH_SIZE_SETTING.get(settings);
}
public void runPolicy(String policyId, ActionListener<PolicyExecutionResult> listener) {
@ -44,8 +51,8 @@ class EnrichPolicyExecutor {
}
public void runPolicy(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener) {
EnrichPolicyRunner runnable =
new EnrichPolicyRunner(policyName, policy, listener, clusterService, client, indexNameExpressionResolver, nowSupplier);
EnrichPolicyRunner runnable = new EnrichPolicyRunner(policyName, policy, listener, clusterService, client,
indexNameExpressionResolver, nowSupplier, fetchSize);
threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable);
}
}

View File

@ -56,10 +56,11 @@ public class EnrichPolicyRunner implements Runnable {
private final Client client;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final LongSupplier nowSupplier;
private final int fetchSize;
EnrichPolicyRunner(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener,
ClusterService clusterService, Client client, IndexNameExpressionResolver indexNameExpressionResolver,
LongSupplier nowSupplier) {
LongSupplier nowSupplier, int fetchSize) {
this.policyName = policyName;
this.policy = policy;
this.listener = listener;
@ -67,6 +68,7 @@ public class EnrichPolicyRunner implements Runnable {
this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.nowSupplier = nowSupplier;
this.fetchSize = fetchSize;
}
@Override
@ -181,6 +183,7 @@ public class EnrichPolicyRunner implements Runnable {
retainFields.add(policy.getEnrichKey());
retainFields.addAll(policy.getEnrichValues());
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.size(fetchSize);
searchSourceBuilder.fetchSource(retainFields.toArray(new String[0]), new String[0]);
if (policy.getQuery() != null) {
searchSourceBuilder.query(QueryBuilders.wrapperQuery(policy.getQuery().getQuery()));

View File

@ -106,8 +106,8 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
}
};
EnrichPolicyRunner enrichPolicyRunner =
new EnrichPolicyRunner(policyName, policy, listener, clusterService, client(), resolver, () -> createTime);
EnrichPolicyRunner enrichPolicyRunner = new EnrichPolicyRunner(policyName, policy, listener, clusterService,
client(), resolver, () -> createTime, randomIntBetween(1, 10000));
logger.info("Starting policy run");
@ -222,8 +222,8 @@ public class EnrichPolicyRunnerTests extends ESSingleNodeTestCase {
}
};
EnrichPolicyRunner enrichPolicyRunner =
new EnrichPolicyRunner(policyName, policy, listener, clusterService, client(), resolver, () -> createTime);
EnrichPolicyRunner enrichPolicyRunner = new EnrichPolicyRunner(policyName, policy, listener, clusterService,
client(), resolver, () -> createTime, randomIntBetween(1, 10000));
logger.info("Starting policy run");