SOLR-5886: Store async call results in zk so that they can be returned by the REQUESTSTATUS API. Also, restrict the number of stored response to 10,000 each as a safety net.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1686089 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Anshum Gupta 2015-06-17 18:46:10 +00:00
parent c33623f316
commit 4b16875058
11 changed files with 219 additions and 128 deletions

View File

@ -126,6 +126,9 @@ New Features
* SOLR-7668: Add 'port' tag support in replica placement rules (Adam McElwee, Noble Paul)
* SOLR-5886: Response for an async call is now stored in zk so that it can be returned by the REQUESTSTATUS API.
Also, the number of stored (failed and successful) responses are now restricted to 10,000 each as a safety net.
(Anshum Gupta)
Bug Fixes
----------------------

View File

@ -42,15 +42,15 @@ public class DistributedMap {
private static final Logger LOG = LoggerFactory
.getLogger(DistributedMap.class);
private static long DEFAULT_TIMEOUT = 5*60*1000;
protected static long DEFAULT_TIMEOUT = 5*60*1000;
private final String dir;
protected final String dir;
private SolrZkClient zookeeper;
protected SolrZkClient zookeeper;
private final String prefix = "mn-";
protected final String prefix = "mn-";
private final String response_prefix = "mnr-" ;
protected final String response_prefix = "mnr-" ;
public DistributedMap(SolrZkClient zookeeper, String dir, List<ACL> acl) {
this.dir = dir;
@ -68,7 +68,7 @@ public class DistributedMap {
this.zookeeper = zookeeper;
}
private class LatchChildWatcher implements Watcher {
protected class LatchChildWatcher implements Watcher {
Object lock = new Object();
private WatchedEvent event = null;
@ -105,7 +105,7 @@ public class DistributedMap {
*
* @return true if data was successfully added
*/
private String createData(String path, byte[] data, CreateMode mode)
protected String createData(String path, byte[] data, CreateMode mode)
throws KeeperException, InterruptedException {
for (;;) {
try {

View File

@ -78,9 +78,11 @@ public class Overseer implements Closeable {
public static final int STATE_UPDATE_DELAY = 1500; // delay between cloud state updates
public static final int NUM_RESPONSES_TO_STORE = 10000;
private static Logger log = LoggerFactory.getLogger(Overseer.class);
static enum LeaderStatus {DONT_KNOW, NO, YES}
enum LeaderStatus {DONT_KNOW, NO, YES}
private class ClusterStateUpdater implements Runnable, Closeable {
@ -900,16 +902,16 @@ public class Overseer implements Closeable {
return new DistributedMap(zkClient, "/overseer/collection-map-running", null);
}
/* Internal map for successfully completed tasks, not to be used outside of the Overseer */
/* Size-limited map for successfully completed tasks*/
static DistributedMap getCompletedMap(final SolrZkClient zkClient) {
createOverseerNode(zkClient);
return new DistributedMap(zkClient, "/overseer/collection-map-completed", null);
return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-completed", null, NUM_RESPONSES_TO_STORE);
}
/* Internal map for failed tasks, not to be used outside of the Overseer */
/* Map for failed tasks, not to be used outside of the Overseer */
static DistributedMap getFailureMap(final SolrZkClient zkClient) {
createOverseerNode(zkClient);
return new DistributedMap(zkClient, "/overseer/collection-map-failure", null);
return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-failure", null, NUM_RESPONSES_TO_STORE);
}
/* Collection creation queue */

View File

@ -128,7 +128,7 @@ import static org.apache.solr.common.util.StrUtils.formatString;
public class OverseerCollectionProcessor implements Runnable, Closeable {
public static final String NUM_SLICES = "numShards";
static final boolean CREATE_NODE_SET_SHUFFLE_DEFAULT = true;
public static final String CREATE_NODE_SET_SHUFFLE = "createNodeSet.shuffle";
public static final String CREATE_NODE_SET = "createNodeSet";
@ -2848,11 +2848,12 @@ public class OverseerCollectionProcessor implements Runnable, Closeable {
}
if(asyncId != null) {
if (response != null && (response.getResponse().get("failure") != null || response.getResponse().get("exception") != null)) {
failureMap.put(asyncId, null);
if (response != null && (response.getResponse().get("failure") != null
|| response.getResponse().get("exception") != null)) {
failureMap.put(asyncId, SolrResponse.serializable(response));
log.debug("Updated failed map for task with zkid:[{}]", head.getId());
} else {
completedMap.put(asyncId, null);
completedMap.put(asyncId, SolrResponse.serializable(response));
log.debug("Updated completed map for task with zkid:[{}]", head.getId());
}
} else {

View File

@ -0,0 +1,79 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.List;
import org.apache.lucene.util.PriorityQueue;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A size limited distributed map maintained in zk.
* Oldest znodes (as per modification time) are evicted as newer ones come in.
*/
public class SizeLimitedDistributedMap extends DistributedMap {
protected static final Logger log = LoggerFactory
.getLogger(DistributedMap.class);
private final int maxSize;
public SizeLimitedDistributedMap(SolrZkClient zookeeper, String dir, List<ACL> acl, int maxSize) {
super(zookeeper, dir, acl);
this.maxSize = maxSize;
}
@Override
public boolean put(String trackingId, byte[] data) throws KeeperException, InterruptedException {
if(this.size() >= maxSize) {
// Bring down the size
List<String> children = zookeeper.getChildren(dir, null, true);
int cleanupSize = maxSize / 10;
final PriorityQueue priorityQueue = new PriorityQueue<Long>(cleanupSize) {
@Override
protected boolean lessThan(Long a, Long b) {
return (a > b);
}
};
for(String child: children) {
Stat stat = zookeeper.exists(dir + "/" + child, null, true);
priorityQueue.insertWithOverflow(stat.getMzxid());
}
long topElementMzxId = (Long) priorityQueue.top();
for(String child:children) {
Stat stat = zookeeper.exists(dir + "/" + child, null, true);
if(stat.getMzxid() <= topElementMzxId)
zookeeper.delete(dir + "/" + child, -1, true);
}
}
return createData(dir + "/" + prefix + trackingId, data,
CreateMode.PERSISTENT) != null;
}
}

View File

@ -20,10 +20,6 @@ package org.apache.solr.handler.admin;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@ -39,6 +35,7 @@ import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
import org.apache.solr.cloud.DistributedMap;
import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.DistributedQueue.QueueEvent;
import org.apache.solr.cloud.Overseer;
@ -48,13 +45,9 @@ import org.apache.solr.cloud.rule.ReplicaAssigner;
import org.apache.solr.cloud.rule.Rule;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
@ -63,11 +56,9 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.BlobHandler;
import org.apache.solr.handler.RequestHandlerBase;
@ -524,12 +515,20 @@ public class CollectionsHandler extends RequestHandlerBase {
} else {
NamedList<Object> results = new NamedList<>();
if (coreContainer.getZkController().getOverseerCompletedMap().contains(requestId)) {
DistributedMap.MapEvent mapEvent = coreContainer.getZkController().getOverseerCompletedMap().get(requestId);
SimpleOrderedMap success = new SimpleOrderedMap();
if(mapEvent != null) {
rsp.getValues().addAll(SolrResponse.deserialize(mapEvent.getBytes()).getResponse());
}
success.add("state", "completed");
success.add("msg", "found " + requestId + " in completed tasks");
results.add("status", success);
} else if (coreContainer.getZkController().getOverseerFailureMap().contains(requestId)) {
SimpleOrderedMap success = new SimpleOrderedMap();
DistributedMap.MapEvent mapEvent = coreContainer.getZkController().getOverseerFailureMap().get(requestId);
if(mapEvent != null) {
rsp.getValues().addAll(SolrResponse.deserialize(mapEvent.getBytes()).getResponse());
}
success.add("state", "failed");
success.add("msg", "found " + requestId + " in failed tasks");
results.add("status", success);

View File

@ -0,0 +1,46 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.common.util.NamedList;
import org.junit.Test;
public class AsyncCallRequestStatusResponseTest extends AbstractFullDistribZkTestBase {
@ShardsFixed(num = 2)
@Test
public void testAsyncCallStatusResponse() throws Exception {
CollectionAdminRequest.Create create = new CollectionAdminRequest.Create();
create.setCollectionName("asynccall");
create.setNumShards(2);
create.setAsyncId("1000");
create.setConfigName("conf1");
create.process(cloudClient);
waitForCollection(cloudClient.getZkStateReader(), "asynccall", 2);
String state = getRequestStateAfterCompletion("1000", 30, cloudClient);
assertTrue(state.equals("completed"));
CollectionAdminRequest.RequestStatus requestStatus = new CollectionAdminRequest.RequestStatus();
requestStatus.setRequestId("1000");
CollectionAdminResponse rsp = requestStatus.process(cloudClient);
NamedList r = rsp.getResponse();
// Check that there's more response than the hardcoded status and states
assertEquals("Assertion Failure" + r.toString(), 5, r.size());
}
}

View File

@ -17,19 +17,12 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import java.io.IOException;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
import org.apache.solr.client.solrj.request.CollectionAdminRequest.RequestStatus;
import org.apache.solr.client.solrj.request.CollectionAdminRequest.SplitShard;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.update.DirectUpdateHandler2;
import org.junit.Test;
/**
@ -90,27 +83,4 @@ public class CollectionsAPIAsyncDistributedZkTest extends AbstractFullDistribZkT
assertEquals("Shard split did not complete. Last recorded state: " + state, "completed", state);
}
}
private String getRequestStateAfterCompletion(String requestId, int waitForSeconds, SolrClient client)
throws IOException, SolrServerException {
String state = null;
while(waitForSeconds-- > 0) {
state = getRequestState(requestId, client);
if(state.equals("completed") || state.equals("failed"))
return state;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
return state;
}
private String getRequestState(String requestId, SolrClient client) throws IOException, SolrServerException {
RequestStatus request = new RequestStatus();
request.setRequestId(requestId);
CollectionAdminResponse response = request.process(client);
NamedList innerResponse = (NamedList) response.getResponse().get("status");
return (String) innerResponse.get("state");
}
}

View File

@ -17,6 +17,30 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.client.solrj.SolrClient;
@ -49,7 +73,6 @@ import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
@ -57,40 +80,8 @@ import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrInfoMBean.Category;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;

View File

@ -17,27 +17,24 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
import org.apache.solr.client.solrj.SolrRequest;
import java.io.IOException;
import java.util.Random;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
import org.apache.solr.client.solrj.request.CollectionAdminRequest.RequestStatus;
import org.apache.solr.client.solrj.request.CollectionAdminRequest.SplitShard;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.update.DirectUpdateHandler2;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* Tests the Multi threaded Collections API.
*/
@ -254,38 +251,6 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase {
index("id", id);
// todo - target diff servers and use cloud clients as well as non-cloud clients
}
private String getRequestStateAfterCompletion(String requestId, int waitForSeconds, SolrClient client)
throws IOException, SolrServerException {
String state = null;
long maxWait = System.nanoTime() + TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS);
while (System.nanoTime() < maxWait) {
state = getRequestState(requestId, client);
if(state.equals("completed") || state.equals("failed"))
return state;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
return state;
}
private String getRequestState(int requestId, SolrClient client) throws IOException, SolrServerException {
return getRequestState(String.valueOf(requestId), client);
}
private String getRequestState(String requestId, SolrClient client) throws IOException, SolrServerException {
RequestStatus requestStatusRequest = new RequestStatus();
requestStatusRequest.setRequestId(requestId);
CollectionAdminResponse response = requestStatusRequest.process(client);
NamedList innerResponse = (NamedList) response.getResponse().get("status");
return (String) innerResponse.get("state");
}
}

View File

@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.http.params.CoreConnectionPNames;
@ -45,6 +46,7 @@ import org.apache.solr.client.solrj.embedded.JettyConfig;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
@ -1871,4 +1873,37 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
}
return cs;
}
protected String getRequestStateAfterCompletion(String requestId, int waitForSeconds, SolrClient client)
throws IOException, SolrServerException {
String state = null;
long maxWait = System.nanoTime() + TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS);
while (System.nanoTime() < maxWait) {
state = getRequestState(requestId, client);
if(state.equals("completed") || state.equals("failed"))
return state;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
return state;
}
protected String getRequestState(int requestId, SolrClient client) throws IOException, SolrServerException {
return getRequestState(String.valueOf(requestId), client);
}
protected String getRequestState(String requestId, SolrClient client) throws IOException, SolrServerException {
CollectionAdminRequest.RequestStatus requestStatusRequest = new CollectionAdminRequest.RequestStatus();
requestStatusRequest.setRequestId(requestId);
CollectionAdminResponse response = requestStatusRequest.process(client);
NamedList innerResponse = (NamedList) response.getResponse().get("status");
return (String) innerResponse.get("state");
}
}