SOLR-5749: A new Overseer status collection API exposes overseer queue sizes, timing statistics, success and error counts and last N failures per operation

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1580463 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Shalin Shekhar Mangar 2014-03-23 07:22:28 +00:00
parent 3ecd526643
commit c263c714ce
9 changed files with 553 additions and 108 deletions

View File

@ -143,6 +143,9 @@ New Features
improve logging and force refresh cluster state every 15 seconds. improve logging and force refresh cluster state every 15 seconds.
(Timothy Potter via shalin) (Timothy Potter via shalin)
* SOLR-5749: A new Overseer status collection API exposes overseer queue sizes, timing
statistics, success and error counts and last N failures per operation. (shalin)
Bug Fixes Bug Fixes
---------------------- ----------------------

View File

@ -26,6 +26,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor; import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.util.stats.TimerContext;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.WatchedEvent;
@ -52,10 +53,16 @@ public class DistributedQueue {
private final String prefix = "qn-"; private final String prefix = "qn-";
private final String response_prefix = "qnr-" ; private final String response_prefix = "qnr-" ;
private final Overseer.Stats stats;
public DistributedQueue(SolrZkClient zookeeper, String dir, List<ACL> acl) { public DistributedQueue(SolrZkClient zookeeper, String dir, List<ACL> acl) {
this(zookeeper, dir, acl, new Overseer.Stats());
}
public DistributedQueue(SolrZkClient zookeeper, String dir, List<ACL> acl, Overseer.Stats stats) {
this.dir = dir; this.dir = dir;
ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout()); ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout());
try { try {
cmdExecutor.ensureExists(dir, zookeeper); cmdExecutor.ensureExists(dir, zookeeper);
@ -65,12 +72,12 @@ public class DistributedQueue {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.SERVER_ERROR, e); throw new SolrException(ErrorCode.SERVER_ERROR, e);
} }
if (acl != null) { if (acl != null) {
this.acl = acl; this.acl = acl;
} }
this.zookeeper = zookeeper; this.zookeeper = zookeeper;
this.stats = stats;
} }
/** /**
@ -155,25 +162,30 @@ public class DistributedQueue {
InterruptedException { InterruptedException {
TreeMap<Long,String> orderedChildren; TreeMap<Long,String> orderedChildren;
// Same as for element. Should refactor this. // Same as for element. Should refactor this.
while (true) { TimerContext time = stats.time(dir + "_remove");
try { try {
orderedChildren = orderedChildren(null); while (true) {
} catch (KeeperException.NoNodeException e) {
throw new NoSuchElementException();
}
if (orderedChildren.size() == 0) throw new NoSuchElementException();
for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode;
try { try {
byte[] data = zookeeper.getData(path, null, null, true); orderedChildren = orderedChildren(null);
zookeeper.delete(path, -1, true);
return data;
} catch (KeeperException.NoNodeException e) { } catch (KeeperException.NoNodeException e) {
// Another client deleted the node first. throw new NoSuchElementException();
} }
if (orderedChildren.size() == 0) throw new NoSuchElementException();
for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode;
try {
byte[] data = zookeeper.getData(path, null, null, true);
zookeeper.delete(path, -1, true);
return data;
} catch (KeeperException.NoNodeException e) {
// Another client deleted the node first.
}
}
} }
} finally {
time.stop();
} }
} }
@ -183,15 +195,20 @@ public class DistributedQueue {
*/ */
public byte[] remove(QueueEvent event) throws KeeperException, public byte[] remove(QueueEvent event) throws KeeperException,
InterruptedException { InterruptedException {
String path = event.getId(); TimerContext time = stats.time(dir + "_remove_event");
String responsePath = dir + "/" + response_prefix try {
+ path.substring(path.lastIndexOf("-") + 1); String path = event.getId();
if (zookeeper.exists(responsePath, true)) { String responsePath = dir + "/" + response_prefix
zookeeper.setData(responsePath, event.getBytes(), true); + path.substring(path.lastIndexOf("-") + 1);
if (zookeeper.exists(responsePath, true)) {
zookeeper.setData(responsePath, event.getBytes(), true);
}
byte[] data = zookeeper.getData(path, null, null, true);
zookeeper.delete(path, -1, true);
return data;
} finally {
time.stop();
} }
byte[] data = zookeeper.getData(path, null, null, true);
zookeeper.delete(path, -1, true);
return data;
} }
@ -235,29 +252,34 @@ public class DistributedQueue {
public byte[] take() throws KeeperException, InterruptedException { public byte[] take() throws KeeperException, InterruptedException {
TreeMap<Long,String> orderedChildren; TreeMap<Long,String> orderedChildren;
// Same as for element. Should refactor this. // Same as for element. Should refactor this.
while (true) { TimerContext timer = stats.time(dir + "_take");
LatchChildWatcher childWatcher = new LatchChildWatcher(); try {
try { while (true) {
orderedChildren = orderedChildren(childWatcher); LatchChildWatcher childWatcher = new LatchChildWatcher();
} catch (KeeperException.NoNodeException e) {
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
continue;
}
if (orderedChildren.size() == 0) {
childWatcher.await(DEFAULT_TIMEOUT);
continue;
}
for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode;
try { try {
byte[] data = zookeeper.getData(path, null, null, true); orderedChildren = orderedChildren(childWatcher);
zookeeper.delete(path, -1, true);
return data;
} catch (KeeperException.NoNodeException e) { } catch (KeeperException.NoNodeException e) {
// Another client deleted the node first. zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
continue;
}
if (orderedChildren.size() == 0) {
childWatcher.await(DEFAULT_TIMEOUT);
continue;
}
for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode;
try {
byte[] data = zookeeper.getData(path, null, null, true);
zookeeper.delete(path, -1, true);
return data;
} catch (KeeperException.NoNodeException e) {
// Another client deleted the node first.
}
} }
} }
} finally {
timer.stop();
} }
} }
@ -268,8 +290,13 @@ public class DistributedQueue {
*/ */
public boolean offer(byte[] data) throws KeeperException, public boolean offer(byte[] data) throws KeeperException,
InterruptedException { InterruptedException {
return createData(dir + "/" + prefix, data, TimerContext time = stats.time(dir + "_offer");
CreateMode.PERSISTENT_SEQUENTIAL) != null; try {
return createData(dir + "/" + prefix, data,
CreateMode.PERSISTENT_SEQUENTIAL) != null;
} finally {
time.stop();
}
} }
/** /**
@ -298,21 +325,26 @@ public class DistributedQueue {
*/ */
public QueueEvent offer(byte[] data, long timeout) throws KeeperException, public QueueEvent offer(byte[] data, long timeout) throws KeeperException,
InterruptedException { InterruptedException {
String path = createData(dir + "/" + prefix, data, TimerContext time = stats.time(dir + "_offer");
CreateMode.PERSISTENT_SEQUENTIAL); try {
String watchID = createData( String path = createData(dir + "/" + prefix, data,
dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1), CreateMode.PERSISTENT_SEQUENTIAL);
null, CreateMode.EPHEMERAL); String watchID = createData(
Object lock = new Object(); dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
LatchChildWatcher watcher = new LatchChildWatcher(lock); null, CreateMode.EPHEMERAL);
synchronized (lock) { Object lock = new Object();
if (zookeeper.exists(watchID, watcher, true) != null) { LatchChildWatcher watcher = new LatchChildWatcher(lock);
watcher.await(timeout); synchronized (lock) {
if (zookeeper.exists(watchID, watcher, true) != null) {
watcher.await(timeout);
}
} }
byte[] bytes = zookeeper.getData(watchID, null, null, true);
zookeeper.delete(watchID, -1, true);
return new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
} finally {
time.stop();
} }
byte[] bytes = zookeeper.getData(watchID, null, null, true);
zookeeper.delete(watchID, -1, true);
return new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
} }
/** /**
@ -322,9 +354,14 @@ public class DistributedQueue {
* @return data at the first element of the queue, or null. * @return data at the first element of the queue, or null.
*/ */
public byte[] peek() throws KeeperException, InterruptedException { public byte[] peek() throws KeeperException, InterruptedException {
TimerContext time = stats.time(dir + "_peek");
try {
QueueEvent element = element(); QueueEvent element = element();
if(element == null) return null; if (element == null) return null;
return element.getBytes(); return element.getBytes();
} finally {
time.stop();
}
} }
public static class QueueEvent { public static class QueueEvent {
@ -399,38 +436,48 @@ public class DistributedQueue {
* @return data at the first element of the queue, or null. * @return data at the first element of the queue, or null.
*/ */
public QueueEvent peek(long wait) throws KeeperException, InterruptedException { public QueueEvent peek(long wait) throws KeeperException, InterruptedException {
if (wait == 0) { TimerContext time = null;
return element(); if (wait == Long.MAX_VALUE) {
time = stats.time(dir + "_peek_wait_forever");
} else {
time = stats.time(dir + "_peek_wait" + wait);
} }
try {
TreeMap<Long,String> orderedChildren; if (wait == 0) {
boolean waitedEnough = false; return element();
while (true) {
LatchChildWatcher childWatcher = new LatchChildWatcher();
try {
orderedChildren = orderedChildren(childWatcher);
} catch (KeeperException.NoNodeException e) {
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
continue;
}
if(waitedEnough) {
if(orderedChildren.isEmpty()) return null;
}
if (orderedChildren.size() == 0) {
childWatcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT: wait);
waitedEnough = wait != Long.MAX_VALUE;
continue;
} }
for (String headNode : orderedChildren.values()) { TreeMap<Long, String> orderedChildren;
String path = dir + "/" + headNode; boolean waitedEnough = false;
while (true) {
LatchChildWatcher childWatcher = new LatchChildWatcher();
try { try {
byte[] data = zookeeper.getData(path, null, null, true); orderedChildren = orderedChildren(childWatcher);
return new QueueEvent(path, data, childWatcher.getWatchedEvent());
} catch (KeeperException.NoNodeException e) { } catch (KeeperException.NoNodeException e) {
// Another client deleted the node first. zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
continue;
}
if (waitedEnough) {
if (orderedChildren.isEmpty()) return null;
}
if (orderedChildren.size() == 0) {
childWatcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
waitedEnough = wait != Long.MAX_VALUE;
continue;
}
for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode;
try {
byte[] data = zookeeper.getData(path, null, null, true);
return new QueueEvent(path, data, childWatcher.getWatchedEvent());
} catch (KeeperException.NoNodeException e) {
// Another client deleted the node first.
}
} }
} }
} finally {
time.stop();
} }
} }
@ -441,11 +488,17 @@ public class DistributedQueue {
* @return Head of the queue or null. * @return Head of the queue or null.
*/ */
public byte[] poll() throws KeeperException, InterruptedException { public byte[] poll() throws KeeperException, InterruptedException {
TimerContext time = stats.time(dir + "_poll");
try { try {
return remove(); return remove();
} catch (NoSuchElementException e) { } catch (NoSuchElementException e) {
return null; return null;
} finally {
time.stop();
} }
} }
public Overseer.Stats getStats() {
return stats;
}
} }

