diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
index 57d6d019022..1f0c1bce5b4 100644
--- a/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
+++ b/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
@@ -19,48 +19,84 @@
package org.elasticsearch.action.bulk;
+import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
+import org.elasticsearch.client.internal.InternalClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
-import java.util.concurrent.Semaphore;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
/**
* A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request
- * (either based on number of actions, or based on the size), and to easily control the number of concurrent bulk
+ * (either based on number of actions, based on the size, or time), and to easily control the number of concurrent bulk
* requests allowed to be executed in parallel.
*
* In order to create a new bulk processor, use the {@link Builder}.
*/
public class BulkProcessor {
+ /**
+ * A listener for the execution.
+ */
+ public static interface Listener {
+
+ /**
+ * Callback before the bulk is executed.
+ */
+ void beforeBulk(long executionId, BulkRequest request);
+
+ /**
+ * Callback after a successful execution of bulk request.
+ */
+ void afterBulk(long executionId, BulkRequest request, BulkResponse response);
+
+ /**
+ * Callback after a failed execution of bulk request.
+ */
+ void afterBulk(long executionId, BulkRequest request, Throwable failure);
+ }
+
/**
* A builder used to create a build an instance of a bulk processor.
*/
public static class Builder {
private final Client client;
- private final ActionListener listener;
+ private final Listener listener;
+ private String name;
private int concurrentRequests = 1;
private int bulkActions = 1000;
private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB);
+ private TimeValue flushInterval = null;
/**
* Creates a builder of bulk processor with the client to use and the listener that will be used
* to be notified on the completion of bulk requests.
*/
- public Builder(Client client, ActionListener listener) {
+ public Builder(Client client, Listener listener) {
this.client = client;
this.listener = listener;
}
+ /**
+ * Sets an optional name to identify this bulk processor.
+ */
+ public Builder setName(String name) {
+ this.name = name;
+ return this;
+ }
+
/**
* Sets the number of concurrent requests allowed to be executed. A value of 0 means that only a single
* request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed
@@ -89,38 +125,87 @@ public class BulkProcessor {
return this;
}
+ /**
+ * Sets a flush interval flushing *any* bulk actions pending if the interval passes. Defaults to not set.
+ *
+ * Note, both {@link #setBulkActions(int)} and {@link #setBulkSize(org.elasticsearch.common.unit.ByteSizeValue)}
+ * can be set to -1 with the flush interval set allowing for complete async processing of bulk actions.
+ */
+ public Builder setFlushInterval(TimeValue flushInterval) {
+ this.flushInterval = flushInterval;
+ return this;
+ }
+
/**
* Builds a new bulk processor.
*/
public BulkProcessor build() {
- return new BulkProcessor(client, listener, concurrentRequests, bulkActions, bulkSize);
+ return new BulkProcessor(client, listener, name, concurrentRequests, bulkActions, bulkSize, flushInterval);
}
}
- public static Builder builder(Client client, ActionListener listener) {
+ public static Builder builder(Client client, Listener listener) {
return new Builder(client, listener);
}
private final Client client;
- private final ActionListener listener;
+ private final Listener listener;
- private int concurrentRequests;
+ private final String name;
+
+ private final int concurrentRequests;
private final int bulkActions;
private final int bulkSize;
+ private final TimeValue flushInterval;
private final Semaphore semaphore;
+ private final ScheduledThreadPoolExecutor scheduler;
+ private final ScheduledFuture scheduledFuture;
+
+ private final AtomicLong executionIdGen = new AtomicLong();
private BulkRequest bulkRequest;
- BulkProcessor(Client client, ActionListener listener, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize) {
+ private volatile boolean closed = false;
+
+ BulkProcessor(Client client, Listener listener, @Nullable String name, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval) {
this.client = client;
this.listener = listener;
+ this.name = name;
this.concurrentRequests = concurrentRequests;
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.bytesAsInt();
this.semaphore = new Semaphore(concurrentRequests);
this.bulkRequest = new BulkRequest();
+
+ this.flushInterval = flushInterval;
+ if (flushInterval != null) {
+ this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(((InternalClient) client).settings(), (name != null ? "[" + name + "]" : "") + "bulk_processor"));
+ this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new Flush(), flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS);
+ } else {
+ this.scheduler = null;
+ this.scheduledFuture = null;
+ }
+ }
+
+ /**
+ * Closes the processor. If flushing by time is enabled, then its shutdown. Any remaining bulk actions are flushed.
+ */
+ public synchronized void close() {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ if (this.scheduledFuture != null) {
+ this.scheduledFuture.cancel(false);
+ this.scheduler.shutdown();
+ }
+ if (bulkRequest.numberOfActions() > 0) {
+ execute();
+ }
}
/**
@@ -155,28 +240,44 @@ public class BulkProcessor {
}
private void executeIfNeeded() {
+ if (closed) {
+ throw new ElasticSearchIllegalStateException("bulk process already closed");
+ }
+ this.closed = true;
if (!isOverTheLimit()) {
return;
}
+ execute();
+ }
+
+ // (currently) needs to be executed under a lock
+ private void execute() {
+ final BulkRequest bulkRequest = this.bulkRequest;
+ final long executionId = executionIdGen.incrementAndGet();
+
+ this.bulkRequest = new BulkRequest();
+
if (concurrentRequests == 0) {
// execute in a blocking fashion...
try {
- listener.onResponse(client.bulk(bulkRequest).actionGet());
+ listener.beforeBulk(executionId, bulkRequest);
+ listener.afterBulk(executionId, bulkRequest, client.bulk(bulkRequest).actionGet());
} catch (Exception e) {
- listener.onFailure(e);
+ listener.afterBulk(executionId, bulkRequest, e);
}
} else {
try {
semaphore.acquire();
} catch (InterruptedException e) {
- listener.onFailure(e);
+ listener.afterBulk(executionId, bulkRequest, e);
return;
}
+ listener.beforeBulk(executionId, bulkRequest);
client.bulk(bulkRequest, new ActionListener() {
@Override
public void onResponse(BulkResponse response) {
try {
- listener.onResponse(response);
+ listener.afterBulk(executionId, bulkRequest, response);
} finally {
semaphore.release();
}
@@ -185,14 +286,13 @@ public class BulkProcessor {
@Override
public void onFailure(Throwable e) {
try {
- listener.onFailure(e);
+ listener.afterBulk(executionId, bulkRequest, e);
} finally {
semaphore.release();
}
}
});
}
- bulkRequest = new BulkRequest();
}
private boolean isOverTheLimit() {
@@ -204,4 +304,20 @@ public class BulkProcessor {
}
return false;
}
+
+ class Flush implements Runnable {
+
+ @Override
+ public void run() {
+ synchronized (BulkProcessor.this) {
+ if (closed) {
+ return;
+ }
+ if (bulkRequest.numberOfActions() == 0) {
+ return;
+ }
+ execute();
+ }
+ }
+ }
}
diff --git a/src/main/java/org/elasticsearch/client/internal/InternalClient.java b/src/main/java/org/elasticsearch/client/internal/InternalClient.java
index da0223760ae..26ebea05870 100644
--- a/src/main/java/org/elasticsearch/client/internal/InternalClient.java
+++ b/src/main/java/org/elasticsearch/client/internal/InternalClient.java
@@ -20,6 +20,7 @@
package org.elasticsearch.client.internal;
import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
/**
@@ -27,5 +28,7 @@ import org.elasticsearch.threadpool.ThreadPool;
*/
public interface InternalClient extends Client {
+ Settings settings();
+
ThreadPool threadPool();
}
diff --git a/src/main/java/org/elasticsearch/client/node/NodeClient.java b/src/main/java/org/elasticsearch/client/node/NodeClient.java
index a9489f0de8a..27d33213575 100644
--- a/src/main/java/org/elasticsearch/client/node/NodeClient.java
+++ b/src/main/java/org/elasticsearch/client/node/NodeClient.java
@@ -37,6 +37,7 @@ import java.util.Map;
*/
public class NodeClient extends AbstractClient implements InternalClient {
+ private final Settings settings;
private final ThreadPool threadPool;
private final NodeAdminClient admin;
@@ -45,6 +46,7 @@ public class NodeClient extends AbstractClient implements InternalClient {
@Inject
public NodeClient(Settings settings, ThreadPool threadPool, NodeAdminClient admin, Map actions) {
+ this.settings = settings;
this.threadPool = threadPool;
this.admin = admin;
MapBuilder actionsBuilder = new MapBuilder();
@@ -56,6 +58,11 @@ public class NodeClient extends AbstractClient implements InternalClient {
this.actions = actionsBuilder.immutableMap();
}
+ @Override
+ public Settings settings() {
+ return this.settings;
+ }
+
@Override
public ThreadPool threadPool() {
return this.threadPool;
diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/src/main/java/org/elasticsearch/client/transport/TransportClient.java
index 977e44d6bd9..bcad0511799 100644
--- a/src/main/java/org/elasticsearch/client/transport/TransportClient.java
+++ b/src/main/java/org/elasticsearch/client/transport/TransportClient.java
@@ -271,6 +271,11 @@ public class TransportClient extends AbstractClient {
ThreadLocals.clearReferencesThreadLocals();
}
+ @Override
+ public Settings settings() {
+ return this.settings;
+ }
+
@Override
public ThreadPool threadPool() {
return internalClient.threadPool();
diff --git a/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java b/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java
index 9d30aca0c39..d5b3e75c404 100644
--- a/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java
+++ b/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java
@@ -40,6 +40,7 @@ import java.util.Map;
*/
public class InternalTransportClient extends AbstractClient implements InternalClient {
+ private final Settings settings;
private final ThreadPool threadPool;
private final TransportClientNodesService nodesService;
@@ -52,6 +53,7 @@ public class InternalTransportClient extends AbstractClient implements InternalC
public InternalTransportClient(Settings settings, ThreadPool threadPool, TransportService transportService,
TransportClientNodesService nodesService, InternalTransportAdminClient adminClient,
Map actions) {
+ this.settings = settings;
this.threadPool = threadPool;
this.nodesService = nodesService;
this.adminClient = adminClient;
@@ -70,6 +72,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
// nothing to do here
}
+ @Override
+ public Settings settings() {
+ return this.settings;
+ }
+
@Override
public ThreadPool threadPool() {
return this.threadPool;