Shard level tasks in Bulk Action lose reference to their parent tasks

During the bulk action a hierachy of tasks is getting created: bulk->bulk[s] (coord node) -> bulk[s] (primary shard node) -> bulk[s][p] and bulk[s][r]. Due to a bug the first bulk[s] task didn't have bulk's task id is set as a parent id.  This commit fixes this bug.
This commit is contained in:
Igor Motov 2016-04-13 20:42:37 -04:00
parent 24130e1f30
commit f030796b0b
3 changed files with 81 additions and 14 deletions

View File

@ -52,6 +52,7 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -107,7 +108,12 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
@Override
protected void doExecute(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
protected final void doExecute(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
throw new UnsupportedOperationException("task parameter is required for this operation");
}
@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
final long startTime = relativeTime();
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
@ -143,7 +149,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
try {
executeBulk(bulkRequest, startTime, listener, responses);
executeBulk(task, bulkRequest, startTime, listener, responses);
} catch (Throwable t) {
listener.onFailure(t);
}
@ -163,7 +169,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
if (counter.decrementAndGet() == 0) {
try {
executeBulk(bulkRequest, startTime, listener, responses);
executeBulk(task, bulkRequest, startTime, listener, responses);
} catch (Throwable t) {
listener.onFailure(t);
}
@ -172,12 +178,12 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
});
} else {
if (counter.decrementAndGet() == 0) {
executeBulk(bulkRequest, startTime, listener, responses);
executeBulk(task, bulkRequest, startTime, listener, responses);
}
}
}
} else {
executeBulk(bulkRequest, startTime, listener, responses);
executeBulk(task, bulkRequest, startTime, listener, responses);
}
}
@ -222,14 +228,14 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
*/
public void executeBulk(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
final long startTimeNanos = relativeTime();
executeBulk(bulkRequest, startTimeNanos, listener, new AtomicArray<>(bulkRequest.requests.size()));
executeBulk(null, bulkRequest, startTimeNanos, listener, new AtomicArray<>(bulkRequest.requests.size()));
}
private long buildTookInMillis(long startTimeNanos) {
return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeNanos);
}
void executeBulk(final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses ) {
void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses ) {
final ClusterState clusterState = clusterService.state();
// TODO use timeout to wait here if its blocked...
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE);
@ -333,12 +339,16 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
String nodeId = clusterService.localNode().getId();
for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
final ShardId shardId = entry.getKey();
final List<BulkItemRequest> requests = entry.getValue();
BulkShardRequest bulkShardRequest = new BulkShardRequest(bulkRequest, shardId, bulkRequest.refresh(), requests.toArray(new BulkItemRequest[requests.size()]));
bulkShardRequest.consistencyLevel(bulkRequest.consistencyLevel());
bulkShardRequest.timeout(bulkRequest.timeout());
if (task != null) {
bulkShardRequest.setParentTask(nodeId, task.getId());
}
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
@Override
public void onResponse(BulkShardResponse bulkShardResponse) {

View File

@ -30,6 +30,7 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeAction;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.fieldstats.FieldStatsAction;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -61,6 +62,7 @@ import java.util.function.Function;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.emptyCollectionOf;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -246,6 +248,55 @@ public class TasksIT extends ESIntegTestCase {
}
}
public void testTransportBulkTasks() {
registerTaskManageListeners(BulkAction.NAME); // main task
registerTaskManageListeners(BulkAction.NAME + "[s]"); // shard task
registerTaskManageListeners(BulkAction.NAME + "[s][p]"); // shard task on primary
registerTaskManageListeners(BulkAction.NAME + "[s][r]"); // shard task on replica
createIndex("test");
ensureGreen("test"); // Make sure all shards are allocated to catch replication tasks
client().prepareBulk().add(client().prepareIndex("test", "doc", "test_id").setSource("{\"foo\": \"bar\"}")).get();
// the bulk operation should produce one main task
assertEquals(1, numberOfEvents(BulkAction.NAME, Tuple::v1));
// we should also get 1 or 2 [s] operation with main operation as a parent
// in case the primary is located on the coordinating node we will have 1 operation, otherwise - 2
List<TaskInfo> shardTasks = findEvents(BulkAction.NAME + "[s]", Tuple::v1);
assertThat(shardTasks.size(), allOf(lessThanOrEqualTo(2), greaterThanOrEqualTo(1)));
// Select the effective shard task
TaskInfo shardTask;
if (shardTasks.size() == 1) {
// we have only one task - it's going to be the parent task for all [s][p] and [s][r] tasks
shardTask = shardTasks.get(0);
// and it should have the main task as a parent
assertParentTask(shardTask, findEvents(BulkAction.NAME, Tuple::v1).get(0));
} else {
if (shardTasks.get(0).getParentTaskId().equals(shardTasks.get(1).getTaskId())) {
// task 1 is the parent of task 0, that means that task 0 will control [s][p] and [s][r] tasks
shardTask = shardTasks.get(0);
// in turn the parent of the task 1 should be the main task
assertParentTask(shardTasks.get(1), findEvents(BulkAction.NAME, Tuple::v1).get(0));
} else {
// otherwise task 1 will control [s][p] and [s][r] tasks
shardTask = shardTasks.get(1);
// in turn the parent of the task 0 should be the main task
assertParentTask(shardTasks.get(0), findEvents(BulkAction.NAME, Tuple::v1).get(0));
}
}
// we should also get one [s][p] operation with shard operation as a parent
assertEquals(1, numberOfEvents(BulkAction.NAME + "[s][p]", Tuple::v1));
assertParentTask(findEvents(BulkAction.NAME + "[s][p]", Tuple::v1), shardTask);
// we should get as many [s][r] operations as we have replica shards
// they all should have the same shard task as a parent
assertEquals(getNumShards("test").numReplicas, numberOfEvents(BulkAction.NAME + "[s][r]", Tuple::v1));
assertParentTask(findEvents(BulkAction.NAME + "[s][r]", Tuple::v1), shardTask);
}
/**
* Very basic "is it plugged in" style test that indexes a document and
* makes sure that you can fetch the status of the process. The goal here is
@ -487,10 +538,14 @@ public class TasksIT extends ESIntegTestCase {
*/
private void assertParentTask(List<TaskInfo> tasks, TaskInfo parentTask) {
for (TaskInfo task : tasks) {
assertTrue(task.getParentTaskId().isSet());
assertEquals(parentTask.getNode().getId(), task.getParentTaskId().getNodeId());
assertTrue(Strings.hasLength(task.getParentTaskId().getNodeId()));
assertEquals(parentTask.getId(), task.getParentTaskId().getId());
assertParentTask(task, parentTask);
}
}
private void assertParentTask(TaskInfo task, TaskInfo parentTask) {
assertTrue(task.getParentTaskId().isSet());
assertEquals(parentTask.getNode().getId(), task.getParentTaskId().getNodeId());
assertTrue(Strings.hasLength(task.getParentTaskId().getNodeId()));
assertEquals(parentTask.getId(), task.getParentTaskId().getId());
}
}

