From 995bf0ee668282fbde420eaf2eb028af75a74d12 Mon Sep 17 00:00:00 2001
From: Przemyslaw Gomulka <przemyslaw.gomulka@elastic.co>
Date: Tue, 30 Oct 2018 09:08:12 +0100
Subject: [PATCH] Bulk Api support for global parameters (#34528)

Bulk Request in High level rest client should be consistent with what is
possible in Rest API, therefore should support global parameters. Global
parameters are passed in URL in Rest API.

Some parameters are mandatory - index, type - and would fail validation
if not provided before before the bulk is executed.
Optional parameters - routing, pipeline.

The usage of these should be consistent across sync/async execution,
bulk processor and BulkRequestBuilder

closes #26026
---
 .../client/RequestConverters.java             |   3 +-
 .../elasticsearch/client/BulkProcessorIT.java | 133 ++++++++++-
 .../BulkRequestWithGlobalParametersIT.java    | 217 ++++++++++++++++++
 .../client/ESRestHighLevelClientTestCase.java |  52 +++++
 .../client/RequestConvertersTests.java        |  16 ++
 .../documentation/CRUDDocumentationIT.java    |  10 +
 docs/java-api/docs/bulk.asciidoc              |  23 ++
 .../high-level/document/bulk.asciidoc         |  19 ++
 .../elasticsearch/action/DocWriteRequest.java |  13 ++
 .../action/bulk/BulkProcessor.java            |  43 +++-
 .../action/bulk/BulkRequest.java              |  47 +++-
 .../action/bulk/BulkRequestBuilder.java       |  14 ++
 .../action/delete/DeleteRequest.java          |   1 +
 .../action/index/IndexRequest.java            |   1 +
 .../java/org/elasticsearch/client/Client.java |   5 +
 .../client/support/AbstractClient.java        |   5 +
 .../action/bulk/BulkIntegrationIT.java        |  66 ++++++
 .../action/bulk/BulkProcessorTests.java       |  54 ++---
 .../bulk/simple-bulk-missing-index-type.json  |   5 +
 .../hamcrest/ElasticsearchAssertions.java     |  10 +
 .../test/hamcrest/ElasticsearchMatchers.java  |  28 +++
 21 files changed, 718 insertions(+), 47 deletions(-)
 create mode 100644 client/rest-high-level/src/test/java/org/elasticsearch/client/BulkRequestWithGlobalParametersIT.java
 create mode 100644 server/src/test/resources/org/elasticsearch/action/bulk/simple-bulk-missing-index-type.json

diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
index adc1e93efde..3acb8d82973 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
@@ -118,7 +118,8 @@ final class RequestConverters {
         Params parameters = new Params(request);
         parameters.withTimeout(bulkRequest.timeout());
         parameters.withRefreshPolicy(bulkRequest.getRefreshPolicy());
-
+        parameters.withPipeline(bulkRequest.pipeline());
+        parameters.withRouting(bulkRequest.routing());
         // Bulk API only supports newline delimited JSON or Smile. Before executing
         // the bulk, we need to check that all requests have the same content-type
         // and this content-type is supported by the Bulk API.
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java
index fdd5634ddd6..378eb4f0069 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java
@@ -28,14 +28,18 @@ import org.elasticsearch.action.get.MultiGetItemResponse;
 import org.elasticsearch.action.get.MultiGetRequest;
 import org.elasticsearch.action.get.MultiGetResponse;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.search.SearchHit;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -44,10 +48,19 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
 
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.fieldFromSource;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasId;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasIndex;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasProperty;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasType;
 import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.either;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
@@ -268,23 +281,124 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase {
         assertMultiGetResponse(highLevelClient().mget(multiGetRequest, RequestOptions.DEFAULT), testDocs);
     }
 
-    private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
+    @SuppressWarnings("unchecked")
+    public void testGlobalParametersAndSingleRequest() throws Exception {
+        createIndexWithMultipleShards("test");
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
+        createFieldAddingPipleine("pipeline_id", "fieldNameXYZ", "valueXYZ");
+
+        // tag::bulk-processor-mix-parameters
+        try (BulkProcessor processor = initBulkProcessorBuilder(listener)
+                .setGlobalIndex("tweets")
+                .setGlobalType("_doc")
+                .setGlobalRouting("routing")
+                .setGlobalPipeline("pipeline_id")
+                .build()) {
+
+
+            processor.add(new IndexRequest() // <1>
+                .source(XContentType.JSON, "user", "some user"));
+            processor.add(new IndexRequest("blogs", "post_type", "1") // <2>
+                .source(XContentType.JSON, "title", "some title"));
+        }
+        // end::bulk-request-mix-pipeline
+        latch.await();
+
+        Iterable<SearchHit> hits = searchAll(new SearchRequest("tweets").routing("routing"));
+        assertThat(hits, everyItem(hasProperty(fieldFromSource("user"), equalTo("some user"))));
+        assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
+
+
+        Iterable<SearchHit> blogs = searchAll(new SearchRequest("blogs").routing("routing"));
+        assertThat(blogs, everyItem(hasProperty(fieldFromSource("title"), equalTo("some title"))));
+        assertThat(blogs, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testGlobalParametersAndBulkProcessor() throws Exception {
+        createIndexWithMultipleShards("test");
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
+        createFieldAddingPipleine("pipeline_id", "fieldNameXYZ", "valueXYZ");
+
+        int numDocs = randomIntBetween(10, 10);
+        try (BulkProcessor processor = initBulkProcessorBuilder(listener)
+                //let's make sure that the bulk action limit trips, one single execution will index all the documents
+                .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
+                .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
+                .setGlobalIndex("test")
+                .setGlobalType("test")
+                .setGlobalRouting("routing")
+                .setGlobalPipeline("pipeline_id")
+                .build()) {
+
+            indexDocs(processor, numDocs, null, null, "test", "test", "pipeline_id");
+            latch.await();
+
+            assertThat(listener.beforeCounts.get(), equalTo(1));
+            assertThat(listener.afterCounts.get(), equalTo(1));
+            assertThat(listener.bulkFailures.size(), equalTo(0));
+            assertResponseItems(listener.bulkItems, numDocs);
+
+            Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));
+
+            assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
+            assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType("test"))));
+            assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private Matcher<SearchHit>[] expectedIds(int numDocs) {
+        return IntStream.rangeClosed(1, numDocs)
+            .boxed()
+            .map(n -> hasId(n.toString()))
+            .<Matcher<SearchHit>>toArray(Matcher[]::new);
+    }
+
+    private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType,
+                                             String globalIndex, String globalType, String globalPipeline) throws Exception {
         MultiGetRequest multiGetRequest = new MultiGetRequest();
         for (int i = 1; i <= numDocs; i++) {
             if (randomBoolean()) {
-                processor.add(new IndexRequest("test", "test", Integer.toString(i))
-                        .source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
+                processor.add(new IndexRequest(localIndex, localType, Integer.toString(i))
+                    .source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
             } else {
-                final String source = "{ \"index\":{\"_index\":\"test\",\"_type\":\"test\",\"_id\":\"" + Integer.toString(i) + "\"} }\n"
-                        + Strings.toString(JsonXContent.contentBuilder()
-                        .startObject().field("field", randomRealisticUnicodeOfLengthBetween(1, 30)).endObject()) + "\n";
-                processor.add(new BytesArray(source), null, null, XContentType.JSON);
+                BytesArray data = bytesBulkRequest(localIndex, localType, i);
+                processor.add(data, globalIndex, globalType, globalPipeline, null, XContentType.JSON);
             }
-            multiGetRequest.add("test", "test", Integer.toString(i));
+            multiGetRequest.add(localIndex, localType, Integer.toString(i));
         }
         return multiGetRequest;
     }
 
+    private static BytesArray bytesBulkRequest(String localIndex, String localType, int id) throws IOException {
+        String action = Strings.toString(jsonBuilder()
+            .startObject()
+                .startObject("index")
+                    .field("_index", localIndex)
+                    .field("_type", localType)
+                    .field("_id", Integer.toString(id))
+                .endObject()
+            .endObject()
+        );
+        String source = Strings.toString(jsonBuilder()
+            .startObject()
+                .field("field", randomRealisticUnicodeOfLengthBetween(1, 30))
+            .endObject()
+        );
+
+        String request = action + "\n" + source + "\n";
+        return new BytesArray(request);
+    }
+
+    private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
+        return indexDocs(processor, numDocs, "test", "test", null, null, null);
+    }
+
     private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs) {
         assertThat(bulkItemResponses.size(), is(numDocs));
         int i = 1;
@@ -343,4 +457,5 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase {
         }
     }
 
