diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index b764726e76a..3d6370564e1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -405,6 +405,91 @@ public interface AsyncAdmin { */ CompletableFuture closeRegion(ServerName sn, HRegionInfo hri); + /** + * Get all the online regions on a region server. + */ + CompletableFuture> getOnlineRegions(ServerName sn); + + /** + * Flush a table. + * @param tableName table to flush + */ + CompletableFuture flush(TableName tableName); + + /** + * Flush an individual region. + * @param regionName region to flush + */ + CompletableFuture flushRegion(byte[] regionName); + + /** + * Compact a table. Asynchronous operation even if CompletableFuture.get(). + * @param tableName table to compact + */ + CompletableFuture compact(TableName tableName); + + /** + * Compact a column family within a table. Asynchronous operation even if CompletableFuture.get(). + * @param tableName table to compact + * @param columnFamily column family within a table + */ + CompletableFuture compact(TableName tableName, byte[] columnFamily); + + /** + * Compact an individual region. Asynchronous operation even if CompletableFuture.get(). + * @param regionName region to compact + */ + CompletableFuture compactRegion(byte[] regionName); + + /** + * Compact a column family within a region. Asynchronous operation even if + * CompletableFuture.get(). + * @param regionName region to compact + * @param columnFamily column family within a region + */ + CompletableFuture compactRegion(byte[] regionName, byte[] columnFamily); + + /** + * Major compact a table. Asynchronous operation even if CompletableFuture.get(). + * @param tableName table to major compact + */ + CompletableFuture majorCompact(TableName tableName); + + /** + * Major compact a column family within a table. Asynchronous operation even if + * CompletableFuture.get(). + * @param tableName table to major compact + * @param columnFamily column family within a table + */ + CompletableFuture majorCompact(TableName tableName, byte[] columnFamily); + + /** + * Major compact a table or an individual region. Asynchronous operation even if + * CompletableFuture.get(). + * @param regionName region to major compact + */ + CompletableFuture majorCompactRegion(byte[] regionName); + + /** + * Major compact a column family within region. Asynchronous operation even if + * CompletableFuture.get(). + * @param regionName egion to major compact + * @param columnFamily column family within a region + */ + CompletableFuture majorCompactRegion(byte[] regionName, byte[] columnFamily); + + /** + * Compact all regions on the region server. + * @param sn the region server name + */ + CompletableFuture compactRegionServer(ServerName sn); + + /** + * Compact all regions on the region server. + * @param sn the region server name + */ + CompletableFuture majorCompactRegionServer(ServerName sn); + /** * Merge two regions. * @param nameOfRegionA encoded or full name of region a diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 019d0c6e77c..baad8711cc6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -40,6 +40,7 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.AsyncMetaTableAccessor; import org.apache.hadoop.hbase.TableNotDisabledException; +import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.UnknownRegionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -80,10 +82,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest; @@ -185,6 +192,7 @@ import org.apache.hadoop.hbase.util.Pair; @InterfaceAudience.Private @InterfaceStability.Evolving public class AsyncHBaseAdmin implements AsyncAdmin { + public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc"; private static final Log LOG = LogFactory.getLog(AsyncHBaseAdmin.class); @@ -853,6 +861,274 @@ public class AsyncHBaseAdmin implements AsyncAdmin { .serverName(sn).call(); } + @Override + public CompletableFuture> getOnlineRegions(ServerName sn) { + return this.> newAdminCaller() + .action((controller, stub) -> this + .> adminCall( + controller, stub, RequestConverter.buildGetOnlineRegionRequest(), + (s, c, req, done) -> s.getOnlineRegion(c, req, done), + resp -> ProtobufUtil.getRegionInfos(resp))) + .serverName(sn).call(); + } + + @Override + public CompletableFuture flush(TableName tableName) { + CompletableFuture future = new CompletableFuture<>(); + tableExists(tableName).whenComplete((exists, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else if (!exists) { + future.completeExceptionally(new TableNotFoundException(tableName)); + } else { + isTableEnabled(tableName).whenComplete((tableEnabled, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else if (!tableEnabled) { + future.completeExceptionally(new TableNotEnabledException(tableName)); + } else { + execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), + new HashMap<>()).whenComplete((ret, err3) -> { + if (err3 != null) { + future.completeExceptionally(err3); + } else { + future.complete(ret); + } + }); + } + }); + } + }); + return future; + } + + @Override + public CompletableFuture flushRegion(byte[] regionName) { + CompletableFuture future = new CompletableFuture<>(); + getRegion(regionName).whenComplete((p, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (p == null || p.getFirst() == null) { + future.completeExceptionally( + new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName))); + return; + } + if (p.getSecond() == null) { + future.completeExceptionally( + new NoServerForRegionException(Bytes.toStringBinary(regionName))); + return; + } + + this. newAdminCaller().serverName(p.getSecond()) + .action((controller, stub) -> this + . adminCall(controller, stub, + RequestConverter.buildFlushRegionRequest(p.getFirst().getRegionName()), + (s, c, req, done) -> s.flushRegion(c, req, done), resp -> null)) + .call().whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); + return future; + } + + @Override + public CompletableFuture compact(TableName tableName) { + return compact(tableName, null, false, CompactType.NORMAL); + } + + @Override + public CompletableFuture compact(TableName tableName, byte[] columnFamily) { + return compact(tableName, columnFamily, false, CompactType.NORMAL); + } + + @Override + public CompletableFuture compactRegion(byte[] regionName) { + return compactRegion(regionName, null, false); + } + + @Override + public CompletableFuture compactRegion(byte[] regionName, byte[] columnFamily) { + return compactRegion(regionName, columnFamily, false); + } + + @Override + public CompletableFuture majorCompact(TableName tableName) { + return compact(tableName, null, true, CompactType.NORMAL); + } + + @Override + public CompletableFuture majorCompact(TableName tableName, byte[] columnFamily) { + return compact(tableName, columnFamily, true, CompactType.NORMAL); + } + + @Override + public CompletableFuture majorCompactRegion(byte[] regionName) { + return compactRegion(regionName, null, true); + } + + @Override + public CompletableFuture majorCompactRegion(byte[] regionName, byte[] columnFamily) { + return compactRegion(regionName, columnFamily, true); + } + + @Override + public CompletableFuture compactRegionServer(ServerName sn) { + return compactRegionServer(sn, false); + } + + @Override + public CompletableFuture majorCompactRegionServer(ServerName sn) { + return compactRegionServer(sn, true); + } + + private CompletableFuture compactRegionServer(ServerName sn, boolean major) { + CompletableFuture future = new CompletableFuture<>(); + getOnlineRegions(sn).whenComplete((hRegionInfos, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + List> compactFutures = new ArrayList<>(); + if (hRegionInfos != null) { + hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null))); + } + CompletableFuture + .allOf(compactFutures.toArray(new CompletableFuture[compactFutures.size()])) + .whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); + return future; + } + + private CompletableFuture compactRegion(final byte[] regionName, final byte[] columnFamily, + final boolean major) { + CompletableFuture future = new CompletableFuture<>(); + getRegion(regionName).whenComplete((p, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (p == null || p.getFirst() == null) { + future.completeExceptionally( + new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName))); + return; + } + if (p.getSecond() == null) { + // found a region without region server assigned. + future.completeExceptionally( + new NoServerForRegionException(Bytes.toStringBinary(regionName))); + return; + } + compact(p.getSecond(), p.getFirst(), major, columnFamily).whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); + return future; + } + + /** + * List all region locations for the specific table. + */ + private CompletableFuture> getTableHRegionLocations(TableName tableName) { + CompletableFuture> future = new CompletableFuture<>(); + if (TableName.META_TABLE_NAME.equals(tableName)) { + // For meta table, we use zk to fetch all locations. + AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration()); + registry.getMetaRegionLocation().whenComplete((metaRegions, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else if (metaRegions == null || metaRegions.isEmpty() + || metaRegions.getDefaultRegionLocation() == null) { + future.completeExceptionally(new IOException("meta region does not found")); + } else { + future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation())); + } + // close the registry. + IOUtils.closeQuietly(registry); + }); + } else { + // For non-meta table, we fetch all locations by scanning hbase:meta table + AsyncMetaTableAccessor.getTableRegionsAndLocations(metaTable, Optional.of(tableName)) + .whenComplete((locations, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else if (locations == null || locations.isEmpty()) { + future.complete(Collections.emptyList()); + } else { + List regionLocations = locations.stream() + .map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond())) + .collect(Collectors.toList()); + future.complete(regionLocations); + } + }); + } + return future; + } + + /** + * Compact column family of a table, Asynchronous operation even if CompletableFuture.get() + */ + private CompletableFuture compact(final TableName tableName, final byte[] columnFamily, + final boolean major, CompactType compactType) { + if (CompactType.MOB.equals(compactType)) { + // TODO support MOB compact. + return failedFuture(new UnsupportedOperationException("MOB compact does not support")); + } + CompletableFuture future = new CompletableFuture<>(); + getTableHRegionLocations(tableName).whenComplete((locations, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + List> compactFutures = new ArrayList<>(); + for (HRegionLocation location : locations) { + if (location.getRegionInfo() == null || location.getRegionInfo().isOffline()) continue; + if (location.getServerName() == null) continue; + compactFutures + .add(compact(location.getServerName(), location.getRegionInfo(), major, columnFamily)); + } + // future complete unless all of the compact futures are completed. + CompletableFuture + .allOf(compactFutures.toArray(new CompletableFuture[compactFutures.size()])) + .whenComplete((ret, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(ret); + } + }); + }); + return future; + } + + /** + * Compact the region at specific region server. + */ + private CompletableFuture compact(final ServerName sn, final HRegionInfo hri, + final boolean major, final byte[] family) { + return this. newAdminCaller().serverName(sn) + .action((controller, stub) -> this + . adminCall(controller, stub, + RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family), + (s, c, req, done) -> s.compactRegion(c, req, done), resp -> null)) + .call(); + } + private byte[] toEncodeRegionName(byte[] regionName) { try { return HRegionInfo.isEncodedRegionName(regionName) ? regionName diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index ff576f71a17..2e62debc8b2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -1943,7 +1943,7 @@ public final class ProtobufUtil { * @param proto the GetOnlineRegionResponse * @return the list of region info or null if proto is null */ - static List getRegionInfos(final GetOnlineRegionResponse proto) { + public static List getRegionInfos(final GetOnlineRegionResponse proto) { if (proto == null) return null; List regionInfos = new ArrayList<>(proto.getRegionInfoList().size()); for (RegionInfo regionInfo: proto.getRegionInfoList()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java index 038d6d4fb74..04bd2244c27 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java @@ -25,6 +25,9 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -40,12 +43,15 @@ import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -55,6 +61,8 @@ import org.junit.experimental.categories.Category; @Category({ MediumTests.class, ClientTests.class }) public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { + public static Random RANDOM = new Random(System.currentTimeMillis()); + private void createTableWithDefaultConf(TableName TABLENAME) throws Exception { HTableDescriptor htd = new HTableDescriptor(TABLENAME); HColumnDescriptor hcd = new HColumnDescriptor("value"); @@ -440,4 +448,223 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { TEST_UTIL.deleteTable(tableName); } } + + @Test + public void testGetOnlineRegions() throws Exception { + final TableName tableName = TableName.valueOf("testGetOnlineRegions"); + try { + createTableAndGetOneRegion(tableName); + AtomicInteger regionServerCount = new AtomicInteger(0); + TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() + .map(rsThread -> rsThread.getRegionServer().getServerName()).forEach(serverName -> { + try { + Assert.assertEquals(admin.getOnlineRegions(serverName).get().size(), + TEST_UTIL.getAdmin().getOnlineRegions(serverName).size()); + } catch (Exception e) { + fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage()); + } + regionServerCount.incrementAndGet(); + }); + Assert.assertEquals(regionServerCount.get(), 2); + } finally { + TEST_UTIL.deleteTable(tableName); + } + } + + @Test + public void testFlushTableAndRegion() throws Exception { + final TableName tableName = TableName.valueOf("testFlushRegion"); + try { + HRegionInfo hri = createTableAndGetOneRegion(tableName); + ServerName serverName = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager() + .getRegionStates().getRegionServerOfRegion(hri); + HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() + .map(rsThread -> rsThread.getRegionServer()) + .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get(); + // write a put into the specific region + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + table.put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1"))); + } + Assert.assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0); + // flush region and wait flush operation finished. + LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName())); + admin.flushRegion(hri.getRegionName()).get(); + LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName())); + Threads.sleepWithoutInterrupt(500); + while (regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0) { + Threads.sleep(50); + } + // check the memstore. + Assert.assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize(), 0); + + // write another put into the specific region + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + table.put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2"))); + } + Assert.assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0); + admin.flush(tableName).get(); + Threads.sleepWithoutInterrupt(500); + while (regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0) { + Threads.sleep(50); + } + // check the memstore. + Assert.assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize(), 0); + } finally { + TEST_UTIL.deleteTable(tableName); + } + } + + @Test(timeout = 600000) + public void testCompactRpcAPI() throws Exception { + String tableName = "testCompactRpcAPI"; + compactionTest(tableName, 8, CompactionState.MAJOR, false); + compactionTest(tableName, 15, CompactionState.MINOR, false); + compactionTest(tableName, 8, CompactionState.MAJOR, true); + compactionTest(tableName, 15, CompactionState.MINOR, true); + } + + @Test(timeout = 600000) + public void testCompactRegionServer() throws Exception { + TableName table = TableName.valueOf("testCompactRegionServer"); + byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") }; + Table ht = null; + try { + ht = TEST_UTIL.createTable(table, families); + loadData(ht, families, 3000, 8); + List rsList = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream() + .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList()); + List regions = new ArrayList<>(); + rsList.forEach(rs -> regions.addAll(rs.getOnlineRegions(table))); + Assert.assertEquals(regions.size(), 1); + int countBefore = countStoreFilesInFamilies(regions, families); + Assert.assertTrue(countBefore > 0); + // Minor compaction for all region servers. + for (HRegionServer rs : rsList) + admin.compactRegionServer(rs.getServerName()).get(); + Thread.sleep(5000); + int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families); + Assert.assertTrue(countAfterMinorCompaction < countBefore); + // Major compaction for all region servers. + for (HRegionServer rs : rsList) + admin.majorCompactRegionServer(rs.getServerName()).get(); + Thread.sleep(5000); + int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families); + Assert.assertEquals(countAfterMajorCompaction, 3); + } finally { + if (ht != null) { + TEST_UTIL.deleteTable(table); + } + } + } + + private void compactionTest(final String tableName, final int flushes, + final CompactionState expectedState, boolean singleFamily) throws Exception { + // Create a table with regions + final TableName table = TableName.valueOf(tableName); + byte[] family = Bytes.toBytes("family"); + byte[][] families = + { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) }; + Table ht = null; + try { + ht = TEST_UTIL.createTable(table, families); + loadData(ht, families, 3000, flushes); + List regions = new ArrayList<>(); + TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads() + .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getOnlineRegions(table))); + Assert.assertEquals(regions.size(), 1); + int countBefore = countStoreFilesInFamilies(regions, families); + int countBeforeSingleFamily = countStoreFilesInFamily(regions, family); + assertTrue(countBefore > 0); // there should be some data files + if (expectedState == CompactionState.MINOR) { + if (singleFamily) { + admin.compact(table, family).get(); + } else { + admin.compact(table).get(); + } + } else { + if (singleFamily) { + admin.majorCompact(table, family).get(); + } else { + admin.majorCompact(table).get(); + } + } + long curt = System.currentTimeMillis(); + long waitTime = 5000; + long endt = curt + waitTime; + CompactionState state = TEST_UTIL.getAdmin().getCompactionState(table); + while (state == CompactionState.NONE && curt < endt) { + Thread.sleep(10); + state = TEST_UTIL.getAdmin().getCompactionState(table); + curt = System.currentTimeMillis(); + } + // Now, should have the right compaction state, + // otherwise, the compaction should have already been done + if (expectedState != state) { + for (Region region : regions) { + state = CompactionState.valueOf(region.getCompactionState().toString()); + assertEquals(CompactionState.NONE, state); + } + } else { + // Wait until the compaction is done + state = TEST_UTIL.getAdmin().getCompactionState(table); + while (state != CompactionState.NONE && curt < endt) { + Thread.sleep(10); + state = TEST_UTIL.getAdmin().getCompactionState(table); + } + // Now, compaction should be done. + assertEquals(CompactionState.NONE, state); + } + int countAfter = countStoreFilesInFamilies(regions, families); + int countAfterSingleFamily = countStoreFilesInFamily(regions, family); + assertTrue(countAfter < countBefore); + if (!singleFamily) { + if (expectedState == CompactionState.MAJOR) assertTrue(families.length == countAfter); + else assertTrue(families.length < countAfter); + } else { + int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily; + // assert only change was to single column family + assertTrue(singleFamDiff == (countBefore - countAfter)); + if (expectedState == CompactionState.MAJOR) { + assertTrue(1 == countAfterSingleFamily); + } else { + assertTrue(1 < countAfterSingleFamily); + } + } + } finally { + if (ht != null) { + TEST_UTIL.deleteTable(table); + } + } + } + + private static int countStoreFilesInFamily(List regions, final byte[] family) { + return countStoreFilesInFamilies(regions, new byte[][] { family }); + } + + private static int countStoreFilesInFamilies(List regions, final byte[][] families) { + int count = 0; + for (Region region : regions) { + count += region.getStoreFileList(families).size(); + } + return count; + } + + private static void loadData(final Table ht, final byte[][] families, final int rows, + final int flushes) throws IOException { + List puts = new ArrayList<>(rows); + byte[] qualifier = Bytes.toBytes("val"); + for (int i = 0; i < flushes; i++) { + for (int k = 0; k < rows; k++) { + byte[] row = Bytes.toBytes(RANDOM.nextLong()); + Put p = new Put(row); + for (int j = 0; j < families.length; ++j) { + p.addColumn(families[j], qualifier, row); + } + puts.add(p); + } + ht.put(puts); + TEST_UTIL.flush(); + puts.clear(); + } + } }