HBASE-25213 Should request Compaction after bulkLoadHFiles is done (#2684)

Signed-off-by: Guanghao Zhang <zghao@apache.org>
niuyulin 2020-11-23 08:33:41 +08:00 committed by GitHub
parent febd832f2f
commit 8976781302
4 changed files with 414 additions and 249 deletions

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

@@ -7047,6 +7047,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       isSuccessful = true;
+      //request compaction
+      familyWithFinalPath.keySet().forEach(family -> {
+        HStore store = getStore(family);
+        try {
+          if (this.rsServices != null && store.needsCompaction()) {
+            this.rsServices.getCompactionRequestor().requestCompaction(this, store,
+              "bulkload hfiles request compaction", Store.PRIORITY_USER + 1,
+              CompactionLifeCycleTracker.DUMMY, null);
+          }
+        } catch (IOException e) {
+          LOG.error("bulkload hfiles request compaction error ", e);
+        }
+      });
     } finally {
       if (wal != null && !storeFiles.isEmpty()) {
         // Write a bulk load event for hfiles that are loaded
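In effect, callers of bulkLoadHFiles no longer need to schedule compactions themselves: once the load succeeds, every store that received files is checked against its compaction threshold and, if needed, a compaction is queued at Store.PRIORITY_USER + 1, i.e. just behind explicitly user-requested compactions (lower values run first). A caller-side sketch, illustrative only and not part of this commit; `region`, `hfile1` and `hfile2` are assumed to be prepared elsewhere:

    // Bulk-load two staged HFiles into one family, then rely on the new
    // post-load hook instead of an explicit compact call.
    byte[] family = Bytes.toBytes("family1");
    List<Pair<byte[], String>> familyPaths = new ArrayList<>();
    familyPaths.add(new Pair<>(family, hfile1)); // hfile1/hfile2: HFile paths (assumed)
    familyPaths.add(new Pair<>(family, hfile2));
    region.bulkLoadHFiles(familyPaths, false, null);
    // If the store now has more files than hbase.hstore.compactionThreshold,
    // a compaction request is already queued; nothing further to do.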
@@ -7786,20 +7799,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // Utility methods
   /**
-   * A utility method to create new instances of HRegion based on the
-   * {@link HConstants#REGION_IMPL} configuration property.
-   * @param tableDir qualified path of directory where region should be located,
-   * usually the table directory.
-   * @param wal The WAL is the outbound log for any updates to the HRegion
-   * The wal file is a logfile from the previous execution that's
-   * custom-computed for this HRegion. The HRegionServer computes and sorts the
-   * appropriate wal info for this HRegion. If there is a previous file
-   * (implying that the HRegion has been written-to before), then read it from
-   * the supplied path.
+   * A utility method to create new instances of HRegion based on the {@link HConstants#REGION_IMPL}
+   * configuration property.
+   * @param tableDir qualified path of directory where region should be located, usually the table
+   *          directory.
+   * @param wal The WAL is the outbound log for any updates to the HRegion The wal file is a logfile
+   *          from the previous execution that's custom-computed for this HRegion. The HRegionServer
+   *          computes and sorts the appropriate wal info for this HRegion. If there is a previous
+   *          file (implying that the HRegion has been written-to before), then read it from the
+   *          supplied path.
    * @param fs is the filesystem.
    * @param conf is global configuration settings.
-   * @param regionInfo - RegionInfo that describes the region
-   * is new), then read them from the supplied path.
+   * @param regionInfo - RegionInfo that describes the region is new), then read them from the
+   *          supplied path.
    * @param htd the table descriptor
    * @return the new instance
    */
@@ -7825,7 +7837,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   /**
    * Convenience method creating new HRegions. Used by createTable.
-   *
    * @param info Info for region to create.
    * @param rootDir Root directory for HBase instance
    * @param wal shared WAL
@@ -7833,14 +7844,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return new HRegion
    */
   public static HRegion createHRegion(final RegionInfo info, final Path rootDir,
     final Configuration conf, final TableDescriptor hTableDescriptor, final WAL wal,
     final boolean initialize) throws IOException {
-    LOG.info("creating " + info + ", tableDescriptor=" +
-      (hTableDescriptor == null ? "null" : hTableDescriptor) + ", regionDir=" + rootDir);
+    return createHRegion(info, rootDir, conf, hTableDescriptor, wal, initialize, null);
+  }
+
+  /**
+   * Convenience method creating new HRegions. Used by createTable.
+   * @param info Info for region to create.
+   * @param rootDir Root directory for HBase instance
+   * @param wal shared WAL
+   * @param initialize - true to initialize the region
+   * @param rsRpcServices An interface we can request flushes against.
+   * @return new HRegion
+   */
+  public static HRegion createHRegion(final RegionInfo info, final Path rootDir,
+    final Configuration conf, final TableDescriptor hTableDescriptor, final WAL wal,
+    final boolean initialize, RegionServerServices rsRpcServices) throws IOException {
+    LOG.info("creating " + info + ", tableDescriptor="
+      + (hTableDescriptor == null ? "null" : hTableDescriptor) + ", regionDir=" + rootDir);
     createRegionDir(conf, info, rootDir);
     FileSystem fs = rootDir.getFileSystem(conf);
     Path tableDir = CommonFSUtils.getTableDir(rootDir, info.getTable());
-    HRegion region = HRegion.newHRegion(tableDir, wal, fs, conf, info, hTableDescriptor, null);
+    HRegion region =
+      HRegion.newHRegion(tableDir, wal, fs, conf, info, hTableDescriptor, rsRpcServices);
     if (initialize) {
       region.initialize(null);
     }
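The second half of the change threads a RegionServerServices handle into region creation, which is how the loaded region later finds a CompactionRequestor. A minimal sketch of the new overload, assuming `conf`, `rootDir`, `tableDesc`, `wal` and `rsServices` are already set up by the caller:

    RegionInfo info = RegionInfoBuilder.newBuilder(tableDesc.getTableName()).build();
    // rsServices may be null; the original six-argument overload now simply
    // delegates with null, preserving its old behavior.
    HRegion region = HRegion.createHRegion(info, rootDir, conf, tableDesc, wal,
      true /* initialize */, rsServices);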

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java

@@ -18,92 +18,42 @@
 package org.apache.hadoop.hbase.regionserver;
 import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.hamcrest.MockitoHamcrest.argThat;
-import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
-import java.util.Random;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.TestName;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
 /**
  * This class attempts to unit test bulk HLog loading.
  */
 @Category(SmallTests.class)
-public class TestBulkLoad {
+public class TestBulkLoad extends TestBulkloadBase {
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestBulkLoad.class);
-  @ClassRule
-  public static TemporaryFolder testFolder = new TemporaryFolder();
-  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  private final WAL log = mock(WAL.class);
-  private final Configuration conf = HBaseConfiguration.create();
-  private final Random random = new Random();
-  private final byte[] randomBytes = new byte[100];
-  private final byte[] family1 = Bytes.toBytes("family1");
-  private final byte[] family2 = Bytes.toBytes("family2");
-  private final byte[] family3 = Bytes.toBytes("family3");
-  @Rule
-  public TestName name = new TestName();
-  @Before
-  public void before() throws IOException {
-    random.nextBytes(randomBytes);
-    // Mockito.when(log.append(htd, info, key, edits, inMemstore));
-  }
   @Test
   public void verifyBulkLoadEvent() throws IOException {
     TableName tableName = TableName.valueOf("test", "test");
@@ -113,22 +63,22 @@ public class TestBulkLoad {
     storeFileName = (new Path(storeFileName)).getName();
     List<String> storeFileNames = new ArrayList<>();
     storeFileNames.add(storeFileName);
-    when(log.appendMarker(any(), any(), argThat(
-      bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(), familyName, storeFileNames)))).
-        thenAnswer(new Answer() {
+    when(log.appendMarker(any(), any(),
+      argThat(bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(), familyName, storeFileNames))))
+        .thenAnswer(new Answer() {
           @Override
           public Object answer(InvocationOnMock invocation) {
             WALKeyImpl walKey = invocation.getArgument(1);
             MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
             if (mvcc != null) {
               MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
               walKey.setWriteEntry(we);
             }
             return 01L;
           };
         });
-    testRegionWithFamiliesAndSpecifiedTableName(tableName, family1)
-      .bulkLoadHFiles(familyPaths, false, null);
+    testRegionWithFamiliesAndSpecifiedTableName(tableName, family1).bulkLoadHFiles(familyPaths,
+      false, null);
     verify(log).sync(anyLong());
   }
@@ -139,19 +89,19 @@ public class TestBulkLoad {
   @Test
   public void shouldBulkLoadSingleFamilyHLog() throws IOException {
-    when(log.appendMarker(any(),
-      any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))).thenAnswer(new Answer() {
+    when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
+      .thenAnswer(new Answer() {
         @Override
         public Object answer(InvocationOnMock invocation) {
          WALKeyImpl walKey = invocation.getArgument(1);
          MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
          if (mvcc != null) {
            MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
            walKey.setWriteEntry(we);
          }
          return 01L;
        };
      });
     testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
     verify(log).sync(anyLong());
   }
@@ -178,19 +128,19 @@ public class TestBulkLoad {
   @Test
   public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException {
-    when(log.appendMarker(any(),
-      any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))).thenAnswer(new Answer() {
+    when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
+      .thenAnswer(new Answer() {
        @Override
        public Object answer(InvocationOnMock invocation) {
          WALKeyImpl walKey = invocation.getArgument(1);
          MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
          if (mvcc != null) {
            MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
            walKey.setWriteEntry(we);
          }
          return 01L;
        };
      });
     TableName tableName = TableName.valueOf("test", "test");
     testRegionWithFamiliesAndSpecifiedTableName(tableName, family1, family2)
       .bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, null);
@@ -237,141 +187,4 @@ public class TestBulkLoad {
     list.addAll(asList(withMissingHFileForFamily(family2)));
     testRegionWithFamilies(family1, family2).bulkLoadHFiles(list, false, null);
   }
-  private Pair<byte[], String> withMissingHFileForFamily(byte[] family) {
-    return new Pair<>(family, getNotExistFilePath());
-  }
-  private String getNotExistFilePath() {
-    Path path = new Path(TEST_UTIL.getDataTestDir(), "does_not_exist");
-    return path.toUri().getPath();
-  }
-  private Pair<byte[], String> withInvalidColumnFamilyButProperHFileLocation(byte[] family)
-      throws IOException {
-    createHFileForFamilies(family);
-    return new Pair<>(new byte[]{0x00, 0x01, 0x02}, getNotExistFilePath());
-  }
-  private HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName,
-      byte[]... families)
-      throws IOException {
-    HRegionInfo hRegionInfo = new HRegionInfo(tableName);
-    HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
-    for (byte[] family : families) {
-      hTableDescriptor.addFamily(new HColumnDescriptor(family));
-    }
-    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0,
-      0, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
-    // TODO We need a way to do this without creating files
-    return HRegion.createHRegion(hRegionInfo,
-        new Path(testFolder.newFolder().toURI()),
-        conf,
-        hTableDescriptor,
-        log);
-  }
-  private HRegion testRegionWithFamilies(byte[]... families) throws IOException {
-    TableName tableName = TableName.valueOf(name.getMethodName());
-    return testRegionWithFamiliesAndSpecifiedTableName(tableName, families);
-  }
-  private List<Pair<byte[], String>> getBlankFamilyPaths(){
-    return new ArrayList<>();
-  }
-  private List<Pair<byte[], String>> withFamilyPathsFor(byte[]... families) throws IOException {
-    List<Pair<byte[], String>> familyPaths = getBlankFamilyPaths();
-    for (byte[] family : families) {
-      familyPaths.add(new Pair<>(family, createHFileForFamilies(family)));
-    }
-    return familyPaths;
-  }
-  private String createHFileForFamilies(byte[] family) throws IOException {
-    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf);
-    // TODO We need a way to do this without creating files
-    File hFileLocation = testFolder.newFile();
-    FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
-    try {
-      hFileFactory.withOutputStream(out);
-      hFileFactory.withFileContext(new HFileContextBuilder().build());
-      HFile.Writer writer = hFileFactory.create();
-      try {
-        writer.append(new KeyValue(CellUtil.createCell(randomBytes,
-            family,
-            randomBytes,
-            0L,
-            KeyValue.Type.Put.getCode(),
-            randomBytes)));
-      } finally {
-        writer.close();
-      }
-    } finally {
-      out.close();
-    }
-    return hFileLocation.getAbsoluteFile().getAbsolutePath();
-  }
-  private static Matcher<WALEdit> bulkLogWalEditType(byte[] typeBytes) {
-    return new WalMatcher(typeBytes);
-  }
-  private static Matcher<WALEdit> bulkLogWalEdit(byte[] typeBytes, byte[] tableName,
-      byte[] familyName, List<String> storeFileNames) {
-    return new WalMatcher(typeBytes, tableName, familyName, storeFileNames);
-  }
-  private static class WalMatcher extends TypeSafeMatcher<WALEdit> {
-    private final byte[] typeBytes;
-    private final byte[] tableName;
-    private final byte[] familyName;
-    private final List<String> storeFileNames;
-    public WalMatcher(byte[] typeBytes) {
-      this(typeBytes, null, null, null);
-    }
-    public WalMatcher(byte[] typeBytes, byte[] tableName, byte[] familyName,
-        List<String> storeFileNames) {
-      this.typeBytes = typeBytes;
-      this.tableName = tableName;
-      this.familyName = familyName;
-      this.storeFileNames = storeFileNames;
-    }
-    @Override
-    protected boolean matchesSafely(WALEdit item) {
-      assertTrue(Arrays.equals(CellUtil.cloneQualifier(item.getCells().get(0)), typeBytes));
-      BulkLoadDescriptor desc;
-      try {
-        desc = WALEdit.getBulkLoadDescriptor(item.getCells().get(0));
-      } catch (IOException e) {
-        return false;
-      }
-      assertNotNull(desc);
-      if (tableName != null) {
-        assertTrue(Bytes.equals(ProtobufUtil.toTableName(desc.getTableName()).getName(),
-          tableName));
-      }
-      if(storeFileNames != null) {
-        int index=0;
-        StoreDescriptor store = desc.getStores(0);
-        assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), familyName));
-        assertTrue(Bytes.equals(Bytes.toBytes(store.getStoreHomeDir()), familyName));
-        assertEquals(storeFileNames.size(), store.getStoreFileCount());
-      }
-      return true;
-    }
-    @Override
-    public void describeTo(Description description) {
-    }
-  }
 }

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkloadBase.java (new file)

@@ -0,0 +1,215 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
public class TestBulkloadBase {
@ClassRule
public static TemporaryFolder testFolder = new TemporaryFolder();
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected final WAL log = mock(WAL.class);
protected final Configuration conf = HBaseConfiguration.create();
private final Random random = new Random();
private final byte[] randomBytes = new byte[100];
protected final byte[] family1 = Bytes.toBytes("family1");
protected final byte[] family2 = Bytes.toBytes("family2");
protected final byte[] family3 = Bytes.toBytes("family3");
@Rule
public TestName name = new TestName();
@Before
public void before() throws IOException {
random.nextBytes(randomBytes);
}
protected Pair<byte[], String> withMissingHFileForFamily(byte[] family) {
return new Pair<>(family, getNotExistFilePath());
}
private String getNotExistFilePath() {
Path path = new Path(TEST_UTIL.getDataTestDir(), "does_not_exist");
return path.toUri().getPath();
}
protected Pair<byte[], String> withInvalidColumnFamilyButProperHFileLocation(byte[] family)
throws IOException {
createHFileForFamilies(family);
return new Pair<>(new byte[] { 0x00, 0x01, 0x02 }, getNotExistFilePath());
}
protected HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName,
byte[]... families) throws IOException {
RegionInfo hRegionInfo = RegionInfoBuilder.newBuilder(tableName).build();
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
for (byte[] family : families) {
builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
}
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
// TODO We need a way to do this without creating files
return HRegion.createHRegion(hRegionInfo, new Path(testFolder.newFolder().toURI()), conf,
builder.build(), log);
}
protected HRegion testRegionWithFamilies(byte[]... families) throws IOException {
TableName tableName = TableName.valueOf(name.getMethodName());
return testRegionWithFamiliesAndSpecifiedTableName(tableName, families);
}
private List<Pair<byte[], String>> getBlankFamilyPaths() {
return new ArrayList<>();
}
protected List<Pair<byte[], String>> withFamilyPathsFor(byte[]... families) throws IOException {
List<Pair<byte[], String>> familyPaths = getBlankFamilyPaths();
for (byte[] family : families) {
familyPaths.add(new Pair<>(family, createHFileForFamilies(family)));
}
return familyPaths;
}
private String createHFileForFamilies(byte[] family) throws IOException {
HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf);
// TODO We need a way to do this without creating files
File hFileLocation = testFolder.newFile();
FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
try {
hFileFactory.withOutputStream(out);
hFileFactory.withFileContext(new HFileContextBuilder().build());
HFile.Writer writer = hFileFactory.create();
try {
writer.append(new KeyValue(ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
.setRow(randomBytes).setFamily(family).setQualifier(randomBytes).setTimestamp(0L)
.setType(KeyValue.Type.Put.getCode()).setValue(randomBytes).build()));
} finally {
writer.close();
}
} finally {
out.close();
}
return hFileLocation.getAbsoluteFile().getAbsolutePath();
}
protected static Matcher<WALEdit> bulkLogWalEditType(byte[] typeBytes) {
return new WalMatcher(typeBytes);
}
protected static Matcher<WALEdit> bulkLogWalEdit(byte[] typeBytes, byte[] tableName,
byte[] familyName, List<String> storeFileNames) {
return new WalMatcher(typeBytes, tableName, familyName, storeFileNames);
}
private static class WalMatcher extends TypeSafeMatcher<WALEdit> {
private final byte[] typeBytes;
private final byte[] tableName;
private final byte[] familyName;
private final List<String> storeFileNames;
public WalMatcher(byte[] typeBytes) {
this(typeBytes, null, null, null);
}
public WalMatcher(byte[] typeBytes, byte[] tableName, byte[] familyName,
List<String> storeFileNames) {
this.typeBytes = typeBytes;
this.tableName = tableName;
this.familyName = familyName;
this.storeFileNames = storeFileNames;
}
@Override
protected boolean matchesSafely(WALEdit item) {
assertTrue(Arrays.equals(CellUtil.cloneQualifier(item.getCells().get(0)), typeBytes));
WALProtos.BulkLoadDescriptor desc;
try {
desc = WALEdit.getBulkLoadDescriptor(item.getCells().get(0));
} catch (IOException e) {
return false;
}
assertNotNull(desc);
if (tableName != null) {
assertTrue(
Bytes.equals(ProtobufUtil.toTableName(desc.getTableName()).getName(), tableName));
}
if (storeFileNames != null) {
int index = 0;
WALProtos.StoreDescriptor store = desc.getStores(0);
assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), familyName));
assertTrue(Bytes.equals(Bytes.toBytes(store.getStoreHomeDir()), familyName));
assertEquals(storeFileNames.size(), store.getStoreFileCount());
}
return true;
}
@Override
public void describeTo(Description description) {
}
}
}

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java (new file)

@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.hamcrest.MockitoHamcrest.argThat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@Category(SmallTests.class)
public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
private final RegionServerServices regionServerServices = mock(RegionServerServices.class);
private final CompactionRequester compactionRequester = mock(CompactSplit.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCompactionAfterBulkLoad.class);
@Override
protected HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName,
byte[]... families) throws IOException {
RegionInfo hRegionInfo = RegionInfoBuilder.newBuilder(tableName).build();
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
for (byte[] family : families) {
builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
}
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
// TODO We need a way to do this without creating files
return HRegion.createHRegion(hRegionInfo, new Path(testFolder.newFolder().toURI()), conf,
builder.build(), log, true, regionServerServices);
}
@Test
public void shouldRequestCompactionAfterBulkLoad() throws IOException {
List<Pair<byte[], String>> familyPaths = new ArrayList<>();
// enough hfile to request compaction
for (int i = 0; i < 5; i++) {
familyPaths.addAll(withFamilyPathsFor(family1, family2, family3));
}
when(regionServerServices.getConfiguration()).thenReturn(conf);
when(regionServerServices.getCompactionRequestor()).thenReturn(compactionRequester);
when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
.thenAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) {
WALKeyImpl walKey = invocation.getArgument(1);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
walKey.setWriteEntry(we);
}
return 01L;
}
});
Mockito.doNothing().when(compactionRequester).requestCompaction(any(), any(), any(), anyInt(),
any(), any());
testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, false, null);
// invoke three times for 3 families
verify(compactionRequester, times(3)).requestCompaction(isA(HRegion.class), isA(HStore.class),
isA(String.class), anyInt(), eq(CompactionLifeCycleTracker.DUMMY), eq(null));
}
}
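Assuming the standard HBase module layout (these test classes live under hbase-server), the new test can be run on its own with:

    mvn -pl hbase-server test -Dtest=TestCompactionAfterBulkLoad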