HBASE-18283 Provide a construct method which accept a thread pool for AsyncAdmin

This commit is contained in:
Guanghao Zhang 2017-07-04 09:51:41 +08:00
parent 8318a092ac
commit 14f0423b58
15 changed files with 3276 additions and 2908 deletions

View File

@ -46,11 +46,6 @@ import org.apache.hadoop.hbase.util.Pair;
@InterfaceAudience.Public @InterfaceAudience.Public
public interface AsyncAdmin { public interface AsyncAdmin {
/**
* @return Async Connection used by this object.
*/
AsyncConnectionImpl getConnection();
/** /**
* @param tableName Table to check. * @param tableName Table to check.
* @return True if table exists already. The return value will be wrapped by a * @return True if table exists already. The return value will be wrapped by a
@ -105,7 +100,9 @@ public interface AsyncAdmin {
* Creates a new table. * Creates a new table.
* @param desc table descriptor for table * @param desc table descriptor for table
*/ */
CompletableFuture<Void> createTable(TableDescriptor desc); default CompletableFuture<Void> createTable(TableDescriptor desc) {
return createTable(desc, Optional.empty());
}
/** /**
* Creates a new table with the specified number of regions. The start key specified will become * Creates a new table with the specified number of regions. The start key specified will become
@ -128,7 +125,7 @@ public interface AsyncAdmin {
* @param desc table descriptor for table * @param desc table descriptor for table
* @param splitKeys array of split keys for the initial regions of the table * @param splitKeys array of split keys for the initial regions of the table
*/ */
CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys); CompletableFuture<Void> createTable(TableDescriptor desc, Optional<byte[][]> splitKeys);
/** /**
* Deletes a table. * Deletes a table.
@ -186,6 +183,13 @@ public interface AsyncAdmin {
*/ */
CompletableFuture<List<TableDescriptor>> disableTables(Pattern pattern); CompletableFuture<List<TableDescriptor>> disableTables(Pattern pattern);
/**
* @param tableName name of table to check
* @return true if table is on-line. The return value will be wrapped by a
* {@link CompletableFuture}.
*/
CompletableFuture<Boolean> isTableEnabled(TableName tableName);
/** /**
* @param tableName name of table to check * @param tableName name of table to check
* @return true if table is off-line. The return value will be wrapped by a * @return true if table is off-line. The return value will be wrapped by a
@ -198,7 +202,9 @@ public interface AsyncAdmin {
* @return true if all regions of the table are available. The return value will be wrapped by a * @return true if all regions of the table are available. The return value will be wrapped by a
* {@link CompletableFuture}. * {@link CompletableFuture}.
*/ */
CompletableFuture<Boolean> isTableAvailable(TableName tableName); default CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
return isTableAvailable(tableName, null);
}
/** /**
* Use this api to check if the table has been created with the specified number of splitkeys * Use this api to check if the table has been created with the specified number of splitkeys
@ -274,13 +280,6 @@ public interface AsyncAdmin {
*/ */
CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors(); CompletableFuture<List<NamespaceDescriptor>> listNamespaceDescriptors();
/**
* @param tableName name of table to check
* @return true if table is on-line. The return value will be wrapped by a
* {@link CompletableFuture}.
*/
CompletableFuture<Boolean> isTableEnabled(TableName tableName);
/** /**
* Turn the load balancer on or off. * Turn the load balancer on or off.
* @param on * @param on
@ -330,7 +329,7 @@ public interface AsyncAdmin {
/** /**
* Get all the online regions on a region server. * Get all the online regions on a region server.
*/ */
CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName sn); CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName serverName);
/** /**
* Flush a table. * Flush a table.
@ -422,15 +421,15 @@ public interface AsyncAdmin {
/** /**
* Compact all regions on the region server. * Compact all regions on the region server.
* @param sn the region server name * @param serverName the region server name
*/ */
CompletableFuture<Void> compactRegionServer(ServerName sn); CompletableFuture<Void> compactRegionServer(ServerName serverName);
/** /**
* Compact all regions on the region server. * Compact all regions on the region server.
* @param sn the region server name * @param serverName the region server name
*/ */
CompletableFuture<Void> majorCompactRegionServer(ServerName sn); CompletableFuture<Void> majorCompactRegionServer(ServerName serverName);
/** /**
* Merge two regions. * Merge two regions.
@ -563,18 +562,18 @@ public interface AsyncAdmin {
/** /**
* Append the replicable table-cf config of the specified peer * Append the replicable table-cf config of the specified peer
* @param id a short that identifies the cluster * @param peerId a short that identifies the cluster
* @param tableCfs A map from tableName to column family names * @param tableCfs A map from tableName to column family names
*/ */
CompletableFuture<Void> appendReplicationPeerTableCFs(String id, CompletableFuture<Void> appendReplicationPeerTableCFs(String peerId,
Map<TableName, ? extends Collection<String>> tableCfs); Map<TableName, ? extends Collection<String>> tableCfs);
/** /**
* Remove some table-cfs from config of the specified peer * Remove some table-cfs from config of the specified peer
* @param id a short name that identifies the cluster * @param peerId a short name that identifies the cluster
* @param tableCfs A map from tableName to column family names * @param tableCfs A map from tableName to column family names
*/ */
CompletableFuture<Void> removeReplicationPeerTableCFs(String id, CompletableFuture<Void> removeReplicationPeerTableCFs(String peerId,
Map<TableName, ? extends Collection<String>> tableCfs); Map<TableName, ? extends Collection<String>> tableCfs);
/** /**
@ -613,7 +612,9 @@ public interface AsyncAdmin {
* @param snapshotName name of the snapshot to be created * @param snapshotName name of the snapshot to be created
* @param tableName name of the table for which snapshot is created * @param tableName name of the table for which snapshot is created
*/ */
CompletableFuture<Void> snapshot(String snapshotName, TableName tableName); default CompletableFuture<Void> snapshot(String snapshotName, TableName tableName) {
return snapshot(snapshotName, tableName, SnapshotType.FLUSH);
}
/** /**
* Create typed snapshot of the table. Snapshots are considered unique based on <b>the name of the * Create typed snapshot of the table. Snapshots are considered unique based on <b>the name of the
@ -627,8 +628,10 @@ public interface AsyncAdmin {
* @param tableName name of the table to snapshot * @param tableName name of the table to snapshot
* @param type type of snapshot to take * @param type type of snapshot to take
*/ */
CompletableFuture<Void> snapshot(String snapshotName, TableName tableName, default CompletableFuture<Void> snapshot(String snapshotName, TableName tableName,
SnapshotType type); SnapshotType type) {
return snapshot(new SnapshotDescription(snapshotName, tableName, type));
}
/** /**
* Take a snapshot and wait for the server to complete that snapshot asynchronously. Only a single * Take a snapshot and wait for the server to complete that snapshot asynchronously. Only a single
@ -695,14 +698,16 @@ public interface AsyncAdmin {
* @return a list of snapshot descriptors for completed snapshots wrapped by a * @return a list of snapshot descriptors for completed snapshots wrapped by a
* {@link CompletableFuture} * {@link CompletableFuture}
*/ */
CompletableFuture<List<SnapshotDescription>> listSnapshots(); default CompletableFuture<List<SnapshotDescription>> listSnapshots() {
return listSnapshots(Optional.empty());
}
/** /**
* List all the completed snapshots matching the given pattern. * List all the completed snapshots matching the given pattern.
* @param pattern The compiled regular expression to match against * @param pattern The compiled regular expression to match against
* @return - returns a List of SnapshotDescription wrapped by a {@link CompletableFuture} * @return - returns a List of SnapshotDescription wrapped by a {@link CompletableFuture}
*/ */
CompletableFuture<List<SnapshotDescription>> listSnapshots(Pattern pattern); CompletableFuture<List<SnapshotDescription>> listSnapshots(Optional<Pattern> pattern);
/** /**
* List all the completed snapshots matching the given table name regular expression and snapshot * List all the completed snapshots matching the given table name regular expression and snapshot
@ -725,7 +730,9 @@ public interface AsyncAdmin {
* Delete existing snapshots whose names match the pattern passed. * Delete existing snapshots whose names match the pattern passed.
* @param pattern pattern for names of the snapshot to match * @param pattern pattern for names of the snapshot to match
*/ */
CompletableFuture<Void> deleteSnapshots(Pattern pattern); default CompletableFuture<Void> deleteSnapshots(Pattern pattern) {
return deleteTableSnapshots(null, pattern);
}
/** /**
* Delete all existing snapshots matching the given table name regular expression and snapshot * Delete all existing snapshots matching the given table name regular expression and snapshot

View File

@ -96,11 +96,17 @@ public interface AsyncConnection extends Closeable {
AsyncTableBuilder<AsyncTable> getTableBuilder(TableName tableName, ExecutorService pool); AsyncTableBuilder<AsyncTable> getTableBuilder(TableName tableName, ExecutorService pool);
/** /**
* Retrieve an AsyncAdmin implementation to administer an HBase cluster. The returned AsyncAdmin * Retrieve an AsyncAdmin implementation to administer an HBase cluster. The returned
* is not guaranteed to be thread-safe. A new instance should be created for each using thread. * {@code CompletableFuture} will be finished directly in the rpc framework's callback thread, so
* This is a lightweight operation. Pooling or caching of the returned AsyncAdmin is not * typically you should not do any time consuming work inside these methods.
* recommended.
* @return an AsyncAdmin instance for cluster administration * @return an AsyncAdmin instance for cluster administration
*/ */
AsyncAdmin getAdmin(); AsyncAdmin getAdmin();
/**
* Retrieve an AsyncAdmin implementation to administer an HBase cluster.
* @param pool the thread pool to use for executing callback
* @return an AsyncAdmin instance for cluster administration
*/
AsyncAdmin getAdmin(ExecutorService pool);
} }

View File

@ -279,6 +279,11 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override @Override
public AsyncAdmin getAdmin() { public AsyncAdmin getAdmin() {
return new AsyncHBaseAdmin(this); return new RawAsyncHBaseAdmin(this);
}
@Override
public AsyncAdmin getAdmin(ExecutorService pool) {
return new AsyncHBaseAdmin(new RawAsyncHBaseAdmin(this), pool);
} }
} }

File diff suppressed because it is too large Load Diff

View File

@ -19,11 +19,13 @@ package org.apache.hadoop.hbase.shaded.protobuf;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
@ -1271,19 +1273,26 @@ public final class RequestConverter {
final byte [][] splitKeys, final byte [][] splitKeys,
final long nonceGroup, final long nonceGroup,
final long nonce) { final long nonce) {
return buildCreateTableRequest(hTableDesc, Optional.ofNullable(splitKeys), nonceGroup, nonce);
}
/**
* Creates a protocol buffer CreateTableRequest
* @param hTableDesc
* @param splitKeys
* @return a CreateTableRequest
*/
public static CreateTableRequest buildCreateTableRequest(TableDescriptor hTableDesc,
Optional<byte[][]> splitKeys, long nonceGroup, long nonce) {
CreateTableRequest.Builder builder = CreateTableRequest.newBuilder(); CreateTableRequest.Builder builder = CreateTableRequest.newBuilder();
builder.setTableSchema(ProtobufUtil.convertToTableSchema(hTableDesc)); builder.setTableSchema(ProtobufUtil.convertToTableSchema(hTableDesc));
if (splitKeys != null) { splitKeys.ifPresent(keys -> Arrays.stream(keys).forEach(
for (byte [] splitKey : splitKeys) { key -> builder.addSplitKeys(UnsafeByteOperations.unsafeWrap(key))));
builder.addSplitKeys(UnsafeByteOperations.unsafeWrap(splitKey));
}
}
builder.setNonceGroup(nonceGroup); builder.setNonceGroup(nonceGroup);
builder.setNonce(nonce); builder.setNonce(nonce);
return builder.build(); return builder.build();
} }
/** /**
* Creates a protocol buffer ModifyTableRequest * Creates a protocol buffer ModifyTableRequest
* *

View File

@ -19,15 +19,32 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
/** /**
* Class to test AsyncAdmin. * Class to test AsyncAdmin.
@ -36,13 +53,34 @@ public abstract class TestAsyncAdminBase {
protected static final Log LOG = LogFactory.getLog(TestAsyncAdminBase.class); protected static final Log LOG = LogFactory.getLog(TestAsyncAdminBase.class);
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected static byte[] FAMILY = Bytes.toBytes("testFamily"); protected static final byte[] FAMILY = Bytes.toBytes("testFamily");
protected static final byte[] FAMILY_0 = Bytes.toBytes("cf0"); protected static final byte[] FAMILY_0 = Bytes.toBytes("cf0");
protected static final byte[] FAMILY_1 = Bytes.toBytes("cf1"); protected static final byte[] FAMILY_1 = Bytes.toBytes("cf1");
protected static AsyncConnection ASYNC_CONN; protected static AsyncConnection ASYNC_CONN;
protected AsyncAdmin admin; protected AsyncAdmin admin;
@Parameter
public Supplier<AsyncAdmin> getAdmin;
private static AsyncAdmin getRawAsyncAdmin() {
return ASYNC_CONN.getAdmin();
}
private static AsyncAdmin getAsyncAdmin() {
return ASYNC_CONN.getAdmin(ForkJoinPool.commonPool());
}
@Parameters
public static List<Object[]> params() {
return Arrays.asList(new Supplier<?>[] { TestAsyncAdminBase::getRawAsyncAdmin },
new Supplier<?>[] { TestAsyncAdminBase::getAsyncAdmin });
}
@Rule
public TestName testName = new TestName();
protected TableName tableName;
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
@ -60,7 +98,43 @@ public abstract class TestAsyncAdminBase {
} }
@Before @Before
public void setUp() throws Exception { public void setUp() {
this.admin = ASYNC_CONN.getAdmin(); admin = ASYNC_CONN.getAdmin();
String methodName = testName.getMethodName();
tableName = TableName.valueOf(methodName.substring(0, methodName.length() - 3));
}
@After
public void tearDown() {
admin.listTableNames(Optional.of(Pattern.compile(tableName.getNameAsString() + ".*")), false)
.whenCompleteAsync((tables, err) -> {
if (tables != null) {
tables.forEach(table -> {
try {
admin.disableTable(table).join();
} catch (Exception e) {
LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
}
admin.deleteTable(table).join();
});
}
}, ForkJoinPool.commonPool()).join();
}
protected void createTableWithDefaultConf(TableName tableName) {
createTableWithDefaultConf(tableName, Optional.empty());
}
protected void createTableWithDefaultConf(TableName tableName, Optional<byte[][]> splitKeys) {
createTableWithDefaultConf(tableName, splitKeys, FAMILY);
}
protected void createTableWithDefaultConf(TableName tableName, Optional<byte[][]> splitKeys,
byte[]... families) {
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
for (byte[] family : families) {
builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).build());
}
admin.createTable(builder.build(), splitKeys).join();
} }
} }

View File

@ -23,7 +23,10 @@ import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
@Category({ MediumTests.class, ClientTests.class }) @Category({ MediumTests.class, ClientTests.class })
public class TestAsyncBalancerAdminApi extends TestAsyncAdminBase { public class TestAsyncBalancerAdminApi extends TestAsyncAdminBase {

View File

@ -42,10 +42,13 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/** /**
* Class to test asynchronous namespace admin operations. * Class to test asynchronous namespace admin operations.
*/ */
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class }) @Category({ LargeTests.class, ClientTests.class })
public class TestAsyncNamespaceAdminApi extends TestAsyncAdminBase { public class TestAsyncNamespaceAdminApi extends TestAsyncAdminBase {

View File

@ -66,7 +66,7 @@ public class TestAsyncProcedureAdminApi extends TestAsyncAdminBase {
@Test @Test
public void testExecProcedure() throws Exception { public void testExecProcedure() throws Exception {
TableName tableName = TableName.valueOf("testExecProcedure"); String snapshotString = "offlineTableSnapshot";
try { try {
Table table = TEST_UTIL.createTable(tableName, Bytes.toBytes("cf")); Table table = TEST_UTIL.createTable(tableName, Bytes.toBytes("cf"));
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
@ -74,13 +74,13 @@ public class TestAsyncProcedureAdminApi extends TestAsyncAdminBase {
table.put(put); table.put(put);
} }
// take a snapshot of the enabled table // take a snapshot of the enabled table
String snapshotString = "offlineTableSnapshot";
Map<String, String> props = new HashMap<>(); Map<String, String> props = new HashMap<>();
props.put("table", tableName.getNameAsString()); props.put("table", tableName.getNameAsString());
admin.execProcedure(SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, snapshotString, admin.execProcedure(SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, snapshotString,
props).get(); props).get();
LOG.debug("Snapshot completed."); LOG.debug("Snapshot completed.");
} finally { } finally {
admin.deleteSnapshot(snapshotString).join();
TEST_UTIL.deleteTable(tableName); TEST_UTIL.deleteTable(tableName);
} }
} }

View File

@ -18,10 +18,6 @@
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.quotas.QuotaCache; import org.apache.hadoop.hbase.quotas.QuotaCache;
@ -34,50 +30,35 @@ import org.apache.hadoop.hbase.quotas.ThrottleType;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@RunWith(Parameterized.class)
@Category({ ClientTests.class, MediumTests.class }) @Category({ ClientTests.class, MediumTests.class })
public class TestAsyncQuotaAdminApi { public class TestAsyncQuotaAdminApi extends TestAsyncAdminBase {
private static final Log LOG = LogFactory.getLog(TestAsyncQuotaAdminApi.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected static AsyncConnection ASYNC_CONN;
protected AsyncAdmin admin;
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 2000); TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 2000);
TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true);
TEST_UTIL.startMiniCluster(1); TEST_UTIL.startMiniCluster(1);
TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
} }
@AfterClass
public static void tearDownAfterClass() throws Exception {
IOUtils.closeQuietly(ASYNC_CONN);
TEST_UTIL.shutdownMiniCluster();
}
@Before
public void setUp() throws Exception {
this.admin = ASYNC_CONN.getAdmin();
}
@Test @Test
public void testThrottleType() throws Exception { public void testThrottleType() throws Exception {
String userName = User.getCurrent().getShortName(); String userName = User.getCurrent().getShortName();

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -31,11 +32,10 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.HMaster;
@ -51,36 +51,29 @@ import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/** /**
* Class to test asynchronous region admin operations. * Class to test asynchronous region admin operations.
*/ */
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class }) @Category({ LargeTests.class, ClientTests.class })
public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
public static Random RANDOM = new Random(System.currentTimeMillis()); public static Random RANDOM = new Random(System.currentTimeMillis());
private void createTableWithDefaultConf(TableName TABLENAME) throws Exception {
HTableDescriptor htd = new HTableDescriptor(TABLENAME);
HColumnDescriptor hcd = new HColumnDescriptor("value");
htd.addFamily(hcd);
admin.createTable(htd, null).get();
}
@Test @Test
public void testCloseRegion() throws Exception { public void testCloseRegion() throws Exception {
TableName TABLENAME = TableName.valueOf("TestHBACloseRegion"); createTableWithDefaultConf(tableName);
createTableWithDefaultConf(TABLENAME);
HRegionInfo info = null; HRegionInfo info = null;
HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TABLENAME); HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices()); List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
for (HRegionInfo regionInfo : onlineRegions) { for (HRegionInfo regionInfo : onlineRegions) {
if (!regionInfo.getTable().isSystemTable()) { if (!regionInfo.getTable().isSystemTable()) {
@ -102,16 +95,14 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
@Test @Test
public void testCloseRegionIfInvalidRegionNameIsPassed() throws Exception { public void testCloseRegionIfInvalidRegionNameIsPassed() throws Exception {
final String name = "TestHBACloseRegion1"; createTableWithDefaultConf(tableName);
byte[] TABLENAME = Bytes.toBytes(name);
createTableWithDefaultConf(TableName.valueOf(TABLENAME));
HRegionInfo info = null; HRegionInfo info = null;
HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TableName.valueOf(TABLENAME)); HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices()); List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
for (HRegionInfo regionInfo : onlineRegions) { for (HRegionInfo regionInfo : onlineRegions) {
if (!regionInfo.isMetaTable()) { if (!regionInfo.isMetaTable()) {
if (regionInfo.getRegionNameAsString().contains(name)) { if (regionInfo.getRegionNameAsString().contains(tableName.getNameAsString())) {
info = regionInfo; info = regionInfo;
boolean catchNotServingException = false; boolean catchNotServingException = false;
try { try {
@ -132,10 +123,9 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
@Test @Test
public void testCloseRegionWhenServerNameIsEmpty() throws Exception { public void testCloseRegionWhenServerNameIsEmpty() throws Exception {
byte[] TABLENAME = Bytes.toBytes("TestHBACloseRegionWhenServerNameIsEmpty"); createTableWithDefaultConf(tableName);
createTableWithDefaultConf(TableName.valueOf(TABLENAME));
HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(TableName.valueOf(TABLENAME)); HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices()); List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
for (HRegionInfo regionInfo : onlineRegions) { for (HRegionInfo regionInfo : onlineRegions) {
if (!regionInfo.isMetaTable()) { if (!regionInfo.isMetaTable()) {
@ -147,166 +137,61 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
} }
@Test @Test
public void testGetRegion() throws Exception { public void testGetRegionLocation() throws Exception {
AsyncHBaseAdmin rawAdmin = (AsyncHBaseAdmin) admin; RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin();
final TableName tableName = TableName.valueOf("testGetRegion");
LOG.info("Started " + tableName);
TEST_UTIL.createMultiRegionTable(tableName, HConstants.CATALOG_FAMILY); TEST_UTIL.createMultiRegionTable(tableName, HConstants.CATALOG_FAMILY);
AsyncTableRegionLocator locator = ASYNC_CONN.getRegionLocator(tableName);
try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { HRegionLocation regionLocation = locator.getRegionLocation(Bytes.toBytes("mmm")).get();
HRegionLocation regionLocation = locator.getRegionLocation(Bytes.toBytes("mmm")); HRegionInfo region = regionLocation.getRegionInfo();
HRegionInfo region = regionLocation.getRegionInfo(); byte[] regionName = regionLocation.getRegionInfo().getRegionName();
byte[] regionName = region.getRegionName(); HRegionLocation location = rawAdmin.getRegionLocation(regionName).get();
HRegionLocation location = rawAdmin.getRegionLocation(regionName).get(); assertTrue(Bytes.equals(regionName, location.getRegionInfo().getRegionName()));
assertTrue(Bytes.equals(regionName, location.getRegionInfo().getRegionName())); location = rawAdmin.getRegionLocation(region.getEncodedNameAsBytes()).get();
location = rawAdmin.getRegionLocation(region.getEncodedNameAsBytes()).get(); assertTrue(Bytes.equals(regionName, location.getRegionInfo().getRegionName()));
assertTrue(Bytes.equals(regionName, location.getRegionInfo().getRegionName()));
}
}
@Test
public void testMergeRegions() throws Exception {
final TableName tableName = TableName.valueOf("testMergeRegions");
HColumnDescriptor cd = new HColumnDescriptor("d");
HTableDescriptor td = new HTableDescriptor(tableName);
td.addFamily(cd);
byte[][] splitRows = new byte[][] { Bytes.toBytes("3"), Bytes.toBytes("6") };
Admin syncAdmin = TEST_UTIL.getAdmin();
try {
TEST_UTIL.createTable(td, splitRows);
TEST_UTIL.waitTableAvailable(tableName);
List<HRegionInfo> tableRegions;
HRegionInfo regionA;
HRegionInfo regionB;
// merge with full name
tableRegions = syncAdmin.getTableRegions(tableName);
assertEquals(3, syncAdmin.getTableRegions(tableName).size());
regionA = tableRegions.get(0);
regionB = tableRegions.get(1);
admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get();
assertEquals(2, syncAdmin.getTableRegions(tableName).size());
// merge with encoded name
tableRegions = syncAdmin.getTableRegions(tableName);
regionA = tableRegions.get(0);
regionB = tableRegions.get(1);
admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get();
assertEquals(1, syncAdmin.getTableRegions(tableName).size());
} finally {
syncAdmin.disableTable(tableName);
syncAdmin.deleteTable(tableName);
}
}
@Test
public void testSplitTable() throws Exception {
splitTests(TableName.valueOf("testSplitTable"), 3000, false, null);
splitTests(TableName.valueOf("testSplitTableWithSplitPoint"), 3000, false, Bytes.toBytes("3"));
splitTests(TableName.valueOf("testSplitRegion"), 3000, true, null);
splitTests(TableName.valueOf("testSplitRegionWithSplitPoint"), 3000, true, Bytes.toBytes("3"));
}
private void splitTests(TableName tableName, int rowCount, boolean isSplitRegion,
byte[] splitPoint) throws Exception {
int count = 0;
// create table
HColumnDescriptor cd = new HColumnDescriptor("d");
HTableDescriptor td = new HTableDescriptor(tableName);
td.addFamily(cd);
Table table = TEST_UTIL.createTable(td, null);
TEST_UTIL.waitTableAvailable(tableName);
List<HRegionInfo> regions = TEST_UTIL.getAdmin().getTableRegions(tableName);
assertEquals(regions.size(), 1);
List<Put> puts = new ArrayList<>();
for (int i = 0; i < rowCount; i++) {
Put put = new Put(Bytes.toBytes(i));
put.addColumn(Bytes.toBytes("d"), null, Bytes.toBytes("value" + i));
puts.add(put);
}
table.put(puts);
if (isSplitRegion) {
admin.splitRegion(regions.get(0).getRegionName(), Optional.ofNullable(splitPoint)).get();
} else {
if (splitPoint == null) {
admin.split(tableName).get();
} else {
admin.split(tableName, splitPoint).get();
}
}
for (int i = 0; i < 45; i++) {
try {
List<HRegionInfo> hRegionInfos = TEST_UTIL.getAdmin().getTableRegions(tableName);
count = hRegionInfos.size();
if (count >= 2) {
break;
}
Thread.sleep(1000L);
} catch (Exception e) {
LOG.error(e);
}
}
assertEquals(count, 2);
} }
@Test @Test
public void testAssignRegionAndUnassignRegion() throws Exception { public void testAssignRegionAndUnassignRegion() throws Exception {
final TableName tableName = TableName.valueOf("testAssignRegionAndUnassignRegion"); createTableWithDefaultConf(tableName);
// assign region.
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
AssignmentManager am = master.getAssignmentManager();
HRegionInfo hri = am.getRegionStates().getRegionsOfTable(tableName).get(0);
// assert region on server
RegionStates regionStates = am.getRegionStates();
ServerName serverName = regionStates.getRegionServerOfRegion(hri);
TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
assertTrue(regionStates.getRegionState(hri).isOpened());
// Region is assigned now. Let's assign it again.
// Master should not abort, and region should stay assigned.
admin.assign(hri.getRegionName()).get();
try { try {
// create test table am.waitForAssignment(hri);
HTableDescriptor desc = new HTableDescriptor(tableName); fail("Expected NoSuchProcedureException");
desc.addFamily(new HColumnDescriptor(FAMILY)); } catch (NoSuchProcedureException e) {
admin.createTable(desc).get(); // Expected
// assign region.
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
AssignmentManager am = master.getAssignmentManager();
HRegionInfo hri = am.getRegionStates().getRegionsOfTable(tableName).get(0);
// assert region on server
RegionStates regionStates = am.getRegionStates();
ServerName serverName = regionStates.getRegionServerOfRegion(hri);
TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
assertTrue(regionStates.getRegionState(hri).isOpened());
// Region is assigned now. Let's assign it again.
// Master should not abort, and region should stay assigned.
admin.assign(hri.getRegionName()).get();
try {
am.waitForAssignment(hri);
fail("Expected NoSuchProcedureException");
} catch (NoSuchProcedureException e) {
// Expected
}
assertTrue(regionStates.getRegionState(hri).isOpened());
// unassign region
admin.unassign(hri.getRegionName(), true).get();
try {
am.waitForAssignment(hri);
fail("Expected NoSuchProcedureException");
} catch (NoSuchProcedureException e) {
// Expected
}
assertTrue(regionStates.getRegionState(hri).isClosed());
} finally {
TEST_UTIL.deleteTable(tableName);
} }
assertTrue(regionStates.getRegionState(hri).isOpened());
// unassign region
admin.unassign(hri.getRegionName(), true).get();
try {
am.waitForAssignment(hri);
fail("Expected NoSuchProcedureException");
} catch (NoSuchProcedureException e) {
// Expected
}
assertTrue(regionStates.getRegionState(hri).isClosed());
} }
HRegionInfo createTableAndGetOneRegion(final TableName tableName) HRegionInfo createTableAndGetOneRegion(final TableName tableName)
throws IOException, InterruptedException, ExecutionException { throws IOException, InterruptedException, ExecutionException {
HTableDescriptor desc = new HTableDescriptor(tableName); TableDescriptor desc =
desc.addFamily(new HColumnDescriptor(FAMILY)); TableDescriptorBuilder.newBuilder(tableName)
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build()).build();
admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5).get(); admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5).get();
// wait till the table is assigned // wait till the table is assigned
@ -333,280 +218,341 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
// Will cause the Master to tell the regionserver to shut itself down because // Will cause the Master to tell the regionserver to shut itself down because
// regionserver is reporting the state as OPEN. // regionserver is reporting the state as OPEN.
public void testOfflineRegion() throws Exception { public void testOfflineRegion() throws Exception {
final TableName tableName = TableName.valueOf("testOfflineRegion"); HRegionInfo hri = createTableAndGetOneRegion(tableName);
try {
HRegionInfo hri = createTableAndGetOneRegion(tableName);
RegionStates regionStates = RegionStates regionStates =
TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
admin.offline(hri.getRegionName()).get(); admin.offline(hri.getRegionName()).get();
long timeoutTime = System.currentTimeMillis() + 3000; long timeoutTime = System.currentTimeMillis() + 3000;
while (true) { while (true) {
if (regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OFFLINE) if (regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OFFLINE)
.contains(hri)) .contains(hri)) break;
break; long now = System.currentTimeMillis();
long now = System.currentTimeMillis(); if (now > timeoutTime) {
if (now > timeoutTime) { fail("Failed to offline the region in time");
fail("Failed to offline the region in time"); break;
break;
}
Thread.sleep(10);
} }
RegionState regionState = regionStates.getRegionState(hri); Thread.sleep(10);
assertTrue(regionState.isOffline());
} finally {
TEST_UTIL.deleteTable(tableName);
} }
RegionState regionState = regionStates.getRegionState(hri);
assertTrue(regionState.isOffline());
} }
@Test @Test
public void testGetRegionByStateOfTable() throws Exception { public void testGetRegionByStateOfTable() throws Exception {
final TableName tableName = TableName.valueOf("testGetRegionByStateOfTable"); HRegionInfo hri = createTableAndGetOneRegion(tableName);
try {
HRegionInfo hri = createTableAndGetOneRegion(tableName);
RegionStates regionStates = RegionStates regionStates =
TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
assertTrue(regionStates.getRegionByStateOfTable(tableName) assertTrue(regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OPEN)
.get(RegionState.State.OPEN) .contains(hri));
.contains(hri)); assertFalse(regionStates.getRegionByStateOfTable(TableName.valueOf("I_am_the_phantom"))
assertFalse(regionStates.getRegionByStateOfTable(TableName.valueOf("I_am_the_phantom")) .get(RegionState.State.OPEN).contains(hri));
.get(RegionState.State.OPEN)
.contains(hri));
} finally {
TEST_UTIL.deleteTable(tableName);
}
} }
@Test @Test
public void testMoveRegion() throws Exception { public void testMoveRegion() throws Exception {
final TableName tableName = TableName.valueOf("testMoveRegion"); admin.setBalancerOn(false).join();
try {
HRegionInfo hri = createTableAndGetOneRegion(tableName);
HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); HRegionInfo hri = createTableAndGetOneRegion(tableName);
RegionStates regionStates = master.getAssignmentManager().getRegionStates(); RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin();
ServerName serverName = regionStates.getRegionServerOfRegion(hri); ServerName serverName = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
ServerManager serverManager = master.getServerManager();
ServerName destServerName = null;
List<JVMClusterUtil.RegionServerThread> regionServers =
TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads();
for (JVMClusterUtil.RegionServerThread regionServer : regionServers) {
HRegionServer destServer = regionServer.getRegionServer();
destServerName = destServer.getServerName();
if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) {
break;
}
}
assertTrue(destServerName != null && !destServerName.equals(serverName));
admin.move(hri.getRegionName(), Optional.of(destServerName)).get();
long timeoutTime = System.currentTimeMillis() + 30000; HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
while (true) { ServerManager serverManager = master.getServerManager();
ServerName sn = regionStates.getRegionServerOfRegion(hri); ServerName destServerName = null;
if (sn != null && sn.equals(destServerName)) { List<JVMClusterUtil.RegionServerThread> regionServers =
TEST_UTIL.assertRegionOnServer(hri, sn, 200); TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads();
break; for (JVMClusterUtil.RegionServerThread regionServer : regionServers) {
} HRegionServer destServer = regionServer.getRegionServer();
long now = System.currentTimeMillis(); destServerName = destServer.getServerName();
if (now > timeoutTime) { if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) {
fail("Failed to move the region in time: " + regionStates.getRegionState(hri)); break;
}
regionStates.wait(50);
} }
} finally {
TEST_UTIL.deleteTable(tableName);
} }
assertTrue(destServerName != null && !destServerName.equals(serverName));
admin.move(hri.getRegionName(), Optional.of(destServerName)).get();
long timeoutTime = System.currentTimeMillis() + 30000;
while (true) {
ServerName sn = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
if (sn != null && sn.equals(destServerName)) {
break;
}
long now = System.currentTimeMillis();
if (now > timeoutTime) {
fail("Failed to move the region in time: " + hri);
}
Thread.sleep(100);
}
admin.setBalancerOn(true).join();
} }
@Test @Test
public void testGetOnlineRegions() throws Exception { public void testGetOnlineRegions() throws Exception {
final TableName tableName = TableName.valueOf("testGetOnlineRegions"); createTableAndGetOneRegion(tableName);
try { AtomicInteger regionServerCount = new AtomicInteger(0);
createTableAndGetOneRegion(tableName); TEST_UTIL
AtomicInteger regionServerCount = new AtomicInteger(0); .getHBaseCluster()
TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() .getLiveRegionServerThreads()
.map(rsThread -> rsThread.getRegionServer().getServerName()).forEach(serverName -> { .stream()
.map(rsThread -> rsThread.getRegionServer())
.forEach(
rs -> {
ServerName serverName = rs.getServerName();
try { try {
Assert.assertEquals(admin.getOnlineRegions(serverName).get().size(), Assert.assertEquals(admin.getOnlineRegions(serverName).get().size(), rs
TEST_UTIL.getAdmin().getOnlineRegions(serverName).size()); .getOnlineRegions().size());
} catch (Exception e) { } catch (Exception e) {
fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage()); fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage());
} }
regionServerCount.incrementAndGet(); regionServerCount.incrementAndGet();
}); });
Assert.assertEquals(regionServerCount.get(), 2); Assert.assertEquals(regionServerCount.get(), 2);
} catch (Exception e) {
LOG.info("Exception", e);
throw e;
} finally {
TEST_UTIL.deleteTable(tableName);
}
} }
@Test @Test
public void testFlushTableAndRegion() throws Exception { public void testFlushTableAndRegion() throws Exception {
final TableName tableName = TableName.valueOf("testFlushRegion"); HRegionInfo hri = createTableAndGetOneRegion(tableName);
try { ServerName serverName =
HRegionInfo hri = createTableAndGetOneRegion(tableName); TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
ServerName serverName = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() .getRegionServerOfRegion(hri);
.getRegionStates().getRegionServerOfRegion(hri); HRegionServer regionServer =
HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
.map(rsThread -> rsThread.getRegionServer()) .map(rsThread -> rsThread.getRegionServer())
.filter(rs -> rs.getServerName().equals(serverName)).findFirst().get(); .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get();
// write a put into the specific region
try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
table.put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1")));
}
Assert.assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0);
// flush region and wait flush operation finished.
LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName()));
admin.flushRegion(hri.getRegionName()).get();
LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName()));
Threads.sleepWithoutInterrupt(500);
while (regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0) {
Threads.sleep(50);
}
// check the memstore.
Assert.assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize(), 0);
// write another put into the specific region // write a put into the specific region
try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { ASYNC_CONN.getRawTable(tableName)
table.put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2"))); .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1")))
} .join();
Assert.assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0); Assert.assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0);
admin.flush(tableName).get(); // flush region and wait flush operation finished.
Threads.sleepWithoutInterrupt(500); LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName()));
while (regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0) { admin.flushRegion(hri.getRegionName()).get();
Threads.sleep(50); LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName()));
} Threads.sleepWithoutInterrupt(500);
// check the memstore. while (regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0) {
Assert.assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize(), 0); Threads.sleep(50);
} finally {
TEST_UTIL.deleteTable(tableName);
} }
// check the memstore.
Assert.assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize(), 0);
// write another put into the specific region
ASYNC_CONN.getRawTable(tableName)
.put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2")))
.join();
Assert.assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0);
admin.flush(tableName).get();
Threads.sleepWithoutInterrupt(500);
while (regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0) {
Threads.sleep(50);
}
// check the memstore.
Assert.assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize(), 0);
} }
@Test(timeout = 600000) @Test
public void testCompactRpcAPI() throws Exception { public void testMergeRegions() throws Exception {
String tableName = "testCompactRpcAPI"; byte[][] splitRows = new byte[][] { Bytes.toBytes("3"), Bytes.toBytes("6") };
compactionTest(tableName, 8, CompactionState.MAJOR, false); createTableWithDefaultConf(tableName, Optional.of(splitRows));
compactionTest(tableName, 15, CompactionState.MINOR, false);
compactionTest(tableName, 8, CompactionState.MAJOR, true); RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME);
compactionTest(tableName, 15, CompactionState.MINOR, true); List<HRegionLocation> regionLocations =
AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
HRegionInfo regionA;
HRegionInfo regionB;
// merge with full name
assertEquals(3, regionLocations.size());
regionA = regionLocations.get(0).getRegionInfo();
regionB = regionLocations.get(1).getRegionInfo();
admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get();
regionLocations =
AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
assertEquals(2, regionLocations.size());
// merge with encoded name
regionA = regionLocations.get(0).getRegionInfo();
regionB = regionLocations.get(1).getRegionInfo();
admin.mergeRegions(regionA.getRegionName(), regionB.getRegionName(), false).get();
regionLocations =
AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
assertEquals(1, regionLocations.size());
} }
@Test(timeout = 600000) @Test
public void testSplitTable() throws Exception {
splitTest(TableName.valueOf("testSplitTable"), 3000, false, null);
splitTest(TableName.valueOf("testSplitTableWithSplitPoint"), 3000, false, Bytes.toBytes("3"));
splitTest(TableName.valueOf("testSplitTableRegion"), 3000, true, null);
splitTest(TableName.valueOf("testSplitTableRegionWithSplitPoint2"), 3000, true, Bytes.toBytes("3"));
}
private void
splitTest(TableName tableName, int rowCount, boolean isSplitRegion, byte[] splitPoint)
throws Exception {
// create table
createTableWithDefaultConf(tableName);
RawAsyncTable metaTable = ASYNC_CONN.getRawTable(META_TABLE_NAME);
List<HRegionLocation> regionLocations =
AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName)).get();
assertEquals(1, regionLocations.size());
RawAsyncTable table = ASYNC_CONN.getRawTable(tableName);
List<Put> puts = new ArrayList<>();
for (int i = 0; i < rowCount; i++) {
Put put = new Put(Bytes.toBytes(i));
put.addColumn(FAMILY, null, Bytes.toBytes("value" + i));
puts.add(put);
}
table.putAll(puts).join();
if (isSplitRegion) {
admin.splitRegion(regionLocations.get(0).getRegionInfo().getRegionName(),
Optional.ofNullable(splitPoint)).get();
} else {
if (splitPoint == null) {
admin.split(tableName).get();
} else {
admin.split(tableName, splitPoint).get();
}
}
int count = 0;
for (int i = 0; i < 45; i++) {
try {
regionLocations =
AsyncMetaTableAccessor.getTableHRegionLocations(metaTable, Optional.of(tableName))
.get();
count = regionLocations.size();
if (count >= 2) {
break;
}
Thread.sleep(1000L);
} catch (Exception e) {
LOG.error(e);
}
}
assertEquals(count, 2);
}
@Test
public void testCompactRegionServer() throws Exception { public void testCompactRegionServer() throws Exception {
TableName table = TableName.valueOf("testCompactRegionServer");
byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") }; byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") };
Table ht = null; createTableWithDefaultConf(tableName, Optional.empty(), families);
try { loadData(tableName, families, 3000, 8);
ht = TEST_UTIL.createTable(table, families);
loadData(ht, families, 3000, 8); List<HRegionServer> rsList =
List<HRegionServer> rsList = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
.map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList()); .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList());
List<Region> regions = new ArrayList<>(); List<Region> regions = new ArrayList<>();
rsList.forEach(rs -> regions.addAll(rs.getOnlineRegions(table))); rsList.forEach(rs -> regions.addAll(rs.getOnlineRegions(tableName)));
Assert.assertEquals(regions.size(), 1); Assert.assertEquals(regions.size(), 1);
int countBefore = countStoreFilesInFamilies(regions, families); int countBefore = countStoreFilesInFamilies(regions, families);
Assert.assertTrue(countBefore > 0); Assert.assertTrue(countBefore > 0);
// Minor compaction for all region servers.
for (HRegionServer rs : rsList) // Minor compaction for all region servers.
admin.compactRegionServer(rs.getServerName()).get(); for (HRegionServer rs : rsList)
Thread.sleep(5000); admin.compactRegionServer(rs.getServerName()).get();
int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families); Thread.sleep(5000);
Assert.assertTrue(countAfterMinorCompaction < countBefore); int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families);
// Major compaction for all region servers. Assert.assertTrue(countAfterMinorCompaction < countBefore);
for (HRegionServer rs : rsList)
admin.majorCompactRegionServer(rs.getServerName()).get(); // Major compaction for all region servers.
Thread.sleep(5000); for (HRegionServer rs : rsList)
int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families); admin.majorCompactRegionServer(rs.getServerName()).get();
Assert.assertEquals(countAfterMajorCompaction, 3); Thread.sleep(5000);
} finally { int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families);
if (ht != null) { Assert.assertEquals(countAfterMajorCompaction, 3);
TEST_UTIL.deleteTable(table);
}
}
} }
private void compactionTest(final String tableName, final int flushes, @Test
public void testCompact() throws Exception {
compactionTest(TableName.valueOf("testCompact1"), 8, CompactionState.MAJOR, false);
compactionTest(TableName.valueOf("testCompact2"), 15, CompactionState.MINOR, false);
compactionTest(TableName.valueOf("testCompact3"), 8, CompactionState.MAJOR, true);
compactionTest(TableName.valueOf("testCompact4"), 15, CompactionState.MINOR, true);
}
private void compactionTest(final TableName tableName, final int flushes,
final CompactionState expectedState, boolean singleFamily) throws Exception { final CompactionState expectedState, boolean singleFamily) throws Exception {
// Create a table with regions // Create a table with regions
final TableName table = TableName.valueOf(tableName);
byte[] family = Bytes.toBytes("family"); byte[] family = Bytes.toBytes("family");
byte[][] families = byte[][] families =
{ family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) }; { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) };
Table ht = null; createTableWithDefaultConf(tableName, Optional.empty(), families);
try { loadData(tableName, families, 3000, flushes);
ht = TEST_UTIL.createTable(table, families);
loadData(ht, families, 3000, flushes); List<Region> regions = new ArrayList<>();
List<Region> regions = new ArrayList<>(); TEST_UTIL
TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads() .getHBaseCluster()
.forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getOnlineRegions(table))); .getLiveRegionServerThreads()
Assert.assertEquals(regions.size(), 1); .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getOnlineRegions(tableName)));
int countBefore = countStoreFilesInFamilies(regions, families); Assert.assertEquals(regions.size(), 1);
int countBeforeSingleFamily = countStoreFilesInFamily(regions, family);
assertTrue(countBefore > 0); // there should be some data files int countBefore = countStoreFilesInFamilies(regions, families);
if (expectedState == CompactionState.MINOR) { int countBeforeSingleFamily = countStoreFilesInFamily(regions, family);
if (singleFamily) { assertTrue(countBefore > 0); // there should be some data files
admin.compact(table, Optional.of(family)).get(); if (expectedState == CompactionState.MINOR) {
} else { if (singleFamily) {
admin.compact(table, Optional.empty()).get(); admin.compact(tableName, Optional.of(family)).get();
}
} else { } else {
if (singleFamily) { admin.compact(tableName, Optional.empty()).get();
admin.majorCompact(table, Optional.of(family)).get();
} else {
admin.majorCompact(table, Optional.empty()).get();
}
} }
long curt = System.currentTimeMillis(); } else {
long waitTime = 5000; if (singleFamily) {
long endt = curt + waitTime; admin.majorCompact(tableName, Optional.of(family)).get();
CompactionState state = TEST_UTIL.getAdmin().getCompactionState(table);
while (state == CompactionState.NONE && curt < endt) {
Thread.sleep(10);
state = TEST_UTIL.getAdmin().getCompactionState(table);
curt = System.currentTimeMillis();
}
// Now, should have the right compaction state,
// otherwise, the compaction should have already been done
if (expectedState != state) {
for (Region region : regions) {
state = CompactionState.valueOf(region.getCompactionState().toString());
assertEquals(CompactionState.NONE, state);
}
} else { } else {
// Wait until the compaction is done admin.majorCompact(tableName, Optional.empty()).get();
state = TEST_UTIL.getAdmin().getCompactionState(table); }
while (state != CompactionState.NONE && curt < endt) { }
Thread.sleep(10);
state = TEST_UTIL.getAdmin().getCompactionState(table); long curt = System.currentTimeMillis();
} long waitTime = 5000;
// Now, compaction should be done. long endt = curt + waitTime;
CompactionState state = TEST_UTIL.getAdmin().getCompactionState(tableName);
while (state == CompactionState.NONE && curt < endt) {
Thread.sleep(10);
state = TEST_UTIL.getAdmin().getCompactionState(tableName);
curt = System.currentTimeMillis();
}
// Now, should have the right compaction state,
// otherwise, the compaction should have already been done
if (expectedState != state) {
for (Region region : regions) {
state = CompactionState.valueOf(region.getCompactionState().toString());
assertEquals(CompactionState.NONE, state); assertEquals(CompactionState.NONE, state);
} }
int countAfter = countStoreFilesInFamilies(regions, families); } else {
int countAfterSingleFamily = countStoreFilesInFamily(regions, family); // Wait until the compaction is done
assertTrue(countAfter < countBefore); state = TEST_UTIL.getAdmin().getCompactionState(tableName);
if (!singleFamily) { while (state != CompactionState.NONE && curt < endt) {
if (expectedState == CompactionState.MAJOR) assertTrue(families.length == countAfter); Thread.sleep(10);
else assertTrue(families.length < countAfter); state = TEST_UTIL.getAdmin().getCompactionState(tableName);
} else {
int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily;
// assert only change was to single column family
assertTrue(singleFamDiff == (countBefore - countAfter));
if (expectedState == CompactionState.MAJOR) {
assertTrue(1 == countAfterSingleFamily);
} else {
assertTrue(1 < countAfterSingleFamily);
}
} }
} finally { // Now, compaction should be done.
if (ht != null) { assertEquals(CompactionState.NONE, state);
TEST_UTIL.deleteTable(table); }
int countAfter = countStoreFilesInFamilies(regions, families);
int countAfterSingleFamily = countStoreFilesInFamily(regions, family);
assertTrue(countAfter < countBefore);
if (!singleFamily) {
if (expectedState == CompactionState.MAJOR) assertTrue(families.length == countAfter);
else assertTrue(families.length < countAfter);
} else {
int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily;
// assert only change was to single column family
assertTrue(singleFamDiff == (countBefore - countAfter));
if (expectedState == CompactionState.MAJOR) {
assertTrue(1 == countAfterSingleFamily);
} else {
assertTrue(1 < countAfterSingleFamily);
} }
} }
} }
@ -623,8 +569,9 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
return count; return count;
} }
private static void loadData(final Table ht, final byte[][] families, final int rows, private static void loadData(final TableName tableName, final byte[][] families, final int rows,
final int flushes) throws IOException { final int flushes) throws IOException {
RawAsyncTable table = ASYNC_CONN.getRawTable(tableName);
List<Put> puts = new ArrayList<>(rows); List<Put> puts = new ArrayList<>(rows);
byte[] qualifier = Bytes.toBytes("val"); byte[] qualifier = Bytes.toBytes("val");
for (int i = 0; i < flushes; i++) { for (int i = 0; i < flushes; i++) {
@ -636,7 +583,7 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
} }
puts.add(p); puts.add(p);
} }
ht.put(puts); table.putAll(puts).join();
TEST_UTIL.flush(); TEST_UTIL.flush();
puts.clear(); puts.clear();
} }

