HBASE-18319 Implement getClusterStatus/getRegionLoad/getCompactionState/getLastMajorCompactionTimestamp methods

This commit is contained in:
Guanghao Zhang 2017-07-05 18:33:57 +08:00
parent 4fe7385767
commit b0a5fa0c2a
8 changed files with 592 additions and 16 deletions

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.List;
import java.util.Collection;
import java.util.Map;
@ -24,8 +25,10 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
@ -331,6 +334,11 @@ public interface AsyncAdmin {
*/
CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName serverName);
/**
* Get the regions of a given table.
*/
CompletableFuture<List<HRegionInfo>> getTableRegions(TableName tableName);
/**
* Flush a table.
* @param tableName table to flush
@ -796,4 +804,91 @@ public interface AsyncAdmin {
* @return procedure list wrapped by {@link CompletableFuture}
*/
CompletableFuture<List<ProcedureInfo>> listProcedures();
/**
* @return cluster status wrapped by {@link CompletableFuture}
*/
CompletableFuture<ClusterStatus> getClusterStatus();
/**
* @return current master server name wrapped by {@link CompletableFuture}
*/
default CompletableFuture<ServerName> getMaster() {
return getClusterStatus().thenApply(ClusterStatus::getMaster);
}
/**
* @return current backup master list wrapped by {@link CompletableFuture}
*/
default CompletableFuture<Collection<ServerName>> getBackupMasters() {
return getClusterStatus().thenApply(ClusterStatus::getBackupMasters);
}
/**
* @return current live region servers list wrapped by {@link CompletableFuture}
*/
default CompletableFuture<Collection<ServerName>> getRegionServers() {
return getClusterStatus().thenApply(ClusterStatus::getServers);
}
/**
* Get a list of {@link RegionLoad} of all regions hosted on a region seerver.
* @param serverName
* @return a list of {@link RegionLoad} wrapped by {@link CompletableFuture}
*/
default CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName) {
return getRegionLoads(serverName, Optional.empty());
}
/**
* Get a list of {@link RegionLoad} of all regions hosted on a region seerver for a table.
* @param serverName
* @param tableName
* @return a list of {@link RegionLoad} wrapped by {@link CompletableFuture}
*/
CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName,
Optional<TableName> tableName);
/**
* Check whether master is in maintenance mode
* @return true if master is in maintenance mode, false otherwise. The return value will be
* wrapped by a {@link CompletableFuture}
*/
CompletableFuture<Boolean> isMasterInMaintenanceMode();
/**
* Get the current compaction state of a table. It could be in a major compaction, a minor
* compaction, both, or none.
* @param tableName table to examine
* @return the current compaction state wrapped by a {@link CompletableFuture}
*/
CompletableFuture<CompactionState> getCompactionState(TableName tableName);
/**
* Get the current compaction state of region. It could be in a major compaction, a minor
* compaction, both, or none.
* @param regionName region to examine
* @return the current compaction state wrapped by a {@link CompletableFuture}
*/
CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName);
/**
* Get the timestamp of the last major compaction for the passed table.
* <p>
* The timestamp of the oldest HFile resulting from a major compaction of that table, or not
* present if no such HFile could be found.
* @param tableName table to examine
* @return the last major compaction timestamp wrapped by a {@link CompletableFuture}
*/
CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName);
/**
* Get the timestamp of the last major compaction for the passed region.
* <p>
* The timestamp of the oldest HFile resulting from a major compaction of that region, or not
* present if no such HFile could be found.
* @param regionName region to examine
* @return the last major compaction timestamp wrapped by a {@link CompletableFuture}
*/
CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(byte[] regionName);
}

View File

