SOLR-12708: CREATE collection request doesn't fail or cleanup when the request fails (#568)

When performing an async CREATE request, responses to internal requests are now also included inside the "success" or "failure" elements of the general response. This makes the operation clean up in case of failure, the same way it does when the request is synchronous.
This commit is contained in:
Tomas Fernandez Lobbe 2019-02-20 13:24:46 -08:00
parent 1c55df083c
commit 128da043df
11 changed files with 216 additions and 44 deletions

View File

@ -30,6 +30,10 @@ Jetty 9.4.14.v20181114
Upgrade Notes
----------------------
When requesting the status of an async request via the REQUESTSTATUS collections API, the response will
include the list of internal async requests (if any) in the "success" or "failure" keys (in addition
to them being included outside those keys for backwards compatibility). See SOLR-12708 for more
details.
New Features
----------------------
@ -46,6 +50,8 @@ Bug Fixes
* SOLR-11876: In-place update fails when resolving from Tlog if schema has a required field (Justin Deoliveira, janhoy,
Ishan Chattopadhyaya)
* SOLR-12708: Async collection actions should not hide internal failures (Mano Kovacs, Varun Thacker, Tomás Fernández Löbbe)
Improvements
----------------------

View File

@ -287,7 +287,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
// Let's cleanup as we hit an exception
// We shouldn't be passing 'results' here for the cleanup as the response would then contain 'success'
// element, which may be interpreted by the user as a positive ack
ocmh.cleanupCollection(collectionName, new NamedList());
ocmh.cleanupCollection(collectionName, new NamedList<Object>());
log.info("Cleaned up artifacts for failed create collection for [{}]", collectionName);
} else {
log.debug("Finished create command on all shards for collection: {}", collectionName);

View File

@ -189,6 +189,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
COLOCATED_WITH, null));
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String FAILURE_FIELD = "failure";
public static final String SUCCESS_FIELD = "success";
Overseer overseer;
HttpShardHandlerFactory shardHandlerFactory;
@ -878,32 +880,52 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
if (e != null && (rootThrowable == null || !okayExceptions.contains(rootThrowable))) {
log.error("Error from shard: " + shard, e);
SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure");
if (failure == null) {
failure = new SimpleOrderedMap();
results.add("failure", failure);
}
failure.add(nodeName, e.getClass().getName() + ":" + e.getMessage());
addFailure(results, nodeName, e.getClass().getName() + ":" + e.getMessage());
} else {
SimpleOrderedMap success = (SimpleOrderedMap) results.get("success");
if (success == null) {
success = new SimpleOrderedMap();
results.add("success", success);
}
success.add(nodeName, solrResponse.getResponse());
addSuccess(results, nodeName, solrResponse.getResponse());
}
}
/**
 * Records {@code value} under {@code key} inside the "failure" entry of {@code results}.
 * The "failure" entry is created and attached to {@code results} the first time it is needed.
 *
 * @param results the response being assembled
 * @param key     name under which the failure is stored
 * @param value   the failure details to store
 */
@SuppressWarnings("unchecked")
private static void addFailure(NamedList<Object> results, String key, Object value) {
  final Object existing = results.get("failure");
  final SimpleOrderedMap<Object> failures;
  if (existing != null) {
    failures = (SimpleOrderedMap<Object>) existing;
  } else {
    // First failure seen: attach a fresh container to the response before filling it.
    failures = new SimpleOrderedMap<>();
    results.add("failure", failures);
  }
  failures.add(key, value);
}
/**
 * Records {@code value} under {@code key} inside the "success" entry of {@code results}.
 * The "success" entry is created and attached to {@code results} the first time it is needed.
 *
 * @param results the response being assembled
 * @param key     name under which the successful response is stored
 * @param value   the response to store
 */
@SuppressWarnings("unchecked")
private static void addSuccess(NamedList<Object> results, String key, Object value) {
  final Object existing = results.get("success");
  final SimpleOrderedMap<Object> successes;
  if (existing != null) {
    successes = (SimpleOrderedMap<Object>) existing;
  } else {
    // First success seen: attach a fresh container to the response before filling it.
    successes = new SimpleOrderedMap<>();
    results.add("success", successes);
  }
  successes.add(key, value);
}
/**
 * Backward-compatibility switch: while {@code true}, {@code waitForAsyncCallsToComplete} also adds
 * each async sub-request's response at the top level of the results, keyed by its async ID, in
 * addition to recording it under the "success" or "failure" entry.
 * This can be removed in Solr 9.
 *
 * @deprecated kept only for backward compatibility of the response format.
 */
