HBASE-7789 Clean DeadServer.java and add a Jitter method in ConnectionUtils
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1445087 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d2fb5a546f
commit
8bd7bffb36
|
@ -49,4 +49,19 @@ public class ConnectionUtils {
|
|||
long jitter = (long)(normalPause * RANDOM.nextFloat() * 0.01f); // 1% possible jitter
|
||||
return normalPause + jitter;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Adds / subs a 10% jitter to a pause time. Minimum is 1.
|
||||
* @param pause the expected pause.
|
||||
* @param jitter the jitter ratio, between 0 and 1, exclusive.
|
||||
*/
|
||||
public static long addJitter(final long pause, final float jitter) {
|
||||
float lag = pause * (RANDOM.nextFloat() - 0.5f) * jitter;
|
||||
long newPause = pause + (long) lag;
|
||||
if (newPause <= 0) {
|
||||
return 1;
|
||||
}
|
||||
return newPause;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,20 +18,26 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.lang.NotImplementedException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Class to hold dead servers list and utility querying dead server list.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class DeadServer implements Set<ServerName> {
|
||||
public class DeadServer {
|
||||
/**
|
||||
* Set of known dead servers. On znode expiration, servers are added here.
|
||||
* This is needed in case of a network partitioning where the server's lease
|
||||
|
@ -39,75 +45,66 @@ public class DeadServer implements Set<ServerName> {
|
|||
* and it's server logs are recovered, it will be told to call server startup
|
||||
* because by then, its regions have probably been reassigned.
|
||||
*/
|
||||
private final Set<ServerName> deadServers = new HashSet<ServerName>();
|
||||
|
||||
/** Number of dead servers currently being processed */
|
||||
private int numProcessing;
|
||||
|
||||
public DeadServer() {
|
||||
super();
|
||||
this.numProcessing = 0;
|
||||
}
|
||||
private final Map<ServerName, Long> deadServers = new HashMap<ServerName, Long>();
|
||||
|
||||
/**
|
||||
* @param serverName Server name
|
||||
* @return true if server is dead
|
||||
* Number of dead servers currently being processed
|
||||
*/
|
||||
public boolean isDeadServer(final String serverName) {
|
||||
return isDeadServer(new ServerName(serverName));
|
||||
}
|
||||
private int numProcessing = 0;
|
||||
|
||||
/**
|
||||
* A dead server that comes back alive has a different start code.
|
||||
* A dead server that comes back alive has a different start code. The new start code should be
|
||||
* greater than the old one, but we don't take this into account in this method.
|
||||
*
|
||||
* @param newServerName Servername as either <code>host:port</code> or
|
||||
* <code>host,port,startcode</code>.
|
||||
* <code>host,port,startcode</code>.
|
||||
* @return true if this server was dead before and coming back alive again
|
||||
*/
|
||||
public boolean cleanPreviousInstance(final ServerName newServerName) {
|
||||
ServerName sn =
|
||||
ServerName.findServerWithSameHostnamePort(this.deadServers, newServerName);
|
||||
if (sn == null) return false;
|
||||
return this.deadServers.remove(sn);
|
||||
public synchronized boolean cleanPreviousInstance(final ServerName newServerName) {
|
||||
Iterator<ServerName> it = deadServers.keySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
ServerName sn = it.next();
|
||||
if (ServerName.isSameHostnameAndPort(sn, newServerName)) {
|
||||
it.remove();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param serverName
|
||||
* @return true if this server is on the dead servers list.
|
||||
*/
|
||||
boolean isDeadServer(final ServerName serverName) {
|
||||
return this.deadServers.contains(serverName);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return True if we have a server with matching hostname and port.
|
||||
*/
|
||||
boolean isDeadServerWithSameHostnamePort(final ServerName serverName) {
|
||||
return ServerName.findServerWithSameHostnamePort(this.deadServers,
|
||||
serverName) != null;
|
||||
public synchronized boolean isDeadServer(final ServerName serverName) {
|
||||
return deadServers.containsKey(serverName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if there are currently any dead servers being processed by the
|
||||
* master. Returns true if at least one region server is currently being
|
||||
* processed as dead.
|
||||
*
|
||||
* @return true if any RS are being processed as dead
|
||||
*/
|
||||
public boolean areDeadServersInProgress() {
|
||||
public synchronized boolean areDeadServersInProgress() {
|
||||
return numProcessing != 0;
|
||||
}
|
||||
|
||||
public synchronized Set<ServerName> clone() {
|
||||
Set<ServerName> clone = new HashSet<ServerName>(this.deadServers.size());
|
||||
clone.addAll(this.deadServers);
|
||||
public synchronized Set<ServerName> copyServerNames() {
|
||||
Set<ServerName> clone = new HashSet<ServerName>(deadServers.size());
|
||||
clone.addAll(deadServers.keySet());
|
||||
return clone;
|
||||
}
|
||||
|
||||
public synchronized boolean add(ServerName e) {
|
||||
this.numProcessing++;
|
||||
return deadServers.add(e);
|
||||
return deadServers.put(e, EnvironmentEdgeManager.currentTimeMillis()) != null;
|
||||
}
|
||||
|
||||
public synchronized void finish(ServerName e) {
|
||||
@SuppressWarnings("UnusedParameters")
|
||||
public synchronized void finish(ServerName ignored) {
|
||||
this.numProcessing--;
|
||||
}
|
||||
|
||||
|
@ -119,55 +116,51 @@ public class DeadServer implements Set<ServerName> {
|
|||
return deadServers.isEmpty();
|
||||
}
|
||||
|
||||
public synchronized boolean contains(Object o) {
|
||||
return deadServers.contains(o);
|
||||
}
|
||||
|
||||
public Iterator<ServerName> iterator() {
|
||||
return this.deadServers.iterator();
|
||||
}
|
||||
|
||||
public synchronized Object[] toArray() {
|
||||
return deadServers.toArray();
|
||||
}
|
||||
|
||||
public synchronized <T> T[] toArray(T[] a) {
|
||||
return deadServers.toArray(a);
|
||||
}
|
||||
|
||||
public synchronized boolean remove(Object o) {
|
||||
return this.deadServers.remove(o);
|
||||
}
|
||||
|
||||
public synchronized boolean containsAll(Collection<?> c) {
|
||||
return deadServers.containsAll(c);
|
||||
}
|
||||
|
||||
public synchronized boolean addAll(Collection<? extends ServerName> c) {
|
||||
return deadServers.addAll(c);
|
||||
}
|
||||
|
||||
public synchronized boolean retainAll(Collection<?> c) {
|
||||
return deadServers.retainAll(c);
|
||||
}
|
||||
|
||||
public synchronized boolean removeAll(Collection<?> c) {
|
||||
return deadServers.removeAll(c);
|
||||
}
|
||||
|
||||
public synchronized void clear() {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
public synchronized boolean equals(Object o) {
|
||||
return deadServers.equals(o);
|
||||
}
|
||||
|
||||
public synchronized int hashCode() {
|
||||
return deadServers.hashCode();
|
||||
public synchronized void cleanAllPreviousInstances(final ServerName newServerName) {
|
||||
Iterator<ServerName> it = deadServers.keySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
ServerName sn = it.next();
|
||||
if (ServerName.isSameHostnameAndPort(sn, newServerName)) {
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized String toString() {
|
||||
return this.deadServers.toString();
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (ServerName sn : deadServers.keySet()) {
|
||||
if (sb.length() > 0) {
|
||||
sb.append(", ");
|
||||
}
|
||||
sb.append(sn.toString());
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract all the servers dead since a given time, and sort them.
|
||||
* @param ts the time, 0 for all
|
||||
* @return a sorted array list, by death time, lowest values first.
|
||||
*/
|
||||
public synchronized List<Pair<ServerName, Long>> copyDeadServersSince(long ts){
|
||||
List<Pair<ServerName, Long>> res = new ArrayList<Pair<ServerName, Long>>(size());
|
||||
|
||||
for (Map.Entry<ServerName, Long> entry:deadServers.entrySet()){
|
||||
if (entry.getValue() >= ts){
|
||||
res.add(new Pair<ServerName, Long>(entry.getKey(), entry.getValue()));
|
||||
}
|
||||
}
|
||||
|
||||
Collections.sort(res, ServerNameDeathDateComparator);
|
||||
return res;
|
||||
}
|
||||
|
||||
private static Comparator<Pair<ServerName, Long>> ServerNameDeathDateComparator =
|
||||
new Comparator<Pair<ServerName, Long>>(){
|
||||
|
||||
@Override
|
||||
public int compare(Pair<ServerName, Long> o1, Pair<ServerName, Long> o2) {
|
||||
return o1.getSecond().compareTo(o2.getSecond());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -579,7 +579,6 @@ Server {
|
|||
* @param abortable If fatal exception we'll call abort on this. May be null.
|
||||
* If it is we'll use the Connection associated with the passed
|
||||
* {@link Configuration} as our {@link Abortable}.
|
||||
* @param defaultTimeout Timeout to use. Pass zero for no timeout
|
||||
* ({@link Object#wait(long)} when passed a <code>0</code> waits for ever).
|
||||
* @throws IOException
|
||||
*/
|
||||
|
@ -1859,7 +1858,7 @@ Server {
|
|||
return new ClusterStatus(VersionInfo.getVersion(),
|
||||
this.fileSystemManager.getClusterId().toString(),
|
||||
this.serverManager.getOnlineServers(),
|
||||
this.serverManager.getDeadServers(),
|
||||
this.serverManager.getDeadServers().copyServerNames(),
|
||||
this.serverName,
|
||||
backupMasters,
|
||||
this.assignmentManager.getRegionStates().getRegionsInTransition(),
|
||||
|
|
|
@ -63,7 +63,7 @@ public class MasterStatusServlet extends HttpServlet {
|
|||
ServerName rootLocation = getRootLocationOrNull(master);
|
||||
ServerName metaLocation = master.getCatalogTracker().getMetaLocation();
|
||||
List<ServerName> servers = master.getServerManager().getOnlineServersList();
|
||||
Set<ServerName> deadServers = master.getServerManager().getDeadServers();
|
||||
Set<ServerName> deadServers = master.getServerManager().getDeadServers().copyServerNames();
|
||||
|
||||
response.setContentType("text/html");
|
||||
MasterStatusTmpl tmpl;
|
||||
|
|
|
@ -130,7 +130,7 @@ public class ServerManager {
|
|||
private final MasterServices services;
|
||||
private final HConnection connection;
|
||||
|
||||
private final DeadServer deadservers;
|
||||
private final DeadServer deadservers = new DeadServer();
|
||||
|
||||
private final long maxSkew;
|
||||
private final long warningSkew;
|
||||
|
@ -188,7 +188,6 @@ public class ServerManager {
|
|||
Configuration c = master.getConfiguration();
|
||||
maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
|
||||
warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
|
||||
this.deadservers = new DeadServer();
|
||||
this.connection = connect ? HConnectionManager.getConnection(c) : null;
|
||||
}
|
||||
|
||||
|
@ -405,8 +404,9 @@ public class ServerManager {
|
|||
}
|
||||
}
|
||||
|
||||
public Set<ServerName> getDeadServers() {
|
||||
return this.deadservers.clone();
|
||||
|
||||
public DeadServer getDeadServers() {
|
||||
return this.deadservers;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -458,7 +458,7 @@ public class ServerManager {
|
|||
LOG.warn("Received expiration of " + serverName +
|
||||
" but server is not currently online");
|
||||
}
|
||||
if (this.deadservers.contains(serverName)) {
|
||||
if (this.deadservers.isDeadServer(serverName)) {
|
||||
// TODO: Can this happen? It shouldn't be online in this case?
|
||||
LOG.warn("Received expiration of " + serverName +
|
||||
" but server shutdown is already in progress");
|
||||
|
@ -886,13 +886,8 @@ public class ServerManager {
|
|||
* To clear any dead server with same host name and port of any online server
|
||||
*/
|
||||
void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
|
||||
ServerName sn;
|
||||
for (ServerName serverName : getOnlineServersList()) {
|
||||
while ((sn = ServerName.
|
||||
findServerWithSameHostnamePort(this.deadservers, serverName)) != null) {
|
||||
this.deadservers.remove(sn);
|
||||
}
|
||||
deadservers.cleanAllPreviousInstances(serverName);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -75,7 +75,7 @@ public class ServerShutdownHandler extends EventHandler {
|
|||
this.server = server;
|
||||
this.services = services;
|
||||
this.deadServers = deadServers;
|
||||
if (!this.deadServers.contains(this.serverName)) {
|
||||
if (!this.deadServers.isDeadServer(this.serverName)) {
|
||||
LOG.warn(this.serverName + " is NOT in deadservers; it should be!");
|
||||
}
|
||||
this.shouldSplitHlog = shouldSplitHlog;
|
||||
|
|
|
@ -140,6 +140,7 @@ public class TestAssignmentManager {
|
|||
this.serverManager = Mockito.mock(ServerManager.class);
|
||||
Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(true);
|
||||
Mockito.when(this.serverManager.isServerOnline(SERVERNAME_B)).thenReturn(true);
|
||||
Mockito.when(this.serverManager.getDeadServers()).thenReturn(new DeadServer());
|
||||
final Map<ServerName, ServerLoad> onlineServers = new HashMap<ServerName, ServerLoad>();
|
||||
onlineServers.put(SERVERNAME_B, ServerLoad.EMPTY_SERVERLOAD);
|
||||
onlineServers.put(SERVERNAME_A, ServerLoad.EMPTY_SERVERLOAD);
|
||||
|
|
|
@ -17,28 +17,39 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.MediumTests;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@Category(MediumTests.class)
|
||||
public class TestDeadServer {
|
||||
final ServerName hostname123 = new ServerName("127.0.0.1", 123, 3L);
|
||||
final ServerName hostname123_2 = new ServerName("127.0.0.1", 123, 4L);
|
||||
final ServerName hostname1234 = new ServerName("127.0.0.2", 1234, 4L);
|
||||
final ServerName hostname12345 = new ServerName("127.0.0.2", 12345, 4L);
|
||||
|
||||
@Test public void testIsDead() {
|
||||
DeadServer ds = new DeadServer();
|
||||
final ServerName hostname123 = new ServerName("127.0.0.1", 123, 3L);
|
||||
ds.add(hostname123);
|
||||
assertTrue(ds.areDeadServersInProgress());
|
||||
ds.finish(hostname123);
|
||||
assertFalse(ds.areDeadServersInProgress());
|
||||
final ServerName hostname1234 = new ServerName("127.0.0.2", 1234, 4L);
|
||||
|
||||
ds.add(hostname1234);
|
||||
assertTrue(ds.areDeadServersInProgress());
|
||||
ds.finish(hostname1234);
|
||||
assertFalse(ds.areDeadServersInProgress());
|
||||
final ServerName hostname12345 = new ServerName("127.0.0.2", 12345, 4L);
|
||||
|
||||
ds.add(hostname12345);
|
||||
assertTrue(ds.areDeadServersInProgress());
|
||||
ds.finish(hostname12345);
|
||||
|
@ -52,11 +63,54 @@ public class TestDeadServer {
|
|||
ds.add(deadServer);
|
||||
assertTrue(ds.isDeadServer(deadServer));
|
||||
final ServerName deadServerHostComingAlive =
|
||||
new ServerName("127.0.0.1", 9090, 112321L);
|
||||
new ServerName("127.0.0.1", 9090, 223341L);
|
||||
assertTrue(ds.cleanPreviousInstance(deadServerHostComingAlive));
|
||||
assertFalse(ds.isDeadServer(deadServer));
|
||||
assertFalse(ds.cleanPreviousInstance(deadServerHostComingAlive));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSortExtract(){
|
||||
ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
|
||||
EnvironmentEdgeManager.injectEdge(mee);
|
||||
mee.setValue(1);
|
||||
|
||||
DeadServer d = new DeadServer();
|
||||
|
||||
|
||||
d.add(hostname123);
|
||||
mee.incValue(1);
|
||||
d.add(hostname1234);
|
||||
mee.incValue(1);
|
||||
d.add(hostname12345);
|
||||
|
||||
List<Pair<ServerName, Long>> copy = d.copyDeadServersSince(2L);
|
||||
Assert.assertEquals(2, copy.size());
|
||||
|
||||
Assert.assertEquals(hostname1234, copy.get(0).getFirst());
|
||||
Assert.assertEquals(new Long(2L), copy.get(0).getSecond());
|
||||
|
||||
Assert.assertEquals(hostname12345, copy.get(1).getFirst());
|
||||
Assert.assertEquals(new Long(3L), copy.get(1).getSecond());
|
||||
|
||||
EnvironmentEdgeManager.reset();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClean(){
|
||||
DeadServer d = new DeadServer();
|
||||
d.add(hostname123);
|
||||
|
||||
d.cleanPreviousInstance(hostname12345);
|
||||
Assert.assertFalse(d.isEmpty());
|
||||
|
||||
d.cleanPreviousInstance(hostname1234);
|
||||
Assert.assertFalse(d.isEmpty());
|
||||
|
||||
d.cleanPreviousInstance(hostname123_2);
|
||||
Assert.assertTrue(d.isEmpty());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -310,7 +310,7 @@ public class TestRollingRestart {
|
|||
ServerName serverName) throws InterruptedException {
|
||||
ServerManager sm = activeMaster.getMaster().getServerManager();
|
||||
// First wait for it to be in dead list
|
||||
while (!sm.getDeadServers().contains(serverName)) {
|
||||
while (!sm.getDeadServers().isDeadServer(serverName)) {
|
||||
log("Waiting for [" + serverName + "] to be listed as dead in master");
|
||||
Thread.sleep(1);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue