HBASE-5927 SSH and DisableTableHandler happening together does not clear the znode of the region and RIT map (Rajesh)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1339913 13f79535-47bb-0310-9956-ffa450edef68
commit eef90a4604
parent 512b75643f
@@ -2124,7 +2124,11 @@ public class AssignmentManager extends ZooKeeperListener {
     unassign(region, force, null);
   }

-  private void deleteClosingOrClosedNode(HRegionInfo region) {
+  /**
+   *
+   * @param region regioninfo of znode to be deleted.
+   */
+  public void deleteClosingOrClosedNode(HRegionInfo region) {
     try {
       if (!ZKAssign.deleteNode(master.getZooKeeper(), region.getEncodedName(),
           EventHandler.EventType.M_ZK_REGION_CLOSING)) {
@@ -288,10 +288,10 @@ public class ServerShutdownHandler extends EventHandler {
       if (hris != null) {
         List<HRegionInfo> toAssignRegions = new ArrayList<HRegionInfo>();
         for (Map.Entry<HRegionInfo, Result> e: hris.entrySet()) {
+          RegionState rit = this.services.getAssignmentManager().isRegionInTransition(e.getKey());
           if (processDeadRegion(e.getKey(), e.getValue(),
               this.services.getAssignmentManager(),
               this.server.getCatalogTracker())) {
-            RegionState rit = this.services.getAssignmentManager().isRegionInTransition(e.getKey());
             ServerName addressFromAM = this.services.getAssignmentManager()
                 .getRegionServerOfRegion(e.getKey());
             if (rit != null && !rit.isClosing() && !rit.isPendingClose()) {
@@ -308,6 +308,22 @@ public class ServerShutdownHandler extends EventHandler {
              toAssignRegions.add(e.getKey());
            }
          }
+          // If the table was partially disabled and the RS went down, we should clear the RIT
+          // and remove the node for the region.
+          // The rit that we use may be stale in case the table was in DISABLING state
+          // but though we did assign we will not be clearing the znode in CLOSING state.
+          // Doing this will have no harm. See HBASE-5927
+          if (rit != null
+              && (rit.isClosing() || rit.isPendingClose())
+              && this.services.getAssignmentManager().getZKTable()
+                  .isDisablingOrDisabledTable(rit.getRegion().getTableNameAsString())) {
+            HRegionInfo hri = rit.getRegion();
+            AssignmentManager am = this.services.getAssignmentManager();
+            am.deleteClosingOrClosedNode(hri);
+            am.regionOffline(hri);
+            // To avoid region assignment if table is in disabling or disabled state.
+            toAssignRegions.remove(hri);
+          }
         }
         // Get all available servers
         List<ServerName> availableServers = services.getServerManager()
@@ -35,6 +35,8 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
+import org.apache.hadoop.hbase.master.AssignmentManager.RegionState.State;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.RegionTransition;
 import org.apache.hadoop.hbase.Server;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -399,44 +402,7 @@ public class TestAssignmentManager {
     AssignmentManager am = new AssignmentManager(this.server,
       this.serverManager, ct, balancer, executor, null);
     try {
-      // Make sure our new AM gets callbacks; once registered, can't unregister.
-      // Thats ok because we make a new zk watcher for each test.
-      this.watcher.registerListenerFirst(am);
-
-      // Need to set up a fake scan of meta for the servershutdown handler
-      // Make an RS Interface implementation. Make it so a scanner can go against it.
-      ClientProtocol implementation = Mockito.mock(ClientProtocol.class);
-      // Get a meta row result that has region up on SERVERNAME_A
-      Result r = Mocking.getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
-      ScanResponse.Builder builder = ScanResponse.newBuilder();
-      builder.setMoreResults(false);
-      builder.addResult(ProtobufUtil.toResult(r));
-      Mockito.when(implementation.scan(
-        (RpcController)Mockito.any(), (ScanRequest)Mockito.any())).
-        thenReturn(builder.build());
-
-      // Get a connection w/ mocked up common methods.
-      HConnection connection =
-        HConnectionTestingUtility.getMockedConnectionAndDecorate(HTU.getConfiguration(),
-          null, implementation, SERVERNAME_B, REGIONINFO);
-
-      // Make it so we can get a catalogtracker from servermanager.. .needed
-      // down in guts of server shutdown handler.
-      Mockito.when(ct.getConnection()).thenReturn(connection);
-      Mockito.when(this.server.getCatalogTracker()).thenReturn(ct);
-
-      // Now make a server shutdown handler instance and invoke process.
-      // Have it that SERVERNAME_A died.
-      DeadServer deadServers = new DeadServer();
-      deadServers.add(SERVERNAME_A);
-      // I need a services instance that will return the AM
-      MasterServices services = Mockito.mock(MasterServices.class);
-      Mockito.when(services.getAssignmentManager()).thenReturn(am);
-      Mockito.when(services.getServerManager()).thenReturn(this.serverManager);
-      ServerShutdownHandler handler = new ServerShutdownHandler(this.server,
-        services, deadServers, SERVERNAME_A, false);
-      handler.process();
-      // The region in r will have been assigned. It'll be up in zk as unassigned.
+      processServerShutdownHandler(ct, am);
     } finally {
       executor.shutdown();
       am.shutdown();
@@ -445,6 +411,113 @@ public class TestAssignmentManager {
     }
   }

+  /**
+   * To test closed region handler to remove rit and delete corresponding znode
+   * if region in pending close or closing while processing shutdown of a region
+   * server.(HBASE-5927).
+   *
+   * @throws KeeperException
+   * @throws IOException
+   * @throws ServiceException
+   */
+  @Test
+  public void testSSHWhenDisableTableInProgress() throws KeeperException, IOException,
+      ServiceException {
+    testCaseWithPartiallyDisabledState(Table.State.DISABLING);
+    testCaseWithPartiallyDisabledState(Table.State.DISABLED);
+  }
+
+  private void testCaseWithPartiallyDisabledState(Table.State state) throws KeeperException,
+      IOException, NodeExistsException, ServiceException {
+    // Create and startup an executor. This is used by AssignmentManager
+    // handling zk callbacks.
+    ExecutorService executor = startupMasterExecutor("testSSHWhenDisableTableInProgress");
+    // We need a mocked catalog tracker.
+    CatalogTracker ct = Mockito.mock(CatalogTracker.class);
+    LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(server.getConfiguration());
+    // Create an AM.
+    AssignmentManager am = new AssignmentManager(this.server, this.serverManager, ct, balancer,
+        executor, null);
+    // adding region to regions and servers maps.
+    am.regionOnline(REGIONINFO, SERVERNAME_A);
+    // adding region in pending close.
+    am.regionsInTransition.put(REGIONINFO.getEncodedName(), new RegionState(REGIONINFO,
+        State.PENDING_CLOSE));
+    if (state == Table.State.DISABLING) {
+      am.getZKTable().setDisablingTable(REGIONINFO.getTableNameAsString());
+    } else {
+      am.getZKTable().setDisabledTable(REGIONINFO.getTableNameAsString());
+    }
+    RegionTransition data = RegionTransition.createRegionTransition(EventType.M_ZK_REGION_CLOSING,
+        REGIONINFO.getRegionName(), SERVERNAME_A);
+    // RegionTransitionData data = new
+    // RegionTransitionData(EventType.M_ZK_REGION_CLOSING,
+    // REGIONINFO.getRegionName(), SERVERNAME_A);
+    String node = ZKAssign.getNodeName(this.watcher, REGIONINFO.getEncodedName());
+    // create znode in M_ZK_REGION_CLOSING state.
+    ZKUtil.createAndWatch(this.watcher, node, data.toByteArray());
+    try {
+      processServerShutdownHandler(ct, am);
+      // check znode deleted or not.
+      // In both cases the znode should be deleted.
+      assertTrue("The znode should be deleted.", ZKUtil.checkExists(this.watcher, node) == -1);
+      // check whether in rit or not. In the DISABLING case also the below
+      // assert will be true but the piece of code added for HBASE-5927 will not
+      // do that.
+      if (state == Table.State.DISABLED) {
+        assertTrue("Region state of region in pending close should be removed from rit.",
+            am.regionsInTransition.isEmpty());
+      }
+    } finally {
+      executor.shutdown();
+      am.shutdown();
+      // Clean up all znodes
+      ZKAssign.deleteAllNodes(this.watcher);
+    }
+  }
+
+  private void processServerShutdownHandler(CatalogTracker ct, AssignmentManager am)
+      throws IOException, ServiceException {
+    // Make sure our new AM gets callbacks; once registered, can't unregister.
+    // Thats ok because we make a new zk watcher for each test.
+    this.watcher.registerListenerFirst(am);
+
+    // Need to set up a fake scan of meta for the servershutdown handler
+    // Make an RS Interface implementation. Make it so a scanner can go against it.
+    ClientProtocol implementation = Mockito.mock(ClientProtocol.class);
+    // Get a meta row result that has region up on SERVERNAME_A
+    Result r = Mocking.getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
+    ScanResponse.Builder builder = ScanResponse.newBuilder();
+    builder.setMoreResults(true);
+    builder.addResult(ProtobufUtil.toResult(r));
+    Mockito.when(implementation.scan(
+      (RpcController)Mockito.any(), (ScanRequest)Mockito.any())).
+      thenReturn(builder.build());
+
+    // Get a connection w/ mocked up common methods.
+    HConnection connection =
+      HConnectionTestingUtility.getMockedConnectionAndDecorate(HTU.getConfiguration(),
+        null, implementation, SERVERNAME_B, REGIONINFO);
+
+    // Make it so we can get a catalogtracker from servermanager.. .needed
+    // down in guts of server shutdown handler.
+    Mockito.when(ct.getConnection()).thenReturn(connection);
+    Mockito.when(this.server.getCatalogTracker()).thenReturn(ct);
+
+    // Now make a server shutdown handler instance and invoke process.
+    // Have it that SERVERNAME_A died.
+    DeadServer deadServers = new DeadServer();
+    deadServers.add(SERVERNAME_A);
+    // I need a services instance that will return the AM
+    MasterServices services = Mockito.mock(MasterServices.class);
+    Mockito.when(services.getAssignmentManager()).thenReturn(am);
+    Mockito.when(services.getServerManager()).thenReturn(this.serverManager);
+    ServerShutdownHandler handler = new ServerShutdownHandler(this.server,
+      services, deadServers, SERVERNAME_A, false);
+    handler.process();
+    // The region in r will have been assigned. It'll be up in zk as unassigned.
+  }
+
   /**
    * Create and startup executor pools. Start same set as master does (just
    * run a few less).