From 8bd7bffb36ce1d76eb20274e8f130f76cea86bf6 Mon Sep 17 00:00:00 2001 From: nkeywal Date: Tue, 12 Feb 2013 10:20:34 +0000 Subject: [PATCH] HBASE-7789 Clean DeadServer.java and add a Jitter method in ConnectionUtils git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1445087 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hbase/client/ConnectionUtils.java | 15 ++ .../hadoop/hbase/master/DeadServer.java | 175 +++++++++--------- .../apache/hadoop/hbase/master/HMaster.java | 3 +- .../hbase/master/MasterStatusServlet.java | 2 +- .../hadoop/hbase/master/ServerManager.java | 17 +- .../master/handler/ServerShutdownHandler.java | 2 +- .../hbase/master/TestAssignmentManager.java | 1 + .../hadoop/hbase/master/TestDeadServer.java | 72 ++++++- .../hbase/master/TestRollingRestart.java | 2 +- 9 files changed, 173 insertions(+), 116 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index d368b243d06..b97608ad39d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -49,4 +49,19 @@ public class ConnectionUtils { long jitter = (long)(normalPause * RANDOM.nextFloat() * 0.01f); // 1% possible jitter return normalPause + jitter; } + + + /** + * Adds / subs a 10% jitter to a pause time. Minimum is 1. + * @param pause the expected pause. + * @param jitter the jitter ratio, between 0 and 1, exclusive. + */ + public static long addJitter(final long pause, final float jitter) { + float lag = pause * (RANDOM.nextFloat() - 0.5f) * jitter; + long newPause = pause + (long) lag; + if (newPause <= 0) { + return 1; + } + return newPause; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java index 5ba323adad5..1c226f7a0df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java @@ -18,20 +18,26 @@ */ package org.apache.hadoop.hbase.master; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; - -import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; /** * Class to hold dead servers list and utility querying dead server list. */ @InterfaceAudience.Private -public class DeadServer implements Set { +public class DeadServer { /** * Set of known dead servers. On znode expiration, servers are added here. * This is needed in case of a network partitioning where the server's lease @@ -39,75 +45,66 @@ public class DeadServer implements Set { * and it's server logs are recovered, it will be told to call server startup * because by then, its regions have probably been reassigned. */ - private final Set deadServers = new HashSet(); - - /** Number of dead servers currently being processed */ - private int numProcessing; - - public DeadServer() { - super(); - this.numProcessing = 0; - } + private final Map deadServers = new HashMap(); /** - * @param serverName Server name - * @return true if server is dead + * Number of dead servers currently being processed */ - public boolean isDeadServer(final String serverName) { - return isDeadServer(new ServerName(serverName)); - } + private int numProcessing = 0; /** - * A dead server that comes back alive has a different start code. + * A dead server that comes back alive has a different start code. The new start code should be + * greater than the old one, but we don't take this into account in this method. + * * @param newServerName Servername as either host:port or - * host,port,startcode. + * host,port,startcode. * @return true if this server was dead before and coming back alive again */ - public boolean cleanPreviousInstance(final ServerName newServerName) { - ServerName sn = - ServerName.findServerWithSameHostnamePort(this.deadServers, newServerName); - if (sn == null) return false; - return this.deadServers.remove(sn); + public synchronized boolean cleanPreviousInstance(final ServerName newServerName) { + Iterator it = deadServers.keySet().iterator(); + while (it.hasNext()) { + ServerName sn = it.next(); + if (ServerName.isSameHostnameAndPort(sn, newServerName)) { + it.remove(); + return true; + } + } + + return false; } /** * @param serverName * @return true if this server is on the dead servers list. */ - boolean isDeadServer(final ServerName serverName) { - return this.deadServers.contains(serverName); - } - - /** - * @return True if we have a server with matching hostname and port. - */ - boolean isDeadServerWithSameHostnamePort(final ServerName serverName) { - return ServerName.findServerWithSameHostnamePort(this.deadServers, - serverName) != null; + public synchronized boolean isDeadServer(final ServerName serverName) { + return deadServers.containsKey(serverName); } /** * Checks if there are currently any dead servers being processed by the * master. Returns true if at least one region server is currently being * processed as dead. + * * @return true if any RS are being processed as dead */ - public boolean areDeadServersInProgress() { + public synchronized boolean areDeadServersInProgress() { return numProcessing != 0; } - public synchronized Set clone() { - Set clone = new HashSet(this.deadServers.size()); - clone.addAll(this.deadServers); + public synchronized Set copyServerNames() { + Set clone = new HashSet(deadServers.size()); + clone.addAll(deadServers.keySet()); return clone; } public synchronized boolean add(ServerName e) { this.numProcessing++; - return deadServers.add(e); + return deadServers.put(e, EnvironmentEdgeManager.currentTimeMillis()) != null; } - public synchronized void finish(ServerName e) { + @SuppressWarnings("UnusedParameters") + public synchronized void finish(ServerName ignored) { this.numProcessing--; } @@ -119,55 +116,51 @@ public class DeadServer implements Set { return deadServers.isEmpty(); } - public synchronized boolean contains(Object o) { - return deadServers.contains(o); - } - - public Iterator iterator() { - return this.deadServers.iterator(); - } - - public synchronized Object[] toArray() { - return deadServers.toArray(); - } - - public synchronized T[] toArray(T[] a) { - return deadServers.toArray(a); - } - - public synchronized boolean remove(Object o) { - return this.deadServers.remove(o); - } - - public synchronized boolean containsAll(Collection c) { - return deadServers.containsAll(c); - } - - public synchronized boolean addAll(Collection c) { - return deadServers.addAll(c); - } - - public synchronized boolean retainAll(Collection c) { - return deadServers.retainAll(c); - } - - public synchronized boolean removeAll(Collection c) { - return deadServers.removeAll(c); - } - - public synchronized void clear() { - throw new NotImplementedException(); - } - - public synchronized boolean equals(Object o) { - return deadServers.equals(o); - } - - public synchronized int hashCode() { - return deadServers.hashCode(); + public synchronized void cleanAllPreviousInstances(final ServerName newServerName) { + Iterator it = deadServers.keySet().iterator(); + while (it.hasNext()) { + ServerName sn = it.next(); + if (ServerName.isSameHostnameAndPort(sn, newServerName)) { + it.remove(); + } + } } public synchronized String toString() { - return this.deadServers.toString(); + StringBuilder sb = new StringBuilder(); + for (ServerName sn : deadServers.keySet()) { + if (sb.length() > 0) { + sb.append(", "); + } + sb.append(sn.toString()); + } + return sb.toString(); } + + /** + * Extract all the servers dead since a given time, and sort them. + * @param ts the time, 0 for all + * @return a sorted array list, by death time, lowest values first. + */ + public synchronized List> copyDeadServersSince(long ts){ + List> res = new ArrayList>(size()); + + for (Map.Entry entry:deadServers.entrySet()){ + if (entry.getValue() >= ts){ + res.add(new Pair(entry.getKey(), entry.getValue())); + } + } + + Collections.sort(res, ServerNameDeathDateComparator); + return res; + } + + private static Comparator> ServerNameDeathDateComparator = + new Comparator>(){ + + @Override + public int compare(Pair o1, Pair o2) { + return o1.getSecond().compareTo(o2.getSecond()); + } + }; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index fde49ce59e6..9ccff6f5c46 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -579,7 +579,6 @@ Server { * @param abortable If fatal exception we'll call abort on this. May be null. * If it is we'll use the Connection associated with the passed * {@link Configuration} as our {@link Abortable}. - * @param defaultTimeout Timeout to use. Pass zero for no timeout * ({@link Object#wait(long)} when passed a 0 waits for ever). * @throws IOException */ @@ -1859,7 +1858,7 @@ Server { return new ClusterStatus(VersionInfo.getVersion(), this.fileSystemManager.getClusterId().toString(), this.serverManager.getOnlineServers(), - this.serverManager.getDeadServers(), + this.serverManager.getDeadServers().copyServerNames(), this.serverName, backupMasters, this.assignmentManager.getRegionStates().getRegionsInTransition(), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java index 79b380274a0..b8f8aeeb435 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStatusServlet.java @@ -63,7 +63,7 @@ public class MasterStatusServlet extends HttpServlet { ServerName rootLocation = getRootLocationOrNull(master); ServerName metaLocation = master.getCatalogTracker().getMetaLocation(); List servers = master.getServerManager().getOnlineServersList(); - Set deadServers = master.getServerManager().getDeadServers(); + Set deadServers = master.getServerManager().getDeadServers().copyServerNames(); response.setContentType("text/html"); MasterStatusTmpl tmpl; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 34db1c22f9b..3088ec4abaa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -130,7 +130,7 @@ public class ServerManager { private final MasterServices services; private final HConnection connection; - private final DeadServer deadservers; + private final DeadServer deadservers = new DeadServer(); private final long maxSkew; private final long warningSkew; @@ -188,7 +188,6 @@ public class ServerManager { Configuration c = master.getConfiguration(); maxSkew = c.getLong("hbase.master.maxclockskew", 30000); warningSkew = c.getLong("hbase.master.warningclockskew", 10000); - this.deadservers = new DeadServer(); this.connection = connect ? HConnectionManager.getConnection(c) : null; } @@ -405,8 +404,9 @@ public class ServerManager { } } - public Set getDeadServers() { - return this.deadservers.clone(); + + public DeadServer getDeadServers() { + return this.deadservers; } /** @@ -458,7 +458,7 @@ public class ServerManager { LOG.warn("Received expiration of " + serverName + " but server is not currently online"); } - if (this.deadservers.contains(serverName)) { + if (this.deadservers.isDeadServer(serverName)) { // TODO: Can this happen? It shouldn't be online in this case? LOG.warn("Received expiration of " + serverName + " but server shutdown is already in progress"); @@ -886,13 +886,8 @@ public class ServerManager { * To clear any dead server with same host name and port of any online server */ void clearDeadServersWithSameHostNameAndPortOfOnlineServer() { - ServerName sn; for (ServerName serverName : getOnlineServersList()) { - while ((sn = ServerName. - findServerWithSameHostnamePort(this.deadservers, serverName)) != null) { - this.deadservers.remove(sn); - } + deadservers.cleanAllPreviousInstances(serverName); } } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java index c0fca8699eb..1d7a66de711 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java @@ -75,7 +75,7 @@ public class ServerShutdownHandler extends EventHandler { this.server = server; this.services = services; this.deadServers = deadServers; - if (!this.deadServers.contains(this.serverName)) { + if (!this.deadServers.isDeadServer(this.serverName)) { LOG.warn(this.serverName + " is NOT in deadservers; it should be!"); } this.shouldSplitHlog = shouldSplitHlog; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java index 8e408c64818..17d0f67585c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java @@ -140,6 +140,7 @@ public class TestAssignmentManager { this.serverManager = Mockito.mock(ServerManager.class); Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(true); Mockito.when(this.serverManager.isServerOnline(SERVERNAME_B)).thenReturn(true); + Mockito.when(this.serverManager.getDeadServers()).thenReturn(new DeadServer()); final Map onlineServers = new HashMap(); onlineServers.put(SERVERNAME_B, ServerLoad.EMPTY_SERVERLOAD); onlineServers.put(SERVERNAME_A, ServerLoad.EMPTY_SERVERLOAD); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java index c393a9d77ef..384c37eab2c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDeadServer.java @@ -17,28 +17,39 @@ */ package org.apache.hadoop.hbase.master; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category(SmallTests.class) +import java.util.List; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@Category(MediumTests.class) public class TestDeadServer { + final ServerName hostname123 = new ServerName("127.0.0.1", 123, 3L); + final ServerName hostname123_2 = new ServerName("127.0.0.1", 123, 4L); + final ServerName hostname1234 = new ServerName("127.0.0.2", 1234, 4L); + final ServerName hostname12345 = new ServerName("127.0.0.2", 12345, 4L); + @Test public void testIsDead() { DeadServer ds = new DeadServer(); - final ServerName hostname123 = new ServerName("127.0.0.1", 123, 3L); ds.add(hostname123); assertTrue(ds.areDeadServersInProgress()); ds.finish(hostname123); assertFalse(ds.areDeadServersInProgress()); - final ServerName hostname1234 = new ServerName("127.0.0.2", 1234, 4L); + ds.add(hostname1234); assertTrue(ds.areDeadServersInProgress()); ds.finish(hostname1234); assertFalse(ds.areDeadServersInProgress()); - final ServerName hostname12345 = new ServerName("127.0.0.2", 12345, 4L); + ds.add(hostname12345); assertTrue(ds.areDeadServersInProgress()); ds.finish(hostname12345); @@ -52,11 +63,54 @@ public class TestDeadServer { ds.add(deadServer); assertTrue(ds.isDeadServer(deadServer)); final ServerName deadServerHostComingAlive = - new ServerName("127.0.0.1", 9090, 112321L); + new ServerName("127.0.0.1", 9090, 223341L); assertTrue(ds.cleanPreviousInstance(deadServerHostComingAlive)); assertFalse(ds.isDeadServer(deadServer)); assertFalse(ds.cleanPreviousInstance(deadServerHostComingAlive)); } + + @Test + public void testSortExtract(){ + ManualEnvironmentEdge mee = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(mee); + mee.setValue(1); + + DeadServer d = new DeadServer(); + + + d.add(hostname123); + mee.incValue(1); + d.add(hostname1234); + mee.incValue(1); + d.add(hostname12345); + + List> copy = d.copyDeadServersSince(2L); + Assert.assertEquals(2, copy.size()); + + Assert.assertEquals(hostname1234, copy.get(0).getFirst()); + Assert.assertEquals(new Long(2L), copy.get(0).getSecond()); + + Assert.assertEquals(hostname12345, copy.get(1).getFirst()); + Assert.assertEquals(new Long(3L), copy.get(1).getSecond()); + + EnvironmentEdgeManager.reset(); + } + + @Test + public void testClean(){ + DeadServer d = new DeadServer(); + d.add(hostname123); + + d.cleanPreviousInstance(hostname12345); + Assert.assertFalse(d.isEmpty()); + + d.cleanPreviousInstance(hostname1234); + Assert.assertFalse(d.isEmpty()); + + d.cleanPreviousInstance(hostname123_2); + Assert.assertTrue(d.isEmpty()); + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java index 37f40d55a0b..297d5dae313 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java @@ -310,7 +310,7 @@ public class TestRollingRestart { ServerName serverName) throws InterruptedException { ServerManager sm = activeMaster.getMaster().getServerManager(); // First wait for it to be in dead list - while (!sm.getDeadServers().contains(serverName)) { + while (!sm.getDeadServers().isDeadServer(serverName)) { log("Waiting for [" + serverName + "] to be listed as dead in master"); Thread.sleep(1); }