diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index d4602b37991..f778e6fe58c 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -308,6 +308,8 @@ Bug Fixes * SOLR-13293: ConcurrentUpdateHttp2SolrClient always log AsynchronousCloseException exception error on indexing. (Cao Manh Dat) +* SOLR-13828: Improve ExecutePlanAction error handling. (ab) + Other Changes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java index 6179bcc5803..2a7e026e2ad 100644 --- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java +++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java @@ -22,6 +22,8 @@ import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Locale; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -38,6 +40,8 @@ import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.Utils; +import org.apache.solr.core.SolrResourceLoader; +import org.apache.solr.util.TestInjection; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -52,6 +56,24 @@ public class ExecutePlanAction extends TriggerActionBase { private static final String PREFIX = "op-"; static final int DEFAULT_TASK_TIMEOUT_SECONDS = 120; + public static final String TASK_TIMEOUT_SECONDS = "taskTimeoutSeconds"; + public static final String TASK_TIMEOUT_FAIL = "taskTimeoutFail"; + + int taskTimeoutSeconds; + boolean taskTimeoutFail; + + public ExecutePlanAction() { + TriggerUtils.validProperties(validProperties, TASK_TIMEOUT_SECONDS, TASK_TIMEOUT_FAIL); + } + + @Override + public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map properties) throws TriggerValidationException { + super.configure(loader, cloudManager, properties); + String str = String.valueOf(properties.getOrDefault(TASK_TIMEOUT_SECONDS, DEFAULT_TASK_TIMEOUT_SECONDS)); + taskTimeoutSeconds = Integer.parseInt(str); + str = String.valueOf(properties.getOrDefault(TASK_TIMEOUT_FAIL, false)); + taskTimeoutFail = Boolean.parseBoolean(str); + } @Override public void process(TriggerEvent event, ActionContext context) throws Exception { @@ -63,11 +85,11 @@ public class ExecutePlanAction extends TriggerActionBase { return; } try { + int counter = 0; for (SolrRequest operation : operations) { log.debug("Executing operation: {}", operation.getParams()); try { SolrResponse response = null; - int counter = 0; if (operation instanceof CollectionAdminRequest.AsyncCollectionAdminRequest) { CollectionAdminRequest.AsyncCollectionAdminRequest req = (CollectionAdminRequest.AsyncCollectionAdminRequest) operation; // waitForFinalState so that the end effects of operations are visible @@ -77,16 +99,34 @@ public class ExecutePlanAction extends TriggerActionBase { log.trace("Saved requestId: {} in znode: {}", asyncId, znode); // TODO: find a better way of using async calls using dataProvider API !!! req.setAsyncId(asyncId); - SolrResponse asyncResponse = cloudManager.request(req); - if (asyncResponse.getResponse().get("error") != null) { - throw new IOException("" + asyncResponse.getResponse().get("error")); + if (TestInjection.delayInExecutePlanAction != null) { + cloudManager.getTimeSource().sleep(TestInjection.delayInExecutePlanAction); + } + CollectionAdminRequest.RequestStatusResponse statusResponse = null; + RequestStatusState state = RequestStatusState.FAILED; + if (!TestInjection.failInExecutePlanAction) { + SolrResponse asyncResponse = cloudManager.request(req); + if (asyncResponse.getResponse().get("error") != null) { + throw new IOException("" + asyncResponse.getResponse().get("error")); + } + asyncId = (String)asyncResponse.getResponse().get("requestid"); + statusResponse = waitForTaskToFinish(cloudManager, asyncId, + taskTimeoutSeconds, TimeUnit.SECONDS); } - asyncId = (String)asyncResponse.getResponse().get("requestid"); - CollectionAdminRequest.RequestStatusResponse statusResponse = waitForTaskToFinish(cloudManager, asyncId, - DEFAULT_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS); if (statusResponse != null) { - RequestStatusState state = statusResponse.getRequestStatus(); + state = statusResponse.getRequestStatus(); + // overwrite to test a long-running task + if (TestInjection.delayInExecutePlanAction != null && + TestInjection.delayInExecutePlanAction > TimeUnit.MILLISECONDS.convert(taskTimeoutSeconds, TimeUnit.SECONDS)) { + state = RequestStatusState.RUNNING; + } + if (TestInjection.failInExecutePlanAction) { + state = RequestStatusState.FAILED; + } + // should we accept partial success here? i.e. some operations won't be completed + // successfully but the event processing will still be declared a success if (state == RequestStatusState.COMPLETED || state == RequestStatusState.FAILED || state == RequestStatusState.NOT_FOUND) { + // remove pending task marker for this request try { cloudManager.getDistribStateManager().removeData(znode, -1); } catch (Exception e) { @@ -95,7 +135,26 @@ public class ExecutePlanAction extends TriggerActionBase { } response = statusResponse; } + if (state == RequestStatusState.RUNNING || state == RequestStatusState.SUBMITTED) { + String msg = String.format(Locale.ROOT, "Task %s is still running after " + taskTimeoutSeconds + " seconds. Consider increasing " + + TASK_TIMEOUT_SECONDS + " action property or `waitFor` of the trigger %s. Operation: %s", + asyncId, event.source, req); + if (taskTimeoutFail) { + throw new IOException(msg); + } else { + log.warn(msg); + } + } else if (state == RequestStatusState.FAILED) { + // remove it as a pending task + try { + cloudManager.getDistribStateManager().removeData(znode, -1); + } catch (Exception e) { + log.warn("Unexpected exception while trying to delete znode: " + znode, e); + } + throw new IOException("Task " + asyncId + " failed: " + (statusResponse != null ? statusResponse : " timed out. Operation: " + req)); + } } else { + // generic response - can't easily determine success or failure response = cloudManager.request(operation); } NamedList result = response.getResponse(); @@ -105,6 +164,7 @@ public class ExecutePlanAction extends TriggerActionBase { responses.add(result); return responses; }); + counter++; } catch (IOException e) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unexpected exception executing operation: " + operation.getParams(), e); @@ -160,12 +220,14 @@ public class ExecutePlanAction extends TriggerActionBase { } cloudManager.getTimeSource().sleep(5000); } - log.debug("Task with requestId={} did not complete within 5 minutes. Last state={}", requestId, state); + log.debug("Task with requestId={} did not complete within {} seconds. Last state={}", timeoutSeconds, requestId, state); return statusResponse; } /** - * Saves the given asyncId in ZK as a persistent sequential node. + * Saves the given asyncId in ZK as a persistent sequential node. This allows us to wait for the completion + * of pending tasks from this event in {@link ScheduledTriggers} + * before starting the actions of the next event. * * @return the path of the newly created node in ZooKeeper */ diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java index 12de7feb2b0..ef140d06f11 100644 --- a/solr/core/src/java/org/apache/solr/util/TestInjection.java +++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java @@ -141,6 +141,10 @@ public class TestInjection { public volatile static Integer delayBeforeSlaveCommitRefresh=null; + public volatile static Integer delayInExecutePlanAction=null; + + public volatile static boolean failInExecutePlanAction = false; + public volatile static boolean uifOutOfMemoryError = false; public volatile static Map additionalSystemProps = null; @@ -171,6 +175,8 @@ public class TestInjection { failIndexFingerprintRequests = null; wrongIndexFingerprint = null; delayBeforeSlaveCommitRefresh = null; + delayInExecutePlanAction = null; + failInExecutePlanAction = false; uifOutOfMemoryError = false; notifyPauseForeverDone(); newSearcherHooks.clear(); diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java index d6e44ca05f7..d286faf57b2 100644 --- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java @@ -22,6 +22,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -33,6 +36,7 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.cloud.CloudTestUtils.AutoScalingRequest; +import org.apache.solr.cloud.CloudUtil; import org.apache.solr.cloud.SolrCloudTestCase; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; @@ -44,6 +48,7 @@ import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.Utils; import org.apache.solr.core.SolrResourceLoader; import org.apache.solr.util.LogLevel; +import org.apache.solr.util.TestInjection; import org.apache.zookeeper.data.Stat; import org.junit.After; import org.junit.Before; @@ -66,6 +71,26 @@ public class ExecutePlanActionTest extends SolrCloudTestCase { private SolrResourceLoader loader; private SolrCloudManager cloudManager; + public static class StartAction extends TriggerActionBase { + + @Override + public void process(TriggerEvent event, ActionContext context) throws Exception { + startedProcessing.countDown(); + } + } + + private static CountDownLatch startedProcessing = new CountDownLatch(1); + + public static class FinishAction extends TriggerActionBase { + + @Override + public void process(TriggerEvent event, ActionContext context) throws Exception { + finishedProcessing.countDown(); + } + } + + private static CountDownLatch finishedProcessing = new CountDownLatch(1); + @BeforeClass public static void setupCluster() throws Exception { @@ -84,6 +109,9 @@ public class ExecutePlanActionTest extends SolrCloudTestCase { cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager(); + + finishedProcessing = new CountDownLatch(1); + startedProcessing = new CountDownLatch(1); } @@ -91,6 +119,7 @@ public class ExecutePlanActionTest extends SolrCloudTestCase { public void tearDown() throws Exception { shutdownCluster(); super.tearDown(); + TestInjection.reset(); } @Test @@ -233,4 +262,119 @@ public class ExecutePlanActionTest extends SolrCloudTestCase { assertNotNull(replicasOnSurvivor); assertEquals(docCollection.toString(), 2, replicasOnSurvivor.size()); } + + @Test + public void testTaskTimeout() throws Exception { + int DELAY = 2000; + boolean taskTimeoutFail = random().nextBoolean(); + TestInjection.delayInExecutePlanAction = DELAY; + CloudSolrClient solrClient = cluster.getSolrClient(); + String triggerName = "node_lost_trigger2"; + + String setTriggerCommand = "{" + + "'set-trigger' : {" + + "'name' : '" + triggerName + "'," + + "'event' : 'nodeLost'," + + "'waitFor' : '1s'," + + "'enabled' : true," + + "'actions' : [{'name':'compute_plan', 'class' : 'solr.ComputePlanAction'}," + + "{'name':'execute_plan','class':'solr.ExecutePlanAction', 'taskTimeoutSeconds' : '1','taskTimeoutFail':'" + taskTimeoutFail + "'}," + + "{'name':'finish','class':'" + FinishAction.class.getName() + "'}]" + + "}}"; + SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand); + NamedList response = solrClient.request(req); + assertEquals(response.get("result").toString(), "success"); + + String collectionName = "testTaskTimeout"; + CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, + "conf", 1, 2); + create.setMaxShardsPerNode(1); + create.process(solrClient); + + cluster.waitForActiveCollection(collectionName, 1, 2); + + waitForState("Timed out waiting for replicas of new collection to be active", + collectionName, clusterShape(1, 2)); + + JettySolrRunner sourceNode = cluster.getRandomJetty(random()); + + for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) { + JettySolrRunner runner = cluster.getJettySolrRunner(i); + if (runner == sourceNode) { + JettySolrRunner j = cluster.stopJettySolrRunner(i); + cluster.waitForJettyToStop(j); + } + } + + boolean await = finishedProcessing.await(DELAY * 5, TimeUnit.MILLISECONDS); + if (taskTimeoutFail) { + assertFalse("finished processing event but should fail", await); + } else { + assertTrue("did not finish processing event in time", await); + } + String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + triggerName + "/execute_plan"; + assertTrue(path + " does not exist", zkClient().exists(path, true)); + List requests = zkClient().getChildren(path, null, true); + assertFalse("some requests should be still present", requests.isEmpty()); + + // in either case the task will complete and move the replica as needed + waitForState("Timed out waiting for replicas of collection to be 2 again", + collectionName, clusterShape(1, 2)); + } + + @Test + public void testTaskFail() throws Exception { + TestInjection.failInExecutePlanAction = true; + CloudSolrClient solrClient = cluster.getSolrClient(); + String triggerName = "node_lost_trigger3"; + + String setTriggerCommand = "{" + + "'set-trigger' : {" + + "'name' : '" + triggerName + "'," + + "'event' : 'nodeLost'," + + "'waitFor' : '1s'," + + "'enabled' : true," + + "'actions' : [{'name':'start', 'class' : '" + StartAction.class.getName() + "'}," + + "{'name':'compute_plan','class':'solr.ComputePlanAction'}," + + "{'name':'execute_plan','class':'solr.ExecutePlanAction'}," + + "{'name':'finish','class':'" + FinishAction.class.getName() + "'}]" + + "}}"; + SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand); + NamedList response = solrClient.request(req); + assertEquals(response.get("result").toString(), "success"); + + String collectionName = "testTaskFail"; + CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, + "conf", 1, 2); + create.setMaxShardsPerNode(1); + create.process(solrClient); + + cluster.waitForActiveCollection(collectionName, 1, 2); + + waitForState("Timed out waiting for replicas of new collection to be active", + collectionName, clusterShape(1, 2)); + + // don't stop the jetty that runs our SolrCloudManager + JettySolrRunner runner = cluster.stopJettySolrRunner(1); + cluster.waitForJettyToStop(runner); + + boolean await = startedProcessing.await(10, TimeUnit.SECONDS); + assertTrue("did not start processing event in time", await); + await = finishedProcessing.await(2, TimeUnit.SECONDS); + assertFalse("finished processing event but should fail", await); + + String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + triggerName + "/execute_plan"; + assertTrue(path + " does not exist", zkClient().exists(path, true)); + List requests = zkClient().getChildren(path, null, true); + assertTrue("there should be no requests pending but got " + requests, requests.isEmpty()); + + // the task never completed - we actually lost a replica + try { + CloudUtil.waitForState(cloudManager, collectionName, 5, TimeUnit.SECONDS, + CloudUtil.clusterShape(1, 2)); + fail("completed a task that should have failed"); + } catch (TimeoutException te) { + // expected + } + } } diff --git a/solr/solr-ref-guide/src/solrcloud-autoscaling-trigger-actions.adoc b/solr/solr-ref-guide/src/solrcloud-autoscaling-trigger-actions.adoc index 5571377fbeb..3ad37726b10 100644 --- a/solr/solr-ref-guide/src/solrcloud-autoscaling-trigger-actions.adoc +++ b/solr/solr-ref-guide/src/solrcloud-autoscaling-trigger-actions.adoc @@ -69,10 +69,19 @@ The `ExecutePlanAction` executes the Collection API commands emitted by the `Com the cluster using SolrJ. It executes the commands serially, waiting for each of them to succeed before continuing with the next one. -Currently, it has no configurable parameters. +Currently, it has the following configurable parameters: -If any one of the commands fail, then the complete chain of actions are -executed again at the next run of the trigger. If the Overseer node fails while `ExecutePlanAction` is running, +`taskTimeoutSeconds`:: +Default value of this parameter is 120 seconds. This value defines how long the action will wait for a +command to complete its execution. If a timeout is reached while the command is still running then +the command status is provisionally considered a success but a warning is logged, unless `taskTimeoutFail` +is set to true. + +`taskTimeoutFail`:: +Boolean with a default value of false. If this value is true then a timeout in command processing will be +marked as failure and an exception will be thrown. + +If the Overseer node fails while `ExecutePlanAction` is running, then the new Overseer node will run the chain of actions for the same event again after waiting for any running Collection API operations belonging to the event to complete.