more work on bulk, client API

This commit is contained in:
kimchy 2010-09-14 22:08:43 +02:00
parent d96ffe9153
commit f6fa6ea44e
35 changed files with 1539 additions and 40 deletions

View File

@ -0,0 +1,98 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.benchmark.stress;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.node.Node;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.client.Requests.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
import static org.elasticsearch.common.xcontent.XContentFactory.*;
import static org.elasticsearch.node.NodeBuilder.*;
/**
* @author kimchy (shay.banon)
*/
public class SingleThreadBulkStress {
public static void main(String[] args) throws Exception {
Random random = new Random();
Settings settings = settingsBuilder()
.put("cluster.routing.schedule", 200, TimeUnit.MILLISECONDS)
.put("index.engine.robin.refreshInterval", "-1")
.put(SETTING_NUMBER_OF_SHARDS, 2)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.build();
Node node1 = nodeBuilder().settings(settingsBuilder().put(settings).put("name", "server1")).node();
Node node2 = nodeBuilder().settings(settingsBuilder().put(settings).put("name", "server2")).node();
Node client = nodeBuilder().settings(settingsBuilder().put(settings).put("name", "client")).client(true).node();
Client client1 = client.client();
Thread.sleep(1000);
client1.admin().indices().create(createIndexRequest("test")).actionGet();
Thread.sleep(5000);
StopWatch stopWatch = new StopWatch().start();
int COUNT = 200000;
int BATCH = 1000;
System.out.println("Indexing [" + COUNT + "] ...");
int ITERS = COUNT / BATCH;
int i = 1;
for (; i <= ITERS; i++) {
BulkRequestBuilder request = client1.prepareBulk();
for (int j = 0; j < BATCH; j++) {
request.add(Requests.indexRequest("test").type("type1").id(Integer.toString(i)).source(source(Integer.toString(i), "test" + i)));
}
BulkResponse response = request.execute().actionGet();
if (response.hasFailures()) {
System.err.println("failures...");
}
if (((i * BATCH) % 10000) == 0) {
System.out.println("Indexed " + (i * 100) + " took " + stopWatch.stop().lastTaskTime());
stopWatch.start();
}
}
System.out.println("Indexing took " + stopWatch.totalTime() + ", TPS " + (((double) COUNT) / stopWatch.totalTime().secondsFrac()));
client.close();
node1.close();
node2.close();
}
private static XContentBuilder source(String id, String nameValue) throws IOException {
return jsonBuilder().startObject().field("id", id).field("name", nameValue).endObject();
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.benchmark.stress;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.settings.Settings;
@ -37,7 +36,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.*;
import static org.elasticsearch.node.NodeBuilder.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class SingleThreadIndexingStress {
@ -67,13 +66,8 @@ public class SingleThreadIndexingStress {
System.out.println("Indexing [" + COUNT + "] ...");
int i = 1;
for (; i <= COUNT; i++) {
client1.index(
indexRequest("test")
.type("type1")
.id(Integer.toString(i))
.source(source(Integer.toString(i), "test" + i))
.opType(IndexRequest.OpType.INDEX)
).actionGet();
client1.prepareIndex("test", "type1").setId(Integer.toString(i)).setSource(source(Integer.toString(i), "test" + i))
.setCreate(false).execute().actionGet();
if ((i % 10000) == 0) {
System.out.println("Indexed " + i + " took " + stopWatch.stop().lastTaskTime());
stopWatch.start();

View File

@ -39,6 +39,12 @@ public class ActionRequestValidationException extends ElasticSearchException {
validationErrors.add(error);
}
public void addValidationErrors(Iterable<String> errors) {
for (String error : errors) {
validationErrors.add(error);
}
}
public List<String> validationErrors() {
return validationErrors;
}

View File

@ -41,6 +41,8 @@ import org.elasticsearch.action.admin.indices.optimize.TransportOptimizeAction;
import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction;
import org.elasticsearch.action.admin.indices.settings.TransportUpdateSettingsAction;
import org.elasticsearch.action.admin.indices.status.TransportIndicesStatusAction;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.count.TransportCountAction;
import org.elasticsearch.action.delete.TransportDeleteAction;
import org.elasticsearch.action.deletebyquery.TransportDeleteByQueryAction;
@ -93,6 +95,9 @@ public class TransportActionModule extends AbstractModule {
bind(TransportDeleteAction.class).asEagerSingleton();
bind(TransportCountAction.class).asEagerSingleton();
bind(TransportBulkAction.class).asEagerSingleton();
bind(TransportShardBulkAction.class).asEagerSingleton();
bind(TransportShardDeleteByQueryAction.class).asEagerSingleton();
bind(TransportIndexDeleteByQueryAction.class).asEagerSingleton();
bind(TransportDeleteByQueryAction.class).asEagerSingleton();

View File

@ -24,6 +24,8 @@ package org.elasticsearch.action;
*/
public class TransportActions {
public static final String BULK = "indices/bulk";
public static final String INDEX = "indices/index/shard/index";
public static final String COUNT = "indices/count";

View File

@ -0,0 +1,83 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class BulkItemRequest implements Streamable {
private int id;
private ActionRequest request;
BulkItemRequest() {
}
public BulkItemRequest(int id, ActionRequest request) {
this.id = id;
this.request = request;
}
public int id() {
return id;
}
public ActionRequest request() {
return request;
}
public static BulkItemRequest readBulkItem(StreamInput in) throws IOException {
BulkItemRequest item = new BulkItemRequest();
item.readFrom(in);
return item;
}
@Override public void readFrom(StreamInput in) throws IOException {
id = in.readVInt();
byte type = in.readByte();
if (type == 0) {
request = new IndexRequest();
} else if (type == 1) {
request = new DeleteRequest();
}
request.readFrom(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(id);
if (request instanceof IndexRequest) {
out.writeByte((byte) 0);
} else if (request instanceof DeleteRequest) {
out.writeByte((byte) 1);
}
request.writeTo(out);
}
}

View File

@ -0,0 +1,229 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class BulkItemResponse implements Streamable {
public static class Failure {
private final String index;
private final String type;
private final String id;
private final String message;
public Failure(String index, String type, String id, String message) {
this.index = index;
this.type = type;
this.id = id;
this.message = message;
}
public String index() {
return this.index;
}
public String getIndex() {
return index();
}
public String message() {
return this.message;
}
public String getMessage() {
return message();
}
public String type() {
return type;
}
public String getType() {
return type();
}
public String id() {
return id;
}
public String getId() {
return this.id;
}
}
private int id;
private String opType;
private ActionResponse response;
private Failure failure;
BulkItemResponse() {
}
public BulkItemResponse(int id, String opType, ActionResponse response) {
this.id = id;
this.opType = opType;
this.response = response;
}
public BulkItemResponse(int id, String opType, Failure failure) {
this.id = id;
this.opType = opType;
this.failure = failure;
}
public int itemId() {
return id;
}
public String opType() {
return this.opType;
}
public String index() {
if (failure != null) {
return failure.index();
}
if (response instanceof IndexResponse) {
return ((IndexResponse) response).index();
} else if (response instanceof DeleteResponse) {
return ((DeleteResponse) response).index();
}
return null;
}
public String getIndex() {
return index();
}
public String type() {
if (failure != null) {
return failure.type();
}
if (response instanceof IndexResponse) {
return ((IndexResponse) response).type();
} else if (response instanceof DeleteResponse) {
return ((DeleteResponse) response).type();
}
return null;
}
public String getType() {
return this.type();
}
public String id() {
if (failure != null) {
return failure.id();
}
if (response instanceof IndexResponse) {
return ((IndexResponse) response).id();
} else if (response instanceof DeleteResponse) {
return ((DeleteResponse) response).id();
}
return null;
}
public String getId() {
return id();
}
public ActionResponse response() {
return response;
}
public boolean failed() {
return failure != null;
}
public boolean isFailed() {
return failed();
}
public Failure failure() {
return this.failure;
}
public Failure getFailure() {
return failure();
}
public static BulkItemResponse readBulkItem(StreamInput in) throws IOException {
BulkItemResponse response = new BulkItemResponse();
response.readFrom(in);
return response;
}
@Override public void readFrom(StreamInput in) throws IOException {
id = in.readVInt();
opType = in.readUTF();
byte type = in.readByte();
if (type == 0) {
response = new IndexResponse();
response.readFrom(in);
} else if (type == 1) {
response = new DeleteResponse();
response.readFrom(in);
}
if (in.readBoolean()) {
failure = new Failure(in.readUTF(), in.readUTF(), in.readUTF(), in.readUTF());
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(id);
out.writeUTF(opType);
if (response == null) {
out.writeByte((byte) 2);
} else {
if (response instanceof IndexResponse) {
out.writeByte((byte) 0);
} else if (response instanceof DeleteResponse) {
out.writeByte((byte) 1);
}
response.writeTo(out);
}
if (failure == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(failure.index());
out.writeUTF(failure.type());
out.writeUTF(failure.id());
out.writeUTF(failure.message());
}
}
}

View File

@ -0,0 +1,112 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.List;
import static org.elasticsearch.action.Actions.*;
/**
* @author kimchy (shay.banon)
*/
public class BulkRequest implements ActionRequest {
final List<ActionRequest> requests = Lists.newArrayList();
private boolean listenerThreaded = false;
public BulkRequest add(IndexRequest request) {
// if the source is from a builder, we need to copy it over before adding the next one, which can come from a builder as well...
if (request.sourceFromBuilder()) {
request.beforeLocalFork();
}
requests.add(request);
return this;
}
public BulkRequest add(DeleteRequest request) {
requests.add(request);
return this;
}
@Override public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (requests.isEmpty()) {
validationException = addValidationError("no requests added", validationException);
}
for (int i = 0; i < requests.size(); i++) {
ActionRequestValidationException ex = requests.get(i).validate();
if (ex != null) {
if (validationException == null) {
validationException = new ActionRequestValidationException();
}
validationException.addValidationErrors(ex.validationErrors());
}
}
return validationException;
}
@Override public boolean listenerThreaded() {
return listenerThreaded;
}
@Override public BulkRequest listenerThreaded(boolean listenerThreaded) {
this.listenerThreaded = listenerThreaded;
return this;
}
@Override public void readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
for (int i = 0; i < size; i++) {
byte type = in.readByte();
if (type == 0) {
IndexRequest request = new IndexRequest();
request.readFrom(in);
requests.add(request);
} else if (type == 1) {
DeleteRequest request = new DeleteRequest();
request.readFrom(in);
requests.add(request);
}
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(requests.size());
for (ActionRequest request : requests) {
if (request instanceof IndexRequest) {
out.writeByte((byte) 0);
} else if (request instanceof DeleteRequest) {
out.writeByte((byte) 1);
}
request.writeTo(out);
}
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.Iterator;
/**
* A response of a bulk execution. Holding a response for each item responding (in order) of the
* bulk requests.
*
* @author kimchy (shay.banon)
*/
public class BulkResponse implements ActionResponse, Iterable<BulkItemResponse> {
private BulkItemResponse[] responses;
BulkResponse() {
}
public BulkResponse(BulkItemResponse[] responses) {
this.responses = responses;
}
public boolean hasFailures() {
for (BulkItemResponse response : responses) {
if (response.failed()) {
return true;
}
}
return false;
}
public BulkItemResponse[] items() {
return responses;
}
@Override public Iterator<BulkItemResponse> iterator() {
return Iterators.forArray(responses);
}
@Override public void readFrom(StreamInput in) throws IOException {
responses = new BulkItemResponse[in.readVInt()];
for (int i = 0; i < responses.length; i++) {
responses[i] = BulkItemResponse.readBulkItem(in);
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(responses.length);
for (BulkItemResponse response : responses) {
response.writeTo(out);
}
}
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class BulkShardRequest extends ShardReplicationOperationRequest {
private int shardId;
private BulkItemRequest[] items;
BulkShardRequest() {
}
BulkShardRequest(String index, int shardId, BulkItemRequest[] items) {
this.index = index;
this.shardId = shardId;
this.items = items;
}
int shardId() {
return shardId;
}
BulkItemRequest[] items() {
return items;
}
/**
* Before we fork on a local thread, make sure we copy over the bytes if they are unsafe
*/
@Override public void beforeLocalFork() {
for (BulkItemRequest item : items) {
((ShardReplicationOperationRequest) item.request()).beforeLocalFork();
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(shardId);
out.writeVInt(items.length);
for (BulkItemRequest item : items) {
item.writeTo(out);
}
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = in.readVInt();
items = new BulkItemRequest[in.readVInt()];
for (int i = 0; i < items.length; i++) {
items[i] = BulkItemRequest.readBulkItem(in);
}
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class BulkShardResponse implements ActionResponse {
private ShardId shardId;
private BulkItemResponse[] responses;
BulkShardResponse() {
}
BulkShardResponse(ShardId shardId, BulkItemResponse[] responses) {
this.shardId = shardId;
this.responses = responses;
}
public ShardId shardId() {
return shardId;
}
public BulkItemResponse[] responses() {
return responses;
}
@Override public void readFrom(StreamInput in) throws IOException {
shardId = ShardId.readShardId(in);
responses = new BulkItemResponse[in.readVInt()];
for (int i = 0; i < responses.length; i++) {
responses[i] = BulkItemResponse.readBulkItem(in);
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
out.writeVInt(responses.length);
for (BulkItemResponse response : responses) {
response.writeTo(out);
}
}
}

View File

@ -0,0 +1,200 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.UUID;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author kimchy (shay.banon)
*/
public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
private final boolean allowIdGeneration;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final IndicesService indicesService;
private final TransportShardBulkAction shardBulkAction;
@Inject public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, IndicesService indicesService,
TransportShardBulkAction shardBulkAction) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.indicesService = indicesService;
this.shardBulkAction = shardBulkAction;
this.allowIdGeneration = componentSettings.getAsBoolean("allow_id_generation", true);
transportService.registerHandler(TransportActions.BULK, new TransportHandler());
}
@Override protected void doExecute(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
ClusterState clusterState = clusterService.state();
for (ActionRequest request : bulkRequest.requests) {
if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request;
indexRequest.index(clusterState.metaData().concreteIndex(indexRequest.index()));
if (allowIdGeneration) {
if (indexRequest.id() == null) {
indexRequest.id(UUID.randomUUID().toString());
// since we generate the id, change it to CREATE
indexRequest.opType(IndexRequest.OpType.CREATE);
}
}
} else if (request instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request;
deleteRequest.index(clusterState.metaData().concreteIndex(deleteRequest.index()));
}
}
// first, go over all the requests and create a ShardId -> Operations mapping
Map<ShardId, List<BulkItemRequest>> requestsByShard = Maps.newHashMap();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
ActionRequest request = bulkRequest.requests.get(i);
ShardId shardId = null;
if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request;
shardId = indicesService.indexServiceSafe(indexRequest.index()).operationRouting().indexShards(clusterState, indexRequest.type(), indexRequest.id()).shardId();
} else if (request instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request;
shardId = indicesService.indexServiceSafe(deleteRequest.index()).operationRouting().deleteShards(clusterState, deleteRequest.type(), deleteRequest.id()).shardId();
}
List<BulkItemRequest> list = requestsByShard.get(shardId);
if (list == null) {
list = Lists.newArrayList();
requestsByShard.put(shardId, list);
}
list.add(new BulkItemRequest(i, request));
}
final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
final BulkItemResponse[] responses = new BulkItemResponse[bulkRequest.requests.size()];
for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
final ShardId shardId = entry.getKey();
final List<BulkItemRequest> requests = entry.getValue();
shardBulkAction.execute(new BulkShardRequest(shardId.index().name(), shardId.id(), requests.toArray(new BulkItemRequest[requests.size()])), new ActionListener<BulkShardResponse>() {
@Override public void onResponse(BulkShardResponse bulkShardResponse) {
synchronized (responses) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.responses()) {
responses[bulkItemResponse.itemId()] = bulkItemResponse;
}
}
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
@Override public void onFailure(Throwable e) {
// create failures for all relevant requests
String message = ExceptionsHelper.detailedMessage(e);
synchronized (responses) {
for (BulkItemRequest request : requests) {
if (request.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request.request();
responses[request.id()] = new BulkItemResponse(request.id(), indexRequest.opType().toString().toLowerCase(),
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), message));
} else if (request.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request.request();
responses[request.id()] = new BulkItemResponse(request.id(), "delete",
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), message));
}
}
}
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
private void finishHim() {
if (bulkRequest.listenerThreaded()) {
threadPool.execute(new Runnable() {
@Override public void run() {
listener.onResponse(new BulkResponse(responses));
}
});
} else {
listener.onResponse(new BulkResponse(responses));
}
}
});
}
}
class TransportHandler extends BaseTransportRequestHandler<BulkRequest> {
@Override public BulkRequest newInstance() {
return new BulkRequest();
}
@Override public void messageReceived(final BulkRequest request, final TransportChannel channel) throws Exception {
// no need to use threaded listener, since we just send a response
request.listenerThreaded(false);
execute(request, new ActionListener<BulkResponse>() {
@Override public void onResponse(BulkResponse result) {
try {
channel.sendResponse(result);
} catch (Exception e) {
onFailure(e);
}
}
@Override public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send error response for action [" + TransportActions.BULK + "] and request [" + request + "]", e1);
}
}
});
}
@Override public boolean spawn() {
// no need to spawn, since in the doExecute we always execute with threaded operation set to true
return false;
}
}
}

View File

@ -0,0 +1,228 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.bulk;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Set;
/**
* Performs the index operation.
*
* @author kimchy (shay.banon)
*/
public class TransportShardBulkAction extends TransportShardReplicationOperationAction<BulkShardRequest, BulkShardResponse> {
private final MappingUpdatedAction mappingUpdatedAction;
@Inject public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction) {
super(settings, transportService, clusterService, indicesService, threadPool, shardStateAction);
this.mappingUpdatedAction = mappingUpdatedAction;
}
@Override protected TransportRequestOptions transportOptions() {
return TransportRequestOptions.options().withCompress(true);
}
@Override protected BulkShardRequest newRequestInstance() {
return new BulkShardRequest();
}
@Override protected BulkShardResponse newResponseInstance() {
return new BulkShardResponse();
}
@Override protected String transportAction() {
return "indices/index/shard/bulk";
}
@Override protected void checkBlock(BulkShardRequest request, ClusterState state) {
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
}
@Override protected ShardsIterator shards(ClusterState clusterState, BulkShardRequest request) {
return clusterState.routingTable().index(request.index()).shard(request.shardId()).shardsIt();
}
@Override protected BulkShardResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
IndexShard indexShard = indexShard(shardRequest);
final BulkShardRequest request = shardRequest.request;
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
Engine.Operation[] ops = new Engine.Operation[request.items().length];
for (int i = 0; i < ops.length; i++) {
BulkItemRequest item = request.items()[i];
if (item.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) item.request();
try {
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
ops[i] = indexShard.prepareIndex(indexRequest.type(), indexRequest.id(), indexRequest.source());
} else {
ops[i] = indexShard.prepareCreate(indexRequest.type(), indexRequest.id(), indexRequest.source());
}
} catch (Exception e) {
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().toString().toLowerCase(),
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), ExceptionsHelper.detailedMessage(e)));
}
} else if (item.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) item.request();
try {
ops[i] = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id());
} catch (Exception e) {
responses[i] = new BulkItemResponse(item.id(), "delete",
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(e)));
}
}
}
EngineException[] failures = indexShard.bulk(new Engine.Bulk(ops));
// process failures and mappings
Set<String> processedTypes = Sets.newHashSet();
for (int i = 0; i < ops.length; i++) {
// failed to parse, already set the failure, skip
if (ops[i] == null) {
continue;
}
BulkItemRequest item = request.items()[i];
if (item.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) item.request();
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index engineIndex = (Engine.Index) ops[i];
if (!processedTypes.contains(engineIndex.type())) {
processedTypes.add(engineIndex.type());
ParsedDocument doc = engineIndex.parsedDoc();
if (doc.mappersAdded()) {
updateMappingOnMaster(indexRequest);
}
}
} else {
Engine.Create engineCreate = (Engine.Create) ops[i];
if (!processedTypes.contains(engineCreate.type())) {
processedTypes.add(engineCreate.type());
ParsedDocument doc = engineCreate.parsedDoc();
if (doc.mappersAdded()) {
updateMappingOnMaster(indexRequest);
}
}
}
if (failures != null && failures[i] != null) {
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().toString().toLowerCase(),
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), ExceptionsHelper.detailedMessage(failures[i])));
} else {
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().toString().toLowerCase(),
new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id()));
}
} else if (item.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) item.request();
if (failures != null && failures[i] != null) {
responses[i] = new BulkItemResponse(item.id(), "delete",
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(failures[i])));
} else {
responses[i] = new BulkItemResponse(item.id(), "delete",
new DeleteResponse(deleteRequest.index(), deleteRequest.type(), deleteRequest.id()));
}
}
}
return new BulkShardResponse(new ShardId(request.index(), request.shardId()), responses);
}
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
IndexShard indexShard = indexShard(shardRequest);
final BulkShardRequest request = shardRequest.request;
Engine.Operation[] ops = new Engine.Operation[request.items().length];
for (int i = 0; i < ops.length; i++) {
BulkItemRequest item = request.items()[i];
if (item.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) item.request();
try {
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
ops[i] = indexShard.prepareIndex(indexRequest.type(), indexRequest.id(), indexRequest.source());
} else {
ops[i] = indexShard.prepareCreate(indexRequest.type(), indexRequest.id(), indexRequest.source());
}
} catch (Exception e) {
// ignore, we are on backup
}
} else if (item.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) item.request();
try {
ops[i] = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id());
} catch (Exception e) {
// ignore, we are on backup
}
}
}
indexShard.bulk(new Engine.Bulk(ops));
}
private void updateMappingOnMaster(final IndexRequest request) {
try {
MapperService mapperService = indicesService.indexServiceSafe(request.index()).mapperService();
final DocumentMapper documentMapper = mapperService.documentMapper(request.type());
documentMapper.refreshSource();
mappingUpdatedAction.execute(new MappingUpdatedAction.MappingUpdatedRequest(request.index(), request.type(), documentMapper.mappingSource()), new ActionListener<MappingUpdatedAction.MappingUpdatedResponse>() {
@Override public void onResponse(MappingUpdatedAction.MappingUpdatedResponse mappingUpdatedResponse) {
// all is well
}
@Override public void onFailure(Throwable e) {
try {
logger.warn("Failed to update master on updated mapping for index [" + request.index() + "], type [" + request.type() + "] and source [" + documentMapper.mappingSource().string() + "]", e);
} catch (IOException e1) {
// ignore
}
}
});
} catch (Exception e) {
logger.warn("Failed to update master on updated mapping for index [" + request.index() + "], type [" + request.type() + "]", e);
}
}
}

