HBASE-17646: Implement Async getRegion method
Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
parent
613bcb3622
commit
697a55a878
|
@ -27,12 +27,15 @@ import java.util.concurrent.CompletableFuture;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.AsyncConnection;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.RawAsyncTable;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.TableState;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
||||
/**
|
||||
* The asynchronous meta table accessor. Used to read/write region and assignment information store
|
||||
|
@ -74,6 +77,37 @@ public class AsyncMetaTableAccessor {
|
|||
return future;
|
||||
}
|
||||
|
||||
public static CompletableFuture<Pair<HRegionInfo, ServerName>> getRegion(RawAsyncTable metaTable,
|
||||
byte[] regionName) {
|
||||
CompletableFuture<Pair<HRegionInfo, ServerName>> future = new CompletableFuture<>();
|
||||
byte[] row = regionName;
|
||||
HRegionInfo parsedInfo = null;
|
||||
try {
|
||||
parsedInfo = MetaTableAccessor.parseRegionInfoFromRegionName(regionName);
|
||||
row = MetaTableAccessor.getMetaKeyForRegion(parsedInfo);
|
||||
} catch (Exception parseEx) {
|
||||
// Ignore if regionName is a encoded region name.
|
||||
}
|
||||
|
||||
final HRegionInfo finalHRI = parsedInfo;
|
||||
metaTable.get(new Get(row).addFamily(HConstants.CATALOG_FAMILY)).whenComplete((r, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
return;
|
||||
}
|
||||
RegionLocations locations = MetaTableAccessor.getRegionLocations(r);
|
||||
HRegionLocation hrl = locations == null ? null
|
||||
: locations.getRegionLocation(finalHRI == null ? 0 : finalHRI.getReplicaId());
|
||||
if (hrl == null) {
|
||||
future.complete(null);
|
||||
} else {
|
||||
future.complete(new Pair<>(hrl.getRegionInfo(), hrl.getServerName()));
|
||||
}
|
||||
});
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
private static Optional<TableState> getTableState(Result r) throws IOException {
|
||||
Cell cell = r.getColumnLatestCell(getTableFamily(), getStateColumn());
|
||||
if (cell == null) return Optional.empty();
|
||||
|
|
|
@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NU
|
|||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING;
|
||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE;
|
||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
|
||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_META_SCANNER_CACHING;
|
||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
|
||||
|
@ -31,6 +32,7 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
|
|||
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
|
||||
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
|
||||
|
@ -81,6 +83,8 @@ class AsyncConnectionConfiguration {
|
|||
|
||||
private final int scannerCaching;
|
||||
|
||||
private final int metaScannerCaching;
|
||||
|
||||
private final long scannerMaxResultSize;
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
|
@ -105,6 +109,7 @@ class AsyncConnectionConfiguration {
|
|||
HBASE_REGIONSERVER_LEASE_PERIOD_KEY, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
|
||||
this.scannerCaching =
|
||||
conf.getInt(HBASE_CLIENT_SCANNER_CACHING, DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
|
||||
this.metaScannerCaching = conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
|
||||
this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
|
||||
DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
|
||||
}
|
||||
|
@ -149,6 +154,10 @@ class AsyncConnectionConfiguration {
|
|||
return scannerCaching;
|
||||
}
|
||||
|
||||
int getMetaScannerCaching(){
|
||||
return metaScannerCaching;
|
||||
}
|
||||
|
||||
long getScannerMaxResultSize() {
|
||||
return scannerMaxResultSize;
|
||||
}
|
||||
|
|
|
@ -32,16 +32,22 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.UnknownRegionException;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.AdminRequestCallerBuilder;
|
||||
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
|
||||
import org.apache.hadoop.hbase.client.Scan.ReadType;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
|
@ -189,12 +195,10 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
if (controller.failed()) {
|
||||
future.completeExceptionally(new IOException(controller.errorText()));
|
||||
} else {
|
||||
if (respConverter != null) {
|
||||
try {
|
||||
future.complete(respConverter.convert(resp));
|
||||
} catch (IOException e) {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
try {
|
||||
future.complete(respConverter.convert(resp));
|
||||
} catch (IOException e) {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -507,8 +511,81 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> closeRegion(byte[] regionname, String serverName) {
|
||||
throw new UnsupportedOperationException("closeRegion method depends on getRegion API, will support soon.");
|
||||
public CompletableFuture<Void> closeRegion(byte[] regionName, String serverName) {
|
||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
getRegion(regionName).whenComplete((p, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
return;
|
||||
}
|
||||
if (p == null || p.getFirst() == null) {
|
||||
future.completeExceptionally(new UnknownRegionException(Bytes.toStringBinary(regionName)));
|
||||
return;
|
||||
}
|
||||
if (serverName != null) {
|
||||
closeRegion(ServerName.valueOf(serverName), p.getFirst()).whenComplete((p2, err2) -> {
|
||||
if (err2 != null) {
|
||||
future.completeExceptionally(err2);
|
||||
}else{
|
||||
future.complete(null);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
if (p.getSecond() == null) {
|
||||
future.completeExceptionally(new NotServingRegionException(regionName));
|
||||
} else {
|
||||
closeRegion(p.getSecond(), p.getFirst()).whenComplete((p2, err2) -> {
|
||||
if (err2 != null) {
|
||||
future.completeExceptionally(err2);
|
||||
}else{
|
||||
future.complete(null);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
CompletableFuture<Pair<HRegionInfo, ServerName>> getRegion(byte[] regionName) {
|
||||
if (regionName == null) {
|
||||
return failedFuture(new IllegalArgumentException("Pass region name"));
|
||||
}
|
||||
CompletableFuture<Pair<HRegionInfo, ServerName>> future = new CompletableFuture<>();
|
||||
AsyncMetaTableAccessor.getRegion(metaTable, regionName).whenComplete(
|
||||
(p, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
} else if (p != null) {
|
||||
future.complete(p);
|
||||
} else {
|
||||
metaTable.scanAll(
|
||||
new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY))
|
||||
.whenComplete((results, err2) -> {
|
||||
if (err2 != null) {
|
||||
future.completeExceptionally(err2);
|
||||
return;
|
||||
}
|
||||
String encodedName = Bytes.toString(regionName);
|
||||
if (results != null && !results.isEmpty()) {
|
||||
for (Result r : results) {
|
||||
if (r.isEmpty() || MetaTableAccessor.getHRegionInfo(r) == null) continue;
|
||||
RegionLocations rl = MetaTableAccessor.getRegionLocations(r);
|
||||
if (rl != null) {
|
||||
for (HRegionLocation h : rl.getRegionLocations()) {
|
||||
if (h != null && encodedName.equals(h.getRegionInfo().getEncodedName())) {
|
||||
future.complete(new Pair<>(h.getRegionInfo(), h.getServerName()));
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
future.complete(null);
|
||||
});
|
||||
}
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -530,7 +607,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
.action(
|
||||
(controller, stub) -> this.<CloseRegionRequest, CloseRegionResponse, Void> adminCall(
|
||||
controller, stub, ProtobufUtil.buildCloseRegionRequest(sn, hri.getRegionName()),
|
||||
(s, c, req, done) -> s.closeRegion(controller, req, done), null))
|
||||
(s, c, req, done) -> s.closeRegion(controller, req, done), resp -> null))
|
||||
.serverName(sn).call();
|
||||
}
|
||||
|
||||
|
|
|
@ -102,7 +102,8 @@ class RawAsyncTableImpl implements RawAsyncTable {
|
|||
this.pauseNs = builder.pauseNs;
|
||||
this.maxAttempts = builder.maxAttempts;
|
||||
this.startLogErrorsCnt = builder.startLogErrorsCnt;
|
||||
this.defaultScannerCaching = conn.connConf.getScannerCaching();
|
||||
this.defaultScannerCaching = tableName.isSystemTable() ? conn.connConf.getMetaScannerCaching()
|
||||
: conn.connConf.getScannerCaching();
|
||||
this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
|
||||
}
|
||||
|
||||
|
|
|
@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -963,6 +964,35 @@ public class TestAsyncAdmin {
|
|||
onlineRegions.contains(info));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseRegionThatFetchesTheHRIFromMeta() throws Exception {
|
||||
TableName TABLENAME = TableName.valueOf("TestHBACloseRegion2");
|
||||
createTableWithDefaultConf(TABLENAME);
|
||||
|
||||
HRegionInfo info = null;
|
||||
HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TABLENAME);
|
||||
List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
|
||||
for (HRegionInfo regionInfo : onlineRegions) {
|
||||
if (!regionInfo.isMetaTable()) {
|
||||
|
||||
if (regionInfo.getRegionNameAsString().contains("TestHBACloseRegion2")) {
|
||||
info = regionInfo;
|
||||
admin.closeRegion(regionInfo.getRegionNameAsString(), rs.getServerName().getServerName())
|
||||
.get();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
boolean isInList = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices()).contains(info);
|
||||
long timeout = System.currentTimeMillis() + 10000;
|
||||
while ((System.currentTimeMillis() < timeout) && (isInList)) {
|
||||
Thread.sleep(100);
|
||||
isInList = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices()).contains(info);
|
||||
}
|
||||
|
||||
assertFalse("The region should not be present in online regions list.", isInList);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseRegionWhenServerNameIsNull() throws Exception {
|
||||
byte[] TABLENAME = Bytes.toBytes("TestHBACloseRegion3");
|
||||
|
@ -1035,4 +1065,23 @@ public class TestAsyncAdmin {
|
|||
assertTrue("The region should be present in online regions list.",
|
||||
onlineRegions.contains(info));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetRegion() throws Exception {
|
||||
AsyncHBaseAdmin rawAdmin = (AsyncHBaseAdmin) admin;
|
||||
|
||||
final TableName tableName = TableName.valueOf("testGetRegion");
|
||||
LOG.info("Started " + tableName);
|
||||
TEST_UTIL.createMultiRegionTable(tableName, HConstants.CATALOG_FAMILY);
|
||||
|
||||
try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
|
||||
HRegionLocation regionLocation = locator.getRegionLocation(Bytes.toBytes("mmm"));
|
||||
HRegionInfo region = regionLocation.getRegionInfo();
|
||||
byte[] regionName = region.getRegionName();
|
||||
Pair<HRegionInfo, ServerName> pair = rawAdmin.getRegion(regionName).get();
|
||||
assertTrue(Bytes.equals(regionName, pair.getFirst().getRegionName()));
|
||||
pair = rawAdmin.getRegion(region.getEncodedNameAsBytes()).get();
|
||||
assertTrue(Bytes.equals(regionName, pair.getFirst().getRegionName()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue