HBASE-18319 Implement getClusterStatus/getRegionLoad/getCompactionState/getLastMajorCompactionTimestamp methods
This commit is contained in:
parent
89d2adfe92
commit
359f97711f
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
@ -24,8 +25,10 @@ import java.util.Optional;
|
|||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.hbase.ClusterStatus;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.ProcedureInfo;
|
||||
import org.apache.hadoop.hbase.RegionLoad;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -331,6 +334,11 @@ public interface AsyncAdmin {
|
|||
*/
|
||||
CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName serverName);
|
||||
|
||||
/**
|
||||
* Get the regions of a given table.
|
||||
*/
|
||||
CompletableFuture<List<HRegionInfo>> getTableRegions(TableName tableName);
|
||||
|
||||
/**
|
||||
* Flush a table.
|
||||
* @param tableName table to flush
|
||||
|
@ -796,4 +804,91 @@ public interface AsyncAdmin {
|
|||
* @return procedure list wrapped by {@link CompletableFuture}
|
||||
*/
|
||||
CompletableFuture<List<ProcedureInfo>> listProcedures();
|
||||
|
||||
/**
|
||||
* @return cluster status wrapped by {@link CompletableFuture}
|
||||
*/
|
||||
CompletableFuture<ClusterStatus> getClusterStatus();
|
||||
|
||||
/**
|
||||
* @return current master server name wrapped by {@link CompletableFuture}
|
||||
*/
|
||||
default CompletableFuture<ServerName> getMaster() {
|
||||
return getClusterStatus().thenApply(ClusterStatus::getMaster);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return current backup master list wrapped by {@link CompletableFuture}
|
||||
*/
|
||||
default CompletableFuture<Collection<ServerName>> getBackupMasters() {
|
||||
return getClusterStatus().thenApply(ClusterStatus::getBackupMasters);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return current live region servers list wrapped by {@link CompletableFuture}
|
||||
*/
|
||||
default CompletableFuture<Collection<ServerName>> getRegionServers() {
|
||||
return getClusterStatus().thenApply(ClusterStatus::getServers);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a list of {@link RegionLoad} of all regions hosted on a region seerver.
|
||||
* @param serverName
|
||||
* @return a list of {@link RegionLoad} wrapped by {@link CompletableFuture}
|
||||
*/
|
||||
default CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName) {
|
||||
return getRegionLoads(serverName, Optional.empty());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a list of {@link RegionLoad} of all regions hosted on a region seerver for a table.
|
||||
* @param serverName
|
||||
* @param tableName
|
||||
* @return a list of {@link RegionLoad} wrapped by {@link CompletableFuture}
|
||||
*/
|
||||
CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName,
|
||||
Optional<TableName> tableName);
|
||||
|
||||
/**
|
||||
* Check whether master is in maintenance mode
|
||||
* @return true if master is in maintenance mode, false otherwise. The return value will be
|
||||
* wrapped by a {@link CompletableFuture}
|
||||
*/
|
||||
CompletableFuture<Boolean> isMasterInMaintenanceMode();
|
||||
|
||||
/**
|
||||
* Get the current compaction state of a table. It could be in a major compaction, a minor
|
||||
* compaction, both, or none.
|
||||
* @param tableName table to examine
|
||||
* @return the current compaction state wrapped by a {@link CompletableFuture}
|
||||
*/
|
||||
CompletableFuture<CompactionState> getCompactionState(TableName tableName);
|
||||
|
||||
/**
|
||||
* Get the current compaction state of region. It could be in a major compaction, a minor
|
||||
* compaction, both, or none.
|
||||
* @param regionName region to examine
|
||||
* @return the current compaction state wrapped by a {@link CompletableFuture}
|
||||
*/
|
||||
CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName);
|
||||
|
||||
/**
|
||||
* Get the timestamp of the last major compaction for the passed table.
|
||||
* <p>
|
||||
* The timestamp of the oldest HFile resulting from a major compaction of that table, or not
|
||||
* present if no such HFile could be found.
|
||||
* @param tableName table to examine
|
||||
* @return the last major compaction timestamp wrapped by a {@link CompletableFuture}
|
||||
*/
|
||||
CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName);
|
||||
|
||||
/**
|
||||
* Get the timestamp of the last major compaction for the passed region.
|
||||
* <p>
|
||||
* The timestamp of the oldest HFile resulting from a major compaction of that region, or not
|
||||
* present if no such HFile could be found.
|
||||
* @param regionName region to examine
|
||||
* @return the last major compaction timestamp wrapped by a {@link CompletableFuture}
|
||||
*/
|
||||
CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(byte[] regionName);
|
||||
}
|
||||
|
|
|
@ -27,8 +27,10 @@ import java.util.regex.Pattern;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.ClusterStatus;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.ProcedureInfo;
|
||||
import org.apache.hadoop.hbase.RegionLoad;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -224,6 +226,11 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
return wrap(rawAdmin.getOnlineRegions(serverName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<List<HRegionInfo>> getTableRegions(TableName tableName) {
|
||||
return wrap(rawAdmin.getTableRegions(tableName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> flush(TableName tableName) {
|
||||
return wrap(rawAdmin.flush(tableName));
|
||||
|
@ -445,4 +452,41 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
public CompletableFuture<List<ProcedureInfo>> listProcedures() {
|
||||
return wrap(rawAdmin.listProcedures());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<ClusterStatus> getClusterStatus() {
|
||||
return wrap(rawAdmin.getClusterStatus());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName,
|
||||
Optional<TableName> tableName) {
|
||||
return wrap(rawAdmin.getRegionLoads(serverName, tableName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
|
||||
return wrap(rawAdmin.isMasterInMaintenanceMode());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<CompactionState> getCompactionState(TableName tableName) {
|
||||
return wrap(rawAdmin.getCompactionState(tableName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
|
||||
return wrap(rawAdmin.getCompactionStateForRegion(regionName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
|
||||
return wrap(rawAdmin.getLastMajorCompactionTimestamp(tableName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
|
||||
byte[] regionName) {
|
||||
return wrap(rawAdmin.getLastMajorCompactionTimestampForRegion(regionName));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,12 +46,14 @@ import java.util.stream.Stream;
|
|||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.ClusterStatus;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
|
||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.ProcedureInfo;
|
||||
import org.apache.hadoop.hbase.RegionLoad;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
|
@ -89,10 +91,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegion
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
|
||||
|
@ -115,6 +122,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColu
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
|
||||
|
@ -133,6 +142,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTabl
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
|
||||
|
@ -141,6 +152,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamesp
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
|
||||
|
@ -178,7 +192,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Remov
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.*;
|
||||
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
|
||||
|
@ -728,14 +742,26 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName sn) {
|
||||
public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName serverName) {
|
||||
return this.<List<HRegionInfo>> newAdminCaller()
|
||||
.action((controller, stub) -> this
|
||||
.<GetOnlineRegionRequest, GetOnlineRegionResponse, List<HRegionInfo>> adminCall(
|
||||
controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
|
||||
(s, c, req, done) -> s.getOnlineRegion(c, req, done),
|
||||
resp -> ProtobufUtil.getRegionInfos(resp)))
|
||||
.serverName(sn).call();
|
||||
.serverName(serverName).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<List<HRegionInfo>> getTableRegions(TableName tableName) {
|
||||
if (tableName.equals(META_TABLE_NAME)) {
|
||||
return connection.getLocator().getRegionLocation(tableName, null, null, operationTimeoutNs)
|
||||
.thenApply(loc -> Arrays.asList(loc.getRegionInfo()));
|
||||
} else {
|
||||
return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
|
||||
.thenApply(
|
||||
locs -> locs.stream().map(loc -> loc.getRegionInfo()).collect(Collectors.toList()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -2275,4 +2301,189 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<ClusterStatus> getClusterStatus() {
|
||||
return this
|
||||
.<ClusterStatus> newMasterCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<GetClusterStatusRequest, GetClusterStatusResponse, ClusterStatus> call(controller,
|
||||
stub, RequestConverter.buildGetClusterStatusRequest(),
|
||||
(s, c, req, done) -> s.getClusterStatus(c, req, done),
|
||||
resp -> ProtobufUtil.convert(resp.getClusterStatus()))).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName,
|
||||
Optional<TableName> tableName) {
|
||||
return this
|
||||
.<List<RegionLoad>> newAdminCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionLoad>> adminCall(
|
||||
controller, stub, RequestConverter.buildGetRegionLoadRequest(tableName), (s, c,
|
||||
req, done) -> s.getRegionLoad(controller, req, done),
|
||||
ProtobufUtil::getRegionLoadInfo)).serverName(serverName).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
|
||||
return this
|
||||
.<Boolean> newMasterCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller,
|
||||
stub, IsInMaintenanceModeRequest.newBuilder().build(),
|
||||
(s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
|
||||
resp -> resp.getInMaintenanceMode())).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<CompactionState> getCompactionState(TableName tableName) {
|
||||
CompletableFuture<CompactionState> future = new CompletableFuture<>();
|
||||
getTableHRegionLocations(tableName).whenComplete(
|
||||
(locations, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
return;
|
||||
}
|
||||
List<CompactionState> regionStates = new ArrayList<>();
|
||||
List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
|
||||
locations.stream().filter(loc -> loc.getServerName() != null)
|
||||
.filter(loc -> loc.getRegionInfo() != null)
|
||||
.filter(loc -> !loc.getRegionInfo().isOffline())
|
||||
.map(loc -> loc.getRegionInfo().getRegionName()).forEach(region -> {
|
||||
futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
|
||||
// If any region compaction state is MAJOR_AND_MINOR
|
||||
// the table compaction state is MAJOR_AND_MINOR, too.
|
||||
if (err2 != null) {
|
||||
future.completeExceptionally(err2);
|
||||
} else if (regionState == CompactionState.MAJOR_AND_MINOR) {
|
||||
|
||||
future.complete(regionState);
|
||||
} else {
|
||||
regionStates.add(regionState);
|
||||
}
|
||||
}));
|
||||
});
|
||||
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
|
||||
.whenComplete((ret, err3) -> {
|
||||
// If future not completed, check all regions's compaction state
|
||||
if (!future.isCompletedExceptionally() && !future.isDone()) {
|
||||
CompactionState state = CompactionState.NONE;
|
||||
for (CompactionState regionState : regionStates) {
|
||||
switch (regionState) {
|
||||
case MAJOR:
|
||||
if (state == CompactionState.MINOR) {
|
||||
future.complete(CompactionState.MAJOR_AND_MINOR);
|
||||
} else {
|
||||
state = CompactionState.MAJOR;
|
||||
}
|
||||
break;
|
||||
case MINOR:
|
||||
if (state == CompactionState.MAJOR) {
|
||||
future.complete(CompactionState.MAJOR_AND_MINOR);
|
||||
} else {
|
||||
state = CompactionState.MINOR;
|
||||
}
|
||||
break;
|
||||
case NONE:
|
||||
default:
|
||||
}
|
||||
if (!future.isDone()) {
|
||||
future.complete(state);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
|
||||
CompletableFuture<CompactionState> future = new CompletableFuture<>();
|
||||
getRegionLocation(regionName).whenComplete(
|
||||
(location, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
return;
|
||||
}
|
||||
ServerName serverName = location.getServerName();
|
||||
if (serverName == null) {
|
||||
future.completeExceptionally(new NoServerForRegionException(Bytes
|
||||
.toStringBinary(regionName)));
|
||||
return;
|
||||
}
|
||||
this.<GetRegionInfoResponse> newAdminCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
|
||||
controller, stub, RequestConverter.buildGetRegionInfoRequest(location
|
||||
.getRegionInfo().getRegionName(), true), (s, c, req, done) -> s
|
||||
.getRegionInfo(controller, req, done), resp -> resp))
|
||||
.serverName(serverName).call().whenComplete((resp2, err2) -> {
|
||||
if (err2 != null) {
|
||||
future.completeExceptionally(err2);
|
||||
} else {
|
||||
if (resp2.hasCompactionState()) {
|
||||
future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
|
||||
} else {
|
||||
future.complete(CompactionState.NONE);
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
|
||||
MajorCompactionTimestampRequest request =
|
||||
MajorCompactionTimestampRequest.newBuilder()
|
||||
.setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
|
||||
return this
|
||||
.<Optional<Long>> newMasterCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>> call(
|
||||
controller, stub, request,
|
||||
(s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
|
||||
ProtobufUtil::toOptionalTimestamp)).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
|
||||
byte[] regionName) {
|
||||
CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
|
||||
// regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
|
||||
getRegionInfo(regionName)
|
||||
.whenComplete(
|
||||
(region, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
return;
|
||||
}
|
||||
MajorCompactionTimestampForRegionRequest.Builder builder =
|
||||
MajorCompactionTimestampForRegionRequest.newBuilder();
|
||||
builder.setRegion(RequestConverter.buildRegionSpecifier(
|
||||
RegionSpecifierType.REGION_NAME, regionName));
|
||||
this.<Optional<Long>> newMasterCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<MajorCompactionTimestampForRegionRequest, MajorCompactionTimestampResponse, Optional<Long>> call(
|
||||
controller, stub, builder.build(), (s, c, req, done) -> s
|
||||
.getLastMajorCompactionTimestampForRegion(c, req, done),
|
||||
ProtobufUtil::toOptionalTimestamp)).call()
|
||||
.whenComplete((timestamp, err2) -> {
|
||||
if (err2 != null) {
|
||||
future.completeExceptionally(err2);
|
||||
} else {
|
||||
future.complete(timestamp);
|
||||
}
|
||||
});
|
||||
});
|
||||
return future;
|
||||
}
|
||||
}
|
|
@ -165,6 +165,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedure;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
|
||||
|
@ -1806,7 +1807,8 @@ public final class ProtobufUtil {
|
|||
public static List<org.apache.hadoop.hbase.RegionLoad> getRegionLoad(
|
||||
final RpcController controller, final AdminService.BlockingInterface admin,
|
||||
final TableName tableName) throws IOException {
|
||||
GetRegionLoadRequest request = RequestConverter.buildGetRegionLoadRequest(tableName);
|
||||
GetRegionLoadRequest request =
|
||||
RequestConverter.buildGetRegionLoadRequest(Optional.ofNullable(tableName));
|
||||
GetRegionLoadResponse response;
|
||||
try {
|
||||
response = admin.getRegionLoad(controller, request);
|
||||
|
@ -1816,7 +1818,7 @@ public final class ProtobufUtil {
|
|||
return getRegionLoadInfo(response);
|
||||
}
|
||||
|
||||
static List<org.apache.hadoop.hbase.RegionLoad> getRegionLoadInfo(
|
||||
public static List<org.apache.hadoop.hbase.RegionLoad> getRegionLoadInfo(
|
||||
GetRegionLoadResponse regionLoadResponse) {
|
||||
List<org.apache.hadoop.hbase.RegionLoad> regionLoadList =
|
||||
new ArrayList<>(regionLoadResponse.getRegionLoadsCount());
|
||||
|
@ -3066,6 +3068,11 @@ public final class ProtobufUtil {
|
|||
return CompactionState.valueOf(state.toString());
|
||||
}
|
||||
|
||||
public static Optional<Long> toOptionalTimestamp(MajorCompactionTimestampResponse resp) {
|
||||
long timestamp = resp.getCompactionTimestamp();
|
||||
return timestamp == 0 ? Optional.empty() : Optional.of(timestamp);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates {@link org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type}
|
||||
* from {@link SnapshotType}
|
||||
|
|
|
@ -796,15 +796,23 @@ public final class RequestConverter {
|
|||
|
||||
/**
|
||||
* Create a protocol buffer GetRegionLoadRequest for all regions/regions of a table.
|
||||
*
|
||||
* @param tableName the table for which regionLoad should be obtained from RS
|
||||
* @return a protocol buffer GetRegionLoadRequest
|
||||
* @deprecated use {@link #buildGetRegionLoadRequest(Optional)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
public static GetRegionLoadRequest buildGetRegionLoadRequest(final TableName tableName) {
|
||||
return buildGetRegionLoadRequest(Optional.ofNullable(tableName));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a protocol buffer GetRegionLoadRequest for all regions/regions of a table.
|
||||
* @param tableName the table for which regionLoad should be obtained from RS
|
||||
* @return a protocol buffer GetRegionLoadRequest
|
||||
*/
|
||||
public static GetRegionLoadRequest buildGetRegionLoadRequest(final TableName tableName) {
|
||||
public static GetRegionLoadRequest buildGetRegionLoadRequest(Optional<TableName> tableName) {
|
||||
GetRegionLoadRequest.Builder builder = GetRegionLoadRequest.newBuilder();
|
||||
if (tableName != null) {
|
||||
builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
|
||||
}
|
||||
tableName.ifPresent(table -> builder.setTableName(ProtobufUtil.toProtoTableName(table)));
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,132 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.hadoop.hbase.ClusterStatus;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.RegionLoad;
|
||||
import org.apache.hadoop.hbase.ServerLoad;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
@Category({ MiscTests.class, MediumTests.class })
|
||||
public class TestAsyncClusterAdminApi extends TestAsyncAdminBase {
|
||||
|
||||
@Test
|
||||
public void testRegionLoad() throws Exception {
|
||||
// Turn off the balancer
|
||||
admin.setBalancerOn(false).join();
|
||||
TableName[] tables =
|
||||
new TableName[] { TableName.valueOf(tableName.getNameAsString() + "1"),
|
||||
TableName.valueOf(tableName.getNameAsString() + "2"),
|
||||
TableName.valueOf(tableName.getNameAsString() + "3") };
|
||||
createAndLoadTable(tables);
|
||||
// Check if regions match with the regionLoad from the server
|
||||
Collection<ServerName> servers = admin.getRegionServers().get();
|
||||
for (ServerName serverName : servers) {
|
||||
List<HRegionInfo> regions = admin.getOnlineRegions(serverName).get();
|
||||
checkRegionsAndRegionLoads(regions, admin.getRegionLoads(serverName).get());
|
||||
}
|
||||
|
||||
// Check if regionLoad matches the table's regions and nothing is missed
|
||||
for (TableName table : tables) {
|
||||
List<HRegionInfo> tableRegions = admin.getTableRegions(table).get();
|
||||
List<RegionLoad> regionLoads = Lists.newArrayList();
|
||||
for (ServerName serverName : servers) {
|
||||
regionLoads.addAll(admin.getRegionLoads(serverName, Optional.of(table)).get());
|
||||
}
|
||||
checkRegionsAndRegionLoads(tableRegions, regionLoads);
|
||||
}
|
||||
|
||||
// Check RegionLoad matches the regionLoad from ClusterStatus
|
||||
ClusterStatus clusterStatus = admin.getClusterStatus().get();
|
||||
for (ServerName serverName : clusterStatus.getServers()) {
|
||||
ServerLoad serverLoad = clusterStatus.getLoad(serverName);
|
||||
compareRegionLoads(serverLoad.getRegionsLoad().values(), admin.getRegionLoads(serverName)
|
||||
.get());
|
||||
}
|
||||
}
|
||||
|
||||
private void compareRegionLoads(Collection<RegionLoad> regionLoadCluster,
|
||||
Collection<RegionLoad> regionLoads) {
|
||||
|
||||
assertEquals("No of regionLoads from clusterStatus and regionloads from RS doesn't match",
|
||||
regionLoadCluster.size(), regionLoads.size());
|
||||
|
||||
for (RegionLoad loadCluster : regionLoadCluster) {
|
||||
boolean matched = false;
|
||||
for (RegionLoad load : regionLoads) {
|
||||
if (Bytes.equals(loadCluster.getName(), load.getName())) {
|
||||
matched = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
assertTrue("The contents of region load from cluster and server should match", matched);
|
||||
}
|
||||
}
|
||||
|
||||
private void checkRegionsAndRegionLoads(Collection<HRegionInfo> regions,
|
||||
Collection<RegionLoad> regionLoads) {
|
||||
|
||||
assertEquals("No of regions and regionloads doesn't match", regions.size(), regionLoads.size());
|
||||
|
||||
Map<byte[], RegionLoad> regionLoadMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
|
||||
for (RegionLoad regionLoad : regionLoads) {
|
||||
regionLoadMap.put(regionLoad.getName(), regionLoad);
|
||||
}
|
||||
for (HRegionInfo info : regions) {
|
||||
assertTrue("Region not in regionLoadMap region:" + info.getRegionNameAsString()
|
||||
+ " regionMap: " + regionLoadMap, regionLoadMap.containsKey(info.getRegionName()));
|
||||
}
|
||||
}
|
||||
|
||||
private void createAndLoadTable(TableName[] tables) {
|
||||
for (TableName table : tables) {
|
||||
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(table);
|
||||
builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build());
|
||||
admin.createTable(builder.build(), Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), 16).join();
|
||||
RawAsyncTable asyncTable = ASYNC_CONN.getRawTable(table);
|
||||
List<Put> puts = new ArrayList<>();
|
||||
for (byte[] row : HBaseTestingUtility.ROWS) {
|
||||
puts.add(new Put(row).addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v")));
|
||||
}
|
||||
asyncTable.putAll(puts).join();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -515,10 +515,10 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
|
|||
long curt = System.currentTimeMillis();
|
||||
long waitTime = 5000;
|
||||
long endt = curt + waitTime;
|
||||
CompactionState state = TEST_UTIL.getAdmin().getCompactionState(tableName);
|
||||
CompactionState state = admin.getCompactionState(tableName).get();
|
||||
while (state == CompactionState.NONE && curt < endt) {
|
||||
Thread.sleep(10);
|
||||
state = TEST_UTIL.getAdmin().getCompactionState(tableName);
|
||||
state = admin.getCompactionState(tableName).get();
|
||||
curt = System.currentTimeMillis();
|
||||
}
|
||||
// Now, should have the right compaction state,
|
||||
|
@ -530,10 +530,10 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
|
|||
}
|
||||
} else {
|
||||
// Wait until the compaction is done
|
||||
state = TEST_UTIL.getAdmin().getCompactionState(tableName);
|
||||
state = admin.getCompactionState(tableName).get();
|
||||
while (state != CompactionState.NONE && curt < endt) {
|
||||
Thread.sleep(10);
|
||||
state = TEST_UTIL.getAdmin().getCompactionState(tableName);
|
||||
state = admin.getCompactionState(tableName).get();
|
||||
}
|
||||
// Now, compaction should be done.
|
||||
assertEquals(CompactionState.NONE, state);
|
||||
|
|
|
@ -38,6 +38,7 @@ import java.util.regex.Pattern;
|
|||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
|
@ -774,4 +775,82 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
|
|||
boolean tableAvailable = admin.isTableAvailable(tableName, splitKeys).get();
|
||||
assertFalse("Table should be created with 1 row in META", tableAvailable);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactionTimestamps() throws Exception {
|
||||
createTableWithDefaultConf(tableName);
|
||||
RawAsyncTable table = ASYNC_CONN.getRawTable(tableName);
|
||||
Optional<Long> ts = admin.getLastMajorCompactionTimestamp(tableName).get();
|
||||
assertFalse(ts.isPresent());
|
||||
Put p = new Put(Bytes.toBytes("row1"));
|
||||
p.addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v"));
|
||||
table.put(p).join();
|
||||
ts = admin.getLastMajorCompactionTimestamp(tableName).get();
|
||||
// no files written -> no data
|
||||
assertFalse(ts.isPresent());
|
||||
|
||||
admin.flush(tableName).join();
|
||||
ts = admin.getLastMajorCompactionTimestamp(tableName).get();
|
||||
// still 0, we flushed a file, but no major compaction happened
|
||||
assertFalse(ts.isPresent());
|
||||
|
||||
byte[] regionName =
|
||||
ASYNC_CONN.getRegionLocator(tableName).getRegionLocation(Bytes.toBytes("row1")).get()
|
||||
.getRegionInfo().getRegionName();
|
||||
Optional<Long> ts1 = admin.getLastMajorCompactionTimestampForRegion(regionName).get();
|
||||
assertFalse(ts1.isPresent());
|
||||
p = new Put(Bytes.toBytes("row2"));
|
||||
p.addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v"));
|
||||
table.put(p).join();
|
||||
admin.flush(tableName).join();
|
||||
ts1 = admin.getLastMajorCompactionTimestamp(tableName).get();
|
||||
// make sure the region API returns the same value, as the old file is still around
|
||||
assertFalse(ts1.isPresent());
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
table.put(p).join();
|
||||
admin.flush(tableName).join();
|
||||
}
|
||||
admin.majorCompact(tableName).join();
|
||||
long curt = System.currentTimeMillis();
|
||||
long waitTime = 10000;
|
||||
long endt = curt + waitTime;
|
||||
CompactionState state = admin.getCompactionState(tableName).get();
|
||||
LOG.info("Current compaction state 1 is " + state);
|
||||
while (state == CompactionState.NONE && curt < endt) {
|
||||
Thread.sleep(100);
|
||||
state = admin.getCompactionState(tableName).get();
|
||||
curt = System.currentTimeMillis();
|
||||
LOG.info("Current compaction state 2 is " + state);
|
||||
}
|
||||
// Now, should have the right compaction state, let's wait until the compaction is done
|
||||
if (state == CompactionState.MAJOR) {
|
||||
state = admin.getCompactionState(tableName).get();
|
||||
LOG.info("Current compaction state 3 is " + state);
|
||||
while (state != CompactionState.NONE && curt < endt) {
|
||||
Thread.sleep(10);
|
||||
state = admin.getCompactionState(tableName).get();
|
||||
LOG.info("Current compaction state 4 is " + state);
|
||||
}
|
||||
}
|
||||
// Sleep to wait region server report
|
||||
Thread.sleep(TEST_UTIL.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000) * 2);
|
||||
|
||||
ts = admin.getLastMajorCompactionTimestamp(tableName).get();
|
||||
// after a compaction our earliest timestamp will have progressed forward
|
||||
assertTrue(ts.isPresent());
|
||||
assertTrue(ts.get() > 0);
|
||||
// region api still the same
|
||||
ts1 = admin.getLastMajorCompactionTimestampForRegion(regionName).get();
|
||||
assertTrue(ts1.isPresent());
|
||||
assertEquals(ts.get(), ts1.get());
|
||||
table.put(p).join();
|
||||
admin.flush(tableName).join();
|
||||
ts = admin.getLastMajorCompactionTimestamp(tableName).join();
|
||||
assertTrue(ts.isPresent());
|
||||
assertEquals(ts.get(), ts1.get());
|
||||
ts1 = admin.getLastMajorCompactionTimestampForRegion(regionName).get();
|
||||
assertTrue(ts1.isPresent());
|
||||
assertEquals(ts.get(), ts1.get());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue