HBASE-24694 Support flush a single column family of table (#2179)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
bsglz 2020-08-07 18:28:53 +08:00 committed by GitHub
parent 9b49bd6b66
commit 485e0d2fa4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 124 additions and 13 deletions

View File

@ -513,6 +513,16 @@ public interface Admin extends Abortable, Closeable {
*/ */
void flush(TableName tableName) throws IOException; void flush(TableName tableName) throws IOException;
/**
* Flush the specified column family stores on all regions of the passed table.
* This runs as a synchronous operation.
*
* @param tableName table to flush
* @param columnFamily column family within a table
* @throws IOException if a remote or network exception occurs
*/
void flush(TableName tableName, byte[] columnFamily) throws IOException;
/** /**
* Flush an individual region. Synchronous operation. * Flush an individual region. Synchronous operation.
* *

View File

@ -244,6 +244,11 @@ class AdminOverAsyncAdmin implements Admin {
get(admin.flush(tableName)); get(admin.flush(tableName));
} }
@Override
public void flush(TableName tableName, byte[] columnFamily) throws IOException {
get(admin.flush(tableName, columnFamily));
}
@Override @Override
public void flushRegion(byte[] regionName) throws IOException { public void flushRegion(byte[] regionName) throws IOException {
get(admin.flushRegion(regionName)); get(admin.flushRegion(regionName));

View File

@ -302,6 +302,14 @@ public interface AsyncAdmin {
*/ */
CompletableFuture<Void> flush(TableName tableName); CompletableFuture<Void> flush(TableName tableName);
/**
* Flush the specified column family stores on all regions of the passed table.
* This runs as a synchronous operation.
* @param tableName table to flush
* @param columnFamily column family within a table
*/
CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily);
/** /**
* Flush an individual region. * Flush an individual region.
* @param regionName region to flush * @param regionName region to flush

View File

@ -243,6 +243,11 @@ class AsyncHBaseAdmin implements AsyncAdmin {
return wrap(rawAdmin.flush(tableName)); return wrap(rawAdmin.flush(tableName));
} }
@Override
public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
return wrap(rawAdmin.flush(tableName, columnFamily));
}
@Override @Override
public CompletableFuture<Void> flushRegion(byte[] regionName) { public CompletableFuture<Void> flushRegion(byte[] regionName) {
return wrap(rawAdmin.flushRegion(regionName)); return wrap(rawAdmin.flushRegion(regionName));

View File

@ -893,9 +893,13 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList())); locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
} }
} }
@Override @Override
public CompletableFuture<Void> flush(TableName tableName) { public CompletableFuture<Void> flush(TableName tableName) {
return flush(tableName, null);
}
@Override
public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> future = new CompletableFuture<>();
addListener(tableExists(tableName), (exists, err) -> { addListener(tableExists(tableName), (exists, err) -> {
if (err != null) { if (err != null) {
@ -909,8 +913,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
} else if (!tableEnabled) { } else if (!tableEnabled) {
future.completeExceptionally(new TableNotEnabledException(tableName)); future.completeExceptionally(new TableNotEnabledException(tableName));
} else { } else {
Map<String, String> props = new HashMap<>();
if (columnFamily != null) {
props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
}
addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(), addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
new HashMap<>()), (ret, err3) -> { props), (ret, err3) -> {
if (err3 != null) { if (err3 != null) {
future.completeExceptionally(err3); future.completeExceptionally(err3);
} else { } else {

View File

@ -581,6 +581,9 @@ public final class HConstants {
*/ */
public static final byte [] META_VERSION_QUALIFIER = Bytes.toBytes("v"); public static final byte [] META_VERSION_QUALIFIER = Bytes.toBytes("v");
/** The family str as a key in map*/
public static final String FAMILY_KEY_STR = "family";
/** /**
* The current version of the meta table. * The current version of the meta table.
* - pre-hbase 0.92. There is no META_VERSION column in the root table * - pre-hbase 0.92. There is no META_VERSION column in the root table

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hbase.procedure.flush; package org.apache.hadoop.hbase.procedure.flush;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -28,7 +29,9 @@ import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.procedure.ProcedureMember; import org.apache.hadoop.hbase.procedure.ProcedureMember;
import org.apache.hadoop.hbase.procedure.Subprocedure; import org.apache.hadoop.hbase.procedure.Subprocedure;
import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool; import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
/** /**
* This flush region implementation uses the distributed procedure framework to flush * This flush region implementation uses the distributed procedure framework to flush
@ -40,23 +43,27 @@ public class FlushTableSubprocedure extends Subprocedure {
private static final Logger LOG = LoggerFactory.getLogger(FlushTableSubprocedure.class); private static final Logger LOG = LoggerFactory.getLogger(FlushTableSubprocedure.class);
private final String table; private final String table;
private final String family;
private final List<HRegion> regions; private final List<HRegion> regions;
private final FlushTableSubprocedurePool taskManager; private final FlushTableSubprocedurePool taskManager;
public FlushTableSubprocedure(ProcedureMember member, public FlushTableSubprocedure(ProcedureMember member,
ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout, ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
List<HRegion> regions, String table, List<HRegion> regions, String table, String family,
FlushTableSubprocedurePool taskManager) { FlushTableSubprocedurePool taskManager) {
super(member, table, errorListener, wakeFrequency, timeout); super(member, table, errorListener, wakeFrequency, timeout);
this.table = table; this.table = table;
this.family = family;
this.regions = regions; this.regions = regions;
this.taskManager = taskManager; this.taskManager = taskManager;
} }
private static class RegionFlushTask implements Callable<Void> { private static class RegionFlushTask implements Callable<Void> {
HRegion region; HRegion region;
RegionFlushTask(HRegion region) { List<byte[]> families;
RegionFlushTask(HRegion region, List<byte[]> families) {
this.region = region; this.region = region;
this.families = families;
} }
@Override @Override
@ -65,7 +72,11 @@ public class FlushTableSubprocedure extends Subprocedure {
region.startRegionOperation(); region.startRegionOperation();
try { try {
LOG.debug("Flush region " + region.toString() + " started..."); LOG.debug("Flush region " + region.toString() + " started...");
region.flush(true); if (families == null) {
region.flush(true);
} else {
region.flushcache(families, false, FlushLifeCycleTracker.DUMMY);
}
// TODO: flush result is not checked? // TODO: flush result is not checked?
} finally { } finally {
LOG.debug("Closing region operation on " + region); LOG.debug("Closing region operation on " + region);
@ -88,11 +99,15 @@ public class FlushTableSubprocedure extends Subprocedure {
throw new IllegalStateException("Attempting to flush " throw new IllegalStateException("Attempting to flush "
+ table + " but we currently have outstanding tasks"); + table + " but we currently have outstanding tasks");
} }
List<byte[]> families = null;
if (family != null) {
LOG.debug("About to flush family {} on all regions for table {}", family, table);
families = Arrays.asList(Bytes.toBytes(family));
}
// Add all hfiles already existing in region. // Add all hfiles already existing in region.
for (HRegion region : regions) { for (HRegion region : regions) {
// submit one task per region for parallelize by region. // submit one task per region for parallelize by region.
taskManager.submitTask(new RegionFlushTask(region)); taskManager.submitTask(new RegionFlushTask(region, families));
monitor.rethrowException(); monitor.rethrowException();
} }

View File

@ -26,6 +26,7 @@ import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
@ -51,6 +52,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
@ -149,11 +151,19 @@ public class MasterFlushTableProcedureManager extends MasterProcedureManager {
ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance()); ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
HBaseProtos.NameStringPair family = null;
for (HBaseProtos.NameStringPair nsp : desc.getConfigurationList()) {
if (HConstants.FAMILY_KEY_STR.equals(nsp.getName())) {
family = nsp;
}
}
byte[] procArgs = family != null ? family.toByteArray() : new byte[0];
// Kick of the global procedure from the master coordinator to the region servers. // Kick of the global procedure from the master coordinator to the region servers.
// We rely on the existing Distributed Procedure framework to prevent any concurrent // We rely on the existing Distributed Procedure framework to prevent any concurrent
// procedure with the same name. // procedure with the same name.
Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(),
new byte[0], Lists.newArrayList(regionServers)); procArgs, Lists.newArrayList(regionServers));
monitor.rethrowException(); monitor.rethrowException();
if (proc == null) { if (proc == null) {
String msg = "Failed to submit distributed procedure " + desc.getSignature() + " for '" String msg = "Failed to submit distributed procedure " + desc.getSignature() + " for '"

View File

@ -51,6 +51,7 @@ import org.apache.zookeeper.KeeperException;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
/** /**
* This manager class handles flushing of the regions for table on a {@link HRegionServer}. * This manager class handles flushing of the regions for table on a {@link HRegionServer}.
@ -129,9 +130,10 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
* there is a possibility of a race where regions may be missed. * there is a possibility of a race where regions may be missed.
* *
* @param table * @param table
* @param family
* @return Subprocedure to submit to the ProcedureMemeber. * @return Subprocedure to submit to the ProcedureMemeber.
*/ */
public Subprocedure buildSubprocedure(String table) { public Subprocedure buildSubprocedure(String table, String family) {
// don't run the subprocedure if the parent is stop(ping) // don't run the subprocedure if the parent is stop(ping)
if (rss.isStopping() || rss.isStopped()) { if (rss.isStopping() || rss.isStopped()) {
@ -162,7 +164,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
FlushTableSubprocedurePool taskManager = FlushTableSubprocedurePool taskManager =
new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss); new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss);
return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis, return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis,
timeoutMillis, involvedRegions, table, taskManager); timeoutMillis, involvedRegions, table, family, taskManager);
} }
/** /**
@ -183,8 +185,19 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
@Override @Override
public Subprocedure buildSubprocedure(String name, byte[] data) { public Subprocedure buildSubprocedure(String name, byte[] data) {
String family = null;
// Currently we do not put other data except family, so it is ok to
// judge by length that if family was specified
if (data.length > 0) {
try {
HBaseProtos.NameStringPair nsp = HBaseProtos.NameStringPair.parseFrom(data);
family = nsp.getValue();
} catch (Exception e) {
LOG.error("fail to get family by parsing from data", e);
}
}
// The name of the procedure instance from the master is the table name. // The name of the procedure instance from the master is the table name.
return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name); return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name, family);
} }
} }

View File

@ -117,6 +117,16 @@ public class TestFlushFromClient {
} }
} }
@Test
public void testFlushTableFamily() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize();
admin.flush(tableName, FAMILY_1);
assertFalse(getRegionInfo().stream().
anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2));
}
}
@Test @Test
public void testAsyncFlushTable() throws Exception { public void testAsyncFlushTable() throws Exception {
AsyncAdmin admin = asyncConn.getAdmin(); AsyncAdmin admin = asyncConn.getAdmin();
@ -124,6 +134,15 @@ public class TestFlushFromClient {
assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0)); assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
} }
@Test
public void testAsyncFlushTableFamily() throws Exception {
AsyncAdmin admin = asyncConn.getAdmin();
long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize();
admin.flush(tableName, FAMILY_1).get();
assertFalse(getRegionInfo().stream().
anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2));
}
@Test @Test
public void testFlushRegion() throws Exception { public void testFlushRegion() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) { try (Admin admin = TEST_UTIL.getAdmin()) {

View File

@ -220,6 +220,10 @@ public class VerifyingRSGroupAdmin implements Admin, Closeable {
admin.flush(tableName); admin.flush(tableName);
} }
public void flush(TableName tableName, byte[] columnFamily) throws IOException {
admin.flush(tableName, columnFamily);
}
public void flushRegion(byte[] regionName) throws IOException { public void flushRegion(byte[] regionName) throws IOException {
admin.flushRegion(regionName); admin.flushRegion(regionName);
} }

View File

@ -67,7 +67,11 @@ module Hbase
rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException rescue java.lang.IllegalArgumentException, org.apache.hadoop.hbase.UnknownRegionException
# Unknown region. Try table. # Unknown region. Try table.
begin begin
@admin.flush(TableName.valueOf(name)) if family_bytes.nil?
@admin.flush(TableName.valueOf(name))
else
@admin.flush(TableName.valueOf(name), family_bytes)
end
rescue java.lang.IllegalArgumentException rescue java.lang.IllegalArgumentException
# Unknown table. Try region server. # Unknown table. Try region server.
@admin.flushRegionServer(ServerName.valueOf(name)) @admin.flushRegionServer(ServerName.valueOf(name))

View File

@ -25,10 +25,12 @@ module Shell
Flush all regions in passed table or pass a region row to Flush all regions in passed table or pass a region row to
flush an individual region or a region server name whose format flush an individual region or a region server name whose format
is 'host,port,startcode', to flush all its regions. is 'host,port,startcode', to flush all its regions.
You can also flush a single column family within a region. You can also flush a single column family for all regions within a table,
or for an specific region only.
For example: For example:
hbase> flush 'TABLENAME' hbase> flush 'TABLENAME'
hbase> flush 'TABLENAME','FAMILYNAME'
hbase> flush 'REGIONNAME' hbase> flush 'REGIONNAME'
hbase> flush 'REGIONNAME','FAMILYNAME' hbase> flush 'REGIONNAME','FAMILYNAME'
hbase> flush 'ENCODED_REGIONNAME' hbase> flush 'ENCODED_REGIONNAME'

View File

@ -473,6 +473,11 @@ public class ThriftAdmin implements Admin {
} }
@Override
public void flush(TableName tableName, byte[] columnFamily) {
throw new NotImplementedException("flush not supported in ThriftAdmin");
}
@Override @Override
public void flushRegion(byte[] regionName) { public void flushRegion(byte[] regionName) {
throw new NotImplementedException("flushRegion not supported in ThriftAdmin"); throw new NotImplementedException("flushRegion not supported in ThriftAdmin");