diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index d87d3058bee..d0a62eac2fd 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -223,18 +223,29 @@ public class BulkProcessor { return add((ActionRequest) request); } + /** + * Adds either a delete or an index request. + */ public BulkProcessor add(ActionRequest request) { - internalAdd(request); + return add(request, null); + } + + public BulkProcessor add(ActionRequest request, @Nullable Object payload) { + internalAdd(request, payload); return this; } - private synchronized void internalAdd(ActionRequest request) { - bulkRequest.add(request); + private synchronized void internalAdd(ActionRequest request, @Nullable Object payload) { + bulkRequest.add(request, payload); executeIfNeeded(); } - public synchronized BulkProcessor add(BytesReference data, boolean contentUnsafe, @Nullable String defaultIndex, @Nullable String defaultType) throws Exception { - bulkRequest.add(data, contentUnsafe, defaultIndex, defaultType); + public BulkProcessor add(BytesReference data, boolean contentUnsafe, @Nullable String defaultIndex, @Nullable String defaultType) throws Exception { + return add(data, contentUnsafe, defaultIndex, defaultType, null); + } + + public synchronized BulkProcessor add(BytesReference data, boolean contentUnsafe, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable Object payload) throws Exception { + bulkRequest.add(data, contentUnsafe, defaultIndex, defaultType, payload); executeIfNeeded(); return this; } diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index ad96e9fa3df..1293a699d58 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -39,6 +39,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.VersionType; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -54,6 +55,7 @@ public class BulkRequest extends ActionRequest { private static final int REQUEST_OVERHEAD = 50; final List requests = Lists.newArrayList(); + List payloads = null; private ReplicationType replicationType = ReplicationType.DEFAULT; private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT; @@ -66,13 +68,22 @@ public class BulkRequest extends ActionRequest { */ public BulkRequest add(ActionRequest... requests) { for (ActionRequest request : requests) { - if (request instanceof IndexRequest) { - add((IndexRequest) request); - } else if (request instanceof DeleteRequest) { - add((DeleteRequest) request); - } else { - throw new ElasticSearchIllegalArgumentException("No support for request [" + request + "]"); - } + add(request, null); + } + return this; + } + + public BulkRequest add(ActionRequest request) { + return add(request, null); + } + + public BulkRequest add(ActionRequest request, @Nullable Object payload) { + if (request instanceof IndexRequest) { + add((IndexRequest) request, payload); + } else if (request instanceof DeleteRequest) { + add((DeleteRequest) request, payload); + } else { + throw new ElasticSearchIllegalArgumentException("No support for request [" + request + "]"); } return this; } @@ -99,11 +110,17 @@ public class BulkRequest extends ActionRequest { */ public BulkRequest add(IndexRequest request) { request.beforeLocalFork(); - return internalAdd(request); + return internalAdd(request, null); } - BulkRequest internalAdd(IndexRequest request) { + public BulkRequest add(IndexRequest request, @Nullable Object payload) { + request.beforeLocalFork(); + return internalAdd(request, payload); + } + + BulkRequest internalAdd(IndexRequest request, @Nullable Object payload) { requests.add(request); + addPayload(payload); sizeInBytes += request.source().length() + REQUEST_OVERHEAD; return this; } @@ -112,19 +129,58 @@ public class BulkRequest extends ActionRequest { * Adds an {@link DeleteRequest} to the list of actions to execute. */ public BulkRequest add(DeleteRequest request) { + return add(request, null); + } + + public BulkRequest add(DeleteRequest request, @Nullable Object payload) { requests.add(request); + addPayload(payload); sizeInBytes += REQUEST_OVERHEAD; return this; } + private void addPayload(Object payload) { + if (payloads == null) { + if (payload == null) { + return; + } + payloads = new ArrayList(requests.size() + 10); + // add requests#size-1 elements to the payloads if it null (we add for an *existing* request) + for (int i = 1; i < requests.size(); i++) { + payloads.add(null); + } + } + payloads.add(payload); + } + + /** + * The list of requests in this bulk request. + */ public List requests() { return this.requests; } + /** + * The list of optional payloads associated with requests in the same order as the requests. Note, elements within + * it might be null if no payload has been provided. + *

+ * Note, if no payloads have been provided, this method will return null (as to conserve memory overhead). + */ + @Nullable + public List payloads() { + return this.payloads; + } + + /** + * The number of actions in the bulk request. + */ public int numberOfActions() { return requests.size(); } + /** + * The estimated size in bytes of the bulk request. + */ public long estimatedSizeInBytes() { return sizeInBytes; } @@ -147,6 +203,10 @@ public class BulkRequest extends ActionRequest { * Adds a framed data in binary format */ public BulkRequest add(BytesReference data, boolean contentUnsafe, @Nullable String defaultIndex, @Nullable String defaultType) throws Exception { + return add(data, contentUnsafe, defaultIndex, defaultType, null); + } + + public BulkRequest add(BytesReference data, boolean contentUnsafe, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable Object payload) throws Exception { XContent xContent = XContentFactory.xContent(data); int from = 0; int length = data.length(); @@ -225,7 +285,7 @@ public class BulkRequest extends ActionRequest { } if ("delete".equals(action)) { - add(new DeleteRequest(index, type, id).parent(parent).version(version).versionType(versionType).routing(routing)); + add(new DeleteRequest(index, type, id).parent(parent).version(version).versionType(versionType).routing(routing), payload); } else { nextMarker = findNextMarker(marker, from, data, length); if (nextMarker == -1) { @@ -238,18 +298,18 @@ public class BulkRequest extends ActionRequest { if (opType == null) { internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType) .source(data.slice(from, nextMarker - from), contentUnsafe) - .percolate(percolate)); + .percolate(percolate), payload); } else { internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType) .create("create".equals(opType)) .source(data.slice(from, nextMarker - from), contentUnsafe) - .percolate(percolate)); + .percolate(percolate), payload); } } else if ("create".equals(action)) { internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType) .create(true) .source(data.slice(from, nextMarker - from), contentUnsafe) - .percolate(percolate)); + .percolate(percolate), payload); } // move pointers from = nextMarker + 1;