SOLR-13828: Improve ExecutePlanAction error handling.

This commit is contained in:
Andrzej Bialecki 2019-10-10 23:58:40 +02:00
parent 2189172b74
commit 9c96834ca7
5 changed files with 236 additions and 13 deletions

View File

@ -246,6 +246,8 @@ Bug Fixes
* SOLR-13293: ConcurrentUpdateHttp2SolrClient always log AsynchronousCloseException exception error on indexing.
(Cao Manh Dat)
* SOLR-13828: Improve ExecutePlanAction error handling. (ab)
Other Changes
----------------------

View File

@ -22,6 +22,8 @@ import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -38,6 +40,8 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.util.TestInjection;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@ -52,6 +56,24 @@ public class ExecutePlanAction extends TriggerActionBase {
// NOTE(review): presumably a name prefix for per-operation entries; its use is outside this excerpt — confirm.
private static final String PREFIX = "op-";
/** Timeout, in seconds, that {@code configure} falls back to when {@link #TASK_TIMEOUT_SECONDS} is not set. */
static final int DEFAULT_TASK_TIMEOUT_SECONDS = 120;
/** Name of the action property that holds the per-task timeout in seconds. */
public static final String TASK_TIMEOUT_SECONDS = "taskTimeoutSeconds";
/** Name of the boolean action property that selects whether a task still running at the timeout is an error. */
public static final String TASK_TIMEOUT_FAIL = "taskTimeoutFail";
// Effective per-task timeout in seconds; assigned in configure().
int taskTimeoutSeconds;
// When true, a task still running after the timeout raises an IOException in process();
// when false, only a warning is logged. Assigned in configure().
boolean taskTimeoutFail;
/**
 * Creates the action and passes the two timeout property names to
 * {@code TriggerUtils.validProperties} together with {@code validProperties}.
 * NOTE(review): presumably this registers them as accepted configuration keys — confirm in TriggerUtils.
 */
public ExecutePlanAction() {
TriggerUtils.validProperties(validProperties, TASK_TIMEOUT_SECONDS, TASK_TIMEOUT_FAIL);
}
/**
 * Applies the base configuration and then reads this action's timeout settings.
 *
 * <p>{@code taskTimeoutSeconds} is taken from the {@link #TASK_TIMEOUT_SECONDS} property, or
 * {@link #DEFAULT_TASK_TIMEOUT_SECONDS} when absent. {@code taskTimeoutFail} is taken from the
 * {@link #TASK_TIMEOUT_FAIL} property, or {@code false} when absent. Both values are converted
 * through their string form, so numbers, booleans and strings are all accepted.
 *
 * @param loader resource loader passed through to the superclass
 * @param cloudManager cloud manager passed through to the superclass
 * @param properties action properties from the trigger configuration
 * @throws TriggerValidationException if the superclass rejects the configuration
 * @throws NumberFormatException if the timeout value is not a parsable integer
 */
@Override
public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
  super.configure(loader, cloudManager, properties);

  final Object timeoutValue = properties.getOrDefault(TASK_TIMEOUT_SECONDS, DEFAULT_TASK_TIMEOUT_SECONDS);
  taskTimeoutSeconds = Integer.parseInt(String.valueOf(timeoutValue));

  final Object failOnTimeoutValue = properties.getOrDefault(TASK_TIMEOUT_FAIL, false);
  taskTimeoutFail = Boolean.parseBoolean(String.valueOf(failOnTimeoutValue));
}
@Override
public void process(TriggerEvent event, ActionContext context) throws Exception {
@ -63,11 +85,11 @@ public class ExecutePlanAction extends TriggerActionBase {
return;
}
try {
int counter = 0;
for (SolrRequest operation : operations) {
log.debug("Executing operation: {}", operation.getParams());
try {
SolrResponse response = null;
int counter = 0;
if (operation instanceof CollectionAdminRequest.AsyncCollectionAdminRequest) {
CollectionAdminRequest.AsyncCollectionAdminRequest req = (CollectionAdminRequest.AsyncCollectionAdminRequest) operation;
// waitForFinalState so that the end effects of operations are visible
@ -77,16 +99,34 @@ public class ExecutePlanAction extends TriggerActionBase {
log.trace("Saved requestId: {} in znode: {}", asyncId, znode);
// TODO: find a better way of using async calls using dataProvider API !!!
req.setAsyncId(asyncId);
SolrResponse asyncResponse = cloudManager.request(req);
if (asyncResponse.getResponse().get("error") != null) {
throw new IOException("" + asyncResponse.getResponse().get("error"));
if (TestInjection.delayInExecutePlanAction != null) {
cloudManager.getTimeSource().sleep(TestInjection.delayInExecutePlanAction);
}
CollectionAdminRequest.RequestStatusResponse statusResponse = null;
RequestStatusState state = RequestStatusState.FAILED;
if (!TestInjection.failInExecutePlanAction) {
SolrResponse asyncResponse = cloudManager.request(req);
if (asyncResponse.getResponse().get("error") != null) {
throw new IOException("" + asyncResponse.getResponse().get("error"));
}
asyncId = (String)asyncResponse.getResponse().get("requestid");
statusResponse = waitForTaskToFinish(cloudManager, asyncId,
taskTimeoutSeconds, TimeUnit.SECONDS);
}
asyncId = (String)asyncResponse.getResponse().get("requestid");
CollectionAdminRequest.RequestStatusResponse statusResponse = waitForTaskToFinish(cloudManager, asyncId,
DEFAULT_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (statusResponse != null) {
RequestStatusState state = statusResponse.getRequestStatus();
state = statusResponse.getRequestStatus();
// overwrite to test a long-running task
if (TestInjection.delayInExecutePlanAction != null &&
TestInjection.delayInExecutePlanAction > TimeUnit.MILLISECONDS.convert(taskTimeoutSeconds, TimeUnit.SECONDS)) {
state = RequestStatusState.RUNNING;
}
if (TestInjection.failInExecutePlanAction) {
state = RequestStatusState.FAILED;
}
// should we accept partial success here? i.e. some operations won't be completed
// successfully but the event processing will still be declared a success
if (state == RequestStatusState.COMPLETED || state == RequestStatusState.FAILED || state == RequestStatusState.NOT_FOUND) {
// remove pending task marker for this request
try {
cloudManager.getDistribStateManager().removeData(znode, -1);
} catch (Exception e) {
@ -95,7 +135,26 @@ public class ExecutePlanAction extends TriggerActionBase {
}
response = statusResponse;
}
if (state == RequestStatusState.RUNNING || state == RequestStatusState.SUBMITTED) {
String msg = String.format(Locale.ROOT, "Task %s is still running after " + taskTimeoutSeconds + " seconds. Consider increasing " +
TASK_TIMEOUT_SECONDS + " action property or `waitFor` of the trigger %s. Operation: %s",
asyncId, event.source, req);
if (taskTimeoutFail) {
throw new IOException(msg);
} else {
log.warn(msg);
}
} else if (state == RequestStatusState.FAILED) {
// remove it as a pending task
try {
cloudManager.getDistribStateManager().removeData(znode, -1);
} catch (Exception e) {
log.warn("Unexpected exception while trying to delete znode: " + znode, e);
}
throw new IOException("Task " + asyncId + " failed: " + (statusResponse != null ? statusResponse : " timed out. Operation: " + req));
}
} else {
// generic response - can't easily determine success or failure
response = cloudManager.request(operation);
}
NamedList<Object> result = response.getResponse();
@ -105,6 +164,7 @@ public class ExecutePlanAction extends TriggerActionBase {
responses.add(result);
return responses;
});
counter++;
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unexpected exception executing operation: " + operation.getParams(), e);
@ -160,12 +220,14 @@ public class ExecutePlanAction extends TriggerActionBase {
}
cloudManager.getTimeSource().sleep(5000);
}
log.debug("Task with requestId={} did not complete within 5 minutes. Last state={}", requestId, state);
// Arguments must follow placeholder order: request id, timeout in seconds, last observed state.
log.debug("Task with requestId={} did not complete within {} seconds. Last state={}", requestId, timeoutSeconds, state);
return statusResponse;
}
/**
* Saves the given asyncId in ZK as a persistent sequential node.
* Saves the given asyncId in ZK as a persistent sequential node. This allows us to wait for the completion
* of pending tasks from this event in {@link ScheduledTriggers}
* before starting the actions of the next event.
*
* @return the path of the newly created node in ZooKeeper
*/

View File

@ -141,6 +141,10 @@ public class TestInjection {
public volatile static Integer delayBeforeSlaveCommitRefresh=null;
public volatile static Integer delayInExecutePlanAction=null;
public volatile static boolean failInExecutePlanAction = false;
public volatile static boolean uifOutOfMemoryError = false;
public volatile static Map<String, String> additionalSystemProps = null;
@ -171,6 +175,8 @@ public class TestInjection {
failIndexFingerprintRequests = null;
wrongIndexFingerprint = null;
delayBeforeSlaveCommitRefresh = null;
delayInExecutePlanAction = null;
failInExecutePlanAction = false;
uifOutOfMemoryError = false;
notifyPauseForeverDone();
newSearcherHooks.clear();

View File

@ -22,6 +22,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@ -33,6 +36,7 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.CloudTestUtils.AutoScalingRequest;
import org.apache.solr.cloud.CloudUtil;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@ -44,6 +48,7 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TestInjection;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
@ -66,6 +71,26 @@ public class ExecutePlanActionTest extends SolrCloudTestCase {
private SolrResourceLoader loader;
private SolrCloudManager cloudManager;
/**
 * Test-only trigger action that counts down {@code startedProcessing} when it is invoked,
 * letting a test wait until the action chain has begun processing an event.
 */
public static class StartAction extends TriggerActionBase {
@Override
public void process(TriggerEvent event, ActionContext context) throws Exception {
// Release any test thread awaiting the latch; the event and context are not inspected.
startedProcessing.countDown();
}
}
// Counted down by StartAction; a fresh latch is also assigned elsewhere in this class.
private static CountDownLatch startedProcessing = new CountDownLatch(1);
/**
 * Test-only trigger action that counts down {@code finishedProcessing} when it is invoked.
 * The tests list it after {@code execute_plan}, so the latch firing shows the chain got past that action.
 */
public static class FinishAction extends TriggerActionBase {
@Override
public void process(TriggerEvent event, ActionContext context) throws Exception {
// Release any test thread awaiting the latch; the event and context are not inspected.
finishedProcessing.countDown();
}
}
// Counted down by FinishAction; a fresh latch is also assigned elsewhere in this class.
private static CountDownLatch finishedProcessing = new CountDownLatch(1);
@BeforeClass
public static void setupCluster() throws Exception {
@ -84,6 +109,9 @@ public class ExecutePlanActionTest extends SolrCloudTestCase {
cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
finishedProcessing = new CountDownLatch(1);
startedProcessing = new CountDownLatch(1);
}
@ -91,6 +119,7 @@ public class ExecutePlanActionTest extends SolrCloudTestCase {
public void tearDown() throws Exception {
  try {
    shutdownCluster();
    super.tearDown();
  } finally {
    // Always clear the injected delay/failure flags, even when shutdown throws, so that
    // a failing test cannot leak TestInjection state into the tests that run after it.
    TestInjection.reset();
  }
}
@Test
@ -233,4 +262,119 @@ public class ExecutePlanActionTest extends SolrCloudTestCase {
assertNotNull(replicasOnSurvivor);
assertEquals(docCollection.toString(), 2, replicasOnSurvivor.size());
}
/**
 * Verifies the handling of a task that is still running when the configured timeout expires.
 *
 * <p>A 2000 ms delay is injected into {@code ExecutePlanAction} while the action is configured
 * with {@code taskTimeoutSeconds = 1}, so the task is reported as still running. Depending on the
 * randomly chosen {@code taskTimeoutFail} value, the test expects the {@code finish} action either
 * to run (timeout tolerated) or not to run (timeout treated as an error). In both cases the
 * pending-request marker must still be present, and the replica is expected to be restored eventually.
 */
@Test
public void testTaskTimeout() throws Exception {
  final int delayMs = 2000;
  boolean taskTimeoutFail = random().nextBoolean();
  TestInjection.delayInExecutePlanAction = delayMs;
  CloudSolrClient solrClient = cluster.getSolrClient();
  String triggerName = "node_lost_trigger2";

  String setTriggerCommand = "{" +
      "'set-trigger' : {" +
      "'name' : '" + triggerName + "'," +
      "'event' : 'nodeLost'," +
      "'waitFor' : '1s'," +
      "'enabled' : true," +
      "'actions' : [{'name':'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
      "{'name':'execute_plan','class':'solr.ExecutePlanAction', 'taskTimeoutSeconds' : '1','taskTimeoutFail':'" + taskTimeoutFail + "'}," +
      "{'name':'finish','class':'" + FinishAction.class.getName() + "'}]" +
      "}}";
  SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
  NamedList<Object> response = solrClient.request(req);
  // JUnit convention: expected value first, so a failure reports the values the right way round.
  assertEquals("success", response.get("result").toString());

  String collectionName = "testTaskTimeout";
  CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
      "conf", 1, 2);
  create.setMaxShardsPerNode(1);
  create.process(solrClient);

  cluster.waitForActiveCollection(collectionName, 1, 2);
  waitForState("Timed out waiting for replicas of new collection to be active",
      collectionName, clusterShape(1, 2));

  // Stop one randomly selected node to fire the nodeLost trigger.
  JettySolrRunner sourceNode = cluster.getRandomJetty(random());
  for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
    JettySolrRunner runner = cluster.getJettySolrRunner(i);
    if (runner == sourceNode) {
      JettySolrRunner j = cluster.stopJettySolrRunner(i);
      cluster.waitForJettyToStop(j);
      // Only one runner can match; stop iterating over a list that may have just changed.
      break;
    }
  }

  boolean await = finishedProcessing.await(delayMs * 5, TimeUnit.MILLISECONDS);
  if (taskTimeoutFail) {
    assertFalse("finished processing event but should fail", await);
  } else {
    assertTrue("did not finish processing event in time", await);
  }

  // The task was never observed to finish, so its pending-request marker must still be there.
  String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + triggerName + "/execute_plan";
  assertTrue(path + " does not exist", zkClient().exists(path, true));
  List<String> requests = zkClient().getChildren(path, null, true);
  assertFalse("some requests should be still present", requests.isEmpty());

  // in either case the task will complete and move the replica as needed
  waitForState("Timed out waiting for replicas of collection to be 2 again",
      collectionName, clusterShape(1, 2));
}
/**
 * Verifies the handling of a task that fails outright.
 *
 * <p>{@code TestInjection.failInExecutePlanAction} makes {@code ExecutePlanAction} skip sending the
 * request and treat the task as FAILED. The test expects the {@code start} action to run and the
 * {@code finish} action not to run. It also expects no pending-request marker to be left behind,
 * and the lost replica not to be restored within 5 seconds.
 */
@Test
public void testTaskFail() throws Exception {
  TestInjection.failInExecutePlanAction = true;
  CloudSolrClient solrClient = cluster.getSolrClient();
  String triggerName = "node_lost_trigger3";

  String setTriggerCommand = "{" +
      "'set-trigger' : {" +
      "'name' : '" + triggerName + "'," +
      "'event' : 'nodeLost'," +
      "'waitFor' : '1s'," +
      "'enabled' : true," +
      "'actions' : [{'name':'start', 'class' : '" + StartAction.class.getName() + "'}," +
      "{'name':'compute_plan','class':'solr.ComputePlanAction'}," +
      "{'name':'execute_plan','class':'solr.ExecutePlanAction'}," +
      "{'name':'finish','class':'" + FinishAction.class.getName() + "'}]" +
      "}}";
  SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
  NamedList<Object> response = solrClient.request(req);
  // JUnit convention: expected value first, so a failure reports the values the right way round.
  assertEquals("success", response.get("result").toString());

  String collectionName = "testTaskFail";
  CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
      "conf", 1, 2);
  create.setMaxShardsPerNode(1);
  create.process(solrClient);

  cluster.waitForActiveCollection(collectionName, 1, 2);
  waitForState("Timed out waiting for replicas of new collection to be active",
      collectionName, clusterShape(1, 2));

  // don't stop the jetty that runs our SolrCloudManager
  JettySolrRunner runner = cluster.stopJettySolrRunner(1);
  cluster.waitForJettyToStop(runner);

  boolean await = startedProcessing.await(10, TimeUnit.SECONDS);
  assertTrue("did not start processing event in time", await);
  await = finishedProcessing.await(2, TimeUnit.SECONDS);
  assertFalse("finished processing event but should fail", await);

  // A failed task must not leave a pending-request marker behind.
  String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + triggerName + "/execute_plan";
  assertTrue(path + " does not exist", zkClient().exists(path, true));
  List<String> requests = zkClient().getChildren(path, null, true);
  assertTrue("there should be no requests pending but got " + requests, requests.isEmpty());

  // the task never completed - we actually lost a replica
  try {
    CloudUtil.waitForState(cloudManager, collectionName, 5, TimeUnit.SECONDS,
        CloudUtil.clusterShape(1, 2));
    fail("completed a task that should have failed");
  } catch (TimeoutException te) {
    // expected
  }
}
}

View File

@ -69,10 +69,19 @@ The `ExecutePlanAction` executes the Collection API commands emitted by the `Com
the cluster using SolrJ. It executes the commands serially, waiting for each of them to succeed before
continuing with the next one.
Currently, it has no configurable parameters.
Currently, it has the following configurable parameters:
If any one of the commands fail, then the complete chain of actions are
executed again at the next run of the trigger. If the Overseer node fails while `ExecutePlanAction` is running,
`taskTimeoutSeconds`::
The default value of this parameter is 120 seconds. It defines how long the action waits for a
command to complete its execution. If the timeout is reached while the command is still running, then
the command status is provisionally considered a success but a warning is logged, unless `taskTimeoutFail`
is set to true.
`taskTimeoutFail`::
Boolean with a default value of false. If this value is true, then a timeout in command processing is
treated as a failure and an exception is thrown.
If the Overseer node fails while `ExecutePlanAction` is running,
then the new Overseer node will run the chain of actions for the same event again after waiting for any
running Collection API operations belonging to the event to complete.