@ -27,8 +27,10 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
@ -224,6 +226,11 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
return wrap(rawAdmin.getOnlineRegions(serverName));
}
@Override
public CompletableFuture<List<HRegionInfo>> getTableRegions(TableName tableName) {
return wrap(rawAdmin.getTableRegions(tableName));
}
@Override
public CompletableFuture<Void> flush(TableName tableName) {
return wrap(rawAdmin.flush(tableName));
@ -445,4 +452,41 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
public CompletableFuture<List<ProcedureInfo>> listProcedures() {
return wrap(rawAdmin.listProcedures());
}
@Override
public CompletableFuture<ClusterStatus> getClusterStatus() {
return wrap(rawAdmin.getClusterStatus());
}
@Override
public CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName,
Optional<TableName> tableName) {
return wrap(rawAdmin.getRegionLoads(serverName, tableName));
}
@Override
public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
return wrap(rawAdmin.isMasterInMaintenanceMode());
}
@Override
public CompletableFuture<CompactionState> getCompactionState(TableName tableName) {
return wrap(rawAdmin.getCompactionState(tableName));
}
@Override
public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
return wrap(rawAdmin.getCompactionStateForRegion(regionName));
}
@Override
public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
return wrap(rawAdmin.getLastMajorCompactionTimestamp(tableName));
}
@Override
public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
byte[] regionName) {
return wrap(rawAdmin.getLastMajorCompactionTimestampForRegion(regionName));
}
}

View File

@ -46,12 +46,14 @@ import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.NamespaceDescriptor;
@ -89,10 +91,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegion
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
@ -115,6 +122,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColu
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
@ -133,6 +142,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTabl
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsInMaintenanceModeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
@ -141,6 +152,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamesp
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampForRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MergeTableRegionsResponse;
@ -178,7 +192,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Remov
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.*;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
@ -728,14 +742,26 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
}
@Override
public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName sn) {
public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName serverName) {
return this.<List<HRegionInfo>> newAdminCaller()
.action((controller, stub) -> this
.<GetOnlineRegionRequest, GetOnlineRegionResponse, List<HRegionInfo>> adminCall(
controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
(s, c, req, done) -> s.getOnlineRegion(c, req, done),
resp -> ProtobufUtil.getRegionInfos(resp)))
.serverName(sn).call();
.serverName(serverName).call();
}
@Override
public CompletableFuture<List<HRegionInfo>> getTableRegions(TableName tableName) {
if (tableName.equals(META_TABLE_NAME)) {
return connection.getLocator().getRegionLocation(tableName, null, null, operationTimeoutNs)
.thenApply(loc -> Arrays.asList(loc.getRegionInfo()));
} else {
return AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
.thenApply(
locs -> locs.stream().map(loc -> loc.getRegionInfo()).collect(Collectors.toList()));
}
}
@Override
@ -2275,4 +2301,189 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
}
return false;
}
}
@Override
public CompletableFuture<ClusterStatus> getClusterStatus() {
return this
.<ClusterStatus> newMasterCaller()
.action(
(controller, stub) -> this
.<GetClusterStatusRequest, GetClusterStatusResponse, ClusterStatus> call(controller,
stub, RequestConverter.buildGetClusterStatusRequest(),
(s, c, req, done) -> s.getClusterStatus(c, req, done),
resp -> ProtobufUtil.convert(resp.getClusterStatus()))).call();
}
@Override
public CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName,
Optional<TableName> tableName) {
return this
.<List<RegionLoad>> newAdminCaller()
.action(
(controller, stub) -> this
.<GetRegionLoadRequest, GetRegionLoadResponse, List<RegionLoad>> adminCall(
controller, stub, RequestConverter.buildGetRegionLoadRequest(tableName), (s, c,
req, done) -> s.getRegionLoad(controller, req, done),
ProtobufUtil::getRegionLoadInfo)).serverName(serverName).call();
}
@Override
public CompletableFuture<Boolean> isMasterInMaintenanceMode() {
return this
.<Boolean> newMasterCaller()
.action(
(controller, stub) -> this
.<IsInMaintenanceModeRequest, IsInMaintenanceModeResponse, Boolean> call(controller,
stub, IsInMaintenanceModeRequest.newBuilder().build(),
(s, c, req, done) -> s.isMasterInMaintenanceMode(c, req, done),
resp -> resp.getInMaintenanceMode())).call();
}
@Override
public CompletableFuture<CompactionState> getCompactionState(TableName tableName) {
CompletableFuture<CompactionState> future = new CompletableFuture<>();
getTableHRegionLocations(tableName).whenComplete(
(locations, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
List<CompactionState> regionStates = new ArrayList<>();
List<CompletableFuture<CompactionState>> futures = new ArrayList<>();
locations.stream().filter(loc -> loc.getServerName() != null)
.filter(loc -> loc.getRegionInfo() != null)
.filter(loc -> !loc.getRegionInfo().isOffline())
.map(loc -> loc.getRegionInfo().getRegionName()).forEach(region -> {
futures.add(getCompactionStateForRegion(region).whenComplete((regionState, err2) -> {
// If any region compaction state is MAJOR_AND_MINOR
// the table compaction state is MAJOR_AND_MINOR, too.
if (err2 != null) {
future.completeExceptionally(err2);
} else if (regionState == CompactionState.MAJOR_AND_MINOR) {
future.complete(regionState);
} else {
regionStates.add(regionState);
}
}));
});
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
.whenComplete((ret, err3) -> {
// If future not completed, check all regions's compaction state
if (!future.isCompletedExceptionally() && !future.isDone()) {
CompactionState state = CompactionState.NONE;
for (CompactionState regionState : regionStates) {
switch (regionState) {
case MAJOR:
if (state == CompactionState.MINOR) {
future.complete(CompactionState.MAJOR_AND_MINOR);
} else {
state = CompactionState.MAJOR;
}
break;
case MINOR:
if (state == CompactionState.MAJOR) {
future.complete(CompactionState.MAJOR_AND_MINOR);
} else {
state = CompactionState.MINOR;
}
break;
case NONE:
default:
}
if (!future.isDone()) {
future.complete(state);
}
}
}
});
});
return future;
}
@Override
public CompletableFuture<CompactionState> getCompactionStateForRegion(byte[] regionName) {
CompletableFuture<CompactionState> future = new CompletableFuture<>();
getRegionLocation(regionName).whenComplete(
(location, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
ServerName serverName = location.getServerName();
if (serverName == null) {
future.completeExceptionally(new NoServerForRegionException(Bytes
.toStringBinary(regionName)));
return;
}
this.<GetRegionInfoResponse> newAdminCaller()
.action(
(controller, stub) -> this
.<GetRegionInfoRequest, GetRegionInfoResponse, GetRegionInfoResponse> adminCall(
controller, stub, RequestConverter.buildGetRegionInfoRequest(location
.getRegionInfo().getRegionName(), true), (s, c, req, done) -> s
.getRegionInfo(controller, req, done), resp -> resp))
.serverName(serverName).call().whenComplete((resp2, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
if (resp2.hasCompactionState()) {
future.complete(ProtobufUtil.createCompactionState(resp2.getCompactionState()));
} else {
future.complete(CompactionState.NONE);
}
}
});
});
return future;
}
@Override
public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestamp(TableName tableName) {
MajorCompactionTimestampRequest request =
MajorCompactionTimestampRequest.newBuilder()
.setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
return this
.<Optional<Long>> newMasterCaller()
.action(
(controller, stub) -> this
.<MajorCompactionTimestampRequest, MajorCompactionTimestampResponse, Optional<Long>> call(
controller, stub, request,
(s, c, req, done) -> s.getLastMajorCompactionTimestamp(c, req, done),
ProtobufUtil::toOptionalTimestamp)).call();
}
@Override
public CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(
byte[] regionName) {
CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
// regionName may be a full region name or encoded region name, so getRegionInfo(byte[]) first
getRegionInfo(regionName)
.whenComplete(
(region, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
MajorCompactionTimestampForRegionRequest.Builder builder =
MajorCompactionTimestampForRegionRequest.newBuilder();
builder.setRegion(RequestConverter.buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName));
this.<Optional<Long>> newMasterCaller()
.action(
(controller, stub) -> this
.<MajorCompactionTimestampForRegionRequest, MajorCompactionTimestampResponse, Optional<Long>> call(
controller, stub, builder.build(), (s, c, req, done) -> s
.getLastMajorCompactionTimestampForRegion(c, req, done),
ProtobufUtil::toOptionalTimestamp)).call()
.whenComplete((timestamp, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
future.complete(timestamp);
}
});
});
return future;
}
}

