HBASE-21782 LoadIncrementalHFiles should not be IA.Public

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
zhangduo 2019-01-26 19:44:10 +08:00
parent 5f25985b98
commit 2d4819dbed
5 changed files with 224 additions and 56 deletions

View File

@ -19,12 +19,12 @@
package org.apache.hadoop.hbase.mapreduce; package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot; import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.util.ProgramDriver; import org.apache.hadoop.util.ProgramDriver;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
/** /**
* Driver for hbase mapreduce jobs. Select which to run by passing * Driver for hbase mapreduce jobs. Select which to run by passing
@ -33,10 +33,7 @@ import org.apache.hadoop.util.ProgramDriver;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@InterfaceStability.Stable @InterfaceStability.Stable
public class Driver { public class Driver {
/**
* @param args
* @throws Throwable
*/
public static void main(String[] args) throws Throwable { public static void main(String[] args) throws Throwable {
ProgramDriver pgd = new ProgramDriver(); ProgramDriver pgd = new ProgramDriver();
@ -47,7 +44,7 @@ public class Driver {
pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS."); pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS.");
pgd.addClass(Import.NAME, Import.class, "Import data written by Export."); pgd.addClass(Import.NAME, Import.class, "Import data written by Export.");
pgd.addClass(ImportTsv.NAME, ImportTsv.class, "Import data in TSV format."); pgd.addClass(ImportTsv.NAME, ImportTsv.class, "Import data in TSV format.");
pgd.addClass(LoadIncrementalHFiles.NAME, LoadIncrementalHFiles.class, pgd.addClass(BulkLoadHFilesTool.NAME, BulkLoadHFilesTool.class,
"Complete a bulk data load."); "Complete a bulk data load.");
pgd.addClass(CopyTable.NAME, CopyTable.class, pgd.addClass(CopyTable.NAME, CopyTable.class,
"Export a table from local cluster to peer cluster."); "Export a table from local cluster to peer cluster.");

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.tool;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
/**
 * Programmatic entry point for loading the output of {@code HFileOutputFormat} into an existing
 * table. Not thread safe.
 * <p>
 * Instances are obtained through {@link #create(Configuration)}.
 */
@InterfaceAudience.Public
public interface BulkLoadHFiles {

  // Key names shared with the bulk load implementation. Fields declared in an interface are
  // implicitly public, static and final, so the modifiers are not repeated here.
  String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException";
  String MAX_FILES_PER_REGION_PER_FAMILY =
      "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
  String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
  String CREATE_TABLE_CONF_KEY = "create.table";
  String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families";
  String ALWAYS_COPY_FILES = "always.copy.files";

  /**
   * Represents an HFile waiting to be loaded. A queue of these items is used in order to support
   * the case where a region has split during the process of the load. When this happens, the
   * HFile is split into two physical parts across the new region boundary, and each part is added
   * back into the queue. The import process finishes when the queue is empty.
   * <p>
   * A class nested in an interface is implicitly public and static.
   */
  @InterfaceAudience.Public
  class LoadQueueItem {

    private final byte[] familyName;
    private final Path filePath;

    /**
     * @param family the column family the file belongs to; the array is stored as is, not copied
     * @param hfilePath location of the HFile
     */
    public LoadQueueItem(byte[] family, Path hfilePath) {
      this.familyName = family;
      this.filePath = hfilePath;
    }

    /**
     * @return the family array handed to the constructor (the same instance, not a copy)
     */
    public byte[] getFamily() {
      return familyName;
    }

    /**
     * @return the HFile path handed to the constructor
     */
    public Path getFilePath() {
      return filePath;
    }

    @Override
    public String toString() {
      StringBuilder text = new StringBuilder();
      text.append("family:").append(Bytes.toString(familyName));
      text.append(" path:").append(filePath.toString());
      return text.toString();
    }
  }

  /**
   * Perform a bulk load of the given hfiles into the given pre-existing table.
   * @param tableName the table to load into
   * @param family2Files map of family to List of hfiles
   * @return a map keyed by the {@link LoadQueueItem}s reported by the implementation
   * @throws TableNotFoundException if table does not yet exist
   * @throws IOException if the load fails for any other I/O related reason
   */
  Map<LoadQueueItem, ByteBuffer> bulkLoad(
      TableName tableName,
      Map<byte[], List<Path>> family2Files) throws TableNotFoundException, IOException;

  /**
   * Perform a bulk load of the given directory into the given pre-existing table.
   * @param tableName the table to load into
   * @param dir the directory that was provided as the output path of a job using
   *          {@code HFileOutputFormat}
   * @return a map keyed by the {@link LoadQueueItem}s reported by the implementation
   * @throws TableNotFoundException if table does not yet exist
   * @throws IOException if the load fails for any other I/O related reason
   */
  Map<LoadQueueItem, ByteBuffer> bulkLoad(
      TableName tableName,
      Path dir) throws TableNotFoundException, IOException;

  /**
   * Creates the default implementation, a {@link BulkLoadHFilesTool} built from the given
   * configuration.
   * @param conf the configuration handed to the tool's constructor
   * @return a new loader instance
   */
  static BulkLoadHFiles create(Configuration conf) {
    return new BulkLoadHFilesTool(conf);
  }
}

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.tool;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
/**
 * The implementation for {@link BulkLoadHFiles}. It reuses the logic inherited from
 * {@link LoadIncrementalHFiles} and can also be executed from the command line as a tool.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class BulkLoadHFilesTool extends LoadIncrementalHFiles implements BulkLoadHFiles {

  /** Name of this tool. */
  public static final String NAME = "completebulkload";

  /**
   * @param conf the configuration passed on to the {@link LoadIncrementalHFiles} constructor
   */
  public BulkLoadHFilesTool(Configuration conf) {
    super(conf);
  }

  /**
   * Copies the result of an inherited {@code run} call into a map whose key type is the
   * {@link BulkLoadHFiles.LoadQueueItem} declared by the public interface. Keys and values are
   * the same objects as in the input; only the container is new.
   */
  private Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> upcastKeys(
      Map<LoadIncrementalHFiles.LoadQueueItem, ByteBuffer> loaded) {
    return loaded.entrySet().stream()
        .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
  }

  /**
   * Loads the given hfiles, grouped by family, into the table by delegating to the inherited
   * {@code run(Map, TableName)}.
   */
  @Override
  public Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName,
      Map<byte[], List<Path>> family2Files) throws TableNotFoundException, IOException {
    Map<LoadIncrementalHFiles.LoadQueueItem, ByteBuffer> loaded = run(family2Files, tableName);
    return upcastKeys(loaded);
  }

  /**
   * Loads the hfiles found under the given directory into the table by delegating to the
   * inherited {@code run(Path, TableName)}.
   */
  @Override
  public Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Path dir)
      throws TableNotFoundException, IOException {
    Map<LoadIncrementalHFiles.LoadQueueItem, ByteBuffer> loaded = run(dir, tableName);
    return upcastKeys(loaded);
  }

  /**
   * Command line entry point: runs the tool through {@link ToolRunner} and exits the JVM with the
   * returned status code.
   */
  public static void main(String[] args) throws Exception {
    final Configuration configuration = HBaseConfiguration.create();
    final BulkLoadHFilesTool tool = new BulkLoadHFilesTool(configuration);
    System.exit(ToolRunner.run(configuration, tool, args));
  }
}

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hbase.tool; package org.apache.hadoop.hbase.tool;
import static java.lang.String.format;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
@ -46,7 +48,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static java.lang.String.format;
import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
@ -87,13 +88,6 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken; import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils; import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.FSVisitor; import org.apache.hadoop.hbase.util.FSVisitor;
@ -104,22 +98,41 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* Tool to load the output of HFileOutputFormat into an existing table. * Tool to load the output of HFileOutputFormat into an existing table.
* @deprecated since 2.2.0, will be removed in 3.0.0. Use {@link BulkLoadHFiles} instead. Please
* rewrite your code if you rely on methods other than the {@link #run(Map, TableName)}
* and {@link #run(String, TableName)}, as all the methods other than them will be
* removed with no replacement.
*/ */
@Deprecated
@InterfaceAudience.Public @InterfaceAudience.Public
public class LoadIncrementalHFiles extends Configured implements Tool { public class LoadIncrementalHFiles extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(LoadIncrementalHFiles.class); private static final Logger LOG = LoggerFactory.getLogger(LoadIncrementalHFiles.class);
public static final String NAME = "completebulkload"; /**
static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException"; * @deprecated since 2.2.0, will be removed in 3.0.0, with no replacement. End user should not
* depend on this value.
*/
@Deprecated
public static final String NAME = BulkLoadHFilesTool.NAME;
static final String RETRY_ON_IO_EXCEPTION = BulkLoadHFiles.RETRY_ON_IO_EXCEPTION;
public static final String MAX_FILES_PER_REGION_PER_FAMILY = public static final String MAX_FILES_PER_REGION_PER_FAMILY =
"hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily"; BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY;
private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers"; private static final String ASSIGN_SEQ_IDS = BulkLoadHFiles.ASSIGN_SEQ_IDS;
public final static String CREATE_TABLE_CONF_KEY = "create.table"; public final static String CREATE_TABLE_CONF_KEY = BulkLoadHFiles.CREATE_TABLE_CONF_KEY;
public final static String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families"; public final static String IGNORE_UNMATCHED_CF_CONF_KEY =
public final static String ALWAYS_COPY_FILES = "always.copy.files"; BulkLoadHFiles.IGNORE_UNMATCHED_CF_CONF_KEY;
public final static String ALWAYS_COPY_FILES = BulkLoadHFiles.ALWAYS_COPY_FILES;
// We use a '.' prefix which is ignored when walking directory trees // We use a '.' prefix which is ignored when walking directory trees
// above. It is invalid family name. // above. It is invalid family name.
@ -142,28 +155,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* the case where a region has split during the process of the load. When this happens, the HFile * the case where a region has split during the process of the load. When this happens, the HFile
* is split into two physical parts across the new region boundary, and each part is added back * is split into two physical parts across the new region boundary, and each part is added back
* into the queue. The import process finishes when the queue is empty. * into the queue. The import process finishes when the queue is empty.
* @deprecated Use {@link BulkLoadHFiles} instead.
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
public static class LoadQueueItem { @Deprecated
private final byte[] family; public static class LoadQueueItem extends BulkLoadHFiles.LoadQueueItem {
private final Path hfilePath;
public LoadQueueItem(byte[] family, Path hfilePath) { public LoadQueueItem(byte[] family, Path hfilePath) {
this.family = family; super(family, hfilePath);
this.hfilePath = hfilePath;
}
@Override
public String toString() {
return "family:" + Bytes.toString(family) + " path:" + hfilePath.toString();
}
public byte[] getFamily() {
return family;
}
public Path getFilePath() {
return hfilePath;
} }
} }
@ -825,8 +824,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* If the table is created for the first time, then "completebulkload" reads the files twice. More * If the table is created for the first time, then "completebulkload" reads the files twice. More
* modifications necessary if we want to avoid doing it. * modifications necessary if we want to avoid doing it.
*/ */
private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException { private void createTable(TableName tableName, Path hfofDir, Admin admin) throws IOException {
final Path hfofDir = new Path(dirPath);
final FileSystem fs = hfofDir.getFileSystem(getConf()); final FileSystem fs = hfofDir.getFileSystem(getConf());
// Add column families // Add column families
@ -1148,13 +1146,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
return getConf().getBoolean(ALWAYS_COPY_FILES, false); return getConf().getBoolean(ALWAYS_COPY_FILES, false);
} }
/** protected final Map<LoadQueueItem, ByteBuffer> run(Path hfofDir, TableName tableName)
* Perform bulk load on the given table.
* @param hfofDir the directory that was provided as the output path of a job using
* HFileOutputFormat
* @param tableName the table to load into
*/
public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName)
throws IOException { throws IOException {
try (Connection connection = ConnectionFactory.createConnection(getConf()); try (Connection connection = ConnectionFactory.createConnection(getConf());
Admin admin = connection.getAdmin()) { Admin admin = connection.getAdmin()) {
@ -1169,11 +1161,21 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
} }
try (Table table = connection.getTable(tableName); try (Table table = connection.getTable(tableName);
RegionLocator locator = connection.getRegionLocator(tableName)) { RegionLocator locator = connection.getRegionLocator(tableName)) {
return doBulkLoad(new Path(hfofDir), admin, table, locator, isSilence(), return doBulkLoad(hfofDir, admin, table, locator, isSilence(),
isAlwaysCopyFiles()); isAlwaysCopyFiles());
} }
} }
} }
/**
 * Perform bulk load on the given table. Convenience overload that wraps the directory name in a
 * {@link Path} and delegates to the {@code Path} based overload.
 * @param hfofDir the directory that was provided as the output path of a job using
 *          HFileOutputFormat
 * @param tableName the table to load into
 * @return whatever the {@code Path} based overload returns for the same directory and table
 * @throws IOException if the delegate fails
 */