View File

@ -122,12 +122,13 @@ public class TransportBulkActionTookTests extends ESTestCase {
@Override
void executeBulk(
Task task,
BulkRequest bulkRequest,
long startTimeNanos,
ActionListener<BulkResponse> listener,
AtomicArray<BulkItemResponse> responses) {
expected.set(1000000);
super.executeBulk(bulkRequest, startTimeNanos, listener, responses);
super.executeBulk(task, bulkRequest, startTimeNanos, listener, responses);
}
};
} else {
@ -151,13 +152,14 @@ public class TransportBulkActionTookTests extends ESTestCase {
@Override
void executeBulk(
Task task,
BulkRequest bulkRequest,
long startTimeNanos,
ActionListener<BulkResponse> listener,
AtomicArray<BulkItemResponse> responses) {
long elapsed = spinForAtLeastOneMillisecond();
expected.set(elapsed);
super.executeBulk(bulkRequest, startTimeNanos, listener, responses);
super.executeBulk(task, bulkRequest, startTimeNanos, listener, responses);
}
};
}
@ -183,7 +185,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
bulkRequest.add(bulkAction.getBytes(StandardCharsets.UTF_8), 0, bulkAction.length(), null, null);
AtomicLong expected = new AtomicLong();
TransportBulkAction action = createAction(controlled, expected);
action.doExecute(bulkRequest, new ActionListener<BulkResponse>() {
action.doExecute(null, bulkRequest, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
if (controlled) {