View File

@ -165,6 +165,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MajorCompactionTimestampResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedure;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
@ -1806,7 +1807,8 @@ public final class ProtobufUtil {
public static List<org.apache.hadoop.hbase.RegionLoad> getRegionLoad(
final RpcController controller, final AdminService.BlockingInterface admin,
final TableName tableName) throws IOException {
GetRegionLoadRequest request = RequestConverter.buildGetRegionLoadRequest(tableName);
GetRegionLoadRequest request =
RequestConverter.buildGetRegionLoadRequest(Optional.ofNullable(tableName));
GetRegionLoadResponse response;
try {
response = admin.getRegionLoad(controller, request);
@ -1816,7 +1818,7 @@ public final class ProtobufUtil {
return getRegionLoadInfo(response);
}
static List<org.apache.hadoop.hbase.RegionLoad> getRegionLoadInfo(
public static List<org.apache.hadoop.hbase.RegionLoad> getRegionLoadInfo(
GetRegionLoadResponse regionLoadResponse) {
List<org.apache.hadoop.hbase.RegionLoad> regionLoadList =
new ArrayList<>(regionLoadResponse.getRegionLoadsCount());
@ -3066,6 +3068,11 @@ public final class ProtobufUtil {
return CompactionState.valueOf(state.toString());
}
public static Optional<Long> toOptionalTimestamp(MajorCompactionTimestampResponse resp) {
long timestamp = resp.getCompactionTimestamp();
return timestamp == 0 ? Optional.empty() : Optional.of(timestamp);
}
/**
* Creates {@link org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type}
* from {@link SnapshotType}

View File

@ -796,15 +796,23 @@ public final class RequestConverter {
/**
* Create a protocol buffer GetRegionLoadRequest for all regions/regions of a table.
*
* @param tableName the table for which regionLoad should be obtained from RS
* @return a protocol buffer GetRegionLoadRequest
* @deprecated use {@link #buildGetRegionLoadRequest(Optional)} instead.
*/
@Deprecated
public static GetRegionLoadRequest buildGetRegionLoadRequest(final TableName tableName) {
return buildGetRegionLoadRequest(Optional.ofNullable(tableName));
}
/**
* Create a protocol buffer GetRegionLoadRequest for all regions/regions of a table.
* @param tableName the table for which regionLoad should be obtained from RS
* @return a protocol buffer GetRegionLoadRequest
*/
public static GetRegionLoadRequest buildGetRegionLoadRequest(final TableName tableName) {
public static GetRegionLoadRequest buildGetRegionLoadRequest(Optional<TableName> tableName) {
GetRegionLoadRequest.Builder builder = GetRegionLoadRequest.newBuilder();
if (tableName != null) {
builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
}
tableName.ifPresent(table -> builder.setTableName(ProtobufUtil.toProtoTableName(table)));
return builder.build();
}

View File

@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@RunWith(Parameterized.class)
@Category({ MiscTests.class, MediumTests.class })
public class TestAsyncClusterAdminApi extends TestAsyncAdminBase {
@Test
public void testRegionLoad() throws Exception {
// Turn off the balancer
admin.setBalancerOn(false).join();
TableName[] tables =
new TableName[] { TableName.valueOf(tableName.getNameAsString() + "1"),
TableName.valueOf(tableName.getNameAsString() + "2"),
TableName.valueOf(tableName.getNameAsString() + "3") };
createAndLoadTable(tables);
// Check if regions match with the regionLoad from the server
Collection<ServerName> servers = admin.getRegionServers().get();
for (ServerName serverName : servers) {
List<HRegionInfo> regions = admin.getOnlineRegions(serverName).get();
checkRegionsAndRegionLoads(regions, admin.getRegionLoads(serverName).get());
}
// Check if regionLoad matches the table's regions and nothing is missed
for (TableName table : tables) {
List<HRegionInfo> tableRegions = admin.getTableRegions(table).get();
List<RegionLoad> regionLoads = Lists.newArrayList();
for (ServerName serverName : servers) {
regionLoads.addAll(admin.getRegionLoads(serverName, Optional.of(table)).get());
}
checkRegionsAndRegionLoads(tableRegions, regionLoads);
}
// Check RegionLoad matches the regionLoad from ClusterStatus
ClusterStatus clusterStatus = admin.getClusterStatus().get();
for (ServerName serverName : clusterStatus.getServers()) {
ServerLoad serverLoad = clusterStatus.getLoad(serverName);
compareRegionLoads(serverLoad.getRegionsLoad().values(), admin.getRegionLoads(serverName)
.get());
}
}
private void compareRegionLoads(Collection<RegionLoad> regionLoadCluster,
Collection<RegionLoad> regionLoads) {
assertEquals("No of regionLoads from clusterStatus and regionloads from RS doesn't match",
regionLoadCluster.size(), regionLoads.size());
for (RegionLoad loadCluster : regionLoadCluster) {
boolean matched = false;
for (RegionLoad load : regionLoads) {
if (Bytes.equals(loadCluster.getName(), load.getName())) {
matched = true;
continue;
}
}
assertTrue("The contents of region load from cluster and server should match", matched);
}
}
private void checkRegionsAndRegionLoads(Collection<HRegionInfo> regions,
Collection<RegionLoad> regionLoads) {
assertEquals("No of regions and regionloads doesn't match", regions.size(), regionLoads.size());
Map<byte[], RegionLoad> regionLoadMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
for (RegionLoad regionLoad : regionLoads) {
regionLoadMap.put(regionLoad.getName(), regionLoad);
}
for (HRegionInfo info : regions) {
assertTrue("Region not in regionLoadMap region:" + info.getRegionNameAsString()
+ " regionMap: " + regionLoadMap, regionLoadMap.containsKey(info.getRegionName()));
}
}
private void createAndLoadTable(TableName[] tables) {
for (TableName table : tables) {
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(table);
builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build());
admin.createTable(builder.build(), Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), 16).join();
RawAsyncTable asyncTable = ASYNC_CONN.getRawTable(table);
List<Put> puts = new ArrayList<>();
for (byte[] row : HBaseTestingUtility.ROWS) {
puts.add(new Put(row).addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v")));
}
asyncTable.putAll(puts).join();
}
}
}

View File

@ -515,10 +515,10 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
long curt = System.currentTimeMillis();
long waitTime = 5000;
long endt = curt + waitTime;
CompactionState state = TEST_UTIL.getAdmin().getCompactionState(tableName);
CompactionState state = admin.getCompactionState(tableName).get();
while (state == CompactionState.NONE && curt < endt) {
Thread.sleep(10);
state = TEST_UTIL.getAdmin().getCompactionState(tableName);
state = admin.getCompactionState(tableName).get();
curt = System.currentTimeMillis();
}
// Now, should have the right compaction state,
@ -530,10 +530,10 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
}
} else {
// Wait until the compaction is done
state = TEST_UTIL.getAdmin().getCompactionState(tableName);
state = admin.getCompactionState(tableName).get();
while (state != CompactionState.NONE && curt < endt) {
Thread.sleep(10);
state = TEST_UTIL.getAdmin().getCompactionState(tableName);
state = admin.getCompactionState(tableName).get();
}
// Now, compaction should be done.
assertEquals(CompactionState.NONE, state);

View File

@ -38,6 +38,7 @@ import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@ -774,4 +775,82 @@ public class TestAsyncTableAdminApi extends TestAsyncAdminBase {
boolean tableAvailable = admin.isTableAvailable(tableName, splitKeys).get();
assertFalse("Table should be created with 1 row in META", tableAvailable);
}
}
@Test
public void testCompactionTimestamps() throws Exception {
createTableWithDefaultConf(tableName);
RawAsyncTable table = ASYNC_CONN.getRawTable(tableName);
Optional<Long> ts = admin.getLastMajorCompactionTimestamp(tableName).get();
assertFalse(ts.isPresent());
Put p = new Put(Bytes.toBytes("row1"));
p.addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v"));
table.put(p).join();
ts = admin.getLastMajorCompactionTimestamp(tableName).get();
// no files written -> no data
assertFalse(ts.isPresent());
admin.flush(tableName).join();
ts = admin.getLastMajorCompactionTimestamp(tableName).get();
// still 0, we flushed a file, but no major compaction happened
assertFalse(ts.isPresent());
byte[] regionName =
ASYNC_CONN.getRegionLocator(tableName).getRegionLocation(Bytes.toBytes("row1")).get()
.getRegionInfo().getRegionName();
Optional<Long> ts1 = admin.getLastMajorCompactionTimestampForRegion(regionName).get();
assertFalse(ts1.isPresent());
p = new Put(Bytes.toBytes("row2"));
p.addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v"));
table.put(p).join();
admin.flush(tableName).join();
ts1 = admin.getLastMajorCompactionTimestamp(tableName).get();
// make sure the region API returns the same value, as the old file is still around
assertFalse(ts1.isPresent());
for (int i = 0; i < 3; i++) {
table.put(p).join();
admin.flush(tableName).join();
}
admin.majorCompact(tableName).join();
long curt = System.currentTimeMillis();
long waitTime = 10000;
long endt = curt + waitTime;
CompactionState state = admin.getCompactionState(tableName).get();
LOG.info("Current compaction state 1 is " + state);
while (state == CompactionState.NONE && curt < endt) {
Thread.sleep(100);
state = admin.getCompactionState(tableName).get();
curt = System.currentTimeMillis();
LOG.info("Current compaction state 2 is " + state);
}
// Now, should have the right compaction state, let's wait until the compaction is done
if (state == CompactionState.MAJOR) {
state = admin.getCompactionState(tableName).get();
LOG.info("Current compaction state 3 is " + state);
while (state != CompactionState.NONE && curt < endt) {
Thread.sleep(10);
state = admin.getCompactionState(tableName).get();
LOG.info("Current compaction state 4 is " + state);
}
}
// Sleep to wait region server report
Thread.sleep(TEST_UTIL.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000) * 2);
ts = admin.getLastMajorCompactionTimestamp(tableName).get();
// after a compaction our earliest timestamp will have progressed forward
assertTrue(ts.isPresent());
assertTrue(ts.get() > 0);
// region api still the same
ts1 = admin.getLastMajorCompactionTimestampForRegion(regionName).get();
assertTrue(ts1.isPresent());
assertEquals(ts.get(), ts1.get());
table.put(p).join();
admin.flush(tableName).join();
ts = admin.getLastMajorCompactionTimestamp(tableName).join();
assertTrue(ts.isPresent());
assertEquals(ts.get(), ts1.get());
ts1 = admin.getLastMajorCompactionTimestampForRegion(regionName).get();
assertTrue(ts1.isPresent());
assertEquals(ts.get(), ts1.get());
}
}