diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index f1b45d59c27..6a49d3d9792 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -143,6 +143,9 @@ New Features
   improve logging and force refresh cluster state every 15 seconds.
   (Timothy Potter via shalin)
 
+* SOLR-5749: A new Overseer status collection API exposes overseer queue sizes, timing
+  statistics, success and error counts and last N failures per operation. (shalin)
+
 Bug Fixes
 ----------------------
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
index d6fc0aaf652..754d89bc450 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
@@ -26,6 +26,7 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
+import org.apache.solr.util.stats.TimerContext;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -52,10 +53,16 @@ public class DistributedQueue {
   private final String prefix = "qn-";
 
   private final String response_prefix = "qnr-" ;
+
+  private final Overseer.Stats stats;
 
   public DistributedQueue(SolrZkClient zookeeper, String dir, List<ACL> acl) {
+    this(zookeeper, dir, acl, new Overseer.Stats());
+  }
+
+  public DistributedQueue(SolrZkClient zookeeper, String dir, List<ACL> acl, Overseer.Stats stats) {
     this.dir = dir;
-    
+
     ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout());
     try {
       cmdExecutor.ensureExists(dir, zookeeper);
@@ -65,12 +72,12 @@ public class DistributedQueue {
       Thread.currentThread().interrupt();
       throw new SolrException(ErrorCode.SERVER_ERROR, e);
     }
-    
+
     if (acl != null) {
       this.acl = acl;
     }
     this.zookeeper = zookeeper;
-    
+    this.stats = stats;
   }
 
   /**
@@ -155,25 +162,30 @@ public class DistributedQueue {
       InterruptedException {
     TreeMap<Long,String> orderedChildren;
     // Same as for element. Should refactor this.
-    while (true) {
-      try {
-        orderedChildren = orderedChildren(null);
-      } catch (KeeperException.NoNodeException e) {
-        throw new NoSuchElementException();
-      }
-      if (orderedChildren.size() == 0) throw new NoSuchElementException();
-      
-      for (String headNode : orderedChildren.values()) {
-        String path = dir + "/" + headNode;
+    TimerContext time = stats.time(dir + "_remove");
+    try {
+      while (true) {
         try {
-          byte[] data = zookeeper.getData(path, null, null, true);
-          zookeeper.delete(path, -1, true);
-          return data;
+          orderedChildren = orderedChildren(null);
         } catch (KeeperException.NoNodeException e) {
-          // Another client deleted the node first.
+          throw new NoSuchElementException();
         }
+        if (orderedChildren.size() == 0) throw new NoSuchElementException();
+
+        for (String headNode : orderedChildren.values()) {
+          String path = dir + "/" + headNode;
+          try {
+            byte[] data = zookeeper.getData(path, null, null, true);
+            zookeeper.delete(path, -1, true);
+            return data;
+          } catch (KeeperException.NoNodeException e) {
+            // Another client deleted the node first.
+          }
+        }
+      }
-    }
+    } finally {
+      time.stop();
     }
   }
 
@@ -183,15 +195,20 @@ public class DistributedQueue {
    */
   public byte[] remove(QueueEvent event) throws KeeperException,
       InterruptedException {
-    String path = event.getId();
-    String responsePath = dir + "/" + response_prefix
-        + path.substring(path.lastIndexOf("-") + 1);
-    if (zookeeper.exists(responsePath, true)) {
-      zookeeper.setData(responsePath, event.getBytes(), true);
+    TimerContext time = stats.time(dir + "_remove_event");
+    try {
+      String path = event.getId();
+      String responsePath = dir + "/" + response_prefix
+          + path.substring(path.lastIndexOf("-") + 1);
+      if (zookeeper.exists(responsePath, true)) {
+        zookeeper.setData(responsePath, event.getBytes(), true);
+      }
+      byte[] data = zookeeper.getData(path, null, null, true);
+      zookeeper.delete(path, -1, true);
+      return data;
+    } finally {
+      time.stop();
     }
-    byte[] data = zookeeper.getData(path, null, null, true);
-    zookeeper.delete(path, -1, true);
-    return data;
   }
 
@@ -235,29 +252,34 @@ public class DistributedQueue {
   public byte[] take() throws KeeperException, InterruptedException {
     TreeMap<Long,String> orderedChildren;
     // Same as for element. Should refactor this.
-    while (true) {
-      LatchChildWatcher childWatcher = new LatchChildWatcher();
-      try {
-        orderedChildren = orderedChildren(childWatcher);
-      } catch (KeeperException.NoNodeException e) {
-        zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
-        continue;
-      }
-      if (orderedChildren.size() == 0) {
-        childWatcher.await(DEFAULT_TIMEOUT);
-        continue;
-      }
-      
-      for (String headNode : orderedChildren.values()) {
-        String path = dir + "/" + headNode;
+    TimerContext timer = stats.time(dir + "_take");
+    try {
+      while (true) {
+        LatchChildWatcher childWatcher = new LatchChildWatcher();
         try {
-          byte[] data = zookeeper.getData(path, null, null, true);
-          zookeeper.delete(path, -1, true);
-          return data;
+          orderedChildren = orderedChildren(childWatcher);
         } catch (KeeperException.NoNodeException e) {
-          // Another client deleted the node first.
+          zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
+          continue;
+        }
+        if (orderedChildren.size() == 0) {
+          childWatcher.await(DEFAULT_TIMEOUT);
+          continue;
+        }
+
+        for (String headNode : orderedChildren.values()) {
+          String path = dir + "/" + headNode;
+          try {
+            byte[] data = zookeeper.getData(path, null, null, true);
+            zookeeper.delete(path, -1, true);
+            return data;
+          } catch (KeeperException.NoNodeException e) {
+            // Another client deleted the node first.
+          }
         }
       }
+    } finally {
+      timer.stop();
     }
   }
 
@@ -268,8 +290,13 @@ public class DistributedQueue {
    */
   public boolean offer(byte[] data) throws KeeperException,
       InterruptedException {
-    return createData(dir + "/" + prefix, data,
-        CreateMode.PERSISTENT_SEQUENTIAL) != null;
+    TimerContext time = stats.time(dir + "_offer");
+    try {
+      return createData(dir + "/" + prefix, data,
+          CreateMode.PERSISTENT_SEQUENTIAL) != null;
+    } finally {
+      time.stop();
+    }
   }
 
   /**
@@ -298,21 +325,26 @@
    */
   public QueueEvent offer(byte[] data, long timeout) throws KeeperException,
       InterruptedException {
-    String path = createData(dir + "/" + prefix, data,
-        CreateMode.PERSISTENT_SEQUENTIAL);
-    String watchID = createData(
-        dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
-        null, CreateMode.EPHEMERAL);
-    Object lock = new Object();
-    LatchChildWatcher watcher = new LatchChildWatcher(lock);
-    synchronized (lock) {
-      if (zookeeper.exists(watchID, watcher, true) != null) {
-        watcher.await(timeout);
+    TimerContext time = stats.time(dir + "_offer");
+    try {
+      String path = createData(dir + "/" + prefix, data,
+          CreateMode.PERSISTENT_SEQUENTIAL);
+      String watchID = createData(
+          dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
+          null, CreateMode.EPHEMERAL);
+      Object lock = new Object();
+      LatchChildWatcher watcher = new LatchChildWatcher(lock);
+      synchronized (lock) {
+        if (zookeeper.exists(watchID, watcher, true) != null) {
+          watcher.await(timeout);
+        }
       }
+      byte[] bytes = zookeeper.getData(watchID, null, null, true);
+      zookeeper.delete(watchID, -1, true);
+      return new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
+    } finally {
+      time.stop();
     }
-    byte[] bytes = zookeeper.getData(watchID, null, null, true);
-    zookeeper.delete(watchID, -1, true);
-    return new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
   }
 
   /**
@@ -322,9 +354,14 @@
    * @return data at the first element of the queue, or null.
    */
   public byte[] peek() throws KeeperException, InterruptedException {
+    TimerContext time = stats.time(dir + "_peek");
+    try {
     QueueEvent element = element();
-    if(element == null) return null;
+    if (element == null) return null;
     return element.getBytes();
+    } finally {
+      time.stop();
+    }
   }
 
   public static class QueueEvent {
@@ -399,38 +436,48 @@
    * @return data at the first element of the queue, or null.
    */
   public QueueEvent peek(long wait) throws KeeperException, InterruptedException {
-    if (wait == 0) {
-      return element();
+    TimerContext time = null;
+    if (wait == Long.MAX_VALUE) {
+      time = stats.time(dir + "_peek_wait_forever");
+    } else {
+      time = stats.time(dir + "_peek_wait" + wait);
     }
-    
-    TreeMap<Long,String> orderedChildren;
-    boolean waitedEnough = false;
-    while (true) {
-      LatchChildWatcher childWatcher = new LatchChildWatcher();
-      try {
-        orderedChildren = orderedChildren(childWatcher);
-      } catch (KeeperException.NoNodeException e) {
-        zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
-        continue;
-      }
-      if(waitedEnough) {
-        if(orderedChildren.isEmpty()) return null;
-      }
-      if (orderedChildren.size() == 0) {
-        childWatcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT: wait);
-        waitedEnough = wait != Long.MAX_VALUE;
-        continue;
+    try {
+      if (wait == 0) {
+        return element();
       }
-      for (String headNode : orderedChildren.values()) {
-        String path = dir + "/" + headNode;
+      TreeMap<Long,String> orderedChildren;
+      boolean waitedEnough = false;
+      while (true) {
+        LatchChildWatcher childWatcher = new LatchChildWatcher();
         try {
-          byte[] data = zookeeper.getData(path, null, null, true);
-          return new QueueEvent(path, data, childWatcher.getWatchedEvent());
+          orderedChildren = orderedChildren(childWatcher);
         } catch (KeeperException.NoNodeException e) {
-          // Another client deleted the node first.
+          zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
+          continue;
+        }
+        if (waitedEnough) {
+          if (orderedChildren.isEmpty()) return null;
+        }
+        if (orderedChildren.size() == 0) {
+          childWatcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
+          waitedEnough = wait != Long.MAX_VALUE;
+          continue;
+        }
+
+        for (String headNode : orderedChildren.values()) {
+          String path = dir + "/" + headNode;
+          try {
+            byte[] data = zookeeper.getData(path, null, null, true);
+            return new QueueEvent(path, data, childWatcher.getWatchedEvent());
+          } catch (KeeperException.NoNodeException e) {
+            // Another client deleted the node first.
+          }
         }
       }
+    } finally {
+      time.stop();
     }
   }
 
@@ -441,11 +488,17 @@
    * @return Head of the queue or null.
    */
   public byte[] poll() throws KeeperException, InterruptedException {
+    TimerContext time = stats.time(dir + "_poll");
     try {
       return remove();
     } catch (NoSuchElementException e) {
       return null;
+    } finally {
+      time.stop();
     }
   }
-  
+
+  public Overseer.Stats getStats() {
+    return stats;
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index ef6ce661953..be8ef7586dc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -27,12 +27,16 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClosableThread;
 import org.apache.solr.common.cloud.ClusterState;
@@ -47,6 +51,9 @@ import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.util.stats.Clock;
+import org.apache.solr.util.stats.Timer;
+import org.apache.solr.util.stats.TimerContext;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -62,8 +69,11 @@ public class Overseer {
   public static final String REMOVESHARD = "removeshard";
   public static final String ADD_ROUTING_RULE = "addroutingrule";
   public static final String REMOVE_ROUTING_RULE = "removeroutingrule";
+  public static final String STATE = "state";
 
   public static final int STATE_UPDATE_DELAY = 1500;  // delay between cloud state updates
+  public static final String CREATESHARD = "createshard";
+  public static final String UPDATESHARDSTATE = "updateshardstate";
 
   private static Logger log = LoggerFactory.getLogger(Overseer.class);
@@ -88,13 +98,16 @@ public class Overseer {
     // Internal map which holds the information about failed tasks.
     private final DistributedMap failureMap;
 
+    private final Stats zkStats;
+
     private Map clusterProps;
     private boolean isClosed = false;
 
-    public ClusterStateUpdater(final ZkStateReader reader, final String myId) {
+    public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) {
       this.zkClient = reader.getZkClient();
-      this.stateUpdateQueue = getInQueue(zkClient);
-      this.workQueue = getInternalQueue(zkClient);
+      this.zkStats = zkStats;
+      this.stateUpdateQueue = getInQueue(zkClient, zkStats);
+      this.workQueue = getInternalQueue(zkClient, zkStats);
       this.failureMap = getFailureMap(zkClient);
       this.runningMap = getRunningMap(zkClient);
       this.completedMap = getCompletedMap(zkClient);
@@ -102,6 +115,14 @@ public class Overseer {
       this.reader = reader;
       clusterProps = reader.getClusterProps();
     }
+
+    public Stats getStateUpdateQueueStats() {
+      return stateUpdateQueue.getStats();
+    }
+
+    public Stats getWorkQueueStats() {
+      return workQueue.getStats();
+    }
 
     @Override
     public void run() {
@@ -133,8 +154,10 @@ public class Overseer {
           else if (LeaderStatus.YES == isLeader) {
             final ZkNodeProps message = ZkNodeProps.load(head);
             final String operation = message.getStr(QUEUE_OPERATION);
+            final TimerContext timerContext = stats.time(operation);
             try {
               clusterState = processMessage(clusterState, message, operation);
+              stats.success(operation);
             } catch (Exception e) {
               // generally there is nothing we can do - in most cases, we have
               // an issue that will fail again on retry or we cannot communicate with
@@ -142,6 +165,9 @@ public class Overseer {
               // TODO: if ordering for the message is not important, we could
               // track retries and put it back on the end of the queue
               log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e);
+              stats.error(operation);
+            } finally {
+              timerContext.stop();
             }
             zkClient.setData(ZkStateReader.CLUSTER_STATE,
                 ZkStateReader.toJSON(clusterState), true);
@@ -208,8 +234,10 @@ public class Overseer {
           while (head != null) {
             final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
             final String operation = message.getStr(QUEUE_OPERATION);
+            final TimerContext timerContext = stats.time(operation);
             try {
               clusterState = processMessage(clusterState, message, operation);
+              stats.success(operation);
             } catch (Exception e) {
               // generally there is nothing we can do - in most cases, we have
               // an issue that will fail again on retry or we cannot communicate with
@@ -217,6 +245,9 @@ public class Overseer {
               // TODO: if ordering for the message is not important, we could
               // track retries and put it back on the end of the queue
               log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e);
+              stats.error(operation);
+            } finally {
+              timerContext.stop();
             }
             workQueue.offer(head.getBytes());
@@ -253,7 +284,7 @@ public class Overseer {
 
     private ClusterState processMessage(ClusterState clusterState,
         final ZkNodeProps message, final String operation) {
-      if ("state".equals(operation)) {
+      if (STATE.equals(operation)) {
         if( isLegacy( clusterProps )) {
           clusterState = updateState(clusterState, message);
         } else {
@@ -279,9 +310,9 @@ public class Overseer {
             message.getStr(ZkStateReader.SHARD_ID_PROP),
             sb.length() > 0 ? sb.toString() : null);
-      } else if ("createshard".equals(operation)) {
+      } else if (CREATESHARD.equals(operation)) {
         clusterState = createShard(clusterState, message);
-      } else if ("updateshardstate".equals(operation)) {
+      } else if (UPDATESHARDSTATE.equals(operation)) {
         clusterState = updateShardState(clusterState, message);
       } else if (OverseerCollectionProcessor.CREATECOLLECTION.equals(operation)) {
         clusterState = buildCollection(clusterState, message);
@@ -1013,7 +1044,14 @@ public class Overseer {
     public boolean isClosed() {
       return this.isClosed;
     }
-    
+
+    public DistributedQueue getStateUpdateQueue() {
+      return stateUpdateQueue;
+    }
+
+    public DistributedQueue getWorkQueue() {
+      return workQueue;
+    }
   }
 
   static void getShardNames(Integer numShards, List<String> shardNames) {
@@ -1077,11 +1115,15 @@
   private String adminPath;
 
   private OverseerCollectionProcessor ocp;
+
+  private Stats stats;
+
   // overseer not responsible for closing reader
   public Overseer(ShardHandler shardHandler, String adminPath, final ZkStateReader reader) throws KeeperException, InterruptedException {
     this.reader = reader;
     this.shardHandler = shardHandler;
     this.adminPath = adminPath;
+    this.stats = new Stats();
   }
 
   public void start(String id) {
@@ -1090,12 +1132,12 @@
     createOverseerNode(reader.getZkClient());
     //launch cluster state updater thread
     ThreadGroup tg = new ThreadGroup("Overseer state updater.");
-    updaterThread = new OverseerThread(tg, new ClusterStateUpdater(reader, id));
+    updaterThread = new OverseerThread(tg, new ClusterStateUpdater(reader, id, stats));
     updaterThread.setDaemon(true);
 
     ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
-    ocp = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath);
+    ocp = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath, stats);
     ccThread = new OverseerThread(ccTg, ocp, "Overseer-" + id);
     ccThread.setDaemon(true);
@@ -1136,14 +1178,18 @@
    * Get queue that can be used to send messages to Overseer.
    */
   public static DistributedQueue getInQueue(final SolrZkClient zkClient) {
+    return getInQueue(zkClient, new Stats());
+  }
+
+  static DistributedQueue getInQueue(final SolrZkClient zkClient, Stats zkStats) {
     createOverseerNode(zkClient);
-    return new DistributedQueue(zkClient, "/overseer/queue", null);
+    return new DistributedQueue(zkClient, "/overseer/queue", null, zkStats);
   }
 
   /* Internal queue, not to be used outside of Overseer */
-  static DistributedQueue getInternalQueue(final SolrZkClient zkClient) {
+  static DistributedQueue getInternalQueue(final SolrZkClient zkClient, Stats zkStats) {
     createOverseerNode(zkClient);
-    return new DistributedQueue(zkClient, "/overseer/queue-work", null);
+    return new DistributedQueue(zkClient, "/overseer/queue-work", null, zkStats);
   }
 
   /* Internal map for failed tasks, not to be used outside of the Overseer */
@@ -1166,8 +1212,12 @@
 
   /* Collection creation queue */
   static DistributedQueue getCollectionQueue(final SolrZkClient zkClient) {
+    return getCollectionQueue(zkClient, new Stats());
+  }
+
+  static DistributedQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats) {
     createOverseerNode(zkClient);
-    return new DistributedQueue(zkClient, "/overseer/collection-queue-work", null);
+    return new DistributedQueue(zkClient, "/overseer/collection-queue-work", null, zkStats);
   }
 
   private static void createOverseerNode(final SolrZkClient zkClient) {
@@ -1192,4 +1242,109 @@
     return reader;
   }
 
+  /**
+   * Used to hold statistics about overseer operations. It will be exposed
+   * to the OverseerCollectionProcessor to return statistics.
+   *
+   * This is experimental API and subject to change.
+   */
+  public static class Stats {
+    static final int MAX_STORED_FAILURES = 10;
+
+    final Map<String, Stat> stats = Collections.synchronizedMap(new HashMap<String, Stat>());
+
+    public Map<String, Stat> getStats() {
+      return stats;
+    }
+
+    public int getSuccessCount(String operation) {
+      Stat stat = stats.get(operation.toLowerCase(Locale.ROOT));
+      return stat == null ? 0 : stat.success.get();
+    }
+
+    public int getErrorCount(String operation) {
+      Stat stat = stats.get(operation.toLowerCase(Locale.ROOT));
+      return stat == null ? 0 : stat.errors.get();
+    }
+
+    public void success(String operation) {
+      String op = operation.toLowerCase(Locale.ROOT);
+      Stat stat = stats.get(op);
+      if (stat == null) {
+        stat = new Stat();
+        stats.put(op, stat);
+      }
+      stat.success.incrementAndGet();
+    }
+
+    public void error(String operation) {
+      String op = operation.toLowerCase(Locale.ROOT);
+      Stat stat = stats.get(op);
+      if (stat == null) {
+        stat = new Stat();
+        stats.put(op, stat);
+      }
+      stat.errors.incrementAndGet();
+    }
+
+    public TimerContext time(String operation) {
+      String op = operation.toLowerCase(Locale.ROOT);
+      Stat stat = stats.get(op);
+      if (stat == null) {
+        stat = new Stat();
+        stats.put(op, stat);
+      }
+      return stat.requestTime.time();
+    }
+
+    public void storeFailureDetails(String operation, ZkNodeProps request, SolrResponse resp) {
+      String op = operation.toLowerCase(Locale.ROOT);
+      Stat stat = stats.get(op);
+      if (stat == null) {
+        stat = new Stat();
+        stats.put(op, stat);
+      }
+      LinkedList<FailedOp> failedOps = stat.failureDetails;
+      synchronized (failedOps) {
+        if (failedOps.size() >= MAX_STORED_FAILURES) {
+          failedOps.removeFirst();
+        }
+        failedOps.addLast(new FailedOp(request, resp));
+      }
+    }
+
+    public List<FailedOp> getFailureDetails(String operation) {
+      Stat stat = stats.get(operation.toLowerCase(Locale.ROOT));
+      if (stat == null || stat.failureDetails.isEmpty()) return null;
+      LinkedList<FailedOp> failedOps = stat.failureDetails;
+      synchronized (failedOps) {
+        ArrayList<FailedOp> ret = new ArrayList<>(failedOps);
+        return ret;
+      }
+    }
+  }
+
+  public static class Stat {
+    public final AtomicInteger success;
+    public final AtomicInteger errors;
+    public final Timer requestTime;
+    public final LinkedList<FailedOp> failureDetails;
+
+    public Stat() {
+      this.success = new AtomicInteger();
+      this.errors = new AtomicInteger();
+      this.requestTime = new Timer(TimeUnit.MILLISECONDS, TimeUnit.MINUTES, Clock.defaultClock());
+      this.failureDetails = new LinkedList<>();
+    }
+  }
+
+  public static class FailedOp {
+    public final ZkNodeProps req;
+    public final SolrResponse resp;
+
+    public FailedOp(ZkNodeProps req, SolrResponse resp) {
+      this.req = req;
+      this.resp = resp;
+    }
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
index e8dd2cc1f34..a07acdb7816 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
@@ -59,6 +59,9 @@ import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.handler.component.ShardRequest;
 import org.apache.solr.handler.component.ShardResponse;
 import org.apache.solr.update.SolrIndexSplitter;
+import org.apache.solr.util.stats.Snapshot;
+import org.apache.solr.util.stats.Timer;
+import org.apache.solr.util.stats.TimerContext;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -85,6 +88,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
 
@@ -162,15 +166,18 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
   private ZkStateReader zkStateReader;
 
   private boolean isClosed;
-  
-  public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath) {
-    this(zkStateReader, myId, shardHandler, adminPath, Overseer.getCollectionQueue(zkStateReader.getZkClient()),
+
+  private Overseer.Stats stats;
+
+  public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath, Overseer.Stats stats) {
+    this(zkStateReader, myId, shardHandler, adminPath, stats, Overseer.getCollectionQueue(zkStateReader.getZkClient(), stats),
         Overseer.getRunningMap(zkStateReader.getZkClient()),
         Overseer.getCompletedMap(zkStateReader.getZkClient()), Overseer.getFailureMap(zkStateReader.getZkClient()));
   }
 
   protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler,
                                         String adminPath,
+                                        Overseer.Stats stats,
                                         DistributedQueue workQueue,
                                         DistributedMap runningMap,
                                         DistributedMap completedMap,
@@ -183,6 +190,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
     this.runningMap = runningMap;
     this.completedMap = completedMap;
     this.failureMap = failureMap;
+    this.stats = stats;
   }
 
   @Override
@@ -227,7 +235,13 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
           log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString());
           final String operation = message.getStr(QUEUE_OPERATION);
-          SolrResponse response = processMessage(message, operation);
+          final TimerContext timerContext = stats.time("collection_" + operation); // even if operation is async, it is sync!
+          SolrResponse response = null;
+          try {
+            response = processMessage(message, operation);
+          } finally {
+            timerContext.stop();
+          }
           head.setBytes(SolrResponse.serializable(response));
           if (!operation.equals(REQUESTSTATUS) && asyncId != null) {
@@ -242,6 +256,13 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
 
           workQueue.remove(head);
 
+          if (response.getResponse().get("failure") != null || response.getResponse().get("exception") != null) {
+            stats.error("collection_" + operation);
+            stats.storeFailureDetails("collection_" + operation, message, response);
+          } else {
+            stats.success("collection_" + operation);
+          }
+
           log.info("Overseer Collection Processor: Message id:" + head.getId() + " complete, response:"+ response.getResponse().toString());
         } catch (KeeperException e) {
           if (e.code() == KeeperException.Code.SESSIONEXPIRED
@@ -451,7 +472,11 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
         addReplica(zkStateReader.getClusterState(), message, results);
       } else if (REQUESTSTATUS.equals(operation)) {
         requestStatus(message, results);
-      } else {
+      } else if (OVERSEERSTATUS.isEqual(operation)) {
+        getOverseerStatus(message, results);
+      }
+
+      else {
         throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
             + operation);
       }
@@ -469,6 +494,79 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
     return new OverseerSolrResponse(results);
   }
 
+  private void getOverseerStatus(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
+    String leaderNode = getLeaderNode(zkStateReader.getZkClient());
+    results.add("leader", leaderNode);
+    Stat stat = new Stat();
+    zkStateReader.getZkClient().getData("/overseer/queue",null, stat, true);
+    results.add("overseer_queue_size", stat.getNumChildren());
+    stat = new Stat();
+    zkStateReader.getZkClient().getData("/overseer/queue-work",null, stat, true);
+    results.add("overseer_work_queue_size", stat.getNumChildren());
+    stat = new Stat();
+    zkStateReader.getZkClient().getData("/overseer/collection-queue-work",null, stat, true);
+    results.add("overseer_collection_queue_size", stat.getNumChildren());
+
+    NamedList overseerStats = new NamedList();
+    NamedList collectionStats = new NamedList();
+    NamedList stateUpdateQueueStats = new NamedList();
+    NamedList workQueueStats = new NamedList();
+    NamedList collectionQueueStats = new NamedList();
+    for (Map.Entry<String, Overseer.Stat> entry : this.stats.getStats().entrySet()) {
+      String key = entry.getKey();
+      NamedList<Object> lst = new SimpleOrderedMap<>();
+      if (key.startsWith("collection_")) {
+        collectionStats.add(key.substring(11), lst);
+        int successes = this.stats.getSuccessCount(entry.getKey());
+        int errors = this.stats.getErrorCount(entry.getKey());
+        lst.add("requests", successes);
+        lst.add("errors", errors);
+        List<Overseer.FailedOp> failureDetails = this.stats.getFailureDetails(key);
+        if (failureDetails != null) {
+          List<SimpleOrderedMap<Object>> failures = new ArrayList<>();
+          for (Overseer.FailedOp failedOp : failureDetails) {
+            SimpleOrderedMap<Object> fail = new SimpleOrderedMap<>();
+            fail.add("request", failedOp.req.getProperties());
+            fail.add("response", failedOp.resp.getResponse());
+            failures.add(fail);
+          }
+          lst.add("recent_failures", failures);
+        }
+      } else if (key.startsWith("/overseer/queue_")) {
+        stateUpdateQueueStats.add(key.substring(16), lst);
+      } else if (key.startsWith("/overseer/queue-work_")) {
+        workQueueStats.add(key.substring(21), lst);
+      } else if (key.startsWith("/overseer/collection-queue-work_")) {
+        collectionQueueStats.add(key.substring(32), lst);
+      } else {
+        // overseer stats
+        overseerStats.add(key, lst);
+        int successes = this.stats.getSuccessCount(entry.getKey());
+        int errors = this.stats.getErrorCount(entry.getKey());
+        lst.add("requests", successes);
+        lst.add("errors", errors);
+      }
+      Timer timer = entry.getValue().requestTime;
+      Snapshot snapshot = timer.getSnapshot();
+      lst.add("totalTime", timer.getSum());
+      lst.add("avgRequestsPerMinute", timer.getMeanRate());
+      lst.add("5minRateReqsPerMinute", timer.getFiveMinuteRate());
+      lst.add("15minRateReqsPerMinute", timer.getFifteenMinuteRate());
+      lst.add("avgTimePerRequest", timer.getMean());
+      lst.add("medianRequestTime", snapshot.getMedian());
+      lst.add("75thPcRequestTime", snapshot.get75thPercentile());
+      lst.add("95thPcRequestTime", snapshot.get95thPercentile());
+      lst.add("99thPcRequestTime", snapshot.get99thPercentile());
+      lst.add("999thPcRequestTime", snapshot.get999thPercentile());
+    }
+    results.add("overseer_operations", overseerStats);
+    results.add("collection_operations", collectionStats);
+    results.add("overseer_queue", stateUpdateQueueStats);
+    results.add("overseer_internal_queue", workQueueStats);
+    results.add("collection_queue", collectionQueueStats);
+
+  }
+
   private void processRoleCommand(ZkNodeProps message, String operation) throws KeeperException, InterruptedException {
     SolrZkClient zkClient = zkStateReader.getZkClient();
     Map roles = null;
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index fab03a6018c..d1d2f7b2d0a 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -34,6 +34,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
 
 import java.io.IOException;
@@ -206,6 +207,10 @@ public class CollectionsHandler extends RequestHandlerBase {
         this.handleRequestStatus(req, rsp);
         break;
       }
+      case OVERSEERSTATUS: {
+        this.handleOverseerStatus(req, rsp);
+        break;
+      }
       default: {
         throw new RuntimeException("Unknown action: " + action);
       }
@@ -214,6 +219,12 @@ public class CollectionsHandler extends RequestHandlerBase {
     rsp.setHttpCaching(false);
   }
 
+  private void handleOverseerStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
+    Map<String, Object> props = ZkNodeProps.makeMap(
+        Overseer.QUEUE_OPERATION, OVERSEERSTATUS.toLower());
+    handleResponse(OVERSEERSTATUS.toLower(), new ZkNodeProps(props), rsp);
+  }
+
   private void handleProp(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
     req.getParams().required().check("name");
     String name = req.getParams().get("name");
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
index f51cdc013a4..d5e22042bf5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
@@ -96,7 +96,7 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
                                      DistributedQueue workQueue, DistributedMap runningMap,
                                      DistributedMap completedMap,
                                      DistributedMap failureMap) {
-      super(zkStateReader, myId, shardHandler, adminPath, workQueue, runningMap, completedMap, failureMap);
+      super(zkStateReader, myId, shardHandler, adminPath, new Overseer.Stats(), workQueue, runningMap, completedMap, failureMap);
     }
 
     @Override
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerStatusTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerStatusTest.java
new file mode 100644
index 00000000000..48297363cb9
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerStatusTest.java
@@ -0,0 +1,123 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+
+public class OverseerStatusTest extends BasicDistributedZkTest {
+
+  public OverseerStatusTest() {
+    schemaString = "schema15.xml";      // we need a string id
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    System.setProperty("numShards", Integer.toString(sliceCount));
+    System.setProperty("solr.xml.persist", "true");
+  }
+
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    if (VERBOSE || printLayoutOnTearDown) {
+      super.printLayout();
+    }
+    if (controlClient != null) {
+      controlClient.shutdown();
+    }
+    if (cloudClient != null) {
+      cloudClient.shutdown();
+    }
+    if (controlClientCloud != null) {
+      controlClientCloud.shutdown();
+    }
+    super.tearDown();
+  }
+
+  @Override
+  public void doTest() throws Exception {
+    waitForThingsToLevelOut(15);
+
+    String collectionName = "overseer_status_test";
+    CollectionAdminResponse response = createCollection(collectionName, 1, 1, 1);
+    NamedList resp = invokeCollectionApi("action",
+        CollectionParams.CollectionAction.OVERSEERSTATUS.toLower());
+    NamedList collection_operations = (NamedList) resp.get("collection_operations");
+    NamedList overseer_operations = (NamedList) resp.get("overseer_operations");
+    SimpleOrderedMap createcollection = (SimpleOrderedMap) collection_operations.get(OverseerCollectionProcessor.CREATECOLLECTION);
+    assertEquals("No stats for createcollection in OverseerCollectionProcessor", 1, createcollection.get("requests"));
+    createcollection = (SimpleOrderedMap) overseer_operations.get("createcollection");
+    assertEquals("No stats for createcollection in Overseer", 1, createcollection.get("requests"));
+
+    invokeCollectionApi("action", CollectionParams.CollectionAction.RELOAD.toLower(), "name", collectionName);
+    resp = invokeCollectionApi("action",
+        CollectionParams.CollectionAction.OVERSEERSTATUS.toLower());
+    collection_operations = (NamedList) resp.get("collection_operations");
+    SimpleOrderedMap reload = (SimpleOrderedMap) collection_operations.get(OverseerCollectionProcessor.RELOADCOLLECTION);
+    assertEquals("No stats for reload in OverseerCollectionProcessor", 1, reload.get("requests"));
+
+    try {
+      invokeCollectionApi("action", CollectionParams.CollectionAction.SPLITSHARD.toLower(),
+          "collection", "non_existent_collection",
+          "shard", "non_existent_shard");
+    } catch (Exception e) {
+      // expected because we did not correctly specify required params for split
+    }
+    resp = invokeCollectionApi("action",
+        CollectionParams.CollectionAction.OVERSEERSTATUS.toLower());
+    collection_operations = (NamedList) resp.get("collection_operations");
+    SimpleOrderedMap split = (SimpleOrderedMap) collection_operations.get(OverseerCollectionProcessor.SPLITSHARD);
+    assertEquals("No stats for split in OverseerCollectionProcessor", 1, split.get("errors"));
+    assertNotNull(split.get("recent_failures"));
+
+    waitForThingsToLevelOut(15);
+  }
+
+  private NamedList invokeCollectionApi(String... args) throws SolrServerException, IOException {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    SolrRequest request = new QueryRequest(params);
+    for (int i = 0; i < args.length - 1; i+=2) {
+      params.add(args[i], args[i+1]);
+    }
+    request.setPath("/admin/collections");
+
+    String baseUrl = ((HttpSolrServer) shardToJetty.get(SHARD1).get(0).client.solrClient)
+        .getBaseURL();
+    baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
+
+    HttpSolrServer baseServer = new HttpSolrServer(baseUrl);
+    baseServer.setConnectionTimeout(15000);
+    baseServer.setSoTimeout(60000 * 5);
+    return baseServer.request(request);
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index 921315dcc90..b245aea6424 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -43,6 +43,7 @@ import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.handler.admin.CollectionsHandler;
 import org.apache.solr.handler.component.HttpShardHandlerFactory;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.zookeeper.CreateMode;
@@ -912,7 +913,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
       reader = new ZkStateReader(zkClient);
       reader.createClusterStateWatchersAndUpdate();
       //prepopulate work queue with some items to emulate previous overseer died before persisting state
-      DistributedQueue queue = Overseer.getInternalQueue(zkClient);
+      DistributedQueue queue = Overseer.getInternalQueue(zkClient, new Overseer.Stats());
       ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
           ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
           ZkStateReader.NODE_NAME_PROP, "node1",
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index 5befcfe0e2a..71fb24ceda9 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -43,7 +43,8 @@ public interface CollectionParams
     REMOVEROLE,
     CLUSTERPROP,
     REQUESTSTATUS,
-    ADDREPLICA;
+    ADDREPLICA,
+    OVERSEERSTATUS;
 
     public static CollectionAction get( String p ) {
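Usage note (not part of the patch): a minimal SolrJ sketch of how the new OVERSEERSTATUS action can be invoked, modeled on OverseerStatusTest#invokeCollectionApi above. The base URL is a placeholder, and the keys read from the response are the ones populated by getOverseerStatus(); the equivalent plain-HTTP call is /admin/collections?action=OVERSEERSTATUS.

import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;

public class OverseerStatusExample {
  public static void main(String[] args) throws Exception {
    // Build a request equivalent to /admin/collections?action=OVERSEERSTATUS
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("action", CollectionParams.CollectionAction.OVERSEERSTATUS.toLower());
    SolrRequest request = new QueryRequest(params);
    request.setPath("/admin/collections");

    HttpSolrServer server = new HttpSolrServer("http://localhost:8983/solr"); // placeholder base URL
    try {
      NamedList<Object> resp = server.request(request);
      // Top-level entries added by OverseerCollectionProcessor.getOverseerStatus()
      System.out.println("leader: " + resp.get("leader"));
      System.out.println("state update queue size: " + resp.get("overseer_queue_size"));
      System.out.println("collection operations: " + resp.get("collection_operations"));
    } finally {
      server.shutdown();
    }
  }
}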