allow to associate a payload with bulk requests

This commit is contained in:
Shay Banon 2012-11-19 16:16:35 +01:00
parent 83257c8af8
commit 6e597ffccb
2 changed files with 89 additions and 18 deletions

View File

@ -223,18 +223,29 @@ public class BulkProcessor {
return add((ActionRequest) request);
}
/**
* Adds either a delete or an index request.
*/
public BulkProcessor add(ActionRequest request) {
internalAdd(request);
return add(request, null);
}
public BulkProcessor add(ActionRequest request, @Nullable Object payload) {
internalAdd(request, payload);
return this;
}
private synchronized void internalAdd(ActionRequest request) {
bulkRequest.add(request);
private synchronized void internalAdd(ActionRequest request, @Nullable Object payload) {
bulkRequest.add(request, payload);
executeIfNeeded();
}
public synchronized BulkProcessor add(BytesReference data, boolean contentUnsafe, @Nullable String defaultIndex, @Nullable String defaultType) throws Exception {
bulkRequest.add(data, contentUnsafe, defaultIndex, defaultType);
public BulkProcessor add(BytesReference data, boolean contentUnsafe, @Nullable String defaultIndex, @Nullable String defaultType) throws Exception {
return add(data, contentUnsafe, defaultIndex, defaultType, null);
}
public synchronized BulkProcessor add(BytesReference data, boolean contentUnsafe, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable Object payload) throws Exception {
bulkRequest.add(data, contentUnsafe, defaultIndex, defaultType, payload);
executeIfNeeded();
return this;
}

View File

@ -39,6 +39,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.VersionType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.action.ValidateActions.addValidationError;
@ -54,6 +55,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
private static final int REQUEST_OVERHEAD = 50;
final List<ActionRequest> requests = Lists.newArrayList();
List<Object> payloads = null;
private ReplicationType replicationType = ReplicationType.DEFAULT;
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
@ -66,14 +68,23 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
*/
public BulkRequest add(ActionRequest... requests) {
for (ActionRequest request : requests) {
add(request, null);
}
return this;
}
public BulkRequest add(ActionRequest request) {
return add(request, null);
}
public BulkRequest add(ActionRequest request, @Nullable Object payload) {
if (request instanceof IndexRequest) {
add((IndexRequest) request);
add((IndexRequest) request, payload);
} else if (request instanceof DeleteRequest) {
add((DeleteRequest) request);
add((DeleteRequest) request, payload);
} else {
throw new ElasticSearchIllegalArgumentException("No support for request [" + request + "]");
}
}
return this;
}
@ -99,11 +110,17 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
*/
public BulkRequest add(IndexRequest request) {
request.beforeLocalFork();
return internalAdd(request);
return internalAdd(request, null);
}
BulkRequest internalAdd(IndexRequest request) {
public BulkRequest add(IndexRequest request, @Nullable Object payload) {
request.beforeLocalFork();
return internalAdd(request, payload);
}
BulkRequest internalAdd(IndexRequest request, @Nullable Object payload) {
requests.add(request);
addPayload(payload);
sizeInBytes += request.source().length() + REQUEST_OVERHEAD;
return this;
}
@ -112,19 +129,58 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
* Adds an {@link DeleteRequest} to the list of actions to execute.
*/
public BulkRequest add(DeleteRequest request) {
return add(request, null);
}
public BulkRequest add(DeleteRequest request, @Nullable Object payload) {
requests.add(request);
addPayload(payload);
sizeInBytes += REQUEST_OVERHEAD;
return this;
}
private void addPayload(Object payload) {
if (payloads == null) {
if (payload == null) {
return;
}
payloads = new ArrayList<Object>(requests.size() + 10);
// add requests#size-1 elements to the payloads if it null (we add for an *existing* request)
for (int i = 1; i < requests.size(); i++) {
payloads.add(null);
}
}
payloads.add(payload);
}
/**
* The list of requests in this bulk request.
*/
public List<ActionRequest> requests() {
return this.requests;
}
/**
* The list of optional payloads associated with requests in the same order as the requests. Note, elements within
* it might be null if no payload has been provided.
* <p/>
* Note, if no payloads have been provided, this method will return null (as to conserve memory overhead).
*/
@Nullable
public List<Object> payloads() {
return this.payloads;
}
/**
* The number of actions in the bulk request.
*/
public int numberOfActions() {
return requests.size();
}
/**
* The estimated size in bytes of the bulk request.
*/
public long estimatedSizeInBytes() {
return sizeInBytes;
}
@ -147,6 +203,10 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
* Adds a framed data in binary format
*/
public BulkRequest add(BytesReference data, boolean contentUnsafe, @Nullable String defaultIndex, @Nullable String defaultType) throws Exception {
return add(data, contentUnsafe, defaultIndex, defaultType, null);
}
public BulkRequest add(BytesReference data, boolean contentUnsafe, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable Object payload) throws Exception {
XContent xContent = XContentFactory.xContent(data);
int from = 0;
int length = data.length();
@ -225,7 +285,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
}
if ("delete".equals(action)) {
add(new DeleteRequest(index, type, id).parent(parent).version(version).versionType(versionType).routing(routing));
add(new DeleteRequest(index, type, id).parent(parent).version(version).versionType(versionType).routing(routing), payload);
} else {
nextMarker = findNextMarker(marker, from, data, length);
if (nextMarker == -1) {
@ -238,18 +298,18 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
if (opType == null) {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
.source(data.slice(from, nextMarker - from), contentUnsafe)
.percolate(percolate));
.percolate(percolate), payload);
} else {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
.create("create".equals(opType))
.source(data.slice(from, nextMarker - from), contentUnsafe)
.percolate(percolate));
.percolate(percolate), payload);
}
} else if ("create".equals(action)) {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
.create(true)
.source(data.slice(from, nextMarker - from), contentUnsafe)
.percolate(percolate));
.percolate(percolate), payload);
}
// move pointers
from = nextMarker + 1;