View File

@ -27,12 +27,16 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClosableThread; import org.apache.solr.common.cloud.ClosableThread;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
@ -47,6 +51,9 @@ import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.handler.component.ShardHandler; import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.util.stats.Clock;
import org.apache.solr.util.stats.Timer;
import org.apache.solr.util.stats.TimerContext;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -62,8 +69,11 @@ public class Overseer {
public static final String REMOVESHARD = "removeshard"; public static final String REMOVESHARD = "removeshard";
public static final String ADD_ROUTING_RULE = "addroutingrule"; public static final String ADD_ROUTING_RULE = "addroutingrule";
public static final String REMOVE_ROUTING_RULE = "removeroutingrule"; public static final String REMOVE_ROUTING_RULE = "removeroutingrule";
public static final String STATE = "state";
public static final int STATE_UPDATE_DELAY = 1500; // delay between cloud state updates public static final int STATE_UPDATE_DELAY = 1500; // delay between cloud state updates
public static final String CREATESHARD = "createshard";
public static final String UPDATESHARDSTATE = "updateshardstate";
private static Logger log = LoggerFactory.getLogger(Overseer.class); private static Logger log = LoggerFactory.getLogger(Overseer.class);
@ -88,13 +98,16 @@ public class Overseer {
// Internal map which holds the information about failed tasks. // Internal map which holds the information about failed tasks.
private final DistributedMap failureMap; private final DistributedMap failureMap;
private final Stats zkStats;
private Map clusterProps; private Map clusterProps;
private boolean isClosed = false; private boolean isClosed = false;
public ClusterStateUpdater(final ZkStateReader reader, final String myId) { public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) {
this.zkClient = reader.getZkClient(); this.zkClient = reader.getZkClient();
this.stateUpdateQueue = getInQueue(zkClient); this.zkStats = zkStats;
this.workQueue = getInternalQueue(zkClient); this.stateUpdateQueue = getInQueue(zkClient, zkStats);
this.workQueue = getInternalQueue(zkClient, zkStats);
this.failureMap = getFailureMap(zkClient); this.failureMap = getFailureMap(zkClient);
this.runningMap = getRunningMap(zkClient); this.runningMap = getRunningMap(zkClient);
this.completedMap = getCompletedMap(zkClient); this.completedMap = getCompletedMap(zkClient);
@ -102,6 +115,14 @@ public class Overseer {
this.reader = reader; this.reader = reader;
clusterProps = reader.getClusterProps(); clusterProps = reader.getClusterProps();
} }
public Stats getStateUpdateQueueStats() {
return stateUpdateQueue.getStats();
}
public Stats getWorkQueueStats() {
return workQueue.getStats();
}
@Override @Override
public void run() { public void run() {
@ -133,8 +154,10 @@ public class Overseer {
else if (LeaderStatus.YES == isLeader) { else if (LeaderStatus.YES == isLeader) {
final ZkNodeProps message = ZkNodeProps.load(head); final ZkNodeProps message = ZkNodeProps.load(head);
final String operation = message.getStr(QUEUE_OPERATION); final String operation = message.getStr(QUEUE_OPERATION);
final TimerContext timerContext = stats.time(operation);
try { try {
clusterState = processMessage(clusterState, message, operation); clusterState = processMessage(clusterState, message, operation);
stats.success(operation);
} catch (Exception e) { } catch (Exception e) {
// generally there is nothing we can do - in most cases, we have // generally there is nothing we can do - in most cases, we have
// an issue that will fail again on retry or we cannot communicate with // an issue that will fail again on retry or we cannot communicate with
@ -142,6 +165,9 @@ public class Overseer {
// TODO: if ordering for the message is not important, we could // TODO: if ordering for the message is not important, we could
// track retries and put it back on the end of the queue // track retries and put it back on the end of the queue
log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e); log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e);
stats.error(operation);
} finally {
timerContext.stop();
} }
zkClient.setData(ZkStateReader.CLUSTER_STATE, zkClient.setData(ZkStateReader.CLUSTER_STATE,
ZkStateReader.toJSON(clusterState), true); ZkStateReader.toJSON(clusterState), true);
@ -208,8 +234,10 @@ public class Overseer {
while (head != null) { while (head != null) {
final ZkNodeProps message = ZkNodeProps.load(head.getBytes()); final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
final String operation = message.getStr(QUEUE_OPERATION); final String operation = message.getStr(QUEUE_OPERATION);
final TimerContext timerContext = stats.time(operation);
try { try {
clusterState = processMessage(clusterState, message, operation); clusterState = processMessage(clusterState, message, operation);
stats.success(operation);
} catch (Exception e) { } catch (Exception e) {
// generally there is nothing we can do - in most cases, we have // generally there is nothing we can do - in most cases, we have
// an issue that will fail again on retry or we cannot communicate with // an issue that will fail again on retry or we cannot communicate with
@ -217,6 +245,9 @@ public class Overseer {
// TODO: if ordering for the message is not important, we could // TODO: if ordering for the message is not important, we could
// track retries and put it back on the end of the queue // track retries and put it back on the end of the queue
log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e); log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e);
stats.error(operation);
} finally {
timerContext.stop();
} }
workQueue.offer(head.getBytes()); workQueue.offer(head.getBytes());
@ -253,7 +284,7 @@ public class Overseer {
private ClusterState processMessage(ClusterState clusterState, private ClusterState processMessage(ClusterState clusterState,
final ZkNodeProps message, final String operation) { final ZkNodeProps message, final String operation) {
if ("state".equals(operation)) { if (STATE.equals(operation)) {
if( isLegacy( clusterProps )) { if( isLegacy( clusterProps )) {
clusterState = updateState(clusterState, message); clusterState = updateState(clusterState, message);
} else { } else {
@ -279,9 +310,9 @@ public class Overseer {
message.getStr(ZkStateReader.SHARD_ID_PROP), message.getStr(ZkStateReader.SHARD_ID_PROP),
sb.length() > 0 ? sb.toString() : null); sb.length() > 0 ? sb.toString() : null);
} else if ("createshard".equals(operation)) { } else if (CREATESHARD.equals(operation)) {
clusterState = createShard(clusterState, message); clusterState = createShard(clusterState, message);
} else if ("updateshardstate".equals(operation)) { } else if (UPDATESHARDSTATE.equals(operation)) {
clusterState = updateShardState(clusterState, message); clusterState = updateShardState(clusterState, message);
} else if (OverseerCollectionProcessor.CREATECOLLECTION.equals(operation)) { } else if (OverseerCollectionProcessor.CREATECOLLECTION.equals(operation)) {
clusterState = buildCollection(clusterState, message); clusterState = buildCollection(clusterState, message);
@ -1013,7 +1044,14 @@ public class Overseer {
public boolean isClosed() { public boolean isClosed() {
return this.isClosed; return this.isClosed;
} }
public DistributedQueue getStateUpdateQueue() {
return stateUpdateQueue;
}
public DistributedQueue getWorkQueue() {
return workQueue;
}
} }
static void getShardNames(Integer numShards, List<String> shardNames) { static void getShardNames(Integer numShards, List<String> shardNames) {
@ -1077,11 +1115,15 @@ public class Overseer {
private String adminPath; private String adminPath;
private OverseerCollectionProcessor ocp; private OverseerCollectionProcessor ocp;
private Stats stats;
// overseer not responsible for closing reader // overseer not responsible for closing reader
public Overseer(ShardHandler shardHandler, String adminPath, final ZkStateReader reader) throws KeeperException, InterruptedException { public Overseer(ShardHandler shardHandler, String adminPath, final ZkStateReader reader) throws KeeperException, InterruptedException {
this.reader = reader; this.reader = reader;
this.shardHandler = shardHandler; this.shardHandler = shardHandler;
this.adminPath = adminPath; this.adminPath = adminPath;
this.stats = new Stats();
} }
public void start(String id) { public void start(String id) {
@ -1090,12 +1132,12 @@ public class Overseer {
createOverseerNode(reader.getZkClient()); createOverseerNode(reader.getZkClient());
//launch cluster state updater thread //launch cluster state updater thread
ThreadGroup tg = new ThreadGroup("Overseer state updater."); ThreadGroup tg = new ThreadGroup("Overseer state updater.");
updaterThread = new OverseerThread(tg, new ClusterStateUpdater(reader, id)); updaterThread = new OverseerThread(tg, new ClusterStateUpdater(reader, id, stats));
updaterThread.setDaemon(true); updaterThread.setDaemon(true);
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process."); ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
ocp = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath); ocp = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath, stats);
ccThread = new OverseerThread(ccTg, ocp, "Overseer-" + id); ccThread = new OverseerThread(ccTg, ocp, "Overseer-" + id);
ccThread.setDaemon(true); ccThread.setDaemon(true);
@ -1136,14 +1178,18 @@ public class Overseer {
* Get queue that can be used to send messages to Overseer. * Get queue that can be used to send messages to Overseer.
*/ */
public static DistributedQueue getInQueue(final SolrZkClient zkClient) { public static DistributedQueue getInQueue(final SolrZkClient zkClient) {
return getInQueue(zkClient, new Stats());
}
static DistributedQueue getInQueue(final SolrZkClient zkClient, Stats zkStats) {
createOverseerNode(zkClient); createOverseerNode(zkClient);
return new DistributedQueue(zkClient, "/overseer/queue", null); return new DistributedQueue(zkClient, "/overseer/queue", null, zkStats);
} }
/* Internal queue, not to be used outside of Overseer */ /* Internal queue, not to be used outside of Overseer */
static DistributedQueue getInternalQueue(final SolrZkClient zkClient) { static DistributedQueue getInternalQueue(final SolrZkClient zkClient, Stats zkStats) {
createOverseerNode(zkClient); createOverseerNode(zkClient);
return new DistributedQueue(zkClient, "/overseer/queue-work", null); return new DistributedQueue(zkClient, "/overseer/queue-work", null, zkStats);
} }
/* Internal map for failed tasks, not to be used outside of the Overseer */ /* Internal map for failed tasks, not to be used outside of the Overseer */
@ -1166,8 +1212,12 @@ public class Overseer {
/* Collection creation queue */ /* Collection creation queue */
static DistributedQueue getCollectionQueue(final SolrZkClient zkClient) { static DistributedQueue getCollectionQueue(final SolrZkClient zkClient) {
return getCollectionQueue(zkClient, new Stats());
}
static DistributedQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats) {
createOverseerNode(zkClient); createOverseerNode(zkClient);
return new DistributedQueue(zkClient, "/overseer/collection-queue-work", null); return new DistributedQueue(zkClient, "/overseer/collection-queue-work", null, zkStats);
} }
private static void createOverseerNode(final SolrZkClient zkClient) { private static void createOverseerNode(final SolrZkClient zkClient) {
@ -1192,4 +1242,109 @@ public class Overseer {
return reader; return reader;
} }
/**
* Used to hold statistics about overseer operations. It will be exposed
* to the OverseerCollectionProcessor to return statistics.
*
* This is experimental API and subject to change.
*/
public static class Stats {
static final int MAX_STORED_FAILURES = 10;
final Map<String, Stat> stats = Collections.synchronizedMap(new HashMap<String, Stat>());
public Map<String, Stat> getStats() {
return stats;
}
public int getSuccessCount(String operation) {
Stat stat = stats.get(operation.toLowerCase(Locale.ROOT));
return stat == null ? 0 : stat.success.get();
}
public int getErrorCount(String operation) {
Stat stat = stats.get(operation.toLowerCase(Locale.ROOT));
return stat == null ? 0 : stat.errors.get();
}
public void success(String operation) {
String op = operation.toLowerCase(Locale.ROOT);
Stat stat = stats.get(op);
if (stat == null) {
stat = new Stat();
stats.put(op, stat);
}
stat.success.incrementAndGet();
}
public void error(String operation) {
String op = operation.toLowerCase(Locale.ROOT);
Stat stat = stats.get(op);
if (stat == null) {
stat = new Stat();
stats.put(op, stat);
}
stat.errors.incrementAndGet();
}
public TimerContext time(String operation) {
String op = operation.toLowerCase(Locale.ROOT);
Stat stat = stats.get(op);
if (stat == null) {
stat = new Stat();
stats.put(op, stat);
}
return stat.requestTime.time();
}
public void storeFailureDetails(String operation, ZkNodeProps request, SolrResponse resp) {
String op = operation.toLowerCase(Locale.ROOT);
Stat stat = stats.get(op);
if (stat == null) {
stat = new Stat();
stats.put(op, stat);
}
LinkedList<FailedOp> failedOps = stat.failureDetails;
synchronized (failedOps) {
if (failedOps.size() >= MAX_STORED_FAILURES) {
failedOps.removeFirst();
}
failedOps.addLast(new FailedOp(request, resp));
}
}
public List<FailedOp> getFailureDetails(String operation) {
Stat stat = stats.get(operation.toLowerCase(Locale.ROOT));
if (stat == null || stat.failureDetails.isEmpty()) return null;
LinkedList<FailedOp> failedOps = stat.failureDetails;
synchronized (failedOps) {
ArrayList<FailedOp> ret = new ArrayList<>(failedOps);
return ret;
}
}
}
public static class Stat {
public final AtomicInteger success;
public final AtomicInteger errors;
public final Timer requestTime;
public final LinkedList<FailedOp> failureDetails;
public Stat() {
this.success = new AtomicInteger();
this.errors = new AtomicInteger();
this.requestTime = new Timer(TimeUnit.MILLISECONDS, TimeUnit.MINUTES, Clock.defaultClock());
this.failureDetails = new LinkedList<>();
}
}
public static class FailedOp {
public final ZkNodeProps req;
public final SolrResponse resp;
public FailedOp(ZkNodeProps req, SolrResponse resp) {
this.req = req;
this.resp = resp;
}
}
} }

View File

@ -59,6 +59,9 @@ import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardRequest; import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse; import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.update.SolrIndexSplitter; import org.apache.solr.update.SolrIndexSplitter;
import org.apache.solr.util.stats.Snapshot;
import org.apache.solr.util.stats.Timer;
import org.apache.solr.util.stats.TimerContext;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
@ -85,6 +88,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA; import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE; import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE; import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
@ -162,15 +166,18 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
private ZkStateReader zkStateReader; private ZkStateReader zkStateReader;
private boolean isClosed; private boolean isClosed;
public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath) { private Overseer.Stats stats;
this(zkStateReader, myId, shardHandler, adminPath, Overseer.getCollectionQueue(zkStateReader.getZkClient()),
public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath, Overseer.Stats stats) {
this(zkStateReader, myId, shardHandler, adminPath, stats, Overseer.getCollectionQueue(zkStateReader.getZkClient(), stats),
Overseer.getRunningMap(zkStateReader.getZkClient()), Overseer.getRunningMap(zkStateReader.getZkClient()),
Overseer.getCompletedMap(zkStateReader.getZkClient()), Overseer.getFailureMap(zkStateReader.getZkClient())); Overseer.getCompletedMap(zkStateReader.getZkClient()), Overseer.getFailureMap(zkStateReader.getZkClient()));
} }
protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler,
String adminPath, String adminPath,
Overseer.Stats stats,
DistributedQueue workQueue, DistributedQueue workQueue,
DistributedMap runningMap, DistributedMap runningMap,
DistributedMap completedMap, DistributedMap completedMap,
@ -183,6 +190,7 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
this.runningMap = runningMap; this.runningMap = runningMap;
this.completedMap = completedMap; this.completedMap = completedMap;
this.failureMap = failureMap; this.failureMap = failureMap;
this.stats = stats;
} }
@Override @Override
@ -227,7 +235,13 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString()); log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString());
final String operation = message.getStr(QUEUE_OPERATION); final String operation = message.getStr(QUEUE_OPERATION);
SolrResponse response = processMessage(message, operation); final TimerContext timerContext = stats.time("collection_" + operation); // even if operation is async, it is sync!
SolrResponse response = null;
try {
response = processMessage(message, operation);
} finally {
timerContext.stop();
}
head.setBytes(SolrResponse.serializable(response)); head.setBytes(SolrResponse.serializable(response));
if (!operation.equals(REQUESTSTATUS) && asyncId != null) { if (!operation.equals(REQUESTSTATUS) && asyncId != null) {
@ -242,6 +256,13 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
workQueue.remove(head); workQueue.remove(head);
if (response.getResponse().get("failure") != null || response.getResponse().get("exception") != null) {
stats.error("collection_" + operation);
stats.storeFailureDetails("collection_" + operation, message, response);
} else {
stats.success("collection_" + operation);
}
log.info("Overseer Collection Processor: Message id:" + head.getId() + " complete, response:"+ response.getResponse().toString()); log.info("Overseer Collection Processor: Message id:" + head.getId() + " complete, response:"+ response.getResponse().toString());
} catch (KeeperException e) { } catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED if (e.code() == KeeperException.Code.SESSIONEXPIRED
@ -451,7 +472,11 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
addReplica(zkStateReader.getClusterState(), message, results); addReplica(zkStateReader.getClusterState(), message, results);
} else if (REQUESTSTATUS.equals(operation)) { } else if (REQUESTSTATUS.equals(operation)) {
requestStatus(message, results); requestStatus(message, results);
} else { } else if (OVERSEERSTATUS.isEqual(operation)) {
getOverseerStatus(message, results);
}
else {
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
+ operation); + operation);
} }
@ -469,6 +494,79 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
return new OverseerSolrResponse(results); return new OverseerSolrResponse(results);
} }
private void getOverseerStatus(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
String leaderNode = getLeaderNode(zkStateReader.getZkClient());
results.add("leader", leaderNode);
Stat stat = new Stat();
zkStateReader.getZkClient().getData("/overseer/queue",null, stat, true);
results.add("overseer_queue_size", stat.getNumChildren());
stat = new Stat();
zkStateReader.getZkClient().getData("/overseer/queue-work",null, stat, true);
results.add("overseer_work_queue_size", stat.getNumChildren());
stat = new Stat();
zkStateReader.getZkClient().getData("/overseer/collection-queue-work",null, stat, true);
results.add("overseer_collection_queue_size", stat.getNumChildren());
NamedList overseerStats = new NamedList();
NamedList collectionStats = new NamedList();
NamedList stateUpdateQueueStats = new NamedList();
NamedList workQueueStats = new NamedList();
NamedList collectionQueueStats = new NamedList();
for (Map.Entry<String, Overseer.Stat> entry : this.stats.getStats().entrySet()) {
String key = entry.getKey();
NamedList<Object> lst = new SimpleOrderedMap<>();
if (key.startsWith("collection_")) {
collectionStats.add(key.substring(11), lst);
int successes = this.stats.getSuccessCount(entry.getKey());
int errors = this.stats.getErrorCount(entry.getKey());
lst.add("requests", successes);
lst.add("errors", errors);
List<Overseer.FailedOp> failureDetails = this.stats.getFailureDetails(key);
if (failureDetails != null) {
List<SimpleOrderedMap<Object>> failures = new ArrayList<>();
for (Overseer.FailedOp failedOp : failureDetails) {
SimpleOrderedMap<Object> fail = new SimpleOrderedMap<>();
fail.add("request", failedOp.req.getProperties());
fail.add("response", failedOp.resp.getResponse());
failures.add(fail);
}
lst.add("recent_failures", failures);
}
} else if (key.startsWith("/overseer/queue_")) {
stateUpdateQueueStats.add(key.substring(16), lst);
} else if (key.startsWith("/overseer/queue-work_")) {
workQueueStats.add(key.substring(21), lst);
} else if (key.startsWith("/overseer/collection-queue-work_")) {
collectionQueueStats.add(key.substring(32), lst);
} else {
// overseer stats
overseerStats.add(key, lst);
int successes = this.stats.getSuccessCount(entry.getKey());
int errors = this.stats.getErrorCount(entry.getKey());
lst.add("requests", successes);
lst.add("errors", errors);
}
Timer timer = entry.getValue().requestTime;
Snapshot snapshot = timer.getSnapshot();
lst.add("totalTime", timer.getSum());
lst.add("avgRequestsPerMinute", timer.getMeanRate());
lst.add("5minRateReqsPerMinute", timer.getFiveMinuteRate());
lst.add("15minRateReqsPerMinute", timer.getFifteenMinuteRate());
lst.add("avgTimePerRequest", timer.getMean());
lst.add("medianRequestTime", snapshot.getMedian());
lst.add("75thPcRequestTime", snapshot.get75thPercentile());
lst.add("95thPcRequestTime", snapshot.get95thPercentile());
lst.add("99thPcRequestTime", snapshot.get99thPercentile());
lst.add("999thPcRequestTime", snapshot.get999thPercentile());
}
results.add("overseer_operations", overseerStats);
results.add("collection_operations", collectionStats);
results.add("overseer_queue", stateUpdateQueueStats);
results.add("overseer_internal_queue", workQueueStats);
results.add("collection_queue", collectionQueueStats);
}
private void processRoleCommand(ZkNodeProps message, String operation) throws KeeperException, InterruptedException { private void processRoleCommand(ZkNodeProps message, String operation) throws KeeperException, InterruptedException {
SolrZkClient zkClient = zkStateReader.getZkClient(); SolrZkClient zkClient = zkStateReader.getZkClient();
Map roles = null; Map roles = null;

View File

@ -34,6 +34,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE; import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERPROP; import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERPROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE; import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
import java.io.IOException; import java.io.IOException;
@ -206,6 +207,10 @@ public class CollectionsHandler extends RequestHandlerBase {
this.handleRequestStatus(req, rsp); this.handleRequestStatus(req, rsp);
break; break;
} }
case OVERSEERSTATUS: {
this.handleOverseerStatus(req, rsp);
break;
}
default: { default: {
throw new RuntimeException("Unknown action: " + action); throw new RuntimeException("Unknown action: " + action);
} }
@ -214,6 +219,12 @@ public class CollectionsHandler extends RequestHandlerBase {
rsp.setHttpCaching(false); rsp.setHttpCaching(false);
} }
private void handleOverseerStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
Map<String, Object> props = ZkNodeProps.makeMap(
Overseer.QUEUE_OPERATION, OVERSEERSTATUS.toLower());
handleResponse(OVERSEERSTATUS.toLower(), new ZkNodeProps(props), rsp);
}
private void handleProp(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException { private void handleProp(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
req.getParams().required().check("name"); req.getParams().required().check("name");
String name = req.getParams().get("name"); String name = req.getParams().get("name");

View File

@ -96,7 +96,7 @@ public class OverseerCollectionProcessorTest extends SolrTestCaseJ4 {
DistributedQueue workQueue, DistributedMap runningMap, DistributedQueue workQueue, DistributedMap runningMap,
DistributedMap completedMap, DistributedMap completedMap,
DistributedMap failureMap) { DistributedMap failureMap) {
super(zkStateReader, myId, shardHandler, adminPath, workQueue, runningMap, completedMap, failureMap); super(zkStateReader, myId, shardHandler, adminPath, new Overseer.Stats(), workQueue, runningMap, completedMap, failureMap);
} }
@Override @Override

View File

@ -0,0 +1,123 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
public class OverseerStatusTest extends BasicDistributedZkTest {
public OverseerStatusTest() {
schemaString = "schema15.xml"; // we need a string id
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
System.setProperty("numShards", Integer.toString(sliceCount));
System.setProperty("solr.xml.persist", "true");
}
@Override
@After
public void tearDown() throws Exception {
if (VERBOSE || printLayoutOnTearDown) {
super.printLayout();
}
if (controlClient != null) {
controlClient.shutdown();
}
if (cloudClient != null) {
cloudClient.shutdown();
}
if (controlClientCloud != null) {
controlClientCloud.shutdown();
}
super.tearDown();
}
@Override
public void doTest() throws Exception {
waitForThingsToLevelOut(15);
String collectionName = "overseer_status_test";
CollectionAdminResponse response = createCollection(collectionName, 1, 1, 1);
NamedList<Object> resp = invokeCollectionApi("action",
CollectionParams.CollectionAction.OVERSEERSTATUS.toLower());
NamedList<Object> collection_operations = (NamedList<Object>) resp.get("collection_operations");
NamedList<Object> overseer_operations = (NamedList<Object>) resp.get("overseer_operations");
SimpleOrderedMap<Object> createcollection = (SimpleOrderedMap<Object>) collection_operations.get(OverseerCollectionProcessor.CREATECOLLECTION);
assertEquals("No stats for createcollection in OverseerCollectionProcessor", 1, createcollection.get("requests"));
createcollection = (SimpleOrderedMap<Object>) overseer_operations.get("createcollection");
assertEquals("No stats for createcollection in Overseer", 1, createcollection.get("requests"));
invokeCollectionApi("action", CollectionParams.CollectionAction.RELOAD.toLower(), "name", collectionName);
resp = invokeCollectionApi("action",
CollectionParams.CollectionAction.OVERSEERSTATUS.toLower());
collection_operations = (NamedList<Object>) resp.get("collection_operations");
SimpleOrderedMap<Object> reload = (SimpleOrderedMap<Object>) collection_operations.get(OverseerCollectionProcessor.RELOADCOLLECTION);
assertEquals("No stats for reload in OverseerCollectionProcessor", 1, reload.get("requests"));
try {
invokeCollectionApi("action", CollectionParams.CollectionAction.SPLITSHARD.toLower(),
"collection", "non_existent_collection",
"shard", "non_existent_shard");
} catch (Exception e) {
// expected because we did not correctly specify required params for split
}
resp = invokeCollectionApi("action",
CollectionParams.CollectionAction.OVERSEERSTATUS.toLower());
collection_operations = (NamedList<Object>) resp.get("collection_operations");
SimpleOrderedMap<Object> split = (SimpleOrderedMap<Object>) collection_operations.get(OverseerCollectionProcessor.SPLITSHARD);
assertEquals("No stats for split in OverseerCollectionProcessor", 1, split.get("errors"));
assertNotNull(split.get("recent_failures"));
waitForThingsToLevelOut(15);
}
private NamedList<Object> invokeCollectionApi(String... args) throws SolrServerException, IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
SolrRequest request = new QueryRequest(params);
for (int i = 0; i < args.length - 1; i+=2) {
params.add(args[i], args[i+1]);
}
request.setPath("/admin/collections");
String baseUrl = ((HttpSolrServer) shardToJetty.get(SHARD1).get(0).client.solrClient)
.getBaseURL();
baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
HttpSolrServer baseServer = new HttpSolrServer(baseUrl);
baseServer.setConnectionTimeout(15000);
baseServer.setSoTimeout(60000 * 5);
return baseServer.request(request);
}
}

View File

@ -43,6 +43,7 @@ import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.handler.component.HttpShardHandlerFactory; import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.util.DefaultSolrThreadFactory; import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
@ -912,7 +913,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
reader = new ZkStateReader(zkClient); reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate(); reader.createClusterStateWatchersAndUpdate();
//prepopulate work queue with some items to emulate previous overseer died before persisting state //prepopulate work queue with some items to emulate previous overseer died before persisting state
DistributedQueue queue = Overseer.getInternalQueue(zkClient); DistributedQueue queue = Overseer.getInternalQueue(zkClient, new Overseer.Stats());
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state", ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr", ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
ZkStateReader.NODE_NAME_PROP, "node1", ZkStateReader.NODE_NAME_PROP, "node1",

View File

@ -43,7 +43,8 @@ public interface CollectionParams
REMOVEROLE, REMOVEROLE,
CLUSTERPROP, CLUSTERPROP,
REQUESTSTATUS, REQUESTSTATUS,
ADDREPLICA; ADDREPLICA,
OVERSEERSTATUS;
public static CollectionAction get( String p ) public static CollectionAction get( String p )
{ {