HBASE-17520 Implement isTableEnabled/Disabled/Available methods
parent b290d14e1f
commit 752b258b7c
@@ -20,20 +20,33 @@ package org.apache.hadoop.hbase;

import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.MetaTableAccessor.CollectingVisitor;
import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
import org.apache.hadoop.hbase.MetaTableAccessor.Visitor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RawAsyncTable;
import org.apache.hadoop.hbase.client.RawScanResultConsumer;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;

@@ -46,6 +59,14 @@ public class AsyncMetaTableAccessor {

  private static final Log LOG = LogFactory.getLog(AsyncMetaTableAccessor.class);

  /** The delimiter for meta columns for replicaIds > 0 */
  private static final char META_REPLICA_ID_DELIMITER = '_';

  /** A regex for parsing server columns from meta. See above javadoc for meta layout */
  private static final Pattern SERVER_COLUMN_PATTERN = Pattern
      .compile("^server(_[0-9a-fA-F]{4})?$");

  public static CompletableFuture<Boolean> tableExists(RawAsyncTable metaTable, TableName tableName) {
    if (tableName.equals(META_TABLE_NAME)) {
      return CompletableFuture.completedFuture(true);

@@ -121,6 +142,350 @@ public class AsyncMetaTableAccessor {
    }
  }

  /**
   * Used to get table regions' info and server.
   * @param metaTable
   * @param tableName table we're looking for; if empty, all regions are returned
   * @return the list of regioninfos and server. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  public static CompletableFuture<List<Pair<HRegionInfo, ServerName>>> getTableRegionsAndLocations(
      RawAsyncTable metaTable, final Optional<TableName> tableName) {
    return getTableRegionsAndLocations(metaTable, tableName, true);
  }

  /**
   * Used to get table regions' info and server.
   * @param metaTable
   * @param tableName table we're looking for; if empty, all regions are returned
   * @param excludeOfflinedSplitParents don't return split parents
   * @return the list of regioninfos and server. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  public static CompletableFuture<List<Pair<HRegionInfo, ServerName>>> getTableRegionsAndLocations(
      RawAsyncTable metaTable, final Optional<TableName> tableName,
      final boolean excludeOfflinedSplitParents) {
    CompletableFuture<List<Pair<HRegionInfo, ServerName>>> future = new CompletableFuture<>();
    if (tableName.filter((t) -> t.equals(TableName.META_TABLE_NAME)).isPresent()) {
      future.completeExceptionally(new IOException(
          "This method can't be used to locate meta regions;" + " use MetaTableLocator instead"));
    }

    // Make a version of CollectingVisitor that collects HRegionInfo and ServerAddress
    CollectingVisitor<Pair<HRegionInfo, ServerName>> visitor = new CollectingVisitor<Pair<HRegionInfo, ServerName>>() {
      private Optional<RegionLocations> current = null;

      @Override
      public boolean visit(Result r) throws IOException {
        current = getRegionLocations(r);
        if (!current.isPresent() || current.get().getRegionLocation().getRegionInfo() == null) {
          LOG.warn("No serialized HRegionInfo in " + r);
          return true;
        }
        HRegionInfo hri = current.get().getRegionLocation().getRegionInfo();
        if (excludeOfflinedSplitParents && hri.isSplitParent()) return true;
        // Else call super and add this Result to the collection.
        return super.visit(r);
      }

      @Override
      void add(Result r) {
        if (!current.isPresent()) {
          return;
        }
        for (HRegionLocation loc : current.get().getRegionLocations()) {
          if (loc != null) {
            this.results.add(new Pair<HRegionInfo, ServerName>(loc.getRegionInfo(), loc
                .getServerName()));
          }
        }
      }
    };

    scanMeta(metaTable, tableName, QueryType.REGION, visitor).whenComplete((v, error) -> {
      if (error != null) {
        future.completeExceptionally(error);
        return;
      }
      future.complete(visitor.getResults());
    });
    return future;
  }

  /**
   * Performs a scan of META table for given table.
   * @param metaTable
   * @param tableName table within which we scan
   * @param type scanned part of meta
   * @param visitor Visitor invoked against each row
   */
  private static CompletableFuture<Void> scanMeta(RawAsyncTable metaTable,
      Optional<TableName> tableName, QueryType type, final Visitor visitor) {
    return scanMeta(metaTable, getTableStartRowForMeta(tableName, type),
        getTableStopRowForMeta(tableName, type), type, Integer.MAX_VALUE, visitor);
  }

  /**
   * Performs a scan of META table for given table.
   * @param metaTable
   * @param startRow Where to start the scan
   * @param stopRow Where to stop the scan
   * @param type scanned part of meta
   * @param maxRows maximum rows to return
   * @param visitor Visitor invoked against each row
   */
  private static CompletableFuture<Void> scanMeta(RawAsyncTable metaTable, Optional<byte[]> startRow,
      Optional<byte[]> stopRow, QueryType type, int maxRows, final Visitor visitor) {
    int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
    Scan scan = getMetaScan(metaTable, rowUpperLimit);
    for (byte[] family : type.getFamilies()) {
      scan.addFamily(family);
    }
    startRow.ifPresent(scan::withStartRow);
    stopRow.ifPresent(scan::withStopRow);

    if (LOG.isTraceEnabled()) {
      LOG.trace("Scanning META" + " starting at row=" + Bytes.toStringBinary(scan.getStartRow())
          + " stopping at row=" + Bytes.toStringBinary(scan.getStopRow()) + " for max="
          + rowUpperLimit + " with caching=" + scan.getCaching());
    }

    CompletableFuture<Void> future = new CompletableFuture<Void>();
    metaTable.scan(scan, new MetaTableRawScanResultConsumer(rowUpperLimit, visitor, future));
    return future;
  }

  private static final class MetaTableRawScanResultConsumer implements RawScanResultConsumer {

    private int currentRowCount;

    private final int rowUpperLimit;

    private final Visitor visitor;

    private final CompletableFuture<Void> future;

    MetaTableRawScanResultConsumer(int rowUpperLimit, Visitor visitor, CompletableFuture<Void> future) {
      this.rowUpperLimit = rowUpperLimit;
      this.visitor = visitor;
      this.future = future;
      this.currentRowCount = 0;
    }

    @Override
    public void onError(Throwable error) {
      future.completeExceptionally(error);
    }

    @Override
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
        justification = "https://github.com/findbugsproject/findbugs/issues/79")
    public void onComplete() {
      future.complete(null);
    }

    @Override
    public void onNext(Result[] results, ScanController controller) {
      for (Result result : results) {
        try {
          if (!visitor.visit(result)) {
            controller.terminate();
          }
        } catch (IOException e) {
          future.completeExceptionally(e);
          controller.terminate();
        }
        if (++currentRowCount >= rowUpperLimit) {
          controller.terminate();
        }
      }
    }
  }

  private static Scan getMetaScan(RawAsyncTable metaTable, int rowUpperLimit) {
    Scan scan = new Scan();
    int scannerCaching = metaTable.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING,
        HConstants.DEFAULT_HBASE_META_SCANNER_CACHING);
    if (metaTable.getConfiguration().getBoolean(HConstants.USE_META_REPLICAS,
        HConstants.DEFAULT_USE_META_REPLICAS)) {
      scan.setConsistency(Consistency.TIMELINE);
    }
    if (rowUpperLimit <= scannerCaching) {
      scan.setLimit(rowUpperLimit);
    }
    int rows = Math.min(rowUpperLimit, scannerCaching);
    scan.setCaching(rows);
    return scan;
  }

  /**
   * Returns an HRegionLocationList extracted from the result.
   * @return an HRegionLocationList containing all locations for the region range, or an empty
   *         Optional if we can't deserialize the result.
   */
  private static Optional<RegionLocations> getRegionLocations(final Result r) {
    if (r == null) return Optional.empty();
    Optional<HRegionInfo> regionInfo = getHRegionInfo(r, getRegionInfoColumn());
    if (!regionInfo.isPresent()) return Optional.empty();

    List<HRegionLocation> locations = new ArrayList<HRegionLocation>(1);
    NavigableMap<byte[], NavigableMap<byte[], byte[]>> familyMap = r.getNoVersionMap();

    locations.add(getRegionLocation(r, regionInfo.get(), 0));

    NavigableMap<byte[], byte[]> infoMap = familyMap.get(getCatalogFamily());
    if (infoMap == null) return Optional.of(new RegionLocations(locations));

    // iterate until all serverName columns are seen
    int replicaId = 0;
    byte[] serverColumn = getServerColumn(replicaId);
    SortedMap<byte[], byte[]> serverMap = null;
    serverMap = infoMap.tailMap(serverColumn, false);

    if (serverMap.isEmpty()) return Optional.of(new RegionLocations(locations));

    for (Map.Entry<byte[], byte[]> entry : serverMap.entrySet()) {
      replicaId = parseReplicaIdFromServerColumn(entry.getKey());
      if (replicaId < 0) {
        break;
      }
      HRegionLocation location = getRegionLocation(r, regionInfo.get(), replicaId);
      // In case the region replica is newly created, its location might be null. We usually do not
      // have HRL's in RegionLocations object with null ServerName. They are handled as null HRLs.
      if (location == null || location.getServerName() == null) {
        locations.add(null);
      } else {
        locations.add(location);
      }
    }

    return Optional.of(new RegionLocations(locations));
  }

  /**
   * Returns the HRegionLocation parsed from the given meta row Result
   * for the given regionInfo and replicaId. The regionInfo can be the default region info
   * for the replica.
   * @param r the meta row result
   * @param regionInfo RegionInfo for default replica
   * @param replicaId the replicaId for the HRegionLocation
   * @return HRegionLocation parsed from the given meta row Result for the given replicaId
   */
  private static HRegionLocation getRegionLocation(final Result r, final HRegionInfo regionInfo,
      final int replicaId) {
    Optional<ServerName> serverName = getServerName(r, replicaId);
    long seqNum = getSeqNumDuringOpen(r, replicaId);
    HRegionInfo replicaInfo = RegionReplicaUtil.getRegionInfoForReplica(regionInfo, replicaId);
    return new HRegionLocation(replicaInfo, serverName.orElse(null), seqNum);
  }

  /**
   * Returns a {@link ServerName} from catalog table {@link Result}.
   * @param r Result to pull from
   * @return A ServerName instance.
   */
  private static Optional<ServerName> getServerName(final Result r, final int replicaId) {
    byte[] serverColumn = getServerColumn(replicaId);
    Cell cell = r.getColumnLatestCell(getCatalogFamily(), serverColumn);
    if (cell == null || cell.getValueLength() == 0) return Optional.empty();
    String hostAndPort = Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
      cell.getValueLength());
    byte[] startcodeColumn = getStartCodeColumn(replicaId);
    cell = r.getColumnLatestCell(getCatalogFamily(), startcodeColumn);
    if (cell == null || cell.getValueLength() == 0) return Optional.empty();
    try {
      return Optional.of(ServerName.valueOf(hostAndPort,
        Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())));
    } catch (IllegalArgumentException e) {
      LOG.error("Ignoring invalid region for server " + hostAndPort + "; cell=" + cell, e);
      return Optional.empty();
    }
  }

  /**
   * The latest seqnum that the server writing to meta observed when opening the region.
   * E.g. the seqNum when the result of {@link #getServerName(Result, int)} was written.
   * @param r Result to pull the seqNum from
   * @return SeqNum, or HConstants.NO_SEQNUM if there's no value written.
   */
  private static long getSeqNumDuringOpen(final Result r, final int replicaId) {
    Cell cell = r.getColumnLatestCell(getCatalogFamily(), getSeqNumColumn(replicaId));
    if (cell == null || cell.getValueLength() == 0) return HConstants.NO_SEQNUM;
    return Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
  }

  /**
   * @param tableName table we're working with
   * @return start row for scanning META according to query type
   */
  private static Optional<byte[]> getTableStartRowForMeta(Optional<TableName> tableName,
      QueryType type) {
    return tableName.map((table) -> {
      switch (type) {
      case REGION:
        byte[] startRow = new byte[table.getName().length + 2];
        System.arraycopy(table.getName(), 0, startRow, 0, table.getName().length);
        startRow[startRow.length - 2] = HConstants.DELIMITER;
        startRow[startRow.length - 1] = HConstants.DELIMITER;
        return startRow;
      case ALL:
      case TABLE:
      default:
        return table.getName();
      }
    });
  }

  /**
   * @param tableName table we're working with
   * @return stop row for scanning META according to query type
   */
  private static Optional<byte[]> getTableStopRowForMeta(Optional<TableName> tableName,
      QueryType type) {
    return tableName.map((table) -> {
      final byte[] stopRow;
      switch (type) {
      case REGION:
        stopRow = new byte[table.getName().length + 3];
        System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length);
        stopRow[stopRow.length - 3] = ' ';
        stopRow[stopRow.length - 2] = HConstants.DELIMITER;
        stopRow[stopRow.length - 1] = HConstants.DELIMITER;
        break;
      case ALL:
      case TABLE:
      default:
        stopRow = new byte[table.getName().length + 1];
        System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length);
        stopRow[stopRow.length - 1] = ' ';
        break;
      }
      return stopRow;
    });
  }

  /**
   * Returns the HRegionInfo object from the column {@link HConstants#CATALOG_FAMILY} and
   * <code>qualifier</code> of the catalog table result.
   * @param r a Result object from the catalog table scan
   * @param qualifier Column family qualifier
   * @return An HRegionInfo instance.
   */
  private static Optional<HRegionInfo> getHRegionInfo(final Result r, byte[] qualifier) {
    Cell cell = r.getColumnLatestCell(getCatalogFamily(), qualifier);
    if (cell == null) return Optional.empty();
    return Optional.ofNullable(HRegionInfo.parseFromOrNull(cell.getValueArray(),
      cell.getValueOffset(), cell.getValueLength()));
  }

  /**
   * Returns the column family used for meta columns.
   * @return HConstants.CATALOG_FAMILY.
   */
  private static byte[] getCatalogFamily() {
    return HConstants.CATALOG_FAMILY;
  }

  /**
   * Returns the column family used for table columns.
   * @return HConstants.TABLE_FAMILY.

@@ -129,6 +494,14 @@ public class AsyncMetaTableAccessor {
    return HConstants.TABLE_FAMILY;
  }

  /**
   * Returns the column qualifier for serialized region info
   * @return HConstants.REGIONINFO_QUALIFIER
   */
  private static byte[] getRegionInfoColumn() {
    return HConstants.REGIONINFO_QUALIFIER;
  }

  /**
   * Returns the column qualifier for serialized table state
   * @return HConstants.TABLE_STATE_QUALIFIER

@@ -136,4 +509,61 @@ public class AsyncMetaTableAccessor {
  private static byte[] getStateColumn() {
    return HConstants.TABLE_STATE_QUALIFIER;
  }

  /**
   * Returns the column qualifier for server column for replicaId
   * @param replicaId the replicaId of the region
   * @return a byte[] for server column qualifier
   */
  private static byte[] getServerColumn(int replicaId) {
    return replicaId == 0
        ? HConstants.SERVER_QUALIFIER
        : Bytes.toBytes(HConstants.SERVER_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
            + String.format(HRegionInfo.REPLICA_ID_FORMAT, replicaId));
  }

  /**
   * Returns the column qualifier for server start code column for replicaId
   * @param replicaId the replicaId of the region
   * @return a byte[] for server start code column qualifier
   */
  private static byte[] getStartCodeColumn(int replicaId) {
    return replicaId == 0
        ? HConstants.STARTCODE_QUALIFIER
        : Bytes.toBytes(HConstants.STARTCODE_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
            + String.format(HRegionInfo.REPLICA_ID_FORMAT, replicaId));
  }

  /**
   * Returns the column qualifier for seqNum column for replicaId
   * @param replicaId the replicaId of the region
   * @return a byte[] for seqNum column qualifier
   */
  private static byte[] getSeqNumColumn(int replicaId) {
    return replicaId == 0
        ? HConstants.SEQNUM_QUALIFIER
        : Bytes.toBytes(HConstants.SEQNUM_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
            + String.format(HRegionInfo.REPLICA_ID_FORMAT, replicaId));
  }

  /**
   * Parses the replicaId from the server column qualifier. See top of the class javadoc
   * for the actual meta layout
   * @param serverColumn the column qualifier
   * @return an int for the replicaId
   */
  private static int parseReplicaIdFromServerColumn(byte[] serverColumn) {
    String serverStr = Bytes.toString(serverColumn);

    Matcher matcher = SERVER_COLUMN_PATTERN.matcher(serverStr);
    if (matcher.matches() && matcher.groupCount() > 0) {
      String group = matcher.group(1);
      if (group != null && group.length() > 0) {
        return Integer.parseInt(group.substring(1), 16);
      } else {
        return 0;
      }
    }
    return -1;
  }
}
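A minimal usage sketch (not part of this commit) of the new public helper above: it lists each region of a table together with the server hosting it. The AsyncConnection variable, the getRawTable call, and the surrounding scaffolding are assumptions for illustration, and the snippet relies on the same imports as the class above.

  // Illustrative only: dump region -> server assignments for one table using the
  // AsyncMetaTableAccessor helper added in this commit. "conn" is an assumed
  // AsyncConnection managed by the caller; getRawTable(TableName) is assumed available.
  static void dumpRegions(AsyncConnection conn, TableName tableName) {
    RawAsyncTable metaTable = conn.getRawTable(TableName.META_TABLE_NAME);
    AsyncMetaTableAccessor.getTableRegionsAndLocations(metaTable, Optional.of(tableName))
        .thenAccept(pairs -> pairs.forEach(pair -> System.out
            .println(pair.getFirst().getRegionNameAsString() + " -> " + pair.getSecond())))
        .join(); // block here only for the sake of the example
  }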
@@ -221,6 +221,30 @@ public interface AsyncAdmin {
   */
  CompletableFuture<HTableDescriptor[]> disableTables(Pattern pattern);

  /**
   * @param tableName name of table to check
   * @return true if table is off-line. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  CompletableFuture<Boolean> isTableDisabled(TableName tableName);

  /**
   * @param tableName name of table to check
   * @return true if all regions of the table are available. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  CompletableFuture<Boolean> isTableAvailable(TableName tableName);

  /**
   * Use this API to check if the table has been created with the specified number of split keys
   * which were used while creating the given table. Note: if this API is used after a table's
   * region gets split, the API may return false. The return value will be wrapped by a
   * {@link CompletableFuture}.
   * @param tableName name of table to check
   * @param splitKeys keys to check if the table has been created with all split keys
   */
  CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys);

  /**
   * Get the status of alter command - indicates how many regions have received the updated schema
   * Asynchronous operation.

@@ -285,6 +309,13 @@ public interface AsyncAdmin {
   */
  CompletableFuture<NamespaceDescriptor[]> listNamespaceDescriptors();

  /**
   * @param tableName name of table to check
   * @return true if table is on-line. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  CompletableFuture<Boolean> isTableEnabled(TableName tableName);

  /**
   * Turn the load balancer on or off.
   * @param on
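As a hedged illustration of how the new interface methods compose (not part of the diff itself), a caller might gate an enable on the current disabled state. The enableTable(TableName) method returning CompletableFuture<Void> is assumed from the rest of the AsyncAdmin interface.

  // Illustrative only: enable a table if and only if it is currently disabled.
  // "admin" is an AsyncAdmin; enableTable returning CompletableFuture<Void> is an assumption.
  static CompletableFuture<Void> enableIfDisabled(AsyncAdmin admin, TableName tableName) {
    return admin.isTableDisabled(tableName).thenCompose(disabled ->
        disabled ? admin.enableTable(tableName) : CompletableFuture.<Void> completedFuture(null));
  }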
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@@ -44,9 +45,9 @@ import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;

@@ -451,6 +452,112 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
  }

  @Override
  public CompletableFuture<Boolean> isTableEnabled(TableName tableName) {
    CompletableFuture<Boolean> future = new CompletableFuture<>();
    AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
      if (error != null) {
        future.completeExceptionally(error);
        return;
      }
      if (state.isPresent()) {
        future.complete(state.get().inStates(TableState.State.ENABLED));
      } else {
        future.completeExceptionally(new TableNotFoundException(tableName));
      }
    });
    return future;
  }

  @Override
  public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
    CompletableFuture<Boolean> future = new CompletableFuture<>();
    AsyncMetaTableAccessor.getTableState(metaTable, tableName).whenComplete((state, error) -> {
      if (error != null) {
        future.completeExceptionally(error);
        return;
      }
      if (state.isPresent()) {
        future.complete(state.get().inStates(TableState.State.DISABLED));
      } else {
        future.completeExceptionally(new TableNotFoundException(tableName));
      }
    });
    return future;
  }

  @Override
  public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
    return isTableAvailable(tableName, null);
  }

  @Override
  public CompletableFuture<Boolean> isTableAvailable(TableName tableName, byte[][] splitKeys) {
    CompletableFuture<Boolean> future = new CompletableFuture<>();
    isTableEnabled(tableName).whenComplete(
      (enabled, error) -> {
        if (error != null) {
          future.completeExceptionally(error);
          return;
        }
        if (!enabled) {
          future.complete(false);
        } else {
          AsyncMetaTableAccessor.getTableRegionsAndLocations(metaTable, Optional.of(tableName))
              .whenComplete(
                (locations, error1) -> {
                  if (error1 != null) {
                    future.completeExceptionally(error1);
                    return;
                  }
                  int notDeployed = 0;
                  int regionCount = 0;
                  for (Pair<HRegionInfo, ServerName> pair : locations) {
                    HRegionInfo info = pair.getFirst();
                    if (pair.getSecond() == null) {
                      if (LOG.isDebugEnabled()) {
                        LOG.debug("Table " + tableName + " has not deployed region "
                            + pair.getFirst().getEncodedName());
                      }
                      notDeployed++;
                    } else if (splitKeys != null
                        && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
                      for (byte[] splitKey : splitKeys) {
                        // Just check if the splitkey is available
                        if (Bytes.equals(info.getStartKey(), splitKey)) {
                          regionCount++;
                          break;
                        }
                      }
                    } else {
                      // Always empty start row should be counted
                      regionCount++;
                    }
                  }
                  if (notDeployed > 0) {
                    if (LOG.isDebugEnabled()) {
                      LOG.debug("Table " + tableName + " has " + notDeployed + " regions");
                    }
                    future.complete(false);
                  } else if (splitKeys != null && regionCount != splitKeys.length + 1) {
                    if (LOG.isDebugEnabled()) {
                      LOG.debug("Table " + tableName + " expected to have "
                          + (splitKeys.length + 1) + " regions, but only " + regionCount
                          + " available");
                    }
                    future.complete(false);
                  } else {
                    if (LOG.isDebugEnabled()) {
                      LOG.debug("Table " + tableName + " should be available");
                    }
                    future.complete(true);
                  }
                });
        }
      });
    return future;
  }

  @Override
  public CompletableFuture<Pair<Integer, Integer>> getAlterStatus(TableName tableName) {
    return this
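To restate the decision rule implemented above (an editorial summary, not code from the patch): a table is reported available only when every region has a server assigned and, when split keys were supplied, exactly splitKeys.length + 1 regions are counted, one per split key plus the region with the empty start key. A compact sketch of that final check, under those assumptions:

  // Editorial sketch of the final decision above, not part of the diff.
  static boolean availableByCounts(int notDeployed, int regionCount, byte[][] splitKeys) {
    if (notDeployed > 0) {
      return false; // at least one region has no server assigned
    }
    // with split keys: expect one region per split key plus the empty-start-key region
    return splitKeys == null || regionCount == splitKeys.length + 1;
  }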
@@ -17,6 +17,9 @@
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

@@ -213,7 +216,7 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
  }

  @Test(timeout = 300000)
  public void testCreateTableWithRegions() throws IOException, InterruptedException {
  public void testCreateTableWithRegions() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());

    byte[][] splitKeys = { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 }, new byte[] { 3, 3, 3 },

@@ -225,6 +228,9 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
    admin.createTable(desc, splitKeys).join();

    boolean tableAvailable = admin.isTableAvailable(tableName, splitKeys).get();
    assertTrue("Table should be created with splitKeys + 1 rows in META", tableAvailable);

    List<HRegionLocation> regions;
    Iterator<HRegionLocation> hris;
    HRegionInfo hri;

@@ -835,4 +841,30 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
    }
  }

  @Test
  public void testIsTableEnabledAndDisabled() throws Exception {
    final TableName table = TableName.valueOf("testIsTableEnabledAndDisabled");
    HTableDescriptor desc = new HTableDescriptor(table);
    desc.addFamily(new HColumnDescriptor(FAMILY));
    admin.createTable(desc).join();
    assertTrue(admin.isTableEnabled(table).get());
    assertFalse(admin.isTableDisabled(table).get());
    admin.disableTable(table).join();
    assertFalse(admin.isTableEnabled(table).get());
    assertTrue(admin.isTableDisabled(table).get());
    admin.deleteTable(table).join();
  }

  @Test
  public void testTableAvailableWithRandomSplitKeys() throws Exception {
    TableName tableName = TableName.valueOf("testTableAvailableWithRandomSplitKeys");
    HTableDescriptor desc = new HTableDescriptor(tableName);
    desc.addFamily(new HColumnDescriptor("col"));
    byte[][] splitKeys = new byte[1][];
    splitKeys = new byte[][] { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 } };
    admin.createTable(desc).join();
    boolean tableAvailable = admin.isTableAvailable(tableName, splitKeys).get();
    assertFalse("Table should be created with 1 row in META", tableAvailable);
  }

}