diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index d34f56f7d34..d5483eb6a9c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -890,6 +890,15 @@ public interface Admin extends Abortable, Closeable { */ void flushRegion(byte[] regionName) throws IOException; + /** + * Flush a column family within a region. Synchronous operation. + * + * @param regionName region to flush + * @param columnFamily column family within a region + * @throws IOException if a remote or network exception occurs + */ + void flushRegion(byte[] regionName, byte[] columnFamily) throws IOException; + /** * Flush all regions on the region server. Synchronous operation. * @param serverName the region server name to flush diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index b69fcc6ab0a..b272e756e15 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -312,6 +312,14 @@ public interface AsyncAdmin { */ CompletableFuture flushRegion(byte[] regionName); + /** + * Flush a column family within a region. + * @param regionName region to flush + * @param columnFamily column family within a region. If not present, flush the region's all + * column families. + */ + CompletableFuture flushRegion(byte[] regionName, byte[] columnFamily); + /** * Flush all region on the region server. * @param serverName server to flush diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 55e08770dbe..c004d9fa8d1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -249,6 +249,11 @@ class AsyncHBaseAdmin implements AsyncAdmin { return wrap(rawAdmin.flushRegion(regionName)); } + @Override + public CompletableFuture flushRegion(byte[] regionName, byte[] columnFamily) { + return wrap(rawAdmin.flushRegion(regionName, columnFamily)); + } + @Override public CompletableFuture flushRegionServer(ServerName sn) { return wrap(rawAdmin.flushRegionServer(sn)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java index bb265a43f62..b0eea86a8bc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FlushRegionCallable.java @@ -82,7 +82,7 @@ public class FlushRegionCallable extends RegionAdminServiceCallable regionServerPair = getRegion(regionName); if (regionServerPair == null) { throw new IllegalArgumentException("Unknown regionname: " + Bytes.toStringBinary(regionName)); @@ -1170,16 +1175,17 @@ public class HBaseAdmin implements Admin { } final RegionInfo regionInfo = regionServerPair.getFirst(); ServerName serverName = regionServerPair.getSecond(); - flush(this.connection.getAdmin(serverName), regionInfo); + flush(this.connection.getAdmin(serverName), regionInfo, columnFamily); } - private void flush(AdminService.BlockingInterface admin, final RegionInfo info) + private void flush(AdminService.BlockingInterface admin, final RegionInfo info, + byte[] columnFamily) throws IOException { ProtobufUtil.call(() -> { // TODO: There is no timeout on this controller. Set one! HBaseRpcController controller = rpcControllerFactory.newController(); FlushRegionRequest request = - RequestConverter.buildFlushRegionRequest(info.getRegionName()); + RequestConverter.buildFlushRegionRequest(info.getRegionName(), columnFamily, false); admin.flushRegion(controller, request); return null; }); @@ -1188,7 +1194,7 @@ public class HBaseAdmin implements Admin { @Override public void flushRegionServer(ServerName serverName) throws IOException { for (RegionInfo region : getRegions(serverName)) { - flush(this.connection.getAdmin(serverName), region); + flush(this.connection.getAdmin(serverName), region, null); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 023e8fcc937..b25b287927d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -924,6 +924,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture flushRegion(byte[] regionName) { + return flushRegion(regionName, null); + } + + @Override + public CompletableFuture flushRegion(byte[] regionName, byte[] columnFamily) { CompletableFuture future = new CompletableFuture<>(); addListener(getRegionLocation(regionName), (location, err) -> { if (err != null) { @@ -936,7 +941,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { .completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName))); return; } - addListener(flush(serverName, location.getRegion()), (ret, err2) -> { + addListener(flush(serverName, location.getRegion(), columnFamily), (ret, err2) -> { if (err2 != null) { future.completeExceptionally(err2); } else { @@ -947,14 +952,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { return future; } - private CompletableFuture flush(final ServerName serverName, final RegionInfo regionInfo) { + private CompletableFuture flush(final ServerName serverName, final RegionInfo regionInfo, + byte[] columnFamily) { return this. newAdminCaller() .serverName(serverName) .action( (controller, stub) -> this. adminCall( controller, stub, RequestConverter.buildFlushRegionRequest(regionInfo - .getRegionName()), (s, c, req, done) -> s.flushRegion(c, req, done), - resp -> null)) + .getRegionName(), columnFamily, false), + (s, c, req, done) -> s.flushRegion(c, req, done), resp -> null)) .call(); } @@ -968,7 +974,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } List> compactFutures = new ArrayList<>(); if (hRegionInfos != null) { - hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region))); + hRegionInfos.forEach(region -> compactFutures.add(flush(sn, region, null))); } addListener(CompletableFuture.allOf( compactFutures.toArray(new CompletableFuture[compactFutures.size()])), (ret, err2) -> { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 13d49f1709b..f377d6c8daa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -1001,20 +1001,24 @@ public final class RequestConverter { * @return a protocol buffer FlushRegionRequest */ public static FlushRegionRequest buildFlushRegionRequest(final byte[] regionName) { - return buildFlushRegionRequest(regionName, false); + return buildFlushRegionRequest(regionName, null, false); } /** * Create a protocol buffer FlushRegionRequest for a given region name * @param regionName the name of the region to get info + * @param columnFamily column family within a region * @return a protocol buffer FlushRegionRequest */ public static FlushRegionRequest buildFlushRegionRequest(final byte[] regionName, - boolean writeFlushWALMarker) { + byte[] columnFamily, boolean writeFlushWALMarker) { FlushRegionRequest.Builder builder = FlushRegionRequest.newBuilder(); RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); builder.setRegion(region); builder.setWriteFlushWalMarker(writeFlushWALMarker); + if (columnFamily != null) { + builder.setFamily(UnsafeByteOperations.unsafeWrap(columnFamily)); + } return builder.build(); } diff --git a/hbase-protocol-shaded/src/main/protobuf/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/Admin.proto index aa04ba63027..b5bf2ea4210 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Admin.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Admin.proto @@ -137,6 +137,7 @@ message FlushRegionRequest { required RegionSpecifier region = 1; optional uint64 if_older_than_ts = 2; optional bool write_flush_wal_marker = 3; // whether to write a marker to WAL even if not flushed + optional bytes family = 4; } message FlushRegionResponse { diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto index 68194d63b1b..23fe777b5ef 100644 --- a/hbase-protocol/src/main/protobuf/Admin.proto +++ b/hbase-protocol/src/main/protobuf/Admin.proto @@ -127,6 +127,7 @@ message FlushRegionRequest { required RegionSpecifier region = 1; optional uint64 if_older_than_ts = 2; optional bool write_flush_wal_marker = 3; // whether to write a marker to WAL even if not flushed + optional bytes family = 4; } message FlushRegionResponse { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index d1ef55cb444..bc00ab2e089 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1779,8 +1779,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler, boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false; // Go behind the curtain so we can manage writing of the flush WAL marker - HRegion.FlushResultImpl flushResult = - region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); + HRegion.FlushResultImpl flushResult = null; + if (request.hasFamily()) { + List families = new ArrayList(); + families.add(request.getFamily().toByteArray()); + flushResult = + region.flushcache(families, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); + } else { + flushResult = region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); + } boolean compactionNeeded = flushResult.isCompactionNeeded(); if (compactionNeeded) { regionServer.compactSplitThread.requestSystemCompaction(region, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java index 7afd36b77be..b0fb0b7c58d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java @@ -63,8 +63,9 @@ public class TestFlushFromClient { Bytes.toBytes("1"), Bytes.toBytes("4"), Bytes.toBytes("8")); - private static final byte[] FAMILY = Bytes.toBytes("f1"); - + private static final byte[] FAMILY_1 = Bytes.toBytes("f1"); + private static final byte[] FAMILY_2 = Bytes.toBytes("f2"); + public static final byte[][] FAMILIES = {FAMILY_1, FAMILY_2}; @Rule public TestName name = new TestName(); @@ -85,11 +86,14 @@ public class TestFlushFromClient { @Before public void setUp() throws Exception { tableName = TableName.valueOf(name.getMethodName()); - try (Table t = TEST_UTIL.createTable(tableName, FAMILY, SPLITS)) { + try (Table t = TEST_UTIL.createTable(tableName, FAMILIES, SPLITS)) { List puts = ROWS.stream().map(r -> new Put(r)).collect(Collectors.toList()); for (int i = 0; i != 20; ++i) { byte[] value = Bytes.toBytes(i); - puts.forEach(p -> p.addColumn(FAMILY, value, value)); + puts.forEach(p -> { + p.addColumn(FAMILY_1, value, value); + p.addColumn(FAMILY_2, value, value); + }); } t.put(puts); } @@ -131,6 +135,18 @@ public class TestFlushFromClient { } } + @Test + public void testFlushRegionFamily() throws Exception { + try (Admin admin = TEST_UTIL.getAdmin()) { + for (HRegion r : getRegionInfo()) { + long sizeBeforeFlush = r.getMemStoreDataSize(); + admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1); + TimeUnit.SECONDS.sleep(1); + assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize()); + } + } + } + @Test public void testAsyncFlushRegion() throws Exception { AsyncAdmin admin = asyncConn.getAdmin(); @@ -141,6 +157,17 @@ public class TestFlushFromClient { } } + @Test + public void testAsyncFlushRegionFamily() throws Exception { + AsyncAdmin admin = asyncConn.getAdmin(); + for (HRegion r : getRegionInfo()) { + long sizeBeforeFlush = r.getMemStoreDataSize(); + admin.flushRegion(r.getRegionInfo().getRegionName(), FAMILY_1).get(); + TimeUnit.SECONDS.sleep(1); + assertEquals(sizeBeforeFlush / 2, r.getMemStoreDataSize()); + } + } + @Test public void testFlushRegionServer() throws Exception { try (Admin admin = TEST_UTIL.getAdmin()) { diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index 1507c7a5267..2ce5588861f 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -53,8 +53,14 @@ module Hbase #---------------------------------------------------------------------------------------------- # Requests a table or region or region server flush - def flush(name) - @admin.flushRegion(name.to_java_bytes) + def flush(name, family = nil) + family_bytes = nil + family_bytes = family.to_java_bytes unless family.nil? + if family_bytes.nil? + @admin.flushRegion(name.to_java_bytes) + else + @admin.flushRegion(name.to_java_bytes, family_bytes) + end rescue java.lang.IllegalArgumentException # Unknown region. Try table. begin diff --git a/hbase-shell/src/main/ruby/shell/commands/flush.rb b/hbase-shell/src/main/ruby/shell/commands/flush.rb index 1f6b3105a1d..f34999c4eb8 100644 --- a/hbase-shell/src/main/ruby/shell/commands/flush.rb +++ b/hbase-shell/src/main/ruby/shell/commands/flush.rb @@ -25,17 +25,20 @@ module Shell Flush all regions in passed table or pass a region row to flush an individual region or a region server name whose format is 'host,port,startcode', to flush all its regions. +You can also flush a single column family within a region. For example: hbase> flush 'TABLENAME' hbase> flush 'REGIONNAME' + hbase> flush 'REGIONNAME','FAMILYNAME' hbase> flush 'ENCODED_REGIONNAME' + hbase> flush 'ENCODED_REGIONNAME','FAMILYNAME' hbase> flush 'REGION_SERVER_NAME' EOF end - def command(table_or_region_name) - admin.flush(table_or_region_name) + def command(table_or_region_name, family = nil) + admin.flush(table_or_region_name, family) end end end diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java index facdc62696d..8d0b43c4fb5 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java @@ -636,6 +636,11 @@ public class ThriftAdmin implements Admin { } + @Override + public void flushRegion(byte[] regionName, byte[] columnFamily) { + throw new NotImplementedException("flushRegion not supported in ThriftAdmin"); + } + @Override public void flushRegionServer(ServerName serverName) { throw new NotImplementedException("flushRegionServer not supported in ThriftAdmin");