From 03d7f80b27031309f7156af3bafcb6ccea74f7c7 Mon Sep 17 00:00:00 2001 From: anshum Date: Mon, 8 Feb 2016 11:18:23 -0800 Subject: [PATCH] SOLR-8648: Support selective clearing up of stored async collection API responses via DELETESTATUS API --- solr/CHANGES.txt | 6 + .../org/apache/solr/cloud/DistributedMap.java | 14 ++- .../solr/cloud/OverseerTaskProcessor.java | 31 +++-- .../handler/admin/CollectionsHandler.java | 78 ++++++------ .../AsyncCallRequestStatusResponseTest.java | 12 +- .../apache/solr/cloud/DeleteStatusTest.java | 118 ++++++++++++++++++ .../solrj/request/CollectionAdminRequest.java | 45 +++++++ .../common/params/CollectionAdminParams.java | 24 ++++ .../solr/common/params/CollectionParams.java | 1 + 9 files changed, 273 insertions(+), 56 deletions(-) create mode 100644 solr/core/src/test/org/apache/solr/cloud/DeleteStatusTest.java create mode 100644 solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index ef6a1082736..ab2f02a4a99 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -289,6 +289,9 @@ Upgrading from Solr 5.4 ... +* Clearing up stored async collection api responses via REQUESTSTATUS call is now deprecated and would be + removed in 6.0. See SOLR-8648 for more details. + Detailed Change List ---------------------- @@ -369,6 +372,9 @@ New Features both the "simple" merge policies, but also more advanced ones, e.g. UpgradeIndexMergePolicy. (Christine Poerschke, Shai Erera) +* SOLR-8648: DELETESTATUS API for selective deletion and flushing of stored async collection API responses. + (Anshum Gupta) + Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java b/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java index ec2ecb7856b..8434eb86b54 100644 --- a/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java +++ b/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java @@ -162,8 +162,18 @@ public class DistributedMap { return stat.getNumChildren(); } - public void remove(String trackingId) throws KeeperException, InterruptedException { - zookeeper.delete(dir + "/" + prefix + trackingId, -1, true); + /** + * return true if the znode was successfully deleted + * false if the node didn't exist and therefore not deleted + * exception an exception occurred while deleting + */ + public boolean remove(String trackingId) throws KeeperException, InterruptedException { + try { + zookeeper.delete(dir + "/" + prefix + trackingId, -1, true); + } catch (KeeperException.NoNodeException e) { + return false; + } + return true; } /** diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java index eaad7658964..1bda80ccf97 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java @@ -262,7 +262,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable { throws KeeperException, InterruptedException { String taskKey = messageHandler.getTaskKey(message); - if(taskKey == null) + if (taskKey == null) return true; OverseerMessageHandler.ExclusiveMarking marking = messageHandler.checkExclusiveMarking(taskKey, message); @@ -277,7 +277,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable { throw new IllegalArgumentException("Undefined marking: " + marking); } - if(runningZKTasks.contains(id)) + if (runningZKTasks.contains(id)) return false; return true; @@ -295,7 +295,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable { public void close() { isClosed = true; - if(tpe != null) { + if (tpe != null) { if (!tpe.isShutdown()) { ExecutorUtil.shutdownAndAwaitTermination(tpe); } @@ -399,7 +399,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable { messageHandler.markExclusiveTask(taskKey, message); - if(asyncId != null) + if (asyncId != null) runningMap.put(asyncId, null); } @@ -436,7 +436,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable { updateStats(statsName); } - if(asyncId != null) { + if (asyncId != null) { if (response != null && (response.getResponse().get("failure") != null || response.getResponse().get("exception") != null)) { failureMap.put(asyncId, SolrResponse.serializable(response)); @@ -465,7 +465,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable { log.warn("Resetting task {} as the thread was interrupted.", head.getId()); Thread.currentThread().interrupt(); } finally { - if(!success) { + if (!success) { // Reset task from tracking data structures so that it can be retried. resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey, message); } @@ -485,8 +485,12 @@ public class OverseerTaskProcessor implements Runnable, Closeable { runningTasks.remove(id); } - if(asyncId != null) - runningMap.remove(asyncId); + if (asyncId != null) { + if (!runningMap.remove(asyncId)) { + log.warn("Could not find and remove async call [" + asyncId + "] from the running map."); + } + } + messageHandler.unmarkExclusiveTask(taskKey, operation, message); } @@ -494,8 +498,11 @@ public class OverseerTaskProcessor implements Runnable, Closeable { private void resetTaskWithException(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey, ZkNodeProps message) { log.warn("Resetting task: {}, requestid: {}, taskKey: {}", id, asyncId, taskKey); try { - if (asyncId != null) - runningMap.remove(asyncId); + if (asyncId != null) { + if (!runningMap.remove(asyncId)) { + log.warn("Could not find and remove async call [" + asyncId + "] from the running map."); + } + } synchronized (runningTasks) { runningTasks.remove(id); @@ -520,14 +527,14 @@ public class OverseerTaskProcessor implements Runnable, Closeable { } private boolean isSuccessful() { - if(response == null) + if (response == null) return false; return !(response.getResponse().get("failure") != null || response.getResponse().get("exception") != null); } } private void printTrackingMaps() { - if(log.isDebugEnabled()) { + if (log.isDebugEnabled()) { synchronized (runningTasks) { log.debug("RunningTasks: {}", runningTasks.toString()); } diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index c04386dec29..cbb646864f4 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -40,32 +40,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP; import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR; import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP; import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERPROP; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERSTATUS; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEALIAS; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.FORCELEADER; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.LIST; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATESTATEFORMAT; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.RELOAD; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.REQUESTSTATUS; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD; -import static org.apache.solr.common.params.CollectionParams.CollectionAction.SYNCSHARD; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.*; import static org.apache.solr.common.params.CommonAdminParams.ASYNC; import static org.apache.solr.common.params.CommonParams.NAME; import static org.apache.solr.common.params.CommonParams.VALUE_LONG; @@ -116,6 +91,7 @@ import org.apache.solr.common.cloud.ZkCmdExecutor; import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CollectionAdminParams; import org.apache.solr.common.params.CollectionParams.CollectionAction; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.CoreAdminParams; @@ -142,6 +118,7 @@ import com.google.common.collect.ImmutableSet; public class CollectionsHandler extends RequestHandlerBase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + protected final CoreContainer coreContainer; public CollectionsHandler() { @@ -551,18 +528,11 @@ public class CollectionsHandler extends RequestHandlerBase { @Override Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception { req.getParams().required().check(REQUESTID); - + final CoreContainer coreContainer = h.coreContainer; final String requestId = req.getParams().get(REQUESTID); final ZkController zkController = coreContainer.getZkController(); - - if (requestId.equals("-1")) { - // Special taskId (-1), clears up the request state maps. - zkController.getOverseerCompletedMap().clear(); - zkController.getOverseerFailureMap().clear(); - return null; - } - + final NamedList results = new NamedList<>(); if (zkController.getOverseerCompletedMap().contains(requestId)) { final DistributedMap.MapEvent mapEvent = zkController.getOverseerCompletedMap().get(requestId); @@ -579,7 +549,7 @@ public class CollectionsHandler extends RequestHandlerBase { } else { addStatusToResponse(results, NOT_FOUND, "Did not find [" + requestId + "] in any tasks queue"); } - + final SolrResponse response = new OverseerSolrResponse(results); rsp.getValues().addAll(response.getResponse()); return null; @@ -592,6 +562,42 @@ public class CollectionsHandler extends RequestHandlerBase { results.add("status", status); } }, + DELETESTATUS_OP(DELETESTATUS) { + @SuppressWarnings("unchecked") + @Override + Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception { + final CoreContainer coreContainer = h.coreContainer; + final String requestId = req.getParams().get(REQUESTID); + final ZkController zkController = coreContainer.getZkController(); + Boolean flush = req.getParams().getBool(CollectionAdminParams.FLUSH, false); + + if (requestId == null && !flush) { + throw new SolrException(ErrorCode.BAD_REQUEST, "Either requestid or flush parameter must be specified."); + } + + if (requestId != null && flush) { + throw new SolrException(ErrorCode.BAD_REQUEST, + "Both requestid and flush parameters can not be specified together."); + } + + if (flush) { + zkController.getOverseerCompletedMap().clear(); + zkController.getOverseerFailureMap().clear(); + rsp.getValues().add("status", "successfully cleared stored collection api responses"); + return null; + } else { + // Request to cleanup + if (zkController.getOverseerCompletedMap().remove(requestId)) { + rsp.getValues().add("status", "successfully removed stored response for [" + requestId + "]"); + } else if (zkController.getOverseerFailureMap().remove(requestId)) { + rsp.getValues().add("status", "successfully removed stored response for [" + requestId + "]"); + } else { + rsp.getValues().add("status", "[" + requestId + "] not found in stored responses"); + } + } + return null; + } + }, ADDREPLICA_OP(ADDREPLICA) { @Override Map call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) diff --git a/solr/core/src/test/org/apache/solr/cloud/AsyncCallRequestStatusResponseTest.java b/solr/core/src/test/org/apache/solr/cloud/AsyncCallRequestStatusResponseTest.java index da73acc2d95..71b6b44f051 100644 --- a/solr/core/src/test/org/apache/solr/cloud/AsyncCallRequestStatusResponseTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/AsyncCallRequestStatusResponseTest.java @@ -23,16 +23,16 @@ import org.apache.solr.common.util.NamedList; import org.junit.Test; public class AsyncCallRequestStatusResponseTest extends AbstractFullDistribZkTestBase { - + @ShardsFixed(num = 2) @Test public void testAsyncCallStatusResponse() throws Exception { CollectionAdminRequest.Create create = new CollectionAdminRequest.Create(); - create.setCollectionName("asynccall"); - create.setNumShards(2); - create.setAsyncId("1000"); - create.setConfigName("conf1"); - create.process(cloudClient); + create.setCollectionName("asynccall") + .setNumShards(2) + .setAsyncId("1000") + .setConfigName("conf1") + .process(cloudClient); waitForCollection(cloudClient.getZkStateReader(), "asynccall", 2); final RequestStatusState state = getRequestStateAfterCompletion("1000", 30, cloudClient); assertSame(RequestStatusState.COMPLETED, state); diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteStatusTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteStatusTest.java new file mode 100644 index 00000000000..16ca35a0865 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/DeleteStatusTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud; + +import java.io.IOException; + +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.response.CollectionAdminResponse; +import org.apache.solr.client.solrj.response.RequestStatusState; +import org.junit.Test; + +public class DeleteStatusTest extends AbstractFullDistribZkTestBase { + + @Test + public void testDeleteStatus() throws IOException, SolrServerException { + CollectionAdminRequest.Create create = new CollectionAdminRequest.Create(); + create.setCollectionName("requeststatus") + .setConfigName("conf1") + .setReplicationFactor(1) + .setNumShards(1) + .setAsyncId("collectioncreate") + .process(cloudClient); + + RequestStatusState state = getRequestStateAfterCompletion("collectioncreate", 30, cloudClient); + assertSame(RequestStatusState.COMPLETED, state); + + // Let's delete the stored response now + CollectionAdminRequest.DeleteStatus deleteStatus = new CollectionAdminRequest.DeleteStatus(); + CollectionAdminResponse rsp = deleteStatus + .setRequestId("collectioncreate") + .process(cloudClient); + assertEquals("successfully removed stored response for [collectioncreate]", rsp.getResponse().get("status")); + + // Make sure that the response was deleted from zk + state = getRequestState("collectioncreate", cloudClient); + assertSame(RequestStatusState.NOT_FOUND, state); + + // Try deleting the same requestid again + deleteStatus = new CollectionAdminRequest.DeleteStatus(); + rsp = deleteStatus + .setRequestId("collectioncreate") + .process(cloudClient); + assertEquals("[collectioncreate] not found in stored responses", rsp.getResponse().get("status")); + + // Let's try deleting a non-existent status + deleteStatus = new CollectionAdminRequest.DeleteStatus(); + rsp = deleteStatus + .setRequestId("foo") + .process(cloudClient); + assertEquals("[foo] not found in stored responses", rsp.getResponse().get("status")); + } + + @Test + public void testDeleteStatusFlush() throws Exception { + CollectionAdminRequest.Create create = new CollectionAdminRequest.Create(); + create.setConfigName("conf1") + .setCollectionName("foo") + .setAsyncId("foo") + .setNumShards(1) + .setReplicationFactor(1) + .process(cloudClient); + + create = new CollectionAdminRequest.Create(); + create.setConfigName("conf1") + .setCollectionName("bar") + .setAsyncId("bar") + .setNumShards(1) + .setReplicationFactor(1) + .process(cloudClient); + + RequestStatusState state = getRequestStateAfterCompletion("foo", 30, cloudClient); + assertEquals(RequestStatusState.COMPLETED, state); + + state = getRequestStateAfterCompletion("bar", 30, cloudClient); + assertEquals(RequestStatusState.COMPLETED, state); + + CollectionAdminRequest.DeleteStatus deleteStatus = new CollectionAdminRequest.DeleteStatus(); + deleteStatus.setFlush(true) + .process(cloudClient); + + assertEquals(RequestStatusState.NOT_FOUND, getRequestState("foo", cloudClient)); + assertEquals(RequestStatusState.NOT_FOUND, getRequestState("bar", cloudClient)); + + deleteStatus = new CollectionAdminRequest.DeleteStatus(); + try { + deleteStatus.process(cloudClient); + fail("delete status should have failed"); + } catch (HttpSolrClient.RemoteSolrException e) { + assertTrue(e.getMessage().contains("Either requestid or flush parameter must be specified.")); + } + + deleteStatus = new CollectionAdminRequest.DeleteStatus(); + try { + deleteStatus.setFlush(true) + .setRequestId("foo") + .process(cloudClient); + fail("delete status should have failed"); + } catch (HttpSolrClient.RemoteSolrException e) { + assertTrue(e.getMessage().contains("Both requestid and flush parameters can not be specified together.")); + } + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java index cec7e0b016d..4ae7fe81f36 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java @@ -22,6 +22,7 @@ import org.apache.solr.client.solrj.response.CollectionAdminResponse; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CollectionAdminParams; import org.apache.solr.common.params.CollectionParams.CollectionAction; import org.apache.solr.common.params.CommonAdminParams; import org.apache.solr.common.params.CoreAdminParams; @@ -485,6 +486,50 @@ public abstract class CollectionAdminRequest { + protected String requestId = null; + protected Boolean flush = null; + + public DeleteStatus() { + action = CollectionAction.DELETESTATUS; + } + + public DeleteStatus setRequestId(String requestId) { + this.requestId = requestId; + return this; + } + + public DeleteStatus setFlush(Boolean flush) { + this.flush = flush; + return this; + } + + public String getRequestId() { + return this.requestId; + } + + public Boolean getFlush() { + return this.flush; + } + + @Override + public SolrParams getParams() { + ModifiableSolrParams params = (ModifiableSolrParams) super.getParams(); + if (requestId != null) + params.set(CoreAdminParams.REQUESTID, requestId); + + if (flush != null) + params.set(CollectionAdminParams.FLUSH, flush); + return params; + } + + @Override + protected DeleteStatus getThis() { + return this; + } + } + // CREATEALIAS request public static class CreateAlias extends CollectionAdminRequest { protected String aliasName; diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java new file mode 100644 index 00000000000..b9bf717663f --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.common.params; + +public abstract class CollectionAdminParams { + + /* Param used by DELETESTATUS call to clear all stored responses */ + public static final String FLUSH = "flush"; + +} diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java index 06559b0816e..cb34b364efe 100644 --- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java +++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java @@ -43,6 +43,7 @@ public interface CollectionParams REMOVEROLE(true), CLUSTERPROP(true), REQUESTSTATUS(false), + DELETESTATUS(false), ADDREPLICA(true), OVERSEERSTATUS(false), LIST(false),