Fix failures in BulkProcessorIT#testGlobalParametersAndBulkProcessor. (#38129)

This PR fixes a couple test issues:
* It narrows an assertWarnings call that was too broad, and wasn't always
  applicable with certain random sequences.
* Previously, we could send a typeless bulk request containing '_type: 'null'.
  Now we omit the _type key altogether for typeless requests.
This commit is contained in:
Julie Tibshirani 2019-02-05 08:42:37 -08:00 committed by GitHub
parent c9701be1e8
commit 440d1eda8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 35 additions and 27 deletions

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.rest.action.document.RestBulkAction;
@ -75,12 +76,12 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase {
(request, bulkListener) -> highLevelClient().bulkAsync(request, RequestOptions.DEFAULT,
bulkListener), listener);
}
private static BulkProcessor.Builder initBulkProcessorBuilderUsingTypes(BulkProcessor.Listener listener) {
return BulkProcessor.builder(
(request, bulkListener) -> highLevelClient().bulkAsync(request, expectWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE),
bulkListener), listener);
}
}
public void testThatBulkProcessorCountIsCorrect() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
@ -383,14 +384,14 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase {
.build()) {
indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
latch.await();
assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs, localType);
Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));
assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType(localType))));
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
@ -398,7 +399,7 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase {
}
{
//Check that untyped document additions and untyped global inherit the established custom type
// (the custom document type introduced to the mapping by the earlier code in this test)
// (the custom document type introduced to the mapping by the earlier code in this test)
String globalType = null;
String localType = null;
final CountDownLatch latch = new CountDownLatch(1);
@ -414,20 +415,19 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase {
.build()) {
indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
latch.await();
assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs, MapperService.SINGLE_MAPPING_NAME);
Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));
assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType(customType))));
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
}
}
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
}
}
@SuppressWarnings("unchecked")
@ -438,8 +438,8 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase {
.<Matcher<SearchHit>>toArray(Matcher[]::new);
}
private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType,
String globalIndex, String globalType, String globalPipeline) throws Exception {
private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType,
String globalIndex, String globalType, String globalPipeline) throws Exception {
MultiGetRequest multiGetRequest = new MultiGetRequest();
for (int i = 1; i <= numDocs; i++) {
if (randomBoolean()) {
@ -448,6 +448,11 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase {
} else {
BytesArray data = bytesBulkRequest(localIndex, localType, i);
processor.add(data, globalIndex, globalType, globalPipeline, null, XContentType.JSON);
if (localType != null) {
// If the payload contains types, parsing it into a bulk request results in a warning.
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
}
}
multiGetRequest.add(localIndex, Integer.toString(i));
}
@ -455,26 +460,29 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase {
}
private static BytesArray bytesBulkRequest(String localIndex, String localType, int id) throws IOException {
String action = Strings.toString(jsonBuilder()
.startObject()
.startObject("index")
.field("_index", localIndex)
.field("_type", localType)
.field("_id", Integer.toString(id))
.endObject()
.endObject()
);
String source = Strings.toString(jsonBuilder()
XContentBuilder action = jsonBuilder().startObject().startObject("index");
if (localIndex != null) {
action.field("_index", localIndex);
}
if (localType != null) {
action.field("_type", localType);
}
action.field("_id", Integer.toString(id));
action.endObject().endObject();
XContentBuilder source = jsonBuilder()
.startObject()
.field("field", randomRealisticUnicodeOfLengthBetween(1, 30))
.endObject()
);
.endObject();
String request = action + "\n" + source + "\n";
String request = Strings.toString(action) + "\n" + Strings.toString(source) + "\n";
return new BytesArray(request);
}
private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
return indexDocs(processor, numDocs, "test", null, null, null, null);
}