SOLR-8648: Support selective clearing up of stored async collection API responses via DELETESTATUS API

This commit is contained in:
anshum 2016-02-08 11:18:23 -08:00
parent fc5b1ac279
commit 03d7f80b27
9 changed files with 273 additions and 56 deletions

View File

@ -289,6 +289,9 @@ Upgrading from Solr 5.4
...
</mergePolicyFactory>
* Clearing up stored async collection api responses via REQUESTSTATUS call is now deprecated and would be
removed in 6.0. See SOLR-8648 for more details.
Detailed Change List
----------------------
@ -369,6 +372,9 @@ New Features
both the "simple" merge policies, but also more advanced ones, e.g. UpgradeIndexMergePolicy.
(Christine Poerschke, Shai Erera)
* SOLR-8648: DELETESTATUS API for selective deletion and flushing of stored async collection API responses.
(Anshum Gupta)
Bug Fixes
----------------------

View File

@ -162,8 +162,18 @@ public class DistributedMap {
return stat.getNumChildren();
}
public void remove(String trackingId) throws KeeperException, InterruptedException {
zookeeper.delete(dir + "/" + prefix + trackingId, -1, true);
/**
* return true if the znode was successfully deleted
* false if the node didn't exist and therefore not deleted
* exception an exception occurred while deleting
*/
public boolean remove(String trackingId) throws KeeperException, InterruptedException {
try {
zookeeper.delete(dir + "/" + prefix + trackingId, -1, true);
} catch (KeeperException.NoNodeException e) {
return false;
}
return true;
}
/**

View File

@ -262,7 +262,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
throws KeeperException, InterruptedException {
String taskKey = messageHandler.getTaskKey(message);
if(taskKey == null)
if (taskKey == null)
return true;
OverseerMessageHandler.ExclusiveMarking marking = messageHandler.checkExclusiveMarking(taskKey, message);
@ -277,7 +277,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
throw new IllegalArgumentException("Undefined marking: " + marking);
}
if(runningZKTasks.contains(id))
if (runningZKTasks.contains(id))
return false;
return true;
@ -295,7 +295,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
public void close() {
isClosed = true;
if(tpe != null) {
if (tpe != null) {
if (!tpe.isShutdown()) {
ExecutorUtil.shutdownAndAwaitTermination(tpe);
}
@ -399,7 +399,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
messageHandler.markExclusiveTask(taskKey, message);
if(asyncId != null)
if (asyncId != null)
runningMap.put(asyncId, null);
}
@ -436,7 +436,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
updateStats(statsName);
}
if(asyncId != null) {
if (asyncId != null) {
if (response != null && (response.getResponse().get("failure") != null
|| response.getResponse().get("exception") != null)) {
failureMap.put(asyncId, SolrResponse.serializable(response));
@ -465,7 +465,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
log.warn("Resetting task {} as the thread was interrupted.", head.getId());
Thread.currentThread().interrupt();
} finally {
if(!success) {
if (!success) {
// Reset task from tracking data structures so that it can be retried.
resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey, message);
}
@ -485,8 +485,12 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
runningTasks.remove(id);
}
if(asyncId != null)
runningMap.remove(asyncId);
if (asyncId != null) {
if (!runningMap.remove(asyncId)) {
log.warn("Could not find and remove async call [" + asyncId + "] from the running map.");
}
}
messageHandler.unmarkExclusiveTask(taskKey, operation, message);
}
@ -494,8 +498,11 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
private void resetTaskWithException(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey, ZkNodeProps message) {
log.warn("Resetting task: {}, requestid: {}, taskKey: {}", id, asyncId, taskKey);
try {
if (asyncId != null)
runningMap.remove(asyncId);
if (asyncId != null) {
if (!runningMap.remove(asyncId)) {
log.warn("Could not find and remove async call [" + asyncId + "] from the running map.");
}
}
synchronized (runningTasks) {
runningTasks.remove(id);
@ -520,14 +527,14 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
}
private boolean isSuccessful() {
if(response == null)
if (response == null)
return false;
return !(response.getResponse().get("failure") != null || response.getResponse().get("exception") != null);
}
}
private void printTrackingMaps() {
if(log.isDebugEnabled()) {
if (log.isDebugEnabled()) {
synchronized (runningTasks) {
log.debug("RunningTasks: {}", runningTasks.toString());
}

View File

@ -40,32 +40,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERPROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERSTATUS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEALIAS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.FORCELEADER;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.LIST;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATESTATEFORMAT;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.RELOAD;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REQUESTSTATUS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.SYNCSHARD;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.params.CommonParams.VALUE_LONG;
@ -116,6 +91,7 @@ import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.CoreAdminParams;
@ -142,6 +118,7 @@ import com.google.common.collect.ImmutableSet;
public class CollectionsHandler extends RequestHandlerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final CoreContainer coreContainer;
public CollectionsHandler() {
@ -551,18 +528,11 @@ public class CollectionsHandler extends RequestHandlerBase {
@Override
Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
req.getParams().required().check(REQUESTID);
final CoreContainer coreContainer = h.coreContainer;
final String requestId = req.getParams().get(REQUESTID);
final ZkController zkController = coreContainer.getZkController();
if (requestId.equals("-1")) {
// Special taskId (-1), clears up the request state maps.
zkController.getOverseerCompletedMap().clear();
zkController.getOverseerFailureMap().clear();
return null;
}
final NamedList<Object> results = new NamedList<>();
if (zkController.getOverseerCompletedMap().contains(requestId)) {
final DistributedMap.MapEvent mapEvent = zkController.getOverseerCompletedMap().get(requestId);
@ -579,7 +549,7 @@ public class CollectionsHandler extends RequestHandlerBase {
} else {
addStatusToResponse(results, NOT_FOUND, "Did not find [" + requestId + "] in any tasks queue");
}
final SolrResponse response = new OverseerSolrResponse(results);
rsp.getValues().addAll(response.getResponse());
return null;
@ -592,6 +562,42 @@ public class CollectionsHandler extends RequestHandlerBase {
results.add("status", status);
}
},
DELETESTATUS_OP(DELETESTATUS) {
@SuppressWarnings("unchecked")
@Override
Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
final CoreContainer coreContainer = h.coreContainer;
final String requestId = req.getParams().get(REQUESTID);
final ZkController zkController = coreContainer.getZkController();
Boolean flush = req.getParams().getBool(CollectionAdminParams.FLUSH, false);
if (requestId == null && !flush) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Either requestid or flush parameter must be specified.");
}
if (requestId != null && flush) {
throw new SolrException(ErrorCode.BAD_REQUEST,
"Both requestid and flush parameters can not be specified together.");
}
if (flush) {
zkController.getOverseerCompletedMap().clear();
zkController.getOverseerFailureMap().clear();
rsp.getValues().add("status", "successfully cleared stored collection api responses");
return null;
} else {
// Request to cleanup
if (zkController.getOverseerCompletedMap().remove(requestId)) {
rsp.getValues().add("status", "successfully removed stored response for [" + requestId + "]");
} else if (zkController.getOverseerFailureMap().remove(requestId)) {
rsp.getValues().add("status", "successfully removed stored response for [" + requestId + "]");
} else {
rsp.getValues().add("status", "[" + requestId + "] not found in stored responses");
}
}
return null;
}
},
ADDREPLICA_OP(ADDREPLICA) {
@Override
Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h)

View File

@ -23,16 +23,16 @@ import org.apache.solr.common.util.NamedList;
import org.junit.Test;
public class AsyncCallRequestStatusResponseTest extends AbstractFullDistribZkTestBase {
@ShardsFixed(num = 2)
@Test
public void testAsyncCallStatusResponse() throws Exception {
CollectionAdminRequest.Create create = new CollectionAdminRequest.Create();
create.setCollectionName("asynccall");
create.setNumShards(2);
create.setAsyncId("1000");
create.setConfigName("conf1");
create.process(cloudClient);
create.setCollectionName("asynccall")
.setNumShards(2)
.setAsyncId("1000")
.setConfigName("conf1")
.process(cloudClient);
waitForCollection(cloudClient.getZkStateReader(), "asynccall", 2);
final RequestStatusState state = getRequestStateAfterCompletion("1000", 30, cloudClient);
assertSame(RequestStatusState.COMPLETED, state);

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.junit.Test;
public class DeleteStatusTest extends AbstractFullDistribZkTestBase {
@Test
public void testDeleteStatus() throws IOException, SolrServerException {
CollectionAdminRequest.Create create = new CollectionAdminRequest.Create();
create.setCollectionName("requeststatus")
.setConfigName("conf1")
.setReplicationFactor(1)
.setNumShards(1)
.setAsyncId("collectioncreate")
.process(cloudClient);
RequestStatusState state = getRequestStateAfterCompletion("collectioncreate", 30, cloudClient);
assertSame(RequestStatusState.COMPLETED, state);
// Let's delete the stored response now
CollectionAdminRequest.DeleteStatus deleteStatus = new CollectionAdminRequest.DeleteStatus();
CollectionAdminResponse rsp = deleteStatus
.setRequestId("collectioncreate")
.process(cloudClient);
assertEquals("successfully removed stored response for [collectioncreate]", rsp.getResponse().get("status"));
// Make sure that the response was deleted from zk
state = getRequestState("collectioncreate", cloudClient);
assertSame(RequestStatusState.NOT_FOUND, state);
// Try deleting the same requestid again
deleteStatus = new CollectionAdminRequest.DeleteStatus();
rsp = deleteStatus
.setRequestId("collectioncreate")
.process(cloudClient);
assertEquals("[collectioncreate] not found in stored responses", rsp.getResponse().get("status"));
// Let's try deleting a non-existent status
deleteStatus = new CollectionAdminRequest.DeleteStatus();
rsp = deleteStatus
.setRequestId("foo")
.process(cloudClient);
assertEquals("[foo] not found in stored responses", rsp.getResponse().get("status"));
}
@Test
public void testDeleteStatusFlush() throws Exception {
CollectionAdminRequest.Create create = new CollectionAdminRequest.Create();
create.setConfigName("conf1")
.setCollectionName("foo")
.setAsyncId("foo")
.setNumShards(1)
.setReplicationFactor(1)
.process(cloudClient);
create = new CollectionAdminRequest.Create();
create.setConfigName("conf1")
.setCollectionName("bar")
.setAsyncId("bar")
.setNumShards(1)
.setReplicationFactor(1)
.process(cloudClient);
RequestStatusState state = getRequestStateAfterCompletion("foo", 30, cloudClient);
assertEquals(RequestStatusState.COMPLETED, state);
state = getRequestStateAfterCompletion("bar", 30, cloudClient);
assertEquals(RequestStatusState.COMPLETED, state);
CollectionAdminRequest.DeleteStatus deleteStatus = new CollectionAdminRequest.DeleteStatus();
deleteStatus.setFlush(true)
.process(cloudClient);
assertEquals(RequestStatusState.NOT_FOUND, getRequestState("foo", cloudClient));
assertEquals(RequestStatusState.NOT_FOUND, getRequestState("bar", cloudClient));
deleteStatus = new CollectionAdminRequest.DeleteStatus();
try {
deleteStatus.process(cloudClient);
fail("delete status should have failed");
} catch (HttpSolrClient.RemoteSolrException e) {
assertTrue(e.getMessage().contains("Either requestid or flush parameter must be specified."));
}
deleteStatus = new CollectionAdminRequest.DeleteStatus();
try {
deleteStatus.setFlush(true)
.setRequestId("foo")
.process(cloudClient);
fail("delete status should have failed");
} catch (HttpSolrClient.RemoteSolrException e) {
assertTrue(e.getMessage().contains("Both requestid and flush parameters can not be specified together."));
}
}
}

View File

@ -22,6 +22,7 @@ import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.CoreAdminParams;
@ -485,6 +486,50 @@ public abstract class CollectionAdminRequest <Q extends CollectionAdminRequest<Q
}
}
// DELETESTATUS request
public static class DeleteStatus extends CollectionAdminRequest<DeleteStatus> {
protected String requestId = null;
protected Boolean flush = null;
public DeleteStatus() {
action = CollectionAction.DELETESTATUS;
}
public DeleteStatus setRequestId(String requestId) {
this.requestId = requestId;
return this;
}
public DeleteStatus setFlush(Boolean flush) {
this.flush = flush;
return this;
}
public String getRequestId() {
return this.requestId;
}
public Boolean getFlush() {
return this.flush;
}
@Override
public SolrParams getParams() {
ModifiableSolrParams params = (ModifiableSolrParams) super.getParams();
if (requestId != null)
params.set(CoreAdminParams.REQUESTID, requestId);
if (flush != null)
params.set(CollectionAdminParams.FLUSH, flush);
return params;
}
@Override
protected DeleteStatus getThis() {
return this;
}
}
// CREATEALIAS request
public static class CreateAlias extends CollectionAdminRequest<CreateAlias> {
protected String aliasName;

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.common.params;
public abstract class CollectionAdminParams {
/* Param used by DELETESTATUS call to clear all stored responses */
public static final String FLUSH = "flush";
}

View File

@ -43,6 +43,7 @@ public interface CollectionParams
REMOVEROLE(true),
CLUSTERPROP(true),
REQUESTSTATUS(false),
DELETESTATUS(false),
ADDREPLICA(true),
OVERSEERSTATUS(false),
LIST(false),