Reindex: More digging on flaky test
This adds more logging and a missing assertion to a flaky reindex test.
This commit is contained in:
parent
a2f3c274bf
commit
6f64e9728b
|
@ -19,12 +19,14 @@
|
||||||
|
|
||||||
package org.elasticsearch.index.reindex;
|
package org.elasticsearch.index.reindex;
|
||||||
|
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.elasticsearch.action.ActionFuture;
|
import org.elasticsearch.action.ActionFuture;
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||||
import org.elasticsearch.action.ingest.DeletePipelineRequest;
|
import org.elasticsearch.action.ingest.DeletePipelineRequest;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
|
import org.elasticsearch.common.logging.Loggers;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.index.IndexModule;
|
import org.elasticsearch.index.IndexModule;
|
||||||
import org.elasticsearch.index.engine.Engine;
|
import org.elasticsearch.index.engine.Engine;
|
||||||
|
@ -107,7 +109,7 @@ public class CancelTests extends ReindexTestCase {
|
||||||
* to be modified by the reindex action. That way at least one worker
|
* to be modified by the reindex action. That way at least one worker
|
||||||
* is blocked. */
|
* is blocked. */
|
||||||
int numModifiedDocs = randomIntBetween(builder.request().getSlices() * 2, numDocs);
|
int numModifiedDocs = randomIntBetween(builder.request().getSlices() * 2, numDocs);
|
||||||
logger.debug("chose to modify [{}] docs", numModifiedDocs);
|
logger.debug("chose to modify [{}] out of [{}] docs", numModifiedDocs, numDocs);
|
||||||
ALLOWED_OPERATIONS.release(numModifiedDocs - builder.request().getSlices());
|
ALLOWED_OPERATIONS.release(numModifiedDocs - builder.request().getSlices());
|
||||||
|
|
||||||
// Now execute the reindex action...
|
// Now execute the reindex action...
|
||||||
|
@ -118,7 +120,8 @@ public class CancelTests extends ReindexTestCase {
|
||||||
* exhausted their slice while others might have quite a bit left
|
* exhausted their slice while others might have quite a bit left
|
||||||
* to work on. We can't control that. */
|
* to work on. We can't control that. */
|
||||||
logger.debug("waiting for updates to be blocked");
|
logger.debug("waiting for updates to be blocked");
|
||||||
awaitBusy(() -> ALLOWED_OPERATIONS.hasQueuedThreads() && ALLOWED_OPERATIONS.availablePermits() == 0);
|
boolean blocked = awaitBusy(() -> ALLOWED_OPERATIONS.hasQueuedThreads() && ALLOWED_OPERATIONS.availablePermits() == 0);
|
||||||
|
assertTrue("updates blocked", blocked);
|
||||||
|
|
||||||
// Status should show the task running
|
// Status should show the task running
|
||||||
TaskInfo mainTask = findTaskToCancel(action, builder.request().getSlices());
|
TaskInfo mainTask = findTaskToCancel(action, builder.request().getSlices());
|
||||||
|
@ -128,7 +131,7 @@ public class CancelTests extends ReindexTestCase {
|
||||||
// Description shouldn't be empty
|
// Description shouldn't be empty
|
||||||
assertThat(mainTask.getDescription(), taskDescriptionMatcher);
|
assertThat(mainTask.getDescription(), taskDescriptionMatcher);
|
||||||
|
|
||||||
// Cancel the request while the reindex action is blocked by the indexing operation listeners.
|
// Cancel the request while the action is blocked by the indexing operation listeners.
|
||||||
// This will prevent further requests from being sent.
|
// This will prevent further requests from being sent.
|
||||||
ListTasksResponse cancelTasksResponse = client().admin().cluster().prepareCancelTasks().setTaskId(mainTask.getTaskId()).get();
|
ListTasksResponse cancelTasksResponse = client().admin().cluster().prepareCancelTasks().setTaskId(mainTask.getTaskId()).get();
|
||||||
cancelTasksResponse.rethrowFailures("Cancel");
|
cancelTasksResponse.rethrowFailures("Cancel");
|
||||||
|
@ -293,6 +296,7 @@ public class CancelTests extends ReindexTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class BlockingOperationListener implements IndexingOperationListener {
|
public static class BlockingOperationListener implements IndexingOperationListener {
|
||||||
|
private static final Logger log = Loggers.getLogger(CancelTests.class);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Engine.Index preIndex(ShardId shardId, Engine.Index index) {
|
public Engine.Index preIndex(ShardId shardId, Engine.Index index) {
|
||||||
|
@ -310,7 +314,9 @@ public class CancelTests extends ReindexTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
log.debug("checking");
|
||||||
if (ALLOWED_OPERATIONS.tryAcquire(30, TimeUnit.SECONDS)) {
|
if (ALLOWED_OPERATIONS.tryAcquire(30, TimeUnit.SECONDS)) {
|
||||||
|
log.debug("passed");
|
||||||
return operation;
|
return operation;
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
|
Loading…
Reference in New Issue