SOLR-6370: Allow tests to report/fail on many ZK watches being parallelly requested on the same data (This closes #84)

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1626253 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy Potter 2014-09-19 15:48:44 +00:00
parent 1c2918f66d
commit 9f9be3f3c2
1 changed files with 265 additions and 34 deletions

View File

@ -26,13 +26,27 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import javax.management.JMException;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.AtomicLongMap;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.jmx.ManagedUtil;
import org.apache.zookeeper.server.NIOServerCnxn;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.SessionTracker.Session;
@ -58,10 +72,18 @@ public class ZkTestServer {
private int theTickTime = TICK_TIME;
static public enum LimitViolationAction {
IGNORE,
REPORT,
FAIL,
}
class ZKServerMain {
private ServerCnxnFactory cnxnFactory;
private ZooKeeperServer zooKeeperServer;
private LimitViolationAction violationReportAction = LimitViolationAction.REPORT;
private WatchLimiter limiter = new WatchLimiter(1, LimitViolationAction.IGNORE);
protected void initializeAndRun(String[] args) throws ConfigException,
IOException {
@ -81,6 +103,182 @@ public class ZkTestServer {
runFromConfig(config);
}
private class WatchLimit {
private long limit;
private final String desc;
private LimitViolationAction action;
private AtomicLongMap<String> counters = AtomicLongMap.create();
private ConcurrentHashMap<String,Long> maxCounters = new ConcurrentHashMap<>();
WatchLimit(long limit, String desc, LimitViolationAction action) {
this.limit = limit;
this.desc = desc;
this.action = action;
}
public void setAction(LimitViolationAction action) {
this.action = action;
}
public void setLimit(long limit) {
this.limit = limit;
}
public void updateForWatch(String key, Watcher watcher) {
if (watcher != null) {
log.debug("Watch added: {}: {}", desc, key);
long count = counters.incrementAndGet(key);
Long lastCount = maxCounters.get(key);
if (lastCount == null || count > lastCount) {
maxCounters.put(key, count);
}
if (count > limit && action != LimitViolationAction.IGNORE) {
String msg = "Number of watches created in parallel for data: " + key +
", type: " + desc + " exceeds limit (" + count + " > " + limit + ")";
log.warn("{}", msg);
if (action == LimitViolationAction.FAIL) throw new AssertionError(msg);
}
}
}
public void updateForFire(WatchedEvent event) {
log.debug("Watch fired: {}: {}", desc, event.getPath());
counters.decrementAndGet(event.getPath());
}
private String reportLimitViolations() {
Object[] maxKeys = maxCounters.keySet().toArray();
Arrays.sort(maxKeys, new Comparator<Object>() {
private final Comparator<Long> valComp = Ordering.natural().reverse();
@Override
public int compare(Object o1, Object o2) {
return valComp.compare(maxCounters.get(o1), maxCounters.get(o2));
}
});
StringBuilder sb = new StringBuilder();
boolean first = true;
for (Object key : maxKeys) {
long value = maxCounters.get(key);
if (value <= limit) continue;
if (first) {
sb.append("\nMaximum concurrent ").append(desc).append(" watches above limit:\n\n");
first = false;
}
sb.append("\t").append(maxCounters.get(key)).append('\t').append(key).append('\n');
}
return sb.toString();
}
}
public class WatchLimiter {
WatchLimit statLimit;
WatchLimit dataLimit;
WatchLimit childrenLimit;
private WatchLimiter (long limit, LimitViolationAction action) {
statLimit = new WatchLimit(limit, "create/delete", action);
dataLimit = new WatchLimit(limit, "data", action);
childrenLimit = new WatchLimit(limit, "children", action);
}
public void setAction(LimitViolationAction action) {
statLimit.setAction(action);
dataLimit.setAction(action);
childrenLimit.setAction(action);
}
public void setLimit(long limit) {
statLimit.setLimit(limit);
dataLimit.setLimit(limit);
childrenLimit.setLimit(limit);
}
public String reportLimitViolations() {
return statLimit.reportLimitViolations() +
dataLimit.reportLimitViolations() +
childrenLimit.reportLimitViolations();
}
private void updateForFire(WatchedEvent event) {
switch (event.getType()) {
case None:
break;
case NodeCreated:
case NodeDeleted:
statLimit.updateForFire(event);
break;
case NodeDataChanged:
dataLimit.updateForFire(event);
break;
case NodeChildrenChanged:
childrenLimit.updateForFire(event);
break;
}
}
}
private class TestServerCnxn extends NIOServerCnxn {
private final WatchLimiter limiter;
public TestServerCnxn(ZooKeeperServer zk, SocketChannel sock, SelectionKey sk,
NIOServerCnxnFactory factory, WatchLimiter limiter) throws IOException {
super(zk, sock, sk, factory);
this.limiter = limiter;
}
@Override
public synchronized void process(WatchedEvent event) {
limiter.updateForFire(event);
super.process(event);
}
}
private class TestServerCnxnFactory extends NIOServerCnxnFactory {
private final WatchLimiter limiter;
public TestServerCnxnFactory(WatchLimiter limiter) throws IOException {
super();
this.limiter = limiter;
}
@Override
protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk) throws IOException {
return new TestServerCnxn(zkServer, sock, sk, this, limiter);
}
}
private class TestZKDatabase extends ZKDatabase {
private final WatchLimiter limiter;
public TestZKDatabase(FileTxnSnapLog snapLog, WatchLimiter limiter) {
super(snapLog);
this.limiter = limiter;
}
@Override
public Stat statNode(String path, ServerCnxn serverCnxn) throws KeeperException.NoNodeException {
limiter.statLimit.updateForWatch(path, serverCnxn);
return super.statNode(path, serverCnxn);
}
@Override
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
limiter.dataLimit.updateForWatch(path, watcher);
return super.getData(path, stat, watcher);
}
@Override
public List<String> getChildren(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
limiter.childrenLimit.updateForWatch(path, watcher);
return super.getChildren(path, stat, watcher);
}
}
/**
* Run from a ServerConfig.
* @param config ServerConfig to use.
@ -93,15 +291,12 @@ public class ZkTestServer {
// so rather than spawning another thread, we will just call
// run() in this thread.
// create a file logger url from the command line args
zooKeeperServer = new ZooKeeperServer();
FileTxnSnapLog ftxn = new FileTxnSnapLog(new File(
config.getDataLogDir()), new File(config.getDataDir()));
zooKeeperServer.setTxnLogFactory(ftxn);
zooKeeperServer.setTickTime(config.getTickTime());
zooKeeperServer.setMinSessionTimeout(config.getMinSessionTimeout());
zooKeeperServer.setMaxSessionTimeout(config.getMaxSessionTimeout());
cnxnFactory = ServerCnxnFactory.createFactory();
zooKeeperServer = new ZooKeeperServer(ftxn, config.getTickTime(),
config.getMinSessionTimeout(), config.getMaxSessionTimeout(),
null /* this is not used */, new TestZKDatabase(ftxn, limiter));
cnxnFactory = new TestServerCnxnFactory(limiter);
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
cnxnFactory.startup(zooKeeperServer);
@ -109,6 +304,15 @@ public class ZkTestServer {
// if (zooKeeperServer.isRunning()) {
zkServer.shutdown();
// }
if (violationReportAction != LimitViolationAction.IGNORE) {
String limitViolations = limiter.reportLimitViolations();
if (!limitViolations.isEmpty()) {
log.warn("Watch limit violations: {}", limitViolations);
if (violationReportAction == LimitViolationAction.FAIL) {
throw new AssertionError("Parallel watch limits violated");
}
}
}
} catch (InterruptedException e) {
// warn, but generally this is ok
log.warn("Server interrupted", e);
@ -153,6 +357,14 @@ public class ZkTestServer {
}
return port;
}
public void setViolationReportAction(LimitViolationAction violationReportAction) {
this.violationReportAction = violationReportAction;
}
public WatchLimiter getLimiter() {
return limiter;
}
}
public ZkTestServer(String zkDir) {
@ -162,6 +374,16 @@ public class ZkTestServer {
public ZkTestServer(String zkDir, int port) {
this.zkDir = zkDir;
this.clientPort = port;
String reportAction = System.getProperty("tests.zk.violationReportAction");
if (reportAction != null) {
log.info("Overriding violation report action to: {}", reportAction);
setViolationReportAction(LimitViolationAction.valueOf(reportAction));
}
String limiterAction = System.getProperty("tests.zk.limiterAction");
if (limiterAction != null) {
log.info("Overriding limiter action to: {}", limiterAction);
getLimiter().setAction(LimitViolationAction.valueOf(limiterAction));
}
}
public String getZkHost() {
@ -182,14 +404,17 @@ public class ZkTestServer {
public long getSessionId() {
return sessionId;
}
@Override
public int getTimeout() {
return 4000;
}
@Override
public boolean isClosing() {
return false;
}});
}
});
}
public void run() throws InterruptedException {
@ -311,9 +536,8 @@ public class ZkTestServer {
throws IOException
{
log.info("connecting to " + host + " " + port);
Socket sock = new Socket(host, port);
BufferedReader reader = null;
try {
try (Socket sock = new Socket(host, port)) {
OutputStream outstream = sock.getOutputStream();
outstream.write(cmd.getBytes(StandardCharsets.US_ASCII));
outstream.flush();
@ -325,12 +549,11 @@ public class ZkTestServer {
new InputStreamReader(sock.getInputStream(), "US-ASCII"));
StringBuilder sb = new StringBuilder();
String line;
while((line = reader.readLine()) != null) {
sb.append(line + "\n");
while ((line = reader.readLine()) != null) {
sb.append(line).append("\n");
}
return sb.toString();
} finally {
sock.close();
if (reader != null) {
reader.close();
}
@ -364,4 +587,12 @@ public class ZkTestServer {
public String getZkDir() {
return zkDir;
}
public void setViolationReportAction(LimitViolationAction violationReportAction) {
zkServer.setViolationReportAction(violationReportAction);
}
public ZKServerMain.WatchLimiter getLimiter() {
return zkServer.getLimiter();
}
}