From e52daa96705d143dc3360abd9dcb93f8978d1ed8 Mon Sep 17 00:00:00 2001 From: kimchy Date: Wed, 15 Sep 2010 18:18:48 +0200 Subject: [PATCH] automatic index creation when using the bulk api --- .../action/bulk/TransportBulkAction.java | 67 ++++++++++++++++++- .../action/index/TransportIndexAction.java | 2 +- .../action/bulk/BulkRequestBuilder.java | 17 +++++ .../rest/action/bulk/RestBulkAction.java | 2 + 4 files changed, 85 insertions(+), 3 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index a6ef673dced..873c2bda2e2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -23,6 +23,9 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.TransportActions; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.BaseAction; @@ -31,9 +34,11 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.UUID; import org.elasticsearch.common.collect.Lists; import org.elasticsearch.common.collect.Maps; +import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BaseTransportRequestHandler; @@ -42,6 +47,8 @@ import org.elasticsearch.transport.TransportService; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** @@ -49,6 +56,8 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class TransportBulkAction extends BaseAction { + private final boolean autoCreateIndex; + private final boolean allowIdGeneration; private final ThreadPool threadPool; @@ -59,20 +68,74 @@ public class TransportBulkAction extends BaseAction { private final TransportShardBulkAction shardBulkAction; + private final TransportCreateIndexAction createIndexAction; + @Inject public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, IndicesService indicesService, - TransportShardBulkAction shardBulkAction) { + TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction) { super(settings); this.threadPool = threadPool; this.clusterService = clusterService; this.indicesService = indicesService; this.shardBulkAction = shardBulkAction; + this.createIndexAction = createIndexAction; - this.allowIdGeneration = componentSettings.getAsBoolean("allow_id_generation", true); + this.autoCreateIndex = settings.getAsBoolean("action.auto_create_index", true); + this.allowIdGeneration = componentSettings.getAsBoolean("action.allow_id_generation", true); transportService.registerHandler(TransportActions.BULK, new TransportHandler()); } @Override protected void doExecute(final BulkRequest bulkRequest, final ActionListener listener) { + Set indices = Sets.newHashSet(); + for (ActionRequest request : bulkRequest.requests) { + if (request instanceof IndexRequest) { + IndexRequest indexRequest = (IndexRequest) request; + if (!indices.contains(indexRequest.index())) { + indices.add(indexRequest.index()); + } + } else if (request instanceof DeleteRequest) { + DeleteRequest deleteRequest = (DeleteRequest) request; + if (!indices.contains(deleteRequest.index())) { + indices.add(deleteRequest.index()); + } + } + } + + if (autoCreateIndex) { + final AtomicInteger counter = new AtomicInteger(indices.size()); + final AtomicBoolean failed = new AtomicBoolean(); + for (String index : indices) { + if (!clusterService.state().metaData().hasConcreteIndex(index)) { + createIndexAction.execute(new CreateIndexRequest(index).cause("auto(bulk api)"), new ActionListener() { + @Override public void onResponse(CreateIndexResponse result) { + if (counter.decrementAndGet() == 0) { + executeBulk(bulkRequest, listener); + } + } + + @Override public void onFailure(Throwable e) { + if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) { + // we have the index, do it + if (counter.decrementAndGet() == 0) { + executeBulk(bulkRequest, listener); + } + } else if (failed.compareAndSet(false, true)) { + listener.onFailure(e); + } + } + }); + } else { + if (counter.decrementAndGet() == 0) { + executeBulk(bulkRequest, listener); + } + } + } + } else { + executeBulk(bulkRequest, listener); + } + } + + private void executeBulk(final BulkRequest bulkRequest, final ActionListener listener) { ClusterState clusterState = clusterService.state(); for (ActionRequest request : bulkRequest.requests) { if (request instanceof IndexRequest) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 6accf2e3425..93921004999 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -74,7 +74,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi this.createIndexAction = createIndexAction; this.mappingUpdatedAction = mappingUpdatedAction; this.autoCreateIndex = settings.getAsBoolean("action.auto_create_index", true); - this.allowIdGeneration = componentSettings.getAsBoolean("allow_id_generation", true); + this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true); } @Override protected void doExecute(final IndexRequest indexRequest, final ActionListener listener) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/bulk/BulkRequestBuilder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/bulk/BulkRequestBuilder.java index d4a6dc01e90..82e555d47e2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/bulk/BulkRequestBuilder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/bulk/BulkRequestBuilder.java @@ -30,6 +30,9 @@ import org.elasticsearch.client.action.index.IndexRequestBuilder; import org.elasticsearch.client.action.support.BaseRequestBuilder; /** + * A bulk request holds an ordered {@link IndexRequest}s and {@link DeleteRequest}s and allows to executes + * it in a single batch. + * * @author kimchy (shay.banon) */ public class BulkRequestBuilder extends BaseRequestBuilder { @@ -38,21 +41,35 @@ public class BulkRequestBuilder extends BaseRequestBuilder