@Deprecated
public final static boolean INCLUDE_TOP_LEVEL_RESPONSE = true;
/**
 * Waits for every pending core-admin async call in {@code requestMap} and records its outcome in
 * {@code results}.
 *
 * <p>Each call is awaited exactly once. Its response is filed under the "failure" entry when its
 * {@code STATUS} is "failed" (case-insensitive), otherwise under the "success" entry. While
 * {@link #INCLUDE_TOP_LEVEL_RESPONSE} is set, the response is additionally added at the top level
 * of {@code results}, keyed by the async ID, for backward compatibility.
 *
 * @param requestMap pending calls; each key/value pair is handed to
 *                   {@code waitForCoreAdminAsyncCallToComplete}. The value is the async request ID;
 *                   the key appears to identify the node/shard the request was sent to (the error
 *                   log calls it "shard") — TODO confirm against callers.
 * @param results    response being assembled; mutated in place
 */
@SuppressWarnings("unchecked")
private void waitForAsyncCallsToComplete(Map<String, String> requestMap, NamedList results) {
  for (Map.Entry<String, String> pending : requestMap.entrySet()) {
    final String key = pending.getKey();
    final String asyncId = pending.getValue();
    log.debug("I am Waiting for :{}/{}", key, asyncId);
    // Wait once and reuse the result: a second wait/add would duplicate the top-level entry
    // and ignore the INCLUDE_TOP_LEVEL_RESPONSE switch.
    NamedList reqResult = waitForCoreAdminAsyncCallToComplete(key, asyncId);
    if (INCLUDE_TOP_LEVEL_RESPONSE) {
      results.add(asyncId, reqResult);
    }
    if ("failed".equalsIgnoreCase(((String) reqResult.get("STATUS")))) {
      log.error("Error from shard {}: {}", key, reqResult);
      addFailure(results, key, reqResult);
    } else {
      addSuccess(results, key, reqResult);
    }
  }
}

View File

@ -46,6 +46,9 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.cloud.Overseer;
@ -64,6 +67,7 @@ import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
@ -241,9 +245,12 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
Assign.AssignStrategy assignStrategy = assignStrategyFactory.create(clusterState, restoreCollection);
List<ReplicaPosition> replicaPositions = assignStrategy.assign(ocmh.cloudManager, assignRequest);
sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
CountDownLatch countDownLatch = new CountDownLatch(restoreCollection.getSlices().size());
//Create one replica per shard and copy backed up data to it
for (Slice slice : restoreCollection.getSlices()) {
log.debug("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
log.info("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
HashMap<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD);
propMap.put(COLLECTION_PROP, restoreCollectionName);
@ -274,7 +281,37 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
propMap.put(ASYNC, asyncId);
}
ocmh.addPropertyParams(message, propMap);
ocmh.addReplica(clusterState, new ZkNodeProps(propMap), new NamedList(), null);
final NamedList addReplicaResult = new NamedList();
ocmh.addReplica(clusterState, new ZkNodeProps(propMap), addReplicaResult, () -> {
Object addResultFailure = addReplicaResult.get("failure");
if (addResultFailure != null) {
SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure");
if (failure == null) {
failure = new SimpleOrderedMap();
results.add("failure", failure);
}
failure.addAll((NamedList) addResultFailure);
} else {
SimpleOrderedMap success = (SimpleOrderedMap) results.get("success");
if (success == null) {
success = new SimpleOrderedMap();
results.add("success", success);
}
success.addAll((NamedList) addReplicaResult.get("success"));
}
countDownLatch.countDown();
});
}
boolean allIsDone = countDownLatch.await(1, TimeUnit.HOURS);
if (!allIsDone) {
throw new TimeoutException("Initial replicas were not created within 1 hour. Timing out.");
}
Object failures = results.get("failure");
if (failures != null && ((SimpleOrderedMap) failures).size() > 0) {
log.error("Restore failed to create initial replicas.");
ocmh.cleanupCollection(restoreCollectionName, new NamedList<Object>());
return;
}
//refresh the location copy of collection state

View File

