addressed IngestProxyActionFilter comments

This commit is contained in:
Martijn van Groningen 2016-01-20 13:44:27 +01:00
parent 8a7f3d9d6f
commit c663aa0dec
1 changed files with 22 additions and 26 deletions

View File

@ -59,34 +59,30 @@ public final class IngestProxyActionFilter implements ActionFilter {
@Override
public void apply(Task task, String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
Action ingestAction = null;
boolean isIngestRequest = false;
if (IndexAction.NAME.equals(action)) {
ingestAction = IndexAction.INSTANCE;
assert request instanceof IndexRequest;
IndexRequest indexRequest = (IndexRequest) request;
isIngestRequest = Strings.hasText(indexRequest.getPipeline());
} else if (BulkAction.NAME.equals(action)) {
ingestAction = BulkAction.INSTANCE;
assert request instanceof BulkRequest;
BulkRequest bulkRequest = (BulkRequest) request;
for (ActionRequest actionRequest : bulkRequest.requests()) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
if (Strings.hasText(indexRequest.getPipeline())) {
isIngestRequest = true;
break;
}
Action ingestAction;
switch (action) {
case IndexAction.NAME:
ingestAction = IndexAction.INSTANCE;
IndexRequest indexRequest = (IndexRequest) request;
if (Strings.hasText(indexRequest.getPipeline())) {
forwardIngestRequest(ingestAction, request, listener);
} else {
chain.proceed(task, action, request, listener);
}
}
break;
case BulkAction.NAME:
ingestAction = BulkAction.INSTANCE;
BulkRequest bulkRequest = (BulkRequest) request;
if (bulkRequest.hasIndexRequestsWithPipelines()) {
forwardIngestRequest(ingestAction, request, listener);
} else {
chain.proceed(task, action, request, listener);
}
break;
default:
chain.proceed(task, action, request, listener);
break;
}
if (isIngestRequest) {
assert ingestAction != null;
forwardIngestRequest(ingestAction, request, listener);
return;
}
chain.proceed(task, action, request, listener);
}
private void forwardIngestRequest(Action action, ActionRequest request, ActionListener listener) {