[Type removal] Remove type from BulkRequestParser (#3423)
* [Type removal] Remove type handling in bulk request parser Signed-off-by: Suraj Singh <surajrider@gmail.com> * [Type removal] Remove testTypesStillParsedForBulkMonitoring as it is no longer present in codebase Signed-off-by: Suraj Singh <surajrider@gmail.com>
This commit is contained in:
parent
a023ad9cba
commit
eb847aeeef
|
@ -287,7 +287,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
|
||||||
String routing = valueOrDefault(defaultRouting, globalRouting);
|
String routing = valueOrDefault(defaultRouting, globalRouting);
|
||||||
String pipeline = valueOrDefault(defaultPipeline, globalPipeline);
|
String pipeline = valueOrDefault(defaultPipeline, globalPipeline);
|
||||||
Boolean requireAlias = valueOrDefault(defaultRequireAlias, globalRequireAlias);
|
Boolean requireAlias = valueOrDefault(defaultRequireAlias, globalRequireAlias);
|
||||||
new BulkRequestParser(true).parse(
|
new BulkRequestParser().parse(
|
||||||
data,
|
data,
|
||||||
defaultIndex,
|
defaultIndex,
|
||||||
routing,
|
routing,
|
||||||
|
@ -296,7 +296,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
|
||||||
requireAlias,
|
requireAlias,
|
||||||
allowExplicitIndex,
|
allowExplicitIndex,
|
||||||
xContentType,
|
xContentType,
|
||||||
(indexRequest, type) -> internalAdd(indexRequest),
|
this::internalAdd,
|
||||||
this::internalAdd,
|
this::internalAdd,
|
||||||
this::add
|
this::add
|
||||||
);
|
);
|
||||||
|
|
|
@ -53,7 +53,6 @@ import org.opensearch.search.fetch.subphase.FetchSourceContext;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.function.BiConsumer;
|
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
@ -67,7 +66,6 @@ import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM
|
||||||
public final class BulkRequestParser {
|
public final class BulkRequestParser {
|
||||||
|
|
||||||
private static final ParseField INDEX = new ParseField("_index");
|
private static final ParseField INDEX = new ParseField("_index");
|
||||||
private static final ParseField TYPE = new ParseField("_type");
|
|
||||||
private static final ParseField ID = new ParseField("_id");
|
private static final ParseField ID = new ParseField("_id");
|
||||||
private static final ParseField ROUTING = new ParseField("routing");
|
private static final ParseField ROUTING = new ParseField("routing");
|
||||||
private static final ParseField OP_TYPE = new ParseField("op_type");
|
private static final ParseField OP_TYPE = new ParseField("op_type");
|
||||||
|
@ -80,17 +78,6 @@ public final class BulkRequestParser {
|
||||||
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
|
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
|
||||||
private static final ParseField REQUIRE_ALIAS = new ParseField(DocWriteRequest.REQUIRE_ALIAS);
|
private static final ParseField REQUIRE_ALIAS = new ParseField(DocWriteRequest.REQUIRE_ALIAS);
|
||||||
|
|
||||||
// TODO: Remove this parameter once the BulkMonitoring endpoint has been removed
|
|
||||||
private final boolean errorOnType;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new parser.
|
|
||||||
* @param errorOnType whether to allow _type information in the index line; used by BulkMonitoring
|
|
||||||
*/
|
|
||||||
public BulkRequestParser(boolean errorOnType) {
|
|
||||||
this.errorOnType = errorOnType;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static int findNextMarker(byte marker, int from, BytesReference data) {
|
private static int findNextMarker(byte marker, int from, BytesReference data) {
|
||||||
final int res = data.indexOf(marker, from);
|
final int res = data.indexOf(marker, from);
|
||||||
if (res != -1) {
|
if (res != -1) {
|
||||||
|
@ -136,7 +123,7 @@ public final class BulkRequestParser {
|
||||||
@Nullable Boolean defaultRequireAlias,
|
@Nullable Boolean defaultRequireAlias,
|
||||||
boolean allowExplicitIndex,
|
boolean allowExplicitIndex,
|
||||||
XContentType xContentType,
|
XContentType xContentType,
|
||||||
BiConsumer<IndexRequest, String> indexRequestConsumer,
|
Consumer<IndexRequest> indexRequestConsumer,
|
||||||
Consumer<UpdateRequest> updateRequestConsumer,
|
Consumer<UpdateRequest> updateRequestConsumer,
|
||||||
Consumer<DeleteRequest> deleteRequestConsumer
|
Consumer<DeleteRequest> deleteRequestConsumer
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
|
@ -192,7 +179,6 @@ public final class BulkRequestParser {
|
||||||
String action = parser.currentName();
|
String action = parser.currentName();
|
||||||
|
|
||||||
String index = defaultIndex;
|
String index = defaultIndex;
|
||||||
String type = null;
|
|
||||||
String id = null;
|
String id = null;
|
||||||
String routing = defaultRouting;
|
String routing = defaultRouting;
|
||||||
FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
|
FetchSourceContext fetchSourceContext = defaultFetchSourceContext;
|
||||||
|
@ -205,7 +191,7 @@ public final class BulkRequestParser {
|
||||||
String pipeline = defaultPipeline;
|
String pipeline = defaultPipeline;
|
||||||
boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;
|
boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;
|
||||||
|
|
||||||
// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
|
// at this stage, next token can either be END_OBJECT (and use default index with auto generated id)
|
||||||
// or START_OBJECT which will have another set of parameters
|
// or START_OBJECT which will have another set of parameters
|
||||||
token = parser.nextToken();
|
token = parser.nextToken();
|
||||||
|
|
||||||
|
@ -220,13 +206,6 @@ public final class BulkRequestParser {
|
||||||
throw new IllegalArgumentException("explicit index in bulk is not allowed");
|
throw new IllegalArgumentException("explicit index in bulk is not allowed");
|
||||||
}
|
}
|
||||||
index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
|
index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
|
||||||
} else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
|
|
||||||
if (errorOnType) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
"Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
type = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
|
|
||||||
} else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
|
} else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
|
||||||
id = parser.text();
|
id = parser.text();
|
||||||
} else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) {
|
} else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) {
|
||||||
|
@ -322,8 +301,7 @@ public final class BulkRequestParser {
|
||||||
.setIfSeqNo(ifSeqNo)
|
.setIfSeqNo(ifSeqNo)
|
||||||
.setIfPrimaryTerm(ifPrimaryTerm)
|
.setIfPrimaryTerm(ifPrimaryTerm)
|
||||||
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
|
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
|
||||||
.setRequireAlias(requireAlias),
|
.setRequireAlias(requireAlias)
|
||||||
type
|
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
indexRequestConsumer.accept(
|
indexRequestConsumer.accept(
|
||||||
|
@ -336,8 +314,7 @@ public final class BulkRequestParser {
|
||||||
.setIfSeqNo(ifSeqNo)
|
.setIfSeqNo(ifSeqNo)
|
||||||
.setIfPrimaryTerm(ifPrimaryTerm)
|
.setIfPrimaryTerm(ifPrimaryTerm)
|
||||||
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
|
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
|
||||||
.setRequireAlias(requireAlias),
|
.setRequireAlias(requireAlias)
|
||||||
type
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
} else if ("create".equals(action)) {
|
} else if ("create".equals(action)) {
|
||||||
|
@ -351,8 +328,7 @@ public final class BulkRequestParser {
|
||||||
.setIfSeqNo(ifSeqNo)
|
.setIfSeqNo(ifSeqNo)
|
||||||
.setIfPrimaryTerm(ifPrimaryTerm)
|
.setIfPrimaryTerm(ifPrimaryTerm)
|
||||||
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
|
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
|
||||||
.setRequireAlias(requireAlias),
|
.setRequireAlias(requireAlias)
|
||||||
type
|
|
||||||
);
|
);
|
||||||
} else if ("update".equals(action)) {
|
} else if ("update".equals(action)) {
|
||||||
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
|
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
|
||||||
|
|
|
@ -47,9 +47,9 @@ public class BulkRequestParserTests extends OpenSearchTestCase {
|
||||||
|
|
||||||
public void testIndexRequest() throws IOException {
|
public void testIndexRequest() throws IOException {
|
||||||
BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}\n");
|
BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}\n");
|
||||||
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
|
BulkRequestParser parser = new BulkRequestParser();
|
||||||
final AtomicBoolean parsed = new AtomicBoolean();
|
final AtomicBoolean parsed = new AtomicBoolean();
|
||||||
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
|
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, indexRequest -> {
|
||||||
assertFalse(parsed.get());
|
assertFalse(parsed.get());
|
||||||
assertEquals("foo", indexRequest.index());
|
assertEquals("foo", indexRequest.index());
|
||||||
assertEquals("bar", indexRequest.id());
|
assertEquals("bar", indexRequest.id());
|
||||||
|
@ -67,7 +67,7 @@ public class BulkRequestParserTests extends OpenSearchTestCase {
|
||||||
true,
|
true,
|
||||||
false,
|
false,
|
||||||
XContentType.JSON,
|
XContentType.JSON,
|
||||||
(indexRequest, type) -> { assertTrue(indexRequest.isRequireAlias()); },
|
indexRequest -> { assertTrue(indexRequest.isRequireAlias()); },
|
||||||
req -> fail(),
|
req -> fail(),
|
||||||
req -> fail()
|
req -> fail()
|
||||||
);
|
);
|
||||||
|
@ -82,7 +82,7 @@ public class BulkRequestParserTests extends OpenSearchTestCase {
|
||||||
null,
|
null,
|
||||||
false,
|
false,
|
||||||
XContentType.JSON,
|
XContentType.JSON,
|
||||||
(indexRequest, type) -> { assertTrue(indexRequest.isRequireAlias()); },
|
indexRequest -> { assertTrue(indexRequest.isRequireAlias()); },
|
||||||
req -> fail(),
|
req -> fail(),
|
||||||
req -> fail()
|
req -> fail()
|
||||||
);
|
);
|
||||||
|
@ -97,7 +97,7 @@ public class BulkRequestParserTests extends OpenSearchTestCase {
|
||||||
true,
|
true,
|
||||||
false,
|
false,
|
||||||
XContentType.JSON,
|
XContentType.JSON,
|
||||||
(indexRequest, type) -> { assertFalse(indexRequest.isRequireAlias()); },
|
indexRequest -> { assertFalse(indexRequest.isRequireAlias()); },
|
||||||
req -> fail(),
|
req -> fail(),
|
||||||
req -> fail()
|
req -> fail()
|
||||||
);
|
);
|
||||||
|
@ -105,34 +105,22 @@ public class BulkRequestParserTests extends OpenSearchTestCase {
|
||||||
|
|
||||||
public void testDeleteRequest() throws IOException {
|
public void testDeleteRequest() throws IOException {
|
||||||
BytesArray request = new BytesArray("{ \"delete\":{ \"_id\": \"bar\" } }\n");
|
BytesArray request = new BytesArray("{ \"delete\":{ \"_id\": \"bar\" } }\n");
|
||||||
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
|
BulkRequestParser parser = new BulkRequestParser();
|
||||||
final AtomicBoolean parsed = new AtomicBoolean();
|
final AtomicBoolean parsed = new AtomicBoolean();
|
||||||
parser.parse(
|
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, req -> fail(), req -> fail(), deleteRequest -> {
|
||||||
request,
|
assertFalse(parsed.get());
|
||||||
"foo",
|
assertEquals("foo", deleteRequest.index());
|
||||||
null,
|
assertEquals("bar", deleteRequest.id());
|
||||||
null,
|
parsed.set(true);
|
||||||
null,
|
});
|
||||||
null,
|
|
||||||
false,
|
|
||||||
XContentType.JSON,
|
|
||||||
(req, type) -> fail(),
|
|
||||||
req -> fail(),
|
|
||||||
deleteRequest -> {
|
|
||||||
assertFalse(parsed.get());
|
|
||||||
assertEquals("foo", deleteRequest.index());
|
|
||||||
assertEquals("bar", deleteRequest.id());
|
|
||||||
parsed.set(true);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
assertTrue(parsed.get());
|
assertTrue(parsed.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testUpdateRequest() throws IOException {
|
public void testUpdateRequest() throws IOException {
|
||||||
BytesArray request = new BytesArray("{ \"update\":{ \"_id\": \"bar\" } }\n{}\n");
|
BytesArray request = new BytesArray("{ \"update\":{ \"_id\": \"bar\" } }\n{}\n");
|
||||||
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
|
BulkRequestParser parser = new BulkRequestParser();
|
||||||
final AtomicBoolean parsed = new AtomicBoolean();
|
final AtomicBoolean parsed = new AtomicBoolean();
|
||||||
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> {
|
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, req -> fail(), updateRequest -> {
|
||||||
assertFalse(parsed.get());
|
assertFalse(parsed.get());
|
||||||
assertEquals("foo", updateRequest.index());
|
assertEquals("foo", updateRequest.index());
|
||||||
assertEquals("bar", updateRequest.id());
|
assertEquals("bar", updateRequest.id());
|
||||||
|
@ -150,7 +138,7 @@ public class BulkRequestParserTests extends OpenSearchTestCase {
|
||||||
true,
|
true,
|
||||||
false,
|
false,
|
||||||
XContentType.JSON,
|
XContentType.JSON,
|
||||||
(req, type) -> fail(),
|
req -> fail(),
|
||||||
updateRequest -> { assertTrue(updateRequest.isRequireAlias()); },
|
updateRequest -> { assertTrue(updateRequest.isRequireAlias()); },
|
||||||
req -> fail()
|
req -> fail()
|
||||||
);
|
);
|
||||||
|
@ -165,7 +153,7 @@ public class BulkRequestParserTests extends OpenSearchTestCase {
|
||||||
null,
|
null,
|
||||||
false,
|
false,
|
||||||
XContentType.JSON,
|
XContentType.JSON,
|
||||||
(req, type) -> fail(),
|
req -> fail(),
|
||||||
updateRequest -> { assertTrue(updateRequest.isRequireAlias()); },
|
updateRequest -> { assertTrue(updateRequest.isRequireAlias()); },
|
||||||
req -> fail()
|
req -> fail()
|
||||||
);
|
);
|
||||||
|
@ -180,7 +168,7 @@ public class BulkRequestParserTests extends OpenSearchTestCase {
|
||||||
true,
|
true,
|
||||||
false,
|
false,
|
||||||
XContentType.JSON,
|
XContentType.JSON,
|
||||||
(req, type) -> fail(),
|
req -> fail(),
|
||||||
updateRequest -> { assertFalse(updateRequest.isRequireAlias()); },
|
updateRequest -> { assertFalse(updateRequest.isRequireAlias()); },
|
||||||
req -> fail()
|
req -> fail()
|
||||||
);
|
);
|
||||||
|
@ -188,7 +176,7 @@ public class BulkRequestParserTests extends OpenSearchTestCase {
|
||||||
|
|
||||||
public void testBarfOnLackOfTrailingNewline() {
|
public void testBarfOnLackOfTrailingNewline() {
|
||||||
BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}");
|
BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}");
|
||||||
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
|
BulkRequestParser parser = new BulkRequestParser();
|
||||||
IllegalArgumentException e = expectThrows(
|
IllegalArgumentException e = expectThrows(
|
||||||
IllegalArgumentException.class,
|
IllegalArgumentException.class,
|
||||||
() -> parser.parse(
|
() -> parser.parse(
|
||||||
|
@ -200,7 +188,7 @@ public class BulkRequestParserTests extends OpenSearchTestCase {
|
||||||
null,
|
null,
|
||||||
false,
|
false,
|
||||||
XContentType.JSON,
|
XContentType.JSON,
|
||||||
(indexRequest, type) -> fail(),
|
indexRequest -> fail(),
|
||||||
req -> fail(),
|
req -> fail(),
|
||||||
req -> fail()
|
req -> fail()
|
||||||
)
|
)
|
||||||
|
@ -210,46 +198,21 @@ public class BulkRequestParserTests extends OpenSearchTestCase {
|
||||||
|
|
||||||
public void testFailOnExplicitIndex() {
|
public void testFailOnExplicitIndex() {
|
||||||
BytesArray request = new BytesArray("{ \"index\":{ \"_index\": \"foo\", \"_id\": \"bar\" } }\n{}\n");
|
BytesArray request = new BytesArray("{ \"index\":{ \"_index\": \"foo\", \"_id\": \"bar\" } }\n{}\n");
|
||||||
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
|
BulkRequestParser parser = new BulkRequestParser();
|
||||||
|
|
||||||
IllegalArgumentException ex = expectThrows(
|
IllegalArgumentException ex = expectThrows(
|
||||||
IllegalArgumentException.class,
|
IllegalArgumentException.class,
|
||||||
() -> parser.parse(
|
() -> parser.parse(request, null, null, null, null, null, false, XContentType.JSON, req -> fail(), req -> fail(), req -> fail())
|
||||||
request,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
false,
|
|
||||||
XContentType.JSON,
|
|
||||||
(req, type) -> fail(),
|
|
||||||
req -> fail(),
|
|
||||||
req -> fail()
|
|
||||||
)
|
|
||||||
);
|
);
|
||||||
assertEquals("explicit index in bulk is not allowed", ex.getMessage());
|
assertEquals("explicit index in bulk is not allowed", ex.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testTypesStillParsedForBulkMonitoring() throws IOException {
|
|
||||||
BytesArray request = new BytesArray("{ \"index\":{ \"_type\": \"quux\", \"_id\": \"bar\" } }\n{}\n");
|
|
||||||
BulkRequestParser parser = new BulkRequestParser(false);
|
|
||||||
final AtomicBoolean parsed = new AtomicBoolean();
|
|
||||||
parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
|
|
||||||
assertFalse(parsed.get());
|
|
||||||
assertEquals("foo", indexRequest.index());
|
|
||||||
assertEquals("bar", indexRequest.id());
|
|
||||||
parsed.set(true);
|
|
||||||
}, req -> fail(), req -> fail());
|
|
||||||
assertTrue(parsed.get());
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testParseDeduplicatesParameterStrings() throws IOException {
|
public void testParseDeduplicatesParameterStrings() throws IOException {
|
||||||
BytesArray request = new BytesArray(
|
BytesArray request = new BytesArray(
|
||||||
"{ \"index\":{ \"_index\": \"bar\", \"pipeline\": \"foo\", \"routing\": \"blub\"} }\n{}\n"
|
"{ \"index\":{ \"_index\": \"bar\", \"pipeline\": \"foo\", \"routing\": \"blub\"} }\n{}\n"
|
||||||
+ "{ \"index\":{ \"_index\": \"bar\", \"pipeline\": \"foo\", \"routing\": \"blub\" } }\n{}\n"
|
+ "{ \"index\":{ \"_index\": \"bar\", \"pipeline\": \"foo\", \"routing\": \"blub\" } }\n{}\n"
|
||||||
);
|
);
|
||||||
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
|
BulkRequestParser parser = new BulkRequestParser();
|
||||||
final List<IndexRequest> indexRequests = new ArrayList<>();
|
final List<IndexRequest> indexRequests = new ArrayList<>();
|
||||||
parser.parse(
|
parser.parse(
|
||||||
request,
|
request,
|
||||||
|
@ -260,7 +223,7 @@ public class BulkRequestParserTests extends OpenSearchTestCase {
|
||||||
null,
|
null,
|
||||||
true,
|
true,
|
||||||
XContentType.JSON,
|
XContentType.JSON,
|
||||||
(indexRequest, type) -> indexRequests.add(indexRequest),
|
indexRequest -> indexRequests.add(indexRequest),
|
||||||
req -> fail(),
|
req -> fail(),
|
||||||
req -> fail()
|
req -> fail()
|
||||||
);
|
);
|
||||||
|
|
Loading…
Reference in New Issue