HBASE-17500 Implement getTable/creatTable/deleteTable/truncateTable methods
This commit is contained in:
parent
85d701892e
commit
8aea84d723
|
@ -55,7 +55,7 @@ public class AsyncMetaTableAccessor {
|
|||
return getTableState(conn, tableName).thenApply(Optional::isPresent);
|
||||
}
|
||||
|
||||
private static CompletableFuture<Optional<TableState>> getTableState(AsyncConnection conn,
|
||||
public static CompletableFuture<Optional<TableState>> getTableState(AsyncConnection conn,
|
||||
TableName tableName) {
|
||||
CompletableFuture<Optional<TableState>> future = new CompletableFuture<>();
|
||||
getMetaTable(conn).thenAccept((metaTable) -> {
|
||||
|
|
|
@ -19,10 +19,8 @@ package org.apache.hadoop.hbase.client;
|
|||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
|
@ -33,6 +31,19 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
|
|||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Unstable
|
||||
public interface AsyncAdmin {
|
||||
|
||||
/**
|
||||
* @return Async Connection used by this object.
|
||||
*/
|
||||
AsyncConnectionImpl getConnection();
|
||||
|
||||
/**
|
||||
* @param tableName Table to check.
|
||||
* @return True if table exists already. The return value will be wrapped by a
|
||||
* {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> tableExists(final TableName tableName);
|
||||
|
||||
/**
|
||||
* List all the userspace tables.
|
||||
* @return - returns an array of HTableDescriptors wrapped by a {@link CompletableFuture}.
|
||||
|
@ -83,11 +94,75 @@ public interface AsyncAdmin {
|
|||
final boolean includeSysTables);
|
||||
|
||||
/**
|
||||
* @param tableName Table to check.
|
||||
* @return True if table exists already. The return value will be wrapped by a
|
||||
* {@link CompletableFuture}.
|
||||
* Method for getting the tableDescriptor
|
||||
* @param tableName as a {@link TableName}
|
||||
* @return the tableDescriptor wrapped by a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Boolean> tableExists(final TableName tableName);
|
||||
CompletableFuture<HTableDescriptor> getTableDescriptor(final TableName tableName);
|
||||
|
||||
/**
|
||||
* Creates a new table.
|
||||
* @param desc table descriptor for table
|
||||
*/
|
||||
CompletableFuture<Void> createTable(HTableDescriptor desc);
|
||||
|
||||
/**
|
||||
* Creates a new table with the specified number of regions. The start key specified will become
|
||||
* the end key of the first region of the table, and the end key specified will become the start
|
||||
* key of the last region of the table (the first region has a null start key and the last region
|
||||
* has a null end key). BigInteger math will be used to divide the key range specified into enough
|
||||
* segments to make the required number of total regions.
|
||||
* @param desc table descriptor for table
|
||||
* @param startKey beginning of key range
|
||||
* @param endKey end of key range
|
||||
* @param numRegions the total number of regions to create
|
||||
*/
|
||||
CompletableFuture<Void> createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey,
|
||||
int numRegions);
|
||||
|
||||
/**
|
||||
* Creates a new table with an initial set of empty regions defined by the specified split keys.
|
||||
* The total number of regions created will be the number of split keys plus one.
|
||||
* Note : Avoid passing empty split key.
|
||||
* @param desc table descriptor for table
|
||||
* @param splitKeys array of split keys for the initial regions of the table
|
||||
*/
|
||||
CompletableFuture<Void> createTable(final HTableDescriptor desc, byte[][] splitKeys);
|
||||
|
||||
/**
|
||||
* Deletes a table.
|
||||
* @param tableName name of table to delete
|
||||
*/
|
||||
CompletableFuture<Void> deleteTable(final TableName tableName);
|
||||
|
||||
/**
|
||||
* Deletes tables matching the passed in pattern and wait on completion. Warning: Use this method
|
||||
* carefully, there is no prompting and the effect is immediate. Consider using
|
||||
* {@link #listTables(String, boolean)} and
|
||||
* {@link #deleteTable(org.apache.hadoop.hbase.TableName)}
|
||||
* @param regex The regular expression to match table names against
|
||||
* @return Table descriptors for tables that couldn't be deleted. The return value will be wrapped
|
||||
* by a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<HTableDescriptor[]> deleteTables(String regex);
|
||||
|
||||
/**
|
||||
* Delete tables matching the passed in pattern and wait on completion. Warning: Use this method
|
||||
* carefully, there is no prompting and the effect is immediate. Consider using
|
||||
* {@link #listTables(Pattern, boolean) } and
|
||||
* {@link #deleteTable(org.apache.hadoop.hbase.TableName)}
|
||||
* @param pattern The pattern to match table names against
|
||||
* @return Table descriptors for tables that couldn't be deleted. The return value will be wrapped
|
||||
* by a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<HTableDescriptor[]> deleteTables(Pattern pattern);
|
||||
|
||||
/**
|
||||
* Truncate a table.
|
||||
* @param tableName name of table to truncate
|
||||
* @param preserveSplits True if the splits should be preserved
|
||||
*/
|
||||
CompletableFuture<Void> truncateTable(final TableName tableName, final boolean preserveSplits);
|
||||
|
||||
/**
|
||||
* Turn the load balancer on or off.
|
||||
|
|
|
@ -18,14 +18,21 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder;
|
||||
|
@ -33,17 +40,28 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
|||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
|
||||
|
||||
/**
|
||||
* The implementation of AsyncAdmin.
|
||||
|
@ -52,6 +70,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalance
|
|||
@InterfaceStability.Evolving
|
||||
public class AsyncHBaseAdmin implements AsyncAdmin {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(AsyncHBaseAdmin.class);
|
||||
|
||||
private final AsyncConnectionImpl connection;
|
||||
|
||||
private final long rpcTimeoutNs;
|
||||
|
@ -64,6 +84,8 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
|
||||
private final int startLogErrorsCnt;
|
||||
|
||||
private final NonceGenerator ng;
|
||||
|
||||
AsyncHBaseAdmin(AsyncConnectionImpl connection) {
|
||||
this.connection = connection;
|
||||
this.rpcTimeoutNs = connection.connConf.getRpcTimeoutNs();
|
||||
|
@ -71,6 +93,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
this.pauseNs = connection.connConf.getPauseNs();
|
||||
this.maxAttempts = connection.connConf.getMaxRetries();
|
||||
this.startLogErrorsCnt = connection.connConf.getStartLogErrorsCnt();
|
||||
this.ng = connection.getNonceGenerator();
|
||||
}
|
||||
|
||||
private <T> MasterRequestCallerBuilder<T> newCaller() {
|
||||
|
@ -114,6 +137,16 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncConnectionImpl getConnection() {
|
||||
return this.connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> tableExists(TableName tableName) {
|
||||
return AsyncMetaTableAccessor.tableExists(connection, tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<HTableDescriptor[]> listTables() {
|
||||
return listTables((Pattern)null, false);
|
||||
|
@ -158,6 +191,137 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
.getTableNameArray(resp.getTableNamesList()))).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<HTableDescriptor> getTableDescriptor(TableName tableName) {
|
||||
CompletableFuture<HTableDescriptor> future = new CompletableFuture<>();
|
||||
this.<List<TableSchema>> newCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<GetTableDescriptorsRequest, GetTableDescriptorsResponse, List<TableSchema>> call(
|
||||
controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), (s,
|
||||
c, req, done) -> s.getTableDescriptors(c, req, done), (resp) -> resp
|
||||
.getTableSchemaList())).call().whenComplete((tableSchemas, error) -> {
|
||||
if (error != null) {
|
||||
future.completeExceptionally(error);
|
||||
return;
|
||||
}
|
||||
if (!tableSchemas.isEmpty()) {
|
||||
future.complete(ProtobufUtil.convertToHTableDesc(tableSchemas.get(0)));
|
||||
} else {
|
||||
future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString()));
|
||||
}
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> createTable(HTableDescriptor desc) {
|
||||
return createTable(desc, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey,
|
||||
int numRegions) {
|
||||
try {
|
||||
return createTable(desc, getSplitKeys(startKey, endKey, numRegions));
|
||||
} catch (IllegalArgumentException e) {
|
||||
return failedFuture(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> createTable(HTableDescriptor desc, byte[][] splitKeys) {
|
||||
if (desc.getTableName() == null) {
|
||||
return failedFuture(new IllegalArgumentException("TableName cannot be null"));
|
||||
}
|
||||
if (splitKeys != null && splitKeys.length > 0) {
|
||||
Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR);
|
||||
// Verify there are no duplicate split keys
|
||||
byte[] lastKey = null;
|
||||
for (byte[] splitKey : splitKeys) {
|
||||
if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) {
|
||||
return failedFuture(new IllegalArgumentException(
|
||||
"Empty split key must not be passed in the split keys."));
|
||||
}
|
||||
if (lastKey != null && Bytes.equals(splitKey, lastKey)) {
|
||||
return failedFuture(new IllegalArgumentException("All split keys must be unique, "
|
||||
+ "found duplicate: " + Bytes.toStringBinary(splitKey) + ", "
|
||||
+ Bytes.toStringBinary(lastKey)));
|
||||
}
|
||||
lastKey = splitKey;
|
||||
}
|
||||
}
|
||||
|
||||
CompletableFuture<Long> procFuture = this
|
||||
.<Long> newCaller()
|
||||
.action(
|
||||
(controller, stub) -> this.<CreateTableRequest, CreateTableResponse, Long> call(
|
||||
controller,
|
||||
stub,
|
||||
RequestConverter.buildCreateTableRequest(desc, splitKeys, ng.getNonceGroup(),
|
||||
ng.newNonce()), (s, c, req, done) -> s.createTable(c, req, done),
|
||||
(resp) -> resp.getProcId())).call();
|
||||
return waitProcedureResult(procFuture).whenComplete(
|
||||
new CreateTableProcedureBiConsumer(this, desc.getTableName()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> deleteTable(TableName tableName) {
|
||||
CompletableFuture<Long> procFuture = this
|
||||
.<Long> newCaller()
|
||||
.action(
|
||||
(controller, stub) -> this.<DeleteTableRequest, DeleteTableResponse, Long> call(
|
||||
controller, stub,
|
||||
RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
|
||||
(s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId())).call();
|
||||
return waitProcedureResult(procFuture).whenComplete(
|
||||
new DeleteTableProcedureBiConsumer(this, tableName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<HTableDescriptor[]> deleteTables(String regex) {
|
||||
return deleteTables(Pattern.compile(regex));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<HTableDescriptor[]> deleteTables(Pattern pattern) {
|
||||
CompletableFuture<HTableDescriptor[]> future = new CompletableFuture<>();
|
||||
List<HTableDescriptor> failed = new LinkedList<>();
|
||||
listTables(pattern, false).whenComplete(
|
||||
(tables, error) -> {
|
||||
if (error != null) {
|
||||
future.completeExceptionally(error);
|
||||
return;
|
||||
}
|
||||
CompletableFuture[] futures = Arrays.stream(tables)
|
||||
.map((table) -> deleteTable(table.getTableName()).whenComplete((v, ex) -> {
|
||||
if (ex != null) {
|
||||
LOG.info("Failed to delete table " + table.getTableName(), ex);
|
||||
failed.add(table);
|
||||
}
|
||||
})).toArray(size -> new CompletableFuture[size]);
|
||||
CompletableFuture.allOf(futures).thenAccept((v) -> {
|
||||
future.complete(failed.toArray(new HTableDescriptor[failed.size()]));
|
||||
});
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
|
||||
CompletableFuture<Long> procFuture = this
|
||||
.<Long> newCaller()
|
||||
.action(
|
||||
(controller, stub) -> this.<TruncateTableRequest, TruncateTableResponse, Long> call(
|
||||
controller,
|
||||
stub,
|
||||
RequestConverter.buildTruncateTableRequest(tableName, preserveSplits,
|
||||
ng.getNonceGroup(), ng.newNonce()),
|
||||
(s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId())).call();
|
||||
return waitProcedureResult(procFuture).whenComplete(
|
||||
new TruncateTableProcedureBiConsumer(this, tableName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> setBalancerRunning(final boolean on) {
|
||||
return this
|
||||
|
@ -196,8 +360,154 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> tableExists(TableName tableName) {
|
||||
return AsyncMetaTableAccessor.tableExists(connection, tableName);
|
||||
private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) {
|
||||
if (numRegions < 3) {
|
||||
throw new IllegalArgumentException("Must create at least three regions");
|
||||
} else if (Bytes.compareTo(startKey, endKey) >= 0) {
|
||||
throw new IllegalArgumentException("Start key must be smaller than end key");
|
||||
}
|
||||
if (numRegions == 3) {
|
||||
return new byte[][] { startKey, endKey };
|
||||
}
|
||||
byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
|
||||
if (splitKeys == null || splitKeys.length != numRegions - 1) {
|
||||
throw new IllegalArgumentException("Unable to split key range into enough regions");
|
||||
}
|
||||
return splitKeys;
|
||||
}
|
||||
|
||||
private abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> {
|
||||
protected final AsyncAdmin admin;
|
||||
|
||||
ProcedureBiConsumer(AsyncAdmin admin) {
|
||||
this.admin = admin;
|
||||
}
|
||||
|
||||
abstract void onFinished();
|
||||
|
||||
abstract void onError(Throwable error);
|
||||
|
||||
@Override
|
||||
public void accept(Void v, Throwable error) {
|
||||
if (error != null) {
|
||||
onError(error);
|
||||
return;
|
||||
}
|
||||
onFinished();
|
||||
}
|
||||
}
|
||||
|
||||
private abstract class TableProcedureBiConsumer extends ProcedureBiConsumer {
|
||||
protected final TableName tableName;
|
||||
|
||||
TableProcedureBiConsumer(final AsyncAdmin admin, final TableName tableName) {
|
||||
super(admin);
|
||||
this.tableName = tableName;
|
||||
}
|
||||
|
||||
abstract String getOperationType();
|
||||
|
||||
String getDescription() {
|
||||
return "Operation: " + getOperationType() + ", " + "Table Name: "
|
||||
+ tableName.getNameWithNamespaceInclAsString();
|
||||
}
|
||||
|
||||
@Override
|
||||
void onFinished() {
|
||||
LOG.info(getDescription() + " completed");
|
||||
}
|
||||
|
||||
@Override
|
||||
void onError(Throwable error) {
|
||||
LOG.info(getDescription() + " failed with " + error.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer {
|
||||
|
||||
CreateTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
|
||||
super(admin, tableName);
|
||||
}
|
||||
|
||||
String getOperationType() {
|
||||
return "CREATE";
|
||||
}
|
||||
}
|
||||
|
||||
private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
|
||||
|
||||
DeleteTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
|
||||
super(admin, tableName);
|
||||
}
|
||||
|
||||
String getOperationType() {
|
||||
return "DELETE";
|
||||
}
|
||||
|
||||
@Override
|
||||
void onFinished() {
|
||||
this.admin.getConnection().getLocator().clearCache(this.tableName);
|
||||
super.onFinished();
|
||||
}
|
||||
}
|
||||
|
||||
private class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer {
|
||||
|
||||
TruncateTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
|
||||
super(admin, tableName);
|
||||
}
|
||||
|
||||
String getOperationType() {
|
||||
return "TRUNCATE";
|
||||
}
|
||||
}
|
||||
|
||||
private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
|
||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
procFuture.whenComplete((procId, error) -> {
|
||||
if (error != null) {
|
||||
future.completeExceptionally(error);
|
||||
return;
|
||||
}
|
||||
getProcedureResult(procId, future);
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
private void getProcedureResult(final long procId, CompletableFuture<Void> future) {
|
||||
this.<GetProcedureResultResponse> newCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<GetProcedureResultRequest, GetProcedureResultResponse, GetProcedureResultResponse> call(
|
||||
controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(),
|
||||
(s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp))
|
||||
.call()
|
||||
.whenComplete(
|
||||
(response, error) -> {
|
||||
if (error != null) {
|
||||
LOG.warn("failed to get the procedure result procId=" + procId,
|
||||
ConnectionUtils.translateException(error));
|
||||
connection.RETRY_TIMER.newTimeout(t -> getProcedureResult(procId, future), pauseNs,
|
||||
TimeUnit.NANOSECONDS);
|
||||
return;
|
||||
}
|
||||
if (response.getState() == GetProcedureResultResponse.State.RUNNING) {
|
||||
connection.RETRY_TIMER.newTimeout(t -> getProcedureResult(procId, future), pauseNs,
|
||||
TimeUnit.NANOSECONDS);
|
||||
return;
|
||||
}
|
||||
if (response.hasException()) {
|
||||
IOException ioe = ForeignExceptionUtil.toIOException(response.getException());
|
||||
future.completeExceptionally(ioe);
|
||||
} else {
|
||||
future.complete(null);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private <T> CompletableFuture<T> failedFuture(Throwable error) {
|
||||
CompletableFuture<T> future = new CompletableFuture<>();
|
||||
future.completeExceptionally(error);
|
||||
return future;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,22 +17,38 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -47,16 +63,18 @@ public class TestAsyncAdmin {
|
|||
|
||||
private static final Log LOG = LogFactory.getLog(TestAdmin1.class);
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static byte [] FAMILY = Bytes.toBytes("testFamily");
|
||||
private static byte[] FAMILY = Bytes.toBytes("testFamily");
|
||||
|
||||
private static AsyncConnection ASYNC_CONN;
|
||||
private AsyncAdmin admin;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 10);
|
||||
TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 3);
|
||||
TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 1000);
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_PAUSE, 10);
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1000);
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 3000);
|
||||
TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
|
||||
TEST_UTIL.startMiniCluster(1);
|
||||
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
|
||||
}
|
||||
|
@ -72,8 +90,22 @@ public class TestAsyncAdmin {
|
|||
this.admin = ASYNC_CONN.getAdmin();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTableExist() throws Exception {
|
||||
final TableName table = TableName.valueOf("testTableExist");
|
||||
boolean exist;
|
||||
exist = admin.tableExists(table).get();
|
||||
assertEquals(false, exist);
|
||||
TEST_UTIL.createTable(table, FAMILY);
|
||||
exist = admin.tableExists(table).get();
|
||||
assertEquals(true, exist);
|
||||
exist = admin.tableExists(TableName.META_TABLE_NAME).get();
|
||||
assertEquals(true, exist);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListTables() throws Exception {
|
||||
int numTables = admin.listTables().get().length;
|
||||
TableName t1 = TableName.valueOf("testListTables1");
|
||||
TableName t2 = TableName.valueOf("testListTables2");
|
||||
TableName t3 = TableName.valueOf("testListTables3");
|
||||
|
@ -98,7 +130,7 @@ public class TestAsyncAdmin {
|
|||
|
||||
TableName[] tableNames = admin.listTableNames().get();
|
||||
size = tableNames.length;
|
||||
assertTrue(size >= tables.length);
|
||||
assertTrue(size == (numTables + tables.length));
|
||||
for (int i = 0; i < tables.length && i < size; i++) {
|
||||
boolean found = false;
|
||||
for (int j = 0; j < tableNames.length; j++) {
|
||||
|
@ -113,10 +145,6 @@ public class TestAsyncAdmin {
|
|||
for (int i = 0; i < tables.length; i++) {
|
||||
TEST_UTIL.deleteTable(tables[i]);
|
||||
}
|
||||
tableDescs = admin.listTables().get();
|
||||
assertEquals(0, tableDescs.length);
|
||||
tableNames = admin.listTableNames().get();
|
||||
assertEquals(0, tableNames.length);
|
||||
|
||||
tableDescs = admin.listTables((Pattern) null, true).get();
|
||||
assertTrue("Not found system tables", tableDescs.length > 0);
|
||||
|
@ -124,17 +152,372 @@ public class TestAsyncAdmin {
|
|||
assertTrue("Not found system tables", tableNames.length > 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTableExist() throws Exception {
|
||||
final TableName table = TableName.valueOf("testTableExist");
|
||||
boolean exist;
|
||||
exist = admin.tableExists(table).get();
|
||||
assertEquals(false, exist);
|
||||
TEST_UTIL.createTable(table, FAMILY);
|
||||
exist = admin.tableExists(table).get();
|
||||
assertEquals(true, exist);
|
||||
exist = admin.tableExists(TableName.META_TABLE_NAME).get();
|
||||
assertEquals(true, exist);
|
||||
@Test(timeout = 300000)
|
||||
public void testGetTableDescriptor() throws Exception {
|
||||
HColumnDescriptor fam1 = new HColumnDescriptor("fam1");
|
||||
HColumnDescriptor fam2 = new HColumnDescriptor("fam2");
|
||||
HColumnDescriptor fam3 = new HColumnDescriptor("fam3");
|
||||
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("myTestTable"));
|
||||
htd.addFamily(fam1);
|
||||
htd.addFamily(fam2);
|
||||
htd.addFamily(fam3);
|
||||
admin.createTable(htd).join();
|
||||
HTableDescriptor confirmedHtd = admin.getTableDescriptor(htd.getTableName()).get();
|
||||
assertEquals(htd.compareTo(confirmedHtd), 0);
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testCreateTable() throws Exception {
|
||||
HTableDescriptor[] tables = admin.listTables().get();
|
||||
int numTables = tables.length;
|
||||
TableName tableName = TableName.valueOf("testCreateTable");
|
||||
admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(FAMILY)))
|
||||
.join();
|
||||
tables = admin.listTables().get();
|
||||
assertEquals(numTables + 1, tables.length);
|
||||
assertTrue("Table must be enabled.", TEST_UTIL.getHBaseCluster().getMaster()
|
||||
.getTableStateManager().isTableState(tableName, TableState.State.ENABLED));
|
||||
assertEquals(TableState.State.ENABLED, getStateFromMeta(tableName));
|
||||
}
|
||||
|
||||
private TableState.State getStateFromMeta(TableName table) throws Exception {
|
||||
Optional<TableState> state = AsyncMetaTableAccessor.getTableState(ASYNC_CONN, table).get();
|
||||
assertTrue(state.isPresent());
|
||||
return state.get().getState();
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testCreateTableNumberOfRegions() throws Exception {
|
||||
TableName tableName = TableName.valueOf("testCreateTableNumberOfRegions");
|
||||
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
|
||||
admin.createTable(desc).join();
|
||||
List<HRegionLocation> regions;
|
||||
try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
|
||||
regions = l.getAllRegionLocations();
|
||||
assertEquals("Table should have only 1 region", 1, regions.size());
|
||||
}
|
||||
|
||||
TableName TABLE_2 = TableName.valueOf(tableName.getNameAsString() + "_2");
|
||||
desc = new HTableDescriptor(TABLE_2);
|
||||
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
|
||||
admin.createTable(desc, new byte[][] { new byte[] { 42 } }).join();
|
||||
try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(TABLE_2)) {
|
||||
regions = l.getAllRegionLocations();
|
||||
assertEquals("Table should have only 2 region", 2, regions.size());
|
||||
}
|
||||
|
||||
TableName TABLE_3 = TableName.valueOf(tableName.getNameAsString() + "_3");
|
||||
desc = new HTableDescriptor(TABLE_3);
|
||||
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
|
||||
admin.createTable(desc, "a".getBytes(), "z".getBytes(), 3).join();
|
||||
try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(TABLE_3)) {
|
||||
regions = l.getAllRegionLocations();
|
||||
assertEquals("Table should have only 3 region", 3, regions.size());
|
||||
}
|
||||
|
||||
TableName TABLE_4 = TableName.valueOf(tableName.getNameAsString() + "_4");
|
||||
desc = new HTableDescriptor(TABLE_4);
|
||||
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
|
||||
try {
|
||||
admin.createTable(desc, "a".getBytes(), "z".getBytes(), 2).join();
|
||||
fail("Should not be able to create a table with only 2 regions using this API.");
|
||||
} catch (CompletionException e) {
|
||||
assertTrue(e.getCause() instanceof IllegalArgumentException);
|
||||
}
|
||||
|
||||
TableName TABLE_5 = TableName.valueOf(tableName.getNameAsString() + "_5");
|
||||
desc = new HTableDescriptor(TABLE_5);
|
||||
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
|
||||
admin.createTable(desc, new byte[] { 1 }, new byte[] { 127 }, 16).join();
|
||||
try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(TABLE_5)) {
|
||||
regions = l.getAllRegionLocations();
|
||||
assertEquals("Table should have 16 region", 16, regions.size());
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testCreateTableWithRegions() throws IOException, InterruptedException {
|
||||
|
||||
TableName tableName = TableName.valueOf("testCreateTableWithRegions");
|
||||
|
||||
byte[][] splitKeys = { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 }, new byte[] { 3, 3, 3 },
|
||||
new byte[] { 4, 4, 4 }, new byte[] { 5, 5, 5 }, new byte[] { 6, 6, 6 },
|
||||
new byte[] { 7, 7, 7 }, new byte[] { 8, 8, 8 }, new byte[] { 9, 9, 9 }, };
|
||||
int expectedRegions = splitKeys.length + 1;
|
||||
|
||||
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
|
||||
admin.createTable(desc, splitKeys).join();
|
||||
|
||||
List<HRegionLocation> regions;
|
||||
Iterator<HRegionLocation> hris;
|
||||
HRegionInfo hri;
|
||||
ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
|
||||
try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
|
||||
regions = l.getAllRegionLocations();
|
||||
|
||||
assertEquals(
|
||||
"Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
|
||||
expectedRegions, regions.size());
|
||||
System.err.println("Found " + regions.size() + " regions");
|
||||
|
||||
hris = regions.iterator();
|
||||
hri = hris.next().getRegionInfo();
|
||||
assertTrue(hri.getStartKey() == null || hri.getStartKey().length == 0);
|
||||
assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[0]));
|
||||
hri = hris.next().getRegionInfo();
|
||||
assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[0]));
|
||||
assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[1]));
|
||||
hri = hris.next().getRegionInfo();
|
||||
assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[1]));
|
||||
assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[2]));
|
||||
hri = hris.next().getRegionInfo();
|
||||
assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[2]));
|
||||
assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[3]));
|
||||
hri = hris.next().getRegionInfo();
|
||||
assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[3]));
|
||||
assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[4]));
|
||||
hri = hris.next().getRegionInfo();
|
||||
assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[4]));
|
||||
assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[5]));
|
||||
hri = hris.next().getRegionInfo();
|
||||
assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[5]));
|
||||
assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[6]));
|
||||
hri = hris.next().getRegionInfo();
|
||||
assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[6]));
|
||||
assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[7]));
|
||||
hri = hris.next().getRegionInfo();
|
||||
assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[7]));
|
||||
assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[8]));
|
||||
hri = hris.next().getRegionInfo();
|
||||
assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[8]));
|
||||
assertTrue(hri.getEndKey() == null || hri.getEndKey().length == 0);
|
||||
|
||||
verifyRoundRobinDistribution(conn, l, expectedRegions);
|
||||
}
|
||||
|
||||
// Now test using start/end with a number of regions
|
||||
|
||||
// Use 80 bit numbers to make sure we aren't limited
|
||||
byte[] startKey = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
|
||||
byte[] endKey = { 9, 9, 9, 9, 9, 9, 9, 9, 9, 9 };
|
||||
|
||||
// Splitting into 10 regions, we expect (null,1) ... (9, null)
|
||||
// with (1,2) (2,3) (3,4) (4,5) (5,6) (6,7) (7,8) (8,9) in the middle
|
||||
|
||||
expectedRegions = 10;
|
||||
|
||||
TableName TABLE_2 = TableName.valueOf(tableName.getNameAsString() + "_2");
|
||||
|
||||
desc = new HTableDescriptor(TABLE_2);
|
||||
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
|
||||
admin.createTable(desc, startKey, endKey, expectedRegions).join();
|
||||
|
||||
try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(TABLE_2)) {
|
||||
regions = l.getAllRegionLocations();
|
||||
assertEquals(
|
||||
"Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
|
||||
expectedRegions, regions.size());
|
||||
System.err.println("Found " + regions.size() + " regions");
|
||||
|
||||
hris = regions.iterator();
|
||||
hri = hris.next().getRegionInfo();
|
||||
assertTrue(hri.getStartKey() == null || hri.getStartKey().length == 0);
|
||||
assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }));
|
||||
hri = hris.next().getRegionInfo();
|
||||
assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }));
|
||||
assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 2, 2, 2, 2, 2, 2, 2, 2, 2, 2 }));
|
||||
hri = hris.next().getRegionInfo();
|
||||
assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 2, 2, 2, 2, 2, 2, 2, 2, 2, 2 }));
|
||||
assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 3, 3, 3, 3, 3, 3, 3, 3, 3, 3 }));
|
||||
hri = hris.next().getRegionInfo();
|
||||
assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 3, 3, 3, 3, 3, 3, 3, 3, 3, 3 }));
|
||||
assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 4, 4, 4, 4, 4, 4, 4, 4, 4, 4 }));
|
||||
hri = hris.next().getRegionInfo();
|
||||
assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 4, 4, 4, 4, 4, 4, 4, 4, 4, 4 }));
|
||||
assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5 }));
|
||||
hri = hris.next().getRegionInfo();
|
||||
assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5 }));
|
||||
assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 6, 6, 6, 6, 6, 6, 6, 6, 6, 6 }));
|
||||
hri = hris.next().getRegionInfo();
|
||||
assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 6, 6, 6, 6, 6, 6, 6, 6, 6, 6 }));
|
||||
assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 7, 7, 7, 7, 7, 7, 7, 7, 7, 7 }));
|
||||
hri = hris.next().getRegionInfo();
|
||||
assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 7, 7, 7, 7, 7, 7, 7, 7, 7, 7 }));
|
||||
assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 8, 8, 8, 8, 8, 8, 8, 8, 8, 8 }));
|
||||
hri = hris.next().getRegionInfo();
|
||||
assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 8, 8, 8, 8, 8, 8, 8, 8, 8, 8 }));
|
||||
assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 9, 9, 9, 9, 9, 9, 9, 9, 9, 9 }));
|
||||
hri = hris.next().getRegionInfo();
|
||||
assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 9, 9, 9, 9, 9, 9, 9, 9, 9, 9 }));
|
||||
assertTrue(hri.getEndKey() == null || hri.getEndKey().length == 0);
|
||||
|
||||
verifyRoundRobinDistribution(conn, l, expectedRegions);
|
||||
}
|
||||
|
||||
// Try once more with something that divides into something infinite
|
||||
|
||||
startKey = new byte[] { 0, 0, 0, 0, 0, 0 };
|
||||
endKey = new byte[] { 1, 0, 0, 0, 0, 0 };
|
||||
|
||||
expectedRegions = 5;
|
||||
|
||||
TableName TABLE_3 = TableName.valueOf(tableName.getNameAsString() + "_3");
|
||||
|
||||
desc = new HTableDescriptor(TABLE_3);
|
||||
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
|
||||
admin.createTable(desc, startKey, endKey, expectedRegions).join();
|
||||
|
||||
try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(TABLE_3)) {
|
||||
regions = l.getAllRegionLocations();
|
||||
assertEquals(
|
||||
"Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
|
||||
expectedRegions, regions.size());
|
||||
System.err.println("Found " + regions.size() + " regions");
|
||||
|
||||
verifyRoundRobinDistribution(conn, l, expectedRegions);
|
||||
}
|
||||
|
||||
// Try an invalid case where there are duplicate split keys
|
||||
splitKeys = new byte[][] { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 },
|
||||
new byte[] { 3, 3, 3 }, new byte[] { 2, 2, 2 } };
|
||||
|
||||
TableName TABLE_4 = TableName.valueOf(tableName.getNameAsString() + "_4");
|
||||
desc = new HTableDescriptor(TABLE_4);
|
||||
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
|
||||
try {
|
||||
admin.createTable(desc, splitKeys).join();
|
||||
fail("Should not be able to create this table because of " + "duplicate split keys");
|
||||
} catch (CompletionException e) {
|
||||
assertTrue(e.getCause() instanceof IllegalArgumentException);
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyRoundRobinDistribution(ClusterConnection c, RegionLocator regionLocator,
|
||||
int expectedRegions) throws IOException {
|
||||
int numRS = c.getCurrentNrHRS();
|
||||
List<HRegionLocation> regions = regionLocator.getAllRegionLocations();
|
||||
Map<ServerName, List<HRegionInfo>> server2Regions = new HashMap<>();
|
||||
regions.stream().forEach((loc) -> {
|
||||
ServerName server = loc.getServerName();
|
||||
server2Regions.computeIfAbsent(server, (s) -> new ArrayList<>()).add(loc.getRegionInfo());
|
||||
});
|
||||
if (numRS >= 2) {
|
||||
// Ignore the master region server,
|
||||
// which contains less regions by intention.
|
||||
numRS--;
|
||||
}
|
||||
float average = (float) expectedRegions / numRS;
|
||||
int min = (int) Math.floor(average);
|
||||
int max = (int) Math.ceil(average);
|
||||
server2Regions.values().forEach((regionList) -> {
|
||||
assertTrue(regionList.size() == min || regionList.size() == max);
|
||||
});
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testCreateTableWithOnlyEmptyStartRow() throws IOException {
|
||||
byte[] tableName = Bytes.toBytes("testCreateTableWithOnlyEmptyStartRow");
|
||||
byte[][] splitKeys = new byte[1][];
|
||||
splitKeys[0] = HConstants.EMPTY_BYTE_ARRAY;
|
||||
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
|
||||
desc.addFamily(new HColumnDescriptor("col"));
|
||||
try {
|
||||
admin.createTable(desc, splitKeys).join();
|
||||
fail("Test case should fail as empty split key is passed.");
|
||||
} catch (CompletionException e) {
|
||||
assertTrue(e.getCause() instanceof IllegalArgumentException);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testCreateTableWithEmptyRowInTheSplitKeys() throws IOException {
|
||||
byte[] tableName = Bytes.toBytes("testCreateTableWithEmptyRowInTheSplitKeys");
|
||||
byte[][] splitKeys = new byte[3][];
|
||||
splitKeys[0] = "region1".getBytes();
|
||||
splitKeys[1] = HConstants.EMPTY_BYTE_ARRAY;
|
||||
splitKeys[2] = "region2".getBytes();
|
||||
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
|
||||
desc.addFamily(new HColumnDescriptor("col"));
|
||||
try {
|
||||
admin.createTable(desc, splitKeys).join();
|
||||
fail("Test case should fail as empty split key is passed.");
|
||||
} catch (CompletionException e) {
|
||||
assertTrue(e.getCause() instanceof IllegalArgumentException);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testDeleteTable() throws Exception {
|
||||
TableName table = TableName.valueOf("testDeleteTable");
|
||||
admin.createTable(new HTableDescriptor(table).addFamily(new HColumnDescriptor(FAMILY))).join();
|
||||
assertTrue(admin.tableExists(table).get());
|
||||
TEST_UTIL.getAdmin().disableTable(table);
|
||||
admin.deleteTable(table).join();
|
||||
assertFalse(admin.tableExists(table).get());
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testDeleteTables() throws Exception {
|
||||
TableName[] tables = { TableName.valueOf("testDeleteTables1"),
|
||||
TableName.valueOf("testDeleteTables2"), TableName.valueOf("testDeleteTables3") };
|
||||
Arrays.stream(tables).map(HTableDescriptor::new)
|
||||
.map((table) -> table.addFamily(new HColumnDescriptor(FAMILY))).forEach((table) -> {
|
||||
admin.createTable(table).join();
|
||||
admin.tableExists(table.getTableName()).thenAccept((exist) -> assertTrue(exist)).join();
|
||||
try {
|
||||
TEST_UTIL.getAdmin().disableTable(table.getTableName());
|
||||
} catch (Exception e) {
|
||||
}
|
||||
});
|
||||
HTableDescriptor[] failed = admin.deleteTables(Pattern.compile("testDeleteTables.*")).get();
|
||||
assertEquals(0, failed.length);
|
||||
Arrays.stream(tables).forEach((table) -> {
|
||||
admin.tableExists(table).thenAccept((exist) -> assertFalse(exist)).join();
|
||||
});
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testTruncateTable() throws IOException {
|
||||
testTruncateTable(TableName.valueOf("testTruncateTable"), false);
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testTruncateTablePreservingSplits() throws IOException {
|
||||
testTruncateTable(TableName.valueOf("testTruncateTablePreservingSplits"), true);
|
||||
}
|
||||
|
||||
private void testTruncateTable(final TableName tableName, boolean preserveSplits)
|
||||
throws IOException {
|
||||
byte[][] splitKeys = new byte[2][];
|
||||
splitKeys[0] = Bytes.toBytes(4);
|
||||
splitKeys[1] = Bytes.toBytes(8);
|
||||
|
||||
// Create & Fill the table
|
||||
Table table = TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY, splitKeys);
|
||||
try {
|
||||
TEST_UTIL.loadNumericRows(table, HConstants.CATALOG_FAMILY, 0, 10);
|
||||
assertEquals(10, TEST_UTIL.countRows(table));
|
||||
} finally {
|
||||
table.close();
|
||||
}
|
||||
assertEquals(3, TEST_UTIL.getHBaseCluster().getRegions(tableName).size());
|
||||
|
||||
// Truncate & Verify
|
||||
TEST_UTIL.getAdmin().disableTable(tableName);
|
||||
admin.truncateTable(tableName, preserveSplits).join();
|
||||
table = TEST_UTIL.getConnection().getTable(tableName);
|
||||
try {
|
||||
assertEquals(0, TEST_UTIL.countRows(table));
|
||||
} finally {
|
||||
table.close();
|
||||
}
|
||||
if (preserveSplits) {
|
||||
assertEquals(3, TEST_UTIL.getHBaseCluster().getRegions(tableName).size());
|
||||
} else {
|
||||
assertEquals(1, TEST_UTIL.getHBaseCluster().getRegions(tableName).size());
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
|
|
Loading…
Reference in New Issue