From 32fbca6ea7b65043041e622660e07915f04090fe Mon Sep 17 00:00:00 2001 From: markrmiller Date: Fri, 19 Feb 2016 14:33:50 -0500 Subject: [PATCH] SOLR-8694: DistributedMap/Queue can create too many Watchers and some code simplification. --- solr/CHANGES.txt | 3 + .../org/apache/solr/cloud/DistributedMap.java | 152 +----------------- .../apache/solr/cloud/DistributedQueue.java | 6 +- .../java/org/apache/solr/cloud/Overseer.java | 6 +- .../solr/cloud/SizeLimitedDistributedMap.java | 29 ++-- .../handler/admin/CollectionsHandler.java | 8 +- 6 files changed, 34 insertions(+), 170 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 953b2d4cf0b..c0c88561590 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -182,6 +182,9 @@ Bug Fixes * SOLR-8701: CloudSolrClient decides that there are no healthy nodes to handle a request too early. (Mark Miller) +* SOLR-8694: DistributedMap/Queue can create too many Watchers and some code simplification. + (Scott Blum via Mark Miller) + Optimizations ---------------------- * SOLR-7876: Speed up queries and operations that use many terms when timeAllowed has not been diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java b/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java index 8434eb86b54..c3b5690fd26 100644 --- a/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java +++ b/solr/core/src/java/org/apache/solr/cloud/DistributedMap.java @@ -22,15 +22,9 @@ import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkCmdExecutor; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; import java.util.List; /** @@ -39,19 +33,13 @@ import java.util.List; * don't have to be ordered i.e. DistributedQueue. */ public class DistributedMap { - private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - protected static long DEFAULT_TIMEOUT = 5*60*1000; - protected final String dir; protected SolrZkClient zookeeper; protected final String prefix = "mn-"; - protected final String response_prefix = "mnr-" ; - - public DistributedMap(SolrZkClient zookeeper, String dir, List acl) { + public DistributedMap(SolrZkClient zookeeper, String dir) { this.dir = dir; ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout()); @@ -67,89 +55,13 @@ public class DistributedMap { this.zookeeper = zookeeper; } - protected class LatchChildWatcher implements Watcher { - Object lock = new Object(); - private WatchedEvent event = null; - - public LatchChildWatcher() {} - - public LatchChildWatcher(Object lock) { - this.lock = lock; - } - - @Override - public void process(WatchedEvent event) { - LOG.info("LatchChildWatcher fired on path: " + event.getPath() + " state: " - + event.getState() + " type " + event.getType()); - synchronized (lock) { - this.event = event; - lock.notifyAll(); - } - } - - public void await(long timeout) throws InterruptedException { - synchronized (lock) { - lock.wait(timeout); - } - } - - public WatchedEvent getWatchedEvent() { - return event; - } + public void put(String trackingId, byte[] data) throws KeeperException, InterruptedException { + zookeeper.makePath(dir + "/" + prefix + trackingId, data, CreateMode.PERSISTENT, null, false, true); } - /** - * Inserts data into zookeeper. - * - * @return true if data was successfully added - */ - protected String createData(String path, byte[] data, CreateMode mode) - throws KeeperException, InterruptedException { - for (;;) { - try { - return zookeeper.create(path, data, mode, true); - } catch (KeeperException.NoNodeException e) { - try { - zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true); - } catch (KeeperException.NodeExistsException ne) { - // someone created it - } - } - } - } - - - public boolean put(String trackingId, byte[] data) throws KeeperException, InterruptedException { - return createData(dir + "/" + prefix + trackingId, data, - CreateMode.PERSISTENT) != null; - } - - /** - * Offer the data and wait for the response - * - */ - public MapEvent put(String trackingId, byte[] data, long timeout) throws KeeperException, - InterruptedException { - String path = createData(dir + "/" + prefix + trackingId, data, - CreateMode.PERSISTENT); - String watchID = createData( - dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1), - null, CreateMode.EPHEMERAL); - Object lock = new Object(); - LatchChildWatcher watcher = new LatchChildWatcher(lock); - synchronized (lock) { - if (zookeeper.exists(watchID, watcher, true) != null) { - watcher.await(timeout); - } - } - byte[] bytes = zookeeper.getData(watchID, null, null, true); - zookeeper.delete(watchID, -1, true); - return new MapEvent(watchID, bytes, watcher.getWatchedEvent()); - } - - public MapEvent get(String trackingId) throws KeeperException, InterruptedException { - return new MapEvent(trackingId, zookeeper.getData(dir + "/" + prefix + trackingId, null, null, true), null); + public byte[] get(String trackingId) throws KeeperException, InterruptedException { + return zookeeper.getData(dir + "/" + prefix + trackingId, null, null, true); } public boolean contains(String trackingId) throws KeeperException, InterruptedException { @@ -187,58 +99,4 @@ public class DistributedMap { } - public static class MapEvent { - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((id == null) ? 0 : id.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) return true; - if (obj == null) return false; - if (getClass() != obj.getClass()) return false; - MapEvent other = (MapEvent) obj; - if (id == null) { - if (other.id != null) return false; - } else if (!id.equals(other.id)) return false; - return true; - } - - private WatchedEvent event = null; - private String id; - private byte[] bytes; - - MapEvent(String id, byte[] bytes, WatchedEvent event) { - this.id = id; - this.bytes = bytes; - this.event = event; - } - - public void setId(String id) { - this.id = id; - } - - public String getId() { - return id; - } - - public void setBytes(byte[] bytes) { - this.bytes = bytes; - } - - public byte[] getBytes() { - return bytes; - } - - public WatchedEvent getWatchedEvent() { - return event; - } - - } - - } diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java index 87d92c668dd..e424b7e89e6 100644 --- a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java +++ b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java @@ -318,7 +318,7 @@ public class DistributedQueue { } return orderedChildren; } catch (KeeperException.NoNodeException e) { - zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true); + zookeeper.makePath(dir, false, true); // go back to the loop and try again } } @@ -408,6 +408,10 @@ public class DistributedQueue { @Override public void process(WatchedEvent event) { + // session events are not change events, and do not remove the watcher; except for Expired + if (Event.EventType.None.equals(event.getType()) && !Event.KeeperState.Expired.equals(event.getState())) { + return; + } updateLock.lock(); try { // this watcher is automatically cleared when fired diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 3fe2e5c3c21..8dfacb10590 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -912,19 +912,19 @@ public class Overseer implements Closeable { /* Internal map for failed tasks, not to be used outside of the Overseer */ static DistributedMap getRunningMap(final SolrZkClient zkClient) { createOverseerNode(zkClient); - return new DistributedMap(zkClient, "/overseer/collection-map-running", null); + return new DistributedMap(zkClient, "/overseer/collection-map-running"); } /* Size-limited map for successfully completed tasks*/ static DistributedMap getCompletedMap(final SolrZkClient zkClient) { createOverseerNode(zkClient); - return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-completed", null, NUM_RESPONSES_TO_STORE); + return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-completed", NUM_RESPONSES_TO_STORE); } /* Map for failed tasks, not to be used outside of the Overseer */ static DistributedMap getFailureMap(final SolrZkClient zkClient) { createOverseerNode(zkClient); - return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-failure", null, NUM_RESPONSES_TO_STORE); + return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-failure", NUM_RESPONSES_TO_STORE); } /* Collection creation queue */ diff --git a/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java b/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java index 418eb664641..3326dcae4a3 100644 --- a/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java +++ b/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java @@ -33,41 +33,40 @@ public class SizeLimitedDistributedMap extends DistributedMap { private final int maxSize; - public SizeLimitedDistributedMap(SolrZkClient zookeeper, String dir, List acl, int maxSize) { - super(zookeeper, dir, acl); + public SizeLimitedDistributedMap(SolrZkClient zookeeper, String dir, int maxSize) { + super(zookeeper, dir); this.maxSize = maxSize; } - + @Override - public boolean put(String trackingId, byte[] data) throws KeeperException, InterruptedException { - if(this.size() >= maxSize) { + public void put(String trackingId, byte[] data) throws KeeperException, InterruptedException { + if (this.size() >= maxSize) { // Bring down the size List children = zookeeper.getChildren(dir, null, true); int cleanupSize = maxSize / 10; - + final PriorityQueue priorityQueue = new PriorityQueue(cleanupSize) { @Override protected boolean lessThan(Long a, Long b) { return (a > b); } }; - - for(String child: children) { + + for (String child : children) { Stat stat = zookeeper.exists(dir + "/" + child, null, true); priorityQueue.insertWithOverflow(stat.getMzxid()); } - + long topElementMzxId = (Long) priorityQueue.top(); - - for(String child:children) { + + for (String child : children) { Stat stat = zookeeper.exists(dir + "/" + child, null, true); - if(stat.getMzxid() <= topElementMzxId) + if (stat.getMzxid() <= topElementMzxId) zookeeper.delete(dir + "/" + child, -1, true); } } - - return createData(dir + "/" + prefix + trackingId, data, - CreateMode.PERSISTENT) != null; + + super.put(trackingId, data); } } diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index c9698ae2e7f..74507d199e1 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -564,12 +564,12 @@ public class CollectionsHandler extends RequestHandlerBase { final NamedList results = new NamedList<>(); if (zkController.getOverseerCompletedMap().contains(requestId)) { - final DistributedMap.MapEvent mapEvent = zkController.getOverseerCompletedMap().get(requestId); - rsp.getValues().addAll(SolrResponse.deserialize(mapEvent.getBytes()).getResponse()); + final byte[] mapEntry = zkController.getOverseerCompletedMap().get(requestId); + rsp.getValues().addAll(SolrResponse.deserialize(mapEntry).getResponse()); addStatusToResponse(results, COMPLETED, "found [" + requestId + "] in completed tasks"); } else if (zkController.getOverseerFailureMap().contains(requestId)) { - final DistributedMap.MapEvent mapEvent = zkController.getOverseerFailureMap().get(requestId); - rsp.getValues().addAll(SolrResponse.deserialize(mapEvent.getBytes()).getResponse()); + final byte[] mapEntry = zkController.getOverseerFailureMap().get(requestId); + rsp.getValues().addAll(SolrResponse.deserialize(mapEntry).getResponse()); addStatusToResponse(results, FAILED, "found [" + requestId + "] in failed tasks"); } else if (zkController.getOverseerRunningMap().contains(requestId)) { addStatusToResponse(results, RUNNING, "found [" + requestId + "] in running tasks");