HBASE-10350. Master/AM/RegionStates changes to create and assign region replicas (ddas)

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1569861 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 2014-02-19 18:05:54 +00:00 committed by Enis Soztutar
parent 25b6103dad
commit 3d8ee3a06e
13 changed files with 732 additions and 111 deletions

View File

@ -266,7 +266,7 @@ public class MetaReader {
}
/** Returns the row key to use for this regionInfo */
protected static byte[] getMetaKeyForRegion(HRegionInfo regionInfo) {
public static byte[] getMetaKeyForRegion(HRegionInfo regionInfo) {
return RegionReplicaUtil.getRegionInfoForDefaultReplica(regionInfo).getRegionName();
}
@ -630,7 +630,8 @@ public class MetaReader {
* @param replicaId the replicaId of the region
* @return a byte[] for server column qualifier
*/
protected static byte[] getServerColumn(int replicaId) {
@VisibleForTesting
public static byte[] getServerColumn(int replicaId) {
return replicaId == 0
? HConstants.SERVER_QUALIFIER
: Bytes.toBytes(HConstants.SERVER_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
@ -642,7 +643,8 @@ public class MetaReader {
* @param replicaId the replicaId of the region
* @return a byte[] for server start code column qualifier
*/
protected static byte[] getStartCodeColumn(int replicaId) {
@VisibleForTesting
public static byte[] getStartCodeColumn(int replicaId) {
return replicaId == 0
? HConstants.STARTCODE_QUALIFIER
: Bytes.toBytes(HConstants.STARTCODE_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
@ -654,7 +656,8 @@ public class MetaReader {
* @param replicaId the replicaId of the region
* @return a byte[] for seqNum column qualifier
*/
protected static byte[] getSeqNumColumn(int replicaId) {
@VisibleForTesting
public static byte[] getSeqNumColumn(int replicaId) {
return replicaId == 0
? HConstants.SEQNUM_QUALIFIER
: Bytes.toBytes(HConstants.SEQNUM_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
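As a quick illustration of the qualifier scheme above, here is a minimal standalone sketch; the "%04X" replica-id format is an assumption about HRegionInfo.REPLICA_ID_FORMAT and the class is not part of the patch.
// Standalone sketch of the replica-suffixed meta column qualifiers (illustrative only).
public class ReplicaColumnSketch {
  private static final char META_REPLICA_ID_DELIMITER = '_';  // mirrors MetaReader
  private static final String REPLICA_ID_FORMAT = "%04X";     // assumed HRegionInfo format

  // The default replica keeps the plain qualifier; other replicas get "_<formatted id>" appended.
  static String serverColumn(int replicaId) {
    return replicaId == 0
        ? "server"
        : "server" + META_REPLICA_ID_DELIMITER + String.format(REPLICA_ID_FORMAT, replicaId);
  }

  public static void main(String[] args) {
    System.out.println(serverColumn(0));  // server
    System.out.println(serverColumn(1));  // server_0001
  }
}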

View File

@ -38,7 +38,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
@ -51,6 +50,7 @@ import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
@ -101,6 +101,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableReques
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
@ -108,6 +110,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterSta
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
@ -127,12 +131,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
@ -196,6 +195,7 @@ public class HBaseAdmin implements Admin {
this.cleanupConnectionOnClose = true;
}
@Override
public int getOperationTimeout() {
return operationTimeout;
}
@ -269,6 +269,7 @@ public class HBaseAdmin implements Admin {
}
/** @return HConnection used by this object. */
@Override
public HConnection getConnection() {
return connection;
}
@ -278,6 +279,7 @@ public class HBaseAdmin implements Admin {
* @throws ZooKeeperConnectionException
* @throws MasterNotRunningException
*/
@Override
public boolean isMasterRunning()
throws MasterNotRunningException, ZooKeeperConnectionException {
return connection.isMasterRunning();
@ -288,6 +290,7 @@ public class HBaseAdmin implements Admin {
* @return True if table exists already.
* @throws IOException
*/
@Override
public boolean tableExists(final TableName tableName)
throws IOException {
boolean b = false;
@ -320,6 +323,7 @@ public class HBaseAdmin implements Admin {
* @return - returns an array of HTableDescriptors
* @throws IOException if a remote or network exception occurs
*/
@Override
public HTableDescriptor[] listTables() throws IOException {
return this.connection.listTables();
}
@ -332,6 +336,7 @@ public class HBaseAdmin implements Admin {
* @throws IOException if a remote or network exception occurs
* @see #listTables()
*/
@Override
public HTableDescriptor[] listTables(Pattern pattern) throws IOException {
List<HTableDescriptor> matched = new LinkedList<HTableDescriptor>();
HTableDescriptor[] tables = listTables();
@ -351,6 +356,7 @@ public class HBaseAdmin implements Admin {
* @throws IOException if a remote or network exception occurs
* @see #listTables(java.util.regex.Pattern)
*/
@Override
public HTableDescriptor[] listTables(String regex) throws IOException {
return listTables(Pattern.compile(regex));
}
@ -398,6 +404,7 @@ public class HBaseAdmin implements Admin {
* @return TableName[] table names
* @throws IOException if a remote or network exception occurs
*/
@Override
public TableName[] listTableNames() throws IOException {
return this.connection.listTableNames();
}
@ -409,6 +416,7 @@ public class HBaseAdmin implements Admin {
* @throws TableNotFoundException
* @throws IOException if a remote or network exception occurs
*/
@Override
public HTableDescriptor getTableDescriptor(final TableName tableName)
throws TableNotFoundException, IOException {
return this.connection.getHTableDescriptor(tableName);
@ -440,6 +448,7 @@ public class HBaseAdmin implements Admin {
* and attempt-at-creation).
* @throws IOException if a remote or network exception occurs
*/
@Override
public void createTable(HTableDescriptor desc)
throws IOException {
createTable(desc, null);
@ -469,6 +478,7 @@ public class HBaseAdmin implements Admin {
* and attempt-at-creation).
* @throws IOException
*/
@Override
public void createTable(HTableDescriptor desc, byte [] startKey,
byte [] endKey, int numRegions)
throws IOException {
@ -505,6 +515,7 @@ public class HBaseAdmin implements Admin {
* and attempt-at-creation).
* @throws IOException
*/
@Override
public void createTable(final HTableDescriptor desc, byte [][] splitKeys)
throws IOException {
try {
@ -512,7 +523,7 @@ public class HBaseAdmin implements Admin {
} catch (SocketTimeoutException ste) {
LOG.warn("Creating " + desc.getTableName() + " took too long", ste);
}
int numRegs = splitKeys == null ? 1 : splitKeys.length + 1;
int numRegs = (splitKeys == null ? 1 : splitKeys.length + 1) * desc.getRegionReplication();
int prevRegCount = 0;
boolean doneWithMetaScan = false;
for (int tries = 0; tries < this.numRetries * this.retryLongerMultiplier;
@ -523,19 +534,27 @@ public class HBaseAdmin implements Admin {
MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
@Override
public boolean processRow(Result rowResult) throws IOException {
HRegionInfo info = HRegionInfo.getHRegionInfo(rowResult);
if (info == null) {
RegionLocations list = MetaReader.getRegionLocations(rowResult);
if (list == null) {
LOG.warn("No serialized HRegionInfo in " + rowResult);
return true;
}
if (!info.getTable().equals(desc.getTableName())) {
HRegionLocation l = list.getRegionLocation();
if (l == null) {
return true;
}
if (!l.getRegionInfo().getTable().equals(desc.getTableName())) {
return false;
}
ServerName serverName = HRegionInfo.getServerName(rowResult);
// Make sure that regions are assigned to server
if (!(info.isOffline() || info.isSplit()) && serverName != null
&& serverName.getHostAndPort() != null) {
actualRegCount.incrementAndGet();
if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true;
HRegionLocation[] locations = list.getRegionLocations();
for (HRegionLocation location : locations) {
if (location == null) continue;
ServerName serverName = location.getServerName();
// Make sure that regions are assigned to server
if (serverName != null && serverName.getHostAndPort() != null) {
actualRegCount.incrementAndGet();
}
}
return true;
}
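A hedged sketch of the arithmetic behind this wait loop (names are illustrative, not HBaseAdmin internals): the visitor above counts one entry per assigned replica location, and the loop finishes once that count reaches (splits + 1) * replication.
// Illustrative only: how many assigned locations createTable now waits for.
static int expectedLocationCount(byte[][] splitKeys, int regionReplication) {
  int baseRegions = (splitKeys == null) ? 1 : splitKeys.length + 1;
  return baseRegions * regionReplication;
}
// e.g. 4 split keys with HTableDescriptor#getRegionReplication() == 3  ->  5 * 3 = 15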
@ -593,6 +612,7 @@ public class HBaseAdmin implements Admin {
* and attempt-at-creation).
* @throws IOException
*/
@Override
public void createTableAsync(
final HTableDescriptor desc, final byte [][] splitKeys)
throws IOException {
@ -642,6 +662,7 @@ public class HBaseAdmin implements Admin {
* @param tableName name of table to delete
* @throws IOException if a remote or network exception occurs
*/
@Override
public void deleteTable(final TableName tableName) throws IOException {
boolean tableExists = true;
@ -736,6 +757,7 @@ public class HBaseAdmin implements Admin {
* @see #deleteTables(java.util.regex.Pattern)
* @see #deleteTable(java.lang.String)
*/
@Override
public HTableDescriptor[] deleteTables(String regex) throws IOException {
return deleteTables(Pattern.compile(regex));
}
@ -751,6 +773,7 @@ public class HBaseAdmin implements Admin {
* @return Table descriptors for tables that couldn't be deleted
* @throws IOException
*/
@Override
public HTableDescriptor[] deleteTables(Pattern pattern) throws IOException {
List<HTableDescriptor> failed = new LinkedList<HTableDescriptor>();
for (HTableDescriptor table : listTables(pattern)) {
@ -772,6 +795,7 @@ public class HBaseAdmin implements Admin {
* @param preserveSplits True if the splits should be preserved
* @throws IOException if a remote or network exception occurs
*/
@Override
public void truncateTable(final TableName tableName, final boolean preserveSplits)
throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) {
@ -798,6 +822,7 @@ public class HBaseAdmin implements Admin {
* @see #disableTable(byte[])
* @see #enableTableAsync(byte[])
*/
@Override
public void enableTable(final TableName tableName)
throws IOException {
enableTableAsync(tableName);
@ -869,6 +894,7 @@ public class HBaseAdmin implements Admin {
* @throws IOException
* @since 0.90.0
*/
@Override
public void enableTableAsync(final TableName tableName)
throws IOException {
TableName.isLegalFullyQualifiedTableName(tableName.getName());
@ -905,6 +931,7 @@ public class HBaseAdmin implements Admin {
* @see #enableTables(java.util.regex.Pattern)
* @see #enableTable(java.lang.String)
*/
@Override
public HTableDescriptor[] enableTables(String regex) throws IOException {
return enableTables(Pattern.compile(regex));
}
@ -919,6 +946,7 @@ public class HBaseAdmin implements Admin {
* @param pattern The pattern to match table names against
* @throws IOException
*/
@Override
public HTableDescriptor[] enableTables(Pattern pattern) throws IOException {
List<HTableDescriptor> failed = new LinkedList<HTableDescriptor>();
for (HTableDescriptor table : listTables(pattern)) {
@ -947,6 +975,7 @@ public class HBaseAdmin implements Admin {
* @see #isTableEnabled(byte[])
* @since 0.90.0
*/
@Override
public void disableTableAsync(final TableName tableName) throws IOException {
TableName.isLegalFullyQualifiedTableName(tableName.getName());
executeCallable(new MasterCallable<Void>(getConnection()) {
@ -979,6 +1008,7 @@ public class HBaseAdmin implements Admin {
* TableNotFoundException means the table doesn't exist.
* TableNotEnabledException means the table isn't in enabled state.
*/
@Override
public void disableTable(final TableName tableName)
throws IOException {
disableTableAsync(tableName);
@ -1032,6 +1062,7 @@ public class HBaseAdmin implements Admin {
* @see #disableTables(java.util.regex.Pattern)
* @see #disableTable(java.lang.String)
*/
@Override
public HTableDescriptor[] disableTables(String regex) throws IOException {
return disableTables(Pattern.compile(regex));
}
@ -1047,6 +1078,7 @@ public class HBaseAdmin implements Admin {
* @return Table descriptors for tables that couldn't be disabled
* @throws IOException
*/
@Override
public HTableDescriptor[] disableTables(Pattern pattern) throws IOException {
List<HTableDescriptor> failed = new LinkedList<HTableDescriptor>();
for (HTableDescriptor table : listTables(pattern)) {
@ -1077,6 +1109,7 @@ public class HBaseAdmin implements Admin {
* @return true if table is on-line
* @throws IOException if a remote or network exception occurs
*/
@Override
public boolean isTableEnabled(TableName tableName) throws IOException {
checkTableExistence(tableName);
return connection.isTableEnabled(tableName);
@ -1097,6 +1130,7 @@ public class HBaseAdmin implements Admin {
* @return true if table is off-line
* @throws IOException if a remote or network exception occurs
*/
@Override
public boolean isTableDisabled(TableName tableName) throws IOException {
checkTableExistence(tableName);
return connection.isTableDisabled(tableName);
@ -1115,6 +1149,7 @@ public class HBaseAdmin implements Admin {
* @return true if all regions of the table are available
* @throws IOException if a remote or network exception occurs
*/
@Override
public boolean isTableAvailable(TableName tableName) throws IOException {
return connection.isTableAvailable(tableName);
}
@ -1139,6 +1174,7 @@ public class HBaseAdmin implements Admin {
* @throws IOException
* if a remote or network exception occurs
*/
@Override
public boolean isTableAvailable(TableName tableName,
byte[][] splitKeys) throws IOException {
return connection.isTableAvailable(tableName, splitKeys);
@ -1165,6 +1201,7 @@ public class HBaseAdmin implements Admin {
* @throws IOException
* if a remote or network exception occurs
*/
@Override
public Pair<Integer, Integer> getAlterStatus(final TableName tableName)
throws IOException {
return executeCallable(new MasterCallable<Pair<Integer, Integer>>(getConnection()) {
@ -1192,6 +1229,7 @@ public class HBaseAdmin implements Admin {
* @throws IOException
* if a remote or network exception occurs
*/
@Override
public Pair<Integer, Integer> getAlterStatus(final byte[] tableName)
throws IOException {
return getAlterStatus(TableName.valueOf(tableName));
@ -1232,6 +1270,7 @@ public class HBaseAdmin implements Admin {
* @param column column descriptor of column to be added
* @throws IOException if a remote or network exception occurs
*/
@Override
public void addColumn(final TableName tableName, final HColumnDescriptor column)
throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) {
@ -1278,6 +1317,7 @@ public class HBaseAdmin implements Admin {
* @param columnName name of column to be deleted
* @throws IOException if a remote or network exception occurs
*/
@Override
public void deleteColumn(final TableName tableName, final byte [] columnName)
throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) {
@ -1326,6 +1366,7 @@ public class HBaseAdmin implements Admin {
* @param descriptor new column descriptor to use
* @throws IOException if a remote or network exception occurs
*/
@Override
public void modifyColumn(final TableName tableName, final HColumnDescriptor descriptor)
throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) {
@ -1346,6 +1387,7 @@ public class HBaseAdmin implements Admin {
* the one currently in <code>hbase:meta</code>
* @throws IOException if a remote or network exception occurs
*/
@Override
public void closeRegion(final String regionname, final String serverName)
throws IOException {
closeRegion(Bytes.toBytes(regionname), serverName);
@ -1361,6 +1403,7 @@ public class HBaseAdmin implements Admin {
* <code> host187.example.com,60020,1289493121758</code>
* @throws IOException if a remote or network exception occurs
*/
@Override
public void closeRegion(final byte [] regionname, final String serverName)
throws IOException {
CatalogTracker ct = getCatalogTracker();
@ -1408,6 +1451,7 @@ public class HBaseAdmin implements Admin {
* @throws IOException
* if a remote or network exception occurs
*/
@Override
public boolean closeRegionWithEncodedRegionName(final String encodedRegionName,
final String serverName) throws IOException {
if (null == serverName || ("").equals(serverName.trim())) {
@ -1438,6 +1482,7 @@ public class HBaseAdmin implements Admin {
* @param hri
* @throws IOException
*/
@Override
public void closeRegion(final ServerName sn, final HRegionInfo hri)
throws IOException {
AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
@ -1448,6 +1493,7 @@ public class HBaseAdmin implements Admin {
/**
* Get all the online regions on a region server.
*/
@Override
public List<HRegionInfo> getOnlineRegions(
final ServerName sn) throws IOException {
AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
@ -1462,6 +1508,7 @@ public class HBaseAdmin implements Admin {
* @throws IOException if a remote or network exception occurs
* @throws InterruptedException
*/
@Override
public void flush(final String tableNameOrRegionName)
throws IOException, InterruptedException {
flush(Bytes.toBytes(tableNameOrRegionName));
@ -1475,6 +1522,7 @@ public class HBaseAdmin implements Admin {
* @throws IOException if a remote or network exception occurs
* @throws InterruptedException
*/
@Override
public void flush(final byte[] tableNameOrRegionName)
throws IOException, InterruptedException {
CatalogTracker ct = getCatalogTracker();
@ -1522,6 +1570,7 @@ public class HBaseAdmin implements Admin {
* @throws IOException if a remote or network exception occurs
* @throws InterruptedException
*/
@Override
public void compact(final String tableNameOrRegionName)
throws IOException, InterruptedException {
compact(Bytes.toBytes(tableNameOrRegionName));
@ -1535,6 +1584,7 @@ public class HBaseAdmin implements Admin {
* @throws IOException if a remote or network exception occurs
* @throws InterruptedException
*/
@Override
public void compact(final byte[] tableNameOrRegionName)
throws IOException, InterruptedException {
compact(tableNameOrRegionName, null, false);
@ -1549,6 +1599,7 @@ public class HBaseAdmin implements Admin {
* @throws IOException if a remote or network exception occurs
* @throws InterruptedException
*/
@Override
public void compact(String tableOrRegionName, String columnFamily)
throws IOException, InterruptedException {
compact(Bytes.toBytes(tableOrRegionName), Bytes.toBytes(columnFamily));
@ -1563,6 +1614,7 @@ public class HBaseAdmin implements Admin {
* @throws IOException if a remote or network exception occurs
* @throws InterruptedException
*/
@Override
public void compact(final byte[] tableNameOrRegionName, final byte[] columnFamily)
throws IOException, InterruptedException {
compact(tableNameOrRegionName, columnFamily, false);
@ -1576,6 +1628,7 @@ public class HBaseAdmin implements Admin {
* @throws IOException if a remote or network exception occurs
* @throws InterruptedException
*/
@Override
public void majorCompact(final String tableNameOrRegionName)
throws IOException, InterruptedException {
majorCompact(Bytes.toBytes(tableNameOrRegionName));
@ -1589,6 +1642,7 @@ public class HBaseAdmin implements Admin {
* @throws IOException if a remote or network exception occurs
* @throws InterruptedException
*/
@Override
public void majorCompact(final byte[] tableNameOrRegionName)
throws IOException, InterruptedException {
compact(tableNameOrRegionName, null, true);
@ -1603,6 +1657,7 @@ public class HBaseAdmin implements Admin {
* @throws IOException if a remote or network exception occurs
* @throws InterruptedException
*/
@Override
public void majorCompact(final String tableNameOrRegionName,
final String columnFamily) throws IOException, InterruptedException {
majorCompact(Bytes.toBytes(tableNameOrRegionName),
@ -1618,6 +1673,7 @@ public class HBaseAdmin implements Admin {
* @throws IOException if a remote or network exception occurs
* @throws InterruptedException
*/
@Override
public void majorCompact(final byte[] tableNameOrRegionName,
final byte[] columnFamily) throws IOException, InterruptedException {
compact(tableNameOrRegionName, columnFamily, true);
@ -1699,6 +1755,7 @@ public class HBaseAdmin implements Admin {
* @throws ZooKeeperConnectionException
* @throws MasterNotRunningException
*/
@Override
public void move(final byte [] encodedRegionName, final byte [] destServerName)
throws HBaseIOException, MasterNotRunningException, ZooKeeperConnectionException {
MasterKeepAliveConnection stub = connection.getKeepAliveMasterService();
@ -1726,6 +1783,7 @@ public class HBaseAdmin implements Admin {
* @throws ZooKeeperConnectionException
* @throws IOException
*/
@Override
public void assign(final byte[] regionName) throws MasterNotRunningException,
ZooKeeperConnectionException, IOException {
final byte[] toBeAssigned = getRegionName(regionName);
@ -1754,6 +1812,7 @@ public class HBaseAdmin implements Admin {
* @throws ZooKeeperConnectionException
* @throws IOException
*/
@Override
public void unassign(final byte [] regionName, final boolean force)
throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
final byte[] toBeUnassigned = getRegionName(regionName);
@ -1780,6 +1839,7 @@ public class HBaseAdmin implements Admin {
* Region to offline.
* @throws IOException
*/
@Override
public void offline(final byte [] regionName)
throws IOException {
MasterKeepAliveConnection master = connection.getKeepAliveMasterService();
@ -1798,6 +1858,7 @@ public class HBaseAdmin implements Admin {
* @param synchronous If true, it waits until current balance() call, if outstanding, to return.
* @return Previous balancer value
*/
@Override
public boolean setBalancerRunning(final boolean on, final boolean synchronous)
throws MasterNotRunningException, ZooKeeperConnectionException {
MasterKeepAliveConnection stub = connection.getKeepAliveMasterService();
@ -1828,6 +1889,7 @@ public class HBaseAdmin implements Admin {
* logs.
* @return True if balancer ran, false otherwise.
*/
@Override
public boolean balancer()
throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException {
MasterKeepAliveConnection stub = connection.getKeepAliveMasterService();
@ -1845,6 +1907,7 @@ public class HBaseAdmin implements Admin {
* @throws ServiceException
* @throws MasterNotRunningException
*/
@Override
public boolean enableCatalogJanitor(boolean enable)
throws ServiceException, MasterNotRunningException {
MasterKeepAliveConnection stub = connection.getKeepAliveMasterService();
@ -1862,6 +1925,7 @@ public class HBaseAdmin implements Admin {
* @throws ServiceException
* @throws MasterNotRunningException
*/
@Override
public int runCatalogScan() throws ServiceException, MasterNotRunningException {
MasterKeepAliveConnection stub = connection.getKeepAliveMasterService();
try {
@ -1877,6 +1941,7 @@ public class HBaseAdmin implements Admin {
* @throws ServiceException
* @throws org.apache.hadoop.hbase.MasterNotRunningException
*/
@Override
public boolean isCatalogJanitorEnabled() throws ServiceException, MasterNotRunningException {
MasterKeepAliveConnection stub = connection.getKeepAliveMasterService();
try {
@ -1895,6 +1960,7 @@ public class HBaseAdmin implements Admin {
* two adjacent regions
* @throws IOException
*/
@Override
public void mergeRegions(final byte[] encodedNameOfRegionA,
final byte[] encodedNameOfRegionB, final boolean forcible)
throws IOException {
@ -1930,6 +1996,7 @@ public class HBaseAdmin implements Admin {
* @throws IOException if a remote or network exception occurs
* @throws InterruptedException
*/
@Override
public void split(final String tableNameOrRegionName)
throws IOException, InterruptedException {
split(Bytes.toBytes(tableNameOrRegionName));
@ -1943,11 +2010,13 @@ public class HBaseAdmin implements Admin {
* @throws IOException if a remote or network exception occurs
* @throws InterruptedException
*/
@Override
public void split(final byte[] tableNameOrRegionName)
throws IOException, InterruptedException {
split(tableNameOrRegionName, null);
}
@Override
public void split(final String tableNameOrRegionName,
final String splitPoint) throws IOException, InterruptedException {
split(Bytes.toBytes(tableNameOrRegionName), Bytes.toBytes(splitPoint));
@ -1962,6 +2031,7 @@ public class HBaseAdmin implements Admin {
* @throws IOException if a remote or network exception occurs
* @throws InterruptedException interrupt exception occurred
*/
@Override
public void split(final byte[] tableNameOrRegionName,
final byte [] splitPoint) throws IOException, InterruptedException {
CatalogTracker ct = getCatalogTracker();
@ -2016,6 +2086,7 @@ public class HBaseAdmin implements Admin {
* @param htd modified description of the table
* @throws IOException if a remote or network exception occurs
*/
@Override
public void modifyTable(final TableName tableName, final HTableDescriptor htd)
throws IOException {
if (!tableName.equals(htd.getTableName())) {
@ -2135,6 +2206,7 @@ public class HBaseAdmin implements Admin {
* Shuts down the HBase cluster
* @throws IOException if a remote or network exception occurs
*/
@Override
public synchronized void shutdown() throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
@ -2151,6 +2223,7 @@ public class HBaseAdmin implements Admin {
* @see #shutdown()
* @throws IOException if a remote or network exception occurs
*/
@Override
public synchronized void stopMaster() throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
@ -2167,6 +2240,7 @@ public class HBaseAdmin implements Admin {
* <code>example.org:1234</code>
* @throws IOException if a remote or network exception occurs
*/
@Override
public synchronized void stopRegionServer(final String hostnamePort)
throws IOException {
String hostname = Addressing.parseHostname(hostnamePort);
@ -2187,6 +2261,7 @@ public class HBaseAdmin implements Admin {
* @return cluster status
* @throws IOException if a remote or network exception occurs
*/
@Override
public ClusterStatus getClusterStatus() throws IOException {
return executeCallable(new MasterCallable<ClusterStatus>(getConnection()) {
@Override
@ -2206,6 +2281,7 @@ public class HBaseAdmin implements Admin {
/**
* @return Configuration used by the instance.
*/
@Override
public Configuration getConfiguration() {
return this.conf;
}
@ -2215,6 +2291,7 @@ public class HBaseAdmin implements Admin {
* @param descriptor descriptor which describes the new namespace
* @throws IOException
*/
@Override
public void createNamespace(final NamespaceDescriptor descriptor) throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
@ -2233,6 +2310,7 @@ public class HBaseAdmin implements Admin {
* @param descriptor descriptor which describes the new namespace
* @throws IOException
*/
@Override
public void modifyNamespace(final NamespaceDescriptor descriptor) throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
@ -2249,6 +2327,7 @@ public class HBaseAdmin implements Admin {
* @param name namespace name
* @throws IOException
*/
@Override
public void deleteNamespace(final String name) throws IOException {
executeCallable(new MasterCallable<Void>(getConnection()) {
@Override
@ -2266,6 +2345,7 @@ public class HBaseAdmin implements Admin {
* @return A descriptor
* @throws IOException
*/
@Override
public NamespaceDescriptor getNamespaceDescriptor(final String name) throws IOException {
return
executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection()) {
@ -2283,6 +2363,7 @@ public class HBaseAdmin implements Admin {
* @return List of descriptors
* @throws IOException
*/
@Override
public NamespaceDescriptor[] listNamespaceDescriptors() throws IOException {
return
executeCallable(new MasterCallable<NamespaceDescriptor[]>(getConnection()) {
@ -2306,6 +2387,7 @@ public class HBaseAdmin implements Admin {
* @return A descriptor
* @throws IOException
*/
@Override
public HTableDescriptor[] listTableDescriptorsByNamespace(final String name) throws IOException {
return
executeCallable(new MasterCallable<HTableDescriptor[]>(getConnection()) {
@ -2330,6 +2412,7 @@ public class HBaseAdmin implements Admin {
* @return The list of table names in the namespace
* @throws IOException
*/
@Override
public TableName[] listTableNamesByNamespace(final String name) throws IOException {
return
executeCallable(new MasterCallable<TableName[]>(getConnection()) {
@ -2406,6 +2489,7 @@ public class HBaseAdmin implements Admin {
* @return Ordered list of {@link HRegionInfo}.
* @throws IOException
*/
@Override
public List<HRegionInfo> getTableRegions(final TableName tableName)
throws IOException {
CatalogTracker ct = getCatalogTracker();
@ -2437,6 +2521,7 @@ public class HBaseAdmin implements Admin {
* @return HTD[] the tableDescriptor
* @throws IOException if a remote or network exception occurs
*/
@Override
public HTableDescriptor[] getTableDescriptorsByTableName(List<TableName> tableNames)
throws IOException {
return this.connection.getHTableDescriptorsByTableName(tableNames);
@ -2448,6 +2533,7 @@ public class HBaseAdmin implements Admin {
* @return HTD[] the tableDescriptor
* @throws IOException if a remote or network exception occurs
*/
@Override
public HTableDescriptor[] getTableDescriptors(List<String> names)
throws IOException {
List<TableName> tableNames = new ArrayList<TableName>(names.size());
@ -2470,7 +2556,8 @@ public class HBaseAdmin implements Admin {
* @throws IOException if a remote or network exception occurs
* @throws FailedLogCloseException
*/
public synchronized byte[][] rollHLogWriter(String serverName)
@Override
public synchronized byte[][] rollHLogWriter(String serverName)
throws IOException, FailedLogCloseException {
ServerName sn = ServerName.valueOf(serverName);
AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
@ -2489,6 +2576,7 @@ public class HBaseAdmin implements Admin {
}
}
@Override
public String[] getMasterCoprocessors() {
try {
return getClusterStatus().getMasterCoprocessors();
@ -2507,6 +2595,7 @@ public class HBaseAdmin implements Admin {
* @throws InterruptedException
* @return the current compaction state
*/
@Override
public CompactionState getCompactionState(final String tableNameOrRegionName)
throws IOException, InterruptedException {
return getCompactionState(Bytes.toBytes(tableNameOrRegionName));
@ -2521,6 +2610,7 @@ public class HBaseAdmin implements Admin {
* @throws InterruptedException
* @return the current compaction state
*/
@Override
public CompactionState getCompactionState(final byte[] tableNameOrRegionName)
throws IOException, InterruptedException {
CompactionState state = CompactionState.NONE;
@ -2613,6 +2703,7 @@ public class HBaseAdmin implements Admin {
* @throws SnapshotCreationException if snapshot creation failed
* @throws IllegalArgumentException if the snapshot request is formatted incorrectly
*/
@Override
public void snapshot(final String snapshotName,
final TableName tableName) throws IOException,
SnapshotCreationException, IllegalArgumentException {
@ -2662,6 +2753,7 @@ public class HBaseAdmin implements Admin {
* @throws SnapshotCreationException if snapshot creation failed
* @throws IllegalArgumentException if the snapshot request is formatted incorrectly
*/
@Override
public void snapshot(final byte[] snapshotName,
final TableName tableName) throws IOException,
SnapshotCreationException, IllegalArgumentException {
@ -2693,6 +2785,7 @@ public class HBaseAdmin implements Admin {
* @throws SnapshotCreationException if snapshot creation failed
* @throws IllegalArgumentException if the snapshot request is formatted incorrectly
*/
@Override
public void snapshot(final String snapshotName,
final TableName tableName,
SnapshotDescription.Type type) throws IOException, SnapshotCreationException,
@ -2739,6 +2832,7 @@ public class HBaseAdmin implements Admin {
* @throws SnapshotCreationException if snapshot failed to be taken
* @throws IllegalArgumentException if the snapshot request is formatted incorrectly
*/
@Override
public void snapshot(SnapshotDescription snapshot) throws IOException, SnapshotCreationException,
IllegalArgumentException {
// actually take the snapshot
@ -2789,6 +2883,7 @@ public class HBaseAdmin implements Admin {
* @throws SnapshotCreationException if snapshot creation failed
* @throws IllegalArgumentException if the snapshot request is formatted incorrectly
*/
@Override
public SnapshotResponse takeSnapshotAsync(SnapshotDescription snapshot) throws IOException,
SnapshotCreationException {
ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
@ -2823,6 +2918,7 @@ public class HBaseAdmin implements Admin {
* @throws HBaseSnapshotException if the snapshot failed
* @throws UnknownSnapshotException if the requested snapshot is unknown
*/
@Override
public boolean isSnapshotFinished(final SnapshotDescription snapshot)
throws IOException, HBaseSnapshotException, UnknownSnapshotException {
@ -2848,6 +2944,7 @@ public class HBaseAdmin implements Admin {
* @throws RestoreSnapshotException if snapshot failed to be restored
* @throws IllegalArgumentException if the restore request is formatted incorrectly
*/
@Override
public void restoreSnapshot(final byte[] snapshotName)
throws IOException, RestoreSnapshotException {
restoreSnapshot(Bytes.toString(snapshotName));
@ -2866,6 +2963,7 @@ public class HBaseAdmin implements Admin {
* @throws RestoreSnapshotException if snapshot failed to be restored
* @throws IllegalArgumentException if the restore request is formatted incorrectly
*/
@Override
public void restoreSnapshot(final String snapshotName)
throws IOException, RestoreSnapshotException {
boolean takeFailSafeSnapshot =
@ -2889,6 +2987,7 @@ public class HBaseAdmin implements Admin {
* @throws RestoreSnapshotException if snapshot failed to be restored
* @throws IllegalArgumentException if the restore request is formatted incorrectly
*/
@Override
public void restoreSnapshot(final byte[] snapshotName, final boolean takeFailSafeSnapshot)
throws IOException, RestoreSnapshotException {
restoreSnapshot(Bytes.toString(snapshotName), takeFailSafeSnapshot);
@ -2910,6 +3009,7 @@ public class HBaseAdmin implements Admin {
* @throws RestoreSnapshotException if snapshot failed to be restored
* @throws IllegalArgumentException if the restore request is formatted incorrectly
*/
@Override
public void restoreSnapshot(final String snapshotName, boolean takeFailSafeSnapshot)
throws IOException, RestoreSnapshotException {
TableName tableName = null;
@ -3013,6 +3113,7 @@ public class HBaseAdmin implements Admin {
* @throws RestoreSnapshotException if snapshot failed to be cloned
* @throws IllegalArgumentException if the specified table has not a valid name
*/
@Override
public void cloneSnapshot(final byte[] snapshotName, final TableName tableName)
throws IOException, TableExistsException, RestoreSnapshotException, InterruptedException {
cloneSnapshot(Bytes.toString(snapshotName), tableName);
@ -3045,6 +3146,7 @@ public class HBaseAdmin implements Admin {
* @throws RestoreSnapshotException if snapshot failed to be cloned
* @throws IllegalArgumentException if the specified table has not a valid name
*/
@Override
public void cloneSnapshot(final String snapshotName, final TableName tableName)
throws IOException, TableExistsException, RestoreSnapshotException, InterruptedException {
if (tableExists(tableName)) {
@ -3065,6 +3167,7 @@ public class HBaseAdmin implements Admin {
* @return data returned after procedure execution. null if no return data.
* @throws IOException
*/
@Override
public byte[] execProcedureWithRet(String signature, String instance,
Map<String, String> props) throws IOException {
ProcedureDescription.Builder builder = ProcedureDescription.newBuilder();
@ -3098,6 +3201,7 @@ public class HBaseAdmin implements Admin {
* @param props Property/Value pairs of properties passing to the procedure
* @throws IOException
*/
@Override
public void execProcedure(String signature, String instance,
Map<String, String> props) throws IOException {
ProcedureDescription.Builder builder = ProcedureDescription.newBuilder();
@ -3164,6 +3268,7 @@ public class HBaseAdmin implements Admin {
* @return true if the specified procedure is finished successfully, false if it is still running
* @throws IOException if the specified procedure finished with error
*/
@Override
public boolean isProcedureFinished(String signature, String instance, Map<String, String> props)
throws IOException {
final ProcedureDescription.Builder builder = ProcedureDescription.newBuilder();
@ -3264,6 +3369,7 @@ public class HBaseAdmin implements Admin {
* @return a list of snapshot descriptors for completed snapshots
* @throws IOException if a network error occurs
*/
@Override
public List<SnapshotDescription> listSnapshots() throws IOException {
return executeCallable(new MasterCallable<List<SnapshotDescription>>(getConnection()) {
@Override
@ -3281,6 +3387,7 @@ public class HBaseAdmin implements Admin {
* @return - returns a List of SnapshotDescription
* @throws IOException if a remote or network exception occurs
*/
@Override
public List<SnapshotDescription> listSnapshots(String regex) throws IOException {
return listSnapshots(Pattern.compile(regex));
}
@ -3292,6 +3399,7 @@ public class HBaseAdmin implements Admin {
* @return - returns a List of SnapshotDescription
* @throws IOException if a remote or network exception occurs
*/
@Override
public List<SnapshotDescription> listSnapshots(Pattern pattern) throws IOException {
List<SnapshotDescription> matched = new LinkedList<SnapshotDescription>();
List<SnapshotDescription> snapshots = listSnapshots();
@ -3308,6 +3416,7 @@ public class HBaseAdmin implements Admin {
* @param snapshotName name of the snapshot
* @throws IOException if a remote or network exception occurs
*/
@Override
public void deleteSnapshot(final byte[] snapshotName) throws IOException {
deleteSnapshot(Bytes.toString(snapshotName));
}
@ -3317,6 +3426,7 @@ public class HBaseAdmin implements Admin {
* @param snapshotName name of the snapshot
* @throws IOException if a remote or network exception occurs
*/
@Override
public void deleteSnapshot(final String snapshotName) throws IOException {
// make sure the snapshot is possibly valid
TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(snapshotName));
@ -3337,6 +3447,7 @@ public class HBaseAdmin implements Admin {
* @param regex The regular expression to match against
* @throws IOException if a remote or network exception occurs
*/
@Override
public void deleteSnapshots(final String regex) throws IOException {
deleteSnapshots(Pattern.compile(regex));
}
@ -3346,6 +3457,7 @@ public class HBaseAdmin implements Admin {
* @param pattern pattern for names of the snapshot to match
* @throws IOException if a remote or network exception occurs
*/
@Override
public void deleteSnapshots(final Pattern pattern) throws IOException {
List<SnapshotDescription> snapshots = listSnapshots(pattern);
for (final SnapshotDescription snapshot : snapshots) {
@ -3430,6 +3542,7 @@ public class HBaseAdmin implements Admin {
*
* @return A MasterCoprocessorRpcChannel instance
*/
@Override
public CoprocessorRpcChannel coprocessorService() {
return new MasterCoprocessorRpcChannel(connection);
}

View File

@ -62,7 +62,13 @@ public class RegionReplicaUtil {
return getRegionInfoForReplica(regionInfo, DEFAULT_REPLICA_ID);
}
/** @return true if this replicaId corresponds to the default replica for the region */
public static boolean isDefaultReplica(int replicaId) {
return DEFAULT_REPLICA_ID == replicaId;
}
/** @return true if this region is the default replica of its region */
public static boolean isDefaultReplica(HRegionInfo hri) {
return hri.getReplicaId() == DEFAULT_REPLICA_ID;
}
}
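A brief, hedged usage sketch of the new helpers (hri is a hypothetical HRegionInfo whose replicaId is non-zero):
// Illustrative fragment, not part of the patch.
static void replicaCheckExample(HRegionInfo hri) {            // assume hri.getReplicaId() == 2
  boolean isPrimary = RegionReplicaUtil.isDefaultReplica(hri);                        // false
  HRegionInfo primary = RegionReplicaUtil.getRegionInfoForDefaultReplica(hri);
  boolean primaryCheck = RegionReplicaUtil.isDefaultReplica(primary.getReplicaId());  // true
}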

View File

@ -412,14 +412,18 @@ public final class HConstants {
public static final byte [] SEQNUM_QUALIFIER = Bytes.toBytes(SEQNUM_QUALIFIER_STR);
/** The state column qualifier */
public static final byte [] STATE_QUALIFIER = Bytes.toBytes("state");
public static final String STATE_QUALIFIER_STR = "state";
public static final byte [] STATE_QUALIFIER = Bytes.toBytes(STATE_QUALIFIER_STR);
/**
* The serverName column qualifier. It holds the server the region is transitioning on,
* while the server column holds the server the region is opened on. They are the same
* when the region is in state OPEN.
*/
public static final byte [] SERVERNAME_QUALIFIER = Bytes.toBytes("sn");
public static final String SERVERNAME_QUALIFIER_STR = "sn";
public static final byte [] SERVERNAME_QUALIFIER = Bytes.toBytes(SERVERNAME_QUALIFIER_STR);
/** The lower-half split region column qualifier */
public static final byte [] SPLITA_QUALIFIER = Bytes.toBytes("splitA");

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
@ -270,7 +271,9 @@ public class MetaEditor extends MetaReader {
throws IOException {
List<Put> puts = new ArrayList<Put>();
for (HRegionInfo regionInfo : regionInfos) {
puts.add(makePutFromRegionInfo(regionInfo));
if (RegionReplicaUtil.isDefaultReplica(regionInfo)) {
puts.add(makePutFromRegionInfo(regionInfo));
}
}
putsToMetaTable(catalogTracker, puts);
LOG.info("Added " + puts.size());
@ -544,7 +547,7 @@ public class MetaEditor extends MetaReader {
return p;
}
private static Put addLocation(final Put p, final ServerName sn, long openSeqNum, int replicaId){
public static Put addLocation(final Put p, final ServerName sn, long openSeqNum, int replicaId){
p.addImmutable(HConstants.CATALOG_FAMILY, MetaReader.getServerColumn(replicaId),
Bytes.toBytes(sn.getHostAndPort()));
p.addImmutable(HConstants.CATALOG_FAMILY, MetaReader.getStartCodeColumn(replicaId),

View File

@ -50,6 +50,8 @@ import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.Server;
@ -59,6 +61,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.TableStateManager;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
@ -1382,7 +1385,7 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
/**
* Marks the region as online. Removes it from regions in transition and
* updates the in-memory assignment information.
@ -2659,20 +2662,51 @@ public class AssignmentManager extends ZooKeeperListener {
boolean retainAssignment = server.getConfiguration().
getBoolean("hbase.master.startup.retainassign", true);
Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
if (retainAssignment) {
assign(allRegions);
} else {
List<HRegionInfo> regions = new ArrayList<HRegionInfo>(allRegions.keySet());
List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
assign(regions);
}
for (HRegionInfo hri : allRegions.keySet()) {
for (HRegionInfo hri : regionsFromMetaScan) {
TableName tableName = hri.getTable();
if (!tableStateManager.isTableState(tableName,
ZooKeeperProtos.Table.State.ENABLED)) {
setEnabledTable(tableName);
}
}
// assign all the replicas that were not recorded in the meta
assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
}
/**
* Get a list of replica regions that are not recorded in meta yet. The locations may be
* missing because the replicas were never online, the master restarted in the middle of
* assigning, ZK was erased, etc.
* @param regionsRecordedInMeta the set of regions known to be recorded in meta,
* either as a default replica or as a non-default replica
* @param master the master services, used to look up the table descriptors
* @return the list of replica regions not recorded in meta
* @throws IOException
*/
public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
Set<HRegionInfo> regionsRecordedInMeta, MasterServices master) throws IOException {
List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
for (HRegionInfo hri : regionsRecordedInMeta) {
TableName table = hri.getTable();
HTableDescriptor htd = master.getTableDescriptors().get(table);
// look at the HTD for the replica count. That's the source of truth
int desiredRegionReplication = htd.getRegionReplication();
for (int i = 0; i < desiredRegionReplication; i++) {
HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
if (regionsRecordedInMeta.contains(replica)) continue;
regionsNotRecordedInMeta.add(replica);
}
}
return regionsNotRecordedInMeta;
}
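A hedged usage sketch (defaultReplica and masterServices are hypothetical): with region replication 3 and only the default replica recorded in meta, the two missing replicas come back for assignment.
// Illustrative fragment, not part of the patch.
Set<HRegionInfo> recordedInMeta = Collections.singleton(defaultReplica);  // replicaId 0
List<HRegionInfo> toAssign =
    AssignmentManager.replicaRegionsNotRecordedInMeta(recordedInMeta, masterServices);
// toAssign contains the replicaId 1 and replicaId 2 HRegionInfos with the same boundaries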
/**
@ -2725,37 +2759,42 @@ public class AssignmentManager extends ZooKeeperListener {
Set<ServerName> offlineServers = new HashSet<ServerName>();
// Iterate regions in META
for (Result result : results) {
HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(result);
if (regionInfo == null) continue;
State state = RegionStateStore.getRegionState(result);
ServerName lastHost = HRegionInfo.getServerName(result);
ServerName regionLocation = RegionStateStore.getRegionServer(result);
regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
// Region is not open (either offline or in transition), skip
continue;
}
TableName tableName = regionInfo.getTable();
if (!onlineServers.contains(regionLocation)) {
// Region is located on a server that isn't online
offlineServers.add(regionLocation);
if (useZKForAssignment) {
HRegionLocation[] locations = MetaReader.getRegionLocations(result).getRegionLocations();
if (locations == null) continue;
for (HRegionLocation hrl : locations) {
HRegionInfo regionInfo = hrl.getRegionInfo();
if (regionInfo == null) continue;
int replicaId = regionInfo.getReplicaId();
State state = RegionStateStore.getRegionState(result, replicaId);
ServerName lastHost = hrl.getServerName();
ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
// Region is not open (either offline or in transition), skip
continue;
}
TableName tableName = regionInfo.getTable();
if (!onlineServers.contains(regionLocation)) {
// Region is located on a server that isn't online
offlineServers.add(regionLocation);
if (useZKForAssignment) {
regionStates.regionOffline(regionInfo);
}
} else if (!disabledOrEnablingTables.contains(tableName)) {
// Region is being served and on an active server
// add only if region not in disabled or enabling table
regionStates.regionOnline(regionInfo, regionLocation);
balancer.regionOnline(regionInfo, regionLocation);
} else if (useZKForAssignment) {
regionStates.regionOffline(regionInfo);
}
} else if (!disabledOrEnablingTables.contains(tableName)) {
// Region is being served and on an active server
// add only if region not in disabled or enabling table
regionStates.regionOnline(regionInfo, regionLocation);
balancer.regionOnline(regionInfo, regionLocation);
} else if (useZKForAssignment) {
regionStates.regionOffline(regionInfo);
}
// need to enable the table if not disabled or disabling or enabling
// this will be used in rolling restarts
if (!disabledOrDisablingOrEnabling.contains(tableName)
&& !getTableStateManager().isTableState(tableName,
ZooKeeperProtos.Table.State.ENABLED)) {
setEnabledTable(tableName);
// need to enable the table if not disabled or disabling or enabling
// this will be used in rolling restarts
if (!disabledOrDisablingOrEnabling.contains(tableName)
&& !getTableStateManager().isTableState(tableName,
ZooKeeperProtos.Table.State.ENABLED)) {
setEnabledTable(tableName);
}
}
}
return offlineServers;
@ -3876,4 +3915,8 @@ public class AssignmentManager extends ZooKeeperListener {
public LoadBalancer getBalancer() {
return this.balancer;
}
public Map<ServerName, List<HRegionInfo>> getSnapShotOfAssignment(List<HRegionInfo> infos) {
return getRegionStates().getRegionAssignments(infos);
}
}
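A hedged usage sketch of the new snapshot accessor (assignmentManager and regionsOfInterest are hypothetical): the result groups every replica, default and non-default, under the server currently hosting it.
// Illustrative fragment, not part of the patch.
Map<ServerName, List<HRegionInfo>> byServer =
    assignmentManager.getSnapShotOfAssignment(regionsOfInterest);
for (Map.Entry<ServerName, List<HRegionInfo>> entry : byServer.entrySet()) {
  // entry.getValue() holds both default and non-default replicas assigned to entry.getKey()
}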

View File

@ -1300,18 +1300,30 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
private HRegionInfo[] getHRegionInfos(HTableDescriptor hTableDescriptor,
byte[][] splitKeys) {
HRegionInfo[] hRegionInfos = null;
int numRegionReplicas = hTableDescriptor.getRegionReplication();
if (numRegionReplicas <= 0) {
LOG.warn("Invalid number of replicas per region in the table descriptor. Setting it to 1.");
numRegionReplicas = 1;
}
long regionId = System.currentTimeMillis();
if (splitKeys == null || splitKeys.length == 0) {
hRegionInfos = new HRegionInfo[]{
new HRegionInfo(hTableDescriptor.getTableName(), null, null)};
hRegionInfos = new HRegionInfo[numRegionReplicas];
for (int i = 0; i < numRegionReplicas; i++) {
hRegionInfos[i] = new HRegionInfo(hTableDescriptor.getTableName(), null, null,
false, regionId, (short)i);
}
} else {
int numRegions = splitKeys.length + 1;
hRegionInfos = new HRegionInfo[numRegions];
hRegionInfos = new HRegionInfo[numRegions * numRegionReplicas];
byte[] startKey = null;
byte[] endKey = null;
for (int i = 0; i < numRegions; i++) {
endKey = (i == splitKeys.length) ? null : splitKeys[i];
hRegionInfos[i] =
new HRegionInfo(hTableDescriptor.getTableName(), startKey, endKey);
for (int j = 0; j < numRegionReplicas; j++) {
hRegionInfos[i*numRegionReplicas + j] =
new HRegionInfo(hTableDescriptor.getTableName(), startKey, endKey,
false, regionId, (short)j);
}
startKey = endKey;
}
}
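For orientation, a hedged illustration of the array layout built above, for splitKeys = {b, c} and a region replication of 2 (indices follow i * numRegionReplicas + j):
// [0] (-inf, b)  replicaId 0      [1] (-inf, b)  replicaId 1
// [2] [b, c)     replicaId 0      [3] [b, c)     replicaId 1
// [4] [c, +inf)  replicaId 0      [5] [c, +inf)  replicaId 1
// All replicas of a region share the same boundaries and regionId (the creation timestamp).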

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.master;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -66,4 +67,20 @@ public class RackManager {
return UNKNOWN_RACK;
}
/**
* Same as {@link #getRack(ServerName)} except that a list of servers is passed.
* @param servers the servers to resolve
* @return the racks for the given servers, in the same order as the input list
*/
public List<String> getRack(List<ServerName> servers) {
// Note: switchMapping implementations are expected to cache results, unless
// resolution is truly a lightweight operation.
List<String> serversAsString = new ArrayList<String>(servers.size());
for (ServerName server : servers) {
serversAsString.add(server.getHostname());
}
List<String> racks = switchMapping.resolve(serversAsString);
return racks;
}
}
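A brief usage sketch (the server variables are hypothetical); the returned list is parallel to the input:
// Illustrative fragment, not part of the patch.
List<ServerName> servers = Arrays.asList(serverA, serverB);   // hypothetical ServerNames
List<String> racks = rackManager.getRack(servers);
// racks.get(i) is the rack resolved for servers.get(i)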

View File

@ -26,11 +26,14 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
@ -51,6 +54,9 @@ import com.google.common.base.Preconditions;
public class RegionStateStore {
private static final Log LOG = LogFactory.getLog(RegionStateStore.class);
/** The delimiter for meta columns for replicaIds > 0 */
protected static final char META_REPLICA_ID_DELIMITER = '_';
private volatile HRegion metaRegion;
private volatile HTableInterface metaTable;
private volatile boolean initialized;
@ -67,25 +73,48 @@ public class RegionStateStore {
* @return A ServerName instance, or the server from {@link MetaReader#getRegionLocations(Result)}
* if the serverName column is not found or empty; null if neither is available.
*/
static ServerName getRegionServer(final Result r) {
Cell cell = r.getColumnLatestCell(HConstants.CATALOG_FAMILY, HConstants.SERVERNAME_QUALIFIER);
if (cell == null || cell.getValueLength() == 0) return HRegionInfo.getServerName(r);
static ServerName getRegionServer(final Result r, int replicaId) {
Cell cell = r.getColumnLatestCell(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId));
if (cell == null || cell.getValueLength() == 0) {
RegionLocations locations = MetaReader.getRegionLocations(r);
if (locations != null) {
HRegionLocation location = locations.getRegionLocation(replicaId);
if (location != null) {
return location.getServerName();
}
}
return null;
}
return ServerName.parseServerName(Bytes.toString(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength()));
}
private static byte[] getServerNameColumn(int replicaId) {
return replicaId == 0
? HConstants.SERVERNAME_QUALIFIER
: Bytes.toBytes(HConstants.SERVERNAME_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
+ String.format(HRegionInfo.REPLICA_ID_FORMAT, replicaId));
}
/**
* Pull the region state from a catalog table {@link Result}.
* @param r Result to pull the region state from
* @return the region state, or OPEN if there's no value written.
*/
static State getRegionState(final Result r) {
Cell cell = r.getColumnLatestCell(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER);
static State getRegionState(final Result r, int replicaId) {
Cell cell = r.getColumnLatestCell(HConstants.CATALOG_FAMILY, getStateColumn(replicaId));
if (cell == null || cell.getValueLength() == 0) return State.OPEN;
return State.valueOf(Bytes.toString(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength()));
}
private static byte[] getStateColumn(int replicaId) {
return replicaId == 0
? HConstants.STATE_QUALIFIER
: Bytes.toBytes(HConstants.STATE_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
+ String.format(HRegionInfo.REPLICA_ID_FORMAT, replicaId));
}
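Putting the per-replica qualifiers together, a single hbase:meta row for a region with replicas 0 and 1 would carry columns roughly as sketched below (the base qualifier strings and the four-hex-digit replica suffix are assumptions; illustrative only):
// info:regioninfo                                            serialized HRegionInfo (default replica)
// info:server, info:sn, info:state,
// info:serverstartcode, info:seqnumDuringOpen                replica 0 (plain qualifiers)
// info:server_0001, info:sn_0001, info:state_0001,
// info:serverstartcode_0001, info:seqnumDuringOpen_0001      replica 1 (suffixed qualifiers)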
/**
* Check if we should persist a state change in meta. Generally it's
* better to persist all state changes. However, we should not do that
@ -159,27 +188,23 @@ public class RegionStateStore {
State state = newState.getState();
try {
Put put = new Put(hri.getRegionName());
int replicaId = hri.getReplicaId();
Put put = new Put(MetaReader.getMetaKeyForRegion(hri));
StringBuilder info = new StringBuilder("Updating row ");
info.append(hri.getRegionNameAsString()).append(" with state=").append(state);
if (serverName != null && !serverName.equals(oldServer)) {
put.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVERNAME_QUALIFIER,
put.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId),
Bytes.toBytes(serverName.getServerName()));
info.append("&sn=").append(serverName);
}
if (openSeqNum >= 0) {
Preconditions.checkArgument(state == State.OPEN
&& serverName != null, "Open region should be on a server");
put.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
Bytes.toBytes(serverName.getHostAndPort()));
put.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
Bytes.toBytes(serverName.getStartcode()));
put.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
Bytes.toBytes(openSeqNum));
MetaEditor.addLocation(put, serverName, openSeqNum, replicaId);
info.append("&openSeqNum=").append(openSeqNum);
info.append("&server=").append(serverName);
}
put.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
put.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
Bytes.toBytes(state.name()));
LOG.info(info);


@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@ -76,6 +77,11 @@ public class RegionStates {
*/
private final Map<ServerName, Set<HRegionInfo>> serverHoldings;
/**
* Maintains the mapping from the default region to the replica regions.
*/
private final Map<HRegionInfo, Set<HRegionInfo>> defaultReplicaToOtherReplicas;
/**
* Region to server assignment map.
* Contains the server a given region is currently assigned to.
@ -124,6 +130,7 @@ public class RegionStates {
regionStates = new HashMap<String, RegionState>();
regionsInTransition = new HashMap<String, RegionState>();
serverHoldings = new HashMap<ServerName, Set<HRegionInfo>>();
defaultReplicaToOtherReplicas = new HashMap<HRegionInfo, Set<HRegionInfo>>();
regionAssignments = new TreeMap<HRegionInfo, ServerName>();
lastAssignments = new HashMap<String, ServerName>();
processedServers = new HashMap<ServerName, Long>();
@ -141,6 +148,43 @@ public class RegionStates {
return (Map<HRegionInfo, ServerName>)regionAssignments.clone();
}
/**
* Return the replicas (including default) for the regions grouped by ServerName
* @param regions the regions whose replica assignments are to be looked up
* @return the grouping as a map from ServerName to the list of replicas hosted on that server
*/
synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignments(List<HRegionInfo> regions) {
Map<ServerName, List<HRegionInfo>> map = new HashMap<ServerName, List<HRegionInfo>>();
for (HRegionInfo region : regions) {
HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(region);
ServerName server = regionAssignments.get(defaultReplica);
List<HRegionInfo> regionsOnServer;
if (server != null) {
regionsOnServer = map.get(server);
if (regionsOnServer == null) {
regionsOnServer = new ArrayList<HRegionInfo>(1);
map.put(server, regionsOnServer);
}
regionsOnServer.add(defaultReplica);
}
Set<HRegionInfo> allReplicas = defaultReplicaToOtherReplicas.get(defaultReplica);
if (allReplicas != null) {
for (HRegionInfo hri : allReplicas) {
server = regionAssignments.get(hri);
if (server != null) {
regionsOnServer = map.get(server);
if (regionsOnServer == null) {
regionsOnServer = new ArrayList<HRegionInfo>(1);
map.put(server, regionsOnServer);
}
regionsOnServer.add(hri);
}
}
}
}
return map;
}
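As a rough illustration of the grouping getRegionAssignments(List) performs, the toy sketch below buckets replica assignments by server using plain strings; the region and server names, and the flattened replica-to-server map, are made-up stand-ins for HRegionInfo, ServerName and the regionAssignments/defaultReplicaToOtherReplicas structures.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the grouping above: every replica (default and secondary)
// is bucketed under the server it is assigned to. Names are illustrative.
public class GroupReplicasByServerSketch {
  public static void main(String[] args) {
    // replica -> server assignment, as regionAssignments would hold it
    Map<String, String> assignments = new HashMap<String, String>();
    assignments.put("regionA_0000", "server1");
    assignments.put("regionA_0001", "server2");
    assignments.put("regionB_0000", "server2");

    Map<String, List<String>> byServer = new HashMap<String, List<String>>();
    for (Map.Entry<String, String> e : assignments.entrySet()) {
      List<String> regions = byServer.get(e.getValue());
      if (regions == null) {
        regions = new ArrayList<String>();
        byServer.put(e.getValue(), regions);
      }
      regions.add(e.getKey());
    }
    // e.g. {server1=[regionA_0000], server2=[regionA_0001, regionB_0000]}
    System.out.println(byServer);
  }
}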
public synchronized ServerName getRegionServerOfRegion(HRegionInfo hri) {
return regionAssignments.get(hri);
}
@ -293,12 +337,7 @@ public class RegionStates {
regionsInTransition.put(encodedName, regionState);
}
if (lastHost != null && newState != State.SPLIT) {
Set<HRegionInfo> regions = serverHoldings.get(lastHost);
if (regions == null) {
regions = new HashSet<HRegionInfo>();
serverHoldings.put(lastHost, regions);
}
regions.add(hri);
addToServerHoldings(lastHost, hri);
}
}
return regionState;
@ -372,24 +411,47 @@ public class RegionStates {
ServerName oldServerName = regionAssignments.put(hri, serverName);
if (!serverName.equals(oldServerName)) {
LOG.info("Onlined " + hri.getShortNameToLog() + " on " + serverName);
Set<HRegionInfo> regions = serverHoldings.get(serverName);
if (regions == null) {
regions = new HashSet<HRegionInfo>();
serverHoldings.put(serverName, regions);
}
regions.add(hri);
addToServerHoldings(serverName, hri);
if (oldServerName != null) {
LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
Set<HRegionInfo> oldRegions = serverHoldings.get(oldServerName);
oldRegions.remove(hri);
if (oldRegions.isEmpty()) {
serverHoldings.remove(oldServerName);
}
removeFromServerHoldings(oldServerName, hri);
}
}
}
}
private void addToServerHoldings(ServerName serverName, HRegionInfo hri) {
Set<HRegionInfo> regions = serverHoldings.get(serverName);
if (regions == null) {
regions = new HashSet<HRegionInfo>();
serverHoldings.put(serverName, regions);
}
regions.add(hri);
HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(hri);
Set<HRegionInfo> replicas =
defaultReplicaToOtherReplicas.get(defaultReplica);
if (replicas == null) {
replicas = new HashSet<HRegionInfo>();
defaultReplicaToOtherReplicas.put(defaultReplica, replicas);
}
replicas.add(hri);
}
private void removeFromServerHoldings(ServerName serverName, HRegionInfo hri) {
Set<HRegionInfo> oldRegions = serverHoldings.get(serverName);
oldRegions.remove(hri);
if (oldRegions.isEmpty()) {
serverHoldings.remove(serverName);
}
HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(hri);
Set<HRegionInfo> replicas = defaultReplicaToOtherReplicas.get(defaultReplica);
replicas.remove(hri);
if (replicas.isEmpty()) {
defaultReplicaToOtherReplicas.remove(defaultReplica);
}
}
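The pair of helpers above keeps serverHoldings and defaultReplicaToOtherReplicas in step. The minimal sketch below models just the default-replica index with strings, showing that removing the last replica also drops the map entry; all names are illustrative.
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Minimal model of the defaultReplicaToOtherReplicas bookkeeping, with strings
// standing in for HRegionInfo. "regionA" is the default replica key.
public class ReplicaIndexSketch {
  private final Map<String, Set<String>> defaultToReplicas =
      new HashMap<String, Set<String>>();

  void add(String defaultReplica, String replica) {
    Set<String> replicas = defaultToReplicas.get(defaultReplica);
    if (replicas == null) {
      replicas = new HashSet<String>();
      defaultToReplicas.put(defaultReplica, replicas);
    }
    replicas.add(replica);
  }

  void remove(String defaultReplica, String replica) {
    Set<String> replicas = defaultToReplicas.get(defaultReplica);
    if (replicas == null) return;
    replicas.remove(replica);
    if (replicas.isEmpty()) {
      defaultToReplicas.remove(defaultReplica);  // drop empty entries
    }
  }

  public static void main(String[] args) {
    ReplicaIndexSketch index = new ReplicaIndexSketch();
    index.add("regionA", "regionA_0000");
    index.add("regionA", "regionA_0001");
    index.remove("regionA", "regionA_0000");
    index.remove("regionA", "regionA_0001");
    System.out.println(index.defaultToReplicas);  // {}
  }
}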
/**
* A dead server's hlogs have been split so that all the regions
* used to be open on it can be safely assigned now. Mark them assignable.
@ -468,11 +530,7 @@ public class RegionStates {
ServerName oldServerName = regionAssignments.remove(hri);
if (oldServerName != null && serverHoldings.containsKey(oldServerName)) {
LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
Set<HRegionInfo> oldRegions = serverHoldings.get(oldServerName);
oldRegions.remove(hri);
if (oldRegions.isEmpty()) {
serverHoldings.remove(oldServerName);
}
removeFromServerHoldings(oldServerName, hri);
}
}
}


@ -33,6 +33,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
@ -41,7 +43,6 @@ import org.apache.hadoop.hbase.catalog.MetaReader.Visitor;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
import org.apache.hadoop.hbase.master.balancer.FavoredNodesPlan;
import org.apache.hadoop.hbase.util.Pair;
/**
* Used internally for reading meta and constructing datastructures that are
@ -100,20 +101,27 @@ public class SnapshotOfRegionAssignmentFromMeta {
public boolean visit(Result result) throws IOException {
try {
if (result == null || result.isEmpty()) return true;
Pair<HRegionInfo, ServerName> regionAndServer =
HRegionInfo.getHRegionInfoAndServerName(result);
HRegionInfo hri = regionAndServer.getFirst();
if (hri == null) return true;
RegionLocations rl = MetaReader.getRegionLocations(result);
if (rl == null) return true;
HRegionInfo hri = rl.getRegionLocation(0).getRegionInfo();
if (hri == null) return true;
if (hri.getTable() == null) return true;
if (disabledTables.contains(hri.getTable())) {
return true;
}
// Are we to include split parents in the list?
if (excludeOfflinedSplitParents && hri.isSplit()) return true;
// Add the current assignment to the snapshot
addAssignment(hri, regionAndServer.getSecond());
addRegion(hri);
HRegionLocation[] hrls = rl.getRegionLocations();
// Add the current assignment to the snapshot for all replicas
for (int i = 0; i < hrls.length; i++) {
if (hrls[i] == null) continue;
hri = hrls[i].getRegionInfo();
if (hri == null) continue;
addAssignment(hri, hrls[i].getServerName());
addRegion(hri);
}
// the code below is to handle favored nodes
byte[] favoredNodes = result.getValue(HConstants.CATALOG_FAMILY,
FavoredNodeAssignmentHelper.FAVOREDNODES_QUALIFIER);
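The visitor above recovers every replica location stored in a single meta row. As a rough standalone model of that read path, the sketch below parses per-replica server values out of a plain qualifier-to-value map; the "server"/"server_0001" suffix convention follows the MetaReader replica columns, and the host values are made up.
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of reading per-replica locations back out of a meta row.
// The row is faked as qualifier -> value; "server" is replica 0 and
// "server_0001", "server_0002", ... are the higher replicas.
public class ReplicaLocationReadSketch {
  public static void main(String[] args) {
    Map<String, String> metaRow = new LinkedHashMap<String, String>();
    metaRow.put("server", "host1:16020");
    metaRow.put("server_0001", "host2:16020");

    Map<Integer, String> locations = new TreeMap<Integer, String>();
    for (Map.Entry<String, String> cell : metaRow.entrySet()) {
      String qualifier = cell.getKey();
      if (qualifier.equals("server")) {
        locations.put(0, cell.getValue());
      } else if (qualifier.startsWith("server_")) {
        // replica ids are formatted in hex, e.g. "0001" -> 1
        int replicaId = Integer.parseInt(qualifier.substring("server_".length()), 16);
        locations.put(replicaId, cell.getValue());
      }
    }
    System.out.println(locations);  // {0=host1:16020, 1=host2:16020}
  }
}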
@ -158,6 +166,8 @@ public class SnapshotOfRegionAssignmentFromMeta {
// Process the region to region server map
regionToRegionServerMap.put(regionInfo, server);
if (server == null) return;
// Process the region server to region map
List<HRegionInfo> regionList = regionServerToRegionMap.get(server);
if (regionList == null) {


@ -28,7 +28,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -40,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.master.AssignmentManager;
@ -143,7 +143,13 @@ public abstract class ModifyRegionUtils {
CompletionService<HRegionInfo> completionService =
new ExecutorCompletionService<HRegionInfo>(exec);
List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
int defaultReplicas = 0;
for (final HRegionInfo newRegion : newRegions) {
regionInfos.add(newRegion);
if (!RegionReplicaUtil.isDefaultReplica(newRegion)) {
continue;
}
defaultReplicas++;
completionService.submit(new Callable<HRegionInfo>() {
@Override
public HRegionInfo call() throws IOException {
@ -153,10 +159,8 @@ public abstract class ModifyRegionUtils {
}
try {
// wait for all regions to finish creation
for (int i = 0; i < regionNumber; i++) {
Future<HRegionInfo> future = completionService.take();
HRegionInfo regionInfo = future.get();
regionInfos.add(regionInfo);
for (int i = 0; i < defaultReplicas; i++) {
completionService.take().get();
}
} catch (InterruptedException e) {
LOG.error("Caught " + e + " during region creation");


@ -0,0 +1,323 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.catalog.MetaReader.Visitor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestMasterOperationsForRegionReplicas {
final static Log LOG = LogFactory.getLog(TestMasterOperationsForRegionReplicas.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static HBaseAdmin admin;
private static int numSlaves = 2;
@BeforeClass
public static void setupBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
TEST_UTIL.startMiniCluster(numSlaves);
admin = new HBaseAdmin(conf);
while(admin.getClusterStatus().getServers().size() < numSlaves) {
Thread.sleep(100);
}
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testCreateTableWithSingleReplica() throws Exception {
final int numRegions = 3;
final int numReplica = 1;
final TableName table = TableName.valueOf("singleReplicaTable");
try {
HTableDescriptor desc = new HTableDescriptor(table);
desc.setRegionReplication(numReplica);
desc.addFamily(new HColumnDescriptor("family"));
admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), numRegions);
CatalogTracker ct = new CatalogTracker(TEST_UTIL.getConfiguration());
validateNumberOfRowsInMeta(table, numRegions, ct);
List<HRegionInfo> hris = MetaReader.getTableRegions(ct, table);
assert(hris.size() == numRegions * numReplica);
} finally {
admin.disableTable(table);
admin.deleteTable(table);
}
}
@Test
public void testCreateTableWithMultipleReplicas() throws Exception {
final TableName table = TableName.valueOf("fooTable");
final int numRegions = 3;
final int numReplica = 2;
try {
HTableDescriptor desc = new HTableDescriptor(table);
desc.setRegionReplication(numReplica);
desc.addFamily(new HColumnDescriptor("family"));
admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), numRegions);
TEST_UTIL.waitTableEnabled(table.getName());
CatalogTracker ct = new CatalogTracker(TEST_UTIL.getConfiguration());
validateNumberOfRowsInMeta(table, numRegions, ct);
List<HRegionInfo> hris = MetaReader.getTableRegions(ct, table);
assert(hris.size() == numRegions * numReplica);
// check that the master created expected number of RegionState objects
for (int i = 0; i < numRegions; i++) {
for (int j = 0; j < numReplica; j++) {
HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hris.get(i), j);
RegionState state = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
.getRegionStates().getRegionState(replica);
assert (state != null);
}
}
// TODO: HBASE-10351 should uncomment the following tests (since the tests assume region placements are handled)
// List<Result> metaRows = MetaReader.fullScan(ct);
// int numRows = 0;
// for (Result result : metaRows) {
// RegionLocations locations = MetaReader.getRegionLocations(result);
// HRegionInfo hri = locations.getRegionLocation().getRegionInfo();
// if (!hri.getTable().equals(table)) continue;
// numRows += 1;
// HRegionLocation[] servers = locations.getRegionLocations();
// // have two locations for the replicas of a region, and the locations should be different
// assert(servers.length == 2);
// assert(!servers[0].equals(servers[1]));
// }
// assert(numRows == numRegions);
//
// // The same verification of the meta as above but with the SnapshotOfRegionAssignmentFromMeta
// // class
// validateFromSnapshotFromMeta(table, numRegions, numReplica, ct);
//
// // Now kill the master, restart it and see if the assignments are kept
// ServerName master = TEST_UTIL.getHBaseClusterInterface().getClusterStatus().getMaster();
// TEST_UTIL.getHBaseClusterInterface().stopMaster(master);
// TEST_UTIL.getHBaseClusterInterface().waitForMasterToStop(master, 30000);
// TEST_UTIL.getHBaseClusterInterface().startMaster(master.getHostname());
// TEST_UTIL.getHBaseClusterInterface().waitForActiveAndReadyMaster();
// for (int i = 0; i < numRegions; i++) {
// for (int j = 0; j < numReplica; j++) {
// HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hris.get(i), j);
// RegionState state = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
// .getRegionStates().getRegionState(replica);
// assert (state != null);
// }
// }
// validateFromSnapshotFromMeta(table, numRegions, numReplica, ct);
//
// // Now shut the whole cluster down, and verify the assignments are kept so that the
// // availability constraints are met.
// TEST_UTIL.getConfiguration().setBoolean("hbase.master.startup.retainassign", true);
// TEST_UTIL.shutdownMiniHBaseCluster();
// TEST_UTIL.startMiniHBaseCluster(1, numSlaves);
// TEST_UTIL.waitTableEnabled(table.getName());
// ct = new CatalogTracker(TEST_UTIL.getConfiguration());
// validateFromSnapshotFromMeta(table, numRegions, numReplica, ct);
//
// // Now shut the whole cluster down, and verify regions are assigned even if there is only
// // one server running
// TEST_UTIL.shutdownMiniHBaseCluster();
// TEST_UTIL.startMiniHBaseCluster(1, 1);
// TEST_UTIL.waitTableEnabled(table.getName());
// ct = new CatalogTracker(TEST_UTIL.getConfiguration());
// validateSingleRegionServerAssignment(ct, numRegions, numReplica);
// for (int i = 1; i < numSlaves; i++) { //restore the cluster
// TEST_UTIL.getMiniHBaseCluster().startRegionServer();
// }
//TODO: HBASE-10361 patch should uncomment the test below
// //check on alter table
// admin.disableTable(table);
// assert(admin.isTableDisabled(table));
// //increase the replica
// desc.setRegionReplication(numReplica + 1);
// admin.modifyTable(table, desc);
// admin.enableTable(table);
// assert(admin.isTableEnabled(table));
// List<HRegionInfo> regions = TEST_UTIL.getMiniHBaseCluster().getMaster()
// .getAssignmentManager().getRegionStates().getRegionsOfTable(table);
// assert(regions.size() == numRegions * (numReplica + 1));
//
// //decrease the replica(earlier, table was modified to have a replica count of numReplica + 1)
// admin.disableTable(table);
// desc.setRegionReplication(numReplica);
// admin.modifyTable(table, desc);
// admin.enableTable(table);
// assert(admin.isTableEnabled(table));
// regions = TEST_UTIL.getMiniHBaseCluster().getMaster()
// .getAssignmentManager().getRegionStates().getRegionsOfTable(table);
// assert(regions.size() == numRegions * numReplica);
// //also make sure the meta table has the replica locations removed
// hris = MetaReader.getTableRegions(ct, table);
// assert(hris.size() == numRegions * numReplica);
// //just check that the number of default replica regions in the meta table are the same
// //as the number of regions the table was created with, and the count of the
// //replicas is numReplica for each region
// Map<HRegionInfo, Integer> defaultReplicas = new HashMap<HRegionInfo, Integer>();
// for (HRegionInfo hri : hris) {
// Integer i;
// HRegionInfo regionReplica0 = hri.getRegionInfoForReplica(0);
// defaultReplicas.put(regionReplica0,
// (i = defaultReplicas.get(regionReplica0)) == null ? 1 : i + 1);
// }
// assert(defaultReplicas.size() == numRegions);
// Collection<Integer> counts = new HashSet<Integer>(defaultReplicas.values());
// assert(counts.size() == 1 && counts.contains(new Integer(numReplica)));
} finally {
admin.disableTable(table);
admin.deleteTable(table);
}
}
//@Test (TODO: enable when we have support for alter_table - HBASE-10361).
public void testIncompleteMetaTableReplicaInformation() throws Exception {
final TableName table = TableName.valueOf("fooTableTest1");
final int numRegions = 3;
final int numReplica = 2;
try {
// Create a table and let the meta table be updated with the location of the
// region locations.
HTableDescriptor desc = new HTableDescriptor(table);
desc.setRegionReplication(numReplica);
desc.addFamily(new HColumnDescriptor("family"));
admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), numRegions);
TEST_UTIL.waitTableEnabled(table.getName());
CatalogTracker ct = new CatalogTracker(TEST_UTIL.getConfiguration());
Set<byte[]> tableRows = new HashSet<byte[]>();
List<HRegionInfo> hris = MetaReader.getTableRegions(ct, table);
for (HRegionInfo hri : hris) {
tableRows.add(hri.getRegionName());
}
admin.disableTable(table);
// now delete one replica info from all the rows
// this is to make the meta appear to be only partially updated
HTable metaTable = new HTable(TableName.META_TABLE_NAME, ct.getConnection());
for (byte[] row : tableRows) {
Delete deleteOneReplicaLocation = new Delete(row);
deleteOneReplicaLocation.deleteColumns(HConstants.CATALOG_FAMILY, MetaReader.getServerColumn(1));
deleteOneReplicaLocation.deleteColumns(HConstants.CATALOG_FAMILY, MetaReader.getSeqNumColumn(1));
deleteOneReplicaLocation.deleteColumns(HConstants.CATALOG_FAMILY, MetaReader.getStartCodeColumn(1));
metaTable.delete(deleteOneReplicaLocation);
}
metaTable.close();
// even if the meta table is partly updated, when we re-enable the table, we should
// get back the desired number of replicas for the regions
admin.enableTable(table);
assert(admin.isTableEnabled(table));
List<HRegionInfo> regions = TEST_UTIL.getMiniHBaseCluster().getMaster()
.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
assert(regions.size() == numRegions * numReplica);
} finally {
admin.disableTable(table);
admin.deleteTable(table);
}
}
private String printRegions(List<HRegionInfo> regions) {
StringBuffer strBuf = new StringBuffer();
for (HRegionInfo r : regions) {
strBuf.append(" ____ " + r.toString());
}
return strBuf.toString();
}
private void validateNumberOfRowsInMeta(final TableName table, int numRegions, CatalogTracker ct)
throws IOException {
assert(admin.tableExists(table));
final AtomicInteger count = new AtomicInteger();
Visitor visitor = new Visitor() {
@Override
public boolean visit(Result r) throws IOException {
if (HRegionInfo.getHRegionInfo(r).getTable().equals(table)) count.incrementAndGet();
return true;
}
};
MetaReader.fullScan(ct, visitor);
assert(count.get() == numRegions);
}
private void validateFromSnapshotFromMeta(TableName table, int numRegions,
int numReplica, CatalogTracker ct) throws IOException {
SnapshotOfRegionAssignmentFromMeta snapshot = new SnapshotOfRegionAssignmentFromMeta(ct);
snapshot.initialize();
Map<HRegionInfo, ServerName> regionToServerMap = snapshot.getRegionToRegionServerMap();
assert(regionToServerMap.size() == numRegions * numReplica + 1); //'1' for the namespace
Map<ServerName, List<HRegionInfo>> serverToRegionMap = snapshot.getRegionServerToRegionMap();
for (Map.Entry<ServerName, List<HRegionInfo>> entry : serverToRegionMap.entrySet()) {
List<HRegionInfo> regions = entry.getValue();
Set<byte[]> setOfStartKeys = new HashSet<byte[]>();
for (HRegionInfo region : regions) {
byte[] startKey = region.getStartKey();
if (region.getTable().equals(table)) {
setOfStartKeys.add(startKey); //ignore other tables
LOG.info("--STARTKEY " + new String(startKey)+"--");
}
}
// the number of startkeys will be equal to the number of regions hosted on each server
// (each server will be hosting one replica of a region)
assertEquals(numRegions, setOfStartKeys.size());
}
}
private void validateSingleRegionServerAssignment(CatalogTracker ct, int numRegions,
int numReplica) throws IOException {
SnapshotOfRegionAssignmentFromMeta snapshot = new SnapshotOfRegionAssignmentFromMeta(ct);
snapshot.initialize();
Map<HRegionInfo, ServerName> regionToServerMap = snapshot.getRegionToRegionServerMap();
assert(regionToServerMap.size() == numRegions * numReplica + 1); //'1' for the namespace
Map<ServerName, List<HRegionInfo>> serverToRegionMap = snapshot.getRegionServerToRegionMap();
assert(serverToRegionMap.keySet().size() == 1);
assert(serverToRegionMap.values().iterator().next().size() == numRegions * numReplica + 1);
}
}