+
 }
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkRequestWithGlobalParametersIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkRequestWithGlobalParametersIT.java
new file mode 100644
index 00000000000..cf8f1ebfdbd
--- /dev/null
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkRequestWithGlobalParametersIT.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client;
+
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.search.SearchHit;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasId;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasIndex;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasProperty;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasType;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+public class BulkRequestWithGlobalParametersIT extends ESRestHighLevelClientTestCase {
+
+    @SuppressWarnings("unchecked")
+    public void testGlobalPipelineOnBulkRequest() throws IOException {
+        createFieldAddingPipleine("xyz", "fieldNameXYZ", "valueXYZ");
+
+        BulkRequest request = new BulkRequest();
+        request.add(new IndexRequest("test", "doc", "1")
+            .source(XContentType.JSON, "field", "bulk1"));
+        request.add(new IndexRequest("test", "doc", "2")
+            .source(XContentType.JSON, "field", "bulk2"));
+        request.pipeline("xyz");
+
+        bulk(request);
+
+        Iterable<SearchHit> hits = searchAll("test");
+        assertThat(hits, containsInAnyOrder(hasId("1"), hasId("2")));
+        assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
+    }
+
+    public void testPipelineOnRequestOverridesGlobalPipeline() throws IOException {
+        createFieldAddingPipleine("globalId", "fieldXYZ", "valueXYZ");
+        createFieldAddingPipleine("perIndexId", "someNewField", "someValue");
+
+        BulkRequest request = new BulkRequest();
+        request.pipeline("globalId");
+        request.add(new IndexRequest("test", "doc", "1")
+            .source(XContentType.JSON, "field", "bulk1")
+            .setPipeline("perIndexId"));
+        request.add(new IndexRequest("test", "doc", "2")
+            .source(XContentType.JSON, "field", "bulk2")
+            .setPipeline("perIndexId"));
+
+        bulk(request);
+
+        Iterable<SearchHit> hits = searchAll("test");
+        assertThat(hits, everyItem(hasProperty(fieldFromSource("someNewField"), equalTo("someValue"))));
+        // global pipeline was not applied
+        assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldXYZ"), nullValue())));
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testMixPipelineOnRequestAndGlobal() throws IOException {
+        createFieldAddingPipleine("globalId", "fieldXYZ", "valueXYZ");
+        createFieldAddingPipleine("perIndexId", "someNewField", "someValue");
+
+        // tag::bulk-request-mix-pipeline
+        BulkRequest request = new BulkRequest();
+        request.pipeline("globalId");
+
+        request.add(new IndexRequest("test", "doc", "1")
+            .source(XContentType.JSON, "field", "bulk1")
+            .setPipeline("perIndexId")); // <1>
+
+        request.add(new IndexRequest("test", "doc", "2")
+            .source(XContentType.JSON, "field", "bulk2")); // <2>
+        // end::bulk-request-mix-pipeline
+        bulk(request);
+
+        Iterable<SearchHit> hits = searchAll("test");
+        assertThat(hits, containsInAnyOrder(
+            both(hasId("1"))
+                .and(hasProperty(fieldFromSource("someNewField"), equalTo("someValue"))),
+            both(hasId("2"))
+                .and(hasProperty(fieldFromSource("fieldXYZ"), equalTo("valueXYZ")))));
+    }
+
+    public void testGlobalIndex() throws IOException {
+        BulkRequest request = new BulkRequest("global_index", null);
+        request.add(new IndexRequest().type("doc").id("1")
+            .source(XContentType.JSON, "field", "bulk1"));
+        request.add(new IndexRequest().type("doc").id("2")
+            .source(XContentType.JSON, "field", "bulk2"));
+
+        bulk(request);
+
+        Iterable<SearchHit> hits = searchAll("global_index");
+        assertThat(hits, everyItem(hasIndex("global_index")));
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testIndexGlobalAndPerRequest() throws IOException {
+        BulkRequest request = new BulkRequest("global_index", null);
+        request.add(new IndexRequest("local_index", "doc", "1")
+            .source(XContentType.JSON, "field", "bulk1"));
+        request.add(new IndexRequest().type("doc").id("2") // will take global index
+            .source(XContentType.JSON, "field", "bulk2"));
+
+        bulk(request);
+
+        Iterable<SearchHit> hits = searchAll("local_index", "global_index");
+        assertThat(hits, containsInAnyOrder(
+            both(hasId("1"))
+                .and(hasIndex("local_index")),
+            both(hasId("2"))
+                .and(hasIndex("global_index"))));
+    }
+
+    public void testGlobalType() throws IOException {
+        BulkRequest request = new BulkRequest(null, "global_type");
+        request.add(new IndexRequest("index").id("1")
+            .source(XContentType.JSON, "field", "bulk1"));
+        request.add(new IndexRequest("index").id("2")
+            .source(XContentType.JSON, "field", "bulk2"));
+
+        bulk(request);
+
+        Iterable<SearchHit> hits = searchAll("index");
+        assertThat(hits, everyItem(hasType("global_type")));
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testTypeGlobalAndPerRequest() throws IOException {
+        BulkRequest request = new BulkRequest(null, "global_type");
+        request.add(new IndexRequest("index1", "local_type", "1")
+            .source(XContentType.JSON, "field", "bulk1"));
+        request.add(new IndexRequest("index2").id("2") // will take global type
+            .source(XContentType.JSON, "field", "bulk2"));
+
+        bulk(request);
+
+        Iterable<SearchHit> hits = searchAll("index1", "index2");
+        assertThat(hits, containsInAnyOrder(
+            both(hasId("1"))
+                .and(hasType("local_type")),
+            both(hasId("2"))
+                .and(hasType("global_type"))));
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testGlobalRouting() throws IOException {
+        createIndexWithMultipleShards("index");
+        BulkRequest request = new BulkRequest(null, null);
+        request.add(new IndexRequest("index", "type", "1")
+            .source(XContentType.JSON, "field", "bulk1"));
+        request.add(new IndexRequest("index", "type", "2")
+            .source(XContentType.JSON, "field", "bulk1"));
+        request.routing("1");
+        bulk(request);
+        
+        Iterable<SearchHit> emptyHits = searchAll(new SearchRequest("index").routing("xxx"));
+        assertThat(emptyHits, is(emptyIterable()));
+
+        Iterable<SearchHit> hits = searchAll(new SearchRequest("index").routing("1"));
+        assertThat(hits, containsInAnyOrder(hasId("1"), hasId("2")));
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testMixLocalAndGlobalRouting() throws IOException {
+        BulkRequest request = new BulkRequest(null, null);
+        request.routing("globalRouting");
+        request.add(new IndexRequest("index", "type", "1")
+            .source(XContentType.JSON, "field", "bulk1"));
+        request.add(new IndexRequest("index", "type", "2")
+            .routing("localRouting")
+            .source(XContentType.JSON, "field", "bulk1"));
+
+        bulk(request);
+
+        Iterable<SearchHit> hits = searchAll(new SearchRequest("index").routing("globalRouting", "localRouting"));
+        assertThat(hits, containsInAnyOrder(hasId("1"), hasId("2")));
+    }
+
+    private BulkResponse bulk(BulkRequest request) throws IOException {
+        BulkResponse bulkResponse = execute(request, highLevelClient()::bulk, highLevelClient()::bulkAsync);
+        assertFalse(bulkResponse.hasFailures());
+        return bulkResponse;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> Function<SearchHit, T> fieldFromSource(String fieldName) {
+        return (response) -> (T) response.getSourceAsMap().get(fieldName);
+    }
+}
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java
index 07c0d818bfa..d31b9f04dbb 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java
@@ -21,7 +21,10 @@ package org.elasticsearch.client;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.ingest.PutPipelineRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
@@ -30,15 +33,20 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.ingest.Pipeline;
+import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.junit.AfterClass;
 import org.junit.Before;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 
 public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
 
@@ -125,6 +133,22 @@ public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
         return buildRandomXContentPipeline(pipelineBuilder);
     }
 
+    protected static void createFieldAddingPipleine(String id, String fieldName, String value) throws IOException {
+        XContentBuilder pipeline = jsonBuilder()
+            .startObject()
+                .startArray("processors")
+                    .startObject()
+                        .startObject("set")
+                            .field("field", fieldName)
+                            .field("value", value)
+                        .endObject()
+                    .endObject()
+                .endArray()
+            .endObject();
+
+        createPipeline(new PutPipelineRequest(id, BytesReference.bytes(pipeline), XContentType.JSON));
+    }
+
     protected static void createPipeline(String pipelineId) throws IOException {
         XContentBuilder builder = buildRandomXContentPipeline();
         createPipeline(new PutPipelineRequest(pipelineId, BytesReference.bytes(builder), builder.contentType()));
@@ -154,4 +178,32 @@ public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
             .put(ThreadContext.PREFIX + ".Authorization", token)
             .build();
     }
+
+    protected Iterable<SearchHit> searchAll(String... indices) throws IOException {
+        SearchRequest searchRequest = new SearchRequest(indices);
+        return searchAll(searchRequest);
+    }
+
+    protected Iterable<SearchHit> searchAll(SearchRequest searchRequest) throws IOException {
+        refreshIndexes(searchRequest.indices());
+        SearchResponse search = highLevelClient().search(searchRequest, RequestOptions.DEFAULT);
+        return search.getHits();
+    }
+
+    protected void refreshIndexes(String... indices) throws IOException {
+        String joinedIndices = Arrays.stream(indices)
+            .collect(Collectors.joining(","));
+        Response refreshResponse = client().performRequest(new Request("POST", "/" + joinedIndices + "/_refresh"));
+        assertEquals(200, refreshResponse.getStatusLine().getStatusCode());
+    }
+
+    protected void createIndexWithMultipleShards(String index) throws IOException {
+        CreateIndexRequest indexRequest = new CreateIndexRequest(index);
+        int shards = randomIntBetween(8,10);
+        indexRequest.settings(Settings.builder()
+            .put("index.number_of_shards", shards)
+            .put("index.number_of_replicas", 0)
+        );
+        highLevelClient().indices().create(indexRequest, RequestOptions.DEFAULT);
+    }
 }
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
index e5922658fd9..4640ab56599 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
@@ -100,6 +100,7 @@ import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.RandomObjects;
+import org.hamcrest.Matchers;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -860,6 +861,21 @@ public class RequestConvertersTests extends ESTestCase {
         }
     }
 
+    public void testGlobalPipelineOnBulkRequest() throws IOException {
+        BulkRequest bulkRequest = new BulkRequest();
+        bulkRequest.pipeline("xyz");
+        bulkRequest.add(new IndexRequest("test", "doc", "11")
+            .source(XContentType.JSON, "field", "bulk1"));
+        bulkRequest.add(new IndexRequest("test", "doc", "12")
+            .source(XContentType.JSON, "field", "bulk2"));
+        bulkRequest.add(new IndexRequest("test", "doc", "13")
+            .source(XContentType.JSON, "field", "bulk3"));
+
+        Request request = RequestConverters.bulk(bulkRequest);
+
+        assertThat(request.getParameters(), Matchers.hasEntry("pipeline","xyz"));
+    }
+
     public void testSearchNullSource() throws IOException {
         SearchRequest searchRequest = new SearchRequest();
         Request request = RequestConverters.search(searchRequest);
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java
index 200a3a9b0f9..82c82784661 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java
@@ -749,6 +749,16 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase {
             request.waitForActiveShards(2); // <1>
             request.waitForActiveShards(ActiveShardCount.ALL); // <2>
             // end::bulk-request-active-shards
+            // tag::bulk-request-pipeline
+            request.pipeline("pipelineId"); // <1>
+            // end::bulk-request-pipeline
+            // tag::bulk-request-routing
+            request.routing("routingId"); // <1>
+            // end::bulk-request-routing
+
+            // tag::bulk-request-index-type
+            BulkRequest defaulted = new BulkRequest("posts","_doc"); // <1>
+            // end::bulk-request-index-type
 
             // tag::bulk-execute-listener
             ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
diff --git a/docs/java-api/docs/bulk.asciidoc b/docs/java-api/docs/bulk.asciidoc
index 03c6ae719e5..6141fabbf5b 100644
--- a/docs/java-api/docs/bulk.asciidoc
+++ b/docs/java-api/docs/bulk.asciidoc
@@ -165,3 +165,26 @@ client.admin().indices().prepareRefresh().get();
 client.prepareSearch().get();
 --------------------------------------------------
 
+
+[[java-docs-bulk-global-parameters]]
+==== Global Parameters
+
+Global parameters can be specified on the BulkRequest as well as BulkProcessor, similar to the REST API. These global
+ parameters serve as defaults and can be overridden by local parameters specified on each sub request. Some parameters
+ have to be set before any sub request is added - index, type - and you have to specify them during  BulkRequest or
+ BulkProcessor creation. Some are optional - pipeline, routing - and can be specified at any point before the bulk is sent.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/BulkProcessorIT.java[bulk-processor-mix-parameters]
+--------------------------------------------------
+<1> global parameters from the BulkRequest will be applied on a sub request
+<2> local pipeline parameter on a sub request will override global parameters from BulkRequest
+
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/BulkRequestWithGlobalParametersIT.java[bulk-request-mix-pipeline]
+--------------------------------------------------
+<1> local pipeline parameter on a sub request will override global pipeline from the BulkRequest
+<2> global parameter from the BulkRequest will be applied on a sub request
diff --git a/docs/java-rest/high-level/document/bulk.asciidoc b/docs/java-rest/high-level/document/bulk.asciidoc
index db9a3463135..d794779435a 100644
--- a/docs/java-rest/high-level/document/bulk.asciidoc
+++ b/docs/java-rest/high-level/document/bulk.asciidoc
@@ -70,6 +70,25 @@ the index/update/delete operations.
 `ActiveShardCount.ALL`, `ActiveShardCount.ONE` or
 `ActiveShardCount.DEFAULT` (default)
 
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-request-pipeline]
+--------------------------------------------------
+<1> Global pipelineId used on all sub requests, unless overridden on a sub request
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-request-routing]
+--------------------------------------------------
+<1> Global routingId used on all sub requests, unless overridden on a sub request
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-request-index-type]
+--------------------------------------------------
+<1> A bulk request with global index and type used on all sub requests, unless overridden on a sub request.
+Both parameters are @Nullable and can only be set during BulkRequest creation.
+
 include::../execution.asciidoc[]
 
 [id="{upid}-{api}-response"]
diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java
index b0d553534e4..e2d01aad230 100644
--- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java
@@ -35,12 +35,25 @@ import java.util.Locale;
  */
 public interface DocWriteRequest<T> extends IndicesRequest {
 
+    /**
+     * Set the index for this request
+     * @return the Request
+     */
+    T index(String index);
+
     /**
      * Get the index that this request operates on
      * @return the index
      */
     String index();
 
+
+    /**
+     * Set the type for this request
+     * @return the Request
+     */
+    T type(String type);
+
     /**
      * Get the type that this request operates on
      * @return the type
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
index a2cae6b7bec..c083c895677 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
+import java.util.function.Supplier;
 
 /**
  * A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request
@@ -88,6 +89,10 @@ public class BulkProcessor implements Closeable {
         private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB);
         private TimeValue flushInterval = null;
         private BackoffPolicy backoffPolicy = BackoffPolicy.exponentialBackoff();
+        private String globalIndex;
+        private String globalType;
+        private String globalRouting;
+        private String globalPipeline;
 
         private Builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener,
                         Scheduler scheduler, Runnable onClose) {
@@ -136,6 +141,26 @@ public class BulkProcessor implements Closeable {
             return this;
         }
 
+        public Builder setGlobalIndex(String globalIndex) {
+            this.globalIndex = globalIndex;
+            return this;
+        }
+
+        public Builder setGlobalType(String globalType) {
+            this.globalType = globalType;
+            return this;
+        }
+
+        public Builder setGlobalRouting(String globalRouting) {
+            this.globalRouting = globalRouting;
+            return this;
+        }
+
+        public Builder setGlobalPipeline(String globalPipeline) {
+            this.globalPipeline = globalPipeline;
+            return this;
+        }
+
         /**
          * Sets a custom backoff policy. The backoff policy defines how the bulk processor should handle retries of bulk requests internally
          * in case they have failed due to resource constraints (i.e. a thread pool was full).
@@ -156,8 +181,14 @@ public class BulkProcessor implements Closeable {
          * Builds a new bulk processor.
          */
         public BulkProcessor build() {
-            return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, bulkSize, flushInterval,
-                    scheduler, onClose);
+            return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions,
+                bulkSize, flushInterval, scheduler, onClose, createBulkRequestWithGlobalDefaults());
+        }
+
+        private Supplier<BulkRequest> createBulkRequestWithGlobalDefaults() {
+            return () -> new BulkRequest(globalIndex, globalType)
+                .pipeline(globalPipeline)
+                .routing(globalRouting);
         }
     }
 
