change DocumentRequest<?> to DocumentRequest for readibility
This commit is contained in:
parent
225a04b2cc
commit
661067d160
|
@ -35,7 +35,7 @@ import java.io.IOException;
|
|||
public class BulkItemRequest implements Streamable {
|
||||
|
||||
private int id;
|
||||
private DocumentRequest<?> request;
|
||||
private DocumentRequest request;
|
||||
private volatile BulkItemResponse primaryResponse;
|
||||
private volatile boolean ignoreOnReplica;
|
||||
|
||||
|
@ -43,7 +43,7 @@ public class BulkItemRequest implements Streamable {
|
|||
|
||||
}
|
||||
|
||||
public BulkItemRequest(int id, DocumentRequest<?> request) {
|
||||
public BulkItemRequest(int id, DocumentRequest request) {
|
||||
this.id = id;
|
||||
this.request = request;
|
||||
}
|
||||
|
@ -52,7 +52,7 @@ public class BulkItemRequest implements Streamable {
|
|||
return id;
|
||||
}
|
||||
|
||||
public DocumentRequest<?> request() {
|
||||
public DocumentRequest request() {
|
||||
return request;
|
||||
}
|
||||
|
||||
|
|
|
@ -250,24 +250,24 @@ public class BulkProcessor implements Closeable {
|
|||
* (for example, if no id is provided, one will be generated, or usage of the create flag).
|
||||
*/
|
||||
public BulkProcessor add(IndexRequest request) {
|
||||
return add((DocumentRequest<?>) request);
|
||||
return add((DocumentRequest) request);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds an {@link DeleteRequest} to the list of actions to execute.
|
||||
*/
|
||||
public BulkProcessor add(DeleteRequest request) {
|
||||
return add((DocumentRequest<?>) request);
|
||||
return add((DocumentRequest) request);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds either a delete or an index request.
|
||||
*/
|
||||
public BulkProcessor add(DocumentRequest<?> request) {
|
||||
public BulkProcessor add(DocumentRequest request) {
|
||||
return add(request, null);
|
||||
}
|
||||
|
||||
public BulkProcessor add(DocumentRequest<?> request, @Nullable Object payload) {
|
||||
public BulkProcessor add(DocumentRequest request, @Nullable Object payload) {
|
||||
internalAdd(request, payload);
|
||||
return this;
|
||||
}
|
||||
|
@ -282,7 +282,7 @@ public class BulkProcessor implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
private synchronized void internalAdd(DocumentRequest<?> request, @Nullable Object payload) {
|
||||
private synchronized void internalAdd(DocumentRequest request, @Nullable Object payload) {
|
||||
ensureOpen();
|
||||
bulkRequest.add(request, payload);
|
||||
executeIfNeeded();
|
||||
|
|
|
@ -72,7 +72,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
|
|||
* {@link WriteRequest}s to this but java doesn't support syntax to declare that everything in the array has both types so we declare
|
||||
* the one with the least casts.
|
||||
*/
|
||||
final List<DocumentRequest<?>> requests = new ArrayList<>();
|
||||
final List<DocumentRequest> requests = new ArrayList<>();
|
||||
List<Object> payloads = null;
|
||||
|
||||
protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT;
|
||||
|
@ -87,14 +87,14 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
|
|||
/**
|
||||
* Adds a list of requests to be executed. Either index or delete requests.
|
||||
*/
|
||||
public BulkRequest add(DocumentRequest<?>... requests) {
|
||||
for (DocumentRequest<?> request : requests) {
|
||||
public BulkRequest add(DocumentRequest... requests) {
|
||||
for (DocumentRequest request : requests) {
|
||||
add(request, null);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
public BulkRequest add(DocumentRequest<?> request) {
|
||||
public BulkRequest add(DocumentRequest request) {
|
||||
return add(request, null);
|
||||
}
|
||||
|
||||
|
@ -104,7 +104,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
|
|||
* @param payload Optional payload
|
||||
* @return the current bulk request
|
||||
*/
|
||||
public BulkRequest add(DocumentRequest<?> request, @Nullable Object payload) {
|
||||
public BulkRequest add(DocumentRequest request, @Nullable Object payload) {
|
||||
if (request instanceof IndexRequest) {
|
||||
add((IndexRequest) request, payload);
|
||||
} else if (request instanceof DeleteRequest) {
|
||||
|
@ -120,8 +120,8 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
|
|||
/**
|
||||
* Adds a list of requests to be executed. Either index or delete requests.
|
||||
*/
|
||||
public BulkRequest add(Iterable<DocumentRequest<?>> requests) {
|
||||
for (DocumentRequest<?> request : requests) {
|
||||
public BulkRequest add(Iterable<DocumentRequest> requests) {
|
||||
for (DocumentRequest request : requests) {
|
||||
add(request);
|
||||
}
|
||||
return this;
|
||||
|
@ -207,7 +207,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
|
|||
/**
|
||||
* The list of requests in this bulk request.
|
||||
*/
|
||||
public List<DocumentRequest<?>> requests() {
|
||||
public List<DocumentRequest> requests() {
|
||||
return this.requests;
|
||||
}
|
||||
|
||||
|
@ -508,7 +508,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
|
|||
* @return Whether this bulk request contains index request with an ingest pipeline enabled.
|
||||
*/
|
||||
public boolean hasIndexRequestsWithPipelines() {
|
||||
for (DocumentRequest<?> actionRequest : requests) {
|
||||
for (DocumentRequest actionRequest : requests) {
|
||||
if (actionRequest instanceof IndexRequest) {
|
||||
IndexRequest indexRequest = (IndexRequest) actionRequest;
|
||||
if (Strings.hasText(indexRequest.getPipeline())) {
|
||||
|
@ -526,7 +526,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
|
|||
if (requests.isEmpty()) {
|
||||
validationException = addValidationError("no requests added", validationException);
|
||||
}
|
||||
for (DocumentRequest<?> request : requests) {
|
||||
for (DocumentRequest request : requests) {
|
||||
// We first check if refresh has been set
|
||||
if (((WriteRequest<?>) request).getRefreshPolicy() != RefreshPolicy.NONE) {
|
||||
validationException = addValidationError(
|
||||
|
@ -561,7 +561,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
|
|||
super.writeTo(out);
|
||||
waitForActiveShards.writeTo(out);
|
||||
out.writeVInt(requests.size());
|
||||
for (DocumentRequest<?> request : requests) {
|
||||
for (DocumentRequest request : requests) {
|
||||
DocumentRequest.writeDocumentRequest(out, request);
|
||||
}
|
||||
refreshPolicy.writeTo(out);
|
||||
|
|
|
@ -145,7 +145,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
if (!(ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException)) {
|
||||
// fail all requests involving this index, if create didnt work
|
||||
for (int i = 0; i < bulkRequest.requests.size(); i++) {
|
||||
DocumentRequest<?> request = bulkRequest.requests.get(i);
|
||||
DocumentRequest request = bulkRequest.requests.get(i);
|
||||
if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
|
||||
bulkRequest.requests.set(i, null);
|
||||
}
|
||||
|
@ -180,7 +180,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
return autoCreateIndex.shouldAutoCreate(index, state);
|
||||
}
|
||||
|
||||
private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, DocumentRequest<?> request, String index, Exception e) {
|
||||
private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, DocumentRequest request, String index, Exception e) {
|
||||
if (index.equals(request.index())) {
|
||||
responses.set(idx, new BulkItemResponse(idx, request.opType(), new BulkItemResponse.Failure(request.index(), request.type(), request.id(), e)));
|
||||
return true;
|
||||
|
@ -211,7 +211,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
|
||||
MetaData metaData = clusterState.metaData();
|
||||
for (int i = 0; i < bulkRequest.requests.size(); i++) {
|
||||
DocumentRequest<?> documentRequest = bulkRequest.requests.get(i);
|
||||
DocumentRequest documentRequest = bulkRequest.requests.get(i);
|
||||
//the request can only be null because we set it to null in the previous step, so it gets ignored
|
||||
if (documentRequest == null) {
|
||||
continue;
|
||||
|
@ -253,7 +253,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
// first, go over all the requests and create a ShardId -> Operations mapping
|
||||
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
|
||||
for (int i = 0; i < bulkRequest.requests.size(); i++) {
|
||||
DocumentRequest<?> request = bulkRequest.requests.get(i);
|
||||
DocumentRequest request = bulkRequest.requests.get(i);
|
||||
if (request == null) {
|
||||
continue;
|
||||
}
|
||||
|
@ -300,7 +300,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
// create failures for all relevant requests
|
||||
for (BulkItemRequest request : requests) {
|
||||
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
|
||||
DocumentRequest<?> documentRequest = request.request();
|
||||
DocumentRequest documentRequest = request.request();
|
||||
responses.set(request.id(), new BulkItemResponse(request.id(), documentRequest.opType(),
|
||||
new BulkItemResponse.Failure(indexName, documentRequest.type(), documentRequest.id(), e)));
|
||||
}
|
||||
|
|
|
@ -147,14 +147,14 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
if (retryPrimaryException(e)) {
|
||||
// restore updated versions...
|
||||
for (int j = 0; j < requestIndex; j++) {
|
||||
DocumentRequest<?> documentRequest = request.items()[j].request();
|
||||
DocumentRequest documentRequest = request.items()[j].request();
|
||||
documentRequest.version(preVersions[j]);
|
||||
documentRequest.versionType(preVersionTypes[j]);
|
||||
}
|
||||
throw (ElasticsearchException) e;
|
||||
}
|
||||
BulkItemRequest item = request.items()[requestIndex];
|
||||
DocumentRequest<?> documentRequest = item.request();
|
||||
DocumentRequest documentRequest = item.request();
|
||||
if (isConflictException(e)) {
|
||||
logger.trace((Supplier<?>) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
|
||||
request.shardId(), documentRequest.opType().getLowercase(), request), e);
|
||||
|
@ -179,7 +179,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
|
||||
private WriteResult<? extends DocWriteResponse> innerExecuteBulkItemRequest(IndexMetaData metaData, IndexShard indexShard,
|
||||
BulkShardRequest request, int requestIndex) throws Exception {
|
||||
DocumentRequest<?> itemRequest = request.items()[requestIndex].request();
|
||||
DocumentRequest itemRequest = request.items()[requestIndex].request();
|
||||
switch (itemRequest.opType()) {
|
||||
case CREATE:
|
||||
case INDEX:
|
||||
|
@ -268,7 +268,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
if (item == null || item.isIgnoreOnReplica()) {
|
||||
continue;
|
||||
}
|
||||
DocumentRequest<?> documentRequest = item.request();
|
||||
DocumentRequest documentRequest = item.request();
|
||||
final Engine.Operation operation;
|
||||
try {
|
||||
switch (documentRequest.opType()) {
|
||||
|
|
|
@ -135,7 +135,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
|
|||
return Integer.MAX_VALUE;
|
||||
}
|
||||
|
||||
static final class BulkRequestModifier implements Iterator<DocumentRequest<?>> {
|
||||
static final class BulkRequestModifier implements Iterator<DocumentRequest> {
|
||||
|
||||
final BulkRequest bulkRequest;
|
||||
final Set<Integer> failedSlots;
|
||||
|
@ -151,7 +151,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
|
|||
}
|
||||
|
||||
@Override
|
||||
public DocumentRequest<?> next() {
|
||||
public DocumentRequest next() {
|
||||
return bulkRequest.requests().get(++currentSlot);
|
||||
}
|
||||
|
||||
|
@ -172,7 +172,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
|
|||
int slot = 0;
|
||||
originalSlots = new int[bulkRequest.requests().size() - failedSlots.size()];
|
||||
for (int i = 0; i < bulkRequest.requests().size(); i++) {
|
||||
DocumentRequest<?> request = bulkRequest.requests().get(i);
|
||||
DocumentRequest request = bulkRequest.requests().get(i);
|
||||
if (failedSlots.contains(i) == false) {
|
||||
modifiedBulkRequest.add(request);
|
||||
originalSlots[slot++] = i;
|
||||
|
|
|
@ -68,7 +68,7 @@ public class PipelineExecutionService implements ClusterStateListener {
|
|||
});
|
||||
}
|
||||
|
||||
public void executeBulkRequest(Iterable<DocumentRequest<?>> actionRequests,
|
||||
public void executeBulkRequest(Iterable<DocumentRequest> actionRequests,
|
||||
BiConsumer<IndexRequest, Exception> itemFailureHandler,
|
||||
Consumer<Exception> completionHandler) {
|
||||
threadPool.executor(ThreadPool.Names.BULK).execute(new AbstractRunnable() {
|
||||
|
@ -80,7 +80,7 @@ public class PipelineExecutionService implements ClusterStateListener {
|
|||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
for (DocumentRequest<?> actionRequest : actionRequests) {
|
||||
for (DocumentRequest actionRequest : actionRequests) {
|
||||
if ((actionRequest instanceof IndexRequest)) {
|
||||
IndexRequest indexRequest = (IndexRequest) actionRequest;
|
||||
if (Strings.hasText(indexRequest.getPipeline())) {
|
||||
|
|
|
@ -113,7 +113,7 @@ public class BulkRequestTests extends ESTestCase {
|
|||
|
||||
public void testBulkAddIterable() {
|
||||
BulkRequest bulkRequest = Requests.bulkRequest();
|
||||
List<DocumentRequest<?>> requests = new ArrayList<>();
|
||||
List<DocumentRequest> requests = new ArrayList<>();
|
||||
requests.add(new IndexRequest("test", "test", "id").source("field", "value"));
|
||||
requests.add(new UpdateRequest("test", "test", "id").doc("field", "value"));
|
||||
requests.add(new DeleteRequest("test", "test", "id"));
|
||||
|
|
Loading…
Reference in New Issue