HBASE-24694 Support flush a single column family of table (#2218)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
parent
aab8ce9e01
commit
ebf493f075
|
@ -882,6 +882,16 @@ public interface Admin extends Abortable, Closeable {
|
|||
*/
|
||||
void flush(TableName tableName) throws IOException;
|
||||
|
||||
/**
|
||||
* Flush the specified column family stores on all regions of the passed table.
|
||||
* This runs as a synchronous operation.
|
||||
*
|
||||
* @param tableName table to flush
|
||||
* @param columnFamily column family within a table
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
void flush(TableName tableName, byte[] columnFamily) throws IOException;
|
||||
|
||||
/**
|
||||
* Flush an individual region. Synchronous operation.
|
||||
*
|
||||
|
|
|
@ -306,6 +306,14 @@ public interface AsyncAdmin {
|
|||
*/
|
||||
CompletableFuture<Void> flush(TableName tableName);
|
||||
|
||||
/**
|
||||
* Flush the specified column family stores on all regions of the passed table.
|
||||
* This runs as a synchronous operation.
|
||||
* @param tableName table to flush
|
||||
* @param columnFamily column family within a table
|
||||
*/
|
||||
CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily);
|
||||
|
||||
/**
|
||||
* Flush an individual region.
|
||||
* @param regionName region to flush
|
||||
|
|
|
@ -244,6 +244,11 @@ class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
return wrap(rawAdmin.flush(tableName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
|
||||
return wrap(rawAdmin.flush(tableName, columnFamily));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> flushRegion(byte[] regionName) {
|
||||
return wrap(rawAdmin.flushRegion(regionName));
|
||||
|
|
|
@ -1151,12 +1151,21 @@ public class HBaseAdmin implements Admin {
|
|||
|
||||
@Override
|
||||
public void flush(final TableName tableName) throws IOException {
|
||||
flush(tableName, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush(final TableName tableName, byte[] columnFamily) throws IOException {
|
||||
checkTableExists(tableName);
|
||||
if (isTableDisabled(tableName)) {
|
||||
LOG.info("Table is disabled: " + tableName.getNameAsString());
|
||||
return;
|
||||
}
|
||||
execProcedure("flush-table-proc", tableName.getNameAsString(), new HashMap<>());
|
||||
Map<String, String> props = new HashMap<>();
|
||||
if (columnFamily != null) {
|
||||
props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
|
||||
}
|
||||
execProcedure("flush-table-proc", tableName.getNameAsString(), props);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -891,9 +891,13 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
locs -> locs.stream().map(HRegionLocation::getRegion).collect(Collectors.toList()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> flush(TableName tableName) {
|
||||
return flush(tableName, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void> flush(TableName tableName, byte[] columnFamily) {
|
||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
addListener(tableExists(tableName), (exists, err) -> {
|
||||
if (err != null) {
|
||||
|
@ -907,8 +911,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
} else if (!tableEnabled) {
|
||||
future.completeExceptionally(new TableNotEnabledException(tableName));
|
||||
} else {
|
||||
Map<String, String> props = new HashMap<>();
|
||||
if (columnFamily != null) {
|
||||
props.put(HConstants.FAMILY_KEY_STR, Bytes.toString(columnFamily));
|
||||
}
|
||||
addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, tableName.getNameAsString(),
|
||||
new HashMap<>()), (ret, err3) -> {
|
||||
props), (ret, err3) -> {
|
||||
if (err3 != null) {
|
||||
future.completeExceptionally(err3);
|
||||
} else {
|
||||
|
|
|
@ -616,6 +616,9 @@ public final class HConstants {
|
|||
*/
|
||||
public static final byte [] META_VERSION_QUALIFIER = Bytes.toBytes("v");
|
||||
|
||||
/** The family str as a key in map*/
|
||||
public static final String FAMILY_KEY_STR = "family";
|
||||
|
||||
/**
|
||||
* The current version of the meta table.
|
||||
* - pre-hbase 0.92. There is no META_VERSION column in the root table
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.procedure.flush;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
|
@ -28,7 +29,9 @@ import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
|
|||
import org.apache.hadoop.hbase.procedure.ProcedureMember;
|
||||
import org.apache.hadoop.hbase.procedure.Subprocedure;
|
||||
import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool;
|
||||
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
/**
|
||||
* This flush region implementation uses the distributed procedure framework to flush
|
||||
|
@ -40,23 +43,27 @@ public class FlushTableSubprocedure extends Subprocedure {
|
|||
private static final Logger LOG = LoggerFactory.getLogger(FlushTableSubprocedure.class);
|
||||
|
||||
private final String table;
|
||||
private final String family;
|
||||
private final List<HRegion> regions;
|
||||
private final FlushTableSubprocedurePool taskManager;
|
||||
|
||||
public FlushTableSubprocedure(ProcedureMember member,
|
||||
ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
|
||||
List<HRegion> regions, String table,
|
||||
List<HRegion> regions, String table, String family,
|
||||
FlushTableSubprocedurePool taskManager) {
|
||||
super(member, table, errorListener, wakeFrequency, timeout);
|
||||
this.table = table;
|
||||
this.family = family;
|
||||
this.regions = regions;
|
||||
this.taskManager = taskManager;
|
||||
}
|
||||
|
||||
private static class RegionFlushTask implements Callable<Void> {
|
||||
HRegion region;
|
||||
RegionFlushTask(HRegion region) {
|
||||
List<byte[]> families;
|
||||
RegionFlushTask(HRegion region, List<byte[]> families) {
|
||||
this.region = region;
|
||||
this.families = families;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -65,7 +72,11 @@ public class FlushTableSubprocedure extends Subprocedure {
|
|||
region.startRegionOperation();
|
||||
try {
|
||||
LOG.debug("Flush region " + region.toString() + " started...");
|
||||
region.flush(true);
|
||||
if (families == null) {
|
||||
region.flush(true);
|
||||
} else {
|
||||
region.flushcache(families, false, FlushLifeCycleTracker.DUMMY);
|
||||
}
|
||||
// TODO: flush result is not checked?
|
||||
} finally {
|
||||
LOG.debug("Closing region operation on " + region);
|
||||
|
@ -88,11 +99,15 @@ public class FlushTableSubprocedure extends Subprocedure {
|
|||
throw new IllegalStateException("Attempting to flush "
|
||||
+ table + " but we currently have outstanding tasks");
|
||||
}
|
||||
|
||||
List<byte[]> families = null;
|
||||
if (family != null) {
|
||||
LOG.debug("About to flush family {} on all regions for table {}", family, table);
|
||||
families = Arrays.asList(Bytes.toBytes(family));
|
||||
}
|
||||
// Add all hfiles already existing in region.
|
||||
for (HRegion region : regions) {
|
||||
// submit one task per region for parallelize by region.
|
||||
taskManager.submitTask(new RegionFlushTask(region));
|
||||
taskManager.submitTask(new RegionFlushTask(region, families));
|
||||
monitor.rethrowException();
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -51,6 +52,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
|
||||
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
||||
|
@ -149,11 +151,19 @@ public class MasterFlushTableProcedureManager extends MasterProcedureManager {
|
|||
|
||||
ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
|
||||
|
||||
HBaseProtos.NameStringPair family = null;
|
||||
for (HBaseProtos.NameStringPair nsp : desc.getConfigurationList()) {
|
||||
if (HConstants.FAMILY_KEY_STR.equals(nsp.getName())) {
|
||||
family = nsp;
|
||||
}
|
||||
}
|
||||
byte[] procArgs = family != null ? family.toByteArray() : new byte[0];
|
||||
|
||||
// Kick of the global procedure from the master coordinator to the region servers.
|
||||
// We rely on the existing Distributed Procedure framework to prevent any concurrent
|
||||
// procedure with the same name.
|
||||
Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(),
|
||||
new byte[0], Lists.newArrayList(regionServers));
|
||||
procArgs, Lists.newArrayList(regionServers));
|
||||
monitor.rethrowException();
|
||||
if (proc == null) {
|
||||
String msg = "Failed to submit distributed procedure " + desc.getSignature() + " for '"
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||
|
||||
/**
|
||||
* This manager class handles flushing of the regions for table on a {@link HRegionServer}.
|
||||
|
@ -128,10 +129,11 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
|
|||
* Because this gets the local list of regions to flush and not the set the master had,
|
||||
* there is a possibility of a race where regions may be missed.
|
||||
*
|
||||
* @param table
|
||||
* @param table table to flush
|
||||
* @param family column family within a table
|
||||
* @return Subprocedure to submit to the ProcedureMemeber.
|
||||
*/
|
||||
public Subprocedure buildSubprocedure(String table) {
|
||||
public Subprocedure buildSubprocedure(String table, String family) {
|
||||
|
||||
// don't run the subprocedure if the parent is stop(ping)
|
||||
if (rss.isStopping() || rss.isStopped()) {
|
||||
|
@ -162,7 +164,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
|
|||
FlushTableSubprocedurePool taskManager =
|
||||
new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss);
|
||||
return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis,
|
||||
timeoutMillis, involvedRegions, table, taskManager);
|
||||
timeoutMillis, involvedRegions, table, family, taskManager);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -183,8 +185,19 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
|
|||
|
||||
@Override
|
||||
public Subprocedure buildSubprocedure(String name, byte[] data) {
|
||||
String family = null;
|
||||
// Currently we do not put other data except family, so it is ok to
|
||||
// judge by length that if family was specified
|
||||
if (data.length > 0) {
|
||||
try {
|
||||
HBaseProtos.NameStringPair nsp = HBaseProtos.NameStringPair.parseFrom(data);
|
||||
family = nsp.getValue();
|
||||
} catch (Exception e) {
|
||||
LOG.error("fail to get family by parsing from data", e);
|
||||
}
|
||||
}
|
||||
// The name of the procedure instance from the master is the table name.
|
||||
return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name);
|
||||
return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name, family);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -117,6 +117,16 @@ public class TestFlushFromClient {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlushTableFamily() throws Exception {
|
||||
try (Admin admin = TEST_UTIL.getAdmin()) {
|
||||
long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize();
|
||||
admin.flush(tableName, FAMILY_1);
|
||||
assertFalse(getRegionInfo().stream().
|
||||
anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsyncFlushTable() throws Exception {
|
||||
AsyncAdmin admin = asyncConn.getAdmin();
|
||||
|
@ -124,6 +134,15 @@ public class TestFlushFromClient {
|
|||
assertFalse(getRegionInfo().stream().anyMatch(r -> r.getMemStoreDataSize() != 0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsyncFlushTableFamily() throws Exception {
|
||||
AsyncAdmin admin = asyncConn.getAdmin();
|
||||
long sizeBeforeFlush = getRegionInfo().get(0).getMemStoreDataSize();
|
||||
admin.flush(tableName, FAMILY_1).get();
|
||||
assertFalse(getRegionInfo().stream().
|
||||
anyMatch(r -> r.getMemStoreDataSize() != sizeBeforeFlush / 2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlushRegion() throws Exception {
|
||||
try (Admin admin = TEST_UTIL.getAdmin()) {
|
||||
|
|
|
@ -64,7 +64,11 @@ module Hbase
|
|||
rescue java.lang.IllegalArgumentException
|
||||
# Unknown region. Try table.
|
||||
begin
|
||||
@admin.flush(TableName.valueOf(name))
|
||||
if family_bytes.nil?
|
||||
@admin.flush(TableName.valueOf(name))
|
||||
else
|
||||
@admin.flush(TableName.valueOf(name), family_bytes)
|
||||
end
|
||||
rescue java.lang.IllegalArgumentException
|
||||
# Unknown table. Try region server.
|
||||
@admin.flushRegionServer(ServerName.valueOf(name))
|
||||
|
|
|
@ -25,10 +25,12 @@ module Shell
|
|||
Flush all regions in passed table or pass a region row to
|
||||
flush an individual region or a region server name whose format
|
||||
is 'host,port,startcode', to flush all its regions.
|
||||
You can also flush a single column family within a region.
|
||||
You can also flush a single column family for all regions within a table,
|
||||
or for an specific region only.
|
||||
For example:
|
||||
|
||||
hbase> flush 'TABLENAME'
|
||||
hbase> flush 'TABLENAME','FAMILYNAME'
|
||||
hbase> flush 'REGIONNAME'
|
||||
hbase> flush 'REGIONNAME','FAMILYNAME'
|
||||
hbase> flush 'ENCODED_REGIONNAME'
|
||||
|
|
|
@ -630,6 +630,11 @@ public class ThriftAdmin implements Admin {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush(TableName tableName, byte[] columnFamily) {
|
||||
throw new NotImplementedException("flush not supported in ThriftAdmin");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flushRegion(byte[] regionName) {
|
||||
throw new NotImplementedException("flushRegion not supported in ThriftAdmin");
|
||||
|
|
Loading…
Reference in New Issue