HBASE-24404 Support flush a single column family of region (#2098)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
bsglz 2020-07-31 16:31:14 +08:00 committed by GitHub
parent cb179467fe
commit 1c45c8c7db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 110 additions and 22 deletions

View File

@ -890,6 +890,15 @@ public interface Admin extends Abortable, Closeable {
*/
void flushRegion(byte[] regionName) throws IOException;
/**
* Flush a column family within a region. Synchronous operation.
*
* @param regionName region to flush
* @param columnFamily column family within a region
* @throws IOException if a remote or network exception occurs
*/
void flushRegion(byte[] regionName, byte[] columnFamily) throws IOException;
/**
* Flush all regions on the region server. Synchronous operation.
* @param serverName the region server name to flush

View File

@ -312,6 +312,14 @@ public interface AsyncAdmin {
*/
CompletableFuture<Void> flushRegion(byte[] regionName);
/**
* Flush a column family within a region.
* @param regionName region to flush
* @param columnFamily column family within a region. If not present, flush the region's all
* column families.
*/
CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily);
/**
* Flush all region on the region server.
* @param serverName server to flush

View File

@ -249,6 +249,11 @@ class AsyncHBaseAdmin implements AsyncAdmin {
return wrap(rawAdmin.flushRegion(regionName));
}
@Override
public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
return wrap(rawAdmin.flushRegion(regionName, columnFamily));
}
@Override
public CompletableFuture<Void> flushRegionServer(ServerName sn) {
return wrap(rawAdmin.flushRegionServer(sn));

View File

@ -82,7 +82,7 @@ public class FlushRegionCallable extends RegionAdminServiceCallable<FlushRegionR
}
FlushRegionRequest request =
RequestConverter.buildFlushRegionRequest(regionName, writeFlushWalMarker);
RequestConverter.buildFlushRegionRequest(regionName, null, writeFlushWalMarker);
return stub.flushRegion(controller, request);
}
}

View File

@ -1161,6 +1161,11 @@ public class HBaseAdmin implements Admin {
@Override
public void flushRegion(final byte[] regionName) throws IOException {
flushRegion(regionName, null);
}
@Override
public void flushRegion(final byte[] regionName, byte[] columnFamily) throws IOException {
Pair<RegionInfo, ServerName> regionServerPair = getRegion(regionName);
if (regionServerPair == null) {
throw new IllegalArgumentException("Unknown regionname: " + Bytes.toStringBinary(regionName));
@ -1170,16 +1175,17 @@ public class HBaseAdmin implements Admin {
}
final RegionInfo regionInfo = regionServerPair.getFirst();
ServerName serverName = regionServerPair.getSecond();
flush(this.connection.getAdmin(serverName), regionInfo);
flush(this.connection.getAdmin(serverName), regionInfo, columnFamily);
}
private void flush(AdminService.BlockingInterface admin, final RegionInfo info)
private void flush(AdminService.BlockingInterface admin, final RegionInfo info,
byte[] columnFamily)
throws IOException {
ProtobufUtil.call(() -> {
// TODO: There is no timeout on this controller. Set one!
HBaseRpcController controller = rpcControllerFactory.newController();
FlushRegionRequest request =
RequestConverter.buildFlushRegionRequest(info.getRegionName());
RequestConverter.buildFlushRegionRequest(info.getRegionName(), columnFamily, false);
admin.flushRegion(controller, request);
return null;
});
@ -1188,7 +1194,7 @@ public class HBaseAdmin implements Admin {
@Override
public void flushRegionServer(ServerName serverName) throws IOException {
for (RegionInfo region : getRegions(serverName)) {
flush(this.connection.getAdmin(serverName), region);
flush(this.connection.getAdmin(serverName), region, null);
}
}

View File

@ -924,6 +924,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> flushRegion(byte[] regionName) {
return flushRegion(regionName, null);
}
@Override
public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
CompletableFuture<Void> future = new CompletableFuture<>();
addListener(getRegionLocation(regionName), (location, err) -> {
if (err != null) {
@ -936,7 +941,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
.completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
return;
}
addListener(flush(serverName, location.getRegion()), (ret, err2) -> {
addListener(flush(serverName, location.getRegion(), columnFamily), (ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
@ -947,14 +952,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return future;
}
private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo) {
private CompletableFuture<Void> flush(final ServerName serverName, final RegionInfo regionInfo,
byte[] columnFamily) {
return this.<Void> newAdminCaller()
.serverName(serverName)
.action(
(controller, stub) -> this.<FlushRegionRequest, FlushRegionResponse, Void> adminCall(
controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo
.getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done),
resp -> null))
.getRegionName(), columnFamily, false),
(s, c, req, done) -> s.flushRegion(c, req, done), resp -> null))
.call();
}
@ -968,7 +974,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
if (hRegionInfos != null) {
hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region)));
hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, null)));
}
addListener(CompletableFuture.allOf(
compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {

View File

@ -1001,20 +1001,24 @@ public final class RequestConverter {
* @return a protocol buffer FlushRegionRequest
*/
public static FlushRegionRequest buildFlushRegionRequest(final byte[] regionName) {
return buildFlushRegionRequest(regionName, false);
return buildFlushRegionRequest(regionName, null, false);
}
/**
* Create a protocol buffer FlushRegionRequest for a given region name
* @param regionName the name of the region to get info
* @param columnFamily column family within a region
* @return a protocol buffer FlushRegionRequest
*/
public static FlushRegionRequest buildFlushRegionRequest(final byte[] regionName,
boolean writeFlushWALMarker) {
byte[] columnFamily, boolean writeFlushWALMarker) {
FlushRegionRequest.Builder builder = FlushRegionRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
builder.setWriteFlushWalMarker(writeFlushWALMarker);
if (columnFamily != null) {
builder.setFamily(UnsafeByteOperations.unsafeWrap(columnFamily));
}
return builder.build();
}

View File

@ -137,6 +137,7 @@ message FlushRegionRequest {
required RegionSpecifier region = 1;
optional uint64 if_older_than_ts = 2;
optional bool write_flush_wal_marker = 3; // whether to write a marker to WAL even if not flushed
optional bytes family = 4;
}
message FlushRegionResponse {

View File

@ -127,6 +127,7 @@ message FlushRegionRequest {
required RegionSpecifier region = 1;
optional uint64 if_older_than_ts = 2;
optional bool write_flush_wal_marker = 3; // whether to write a marker to WAL even if not flushed
optional bytes family = 4;
}
message FlushRegionResponse {

View File

@ -1779,8 +1779,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ?
request.getWriteFlushWalMarker() : false;
// Go behind the curtain so we can manage writing of the flush WAL marker
HRegion.FlushResultImpl flushResult =
region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
HRegion.FlushResultImpl flushResult = null;
if (request.hasFamily()) {
List families = new ArrayList();
families.add(request.getFamily().toByteArray());
flushResult =
region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
} else {
flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
}
boolean compactionNeeded = flushResult.isCompactionNeeded();
if (compactionNeeded) {
regionServer.compactSplitThread.requestSystemCompaction(region,

View File

@ -63,8 +63,9 @@ public class TestFlushFromClient {
Bytes.toBytes("1"),
Bytes.toBytes("4"),
Bytes.toBytes("8"));
private static final byte[] FAMILY = Bytes.toBytes("f1");
private static final byte[] FAMILY_1 = Bytes.toBytes("f1");
private static final byte[] FAMILY_2 = Bytes.toBytes("f2");
public static final byte[][] FAMILIES = {FAMILY_1, FAMILY_2};
@Rule
public TestName name = new TestName();
@ -85,11 +86,14 @@ public class TestFlushFromClient {
@Before
public void setUp() throws Exception {
tableName = TableName.valueOf(name.getMethodName());
try (Table t = TEST_UTIL.createTable(tableName, FAMILY, SPLITS)) {
try (Table t = TEST_UTIL.createTable(tableName, FAMILIES, SPLITS)) {
List<Put> puts = ROWS.stream().map(r -> new Put(r)).collect(Collectors.toList());
for (int i = 0; i != 20; ++i) {
byte[] value = Bytes.toBytes(i);
puts.forEach(p -> p.addColumn(FAMILY, value, value));
puts.forEach(p -> {
p.addColumn(FAMILY_1, value, value);
p.addColumn(FAMILY_2, value, value);
});
}
t.put(puts);
}
@ -131,6 +135,18 @@ public class TestFlushFromClient {
}
}
@Test
public void testFlushRegionFamily() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
for (HRegion r : getRegionInfo()) {
long sizeBeforeFlush = r.getMemStoreDataSize();
admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1);
TimeUnit.SECONDS.sleep(1);
assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize());
}
}
}
@Test
public void testAsyncFlushRegion() throws Exception {
AsyncAdmin admin = asyncConn.getAdmin();
@ -141,6 +157,17 @@ public class TestFlushFromClient {
}
}
@Test
public void testAsyncFlushRegionFamily() throws Exception {
AsyncAdmin admin = asyncConn.getAdmin();
for (HRegion r : getRegionInfo()) {
long sizeBeforeFlush = r.getMemStoreDataSize();
admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1).get();
TimeUnit.SECONDS.sleep(1);
assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize());
}
}
@Test
public void testFlushRegionServer() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {

View File

@ -53,8 +53,14 @@ module Hbase
#----------------------------------------------------------------------------------------------
# Requests a table or region or region server flush
def flush(name)
@admin.flushRegion(name.to_java_bytes)
def flush(name, family = nil)
family_bytes = nil
family_bytes = family.to_java_bytes unless family.nil?
if family_bytes.nil?
@admin.flushRegion(name.to_java_bytes)
else
@admin.flushRegion(name.to_java_bytes, family_bytes)
end
rescue java.lang.IllegalArgumentException
# Unknown region. Try table.
begin

View File

@ -25,17 +25,20 @@ module Shell
Flush all regions in passed table or pass a region row to
flush an individual region or a region server name whose format
is 'host,port,startcode', to flush all its regions.
You can also flush a single column family within a region.
For example:
hbase> flush 'TABLENAME'
hbase> flush 'REGIONNAME'
hbase> flush 'REGIONNAME','FAMILYNAME'
hbase> flush 'ENCODED_REGIONNAME'
hbase> flush 'ENCODED_REGIONNAME','FAMILYNAME'
hbase> flush 'REGION_SERVER_NAME'
EOF
end
def command(table_or_region_name)
admin.flush(table_or_region_name)
def command(table_or_region_name, family = nil)
admin.flush(table_or_region_name, family)
end
end
end

View File

@ -636,6 +636,11 @@ public class ThriftAdmin implements Admin {
}
@Override
public void flushRegion(byte[] regionName, byte[] columnFamily) {
throw new NotImplementedException("flushRegion not supported in ThriftAdmin");
}
@Override
public void flushRegionServer(ServerName serverName) {
throw new NotImplementedException("flushRegionServer not supported in ThriftAdmin");