remove use of already processed header in favour of resetting the pipeline id to null

This commit is contained in:
javanna 2016-01-11 17:30:58 +01:00 committed by Luca Cavanna
parent 362deb4579
commit b4baa6c7ab
3 changed files with 31 additions and 10 deletions

View File

@ -46,8 +46,6 @@ import java.util.Set;
public final class IngestActionFilter extends AbstractComponent implements ActionFilter {
static final String PIPELINE_ALREADY_PROCESSED = "ingest_already_processed";
private final PipelineExecutionService executionService;
@Inject
@ -96,18 +94,15 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
}
void processIndexRequest(Task task, String action, ActionListener listener, ActionFilterChain chain, IndexRequest indexRequest) {
// The IndexRequest has the same type on the node that receives the request and the node that
// processes the primary action. This could lead to a pipeline being executed twice for the same
// index request, hence this check
if (indexRequest.hasHeader(PIPELINE_ALREADY_PROCESSED)) {
chain.proceed(task, action, indexRequest, listener);
return;
}
executionService.execute(indexRequest, t -> {
logger.error("failed to execute pipeline [{}]", t, indexRequest.pipeline());
listener.onFailure(t);
}, success -> {
indexRequest.putHeader(PIPELINE_ALREADY_PROCESSED, true);
// TransportIndexAction uses IndexRequest and same action name on the node that receives the request and the node that
// processes the primary action. This could lead to a pipeline being executed twice for the same
// index request, hence we set the pipeline to null once its execution completed.
indexRequest.pipeline(null);
chain.proceed(task, action, indexRequest, listener);
});
}

View File

@ -61,6 +61,8 @@ public class PipelineExecutionService {
if (Strings.hasText(indexRequest.pipeline())) {
try {
innerExecute(indexRequest, getPipeline(indexRequest.pipeline()));
//this shouldn't be needed here but we do it for consistency with index api which requires it to prevent double execution
indexRequest.pipeline(null);
} catch (Throwable e) {
itemFailureHandler.accept(new Tuple<>(indexRequest, e));
}

View File

@ -45,10 +45,12 @@ import org.mockito.stubbing.Answer;
import java.util.function.Consumer;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
@ -212,4 +214,26 @@ public class IngestActionFilterTests extends ESTestCase {
assertThat(assertedRequests, equalTo(numRequest));
});
}
@SuppressWarnings("unchecked")
public void testIndexApiSinglePipelineExecution() {
Answer answer = invocationOnMock -> {
@SuppressWarnings("unchecked")
Consumer<Boolean> listener = (Consumer) invocationOnMock.getArguments()[2];
listener.accept(true);
return null;
};
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), any(Consumer.class), any(Consumer.class));
Task task = mock(Task.class);
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").pipeline("_id").source("field", "value");
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
assertThat(indexRequest.pipeline(), nullValue());
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
verify(executionService, times(1)).execute(same(indexRequest), any(Consumer.class), any(Consumer.class));
verify(actionFilterChain, times(2)).proceed(task, IndexAction.NAME, indexRequest, actionListener);
}
}