HBASE-24404 Support flush a single column family of region (#2032)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
bsglz 2020-07-11 01:13:50 +08:00 committed by GitHub
parent b2321b3583
commit 724f0478ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 123 additions and 26 deletions

View File

@ -521,6 +521,15 @@ public interface Admin extends Abortable, Closeable {
*/
void flushRegion(byte[] regionName) throws IOException;
/**
* Flush a column family within a region. Synchronous operation.
*
* @param regionName region to flush
* @param columnFamily column family within a region
* @throws IOException if a remote or network exception occurs
*/
void flushRegion(byte[] regionName, byte[] columnFamily) throws IOException;
/**
* Flush all regions on the region server. Synchronous operation.
* @param serverName the region server name to flush

View File

@ -249,6 +249,11 @@ class AdminOverAsyncAdmin implements Admin {
get(admin.flushRegion(regionName));
}
@Override
public void flushRegion(byte[] regionName, byte[] columnFamily) throws IOException {
get(admin.flushRegion(regionName, columnFamily));
}
@Override
public void flushRegionServer(ServerName serverName) throws IOException {
get(admin.flushRegionServer(serverName));

View File

@ -308,6 +308,14 @@ public interface AsyncAdmin {
*/
CompletableFuture<Void> flushRegion(byte[] regionName);
/**
* Flush a column family within a region.
* @param regionName region to flush
* @param columnFamily column family within a region. If not present, flush the region's all
* column families.
*/
CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily);
/**
* Flush all region on the region server.
* @param serverName server to flush

View File

@ -248,6 +248,11 @@ class AsyncHBaseAdmin implements AsyncAdmin {
return wrap(rawAdmin.flushRegion(regionName));
}
@Override
public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
return wrap(rawAdmin.flushRegion(regionName, columnFamily));
}
@Override
public CompletableFuture<Void> flushRegionServer(ServerName sn) {
return wrap(rawAdmin.flushRegionServer(sn));

View File

@ -926,10 +926,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> flushRegion(byte[] regionName) {
return flushRegionInternal(regionName, false).thenAccept(r -> {
return flushRegionInternal(regionName, null, false).thenAccept(r -> {
});
}
@Override
public CompletableFuture<Void> flushRegion(byte[] regionName, byte[] columnFamily) {
Preconditions.checkNotNull(columnFamily, "columnFamily is null."
+ "If you don't specify a columnFamily, use flushRegion(regionName) instead");
return flushRegionInternal(regionName, columnFamily, false)
.thenAccept(r -> {});
}
/**
* This method is for internal use only, where we need the response of the flush.
* <p/>
@ -937,7 +945,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
* API.
*/
CompletableFuture<FlushRegionResponse> flushRegionInternal(byte[] regionName,
boolean writeFlushWALMarker) {
byte[] columnFamily, boolean writeFlushWALMarker) {
CompletableFuture<FlushRegionResponse> future = new CompletableFuture<>();
addListener(getRegionLocation(regionName), (location, err) -> {
if (err != null) {
@ -950,23 +958,25 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
.completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
return;
}
addListener(flush(serverName, location.getRegion(), writeFlushWALMarker), (ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
future.complete(ret);
}
});
addListener(
flush(serverName, location.getRegion(), columnFamily, writeFlushWALMarker),
(ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
future.complete(ret);
}});
});
return future;
}
private CompletableFuture<FlushRegionResponse> flush(ServerName serverName, RegionInfo regionInfo,
boolean writeFlushWALMarker) {
byte[] columnFamily, boolean writeFlushWALMarker) {
return this.<FlushRegionResponse> newAdminCaller().serverName(serverName)
.action((controller, stub) -> this
.<FlushRegionRequest, FlushRegionResponse, FlushRegionResponse> adminCall(controller, stub,
RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(), writeFlushWALMarker),
RequestConverter.buildFlushRegionRequest(regionInfo.getRegionName(),
columnFamily, writeFlushWALMarker),
(s, c, req, done) -> s.flushRegion(c, req, done), resp -> resp))
.call();
}
@ -981,8 +991,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
List<CompletableFuture<Void>> compactFutures = new ArrayList<>();
if (hRegionInfos != null) {
hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, false).thenAccept(r -> {
})));
hRegionInfos.forEach(
region -> compactFutures.add(
flush(sn, region, null, false).thenAccept(r -> {})
)
);
}
addListener(CompletableFuture.allOf(
compactFutures.toArray(new CompletableFuture<?>[compactFutures.size()])), (ret, err2) -> {

View File

@ -779,20 +779,24 @@ public final class RequestConverter {
* @return a protocol buffer FlushRegionRequest
*/
public static FlushRegionRequest buildFlushRegionRequest(final byte[] regionName) {
return buildFlushRegionRequest(regionName, false);
return buildFlushRegionRequest(regionName, null, false);
}
/**
* Create a protocol buffer FlushRegionRequest for a given region name
* @param regionName the name of the region to get info
* @param columnFamily column family within a region
* @return a protocol buffer FlushRegionRequest
*/
public static FlushRegionRequest buildFlushRegionRequest(final byte[] regionName,
boolean writeFlushWALMarker) {
byte[] columnFamily, boolean writeFlushWALMarker) {
FlushRegionRequest.Builder builder = FlushRegionRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
builder.setWriteFlushWalMarker(writeFlushWALMarker);
if (columnFamily != null) {
builder.setFamily(UnsafeByteOperations.unsafeWrap(columnFamily));
}
return builder.build();
}

View File

@ -137,6 +137,7 @@ message FlushRegionRequest {
required RegionSpecifier region = 1;
optional uint64 if_older_than_ts = 2;
optional bool write_flush_wal_marker = 3; // whether to write a marker to WAL even if not flushed
optional bytes family = 4;
}
message FlushRegionResponse {

View File

@ -74,7 +74,7 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
boolean writeFlushWALMarker) {
RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
return admin.flushRegionInternal(regionName, writeFlushWALMarker);
return admin.flushRegionInternal(regionName, null, writeFlushWALMarker);
}
@Override

View File

@ -1788,8 +1788,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ?
request.getWriteFlushWalMarker() : false;
// Go behind the curtain so we can manage writing of the flush WAL marker
HRegion.FlushResultImpl flushResult =
region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
HRegion.FlushResultImpl flushResult = null;
if (request.hasFamily()) {
List families = new ArrayList();
families.add(request.getFamily().toByteArray());
flushResult =
region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
} else {
flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
}
boolean compactionNeeded = flushResult.isCompactionNeeded();
if (compactionNeeded) {
regionServer.compactSplitThread.requestSystemCompaction(region,

View File

@ -63,8 +63,9 @@ public class TestFlushFromClient {
Bytes.toBytes("1"),
Bytes.toBytes("4"),
Bytes.toBytes("8"));
private static final byte[] FAMILY = Bytes.toBytes("f1");
private static final byte[] FAMILY_1 = Bytes.toBytes("f1");
private static final byte[] FAMILY_2 = Bytes.toBytes("f2");
public static final byte[][] FAMILIES = {FAMILY_1, FAMILY_2};
@Rule
public TestName name = new TestName();
@ -85,11 +86,14 @@ public class TestFlushFromClient {
@Before
public void setUp() throws Exception {
tableName = TableName.valueOf(name.getMethodName());
try (Table t = TEST_UTIL.createTable(tableName, FAMILY, SPLITS)) {
try (Table t = TEST_UTIL.createTable(tableName, FAMILIES, SPLITS)) {
List<Put> puts = ROWS.stream().map(r -> new Put(r)).collect(Collectors.toList());
for (int i = 0; i != 20; ++i) {
byte[] value = Bytes.toBytes(i);
puts.forEach(p -> p.addColumn(FAMILY, value, value));
puts.forEach(p -> {
p.addColumn(FAMILY_1, value, value);
p.addColumn(FAMILY_2, value, value);
});
}
t.put(puts);
}
@ -131,6 +135,18 @@ public class TestFlushFromClient {
}
}
@Test
public void testFlushRegionFamily() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
for (HRegion r : getRegionInfo()) {
long sizeBeforeFlush = r.getMemStoreDataSize();
admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1);
TimeUnit.SECONDS.sleep(1);
assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize());
}
}
}
@Test
public void testAsyncFlushRegion() throws Exception {
AsyncAdmin admin = asyncConn.getAdmin();
@ -141,6 +157,17 @@ public class TestFlushFromClient {
}
}
@Test
public void testAsyncFlushRegionFamily() throws Exception {
AsyncAdmin admin = asyncConn.getAdmin();
for (HRegion r : getRegionInfo()) {
long sizeBeforeFlush = r.getMemStoreDataSize();
admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1).get();
TimeUnit.SECONDS.sleep(1);
assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize());
}
}
@Test
public void testFlushRegionServer() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {

View File

@ -224,6 +224,10 @@ public class VerifyingRSGroupAdmin implements Admin, Closeable {
admin.flushRegion(regionName);
}
public void flushRegion(byte[] regionName, byte[] columnFamily) throws IOException {
admin.flushRegion(regionName, columnFamily);
}
public void flushRegionServer(ServerName serverName) throws IOException {
admin.flushRegionServer(serverName);
}

View File

@ -55,8 +55,14 @@ module Hbase
#----------------------------------------------------------------------------------------------
# Requests a table or region or region server flush
def flush(name)
@admin.flushRegion(name.to_java_bytes)
def flush(name, family = nil)
family_bytes = nil
family_bytes = family.to_java_bytes unless family.nil?
if family_bytes.nil?
@admin.flushRegion(name.to_java_bytes)
else
@admin.flushRegion(name.to_java_bytes, family_bytes)
end
rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException
# Unknown region. Try table.
begin

View File

@ -25,17 +25,20 @@ module Shell
Flush all regions in passed table or pass a region row to
flush an individual region or a region server name whose format
is 'host,port,startcode', to flush all its regions.
You can also flush a single column family within a region.
For example:
hbase> flush 'TABLENAME'
hbase> flush 'REGIONNAME'
hbase> flush 'REGIONNAME','FAMILYNAME'
hbase> flush 'ENCODED_REGIONNAME'
hbase> flush 'ENCODED_REGIONNAME','FAMILYNAME'
hbase> flush 'REGION_SERVER_NAME'
EOF
end
def command(table_or_region_name)
admin.flush(table_or_region_name)
def command(table_or_region_name, family = nil)
admin.flush(table_or_region_name, family)
end
end
end

View File

@ -479,6 +479,11 @@ public class ThriftAdmin implements Admin {
}
@Override
public void flushRegion(byte[] regionName, byte[] columnFamily) {
throw new NotImplementedException("flushRegion not supported in ThriftAdmin");
}
@Override
public void flushRegionServer(ServerName serverName) {
throw new NotImplementedException("flushRegionServer not supported in ThriftAdmin");