From 897678130280a61e3d69900207b00561953911f3 Mon Sep 17 00:00:00 2001 From: niuyulin Date: Mon, 23 Nov 2020 08:33:41 +0800 Subject: [PATCH] HBASE-25213 Should request Compaction after bulkLoadHFiles is done (#2684) Signed-off-by: Guanghao Zhang --- .../hadoop/hbase/regionserver/HRegion.java | 63 ++-- .../hbase/regionserver/TestBulkLoad.java | 275 +++--------------- .../hbase/regionserver/TestBulkloadBase.java | 215 ++++++++++++++ .../TestCompactionAfterBulkLoad.java | 110 +++++++ 4 files changed, 414 insertions(+), 249 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkloadBase.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c123faf4b07..964523554fe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -7047,6 +7047,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } isSuccessful = true; + //request compaction + familyWithFinalPath.keySet().forEach(family -> { + HStore store = getStore(family); + try { + if (this.rsServices != null && store.needsCompaction()) { + this.rsServices.getCompactionRequestor().requestCompaction(this, store, + "bulkload hfiles request compaction", Store.PRIORITY_USER + 1, + CompactionLifeCycleTracker.DUMMY, null); + } + } catch (IOException e) { + LOG.error("bulkload hfiles request compaction error ", e); + } + }); } finally { if (wal != null && !storeFiles.isEmpty()) { // Write a bulk load event for hfiles that are loaded @@ -7786,20 +7799,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Utility methods /** - * A utility method to create new instances of HRegion based on the - * {@link HConstants#REGION_IMPL} configuration property. - * @param tableDir qualified path of directory where region should be located, - * usually the table directory. - * @param wal The WAL is the outbound log for any updates to the HRegion - * The wal file is a logfile from the previous execution that's - * custom-computed for this HRegion. The HRegionServer computes and sorts the - * appropriate wal info for this HRegion. If there is a previous file - * (implying that the HRegion has been written-to before), then read it from - * the supplied path. + * A utility method to create new instances of HRegion based on the {@link HConstants#REGION_IMPL} + * configuration property. + * @param tableDir qualified path of directory where region should be located, usually the table + * directory. + * @param wal The WAL is the outbound log for any updates to the HRegion The wal file is a logfile + * from the previous execution that's custom-computed for this HRegion. The HRegionServer + * computes and sorts the appropriate wal info for this HRegion. If there is a previous + * file (implying that the HRegion has been written-to before), then read it from the + * supplied path. * @param fs is the filesystem. * @param conf is global configuration settings. - * @param regionInfo - RegionInfo that describes the region - * is new), then read them from the supplied path. + * @param regionInfo - RegionInfo that describes the region is new), then read them from the + * supplied path. * @param htd the table descriptor * @return the new instance */ @@ -7825,7 +7837,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** * Convenience method creating new HRegions. Used by createTable. - * * @param info Info for region to create. * @param rootDir Root directory for HBase instance * @param wal shared WAL @@ -7833,14 +7844,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return new HRegion */ public static HRegion createHRegion(final RegionInfo info, final Path rootDir, - final Configuration conf, final TableDescriptor hTableDescriptor, final WAL wal, - final boolean initialize) throws IOException { - LOG.info("creating " + info + ", tableDescriptor=" + - (hTableDescriptor == null ? "null" : hTableDescriptor) + ", regionDir=" + rootDir); + final Configuration conf, final TableDescriptor hTableDescriptor, final WAL wal, + final boolean initialize) throws IOException { + return createHRegion(info, rootDir, conf, hTableDescriptor, wal, initialize, null); + } + + /** + * Convenience method creating new HRegions. Used by createTable. + * @param info Info for region to create. + * @param rootDir Root directory for HBase instance + * @param wal shared WAL + * @param initialize - true to initialize the region + * @param rsRpcServices An interface we can request flushes against. + * @return new HRegion + */ + public static HRegion createHRegion(final RegionInfo info, final Path rootDir, + final Configuration conf, final TableDescriptor hTableDescriptor, final WAL wal, + final boolean initialize, RegionServerServices rsRpcServices) throws IOException { + LOG.info("creating " + info + ", tableDescriptor=" + + (hTableDescriptor == null ? "null" : hTableDescriptor) + ", regionDir=" + rootDir); createRegionDir(conf, info, rootDir); FileSystem fs = rootDir.getFileSystem(conf); Path tableDir = CommonFSUtils.getTableDir(rootDir, info.getTable()); - HRegion region = HRegion.newHRegion(tableDir, wal, fs, conf, info, hTableDescriptor, null); + HRegion region = + HRegion.newHRegion(tableDir, wal, fs, conf, info, hTableDescriptor, rsRpcServices); if (initialize) { region.initialize(null); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java index 595321ed601..3a934b74935 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java @@ -18,92 +18,42 @@ package org.apache.hadoop.hbase.regionserver; import static java.util.Arrays.asList; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.hamcrest.MockitoHamcrest.argThat; -import java.io.File; import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; -import java.util.Random; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; + import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKeyImpl; -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeMatcher; -import org.junit.Before; import org.junit.ClassRule; -import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TemporaryFolder; -import org.junit.rules.TestName; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; /** * This class attempts to unit test bulk HLog loading. */ @Category(SmallTests.class) -public class TestBulkLoad { +public class TestBulkLoad extends TestBulkloadBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestBulkLoad.class); - @ClassRule - public static TemporaryFolder testFolder = new TemporaryFolder(); - private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private final WAL log = mock(WAL.class); - private final Configuration conf = HBaseConfiguration.create(); - private final Random random = new Random(); - private final byte[] randomBytes = new byte[100]; - private final byte[] family1 = Bytes.toBytes("family1"); - private final byte[] family2 = Bytes.toBytes("family2"); - private final byte[] family3 = Bytes.toBytes("family3"); - - @Rule - public TestName name = new TestName(); - - @Before - public void before() throws IOException { - random.nextBytes(randomBytes); - // Mockito.when(log.append(htd, info, key, edits, inMemstore)); - } - @Test public void verifyBulkLoadEvent() throws IOException { TableName tableName = TableName.valueOf("test", "test"); @@ -113,22 +63,22 @@ public class TestBulkLoad { storeFileName = (new Path(storeFileName)).getName(); List storeFileNames = new ArrayList<>(); storeFileNames.add(storeFileName); - when(log.appendMarker(any(), any(), argThat( - bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(), familyName, storeFileNames)))). - thenAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) { - WALKeyImpl walKey = invocation.getArgument(1); - MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); - if (mvcc != null) { - MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); - walKey.setWriteEntry(we); - } - return 01L; - }; - }); - testRegionWithFamiliesAndSpecifiedTableName(tableName, family1) - .bulkLoadHFiles(familyPaths, false, null); + when(log.appendMarker(any(), any(), + argThat(bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(), familyName, storeFileNames)))) + .thenAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + WALKeyImpl walKey = invocation.getArgument(1); + MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); + if (mvcc != null) { + MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); + walKey.setWriteEntry(we); + } + return 01L; + }; + }); + testRegionWithFamiliesAndSpecifiedTableName(tableName, family1).bulkLoadHFiles(familyPaths, + false, null); verify(log).sync(anyLong()); } @@ -139,19 +89,19 @@ public class TestBulkLoad { @Test public void shouldBulkLoadSingleFamilyHLog() throws IOException { - when(log.appendMarker(any(), - any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))).thenAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) { - WALKeyImpl walKey = invocation.getArgument(1); - MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); - if (mvcc != null) { - MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); - walKey.setWriteEntry(we); - } - return 01L; - }; - }); + when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))) + .thenAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + WALKeyImpl walKey = invocation.getArgument(1); + MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); + if (mvcc != null) { + MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); + walKey.setWriteEntry(we); + } + return 01L; + }; + }); testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1), false, null); verify(log).sync(anyLong()); } @@ -178,19 +128,19 @@ public class TestBulkLoad { @Test public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException { - when(log.appendMarker(any(), - any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))).thenAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) { - WALKeyImpl walKey = invocation.getArgument(1); - MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); - if (mvcc != null) { - MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); - walKey.setWriteEntry(we); - } - return 01L; - }; - }); + when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))) + .thenAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + WALKeyImpl walKey = invocation.getArgument(1); + MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); + if (mvcc != null) { + MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); + walKey.setWriteEntry(we); + } + return 01L; + }; + }); TableName tableName = TableName.valueOf("test", "test"); testRegionWithFamiliesAndSpecifiedTableName(tableName, family1, family2) .bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, null); @@ -237,141 +187,4 @@ public class TestBulkLoad { list.addAll(asList(withMissingHFileForFamily(family2))); testRegionWithFamilies(family1, family2).bulkLoadHFiles(list, false, null); } - - private Pair withMissingHFileForFamily(byte[] family) { - return new Pair<>(family, getNotExistFilePath()); - } - - private String getNotExistFilePath() { - Path path = new Path(TEST_UTIL.getDataTestDir(), "does_not_exist"); - return path.toUri().getPath(); - } - - private Pair withInvalidColumnFamilyButProperHFileLocation(byte[] family) - throws IOException { - createHFileForFamilies(family); - return new Pair<>(new byte[]{0x00, 0x01, 0x02}, getNotExistFilePath()); - } - - - private HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName, - byte[]... families) - throws IOException { - HRegionInfo hRegionInfo = new HRegionInfo(tableName); - HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); - for (byte[] family : families) { - hTableDescriptor.addFamily(new HColumnDescriptor(family)); - } - ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, - 0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); - // TODO We need a way to do this without creating files - return HRegion.createHRegion(hRegionInfo, - new Path(testFolder.newFolder().toURI()), - conf, - hTableDescriptor, - log); - - } - - private HRegion testRegionWithFamilies(byte[]... families) throws IOException { - TableName tableName = TableName.valueOf(name.getMethodName()); - return testRegionWithFamiliesAndSpecifiedTableName(tableName, families); - } - - private List> getBlankFamilyPaths(){ - return new ArrayList<>(); - } - - private List> withFamilyPathsFor(byte[]... families) throws IOException { - List> familyPaths = getBlankFamilyPaths(); - for (byte[] family : families) { - familyPaths.add(new Pair<>(family, createHFileForFamilies(family))); - } - return familyPaths; - } - - private String createHFileForFamilies(byte[] family) throws IOException { - HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf); - // TODO We need a way to do this without creating files - File hFileLocation = testFolder.newFile(); - FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null); - try { - hFileFactory.withOutputStream(out); - hFileFactory.withFileContext(new HFileContextBuilder().build()); - HFile.Writer writer = hFileFactory.create(); - try { - writer.append(new KeyValue(CellUtil.createCell(randomBytes, - family, - randomBytes, - 0L, - KeyValue.Type.Put.getCode(), - randomBytes))); - } finally { - writer.close(); - } - } finally { - out.close(); - } - return hFileLocation.getAbsoluteFile().getAbsolutePath(); - } - - private static Matcher bulkLogWalEditType(byte[] typeBytes) { - return new WalMatcher(typeBytes); - } - - private static Matcher bulkLogWalEdit(byte[] typeBytes, byte[] tableName, - byte[] familyName, List storeFileNames) { - return new WalMatcher(typeBytes, tableName, familyName, storeFileNames); - } - - private static class WalMatcher extends TypeSafeMatcher { - private final byte[] typeBytes; - private final byte[] tableName; - private final byte[] familyName; - private final List storeFileNames; - - public WalMatcher(byte[] typeBytes) { - this(typeBytes, null, null, null); - } - - public WalMatcher(byte[] typeBytes, byte[] tableName, byte[] familyName, - List storeFileNames) { - this.typeBytes = typeBytes; - this.tableName = tableName; - this.familyName = familyName; - this.storeFileNames = storeFileNames; - } - - @Override - protected boolean matchesSafely(WALEdit item) { - assertTrue(Arrays.equals(CellUtil.cloneQualifier(item.getCells().get(0)), typeBytes)); - BulkLoadDescriptor desc; - try { - desc = WALEdit.getBulkLoadDescriptor(item.getCells().get(0)); - } catch (IOException e) { - return false; - } - assertNotNull(desc); - - if (tableName != null) { - assertTrue(Bytes.equals(ProtobufUtil.toTableName(desc.getTableName()).getName(), - tableName)); - } - - if(storeFileNames != null) { - int index=0; - StoreDescriptor store = desc.getStores(0); - assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), familyName)); - assertTrue(Bytes.equals(Bytes.toBytes(store.getStoreHomeDir()), familyName)); - assertEquals(storeFileNames.size(), store.getStoreFileCount()); - } - - return true; - } - - @Override - public void describeTo(Description description) { - - } - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkloadBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkloadBase.java new file mode 100644 index 00000000000..a2e398ff3a5 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkloadBase.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; + +public class TestBulkloadBase { + @ClassRule + public static TemporaryFolder testFolder = new TemporaryFolder(); + private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + protected final WAL log = mock(WAL.class); + protected final Configuration conf = HBaseConfiguration.create(); + private final Random random = new Random(); + private final byte[] randomBytes = new byte[100]; + protected final byte[] family1 = Bytes.toBytes("family1"); + protected final byte[] family2 = Bytes.toBytes("family2"); + protected final byte[] family3 = Bytes.toBytes("family3"); + + @Rule + public TestName name = new TestName(); + + @Before + public void before() throws IOException { + random.nextBytes(randomBytes); + } + + protected Pair withMissingHFileForFamily(byte[] family) { + return new Pair<>(family, getNotExistFilePath()); + } + + private String getNotExistFilePath() { + Path path = new Path(TEST_UTIL.getDataTestDir(), "does_not_exist"); + return path.toUri().getPath(); + } + + protected Pair withInvalidColumnFamilyButProperHFileLocation(byte[] family) + throws IOException { + createHFileForFamilies(family); + return new Pair<>(new byte[] { 0x00, 0x01, 0x02 }, getNotExistFilePath()); + } + + protected HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName, + byte[]... families) throws IOException { + RegionInfo hRegionInfo = RegionInfoBuilder.newBuilder(tableName).build(); + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); + + for (byte[] family : families) { + builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); + } + ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, + MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); + // TODO We need a way to do this without creating files + return HRegion.createHRegion(hRegionInfo, new Path(testFolder.newFolder().toURI()), conf, + builder.build(), log); + + } + + protected HRegion testRegionWithFamilies(byte[]... families) throws IOException { + TableName tableName = TableName.valueOf(name.getMethodName()); + return testRegionWithFamiliesAndSpecifiedTableName(tableName, families); + } + + private List> getBlankFamilyPaths() { + return new ArrayList<>(); + } + + protected List> withFamilyPathsFor(byte[]... families) throws IOException { + List> familyPaths = getBlankFamilyPaths(); + for (byte[] family : families) { + familyPaths.add(new Pair<>(family, createHFileForFamilies(family))); + } + return familyPaths; + } + + private String createHFileForFamilies(byte[] family) throws IOException { + HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf); + // TODO We need a way to do this without creating files + File hFileLocation = testFolder.newFile(); + FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null); + try { + hFileFactory.withOutputStream(out); + hFileFactory.withFileContext(new HFileContextBuilder().build()); + HFile.Writer writer = hFileFactory.create(); + try { + writer.append(new KeyValue(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY) + .setRow(randomBytes).setFamily(family).setQualifier(randomBytes).setTimestamp(0L) + .setType(KeyValue.Type.Put.getCode()).setValue(randomBytes).build())); + } finally { + writer.close(); + } + } finally { + out.close(); + } + return hFileLocation.getAbsoluteFile().getAbsolutePath(); + } + + protected static Matcher bulkLogWalEditType(byte[] typeBytes) { + return new WalMatcher(typeBytes); + } + + protected static Matcher bulkLogWalEdit(byte[] typeBytes, byte[] tableName, + byte[] familyName, List storeFileNames) { + return new WalMatcher(typeBytes, tableName, familyName, storeFileNames); + } + + private static class WalMatcher extends TypeSafeMatcher { + private final byte[] typeBytes; + private final byte[] tableName; + private final byte[] familyName; + private final List storeFileNames; + + public WalMatcher(byte[] typeBytes) { + this(typeBytes, null, null, null); + } + + public WalMatcher(byte[] typeBytes, byte[] tableName, byte[] familyName, + List storeFileNames) { + this.typeBytes = typeBytes; + this.tableName = tableName; + this.familyName = familyName; + this.storeFileNames = storeFileNames; + } + + @Override + protected boolean matchesSafely(WALEdit item) { + assertTrue(Arrays.equals(CellUtil.cloneQualifier(item.getCells().get(0)), typeBytes)); + WALProtos.BulkLoadDescriptor desc; + try { + desc = WALEdit.getBulkLoadDescriptor(item.getCells().get(0)); + } catch (IOException e) { + return false; + } + assertNotNull(desc); + + if (tableName != null) { + assertTrue( + Bytes.equals(ProtobufUtil.toTableName(desc.getTableName()).getName(), tableName)); + } + + if (storeFileNames != null) { + int index = 0; + WALProtos.StoreDescriptor store = desc.getStores(0); + assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), familyName)); + assertTrue(Bytes.equals(Bytes.toBytes(store.getStoreHomeDir()), familyName)); + assertEquals(storeFileNames.size(), store.getStoreFileCount()); + } + + return true; + } + + @Override + public void describeTo(Description description) { + + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java new file mode 100644 index 00000000000..423659ecaa8 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.hamcrest.MockitoHamcrest.argThat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +@Category(SmallTests.class) +public class TestCompactionAfterBulkLoad extends TestBulkloadBase { + private final RegionServerServices regionServerServices = mock(RegionServerServices.class); + private final CompactionRequester compactionRequester = mock(CompactSplit.class); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCompactionAfterBulkLoad.class); + + @Override + protected HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName, + byte[]... families) throws IOException { + RegionInfo hRegionInfo = RegionInfoBuilder.newBuilder(tableName).build(); + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); + + for (byte[] family : families) { + builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)); + } + ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, + MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); + // TODO We need a way to do this without creating files + return HRegion.createHRegion(hRegionInfo, new Path(testFolder.newFolder().toURI()), conf, + builder.build(), log, true, regionServerServices); + + } + + @Test + public void shouldRequestCompactionAfterBulkLoad() throws IOException { + List> familyPaths = new ArrayList<>(); + // enough hfile to request compaction + for (int i = 0; i < 5; i++) { + familyPaths.addAll(withFamilyPathsFor(family1, family2, family3)); + } + when(regionServerServices.getConfiguration()).thenReturn(conf); + when(regionServerServices.getCompactionRequestor()).thenReturn(compactionRequester); + when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))) + .thenAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + WALKeyImpl walKey = invocation.getArgument(1); + MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); + if (mvcc != null) { + MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); + walKey.setWriteEntry(we); + } + return 01L; + } + }); + + Mockito.doNothing().when(compactionRequester).requestCompaction(any(), any(), any(), anyInt(), + any(), any()); + testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, false, null); + // invoke three times for 3 families + verify(compactionRequester, times(3)).requestCompaction(isA(HRegion.class), isA(HStore.class), + isA(String.class), anyInt(), eq(CompactionLifeCycleTracker.DUMMY), eq(null)); + } +}