HBASE-17511 Implement enable/disable table methods

This commit is contained in:
Guanghao Zhang 2017-02-05 14:18:40 +08:00
parent ffa0cea2a3
commit 26a94844f5
4 changed files with 347 additions and 71 deletions

View File

@ -27,7 +27,6 @@ import java.util.concurrent.CompletableFuture;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RawAsyncTable;
import org.apache.hadoop.hbase.client.Result;
@ -44,29 +43,20 @@ public class AsyncMetaTableAccessor {
private static final Log LOG = LogFactory.getLog(AsyncMetaTableAccessor.class);
private static CompletableFuture<RawAsyncTable> getMetaTable(AsyncConnection conn) {
return CompletableFuture.completedFuture(conn.getRawTable(META_TABLE_NAME));
}
public static CompletableFuture<Boolean> tableExists(AsyncConnection conn, TableName tableName) {
public static CompletableFuture<Boolean> tableExists(RawAsyncTable metaTable, TableName tableName) {
if (tableName.equals(META_TABLE_NAME)) {
return CompletableFuture.completedFuture(true);
}
return getTableState(conn, tableName).thenApply(Optional::isPresent);
return getTableState(metaTable, tableName).thenApply(Optional::isPresent);
}
public static CompletableFuture<Optional<TableState>> getTableState(AsyncConnection conn,
public static CompletableFuture<Optional<TableState>> getTableState(RawAsyncTable metaTable,
TableName tableName) {
CompletableFuture<Optional<TableState>> future = new CompletableFuture<>();
getMetaTable(conn).thenAccept((metaTable) -> {
Get get = new Get(tableName.getName()).addColumn(getTableFamily(), getStateColumn());
long time = EnvironmentEdgeManager.currentTime();
try {
get.setTimeRange(0, time);
} catch (IOException ioe) {
future.completeExceptionally(ioe);
return;
}
metaTable.get(get).whenComplete((result, error) -> {
if (error != null) {
future.completeExceptionally(error);
@ -78,7 +68,9 @@ public class AsyncMetaTableAccessor {
future.completeExceptionally(e);
}
});
});
} catch (IOException ioe) {
future.completeExceptionally(ioe);
}
return future;
}

View File

@ -164,6 +164,58 @@ public interface AsyncAdmin {
*/
CompletableFuture<Void> truncateTable(final TableName tableName, final boolean preserveSplits);
/**
* Enable a table. The table has to be in disabled state for it to be enabled.
* @param tableName name of the table
*/
CompletableFuture<Void> enableTable(final TableName tableName);
/**
* Enable tables matching the passed in pattern. Warning: Use this method carefully, there is no
* prompting and the effect is immediate. Consider using {@link #listTables(Pattern, boolean)} and
* {@link #enableTable(TableName)}
* @param regex The regular expression to match table names against
* @return Table descriptors for tables that couldn't be enabled. The return value will be wrapped
* by a {@link CompletableFuture}.
*/
CompletableFuture<HTableDescriptor[]> enableTables(String regex);
/**
* Enable tables matching the passed in pattern. Warning: Use this method carefully, there is no
* prompting and the effect is immediate. Consider using {@link #listTables(Pattern, boolean)} and
* {@link #enableTable(TableName)}
* @param pattern The pattern to match table names against
* @return Table descriptors for tables that couldn't be enabled. The return value will be wrapped
* by a {@link CompletableFuture}.
*/
CompletableFuture<HTableDescriptor[]> enableTables(Pattern pattern);
/**
* Disable a table. The table has to be in enabled state for it to be disabled.
* @param tableName
*/
CompletableFuture<Void> disableTable(final TableName tableName);
/**
* Disable tables matching the passed in pattern. Warning: Use this method carefully, there is no
* prompting and the effect is immediate. Consider using {@link #listTables(Pattern, boolean)} and
* {@link #disableTable(TableName)}
* @param regex The regular expression to match table names against
* @return Table descriptors for tables that couldn't be disabled. The return value will be wrapped by a
* {@link CompletableFuture}.
*/
CompletableFuture<HTableDescriptor[]> disableTables(String regex);
/**
* Disable tables matching the passed in pattern. Warning: Use this method carefully, there is no
* prompting and the effect is immediate. Consider using {@link #listTables(Pattern, boolean)} and
* {@link #disableTable(TableName)}
* @param pattern The pattern to match table names against
* @return Table descriptors for tables that couldn't be disabled. The return value will be wrapped by a
* {@link CompletableFuture}.
*/
CompletableFuture<HTableDescriptor[]> disableTables(Pattern pattern);
/**
* Turn the load balancer on or off.
* @param on

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedList;
@ -43,6 +45,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
@ -74,6 +80,8 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
private final AsyncConnectionImpl connection;
private final RawAsyncTable metaTable;
private final long rpcTimeoutNs;
private final long operationTimeoutNs;
@ -88,6 +96,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
AsyncHBaseAdmin(AsyncConnectionImpl connection) {
this.connection = connection;
this.metaTable = connection.getRawTable(META_TABLE_NAME);
this.rpcTimeoutNs = connection.connConf.getRpcTimeoutNs();
this.operationTimeoutNs = connection.connConf.getOperationTimeoutNs();
this.pauseNs = connection.connConf.getPauseNs();
@ -137,6 +146,46 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
return future;
}
private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq,
RpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter,
TableProcedureBiConsumer consumer) {
CompletableFuture<Long> procFuture = this
.<Long> newCaller()
.action(
(controller, stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall,
respConverter)).call();
return waitProcedureResult(procFuture).whenComplete(consumer);
}
@FunctionalInterface
private interface TableOperator {
CompletableFuture<Void> operate(TableName table);
}
private CompletableFuture<HTableDescriptor[]> batchTableOperations(Pattern pattern,
TableOperator operator, String operationType) {
CompletableFuture<HTableDescriptor[]> future = new CompletableFuture<>();
List<HTableDescriptor> failed = new LinkedList<>();
listTables(pattern, false).whenComplete(
(tables, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
CompletableFuture[] futures = Arrays.stream(tables)
.map((table) -> operator.operate(table.getTableName()).whenComplete((v, ex) -> {
if (ex != null) {
LOG.info("Failed to " + operationType + " table " + table.getTableName(), ex);
failed.add(table);
}
})).toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).thenAccept((v) -> {
future.complete(failed.toArray(new HTableDescriptor[failed.size()]));
});
});
return future;
}
@Override
public AsyncConnectionImpl getConnection() {
return this.connection;
@ -144,7 +193,7 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Boolean> tableExists(TableName tableName) {
return AsyncMetaTableAccessor.tableExists(connection, tableName);
return AsyncMetaTableAccessor.tableExists(metaTable, tableName);
}
@Override
@ -252,29 +301,17 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
}
}
CompletableFuture<Long> procFuture = this
.<Long> newCaller()
.action(
(controller, stub) -> this.<CreateTableRequest, CreateTableResponse, Long> call(
controller,
stub,
RequestConverter.buildCreateTableRequest(desc, splitKeys, ng.getNonceGroup(),
ng.newNonce()), (s, c, req, done) -> s.createTable(c, req, done),
(resp) -> resp.getProcId())).call();
return waitProcedureResult(procFuture).whenComplete(
return this.<CreateTableRequest, CreateTableResponse> procedureCall(
RequestConverter.buildCreateTableRequest(desc, splitKeys, ng.getNonceGroup(), ng.newNonce()),
(s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(),
new CreateTableProcedureBiConsumer(this, desc.getTableName()));
}
@Override
public CompletableFuture<Void> deleteTable(TableName tableName) {
CompletableFuture<Long> procFuture = this
.<Long> newCaller()
.action(
(controller, stub) -> this.<DeleteTableRequest, DeleteTableResponse, Long> call(
controller, stub,
RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
(s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId())).call();
return waitProcedureResult(procFuture).whenComplete(
return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter
.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
(s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(),
new DeleteTableProcedureBiConsumer(this, tableName));
}
@ -285,41 +322,51 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<HTableDescriptor[]> deleteTables(Pattern pattern) {
CompletableFuture<HTableDescriptor[]> future = new CompletableFuture<>();
List<HTableDescriptor> failed = new LinkedList<>();
listTables(pattern, false).whenComplete(
(tables, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
CompletableFuture[] futures = Arrays.stream(tables)
.map((table) -> deleteTable(table.getTableName()).whenComplete((v, ex) -> {
if (ex != null) {
LOG.info("Failed to delete table " + table.getTableName(), ex);
failed.add(table);
}
})).toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).thenAccept((v) -> {
future.complete(failed.toArray(new HTableDescriptor[failed.size()]));
});
});
return future;
return batchTableOperations(pattern, (table) -> deleteTable(table), "DELETE");
}
@Override
public CompletableFuture<Void> truncateTable(TableName tableName, boolean preserveSplits) {
CompletableFuture<Long> procFuture = this
.<Long> newCaller()
.action(
(controller, stub) -> this.<TruncateTableRequest, TruncateTableResponse, Long> call(
controller,
stub,
RequestConverter.buildTruncateTableRequest(tableName, preserveSplits,
ng.getNonceGroup(), ng.newNonce()),
(s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId())).call();
return waitProcedureResult(procFuture).whenComplete(
new TruncateTableProcedureBiConsumer(this, tableName));
return this.<TruncateTableRequest, TruncateTableResponse> procedureCall(
RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(),
ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done),
(resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(this, tableName));
}
@Override
public CompletableFuture<Void> enableTable(TableName tableName) {
return this.<EnableTableRequest, EnableTableResponse> procedureCall(RequestConverter
.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
(s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(),
new EnableTableProcedureBiConsumer(this, tableName));
}
@Override
public CompletableFuture<HTableDescriptor[]> enableTables(String regex) {
return enableTables(Pattern.compile(regex));
}
@Override
public CompletableFuture<HTableDescriptor[]> enableTables(Pattern pattern) {
return batchTableOperations(pattern, (table) -> enableTable(table), "ENABLE");
}
@Override
public CompletableFuture<Void> disableTable(TableName tableName) {
return this.<DisableTableRequest, DisableTableResponse> procedureCall(RequestConverter
.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()),
(s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(),
new DisableTableProcedureBiConsumer(this, tableName));
}
@Override
public CompletableFuture<HTableDescriptor[]> disableTables(String regex) {
return disableTables(Pattern.compile(regex));
}
@Override
public CompletableFuture<HTableDescriptor[]> disableTables(Pattern pattern) {
return batchTableOperations(pattern, (table) -> disableTable(table), "DISABLE");
}
@Override
@ -462,6 +509,28 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
}
}
private class EnableTableProcedureBiConsumer extends TableProcedureBiConsumer {
EnableTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
super(admin, tableName);
}
String getOperationType() {
return "ENABLE";
}
}
private class DisableTableProcedureBiConsumer extends TableProcedureBiConsumer {
DisableTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
super(admin, tableName);
}
String getOperationType() {
return "DISABLE";
}
}
private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
CompletableFuture<Void> future = new CompletableFuture<>();
procFuture.whenComplete((procId, error) -> {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
@ -46,6 +47,9 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -181,7 +185,8 @@ public class TestAsyncAdmin {
}
private TableState.State getStateFromMeta(TableName table) throws Exception {
Optional<TableState> state = AsyncMetaTableAccessor.getTableState(ASYNC_CONN, table).get();
Optional<TableState> state = AsyncMetaTableAccessor.getTableState(
ASYNC_CONN.getRawTable(META_TABLE_NAME), table).get();
assertTrue(state.isPresent());
return state.get().getState();
}
@ -520,6 +525,164 @@ public class TestAsyncAdmin {
}
}
@Test(timeout = 300000)
public void testDisableAndEnableTable() throws Exception {
final byte[] row = Bytes.toBytes("row");
final byte[] qualifier = Bytes.toBytes("qualifier");
final byte[] value = Bytes.toBytes("value");
final TableName table = TableName.valueOf("testDisableAndEnableTable");
Table ht = TEST_UTIL.createTable(table, HConstants.CATALOG_FAMILY);
Put put = new Put(row);
put.addColumn(HConstants.CATALOG_FAMILY, qualifier, value);
ht.put(put);
Get get = new Get(row);
get.addColumn(HConstants.CATALOG_FAMILY, qualifier);
ht.get(get);
this.admin.disableTable(ht.getName()).join();
assertTrue("Table must be disabled.", TEST_UTIL.getHBaseCluster().getMaster()
.getTableStateManager().isTableState(ht.getName(), TableState.State.DISABLED));
assertEquals(TableState.State.DISABLED, getStateFromMeta(table));
// Test that table is disabled
get = new Get(row);
get.addColumn(HConstants.CATALOG_FAMILY, qualifier);
boolean ok = false;
try {
ht.get(get);
} catch (TableNotEnabledException e) {
ok = true;
}
ok = false;
// verify that scan encounters correct exception
Scan scan = new Scan();
try {
ResultScanner scanner = ht.getScanner(scan);
Result res = null;
do {
res = scanner.next();
} while (res != null);
} catch (TableNotEnabledException e) {
ok = true;
}
assertTrue(ok);
this.admin.enableTable(table).join();
assertTrue("Table must be enabled.", TEST_UTIL.getHBaseCluster().getMaster()
.getTableStateManager().isTableState(ht.getName(), TableState.State.ENABLED));
assertEquals(TableState.State.ENABLED, getStateFromMeta(table));
// Test that table is enabled
try {
ht.get(get);
} catch (RetriesExhaustedException e) {
ok = false;
}
assertTrue(ok);
ht.close();
}
@Test(timeout = 300000)
public void testDisableAndEnableTables() throws Exception {
final byte[] row = Bytes.toBytes("row");
final byte[] qualifier = Bytes.toBytes("qualifier");
final byte[] value = Bytes.toBytes("value");
final TableName table1 = TableName.valueOf("testDisableAndEnableTable1");
final TableName table2 = TableName.valueOf("testDisableAndEnableTable2");
Table ht1 = TEST_UTIL.createTable(table1, HConstants.CATALOG_FAMILY);
Table ht2 = TEST_UTIL.createTable(table2, HConstants.CATALOG_FAMILY);
Put put = new Put(row);
put.addColumn(HConstants.CATALOG_FAMILY, qualifier, value);
ht1.put(put);
ht2.put(put);
Get get = new Get(row);
get.addColumn(HConstants.CATALOG_FAMILY, qualifier);
ht1.get(get);
ht2.get(get);
this.admin.disableTables("testDisableAndEnableTable.*").join();
// Test that tables are disabled
get = new Get(row);
get.addColumn(HConstants.CATALOG_FAMILY, qualifier);
boolean ok = false;
try {
ht1.get(get);
ht2.get(get);
} catch (org.apache.hadoop.hbase.DoNotRetryIOException e) {
ok = true;
}
assertEquals(TableState.State.DISABLED, getStateFromMeta(table1));
assertEquals(TableState.State.DISABLED, getStateFromMeta(table2));
assertTrue(ok);
this.admin.enableTables("testDisableAndEnableTable.*").join();
// Test that tables are enabled
try {
ht1.get(get);
} catch (IOException e) {
ok = false;
}
try {
ht2.get(get);
} catch (IOException e) {
ok = false;
}
assertTrue(ok);
ht1.close();
ht2.close();
assertEquals(TableState.State.ENABLED, getStateFromMeta(table1));
assertEquals(TableState.State.ENABLED, getStateFromMeta(table2));
}
@Test(timeout = 300000)
public void testEnableTableRetainAssignment() throws Exception {
final TableName tableName = TableName.valueOf("testEnableTableAssignment");
byte[][] splitKeys = { new byte[] { 1, 1, 1 }, new byte[] { 2, 2, 2 }, new byte[] { 3, 3, 3 },
new byte[] { 4, 4, 4 }, new byte[] { 5, 5, 5 }, new byte[] { 6, 6, 6 },
new byte[] { 7, 7, 7 }, new byte[] { 8, 8, 8 }, new byte[] { 9, 9, 9 } };
int expectedRegions = splitKeys.length + 1;
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
admin.createTable(desc, splitKeys).join();
try (RegionLocator l = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
List<HRegionLocation> regions = l.getAllRegionLocations();
assertEquals(
"Tried to create " + expectedRegions + " regions " + "but only found " + regions.size(),
expectedRegions, regions.size());
// Disable table.
admin.disableTable(tableName).join();
// Enable table, use retain assignment to assign regions.
admin.enableTable(tableName).join();
List<HRegionLocation> regions2 = l.getAllRegionLocations();
// Check the assignment.
assertEquals(regions.size(), regions2.size());
assertTrue(regions2.containsAll(regions));
}
}
@Test(timeout = 300000)
public void testDisableCatalogTable() throws Exception {
try {
this.admin.disableTable(TableName.META_TABLE_NAME).join();
fail("Expected to throw ConstraintException");
} catch (Exception e) {
}
// Before the fix for HBASE-6146, the below table creation was failing as the hbase:meta table
// actually getting disabled by the disableTable() call.
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testDisableCatalogTable"
.getBytes()));
HColumnDescriptor hcd = new HColumnDescriptor("cf1".getBytes());
htd.addFamily(hcd);
admin.createTable(htd).join();
}
@Test(timeout = 30000)
public void testBalancer() throws Exception {
boolean initialState = admin.isBalancerEnabled().get();