HBASE-6317 Master clean start up and Partially enabled tables make region assignment inconsistent (RajeshBabu)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1392802 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d6cca23ed3
commit
4c514ea23b
@ -19,6 +19,7 @@
|
|||||||
package org.apache.hadoop.hbase.master.handler;
|
package org.apache.hadoop.hbase.master.handler;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
@ -27,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
|
|||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableNotDisabledException;
|
import org.apache.hadoop.hbase.TableNotDisabledException;
|
||||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||||
@ -36,7 +38,11 @@ import org.apache.hadoop.hbase.master.AssignmentManager;
|
|||||||
import org.apache.hadoop.hbase.master.BulkAssigner;
|
import org.apache.hadoop.hbase.master.BulkAssigner;
|
||||||
import org.apache.hadoop.hbase.master.HMaster;
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||||
|
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||||
|
import org.apache.hadoop.hbase.master.RegionStates;
|
||||||
|
import org.apache.hadoop.hbase.master.ServerManager;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.cloudera.htrace.Trace;
|
import org.cloudera.htrace.Trace;
|
||||||
|
|
||||||
@ -50,6 +56,7 @@ public class EnableTableHandler extends EventHandler {
|
|||||||
private final String tableNameStr;
|
private final String tableNameStr;
|
||||||
private final AssignmentManager assignmentManager;
|
private final AssignmentManager assignmentManager;
|
||||||
private final CatalogTracker ct;
|
private final CatalogTracker ct;
|
||||||
|
private boolean retainAssignment = false;
|
||||||
|
|
||||||
public EnableTableHandler(Server server, byte [] tableName,
|
public EnableTableHandler(Server server, byte [] tableName,
|
||||||
CatalogTracker catalogTracker, AssignmentManager assignmentManager,
|
CatalogTracker catalogTracker, AssignmentManager assignmentManager,
|
||||||
@ -60,6 +67,7 @@ public class EnableTableHandler extends EventHandler {
|
|||||||
this.tableNameStr = Bytes.toString(tableName);
|
this.tableNameStr = Bytes.toString(tableName);
|
||||||
this.ct = catalogTracker;
|
this.ct = catalogTracker;
|
||||||
this.assignmentManager = assignmentManager;
|
this.assignmentManager = assignmentManager;
|
||||||
|
this.retainAssignment = skipTableStateCheck;
|
||||||
// Check if table exists
|
// Check if table exists
|
||||||
if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) {
|
if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) {
|
||||||
throw new TableNotFoundException(Bytes.toString(tableName));
|
throw new TableNotFoundException(Bytes.toString(tableName));
|
||||||
@ -111,10 +119,12 @@ public class EnableTableHandler extends EventHandler {
|
|||||||
LOG.error("Error trying to enable the table " + this.tableNameStr, e);
|
LOG.error("Error trying to enable the table " + this.tableNameStr, e);
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
LOG.error("Error trying to enable the table " + this.tableNameStr, e);
|
LOG.error("Error trying to enable the table " + this.tableNameStr, e);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.error("Error trying to enable the table " + this.tableNameStr, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleEnableTable() throws IOException, KeeperException {
|
private void handleEnableTable() throws IOException, KeeperException, InterruptedException {
|
||||||
// I could check table is disabling and if so, not enable but require
|
// I could check table is disabling and if so, not enable but require
|
||||||
// that user first finish disabling but that might be obnoxious.
|
// that user first finish disabling but that might be obnoxious.
|
||||||
|
|
||||||
@ -123,18 +133,18 @@ public class EnableTableHandler extends EventHandler {
|
|||||||
boolean done = false;
|
boolean done = false;
|
||||||
// Get the regions of this table. We're done when all listed
|
// Get the regions of this table. We're done when all listed
|
||||||
// tables are onlined.
|
// tables are onlined.
|
||||||
List<HRegionInfo> regionsInMeta;
|
List<Pair<HRegionInfo, ServerName>> tableRegionsAndLocations = MetaReader
|
||||||
regionsInMeta = MetaReader.getTableRegions(this.ct, tableName, true);
|
.getTableRegionsAndLocations(this.ct, tableName, true);
|
||||||
int countOfRegionsInTable = regionsInMeta.size();
|
int countOfRegionsInTable = tableRegionsAndLocations.size();
|
||||||
List<HRegionInfo> regions = regionsToAssign(regionsInMeta);
|
List<HRegionInfo> regions = regionsToAssignWithServerName(tableRegionsAndLocations);
|
||||||
int regionsCount = regions.size();
|
int regionsCount = regions.size();
|
||||||
if (regionsCount == 0) {
|
if (regionsCount == 0) {
|
||||||
done = true;
|
done = true;
|
||||||
}
|
}
|
||||||
LOG.info("Table '" + this.tableNameStr + "' has " + countOfRegionsInTable
|
LOG.info("Table '" + this.tableNameStr + "' has " + countOfRegionsInTable
|
||||||
+ " regions, of which " + regionsCount + " are offline.");
|
+ " regions, of which " + regionsCount + " are offline.");
|
||||||
BulkEnabler bd = new BulkEnabler(this.server, regions,
|
BulkEnabler bd = new BulkEnabler(this.server, regions, countOfRegionsInTable,
|
||||||
countOfRegionsInTable);
|
this.retainAssignment);
|
||||||
try {
|
try {
|
||||||
if (bd.bulkAssign()) {
|
if (bd.bulkAssign()) {
|
||||||
done = true;
|
done = true;
|
||||||
@ -158,19 +168,31 @@ public class EnableTableHandler extends EventHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param regionsInMeta This datastructure is edited by this method.
|
* @param regionsInMeta
|
||||||
* @return The <code>regionsInMeta</code> list minus the regions that have
|
* @return List of regions neither in transition nor assigned.
|
||||||
* been onlined; i.e. List of regions that need onlining.
|
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private List<HRegionInfo> regionsToAssign(
|
private List<HRegionInfo> regionsToAssignWithServerName(
|
||||||
final List<HRegionInfo> regionsInMeta)
|
final List<Pair<HRegionInfo, ServerName>> regionsInMeta) throws IOException {
|
||||||
throws IOException {
|
ServerManager serverManager = ((HMaster) this.server).getServerManager();
|
||||||
final List<HRegionInfo> onlineRegions =
|
List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
|
||||||
this.assignmentManager.getRegionStates()
|
RegionStates regionStates = this.assignmentManager.getRegionStates();
|
||||||
.getRegionsOfTable(tableName);
|
for (Pair<HRegionInfo, ServerName> regionLocation : regionsInMeta) {
|
||||||
regionsInMeta.removeAll(onlineRegions);
|
HRegionInfo hri = regionLocation.getFirst();
|
||||||
return regionsInMeta;
|
ServerName sn = regionLocation.getSecond();
|
||||||
|
if (!regionStates.isRegionInTransition(hri) && !regionStates.isRegionAssigned(hri)) {
|
||||||
|
if (this.retainAssignment && sn != null && serverManager.isServerOnline(sn)) {
|
||||||
|
this.assignmentManager.addPlan(hri.getEncodedName(), new RegionPlan(hri, null, sn));
|
||||||
|
}
|
||||||
|
regions.add(hri);
|
||||||
|
} else {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Skipping assign for the region " + hri + " during enable table "
|
||||||
|
+ hri.getTableNameAsString() + " because its already in tranition or assigned.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return regions;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -180,12 +202,14 @@ public class EnableTableHandler extends EventHandler {
|
|||||||
private final List<HRegionInfo> regions;
|
private final List<HRegionInfo> regions;
|
||||||
// Count of regions in table at time this assign was launched.
|
// Count of regions in table at time this assign was launched.
|
||||||
private final int countOfRegionsInTable;
|
private final int countOfRegionsInTable;
|
||||||
|
private final boolean retainAssignment;
|
||||||
|
|
||||||
BulkEnabler(final Server server, final List<HRegionInfo> regions,
|
BulkEnabler(final Server server, final List<HRegionInfo> regions,
|
||||||
final int countOfRegionsInTable) {
|
final int countOfRegionsInTable, boolean retainAssignment) {
|
||||||
super(server);
|
super(server);
|
||||||
this.regions = regions;
|
this.regions = regions;
|
||||||
this.countOfRegionsInTable = countOfRegionsInTable;
|
this.countOfRegionsInTable = countOfRegionsInTable;
|
||||||
|
this.retainAssignment = retainAssignment;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -193,7 +217,9 @@ public class EnableTableHandler extends EventHandler {
|
|||||||
boolean roundRobinAssignment = this.server.getConfiguration().getBoolean(
|
boolean roundRobinAssignment = this.server.getConfiguration().getBoolean(
|
||||||
"hbase.master.enabletable.roundrobin", false);
|
"hbase.master.enabletable.roundrobin", false);
|
||||||
|
|
||||||
if (!roundRobinAssignment) {
|
// In case of masterRestart always go with single assign. Going thro
|
||||||
|
// roundRobinAssignment will use bulkassign which may lead to double assignment.
|
||||||
|
if (retainAssignment || !roundRobinAssignment) {
|
||||||
for (HRegionInfo region : regions) {
|
for (HRegionInfo region : regions) {
|
||||||
if (assignmentManager.getRegionStates()
|
if (assignmentManager.getRegionStates()
|
||||||
.isRegionInTransition(region)) {
|
.isRegionInTransition(region)) {
|
||||||
@ -202,8 +228,13 @@ public class EnableTableHandler extends EventHandler {
|
|||||||
final HRegionInfo hri = region;
|
final HRegionInfo hri = region;
|
||||||
pool.execute(Trace.wrap(new Runnable() {
|
pool.execute(Trace.wrap(new Runnable() {
|
||||||
public void run() {
|
public void run() {
|
||||||
|
if (retainAssignment) {
|
||||||
|
// Already plan is populated.
|
||||||
|
assignmentManager.assign(hri, true, false, false);
|
||||||
|
} else {
|
||||||
assignmentManager.assign(hri, true);
|
assignmentManager.assign(hri, true);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
|
|||||||
import org.apache.hadoop.hbase.master.RegionState.State;
|
import org.apache.hadoop.hbase.master.RegionState.State;
|
||||||
import org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer;
|
import org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer;
|
||||||
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
|
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
|
||||||
|
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
|
||||||
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
|
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
|
||||||
@ -78,6 +79,7 @@ import org.junit.BeforeClass;
|
|||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.internal.util.reflection.Whitebox;
|
||||||
|
|
||||||
import com.google.protobuf.RpcController;
|
import com.google.protobuf.RpcController;
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
@ -96,6 +98,8 @@ public class TestAssignmentManager {
|
|||||||
private static final HRegionInfo REGIONINFO =
|
private static final HRegionInfo REGIONINFO =
|
||||||
new HRegionInfo(Bytes.toBytes("t"),
|
new HRegionInfo(Bytes.toBytes("t"),
|
||||||
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
|
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
|
||||||
|
private static int assignmentCount;
|
||||||
|
private static boolean enabling = false;
|
||||||
|
|
||||||
// Mocked objects or; get redone for each test.
|
// Mocked objects or; get redone for each test.
|
||||||
private Server server;
|
private Server server;
|
||||||
@ -399,11 +403,9 @@ public class TestAssignmentManager {
|
|||||||
|
|
||||||
// We need a mocked catalog tracker.
|
// We need a mocked catalog tracker.
|
||||||
CatalogTracker ct = Mockito.mock(CatalogTracker.class);
|
CatalogTracker ct = Mockito.mock(CatalogTracker.class);
|
||||||
LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(server
|
|
||||||
.getConfiguration());
|
|
||||||
// Create an AM.
|
// Create an AM.
|
||||||
AssignmentManager am = new AssignmentManager(this.server,
|
AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(
|
||||||
this.serverManager, ct, balancer, executor, null);
|
this.server, this.serverManager);
|
||||||
try {
|
try {
|
||||||
processServerShutdownHandler(ct, am, false);
|
processServerShutdownHandler(ct, am, false);
|
||||||
} finally {
|
} finally {
|
||||||
@ -854,6 +856,42 @@ public class TestAssignmentManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test verifies whether all the enabling table regions assigned only once during master startup.
|
||||||
|
*
|
||||||
|
* @throws KeeperException
|
||||||
|
* @throws IOException
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMasterRestartWhenTableInEnabling() throws KeeperException, IOException, Exception {
|
||||||
|
enabling = true;
|
||||||
|
List<ServerName> destServers = new ArrayList<ServerName>(1);
|
||||||
|
destServers.add(SERVERNAME_A);
|
||||||
|
Mockito.when(this.serverManager.createDestinationServersList()).thenReturn(destServers);
|
||||||
|
Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(true);
|
||||||
|
HTU.getConfiguration().setInt(HConstants.MASTER_PORT, 0);
|
||||||
|
Server server = new HMaster(HTU.getConfiguration());
|
||||||
|
Whitebox.setInternalState(server, "serverManager", this.serverManager);
|
||||||
|
AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(server,
|
||||||
|
this.serverManager);
|
||||||
|
try {
|
||||||
|
// set table in enabling state.
|
||||||
|
am.getZKTable().setEnablingTable(REGIONINFO.getTableNameAsString());
|
||||||
|
new EnableTableHandler(server, REGIONINFO.getTableName(), am.getCatalogTracker(), am, true)
|
||||||
|
.process();
|
||||||
|
assertEquals("Number of assignments should be 1.", 1, assignmentCount);
|
||||||
|
assertTrue("Table should be enabled.",
|
||||||
|
am.getZKTable().isEnabledTable(REGIONINFO.getTableNameAsString()));
|
||||||
|
} finally {
|
||||||
|
enabling = false;
|
||||||
|
assignmentCount = 0;
|
||||||
|
am.getZKTable().setEnabledTable(REGIONINFO.getTableNameAsString());
|
||||||
|
am.shutdown();
|
||||||
|
ZKAssign.deleteAllNodes(this.watcher);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new ephemeral node in the SPLITTING state for the specified region.
|
* Creates a new ephemeral node in the SPLITTING state for the specified region.
|
||||||
* Create it ephemeral in case regionserver dies mid-split.
|
* Create it ephemeral in case regionserver dies mid-split.
|
||||||
@ -928,9 +966,15 @@ public class TestAssignmentManager {
|
|||||||
ScanResponse.Builder builder = ScanResponse.newBuilder();
|
ScanResponse.Builder builder = ScanResponse.newBuilder();
|
||||||
builder.setMoreResults(true);
|
builder.setMoreResults(true);
|
||||||
builder.addResult(ProtobufUtil.toResult(r));
|
builder.addResult(ProtobufUtil.toResult(r));
|
||||||
Mockito.when(ri.scan(
|
if (enabling) {
|
||||||
(RpcController)Mockito.any(), (ScanRequest)Mockito.any())).
|
Mockito.when(ri.scan((RpcController) Mockito.any(), (ScanRequest) Mockito.any()))
|
||||||
thenReturn(builder.build());
|
.thenReturn(builder.build()).thenReturn(builder.build()).thenReturn(builder.build())
|
||||||
|
.thenReturn(builder.build()).thenReturn(builder.build())
|
||||||
|
.thenReturn(ScanResponse.newBuilder().setMoreResults(false).build());
|
||||||
|
} else {
|
||||||
|
Mockito.when(ri.scan((RpcController) Mockito.any(), (ScanRequest) Mockito.any())).thenReturn(
|
||||||
|
builder.build());
|
||||||
|
}
|
||||||
// If a get, return the above result too for REGIONINFO
|
// If a get, return the above result too for REGIONINFO
|
||||||
GetResponse.Builder getBuilder = GetResponse.newBuilder();
|
GetResponse.Builder getBuilder = GetResponse.newBuilder();
|
||||||
getBuilder.setResult(ProtobufUtil.toResult(r));
|
getBuilder.setResult(ProtobufUtil.toResult(r));
|
||||||
@ -984,9 +1028,14 @@ public class TestAssignmentManager {
|
|||||||
@Override
|
@Override
|
||||||
public void assign(HRegionInfo region, boolean setOfflineInZK, boolean forceNewPlan,
|
public void assign(HRegionInfo region, boolean setOfflineInZK, boolean forceNewPlan,
|
||||||
boolean hijack) {
|
boolean hijack) {
|
||||||
|
if (enabling) {
|
||||||
|
assignmentCount++;
|
||||||
|
this.regionOnline(region, SERVERNAME_A);
|
||||||
|
} else {
|
||||||
super.assign(region, setOfflineInZK, forceNewPlan, hijack);
|
super.assign(region, setOfflineInZK, forceNewPlan, hijack);
|
||||||
this.gate.set(true);
|
this.gate.set(true);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void assign(List<HRegionInfo> regions)
|
public void assign(List<HRegionInfo> regions)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user