From 485e0d2fa4dd92dbbd4a483b2e2f554e7848f494 Mon Sep 17 00:00:00 2001 From: bsglz <18031031@qq.com> Date: Fri, 7 Aug 2020 18:28:53 +0800 Subject: [PATCH] HBASE-24694 Support flush a single column family of table (#2179) Signed-off-by: Wellington Chevreuil --- .../org/apache/hadoop/hbase/client/Admin.java | 10 ++++++++ .../hbase/client/AdminOverAsyncAdmin.java | 5 ++++ .../hadoop/hbase/client/AsyncAdmin.java | 8 ++++++ .../hadoop/hbase/client/AsyncHBaseAdmin.java | 5 ++++ .../hbase/client/RawAsyncHBaseAdmin.java | 12 +++++++-- .../org/apache/hadoop/hbase/HConstants.java | 3 +++ .../flush/FlushTableSubprocedure.java | 25 +++++++++++++++---- .../MasterFlushTableProcedureManager.java | 12 ++++++++- ...egionServerFlushTableProcedureManager.java | 19 +++++++++++--- .../hbase/client/TestFlushFromClient.java | 19 ++++++++++++++ .../hbase/rsgroup/VerifyingRSGroupAdmin.java | 4 +++ hbase-shell/src/main/ruby/hbase/admin.rb | 6 ++++- .../src/main/ruby/shell/commands/flush.rb | 4 ++- .../hbase/thrift2/client/ThriftAdmin.java | 5 ++++ 14 files changed, 124 insertions(+), 13 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index 68b343ae6af..155f0249daa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -513,6 +513,16 @@ public interface Admin extends Abortable, Closeable { */ void flush(TableName tableName) throws IOException; + /** + * Flush the specified column family stores on all regions of the passed table. + * This runs as a synchronous operation. + * + * @param tableName table to flush + * @param columnFamily column family within a table + * @throws IOException if a remote or network exception occurs + */ + void flush(TableName tableName, byte[] columnFamily) throws IOException; + /** * Flush an individual region. Synchronous operation. * diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java index 7533c091813..1255753f05e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java @@ -244,6 +244,11 @@ class AdminOverAsyncAdmin implements Admin { get(admin.flush(tableName)); } + @Override + public void flush(TableName tableName, byte[] columnFamily) throws IOException { + get(admin.flush(tableName, columnFamily)); + } + @Override public void flushRegion(byte[] regionName) throws IOException { get(admin.flushRegion(regionName)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index f119c7ebea4..336903d42e3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -302,6 +302,14 @@ public interface AsyncAdmin { */ CompletableFuture flush(TableName tableName); + /** + * Flush the specified column family stores on all regions of the passed table. + * This runs as a synchronous operation. + * @param tableName table to flush + * @param columnFamily column family within a table + */ + CompletableFuture flush(TableName tableName, byte[] columnFamily); + /** * Flush an individual region. * @param regionName region to flush diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 1a49919c057..c29fe7118dd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -243,6 +243,11 @@ class AsyncHBaseAdmin implements AsyncAdmin { return wrap(rawAdmin.flush(tableName)); } + @Override + public CompletableFuture flush(TableName tableName, byte[] columnFamily) { + return wrap(rawAdmin.flush(tableName, columnFamily)); + } + @Override public CompletableFuture flushRegion(byte[] regionName) { return wrap(rawAdmin.flushRegion(regionName)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 63c177a7b6d..1330b2a1756 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -893,9 +893,13 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList())); } } - @Override public CompletableFuture flush(TableName tableName) { + return flush(tableName, null); + } + + @Override + public CompletableFuture flush(TableName tableName, byte[] columnFamily) { CompletableFuture future = new CompletableFuture<>(); addListener(tableExists(tableName), (exists, err) -> { if (err != null) { @@ -909,8 +913,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } else if (!tableEnabled) { future.completeExceptionally(new TableNotEnabledException(tableName)); } else { + Map props = new HashMap<>(); + if (columnFamily != null) { + props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily)); + } addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), - new HashMap<>()), (ret, err3) -> { + props), (ret, err3) -> { if (err3 != null) { future.completeExceptionally(err3); } else { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index d0ea8c53092..f6f00c55254 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -581,6 +581,9 @@ public final class HConstants { */ public static final byte [] META_VERSION_QUALIFIER = Bytes.toBytes("v"); + /** The family str as a key in map*/ + public static final String FAMILY_KEY_STR = "family"; + /** * The current version of the meta table. * - pre-hbase 0.92. There is no META_VERSION column in the root table diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java index 5c005a75a9a..d1240395241 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/FlushTableSubprocedure.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.procedure.flush; +import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; @@ -28,7 +29,9 @@ import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; import org.apache.hadoop.hbase.procedure.ProcedureMember; import org.apache.hadoop.hbase.procedure.Subprocedure; import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; /** * This flush region implementation uses the distributed procedure framework to flush @@ -40,23 +43,27 @@ public class FlushTableSubprocedure extends Subprocedure { private static final Logger LOG = LoggerFactory.getLogger(FlushTableSubprocedure.class); private final String table; + private final String family; private final List regions; private final FlushTableSubprocedurePool taskManager; public FlushTableSubprocedure(ProcedureMember member, ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout, - List regions, String table, + List regions, String table, String family, FlushTableSubprocedurePool taskManager) { super(member, table, errorListener, wakeFrequency, timeout); this.table = table; + this.family = family; this.regions = regions; this.taskManager = taskManager; } private static class RegionFlushTask implements Callable { HRegion region; - RegionFlushTask(HRegion region) { + List families; + RegionFlushTask(HRegion region, List families) { this.region = region; + this.families = families; } @Override @@ -65,7 +72,11 @@ public class FlushTableSubprocedure extends Subprocedure { region.startRegionOperation(); try { LOG.debug("Flush region " + region.toString() + " started..."); - region.flush(true); + if (families == null) { + region.flush(true); + } else { + region.flushcache(families, false, FlushLifeCycleTracker.DUMMY); + } // TODO: flush result is not checked? } finally { LOG.debug("Closing region operation on " + region); @@ -88,11 +99,15 @@ public class FlushTableSubprocedure extends Subprocedure { throw new IllegalStateException("Attempting to flush " + table + " but we currently have outstanding tasks"); } - + List families = null; + if (family != null) { + LOG.debug("About to flush family {} on all regions for table {}", family, table); + families = Arrays.asList(Bytes.toBytes(family)); + } // Add all hfiles already existing in region. for (HRegion region : regions) { // submit one task per region for parallelize by region. - taskManager.submitTask(new RegionFlushTask(region)); + taskManager.submitTask(new RegionFlushTask(region, families)); monitor.rethrowException(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java index 1f7a5e2f26a..c6a3b92d378 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/MasterFlushTableProcedureManager.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.concurrent.ThreadPoolExecutor; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -51,6 +52,7 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) @@ -149,11 +151,19 @@ public class MasterFlushTableProcedureManager extends MasterProcedureManager { ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance()); + HBaseProtos.NameStringPair family = null; + for (HBaseProtos.NameStringPair nsp : desc.getConfigurationList()) { + if (HConstants.FAMILY_KEY_STR.equals(nsp.getName())) { + family = nsp; + } + } + byte[] procArgs = family != null ? family.toByteArray() : new byte[0]; + // Kick of the global procedure from the master coordinator to the region servers. // We rely on the existing Distributed Procedure framework to prevent any concurrent // procedure with the same name. Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), - new byte[0], Lists.newArrayList(regionServers)); + procArgs, Lists.newArrayList(regionServers)); monitor.rethrowException(); if (proc == null) { String msg = "Failed to submit distributed procedure " + desc.getSignature() + " for '" diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java index 31d0276ddde..e05ca1eb7be 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java @@ -51,6 +51,7 @@ import org.apache.zookeeper.KeeperException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; /** * This manager class handles flushing of the regions for table on a {@link HRegionServer}. @@ -129,9 +130,10 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur * there is a possibility of a race where regions may be missed. * * @param table + * @param family * @return Subprocedure to submit to the ProcedureMemeber. */ - public Subprocedure buildSubprocedure(String table) { + public Subprocedure buildSubprocedure(String table, String family) { // don't run the subprocedure if the parent is stop(ping) if (rss.isStopping() || rss.isStopped()) { @@ -162,7 +164,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur FlushTableSubprocedurePool taskManager = new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss); return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis, - timeoutMillis, involvedRegions, table, taskManager); + timeoutMillis, involvedRegions, table, family, taskManager); } /** @@ -183,8 +185,19 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur @Override public Subprocedure buildSubprocedure(String name, byte[] data) { + String family = null; + // Currently we do not put other data except family, so it is ok to + // judge by length that if family was specified + if (data.length > 0) { + try { + HBaseProtos.NameStringPair nsp = HBaseProtos.NameStringPair.parseFrom(data); + family = nsp.getValue(); + } catch (Exception e) { + LOG.error("fail to get family by parsing from data", e); + } + } // The name of the procedure instance from the master is the table name. - return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name); + return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name, family); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java index 308529616b6..cd496bea35c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFlushFromClient.java @@ -117,6 +117,16 @@ public class TestFlushFromClient { } } + @Test + public void testFlushTableFamily() throws Exception { + try (Admin admin = TEST_UTIL.getAdmin()) { + long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize(); + admin.flush(tableName, FAMILY_1); + assertFalse(getRegionInfo().stream(). + anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2)); + } + } + @Test public void testAsyncFlushTable() throws Exception { AsyncAdmin admin = asyncConn.getAdmin(); @@ -124,6 +134,15 @@ public class TestFlushFromClient { assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0)); } + @Test + public void testAsyncFlushTableFamily() throws Exception { + AsyncAdmin admin = asyncConn.getAdmin(); + long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize(); + admin.flush(tableName, FAMILY_1).get(); + assertFalse(getRegionInfo().stream(). + anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2)); + } + @Test public void testFlushRegion() throws Exception { try (Admin admin = TEST_UTIL.getAdmin()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java index 6d0574a7b44..aad0d416d26 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java @@ -220,6 +220,10 @@ public class VerifyingRSGroupAdmin implements Admin, Closeable { admin.flush(tableName); } + public void flush(TableName tableName, byte[] columnFamily) throws IOException { + admin.flush(tableName, columnFamily); + } + public void flushRegion(byte[] regionName) throws IOException { admin.flushRegion(regionName); } diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index 807b5a10e80..f298a123dbc 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -67,7 +67,11 @@ module Hbase rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException # Unknown region. Try table. begin - @admin.flush(TableName.valueOf(name)) + if family_bytes.nil? + @admin.flush(TableName.valueOf(name)) + else + @admin.flush(TableName.valueOf(name), family_bytes) + end rescue java.lang.IllegalArgumentException # Unknown table. Try region server. @admin.flushRegionServer(ServerName.valueOf(name)) diff --git a/hbase-shell/src/main/ruby/shell/commands/flush.rb b/hbase-shell/src/main/ruby/shell/commands/flush.rb index f34999c4eb8..69bcf134672 100644 --- a/hbase-shell/src/main/ruby/shell/commands/flush.rb +++ b/hbase-shell/src/main/ruby/shell/commands/flush.rb @@ -25,10 +25,12 @@ module Shell Flush all regions in passed table or pass a region row to flush an individual region or a region server name whose format is 'host,port,startcode', to flush all its regions. -You can also flush a single column family within a region. +You can also flush a single column family for all regions within a table, +or for an specific region only. For example: hbase> flush 'TABLENAME' + hbase> flush 'TABLENAME','FAMILYNAME' hbase> flush 'REGIONNAME' hbase> flush 'REGIONNAME','FAMILYNAME' hbase> flush 'ENCODED_REGIONNAME' diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java index de0efc2e1ee..9758d087b70 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java @@ -473,6 +473,11 @@ public class ThriftAdmin implements Admin { } + @Override + public void flush(TableName tableName, byte[] columnFamily) { + throw new NotImplementedException("flush not supported in ThriftAdmin"); + } + @Override public void flushRegion(byte[] regionName) { throw new NotImplementedException("flushRegion not supported in ThriftAdmin");