Add more debugging information to rethrottles

I'm still trying to track down failures like:
https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+master+dockeralpine-periodic/1180/console

It looks like a task is hanging but I'm not sure why. So this
adds more logging for next time.
Nik Everett 2017-04-11 16:36:12 -04:00
parent 1847bbac4d
commit e99f90fb46
5 changed files with 15 additions and 21 deletions

WorkingBulkByScrollTask.java

@@ -211,16 +211,12 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
     @Override
     public void rethrottle(float newRequestsPerSecond) {
         synchronized (delayedPrepareBulkRequestReference) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("[{}]: Rethrottling to [{}] requests per second", getId(), newRequestsPerSecond);
-            }
+            logger.debug("[{}]: rethrottling to [{}] requests per second", getId(), newRequestsPerSecond);
             setRequestsPerSecond(newRequestsPerSecond);
             DelayedPrepareBulkRequest delayedPrepareBulkRequest = this.delayedPrepareBulkRequestReference.get();
             if (delayedPrepareBulkRequest == null) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("[{}]: Skipping rescheduling because there is no scheduled task", getId());
-                }
+                logger.debug("[{}]: skipping rescheduling because there is no scheduled task", getId());
                 // No request has been queued yet so nothing to reschedule.
                 return;
             }
@@ -259,10 +255,8 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
                 * The user is attempting to slow the request down. We'll let the change in throttle take effect the next time we delay
                 * prepareBulkRequest. We can't just reschedule the request further out in the future the bulk context might time out.
                 */
-                if (logger.isDebugEnabled()) {
-                    logger.debug("[{}]: Skipping rescheduling because the new throttle [{}] is slower than the old one [{}].", getId(),
-                            newRequestsPerSecond, requestsPerSecond);
-                }
+                logger.debug("[{}]: skipping rescheduling because the new throttle [{}] is slower than the old one [{}]", getId(),
+                        newRequestsPerSecond, requestsPerSecond);
                 return this;
             }
@@ -270,9 +264,7 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
             // Actually reschedule the task
             if (false == FutureUtils.cancel(future)) {
                 // Couldn't cancel, probably because the task has finished or been scheduled. Either way we have nothing to do here.
-                if (logger.isDebugEnabled()) {
-                    logger.debug("[{}]: Skipping rescheduling we couldn't cancel the task.", getId());
-                }
+                logger.debug("[{}]: skipping rescheduling because we couldn't cancel the task", getId());
                 return this;
             }
@@ -281,9 +273,7 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
             * test it you'll find that requests sneak through. So each request is given a runOnce boolean to prevent that.
             */
            TimeValue newDelay = newDelay(remainingDelay, newRequestsPerSecond);
-            if (logger.isDebugEnabled()) {
-                logger.debug("[{}]: Rescheduling for [{}] in the future.", getId(), newDelay);
-            }
+            logger.debug("[{}]: rescheduling for [{}] in the future", getId(), newDelay);
             return new DelayedPrepareBulkRequest(threadPool, requestsPerSecond, newDelay, command);
         }
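
Aside from lower-casing the messages, the change above also drops the logger.isDebugEnabled() guards. With Log4j 2's parameterized messages that is safe: the {} placeholders are only rendered when DEBUG is actually enabled, so an unguarded logger.debug(...) with cheap arguments costs very little. A minimal, self-contained sketch of the idiom (the class, method, and expensiveStateDump() names are illustrative, not part of this change; it only assumes log4j-api on the classpath):

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ParameterizedLoggingSketch {
    private static final Logger logger = LogManager.getLogger(ParameterizedLoggingSketch.class);

    static void rethrottle(long taskId, float newRequestsPerSecond) {
        // The {} placeholders are only substituted when DEBUG is enabled, so no guard is needed
        // for cheap arguments like a long and a float.
        logger.debug("[{}]: rethrottling to [{}] requests per second", taskId, newRequestsPerSecond);

        // A guard still pays off when building an argument is itself expensive:
        // if (logger.isDebugEnabled()) {
        //     logger.debug("[{}]: state [{}]", taskId, expensiveStateDump());
        // }
    }

    public static void main(String[] args) {
        rethrottle(42L, 1.5f);
    }
}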

WorkingBulkByScrollTaskTests.java

@@ -168,7 +168,7 @@ public class WorkingBulkByScrollTaskTests extends ESTestCase {
             }
         });
-        // Rethrottle on a random number of threads, on of which is this thread.
+        // Rethrottle on a random number of threads, one of which is this thread.
         Runnable test = () -> {
            try {
                int rethrottles = 0;

TransportRethrottleAction.java

@@ -19,6 +19,7 @@
 package org.elasticsearch.index.reindex;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.TaskOperationFailure;
@@ -54,14 +55,14 @@ public class TransportRethrottleAction extends TransportTasksAction<BulkByScroll
     @Override
     protected void taskOperation(RethrottleRequest request, BulkByScrollTask task, ActionListener<TaskInfo> listener) {
-        rethrottle(clusterService.localNode().getId(), client, task, request.getRequestsPerSecond(), listener);
+        rethrottle(logger, clusterService.localNode().getId(), client, task, request.getRequestsPerSecond(), listener);
     }
-    static void rethrottle(String localNodeId, Client client, BulkByScrollTask task, float newRequestsPerSecond,
+    static void rethrottle(Logger logger, String localNodeId, Client client, BulkByScrollTask task, float newRequestsPerSecond,
             ActionListener<TaskInfo> listener) {
         int runningSubTasks = task.runningSliceSubTasks();
         if (runningSubTasks == 0) {
             // Nothing to do, all sub tasks are done
+            logger.debug("rethrottling local task [{}] to [{}] requests per second", task.getId(), newRequestsPerSecond);
             task.rethrottle(newRequestsPerSecond);
             listener.onResponse(task.taskInfo(localNodeId, true));
             return;
@@ -69,6 +70,7 @@ public class TransportRethrottleAction extends TransportTasksAction<BulkByScroll
         RethrottleRequest subRequest = new RethrottleRequest();
         subRequest.setRequestsPerSecond(newRequestsPerSecond / runningSubTasks);
         subRequest.setParentTaskId(new TaskId(localNodeId, task.getId()));
+        logger.debug("rethrottling children of task [{}] to [{}] requests per second", task.getId(), subRequest.getRequestsPerSecond());
         client.execute(RethrottleAction.INSTANCE, subRequest, ActionListener.wrap(r -> {
             r.rethrowFailures("Rethrottle");
             listener.onResponse(task.getInfoGivenSliceInfo(localNodeId, r.getTasks()));
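
The new debug lines also make the fan-out easier to follow: when the task has running slice sub-tasks the parent does not rethrottle itself; it splits the new rate evenly across the children and sends each one a child RethrottleRequest for its share (newRequestsPerSecond / runningSubTasks). A tiny sketch of that arithmetic with hypothetical names; only the division mirrors the code above:

public class RethrottleSplitSketch {
    // Mirrors subRequest.setRequestsPerSecond(newRequestsPerSecond / runningSubTasks) above.
    static float perChildRequestsPerSecond(float newRequestsPerSecond, int runningSubTasks) {
        return newRequestsPerSecond / runningSubTasks;
    }

    public static void main(String[] args) {
        // Rethrottling a sliced request with 5 running slices to 10 requests per second
        // asks each child slice to run at 2 requests per second.
        System.out.println(perChildRequestsPerSecond(10f, 5)); // prints 2.0
    }
}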

CancelTests.java

@@ -38,6 +38,7 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.ingest.IngestTestPlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.tasks.TaskInfo;
+import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.hamcrest.Matcher;
 import org.junit.Before;
@@ -60,6 +61,7 @@ import static org.hamcrest.Matchers.hasSize;
 * different cancellation places - that is the responsibility of AsyncBulkByScrollActionTests which have more precise control to
 * simulate failures but do not exercise important portion of the stack like transport and task management.
 */
+@TestLogging("org.elasticsearch.action.bulk.byscroll:DEBUG,org.elasticsearch.index.reindex:DEBUG")
 public class CancelTests extends ReindexTestCase {
     protected static final String INDEX = "reindex-cancel-index";
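
The @TestLogging annotation comes from the Elasticsearch test framework: it bumps the named loggers to the requested level for the duration of the annotated tests and restores them afterwards, which is what gets the new DEBUG lines into CI console output like the run linked above. A minimal sketch of scoping it to a single test instead of a whole class, assuming a hypothetical test class and method name:

import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;

public class RethrottleLoggingExampleTests extends ESTestCase {
    // Hypothetical test: reindex DEBUG output is captured only while this method runs.
    @TestLogging("org.elasticsearch.index.reindex:DEBUG")
    public void testRethrottleIsLogged() {
        logger.debug("running with org.elasticsearch.index.reindex at DEBUG");
    }
}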

TransportRethrottleActionTests.java

@@ -73,7 +73,7 @@ public class TransportRethrottleActionTests extends ESTestCase {
        @SuppressWarnings("unchecked")
        ActionListener<TaskInfo> listener = mock(ActionListener.class);
-       TransportRethrottleAction.rethrottle(localNodeId, client, task, newRequestsPerSecond, listener);
+       TransportRethrottleAction.rethrottle(logger, localNodeId, client, task, newRequestsPerSecond, listener);
        // Capture the sub request and the listener so we can verify they are sane
        ArgumentCaptor<RethrottleRequest> subRequest = ArgumentCaptor.forClass(RethrottleRequest.class);