View File

@ -69,7 +69,7 @@ public class DeleteRequest extends ShardReplicationOperationRequest {
this.id = id;
}
DeleteRequest() {
public DeleteRequest() {
}
@Override public ActionRequestValidationException validate() {
@ -119,7 +119,7 @@ public class DeleteRequest extends ShardReplicationOperationRequest {
/**
* The type of the document to delete.
*/
String type() {
public String type() {
return type;
}
@ -134,7 +134,7 @@ public class DeleteRequest extends ShardReplicationOperationRequest {
/**
* The id of the document to delete.
*/
String id() {
public String id() {
return id;
}

View File

@ -41,11 +41,11 @@ public class DeleteResponse implements ActionResponse, Streamable {
private String type;
DeleteResponse() {
public DeleteResponse() {
}
DeleteResponse(String index, String type, String id) {
public DeleteResponse(String index, String type, String id) {
this.index = index;
this.id = id;
this.type = type;

View File

@ -109,6 +109,7 @@ public class IndexRequest extends ShardReplicationOperationRequest {
private int sourceOffset;
private int sourceLength;
private boolean sourceUnsafe;
private boolean sourceFromBuilder;
private OpType opType = OpType.INDEX;
@ -150,10 +151,17 @@ public class IndexRequest extends ShardReplicationOperationRequest {
/**
* Before we fork on a local thread, make sure we copy over the bytes if they are unsafe
*/
@Override protected void beforeLocalFork() {
@Override public void beforeLocalFork() {
source();
}
/**
* Need this in case builders are used, we need to copy after adding...
*/
public boolean sourceFromBuilder() {
return sourceFromBuilder;
}
/**
* Sets the index the index operation will happen on.
*/
@ -182,7 +190,7 @@ public class IndexRequest extends ShardReplicationOperationRequest {
/**
* The type of the indexed document.
*/
String type() {
public String type() {
return type;
}
@ -197,7 +205,7 @@ public class IndexRequest extends ShardReplicationOperationRequest {
/**
* The id of the indexed document. If not set, will be automatically generated.
*/
String id() {
public String id() {
return id;
}
@ -212,7 +220,7 @@ public class IndexRequest extends ShardReplicationOperationRequest {
/**
* The source of the JSON document to index.
*/
byte[] source() {
public byte[] source() {
if (sourceUnsafe || sourceOffset > 0) {
source = Arrays.copyOfRange(source, sourceOffset, sourceLength);
sourceOffset = 0;
@ -269,6 +277,7 @@ public class IndexRequest extends ShardReplicationOperationRequest {
sourceOffset = 0;
sourceLength = sourceBuilder.unsafeBytesLength();
sourceUnsafe = true;
this.sourceFromBuilder = true;
} catch (IOException e) {
throw new ElasticSearchGenerationException("Failed to generate [" + sourceBuilder + "]", e);
}

View File

@ -41,11 +41,11 @@ public class IndexResponse implements ActionResponse, Streamable {
private String type;
IndexResponse() {
public IndexResponse() {
}
IndexResponse(String index, String type, String id) {
public IndexResponse(String index, String type, String id) {
this.index = index;
this.id = id;
this.type = type;

View File

@ -130,7 +130,7 @@ public abstract class ShardReplicationOperationRequest implements ActionRequest
/**
* Called before the request gets forked into a local thread.
*/
protected void beforeLocalFork() {
public void beforeLocalFork() {
}
}

View File

@ -109,6 +109,10 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
protected TransportRequestOptions transportOptions() {
return TransportRequestOptions.EMPTY;
}
/**
* Should the operations be performed on the replicas as well. Defaults to <tt>false</tt> meaning operations
* will be executed on the replica.
@ -283,7 +287,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
} else {
DiscoveryNode node = nodes.get(shard.currentNodeId());
transportService.sendRequest(node, transportAction(), request, new BaseTransportResponseHandler<Response>() {
transportService.sendRequest(node, transportAction(), request, transportOptions(), new BaseTransportResponseHandler<Response>() {
@Override public Response newInstance() {
return newResponseInstance();
@ -511,7 +515,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
final ShardOperationRequest shardRequest = new ShardOperationRequest(shards.shardId().id(), request);
if (!nodeId.equals(nodes.localNodeId())) {
DiscoveryNode node = nodes.get(nodeId);
transportService.sendRequest(node, transportReplicaAction(), shardRequest, new VoidTransportResponseHandler() {
transportService.sendRequest(node, transportReplicaAction(), shardRequest, transportOptions(), new VoidTransportResponseHandler() {
@Override public void handleResponse(VoidStreamable vResponse) {
finishIfPossible();
}

View File

@ -21,6 +21,8 @@ package org.elasticsearch.client;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountRequest;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.delete.DeleteRequest;
@ -35,6 +37,7 @@ import org.elasticsearch.action.mlt.MoreLikeThisRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.action.count.CountRequestBuilder;
import org.elasticsearch.client.action.delete.DeleteRequestBuilder;
import org.elasticsearch.client.action.deletebyquery.DeleteByQueryRequestBuilder;
@ -153,6 +156,29 @@ public interface Client {
*/
DeleteRequestBuilder prepareDelete(String index, String type, String id);
/**
* Executes a bulk of index / delete operations.
*
* @param request The bulk request
* @return The result future
* @see org.elasticsearch.client.Requests#bulkRequest()
*/
ActionFuture<BulkResponse> bulk(BulkRequest request);
/**
* Executes a bulk of index / delete operations.
*
* @param request The bulk request
* @param listener A listener to be notified with a result
* @see org.elasticsearch.client.Requests#bulkRequest()
*/
void bulk(BulkRequest request, ActionListener<BulkResponse> listener);
/**
* Executes a bulk of index / delete operations.
*/
BulkRequestBuilder prepareBulk();
/**
* Deletes all documents from one or more indices based on a query.
*

View File

@ -39,6 +39,7 @@ import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.settings.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.status.IndicesStatusRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.count.CountRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequest;
@ -89,6 +90,13 @@ public class Requests {
return new DeleteRequest(index);
}
/**
* Creats a new bulk request.
*/
public static BulkRequest bulkRequest() {
return new BulkRequest();
}
/**
* Creates a delete by query request. Note, the query itself must be set either by setting the JSON source
* of the query, or by using a {@link org.elasticsearch.index.query.QueryBuilder} (using {@link org.elasticsearch.index.query.xcontent.QueryBuilders}).

View File

@ -0,0 +1,64 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.action.bulk;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.action.delete.DeleteRequestBuilder;
import org.elasticsearch.client.action.index.IndexRequestBuilder;
import org.elasticsearch.client.action.support.BaseRequestBuilder;
/**
* @author kimchy (shay.banon)
*/
public class BulkRequestBuilder extends BaseRequestBuilder<BulkRequest, BulkResponse> {
public BulkRequestBuilder(Client client) {
super(client, new BulkRequest());
}
public BulkRequestBuilder add(IndexRequest request) {
super.request.add(request);
return this;
}
public BulkRequestBuilder add(IndexRequestBuilder request) {
super.request.add(request.request());
return this;
}
public BulkRequestBuilder add(DeleteRequest request) {
super.request.add(request);
return this;
}
public BulkRequestBuilder add(DeleteRequestBuilder request) {
super.request.add(request.request());
return this;
}
@Override protected void doExecute(ActionListener<BulkResponse> listener) {
client.bulk(request, listener);
}
}

View File

@ -42,6 +42,10 @@ public abstract class BaseRequestBuilder<Request extends ActionRequest, Response
this.request = request;
}
public Request request() {
return this.request;
}
@Override public ListenableActionFuture<Response> execute() {
PlainListenableActionFuture<Response> future = new PlainListenableActionFuture<Response>(request.listenerThreaded(), client.threadPool());
execute(future);

View File

@ -21,6 +21,9 @@ package org.elasticsearch.client.node;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.count.CountRequest;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.count.TransportCountAction;
@ -59,6 +62,8 @@ public class NodeClient extends AbstractClient implements InternalClient {
private final TransportDeleteAction deleteAction;
private final TransportBulkAction bulkAction;
private final TransportDeleteByQueryAction deleteByQueryAction;
private final TransportGetAction getAction;
@ -72,7 +77,7 @@ public class NodeClient extends AbstractClient implements InternalClient {
private final TransportMoreLikeThisAction moreLikeThisAction;
@Inject public NodeClient(Settings settings, ThreadPool threadPool, NodeAdminClient admin,
TransportIndexAction indexAction, TransportDeleteAction deleteAction,
TransportIndexAction indexAction, TransportDeleteAction deleteAction, TransportBulkAction bulkAction,
TransportDeleteByQueryAction deleteByQueryAction, TransportGetAction getAction, TransportCountAction countAction,
TransportSearchAction searchAction, TransportSearchScrollAction searchScrollAction,
TransportMoreLikeThisAction moreLikeThisAction) {
@ -80,6 +85,7 @@ public class NodeClient extends AbstractClient implements InternalClient {
this.admin = admin;
this.indexAction = indexAction;
this.deleteAction = deleteAction;
this.bulkAction = bulkAction;
this.deleteByQueryAction = deleteByQueryAction;
this.getAction = getAction;
this.countAction = countAction;
@ -116,6 +122,14 @@ public class NodeClient extends AbstractClient implements InternalClient {
deleteAction.execute(request, listener);
}
@Override public ActionFuture<BulkResponse> bulk(BulkRequest request) {
return bulkAction.execute(request);
}
@Override public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
bulkAction.execute(request, listener);
}
@Override public ActionFuture<DeleteByQueryResponse> deleteByQuery(DeleteByQueryRequest request) {
return deleteByQueryAction.execute(request);
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.client.support;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.action.count.CountRequestBuilder;
import org.elasticsearch.client.action.delete.DeleteRequestBuilder;
import org.elasticsearch.client.action.deletebyquery.DeleteByQueryRequestBuilder;
@ -55,6 +56,10 @@ public abstract class AbstractClient implements InternalClient {
return prepareDelete().setIndex(index).setType(type).setId(id);
}
@Override public BulkRequestBuilder prepareBulk() {
return new BulkRequestBuilder(this);
}
@Override public DeleteByQueryRequestBuilder prepareDeleteByQuery(String... indices) {
return new DeleteByQueryRequestBuilder(this).setIndices(indices);
}

View File

@ -22,6 +22,8 @@ package org.elasticsearch.client.transport;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountRequest;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.delete.DeleteRequest;
@ -248,6 +250,14 @@ public class TransportClient extends AbstractClient {
internalClient.delete(request, listener);
}
@Override public ActionFuture<BulkResponse> bulk(BulkRequest request) {
return internalClient.bulk(request);
}
@Override public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
internalClient.bulk(request, listener);
}
@Override public ActionFuture<DeleteByQueryResponse> deleteByQuery(DeleteByQueryRequest request) {
return internalClient.deleteByQuery(request);
}

View File

@ -39,6 +39,7 @@ import org.elasticsearch.client.transport.action.admin.indices.optimize.ClientTr
import org.elasticsearch.client.transport.action.admin.indices.refresh.ClientTransportRefreshAction;
import org.elasticsearch.client.transport.action.admin.indices.settings.ClientTransportUpdateSettingsAction;
import org.elasticsearch.client.transport.action.admin.indices.status.ClientTransportIndicesStatusAction;
import org.elasticsearch.client.transport.action.bulk.ClientTransportBulkAction;
import org.elasticsearch.client.transport.action.count.ClientTransportCountAction;
import org.elasticsearch.client.transport.action.delete.ClientTransportDeleteAction;
import org.elasticsearch.client.transport.action.deletebyquery.ClientTransportDeleteByQueryAction;
@ -61,6 +62,7 @@ public class ClientTransportActionModule extends AbstractModule {
bind(ClientTransportCountAction.class).asEagerSingleton();
bind(ClientTransportSearchAction.class).asEagerSingleton();
bind(ClientTransportSearchScrollAction.class).asEagerSingleton();
bind(ClientTransportBulkAction.class).asEagerSingleton();
bind(ClientTransportIndicesStatusAction.class).asEagerSingleton();
bind(ClientTransportRefreshAction.class).asEagerSingleton();

View File

@ -0,0 +1,47 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.transport.action.bulk;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.action.support.BaseClientTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
/**
* @author kimchy (shay.banon)
*/
public class ClientTransportBulkAction extends BaseClientTransportAction<BulkRequest, BulkResponse> {
@Inject public ClientTransportBulkAction(Settings settings, TransportService transportService) {
super(settings, transportService, BulkResponse.class);
}
@Override protected String action() {
return TransportActions.BULK;
}
@Override protected TransportRequestOptions options() {
return TransportRequestOptions.options().withCompress(true);
}
}

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.lang.reflect.Constructor;
@ -67,7 +68,7 @@ public abstract class BaseClientTransportAction<Request extends ActionRequest, R
}
@Override public void execute(DiscoveryNode node, final Request request, final ActionListener<Response> listener) {
transportService.sendRequest(node, action(), request, new BaseTransportResponseHandler<Response>() {
transportService.sendRequest(node, action(), request, options(), new BaseTransportResponseHandler<Response>() {
@Override public Response newInstance() {
return BaseClientTransportAction.this.newInstance();
}
@ -86,6 +87,10 @@ public abstract class BaseClientTransportAction<Request extends ActionRequest, R
});
}
protected TransportRequestOptions options() {
return TransportRequestOptions.EMPTY;
}
protected abstract String action();
protected Response newInstance() {

View File

@ -22,6 +22,8 @@ package org.elasticsearch.client.transport.support;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountRequest;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.delete.DeleteRequest;
@ -40,6 +42,7 @@ import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.internal.InternalClient;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.client.transport.TransportClientNodesService;
import org.elasticsearch.client.transport.action.bulk.ClientTransportBulkAction;
import org.elasticsearch.client.transport.action.count.ClientTransportCountAction;
import org.elasticsearch.client.transport.action.delete.ClientTransportDeleteAction;
import org.elasticsearch.client.transport.action.deletebyquery.ClientTransportDeleteByQueryAction;
@ -68,6 +71,8 @@ public class InternalTransportClient extends AbstractClient implements InternalC
private final ClientTransportDeleteAction deleteAction;
private final ClientTransportBulkAction bulkAction;
private final ClientTransportGetAction getAction;
private final ClientTransportDeleteByQueryAction deleteByQueryAction;
@ -82,7 +87,7 @@ public class InternalTransportClient extends AbstractClient implements InternalC
@Inject public InternalTransportClient(Settings settings, ThreadPool threadPool,
TransportClientNodesService nodesService, InternalTransportAdminClient adminClient,
ClientTransportIndexAction indexAction, ClientTransportDeleteAction deleteAction, ClientTransportGetAction getAction,
ClientTransportIndexAction indexAction, ClientTransportDeleteAction deleteAction, ClientTransportBulkAction bulkAction, ClientTransportGetAction getAction,
ClientTransportDeleteByQueryAction deleteByQueryAction, ClientTransportCountAction countAction,
ClientTransportSearchAction searchAction, ClientTransportSearchScrollAction searchScrollAction,
ClientTransportMoreLikeThisAction moreLikeThisAction) {
@ -92,6 +97,7 @@ public class InternalTransportClient extends AbstractClient implements InternalC
this.indexAction = indexAction;
this.deleteAction = deleteAction;
this.bulkAction = bulkAction;
this.getAction = getAction;
this.deleteByQueryAction = deleteByQueryAction;
this.countAction = countAction;
@ -146,6 +152,23 @@ public class InternalTransportClient extends AbstractClient implements InternalC
});
}
@Override public ActionFuture<BulkResponse> bulk(final BulkRequest request) {
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<BulkResponse>>() {
@Override public ActionFuture<BulkResponse> doWithNode(DiscoveryNode node) throws ElasticSearchException {
return bulkAction.execute(node, request);
}
});
}
@Override public void bulk(final BulkRequest request, final ActionListener<BulkResponse> listener) {
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
bulkAction.execute(node, request, listener);
return null;
}
});
}
@Override public ActionFuture<DeleteByQueryResponse> deleteByQuery(final DeleteByQueryRequest request) {
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<DeleteByQueryResponse>>() {
@Override public ActionFuture<DeleteByQueryResponse> doWithNode(DiscoveryNode node) throws ElasticSearchException {

View File

@ -189,6 +189,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
}
for (int i = 0; i < bulk.ops().length; i++) {
Operation op = bulk.ops()[i];
if (op == null) {
continue;
}
try {
switch (op.opType()) {
case CREATE:

View File

@ -68,6 +68,8 @@ public interface IndexShard extends IndexShardComponent, CloseableComponent {
void delete(Term uid) throws ElasticSearchException;
EngineException[] bulk(Engine.Bulk bulk) throws ElasticSearchException;
void deleteByQuery(byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException;
byte[] get(String type, String id) throws ElasticSearchException;

View File

@ -264,6 +264,14 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
engine.delete(delete);
}
@Override public EngineException[] bulk(Engine.Bulk bulk) throws ElasticSearchException {
writeAllowed();
if (logger.isTraceEnabled()) {
logger.trace("bulk, items [{}]", bulk.ops().length);
}
return engine.bulk(bulk);
}
@Override public void deleteByQuery(byte[] querySource, @Nullable String queryParserName, String... types) throws ElasticSearchException {
writeAllowed();
if (types == null) {

View File

@ -29,6 +29,12 @@ public class AliasedIndexDocumentActionsTests extends DocumentActionsTests {
protected void createIndex() {
logger.info("Creating index [test1] with alias [test]");
try {
client1.admin().indices().prepareDelete("test1").execute().actionGet();
} catch (Exception e) {
// ignore
}
logger.info("--> creating index test");
client1.admin().indices().create(createIndexRequest("test1").settings(settingsBuilder().putArray("index.aliases", "test"))).actionGet();
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheRespo
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.optimize.OptimizeResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
@ -34,11 +35,14 @@ import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.Map;
import static org.elasticsearch.client.Requests.*;
@ -54,7 +58,7 @@ public class DocumentActionsTests extends AbstractNodesTests {
protected Client client1;
protected Client client2;
@BeforeMethod public void startNodes() {
@BeforeClass public void startNodes() {
startNode("server1");
startNode("server2");
client1 = getClient1();
@ -68,12 +72,15 @@ public class DocumentActionsTests extends AbstractNodesTests {
// all is well
}
client1.prepareCount().setQuery(termQuery("_type", "type1")).setOperationThreading(BroadcastOperationThreading.NO_THREADS).execute().actionGet();
createIndex();
}
protected void createIndex() {
logger.info("Creating index test");
try {
client1.admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
logger.info("--> creating index test");
client1.admin().indices().create(createIndexRequest("test")).actionGet();
}
@ -81,7 +88,7 @@ public class DocumentActionsTests extends AbstractNodesTests {
return "test";
}
@AfterMethod public void closeNodes() {
@AfterClass public void closeNodes() {
client1.close();
client2.close();
closeAllNodes();
@ -96,6 +103,7 @@ public class DocumentActionsTests extends AbstractNodesTests {
}
@Test public void testIndexActions() throws Exception {
createIndex();
logger.info("Running Cluster Health");
ClusterHealthResponse clusterHealth = client1.admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
@ -128,10 +136,10 @@ public class DocumentActionsTests extends AbstractNodesTests {
for (int i = 0; i < 5; i++) {
getResult = client1.prepareGet("test", "type1", "1").setOperationThreaded(false).execute().actionGet();
assertThat(getResult.index(), equalTo(getConcreteIndexName()));
assertThat("cycle #" + i, getResult.sourceAsString(), equalTo(source("1", "test")));
assertThat("cycle #" + i, getResult.sourceAsString(), equalTo(source("1", "test").string()));
assertThat("cycle(map) #" + i, (String) ((Map) getResult.sourceAsMap().get("type1")).get("name"), equalTo("test"));
getResult = client1.get(getRequest("test").type("type1").id("1").operationThreaded(true)).actionGet();
assertThat("cycle #" + i, getResult.sourceAsString(), equalTo(source("1", "test")));
assertThat("cycle #" + i, getResult.sourceAsString(), equalTo(source("1", "test").string()));
assertThat(getResult.index(), equalTo(getConcreteIndexName()));
}
@ -171,9 +179,9 @@ public class DocumentActionsTests extends AbstractNodesTests {
for (int i = 0; i < 5; i++) {
getResult = client1.get(getRequest("test").type("type1").id("1")).actionGet();
assertThat(getResult.index(), equalTo(getConcreteIndexName()));
assertThat("cycle #" + i, getResult.sourceAsString(), equalTo(source("1", "test")));
assertThat("cycle #" + i, getResult.sourceAsString(), equalTo(source("1", "test").string()));
getResult = client1.get(getRequest("test").type("type1").id("2")).actionGet();
assertThat("cycle #" + i, getResult.sourceAsString(), equalTo(source("2", "test2")));
assertThat("cycle #" + i, getResult.sourceAsString(), equalTo(source("2", "test2").string()));
assertThat(getResult.index(), equalTo(getConcreteIndexName()));
}
@ -214,14 +222,82 @@ public class DocumentActionsTests extends AbstractNodesTests {
for (int i = 0; i < 5; i++) {
getResult = client1.get(getRequest("test").type("type1").id("1")).actionGet();
assertThat(getResult.index(), equalTo(getConcreteIndexName()));
assertThat("cycle #" + i, getResult.sourceAsString(), equalTo(source("1", "test")));
assertThat("cycle #" + i, getResult.sourceAsString(), equalTo(source("1", "test").string()));
getResult = client1.get(getRequest("test").type("type1").id("2")).actionGet();
assertThat("cycle #" + i, getResult.exists(), equalTo(false));
assertThat(getResult.index(), equalTo(getConcreteIndexName()));
}
}
private String source(String id, String nameValue) {
return "{ type1 : { \"id\" : \"" + id + "\", \"name\" : \"" + nameValue + "\" } }";
@Test public void testBulk() throws Exception {
createIndex();
logger.info("-> running Cluster Health");
ClusterHealthResponse clusterHealth = client1.admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet();
logger.info("Done Cluster Health, status " + clusterHealth.status());
assertThat(clusterHealth.timedOut(), equalTo(false));
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
BulkResponse bulkResponse = client1.prepareBulk()
.add(client1.prepareIndex().setIndex("test").setType("type1").setId("1").setSource(source("1", "test")))
.add(client1.prepareIndex().setIndex("test").setType("type1").setId("2").setSource(source("2", "test")).setCreate(true))
.add(client1.prepareIndex().setIndex("test").setType("type1").setSource(source("3", "test")))
.add(client1.prepareDelete().setIndex("test").setType("type1").setId("1"))
.add(client1.prepareIndex().setIndex("test").setType("type1").setSource("{ xxx }")) // failure
.execute().actionGet();
assertThat(bulkResponse.hasFailures(), equalTo(true));
assertThat(bulkResponse.items().length, equalTo(5));
assertThat(bulkResponse.items()[0].isFailed(), equalTo(false));
assertThat(bulkResponse.items()[0].opType(), equalTo("index"));
assertThat(bulkResponse.items()[0].index(), equalTo(getConcreteIndexName()));
assertThat(bulkResponse.items()[0].type(), equalTo("type1"));
assertThat(bulkResponse.items()[0].id(), equalTo("1"));
assertThat(bulkResponse.items()[1].isFailed(), equalTo(false));
assertThat(bulkResponse.items()[1].opType(), equalTo("create"));
assertThat(bulkResponse.items()[1].index(), equalTo(getConcreteIndexName()));
assertThat(bulkResponse.items()[1].type(), equalTo("type1"));
assertThat(bulkResponse.items()[1].id(), equalTo("2"));
assertThat(bulkResponse.items()[2].isFailed(), equalTo(false));
assertThat(bulkResponse.items()[2].opType(), equalTo("create"));
assertThat(bulkResponse.items()[2].index(), equalTo(getConcreteIndexName()));
assertThat(bulkResponse.items()[2].type(), equalTo("type1"));
String generatedId3 = bulkResponse.items()[2].id();
assertThat(bulkResponse.items()[3].isFailed(), equalTo(false));
assertThat(bulkResponse.items()[3].opType(), equalTo("delete"));
assertThat(bulkResponse.items()[3].index(), equalTo(getConcreteIndexName()));
assertThat(bulkResponse.items()[3].type(), equalTo("type1"));
assertThat(bulkResponse.items()[3].id(), equalTo("1"));
assertThat(bulkResponse.items()[4].isFailed(), equalTo(true));
assertThat(bulkResponse.items()[4].opType(), equalTo("create"));
assertThat(bulkResponse.items()[4].index(), equalTo(getConcreteIndexName()));
assertThat(bulkResponse.items()[4].type(), equalTo("type1"));
RefreshResponse refreshResponse = client1.admin().indices().prepareRefresh("test").execute().actionGet();
assertThat(refreshResponse.successfulShards(), equalTo(10));
assertThat(refreshResponse.failedShards(), equalTo(0));
for (int i = 0; i < 5; i++) {
GetResponse getResult = client1.get(getRequest("test").type("type1").id("1")).actionGet();
assertThat(getResult.index(), equalTo(getConcreteIndexName()));
assertThat("cycle #" + i, getResult.exists(), equalTo(false));
getResult = client1.get(getRequest("test").type("type1").id("2")).actionGet();
assertThat("cycle #" + i, getResult.sourceAsString(), equalTo(source("2", "test").string()));
assertThat(getResult.index(), equalTo(getConcreteIndexName()));
getResult = client1.get(getRequest("test").type("type1").id(generatedId3)).actionGet();
assertThat("cycle #" + i, getResult.sourceAsString(), equalTo(source("3", "test").string()));
assertThat(getResult.index(), equalTo(getConcreteIndexName()));
}
}
private XContentBuilder source(String id, String nameValue) throws IOException {
return XContentFactory.jsonBuilder().startObject().startObject("type1").field("id", id).field("name", nameValue).endObject().endObject();
}
}