View File

@ -45,10 +45,13 @@ import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/** /**
* Class to test asynchronous replication admin operations. * Class to test asynchronous replication admin operations.
*/ */
@RunWith(Parameterized.class)
@Category({LargeTests.class, ClientTests.class}) @Category({LargeTests.class, ClientTests.class})
public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
@ -57,9 +60,6 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
private final String ID_SECOND = "2"; private final String ID_SECOND = "2";
private final String KEY_SECOND = "127.0.0.1:2181:/hbase2"; private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
@Rule
public TestName name = new TestName();
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
@ -142,12 +142,12 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
public void testAppendPeerTableCFs() throws Exception { public void testAppendPeerTableCFs() throws Exception {
ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
rpc1.setClusterKey(KEY_ONE); rpc1.setClusterKey(KEY_ONE);
final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1"); final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2"); final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3"); final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4"); final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
final TableName tableName5 = TableName.valueOf(name.getMethodName() + "t5"); final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5");
final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6"); final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6");
// Add a valid peer // Add a valid peer
admin.addReplicationPeer(ID_ONE, rpc1).join(); admin.addReplicationPeer(ID_ONE, rpc1).join();
@ -244,10 +244,10 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
public void testRemovePeerTableCFs() throws Exception { public void testRemovePeerTableCFs() throws Exception {
ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
rpc1.setClusterKey(KEY_ONE); rpc1.setClusterKey(KEY_ONE);
final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1"); final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2"); final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3"); final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4"); final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
// Add a valid peer // Add a valid peer
admin.addReplicationPeer(ID_ONE, rpc1).join(); admin.addReplicationPeer(ID_ONE, rpc1).join();
Map<TableName, List<String>> tableCFs = new HashMap<>(); Map<TableName, List<String>> tableCFs = new HashMap<>();
@ -360,8 +360,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
public void testNamespacesAndTableCfsConfigConflict() throws Exception { public void testNamespacesAndTableCfsConfigConflict() throws Exception {
String ns1 = "ns1"; String ns1 = "ns1";
String ns2 = "ns2"; String ns2 = "ns2";
final TableName tableName1 = TableName.valueOf(ns1 + ":" + name.getMethodName()); final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1");
final TableName tableName2 = TableName.valueOf(ns2 + ":" + name.getMethodName() + "2"); final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2");
ReplicationPeerConfig rpc = new ReplicationPeerConfig(); ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE); rpc.setClusterKey(KEY_ONE);

View File

@ -29,12 +29,16 @@ import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class }) @Category({ LargeTests.class, ClientTests.class })
public class TestAsyncSnapshotAdminApi extends TestAsyncAdminBase { public class TestAsyncSnapshotAdminApi extends TestAsyncAdminBase {
@ -42,15 +46,6 @@ public class TestAsyncSnapshotAdminApi extends TestAsyncAdminBase {
String snapshotName2 = "snapshotName2"; String snapshotName2 = "snapshotName2";
String snapshotName3 = "snapshotName3"; String snapshotName3 = "snapshotName3";
@Rule
public TestName testName = new TestName();
TableName tableName;
@Before
public void setup() {
tableName = TableName.valueOf(testName.getMethodName());
}
@After @After
public void cleanup() throws Exception { public void cleanup() throws Exception {
admin.deleteSnapshots(Pattern.compile(".*")).get(); admin.deleteSnapshots(Pattern.compile(".*")).get();
@ -175,10 +170,13 @@ public class TestAsyncSnapshotAdminApi extends TestAsyncAdminBase {
admin.snapshot(snapshotName3, tableName).get(); admin.snapshot(snapshotName3, tableName).get();
Assert.assertEquals(admin.listSnapshots().get().size(), 3); Assert.assertEquals(admin.listSnapshots().get().size(), 3);
Assert.assertEquals(admin.listSnapshots(Pattern.compile("(.*)")).get().size(), 3); Assert.assertEquals(admin.listSnapshots(Optional.of(Pattern.compile("(.*)"))).get().size(), 3);
Assert.assertEquals(admin.listSnapshots(Pattern.compile("snapshotName(\\d+)")).get().size(), 3); Assert.assertEquals(admin.listSnapshots(Optional.of(Pattern.compile("snapshotName(\\d+)")))
Assert.assertEquals(admin.listSnapshots(Pattern.compile("snapshotName[1|3]")).get().size(), 2); .get().size(), 3);
Assert.assertEquals(admin.listSnapshots(Pattern.compile("snapshot(.*)")).get().size(), 3); Assert.assertEquals(admin.listSnapshots(Optional.of(Pattern.compile("snapshotName[1|3]")))
.get().size(), 2);
Assert.assertEquals(admin.listSnapshots(Optional.of(Pattern.compile("snapshot(.*)"))).get()
.size(), 3);
Assert.assertEquals( Assert.assertEquals(
admin.listTableSnapshots(Pattern.compile("testListSnapshots"), Pattern.compile("s(.*)")).get() admin.listTableSnapshots(Pattern.compile("testListSnapshots"), Pattern.compile("s(.*)")).get()
.size(), .size(),