HBASE-18350 RSGroups are broken under AMv2
- Moving a table to an RSGroup was buggy because it left the table unassigned. This is fixed: each region is now immediately assigned to an appropriate RS (MoveRegionProcedure). - The table was locked while moving, but the unassign operation hung because the queue of a locked table is not scheduled while the lock is held. Fixed. - ProcedureSyncWait was buggy because it looked up the procId in the executor, but the executor does not retain the return values of internal operations (they are stored, but immediately removed by the cleaner). - list_rsgroups in the shell now also shows the assigned tables and servers. Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
e1941aa6d1
commit
41cc9a125f
|
@ -44,13 +44,10 @@ import org.apache.hadoop.hbase.master.RegionState;
|
|||
import org.apache.hadoop.hbase.master.ServerManager;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
|
||||
import org.apache.hadoop.hbase.master.locking.LockManager;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
import org.apache.hadoop.hbase.procedure2.LockType;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Service to support Region Server Grouping (HBase-6721).
|
||||
|
@ -88,10 +85,10 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
for(ServerName server: master.getServerManager().getOnlineServers().keySet()) {
|
||||
onlineServers.add(server.getAddress());
|
||||
}
|
||||
for (Address el: servers) {
|
||||
if (!onlineServers.contains(el)) {
|
||||
for (Address address: servers) {
|
||||
if (!onlineServers.contains(address)) {
|
||||
throw new ConstraintException(
|
||||
"Server " + el + " is not an online server in 'default' RSGroup.");
|
||||
"Server " + address + " is not an online server in 'default' RSGroup.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -192,18 +189,20 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
}
|
||||
|
||||
/**
|
||||
* Moves every region from servers which are currently located on these servers,
|
||||
* but should not be located there.
|
||||
* @param servers the servers that will move to new group
|
||||
* @param tables these tables will be kept on the servers, others will be moved
|
||||
* @param targetGroupName the target group name
|
||||
* @param tables The regions of tables assigned to these servers will not unassign
|
||||
* @throws IOException
|
||||
*/
|
||||
private void unassignRegionFromServers(Set<Address> servers, String targetGroupName,
|
||||
Set<TableName> tables) throws IOException {
|
||||
boolean foundRegionsToUnassign;
|
||||
private void moveRegionsFromServers(Set<Address> servers, Set<TableName> tables,
|
||||
String targetGroupName) throws IOException {
|
||||
boolean foundRegionsToMove;
|
||||
RSGroupInfo targetGrp = getRSGroupInfo(targetGroupName);
|
||||
Set<Address> allSevers = new HashSet<>(servers);
|
||||
do {
|
||||
foundRegionsToUnassign = false;
|
||||
foundRegionsToMove = false;
|
||||
for (Iterator<Address> iter = allSevers.iterator(); iter.hasNext();) {
|
||||
Address rs = iter.next();
|
||||
// Get regions that are associated with this server and filter regions by tables.
|
||||
|
@ -214,22 +213,22 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
}
|
||||
}
|
||||
|
||||
LOG.info("Unassigning " + regions.size() +
|
||||
" region(s) from " + rs + " for server move to " + targetGroupName);
|
||||
LOG.info("Moving " + regions.size() + " region(s) from " + rs +
|
||||
" for server move to " + targetGroupName);
|
||||
if (!regions.isEmpty()) {
|
||||
for (RegionInfo region: regions) {
|
||||
// Regions might get assigned from tables of target group so we need to filter
|
||||
if (!targetGrp.containsTable(region.getTable())) {
|
||||
this.master.getAssignmentManager().unassign(region);
|
||||
this.master.getAssignmentManager().move(region);
|
||||
if (master.getAssignmentManager().getRegionStates().
|
||||
getRegionState(region).isFailedOpen()) {
|
||||
getRegionState(region).isFailedOpen()) {
|
||||
continue;
|
||||
}
|
||||
foundRegionsToUnassign = true;
|
||||
foundRegionsToMove = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!foundRegionsToUnassign) {
|
||||
if (!foundRegionsToMove) {
|
||||
iter.remove();
|
||||
}
|
||||
}
|
||||
|
@ -239,36 +238,27 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
LOG.warn("Sleep interrupted", e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
} while (foundRegionsToUnassign);
|
||||
} while (foundRegionsToMove);
|
||||
}
|
||||
|
||||
/**
|
||||
* Moves every region of tables which should be kept on the servers,
|
||||
* but currently they are located on other servers.
|
||||
* @param servers the regions of these servers will be kept on the servers,
|
||||
* others will be moved
|
||||
* @param tables the tables that will move to new group
|
||||
* @param targetGroupName the target group name
|
||||
* @param servers the regions of tables assigned to these servers will not unassign
|
||||
* @throws IOException
|
||||
*/
|
||||
private void unassignRegionFromTables(Set<TableName> tables, String targetGroupName,
|
||||
Set<Address> servers) throws IOException {
|
||||
private void moveRegionsToServers(Set<Address> servers, Set<TableName> tables,
|
||||
String targetGroupName) throws IOException {
|
||||
for (TableName table: tables) {
|
||||
LOG.info("Unassigning region(s) from " + table + " for table move to " + targetGroupName);
|
||||
LockManager.MasterLock lock = master.getLockManager().createMasterLock(table,
|
||||
LockType.EXCLUSIVE, this.getClass().getName() + ": RSGroup: table move");
|
||||
try {
|
||||
try {
|
||||
lock.acquire();
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException("Interrupted when waiting for table lock", e);
|
||||
LOG.info("Moving region(s) from " + table + " for table move to " + targetGroupName);
|
||||
for (RegionInfo region : master.getAssignmentManager().getRegionStates().getRegionsOfTable(table)) {
|
||||
ServerName sn = master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(region);
|
||||
if (!servers.contains(sn.getAddress())) {
|
||||
master.getAssignmentManager().move(region);
|
||||
}
|
||||
for (RegionInfo region :
|
||||
master.getAssignmentManager().getRegionStates().getRegionsOfTable(table)) {
|
||||
ServerName sn = master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(region);
|
||||
if (!servers.contains(sn.getAddress())) {
|
||||
master.getAssignmentManager().unassign(region);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
lock.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -329,39 +319,34 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
Set<Address> movedServers = rsGroupInfoManager.moveServers(servers, srcGrp.getName(),
|
||||
targetGroupName);
|
||||
List<Address> editableMovedServers = Lists.newArrayList(movedServers);
|
||||
boolean foundRegionsToUnassign;
|
||||
boolean foundRegionsToMove;
|
||||
do {
|
||||
foundRegionsToUnassign = false;
|
||||
foundRegionsToMove = false;
|
||||
for (Iterator<Address> iter = editableMovedServers.iterator(); iter.hasNext();) {
|
||||
Address rs = iter.next();
|
||||
// Get regions that are associated with this server.
|
||||
List<RegionInfo> regions = getRegions(rs);
|
||||
|
||||
// Unassign regions for a server
|
||||
// TODO: This is problematic especially if hbase:meta is in the mix.
|
||||
// We need to update state in hbase:meta on Master and if unassigned we hang
|
||||
// around in here. There is a silly sort on linked list done above
|
||||
// in getRegions putting hbase:meta last which helps but probably has holes.
|
||||
LOG.info("Unassigning " + regions.size() +
|
||||
" region(s) from " + rs + " for server move to " + targetGroupName);
|
||||
if (!regions.isEmpty()) {
|
||||
// TODO bulk unassign or throttled unassign?
|
||||
for (RegionInfo region: regions) {
|
||||
// Regions might get assigned from tables of target group so we need to filter
|
||||
if (!targetGrp.containsTable(region.getTable())) {
|
||||
this.master.getAssignmentManager().unassign(region);
|
||||
if (master.getAssignmentManager().getRegionStates().
|
||||
getRegionState(region).isFailedOpen()) {
|
||||
// If region is in FAILED_OPEN state, it won't recover, not without
|
||||
// operator intervention... in hbase-2.0.0 at least. Continue rather
|
||||
// than mark region as 'foundRegionsToUnassign'.
|
||||
continue;
|
||||
}
|
||||
foundRegionsToUnassign = true;
|
||||
}
|
||||
LOG.info("Moving " + regions.size() + " region(s) from " + rs +
|
||||
" for server move to " + targetGroupName);
|
||||
|
||||
for (RegionInfo region: regions) {
|
||||
// Regions might get assigned from tables of target group so we need to filter
|
||||
if (targetGrp.containsTable(region.getTable())) {
|
||||
continue;
|
||||
}
|
||||
LOG.info("Moving region " + region.getShortNameToLog());
|
||||
this.master.getAssignmentManager().move(region);
|
||||
if (master.getAssignmentManager().getRegionStates().
|
||||
getRegionState(region).isFailedOpen()) {
|
||||
// If region is in FAILED_OPEN state, it won't recover, not without
|
||||
// operator intervention... in hbase-2.0.0 at least. Continue rather
|
||||
// than mark region as 'foundRegionsToMove'.
|
||||
continue;
|
||||
}
|
||||
foundRegionsToMove = true;
|
||||
}
|
||||
if (!foundRegionsToUnassign) {
|
||||
if (!foundRegionsToMove) {
|
||||
iter.remove();
|
||||
}
|
||||
}
|
||||
|
@ -371,7 +356,8 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
LOG.warn("Sleep interrupted", e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
} while (foundRegionsToUnassign);
|
||||
} while (foundRegionsToMove);
|
||||
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().postMoveServers(servers, targetGroupName);
|
||||
}
|
||||
|
@ -412,29 +398,27 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
"Source RSGroup " + srcGroup + " is same as target " + targetGroup +
|
||||
" RSGroup for table " + table);
|
||||
}
|
||||
LOG.info("Moving table " + table.getNameAsString() + " to RSGroup " + targetGroup);
|
||||
}
|
||||
rsGroupInfoManager.moveTables(tables, targetGroup);
|
||||
|
||||
// targetGroup is null when a table is being deleted. In this case no further
|
||||
// action is required.
|
||||
if (targetGroup != null) {
|
||||
for (TableName table: tables) {
|
||||
for (RegionInfo region :
|
||||
master.getAssignmentManager().getRegionStates().getRegionsOfTable(table)) {
|
||||
LOG.info("Moving region " + region.getShortNameToLog() +
|
||||
" to RSGroup " + targetGroup);
|
||||
master.getAssignmentManager().move(region);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().postMoveTables(tables, targetGroup);
|
||||
}
|
||||
}
|
||||
for (TableName table: tables) {
|
||||
LockManager.MasterLock lock = master.getLockManager().createMasterLock(table,
|
||||
LockType.EXCLUSIVE, this.getClass().getName() + ": RSGroup: table move");
|
||||
try {
|
||||
try {
|
||||
lock.acquire();
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException("Interrupted when waiting for table lock", e);
|
||||
}
|
||||
for (RegionInfo region :
|
||||
master.getAssignmentManager().getRegionStates().getRegionsOfTable(table)) {
|
||||
master.getAssignmentManager().unassign(region);
|
||||
}
|
||||
} finally {
|
||||
lock.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -582,10 +566,10 @@ public class RSGroupAdminServer implements RSGroupAdmin {
|
|||
String srcGroup = getRSGroupOfServer(servers.iterator().next()).getName();
|
||||
rsGroupInfoManager.moveServersAndTables(servers, tables, srcGroup, targetGroup);
|
||||
|
||||
//unassign regions which not belong to these tables
|
||||
unassignRegionFromServers(servers, targetGroup, tables);
|
||||
//unassign regions which not assigned to these servers
|
||||
unassignRegionFromTables(tables, targetGroup, servers);
|
||||
//move regions which should not belong to these tables
|
||||
moveRegionsFromServers(servers, tables, targetGroup);
|
||||
//move regions which should belong to these servers
|
||||
moveRegionsToServers(servers, tables, targetGroup);
|
||||
|
||||
if (master.getMasterCoprocessorHost() != null) {
|
||||
master.getMasterCoprocessorHost().postMoveServersAndTables(servers, tables, targetGroup);
|
||||
|
|
|
@ -124,7 +124,8 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
|
|||
|
||||
List<RegionInfo> misplacedRegions = correctedState.get(LoadBalancer.BOGUS_SERVER_NAME);
|
||||
for (RegionInfo regionInfo : misplacedRegions) {
|
||||
regionPlans.add(new RegionPlan(regionInfo, null, null));
|
||||
ServerName serverName = findServerForRegion(clusterState, regionInfo);
|
||||
regionPlans.add(new RegionPlan(regionInfo, serverName, null));
|
||||
}
|
||||
try {
|
||||
List<RSGroupInfo> rsgi = rsGroupInfoManager.listRSGroups();
|
||||
|
@ -323,6 +324,19 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
|
|||
return misplacedRegions;
|
||||
}
|
||||
|
||||
private ServerName findServerForRegion(
|
||||
Map<ServerName, List<RegionInfo>> existingAssignments, RegionInfo region)
|
||||
{
|
||||
for (Map.Entry<ServerName, List<RegionInfo>> entry : existingAssignments.entrySet()) {
|
||||
if (entry.getValue().contains(region)) {
|
||||
return entry.getKey();
|
||||
}
|
||||
}
|
||||
|
||||
throw new IllegalStateException("Could not find server for region "
|
||||
+ region.getShortNameToLog());
|
||||
}
|
||||
|
||||
private Map<ServerName, List<RegionInfo>> correctAssignments(
|
||||
Map<ServerName, List<RegionInfo>> existingAssignments)
|
||||
throws HBaseIOException{
|
||||
|
|
|
@ -51,13 +51,11 @@ import org.junit.AfterClass;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
|
||||
|
||||
@Ignore // TODO: Fix after HBASE-14614 goes in.
|
||||
@Category({MediumTests.class})
|
||||
public class TestRSGroups extends TestRSGroupsBase {
|
||||
protected static final Log LOG = LogFactory.getLog(TestRSGroups.class);
|
||||
|
@ -149,7 +147,7 @@ public class TestRSGroups extends TestRSGroupsBase {
|
|||
});
|
||||
}
|
||||
|
||||
@Ignore @Test
|
||||
@Test
|
||||
public void testBasicStartUp() throws IOException {
|
||||
RSGroupInfo defaultInfo = rsGroupAdmin.getRSGroupInfo(RSGroupInfo.DEFAULT_GROUP);
|
||||
assertEquals(4, defaultInfo.getServers().size());
|
||||
|
@ -159,7 +157,7 @@ public class TestRSGroups extends TestRSGroupsBase {
|
|||
assertEquals(3, count);
|
||||
}
|
||||
|
||||
@Ignore @Test
|
||||
@Test
|
||||
public void testNamespaceCreateAndAssign() throws Exception {
|
||||
LOG.info("testNamespaceCreateAndAssign");
|
||||
String nsName = tablePrefix+"_foo";
|
||||
|
@ -185,7 +183,7 @@ public class TestRSGroups extends TestRSGroupsBase {
|
|||
Assert.assertEquals(1, ProtobufUtil.getOnlineRegions(rs).size());
|
||||
}
|
||||
|
||||
@Ignore @Test
|
||||
@Test
|
||||
public void testDefaultNamespaceCreateAndAssign() throws Exception {
|
||||
LOG.info("testDefaultNamespaceCreateAndAssign");
|
||||
final byte[] tableName = Bytes.toBytes(tablePrefix + "_testCreateAndAssign");
|
||||
|
@ -203,7 +201,7 @@ public class TestRSGroups extends TestRSGroupsBase {
|
|||
});
|
||||
}
|
||||
|
||||
@Ignore @Test
|
||||
@Test
|
||||
public void testNamespaceConstraint() throws Exception {
|
||||
String nsName = tablePrefix+"_foo";
|
||||
String groupName = tablePrefix+"_foo";
|
||||
|
@ -238,7 +236,7 @@ public class TestRSGroups extends TestRSGroupsBase {
|
|||
}
|
||||
}
|
||||
|
||||
@Ignore @Test
|
||||
@Test
|
||||
public void testGroupInfoMultiAccessing() throws Exception {
|
||||
RSGroupInfoManager manager = rsGroupAdminEndpoint.getGroupInfoManager();
|
||||
RSGroupInfo defaultGroup = manager.getRSGroup("default");
|
||||
|
@ -249,7 +247,7 @@ public class TestRSGroups extends TestRSGroupsBase {
|
|||
it.next();
|
||||
}
|
||||
|
||||
@Ignore @Test
|
||||
@Test
|
||||
public void testMisplacedRegions() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(tablePrefix+"_testMisplacedRegions");
|
||||
LOG.info("testMisplacedRegions");
|
||||
|
@ -277,7 +275,7 @@ public class TestRSGroups extends TestRSGroupsBase {
|
|||
});
|
||||
}
|
||||
|
||||
@Ignore @Test
|
||||
@Test
|
||||
public void testCloneSnapshot() throws Exception {
|
||||
byte[] FAMILY = Bytes.toBytes("test");
|
||||
String snapshotName = tableName.getNameAsString() + "_snap";
|
||||
|
|
|
@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
@ -99,7 +98,7 @@ public class TestRSGroupsOfflineMode {
|
|||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Ignore @Test
|
||||
@Test
|
||||
public void testOffline() throws Exception, InterruptedException {
|
||||
// Table should be after group table name so it gets assigned later.
|
||||
final TableName failoverTable = TableName.valueOf(name.getMethodName());
|
||||
|
|
|
@ -76,21 +76,18 @@ import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
|
|||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureInMemoryChore;
|
||||
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.util.VersionInfo;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
|
||||
|
||||
// TODO: why are they here?
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.util.VersionInfo;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* The AssignmentManager is the coordinator for region assign/unassign operations.
|
||||
|
@ -552,6 +549,14 @@ public class AssignmentManager implements ServerListener {
|
|||
ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
|
||||
}
|
||||
|
||||
public void move(final RegionInfo regionInfo) throws IOException {
|
||||
RegionStateNode node = this.regionStates.getRegionNode(regionInfo);
|
||||
ServerName sourceServer = node.getRegionLocation();
|
||||
RegionPlan plan = new RegionPlan(regionInfo, sourceServer, null);
|
||||
MoveRegionProcedure proc = createMoveRegionProcedure(plan);
|
||||
ProcedureSyncWait.submitAndWaitProcedure(master.getMasterProcedureExecutor(), proc);
|
||||
}
|
||||
|
||||
public Future<byte[]> moveAsync(final RegionPlan regionPlan) {
|
||||
MoveRegionProcedure proc = createMoveRegionProcedure(regionPlan);
|
||||
return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
|
||||
|
@ -590,7 +595,7 @@ public class AssignmentManager implements ServerListener {
|
|||
}
|
||||
|
||||
ProcedureSyncWait.waitForProcedureToCompleteIOE(
|
||||
master.getMasterProcedureExecutor(), proc.getProcId(), timeout);
|
||||
master.getMasterProcedureExecutor(), proc, timeout);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -61,14 +61,14 @@ public final class ProcedureSyncWait {
|
|||
|
||||
private static class ProcedureFuture implements Future<byte[]> {
|
||||
private final ProcedureExecutor<MasterProcedureEnv> procExec;
|
||||
private final long procId;
|
||||
private final Procedure<?> proc;
|
||||
|
||||
private boolean hasResult = false;
|
||||
private byte[] result = null;
|
||||
|
||||
public ProcedureFuture(ProcedureExecutor<MasterProcedureEnv> procExec, long procId) {
|
||||
public ProcedureFuture(ProcedureExecutor<MasterProcedureEnv> procExec, Procedure<?> proc) {
|
||||
this.procExec = procExec;
|
||||
this.procId = procId;
|
||||
this.proc = proc;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -84,7 +84,7 @@ public final class ProcedureSyncWait {
|
|||
public byte[] get() throws InterruptedException, ExecutionException {
|
||||
if (hasResult) return result;
|
||||
try {
|
||||
return waitForProcedureToComplete(procExec, procId, Long.MAX_VALUE);
|
||||
return waitForProcedureToComplete(procExec, proc, Long.MAX_VALUE);
|
||||
} catch (Exception e) {
|
||||
throw new ExecutionException(e);
|
||||
}
|
||||
|
@ -95,7 +95,7 @@ public final class ProcedureSyncWait {
|
|||
throws InterruptedException, ExecutionException, TimeoutException {
|
||||
if (hasResult) return result;
|
||||
try {
|
||||
result = waitForProcedureToComplete(procExec, procId, unit.toMillis(timeout));
|
||||
result = waitForProcedureToComplete(procExec, proc, unit.toMillis(timeout));
|
||||
hasResult = true;
|
||||
return result;
|
||||
} catch (TimeoutIOException e) {
|
||||
|
@ -107,26 +107,27 @@ public final class ProcedureSyncWait {
|
|||
}
|
||||
|
||||
public static Future<byte[]> submitProcedure(final ProcedureExecutor<MasterProcedureEnv> procExec,
|
||||
final Procedure proc) {
|
||||
final Procedure<?> proc) {
|
||||
if (proc.isInitializing()) {
|
||||
procExec.submitProcedure(proc);
|
||||
}
|
||||
return new ProcedureFuture(procExec, proc.getProcId());
|
||||
return new ProcedureFuture(procExec, proc);
|
||||
}
|
||||
|
||||
public static byte[] submitAndWaitProcedure(ProcedureExecutor<MasterProcedureEnv> procExec,
|
||||
final Procedure proc) throws IOException {
|
||||
final Procedure<?> proc) throws IOException {
|
||||
if (proc.isInitializing()) {
|
||||
procExec.submitProcedure(proc);
|
||||
}
|
||||
return waitForProcedureToCompleteIOE(procExec, proc.getProcId(), Long.MAX_VALUE);
|
||||
return waitForProcedureToCompleteIOE(procExec, proc, Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
public static byte[] waitForProcedureToCompleteIOE(
|
||||
final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final long timeout)
|
||||
final ProcedureExecutor<MasterProcedureEnv> procExec,
|
||||
final Procedure<?> proc, final long timeout)
|
||||
throws IOException {
|
||||
try {
|
||||
return waitForProcedureToComplete(procExec, procId, timeout);
|
||||
return waitForProcedureToComplete(procExec, proc, timeout);
|
||||
} catch (IOException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
|
@ -135,30 +136,27 @@ public final class ProcedureSyncWait {
|
|||
}
|
||||
|
||||
public static byte[] waitForProcedureToComplete(
|
||||
final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final long timeout)
|
||||
final ProcedureExecutor<MasterProcedureEnv> procExec,
|
||||
final Procedure<?> proc, final long timeout)
|
||||
throws IOException {
|
||||
waitFor(procExec.getEnvironment(), "pid=" + procId,
|
||||
waitFor(procExec.getEnvironment(), "pid=" + proc.getProcId(),
|
||||
new ProcedureSyncWait.Predicate<Boolean>() {
|
||||
@Override
|
||||
public Boolean evaluate() throws IOException {
|
||||
return !procExec.isRunning() || procExec.isFinished(procId);
|
||||
return !procExec.isRunning() || procExec.isFinished(proc.getProcId());
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
Procedure result = procExec.getResult(procId);
|
||||
if (result != null) {
|
||||
if (result.hasException()) {
|
||||
// If the procedure fails, we should always have an exception captured. Throw it.
|
||||
throw result.getException().unwrapRemoteIOException();
|
||||
}
|
||||
return result.getResult();
|
||||
if (!procExec.isRunning()) {
|
||||
throw new IOException("The Master is Aborting");
|
||||
}
|
||||
|
||||
if (proc.hasException()) {
|
||||
// If the procedure fails, we should always have an exception captured. Throw it.
|
||||
throw proc.getException().unwrapRemoteIOException();
|
||||
} else {
|
||||
if (procExec.isRunning()) {
|
||||
throw new IOException("pid= " + procId + "not found");
|
||||
} else {
|
||||
throw new IOException("The Master is Aborting");
|
||||
}
|
||||
return proc.getResult();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -251,7 +251,7 @@ if ( fqtn != null ) {
|
|||
RegionInfoBuilder.FIRST_META_REGIONINFO, j);
|
||||
ServerName metaLocation = metaTableLocator.waitMetaRegionLocation(master.getZooKeeper(), j, 1);
|
||||
for (int i = 0; i < 1; i++) {
|
||||
String url = "";
|
||||
String hostAndPort = "";
|
||||
String readReq = "N/A";
|
||||
String writeReq = "N/A";
|
||||
String fileSize = "N/A";
|
||||
|
@ -262,7 +262,7 @@ if ( fqtn != null ) {
|
|||
if (metaLocation != null) {
|
||||
ServerLoad sl = master.getServerManager().getLoad(metaLocation);
|
||||
// The host name portion should be safe, but I don't know how we handle IDNs so err on the side of failing safely.
|
||||
url = "//" + URLEncoder.encode(metaLocation.getHostname()) + ":" + master.getRegionServerInfoPort(metaLocation) + "/";
|
||||
hostAndPort = URLEncoder.encode(metaLocation.getHostname()) + ":" + master.getRegionServerInfoPort(metaLocation);
|
||||
if (sl != null) {
|
||||
Map<byte[], RegionLoad> map = sl.getRegionsLoad();
|
||||
if (map.containsKey(meta.getRegionName())) {
|
||||
|
@ -279,7 +279,7 @@ if ( fqtn != null ) {
|
|||
%>
|
||||
<tr>
|
||||
<td><%= escapeXml(meta.getRegionNameAsString()) %></td>
|
||||
<td><a href="<%= url %>"><%= StringEscapeUtils.escapeHtml4(metaLocation.getHostname().toString()) + ":" + master.getRegionServerInfoPort(metaLocation) %></a></td>
|
||||
<td><a href="http://<%= hostAndPort %>/"><%= StringEscapeUtils.escapeHtml4(hostAndPort) %></a></td>
|
||||
<td><%= readReq%></td>
|
||||
<td><%= writeReq%></td>
|
||||
<td><%= fileSize%></td>
|
||||
|
|
|
@ -36,7 +36,7 @@ module Hbase
|
|||
#--------------------------------------------------------------------------
|
||||
# Returns a list of groups in hbase
|
||||
def list_rs_groups
|
||||
@admin.listRSGroups.map(&:getName)
|
||||
@admin.listRSGroups
|
||||
end
|
||||
|
||||
#--------------------------------------------------------------------------
|
||||
|
@ -44,32 +44,7 @@ module Hbase
|
|||
def get_rsgroup(group_name)
|
||||
group = @admin.getRSGroupInfo(group_name)
|
||||
raise(ArgumentError, 'Group does not exist: ' + group_name) if group.nil?
|
||||
|
||||
res = {}
|
||||
yield('Servers:') if block_given?
|
||||
|
||||
servers = []
|
||||
group.getServers.each do |v|
|
||||
if block_given?
|
||||
yield(v.toString)
|
||||
else
|
||||
servers << v.toString
|
||||
end
|
||||
end
|
||||
res[:servers] = servers
|
||||
|
||||
tables = []
|
||||
yield('Tables:') if block_given?
|
||||
group.getTables.each do |v|
|
||||
if block_given?
|
||||
yield(v.toString)
|
||||
else
|
||||
tables << v.toString
|
||||
end
|
||||
end
|
||||
res[:tables] = tables
|
||||
|
||||
res unless block_given?
|
||||
group
|
||||
end
|
||||
|
||||
#--------------------------------------------------------------------------
|
||||
|
|
|
@ -30,9 +30,17 @@ EOF
|
|||
end
|
||||
|
||||
def command(group_name)
|
||||
formatter.header(['RSGROUP '.concat(group_name)])
|
||||
rsgroup_admin.get_rsgroup(group_name) do |s|
|
||||
formatter.row([s])
|
||||
group = rsgroup_admin.get_rsgroup(group_name)
|
||||
|
||||
formatter.header(['SERVERS'])
|
||||
group.getServers.each do |server|
|
||||
formatter.row([server.toString])
|
||||
end
|
||||
formatter.footer
|
||||
|
||||
formatter.header(['TABLES'])
|
||||
group.getTables.each do |table|
|
||||
formatter.row([table.getNameAsString])
|
||||
end
|
||||
formatter.footer
|
||||
end
|
||||
|
|
|
@ -32,15 +32,44 @@ EOF
|
|||
end
|
||||
|
||||
def command(regex = '.*')
|
||||
formatter.header(['GROUPS'])
|
||||
formatter.header(['NAME', 'SERVER / TABLE'])
|
||||
|
||||
regex = /#{regex}/ unless regex.is_a?(Regexp)
|
||||
list = rsgroup_admin.list_rs_groups.grep(regex)
|
||||
list = rsgroup_admin.list_rs_groups
|
||||
groups = 0
|
||||
|
||||
list.each do |group|
|
||||
formatter.row([group])
|
||||
next unless group.getName.match(regex)
|
||||
|
||||
groups += 1
|
||||
group_name_printed = false
|
||||
|
||||
group.getServers.each do |server|
|
||||
if group_name_printed
|
||||
group_name = ''
|
||||
else
|
||||
group_name = group.getName
|
||||
group_name_printed = true
|
||||
end
|
||||
|
||||
formatter.row([group_name, 'server ' + server.toString])
|
||||
end
|
||||
|
||||
group.getTables.each do |table|
|
||||
if group_name_printed
|
||||
group_name = ''
|
||||
else
|
||||
group_name = group.getName
|
||||
group_name_printed = true
|
||||
end
|
||||
|
||||
formatter.row([group_name, 'table ' + table.getNameAsString])
|
||||
end
|
||||
|
||||
formatter.row([group.getName, '']) unless group_name_printed
|
||||
end
|
||||
|
||||
formatter.footer(list.size)
|
||||
formatter.footer(groups)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -20,7 +20,8 @@ module Shell
|
|||
class MoveServersRsgroup < Command
|
||||
def help
|
||||
<<-EOF
|
||||
Reassign RegionServers from one group to another.
|
||||
Reassign RegionServers from one group to another. Every region of the
|
||||
RegionServer will be moved to another RegionServer.
|
||||
|
||||
Example:
|
||||
|
||||
|
|
Loading…
Reference in New Issue