mirror of https://github.com/apache/lucene.git
SOLR-6459: Normalize logging of operations in Overseer and log current queue size.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1641218 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d22f5bf7b1
commit
509dcf87f9
|
@ -411,6 +411,9 @@ Other Changes
|
||||||
* SOLR-6747: Add an optional caching option as a workaround for SOLR-6586.
|
* SOLR-6747: Add an optional caching option as a workaround for SOLR-6586.
|
||||||
(Mark Miller, Gregory Chanan)
|
(Mark Miller, Gregory Chanan)
|
||||||
|
|
||||||
|
* SOLR-6459: Normalize logging of operations in Overseer and log current queue size.
|
||||||
|
(Ramkumar Aiyengar via Mark Miller)
|
||||||
|
|
||||||
================== 4.10.3 ==================
|
================== 4.10.3 ==================
|
||||||
|
|
||||||
Bug Fixes
|
Bug Fixes
|
||||||
|
|
|
@ -90,6 +90,7 @@ public class DistributedQueue {
|
||||||
TreeMap<Long,String> orderedChildren = new TreeMap<>();
|
TreeMap<Long,String> orderedChildren = new TreeMap<>();
|
||||||
|
|
||||||
List<String> childNames = zookeeper.getChildren(dir, watcher, true);
|
List<String> childNames = zookeeper.getChildren(dir, watcher, true);
|
||||||
|
stats.setQueueLength(childNames.size());
|
||||||
for (String childName : childNames) {
|
for (String childName : childNames) {
|
||||||
try {
|
try {
|
||||||
// Check format
|
// Check format
|
||||||
|
@ -117,6 +118,7 @@ public class DistributedQueue {
|
||||||
throws KeeperException, InterruptedException {
|
throws KeeperException, InterruptedException {
|
||||||
|
|
||||||
List<String> childNames = zookeeper.getChildren(dir, null, true);
|
List<String> childNames = zookeeper.getChildren(dir, null, true);
|
||||||
|
stats.setQueueLength(childNames.size());
|
||||||
for (String childName : childNames) {
|
for (String childName : childNames) {
|
||||||
if (childName != null) {
|
if (childName != null) {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -216,7 +216,7 @@ public class Overseer implements Closeable {
|
||||||
final String operation = message.getStr(QUEUE_OPERATION);
|
final String operation = message.getStr(QUEUE_OPERATION);
|
||||||
final TimerContext timerContext = stats.time(operation);
|
final TimerContext timerContext = stats.time(operation);
|
||||||
try {
|
try {
|
||||||
clusterState = processMessage(clusterState, message, operation);
|
clusterState = processMessage(clusterState, message, operation, workQueue.getStats().getQueueLength());
|
||||||
stats.success(operation);
|
stats.success(operation);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// generally there is nothing we can do - in most cases, we have
|
// generally there is nothing we can do - in most cases, we have
|
||||||
|
@ -314,7 +314,7 @@ public class Overseer implements Closeable {
|
||||||
|
|
||||||
final TimerContext timerContext = stats.time(operation);
|
final TimerContext timerContext = stats.time(operation);
|
||||||
try {
|
try {
|
||||||
clusterState = processMessage(clusterState, message, operation);
|
clusterState = processMessage(clusterState, message, operation, workQueue.getStats().getQueueLength());
|
||||||
stats.success(operation);
|
stats.success(operation);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// generally there is nothing we can do - in most cases, we have
|
// generally there is nothing we can do - in most cases, we have
|
||||||
|
@ -461,8 +461,8 @@ public class Overseer implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClusterState processMessage(ClusterState clusterState,
|
private ClusterState processMessage(ClusterState clusterState,
|
||||||
final ZkNodeProps message, final String operation) {
|
final ZkNodeProps message, final String operation, int queueSize) {
|
||||||
|
log.info("processMessage: queueSize: {}, message = {}", queueSize, message);
|
||||||
CollectionParams.CollectionAction collectionAction = CollectionParams.CollectionAction.get(operation);
|
CollectionParams.CollectionAction collectionAction = CollectionParams.CollectionAction.get(operation);
|
||||||
if (collectionAction != null) {
|
if (collectionAction != null) {
|
||||||
switch (collectionAction) {
|
switch (collectionAction) {
|
||||||
|
@ -701,7 +701,6 @@ public class Overseer implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClusterState createReplica(ClusterState clusterState, ZkNodeProps message) {
|
private ClusterState createReplica(ClusterState clusterState, ZkNodeProps message) {
|
||||||
log.info("createReplica() {} ", message);
|
|
||||||
String coll = message.getStr(ZkStateReader.COLLECTION_PROP);
|
String coll = message.getStr(ZkStateReader.COLLECTION_PROP);
|
||||||
if (!checkCollectionKeyExistence(message)) return clusterState;
|
if (!checkCollectionKeyExistence(message)) return clusterState;
|
||||||
String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
|
String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
|
||||||
|
@ -724,7 +723,7 @@ public class Overseer implements Closeable {
|
||||||
|
|
||||||
private ClusterState buildCollection(ClusterState clusterState, ZkNodeProps message) {
|
private ClusterState buildCollection(ClusterState clusterState, ZkNodeProps message) {
|
||||||
String collection = message.getStr("name");
|
String collection = message.getStr("name");
|
||||||
log.info("building a new collection: " + collection);
|
log.info("Building a new collection: {}", collection);
|
||||||
if(clusterState.hasCollection(collection) ){
|
if(clusterState.hasCollection(collection) ){
|
||||||
log.warn("Collection {} already exists. exit" ,collection);
|
log.warn("Collection {} already exists. exit" ,collection);
|
||||||
return clusterState;
|
return clusterState;
|
||||||
|
@ -746,7 +745,7 @@ public class Overseer implements Closeable {
|
||||||
private ClusterState updateShardState(ClusterState clusterState, ZkNodeProps message) {
|
private ClusterState updateShardState(ClusterState clusterState, ZkNodeProps message) {
|
||||||
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
||||||
if (!checkCollectionKeyExistence(message)) return clusterState;
|
if (!checkCollectionKeyExistence(message)) return clusterState;
|
||||||
log.info("Update shard state invoked for collection: " + collection + " with message: " + message);
|
log.info("Updating shard state for collection: {}", collection);
|
||||||
for (String key : message.keySet()) {
|
for (String key : message.keySet()) {
|
||||||
if (ZkStateReader.COLLECTION_PROP.equals(key)) continue;
|
if (ZkStateReader.COLLECTION_PROP.equals(key)) continue;
|
||||||
if (QUEUE_OPERATION.equals(key)) continue;
|
if (QUEUE_OPERATION.equals(key)) continue;
|
||||||
|
@ -932,7 +931,6 @@ public class Overseer implements Closeable {
|
||||||
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
||||||
if (!checkCollectionKeyExistence(message)) return clusterState;
|
if (!checkCollectionKeyExistence(message)) return clusterState;
|
||||||
Integer numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, null);
|
Integer numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, null);
|
||||||
log.info("Update state numShards={} message={}", numShards, message);
|
|
||||||
|
|
||||||
List<String> shardNames = new ArrayList<>();
|
List<String> shardNames = new ArrayList<>();
|
||||||
|
|
||||||
|
@ -1336,7 +1334,7 @@ public class Overseer implements Closeable {
|
||||||
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
|
||||||
if (!checkCollectionKeyExistence(message)) return clusterState;
|
if (!checkCollectionKeyExistence(message)) return clusterState;
|
||||||
|
|
||||||
log.info("Removing collection: " + collection + " shard: " + sliceId + " from clusterstate");
|
log.info("Removing collection: {}, shard: {} from cluster state", collection, sliceId);
|
||||||
|
|
||||||
DocCollection coll = clusterState.getCollection(collection);
|
DocCollection coll = clusterState.getCollection(collection);
|
||||||
|
|
||||||
|
@ -1964,6 +1962,7 @@ public class Overseer implements Closeable {
|
||||||
static final int MAX_STORED_FAILURES = 10;
|
static final int MAX_STORED_FAILURES = 10;
|
||||||
|
|
||||||
final Map<String, Stat> stats = new ConcurrentHashMap<>();
|
final Map<String, Stat> stats = new ConcurrentHashMap<>();
|
||||||
|
private volatile int queueLength;
|
||||||
|
|
||||||
public Map<String, Stat> getStats() {
|
public Map<String, Stat> getStats() {
|
||||||
return stats;
|
return stats;
|
||||||
|
@ -2034,6 +2033,14 @@ public class Overseer implements Closeable {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getQueueLength() {
|
||||||
|
return queueLength;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setQueueLength(int queueLength) {
|
||||||
|
this.queueLength = queueLength;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Stat {
|
public static class Stat {
|
||||||
|
|
Loading…
Reference in New Issue