diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/CountRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/CountRequest.java index 338b074a874..d2516d52923 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/CountRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/CountRequest.java @@ -106,7 +106,7 @@ public class CountRequest extends BroadcastOperationRequest { @Override protected void beforeLocalFork() { if (querySourceUnsafe) { - querySource = Arrays.copyOfRange(querySource, querySourceOffset, querySourceLength); + querySource = Arrays.copyOfRange(querySource, querySourceOffset, querySourceOffset + querySourceLength); querySourceOffset = 0; querySourceUnsafe = false; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/DeleteByQueryRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/DeleteByQueryRequest.java index 0176d657e9d..156232544fd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/DeleteByQueryRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/DeleteByQueryRequest.java @@ -107,7 +107,7 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest { */ byte[] querySource() { if (querySourceUnsafe || querySourceOffset > 0) { - querySource = Arrays.copyOfRange(querySource, querySourceOffset, querySourceLength); + querySource = Arrays.copyOfRange(querySource, querySourceOffset, querySourceOffset + querySourceLength); querySourceOffset = 0; querySourceUnsafe = false; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 17e5ff0d6bc..ec01a480603 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -222,7 +222,7 @@ public class IndexRequest extends ShardReplicationOperationRequest { */ public byte[] source() { if (sourceUnsafe || sourceOffset > 0) { - source = Arrays.copyOfRange(source, sourceOffset, sourceLength); + source = Arrays.copyOfRange(source, sourceOffset, sourceOffset + sourceLength); sourceOffset = 0; sourceUnsafe = false; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/mlt/MoreLikeThisRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/mlt/MoreLikeThisRequest.java index e3bb532690d..6a7ee434d03 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/mlt/MoreLikeThisRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/mlt/MoreLikeThisRequest.java @@ -312,7 +312,7 @@ public class MoreLikeThisRequest implements ActionRequest { void beforeLocalFork() { if (searchSourceUnsafe) { - searchSource = Arrays.copyOfRange(searchSource, searchSourceOffset, searchSourceLength); + searchSource = Arrays.copyOfRange(searchSource, searchSourceOffset, searchSourceOffset + searchSourceLength); searchSourceOffset = 0; searchSourceUnsafe = false; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/SearchRequest.java index d0b8b6ce3ac..a5f6953f96d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -121,12 +121,12 @@ public class SearchRequest implements ActionRequest { */ public void beforeLocalFork() { if (source != null && sourceUnsafe) { - source = Arrays.copyOfRange(source, sourceOffset, sourceLength); + source = Arrays.copyOfRange(source, sourceOffset, sourceOffset + sourceLength); sourceOffset = 0; sourceUnsafe = false; } if (extraSource != null && extraSourceUnsafe) { - extraSource = Arrays.copyOfRange(extraSource, extraSourceOffset, extraSourceLength); + extraSource = Arrays.copyOfRange(extraSource, extraSourceOffset, extraSourceOffset + extraSourceLength); extraSourceOffset = 0; extraSourceUnsafe = false; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/XContent.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/XContent.java index e36666b2148..305b55d6f75 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/XContent.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/XContent.java @@ -33,6 +33,8 @@ public interface XContent { */ XContentType type(); + byte streamSeparator(); + /** * Creates a new generator using the provided output stream. */ diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContent.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContent.java index 9adc8c70666..350695c49ae 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContent.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContent.java @@ -60,6 +60,10 @@ public class JsonXContent implements XContent { return XContentType.JSON; } + @Override public byte streamSeparator() { + return '\n'; + } + @Override public XContentGenerator createGenerator(OutputStream os) throws IOException { return new JsonXContentGenerator(jsonFactory.createJsonGenerator(os, JsonEncoding.UTF8)); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/smile/SmileXContent.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/smile/SmileXContent.java index eefd0cc973b..13d9d7fd95c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/smile/SmileXContent.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/xcontent/smile/SmileXContent.java @@ -59,6 +59,10 @@ public class SmileXContent implements XContent { return XContentType.SMILE; } + @Override public byte streamSeparator() { + return (byte) 0xFF; + } + @Override public XContentGenerator createGenerator(OutputStream os) throws IOException { return new SmileXContentGenerator(smileFactory.createJsonGenerator(os, JsonEncoding.UTF8)); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/RestActionModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/RestActionModule.java index be42e6fbd63..7ca124d0f02 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/RestActionModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/RestActionModule.java @@ -41,6 +41,7 @@ import org.elasticsearch.rest.action.admin.indices.optimize.RestOptimizeAction; import org.elasticsearch.rest.action.admin.indices.refresh.RestRefreshAction; import org.elasticsearch.rest.action.admin.indices.settings.RestUpdateSettingsAction; import org.elasticsearch.rest.action.admin.indices.status.RestIndicesStatusAction; +import org.elasticsearch.rest.action.bulk.RestBulkAction; import org.elasticsearch.rest.action.count.RestCountAction; import org.elasticsearch.rest.action.delete.RestDeleteAction; import org.elasticsearch.rest.action.deletebyquery.RestDeleteByQueryAction; @@ -95,6 +96,7 @@ public class RestActionModule extends AbstractModule { bind(RestDeleteByQueryAction.class).asEagerSingleton(); bind(RestCountAction.class).asEagerSingleton(); + bind(RestBulkAction.class).asEagerSingleton(); bind(RestSearchAction.class).asEagerSingleton(); bind(RestSearchScrollAction.class).asEagerSingleton(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java new file mode 100644 index 00000000000..18002958138 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java @@ -0,0 +1,185 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.bulk; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.*; + +import java.io.IOException; + +import static org.elasticsearch.rest.RestRequest.Method.*; +import static org.elasticsearch.rest.RestResponse.Status.*; +import static org.elasticsearch.rest.action.support.RestXContentBuilder.*; + +/** + *
+ * { "index" : { "index" : "test", "type" : "type1", "id" : "1" }
+ * { "type1" : { "field1" : "value1" } }
+ * { "delete" : { "index" : "test", "type" : "type1", "id" : "2" } }
+ * { "create" : { "index" : "test", "type" : "type1", "id" : "1" }
+ * { "type1" : { "field1" : "value1" } }
+ * 
+ * + * @author kimchy (shay.banon) + */ +public class RestBulkAction extends BaseRestHandler { + + @Inject public RestBulkAction(Settings settings, Client client, RestController controller) { + super(settings, client); + + controller.registerHandler(POST, "/_bulk", this); + controller.registerHandler(PUT, "/_bulk", this); + } + + @Override public void handleRequest(final RestRequest request, final RestChannel channel) { + BulkRequest bulkRequest = Requests.bulkRequest(); + + int fromIndex = request.contentByteArrayOffset(); + byte[] data = request.contentByteArray(); + int length = request.contentLength(); + + try { + // first, guess the content + XContent xContent = XContentFactory.xContent(data, fromIndex, length); + byte marker = xContent.streamSeparator(); + while (true) { + int nextMarker = findNextMarker(marker, fromIndex, data, length); + if (nextMarker == -1) { + break; + } + // now parse the action + XContentParser parser = xContent.createParser(data, fromIndex, nextMarker - fromIndex); + + // move pointers + fromIndex = nextMarker + 1; + + // Move to START_OBJECT + XContentParser.Token token = parser.nextToken(); + if (token == null) { + continue; + } + assert token == XContentParser.Token.START_OBJECT; + // Move to FIELD_NAME, that's the action + token = parser.nextToken(); + assert token == XContentParser.Token.FIELD_NAME; + String action = parser.currentName(); + // Move to START_OBJECT + token = parser.nextToken(); + assert token == XContentParser.Token.START_OBJECT; + + String index = null; + String type = null; + String id = null; + + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if ("index".equals(currentFieldName)) { + index = parser.text(); + } else if ("type".equals(currentFieldName)) { + type = parser.text(); + } else if ("id".equals(currentFieldName)) { + id = parser.text(); + } + } + } + + if ("delete".equals(action)) { + bulkRequest.add(new DeleteRequest(index, type, id)); + } else { + nextMarker = findNextMarker(marker, fromIndex, data, length); + if (nextMarker == -1) { + break; + } + bulkRequest.add(new IndexRequest(index, type, id) + .create("create".equals(action)) + .source(data, fromIndex, nextMarker - fromIndex, request.contentUnsafe())); + // move pointers + fromIndex = nextMarker + 1; + } + } + } catch (Exception e) { + try { + XContentBuilder builder = restContentBuilder(request); + channel.sendResponse(new XContentRestResponse(request, BAD_REQUEST, builder.startObject().field("error", e.getMessage()).endObject())); + } catch (IOException e1) { + logger.error("Failed to send failure response", e1); + } + return; + } + + client.bulk(bulkRequest, new ActionListener() { + @Override public void onResponse(BulkResponse response) { + try { + XContentBuilder builder = restContentBuilder(request); + builder.startObject(); + + for (BulkItemResponse itemResponse : response) { + builder.startObject(itemResponse.opType()); + builder.field("index", itemResponse.index()); + builder.field("type", itemResponse.type()); + builder.field("id", itemResponse.id()); + if (itemResponse.failed()) { + builder.field("error", itemResponse.failure().message()); + } + builder.endObject(); + } + + builder.endObject(); + channel.sendResponse(new XContentRestResponse(request, OK, builder)); + } catch (Exception e) { + onFailure(e); + } + } + + @Override public void onFailure(Throwable e) { + try { + channel.sendResponse(new XContentThrowableRestResponse(request, e)); + } catch (IOException e1) { + logger.error("Failed to send failure response", e1); + } + } + }); + } + + private int findNextMarker(byte marker, int from, byte[] data, int length) { + for (int i = from; i < length; i++) { + if (data[i] == marker) { + return i; + } + } + return -1; + } +}