@ -194,9 +194,10 @@ public class CoreAdminHandler extends RequestHandlerBase implements PermissionNa
removeTask("running", taskObject.taskId);
if (exceptionCaught) {
addTask("failed", taskObject, true);
} else
} else {
addTask("completed", taskObject, true);
}
}
});
} finally {
MDC.remove("CoreAdminHandler.asyncId");

View File

@ -16,11 +16,10 @@
*/
package org.apache.solr.cloud;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.common.util.NamedList;
import org.junit.BeforeClass;
import org.junit.Test;
@ -42,21 +41,20 @@ public class AsyncCallRequestStatusResponseTest extends SolrCloudTestCase {
waitForState("Expected collection 'asynccall' to have 2 shards and 1 replica", "asynccall", clusterShape(2, 2));
int tries = 0;
while (true) {
final RequestStatusState state
= CollectionAdminRequest.requestStatus(asyncId).process(cluster.getSolrClient()).getRequestStatus();
if (state == RequestStatusState.COMPLETED)
break;
if (tries++ > 10)
fail("Expected to see RequestStatusState.COMPLETED but was " + state.toString());
TimeUnit.SECONDS.sleep(1);
}
RequestStatusState state = AbstractFullDistribZkTestBase.getRequestStateAfterCompletion(asyncId, 30, cluster.getSolrClient());
assertEquals("Unexpected request status: " + state, "completed", state.getKey());
CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus(asyncId);
CollectionAdminResponse rsp = requestStatus.process(cluster.getSolrClient());
NamedList<?> r = rsp.getResponse();
// Check that there's more response than the hardcoded status and states
assertEquals("Assertion Failure" + r.toString(), 5, r.size());
if (OverseerCollectionMessageHandler.INCLUDE_TOP_LEVEL_RESPONSE) {
assertEquals("Expected 5 elements in the response" + r, 5, r.size());
} else {
assertEquals("Expected 3 elements in the response" + r, 3, r.size());
}
assertNotNull("Expected 'responseHeader' response" + r, r.get("responseHeader"));
assertNotNull("Expected 'success' response" + r, r.get("success"));
assertNotNull("Expected 'status' response" + r, r.get("status"));
assertEquals("Expected 4 elements in the success element" + r.get("success"), 4, ((NamedList<?>)r.get("success")).size());
}
}

View File

@ -17,11 +17,15 @@
package org.apache.solr.cloud;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import java.util.Properties;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.common.params.CoreAdminParams;
import org.junit.BeforeClass;
import org.junit.Test;
@ -66,8 +70,10 @@ public class CreateCollectionCleanupTest extends SolrCloudTestCase {
@Test
public void testCreateCollectionCleanup() throws Exception {
final CloudSolrClient cloudClient = cluster.getSolrClient();
String collectionName = "foo";
assertThat(CollectionAdminRequest.listCollections(cloudClient), not(hasItem(collectionName)));
// Create a collection that would fail
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection("foo","conf1",1,1);
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,"conf1",1,1);
Properties properties = new Properties();
properties.put(CoreAdminParams.DATA_DIR, "/some_invalid_dir/foo");
@ -76,7 +82,32 @@ public class CreateCollectionCleanupTest extends SolrCloudTestCase {
assertFalse(rsp.isSuccess());
// Confirm using LIST that the collection does not exist
assertFalse(CollectionAdminRequest.listCollections(cloudClient).contains("foo"));
assertThat("Failed collection is still in the clusterstate: " + cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollectionOrNull(collectionName),
CollectionAdminRequest.listCollections(cloudClient), not(hasItem(collectionName)));
}
/**
 * Async variant of the create-collection cleanup check: submits a CREATE that points at an
 * invalid data directory with an async ID, waits for the request to finish, and asserts that it
 * is reported as "failed" and that the collection is not listed afterwards.
 */
@Test
public void testAsyncCreateCollectionCleanup() throws Exception {
  final String collectionName = "foo2";
  final String asyncId = "testAsyncCreateCollectionCleanup";
  final CloudSolrClient cloudClient = cluster.getSolrClient();

  // Precondition: the collection must not exist before the attempt.
  assertThat(CollectionAdminRequest.listCollections(cloudClient), not(hasItem(collectionName)));

  // Create a collection that would fail
  Properties properties = new Properties();
  properties.put(CoreAdminParams.DATA_DIR, "/some_invalid_dir/foo2");
  CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 1);
  create.setProperties(properties);
  create.setAsyncId(asyncId);
  create.process(cloudClient);

  RequestStatusState state = AbstractFullDistribZkTestBase.getRequestStateAfterCompletion(asyncId, 30, cloudClient);
  assertThat(state.getKey(), is("failed"));

  // Confirm using LIST that the collection does not exist
  assertThat("Failed collection is still in the clusterstate: " + cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollectionOrNull(collectionName),
      CollectionAdminRequest.listCollections(cloudClient), not(hasItem(collectionName)));
}
}

