Fix NPE when rejecting bulk updates (#42923)

Single updates use a different internal code path than updates that are wrapped in a bulk request.
While working on a refactoring to bring both closer together I've noticed that bulk updates were
failing some of the tests that single updates passed. In particular, bulk updates cause
NullPointerExceptions to be thrown and listeners not being properly notified when being rejected
from the thread pool.
This commit is contained in:
Yannick Welsch 2019-06-06 14:04:45 +02:00
parent ba8bd8dfbe
commit 72735be673
2 changed files with 14 additions and 3 deletions

View File

@ -342,11 +342,10 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
BulkItemResponse operationResponse,
final UpdateHelper.Result translate) {
final BulkItemResponse response;
DocWriteResponse.Result translatedResult = translate.getResponseResult();
if (operationResponse.isFailed()) {
response = new BulkItemResponse(operationResponse.getItemId(), DocWriteRequest.OpType.UPDATE, operationResponse.getFailure());
} else {
final DocWriteResponse.Result translatedResult = translate.getResponseResult();
final UpdateResponse updateResponse;
if (translatedResult == DocWriteResponse.Result.CREATED || translatedResult == DocWriteResponse.Result.UPDATED) {
final IndexRequest updateIndexRequest = translate.action();

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
@ -700,7 +701,18 @@ public class UpdateIT extends ESIntegTestCase {
.setRetryOnConflict(retryOnConflict)
.setUpsert(jsonBuilder().startObject().field("field", 1).endObject())
.request();
client().update(ur, new UpdateListener(j));
if (randomBoolean()) {
client().update(ur, new UpdateListener(j));
} else {
client().prepareBulk().add(ur).execute(ActionListener.map(new UpdateListener(j), br -> {
final BulkItemResponse ir = br.getItems()[0];
if (ir.isFailed()) {
throw ir.getFailure().getCause();
} else {
return ir.getResponse();
}
}));
}
} catch (NoNodeAvailableException nne) {
updateRequestsOutstanding.release();
synchronized (failedMap) {