HBASE-16121 Require only MasterServices to the ServerManager constructor

This commit is contained in:
Matteo Bertozzi 2016-06-28 07:42:40 -07:00
parent e768c4afac
commit 8bc4d4131c
4 changed files with 21 additions and 81 deletions

View File

@ -689,7 +689,7 @@ public class HMaster extends HRegionServer implements MasterServices {
status.setStatus("Publishing Cluster ID in ZooKeeper"); status.setStatus("Publishing Cluster ID in ZooKeeper");
ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId()); ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
this.serverManager = createServerManager(this, this); this.serverManager = createServerManager(this);
// Invalidate all write locks held previously // Invalidate all write locks held previously
this.tableLockManager.reapWriteLocks(); this.tableLockManager.reapWriteLocks();
@ -881,13 +881,11 @@ public class HMaster extends HRegionServer implements MasterServices {
/** /**
* Create a {@link ServerManager} instance. * Create a {@link ServerManager} instance.
*/ */
ServerManager createServerManager(final Server master, ServerManager createServerManager(final MasterServices master) throws IOException {
final MasterServices services)
throws IOException {
// We put this out here in a method so can do a Mockito.spy and stub it out // We put this out here in a method so can do a Mockito.spy and stub it out
// w/ a mocked up ServerManager. // w/ a mocked up ServerManager.
setupClusterConnection(); setupClusterConnection();
return new ServerManager(master, services); return new ServerManager(master);
} }
private void unassignExcessMetaReplica(ZooKeeperWatcher zkw, int numMetaReplicasConfigured) { private void unassignExcessMetaReplica(ZooKeeperWatcher zkw, int numMetaReplicasConfigured) {

View File

@ -150,8 +150,7 @@ public class ServerManager {
private final ArrayList<ServerName> drainingServers = private final ArrayList<ServerName> drainingServers =
new ArrayList<ServerName>(); new ArrayList<ServerName>();
private final Server master; private final MasterServices master;
private final MasterServices services;
private final ClusterConnection connection; private final ClusterConnection connection;
private final DeadServer deadservers = new DeadServer(); private final DeadServer deadservers = new DeadServer();
@ -204,18 +203,14 @@ public class ServerManager {
/** /**
* Constructor. * Constructor.
* @param master * @param master
* @param services
* @throws ZooKeeperConnectionException * @throws ZooKeeperConnectionException
*/ */
public ServerManager(final Server master, final MasterServices services) public ServerManager(final MasterServices master) throws IOException {
throws IOException { this(master, true);
this(master, services, true);
} }
ServerManager(final Server master, final MasterServices services, ServerManager(final MasterServices master, final boolean connect) throws IOException {
final boolean connect) throws IOException {
this.master = master; this.master = master;
this.services = services;
Configuration c = master.getConfiguration(); Configuration c = master.getConfiguration();
maxSkew = c.getLong("hbase.master.maxclockskew", 30000); maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
warningSkew = c.getLong("hbase.master.warningclockskew", 10000); warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
@ -430,7 +425,7 @@ public class ServerManager {
} }
// remove dead server with same hostname and port of newly checking in rs after master // remove dead server with same hostname and port of newly checking in rs after master
// initialization.See HBASE-5916 for more information. // initialization.See HBASE-5916 for more information.
if ((this.services == null || this.services.isInitialized()) if ((this.master == null || this.master.isInitialized())
&& this.deadservers.cleanPreviousInstance(serverName)) { && this.deadservers.cleanPreviousInstance(serverName)) {
// This server has now become alive after we marked it as dead. // This server has now become alive after we marked it as dead.
// We removed it's previous entry from the dead list to reflect it. // We removed it's previous entry from the dead list to reflect it.
@ -595,7 +590,7 @@ public class ServerManager {
} }
return; return;
} }
if (!services.isServerCrashProcessingEnabled()) { if (!master.isServerCrashProcessingEnabled()) {
LOG.info("Master doesn't enable ServerShutdownHandler during initialization, " LOG.info("Master doesn't enable ServerShutdownHandler during initialization, "
+ "delay expiring server " + serverName); + "delay expiring server " + serverName);
this.queuedDeadServers.add(serverName); this.queuedDeadServers.add(serverName);
@ -620,8 +615,8 @@ public class ServerManager {
return; return;
} }
boolean carryingMeta = services.getAssignmentManager().isCarryingMeta(serverName); boolean carryingMeta = master.getAssignmentManager().isCarryingMeta(serverName);
this.services.getMasterProcedureExecutor(). this.master.getMasterProcedureExecutor().
submitProcedure(new ServerCrashProcedure(serverName, true, carryingMeta)); submitProcedure(new ServerCrashProcedure(serverName, true, carryingMeta));
LOG.debug("Added=" + serverName + LOG.debug("Added=" + serverName +
" to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta); " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);
@ -659,13 +654,13 @@ public class ServerManager {
// We should not wait in the server shutdown handler thread since it can clog // We should not wait in the server shutdown handler thread since it can clog
// the handler threads and meta table could not be re-assigned in case // the handler threads and meta table could not be re-assigned in case
// the corresponding server is down. So we queue them up here instead. // the corresponding server is down. So we queue them up here instead.
if (!services.getAssignmentManager().isFailoverCleanupDone()) { if (!master.getAssignmentManager().isFailoverCleanupDone()) {
requeuedDeadServers.put(serverName, shouldSplitWal); requeuedDeadServers.put(serverName, shouldSplitWal);
return; return;
} }
this.deadservers.add(serverName); this.deadservers.add(serverName);
this.services.getMasterProcedureExecutor(). this.master.getMasterProcedureExecutor().
submitProcedure(new ServerCrashProcedure(serverName, shouldSplitWal, false)); submitProcedure(new ServerCrashProcedure(serverName, shouldSplitWal, false));
} }
@ -674,7 +669,7 @@ public class ServerManager {
* called after HMaster#assignMeta and AssignmentManager#joinCluster. * called after HMaster#assignMeta and AssignmentManager#joinCluster.
* */ * */
synchronized void processQueuedDeadServers() { synchronized void processQueuedDeadServers() {
if (!services.isServerCrashProcessingEnabled()) { if (!master.isServerCrashProcessingEnabled()) {
LOG.info("Master hasn't enabled ServerShutdownHandler"); LOG.info("Master hasn't enabled ServerShutdownHandler");
} }
Iterator<ServerName> serverIterator = queuedDeadServers.iterator(); Iterator<ServerName> serverIterator = queuedDeadServers.iterator();
@ -685,7 +680,7 @@ public class ServerManager {
requeuedDeadServers.remove(tmpServerName); requeuedDeadServers.remove(tmpServerName);
} }
if (!services.getAssignmentManager().isFailoverCleanupDone()) { if (!master.getAssignmentManager().isFailoverCleanupDone()) {
LOG.info("AssignmentManager hasn't finished failover cleanup; waiting"); LOG.info("AssignmentManager hasn't finished failover cleanup; waiting");
} }
@ -753,7 +748,7 @@ public class ServerManager {
} }
OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server,
region, favoredNodes, region, favoredNodes,
(RecoveryMode.LOG_REPLAY == this.services.getMasterWalManager().getLogRecoveryMode())); (RecoveryMode.LOG_REPLAY == this.master.getMasterWalManager().getLogRecoveryMode()));
try { try {
OpenRegionResponse response = admin.openRegion(null, request); OpenRegionResponse response = admin.openRegion(null, request);
return ResponseConverter.getRegionOpeningState(response); return ResponseConverter.getRegionOpeningState(response);
@ -781,7 +776,7 @@ public class ServerManager {
} }
OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, regionOpenInfos, OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, regionOpenInfos,
(RecoveryMode.LOG_REPLAY == this.services.getMasterWalManager().getLogRecoveryMode())); (RecoveryMode.LOG_REPLAY == this.master.getMasterWalManager().getLogRecoveryMode()));
try { try {
OpenRegionResponse response = admin.openRegion(null, request); OpenRegionResponse response = admin.openRegion(null, request);
return ResponseConverter.getRegionOpeningStateList(response); return ResponseConverter.getRegionOpeningStateList(response);

View File

@ -51,66 +51,14 @@ public class TestClockSkewDetection {
@Test @Test
public void testClockSkewDetection() throws Exception { public void testClockSkewDetection() throws Exception {
final Configuration conf = HBaseConfiguration.create(); final Configuration conf = HBaseConfiguration.create();
ServerManager sm = new ServerManager(new Server() { ServerManager sm = new ServerManager(new MockNoopMasterServices(conf) {
@Override
public ClusterConnection getConnection() {
return null;
}
@Override
public MetaTableLocator getMetaTableLocator() {
return null;
}
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public ServerName getServerName() {
return null;
}
@Override
public ZooKeeperWatcher getZooKeeper() {
return null;
}
@Override
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}
@Override
public void abort(String why, Throwable e) {}
@Override
public boolean isAborted() {
return false;
}
@Override
public boolean isStopped() {
return false;
}
@Override
public void stop(String why) {
}
@Override
public ChoreService getChoreService() {
return null;
}
@Override @Override
public ClusterConnection getClusterConnection() { public ClusterConnection getClusterConnection() {
ClusterConnection conn = mock(ClusterConnection.class); ClusterConnection conn = mock(ClusterConnection.class);
when(conn.getRpcControllerFactory()).thenReturn(mock(RpcControllerFactory.class)); when(conn.getRpcControllerFactory()).thenReturn(mock(RpcControllerFactory.class));
return conn; return conn;
} }
}, null, true); }, true);
LOG.debug("regionServerStartup 1"); LOG.debug("regionServerStartup 1");
InetAddress ia1 = InetAddress.getLocalHost(); InetAddress ia1 = InetAddress.getLocalHost();

View File

@ -207,9 +207,8 @@ public class TestMasterNoCluster {
void initClusterSchemaService() throws IOException, InterruptedException {} void initClusterSchemaService() throws IOException, InterruptedException {}
@Override @Override
ServerManager createServerManager(Server master, MasterServices services) ServerManager createServerManager(MasterServices master) throws IOException {
throws IOException { ServerManager sm = super.createServerManager(master);
ServerManager sm = super.createServerManager(master, services);
// Spy on the created servermanager // Spy on the created servermanager
ServerManager spy = Mockito.spy(sm); ServerManager spy = Mockito.spy(sm);
// Fake a successful close. // Fake a successful close.