Expose Engine.Searcher provider to ingest plugins. (#41010)

Relates to #32789
Martijn van Groningen 2019-04-24 20:28:50 +02:00
parent 38e6dcd388
commit a61ec11f36
5 changed files with 322 additions and 9 deletions
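
In short, Processor.Parameters gains a localShardSearcher function that resolves an index expression to an Engine.Searcher over a locally allocated, single-shard index. A minimal sketch of how an ingest plugin might consume it (the plugin and processor names, the "reference-index" expression, and the copied field are illustrative, not part of this commit; the pattern follows the IngestLocalShardSearcherTests added below):

package org.example.ingest;

import org.apache.lucene.document.Document;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;

import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

public class LookupPlugin extends Plugin implements IngestPlugin {

    @Override
    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
        // New in this commit: a function that resolves an index expression to an
        // Engine.Searcher over a locally allocated, single-shard index.
        Function<String, Engine.Searcher> searcherProvider = parameters.localShardSearcher;
        return Collections.singletonMap("lookup",
            (factories, tag, config) -> new LookupProcessor(tag, searcherProvider));
    }

    static final class LookupProcessor extends AbstractProcessor {
        private final Function<String, Engine.Searcher> localShardSearcher;

        LookupProcessor(String tag, Function<String, Engine.Searcher> localShardSearcher) {
            super(tag);
            this.localShardSearcher = localShardSearcher;
        }

        @Override
        public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
            // Acquire and always release the searcher; the provider throws if "reference-index"
            // does not resolve to exactly one local, single-shard index without a filtered alias.
            try (Engine.Searcher searcher = localShardSearcher.apply("reference-index")) {
                Document doc = searcher.searcher().doc(0);
                ingestDocument.setFieldValue("reference_id",
                    Uid.decodeId(doc.getBinaryValue("_id").bytes));
            }
            return ingestDocument;
        }

        @Override
        public String getType() {
            return "lookup";
        }
    }
}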

View File: org/elasticsearch/ingest/IngestService.java

@@ -30,11 +30,13 @@ import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
@@ -45,8 +47,12 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
@@ -86,9 +92,10 @@ public class IngestService implements ClusterStateApplier {
public IngestService(ClusterService clusterService, ThreadPool threadPool,
Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry,
List<IngestPlugin> ingestPlugins) {
List<IngestPlugin> ingestPlugins, IndicesService indicesService) {
this.clusterService = clusterService;
this.scriptService = scriptService;
final IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
this.processorFactories = processorFactories(
ingestPlugins,
new Processor.Parameters(
@@ -96,7 +103,32 @@ public class IngestService implements ClusterStateApplier {
threadPool.getThreadContext(), threadPool::relativeTimeInMillis,
(delay, command) -> threadPool.schedule(
command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC
), this
), this, indexExpression -> {
ClusterState state = clusterService.state();
Index[] resolvedIndices = resolver.concreteIndices(state, IndicesOptions.STRICT_EXPAND_OPEN, indexExpression);
if (resolvedIndices.length != 1) {
throw new IllegalStateException("expression [" + indexExpression + "] can only point to a single concrete index");
}
Index index = resolvedIndices[0];
// Check whether indexExpression matches an alias that has a filter.
// There is no guarantee that alias filters are applied here, so fail if that is the case.
Set<String> indicesAndAliases = resolver.resolveExpressions(state, indexExpression);
String[] aliasesWithFilter = resolver.filteringAliases(state, index.getName(), indicesAndAliases);
if (aliasesWithFilter != null && aliasesWithFilter.length > 0) {
throw new IllegalStateException("expression [" + indexExpression + "] points an alias with a filter");
}
IndexService indexService = indicesService.indexServiceSafe(index);
int numShards = indexService.getMetaData().getNumberOfShards();
if (numShards != 1) {
throw new IllegalStateException("index [" + index.getName() + "] must have 1 shard, but has " + numShards +
" shards");
}
IndexShard indexShard = indexService.getShard(0);
return indexShard.acquireSearcher("ingest");
}
)
);
this.threadPool = threadPool;
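
For context, the provider above only hands out a searcher when the expression resolves to exactly one concrete index, that expression is not a filtered alias, the index has a single primary shard, and the shard is allocated on the local node. A hedged sketch of creating a reference index that satisfies those checks with the Java client (the method, client variable, and index/alias names are assumed for illustration):

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

static void createReferenceIndex(Client client) {
    // One concrete index, one primary shard, unfiltered alias: the shape the provider accepts.
    client.admin().indices().prepareCreate("reference-index-v1")
        .setSettings(Settings.builder()
            .put("index.number_of_shards", 1)
            .put("index.number_of_replicas", 0))
        .addAlias(new Alias("reference-index")) // no filter(...): filtered aliases are rejected
        .get();
}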

View File: org/elasticsearch/ingest/Processor.java

@@ -22,11 +22,13 @@ package org.elasticsearch.ingest;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.Scheduler;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongSupplier;
/**
@@ -110,9 +112,16 @@ public interface Processor {
*/
public final BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler;
/**
* Provides access to an engine searcher of a locally allocated shard for the specified index expression.
*
* The locally allocated index must have a single primary shard.
*/
public final Function<String, Engine.Searcher> localShardSearcher;
public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext,
LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> scheduler,
IngestService ingestService) {
IngestService ingestService, Function<String, Engine.Searcher> localShardSearcher) {
this.env = env;
this.scriptService = scriptService;
this.threadContext = threadContext;
@@ -120,6 +129,7 @@ public interface Processor {
this.relativeTimeSupplier = relativeTimeSupplier;
this.scheduler = scheduler;
this.ingestService = ingestService;
this.localShardSearcher = localShardSearcher;
}
}
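
Because the Parameters constructor gained a trailing argument, any code that constructs Processor.Parameters directly (for example, plugin test fixtures) now has to supply a searcher provider as well. A minimal sketch, assuming the other constructor arguments are existing test fixtures and the processor under test never calls the provider:

// Hypothetical test wiring; the final argument is the new localShardSearcher.
Processor.Parameters parameters = new Processor.Parameters(
    environment, scriptService, analysisRegistry, threadContext,
    relativeTimeSupplier, scheduler, ingestService,
    indexExpression -> {
        throw new UnsupportedOperationException("no local shard searcher in this test");
    });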

View File: org/elasticsearch/node/Node.java

@@ -348,8 +348,6 @@ public class Node implements Closeable {
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
clusterService.addStateApplier(scriptModule.getScriptService());
resourcesToClose.add(clusterService);
final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state,
clusterService.getClusterSettings(), client);
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,
@@ -419,6 +417,10 @@
threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays,
scriptModule.getScriptService(), client, metaStateService, engineFactoryProviders, indexStoreFactories);
final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(),
pluginsService.filterPlugins(IngestPlugin.class), indicesService);
final AliasValidator aliasValidator = new AliasValidator();
final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(

View File: org/elasticsearch/ingest/IngestLocalShardSearcherTests.java (new file)

@@ -0,0 +1,269 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReader;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
public class IngestLocalShardSearcherTests extends ESSingleNodeTestCase {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Collections.singleton(TestPlugin.class);
}
public void testLocalShardSearcher() throws Exception {
client().index(new IndexRequest("reference-index").id("1").source("{}", XContentType.JSON)).actionGet();
client().admin().indices().refresh(new RefreshRequest("reference-index")).actionGet();
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();
client().index(new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline")).actionGet();
client().admin().indices().refresh(new RefreshRequest("my-index")).actionGet();
Map<String, Object> result = client().get(new GetRequest("my-index", "1")).actionGet().getSourceAsMap();
assertThat(result.size(), equalTo(1));
assertThat(result.get("id"), equalTo("1"));
}
public void testMultipleIndicesAreResolved() throws Exception {
createIndex("reference-index1", client().admin().indices().prepareCreate("reference-index1")
.addAlias(new Alias("reference-index")));
createIndex("reference-index2", client().admin().indices().prepareCreate("reference-index2")
.addAlias(new Alias("reference-index")));
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();
IndexRequest indexRequest = new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline");
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> client().index(indexRequest).actionGet());
assertThat(e.getRootCause(), instanceOf(IllegalStateException.class));
assertThat(e.getRootCause().getMessage(), equalTo("expression [reference-index] can only point to a single concrete index"));
}
public void testMoreThanOnePrimaryShard() throws Exception {
createIndex("reference-index", Settings.builder().put("index.number_of_shards", 2).build());
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();
IndexRequest indexRequest = new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline");
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> client().index(indexRequest).actionGet());
assertThat(e.getRootCause(), instanceOf(IllegalStateException.class));
assertThat(e.getRootCause().getMessage(), equalTo("index [reference-index] must have 1 shard, but has 2 shards"));
}
public void testFailWithFilteredAlias() throws Exception {
createIndex("reference-index1", client().admin().indices().prepareCreate("reference-index1")
.addAlias(new Alias("reference-index").filter(QueryBuilders.matchAllQuery())));
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();
IndexRequest indexRequest = new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline");
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> client().index(indexRequest).actionGet());
assertThat(e.getRootCause(), instanceOf(IllegalStateException.class));
assertThat(e.getRootCause().getMessage(), equalTo("expression [reference-index] points an alias with a filter"));
}
public void testWithAlias() throws Exception {
createIndex("reference-index1", client().admin().indices().prepareCreate("reference-index1")
.addAlias(new Alias("reference-index")));
client().index(new IndexRequest("reference-index1").id("1").source("{}", XContentType.JSON)).actionGet();
client().admin().indices().refresh(new RefreshRequest("reference-index1")).actionGet();
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();
client().index(new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline")).actionGet();
client().admin().indices().refresh(new RefreshRequest("my-index")).actionGet();
Map<String, Object> result = client().get(new GetRequest("my-index", "1")).actionGet().getSourceAsMap();
assertThat(result.size(), equalTo(1));
assertThat(result.get("id"), equalTo("1"));
}
public void testSearchWrapperIsApplied() throws Exception {
client().index(new IndexRequest("reference-index").id("1").source("{}", XContentType.JSON)).actionGet();
client().admin().indices().refresh(new RefreshRequest("reference-index")).actionGet();
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("my-pipeline", createPipelineSource(), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();
client().index(new IndexRequest("my-index").id("1").source("{}", XContentType.JSON).setPipeline("my-pipeline")).actionGet();
client().admin().indices().refresh(new RefreshRequest("my-index")).actionGet();
Map<String, Object> result = client().get(new GetRequest("my-index", "1")).actionGet().getSourceAsMap();
assertThat(result.size(), equalTo(1));
assertThat(result.get("id"), equalTo("1"));
}
private static BytesReference createPipelineSource() throws IOException {
return BytesReference.bytes(jsonBuilder().startObject()
.startArray("processors")
.startObject()
.startObject(TestProcessor.NAME)
.endObject()
.endObject()
.endArray()
.endObject());
}
public static class TestPlugin extends Plugin implements IngestPlugin {
@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
return Collections.singletonMap(TestProcessor.NAME, new TestProcessor.Factory(parameters.localShardSearcher));
}
@Override
public void onIndexModule(IndexModule indexModule) {
indexModule.setSearcherWrapper(indexService -> new IndexSearcherWrapper() {
@Override
protected DirectoryReader wrap(DirectoryReader reader) throws IOException {
return new TestDirectoryReader(reader);
}
});
}
}
static class TestProcessor extends AbstractProcessor {
static final String NAME = "test_processor";
private final Function<String, Engine.Searcher> localShardSearcher;
TestProcessor(String tag, Function<String, Engine.Searcher> localShardSearcher) {
super(tag);
this.localShardSearcher = localShardSearcher;
}
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
String indexExpression = "reference-index";
try (Engine.Searcher engineSearcher = localShardSearcher.apply(indexExpression)) {
// Ensure that the searcher wrapper has been invoked by checking the directory reader type:
if ((engineSearcher.getDirectoryReader() instanceof TestDirectoryReader) == false) {
// Asserting or throwing an AssertionError makes this test hang,
// so just throw a runtime exception here:
throw new RuntimeException("unexpected directory instance type");
}
Document document = engineSearcher.searcher().doc(0);
ingestDocument.setFieldValue("id", Uid.decodeId(document.getBinaryValue("_id").bytes));
}
return ingestDocument;
}
@Override
public String getType() {
return NAME;
}
static class Factory implements Processor.Factory {
private final Function<String, Engine.Searcher> localShardSearcher;
Factory(Function<String, Engine.Searcher> localShardSearcher) {
this.localShardSearcher = localShardSearcher;
}
@Override
public Processor create(Map<String, Processor.Factory> processorFactories,
String tag, Map<String, Object> config) throws Exception {
return new TestProcessor(tag, localShardSearcher);
}
}
}
static class TestDirectoryReader extends FilterDirectoryReader {
TestDirectoryReader(DirectoryReader in) throws IOException {
super(in, new TestSubReaderWrapper());
}
@Override
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException {
return new TestDirectoryReader(in);
}
@Override
public CacheHelper getReaderCacheHelper() {
return in.getReaderCacheHelper();
}
static class TestSubReaderWrapper extends SubReaderWrapper {
@Override
public LeafReader wrap(LeafReader reader) {
return new TestLeafReader(reader);
}
}
static class TestLeafReader extends FilterLeafReader {
TestLeafReader(LeafReader in) {
super(in);
}
@Override
public CacheHelper getCoreCacheHelper() {
return in.getCoreCacheHelper();
}
@Override
public CacheHelper getReaderCacheHelper() {
return in.getReaderCacheHelper();
}
}
}
}

View File: org/elasticsearch/ingest/IngestServiceTests.java

@@ -98,7 +98,7 @@ public class IngestServiceTests extends ESTestCase {
public void testIngestPlugin() {
ThreadPool tp = mock(ThreadPool.class);
IngestService ingestService = new IngestService(mock(ClusterService.class), tp, null, null,
null, Collections.singletonList(DUMMY_PLUGIN));
null, Collections.singletonList(DUMMY_PLUGIN), null);
Map<String, Processor.Factory> factories = ingestService.getProcessorFactories();
assertTrue(factories.containsKey("foo"));
assertEquals(1, factories.size());
@@ -108,7 +108,7 @@ public class IngestServiceTests extends ESTestCase {
ThreadPool tp = mock(ThreadPool.class);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
new IngestService(mock(ClusterService.class), tp, null, null,
null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN)));
null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN), null));
assertTrue(e.getMessage(), e.getMessage().contains("already registered"));
}
@@ -117,7 +117,7 @@ public class IngestServiceTests extends ESTestCase {
final ExecutorService executorService = EsExecutors.newDirectExecutorService();
when(threadPool.executor(anyString())).thenReturn(executorService);
IngestService ingestService = new IngestService(mock(ClusterService.class), threadPool, null, null,
null, Collections.singletonList(DUMMY_PLUGIN));
null, Collections.singletonList(DUMMY_PLUGIN), null);
final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id");
final SetOnce<Boolean> failure = new SetOnce<>();
@@ -1001,7 +1001,7 @@ public class IngestServiceTests extends ESTestCase {
public Map<String, Processor.Factory> getProcessors(final Processor.Parameters parameters) {
return processors;
}
}));
}), null);
}
private class IngestDocumentMatcher extends ArgumentMatcher<IngestDocument> {