diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java index 9187473d73f..6b0d5888ce6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/AsyncMetaTableAccessor.java @@ -55,7 +55,7 @@ public class AsyncMetaTableAccessor { return getTableState(conn, tableName).thenApply(Optional::isPresent); } - private static CompletableFuture> getTableState(AsyncConnection conn, + public static CompletableFuture> getTableState(AsyncConnection conn, TableName tableName) { CompletableFuture> future = new CompletableFuture<>(); getMetaTable(conn).thenAccept((metaTable) -> { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 20a60709489..56036bfc35d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -19,10 +19,8 @@ package org.apache.hadoop.hbase.client; import java.util.concurrent.CompletableFuture; import java.util.regex.Pattern; - -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -33,6 +31,19 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; @InterfaceAudience.Public @InterfaceStability.Unstable public interface AsyncAdmin { + + /** + * @return Async Connection used by this object. + */ + AsyncConnectionImpl getConnection(); + + /** + * @param tableName Table to check. + * @return True if table exists already. The return value will be wrapped by a + * {@link CompletableFuture}. + */ + CompletableFuture tableExists(final TableName tableName); + /** * List all the userspace tables. * @return - returns an array of HTableDescriptors wrapped by a {@link CompletableFuture}. @@ -83,11 +94,75 @@ public interface AsyncAdmin { final boolean includeSysTables); /** - * @param tableName Table to check. - * @return True if table exists already. The return value will be wrapped by a - * {@link CompletableFuture}. + * Method for getting the tableDescriptor + * @param tableName as a {@link TableName} + * @return the tableDescriptor wrapped by a {@link CompletableFuture}. */ - CompletableFuture tableExists(final TableName tableName); + CompletableFuture getTableDescriptor(final TableName tableName); + + /** + * Creates a new table. + * @param desc table descriptor for table + */ + CompletableFuture createTable(HTableDescriptor desc); + + /** + * Creates a new table with the specified number of regions. The start key specified will become + * the end key of the first region of the table, and the end key specified will become the start + * key of the last region of the table (the first region has a null start key and the last region + * has a null end key). BigInteger math will be used to divide the key range specified into enough + * segments to make the required number of total regions. + * @param desc table descriptor for table + * @param startKey beginning of key range + * @param endKey end of key range + * @param numRegions the total number of regions to create + */ + CompletableFuture createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, + int numRegions); + + /** + * Creates a new table with an initial set of empty regions defined by the specified split keys. + * The total number of regions created will be the number of split keys plus one. + * Note : Avoid passing empty split key. + * @param desc table descriptor for table + * @param splitKeys array of split keys for the initial regions of the table + */ + CompletableFuture createTable(final HTableDescriptor desc, byte[][] splitKeys); + + /** + * Deletes a table. + * @param tableName name of table to delete + */ + CompletableFuture deleteTable(final TableName tableName); + + /** + * Deletes tables matching the passed in pattern and wait on completion. Warning: Use this method + * carefully, there is no prompting and the effect is immediate. Consider using + * {@link #listTables(String, boolean)} and + * {@link #deleteTable(org.apache.hadoop.hbase.TableName)} + * @param regex The regular expression to match table names against + * @return Table descriptors for tables that couldn't be deleted. The return value will be wrapped + * by a {@link CompletableFuture}. + */ + CompletableFuture deleteTables(String regex); + + /** + * Delete tables matching the passed in pattern and wait on completion. Warning: Use this method + * carefully, there is no prompting and the effect is immediate. Consider using + * {@link #listTables(Pattern, boolean) } and + * {@link #deleteTable(org.apache.hadoop.hbase.TableName)} + * @param pattern The pattern to match table names against + * @return Table descriptors for tables that couldn't be deleted. The return value will be wrapped + * by a {@link CompletableFuture}. + */ + CompletableFuture deleteTables(Pattern pattern); + + /** + * Truncate a table. + * @param tableName name of table to truncate + * @param preserveSplits True if the splits should be preserved + */ + CompletableFuture truncateTable(final TableName tableName, final boolean preserveSplits); /** * Turn the load balancer on or off. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 86821162b9f..5112b907802 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -18,14 +18,21 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.regex.Pattern; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.AsyncMetaTableAccessor; -import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterRequestCallerBuilder; @@ -33,17 +40,28 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; /** * The implementation of AsyncAdmin. @@ -52,6 +70,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalance @InterfaceStability.Evolving public class AsyncHBaseAdmin implements AsyncAdmin { + private static final Log LOG = LogFactory.getLog(AsyncHBaseAdmin.class); + private final AsyncConnectionImpl connection; private final long rpcTimeoutNs; @@ -64,6 +84,8 @@ public class AsyncHBaseAdmin implements AsyncAdmin { private final int startLogErrorsCnt; + private final NonceGenerator ng; + AsyncHBaseAdmin(AsyncConnectionImpl connection) { this.connection = connection; this.rpcTimeoutNs = connection.connConf.getRpcTimeoutNs(); @@ -71,6 +93,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin { this.pauseNs = connection.connConf.getPauseNs(); this.maxAttempts = connection.connConf.getMaxRetries(); this.startLogErrorsCnt = connection.connConf.getStartLogErrorsCnt(); + this.ng = connection.getNonceGenerator(); } private MasterRequestCallerBuilder newCaller() { @@ -114,6 +137,16 @@ public class AsyncHBaseAdmin implements AsyncAdmin { return future; } + @Override + public AsyncConnectionImpl getConnection() { + return this.connection; + } + + @Override + public CompletableFuture tableExists(TableName tableName) { + return AsyncMetaTableAccessor.tableExists(connection, tableName); + } + @Override public CompletableFuture listTables() { return listTables((Pattern)null, false); @@ -158,6 +191,137 @@ public class AsyncHBaseAdmin implements AsyncAdmin { .getTableNameArray(resp.getTableNamesList()))).call(); } + @Override + public CompletableFuture getTableDescriptor(TableName tableName) { + CompletableFuture future = new CompletableFuture<>(); + this.> newCaller() + .action( + (controller, stub) -> this + .> call( + controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), (s, + c, req, done) -> s.getTableDescriptors(c, req, done), (resp) -> resp + .getTableSchemaList())).call().whenComplete((tableSchemas, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + if (!tableSchemas.isEmpty()) { + future.complete(ProtobufUtil.convertToHTableDesc(tableSchemas.get(0))); + } else { + future.completeExceptionally(new TableNotFoundException(tableName.getNameAsString())); + } + }); + return future; + } + + @Override + public CompletableFuture createTable(HTableDescriptor desc) { + return createTable(desc, null); + } + + @Override + public CompletableFuture createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, + int numRegions) { + try { + return createTable(desc, getSplitKeys(startKey, endKey, numRegions)); + } catch (IllegalArgumentException e) { + return failedFuture(e); + } + } + + @Override + public CompletableFuture createTable(HTableDescriptor desc, byte[][] splitKeys) { + if (desc.getTableName() == null) { + return failedFuture(new IllegalArgumentException("TableName cannot be null")); + } + if (splitKeys != null && splitKeys.length > 0) { + Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR); + // Verify there are no duplicate split keys + byte[] lastKey = null; + for (byte[] splitKey : splitKeys) { + if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) { + return failedFuture(new IllegalArgumentException( + "Empty split key must not be passed in the split keys.")); + } + if (lastKey != null && Bytes.equals(splitKey, lastKey)) { + return failedFuture(new IllegalArgumentException("All split keys must be unique, " + + "found duplicate: " + Bytes.toStringBinary(splitKey) + ", " + + Bytes.toStringBinary(lastKey))); + } + lastKey = splitKey; + } + } + + CompletableFuture procFuture = this + . newCaller() + .action( + (controller, stub) -> this. call( + controller, + stub, + RequestConverter.buildCreateTableRequest(desc, splitKeys, ng.getNonceGroup(), + ng.newNonce()), (s, c, req, done) -> s.createTable(c, req, done), + (resp) -> resp.getProcId())).call(); + return waitProcedureResult(procFuture).whenComplete( + new CreateTableProcedureBiConsumer(this, desc.getTableName())); + } + + @Override + public CompletableFuture deleteTable(TableName tableName) { + CompletableFuture procFuture = this + . newCaller() + .action( + (controller, stub) -> this. call( + controller, stub, + RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), + (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId())).call(); + return waitProcedureResult(procFuture).whenComplete( + new DeleteTableProcedureBiConsumer(this, tableName)); + } + + @Override + public CompletableFuture deleteTables(String regex) { + return deleteTables(Pattern.compile(regex)); + } + + @Override + public CompletableFuture deleteTables(Pattern pattern) { + CompletableFuture future = new CompletableFuture<>(); + List failed = new LinkedList<>(); + listTables(pattern, false).whenComplete( + (tables, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + CompletableFuture[] futures = Arrays.stream(tables) + .map((table) -> deleteTable(table.getTableName()).whenComplete((v, ex) -> { + if (ex != null) { + LOG.info("Failed to delete table " + table.getTableName(), ex); + failed.add(table); + } + })).toArray(size -> new CompletableFuture[size]); + CompletableFuture.allOf(futures).thenAccept((v) -> { + future.complete(failed.toArray(new HTableDescriptor[failed.size()])); + }); + }); + return future; + } + + @Override + public CompletableFuture truncateTable(TableName tableName, boolean preserveSplits) { + CompletableFuture procFuture = this + . newCaller() + .action( + (controller, stub) -> this. call( + controller, + stub, + RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, + ng.getNonceGroup(), ng.newNonce()), + (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId())).call(); + return waitProcedureResult(procFuture).whenComplete( + new TruncateTableProcedureBiConsumer(this, tableName)); + } + @Override public CompletableFuture setBalancerRunning(final boolean on) { return this @@ -196,8 +360,154 @@ public class AsyncHBaseAdmin implements AsyncAdmin { .call(); } - @Override - public CompletableFuture tableExists(TableName tableName) { - return AsyncMetaTableAccessor.tableExists(connection, tableName); + private byte[][] getSplitKeys(byte[] startKey, byte[] endKey, int numRegions) { + if (numRegions < 3) { + throw new IllegalArgumentException("Must create at least three regions"); + } else if (Bytes.compareTo(startKey, endKey) >= 0) { + throw new IllegalArgumentException("Start key must be smaller than end key"); + } + if (numRegions == 3) { + return new byte[][] { startKey, endKey }; + } + byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3); + if (splitKeys == null || splitKeys.length != numRegions - 1) { + throw new IllegalArgumentException("Unable to split key range into enough regions"); + } + return splitKeys; + } + + private abstract class ProcedureBiConsumer implements BiConsumer { + protected final AsyncAdmin admin; + + ProcedureBiConsumer(AsyncAdmin admin) { + this.admin = admin; + } + + abstract void onFinished(); + + abstract void onError(Throwable error); + + @Override + public void accept(Void v, Throwable error) { + if (error != null) { + onError(error); + return; + } + onFinished(); + } + } + + private abstract class TableProcedureBiConsumer extends ProcedureBiConsumer { + protected final TableName tableName; + + TableProcedureBiConsumer(final AsyncAdmin admin, final TableName tableName) { + super(admin); + this.tableName = tableName; + } + + abstract String getOperationType(); + + String getDescription() { + return "Operation: " + getOperationType() + ", " + "Table Name: " + + tableName.getNameWithNamespaceInclAsString(); + } + + @Override + void onFinished() { + LOG.info(getDescription() + " completed"); + } + + @Override + void onError(Throwable error) { + LOG.info(getDescription() + " failed with " + error.getMessage()); + } + } + + private class CreateTableProcedureBiConsumer extends TableProcedureBiConsumer { + + CreateTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { + super(admin, tableName); + } + + String getOperationType() { + return "CREATE"; + } + } + + private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer { + + DeleteTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { + super(admin, tableName); + } + + String getOperationType() { + return "DELETE"; + } + + @Override + void onFinished() { + this.admin.getConnection().getLocator().clearCache(this.tableName); + super.onFinished(); + } + } + + private class TruncateTableProcedureBiConsumer extends TableProcedureBiConsumer { + + TruncateTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { + super(admin, tableName); + } + + String getOperationType() { + return "TRUNCATE"; + } + } + + private CompletableFuture waitProcedureResult(CompletableFuture procFuture) { + CompletableFuture future = new CompletableFuture<>(); + procFuture.whenComplete((procId, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + getProcedureResult(procId, future); + }); + return future; + } + + private void getProcedureResult(final long procId, CompletableFuture future) { + this. newCaller() + .action( + (controller, stub) -> this + . call( + controller, stub, GetProcedureResultRequest.newBuilder().setProcId(procId).build(), + (s, c, req, done) -> s.getProcedureResult(c, req, done), (resp) -> resp)) + .call() + .whenComplete( + (response, error) -> { + if (error != null) { + LOG.warn("failed to get the procedure result procId=" + procId, + ConnectionUtils.translateException(error)); + connection.RETRY_TIMER.newTimeout(t -> getProcedureResult(procId, future), pauseNs, + TimeUnit.NANOSECONDS); + return; + } + if (response.getState() == GetProcedureResultResponse.State.RUNNING) { + connection.RETRY_TIMER.newTimeout(t -> getProcedureResult(procId, future), pauseNs, + TimeUnit.NANOSECONDS); + return; + } + if (response.hasException()) { + IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); + future.completeExceptionally(ioe); + } else { + future.complete(null); + } + }); + } + + private CompletableFuture failedFuture(Throwable error) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(error); + return future; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java index ccd68732631..0835b47150b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdmin.java @@ -17,22 +17,38 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletionException; import java.util.regex.Pattern; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.AsyncMetaTableAccessor; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -47,16 +63,18 @@ public class TestAsyncAdmin { private static final Log LOG = LogFactory.getLog(TestAdmin1.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static byte [] FAMILY = Bytes.toBytes("testFamily"); + private static byte[] FAMILY = Bytes.toBytes("testFamily"); private static AsyncConnection ASYNC_CONN; private AsyncAdmin admin; @BeforeClass public static void setUpBeforeClass() throws Exception { - TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 10); - TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 3); - TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 1000); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_PAUSE, 10); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1000); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 3000); + TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); TEST_UTIL.startMiniCluster(1); ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); } @@ -72,8 +90,22 @@ public class TestAsyncAdmin { this.admin = ASYNC_CONN.getAdmin(); } + @Test + public void testTableExist() throws Exception { + final TableName table = TableName.valueOf("testTableExist"); + boolean exist; + exist = admin.tableExists(table).get(); + assertEquals(false, exist); + TEST_UTIL.createTable(table, FAMILY); + exist = admin.tableExists(table).get(); + assertEquals(true, exist); + exist = admin.tableExists(TableName.META_TABLE_NAME).get(); + assertEquals(true, exist); + } + @Test public void testListTables() throws Exception { + int numTables = admin.listTables().get().length; TableName t1 = TableName.valueOf("testListTables1"); TableName t2 = TableName.valueOf("testListTables2"); TableName t3 = TableName.valueOf("testListTables3"); @@ -98,7 +130,7 @@ public class TestAsyncAdmin { TableName[] tableNames = admin.listTableNames().get(); size = tableNames.length; - assertTrue(size >= tables.length); + assertTrue(size == (numTables + tables.length)); for (int i = 0; i < tables.length && i < size; i++) { boolean found = false; for (int j = 0; j < tableNames.length; j++) { @@ -113,10 +145,6 @@ public class TestAsyncAdmin { for (int i = 0; i < tables.length; i++) { TEST_UTIL.deleteTable(tables[i]); } - tableDescs = admin.listTables().get(); - assertEquals(0, tableDescs.length); - tableNames = admin.listTableNames().get(); - assertEquals(0, tableNames.length); tableDescs = admin.listTables((Pattern) null, true).get(); assertTrue("Not found system tables", tableDescs.length > 0); @@ -124,17 +152,372 @@ public class TestAsyncAdmin { assertTrue("Not found system tables", tableNames.length > 0); } - @Test - public void testTableExist() throws Exception { - final TableName table = TableName.valueOf("testTableExist"); - boolean exist; - exist = admin.tableExists(table).get(); - assertEquals(false, exist); - TEST_UTIL.createTable(table, FAMILY); - exist = admin.tableExists(table).get(); - assertEquals(true, exist); - exist = admin.tableExists(TableName.META_TABLE_NAME).get(); - assertEquals(true, exist); + @Test(timeout = 300000) + public void testGetTableDescriptor() throws Exception { + HColumnDescriptor fam1 = new HColumnDescriptor("fam1"); + HColumnDescriptor fam2 = new HColumnDescriptor("fam2"); + HColumnDescriptor fam3 = new HColumnDescriptor("fam3"); + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("myTestTable")); + htd.addFamily(fam1); + htd.addFamily(fam2); + htd.addFamily(fam3); + admin.createTable(htd).join(); + HTableDescriptor confirmedHtd = admin.getTableDescriptor(htd.getTableName()).get(); + assertEquals(htd.compareTo(confirmedHtd), 0); + } + + @Test(timeout = 300000) + public void testCreateTable() throws Exception { + HTableDescriptor[] tables = admin.listTables().get(); + int numTables = tables.length; + TableName tableName = TableName.valueOf("testCreateTable"); + admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(FAMILY))) + .join(); + tables = admin.listTables().get(); + assertEquals(numTables + 1, tables.length); + assertTrue("Table must be enabled.", TEST_UTIL.getHBaseCluster().getMaster() + .getTableStateManager().isTableState(tableName, TableState.State.ENABLED)); + assertEquals(TableState.State.ENABLED, getStateFromMeta(tableName)); + } + + private TableState.State getStateFromMeta(TableName table) throws Exception { + Optional state = AsyncMetaTableAccessor.getTableState(ASYNC_CONN, table).get(); + assertTrue(state.isPresent()); + return state.get().getState(); + } + + @Test(timeout = 300000) + public void testCreateTableNumberOfRegions() throws Exception { + TableName tableName = TableName.valueOf("testCreateTableNumberOfRegions"); + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + admin.createTable(desc).join(); + List regions; + try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) { + regions = l.getAllRegionLocations(); + assertEquals("Table should have only 1 region", 1, regions.size()); + } + + TableName TABLE_2 = TableName.valueOf(tableName.getNameAsString() + "_2"); + desc = new HTableDescriptor(TABLE_2); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + admin.createTable(desc, new byte[][] { new byte[] { 42 } }).join(); + try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(TABLE_2)) { + regions = l.getAllRegionLocations(); + assertEquals("Table should have only 2 region", 2, regions.size()); + } + + TableName TABLE_3 = TableName.valueOf(tableName.getNameAsString() + "_3"); + desc = new HTableDescriptor(TABLE_3); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + admin.createTable(desc, "a".getBytes(), "z".getBytes(), 3).join(); + try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(TABLE_3)) { + regions = l.getAllRegionLocations(); + assertEquals("Table should have only 3 region", 3, regions.size()); + } + + TableName TABLE_4 = TableName.valueOf(tableName.getNameAsString() + "_4"); + desc = new HTableDescriptor(TABLE_4); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + try { + admin.createTable(desc, "a".getBytes(), "z".getBytes(), 2).join(); + fail("Should not be able to create a table with only 2 regions using this API."); + } catch (CompletionException e) { + assertTrue(e.getCause() instanceof IllegalArgumentException); + } + + TableName TABLE_5 = TableName.valueOf(tableName.getNameAsString() + "_5"); + desc = new HTableDescriptor(TABLE_5); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + admin.createTable(desc, new byte[] { 1 }, new byte[] { 127 }, 16).join(); + try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(TABLE_5)) { + regions = l.getAllRegionLocations(); + assertEquals("Table should have 16 region", 16, regions.size()); + } + } + + @Test(timeout = 300000) + public void testCreateTableWithRegions() throws IOException, InterruptedException { + + TableName tableName = TableName.valueOf("testCreateTableWithRegions"); + + byte[][] splitKeys = { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 }, new byte[] { 3, 3, 3 }, + new byte[] { 4, 4, 4 }, new byte[] { 5, 5, 5 }, new byte[] { 6, 6, 6 }, + new byte[] { 7, 7, 7 }, new byte[] { 8, 8, 8 }, new byte[] { 9, 9, 9 }, }; + int expectedRegions = splitKeys.length + 1; + + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + admin.createTable(desc, splitKeys).join(); + + List regions; + Iterator hris; + HRegionInfo hri; + ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection(); + try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) { + regions = l.getAllRegionLocations(); + + assertEquals( + "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(), + expectedRegions, regions.size()); + System.err.println("Found " + regions.size() + " regions"); + + hris = regions.iterator(); + hri = hris.next().getRegionInfo(); + assertTrue(hri.getStartKey() == null || hri.getStartKey().length == 0); + assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[0])); + hri = hris.next().getRegionInfo(); + assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[0])); + assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[1])); + hri = hris.next().getRegionInfo(); + assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[1])); + assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[2])); + hri = hris.next().getRegionInfo(); + assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[2])); + assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[3])); + hri = hris.next().getRegionInfo(); + assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[3])); + assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[4])); + hri = hris.next().getRegionInfo(); + assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[4])); + assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[5])); + hri = hris.next().getRegionInfo(); + assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[5])); + assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[6])); + hri = hris.next().getRegionInfo(); + assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[6])); + assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[7])); + hri = hris.next().getRegionInfo(); + assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[7])); + assertTrue(Bytes.equals(hri.getEndKey(), splitKeys[8])); + hri = hris.next().getRegionInfo(); + assertTrue(Bytes.equals(hri.getStartKey(), splitKeys[8])); + assertTrue(hri.getEndKey() == null || hri.getEndKey().length == 0); + + verifyRoundRobinDistribution(conn, l, expectedRegions); + } + + // Now test using start/end with a number of regions + + // Use 80 bit numbers to make sure we aren't limited + byte[] startKey = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; + byte[] endKey = { 9, 9, 9, 9, 9, 9, 9, 9, 9, 9 }; + + // Splitting into 10 regions, we expect (null,1) ... (9, null) + // with (1,2) (2,3) (3,4) (4,5) (5,6) (6,7) (7,8) (8,9) in the middle + + expectedRegions = 10; + + TableName TABLE_2 = TableName.valueOf(tableName.getNameAsString() + "_2"); + + desc = new HTableDescriptor(TABLE_2); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + admin.createTable(desc, startKey, endKey, expectedRegions).join(); + + try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(TABLE_2)) { + regions = l.getAllRegionLocations(); + assertEquals( + "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(), + expectedRegions, regions.size()); + System.err.println("Found " + regions.size() + " regions"); + + hris = regions.iterator(); + hri = hris.next().getRegionInfo(); + assertTrue(hri.getStartKey() == null || hri.getStartKey().length == 0); + assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 })); + hri = hris.next().getRegionInfo(); + assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 })); + assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 2, 2, 2, 2, 2, 2, 2, 2, 2, 2 })); + hri = hris.next().getRegionInfo(); + assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 2, 2, 2, 2, 2, 2, 2, 2, 2, 2 })); + assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 3, 3, 3, 3, 3, 3, 3, 3, 3, 3 })); + hri = hris.next().getRegionInfo(); + assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 3, 3, 3, 3, 3, 3, 3, 3, 3, 3 })); + assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 4, 4, 4, 4, 4, 4, 4, 4, 4, 4 })); + hri = hris.next().getRegionInfo(); + assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 4, 4, 4, 4, 4, 4, 4, 4, 4, 4 })); + assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5 })); + hri = hris.next().getRegionInfo(); + assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5 })); + assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 6, 6, 6, 6, 6, 6, 6, 6, 6, 6 })); + hri = hris.next().getRegionInfo(); + assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 6, 6, 6, 6, 6, 6, 6, 6, 6, 6 })); + assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 7, 7, 7, 7, 7, 7, 7, 7, 7, 7 })); + hri = hris.next().getRegionInfo(); + assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 7, 7, 7, 7, 7, 7, 7, 7, 7, 7 })); + assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 8, 8, 8, 8, 8, 8, 8, 8, 8, 8 })); + hri = hris.next().getRegionInfo(); + assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 8, 8, 8, 8, 8, 8, 8, 8, 8, 8 })); + assertTrue(Bytes.equals(hri.getEndKey(), new byte[] { 9, 9, 9, 9, 9, 9, 9, 9, 9, 9 })); + hri = hris.next().getRegionInfo(); + assertTrue(Bytes.equals(hri.getStartKey(), new byte[] { 9, 9, 9, 9, 9, 9, 9, 9, 9, 9 })); + assertTrue(hri.getEndKey() == null || hri.getEndKey().length == 0); + + verifyRoundRobinDistribution(conn, l, expectedRegions); + } + + // Try once more with something that divides into something infinite + + startKey = new byte[] { 0, 0, 0, 0, 0, 0 }; + endKey = new byte[] { 1, 0, 0, 0, 0, 0 }; + + expectedRegions = 5; + + TableName TABLE_3 = TableName.valueOf(tableName.getNameAsString() + "_3"); + + desc = new HTableDescriptor(TABLE_3); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + admin.createTable(desc, startKey, endKey, expectedRegions).join(); + + try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(TABLE_3)) { + regions = l.getAllRegionLocations(); + assertEquals( + "Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(), + expectedRegions, regions.size()); + System.err.println("Found " + regions.size() + " regions"); + + verifyRoundRobinDistribution(conn, l, expectedRegions); + } + + // Try an invalid case where there are duplicate split keys + splitKeys = new byte[][] { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 }, + new byte[] { 3, 3, 3 }, new byte[] { 2, 2, 2 } }; + + TableName TABLE_4 = TableName.valueOf(tableName.getNameAsString() + "_4"); + desc = new HTableDescriptor(TABLE_4); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + try { + admin.createTable(desc, splitKeys).join(); + fail("Should not be able to create this table because of " + "duplicate split keys"); + } catch (CompletionException e) { + assertTrue(e.getCause() instanceof IllegalArgumentException); + } + } + + private void verifyRoundRobinDistribution(ClusterConnection c, RegionLocator regionLocator, + int expectedRegions) throws IOException { + int numRS = c.getCurrentNrHRS(); + List regions = regionLocator.getAllRegionLocations(); + Map> server2Regions = new HashMap<>(); + regions.stream().forEach((loc) -> { + ServerName server = loc.getServerName(); + server2Regions.computeIfAbsent(server, (s) -> new ArrayList<>()).add(loc.getRegionInfo()); + }); + if (numRS >= 2) { + // Ignore the master region server, + // which contains less regions by intention. + numRS--; + } + float average = (float) expectedRegions / numRS; + int min = (int) Math.floor(average); + int max = (int) Math.ceil(average); + server2Regions.values().forEach((regionList) -> { + assertTrue(regionList.size() == min || regionList.size() == max); + }); + } + + @Test(timeout = 300000) + public void testCreateTableWithOnlyEmptyStartRow() throws IOException { + byte[] tableName = Bytes.toBytes("testCreateTableWithOnlyEmptyStartRow"); + byte[][] splitKeys = new byte[1][]; + splitKeys[0] = HConstants.EMPTY_BYTE_ARRAY; + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); + desc.addFamily(new HColumnDescriptor("col")); + try { + admin.createTable(desc, splitKeys).join(); + fail("Test case should fail as empty split key is passed."); + } catch (CompletionException e) { + assertTrue(e.getCause() instanceof IllegalArgumentException); + } + } + + @Test(timeout = 300000) + public void testCreateTableWithEmptyRowInTheSplitKeys() throws IOException { + byte[] tableName = Bytes.toBytes("testCreateTableWithEmptyRowInTheSplitKeys"); + byte[][] splitKeys = new byte[3][]; + splitKeys[0] = "region1".getBytes(); + splitKeys[1] = HConstants.EMPTY_BYTE_ARRAY; + splitKeys[2] = "region2".getBytes(); + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); + desc.addFamily(new HColumnDescriptor("col")); + try { + admin.createTable(desc, splitKeys).join(); + fail("Test case should fail as empty split key is passed."); + } catch (CompletionException e) { + assertTrue(e.getCause() instanceof IllegalArgumentException); + } + } + + @Test(timeout = 300000) + public void testDeleteTable() throws Exception { + TableName table = TableName.valueOf("testDeleteTable"); + admin.createTable(new HTableDescriptor(table).addFamily(new HColumnDescriptor(FAMILY))).join(); + assertTrue(admin.tableExists(table).get()); + TEST_UTIL.getAdmin().disableTable(table); + admin.deleteTable(table).join(); + assertFalse(admin.tableExists(table).get()); + } + + @Test(timeout = 300000) + public void testDeleteTables() throws Exception { + TableName[] tables = { TableName.valueOf("testDeleteTables1"), + TableName.valueOf("testDeleteTables2"), TableName.valueOf("testDeleteTables3") }; + Arrays.stream(tables).map(HTableDescriptor::new) + .map((table) -> table.addFamily(new HColumnDescriptor(FAMILY))).forEach((table) -> { + admin.createTable(table).join(); + admin.tableExists(table.getTableName()).thenAccept((exist) -> assertTrue(exist)).join(); + try { + TEST_UTIL.getAdmin().disableTable(table.getTableName()); + } catch (Exception e) { + } + }); + HTableDescriptor[] failed = admin.deleteTables(Pattern.compile("testDeleteTables.*")).get(); + assertEquals(0, failed.length); + Arrays.stream(tables).forEach((table) -> { + admin.tableExists(table).thenAccept((exist) -> assertFalse(exist)).join(); + }); + } + + @Test(timeout = 300000) + public void testTruncateTable() throws IOException { + testTruncateTable(TableName.valueOf("testTruncateTable"), false); + } + + @Test(timeout = 300000) + public void testTruncateTablePreservingSplits() throws IOException { + testTruncateTable(TableName.valueOf("testTruncateTablePreservingSplits"), true); + } + + private void testTruncateTable(final TableName tableName, boolean preserveSplits) + throws IOException { + byte[][] splitKeys = new byte[2][]; + splitKeys[0] = Bytes.toBytes(4); + splitKeys[1] = Bytes.toBytes(8); + + // Create & Fill the table + Table table = TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY, splitKeys); + try { + TEST_UTIL.loadNumericRows(table, HConstants.CATALOG_FAMILY, 0, 10); + assertEquals(10, TEST_UTIL.countRows(table)); + } finally { + table.close(); + } + assertEquals(3, TEST_UTIL.getHBaseCluster().getRegions(tableName).size()); + + // Truncate & Verify + TEST_UTIL.getAdmin().disableTable(tableName); + admin.truncateTable(tableName, preserveSplits).join(); + table = TEST_UTIL.getConnection().getTable(tableName); + try { + assertEquals(0, TEST_UTIL.countRows(table)); + } finally { + table.close(); + } + if (preserveSplits) { + assertEquals(3, TEST_UTIL.getHBaseCluster().getRegions(tableName).size()); + } else { + assertEquals(1, TEST_UTIL.getHBaseCluster().getRegions(tableName).size()); + } } @Test(timeout = 30000)