better index request serialization handling

This commit is contained in:
kimchy 2010-05-13 08:43:09 +03:00
parent 4d6f2d56f0
commit e35d67afd7
3 changed files with 84 additions and 27 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.index;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticSearchGenerationException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.ActionRequestValidationException;
@ -34,6 +35,7 @@ import org.elasticsearch.util.xcontent.builder.BinaryXContentBuilder;
import org.elasticsearch.util.xcontent.builder.XContentBuilder;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import static org.elasticsearch.action.Actions.*;
@ -102,10 +104,13 @@ public class IndexRequest extends ShardReplicationOperationRequest {
private String type;
private String id;
private byte[] source;
private OpType opType = OpType.INDEX;
private transient XContentBuilder sourceBuilder;
private byte[] source;
private int sourceOffset;
private int sourceLength;
private boolean sourceUnsafe;
private OpType opType = OpType.INDEX;
public IndexRequest() {
}
@ -138,12 +143,19 @@ public class IndexRequest extends ShardReplicationOperationRequest {
if (type == null) {
validationException = addValidationError("type is missing", validationException);
}
if (source == null && sourceBuilder == null) {
if (source == null) {
validationException = addValidationError("source is missing", validationException);
}
return validationException;
}
/**
* Before we fork on a local thread, make sure we copy over the bytes if they are unsafe
*/
@Override protected void beforeLocalFork() {
source();
}
/**
* Sets the index the index operation will happen on.
*/
@ -203,12 +215,9 @@ public class IndexRequest extends ShardReplicationOperationRequest {
* The source of the JSON document to index.
*/
byte[] source() {
if (source == null && sourceBuilder != null) {
try {
source = sourceBuilder.copiedBytes();
} catch (IOException e) {
throw new ElasticSearchGenerationException("Failed to build source", e);
}
if (sourceUnsafe) {
source = Arrays.copyOfRange(source, sourceOffset, sourceLength);
sourceUnsafe = false;
}
return source;
}
@ -223,7 +232,7 @@ public class IndexRequest extends ShardReplicationOperationRequest {
}
/**
* Writes the JSON as the provided content type.
* Writes the Map as the provided content type.
*
* @param source The map to index
*/
@ -231,21 +240,24 @@ public class IndexRequest extends ShardReplicationOperationRequest {
try {
BinaryXContentBuilder builder = XContentFactory.contentBinaryBuilder(contentType);
builder.map(source);
this.source = builder.copiedBytes();
return source(builder);
} catch (IOException e) {
throw new ElasticSearchGenerationException("Failed to generate [" + source + "]", e);
}
return this;
}
/**
* Sets the JSON source to index.
* Sets the document source to index.
*
* <p>Note, its preferable to either set it using {@link #source(org.elasticsearch.util.xcontent.builder.XContentBuilder)}
* or using the {@link #source(byte[])}.
*/
@Required public IndexRequest source(String source) {
this.source = Unicode.fromStringAsBytes(source);
UnicodeUtil.UTF8Result result = Unicode.fromStringAsUtf8(source);
this.source = result.result;
this.sourceOffset = 0;
this.sourceLength = result.length;
this.sourceUnsafe = true;
return this;
}
@ -253,15 +265,51 @@ public class IndexRequest extends ShardReplicationOperationRequest {
* Sets the content source to index.
*/
@Required public IndexRequest source(XContentBuilder sourceBuilder) {
this.sourceBuilder = sourceBuilder;
try {
source = sourceBuilder.unsafeBytes();
sourceOffset = 0;
sourceLength = sourceBuilder.unsafeBytesLength();
sourceUnsafe = true;
} catch (IOException e) {
throw new ElasticSearchGenerationException("Failed to generate [" + sourceBuilder + "]", e);
}
return this;
}
/**
* Sets the JSON source to index.
* Sets the document to index in bytes form.
*/
@Required public IndexRequest source(byte[] source) {
public IndexRequest source(byte[] source) {
return source(source, 0, source.length);
}
/**
* Sets the document to index in bytes form (assumed to be safe to be used from different
* threads).
*
* @param source The source to index
* @param offset The offset in the byte array
* @param length The length of the data
* @return
*/
@Required public IndexRequest source(byte[] source, int offset, int length) {
return source(source, offset, length, false);
}
/**
* Sets the document to index in bytes form.
*
* @param source The source to index
* @param offset The offset in the byte array
* @param length The length of the data
* @param unsafe Is the byte array safe to be used form a different thread
* @return
*/
@Required public IndexRequest source(byte[] source, int offset, int length, boolean unsafe) {
this.source = source;
this.sourceOffset = offset;
this.sourceLength = length;
this.sourceUnsafe = unsafe;
return this;
}
@ -319,8 +367,13 @@ public class IndexRequest extends ShardReplicationOperationRequest {
if (in.readBoolean()) {
id = in.readUTF();
}
source = new byte[in.readVInt()];
sourceUnsafe = false;
sourceOffset = 0;
sourceLength = in.readVInt();
source = new byte[sourceLength];
in.readFully(source);
opType = OpType.fromId(in.readByte());
}
@ -333,17 +386,12 @@ public class IndexRequest extends ShardReplicationOperationRequest {
out.writeBoolean(true);
out.writeUTF(id);
}
if (source != null) {
out.writeVInt(source.length);
out.writeBytes(source);
} else {
out.writeVInt(sourceBuilder.unsafeBytesLength());
out.writeBytes(sourceBuilder.unsafeBytes(), 0, sourceBuilder.unsafeBytesLength());
}
out.writeVInt(sourceLength);
out.writeBytes(source, sourceOffset, sourceLength);
out.writeByte(opType.id());
}
@Override public String toString() {
return "[" + index + "][" + type + "][" + id + "], source[" + Unicode.fromBytes(source) + "]";
return "[" + index + "][" + type + "][" + id + "], source[" + Unicode.fromBytes(source, sourceOffset, sourceLength) + "]";
}
}

View File

@ -107,4 +107,11 @@ public abstract class ShardReplicationOperationRequest implements ActionRequest
timeout.writeTo(out);
out.writeUTF(index);
}
/**
* Called before the request gets forked into a local thread.
*/
protected void beforeLocalFork() {
}
}

View File

@ -246,6 +246,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
foundPrimary = true;
if (shard.currentNodeId().equals(nodes.localNodeId())) {
if (request.operationThreaded()) {
request.beforeLocalFork();
threadPool.execute(new Runnable() {
@Override public void run() {
performOnPrimary(shard.id(), fromClusterEvent, true, shard);
@ -434,6 +435,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
});
} else {
if (request.operationThreaded()) {
request.beforeLocalFork();
threadPool.execute(new Runnable() {
@Override public void run() {
try {