diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java index 2719c8857ee..592ff36695d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/metrics/TopMetrics.java @@ -23,7 +23,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.namenode.top.TopConf; import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager; import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op; import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.User; @@ -137,8 +136,6 @@ public class TopMetrics implements MetricsSource { for (RollingWindowManager rollingWindowManager : rollingWindowManagers .values()) { rollingWindowManager.recordMetric(currTime, cmd, userName, 1); - rollingWindowManager.recordMetric(currTime, - TopConf.ALL_CMDS, userName, 1); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindow.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindow.java index 63ff12585bc..65611f3bacd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindow.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindow.java @@ -112,8 +112,8 @@ public class RollingWindow { * as well as atomic fields. */ private class Bucket { - AtomicLong value = new AtomicLong(0); - AtomicLong updateTime = new AtomicLong(0); + private AtomicLong value = new AtomicLong(0); + private AtomicLong updateTime = new AtomicLong(-1); // -1 = never updated. /** * Check whether the last time that the bucket was updated is no longer @@ -124,7 +124,7 @@ public class RollingWindow { */ boolean isStaleNow(long time) { long utime = updateTime.get(); - return time - utime >= windowLenMs; + return (utime == -1) || (time - utime >= windowLenMs); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java index bdd0ab01b97..c3a68ff6cf6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/top/window/RollingWindowManager.java @@ -17,20 +17,22 @@ */ package org.apache.hadoop.hdfs.server.namenode.top.window; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.Stack; import java.util.concurrent.ConcurrentHashMap; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import org.apache.hadoop.hdfs.server.namenode.top.TopConf; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.metrics2.util.Metrics2Util.NameValuePair; -import org.apache.hadoop.metrics2.util.Metrics2Util.TopN; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,11 +68,15 @@ public class RollingWindowManager { public TopWindow(int windowMillis) { this.windowMillis = windowMillis; - this.top = Lists.newArrayList(); + this.top = new LinkedList<>(); } public void addOp(Op op) { - top.add(op); + if (op.getOpType().equals(TopConf.ALL_CMDS)) { + top.add(0, op); + } else { + top.add(op); + } } public int getWindowLenMs() { @@ -86,41 +92,59 @@ public class RollingWindowManager { * Represents an operation within a TopWindow. It contains a ranked * set of the top users for the operation. */ - public static class Op { + public static class Op implements Comparable { private final String opType; - private final List topUsers; + private final List users; private final long totalCount; + private final int limit; - public Op(String opType, long totalCount) { + public Op(String opType, UserCounts users, int limit) { this.opType = opType; - this.topUsers = Lists.newArrayList(); - this.totalCount = totalCount; - } - - public void addUser(User u) { - topUsers.add(u); + this.users = new ArrayList<>(users); + this.users.sort(Collections.reverseOrder()); + this.totalCount = users.getTotal(); + this.limit = limit; } public String getOpType() { return opType; } + public List getAllUsers() { + return users; + } + public List getTopUsers() { - return topUsers; + return (users.size() > limit) ? users.subList(0, limit) : users; } public long getTotalCount() { return totalCount; } + + @Override + public int compareTo(Op other) { + return Long.signum(totalCount - other.totalCount); + } + + @Override + public boolean equals(Object o) { + return (o instanceof Op) && totalCount == ((Op)o).totalCount; + } + + @Override + public int hashCode() { + return opType.hashCode(); + } } /** * Represents a user who called an Op within a TopWindow. Specifies the * user and the number of times the user called the operation. */ - public static class User { + public static class User implements Comparable { private final String user; - private final long count; + private long count; public User(String user, long count) { this.user = user; @@ -134,6 +158,56 @@ public class RollingWindowManager { public long getCount() { return count; } + + public void add(long delta) { + count += delta; + } + + @Override + public int compareTo(User other) { + return Long.signum(count - other.count); + } + + @Override + public boolean equals(Object o) { + return (o instanceof User) && user.equals(((User)o).user); + } + + @Override + public int hashCode() { + return user.hashCode(); + } + } + + private static class UserCounts extends ArrayList { + private long total = 0; + + UserCounts(int capacity) { + super(capacity); + } + + @Override + public boolean add(User user) { + long count = user.getCount(); + int i = indexOf(user); + if (i == -1) { + super.add(new User(user.getUser(), count)); + } else { + get(i).add(count); + } + total += count; + return true; + } + + @Override + public boolean addAll(Collection users) { + users.forEach(user -> add(user)); + return true; + } + + public long getTotal() { + return total; + } } /** @@ -142,7 +216,7 @@ public class RollingWindowManager { * operated on that metric. */ public ConcurrentHashMap metricMap = - new ConcurrentHashMap(); + new ConcurrentHashMap<>(); public RollingWindowManager(Configuration conf, int reportingPeriodMs) { @@ -184,35 +258,33 @@ public class RollingWindowManager { * * @param time the current time * @return a TopWindow describing the top users for each metric in the - * window. + * window. */ public TopWindow snapshot(long time) { TopWindow window = new TopWindow(windowLenMs); Set metricNames = metricMap.keySet(); LOG.debug("iterating in reported metrics, size={} values={}", metricNames.size(), metricNames); + UserCounts totalCounts = new UserCounts(metricMap.size()); for (Map.Entry entry : metricMap.entrySet()) { String metricName = entry.getKey(); RollingWindowMap rollingWindows = entry.getValue(); - TopN topN = getTopUsersForMetric(time, metricName, rollingWindows); - final int size = topN.size(); - if (size == 0) { - continue; - } - Op op = new Op(metricName, topN.getTotal()); - window.addOp(op); - // Reverse the users from the TopUsers using a stack, - // since we'd like them sorted in descending rather than ascending order - Stack reverse = new Stack(); - for (int i = 0; i < size; i++) { - reverse.push(topN.poll()); - } - for (int i = 0; i < size; i++) { - NameValuePair userEntry = reverse.pop(); - User user = new User(userEntry.getName(), userEntry.getValue()); - op.addUser(user); + UserCounts topN = getTopUsersForMetric(time, metricName, rollingWindows); + if (!topN.isEmpty()) { + window.addOp(new Op(metricName, topN, topUsersCnt)); + totalCounts.addAll(topN); } } + // synthesize the overall total op count with the top users for every op. + Set topUsers = new HashSet<>(); + for (Op op : window.getOps()) { + topUsers.addAll(op.getTopUsers()); + } + // intersect totals with the top users. + totalCounts.retainAll(topUsers); + // allowed to exceed the per-op topUsersCnt to capture total ops for + // any user + window.addOp(new Op(TopConf.ALL_CMDS, totalCounts, Integer.MAX_VALUE)); return window; } @@ -223,9 +295,9 @@ public class RollingWindowManager { * @param metricName Name of metric * @return */ - private TopN getTopUsersForMetric(long time, String metricName, + private UserCounts getTopUsersForMetric(long time, String metricName, RollingWindowMap rollingWindows) { - TopN topN = new TopN(topUsersCnt); + UserCounts topN = new UserCounts(topUsersCnt); Iterator> iterator = rollingWindows.entrySet().iterator(); while (iterator.hasNext()) { @@ -242,7 +314,7 @@ public class RollingWindowManager { } LOG.debug("offer window of metric: {} userName: {} sum: {}", metricName, userName, windowSum); - topN.offer(new NameValuePair(userName, windowSum)); + topN.add(new User(userName, windowSum)); } LOG.debug("topN users size for command {} is: {}", metricName, topN.size()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java index 494ed08dbea..f025531269e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/top/window/TestRollingWindowManager.java @@ -17,12 +17,16 @@ */ package org.apache.hadoop.hdfs.server.namenode.top.window; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; +import org.apache.hadoop.hdfs.server.namenode.top.TopConf; + +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -56,7 +60,7 @@ public class TestRollingWindowManager { } @Test - public void testTops() { + public void testTops() throws Exception { long time = WINDOW_LEN_MS + BUCKET_LEN * 3 / 2; for (int i = 0; i < users.length; i++) manager.recordMetric(time, "open", users[i], (i + 1) * 2); @@ -66,11 +70,12 @@ public class TestRollingWindowManager { time++; TopWindow tops = manager.snapshot(time); - assertEquals("Unexpected number of ops", 2, tops.getOps().size()); + assertEquals("Unexpected number of ops", 3, tops.getOps().size()); + assertEquals(TopConf.ALL_CMDS, tops.getOps().get(0).getOpType()); for (Op op : tops.getOps()) { final List topUsers = op.getTopUsers(); assertEquals("Unexpected number of users", N_TOP_USERS, topUsers.size()); - if (op.getOpType() == "open") { + if (op.getOpType().equals("open")) { for (int i = 0; i < topUsers.size(); i++) { User user = topUsers.get(i); assertEquals("Unexpected count for user " + user.getUser(), @@ -86,8 +91,9 @@ public class TestRollingWindowManager { // move the window forward not to see the "open" results time += WINDOW_LEN_MS - 2; tops = manager.snapshot(time); - assertEquals("Unexpected number of ops", 1, tops.getOps().size()); - final Op op = tops.getOps().get(0); + assertEquals("Unexpected number of ops", 2, tops.getOps().size()); + assertEquals(TopConf.ALL_CMDS, tops.getOps().get(0).getOpType()); + final Op op = tops.getOps().get(1); assertEquals("Should only see close ops", "close", op.getOpType()); final List topUsers = op.getTopUsers(); for (int i = 0; i < topUsers.size(); i++) { @@ -99,4 +105,158 @@ public class TestRollingWindowManager { assertEquals("Unexpected total count for op", (1 + users.length) * (users.length / 2), op.getTotalCount()); } + + @Test + public void windowReset() throws Exception { + Configuration config = new Configuration(); + config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1); + config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS); + int period = 2; + RollingWindowManager rollingWindowManager = + new RollingWindowManager(config, period); + rollingWindowManager.recordMetric(0, "op1", users[0], 3); + checkValues(rollingWindowManager, 0, "op1", 3, 3); + checkValues(rollingWindowManager, period - 1, "op1", 3, 3); + checkValues(rollingWindowManager, period, "op1", 0, 0); + } + + @Test + public void testTotal() throws Exception { + Configuration config = new Configuration(); + config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1); + config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS); + int period = 10; + RollingWindowManager rollingWindowManager = + new RollingWindowManager(config, period); + + long t = 0; + rollingWindowManager.recordMetric(t, "op1", users[0], 3); + checkValues(rollingWindowManager, t, "op1", 3, 3); + + // both should have a value. + t = (long)(period * .5); + rollingWindowManager.recordMetric(t, "op2", users[0], 4); + checkValues(rollingWindowManager, t, "op1", 3, 7); + checkValues(rollingWindowManager, t, "op2", 4, 7); + + // neither should reset. + t = period - 1; + checkValues(rollingWindowManager, t, "op1", 3, 7); + checkValues(rollingWindowManager, t, "op2", 4, 7); + + // op1 should reset in its next period, but not op2. + t = period; + rollingWindowManager.recordMetric(10, "op1", users[0], 10); + checkValues(rollingWindowManager, t, "op1", 10, 14); + checkValues(rollingWindowManager, t, "op2", 4, 14); + + // neither should reset. + t = (long)(period * 1.25); + rollingWindowManager.recordMetric(t, "op2", users[0], 7); + checkValues(rollingWindowManager, t, "op1", 10, 21); + checkValues(rollingWindowManager, t, "op2", 11, 21); + + // op2 should reset. + t = (long)(period * 1.5); + rollingWindowManager.recordMetric(t, "op2", users[0], 13); + checkValues(rollingWindowManager, t, "op1", 10, 23); + checkValues(rollingWindowManager, t, "op2", 13, 23); + } + + @Test + public void testWithFuzzing() throws Exception { + Configuration config = new Configuration(); + config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1); + config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, N_TOP_USERS); + int period = users.length/2; + RollingWindowManager rollingWindowManager = + new RollingWindowManager(config, period); + + String[] ops = {"op1", "op2", "op3", "op4"}; + Random rand = new Random(); + for (int i=0; i < 10000; i++) { + rollingWindowManager.recordMetric(i, ops[rand.nextInt(ops.length)], + users[rand.nextInt(users.length)], + rand.nextInt(100)); + TopWindow window = rollingWindowManager.snapshot(i); + checkTotal(window); + } + } + + @Test + public void testOpTotal() throws Exception { + int numTopUsers = 2; + Configuration config = new Configuration(); + config.setInt(DFSConfigKeys.NNTOP_BUCKETS_PER_WINDOW_KEY, 1); + config.setInt(DFSConfigKeys.NNTOP_NUM_USERS_KEY, numTopUsers); + int period = users.length/2; + RollingWindowManager rollingWindowManager = + new RollingWindowManager(config, period); + + int numOps = 3; + rollingWindowManager.recordMetric(0, "op1", "user1", 10); + rollingWindowManager.recordMetric(0, "op1", "user2", 20); + rollingWindowManager.recordMetric(0, "op1", "user3", 30); + + rollingWindowManager.recordMetric(0, "op2", "user1", 1); + rollingWindowManager.recordMetric(0, "op2", "user4", 40); + rollingWindowManager.recordMetric(0, "op2", "user5", 50); + + rollingWindowManager.recordMetric(0, "op3", "user6", 1); + rollingWindowManager.recordMetric(0, "op3", "user7", 11); + rollingWindowManager.recordMetric(0, "op3", "user8", 1); + + TopWindow window = rollingWindowManager.snapshot(0); + Assert.assertEquals(numOps + 1, window.getOps().size()); + + Op allOp = window.getOps().get(0); + Assert.assertEquals(TopConf.ALL_CMDS, allOp.getOpType()); + List topUsers = allOp.getTopUsers(); + Assert.assertEquals(numTopUsers * numOps, topUsers.size()); + // ensure all the top users for each op are present in the total op. + for (int i = 1; i < numOps; i++) { + Assert.assertTrue( + topUsers.containsAll(window.getOps().get(i).getTopUsers())); + } + } + + private void checkValues(RollingWindowManager rwManager, long time, + String opType, long value, long expectedTotal) throws Exception { + TopWindow window = rwManager.snapshot(time); + for (Op windowOp : window.getOps()) { + if (opType.equals(windowOp.getOpType())) { + assertEquals(value, windowOp.getTotalCount()); + break; + } + } + assertEquals(expectedTotal, checkTotal(window)); + } + + private long checkTotal(TopWindow window) throws Exception { + long allOpTotal = 0; + long computedOpTotal = 0; + + Map userOpTally = new HashMap<>(); + for (String user : users) { + userOpTally.put(user, new User(user, 0)); + } + for (Op windowOp : window.getOps()) { + int multiplier; + if (TopConf.ALL_CMDS.equals(windowOp.getOpType())) { + multiplier = -1; + allOpTotal += windowOp.getTotalCount(); + } else { + multiplier = 1; + computedOpTotal += windowOp.getTotalCount(); + } + for (User user : windowOp.getAllUsers()) { + userOpTally.get(user.getUser()).add((int)(multiplier*user.getCount())); + } + } + assertEquals(allOpTotal, computedOpTotal); + for (String user : userOpTally.keySet()) { + assertEquals(0, userOpTally.get(user).getCount()); + } + return computedOpTotal; + } }