REST Bulk API: Allow to execute _bulk against /{index}/_bulk and /{index}/{type}/_bulk endpoints, closes #1375.
This commit is contained in:
parent
3d4c31de91
commit
63844ddd43
|
@ -25,6 +25,7 @@ import org.elasticsearch.action.WriteConsistencyLevel;
|
||||||
import org.elasticsearch.action.delete.DeleteRequest;
|
import org.elasticsearch.action.delete.DeleteRequest;
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
import org.elasticsearch.action.support.replication.ReplicationType;
|
import org.elasticsearch.action.support.replication.ReplicationType;
|
||||||
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.collect.Lists;
|
import org.elasticsearch.common.collect.Lists;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
|
@ -82,6 +83,13 @@ public class BulkRequest implements ActionRequest {
|
||||||
* Adds a framed data in binary format
|
* Adds a framed data in binary format
|
||||||
*/
|
*/
|
||||||
public BulkRequest add(byte[] data, int from, int length, boolean contentUnsafe) throws Exception {
|
public BulkRequest add(byte[] data, int from, int length, boolean contentUnsafe) throws Exception {
|
||||||
|
return add(data, from, length, contentUnsafe, null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds a framed data in binary format
|
||||||
|
*/
|
||||||
|
public BulkRequest add(byte[] data, int from, int length, boolean contentUnsafe, @Nullable String defaultIndex, @Nullable String defaultType) throws Exception {
|
||||||
XContent xContent = XContentFactory.xContent(data, from, length);
|
XContent xContent = XContentFactory.xContent(data, from, length);
|
||||||
byte marker = xContent.streamSeparator();
|
byte marker = xContent.streamSeparator();
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -105,12 +113,9 @@ public class BulkRequest implements ActionRequest {
|
||||||
token = parser.nextToken();
|
token = parser.nextToken();
|
||||||
assert token == XContentParser.Token.FIELD_NAME;
|
assert token == XContentParser.Token.FIELD_NAME;
|
||||||
String action = parser.currentName();
|
String action = parser.currentName();
|
||||||
// Move to START_OBJECT
|
|
||||||
token = parser.nextToken();
|
|
||||||
assert token == XContentParser.Token.START_OBJECT;
|
|
||||||
|
|
||||||
String index = null;
|
String index = defaultIndex;
|
||||||
String type = null;
|
String type = defaultType;
|
||||||
String id = null;
|
String id = null;
|
||||||
String routing = null;
|
String routing = null;
|
||||||
String parent = null;
|
String parent = null;
|
||||||
|
@ -121,6 +126,9 @@ public class BulkRequest implements ActionRequest {
|
||||||
VersionType versionType = VersionType.INTERNAL;
|
VersionType versionType = VersionType.INTERNAL;
|
||||||
String percolate = null;
|
String percolate = null;
|
||||||
|
|
||||||
|
// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
|
||||||
|
// or START_OBJECT which will have another set of parameters
|
||||||
|
|
||||||
String currentFieldName = null;
|
String currentFieldName = null;
|
||||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||||
if (token == XContentParser.Token.FIELD_NAME) {
|
if (token == XContentParser.Token.FIELD_NAME) {
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.client.action.delete.DeleteRequestBuilder;
|
import org.elasticsearch.client.action.delete.DeleteRequestBuilder;
|
||||||
import org.elasticsearch.client.action.index.IndexRequestBuilder;
|
import org.elasticsearch.client.action.index.IndexRequestBuilder;
|
||||||
import org.elasticsearch.client.action.support.BaseRequestBuilder;
|
import org.elasticsearch.client.action.support.BaseRequestBuilder;
|
||||||
|
import org.elasticsearch.common.Nullable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A bulk request holds an ordered {@link IndexRequest}s and {@link DeleteRequest}s and allows to executes
|
* A bulk request holds an ordered {@link IndexRequest}s and {@link DeleteRequest}s and allows to executes
|
||||||
|
@ -81,7 +82,15 @@ public class BulkRequestBuilder extends BaseRequestBuilder<BulkRequest, BulkResp
|
||||||
* Adds a framed data in binary format
|
* Adds a framed data in binary format
|
||||||
*/
|
*/
|
||||||
public BulkRequestBuilder add(byte[] data, int from, int length, boolean contentUnsafe) throws Exception {
|
public BulkRequestBuilder add(byte[] data, int from, int length, boolean contentUnsafe) throws Exception {
|
||||||
request.add(data, from, length, contentUnsafe);
|
request.add(data, from, length, contentUnsafe, null, null);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds a framed data in binary format
|
||||||
|
*/
|
||||||
|
public BulkRequestBuilder add(byte[] data, int from, int length, boolean contentUnsafe, @Nullable String defaultIndex, @Nullable String defaultType) throws Exception {
|
||||||
|
request.add(data, from, length, contentUnsafe, defaultIndex, defaultType);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,12 @@ import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||||
import org.elasticsearch.rest.*;
|
import org.elasticsearch.rest.BaseRestHandler;
|
||||||
|
import org.elasticsearch.rest.RestChannel;
|
||||||
|
import org.elasticsearch.rest.RestController;
|
||||||
|
import org.elasticsearch.rest.RestRequest;
|
||||||
|
import org.elasticsearch.rest.XContentRestResponse;
|
||||||
|
import org.elasticsearch.rest.XContentThrowableRestResponse;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -58,10 +63,16 @@ public class RestBulkAction extends BaseRestHandler {
|
||||||
|
|
||||||
controller.registerHandler(POST, "/_bulk", this);
|
controller.registerHandler(POST, "/_bulk", this);
|
||||||
controller.registerHandler(PUT, "/_bulk", this);
|
controller.registerHandler(PUT, "/_bulk", this);
|
||||||
|
controller.registerHandler(POST, "/{index}/_bulk", this);
|
||||||
|
controller.registerHandler(PUT, "/{index}/_bulk", this);
|
||||||
|
controller.registerHandler(POST, "/{index}/{type}/_bulk", this);
|
||||||
|
controller.registerHandler(PUT, "/{index}/{type}/_bulk", this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
|
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
|
||||||
BulkRequest bulkRequest = Requests.bulkRequest();
|
BulkRequest bulkRequest = Requests.bulkRequest();
|
||||||
|
String defaultIndex = request.param("index");
|
||||||
|
String defaultType = request.param("type");
|
||||||
|
|
||||||
String replicationType = request.param("replication");
|
String replicationType = request.param("replication");
|
||||||
if (replicationType != null) {
|
if (replicationType != null) {
|
||||||
|
@ -73,7 +84,7 @@ public class RestBulkAction extends BaseRestHandler {
|
||||||
}
|
}
|
||||||
bulkRequest.refresh(request.paramAsBoolean("refresh", bulkRequest.refresh()));
|
bulkRequest.refresh(request.paramAsBoolean("refresh", bulkRequest.refresh()));
|
||||||
try {
|
try {
|
||||||
bulkRequest.add(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe());
|
bulkRequest.add(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength(), request.contentUnsafe(), defaultIndex, defaultType);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
try {
|
try {
|
||||||
XContentBuilder builder = restContentBuilder(request);
|
XContentBuilder builder = restContentBuilder(request);
|
||||||
|
|
|
@ -26,10 +26,18 @@ import static org.hamcrest.MatcherAssert.*;
|
||||||
import static org.hamcrest.Matchers.*;
|
import static org.hamcrest.Matchers.*;
|
||||||
|
|
||||||
public class BulkActionTests {
|
public class BulkActionTests {
|
||||||
@Test public void testSimpleBulk() throws Exception {
|
|
||||||
|
@Test public void testSimpleBulk1() throws Exception {
|
||||||
String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk.json");
|
String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk.json");
|
||||||
BulkRequest bulkRequest = new BulkRequest();
|
BulkRequest bulkRequest = new BulkRequest();
|
||||||
bulkRequest.add(bulkAction.getBytes(), 0, bulkAction.length(), true);
|
bulkRequest.add(bulkAction.getBytes(), 0, bulkAction.length(), true, null, null);
|
||||||
|
assertThat(bulkRequest.numberOfActions(), equalTo(3));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testSimpleBulk2() throws Exception {
|
||||||
|
String bulkAction = copyToStringFromClasspath("/org/elasticsearch/action/bulk/simple-bulk2.json");
|
||||||
|
BulkRequest bulkRequest = new BulkRequest();
|
||||||
|
bulkRequest.add(bulkAction.getBytes(), 0, bulkAction.length(), true, null, null);
|
||||||
assertThat(bulkRequest.numberOfActions(), equalTo(3));
|
assertThat(bulkRequest.numberOfActions(), equalTo(3));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
{ "index" : { } }
|
||||||
|
{ "field1" : "value1" }
|
||||||
|
{ "delete" : { "_id" : "2" } }
|
||||||
|
{ "create" : { "_id" : "3" } }
|
||||||
|
{ "field1" : "value3" }
|
Loading…
Reference in New Issue