public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName)
    throws IOException {
  final Path outputDir = new Path(hfofDir);
  return run(outputDir, tableName);
}
/** /**
* Perform bulk load on the given table. * Perform bulk load on the given table.

View File

@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileTestUtil; import org.apache.hadoop.hbase.util.HFileTestUtil;
@ -346,7 +345,7 @@ public class TestLoadIncrementalHFiles {
if (copyFiles) { if (copyFiles) {
conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true); conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
} }
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString()); List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString());
if (depth == 3) { if (depth == 3) {
args.add("-loadTable"); args.add("-loadTable");
@ -356,17 +355,17 @@ public class TestLoadIncrementalHFiles {
if (deleteFile) { if (deleteFile) {
fs.delete(last, true); fs.delete(last, true);
} }
Map<LoadQueueItem, ByteBuffer> loaded = loader.run(map, tableName); Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> loaded = loader.bulkLoad(tableName, map);
if (deleteFile) { if (deleteFile) {
expectedRows -= 1000; expectedRows -= 1000;
for (LoadQueueItem item : loaded.keySet()) { for (BulkLoadHFiles.LoadQueueItem item : loaded.keySet()) {
if (item.getFilePath().getName().equals(last.getName())) { if (item.getFilePath().getName().equals(last.getName())) {
fail(last + " should be missing"); fail(last + " should be missing");
} }
} }
} }
} else { } else {
loader.run(args.toArray(new String[]{})); loader.run(args.toArray(new String[] {}));
} }
if (copyFiles) { if (copyFiles) {