diff --git a/core/src/main/java/org/elasticsearch/common/SearchScrollIterator.java b/core/src/main/java/org/elasticsearch/common/SearchScrollIterator.java new file mode 100644 index 00000000000..18535d1626e --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/SearchScrollIterator.java @@ -0,0 +1,93 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.SearchHit; + +import java.util.Collections; +import java.util.Iterator; + +/** + * An iterator that easily helps to consume all hits from a scroll search. + */ +public final class SearchScrollIterator implements Iterator { + + /** + * Creates an iterator that returns all matching hits of a scroll search via an iterator. + * The iterator will return all hits per scroll search and execute additional scroll searches + * to get more hits until all hits have been returned by the scroll search on the ES side. + */ + public static Iterable createIterator(Client client, TimeValue scrollTimeout, SearchRequest searchRequest) { + searchRequest.scroll(scrollTimeout); + SearchResponse searchResponse = client.search(searchRequest).actionGet(scrollTimeout); + if (searchResponse.getHits().getTotalHits() == 0) { + return Collections.emptyList(); + } else { + return () -> new SearchScrollIterator(client, scrollTimeout, searchResponse); + } + } + + private final Client client; + private final TimeValue scrollTimeout; + + private int currentIndex; + private SearchHit[] currentHits; + private SearchResponse searchResponse; + + private SearchScrollIterator(Client client, TimeValue scrollTimeout, SearchResponse searchResponse) { + this.client = client; + this.scrollTimeout = scrollTimeout; + this.searchResponse = searchResponse; + this.currentHits = searchResponse.getHits().getHits(); + } + + @Override + public boolean hasNext() { + if (currentIndex < currentHits.length) { + return true; + } else { + if (searchResponse == null) { + return false; + } + + SearchScrollRequest request = new SearchScrollRequest(searchResponse.getScrollId()); + request.scroll(scrollTimeout); + searchResponse = client.searchScroll(request).actionGet(scrollTimeout); + if (searchResponse.getHits().getHits().length == 0) { + searchResponse = null; + return false; + } else { + currentHits = searchResponse.getHits().getHits(); + currentIndex = 0; + return true; + } + } + } + + @Override + public SearchHit next() { + return currentHits[currentIndex++]; + } +} diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreClientTests.java b/core/src/test/java/org/elasticsearch/common/SearchScrollIteratorTests.java similarity index 54% rename from plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreClientTests.java rename to core/src/test/java/org/elasticsearch/common/SearchScrollIteratorTests.java index f670bdab7f9..886d9b94e84 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreClientTests.java +++ b/core/src/test/java/org/elasticsearch/common/SearchScrollIteratorTests.java @@ -17,33 +17,39 @@ * under the License. */ -package org.elasticsearch.plugin.ingest; +package org.elasticsearch.common; -import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.ESSingleNodeTestCase; import static org.hamcrest.Matchers.equalTo; -public class PipelineStoreClientTests extends ESSingleNodeTestCase { +// Not a real unit tests with mocks, but with a single node, because we mock the scroll +// search behaviour and it changes then this test will not catch this. +public class SearchScrollIteratorTests extends ESSingleNodeTestCase { - public void testReadAll() { - PipelineStoreClient reader = new PipelineStoreClient(Settings.EMPTY, node().injector()); - reader.start(); - - createIndex(PipelineStore.INDEX); - int numDocs = scaledRandomIntBetween(32, 128); + public void testSearchScrollIterator() { + createIndex("index"); + int numDocs = scaledRandomIntBetween(0, 128); for (int i = 0; i < numDocs; i++) { - client().prepareIndex(PipelineStore.INDEX, PipelineStore.TYPE, Integer.toString(i)) + client().prepareIndex("index", "type", Integer.toString(i)) .setSource("field", "value" + i) .get(); } client().admin().indices().prepareRefresh().get(); int i = 0; - for (SearchHit hit : reader.readAllPipelines()) { + SearchRequest searchRequest = new SearchRequest("index"); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + // randomize size, because that also controls how many actual searches will happen: + sourceBuilder.size(scaledRandomIntBetween(1, 10)); + searchRequest.source(sourceBuilder); + Iterable hits = SearchScrollIterator.createIterator(client(), TimeValue.timeValueSeconds(10), searchRequest); + for (SearchHit hit : hits) { assertThat(hit.getId(), equalTo(Integer.toString(i))); - assertThat(hit.getVersion(), equalTo(1l)); assertThat(hit.getSource().get("field"), equalTo("value" + i)); i++; } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java index 5cb4e703ea0..dd30334e422 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestModule.java @@ -50,7 +50,6 @@ public class IngestModule extends AbstractModule { binder().bind(IngestRestFilter.class).asEagerSingleton(); binder().bind(PipelineExecutionService.class).asEagerSingleton(); binder().bind(PipelineStore.class).asEagerSingleton(); - binder().bind(PipelineStoreClient.class).asEagerSingleton(); binder().bind(SimulateExecutionService.class).asEagerSingleton(); addProcessor(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory()); diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java index 34411fd60e7..5405459b28a 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java @@ -87,7 +87,7 @@ public class IngestPlugin extends Plugin { if (transportClient) { return Collections.emptyList(); } else { - return Arrays.asList(PipelineStore.class, PipelineStoreClient.class); + return Collections.singletonList(PipelineStore.class); } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineDefinition.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineDefinition.java new file mode 100644 index 00000000000..d78274ce7f6 --- /dev/null +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineDefinition.java @@ -0,0 +1,115 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.plugin.ingest; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.ingest.Pipeline; + +import java.io.IOException; + +public class PipelineDefinition implements Writeable, ToXContent { + + private static final PipelineDefinition PROTOTYPE = new PipelineDefinition((String) null, -1, null); + + public static PipelineDefinition readPipelineDefinitionFrom(StreamInput in) throws IOException { + return PROTOTYPE.readFrom(in); + } + + private final String id; + private final long version; + private final BytesReference source; + + private final Pipeline pipeline; + + PipelineDefinition(Pipeline pipeline, long version, BytesReference source) { + this.id = pipeline.getId(); + this.version = version; + this.source = source; + this.pipeline = pipeline; + } + + PipelineDefinition(String id, long version, BytesReference source) { + this.id = id; + this.version = version; + this.source = source; + this.pipeline = null; + } + + public String getId() { + return id; + } + + public long getVersion() { + return version; + } + + public BytesReference getSource() { + return source; + } + + Pipeline getPipeline() { + return pipeline; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + PipelineDefinition holder = (PipelineDefinition) o; + return source.equals(holder.source); + } + + @Override + public int hashCode() { + return source.hashCode(); + } + + @Override + public PipelineDefinition readFrom(StreamInput in) throws IOException { + String id = in.readString(); + long version = in.readLong(); + BytesReference source = in.readBytesReference(); + return new PipelineDefinition(id, version, source); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(id); + out.writeLong(version); + out.writeBytesReference(source); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(id); + XContentHelper.writeRawField("_source", source, builder, params); + builder.field("_version", version); + builder.endObject(); + return builder; + } +} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java index fae1457ad12..cbfbf269dd3 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java @@ -19,21 +19,39 @@ package org.elasticsearch.plugin.ingest; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.common.SearchScrollIterator; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.env.Environment; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.ingest.processor.Processor; +import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequest; +import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequest; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -44,22 +62,25 @@ public class PipelineStore extends AbstractLifecycleComponent { public final static String INDEX = ".ingest"; public final static String TYPE = "pipeline"; + private final Injector injector; private final ThreadPool threadPool; + private final TimeValue scrollTimeout; private final ClusterService clusterService; private final TimeValue pipelineUpdateInterval; - private final PipelineStoreClient client; private final Pipeline.Factory factory = new Pipeline.Factory(); private final Map processorFactoryRegistry; - private volatile Map pipelines = new HashMap<>(); + private volatile Client client; + private volatile Map pipelines = new HashMap<>(); @Inject - public PipelineStore(Settings settings, ThreadPool threadPool, Environment environment, ClusterService clusterService, PipelineStoreClient client, Map processors) { + public PipelineStore(Settings settings, Injector injector, ThreadPool threadPool, Environment environment, ClusterService clusterService, Map processors) { super(settings); + this.injector = injector; this.threadPool = threadPool; this.clusterService = clusterService; + this.scrollTimeout = settings.getAsTime("ingest.pipeline.store.scroll.timeout", TimeValue.timeValueSeconds(30)); this.pipelineUpdateInterval = settings.getAsTime("ingest.pipeline.store.update.interval", TimeValue.timeValueSeconds(1)); - this.client = client; for (Processor.Factory factory : processors.values()) { factory.setConfigDirectory(environment.configFile()); } @@ -69,6 +90,7 @@ public class PipelineStore extends AbstractLifecycleComponent { @Override protected void doStart() { + client = injector.getInstance(Client.class); } @Override @@ -77,17 +99,43 @@ public class PipelineStore extends AbstractLifecycleComponent { @Override protected void doClose() { - for (Processor.Factory factory : processorFactoryRegistry.values()) { - try { - factory.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } + try { + IOUtils.close(processorFactoryRegistry.values()); + } catch (IOException e) { + throw new RuntimeException(e); } } + public void delete(DeletePipelineRequest request, ActionListener listener) { + DeleteRequest deleteRequest = new DeleteRequest(request); + deleteRequest.index(PipelineStore.INDEX); + deleteRequest.type(PipelineStore.TYPE); + deleteRequest.id(request.id()); + deleteRequest.refresh(true); + client.delete(deleteRequest, listener); + } + + public void put(PutPipelineRequest request, ActionListener listener) { + // validates the pipeline and processor configuration: + Map pipelineConfig = XContentHelper.convertToMap(request.source(), false).v2(); + try { + constructPipeline(request.id(), pipelineConfig); + } catch (IOException e) { + listener.onFailure(e); + return; + } + + IndexRequest indexRequest = new IndexRequest(request); + indexRequest.index(PipelineStore.INDEX); + indexRequest.type(PipelineStore.TYPE); + indexRequest.id(request.id()); + indexRequest.source(request.source()); + indexRequest.refresh(true); + client.index(indexRequest, listener); + } + public Pipeline get(String id) { - PipelineReference ref = pipelines.get(id); + PipelineDefinition ref = pipelines.get(id); if (ref != null) { return ref.getPipeline(); } else { @@ -99,17 +147,17 @@ public class PipelineStore extends AbstractLifecycleComponent { return processorFactoryRegistry; } - public List getReference(String... ids) { - List result = new ArrayList<>(ids.length); + public List getReference(String... ids) { + List result = new ArrayList<>(ids.length); for (String id : ids) { if (Regex.isSimpleMatchPattern(id)) { - for (Map.Entry entry : pipelines.entrySet()) { + for (Map.Entry entry : pipelines.entrySet()) { if (Regex.simpleMatch(id, entry.getKey())) { result.add(entry.getValue()); } } } else { - PipelineReference reference = pipelines.get(id); + PipelineDefinition reference = pipelines.get(id); if (reference != null) { result.add(reference); } @@ -118,7 +166,7 @@ public class PipelineStore extends AbstractLifecycleComponent { return result; } - public Pipeline constructPipeline(String id, Map config) throws IOException { + Pipeline constructPipeline(String id, Map config) throws IOException { return factory.create(id, config, processorFactoryRegistry); } @@ -127,11 +175,11 @@ public class PipelineStore extends AbstractLifecycleComponent { // so for that reason the goal is to keep the update logic simple. int changed = 0; - Map newPipelines = new HashMap<>(pipelines); - for (SearchHit hit : client.readAllPipelines()) { + Map newPipelines = new HashMap<>(pipelines); + for (SearchHit hit : readAllPipelines()) { String pipelineId = hit.getId(); BytesReference pipelineSource = hit.getSourceRef(); - PipelineReference previous = newPipelines.get(pipelineId); + PipelineDefinition previous = newPipelines.get(pipelineId); if (previous != null) { if (previous.getSource().equals(pipelineSource)) { continue; @@ -140,12 +188,12 @@ public class PipelineStore extends AbstractLifecycleComponent { changed++; Pipeline pipeline = constructPipeline(hit.getId(), hit.sourceAsMap()); - newPipelines.put(pipelineId, new PipelineReference(pipeline, hit.getVersion(), pipelineSource)); + newPipelines.put(pipelineId, new PipelineDefinition(pipeline, hit.getVersion(), pipelineSource)); } int removed = 0; for (String existingPipelineId : pipelines.keySet()) { - if (!client.existPipeline(existingPipelineId)) { + if (!existPipeline(existingPipelineId)) { newPipelines.remove(existingPipelineId); removed++; } @@ -165,6 +213,23 @@ public class PipelineStore extends AbstractLifecycleComponent { } } + boolean existPipeline(String pipelineId) { + GetRequest request = new GetRequest(PipelineStore.INDEX, PipelineStore.TYPE, pipelineId); + GetResponse response = client.get(request).actionGet(); + return response.isExists(); + } + + Iterable readAllPipelines() { + // TODO: the search should be replaced with an ingest API when it is available + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.version(true); + sourceBuilder.sort("_doc", SortOrder.ASC); + SearchRequest searchRequest = new SearchRequest(PipelineStore.INDEX); + searchRequest.source(sourceBuilder); + searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); + return SearchScrollIterator.createIterator(client, scrollTimeout, searchRequest); + } + class Updater implements Runnable { @Override @@ -191,43 +256,4 @@ public class PipelineStore extends AbstractLifecycleComponent { } } - public static class PipelineReference { - - private final Pipeline pipeline; - private final long version; - private final BytesReference source; - - PipelineReference(Pipeline pipeline, long version, BytesReference source) { - this.pipeline = pipeline; - this.version = version; - this.source = source; - } - - public Pipeline getPipeline() { - return pipeline; - } - - public long getVersion() { - return version; - } - - public BytesReference getSource() { - return source; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - PipelineReference holder = (PipelineReference) o; - return source.equals(holder.source); - } - - @Override - public int hashCode() { - return source.hashCode(); - } - } - } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStoreClient.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStoreClient.java deleted file mode 100644 index c11969f840c..00000000000 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStoreClient.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.plugin.ingest; - -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.inject.Injector; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.sort.SortOrder; - -import java.util.Collections; -import java.util.Iterator; - -public class PipelineStoreClient extends AbstractLifecycleComponent { - - private volatile Client client; - private final Injector injector; - private final TimeValue scrollTimeout; - - @Inject - public PipelineStoreClient(Settings settings, Injector injector) { - super(settings); - this.injector = injector; - this.scrollTimeout = settings.getAsTime("ingest.pipeline.store.scroll.timeout", TimeValue.timeValueSeconds(30)); - } - - @Override - protected void doStart() { - client = injector.getInstance(Client.class); - } - - @Override - protected void doStop() { - client.close(); - } - - @Override - protected void doClose() { - } - - public Iterable readAllPipelines() { - // TODO: the search should be replaced with an ingest API when it is available - SearchResponse searchResponse = client.prepareSearch(PipelineStore.INDEX) - .setVersion(true) - .setScroll(scrollTimeout) - .addSort("_doc", SortOrder.ASC) - .setIndicesOptions(IndicesOptions.lenientExpandOpen()) - .get(); - - if (searchResponse.getHits().getTotalHits() == 0) { - return Collections.emptyList(); - } - logger.debug("reading [{}] pipeline documents", searchResponse.getHits().totalHits()); - return new Iterable() { - @Override - public Iterator iterator() { - return new SearchScrollIterator(searchResponse); - } - }; - } - - public boolean existPipeline(String pipelineId) { - GetResponse response = client.prepareGet(PipelineStore.INDEX, PipelineStore.TYPE, pipelineId).get(); - return response.isExists(); - } - - class SearchScrollIterator implements Iterator { - - private SearchResponse searchResponse; - - private int currentIndex; - private SearchHit[] currentHits; - - SearchScrollIterator(SearchResponse searchResponse) { - this.searchResponse = searchResponse; - this.currentHits = searchResponse.getHits().getHits(); - } - - @Override - public boolean hasNext() { - if (currentIndex < currentHits.length) { - return true; - } else { - if (searchResponse == null) { - return false; - } - - searchResponse = client.prepareSearchScroll(searchResponse.getScrollId()) - .setScroll(scrollTimeout) - .get(); - if (searchResponse.getHits().getHits().length == 0) { - searchResponse = null; - return false; - } else { - currentHits = searchResponse.getHits().getHits(); - currentIndex = 0; - return true; - } - } - } - - @Override - public SearchHit next() { - SearchHit hit = currentHits[currentIndex++]; - if (logger.isTraceEnabled()) { - logger.trace("reading pipeline document [{}] with source [{}]", hit.getId(), hit.sourceAsString()); - } - return hit; - } - } - -} diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineTransportAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineTransportAction.java index c8b60a0252f..dff4d4697b0 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineTransportAction.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/delete/DeletePipelineTransportAction.java @@ -34,22 +34,17 @@ import org.elasticsearch.transport.TransportService; public class DeletePipelineTransportAction extends HandledTransportAction { - private final TransportDeleteAction deleteAction; + private final PipelineStore pipelineStore; @Inject - public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportDeleteAction deleteAction) { + public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStore pipelineStore) { super(settings, DeletePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new); - this.deleteAction = deleteAction; + this.pipelineStore = pipelineStore; } @Override protected void doExecute(DeletePipelineRequest request, ActionListener listener) { - DeleteRequest deleteRequest = new DeleteRequest(request); - deleteRequest.index(PipelineStore.INDEX); - deleteRequest.type(PipelineStore.TYPE); - deleteRequest.id(request.id()); - deleteRequest.refresh(true); - deleteAction.execute(deleteRequest, new ActionListener() { + pipelineStore.delete(request, new ActionListener() { @Override public void onResponse(DeleteResponse deleteResponse) { listener.onResponse(new DeletePipelineResponse(deleteResponse.getId(), deleteResponse.isFound())); diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineResponse.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineResponse.java index 020c8004631..8d7bf9cc258 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineResponse.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineResponse.java @@ -27,26 +27,27 @@ import org.elasticsearch.common.xcontent.StatusToXContent; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.plugin.ingest.PipelineDefinition; import org.elasticsearch.rest.RestStatus; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; public class GetPipelineResponse extends ActionResponse implements StatusToXContent { - private Map pipelines; - private Map versions; + private List pipelines; public GetPipelineResponse() { } - public GetPipelineResponse(Map pipelines, Map versions) { + public GetPipelineResponse(List pipelines) { this.pipelines = pipelines; - this.versions = versions; } - public Map pipelines() { + public List pipelines() { return pipelines; } @@ -54,14 +55,9 @@ public class GetPipelineResponse extends ActionResponse implements StatusToXCont public void readFrom(StreamInput in) throws IOException { super.readFrom(in); int size = in.readVInt(); - pipelines = new HashMap<>(size); + pipelines = new ArrayList<>(size); for (int i = 0; i < size; i++) { - pipelines.put(in.readString(), in.readBytesReference()); - } - size = in.readVInt(); - versions = new HashMap<>(size); - for (int i = 0; i < size; i++) { - versions.put(in.readString(), in.readVLong()); + pipelines.add(PipelineDefinition.readPipelineDefinitionFrom(in)); } } @@ -69,14 +65,8 @@ public class GetPipelineResponse extends ActionResponse implements StatusToXCont public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeVInt(pipelines.size()); - for (Map.Entry entry : pipelines.entrySet()) { - out.writeString(entry.getKey()); - out.writeBytesReference(entry.getValue()); - } - out.writeVInt(versions.size()); - for (Map.Entry entry : versions.entrySet()) { - out.writeString(entry.getKey()); - out.writeVLong(entry.getValue()); + for (PipelineDefinition pipeline : pipelines) { + pipeline.writeTo(out); } } @@ -91,11 +81,8 @@ public class GetPipelineResponse extends ActionResponse implements StatusToXCont @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - for (Map.Entry entry : pipelines.entrySet()) { - builder.startObject(entry.getKey()); - XContentHelper.writeRawField("_source", entry.getValue(), builder, params); - builder.field("_version", versions.get(entry.getKey())); - builder.endObject(); + for (PipelineDefinition definition : pipelines) { + definition.toXContent(builder, params); } return builder; } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineTransportAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineTransportAction.java index 89ba97d81b3..ada112cade8 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineTransportAction.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/get/GetPipelineTransportAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugin.ingest.PipelineDefinition; import org.elasticsearch.plugin.ingest.PipelineStore; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -46,13 +47,7 @@ public class GetPipelineTransportAction extends HandledTransportAction listener) { - List references = pipelineStore.getReference(request.ids()); - Map result = new HashMap<>(); - Map versions = new HashMap<>(); - for (PipelineStore.PipelineReference reference : references) { - result.put(reference.getPipeline().getId(), reference.getSource()); - versions.put(reference.getPipeline().getId(), reference.getVersion()); - } - listener.onResponse(new GetPipelineResponse(result, versions)); + List references = pipelineStore.getReference(request.ids()); + listener.onResponse(new GetPipelineResponse(references)); } } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/put/PutPipelineTransportAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/put/PutPipelineTransportAction.java index b6c59ff8ed8..72e6391167f 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/put/PutPipelineTransportAction.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/put/PutPipelineTransportAction.java @@ -38,34 +38,17 @@ import java.util.Map; public class PutPipelineTransportAction extends HandledTransportAction { - private final TransportIndexAction indexAction; private final PipelineStore pipelineStore; @Inject - public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportIndexAction indexAction, PipelineStore pipelineStore) { + public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStore pipelineStore) { super(settings, PutPipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new); - this.indexAction = indexAction; this.pipelineStore = pipelineStore; } @Override protected void doExecute(PutPipelineRequest request, ActionListener listener) { - // validates the pipeline and processor configuration: - Map pipelineConfig = XContentHelper.convertToMap(request.source(), false).v2(); - try { - pipelineStore.constructPipeline(request.id(), pipelineConfig); - } catch (IOException e) { - listener.onFailure(e); - return; - } - - IndexRequest indexRequest = new IndexRequest(request); - indexRequest.index(PipelineStore.INDEX); - indexRequest.type(PipelineStore.TYPE); - indexRequest.id(request.id()); - indexRequest.source(request.source()); - indexRequest.refresh(true); - indexAction.execute(indexRequest, new ActionListener() { + pipelineStore.put(request, new ActionListener() { @Override public void onResponse(IndexResponse indexResponse) { PutPipelineResponse response = new PutPipelineResponse(); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index 67b484488de..985f91bfb8e 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -89,7 +89,8 @@ public class IngestClientIT extends ESIntegTestCase { .setIds("_id") .get(); assertThat(response.isFound(), is(true)); - assertThat(response.pipelines().get("_id"), notNullValue()); + assertThat(response.pipelines().size(), equalTo(1)); + assertThat(response.pipelines().get(0).getId(), equalTo("_id")); } }); @@ -178,7 +179,8 @@ public class IngestClientIT extends ESIntegTestCase { .setIds("_id") .get(); assertThat(response.isFound(), is(true)); - assertThat(response.pipelines().get("_id"), notNullValue()); + assertThat(response.pipelines().size(), equalTo(1)); + assertThat(response.pipelines().get(0).getId(), equalTo("_id")); } }); @@ -232,7 +234,7 @@ public class IngestClientIT extends ESIntegTestCase { .setIds("_id") .get(); assertThat(response.isFound(), is(false)); - assertThat(response.pipelines().get("_id"), nullValue()); + assertThat(response.pipelines().size(), equalTo(0)); } }); } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java index 559dd54487d..1dfc64f65b5 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java @@ -19,41 +19,59 @@ package org.elasticsearch.plugin.ingest; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.StringText; import org.elasticsearch.env.Environment; +import org.elasticsearch.index.get.GetResult; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.internal.InternalSearchHit; +import org.elasticsearch.search.internal.InternalSearchHits; +import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; +import org.mockito.ArgumentMatcher; +import org.mockito.Matchers; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.*; -import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Matchers.any; public class PipelineStoreTests extends ESTestCase { - private PipelineStore store; private ThreadPool threadPool; - private PipelineStoreClient client; + private PipelineStore store; + private Client client; @Before public void init() { threadPool = new ThreadPool("test"); + client = mock(Client.class); + Injector injector = mock(Injector.class); + when(injector.getInstance(Client.class)).thenReturn(client); + ClusterService clusterService = mock(ClusterService.class); - client = mock(PipelineStoreClient.class); + when(client.searchScroll(any())).thenReturn(expectedSearchReponse(Collections.emptyList())); Environment environment = mock(Environment.class); - store = new PipelineStore(Settings.EMPTY, threadPool, environment, clusterService, client, Collections.emptyMap()); + store = new PipelineStore(Settings.EMPTY, injector, threadPool, environment, clusterService, Collections.emptyMap()); store.start(); } @@ -63,22 +81,21 @@ public class PipelineStoreTests extends ESTestCase { threadPool.shutdown(); } - public void testUpdatePipeline() throws Exception { List hits = new ArrayList<>(); hits.add(new InternalSearchHit(0, "1", new StringText("type"), Collections.emptyMap()) .sourceRef(new BytesArray("{\"description\": \"_description1\"}")) ); - when(client.readAllPipelines()).thenReturn(hits); - when(client.existPipeline("1")).thenReturn(true); + when(client.search(any())).thenReturn(expectedSearchReponse(hits)); + when(client.get(any())).thenReturn(expectedGetResponse(true)); assertThat(store.get("1"), nullValue()); store.updatePipelines(); assertThat(store.get("1").getId(), equalTo("1")); assertThat(store.get("1").getDescription(), equalTo("_description1")); - when(client.existPipeline("2")).thenReturn(true); + when(client.get(any())).thenReturn(expectedGetResponse(true)); hits.add(new InternalSearchHit(0, "2", new StringText("type"), Collections.emptyMap()) .sourceRef(new BytesArray("{\"description\": \"_description2\"}")) ); @@ -89,7 +106,7 @@ public class PipelineStoreTests extends ESTestCase { assertThat(store.get("2").getDescription(), equalTo("_description2")); hits.remove(1); - when(client.existPipeline("2")).thenReturn(false); + when(client.get(eqGetRequest(PipelineStore.INDEX, PipelineStore.TYPE, "2"))).thenReturn(expectedGetResponse(false)); store.updatePipelines(); assertThat(store.get("1").getId(), equalTo("1")); assertThat(store.get("1").getDescription(), equalTo("_description1")); @@ -101,8 +118,8 @@ public class PipelineStoreTests extends ESTestCase { hits.add(new InternalSearchHit(0, "1", new StringText("type"), Collections.emptyMap()) .sourceRef(new BytesArray("{\"description\": \"_description1\"}")) ); - when(client.readAllPipelines()).thenReturn(hits); - when(client.existPipeline(anyString())).thenReturn(true); + when(client.search(any())).thenReturn(expectedSearchReponse(hits)); + when(client.get(any())).thenReturn(expectedGetResponse(true)); assertThat(store.get("1"), nullValue()); store.startUpdateWorker(); @@ -131,20 +148,17 @@ public class PipelineStoreTests extends ESTestCase { hits.add(new InternalSearchHit(0, "foo", new StringText("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}"))); hits.add(new InternalSearchHit(0, "bar", new StringText("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}"))); hits.add(new InternalSearchHit(0, "foobar", new StringText("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}"))); - when(client.readAllPipelines()).thenReturn(hits); + when(client.search(any())).thenReturn(expectedSearchReponse(hits)); store.updatePipelines(); - List result = store.getReference("foo"); + List result = store.getReference("foo"); assertThat(result.size(), equalTo(1)); assertThat(result.get(0).getPipeline().getId(), equalTo("foo")); result = store.getReference("foo*"); // to make sure the order is consistent in the test: - Collections.sort(result, new Comparator() { - @Override - public int compare(PipelineStore.PipelineReference first, PipelineStore.PipelineReference second) { - return first.getPipeline().getId().compareTo(second.getPipeline().getId()); - } + result.sort((first, second) -> { + return first.getPipeline().getId().compareTo(second.getPipeline().getId()); }); assertThat(result.size(), equalTo(2)); assertThat(result.get(0).getPipeline().getId(), equalTo("foo")); @@ -156,11 +170,8 @@ public class PipelineStoreTests extends ESTestCase { result = store.getReference("*"); // to make sure the order is consistent in the test: - Collections.sort(result, new Comparator() { - @Override - public int compare(PipelineStore.PipelineReference first, PipelineStore.PipelineReference second) { - return first.getPipeline().getId().compareTo(second.getPipeline().getId()); - } + result.sort((first, second) -> { + return first.getPipeline().getId().compareTo(second.getPipeline().getId()); }); assertThat(result.size(), equalTo(3)); assertThat(result.get(0).getPipeline().getId(), equalTo("bar")); @@ -173,4 +184,49 @@ public class PipelineStoreTests extends ESTestCase { assertThat(result.get(1).getPipeline().getId(), equalTo("bar")); } + ActionFuture expectedSearchReponse(List hits) { + return new PlainActionFuture() { + + @Override + public SearchResponse get(long timeout, TimeUnit unit) { + InternalSearchHits hits1 = new InternalSearchHits(hits.toArray(new InternalSearchHit[0]), hits.size(), 1f); + return new SearchResponse(new InternalSearchResponse(hits1, null, null, false, null), "_scrollId", 1, 1, 1, null); + } + }; + } + + ActionFuture expectedGetResponse(boolean exists) { + return new PlainActionFuture() { + @Override + public GetResponse get() throws InterruptedException, ExecutionException { + return new GetResponse(new GetResult("_index", "_type", "_id", 1, exists, null, null)); + } + }; + } + + GetRequest eqGetRequest(String index, String type, String id) { + return Matchers.argThat(new GetRequestMatcher(index, type, id)); + } + + static class GetRequestMatcher extends ArgumentMatcher { + + private final String index; + private final String type; + private final String id; + + public GetRequestMatcher(String index, String type, String id) { + this.index = index; + this.type = type; + this.id = id; + } + + @Override + public boolean matches(Object o) { + GetRequest getRequest = (GetRequest) o; + return Objects.equals(getRequest.index(), index) && + Objects.equals(getRequest.type(), type) && + Objects.equals(getRequest.id(), id); + } + } + }