HBASE-17596 Implement add/delete/modify column family methods
This commit is contained in:
parent
d22bfc0367
commit
5093a49e0b
|
@ -20,10 +20,12 @@ package org.apache.hadoop.hbase.client;
|
|||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
||||
/**
|
||||
* The asynchronous administrative API for HBase.
|
||||
|
@ -216,6 +218,39 @@ public interface AsyncAdmin {
|
|||
*/
|
||||
CompletableFuture<HTableDescriptor[]> disableTables(Pattern pattern);
|
||||
|
||||
/**
|
||||
* Get the status of alter command - indicates how many regions have received the updated schema
|
||||
* Asynchronous operation.
|
||||
* @param tableName TableName instance
|
||||
* @return Pair indicating the number of regions updated Pair.getFirst() is the regions that are
|
||||
* yet to be updated Pair.getSecond() is the total number of regions of the table. The
|
||||
* return value will be wrapped by a {@link CompletableFuture}.
|
||||
*/
|
||||
CompletableFuture<Pair<Integer, Integer>> getAlterStatus(final TableName tableName);
|
||||
|
||||
/**
|
||||
* Add a column family to an existing table.
|
||||
* @param tableName name of the table to add column family to
|
||||
* @param columnFamily column family descriptor of column family to be added
|
||||
*/
|
||||
CompletableFuture<Void> addColumnFamily(final TableName tableName,
|
||||
final HColumnDescriptor columnFamily);
|
||||
|
||||
/**
|
||||
* Delete a column family from a table.
|
||||
* @param tableName name of table
|
||||
* @param columnFamily name of column family to be deleted
|
||||
*/
|
||||
CompletableFuture<Void> deleteColumnFamily(final TableName tableName, final byte[] columnFamily);
|
||||
|
||||
/**
|
||||
* Modify an existing column family on a table.
|
||||
* @param tableName name of table
|
||||
* @param columnFamily new column family descriptor to use
|
||||
*/
|
||||
CompletableFuture<Void> modifyColumnFamily(final TableName tableName,
|
||||
final HColumnDescriptor columnFamily);
|
||||
|
||||
/**
|
||||
* Turn the load balancer on or off.
|
||||
* @param on
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.util.regex.Pattern;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
|
||||
|
@ -43,14 +44,20 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback;
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTableResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteColumnResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableNamesRequest;
|
||||
|
@ -62,12 +69,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DeleteTabl
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
||||
/**
|
||||
* The implementation of AsyncAdmin.
|
||||
|
@ -369,6 +379,44 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
return batchTableOperations(pattern, (table) -> disableTable(table), "DISABLE");
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Pair<Integer, Integer>> getAlterStatus(TableName tableName) {
|
||||
return this
|
||||
.<Pair<Integer, Integer>> newCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<GetSchemaAlterStatusRequest, GetSchemaAlterStatusResponse, Pair<Integer, Integer>> call(
|
||||
controller, stub, RequestConverter.buildGetSchemaAlterStatusRequest(tableName), (s,
|
||||
c, req, done) -> s.getSchemaAlterStatus(c, req, done), (resp) -> new Pair<>(
|
||||
resp.getYetToUpdateRegions(), resp.getTotalRegions()))).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> addColumnFamily(TableName tableName, HColumnDescriptor columnFamily) {
|
||||
return this.<AddColumnRequest, AddColumnResponse> procedureCall(
|
||||
RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
|
||||
ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(),
|
||||
new AddColumnFamilyProcedureBiConsumer(this, tableName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> deleteColumnFamily(TableName tableName, byte[] columnFamily) {
|
||||
return this.<DeleteColumnRequest, DeleteColumnResponse> procedureCall(
|
||||
RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
|
||||
ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done),
|
||||
(resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(this, tableName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> modifyColumnFamily(TableName tableName,
|
||||
HColumnDescriptor columnFamily) {
|
||||
return this.<ModifyColumnRequest, ModifyColumnResponse> procedureCall(
|
||||
RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(),
|
||||
ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done),
|
||||
(resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(this, tableName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> setBalancerRunning(final boolean on) {
|
||||
return this
|
||||
|
@ -531,6 +579,39 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
}
|
||||
}
|
||||
|
||||
private class AddColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
|
||||
|
||||
AddColumnFamilyProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
|
||||
super(admin, tableName);
|
||||
}
|
||||
|
||||
String getOperationType() {
|
||||
return "ADD_COLUMN_FAMILY";
|
||||
}
|
||||
}
|
||||
|
||||
private class DeleteColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
|
||||
|
||||
DeleteColumnFamilyProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
|
||||
super(admin, tableName);
|
||||
}
|
||||
|
||||
String getOperationType() {
|
||||
return "DELETE_COLUMN_FAMILY";
|
||||
}
|
||||
}
|
||||
|
||||
private class ModifyColumnFamilyProcedureBiConsumer extends TableProcedureBiConsumer {
|
||||
|
||||
ModifyColumnFamilyProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
|
||||
super(admin, tableName);
|
||||
}
|
||||
|
||||
String getOperationType() {
|
||||
return "MODIFY_COLUMN_FAMILY";
|
||||
}
|
||||
}
|
||||
|
||||
private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) {
|
||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
procFuture.whenComplete((procId, error) -> {
|
||||
|
|
|
@ -32,12 +32,14 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
|
@ -45,15 +47,20 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotDisabledException;
|
||||
import org.apache.hadoop.hbase.TableNotEnabledException;
|
||||
import org.apache.hadoop.hbase.constraint.ConstraintException;
|
||||
import org.apache.hadoop.hbase.master.MasterFileSystem;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
@ -68,6 +75,8 @@ public class TestAsyncAdmin {
|
|||
private static final Log LOG = LogFactory.getLog(TestAdmin1.class);
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static byte[] FAMILY = Bytes.toBytes("testFamily");
|
||||
private static final byte[] FAMILY_0 = Bytes.toBytes("cf0");
|
||||
private static final byte[] FAMILY_1 = Bytes.toBytes("cf1");
|
||||
|
||||
private static AsyncConnection ASYNC_CONN;
|
||||
private AsyncAdmin admin;
|
||||
|
@ -683,6 +692,189 @@ public class TestAsyncAdmin {
|
|||
admin.createTable(htd).join();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddColumnFamily() throws IOException {
|
||||
TableName TABLE_NAME = TableName.valueOf("testAddColumnFamily");
|
||||
// Create a table with two families
|
||||
HTableDescriptor baseHtd = new HTableDescriptor(TABLE_NAME);
|
||||
baseHtd.addFamily(new HColumnDescriptor(FAMILY_0));
|
||||
admin.createTable(baseHtd).join();
|
||||
admin.disableTable(TABLE_NAME).join();
|
||||
try {
|
||||
// Verify the table descriptor
|
||||
verifyTableDescriptor(TABLE_NAME, FAMILY_0);
|
||||
|
||||
// Modify the table removing one family and verify the descriptor
|
||||
admin.addColumnFamily(TABLE_NAME, new HColumnDescriptor(FAMILY_1)).join();
|
||||
verifyTableDescriptor(TABLE_NAME, FAMILY_0, FAMILY_1);
|
||||
} finally {
|
||||
admin.deleteTable(TABLE_NAME);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddSameColumnFamilyTwice() throws Exception {
|
||||
TableName TABLE_NAME = TableName.valueOf("testAddSameColumnFamilyTwice");
|
||||
// Create a table with one families
|
||||
HTableDescriptor baseHtd = new HTableDescriptor(TABLE_NAME);
|
||||
baseHtd.addFamily(new HColumnDescriptor(FAMILY_0));
|
||||
admin.createTable(baseHtd).join();
|
||||
admin.disableTable(TABLE_NAME).join();
|
||||
try {
|
||||
// Verify the table descriptor
|
||||
verifyTableDescriptor(TABLE_NAME, FAMILY_0);
|
||||
|
||||
// Modify the table removing one family and verify the descriptor
|
||||
this.admin.addColumnFamily(TABLE_NAME, new HColumnDescriptor(FAMILY_1)).join();
|
||||
verifyTableDescriptor(TABLE_NAME, FAMILY_0, FAMILY_1);
|
||||
|
||||
try {
|
||||
// Add same column family again - expect failure
|
||||
this.admin.addColumnFamily(TABLE_NAME, new HColumnDescriptor(FAMILY_1)).join();
|
||||
Assert.fail("Delete a non-exist column family should fail");
|
||||
} catch (Exception e) {
|
||||
// Expected.
|
||||
}
|
||||
} finally {
|
||||
admin.deleteTable(TABLE_NAME).join();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testModifyColumnFamily() throws Exception {
|
||||
TableName TABLE_NAME = TableName.valueOf("testModifyColumnFamily");
|
||||
|
||||
HColumnDescriptor cfDescriptor = new HColumnDescriptor(FAMILY_0);
|
||||
int blockSize = cfDescriptor.getBlocksize();
|
||||
// Create a table with one families
|
||||
HTableDescriptor baseHtd = new HTableDescriptor(TABLE_NAME);
|
||||
baseHtd.addFamily(cfDescriptor);
|
||||
admin.createTable(baseHtd).join();
|
||||
admin.disableTable(TABLE_NAME).join();
|
||||
try {
|
||||
// Verify the table descriptor
|
||||
verifyTableDescriptor(TABLE_NAME, FAMILY_0);
|
||||
|
||||
int newBlockSize = 2 * blockSize;
|
||||
cfDescriptor.setBlocksize(newBlockSize);
|
||||
|
||||
// Modify colymn family
|
||||
admin.modifyColumnFamily(TABLE_NAME, cfDescriptor).join();
|
||||
|
||||
HTableDescriptor htd = admin.getTableDescriptor(TABLE_NAME).get();
|
||||
HColumnDescriptor hcfd = htd.getFamily(FAMILY_0);
|
||||
assertTrue(hcfd.getBlocksize() == newBlockSize);
|
||||
} finally {
|
||||
admin.deleteTable(TABLE_NAME).join();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testModifyNonExistingColumnFamily() throws IOException {
|
||||
TableName TABLE_NAME = TableName.valueOf("testModifyNonExistingColumnFamily");
|
||||
|
||||
HColumnDescriptor cfDescriptor = new HColumnDescriptor(FAMILY_1);
|
||||
int blockSize = cfDescriptor.getBlocksize();
|
||||
// Create a table with one families
|
||||
HTableDescriptor baseHtd = new HTableDescriptor(TABLE_NAME);
|
||||
baseHtd.addFamily(new HColumnDescriptor(FAMILY_0));
|
||||
admin.createTable(baseHtd).join();
|
||||
admin.disableTable(TABLE_NAME).join();
|
||||
try {
|
||||
// Verify the table descriptor
|
||||
verifyTableDescriptor(TABLE_NAME, FAMILY_0);
|
||||
|
||||
int newBlockSize = 2 * blockSize;
|
||||
cfDescriptor.setBlocksize(newBlockSize);
|
||||
|
||||
// Modify a column family that is not in the table.
|
||||
try {
|
||||
admin.modifyColumnFamily(TABLE_NAME, cfDescriptor).join();
|
||||
Assert.fail("Modify a non-exist column family should fail");
|
||||
} catch (Exception e) {
|
||||
// Expected.
|
||||
}
|
||||
} finally {
|
||||
admin.deleteTable(TABLE_NAME).join();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteColumnFamily() throws IOException {
|
||||
TableName TABLE_NAME = TableName.valueOf("testDeleteColumnFamily");
|
||||
// Create a table with two families
|
||||
HTableDescriptor baseHtd = new HTableDescriptor(TABLE_NAME);
|
||||
baseHtd.addFamily(new HColumnDescriptor(FAMILY_0));
|
||||
baseHtd.addFamily(new HColumnDescriptor(FAMILY_1));
|
||||
admin.createTable(baseHtd).join();
|
||||
admin.disableTable(TABLE_NAME).join();
|
||||
try {
|
||||
// Verify the table descriptor
|
||||
verifyTableDescriptor(TABLE_NAME, FAMILY_0, FAMILY_1);
|
||||
|
||||
// Modify the table removing one family and verify the descriptor
|
||||
admin.deleteColumnFamily(TABLE_NAME, FAMILY_1).join();
|
||||
verifyTableDescriptor(TABLE_NAME, FAMILY_0);
|
||||
} finally {
|
||||
admin.deleteTable(TABLE_NAME).join();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteSameColumnFamilyTwice() throws IOException {
|
||||
TableName TABLE_NAME = TableName.valueOf("testDeleteSameColumnFamilyTwice");
|
||||
// Create a table with two families
|
||||
HTableDescriptor baseHtd = new HTableDescriptor(TABLE_NAME);
|
||||
baseHtd.addFamily(new HColumnDescriptor(FAMILY_0));
|
||||
baseHtd.addFamily(new HColumnDescriptor(FAMILY_1));
|
||||
admin.createTable(baseHtd).join();
|
||||
admin.disableTable(TABLE_NAME).join();
|
||||
try {
|
||||
// Verify the table descriptor
|
||||
verifyTableDescriptor(TABLE_NAME, FAMILY_0, FAMILY_1);
|
||||
|
||||
// Modify the table removing one family and verify the descriptor
|
||||
admin.deleteColumnFamily(TABLE_NAME, FAMILY_1).join();
|
||||
verifyTableDescriptor(TABLE_NAME, FAMILY_0);
|
||||
|
||||
try {
|
||||
// Delete again - expect failure
|
||||
admin.deleteColumnFamily(TABLE_NAME, FAMILY_1).join();
|
||||
Assert.fail("Delete a non-exist column family should fail");
|
||||
} catch (Exception e) {
|
||||
// Expected.
|
||||
}
|
||||
} finally {
|
||||
admin.deleteTable(TABLE_NAME).join();
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyTableDescriptor(final TableName tableName, final byte[]... families)
|
||||
throws IOException {
|
||||
Admin admin = TEST_UTIL.getAdmin();
|
||||
|
||||
// Verify descriptor from master
|
||||
HTableDescriptor htd = admin.getTableDescriptor(tableName);
|
||||
verifyTableDescriptor(htd, tableName, families);
|
||||
|
||||
// Verify descriptor from HDFS
|
||||
MasterFileSystem mfs = TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem();
|
||||
Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), tableName);
|
||||
HTableDescriptor td = FSTableDescriptors
|
||||
.getTableDescriptorFromFs(mfs.getFileSystem(), tableDir);
|
||||
verifyTableDescriptor(td, tableName, families);
|
||||
}
|
||||
|
||||
private void verifyTableDescriptor(final HTableDescriptor htd, final TableName tableName,
|
||||
final byte[]... families) {
|
||||
Set<byte[]> htdFamilies = htd.getFamiliesKeys();
|
||||
assertEquals(tableName, htd.getTableName());
|
||||
assertEquals(families.length, htdFamilies.size());
|
||||
for (byte[] familyName : families) {
|
||||
assertTrue("Expected family " + Bytes.toString(familyName), htdFamilies.contains(familyName));
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testBalancer() throws Exception {
|
||||
boolean initialState = admin.isBalancerEnabled().get();
|
||||
|
|
Loading…
Reference in New Issue