diff --git a/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java b/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java index 19d6d9f4cc7..c9bbb8fd17a 100644 --- a/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java @@ -29,7 +29,6 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create; import org.apache.solr.client.solrj.request.CollectionAdminRequest.SplitShard; import org.apache.solr.client.solrj.request.QueryRequest; -import org.apache.solr.client.solrj.response.CollectionAdminResponse; import org.apache.solr.client.solrj.response.RequestStatusState; import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.params.ModifiableSolrParams; @@ -77,17 +76,24 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase { distributedQueue.offer(Utils.toJSON(Utils.makeMap( "collection", "A_COLL", QUEUE_OPERATION, MOCK_COLL_TASK.toLower(), - ASYNC, String.valueOf(i), + ASYNC, Integer.toString(i), - "sleep", (i == 0 ? "1000" : "1") //first task waits for 1 second, and thus blocking - // all other tasks. Subsequent tasks only wait for 1ms + // third task waits for a long time, and thus blocks the queue for all other tasks for A_COLL. + // Subsequent tasks as well as the first two only wait for 1ms + "sleep", (i == 2 ? 
"10000" : "1") ))); log.info("MOCK task added {}", i); - } - Thread.sleep(100);//wait and post the next message - //this is not going to be blocked because it operates on another collection + // Wait until we see the second A_COLL task getting processed (assuming the first got processed as well) + Long task1CollA = waitForTaskToCompleted(client, 1); + + assertNotNull("Queue did not process first two tasks on A_COLL, can't run test", task1CollA); + + // Make sure the long running task did not finish, otherwise no way the B_COLL task can be tested to run in parallel with it + assertNull("Long running task finished too early, can't test", checkTaskHasCompleted(client, 2)); + + // Enqueue a task on another collection not competing with the lock on A_COLL and see that it can be executed right away distributedQueue.offer(Utils.toJSON(Utils.makeMap( "collection", "B_COLL", QUEUE_OPERATION, MOCK_COLL_TASK.toLower(), @@ -95,24 +101,43 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase { "sleep", "1" ))); + // We now check that either the B_COLL task has completed before the third (long running) task on A_COLL, + // Or if both have completed (if this check got significantly delayed for some reason), we verify B_COLL was first. + Long taskCollB = waitForTaskToCompleted(client, 200); - Long acoll = null, bcoll = null; - for (int i = 0; i < 500; i++) { - if (bcoll == null) { - CollectionAdminResponse statusResponse = getStatusResponse("200", client); - bcoll = (Long) statusResponse.getResponse().get("MOCK_FINISHED"); - } - if (acoll == null) { - CollectionAdminResponse statusResponse = getStatusResponse("2", client); - acoll = (Long) statusResponse.getResponse().get("MOCK_FINISHED"); - } - if (acoll != null && bcoll != null) break; - Thread.sleep(100); + // We do not wait for the long running task to finish, that would be a waste of time. 
+ Long task2CollA = checkTaskHasCompleted(client, 2); + + // Given the wait delay (up to 500 polls, 100ms apart), the B_COLL task has plenty of time to complete, so a null result here is not expected. + assertNotNull("Task on B_COLL did not complete, can't test", taskCollB); + // We didn't wait for the 3rd A_COLL task to complete (so the test can run quickly), but if it did complete, we expect the B_COLL task to have finished first. + assertTrue("task2CollA: " + task2CollA + " taskCollB: " + taskCollB, task2CollA == null || task2CollA > taskCollB); + } + } + + /** + * Checks the status of an async task submitted to the Overseer Collection queue by reading the MOCK_FINISHED entry of its status response. + * @return null if the task has not completed, the completion timestamp if the task has completed + * (see mockOperation() in {@link org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler}). + */ + private Long checkTaskHasCompleted(SolrClient client, int requestId) throws IOException, SolrServerException { + return (Long) getStatusResponse(Integer.toString(requestId), client).getResponse().get("MOCK_FINISHED"); + } + + /** + * Polls the status of the specified async task up to 500 times, sleeping 100ms after each unsuccessful check, until it has completed. + * @return null if the task has not completed once polling is exhausted, otherwise the completion timestamp of the task + */ + private Long waitForTaskToCompleted(SolrClient client, int requestId) throws Exception { + for (int i = 0; i < 500; i++) { + Long task = checkTaskHasCompleted(client, requestId); + if (task != null) { + return task; } - assertTrue(acoll != null && bcoll != null); - assertTrue("acoll: " + acoll + " bcoll: " + bcoll, acoll > bcoll); + Thread.sleep(100); } + return null; } private void testParallelCollectionAPICalls() throws IOException, SolrServerException {