HBASE-17667: Implement async flush/compact region methods
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
01af27061e
commit
2026540ea3
|
@ -405,6 +405,91 @@ public interface AsyncAdmin {
|
|||
*/
|
||||
CompletableFuture<Void> closeRegion(ServerName sn, HRegionInfo hri);
|
||||
|
||||
/**
|
||||
* Get all the online regions on a region server.
|
||||
*/
|
||||
CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName sn);
|
||||
|
||||
/**
|
||||
* Flush a table.
|
||||
* @param tableName table to flush
|
||||
*/
|
||||
CompletableFuture<Void> flush(TableName tableName);
|
||||
|
||||
/**
|
||||
* Flush an individual region.
|
||||
* @param regionName region to flush
|
||||
*/
|
||||
CompletableFuture<Void> flushRegion(byte[] regionName);
|
||||
|
||||
/**
|
||||
* Compact a table. Asynchronous operation even if CompletableFuture.get().
|
||||
* @param tableName table to compact
|
||||
*/
|
||||
CompletableFuture<Void> compact(TableName tableName);
|
||||
|
||||
/**
|
||||
* Compact a column family within a table. Asynchronous operation even if CompletableFuture.get().
|
||||
* @param tableName table to compact
|
||||
* @param columnFamily column family within a table
|
||||
*/
|
||||
CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily);
|
||||
|
||||
/**
|
||||
* Compact an individual region. Asynchronous operation even if CompletableFuture.get().
|
||||
* @param regionName region to compact
|
||||
*/
|
||||
CompletableFuture<Void> compactRegion(byte[] regionName);
|
||||
|
||||
/**
|
||||
* Compact a column family within a region. Asynchronous operation even if
|
||||
* CompletableFuture.get().
|
||||
* @param regionName region to compact
|
||||
* @param columnFamily column family within a region
|
||||
*/
|
||||
CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily);
|
||||
|
||||
/**
|
||||
* Major compact a table. Asynchronous operation even if CompletableFuture.get().
|
||||
* @param tableName table to major compact
|
||||
*/
|
||||
CompletableFuture<Void> majorCompact(TableName tableName);
|
||||
|
||||
/**
|
||||
* Major compact a column family within a table. Asynchronous operation even if
|
||||
* CompletableFuture.get().
|
||||
* @param tableName table to major compact
|
||||
* @param columnFamily column family within a table
|
||||
*/
|
||||
CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily);
|
||||
|
||||
/**
|
||||
* Major compact a table or an individual region. Asynchronous operation even if
|
||||
* CompletableFuture.get().
|
||||
* @param regionName region to major compact
|
||||
*/
|
||||
CompletableFuture<Void> majorCompactRegion(byte[] regionName);
|
||||
|
||||
/**
|
||||
* Major compact a column family within region. Asynchronous operation even if
|
||||
* CompletableFuture.get().
|
||||
* @param regionName egion to major compact
|
||||
* @param columnFamily column family within a region
|
||||
*/
|
||||
CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily);
|
||||
|
||||
/**
|
||||
* Compact all regions on the region server.
|
||||
* @param sn the region server name
|
||||
*/
|
||||
CompletableFuture<Void> compactRegionServer(ServerName sn);
|
||||
|
||||
/**
|
||||
* Compact all regions on the region server.
|
||||
* @param sn the region server name
|
||||
*/
|
||||
CompletableFuture<Void> majorCompactRegionServer(ServerName sn);
|
||||
|
||||
/**
|
||||
* Merge two regions.
|
||||
* @param nameOfRegionA encoded or full name of region a
|
||||
|
|
|
@ -40,6 +40,7 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
|
||||
import io.netty.util.Timeout;
|
||||
import io.netty.util.TimerTask;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
|
@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.TableExistsException;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.TableNotDisabledException;
|
||||
import org.apache.hadoop.hbase.TableNotEnabledException;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.UnknownRegionException;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
@ -80,10 +82,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SplitRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AbortProcedureRequest;
|
||||
|
@ -185,6 +192,7 @@ import org.apache.hadoop.hbase.util.Pair;
|
|||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class AsyncHBaseAdmin implements AsyncAdmin {
|
||||
public static final String FLUSH_TABLE_PROCEDURE_SIGNATURE = "flush-table-proc";
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(AsyncHBaseAdmin.class);
|
||||
|
||||
|
@ -853,6 +861,274 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
.serverName(sn).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<List<HRegionInfo>> getOnlineRegions(ServerName sn) {
|
||||
return this.<List<HRegionInfo>> newAdminCaller()
|
||||
.action((controller, stub) -> this
|
||||
.<GetOnlineRegionRequest, GetOnlineRegionResponse, List<HRegionInfo>> adminCall(
|
||||
controller, stub, RequestConverter.buildGetOnlineRegionRequest(),
|
||||
(s, c, req, done) -> s.getOnlineRegion(c, req, done),
|
||||
resp -> ProtobufUtil.getRegionInfos(resp)))
|
||||
.serverName(sn).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> flush(TableName tableName) {
|
||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
tableExists(tableName).whenComplete((exists, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
} else if (!exists) {
|
||||
future.completeExceptionally(new TableNotFoundException(tableName));
|
||||
} else {
|
||||
isTableEnabled(tableName).whenComplete((tableEnabled, err2) -> {
|
||||
if (err2 != null) {
|
||||
future.completeExceptionally(err2);
|
||||
} else if (!tableEnabled) {
|
||||
future.completeExceptionally(new TableNotEnabledException(tableName));
|
||||
} else {
|
||||
execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
|
||||
new HashMap<>()).whenComplete((ret, err3) -> {
|
||||
if (err3 != null) {
|
||||
future.completeExceptionally(err3);
|
||||
} else {
|
||||
future.complete(ret);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> flushRegion(byte[] regionName) {
|
||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
getRegion(regionName).whenComplete((p, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
return;
|
||||
}
|
||||
if (p == null || p.getFirst() == null) {
|
||||
future.completeExceptionally(
|
||||
new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName)));
|
||||
return;
|
||||
}
|
||||
if (p.getSecond() == null) {
|
||||
future.completeExceptionally(
|
||||
new NoServerForRegionException(Bytes.toStringBinary(regionName)));
|
||||
return;
|
||||
}
|
||||
|
||||
this.<Void> newAdminCaller().serverName(p.getSecond())
|
||||
.action((controller, stub) -> this
|
||||
.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(controller, stub,
|
||||
RequestConverter.buildFlushRegionRequest(p.getFirst().getRegionName()),
|
||||
(s, c, req, done) -> s.flushRegion(c, req, done), resp -> null))
|
||||
.call().whenComplete((ret, err2) -> {
|
||||
if (err2 != null) {
|
||||
future.completeExceptionally(err2);
|
||||
} else {
|
||||
future.complete(ret);
|
||||
}
|
||||
});
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> compact(TableName tableName) {
|
||||
return compact(tableName, null, false, CompactType.NORMAL);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily) {
|
||||
return compact(tableName, columnFamily, false, CompactType.NORMAL);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> compactRegion(byte[] regionName) {
|
||||
return compactRegion(regionName, null, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> compactRegion(byte[] regionName, byte[] columnFamily) {
|
||||
return compactRegion(regionName, columnFamily, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> majorCompact(TableName tableName) {
|
||||
return compact(tableName, null, true, CompactType.NORMAL);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> majorCompact(TableName tableName, byte[] columnFamily) {
|
||||
return compact(tableName, columnFamily, true, CompactType.NORMAL);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> majorCompactRegion(byte[] regionName) {
|
||||
return compactRegion(regionName, null, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> majorCompactRegion(byte[] regionName, byte[] columnFamily) {
|
||||
return compactRegion(regionName, columnFamily, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> compactRegionServer(ServerName sn) {
|
||||
return compactRegionServer(sn, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> majorCompactRegionServer(ServerName sn) {
|
||||
return compactRegionServer(sn, true);
|
||||
}
|
||||
|
||||
private CompletableFuture<Void> compactRegionServer(ServerName sn, boolean major) {
|
||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
getOnlineRegions(sn).whenComplete((hRegionInfos, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
return;
|
||||
}
|
||||
List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
|
||||
if (hRegionInfos != null) {
|
||||
hRegionInfos.forEach(region -> compactFutures.add(compact(sn, region, major, null)));
|
||||
}
|
||||
CompletableFuture
|
||||
.allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
|
||||
.whenComplete((ret, err2) -> {
|
||||
if (err2 != null) {
|
||||
future.completeExceptionally(err2);
|
||||
} else {
|
||||
future.complete(ret);
|
||||
}
|
||||
});
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
private CompletableFuture<Void> compactRegion(final byte[] regionName, final byte[] columnFamily,
|
||||
final boolean major) {
|
||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
getRegion(regionName).whenComplete((p, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
return;
|
||||
}
|
||||
if (p == null || p.getFirst() == null) {
|
||||
future.completeExceptionally(
|
||||
new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName)));
|
||||
return;
|
||||
}
|
||||
if (p.getSecond() == null) {
|
||||
// found a region without region server assigned.
|
||||
future.completeExceptionally(
|
||||
new NoServerForRegionException(Bytes.toStringBinary(regionName)));
|
||||
return;
|
||||
}
|
||||
compact(p.getSecond(), p.getFirst(), major, columnFamily).whenComplete((ret, err2) -> {
|
||||
if (err2 != null) {
|
||||
future.completeExceptionally(err2);
|
||||
} else {
|
||||
future.complete(ret);
|
||||
}
|
||||
});
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
/**
|
||||
* List all region locations for the specific table.
|
||||
*/
|
||||
private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableName tableName) {
|
||||
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
|
||||
if (TableName.META_TABLE_NAME.equals(tableName)) {
|
||||
// For meta table, we use zk to fetch all locations.
|
||||
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
|
||||
registry.getMetaRegionLocation().whenComplete((metaRegions, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
} else if (metaRegions == null || metaRegions.isEmpty()
|
||||
|| metaRegions.getDefaultRegionLocation() == null) {
|
||||
future.completeExceptionally(new IOException("meta region does not found"));
|
||||
} else {
|
||||
future.complete(Collections.singletonList(metaRegions.getDefaultRegionLocation()));
|
||||
}
|
||||
// close the registry.
|
||||
IOUtils.closeQuietly(registry);
|
||||
});
|
||||
} else {
|
||||
// For non-meta table, we fetch all locations by scanning hbase:meta table
|
||||
AsyncMetaTableAccessor.getTableRegionsAndLocations(metaTable, Optional.of(tableName))
|
||||
.whenComplete((locations, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
} else if (locations == null || locations.isEmpty()) {
|
||||
future.complete(Collections.emptyList());
|
||||
} else {
|
||||
List<HRegionLocation> regionLocations = locations.stream()
|
||||
.map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
|
||||
.collect(Collectors.toList());
|
||||
future.complete(regionLocations);
|
||||
}
|
||||
});
|
||||
}
|
||||
return future;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compact column family of a table, Asynchronous operation even if CompletableFuture.get()
|
||||
*/
|
||||
private CompletableFuture<Void> compact(final TableName tableName, final byte[] columnFamily,
|
||||
final boolean major, CompactType compactType) {
|
||||
if (CompactType.MOB.equals(compactType)) {
|
||||
// TODO support MOB compact.
|
||||
return failedFuture(new UnsupportedOperationException("MOB compact does not support"));
|
||||
}
|
||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
getTableHRegionLocations(tableName).whenComplete((locations, err) -> {
|
||||
if (err != null) {
|
||||
future.completeExceptionally(err);
|
||||
return;
|
||||
}
|
||||
List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
|
||||
for (HRegionLocation location : locations) {
|
||||
if (location.getRegionInfo() == null || location.getRegionInfo().isOffline()) continue;
|
||||
if (location.getServerName() == null) continue;
|
||||
compactFutures
|
||||
.add(compact(location.getServerName(), location.getRegionInfo(), major, columnFamily));
|
||||
}
|
||||
// future complete unless all of the compact futures are completed.
|
||||
CompletableFuture
|
||||
.allOf(compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()]))
|
||||
.whenComplete((ret, err2) -> {
|
||||
if (err2 != null) {
|
||||
future.completeExceptionally(err2);
|
||||
} else {
|
||||
future.complete(ret);
|
||||
}
|
||||
});
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compact the region at specific region server.
|
||||
*/
|
||||
private CompletableFuture<Void> compact(final ServerName sn, final HRegionInfo hri,
|
||||
final boolean major, final byte[] family) {
|
||||
return this.<Void> newAdminCaller().serverName(sn)
|
||||
.action((controller, stub) -> this
|
||||
.<CompactRegionRequest, CompactRegionResponse, Void> adminCall(controller, stub,
|
||||
RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major, family),
|
||||
(s, c, req, done) -> s.compactRegion(c, req, done), resp -> null))
|
||||
.call();
|
||||
}
|
||||
|
||||
private byte[] toEncodeRegionName(byte[] regionName) {
|
||||
try {
|
||||
return HRegionInfo.isEncodedRegionName(regionName) ? regionName
|
||||
|
|
|
@ -1943,7 +1943,7 @@ public final class ProtobufUtil {
|
|||
* @param proto the GetOnlineRegionResponse
|
||||
* @return the list of region info or null if <code>proto</code> is null
|
||||
*/
|
||||
static List<HRegionInfo> getRegionInfos(final GetOnlineRegionResponse proto) {
|
||||
public static List<HRegionInfo> getRegionInfos(final GetOnlineRegionResponse proto) {
|
||||
if (proto == null) return null;
|
||||
List<HRegionInfo> regionInfos = new ArrayList<>(proto.getRegionInfoList().size());
|
||||
for (RegionInfo regionInfo: proto.getRegionInfoList()) {
|
||||
|
|
|
@ -25,6 +25,9 @@ import static org.junit.Assert.fail;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
@ -40,12 +43,15 @@ import org.apache.hadoop.hbase.master.RegionState;
|
|||
import org.apache.hadoop.hbase.master.RegionStates;
|
||||
import org.apache.hadoop.hbase.master.ServerManager;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
|
@ -55,6 +61,8 @@ import org.junit.experimental.categories.Category;
|
|||
@Category({ MediumTests.class, ClientTests.class })
|
||||
public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
|
||||
|
||||
public static Random RANDOM = new Random(System.currentTimeMillis());
|
||||
|
||||
private void createTableWithDefaultConf(TableName TABLENAME) throws Exception {
|
||||
HTableDescriptor htd = new HTableDescriptor(TABLENAME);
|
||||
HColumnDescriptor hcd = new HColumnDescriptor("value");
|
||||
|
@ -440,4 +448,223 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
|
|||
TEST_UTIL.deleteTable(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetOnlineRegions() throws Exception {
|
||||
final TableName tableName = TableName.valueOf("testGetOnlineRegions");
|
||||
try {
|
||||
createTableAndGetOneRegion(tableName);
|
||||
AtomicInteger regionServerCount = new AtomicInteger(0);
|
||||
TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
|
||||
.map(rsThread -> rsThread.getRegionServer().getServerName()).forEach(serverName -> {
|
||||
try {
|
||||
Assert.assertEquals(admin.getOnlineRegions(serverName).get().size(),
|
||||
TEST_UTIL.getAdmin().getOnlineRegions(serverName).size());
|
||||
} catch (Exception e) {
|
||||
fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage());
|
||||
}
|
||||
regionServerCount.incrementAndGet();
|
||||
});
|
||||
Assert.assertEquals(regionServerCount.get(), 2);
|
||||
} finally {
|
||||
TEST_UTIL.deleteTable(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlushTableAndRegion() throws Exception {
|
||||
final TableName tableName = TableName.valueOf("testFlushRegion");
|
||||
try {
|
||||
HRegionInfo hri = createTableAndGetOneRegion(tableName);
|
||||
ServerName serverName = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager()
|
||||
.getRegionStates().getRegionServerOfRegion(hri);
|
||||
HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
|
||||
.map(rsThread -> rsThread.getRegionServer())
|
||||
.filter(rs -> rs.getServerName().equals(serverName)).findFirst().get();
|
||||
// write a put into the specific region
|
||||
try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
|
||||
table.put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1")));
|
||||
}
|
||||
Assert.assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0);
|
||||
// flush region and wait flush operation finished.
|
||||
LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName()));
|
||||
admin.flushRegion(hri.getRegionName()).get();
|
||||
LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName()));
|
||||
Threads.sleepWithoutInterrupt(500);
|
||||
while (regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0) {
|
||||
Threads.sleep(50);
|
||||
}
|
||||
// check the memstore.
|
||||
Assert.assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize(), 0);
|
||||
|
||||
// write another put into the specific region
|
||||
try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
|
||||
table.put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2")));
|
||||
}
|
||||
Assert.assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0);
|
||||
admin.flush(tableName).get();
|
||||
Threads.sleepWithoutInterrupt(500);
|
||||
while (regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize() > 0) {
|
||||
Threads.sleep(50);
|
||||
}
|
||||
// check the memstore.
|
||||
Assert.assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemstoreSize(), 0);
|
||||
} finally {
|
||||
TEST_UTIL.deleteTable(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 600000)
|
||||
public void testCompactRpcAPI() throws Exception {
|
||||
String tableName = "testCompactRpcAPI";
|
||||
compactionTest(tableName, 8, CompactionState.MAJOR, false);
|
||||
compactionTest(tableName, 15, CompactionState.MINOR, false);
|
||||
compactionTest(tableName, 8, CompactionState.MAJOR, true);
|
||||
compactionTest(tableName, 15, CompactionState.MINOR, true);
|
||||
}
|
||||
|
||||
@Test(timeout = 600000)
|
||||
public void testCompactRegionServer() throws Exception {
|
||||
TableName table = TableName.valueOf("testCompactRegionServer");
|
||||
byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") };
|
||||
Table ht = null;
|
||||
try {
|
||||
ht = TEST_UTIL.createTable(table, families);
|
||||
loadData(ht, families, 3000, 8);
|
||||
List<HRegionServer> rsList = TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
|
||||
.map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList());
|
||||
List<Region> regions = new ArrayList<>();
|
||||
rsList.forEach(rs -> regions.addAll(rs.getOnlineRegions(table)));
|
||||
Assert.assertEquals(regions.size(), 1);
|
||||
int countBefore = countStoreFilesInFamilies(regions, families);
|
||||
Assert.assertTrue(countBefore > 0);
|
||||
// Minor compaction for all region servers.
|
||||
for (HRegionServer rs : rsList)
|
||||
admin.compactRegionServer(rs.getServerName()).get();
|
||||
Thread.sleep(5000);
|
||||
int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families);
|
||||
Assert.assertTrue(countAfterMinorCompaction < countBefore);
|
||||
// Major compaction for all region servers.
|
||||
for (HRegionServer rs : rsList)
|
||||
admin.majorCompactRegionServer(rs.getServerName()).get();
|
||||
Thread.sleep(5000);
|
||||
int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families);
|
||||
Assert.assertEquals(countAfterMajorCompaction, 3);
|
||||
} finally {
|
||||
if (ht != null) {
|
||||
TEST_UTIL.deleteTable(table);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void compactionTest(final String tableName, final int flushes,
|
||||
final CompactionState expectedState, boolean singleFamily) throws Exception {
|
||||
// Create a table with regions
|
||||
final TableName table = TableName.valueOf(tableName);
|
||||
byte[] family = Bytes.toBytes("family");
|
||||
byte[][] families =
|
||||
{ family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) };
|
||||
Table ht = null;
|
||||
try {
|
||||
ht = TEST_UTIL.createTable(table, families);
|
||||
loadData(ht, families, 3000, flushes);
|
||||
List<Region> regions = new ArrayList<>();
|
||||
TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads()
|
||||
.forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getOnlineRegions(table)));
|
||||
Assert.assertEquals(regions.size(), 1);
|
||||
int countBefore = countStoreFilesInFamilies(regions, families);
|
||||
int countBeforeSingleFamily = countStoreFilesInFamily(regions, family);
|
||||
assertTrue(countBefore > 0); // there should be some data files
|
||||
if (expectedState == CompactionState.MINOR) {
|
||||
if (singleFamily) {
|
||||
admin.compact(table, family).get();
|
||||
} else {
|
||||
admin.compact(table).get();
|
||||
}
|
||||
} else {
|
||||
if (singleFamily) {
|
||||
admin.majorCompact(table, family).get();
|
||||
} else {
|
||||
admin.majorCompact(table).get();
|
||||
}
|
||||
}
|
||||
long curt = System.currentTimeMillis();
|
||||
long waitTime = 5000;
|
||||
long endt = curt + waitTime;
|
||||
CompactionState state = TEST_UTIL.getAdmin().getCompactionState(table);
|
||||
while (state == CompactionState.NONE && curt < endt) {
|
||||
Thread.sleep(10);
|
||||
state = TEST_UTIL.getAdmin().getCompactionState(table);
|
||||
curt = System.currentTimeMillis();
|
||||
}
|
||||
// Now, should have the right compaction state,
|
||||
// otherwise, the compaction should have already been done
|
||||
if (expectedState != state) {
|
||||
for (Region region : regions) {
|
||||
state = CompactionState.valueOf(region.getCompactionState().toString());
|
||||
assertEquals(CompactionState.NONE, state);
|
||||
}
|
||||
} else {
|
||||
// Wait until the compaction is done
|
||||
state = TEST_UTIL.getAdmin().getCompactionState(table);
|
||||
while (state != CompactionState.NONE && curt < endt) {
|
||||
Thread.sleep(10);
|
||||
state = TEST_UTIL.getAdmin().getCompactionState(table);
|
||||
}
|
||||
// Now, compaction should be done.
|
||||
assertEquals(CompactionState.NONE, state);
|
||||
}
|
||||
int countAfter = countStoreFilesInFamilies(regions, families);
|
||||
int countAfterSingleFamily = countStoreFilesInFamily(regions, family);
|
||||
assertTrue(countAfter < countBefore);
|
||||
if (!singleFamily) {
|
||||
if (expectedState == CompactionState.MAJOR) assertTrue(families.length == countAfter);
|
||||
else assertTrue(families.length < countAfter);
|
||||
} else {
|
||||
int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily;
|
||||
// assert only change was to single column family
|
||||
assertTrue(singleFamDiff == (countBefore - countAfter));
|
||||
if (expectedState == CompactionState.MAJOR) {
|
||||
assertTrue(1 == countAfterSingleFamily);
|
||||
} else {
|
||||
assertTrue(1 < countAfterSingleFamily);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (ht != null) {
|
||||
TEST_UTIL.deleteTable(table);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static int countStoreFilesInFamily(List<Region> regions, final byte[] family) {
|
||||
return countStoreFilesInFamilies(regions, new byte[][] { family });
|
||||
}
|
||||
|
||||
private static int countStoreFilesInFamilies(List<Region> regions, final byte[][] families) {
|
||||
int count = 0;
|
||||
for (Region region : regions) {
|
||||
count += region.getStoreFileList(families).size();
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
private static void loadData(final Table ht, final byte[][] families, final int rows,
|
||||
final int flushes) throws IOException {
|
||||
List<Put> puts = new ArrayList<>(rows);
|
||||
byte[] qualifier = Bytes.toBytes("val");
|
||||
for (int i = 0; i < flushes; i++) {
|
||||
for (int k = 0; k < rows; k++) {
|
||||
byte[] row = Bytes.toBytes(RANDOM.nextLong());
|
||||
Put p = new Put(row);
|
||||
for (int j = 0; j < families.length; ++j) {
|
||||
p.addColumn(families[j], qualifier, row);
|
||||
}
|
||||
puts.add(p);
|
||||
}
|
||||
ht.put(puts);
|
||||
TEST_UTIL.flush();
|
||||
puts.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue