From b0564cb75c126d9b1229239f734bef2147e86555 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 27 Jul 2015 13:52:51 -0400 Subject: [PATCH 1/9] Add integration test for Azure snapshot repository bug This commit adds an integration test that replicates the Azure snapshot repository bug in elastic/elasticsearch-cloud-azure#51. Closes elastic/elasticsearch-cloud-azure#100 --- .../azure/AzureSnapshotRestoreITest.java | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/plugins/cloud-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreITest.java b/plugins/cloud-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreITest.java index debb6f41463..876d1202cb2 100644 --- a/plugins/cloud-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreITest.java +++ b/plugins/cloud-azure/src/test/java/org/elasticsearch/repositories/azure/AzureSnapshotRestoreITest.java @@ -173,6 +173,66 @@ public class AzureSnapshotRestoreITest extends AbstractAzureTest { assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false)); } + /** + * For issue #51: https://github.com/elasticsearch/elasticsearch-cloud-azure/issues/51 + */ + @Test + public void testMultipleSnapshots() throws URISyntaxException, StorageException { + final String indexName = "test-idx-1"; + final String typeName = "doc"; + final String repositoryName = "test-repo"; + final String snapshot1Name = "test-snap-1"; + final String snapshot2Name = "test-snap-2"; + + Client client = client(); + + logger.info("creating index [{}]", indexName); + createIndex(indexName); + ensureGreen(); + + logger.info("indexing first document"); + index(indexName, typeName, Integer.toString(1), "foo", "bar " + Integer.toString(1)); + refresh(); + assertThat(client.prepareCount(indexName).get().getCount(), equalTo(1L)); + + logger.info("creating Azure repository with path [{}]", getRepositoryPath()); + PutRepositoryResponse putRepositoryResponse = 
client.admin().cluster().preparePutRepository(repositoryName) + .setType("azure").setSettings(Settings.settingsBuilder() + .put(Repository.CONTAINER, getContainerName()) + .put(Repository.BASE_PATH, getRepositoryPath()) + .put(Repository.CHUNK_SIZE, randomIntBetween(1000, 10000), ByteSizeUnit.BYTES) + ).get(); + assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); + + logger.info("creating snapshot [{}]", snapshot1Name); + CreateSnapshotResponse createSnapshotResponse1 = client.admin().cluster().prepareCreateSnapshot(repositoryName, snapshot1Name).setWaitForCompletion(true).setIndices(indexName).get(); + assertThat(createSnapshotResponse1.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse1.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse1.getSnapshotInfo().totalShards())); + + assertThat(client.admin().cluster().prepareGetSnapshots(repositoryName).setSnapshots(snapshot1Name).get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + + logger.info("indexing second document"); + index(indexName, typeName, Integer.toString(2), "foo", "bar " + Integer.toString(2)); + refresh(); + assertThat(client.prepareCount(indexName).get().getCount(), equalTo(2L)); + + logger.info("creating snapshot [{}]", snapshot2Name); + CreateSnapshotResponse createSnapshotResponse2 = client.admin().cluster().prepareCreateSnapshot(repositoryName, snapshot2Name).setWaitForCompletion(true).setIndices(indexName).get(); + assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse2.getSnapshotInfo().totalShards())); + + assertThat(client.admin().cluster().prepareGetSnapshots(repositoryName).setSnapshots(snapshot2Name).get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + + logger.info("closing index [{}]", indexName); + 
client.admin().indices().prepareClose(indexName).get(); + + logger.info("attempting restore from snapshot [{}]", snapshot1Name); + RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot(repositoryName, snapshot1Name).setWaitForCompletion(true).execute().actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + ensureGreen(); + assertThat(client.prepareCount(indexName).get().getCount(), equalTo(1L)); + } + @Test public void testMultipleRepositories() { Client client = client(); From d7491515b21fb4b3c94956c75bcb74b8a5c863ae Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 27 Jul 2015 15:36:34 -0400 Subject: [PATCH 2/9] _all: Stop NPE querying _all when it doesn't exist This can happen in two ways: 1. The _all field is disabled. 2. There are documents in the index, the _all field is enabled, but there are no fields in any of the documents. In both of these cases we now rewrite the query to a MatchNoDocsQuery which should be safe because there isn't anything to match. 
Closes #12439 --- .../common/lucene/all/AllTermQuery.java | 14 +++++++++++--- .../search/query/SearchQueryTests.java | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/lucene/all/AllTermQuery.java b/core/src/main/java/org/elasticsearch/common/lucene/all/AllTermQuery.java index f586d152524..0aa7bf0f05c 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/all/AllTermQuery.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/all/AllTermQuery.java @@ -25,6 +25,7 @@ import org.apache.lucene.index.PostingsEnum; import org.apache.lucene.index.Term; import org.apache.lucene.index.Terms; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.payloads.AveragePayloadFunction; @@ -124,14 +125,21 @@ public final class AllTermQuery extends PayloadTermQuery { @Override public Query rewrite(IndexReader reader) throws IOException { + boolean fieldExists = false; boolean hasPayloads = false; for (LeafReaderContext context : reader.leaves()) { final Terms terms = context.reader().terms(term.field()); - if (terms.hasPayloads()) { - hasPayloads = true; - break; + if (terms != null) { + fieldExists = true; + if (terms.hasPayloads()) { + hasPayloads = true; + break; + } } } + if (fieldExists == false) { + return new MatchNoDocsQuery(); + } if (hasPayloads == false) { TermQuery rewritten = new TermQuery(term); rewritten.setBoost(getBoost()); diff --git a/core/src/test/java/org/elasticsearch/search/query/SearchQueryTests.java b/core/src/test/java/org/elasticsearch/search/query/SearchQueryTests.java index bbd2ebc30f3..c539db49a3b 100644 --- a/core/src/test/java/org/elasticsearch/search/query/SearchQueryTests.java +++ b/core/src/test/java/org/elasticsearch/search/query/SearchQueryTests.java @@ -1867,6 +1867,24 @@ public class 
SearchQueryTests extends ElasticsearchIntegrationTest { assertHitCount(searchResponse, 1l); } + @Test + public void testAllFieldEmptyMapping() throws Exception { + client().prepareIndex("myindex", "mytype").setId("1").setSource("{}").setRefresh(true).get(); + SearchResponse response = client().prepareSearch("myindex").setQuery(matchQuery("_all", "foo")).get(); + assertNoFailures(response); + } + + @Test + public void testAllDisabledButQueried() throws Exception { + createIndex("myindex"); + assertAcked(client().admin().indices().preparePutMapping("myindex").setType("mytype").setSource( + jsonBuilder().startObject().startObject("mytype").startObject("_all").field("enabled", false))); + client().prepareIndex("myindex", "mytype").setId("1").setSource("bar", "foo").setRefresh(true).get(); + SearchResponse response = client().prepareSearch("myindex").setQuery(matchQuery("_all", "foo")).get(); + assertNoFailures(response); + assertHitCount(response, 0); + } + @Test public void testIndicesQuery() throws Exception { createIndex("index1", "index2", "index3"); From 2ebf229632a1f58690fa5e24383331ff93be35c1 Mon Sep 17 00:00:00 2001 From: Colin Goodheart-Smithe Date: Tue, 28 Jul 2015 14:07:06 +0100 Subject: [PATCH 3/9] Aggregations: Fixes serialization of HDRHistogram in percentiles aggregations Previously we would write the entire ByteBuffer to the stream to serialise the HDRHistogram even if it was not all needed. Now we only write the bytes that are actually written to in the ByteBuffer. 
--- .../hdr/AbstractInternalHDRPercentiles.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/hdr/AbstractInternalHDRPercentiles.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/hdr/AbstractInternalHDRPercentiles.java index 0549774ec5a..ddc0b0f92ad 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/hdr/AbstractInternalHDRPercentiles.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/hdr/AbstractInternalHDRPercentiles.java @@ -89,7 +89,10 @@ abstract class AbstractInternalHDRPercentiles extends InternalNumericMetricsAggr keys[i] = in.readDouble(); } long minBarForHighestToLowestValueRatio = in.readLong(); - ByteBuffer stateBuffer = ByteBuffer.wrap(in.readByteArray()); + final int serializedLen = in.readVInt(); + byte[] bytes = new byte[serializedLen]; + in.readBytes(bytes, 0, serializedLen); + ByteBuffer stateBuffer = ByteBuffer.wrap(bytes); try { state = DoubleHistogram.decodeFromCompressedByteBuffer(stateBuffer, minBarForHighestToLowestValueRatio); } catch (DataFormatException e) { @@ -107,8 +110,9 @@ abstract class AbstractInternalHDRPercentiles extends InternalNumericMetricsAggr } out.writeLong(state.getHighestToLowestValueRatio()); ByteBuffer stateBuffer = ByteBuffer.allocate(state.getNeededByteBufferCapacity()); - state.encodeIntoCompressedByteBuffer(stateBuffer); - out.writeByteArray(stateBuffer.array()); + final int serializedLen = state.encodeIntoCompressedByteBuffer(stateBuffer); + out.writeVInt(serializedLen); + out.writeBytes(stateBuffer.array(), 0, serializedLen); out.writeBoolean(keyed); } From adcd1fc11da5b3ac00d7a6c6c6a0b83fb461e683 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Tue, 28 Jul 2015 09:25:06 -0400 Subject: [PATCH 4/9] _all: Add missing boost When we rewrite to a MatchNoTermsQuery we were throwing out the boost which 
could lead to funky changes when the query against _all was in a bool query. --- .../org/elasticsearch/common/lucene/all/AllTermQuery.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/common/lucene/all/AllTermQuery.java b/core/src/main/java/org/elasticsearch/common/lucene/all/AllTermQuery.java index 0aa7bf0f05c..a28d635a04e 100644 --- a/core/src/main/java/org/elasticsearch/common/lucene/all/AllTermQuery.java +++ b/core/src/main/java/org/elasticsearch/common/lucene/all/AllTermQuery.java @@ -138,7 +138,9 @@ public final class AllTermQuery extends PayloadTermQuery { } } if (fieldExists == false) { - return new MatchNoDocsQuery(); + Query rewritten = new MatchNoDocsQuery(); + rewritten.setBoost(getBoost()); + return rewritten; } if (hasPayloads == false) { TermQuery rewritten = new TermQuery(term); From af17b9cd40cf610b71c6e67c72a5f8e96a67fdac Mon Sep 17 00:00:00 2001 From: Dave Parfitt Date: Tue, 28 Jul 2015 09:43:26 -0400 Subject: [PATCH 5/9] Use underscores for date formats @clintongormley Closes #12429 --- .../index/mapper/core/DateFieldMapper.java | 4 ++-- .../index/mapper/date/SimpleDateMappingTests.java | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/mapper/core/DateFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/core/DateFieldMapper.java index c14b6b2ae11..9e24b50cc04 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/core/DateFieldMapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/core/DateFieldMapper.java @@ -72,8 +72,8 @@ public class DateFieldMapper extends NumberFieldMapper { public static final String CONTENT_TYPE = "date"; public static class Defaults extends NumberFieldMapper.Defaults { - public static final FormatDateTimeFormatter DATE_TIME_FORMATTER = Joda.forPattern("strictDateOptionalTime||epoch_millis", Locale.ROOT); - public static final FormatDateTimeFormatter 
DATE_TIME_FORMATTER_BEFORE_2_0 = Joda.forPattern("dateOptionalTime", Locale.ROOT); + public static final FormatDateTimeFormatter DATE_TIME_FORMATTER = Joda.forPattern("strict_date_optional_time||epoch_millis", Locale.ROOT); + public static final FormatDateTimeFormatter DATE_TIME_FORMATTER_BEFORE_2_0 = Joda.forPattern("date_optional_time", Locale.ROOT); public static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS; public static final DateFieldType FIELD_TYPE = new DateFieldType(); diff --git a/core/src/test/java/org/elasticsearch/index/mapper/date/SimpleDateMappingTests.java b/core/src/test/java/org/elasticsearch/index/mapper/date/SimpleDateMappingTests.java index 8db412ffc83..ebc638b33f1 100644 --- a/core/src/test/java/org/elasticsearch/index/mapper/date/SimpleDateMappingTests.java +++ b/core/src/test/java/org/elasticsearch/index/mapper/date/SimpleDateMappingTests.java @@ -495,7 +495,7 @@ public class SimpleDateMappingTests extends ElasticsearchSingleNodeTest { Version randomVersion = VersionUtils.randomVersionBetween(getRandom(), Version.V_0_90_0, Version.V_1_6_1); IndexService index = createIndex("test", settingsBuilder().put(IndexMetaData.SETTING_VERSION_CREATED, randomVersion).build()); client().admin().indices().preparePutMapping("test").setType("type").setSource(mapping).get(); - assertDateFormat("epoch_millis||dateOptionalTime"); + assertDateFormat("epoch_millis||date_optional_time"); DocumentMapper defaultMapper = index.mapperService().documentMapper("type"); defaultMapper.parse("test", "type", "1", XContentFactory.jsonBuilder() @@ -543,13 +543,13 @@ public class SimpleDateMappingTests extends ElasticsearchSingleNodeTest { public void testThatUpgradingAnOlderIndexToStrictDateWorks() throws Exception { String mapping = XContentFactory.jsonBuilder().startObject().startObject("type") - .startObject("properties").startObject("date_field").field("type", "date").field("format", "dateOptionalTime").endObject().endObject() + 
.startObject("properties").startObject("date_field").field("type", "date").field("format", "date_optional_time").endObject().endObject() .endObject().endObject().string(); Version randomVersion = VersionUtils.randomVersionBetween(getRandom(), Version.V_0_90_0, Version.V_1_6_1); createIndex("test", settingsBuilder().put(IndexMetaData.SETTING_VERSION_CREATED, randomVersion).build()); client().admin().indices().preparePutMapping("test").setType("type").setSource(mapping).get(); - assertDateFormat("epoch_millis||dateOptionalTime"); + assertDateFormat("epoch_millis||date_optional_time"); // index doc client().prepareIndex("test", "type", "1").setSource(XContentFactory.jsonBuilder() @@ -561,12 +561,12 @@ public class SimpleDateMappingTests extends ElasticsearchSingleNodeTest { String newMapping = XContentFactory.jsonBuilder().startObject().startObject("type") .startObject("properties").startObject("date_field") .field("type", "date") - .field("format", "strictDateOptionalTime||epoch_millis") + .field("format", "strict_date_optional_time||epoch_millis") .endObject().endObject().endObject().endObject().string(); PutMappingResponse putMappingResponse = client().admin().indices().preparePutMapping("test").setType("type").setSource(newMapping).get(); assertThat(putMappingResponse.isAcknowledged(), is(true)); - assertDateFormat("strictDateOptionalTime||epoch_millis"); + assertDateFormat("strict_date_optional_time||epoch_millis"); } private void assertDateFormat(String expectedFormat) throws IOException { From 3f6e470810f558d1bf70894c35c81b42957d933f Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 28 Jul 2015 21:38:58 +0200 Subject: [PATCH 6/9] top_hits: If topN (based on `offset` + `size`) is higher than the maxDoc of an shard then normalize topN to maxDoc. 
Closes #12510 --- .../metrics/tophits/TopHitsAggregator.java | 3 +++ .../aggregations/bucket/TopHitsTests.java | 17 +++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsAggregator.java index 8433022b6c6..82dea4806a1 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsAggregator.java @@ -117,6 +117,9 @@ public class TopHitsAggregator extends MetricsAggregator { if (collectors == null) { Sort sort = subSearchContext.sort(); int topN = subSearchContext.from() + subSearchContext.size(); + // In the QueryPhase we don't need this protection, because it is built into the IndexSearcher, + // but here we create collectors ourselves and we need to prevent OOM because of a crazy offset and size. + topN = Math.min(topN, subSearchContext.searcher().getIndexReader().maxDoc()); TopDocsCollector topLevelCollector = sort != null ? 
TopFieldCollector.create(sort, topN, true, subSearchContext.trackScores(), subSearchContext.trackScores()) : TopScoreDocCollector.create(topN); collectors = new TopDocsAndLeafCollector(topLevelCollector); collectors.leafCollector = collectors.topLevelCollector.getLeafCollector(ctx); diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/TopHitsTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/TopHitsTests.java index 1f0a47522cd..f3ddb1ff284 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/TopHitsTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/TopHitsTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.aggregations.bucket; import org.apache.lucene.search.Explanation; +import org.apache.lucene.util.ArrayUtil; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchResponse; @@ -928,4 +929,20 @@ public class TopHitsTests extends ElasticsearchIntegrationTest { } } } + + @Test + public void testDontExplode() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .setTypes("type") + .addAggregation(terms("terms") + .executionHint(randomExecutionHint()) + .field(TERMS_AGGS_FIELD) + .subAggregation( + topHits("hits").setSize(ArrayUtil.MAX_ARRAY_LENGTH - 1).addSort(SortBuilders.fieldSort(SORT_FIELD).order(SortOrder.DESC)) + ) + ) + .get(); + assertNoFailures(response); + } } From 37177d2f4c4fe3f8dbf4b53cf249931127281e64 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 28 Jul 2015 15:44:55 -0400 Subject: [PATCH 7/9] Use consistent check whether or not timeout is set --- .../org/elasticsearch/search/internal/ContextIndexSearcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java 
b/core/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java index 2144d13a123..dd7489cc7a5 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java @@ -176,7 +176,7 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable { @Override public void search(List leaves, Weight weight, Collector collector) throws IOException { - final boolean timeoutSet = searchContext.timeoutInMillis() != -1; + final boolean timeoutSet = searchContext.timeoutInMillis() != SearchService.NO_TIMEOUT.millis(); final boolean terminateAfterSet = searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER; try { if (timeoutSet || terminateAfterSet) { From 2dc4550839f44fc93c6006ca361f13b33a7eb1d9 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Mon, 27 Jul 2015 11:52:51 -0600 Subject: [PATCH 8/9] Add more debugging information to the Awareness Decider Relates to #12431 --- .../allocation/decider/AwarenessAllocationDecider.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java index a5d8e31e96c..218b58f8241 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AwarenessAllocationDecider.java @@ -234,7 +234,9 @@ public class AwarenessAllocationDecider extends AllocationDecider { int currentNodeCount = shardPerAttribute.get(node.node().attributes().get(awarenessAttribute)); // if we are above with leftover, then we know we are not good, even with mod if (currentNodeCount > (requiredCountPerAttribute + leftoverPerAttribute)) { - return 
allocation.decision(Decision.NO, NAME, "too many shards on nodes for attribute: [%s]", awarenessAttribute); + return allocation.decision(Decision.NO, NAME, + "too many shards on node for attribute: [%s], required per attribute: [%d], node count: [%d], leftover: [%d]", + awarenessAttribute, requiredCountPerAttribute, currentNodeCount, leftoverPerAttribute); } // all is well, we are below or same as average if (currentNodeCount <= requiredCountPerAttribute) { From 62c4abd14c4bfea709eeb5bc943af127d69b552e Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 28 Jul 2015 14:19:45 -0600 Subject: [PATCH 9/9] Added an import statement. --- docs/java-api/docs/bulk.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/java-api/docs/bulk.asciidoc b/docs/java-api/docs/bulk.asciidoc index d615ba95cfe..6890f7c49d3 100644 --- a/docs/java-api/docs/bulk.asciidoc +++ b/docs/java-api/docs/bulk.asciidoc @@ -50,6 +50,7 @@ To use it, first create a `BulkProcessor` instance: import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; BulkProcessor bulkProcessor = BulkProcessor.builder( client, <1>