From 2d4819dbed6ab8dcd180154a640604657dabad0b Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sat, 26 Jan 2019 19:44:10 +0800 Subject: [PATCH] HBASE-21782 LoadIncrementalHFiles should not be IA.Public Signed-off-by: Michael Stack --- .../apache/hadoop/hbase/mapreduce/Driver.java | 13 +-- .../hadoop/hbase/tool/BulkLoadHFiles.java | 100 ++++++++++++++++++ .../hadoop/hbase/tool/BulkLoadHFilesTool.java | 70 ++++++++++++ .../hbase/tool/LoadIncrementalHFiles.java | 88 +++++++-------- .../hbase/tool/TestLoadIncrementalHFiles.java | 9 +- 5 files changed, 224 insertions(+), 56 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java index afa1ba7d53c..18f1617b877 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java @@ -19,12 +19,12 @@ package org.apache.hadoop.hbase.mapreduce; import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; import org.apache.hadoop.hbase.snapshot.ExportSnapshot; -import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool; import org.apache.hadoop.util.ProgramDriver; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; /** * Driver for hbase mapreduce jobs. Select which to run by passing @@ -33,10 +33,7 @@ import org.apache.hadoop.util.ProgramDriver; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) @InterfaceStability.Stable public class Driver { - /** - * @param args - * @throws Throwable - */ + public static void main(String[] args) throws Throwable { ProgramDriver pgd = new ProgramDriver(); @@ -47,7 +44,7 @@ public class Driver { pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS."); pgd.addClass(Import.NAME, Import.class, "Import data written by Export."); pgd.addClass(ImportTsv.NAME, ImportTsv.class, "Import data in TSV format."); - pgd.addClass(LoadIncrementalHFiles.NAME, LoadIncrementalHFiles.class, + pgd.addClass(BulkLoadHFilesTool.NAME, BulkLoadHFilesTool.class, "Complete a bulk data load."); pgd.addClass(CopyTable.NAME, CopyTable.class, "Export a table from local cluster to peer cluster."); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java new file mode 100644 index 00000000000..f3d627ab2b1 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.tool; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * The tool to let you load the output of {@code HFileOutputFormat} into an existing table + * programmatically. Not thread safe. + */ +@InterfaceAudience.Public +public interface BulkLoadHFiles { + + static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException"; + static final String MAX_FILES_PER_REGION_PER_FAMILY = + "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily"; + static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers"; + static final String CREATE_TABLE_CONF_KEY = "create.table"; + static final String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families"; + static final String ALWAYS_COPY_FILES = "always.copy.files"; + + /** + * Represents an HFile waiting to be loaded. An queue is used in this class in order to support + * the case where a region has split during the process of the load. When this happens, the HFile + * is split into two physical parts across the new region boundary, and each part is added back + * into the queue. The import process finishes when the queue is empty. + */ + @InterfaceAudience.Public + public static class LoadQueueItem { + + private final byte[] family; + + private final Path hfilePath; + + public LoadQueueItem(byte[] family, Path hfilePath) { + this.family = family; + this.hfilePath = hfilePath; + } + + @Override + public String toString() { + return "family:" + Bytes.toString(family) + " path:" + hfilePath.toString(); + } + + public byte[] getFamily() { + return family; + } + + public Path getFilePath() { + return hfilePath; + } + } + + /** + * Perform a bulk load of the given directory into the given pre-existing table. + * @param tableName the table to load into + * @param family2Files map of family to List of hfiles + * @throws TableNotFoundException if table does not yet exist + */ + Map bulkLoad(TableName tableName, Map> family2Files) + throws TableNotFoundException, IOException; + + /** + * Perform a bulk load of the given directory into the given pre-existing table. + * @param tableName the table to load into + * @param dir the directory that was provided as the output path of a job using + * {@code HFileOutputFormat} + * @throws TableNotFoundException if table does not yet exist + */ + Map bulkLoad(TableName tableName, Path dir) + throws TableNotFoundException, IOException; + + static BulkLoadHFiles create(Configuration conf) { + return new BulkLoadHFilesTool(conf); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java new file mode 100644 index 00000000000..795bd66986a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.tool; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.util.ToolRunner; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * The implementation for {@link BulkLoadHFiles}, and also can be executed from command line as a + * tool. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) +public class BulkLoadHFilesTool extends LoadIncrementalHFiles implements BulkLoadHFiles { + + public static final String NAME = "completebulkload"; + + public BulkLoadHFilesTool(Configuration conf) { + super(conf); + } + + private Map convert( + Map map) { + return map.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); + } + + @Override + public Map bulkLoad(TableName tableName, + Map> family2Files) throws TableNotFoundException, IOException { + return convert(run(family2Files, tableName)); + } + + @Override + public Map bulkLoad(TableName tableName, Path dir) + throws TableNotFoundException, IOException { + return convert(run(dir, tableName)); + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args); + System.exit(ret); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java index 3320b1fb7fe..314f2cba97c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.tool; +import static java.lang.String.format; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; @@ -46,7 +48,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import static java.lang.String.format; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; @@ -87,13 +88,6 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.FsDelegationToken; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap; -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; -import org.apache.hbase.thirdparty.com.google.common.collect.Maps; -import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; -import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSHDFSUtils; import org.apache.hadoop.hbase.util.FSVisitor; @@ -104,22 +98,41 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hbase.thirdparty.com.google.common.collect.Maps; +import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; +import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * Tool to load the output of HFileOutputFormat into an existing table. + * @deprecated since 2.2.0, will be removed in 3.0.0. Use {@link BulkLoadHFiles} instead. Please + * rewrite your code if you rely on methods other than the {@link #run(Map, TableName)} + * and {@link #run(String, TableName)}, as all the methods other than them will be + * removed with no replacement. */ +@Deprecated @InterfaceAudience.Public public class LoadIncrementalHFiles extends Configured implements Tool { private static final Logger LOG = LoggerFactory.getLogger(LoadIncrementalHFiles.class); - public static final String NAME = "completebulkload"; - static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException"; + /** + * @deprecated since 2.2.0, will be removed in 3.0.0, with no replacement. End user should not + * depend on this value. + */ + @Deprecated + public static final String NAME = BulkLoadHFilesTool.NAME; + static final String RETRY_ON_IO_EXCEPTION = BulkLoadHFiles.RETRY_ON_IO_EXCEPTION; public static final String MAX_FILES_PER_REGION_PER_FAMILY = - "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily"; - private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers"; - public final static String CREATE_TABLE_CONF_KEY = "create.table"; - public final static String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families"; - public final static String ALWAYS_COPY_FILES = "always.copy.files"; + BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY; + private static final String ASSIGN_SEQ_IDS = BulkLoadHFiles.ASSIGN_SEQ_IDS; + public final static String CREATE_TABLE_CONF_KEY = BulkLoadHFiles.CREATE_TABLE_CONF_KEY; + public final static String IGNORE_UNMATCHED_CF_CONF_KEY = + BulkLoadHFiles.IGNORE_UNMATCHED_CF_CONF_KEY; + public final static String ALWAYS_COPY_FILES = BulkLoadHFiles.ALWAYS_COPY_FILES; // We use a '.' prefix which is ignored when walking directory trees // above. It is invalid family name. @@ -142,28 +155,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * the case where a region has split during the process of the load. When this happens, the HFile * is split into two physical parts across the new region boundary, and each part is added back * into the queue. The import process finishes when the queue is empty. + * @deprecated Use {@link BulkLoadHFiles} instead. */ @InterfaceAudience.Public - public static class LoadQueueItem { - private final byte[] family; - private final Path hfilePath; + @Deprecated + public static class LoadQueueItem extends BulkLoadHFiles.LoadQueueItem { public LoadQueueItem(byte[] family, Path hfilePath) { - this.family = family; - this.hfilePath = hfilePath; - } - - @Override - public String toString() { - return "family:" + Bytes.toString(family) + " path:" + hfilePath.toString(); - } - - public byte[] getFamily() { - return family; - } - - public Path getFilePath() { - return hfilePath; + super(family, hfilePath); } } @@ -825,8 +824,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * If the table is created for the first time, then "completebulkload" reads the files twice. More * modifications necessary if we want to avoid doing it. */ - private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException { - final Path hfofDir = new Path(dirPath); + private void createTable(TableName tableName, Path hfofDir, Admin admin) throws IOException { final FileSystem fs = hfofDir.getFileSystem(getConf()); // Add column families @@ -1148,13 +1146,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { return getConf().getBoolean(ALWAYS_COPY_FILES, false); } - /** - * Perform bulk load on the given table. - * @param hfofDir the directory that was provided as the output path of a job using - * HFileOutputFormat - * @param tableName the table to load into - */ - public Map run(String hfofDir, TableName tableName) + protected final Map run(Path hfofDir, TableName tableName) throws IOException { try (Connection connection = ConnectionFactory.createConnection(getConf()); Admin admin = connection.getAdmin()) { @@ -1169,11 +1161,21 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } try (Table table = connection.getTable(tableName); RegionLocator locator = connection.getRegionLocator(tableName)) { - return doBulkLoad(new Path(hfofDir), admin, table, locator, isSilence(), + return doBulkLoad(hfofDir, admin, table, locator, isSilence(), isAlwaysCopyFiles()); } } } + /** + * Perform bulk load on the given table. + * @param hfofDir the directory that was provided as the output path of a job using + * HFileOutputFormat + * @param tableName the table to load into + */ + public Map run(String hfofDir, TableName tableName) + throws IOException { + return run(new Path(hfofDir), tableName); + } /** * Perform bulk load on the given table. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java index 85235b64200..129823efb8a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java @@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MiscTests; -import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileTestUtil; @@ -346,7 +345,7 @@ public class TestLoadIncrementalHFiles { if (copyFiles) { conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true); } - LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); + BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf); List args = Lists.newArrayList(baseDirectory.toString(), tableName.toString()); if (depth == 3) { args.add("-loadTable"); @@ -356,17 +355,17 @@ public class TestLoadIncrementalHFiles { if (deleteFile) { fs.delete(last, true); } - Map loaded = loader.run(map, tableName); + Map loaded = loader.bulkLoad(tableName, map); if (deleteFile) { expectedRows -= 1000; - for (LoadQueueItem item : loaded.keySet()) { + for (BulkLoadHFiles.LoadQueueItem item : loaded.keySet()) { if (item.getFilePath().getName().equals(last.getName())) { fail(last + " should be missing"); } } } } else { - loader.run(args.toArray(new String[]{})); + loader.run(args.toArray(new String[] {})); } if (copyFiles) {