View File

@ -16,6 +16,9 @@
*/
package org.apache.solr.cloud.api.collections;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.not;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
@ -57,12 +60,14 @@ public abstract class AbstractCloudBackupRestoreTestCase extends SolrCloudTestCa
protected static final int NUM_SHARDS = 2;//granted we sometimes shard split to get more
protected static final int NUM_SPLIT_SHARDS = 3; //We always split shard1 so total shards post split will be 3
protected static final String BACKUPNAME_PREFIX = "mytestbackup";
int replFactor;
int numTlogReplicas;
int numPullReplicas;
private static long docsSeed; // see indexDocs()
private String testSuffix = "test1";
@BeforeClass
public static void createCluster() throws Exception {
@ -72,7 +77,7 @@ public abstract class AbstractCloudBackupRestoreTestCase extends SolrCloudTestCa
/**
* @return The name of the collection to use.
*/
public abstract String getCollectionName();
public abstract String getCollectionNamePrefix();
/**
* @return The name of the backup repository to use.
@ -85,8 +90,18 @@ public abstract class AbstractCloudBackupRestoreTestCase extends SolrCloudTestCa
*/
public abstract String getBackupLocation();
/**
 * @return The name of the collection to use: the subclass-provided prefix, an underscore, and
 *         the current test suffix.
 */
public String getCollectionName() {
  final String prefix = getCollectionNamePrefix();
  return prefix + "_" + testSuffix;
}
/**
 * Sets the suffix appended to names built by this test case (for example in
 * {@link #getCollectionName()}), so that individual test methods use distinct names.
 *
 * @param testSuffix the suffix to use from now on
 */
public void setTestSuffix(String testSuffix) {
  this.testSuffix = testSuffix;
}
@Test
public void test() throws Exception {
setTestSuffix("testok");
boolean isImplicit = random().nextBoolean();
boolean doSplitShardOperation = !isImplicit && random().nextBoolean();
replFactor = TestUtil.nextInt(random(), 1, 2);
@ -146,6 +161,58 @@ public abstract class AbstractCloudBackupRestoreTestCase extends SolrCloudTestCa
testInvalidPath(getCollectionName());
}
/**
 * Verifies that a failed RESTORE is reported as {@link RequestStatusState#FAILED} and that the
 * partially created target collection is removed again.
 *
 * <p>The test creates and indexes a collection, backs it up, then restores it with the
 * "confFaulty" config set (the subclass setups visible alongside this test register that config
 * and then delete its solrconfig.xml). It finally asserts that the restored collection name is
 * not listed.
 */
@Test
public void testRestoreFailure() throws Exception {
  setTestSuffix("testfailure");
  replFactor = TestUtil.nextInt(random(), 1, 2);
  numTlogReplicas = TestUtil.nextInt(random(), 0, 1);
  numPullReplicas = TestUtil.nextInt(random(), 0, 1);

  CollectionAdminRequest.Create create =
      CollectionAdminRequest.createCollection(getCollectionName(), "conf1", NUM_SHARDS, replFactor, numTlogReplicas, numPullReplicas);

  final int nodeCount = cluster.getJettySolrRunners().size();
  final int totalReplicas = NUM_SHARDS * (replFactor + numTlogReplicas + numPullReplicas);
  if (totalReplicas > nodeCount) {
    // Divide as doubles: an int division would truncate before Math.ceil gets a chance to round up.
    create.setMaxShardsPerNode((int) Math.ceil((double) totalReplicas / nodeCount)); //just to assert it survives the restoration
  }

  CloudSolrClient solrClient = cluster.getSolrClient();
  create.process(solrClient);

  indexDocs(getCollectionName(), false);

  String backupLocation = getBackupLocation();
  String backupName = BACKUPNAME_PREFIX + testSuffix;

  DocCollection backupCollection = solrClient.getZkStateReader().getClusterState().getCollection(getCollectionName());

  log.info("Triggering Backup command");

  {
    CollectionAdminRequest.Backup backup = CollectionAdminRequest.backupCollection(getCollectionName(), backupName)
        .setLocation(backupLocation).setRepositoryName(getBackupRepoName());
    assertEquals(0, backup.process(solrClient).getStatus());
  }

  log.info("Triggering Restore command");

  String restoreCollectionName = getCollectionName() + "_restored";

  {
    CollectionAdminRequest.Restore restore = CollectionAdminRequest.restoreCollection(restoreCollectionName, backupName)
        .setLocation(backupLocation).setRepositoryName(getBackupRepoName());
    final int backedUpReplicas = backupCollection.getReplicas().size();
    if (backedUpReplicas > nodeCount) {
      // may need to increase maxShardsPerNode (e.g. if it was shard split, then now we need more)
      // Same rounding concern as above: force a floating-point division before Math.ceil.
      restore.setMaxShardsPerNode((int) Math.ceil((double) backedUpReplicas / nodeCount));
    }

    // The faulty config set is what makes replica creation, and therefore the restore, fail.
    restore.setConfigName("confFaulty");

    assertEquals(RequestStatusState.FAILED, restore.processAndWait(solrClient, 30));
    assertThat("Failed collection is still in the clusterstate: " + cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollectionOrNull(restoreCollectionName),
        CollectionAdminRequest.listCollections(solrClient), not(hasItem(restoreCollectionName)));
  }
}
/**
* This test validates the backup of collection configuration using
* {@linkplain CollectionAdminParams#NO_INDEX_BACKUP_STRATEGY}.
@ -226,7 +293,7 @@ public abstract class AbstractCloudBackupRestoreTestCase extends SolrCloudTestCa
private void testBackupAndRestore(String collectionName, int backupReplFactor) throws Exception {
String backupLocation = getBackupLocation();
String backupName = "mytestbackup";
String backupName = BACKUPNAME_PREFIX + testSuffix;
CloudSolrClient client = cluster.getSolrClient();
DocCollection backupCollection = client.getZkStateReader().getClusterState().getCollection(collectionName);
@ -312,7 +379,7 @@ public abstract class AbstractCloudBackupRestoreTestCase extends SolrCloudTestCa
if (random().nextBoolean()) {
assertEquals(0, restore.process(client).getStatus());
} else {
assertEquals(RequestStatusState.COMPLETED, restore.processAndWait(client, 30));//async
assertEquals(RequestStatusState.COMPLETED, restore.processAndWait(client, 60));//async
}
AbstractDistribZkTestBase.waitForRecoveriesToFinish(
restoreCollectionName, cluster.getSolrClient().getZkStateReader(), log.isDebugEnabled(), true, 30);

View File

@ -32,6 +32,7 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
@ -39,6 +40,7 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.hdfs.HdfsTestUtil;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkConfigManager;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.backup.BackupManager;
@ -137,8 +139,10 @@ public class TestHdfsCloudBackupRestore extends AbstractCloudBackupRestoreTestCa
configureCluster(NUM_SHARDS)// nodes
.addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.addConfig("confFaulty", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.withSolrXml(SOLR_XML)
.configure();
cluster.getZkClient().delete(ZkConfigManager.CONFIGS_ZKNODE + Path.SEPARATOR + "confFaulty" + Path.SEPARATOR + "solrconfig.xml", -1, true);
}
@AfterClass
@ -154,7 +158,7 @@ public class TestHdfsCloudBackupRestore extends AbstractCloudBackupRestoreTestCa
}
@Override
public String getCollectionName() {
public String getCollectionNamePrefix() {
return "hdfsbackuprestore";
}

View File

@ -16,7 +16,9 @@
*/
package org.apache.solr.cloud.api.collections;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
import org.apache.solr.common.cloud.ZkConfigManager;
import org.junit.BeforeClass;
import org.junit.Test;
@ -33,7 +35,9 @@ public class TestLocalFSCloudBackupRestore extends AbstractCloudBackupRestoreTes
public static void setupClass() throws Exception {
configureCluster(NUM_SHARDS)// nodes
.addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.addConfig("confFaulty", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
cluster.getZkClient().delete(ZkConfigManager.CONFIGS_ZKNODE + Path.SEPARATOR + "confFaulty" + Path.SEPARATOR + "solrconfig.xml", -1, true);
boolean whitespacesInPath = random().nextBoolean();
if (whitespacesInPath) {
@ -44,7 +48,7 @@ public class TestLocalFSCloudBackupRestore extends AbstractCloudBackupRestoreTes
}
@Override
public String getCollectionName() {
public String getCollectionNamePrefix() {
return "backuprestore";
}

View File

@ -2354,6 +2354,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted whie waiting for request completion. Last state seen: " + state, e);
}
}