* Inlined PipelineStoreClient class into the PipelineStore class

* Moved PipelineReference to a top level class and named it PipelineDefinition
* Pulled some logic from the crud transport classes to the PipelineStore
* Use IOUtils#close(...) where appropriate
Martijn van Groningen 2015-11-25 18:27:04 +01:00
parent 1a7391070f
commit afc9069c99
13 changed files with 420 additions and 298 deletions
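For orientation, here is a minimal caller-side sketch of the consolidated PipelineStore API after this change. Only the method signatures and the .ingest/pipeline index and type are taken from the diff below; the wrapper class, parameter names, and pipeline ids are illustrative.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.plugin.ingest.PipelineDefinition;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequest;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequest;
import java.util.List;

class PipelineStoreUsageSketch {
    // Hypothetical caller; mirrors what the put/delete/get transport actions now do.
    void example(PipelineStore store,
                 PutPipelineRequest putRequest, ActionListener<IndexResponse> onPut,
                 DeletePipelineRequest deleteRequest, ActionListener<DeleteResponse> onDelete) {
        // put() validates the pipeline/processor config, then indexes into .ingest/pipeline:
        store.put(putRequest, onPut);
        // delete() issues the refreshing DeleteRequest that previously lived in the transport action:
        store.delete(deleteRequest, onDelete);
        // reads are served from the in-memory map kept in sync by the update worker:
        Pipeline pipeline = store.get("my-pipeline");
        // getReference() supports simple wildcard patterns and returns PipelineDefinition instances:
        List<PipelineDefinition> definitions = store.getReference("my-*");
    }
}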

View File

@ -0,0 +1,93 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import java.util.Collections;
import java.util.Iterator;
/**
* An iterator that makes it easy to consume all hits from a scroll search.
*/
public final class SearchScrollIterator implements Iterator<SearchHit> {
/**
* Creates an Iterable that returns all matching hits of a scroll search.
* The returned iterator serves the hits of the current scroll page and issues additional
* scroll requests to fetch more, until all hits have been returned on the ES side.
*/
public static Iterable<SearchHit> createIterator(Client client, TimeValue scrollTimeout, SearchRequest searchRequest) {
searchRequest.scroll(scrollTimeout);
SearchResponse searchResponse = client.search(searchRequest).actionGet(scrollTimeout);
if (searchResponse.getHits().getTotalHits() == 0) {
return Collections.emptyList();
} else {
return () -> new SearchScrollIterator(client, scrollTimeout, searchResponse);
}
}
private final Client client;
private final TimeValue scrollTimeout;
private int currentIndex;
private SearchHit[] currentHits;
private SearchResponse searchResponse;
private SearchScrollIterator(Client client, TimeValue scrollTimeout, SearchResponse searchResponse) {
this.client = client;
this.scrollTimeout = scrollTimeout;
this.searchResponse = searchResponse;
this.currentHits = searchResponse.getHits().getHits();
}
@Override
public boolean hasNext() {
if (currentIndex < currentHits.length) {
return true;
} else {
if (searchResponse == null) {
return false;
}
SearchScrollRequest request = new SearchScrollRequest(searchResponse.getScrollId());
request.scroll(scrollTimeout);
searchResponse = client.searchScroll(request).actionGet(scrollTimeout);
if (searchResponse.getHits().getHits().length == 0) {
searchResponse = null;
return false;
} else {
currentHits = searchResponse.getHits().getHits();
currentIndex = 0;
return true;
}
}
}
@Override
public SearchHit next() {
return currentHits[currentIndex++];
}
}
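For reference, a minimal usage sketch of the iterator above (the index name, page size, and timeout are illustrative; only the createIterator signature is taken from the class itself):

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.SearchScrollIterator;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

class SearchScrollIteratorUsageSketch {
    void example(Client client) {
        SearchRequest request = new SearchRequest(".ingest");   // illustrative index name
        request.source(new SearchSourceBuilder().size(10));     // page size per scroll round-trip
        // Additional scroll requests are issued lazily inside hasNext() until the scroll is exhausted:
        for (SearchHit hit : SearchScrollIterator.createIterator(client, TimeValue.timeValueSeconds(30), request)) {
            System.out.println(hit.getId() + " -> " + hit.sourceAsString());
        }
    }
}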

View File

@ -17,33 +17,39 @@
* under the License.
*/
package org.elasticsearch.plugin.ingest;
package org.elasticsearch.common;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESSingleNodeTestCase;
import static org.hamcrest.Matchers.equalTo;
public class PipelineStoreClientTests extends ESSingleNodeTestCase {
// Not a real unit test with mocks, but a single-node test: if we mocked the scroll
// search behaviour and it changed, this test would not catch that.
public class SearchScrollIteratorTests extends ESSingleNodeTestCase {
public void testReadAll() {
PipelineStoreClient reader = new PipelineStoreClient(Settings.EMPTY, node().injector());
reader.start();
createIndex(PipelineStore.INDEX);
int numDocs = scaledRandomIntBetween(32, 128);
public void testSearchScrollIterator() {
createIndex("index");
int numDocs = scaledRandomIntBetween(0, 128);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex(PipelineStore.INDEX, PipelineStore.TYPE, Integer.toString(i))
client().prepareIndex("index", "type", Integer.toString(i))
.setSource("field", "value" + i)
.get();
}
client().admin().indices().prepareRefresh().get();
int i = 0;
for (SearchHit hit : reader.readAllPipelines()) {
SearchRequest searchRequest = new SearchRequest("index");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// randomize size, because that also controls how many actual searches will happen:
sourceBuilder.size(scaledRandomIntBetween(1, 10));
searchRequest.source(sourceBuilder);
Iterable<SearchHit> hits = SearchScrollIterator.createIterator(client(), TimeValue.timeValueSeconds(10), searchRequest);
for (SearchHit hit : hits) {
assertThat(hit.getId(), equalTo(Integer.toString(i)));
assertThat(hit.getVersion(), equalTo(1L));
assertThat(hit.getSource().get("field"), equalTo("value" + i));
i++;
}

View File

@ -50,7 +50,6 @@ public class IngestModule extends AbstractModule {
binder().bind(IngestRestFilter.class).asEagerSingleton();
binder().bind(PipelineExecutionService.class).asEagerSingleton();
binder().bind(PipelineStore.class).asEagerSingleton();
binder().bind(PipelineStoreClient.class).asEagerSingleton();
binder().bind(SimulateExecutionService.class).asEagerSingleton();
addProcessor(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory());

View File

@ -87,7 +87,7 @@ public class IngestPlugin extends Plugin {
if (transportClient) {
return Collections.emptyList();
} else {
return Arrays.asList(PipelineStore.class, PipelineStoreClient.class);
return Collections.singletonList(PipelineStore.class);
}
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.ingest.Pipeline;
import java.io.IOException;
public class PipelineDefinition implements Writeable<PipelineDefinition>, ToXContent {
private static final PipelineDefinition PROTOTYPE = new PipelineDefinition((String) null, -1, null);
public static PipelineDefinition readPipelineDefinitionFrom(StreamInput in) throws IOException {
return PROTOTYPE.readFrom(in);
}
private final String id;
private final long version;
private final BytesReference source;
private final Pipeline pipeline;
PipelineDefinition(Pipeline pipeline, long version, BytesReference source) {
this.id = pipeline.getId();
this.version = version;
this.source = source;
this.pipeline = pipeline;
}
PipelineDefinition(String id, long version, BytesReference source) {
this.id = id;
this.version = version;
this.source = source;
this.pipeline = null;
}
public String getId() {
return id;
}
public long getVersion() {
return version;
}
public BytesReference getSource() {
return source;
}
Pipeline getPipeline() {
return pipeline;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PipelineDefinition holder = (PipelineDefinition) o;
return source.equals(holder.source);
}
@Override
public int hashCode() {
return source.hashCode();
}
@Override
public PipelineDefinition readFrom(StreamInput in) throws IOException {
String id = in.readString();
long version = in.readLong();
BytesReference source = in.readBytesReference();
return new PipelineDefinition(id, version, source);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
out.writeLong(version);
out.writeBytesReference(source);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(id);
XContentHelper.writeRawField("_source", source, builder, params);
builder.field("_version", version);
builder.endObject();
return builder;
}
}

View File

@ -19,21 +19,39 @@
package org.elasticsearch.plugin.ingest;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.SearchScrollIterator;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineRequest;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
@ -44,22 +62,25 @@ public class PipelineStore extends AbstractLifecycleComponent {
public final static String INDEX = ".ingest";
public final static String TYPE = "pipeline";
private final Injector injector;
private final ThreadPool threadPool;
private final TimeValue scrollTimeout;
private final ClusterService clusterService;
private final TimeValue pipelineUpdateInterval;
private final PipelineStoreClient client;
private final Pipeline.Factory factory = new Pipeline.Factory();
private final Map<String, Processor.Factory> processorFactoryRegistry;
private volatile Map<String, PipelineReference> pipelines = new HashMap<>();
private volatile Client client;
private volatile Map<String, PipelineDefinition> pipelines = new HashMap<>();
@Inject
public PipelineStore(Settings settings, ThreadPool threadPool, Environment environment, ClusterService clusterService, PipelineStoreClient client, Map<String, Processor.Factory> processors) {
public PipelineStore(Settings settings, Injector injector, ThreadPool threadPool, Environment environment, ClusterService clusterService, Map<String, Processor.Factory> processors) {
super(settings);
this.injector = injector;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.scrollTimeout = settings.getAsTime("ingest.pipeline.store.scroll.timeout", TimeValue.timeValueSeconds(30));
this.pipelineUpdateInterval = settings.getAsTime("ingest.pipeline.store.update.interval", TimeValue.timeValueSeconds(1));
this.client = client;
for (Processor.Factory factory : processors.values()) {
factory.setConfigDirectory(environment.configFile());
}
@ -69,6 +90,7 @@ public class PipelineStore extends AbstractLifecycleComponent {
@Override
protected void doStart() {
client = injector.getInstance(Client.class);
}
@Override
@ -77,17 +99,43 @@ public class PipelineStore extends AbstractLifecycleComponent {
@Override
protected void doClose() {
for (Processor.Factory factory : processorFactoryRegistry.values()) {
try {
factory.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
try {
IOUtils.close(processorFactoryRegistry.values());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void delete(DeletePipelineRequest request, ActionListener<DeleteResponse> listener) {
DeleteRequest deleteRequest = new DeleteRequest(request);
deleteRequest.index(PipelineStore.INDEX);
deleteRequest.type(PipelineStore.TYPE);
deleteRequest.id(request.id());
deleteRequest.refresh(true);
client.delete(deleteRequest, listener);
}
public void put(PutPipelineRequest request, ActionListener<IndexResponse> listener) {
// validates the pipeline and processor configuration:
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.source(), false).v2();
try {
constructPipeline(request.id(), pipelineConfig);
} catch (IOException e) {
listener.onFailure(e);
return;
}
IndexRequest indexRequest = new IndexRequest(request);
indexRequest.index(PipelineStore.INDEX);
indexRequest.type(PipelineStore.TYPE);
indexRequest.id(request.id());
indexRequest.source(request.source());
indexRequest.refresh(true);
client.index(indexRequest, listener);
}
public Pipeline get(String id) {
PipelineReference ref = pipelines.get(id);
PipelineDefinition ref = pipelines.get(id);
if (ref != null) {
return ref.getPipeline();
} else {
@ -99,17 +147,17 @@ public class PipelineStore extends AbstractLifecycleComponent {
return processorFactoryRegistry;
}
public List<PipelineReference> getReference(String... ids) {
List<PipelineReference> result = new ArrayList<>(ids.length);
public List<PipelineDefinition> getReference(String... ids) {
List<PipelineDefinition> result = new ArrayList<>(ids.length);
for (String id : ids) {
if (Regex.isSimpleMatchPattern(id)) {
for (Map.Entry<String, PipelineReference> entry : pipelines.entrySet()) {
for (Map.Entry<String, PipelineDefinition> entry : pipelines.entrySet()) {
if (Regex.simpleMatch(id, entry.getKey())) {
result.add(entry.getValue());
}
}
} else {
PipelineReference reference = pipelines.get(id);
PipelineDefinition reference = pipelines.get(id);
if (reference != null) {
result.add(reference);
}
@ -118,7 +166,7 @@ public class PipelineStore extends AbstractLifecycleComponent {
return result;
}
public Pipeline constructPipeline(String id, Map<String, Object> config) throws IOException {
Pipeline constructPipeline(String id, Map<String, Object> config) throws IOException {
return factory.create(id, config, processorFactoryRegistry);
}
@ -127,11 +175,11 @@ public class PipelineStore extends AbstractLifecycleComponent {
// so for that reason the goal is to keep the update logic simple.
int changed = 0;
Map<String, PipelineReference> newPipelines = new HashMap<>(pipelines);
for (SearchHit hit : client.readAllPipelines()) {
Map<String, PipelineDefinition> newPipelines = new HashMap<>(pipelines);
for (SearchHit hit : readAllPipelines()) {
String pipelineId = hit.getId();
BytesReference pipelineSource = hit.getSourceRef();
PipelineReference previous = newPipelines.get(pipelineId);
PipelineDefinition previous = newPipelines.get(pipelineId);
if (previous != null) {
if (previous.getSource().equals(pipelineSource)) {
continue;
@ -140,12 +188,12 @@ public class PipelineStore extends AbstractLifecycleComponent {
changed++;
Pipeline pipeline = constructPipeline(hit.getId(), hit.sourceAsMap());
newPipelines.put(pipelineId, new PipelineReference(pipeline, hit.getVersion(), pipelineSource));
newPipelines.put(pipelineId, new PipelineDefinition(pipeline, hit.getVersion(), pipelineSource));
}
int removed = 0;
for (String existingPipelineId : pipelines.keySet()) {
if (!client.existPipeline(existingPipelineId)) {
if (!existPipeline(existingPipelineId)) {
newPipelines.remove(existingPipelineId);
removed++;
}
@ -165,6 +213,23 @@ public class PipelineStore extends AbstractLifecycleComponent {
}
}
boolean existPipeline(String pipelineId) {
GetRequest request = new GetRequest(PipelineStore.INDEX, PipelineStore.TYPE, pipelineId);
GetResponse response = client.get(request).actionGet();
return response.isExists();
}
Iterable<SearchHit> readAllPipelines() {
// TODO: the search should be replaced with an ingest API when it is available
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.version(true);
sourceBuilder.sort("_doc", SortOrder.ASC);
SearchRequest searchRequest = new SearchRequest(PipelineStore.INDEX);
searchRequest.source(sourceBuilder);
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
return SearchScrollIterator.createIterator(client, scrollTimeout, searchRequest);
}
class Updater implements Runnable {
@Override
@ -191,43 +256,4 @@ public class PipelineStore extends AbstractLifecycleComponent {
}
}
public static class PipelineReference {
private final Pipeline pipeline;
private final long version;
private final BytesReference source;
PipelineReference(Pipeline pipeline, long version, BytesReference source) {
this.pipeline = pipeline;
this.version = version;
this.source = source;
}
public Pipeline getPipeline() {
return pipeline;
}
public long getVersion() {
return version;
}
public BytesReference getSource() {
return source;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PipelineReference holder = (PipelineReference) o;
return source.equals(holder.source);
}
@Override
public int hashCode() {
return source.hashCode();
}
}
}

View File

@ -1,135 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import java.util.Collections;
import java.util.Iterator;
public class PipelineStoreClient extends AbstractLifecycleComponent {
private volatile Client client;
private final Injector injector;
private final TimeValue scrollTimeout;
@Inject
public PipelineStoreClient(Settings settings, Injector injector) {
super(settings);
this.injector = injector;
this.scrollTimeout = settings.getAsTime("ingest.pipeline.store.scroll.timeout", TimeValue.timeValueSeconds(30));
}
@Override
protected void doStart() {
client = injector.getInstance(Client.class);
}
@Override
protected void doStop() {
client.close();
}
@Override
protected void doClose() {
}
public Iterable<SearchHit> readAllPipelines() {
// TODO: the search should be replaced with an ingest API when it is available
SearchResponse searchResponse = client.prepareSearch(PipelineStore.INDEX)
.setVersion(true)
.setScroll(scrollTimeout)
.addSort("_doc", SortOrder.ASC)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.get();
if (searchResponse.getHits().getTotalHits() == 0) {
return Collections.emptyList();
}
logger.debug("reading [{}] pipeline documents", searchResponse.getHits().totalHits());
return new Iterable<SearchHit>() {
@Override
public Iterator<SearchHit> iterator() {
return new SearchScrollIterator(searchResponse);
}
};
}
public boolean existPipeline(String pipelineId) {
GetResponse response = client.prepareGet(PipelineStore.INDEX, PipelineStore.TYPE, pipelineId).get();
return response.isExists();
}
class SearchScrollIterator implements Iterator<SearchHit> {
private SearchResponse searchResponse;
private int currentIndex;
private SearchHit[] currentHits;
SearchScrollIterator(SearchResponse searchResponse) {
this.searchResponse = searchResponse;
this.currentHits = searchResponse.getHits().getHits();
}
@Override
public boolean hasNext() {
if (currentIndex < currentHits.length) {
return true;
} else {
if (searchResponse == null) {
return false;
}
searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
.setScroll(scrollTimeout)
.get();
if (searchResponse.getHits().getHits().length == 0) {
searchResponse = null;
return false;
} else {
currentHits = searchResponse.getHits().getHits();
currentIndex = 0;
return true;
}
}
}
@Override
public SearchHit next() {
SearchHit hit = currentHits[currentIndex++];
if (logger.isTraceEnabled()) {
logger.trace("reading pipeline document [{}] with source [{}]", hit.getId(), hit.sourceAsString());
}
return hit;
}
}
}

View File

@ -34,22 +34,17 @@ import org.elasticsearch.transport.TransportService;
public class DeletePipelineTransportAction extends HandledTransportAction<DeletePipelineRequest, DeletePipelineResponse> {
private final TransportDeleteAction deleteAction;
private final PipelineStore pipelineStore;
@Inject
public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportDeleteAction deleteAction) {
public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStore pipelineStore) {
super(settings, DeletePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new);
this.deleteAction = deleteAction;
this.pipelineStore = pipelineStore;
}
@Override
protected void doExecute(DeletePipelineRequest request, ActionListener<DeletePipelineResponse> listener) {
DeleteRequest deleteRequest = new DeleteRequest(request);
deleteRequest.index(PipelineStore.INDEX);
deleteRequest.type(PipelineStore.TYPE);
deleteRequest.id(request.id());
deleteRequest.refresh(true);
deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() {
pipelineStore.delete(request, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
listener.onResponse(new DeletePipelineResponse(deleteResponse.getId(), deleteResponse.isFound()));

View File

@ -27,26 +27,27 @@ import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.plugin.ingest.PipelineDefinition;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class GetPipelineResponse extends ActionResponse implements StatusToXContent {
private Map<String, BytesReference> pipelines;
private Map<String, Long> versions;
private List<PipelineDefinition> pipelines;
public GetPipelineResponse() {
}
public GetPipelineResponse(Map<String, BytesReference> pipelines, Map<String, Long> versions) {
public GetPipelineResponse(List<PipelineDefinition> pipelines) {
this.pipelines = pipelines;
this.versions = versions;
}
public Map<String, BytesReference> pipelines() {
public List<PipelineDefinition> pipelines() {
return pipelines;
}
@ -54,14 +55,9 @@ public class GetPipelineResponse extends ActionResponse implements StatusToXCont
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
pipelines = new HashMap<>(size);
pipelines = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
pipelines.put(in.readString(), in.readBytesReference());
}
size = in.readVInt();
versions = new HashMap<>(size);
for (int i = 0; i < size; i++) {
versions.put(in.readString(), in.readVLong());
pipelines.add(PipelineDefinition.readPipelineDefinitionFrom(in));
}
}
@ -69,14 +65,8 @@ public class GetPipelineResponse extends ActionResponse implements StatusToXCont
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(pipelines.size());
for (Map.Entry<String, BytesReference> entry : pipelines.entrySet()) {
out.writeString(entry.getKey());
out.writeBytesReference(entry.getValue());
}
out.writeVInt(versions.size());
for (Map.Entry<String, Long> entry : versions.entrySet()) {
out.writeString(entry.getKey());
out.writeVLong(entry.getValue());
for (PipelineDefinition pipeline : pipelines) {
pipeline.writeTo(out);
}
}
@ -91,11 +81,8 @@ public class GetPipelineResponse extends ActionResponse implements StatusToXCont
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
for (Map.Entry<String, BytesReference> entry : pipelines.entrySet()) {
builder.startObject(entry.getKey());
XContentHelper.writeRawField("_source", entry.getValue(), builder, params);
builder.field("_version", versions.get(entry.getKey()));
builder.endObject();
for (PipelineDefinition definition : pipelines) {
definition.toXContent(builder, params);
}
return builder;
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.PipelineDefinition;
import org.elasticsearch.plugin.ingest.PipelineStore;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -46,13 +47,7 @@ public class GetPipelineTransportAction extends HandledTransportAction<GetPipeli
@Override
protected void doExecute(GetPipelineRequest request, ActionListener<GetPipelineResponse> listener) {
List<PipelineStore.PipelineReference> references = pipelineStore.getReference(request.ids());
Map<String, BytesReference> result = new HashMap<>();
Map<String, Long> versions = new HashMap<>();
for (PipelineStore.PipelineReference reference : references) {
result.put(reference.getPipeline().getId(), reference.getSource());
versions.put(reference.getPipeline().getId(), reference.getVersion());
}
listener.onResponse(new GetPipelineResponse(result, versions));
List<PipelineDefinition> references = pipelineStore.getReference(request.ids());
listener.onResponse(new GetPipelineResponse(references));
}
}

View File

@ -38,34 +38,17 @@ import java.util.Map;
public class PutPipelineTransportAction extends HandledTransportAction<PutPipelineRequest, PutPipelineResponse> {
private final TransportIndexAction indexAction;
private final PipelineStore pipelineStore;
@Inject
public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportIndexAction indexAction, PipelineStore pipelineStore) {
public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, PipelineStore pipelineStore) {
super(settings, PutPipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new);
this.indexAction = indexAction;
this.pipelineStore = pipelineStore;
}
@Override
protected void doExecute(PutPipelineRequest request, ActionListener<PutPipelineResponse> listener) {
// validates the pipeline and processor configuration:
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.source(), false).v2();
try {
pipelineStore.constructPipeline(request.id(), pipelineConfig);
} catch (IOException e) {
listener.onFailure(e);
return;
}
IndexRequest indexRequest = new IndexRequest(request);
indexRequest.index(PipelineStore.INDEX);
indexRequest.type(PipelineStore.TYPE);
indexRequest.id(request.id());
indexRequest.source(request.source());
indexRequest.refresh(true);
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
pipelineStore.put(request, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
PutPipelineResponse response = new PutPipelineResponse();

View File

@ -89,7 +89,8 @@ public class IngestClientIT extends ESIntegTestCase {
.setIds("_id")
.get();
assertThat(response.isFound(), is(true));
assertThat(response.pipelines().get("_id"), notNullValue());
assertThat(response.pipelines().size(), equalTo(1));
assertThat(response.pipelines().get(0).getId(), equalTo("_id"));
}
});
@ -178,7 +179,8 @@ public class IngestClientIT extends ESIntegTestCase {
.setIds("_id")
.get();
assertThat(response.isFound(), is(true));
assertThat(response.pipelines().get("_id"), notNullValue());
assertThat(response.pipelines().size(), equalTo(1));
assertThat(response.pipelines().get(0).getId(), equalTo("_id"));
}
});
@ -232,7 +234,7 @@ public class IngestClientIT extends ESIntegTestCase {
.setIds("_id")
.get();
assertThat(response.isFound(), is(false));
assertThat(response.pipelines().get("_id"), nullValue());
assertThat(response.pipelines().size(), equalTo(0));
}
});
}

View File

@ -19,41 +19,59 @@
package org.elasticsearch.plugin.ingest;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import org.mockito.ArgumentMatcher;
import org.mockito.Matchers;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.*;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Matchers.any;
public class PipelineStoreTests extends ESTestCase {
private PipelineStore store;
private ThreadPool threadPool;
private PipelineStoreClient client;
private PipelineStore store;
private Client client;
@Before
public void init() {
threadPool = new ThreadPool("test");
client = mock(Client.class);
Injector injector = mock(Injector.class);
when(injector.getInstance(Client.class)).thenReturn(client);
ClusterService clusterService = mock(ClusterService.class);
client = mock(PipelineStoreClient.class);
when(client.searchScroll(any())).thenReturn(expectedSearchResponse(Collections.emptyList()));
Environment environment = mock(Environment.class);
store = new PipelineStore(Settings.EMPTY, threadPool, environment, clusterService, client, Collections.emptyMap());
store = new PipelineStore(Settings.EMPTY, injector, threadPool, environment, clusterService, Collections.emptyMap());
store.start();
}
@ -63,22 +81,21 @@ public class PipelineStoreTests extends ESTestCase {
threadPool.shutdown();
}
public void testUpdatePipeline() throws Exception {
List<SearchHit> hits = new ArrayList<>();
hits.add(new InternalSearchHit(0, "1", new StringText("type"), Collections.emptyMap())
.sourceRef(new BytesArray("{\"description\": \"_description1\"}"))
);
when(client.readAllPipelines()).thenReturn(hits);
when(client.existPipeline("1")).thenReturn(true);
when(client.search(any())).thenReturn(expectedSearchResponse(hits));
when(client.get(any())).thenReturn(expectedGetResponse(true));
assertThat(store.get("1"), nullValue());
store.updatePipelines();
assertThat(store.get("1").getId(), equalTo("1"));
assertThat(store.get("1").getDescription(), equalTo("_description1"));
when(client.existPipeline("2")).thenReturn(true);
when(client.get(any())).thenReturn(expectedGetResponse(true));
hits.add(new InternalSearchHit(0, "2", new StringText("type"), Collections.emptyMap())
.sourceRef(new BytesArray("{\"description\": \"_description2\"}"))
);
@ -89,7 +106,7 @@ public class PipelineStoreTests extends ESTestCase {
assertThat(store.get("2").getDescription(), equalTo("_description2"));
hits.remove(1);
when(client.existPipeline("2")).thenReturn(false);
when(client.get(eqGetRequest(PipelineStore.INDEX, PipelineStore.TYPE, "2"))).thenReturn(expectedGetResponse(false));
store.updatePipelines();
assertThat(store.get("1").getId(), equalTo("1"));
assertThat(store.get("1").getDescription(), equalTo("_description1"));
@ -101,8 +118,8 @@ public class PipelineStoreTests extends ESTestCase {
hits.add(new InternalSearchHit(0, "1", new StringText("type"), Collections.emptyMap())
.sourceRef(new BytesArray("{\"description\": \"_description1\"}"))
);
when(client.readAllPipelines()).thenReturn(hits);
when(client.existPipeline(anyString())).thenReturn(true);
when(client.search(any())).thenReturn(expectedSearchResponse(hits));
when(client.get(any())).thenReturn(expectedGetResponse(true));
assertThat(store.get("1"), nullValue());
store.startUpdateWorker();
@ -131,20 +148,17 @@ public class PipelineStoreTests extends ESTestCase {
hits.add(new InternalSearchHit(0, "foo", new StringText("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}")));
hits.add(new InternalSearchHit(0, "bar", new StringText("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}")));
hits.add(new InternalSearchHit(0, "foobar", new StringText("type"), Collections.emptyMap()).sourceRef(new BytesArray("{\"description\": \"_description\"}")));
when(client.readAllPipelines()).thenReturn(hits);
when(client.search(any())).thenReturn(expectedSearchResponse(hits));
store.updatePipelines();
List<PipelineStore.PipelineReference> result = store.getReference("foo");
List<PipelineDefinition> result = store.getReference("foo");
assertThat(result.size(), equalTo(1));
assertThat(result.get(0).getPipeline().getId(), equalTo("foo"));
result = store.getReference("foo*");
// to make sure the order is consistent in the test:
Collections.sort(result, new Comparator<PipelineStore.PipelineReference>() {
@Override
public int compare(PipelineStore.PipelineReference first, PipelineStore.PipelineReference second) {
return first.getPipeline().getId().compareTo(second.getPipeline().getId());
}
result.sort((first, second) -> {
return first.getPipeline().getId().compareTo(second.getPipeline().getId());
});
assertThat(result.size(), equalTo(2));
assertThat(result.get(0).getPipeline().getId(), equalTo("foo"));
@ -156,11 +170,8 @@ public class PipelineStoreTests extends ESTestCase {
result = store.getReference("*");
// to make sure the order is consistent in the test:
Collections.sort(result, new Comparator<PipelineStore.PipelineReference>() {
@Override
public int compare(PipelineStore.PipelineReference first, PipelineStore.PipelineReference second) {
return first.getPipeline().getId().compareTo(second.getPipeline().getId());
}
result.sort((first, second) -> {
return first.getPipeline().getId().compareTo(second.getPipeline().getId());
});
assertThat(result.size(), equalTo(3));
assertThat(result.get(0).getPipeline().getId(), equalTo("bar"));
@ -173,4 +184,49 @@ public class PipelineStoreTests extends ESTestCase {
assertThat(result.get(1).getPipeline().getId(), equalTo("bar"));
}
ActionFuture<SearchResponse> expectedSearchResponse(List<SearchHit> hits) {
return new PlainActionFuture<SearchResponse>() {
@Override
public SearchResponse get(long timeout, TimeUnit unit) {
InternalSearchHits hits1 = new InternalSearchHits(hits.toArray(new InternalSearchHit[0]), hits.size(), 1f);
return new SearchResponse(new InternalSearchResponse(hits1, null, null, false, null), "_scrollId", 1, 1, 1, null);
}
};
}
ActionFuture<GetResponse> expectedGetResponse(boolean exists) {
return new PlainActionFuture<GetResponse>() {
@Override
public GetResponse get() throws InterruptedException, ExecutionException {
return new GetResponse(new GetResult("_index", "_type", "_id", 1, exists, null, null));
}
};
}
GetRequest eqGetRequest(String index, String type, String id) {
return Matchers.argThat(new GetRequestMatcher(index, type, id));
}
static class GetRequestMatcher extends ArgumentMatcher<GetRequest> {
private final String index;
private final String type;
private final String id;
public GetRequestMatcher(String index, String type, String id) {
this.index = index;
this.type = type;
this.id = id;
}
@Override
public boolean matches(Object o) {
GetRequest getRequest = (GetRequest) o;
return Objects.equals(getRequest.index(), index) &&
Objects.equals(getRequest.type(), type) &&
Objects.equals(getRequest.id(), id);
}
}
}