Maxing out retries on conflict in bulk update caused null pointer exceptions

Also:
- Bulk update performed one fewer retry than requested.
- The documentation for retry on conflict said it defaults to 1, but the default is 0.
- TransportShardReplicationOperationAction methods now catch Throwables instead of Exceptions.
- Added an extra check to UpdateTests.concurrentUpdateWithRetryOnConflict.

Closes #3447 & #3448
This commit is contained in:
parent 636c35d0d4
commit 43e374f793
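The core of the fix shows up in the TransportShardBulkAction hunks below: a do/while loop over a retry counter becomes a bounded for loop over update attempts. The following standalone sketch (illustrative names only, no Elasticsearch dependencies) demonstrates the off-by-one described in the commit message:

// Sketch comparing the two loop shapes; it only counts attempts, whereas the
// real code performs a shard update operation per attempt.
public class RetryLoopSketch {
    public static void main(String[] args) {
        int retryOnConflict = 2; // two retries requested => three attempts expected

        // Old shape: do/while on a retry counter. It runs retryOnConflict times,
        // i.e. one attempt fewer than "initial attempt + retries".
        int oldAttempts = 0;
        int retryCount = 0;
        do {
            oldAttempts++;
        } while (++retryCount < retryOnConflict);

        // New shape: iterate attempts from 0 through retryOnConflict inclusive.
        // This yields retryOnConflict + 1 attempts without ever computing
        // retryOnConflict + 1, which would overflow for Integer.MAX_VALUE
        // (the concern called out in the comment added by the commit).
        int newAttempts = 0;
        for (int attempt = 0; attempt <= retryOnConflict; attempt++) {
            newAttempts++;
        }

        System.out.println("old loop attempts: " + oldAttempts); // prints 2
        System.out.println("new loop attempts: " + newAttempts); // prints 3
    }
}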
@@ -144,16 +144,16 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
 BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
 long[] preVersions = new long[request.items().length];
-for (int i = 0; i < request.items().length; i++) {
+for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
-BulkItemRequest item = request.items()[i];
+BulkItemRequest item = request.items()[requestIndex];
 if (item.request() instanceof IndexRequest) {
 IndexRequest indexRequest = (IndexRequest) item.request();
 try {
 WriteResult result = shardIndexOperation(request, indexRequest, clusterState, indexShard, true);
 // add the response
 IndexResponse indexResponse = result.response();
-responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse);
+responses[requestIndex] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse);
-preVersions[i] = result.preVersion;
+preVersions[requestIndex] = result.preVersion;
 if (result.mappingToUpdate != null) {
 if (mappingsToUpdate == null) {
 mappingsToUpdate = Sets.newHashSet();

@@ -164,13 +164,13 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
 if (ops == null) {
 ops = new Engine.IndexingOperation[request.items().length];
 }
-ops[i] = result.op;
+ops[requestIndex] = result.op;
 }
 } catch (Exception e) {
 // rethrow the failure if we are going to retry on primary and let parent failure to handle it
 if (retryPrimaryException(e)) {
 // restore updated versions...
-for (int j = 0; j < i; j++) {
+for (int j = 0; j < requestIndex; j++) {
 applyVersion(request.items()[j], preVersions[j]);
 }
 throw (ElasticSearchException) e;

@@ -180,22 +180,22 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
 } else {
 logger.debug("[{}][{}] failed to execute bulk item (index) {}", e, shardRequest.request.index(), shardRequest.shardId, indexRequest);
 }
-responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(),
+responses[requestIndex] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(),
 new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), ExceptionsHelper.detailedMessage(e)));
 // nullify the request so it won't execute on the replicas
-request.items()[i] = null;
+request.items()[requestIndex] = null;
 }
 } else if (item.request() instanceof DeleteRequest) {
 DeleteRequest deleteRequest = (DeleteRequest) item.request();
 try {
 // add the response
 DeleteResponse deleteResponse = shardDeleteOperation(deleteRequest, indexShard).response();
-responses[i] = new BulkItemResponse(item.id(), "delete", deleteResponse);
+responses[requestIndex] = new BulkItemResponse(item.id(), "delete", deleteResponse);
 } catch (Exception e) {
 // rethrow the failure if we are going to retry on primary and let parent failure to handle it
 if (retryPrimaryException(e)) {
 // restore updated versions...
-for (int j = 0; j < i; j++) {
+for (int j = 0; j < requestIndex; j++) {
 applyVersion(request.items()[j], preVersions[j]);
 }
 throw (ElasticSearchException) e;

@@ -205,15 +205,15 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
 } else {
 logger.debug("[{}][{}] failed to execute bulk item (delete) {}", e, shardRequest.request.index(), shardRequest.shardId, deleteRequest);
 }
-responses[i] = new BulkItemResponse(item.id(), "delete",
+responses[requestIndex] = new BulkItemResponse(item.id(), "delete",
 new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(e)));
 // nullify the request so it won't execute on the replicas
-request.items()[i] = null;
+request.items()[requestIndex] = null;
 }
 } else if (item.request() instanceof UpdateRequest) {
 UpdateRequest updateRequest = (UpdateRequest) item.request();
-int retryCount = 0;
+// We need to do the requested retries plus the initial attempt. We don't do < 1+retry_on_conflict because retry_on_conflict may be Integer.MAX_VALUE
-do {
+for (int updateAttemptsCount = 0; updateAttemptsCount <= updateRequest.retryOnConflict(); updateAttemptsCount++) {
 UpdateResult updateResult;
 try {
 updateResult = shardUpdateOperation(clusterState, request, updateRequest, indexShard);

@@ -221,6 +221,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
 updateResult = new UpdateResult(null, null, false, t, null);
 }
 if (updateResult.success()) {

 switch (updateResult.result.operation()) {
 case UPSERT:
 case INDEX:

@@ -234,8 +235,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
 Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
 updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
 }
-responses[i] = new BulkItemResponse(item.id(), "update", updateResponse);
+responses[requestIndex] = new BulkItemResponse(item.id(), "update", updateResponse);
-preVersions[i] = result.preVersion;
+preVersions[requestIndex] = result.preVersion;
 if (result.mappingToUpdate != null) {
 if (mappingsToUpdate == null) {
 mappingsToUpdate = Sets.newHashSet();

@@ -246,40 +247,50 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
 if (ops == null) {
 ops = new Engine.IndexingOperation[request.items().length];
 }
-ops[i] = result.op;
+ops[requestIndex] = result.op;
 }
 // Replace the update request to the translated index request to execute on the replica.
-request.items()[i] = new BulkItemRequest(request.items()[i].id(), indexRequest);
+request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest);
 break;
 case DELETE:
 DeleteResponse response = updateResult.writeResult.response();
 DeleteRequest deleteRequest = updateResult.request();
 updateResponse = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion(), false);
 updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null));
-responses[i] = new BulkItemResponse(item.id(), "update", updateResponse);
+responses[requestIndex] = new BulkItemResponse(item.id(), "update", updateResponse);
 // Replace the update request to the translated delete request to execute on the replica.
-request.items()[i] = new BulkItemRequest(request.items()[i].id(), deleteRequest);
+request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);
 break;
 case NONE:
-responses[i] = new BulkItemResponse(item.id(), "update", updateResult.noopResult);
+responses[requestIndex] = new BulkItemResponse(item.id(), "update", updateResult.noopResult);
-request.items()[i] = null; // No need to go to the replica
+request.items()[requestIndex] = null; // No need to go to the replica
 break;
 }
 // NOTE: Breaking out of the retry_on_conflict loop!
 break;
 } else if (updateResult.failure()) {
 Throwable t = updateResult.error;
-if (!updateResult.retry) {
+if (updateResult.retry) {
+// updateAttemptCount is 0 based and marks current attempt, if it's equal to retryOnConflict we are going out of the iteration
+if (updateAttemptsCount >= updateRequest.retryOnConflict()) {
+// we can't try any more
+responses[requestIndex] = new BulkItemResponse(item.id(), "update",
+new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), ExceptionsHelper.detailedMessage(t)));;
+
+request.items()[requestIndex] = null; // do not send to replicas
+}
+}
+else {
 // rethrow the failure if we are going to retry on primary and let parent failure to handle it
 if (retryPrimaryException(t)) {
 // restore updated versions...
-for (int j = 0; j < i; j++) {
+for (int j = 0; j < requestIndex; j++) {
 applyVersion(request.items()[j], preVersions[j]);
 }
 throw (ElasticSearchException) t;
 }
 if (updateResult.result == null) {
-responses[i] = new BulkItemResponse(item.id(), "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), ExceptionsHelper.detailedMessage(t)));
+responses[requestIndex] = new BulkItemResponse(item.id(), "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), ExceptionsHelper.detailedMessage(t)));
 } else {
 switch (updateResult.result.operation()) {
 case UPSERT:

@@ -290,7 +301,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
 } else {
 logger.debug("[{}][{}] failed to execute bulk item (index) {}", t, shardRequest.request.index(), shardRequest.shardId, indexRequest);
 }
-responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(),
+responses[requestIndex] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(),
 new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), ExceptionsHelper.detailedMessage(t)));
 break;
 case DELETE:

@@ -300,19 +311,23 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
 } else {
 logger.debug("[{}][{}] failed to execute bulk item (delete) {}", t, shardRequest.request.index(), shardRequest.shardId, deleteRequest);
 }
-responses[i] = new BulkItemResponse(item.id(), "delete",
+responses[requestIndex] = new BulkItemResponse(item.id(), "delete",
 new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(t)));
 break;
 }
 }
 // nullify the request so it won't execute on the replicas
-request.items()[i] = null;
+request.items()[requestIndex] = null;
 // NOTE: Breaking out of the retry_on_conflict loop!
 break;
 }

 }
-} while (++retryCount < updateRequest.retryOnConflict());
+}
 }

+assert responses[requestIndex] != null; // we must have set a response somewhere.
+
 }

 if (mappingsToUpdate != null) {
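Before this change, an update item that maxed out retry_on_conflict could fall through the failure branch without ever setting its slot in responses[], and the leftover null later surfaced as the NullPointerException named in the commit title. The rewritten retry branch records a failure response and nulls the item, and the added assert documents the invariant. A standalone sketch of that invariant (String stands in for BulkItemResponse; the names are illustrative):

// Sketch of the "every item gets a response" invariant made explicit by the assert.
public class ResponseInvariantSketch {
    public static void main(String[] args) {
        String[] items = {"index", "update", "delete"};
        String[] responses = new String[items.length];
        for (int requestIndex = 0; requestIndex < items.length; requestIndex++) {
            // In the real code this is the index/delete/update handling; before the fix,
            // an update whose retries ran out skipped this step entirely.
            responses[requestIndex] = "response for " + items[requestIndex];
            assert responses[requestIndex] != null; // we must have set a response somewhere
        }
        System.out.println(java.util.Arrays.toString(responses));
    }
}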
@@ -218,7 +218,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
 public void onFailure(Throwable e) {
 try {
 channel.sendResponse(e);
-} catch (Exception e1) {
+} catch (Throwable e1) {
 logger.warn("Failed to send response for " + transportAction, e1);
 }
 }

@@ -520,7 +520,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
 try {
 PrimaryResponse<Response, ReplicaRequest> response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, request));
 performReplicas(response);
-} catch (Exception e) {
+} catch (Throwable e) {
 // shard has not been allocated yet, retry it here
 if (retryPrimaryException(e)) {
 primaryOperationStarted.set(false);

@@ -691,7 +691,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
 public void run() {
 try {
 shardOperationOnReplica(shardRequest);
-} catch (Exception e) {
+} catch (Throwable e) {
 if (!ignoreReplicaException(e)) {
 logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), e);
 shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");

@@ -705,7 +705,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
 } else {
 try {
 shardOperationOnReplica(shardRequest);
-} catch (Exception e) {
+} catch (Throwable e) {
 if (!ignoreReplicaException(e)) {
 logger.warn("Failed to perform " + transportAction + " on replica" + shardIt.shardId(), e);
 shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
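The TransportShardReplicationOperationAction hunks above widen several catch blocks from Exception to Throwable, so that Errors thrown during the primary or replica operation are still logged and reported instead of escaping the handler. A minimal standalone sketch of the pattern (the shardOperation() here is an illustrative stand-in, not the real method):

// Demonstrates why catch (Exception) is too narrow for a last-resort handler.
public class CatchThrowableSketch {
    static void shardOperation() {
        throw new AssertionError("simulated failure that is an Error, not an Exception");
    }

    public static void main(String[] args) {
        try {
            shardOperation();
        } catch (Throwable t) { // catch (Exception e) would miss this AssertionError
            System.out.println("reported failure: " + t);
        }
    }
}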
@@ -281,7 +281,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>

 /**
 * Sets the number of retries of a version conflict occurs because the document was updated between
-* getting it and updating it. Defaults to 1.
+* getting it and updating it. Defaults to 0.
 */
 public UpdateRequest retryOnConflict(int retryOnConflict) {
 this.retryOnConflict = retryOnConflict;

@@ -118,7 +118,7 @@ public class UpdateRequestBuilder extends InstanceShardOperationRequestBuilder<U

 /**
 * Sets the number of retries of a version conflict occurs because the document was updated between
-* getting it and updating it. Defaults to 1.
+* getting it and updating it. Defaults to 0.
 */
 public UpdateRequestBuilder setRetryOnConflict(int retryOnConflict) {
 request.retryOnConflict(retryOnConflict);
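The two Javadoc hunks above only correct the documented default: retry_on_conflict defaults to 0, so callers who want retries must ask for them explicitly. A hypothetical usage sketch of the builder API shown in the diff (the Client instance, index/type/id, field value, and retry count are assumptions for illustration):

import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;

// Not part of the commit; shows how a caller opts in to retries.
public class RetryOnConflictUsage {
    static UpdateResponse updateWithRetries(Client client) {
        // retry_on_conflict defaults to 0 (as the corrected Javadoc states), so retries
        // must be requested explicitly: here, one initial attempt plus three retries.
        return client.prepareUpdate("test", "type1", "1")
                .setDoc("field", 2)
                .setRetryOnConflict(3)
                .execute().actionGet();
    }
}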
@@ -1,5 +1,6 @@
 package org.elasticsearch.test.integration.document;

+import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.get.GetResponse;

@@ -15,6 +16,9 @@ import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.test.integration.AbstractSharedClusterTest;
 import org.junit.Test;

+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.hamcrest.Matchers.*;

@@ -417,4 +421,46 @@ public class BulkTests extends AbstractSharedClusterTest {
 assertThat(hits[0].getId(), equalTo("child1"));

 }

+@Test
+public void testFailingVersionedUpdatedOnBulk() throws Exception {
+createIndex("test");
+index("test","type","1","field","1");
+final BulkResponse[] responses = new BulkResponse[30];
+final CyclicBarrier cyclicBarrier = new CyclicBarrier(responses.length);
+Thread[] threads = new Thread[responses.length];
+
+
+for (int i=0;i<responses.length;i++) {
+final int threadID = i;
+threads[threadID] = new Thread(new Runnable() {
+@Override
+public void run() {
+try {
+cyclicBarrier.await();
+} catch (Exception e) {
+return;
+}
+BulkRequestBuilder requestBuilder = client().prepareBulk();
+requestBuilder.add(client().prepareUpdate("test", "type", "1").setVersion(1).setDoc("field", threadID));
+responses[threadID]=requestBuilder.get();
+
+}
+});
+threads[threadID].start();
+
+}
+
+for (int i=0;i < threads.length; i++) {
+threads[i].join();
+}
+
+int successes = 0;
+for (BulkResponse response : responses) {
+if (!response.hasFailures()) successes ++;
+}
+
+assertThat(successes, equalTo(1));
+}
+
 }
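The new BulkTests case above races 30 threads, each sending a bulk update built with setVersion(1); only the first writer can match that version, so exactly one bulk request may succeed without failures. A standalone sketch of the version arithmetic behind the assertion (no Elasticsearch dependencies; in the cluster this check is performed by the engine's version conflict detection, not by a loop like this):

public class VersionConflictSketch {
    public static void main(String[] args) {
        long docVersion = 1;              // the document starts at version 1
        int successes = 0;
        for (int thread = 0; thread < 30; thread++) {
            long expectedVersion = 1;     // every bulk update was built with setVersion(1)
            if (docVersion == expectedVersion) {
                docVersion++;             // the winning update bumps the document to version 2
                successes++;
            }                             // all later updates see a version conflict
        }
        System.out.println(successes);    // 1, matching assertThat(successes, equalTo(1))
    }
}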
@@ -496,6 +496,7 @@ public class UpdateTests extends AbstractSharedClusterTest {
 for (int i = 0; i < numberOfUpdatesPerThread; i++) {
 GetResponse response = client().prepareGet("test", "type1", Integer.toString(i)).execute().actionGet();
 assertThat(response.getId(), equalTo(Integer.toString(i)));
+assertThat(response.isExists(), equalTo(true));
 assertThat(response.getVersion(), equalTo((long) numberOfThreads));
 assertThat((Integer) response.getSource().get("field"), equalTo(numberOfThreads));
 }