From bdea0e2eddb4373b850e00d8e363c5240d78d180 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Mon, 13 Aug 2012 11:24:48 +0200 Subject: [PATCH] add a simple bulk processor to simplify using the bulk API --- .../action/bulk/BulkProcessor.java | 207 ++++++++++++++++++ .../action/bulk/BulkRequest.java | 53 ++++- .../common/unit/ByteSizeValue.java | 9 + 3 files changed, 264 insertions(+), 5 deletions(-) create mode 100644 src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java new file mode 100644 index 00000000000..57d6d019022 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -0,0 +1,207 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; + +import java.util.concurrent.Semaphore; + +/** + * A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request + * (either based on number of actions, or based on the size), and to easily control the number of concurrent bulk + * requests allowed to be executed in parallel. + *

+ * In order to create a new bulk processor, use the {@link Builder}. + */ +public class BulkProcessor { + + /** + * A builder used to create a build an instance of a bulk processor. + */ + public static class Builder { + + private final Client client; + private final ActionListener listener; + + private int concurrentRequests = 1; + private int bulkActions = 1000; + private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB); + + /** + * Creates a builder of bulk processor with the client to use and the listener that will be used + * to be notified on the completion of bulk requests. + */ + public Builder(Client client, ActionListener listener) { + this.client = client; + this.listener = listener; + } + + /** + * Sets the number of concurrent requests allowed to be executed. A value of 0 means that only a single + * request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed + * while accumulating new bulk requests. Defaults to 1. + */ + public Builder setConcurrentRequests(int concurrentRequests) { + this.concurrentRequests = concurrentRequests; + return this; + } + + /** + * Sets when to flush a new bulk request based on the number of actions currently added. Defaults to + * 1000. Can be set to -1 to disable it. + */ + public Builder setBulkActions(int bulkActions) { + this.bulkActions = bulkActions; + return this; + } + + /** + * Sets when to flush a new bulk request based on the size of actions currently added. Defaults to + * 5mb. Can be set to -1 to disable it. + */ + public Builder setBulkSize(ByteSizeValue bulkSize) { + this.bulkSize = bulkSize; + return this; + } + + /** + * Builds a new bulk processor. + */ + public BulkProcessor build() { + return new BulkProcessor(client, listener, concurrentRequests, bulkActions, bulkSize); + } + } + + public static Builder builder(Client client, ActionListener listener) { + return new Builder(client, listener); + } + + private final Client client; + private final ActionListener listener; + + private int concurrentRequests; + private final int bulkActions; + private final int bulkSize; + + private final Semaphore semaphore; + + private BulkRequest bulkRequest; + + BulkProcessor(Client client, ActionListener listener, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize) { + this.client = client; + this.listener = listener; + this.concurrentRequests = concurrentRequests; + this.bulkActions = bulkActions; + this.bulkSize = bulkSize.bytesAsInt(); + + this.semaphore = new Semaphore(concurrentRequests); + this.bulkRequest = new BulkRequest(); + } + + /** + * Adds an {@link IndexRequest} to the list of actions to execute. Follows the same behavior of {@link IndexRequest} + * (for example, if no id is provided, one will be generated, or usage of the create flag). + */ + public BulkProcessor add(IndexRequest request) { + return add((ActionRequest) request); + } + + /** + * Adds an {@link DeleteRequest} to the list of actions to execute. + */ + public BulkProcessor add(DeleteRequest request) { + return add((ActionRequest) request); + } + + public BulkProcessor add(ActionRequest request) { + internalAdd(request); + return this; + } + + private synchronized void internalAdd(ActionRequest request) { + bulkRequest.add(request); + executeIfNeeded(); + } + + public synchronized BulkProcessor add(BytesReference data, boolean contentUnsafe, @Nullable String defaultIndex, @Nullable String defaultType) throws Exception { + bulkRequest.add(data, contentUnsafe, defaultIndex, defaultType); + executeIfNeeded(); + return this; + } + + private void executeIfNeeded() { + if (!isOverTheLimit()) { + return; + } + if (concurrentRequests == 0) { + // execute in a blocking fashion... + try { + listener.onResponse(client.bulk(bulkRequest).actionGet()); + } catch (Exception e) { + listener.onFailure(e); + } + } else { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + listener.onFailure(e); + return; + } + client.bulk(bulkRequest, new ActionListener() { + @Override + public void onResponse(BulkResponse response) { + try { + listener.onResponse(response); + } finally { + semaphore.release(); + } + } + + @Override + public void onFailure(Throwable e) { + try { + listener.onFailure(e); + } finally { + semaphore.release(); + } + } + }); + } + bulkRequest = new BulkRequest(); + } + + private boolean isOverTheLimit() { + if (bulkActions != -1 && bulkRequest.numberOfActions() > bulkActions) { + return true; + } + if (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() > bulkSize) { + return true; + } + return false; + } +} diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index a2b9ef59b9f..0d5fe5ea8b6 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.bulk; import com.google.common.collect.Lists; +import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.WriteConsistencyLevel; @@ -50,6 +51,8 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; */ public class BulkRequest implements ActionRequest { + private static final int REQUEST_OVERHEAD = 50; + final List requests = Lists.newArrayList(); private boolean listenerThreaded = false; @@ -58,6 +61,40 @@ public class BulkRequest implements ActionRequest { private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT; private boolean refresh = false; + private long sizeInBytes = 0; + + /** + * Adds a list of requests to be executed. Either index or delete requests. + */ + public BulkRequest add(ActionRequest... requests) { + for (ActionRequest request : requests) { + if (request instanceof IndexRequest) { + add((IndexRequest) request); + } else if (request instanceof DeleteRequest) { + add((DeleteRequest) request); + } else { + throw new ElasticSearchIllegalArgumentException("No support for request [" + request + "]"); + } + } + return this; + } + + /** + * Adds a list of requests to be executed. Either index or delete requests. + */ + public BulkRequest add(Iterable requests) { + for (ActionRequest request : requests) { + if (request instanceof IndexRequest) { + add((IndexRequest) request); + } else if (request instanceof DeleteRequest) { + add((DeleteRequest) request); + } else { + throw new ElasticSearchIllegalArgumentException("No support for request [" + request + "]"); + } + } + return this; + } + /** * Adds an {@link IndexRequest} to the list of actions to execute. Follows the same behavior of {@link IndexRequest} * (for example, if no id is provided, one will be generated, or usage of the create flag). @@ -67,8 +104,9 @@ public class BulkRequest implements ActionRequest { return internalAdd(request); } - private BulkRequest internalAdd(IndexRequest request) { + BulkRequest internalAdd(IndexRequest request) { requests.add(request); + sizeInBytes += request.source().length() + REQUEST_OVERHEAD; return this; } @@ -77,6 +115,7 @@ public class BulkRequest implements ActionRequest { */ public BulkRequest add(DeleteRequest request) { requests.add(request); + sizeInBytes += REQUEST_OVERHEAD; return this; } @@ -84,6 +123,14 @@ public class BulkRequest implements ActionRequest { return this.requests; } + public int numberOfActions() { + return requests.size(); + } + + public long estimatedSizeInBytes() { + return sizeInBytes; + } + /** * Adds a framed data in binary format */ @@ -263,10 +310,6 @@ public class BulkRequest implements ActionRequest { return -1; } - public int numberOfActions() { - return requests.size(); - } - @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; diff --git a/src/main/java/org/elasticsearch/common/unit/ByteSizeValue.java b/src/main/java/org/elasticsearch/common/unit/ByteSizeValue.java index 1876482e2d9..0c4e9b167be 100644 --- a/src/main/java/org/elasticsearch/common/unit/ByteSizeValue.java +++ b/src/main/java/org/elasticsearch/common/unit/ByteSizeValue.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.unit; +import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.ElasticSearchParseException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -50,6 +51,14 @@ public class ByteSizeValue implements Serializable, Streamable { this.sizeUnit = sizeUnit; } + public int bytesAsInt() throws ElasticSearchIllegalArgumentException { + long bytes = bytes(); + if (bytes > Integer.MAX_VALUE) { + throw new ElasticSearchIllegalArgumentException("size [" + toString() + "] is bigger than max int"); + } + return (int) bytes; + } + public long bytes() { return sizeUnit.toBytes(size); }