better delete by query request serialization handling

This commit is contained in:
kimchy 2010-05-13 09:00:25 +03:00
parent e35d67afd7
commit 0e163e3649
3 changed files with 49 additions and 18 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.deletebyquery;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticSearchGenerationException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.replication.IndicesReplicationOperationRequest;
@ -34,6 +35,7 @@ import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.xcontent.XContentFactory;
import org.elasticsearch.util.xcontent.XContentType;
import org.elasticsearch.util.xcontent.builder.BinaryXContentBuilder;
import org.elasticsearch.util.xcontent.builder.XContentBuilder;
import java.io.IOException;
import java.util.Arrays;
@ -58,11 +60,13 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
private static final XContentType contentType = Requests.CONTENT_TYPE;
private byte[] querySource;
private int querySourceOffset;
private int querySourceLength;
private boolean querySourceUnsafe;
private String queryParserName;
private String[] types = Strings.EMPTY_ARRAY;
private transient QueryBuilder queryBuilder;
/**
* Constructs a new delete by query request to run against the provided indices. No indices means
* it will run against all indices.
@ -84,7 +88,7 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
@Override public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
if (querySource == null && queryBuilder == null) {
if (querySource == null) {
validationException = addValidationError("query is missing", validationException);
}
return validationException;
@ -99,8 +103,10 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
* The query source to execute.
*/
byte[] querySource() {
if (querySource == null && queryBuilder != null) {
querySource = queryBuilder.buildAsBytes();
if (querySourceUnsafe) {
querySource = Arrays.copyOfRange(querySource, querySourceOffset, querySourceLength);
querySourceOffset = 0;
querySourceUnsafe = false;
}
return querySource;
}
@ -111,7 +117,11 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
* @see org.elasticsearch.index.query.xcontent.QueryBuilders
*/
@Required public DeleteByQueryRequest query(QueryBuilder queryBuilder) {
this.queryBuilder = queryBuilder;
FastByteArrayOutputStream bos = queryBuilder.buildAsUnsafeBytes();
this.querySource = bos.unsafeByteArray();
this.querySourceOffset = 0;
this.querySourceLength = bos.size();
this.querySourceUnsafe = true;
return this;
}
@ -120,7 +130,12 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
* or {@link #query(org.elasticsearch.index.query.QueryBuilder)}.
*/
@Required public DeleteByQueryRequest query(String querySource) {
return query(Unicode.fromStringAsBytes(querySource));
UnicodeUtil.UTF8Result result = Unicode.fromStringAsUtf8(querySource);
this.querySource = result.result;
this.querySourceOffset = 0;
this.querySourceLength = result.length;
this.querySourceUnsafe = true;
return this;
}
/**
@ -130,11 +145,22 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
try {
BinaryXContentBuilder builder = XContentFactory.contentBinaryBuilder(contentType);
builder.map(querySource);
this.querySource = builder.copiedBytes();
return query(builder);
} catch (IOException e) {
throw new ElasticSearchGenerationException("Failed to generate [" + querySource + "]", e);
}
return this;
}
@Required public DeleteByQueryRequest query(XContentBuilder builder) {
try {
this.querySource = builder.unsafeBytes();
this.querySourceOffset = 0;
this.querySourceLength = builder.unsafeBytesLength();
this.querySourceUnsafe = true;
return this;
} catch (IOException e) {
throw new ElasticSearchGenerationException("Failed to generate [" + builder + "]", e);
}
}
/**
@ -185,8 +211,13 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
querySource = new byte[in.readVInt()];
querySourceUnsafe = false;
querySourceOffset = 0;
querySourceLength = in.readVInt();
querySource = new byte[querySourceLength];
in.readFully(querySource);
if (in.readBoolean()) {
queryParserName = in.readUTF();
}
@ -194,14 +225,10 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (querySource != null) {
out.writeVInt(querySource.length);
out.writeBytes(querySource);
} else {
FastByteArrayOutputStream os = queryBuilder.buildAsUnsafeBytes(contentType);
out.writeVInt(os.size());
out.writeBytes(os.unsafeByteArray(), 0, os.size());
}
out.writeVInt(querySourceLength);
out.writeBytes(querySource, querySourceOffset, querySourceLength);
if (queryParserName == null) {
out.writeBoolean(false);
} else {

View File

@ -217,6 +217,7 @@ public class IndexRequest extends ShardReplicationOperationRequest {
byte[] source() {
if (sourceUnsafe) {
source = Arrays.copyOfRange(source, sourceOffset, sourceLength);
sourceOffset = 0;
sourceUnsafe = false;
}
return source;

View File

@ -68,8 +68,11 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
for (final ShardsIterator shards : groups) {
ShardRequest shardRequest = newShardRequestInstance(request, shards.shardId().id());
// TODO for now, we fork operations on shards of the index
shardRequest.beforeLocalFork(); // optimize for local fork
shardRequest.operationThreaded(true);
// no need for threaded listener, we will fork when its done based on the index request
shardRequest.listenerThreaded(false);
shardAction.execute(shardRequest, new ActionListener<ShardResponse>() {