mirror of https://github.com/apache/lucene.git
SOLR-14524: Harden MultiThreadedOCPTest testFillWorkQueue() (#1548)
Make MultiThreadedOCPTest.testFillWorkQueue() less vulnerable to timing issues Co-authored-by: Ilan Ginzburg <iginzburg@salesforce.com>
This commit is contained in:
parent
7c55ba9547
commit
dec6922528
|
@ -29,7 +29,6 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
|
||||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest.SplitShard;
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest.SplitShard;
|
||||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||||
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
|
|
||||||
import org.apache.solr.client.solrj.response.RequestStatusState;
|
import org.apache.solr.client.solrj.response.RequestStatusState;
|
||||||
import org.apache.solr.common.params.CollectionParams;
|
import org.apache.solr.common.params.CollectionParams;
|
||||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
|
@ -77,17 +76,24 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase {
|
||||||
distributedQueue.offer(Utils.toJSON(Utils.makeMap(
|
distributedQueue.offer(Utils.toJSON(Utils.makeMap(
|
||||||
"collection", "A_COLL",
|
"collection", "A_COLL",
|
||||||
QUEUE_OPERATION, MOCK_COLL_TASK.toLower(),
|
QUEUE_OPERATION, MOCK_COLL_TASK.toLower(),
|
||||||
ASYNC, String.valueOf(i),
|
ASYNC, Integer.toString(i),
|
||||||
|
|
||||||
"sleep", (i == 0 ? "1000" : "1") //first task waits for 1 second, and thus blocking
|
// third task waits for a long time, and thus blocks the queue for all other tasks for A_COLL.
|
||||||
// all other tasks. Subsequent tasks only wait for 1ms
|
// Subsequent tasks as well as the first two only wait for 1ms
|
||||||
|
"sleep", (i == 2 ? "10000" : "1")
|
||||||
)));
|
)));
|
||||||
log.info("MOCK task added {}", i);
|
log.info("MOCK task added {}", i);
|
||||||
|
|
||||||
}
|
}
|
||||||
Thread.sleep(100);//wait and post the next message
|
|
||||||
|
|
||||||
//this is not going to be blocked because it operates on another collection
|
// Wait until we see the second A_COLL task getting processed (assuming the first got processed as well)
|
||||||
|
Long task1CollA = waitForTaskToCompleted(client, 1);
|
||||||
|
|
||||||
|
assertNotNull("Queue did not process first two tasks on A_COLL, can't run test", task1CollA);
|
||||||
|
|
||||||
|
// Make sure the long running task did not finish, otherwise no way the B_COLL task can be tested to run in parallel with it
|
||||||
|
assertNull("Long running task finished too early, can't test", checkTaskHasCompleted(client, 2));
|
||||||
|
|
||||||
|
// Enqueue a task on another collection not competing with the lock on A_COLL and see that it can be executed right away
|
||||||
distributedQueue.offer(Utils.toJSON(Utils.makeMap(
|
distributedQueue.offer(Utils.toJSON(Utils.makeMap(
|
||||||
"collection", "B_COLL",
|
"collection", "B_COLL",
|
||||||
QUEUE_OPERATION, MOCK_COLL_TASK.toLower(),
|
QUEUE_OPERATION, MOCK_COLL_TASK.toLower(),
|
||||||
|
@ -95,24 +101,43 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase {
|
||||||
"sleep", "1"
|
"sleep", "1"
|
||||||
)));
|
)));
|
||||||
|
|
||||||
|
// We now check that either the B_COLL task has completed before the third (long running) task on A_COLL,
|
||||||
|
// Or if both have completed (if this check got significantly delayed for some reason), we verify B_COLL was first.
|
||||||
|
Long taskCollB = waitForTaskToCompleted(client, 200);
|
||||||
|
|
||||||
Long acoll = null, bcoll = null;
|
// We do not wait for the long running task to finish, that would be a waste of time.
|
||||||
|
Long task2CollA = checkTaskHasCompleted(client, 2);
|
||||||
|
|
||||||
|
// Given the wait delay (500 iterations of 100ms), the task has plenty of time to complete, so this is not expected.
|
||||||
|
assertNotNull("Task on B_COLL did not complete, can't test", taskCollB);
|
||||||
|
// We didn't wait for the 3rd A_COLL task to complete (test can run quickly) but if it did, we expect the B_COLL to have finished first.
|
||||||
|
assertTrue("task2CollA: " + task2CollA + " taskCollB: " + taskCollB, task2CollA == null || task2CollA > taskCollB);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies the status of an async task submitted to the Overseer Collection queue.
|
||||||
|
* @return <code>null</code> if the task has not completed, the completion timestamp if the task has completed
|
||||||
|
* (see mockOperation() in {@link org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler}).
|
||||||
|
*/
|
||||||
|
private Long checkTaskHasCompleted(SolrClient client, int requestId) throws IOException, SolrServerException {
|
||||||
|
return (Long) getStatusResponse(Integer.toString(requestId), client).getResponse().get("MOCK_FINISHED");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Waits until the specified async task has completed or time ran out.
|
||||||
|
* @return <code>null</code> if the task has not completed, the completion timestamp if the task has completed
|
||||||
|
*/
|
||||||
|
private Long waitForTaskToCompleted(SolrClient client, int requestId) throws Exception {
|
||||||
for (int i = 0; i < 500; i++) {
|
for (int i = 0; i < 500; i++) {
|
||||||
if (bcoll == null) {
|
Long task = checkTaskHasCompleted(client, requestId);
|
||||||
CollectionAdminResponse statusResponse = getStatusResponse("200", client);
|
if (task != null) {
|
||||||
bcoll = (Long) statusResponse.getResponse().get("MOCK_FINISHED");
|
return task;
|
||||||
}
|
}
|
||||||
if (acoll == null) {
|
|
||||||
CollectionAdminResponse statusResponse = getStatusResponse("2", client);
|
|
||||||
acoll = (Long) statusResponse.getResponse().get("MOCK_FINISHED");
|
|
||||||
}
|
|
||||||
if (acoll != null && bcoll != null) break;
|
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
assertTrue(acoll != null && bcoll != null);
|
|
||||||
assertTrue("acoll: " + acoll + " bcoll: " + bcoll, acoll > bcoll);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testParallelCollectionAPICalls() throws IOException, SolrServerException {
|
private void testParallelCollectionAPICalls() throws IOException, SolrServerException {
|
||||||
|
|
Loading…
Reference in New Issue