@@ -184,6 +215,7 @@ public class BulkProcessor implements Closeable {
     private final AtomicLong executionIdGen = new AtomicLong();
 
     private BulkRequest bulkRequest;
+    private final Supplier<BulkRequest> bulkRequestSupplier;
     private final BulkRequestHandler bulkRequestHandler;
     private final Runnable onClose;
 
@@ -191,10 +223,11 @@ public class BulkProcessor implements Closeable {
 
     BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
                   int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
-                  Scheduler scheduler, Runnable onClose) {
+                  Scheduler scheduler, Runnable onClose, Supplier<BulkRequest> bulkRequestSupplier) {
         this.bulkActions = bulkActions;
         this.bulkSize = bulkSize.getBytes();
-        this.bulkRequest = new BulkRequest();
+        this.bulkRequest = bulkRequestSupplier.get();
+        this.bulkRequestSupplier = bulkRequestSupplier;
         this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests);
         // Start period flushing task after everything is setup
         this.cancellableFlushTask = startFlushTask(flushInterval, scheduler);
@@ -335,7 +368,7 @@ public class BulkProcessor implements Closeable {
         final BulkRequest bulkRequest = this.bulkRequest;
         final long executionId = executionIdGen.incrementAndGet();
 
-        this.bulkRequest = new BulkRequest();
+        this.bulkRequest = bulkRequestSupplier.get();
         this.bulkRequestHandler.execute(bulkRequest, executionId);
     }
 
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
index c10dcc02082..d2929a2dbc5 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
@@ -90,12 +90,21 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
     protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT;
     private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
     private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;
+    private String globalPipeline;
+    private String globalRouting;
+    private String globalIndex;
+    private String globalType;
 
     private long sizeInBytes = 0;
 
     public BulkRequest() {
     }
 
+    public BulkRequest(@Nullable String globalIndex, @Nullable String globalType) {
+        this.globalIndex = globalIndex;
+        this.globalType = globalType;
+    }
+
     /**
      * Adds a list of requests to be executed. Either index or delete requests.
      */
@@ -154,6 +163,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
 
     BulkRequest internalAdd(IndexRequest request, @Nullable Object payload) {
         Objects.requireNonNull(request, "'request' must not be null");
+        applyGlobalMandatoryParameters(request);
+
         requests.add(request);
         addPayload(payload);
         // lack of source is validated in validate() method
@@ -175,6 +186,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
 
     BulkRequest internalAdd(UpdateRequest request, @Nullable Object payload) {
         Objects.requireNonNull(request, "'request' must not be null");
+        applyGlobalMandatoryParameters(request);
+
         requests.add(request);
         addPayload(payload);
         if (request.doc() != null) {
@@ -199,6 +212,8 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
 
     public BulkRequest add(DeleteRequest request, @Nullable Object payload) {
         Objects.requireNonNull(request, "'request' must not be null");
+        applyGlobalMandatoryParameters(request);
+
         requests.add(request);
         addPayload(payload);
         sizeInBytes += REQUEST_OVERHEAD;
@@ -327,13 +342,13 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
                 String index = defaultIndex;
                 String type = defaultType;
                 String id = null;
-                String routing = defaultRouting;
+                String routing = valueOrDefault(defaultRouting, globalRouting);
                 FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
                 String opType = null;
                 long version = Versions.MATCH_ANY;
                 VersionType versionType = VersionType.INTERNAL;
                 int retryOnConflict = 0;
-                String pipeline = defaultPipeline;
+                String pipeline = valueOrDefault(defaultPipeline, globalPipeline);
 
                 // at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
                 // or START_OBJECT which will have another set of parameters
@@ -503,6 +518,15 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
         return this;
     }
 
+    public final BulkRequest pipeline(String globalPipeline) {
+        this.globalPipeline = globalPipeline;
+        return this;
+    }
+
+    public final BulkRequest routing(String globalRouting){
+        this.globalRouting = globalRouting;
+        return this;
+    }
     /**
      * A timeout to wait if the index operation can't be performed immediately. Defaults to {@code 1m}.
      */
@@ -514,6 +538,14 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
         return timeout;
     }
 
+    public String pipeline() {
+        return globalPipeline;
+    }
+
+    public String routing() {
+        return globalRouting;
+    }
+
     private int findNextMarker(byte marker, int from, BytesReference data, int length) {
         for (int i = from; i < length; i++) {
             if (data.get(i) == marker) {
@@ -579,4 +611,15 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
         return "requests[" + requests.size() + "], indices[" + Strings.collectionToDelimitedString(indices, ", ") + "]";
     }
 
+    private void applyGlobalMandatoryParameters(DocWriteRequest<?> request) {
+        request.index(valueOrDefault(request.index(), globalIndex));
+        request.type(valueOrDefault(request.type(), globalType));
+    }
+
+    private static String valueOrDefault(String value, String globalDefault) {
+        if (Strings.isNullOrEmpty(value) && !Strings.isNullOrEmpty(globalDefault)) {
+            return globalDefault;
+        }
+        return value;
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java
index a577569476c..fc91f4f907e 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java
@@ -41,6 +41,10 @@ import org.elasticsearch.common.xcontent.XContentType;
 public class BulkRequestBuilder extends ActionRequestBuilder<BulkRequest, BulkResponse>
         implements WriteRequestBuilder<BulkRequestBuilder> {
 
+    public BulkRequestBuilder(ElasticsearchClient client, BulkAction action, @Nullable String globalIndex, @Nullable String globalType) {
+        super(client, action, new BulkRequest(globalIndex, globalType));
+    }
+
     public BulkRequestBuilder(ElasticsearchClient client, BulkAction action) {
         super(client, action, new BulkRequest());
     }
@@ -153,4 +157,14 @@ public class BulkRequestBuilder extends ActionRequestBuilder<BulkRequest, BulkRe
     public int numberOfActions() {
         return request.numberOfActions();
     }
+
+    public BulkRequestBuilder pipeline(String globalPipeline) {
+        request.pipeline(globalPipeline);
+        return this;
+    }
+
+    public BulkRequestBuilder routing(String globalRouting) {
+        request.routing(globalRouting);
+        return this;
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java
index ee9c863c011..165aa7afd9e 100644
--- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java
@@ -110,6 +110,7 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
     /**
      * Sets the type of the document to delete.
      */
+    @Override
     public DeleteRequest type(String type) {
         this.type = type;
         return this;
diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
index 157ec351110..8f5fd156018 100644
--- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
@@ -214,6 +214,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
     /**
      * Sets the type of the indexed document.
      */
+    @Override
     public IndexRequest type(String type) {
         this.type = type;
         return this;
diff --git a/server/src/main/java/org/elasticsearch/client/Client.java b/server/src/main/java/org/elasticsearch/client/Client.java
index f97f618347a..d2be1fba086 100644
--- a/server/src/main/java/org/elasticsearch/client/Client.java
+++ b/server/src/main/java/org/elasticsearch/client/Client.java
@@ -232,6 +232,11 @@ public interface Client extends ElasticsearchClient, Releasable {
      */
     BulkRequestBuilder prepareBulk();
 
+    /**
+     * Executes a bulk of index / delete operations with default index and/or type
+     */
+    BulkRequestBuilder prepareBulk(@Nullable String globalIndex, @Nullable String globalType);
+
     /**
      * Gets the document that was indexed from an index with a type and id.
      *
diff --git a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java
index 7f2e6681294..e5450c320f4 100644
--- a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java
+++ b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java
@@ -471,6 +471,11 @@ public abstract class AbstractClient extends AbstractComponent implements Client
         return new BulkRequestBuilder(this, BulkAction.INSTANCE);
     }
 
+    @Override
+    public BulkRequestBuilder prepareBulk(@Nullable String globalIndex, @Nullable String globalType) {
+        return new BulkRequestBuilder(this, BulkAction.INSTANCE, globalIndex, globalType);
+    }
+
     @Override
     public ActionFuture<GetResponse> get(final GetRequest request) {
         return execute(GetAction.INSTANCE, request);
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java
index 1fd912e72a4..3e61557869d 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java
@@ -20,22 +20,40 @@
 
 package org.elasticsearch.action.bulk;
 
+import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.admin.indices.alias.Alias;
 import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.ingest.PutPipelineRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.ingest.IngestTestPlugin;
+import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESIntegTestCase;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 
 public class BulkIntegrationIT extends ESIntegTestCase {
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Arrays.asList(IngestTestPlugin.class);
+    }
+
     public void testBulkIndexCreatesMapping() throws Exception {
         String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/bulk-log.json");
         BulkRequestBuilder bulkBuilder = client().prepareBulk();
@@ -81,4 +99,52 @@ public class BulkIntegrationIT extends ESIntegTestCase {
         assertFalse(bulkResponse.hasFailures());
         assertFalse(client().prepareGet("index3", "type", "id").setRouting("1").get().isExists());
     }
+
+    public void testBulkWithGlobalDefaults() throws Exception {
+        // all requests in the json are missing index and type parameters: "_index" : "test", "_type" : "type1",
+        String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk-missing-index-type.json");
+        {
+            BulkRequestBuilder bulkBuilder = client().prepareBulk();
+            bulkBuilder.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, null, XContentType.JSON);
+            ActionRequestValidationException ex = expectThrows(ActionRequestValidationException.class, bulkBuilder::get);
+
+            assertThat(ex.validationErrors(), containsInAnyOrder(
+                "index is missing",
+                "index is missing",
+                "index is missing",
+                "type is missing",
+                "type is missing",
+                "type is missing"));
+        }
+
+        {
+            createSamplePipeline("pipeline");
+            BulkRequestBuilder bulkBuilder = client().prepareBulk("test","type1")
+                .routing("routing")
+                .pipeline("pipeline");
+
+            bulkBuilder.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, null, XContentType.JSON);
+            BulkResponse bulkItemResponses = bulkBuilder.get();
+            assertFalse(bulkItemResponses.hasFailures());
+        }
+    }
+
+    private void createSamplePipeline(String pipelineId) throws IOException, ExecutionException, InterruptedException {
+        XContentBuilder pipeline = jsonBuilder()
+            .startObject()
+                .startArray("processors")
+                    .startObject()
+                        .startObject("test")
+                        .endObject()
+                    .endObject()
+                .endArray()
+            .endObject();
+
+        AcknowledgedResponse acknowledgedResponse = client().admin()
+            .cluster()
+            .putPipeline(new PutPipelineRequest(pipelineId, BytesReference.bytes(pipeline), XContentType.JSON))
+            .get();
+
+        assertTrue(acknowledgedResponse.isAcknowledged());
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
index 3fbfa381ad3..6a7d9bc02ec 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java
@@ -72,19 +72,9 @@ public class BulkProcessorTests extends ESTestCase {
         try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
             threadPool.getThreadContext().putHeader(headerKey, headerValue);
             threadPool.getThreadContext().putTransient(transientKey, transientValue);
-            bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), new BulkProcessor.Listener() {
-                @Override
-                public void beforeBulk(long executionId, BulkRequest request) {
-                }
-
-                @Override
-                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
-                }
-
-                @Override
-                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-                }
-            }, 1, bulkSize, new ByteSizeValue(5, ByteSizeUnit.MB), flushInterval, threadPool, () -> {});
+            bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), emptyListener(),
+                1, bulkSize, new ByteSizeValue(5, ByteSizeUnit.MB), flushInterval,
+                threadPool, () -> {}, BulkRequest::new);
         }
         assertNull(threadPool.getThreadContext().getHeader(headerKey));
         assertNull(threadPool.getThreadContext().getTransient(transientKey));
@@ -100,28 +90,32 @@ public class BulkProcessorTests extends ESTestCase {
         bulkProcessor.close();
     }
 
+
     public void testAwaitOnCloseCallsOnClose() throws Exception {
         final AtomicBoolean called = new AtomicBoolean(false);
-        BulkProcessor bulkProcessor = new BulkProcessor((request, listener) -> {
-        }, BackoffPolicy.noBackoff(), new BulkProcessor.Listener() {
-            @Override
-            public void beforeBulk(long executionId, BulkRequest request) {
-
-            }
-
-            @Override
-            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
-
-            }
-
-            @Override
-            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
-
-            }
-        }, 0, 10, new ByteSizeValue(1000), null, (delay, executor, command) -> null, () -> called.set(true));
+        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {};
+        BulkProcessor bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), emptyListener(),
+            0, 10, new ByteSizeValue(1000), null,
+            (delay, executor, command) -> null, () -> called.set(true), BulkRequest::new);
 
         assertFalse(called.get());
         bulkProcessor.awaitClose(100, TimeUnit.MILLISECONDS);
         assertTrue(called.get());
     }
+
+    private BulkProcessor.Listener emptyListener() {
+        return new BulkProcessor.Listener() {
+            @Override
+            public void beforeBulk(long executionId, BulkRequest request) {
+            }
+
+            @Override
+            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+            }
+
+            @Override
+            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+            }
+        };
+    }
 }
diff --git a/server/src/test/resources/org/elasticsearch/action/bulk/simple-bulk-missing-index-type.json b/server/src/test/resources/org/elasticsearch/action/bulk/simple-bulk-missing-index-type.json
new file mode 100644
index 00000000000..2edb45742b7
--- /dev/null
+++ b/server/src/test/resources/org/elasticsearch/action/bulk/simple-bulk-missing-index-type.json
@@ -0,0 +1,5 @@
+{ "index":{"_id":"1"} }
+{ "field1" : "value1" }
+{ "delete" : {  "_id" : "2" } }
+{ "create" : {  "_id" : "3" } }
+{ "field1" : "value3" }
diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java
index 65ed746acca..1a7e1c16f7b 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java
@@ -58,6 +58,7 @@ import org.elasticsearch.search.suggest.Suggest;
 import org.elasticsearch.test.NotEqualMessageBuilder;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.Matcher;
+import org.hamcrest.core.CombinableMatcher;
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -70,6 +71,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -472,6 +474,14 @@ public class ElasticsearchAssertions {
         return new ElasticsearchMatchers.SearchHitHasScoreMatcher(score);
     }
 
+    public static <T, V> CombinableMatcher<T> hasProperty(Function<? super T, ? extends V> property, Matcher<V> valueMatcher) {
+        return ElasticsearchMatchers.HasPropertyLambdaMatcher.hasProperty(property, valueMatcher);
+    }
+
+    public static Function<SearchHit, Object> fieldFromSource(String fieldName) {
+        return (response) ->  response.getSourceAsMap().get(fieldName);
+    }
+
     public static <T extends Query> T assertBooleanSubQuery(Query query, Class<T> subqueryType, int i) {
         assertThat(query, instanceOf(BooleanQuery.class));
         BooleanQuery q = (BooleanQuery) query;
diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchMatchers.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchMatchers.java
index f49cc3bd39e..33320586481 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchMatchers.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchMatchers.java
@@ -20,7 +20,12 @@ package org.elasticsearch.test.hamcrest;
 
 import org.elasticsearch.search.SearchHit;
 import org.hamcrest.Description;
+import org.hamcrest.FeatureMatcher;
+import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeMatcher;
+import org.hamcrest.core.CombinableMatcher;
+
+import java.util.function.Function;
 
 public class ElasticsearchMatchers {
 
@@ -115,4 +120,27 @@ public class ElasticsearchMatchers {
             description.appendText("searchHit score should be ").appendValue(score);
         }
     }
+
+    public static class HasPropertyLambdaMatcher<T, V> extends FeatureMatcher<T, V> {
+
+        private final Function<? super T, ? extends V> property;
+
+        private HasPropertyLambdaMatcher(Matcher<? super V> subMatcher, Function<? super T, ? extends V> property) {
+            super(subMatcher, "object with", "lambda");
+            this.property = property;
+        }
+
+        @Override
+        protected V featureValueOf(T actual) {
+            return property.apply(actual);
+        }
+
+        /**
+         * @param valueMatcher The matcher to apply to the property
+         * @param property     The lambda to fetch property
+         */
+        public static <T, V> CombinableMatcher<T> hasProperty(Function<? super T, ? extends V> property, Matcher<V> valueMatcher) {
+            return new CombinableMatcher<>(new HasPropertyLambdaMatcher<>(valueMatcher, property));
+        }
+    }
 }