diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
index b42fbbefb8f..e8b701b7ae3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -374,9 +374,15 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
    * hfiles that need to be retried. If it is successful it will return an empty list. NOTE: To
    * maintain row atomicity guarantees, region server side should succeed atomically and fails
    * atomically.
+   * @param conn Connection to use
+   * @param tableName Table to which these hfiles should be loaded
+   * @param copyFiles whether to replicate to the peer cluster while bulk loading
+   * @param first the start key of the region
+   * @param lqis the hfiles to be loaded
    * @return empty list if success, list of items to retry on recoverable failure
    */
-  private CompletableFuture<Collection<LoadQueueItem>> tryAtomicRegionLoad(
+  @VisibleForTesting
+  protected CompletableFuture<Collection<LoadQueueItem>> tryAtomicRegionLoad(
     final AsyncClusterConnection conn, final TableName tableName, boolean copyFiles,
     final byte[] first, Collection<LoadQueueItem> lqis) {
     List<Pair<byte[], String>> familyPaths =
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
index 6d0e9142f6f..d90d75fc5ed 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
@@ -26,10 +26,13 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -41,6 +44,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Table;
@@ -102,7 +106,6 @@ public class TestBulkLoadHFiles {
     // change default behavior so that tag values are returned with normal rpcs
     util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
       KeyValueCodecWithTags.class.getCanonicalName());
-    util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
     util.startMiniCluster();
 
     setupNamespace();
@@ -741,4 +744,42 @@
       assertEquals(1000, countRows(table));
     }
   }
+
+  @Test
+  public void testBulkLoadByFamily() throws Exception {
+    Path dir = util.getDataTestDirOnTestFS("testBulkLoadByFamily");
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    String tableName = tn.getMethodName();
+    String[] families = { "cf1", "cf2", "cf3" };
+    for (int i = 0; i < families.length; i++) {
+      byte[] from = Bytes.toBytes(i + "begin");
+      byte[] to = Bytes.toBytes(i + "end");
+      Path familyDir = new Path(dir, families[i]);
+      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile"),
+        Bytes.toBytes(families[i]), QUALIFIER, from, to, 1000);
+    }
+    Table table = util.createTable(TableName.valueOf(tableName), families);
+    final AtomicInteger attemptedCalls = new AtomicInteger();
+    util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
+    BulkLoadHFiles loader = new BulkLoadHFilesTool(util.getConfiguration()) {
+      @Override
+      protected CompletableFuture<Collection<LoadQueueItem>> tryAtomicRegionLoad(
+          final AsyncClusterConnection conn, final TableName tableName, boolean copyFiles,
+          final byte[] first, Collection<LoadQueueItem> lqis) {
+        attemptedCalls.incrementAndGet();
+        return super.tryAtomicRegionLoad(conn, tableName, copyFiles, first, lqis);
+      }
+    };
+    try {
+      loader.bulkLoad(table.getName(), dir);
+      assertEquals(families.length, attemptedCalls.get());
+      assertEquals(1000 * families.length, HBaseTestingUtility.countRows(table));
+    } finally {
+      if (null != table) {
+        table.close();
+      }
+      util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesByFamily.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesByFamily.java
deleted file mode 100644
index 42a81a95435..00000000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesByFamily.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.tool;
-
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-
-public class TestBulkLoadHFilesByFamily extends TestBulkLoadHFiles {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestBulkLoadHFilesByFamily.class);
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
-    util.getConfiguration().setInt(BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
-      MAX_FILES_PER_REGION_PER_FAMILY);
-    // change default behavior so that tag values are returned with normal rpcs
-    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
-      KeyValueCodecWithTags.class.getCanonicalName());
-    util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
-    util.startMiniCluster();
-    setupNamespace();
-  }
-}
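
For reference, a minimal client-side usage sketch of the per-family bulk load path that the new test exercises (not part of the patch): it enables BULK_LOAD_HFILES_BY_FAMILY on the client configuration and runs BulkLoadHFilesTool against a staging directory laid out with one subdirectory per column family. The table name and staging path are hypothetical placeholders, and the sketch assumes the BULK_LOAD_HFILES_BY_FAMILY constant is accessible from client code.

// Minimal sketch, assuming BULK_LOAD_HFILES_BY_FAMILY is accessible to client code;
// "myTable" and "/staging/bulkload" are hypothetical placeholders.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;

public class BulkLoadByFamilyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Issue one atomic load call per column family of a region instead of a
    // single call covering all families (the behavior the new test asserts).
    conf.setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
    BulkLoadHFiles loader = new BulkLoadHFilesTool(conf);
    // The staging directory is expected to contain one subdirectory per column
    // family, each holding the HFiles to load into the existing target table.
    loader.bulkLoad(TableName.valueOf("myTable"), new Path("/staging/bulkload"));
  }
}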