HBASE-18343 Track the remaining unimplemented methods for async admin
commit 1978b78cdf
parent f8e892d7aa
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;
@@ -32,9 +33,11 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.procedure2.LockInfo;
import org.apache.hadoop.hbase.quotas.QuotaFilter;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.client.security.SecurityCapability;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.util.Pair;
@@ -404,6 +407,34 @@ public interface AsyncAdmin {
   */
  CompletableFuture<Void> majorCompactRegionServer(ServerName serverName);

  /**
   * Turn the Merge switch on or off.
   * @param on set to {@code true} to enable the switch, {@code false} to disable it
   * @return Previous switch value wrapped by a {@link CompletableFuture}
   */
  CompletableFuture<Boolean> setMergeOn(boolean on);

  /**
   * Query the current state of the Merge switch.
   * @return true if the switch is on, false otherwise. The return value will be wrapped by a
   *         {@link CompletableFuture}
   */
  CompletableFuture<Boolean> isMergeOn();

  /**
   * Turn the Split switch on or off.
   * @param on set to {@code true} to enable the switch, {@code false} to disable it
   * @return Previous switch value wrapped by a {@link CompletableFuture}
   */
  CompletableFuture<Boolean> setSplitOn(boolean on);

  /**
   * Query the current state of the Split switch.
   * @return true if the switch is on, false otherwise. The return value will be wrapped by a
   *         {@link CompletableFuture}
   */
  CompletableFuture<Boolean> isSplitOn();

  /**
   * Merge two regions.
   * @param nameOfRegionA encoded or full name of region a
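Each of the four switch methods above returns the previous switch value, so a caller can toggle a switch temporarily and restore it afterwards. A minimal illustrative sketch, not part of the patch (the AsyncAdmin instance and the maintenance work are assumed to be supplied by the caller; only methods declared above are used):

    // Hypothetical helper: turn splits off, run some work, then restore the previous value.
    static CompletableFuture<Boolean> withSplitsDisabled(AsyncAdmin admin, Runnable work) {
      return admin.setSplitOn(false).thenApply(previous -> {
        try {
          work.run();
        } finally {
          admin.setSplitOn(previous).join(); // put the switch back to its former state
        }
        return previous;
      });
    }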
@@ -770,6 +801,12 @@ public interface AsyncAdmin {
   */
  CompletableFuture<List<ProcedureInfo>> listProcedures();

  /**
   * List procedure locks.
   * @return lock list wrapped by {@link CompletableFuture}
   */
  CompletableFuture<List<LockInfo>> listProcedureLocks();

  /**
   * Mark a region server as draining to prevent additional regions from getting assigned to it.
   * @param servers
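A small hedged sketch of consuming the new listProcedureLocks() future (illustrative only, not part of the patch; the admin instance and LOG are assumed to exist in the caller):

    // Log the currently held procedure locks via their toString() representation.
    admin.listProcedureLocks()
        .thenAccept(locks -> locks.forEach(lock -> LOG.info("procedure lock: " + lock)));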
@@ -851,6 +888,24 @@ public interface AsyncAdmin {
   */
  CompletableFuture<Void> updateConfiguration();

  /**
   * Roll the log writer. That is, for filesystem-based write-ahead logs, start writing to a new
   * file.
   * <p>
   * When the returned CompletableFuture is done, it only means the rollWALWriter request has been
   * sent to the region server; the region server may still need some time to finish the roll. As
   * a side effect of this call, the named region server may schedule store flushes at the request
   * of the WAL.
   * @param serverName The server name of the region server.
   */
  CompletableFuture<Void> rollWALWriter(ServerName serverName);

  /**
   * Clear compacting queues on a region server.
   * @param serverName the region server name
   * @param queues the set of queue names
   */
  CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues);

  /**
   * Get a list of {@link RegionLoad} of all regions hosted on a region server for a table.
   * @param serverName
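Both of the new calls above target a single region server and complete once the request has been delivered. A hedged usage sketch, not part of the patch (admin and serverName are assumed to be in scope, java.util.HashSet/Arrays are used for brevity, and the "long"/"short" queue names are an assumption about how the region server names its compaction queues):

    // Roll the WAL on one server, then ask the same server to clear its compaction queues.
    CompletableFuture<Void> maintenance =
        admin.rollWALWriter(serverName)
            .thenCompose(v -> admin.clearCompactionQueues(serverName,
                new HashSet<>(Arrays.asList("long", "short"))));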
@@ -903,6 +958,12 @@ public interface AsyncAdmin {
   */
  CompletableFuture<Optional<Long>> getLastMajorCompactionTimestampForRegion(byte[] regionName);

  /**
   * @return the list of supported security capabilities. The return value will be wrapped by a
   *         {@link CompletableFuture}.
   */
  CompletableFuture<List<SecurityCapability>> getSecurityCapabilities();

  /**
   * Turn the load balancer on or off.
   * @param on
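A brief sketch of inspecting the capabilities list returned above (illustrative only, not part of the patch; admin and LOG are assumed):

    // Check whether the cluster advertises authorization support.
    admin.getSecurityCapabilities()
        .thenAccept(caps ->
            LOG.info("authorization enabled: " + caps.contains(SecurityCapability.AUTHORIZATION)));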
@@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;
@@ -36,6 +37,8 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.client.security.SecurityCapability;
import org.apache.hadoop.hbase.procedure2.LockInfo;
import org.apache.hadoop.hbase.quotas.QuotaFilter;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -257,6 +260,26 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
    return wrap(rawAdmin.majorCompactRegionServer(serverName));
  }

  @Override
  public CompletableFuture<Boolean> setMergeOn(boolean on) {
    return wrap(rawAdmin.setMergeOn(on));
  }

  @Override
  public CompletableFuture<Boolean> isMergeOn() {
    return wrap(rawAdmin.isMergeOn());
  }

  @Override
  public CompletableFuture<Boolean> setSplitOn(boolean on) {
    return wrap(rawAdmin.setSplitOn(on));
  }

  @Override
  public CompletableFuture<Boolean> isSplitOn() {
    return wrap(rawAdmin.isSplitOn());
  }

  @Override
  public CompletableFuture<Void> mergeRegions(byte[] nameOfRegionA, byte[] nameOfRegionB,
      boolean forcible) {
@@ -438,6 +461,11 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
    return wrap(rawAdmin.listProcedures());
  }

  @Override
  public CompletableFuture<List<LockInfo>> listProcedureLocks() {
    return wrap(rawAdmin.listProcedureLocks());
  }

  @Override
  public CompletableFuture<Void> drainRegionServers(List<ServerName> servers) {
    return wrap(rawAdmin.drainRegionServers(servers));
@@ -483,6 +511,21 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
    return wrap(rawAdmin.updateConfiguration());
  }

  @Override
  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
    return wrap(rawAdmin.rollWALWriter(serverName));
  }

  @Override
  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
    return wrap(rawAdmin.clearCompactionQueues(serverName, queues));
  }

  @Override
  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
    return wrap(rawAdmin.getSecurityCapabilities());
  }

  @Override
  public CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName,
      Optional<TableName> tableName) {
@@ -29,6 +29,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -71,8 +72,10 @@ import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterReques
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.client.security.SecurityCapability;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.procedure2.LockInfo;
import org.apache.hadoop.hbase.quotas.QuotaFilter;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
@@ -83,6 +86,8 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
@@ -95,6 +100,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionIn
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
@@ -162,8 +169,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedur
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsSplitOrMergeEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDrainingRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListLocksRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListLocksResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListProceduresRequest;
@@ -192,6 +203,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalog
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetCleanerChoreRunningRequest;
@@ -200,6 +213,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormali
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
@@ -1033,6 +1048,51 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
    return future;
  }

  @Override
  public CompletableFuture<Boolean> setMergeOn(boolean on) {
    return setSplitOrMergeOn(on, MasterSwitchType.MERGE);
  }

  @Override
  public CompletableFuture<Boolean> isMergeOn() {
    return isSplitOrMergeOn(MasterSwitchType.MERGE);
  }

  @Override
  public CompletableFuture<Boolean> setSplitOn(boolean on) {
    return setSplitOrMergeOn(on, MasterSwitchType.SPLIT);
  }

  @Override
  public CompletableFuture<Boolean> isSplitOn() {
    return isSplitOrMergeOn(MasterSwitchType.SPLIT);
  }

  private CompletableFuture<Boolean> setSplitOrMergeOn(boolean on, MasterSwitchType switchType) {
    SetSplitOrMergeEnabledRequest request =
        RequestConverter.buildSetSplitOrMergeEnabledRequest(on, false, switchType);
    return this
        .<Boolean> newMasterCaller()
        .action(
          (controller, stub) -> this
              .<SetSplitOrMergeEnabledRequest, SetSplitOrMergeEnabledResponse, Boolean> call(
                controller, stub, request, (s, c, req, done) -> s.setSplitOrMergeEnabled(c, req,
                  done), (resp) -> resp.getPrevValueList().get(0))).call();
  }

  private CompletableFuture<Boolean> isSplitOrMergeOn(MasterSwitchType switchType) {
    IsSplitOrMergeEnabledRequest request =
        RequestConverter.buildIsSplitOrMergeEnabledRequest(switchType);
    return this
        .<Boolean> newMasterCaller()
        .action(
          (controller, stub) -> this
              .<IsSplitOrMergeEnabledRequest, IsSplitOrMergeEnabledResponse, Boolean> call(
                controller, stub, request,
                (s, c, req, done) -> s.isSplitOrMergeEnabled(c, req, done),
                (resp) -> resp.getEnabled())).call();
  }

  @Override
  public CompletableFuture<Void> mergeRegions(byte[] nameOfRegionA, byte[] nameOfRegionB,
      boolean forcible) {
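In the two private helpers above, only one MasterSwitchType is passed to the request builder, so the response's previous-value list holds exactly one entry and getPrevValueList().get(0) is safe. A hedged sketch of driving both switches through the public methods these helpers back (not part of the patch; admin is assumed to be in scope):

    // Turn both switches off before a maintenance window; each call yields the previous value,
    // which could be kept and used to restore the original state later.
    CompletableFuture<Void> disabled =
        admin.setSplitOn(false)
            .thenCombine(admin.setMergeOn(false), (prevSplit, prevMerge) -> null);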
@@ -1929,6 +1989,17 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
        .collect(Collectors.toList()))).call();
  }

  @Override
  public CompletableFuture<List<LockInfo>> listProcedureLocks() {
    return this
        .<List<LockInfo>> newMasterCaller()
        .action(
          (controller, stub) -> this.<ListLocksRequest, ListLocksResponse, List<LockInfo>> call(
            controller, stub, ListLocksRequest.newBuilder().build(),
            (s, c, req, done) -> s.listLocks(c, req, done), resp -> resp.getLockList().stream()
                .map(ProtobufUtil::toLockInfo).collect(Collectors.toList()))).call();
  }

  @Override
  public CompletableFuture<Void> drainRegionServers(List<ServerName> servers) {
    return this
@@ -2421,6 +2492,41 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
    return future;
  }

  @Override
  public CompletableFuture<Void> rollWALWriter(ServerName serverName) {
    return this
        .<Void> newAdminCaller()
        .action(
          (controller, stub) -> this.<RollWALWriterRequest, RollWALWriterResponse, Void> adminCall(
            controller, stub, RequestConverter.buildRollWALWriterRequest(),
            (s, c, req, done) -> s.rollWALWriter(controller, req, done), resp -> null))
        .serverName(serverName).call();
  }

  @Override
  public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) {
    return this
        .<Void> newAdminCaller()
        .action(
          (controller, stub) -> this
              .<ClearCompactionQueuesRequest, ClearCompactionQueuesResponse, Void> adminCall(
                controller, stub, RequestConverter.buildClearCompactionQueuesRequest(queues), (s,
                  c, req, done) -> s.clearCompactionQueues(controller, req, done), resp -> null))
        .serverName(serverName).call();
  }

  @Override
  public CompletableFuture<List<SecurityCapability>> getSecurityCapabilities() {
    return this
        .<List<SecurityCapability>> newMasterCaller()
        .action(
          (controller, stub) -> this
              .<SecurityCapabilitiesRequest, SecurityCapabilitiesResponse, List<SecurityCapability>> call(
                controller, stub, SecurityCapabilitiesRequest.newBuilder().build(), (s, c, req,
                  done) -> s.getSecurityCapabilities(c, req, done), (resp) -> ProtobufUtil
                    .toSecurityCapabilityList(resp.getCapabilitiesList()))).call();
  }

  @Override
  public CompletableFuture<List<RegionLoad>> getRegionLoads(ServerName serverName,
      Optional<TableName> tableName) {
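Note the split in RPC targets above: rollWALWriter and clearCompactionQueues go through newAdminCaller() with an explicit serverName (region server RPCs), while getSecurityCapabilities goes through newMasterCaller() (a master RPC). A small sketch chaining two of the server-scoped calls added by this patch (illustrative only; admin, serverName and LOG are assumed):

    // Roll the WAL on a server, then look at the region loads it reports across all tables.
    admin.rollWALWriter(serverName)
        .thenCompose(v -> admin.getRegionLoads(serverName, Optional.empty()))
        .thenAccept(loads -> LOG.info(loads.size() + " regions reported by " + serverName));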
@@ -34,14 +34,18 @@ import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -121,6 +125,92 @@ public class TestAsyncClusterAdminApi extends TestAsyncAdminBase {
    Files.copy(cnf3Path, cnfPath, StandardCopyOption.REPLACE_EXISTING);
  }

  @Test
  public void testRollWALWALWriter() throws Exception {
    setUpforLogRolling();
    String className = this.getClass().getName();
    StringBuilder v = new StringBuilder(className);
    while (v.length() < 1000) {
      v.append(className);
    }
    byte[] value = Bytes.toBytes(v.toString());
    HRegionServer regionServer = startAndWriteData(tableName, value);
    LOG.info("after writing there are "
        + AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)) + " log files");

    // flush all regions
    for (Region r : regionServer.getOnlineRegionsLocalContext()) {
      r.flush(true);
    }
    admin.rollWALWriter(regionServer.getServerName()).join();
    int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
    LOG.info("after flushing all regions and rolling logs there are " +
        count + " log files");
    assertTrue(("actual count: " + count), count <= 2);
  }

  private void setUpforLogRolling() {
    // Force a region split after every 768KB
    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE,
        768L * 1024L);

    // We roll the log after every 32 writes
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);

    TEST_UTIL.getConfiguration().setInt(
        "hbase.regionserver.logroll.errors.tolerated", 2);
    TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);

    // For less frequently updated regions flush after every 2 flushes
    TEST_UTIL.getConfiguration().setInt(
        "hbase.hregion.memstore.optionalflushcount", 2);

    // We flush the cache after every 8192 bytes
    TEST_UTIL.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
        8192);

    // Increase the amount of time between client retries
    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000);

    // Reduce thread wake frequency so that other threads can get
    // a chance to run.
    TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY,
        2 * 1000);

    /**** configuration for testLogRollOnDatanodeDeath ****/
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so try a new pipeline multiple times
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
    TEST_UTIL.getConfiguration().setInt(
        "hbase.regionserver.hlog.tolerable.lowreplication", 2);
    TEST_UTIL.getConfiguration().setInt(
        "hbase.regionserver.hlog.lowreplication.rolllimit", 3);
  }

  private HRegionServer startAndWriteData(TableName tableName, byte[] value) throws Exception {
    createTableWithDefaultConf(tableName);
    RawAsyncTable table = ASYNC_CONN.getRawTable(tableName);
    HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
    for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls
      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
      put.addColumn(FAMILY, null, value);
      table.put(put).join();
      if (i % 32 == 0) {
        // After every 32 writes sleep to let the log roller run
        try {
          Thread.sleep(2000);
        } catch (InterruptedException e) {
          // continue
        }
      }
    }
    return regionServer;
  }

  @Test
  public void testGetRegionLoads() throws Exception {
    // Turn off the balancer
@@ -354,6 +354,79 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
    Assert.assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize(), 0);
  }

  @Test
  public void testSplitSwitch() throws Exception {
    createTableWithDefaultConf(tableName);
    byte[][] families = { FAMILY };
    loadData(tableName, families, 1000);

    RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME);
    List<HRegionLocation> regionLocations =
        AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
    int originalCount = regionLocations.size();

    initSplitMergeSwitch();
    assertTrue(admin.setSplitOn(false).get());
    admin.split(tableName).join();
    int count = admin.getTableRegions(tableName).get().size();
    assertTrue(originalCount == count);

    assertFalse(admin.setSplitOn(true).get());
    admin.split(tableName).join();
    while ((count = admin.getTableRegions(tableName).get().size()) == originalCount) {
      Threads.sleep(100);
    }
    assertTrue(originalCount < count);
  }

  @Test
  @Ignore
  // It was ignored in TestSplitOrMergeStatus, too
  public void testMergeSwitch() throws Exception {
    createTableWithDefaultConf(tableName);
    byte[][] families = { FAMILY };
    loadData(tableName, families, 1000);

    RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME);
    List<HRegionLocation> regionLocations =
        AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
    int originalCount = regionLocations.size();

    initSplitMergeSwitch();
    admin.split(tableName).join();
    int postSplitCount = originalCount;
    while ((postSplitCount = admin.getTableRegions(tableName).get().size()) == originalCount) {
      Threads.sleep(100);
    }
    assertTrue("originalCount=" + originalCount + ", postSplitCount=" + postSplitCount,
      originalCount != postSplitCount);

    // Merge switch is off so merge should NOT succeed.
    assertTrue(admin.setMergeOn(false).get());
    List<HRegionInfo> regions = admin.getTableRegions(tableName).get();
    assertTrue(regions.size() > 1);
    admin.mergeRegions(regions.get(0).getRegionName(), regions.get(1).getRegionName(), true).join();
    int count = admin.getTableRegions(tableName).get().size();
    assertTrue("postSplitCount=" + postSplitCount + ", count=" + count, postSplitCount == count);

    // Merge switch is on so merge should succeed.
    assertFalse(admin.setMergeOn(true).get());
    admin.mergeRegions(regions.get(0).getRegionName(), regions.get(1).getRegionName(), true).join();
    count = admin.getTableRegions(tableName).get().size();
    assertTrue((postSplitCount / 2) == count);
  }

  private void initSplitMergeSwitch() throws Exception {
    if (!admin.isSplitOn().get()) {
      admin.setSplitOn(true).get();
    }
    if (!admin.isMergeOn().get()) {
      admin.setMergeOn(true).get();
    }
    assertTrue(admin.isSplitOn().get());
    assertTrue(admin.isMergeOn().get());
  }

  @Test
  public void testMergeRegions() throws Exception {
    byte[][] splitRows = new byte[][] { Bytes.toBytes("3"), Bytes.toBytes("6") };
@@ -569,6 +642,11 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
    return count;
  }

  private static void loadData(final TableName tableName, final byte[][] families, final int rows)
      throws IOException {
    loadData(tableName, families, rows, 1);
  }

  private static void loadData(final TableName tableName, final byte[][] families, final int rows,
      final int flushes) throws IOException {
    RawAsyncTable table = ASYNC_CONN.getRawTable(tableName);