addressed various comments

This commit is contained in:
Martijn van Groningen 2016-01-20 13:35:15 +01:00
parent e383e96f58
commit 8a7f3d9d6f
18 changed files with 100 additions and 87 deletions

View File

@ -227,8 +227,8 @@ public class ActionModule extends AbstractModule {
private final boolean ingestEnabled;
private final boolean proxy;
public ActionModule(Settings settings, boolean proxy) {
this.ingestEnabled = NodeModule.isNodeIngestEnabled(settings);
public ActionModule(boolean ingestEnabled, boolean proxy) {
this.ingestEnabled = ingestEnabled;
this.proxy = proxy;
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
@ -383,15 +384,15 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
if ("index".equals(action)) {
if (opType == null) {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
.pipeline(pipeline).source(data.slice(from, nextMarker - from)), payload);
.setPipeline(pipeline).source(data.slice(from, nextMarker - from)), payload);
} else {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
.create("create".equals(opType)).pipeline(pipeline)
.create("create".equals(opType)).setPipeline(pipeline)
.source(data.slice(from, nextMarker - from)), payload);
}
} else if ("create".equals(action)) {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
.create(true).pipeline(pipeline)
.create(true).setPipeline(pipeline)
.source(data.slice(from, nextMarker - from)), payload);
} else if ("update".equals(action)) {
UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).parent(parent).retryOnConflict(retryOnConflict)
@ -482,6 +483,22 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
return -1;
}
/**
* @return Whether this bulk request contains index request with an ingest pipeline enabled.
*/
public boolean hasIndexRequestsWithPipelines() {
for (ActionRequest actionRequest : requests) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
if (Strings.hasText(indexRequest.getPipeline())) {
return true;
}
}
}
return false;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;

View File

