HBASE-13947 Use MasterServices instead of Server in AssignmentManager
This commit is contained in:
parent
b7a82d8311
commit
be599ddf86
|
@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
|
|||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.RegionTransition;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
|
@ -135,7 +134,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
= "hbase.assignment.already.intransition.waittime";
|
||||
static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute
|
||||
|
||||
protected final Server server;
|
||||
protected final MasterServices server;
|
||||
|
||||
private ServerManager serverManager;
|
||||
|
||||
|
@ -271,7 +270,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
* @throws KeeperException
|
||||
* @throws IOException
|
||||
*/
|
||||
public AssignmentManager(Server server, ServerManager serverManager,
|
||||
public AssignmentManager(MasterServices server, ServerManager serverManager,
|
||||
final LoadBalancer balancer,
|
||||
final ExecutorService service, MetricsMaster metricsMaster,
|
||||
final TableLockManager tableLockManager) throws KeeperException,
|
||||
|
@ -2841,7 +2840,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
}
|
||||
}
|
||||
// assign all the replicas that were not recorded in the meta
|
||||
assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
|
||||
assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, server));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2929,7 +2928,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
// maybe because it crashed.
|
||||
PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
|
||||
if (p.getFirst() != null && p.getSecond() != null) {
|
||||
int numReplicas = ((MasterServices)server).getTableDescriptors().get(p.getFirst().
|
||||
int numReplicas = server.getTableDescriptors().get(p.getFirst().
|
||||
getTable()).getRegionReplication();
|
||||
for (HRegionInfo merge : p) {
|
||||
for (int i = 1; i < numReplicas; i++) {
|
||||
|
@ -4039,7 +4038,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
}
|
||||
int numReplicas = 1;
|
||||
try {
|
||||
numReplicas = ((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()).
|
||||
numReplicas = server.getTableDescriptors().get(mergedHri.getTable()).
|
||||
getRegionReplication();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
|
||||
|
@ -4068,7 +4067,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
// the replica1s of daughters will be on the same machine
|
||||
int numReplicas = 1;
|
||||
try {
|
||||
numReplicas = ((MasterServices)server).getTableDescriptors().get(parentHri.getTable()).
|
||||
numReplicas = server.getTableDescriptors().get(parentHri.getTable()).
|
||||
getRegionReplication();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.executor.ExecutorType;
|
|||
import org.apache.hadoop.hbase.master.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.LoadBalancer;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.hadoop.hbase.master.RegionState;
|
||||
import org.apache.hadoop.hbase.master.ServerManager;
|
||||
|
@ -77,7 +78,7 @@ public class TestDrainingServer {
|
|||
public static void afterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniZKCluster();
|
||||
}
|
||||
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
TEST_UTIL.getConfiguration().setBoolean("hbase.assignment.usezk", true);
|
||||
|
@ -89,7 +90,7 @@ public class TestDrainingServer {
|
|||
AssignmentManager am;
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
final HMaster master = Mockito.mock(HMaster.class);
|
||||
final Server server = Mockito.mock(Server.class);
|
||||
final MasterServices server = Mockito.mock(MasterServices.class);
|
||||
final ServerManager serverManager = Mockito.mock(ServerManager.class);
|
||||
final ServerName SERVERNAME_A = ServerName.valueOf("mockserver_a.org", 1000, 8000);
|
||||
final ServerName SERVERNAME_B = ServerName.valueOf("mockserver_b.org", 1001, 8000);
|
||||
|
@ -118,12 +119,12 @@ public class TestDrainingServer {
|
|||
Mockito.when(serverManager.getOnlineServers()).thenReturn(onlineServers);
|
||||
Mockito.when(serverManager.getOnlineServersList())
|
||||
.thenReturn(new ArrayList<ServerName>(onlineServers.keySet()));
|
||||
|
||||
|
||||
Mockito.when(serverManager.createDestinationServersList())
|
||||
.thenReturn(new ArrayList<ServerName>(onlineServers.keySet()));
|
||||
Mockito.when(serverManager.createDestinationServersList(null))
|
||||
.thenReturn(new ArrayList<ServerName>(onlineServers.keySet()));
|
||||
|
||||
|
||||
for (ServerName sn : onlineServers.keySet()) {
|
||||
Mockito.when(serverManager.isServerOnline(sn)).thenReturn(true);
|
||||
Mockito.when(serverManager.sendRegionClose(sn, REGIONINFO, -1)).thenReturn(true);
|
||||
|
@ -166,7 +167,7 @@ public class TestDrainingServer {
|
|||
LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(conf);
|
||||
AssignmentManager am;
|
||||
final HMaster master = Mockito.mock(HMaster.class);
|
||||
final Server server = Mockito.mock(Server.class);
|
||||
final MasterServices server = Mockito.mock(MasterServices.class);
|
||||
final ServerManager serverManager = Mockito.mock(ServerManager.class);
|
||||
final ServerName SERVERNAME_A = ServerName.valueOf("mockserverbulk_a.org", 1000, 8000);
|
||||
final ServerName SERVERNAME_B = ServerName.valueOf("mockserverbulk_b.org", 1001, 8000);
|
||||
|
@ -176,7 +177,7 @@ public class TestDrainingServer {
|
|||
final Map<HRegionInfo, ServerName> bulk = new HashMap<HRegionInfo, ServerName>();
|
||||
|
||||
Set<ServerName> bunchServersAssigned = new HashSet<ServerName>();
|
||||
|
||||
|
||||
HRegionInfo REGIONINFO_A = new HRegionInfo(TableName.valueOf("table_A"),
|
||||
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
|
||||
HRegionInfo REGIONINFO_B = new HRegionInfo(TableName.valueOf("table_B"),
|
||||
|
@ -219,21 +220,21 @@ public class TestDrainingServer {
|
|||
Mockito.when(serverManager.getOnlineServers()).thenReturn(onlineServers);
|
||||
Mockito.when(serverManager.getOnlineServersList()).thenReturn(
|
||||
new ArrayList<ServerName>(onlineServers.keySet()));
|
||||
|
||||
|
||||
Mockito.when(serverManager.createDestinationServersList()).thenReturn(
|
||||
new ArrayList<ServerName>(onlineServers.keySet()));
|
||||
Mockito.when(serverManager.createDestinationServersList(null)).thenReturn(
|
||||
new ArrayList<ServerName>(onlineServers.keySet()));
|
||||
|
||||
|
||||
for (Entry<HRegionInfo, ServerName> entry : bulk.entrySet()) {
|
||||
Mockito.when(serverManager.isServerOnline(entry.getValue())).thenReturn(true);
|
||||
Mockito.when(serverManager.sendRegionClose(entry.getValue(),
|
||||
Mockito.when(serverManager.sendRegionClose(entry.getValue(),
|
||||
entry.getKey(), -1)).thenReturn(true);
|
||||
Mockito.when(serverManager.sendRegionOpen(entry.getValue(),
|
||||
entry.getKey(), -1, null)).thenReturn(RegionOpeningState.OPENED);
|
||||
Mockito.when(serverManager.sendRegionOpen(entry.getValue(),
|
||||
entry.getKey(), -1, null)).thenReturn(RegionOpeningState.OPENED);
|
||||
Mockito.when(serverManager.addServerToDrainList(entry.getValue())).thenReturn(true);
|
||||
}
|
||||
|
||||
|
||||
Mockito.when(master.getServerManager()).thenReturn(serverManager);
|
||||
|
||||
drainedServers.add(SERVERNAME_A);
|
||||
|
@ -243,42 +244,42 @@ public class TestDrainingServer {
|
|||
|
||||
am = new AssignmentManager(server, serverManager,
|
||||
balancer, startupMasterExecutor("mockExecutorServiceBulk"), null, null);
|
||||
|
||||
|
||||
Mockito.when(master.getAssignmentManager()).thenReturn(am);
|
||||
|
||||
zkWatcher.registerListener(am);
|
||||
|
||||
|
||||
for (ServerName drained : drainedServers) {
|
||||
addServerToDrainedList(drained, onlineServers, serverManager);
|
||||
}
|
||||
|
||||
|
||||
am.assign(bulk);
|
||||
|
||||
Map<String, RegionState> regionsInTransition = am.getRegionStates().getRegionsInTransition();
|
||||
for (Entry<String, RegionState> entry : regionsInTransition.entrySet()) {
|
||||
setRegionOpenedOnZK(zkWatcher, entry.getValue().getServerName(),
|
||||
setRegionOpenedOnZK(zkWatcher, entry.getValue().getServerName(),
|
||||
entry.getValue().getRegion());
|
||||
}
|
||||
|
||||
|
||||
am.waitForAssignment(REGIONINFO_A);
|
||||
am.waitForAssignment(REGIONINFO_B);
|
||||
am.waitForAssignment(REGIONINFO_C);
|
||||
am.waitForAssignment(REGIONINFO_D);
|
||||
am.waitForAssignment(REGIONINFO_E);
|
||||
|
||||
|
||||
Map<HRegionInfo, ServerName> regionAssignments = am.getRegionStates().getRegionAssignments();
|
||||
for (Entry<HRegionInfo, ServerName> entry : regionAssignments.entrySet()) {
|
||||
LOG.info("Region Assignment: "
|
||||
LOG.info("Region Assignment: "
|
||||
+ entry.getKey().getRegionNameAsString() + " Server: " + entry.getValue());
|
||||
bunchServersAssigned.add(entry.getValue());
|
||||
}
|
||||
|
||||
|
||||
for (ServerName sn : drainedServers) {
|
||||
assertFalse(bunchServersAssigned.contains(sn));
|
||||
}
|
||||
}
|
||||
|
||||
private void addServerToDrainedList(ServerName serverName,
|
||||
private void addServerToDrainedList(ServerName serverName,
|
||||
Map<ServerName, ServerLoad> onlineServers, ServerManager serverManager) {
|
||||
onlineServers.remove(serverName);
|
||||
List<ServerName> availableServers = new ArrayList<ServerName>(onlineServers.keySet());
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.MetaMockingUtil;
|
||||
import org.apache.hadoop.hbase.RegionException;
|
||||
import org.apache.hadoop.hbase.RegionTransition;
|
||||
|
@ -133,7 +134,7 @@ public class TestAssignmentManager {
|
|||
private static boolean enabling = false;
|
||||
|
||||
// Mocked objects or; get redone for each test.
|
||||
private Server server;
|
||||
private MasterServices server;
|
||||
private ServerManager serverManager;
|
||||
private ZooKeeperWatcher watcher;
|
||||
private CoordinatedStateManager cp;
|
||||
|
@ -161,7 +162,7 @@ public class TestAssignmentManager {
|
|||
// Mock a Server. Have it return a legit Configuration and ZooKeeperWatcher.
|
||||
// If abort is called, be sure to fail the test (don't just swallow it
|
||||
// silently as is mockito default).
|
||||
this.server = Mockito.mock(Server.class);
|
||||
this.server = Mockito.mock(MasterServices.class);
|
||||
Mockito.when(server.getServerName()).thenReturn(ServerName.valueOf("master,1,1"));
|
||||
Mockito.when(server.getConfiguration()).thenReturn(HTU.getConfiguration());
|
||||
this.watcher =
|
||||
|
@ -1039,7 +1040,7 @@ public class TestAssignmentManager {
|
|||
|
||||
CoordinatedStateManager csm = CoordinatedStateManagerFactory.getCoordinatedStateManager(
|
||||
HTU.getConfiguration());
|
||||
Server server = new HMaster(HTU.getConfiguration(), csm);
|
||||
MasterServices server = new HMaster(HTU.getConfiguration(), csm);
|
||||
AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(server,
|
||||
this.serverManager);
|
||||
|
||||
|
@ -1094,7 +1095,8 @@ public class TestAssignmentManager {
|
|||
HTU.getConfiguration().setInt(HConstants.MASTER_PORT, 0);
|
||||
CoordinatedStateManager csm = CoordinatedStateManagerFactory.getCoordinatedStateManager(
|
||||
HTU.getConfiguration());
|
||||
Server server = new HMaster(HTU.getConfiguration(), csm);
|
||||
MasterServices server = new HMaster(HTU.getConfiguration(), csm);
|
||||
|
||||
Whitebox.setInternalState(server, "serverManager", this.serverManager);
|
||||
AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(server,
|
||||
this.serverManager);
|
||||
|
@ -1109,7 +1111,7 @@ public class TestAssignmentManager {
|
|||
// set table in enabling state.
|
||||
am.getTableStateManager().setTableState(REGIONINFO.getTable(),
|
||||
Table.State.ENABLING);
|
||||
new EnableTableHandler(server, REGIONINFO.getTable(),
|
||||
new EnableTableHandler((Server)server, REGIONINFO.getTable(),
|
||||
am, new NullTableLockManager(), true).prepare()
|
||||
.process();
|
||||
assertEquals("Number of assignments should be 1.", 1, assignmentCount);
|
||||
|
@ -1143,7 +1145,7 @@ public class TestAssignmentManager {
|
|||
HTU.getConfiguration().setInt(HConstants.MASTER_PORT, 0);
|
||||
CoordinatedStateManager csm = CoordinatedStateManagerFactory.getCoordinatedStateManager(
|
||||
HTU.getConfiguration());
|
||||
Server server = new HMaster(HTU.getConfiguration(), csm);
|
||||
MasterServices server = new HMaster(HTU.getConfiguration(), csm);
|
||||
Whitebox.setInternalState(server, "serverManager", this.serverManager);
|
||||
AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(server,
|
||||
this.serverManager);
|
||||
|
@ -1288,9 +1290,9 @@ public class TestAssignmentManager {
|
|||
* @throws IOException
|
||||
* @throws KeeperException
|
||||
*/
|
||||
private AssignmentManagerWithExtrasForTesting setUpMockedAssignmentManager(final Server server,
|
||||
final ServerManager manager) throws IOException, KeeperException,
|
||||
ServiceException, CoordinatedStateException {
|
||||
private AssignmentManagerWithExtrasForTesting setUpMockedAssignmentManager(
|
||||
final MasterServices server, final ServerManager manager)
|
||||
throws IOException, KeeperException, ServiceException, CoordinatedStateException {
|
||||
// Make an RS Interface implementation. Make it so a scanner can go against
|
||||
// it and a get to return the single region, REGIONINFO, this test is
|
||||
// messing with. Needed when "new master" joins cluster. AM will try and
|
||||
|
@ -1334,6 +1336,7 @@ public class TestAssignmentManager {
|
|||
getMockedConnectionAndDecorate(HTU.getConfiguration(), null,
|
||||
ri, SERVERNAME_LIVE, REGIONINFO);
|
||||
Mockito.when(this.server.getConnection()).thenReturn(connection);
|
||||
|
||||
// These mocks were done up when all connections were managed. World is different now we
|
||||
// moved to unmanaged connections. It messes up the intercepts done in these tests.
|
||||
// Just mark connections as marked and then down in MetaTableAccessor, it will go the path
|
||||
|
@ -1361,7 +1364,7 @@ public class TestAssignmentManager {
|
|||
private ClusterConnection connection;
|
||||
|
||||
public AssignmentManagerWithExtrasForTesting(
|
||||
final Server master, ClusterConnection connection, final ServerManager serverManager,
|
||||
final MasterServices master, ClusterConnection connection, final ServerManager serverManager,
|
||||
final LoadBalancer balancer,
|
||||
final ExecutorService service, final TableLockManager tableLockManager)
|
||||
throws KeeperException, IOException, CoordinatedStateException {
|
||||
|
|
Loading…
Reference in New Issue