@ -368,7 +368,7 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
/**
* Sets the ingest pipeline to be executed before indexing the document
*/
public IndexRequest pipeline(String pipeline) {
public IndexRequest setPipeline(String pipeline) {
this.pipeline = pipeline;
return this;
}
@ -376,7 +376,7 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
/**
* Returns the ingest pipeline to be executed before indexing the document
*/
public String pipeline() {
public String getPipeline() {
return this.pipeline;
}

View File

@ -283,7 +283,7 @@ public class IndexRequestBuilder extends ReplicationRequestBuilder<IndexRequest,
* Sets the ingest pipeline to be executed before indexing the document
*/
public IndexRequestBuilder setPipeline(String pipeline) {
request.pipeline(pipeline);
request.setPipeline(pipeline);
return this;
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
@ -32,21 +33,20 @@ public class GetPipelineRequest extends MasterNodeReadRequest<GetPipelineRequest
private String[] ids;
public void ids(String... ids) {
this.ids = ids;
public void setIds(String... ids) {
this.ids = Objects.requireNonNull(ids);
if (ids.length == 0) {
throw new IllegalArgumentException("No ids specified");
}
}
public String[] ids() {
public String[] getIds() {
return ids;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (ids == null || ids.length == 0) {
validationException = addValidationError("ids is missing", validationException);
}
return validationException;
return null;
}
@Override

View File

@ -29,7 +29,7 @@ public class GetPipelineRequestBuilder extends MasterNodeReadOperationRequestBui
}
public GetPipelineRequestBuilder setIds(String... ids) {
request.ids(ids);
request.setIds(ids);
return this;
}

View File

@ -58,7 +58,7 @@ public class GetPipelineTransportAction extends TransportMasterNodeReadAction<Ge
@Override
protected void masterOperation(GetPipelineRequest request, ClusterState state, ActionListener<GetPipelineResponse> listener) throws Exception {
listener.onResponse(new GetPipelineResponse(pipelineStore.getPipelines(request.ids())));
listener.onResponse(new GetPipelineResponse(pipelineStore.getPipelines(request.getIds())));
}
@Override

View File

@ -56,36 +56,29 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
@Override
public void apply(Task task, String action, ActionRequest<?> request, ActionListener<?> listener, ActionFilterChain chain) {
if (IndexAction.NAME.equals(action)) {
assert request instanceof IndexRequest;
IndexRequest indexRequest = (IndexRequest) request;
if (Strings.hasText(indexRequest.pipeline())) {
processIndexRequest(task, action, listener, chain, (IndexRequest) request);
return;
}
}
if (BulkAction.NAME.equals(action)) {
assert request instanceof BulkRequest;
BulkRequest bulkRequest = (BulkRequest) request;
boolean isIngestRequest = false;
for (ActionRequest actionRequest : bulkRequest.requests()) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
if (Strings.hasText(indexRequest.pipeline())) {
isIngestRequest = true;
break;
}
switch (action) {
case IndexAction.NAME:
IndexRequest indexRequest = (IndexRequest) request;
if (Strings.hasText(indexRequest.getPipeline())) {
processIndexRequest(task, action, listener, chain, (IndexRequest) request);
} else {
chain.proceed(task, action, request, listener);
}
}
if (isIngestRequest) {
@SuppressWarnings("unchecked")
ActionListener<BulkResponse> actionListener = (ActionListener<BulkResponse>) listener;
processBulkIndexRequest(task, bulkRequest, action, chain, actionListener);
return;
}
break;
case BulkAction.NAME:
BulkRequest bulkRequest = (BulkRequest) request;
if (bulkRequest.hasIndexRequestsWithPipelines()) {
@SuppressWarnings("unchecked")
ActionListener<BulkResponse> actionListener = (ActionListener<BulkResponse>) listener;
processBulkIndexRequest(task, bulkRequest, action, chain, actionListener);
} else {
chain.proceed(task, action, request, listener);
}
break;
default:
chain.proceed(task, action, request, listener);
break;
}
chain.proceed(task, action, request, listener);
}
@Override
@ -96,13 +89,13 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
void processIndexRequest(Task task, String action, ActionListener listener, ActionFilterChain chain, IndexRequest indexRequest) {
executionService.execute(indexRequest, t -> {
logger.error("failed to execute pipeline [{}]", t, indexRequest.pipeline());
logger.error("failed to execute pipeline [{}]", t, indexRequest.getPipeline());
listener.onFailure(t);
}, success -> {
// TransportIndexAction uses IndexRequest and same action name on the node that receives the request and the node that
// processes the primary action. This could lead to a pipeline being executed twice for the same
// index request, hence we set the pipeline to null once its execution completed.
indexRequest.pipeline(null);
indexRequest.setPipeline(null);
chain.proceed(task, action, indexRequest, listener);
});
}
@ -110,7 +103,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
void processBulkIndexRequest(Task task, BulkRequest original, String action, ActionFilterChain chain, ActionListener<BulkResponse> listener) {
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
executionService.execute(() -> bulkRequestModifier, (indexRequest, throwable) -> {
logger.debug("failed to execute pipeline [{}] for document [{}/{}/{}]", indexRequest.pipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id(), throwable);
logger.debug("failed to execute pipeline [{}] for document [{}/{}/{}]", indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id(), throwable);
bulkRequestModifier.markCurrentItemAsFailed(throwable);
}, (success) -> {
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();

View File

@ -65,7 +65,7 @@ public final class IngestProxyActionFilter implements ActionFilter {
ingestAction = IndexAction.INSTANCE;
assert request instanceof IndexRequest;
IndexRequest indexRequest = (IndexRequest) request;
isIngestRequest = Strings.hasText(indexRequest.pipeline());
isIngestRequest = Strings.hasText(indexRequest.getPipeline());
} else if (BulkAction.NAME.equals(action)) {
ingestAction = BulkAction.INSTANCE;
assert request instanceof BulkRequest;
@ -73,7 +73,7 @@ public final class IngestProxyActionFilter implements ActionFilter {
for (ActionRequest actionRequest : bulkRequest.requests()) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
if (Strings.hasText(indexRequest.pipeline())) {
if (Strings.hasText(indexRequest.getPipeline())) {
isIngestRequest = true;
break;
}

View File

@ -49,6 +49,7 @@ import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.indices.breaker.CircuitBreakerModule;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.node.NodeModule;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsModule;
@ -150,7 +151,8 @@ public class TransportClient extends AbstractClient {
// noop
}
});
modules.add(new ActionModule(settings, true));
boolean ingestEnabled = NodeModule.isNodeIngestEnabled(settings);
modules.add(new ActionModule(ingestEnabled, true));
modules.add(new CircuitBreakerModule(settings));
pluginsService.processModules(modules);

View File

@ -41,7 +41,7 @@ public class PipelineExecutionService {
}
public void execute(IndexRequest request, Consumer<Throwable> failureHandler, Consumer<Boolean> completionHandler) {
Pipeline pipeline = getPipeline(request.pipeline());
Pipeline pipeline = getPipeline(request.getPipeline());
threadPool.executor(ThreadPool.Names.INDEX).execute(() -> {
try {
innerExecute(request, pipeline);
@ -58,11 +58,11 @@ public class PipelineExecutionService {
for (ActionRequest actionRequest : actionRequests) {
if ((actionRequest instanceof IndexRequest)) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
if (Strings.hasText(indexRequest.pipeline())) {
if (Strings.hasText(indexRequest.getPipeline())) {
try {
innerExecute(indexRequest, getPipeline(indexRequest.pipeline()));
innerExecute(indexRequest, getPipeline(indexRequest.getPipeline()));
//this shouldn't be needed here but we do it for consistency with index api which requires it to prevent double execution
indexRequest.pipeline(null);
indexRequest.setPipeline(null);
} catch (Throwable e) {
itemFailureHandler.accept(indexRequest, e);
}

View File

@ -190,7 +190,8 @@ public class Node implements Releasable {
modules.add(new ClusterModule(this.settings));
modules.add(new IndicesModule());
modules.add(new SearchModule(settings, namedWriteableRegistry));
modules.add(new ActionModule(settings, false));
boolean ingestEnabled = NodeModule.isNodeIngestEnabled(settings);
modules.add(new ActionModule(ingestEnabled, false));
modules.add(new GatewayModule(settings));
modules.add(new NodeClientModule());
modules.add(new PercolatorModule());

View File

@ -77,7 +77,7 @@ public class RestIndexAction extends BaseRestHandler {
if (request.hasParam("ttl")) {
indexRequest.ttl(request.param("ttl"));
}
indexRequest.pipeline(request.param("pipeline"));
indexRequest.setPipeline(request.param("pipeline"));
indexRequest.source(request.content());
indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
indexRequest.refresh(request.paramAsBoolean("refresh", indexRequest.refresh()));

View File

@ -41,7 +41,7 @@ public class RestGetPipelineAction extends BaseRestHandler {
@Override
protected void handleRequest(RestRequest restRequest, RestChannel channel, Client client) throws Exception {
GetPipelineRequest request = new GetPipelineRequest();
request.ids(Strings.splitStringByCommaToArray(restRequest.param("id")));
request.setIds(Strings.splitStringByCommaToArray(restRequest.param("id")));
request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
client.getPipeline(request, new RestStatusToXContentListener<>(channel));
}

View File

@ -99,7 +99,7 @@ public class IngestActionFilterTests extends ESTestCase {
@SuppressWarnings("unchecked")
public void testApplyIngestIdViaRequestParam() throws Exception {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").pipeline("_id");
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline("_id");
indexRequest.source("field", "value");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
@ -113,7 +113,7 @@ public class IngestActionFilterTests extends ESTestCase {
@SuppressWarnings("unchecked")
public void testApplyExecuted() throws Exception {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").pipeline("_id");
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline("_id");
indexRequest.source("field", "value");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
@ -135,7 +135,7 @@ public class IngestActionFilterTests extends ESTestCase {
@SuppressWarnings("unchecked")
public void testApplyFailed() throws Exception {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").pipeline("_id");
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline("_id");
indexRequest.source("field", "value");
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
@ -196,7 +196,7 @@ public class IngestActionFilterTests extends ESTestCase {
}
bulkRequest.add(request);
} else {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").pipeline("_id");
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline("_id");
indexRequest.source("field1", "value1");
bulkRequest.add(indexRequest);
}
@ -239,9 +239,9 @@ public class IngestActionFilterTests extends ESTestCase {
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").pipeline("_id").source("field", "value");
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline("_id").source("field", "value");
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
assertThat(indexRequest.pipeline(), nullValue());
assertThat(indexRequest.getPipeline(), nullValue());
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
verify(executionService, times(1)).execute(same(indexRequest), any(Consumer.class), any(Consumer.class));
verify(actionFilterChain, times(2)).proceed(task, IndexAction.NAME, indexRequest, actionListener);

View File

@ -102,10 +102,10 @@ public class IngestProxyActionFilterTests extends ESTestCase {
ActionRequest request;
if (randomBoolean()) {
action = IndexAction.NAME;
request = new IndexRequest().pipeline("_id");
request = new IndexRequest().setPipeline("_id");
} else {
action = BulkAction.NAME;
request = new BulkRequest().add(new IndexRequest().pipeline("_id"));
request = new BulkRequest().add(new IndexRequest().setPipeline("_id"));
}
try {
filter.apply(task, action, request, actionListener, actionFilterChain);
@ -169,7 +169,7 @@ public class IngestProxyActionFilterTests extends ESTestCase {
};
doAnswer(answer).when(transportService).sendRequest(any(DiscoveryNode.class), any(String.class), any(TransportRequest.class), any(TransportResponseHandler.class));
IndexRequest indexRequest = new IndexRequest().pipeline("_id");
IndexRequest indexRequest = new IndexRequest().setPipeline("_id");
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
verify(transportService).sendRequest(argThat(new IngestNodeMatcher()), eq(IndexAction.NAME), same(indexRequest), any(TransportResponseHandler.class));
@ -193,7 +193,7 @@ public class IngestProxyActionFilterTests extends ESTestCase {
doAnswer(answer).when(transportService).sendRequest(any(DiscoveryNode.class), any(String.class), any(TransportRequest.class), any(TransportResponseHandler.class));
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new IndexRequest().pipeline("_id"));
bulkRequest.add(new IndexRequest().setPipeline("_id"));
int numNoPipelineRequests = randomIntBetween(0, 10);
for (int i = 0; i < numNoPipelineRequests; i++) {
bulkRequest.add(new IndexRequest());
@ -224,10 +224,10 @@ public class IngestProxyActionFilterTests extends ESTestCase {
ActionRequest request;
if (randomBoolean()) {
action = IndexAction.NAME;
request = new IndexRequest().pipeline("_id");
request = new IndexRequest().setPipeline("_id");
} else {
action = BulkAction.NAME;
request = new BulkRequest().add(new IndexRequest().pipeline("_id"));
request = new BulkRequest().add(new IndexRequest().setPipeline("_id"));
}
filter.apply(task, action, request, actionListener, actionFilterChain);

View File

@ -145,7 +145,7 @@ public class IngestClientIT extends ESIntegTestCase {
int numRequests = scaledRandomIntBetween(32, 128);
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numRequests; i++) {
IndexRequest indexRequest = new IndexRequest("index", "type", Integer.toString(i)).pipeline("_id");
IndexRequest indexRequest = new IndexRequest("index", "type", Integer.toString(i)).setPipeline("_id");
indexRequest.source("field", "value", "fail", i % 2 == 0);
bulkRequest.add(indexRequest);
}
@ -180,7 +180,7 @@ public class IngestClientIT extends ESIntegTestCase {
client().putPipeline(putPipelineRequest).get();
GetPipelineRequest getPipelineRequest = new GetPipelineRequest();
getPipelineRequest.ids("_id");
getPipelineRequest.setIds("_id");
GetPipelineResponse getResponse = client().getPipeline(getPipelineRequest).get();
assertThat(getResponse.isFound(), is(true));
assertThat(getResponse.pipelines().size(), equalTo(1));

View File

@ -71,7 +71,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
}
public void testExecuteIndexPipelineDoesNotExist() {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).pipeline("_id");
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
@ -91,9 +91,9 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
BulkRequest bulkRequest = new BulkRequest();
IndexRequest indexRequest1 = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).pipeline("_id");
IndexRequest indexRequest1 = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
bulkRequest.add(indexRequest1);
IndexRequest indexRequest2 = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).pipeline("does_not_exist");
IndexRequest indexRequest2 = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("does_not_exist");
bulkRequest.add(indexRequest2);
@SuppressWarnings("unchecked")
BiConsumer<IndexRequest, Throwable> failureHandler = mock(BiConsumer.class);
@ -122,7 +122,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
CompoundProcessor processor = mock(CompoundProcessor.class);
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).pipeline("_id");
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
@ -148,7 +148,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
}).when(processor).execute(any());
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).pipeline("_id");
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
@ -170,7 +170,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
public void testExecuteFailure() throws Exception {
CompoundProcessor processor = mock(CompoundProcessor.class);
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", processor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).pipeline("_id");
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@ -187,7 +187,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
Processor onFailureProcessor = mock(Processor.class);
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).pipeline("_id");
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@ -203,7 +203,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
Processor onFailureProcessor = mock(Processor.class);
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).pipeline("_id");
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@SuppressWarnings("unchecked")
@ -223,7 +223,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor),
Collections.singletonList(new CompoundProcessor(Collections.singletonList(onFailureProcessor), Collections.singletonList(onFailureOnFailureProcessor))));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).pipeline("_id");
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
doThrow(new RuntimeException()).when(onFailureOnFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
doThrow(new RuntimeException()).when(onFailureProcessor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@ -241,7 +241,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("_ttl", "5d"));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor)));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).pipeline("_id");
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
@ -257,7 +257,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
Processor processor = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("_ttl", "abc"));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", new CompoundProcessor(processor)));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).pipeline("_id");
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
@SuppressWarnings("unchecked")
Consumer<Throwable> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
@ -270,7 +270,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
public void testExecuteProvidedTTL() throws Exception {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", mock(CompoundProcessor.class)));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").pipeline("_id")
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline("_id")
.source(Collections.emptyMap())
.ttl(1000L);
Consumer<Throwable> failureHandler = mock(Consumer.class);
@ -297,7 +297,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
request = new UpdateRequest("_index", "_type", "_id");
}
} else {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").pipeline(pipelineId);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline(pipelineId);
indexRequest.source("field1", "value1");
request = indexRequest;
numIndexRequests++;
@ -324,7 +324,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
int numRequest = scaledRandomIntBetween(8, 64);
for (int i = 0; i < numRequest; i++) {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").pipeline(pipelineId);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline(pipelineId);
indexRequest.source("field1", "value1");
bulkRequest.add(indexRequest);
}