HBASE-22632 SplitTableRegionProcedure and MergeTableRegionsProcedure should skip store files for unknown column families (#344)

Signed-off-by: Guanghao Zhang <zghao@apache.org>

Parent: 0198868531
Commit: 94d5419efb
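Why the change: both procedures previously enumerated the column families found on disk (regionFs.getFamilies()) and looked each one up in the table descriptor. A stray directory with no matching descriptor entry (for example, one left behind for a dropped or misnamed family) makes that lookup return null, and the procedure fails instead of completing the split or merge. A simplified sketch of the old and new patterns, using the identifiers from the diff below (surrounding method bodies elided):

    // Old pattern: iterate on-disk family directories, then consult the schema.
    for (String family : regionFs.getFamilies()) { // lists directories, not declared families
      ColumnFamilyDescriptor hcd = htd.getColumnFamily(Bytes.toBytes(family));
      // For a leftover "wrong_cf" directory, hcd is null here ...
      hcd.getBloomFilterType(); // ... so this dereference throws NullPointerException
    }

    // New pattern: iterate the schema, so unknown on-disk directories are never visited.
    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
      Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(hcd.getNameAsString());
      if (storeFiles == null) {
        continue; // declared family with no files on disk
      }
      // ... create reference files using hcd, which is guaranteed non-null
    }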
MergeTableRegionsProcedure.java

@@ -639,15 +639,14 @@ public class MergeTableRegionsProcedure
    * @param regionFs region file system
    * @param mergedDir the temp directory of merged region
    */
-  private void mergeStoreFiles(
-      final MasterProcedureEnv env, final HRegionFileSystem regionFs, final Path mergedDir)
-      throws IOException {
+  private void mergeStoreFiles(final MasterProcedureEnv env, final HRegionFileSystem regionFs,
+      final Path mergedDir) throws IOException {
     final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
     final Configuration conf = env.getMasterConfiguration();
     final TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());

-    for (String family : regionFs.getFamilies()) {
-      final ColumnFamilyDescriptor hcd = htd.getColumnFamily(Bytes.toBytes(family));
+    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
+      String family = hcd.getNameAsString();
       final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);

       if (storeFiles != null && storeFiles.size() > 0) {

@@ -655,9 +654,9 @@ public class MergeTableRegionsProcedure
         // Create reference file(s) of the region in mergedDir.
         // As this procedure is running on master, use CacheConfig.DISABLED means
         // don't cache any block.
-        regionFs.mergeStoreFile(mergedRegion, family,
-          new HStoreFile(mfs.getFileSystem(), storeFileInfo, conf, CacheConfig.DISABLED,
-            hcd.getBloomFilterType(), true), mergedDir);
+        regionFs.mergeStoreFile(mergedRegion, family, new HStoreFile(mfs.getFileSystem(),
+          storeFileInfo, conf, CacheConfig.DISABLED, hcd.getBloomFilterType(), true),
+          mergedDir);
       }
     }
   }
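The split path below receives the same inversion. Note the added null handling: with the loop now driven by the schema rather than the directory listing, a declared family may have no store directory on disk yet, in which case regionFs.getStoreFiles(family) returns null and the family is simply skipped.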
SplitTableRegionProcedure.java

@@ -626,6 +626,7 @@ public class SplitTableRegionProcedure
       final HRegionFileSystem regionFs) throws IOException {
     final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
     final Configuration conf = env.getMasterConfiguration();
+    TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
     // The following code sets up a thread pool executor with as many slots as
     // there's files to split. It then fires up everything, waits for
     // completion and finally checks for any exception

@@ -635,12 +636,15 @@ public class SplitTableRegionProcedure
     // clean this up.
     int nbFiles = 0;
     final Map<String, Collection<StoreFileInfo>> files =
-        new HashMap<String, Collection<StoreFileInfo>>(regionFs.getFamilies().size());
-    for (String family: regionFs.getFamilies()) {
+        new HashMap<String, Collection<StoreFileInfo>>(htd.getColumnFamilyCount());
+    for (ColumnFamilyDescriptor cfd : htd.getColumnFamilies()) {
+      String family = cfd.getNameAsString();
       Collection<StoreFileInfo> sfis = regionFs.getStoreFiles(family);
-      if (sfis == null) continue;
+      if (sfis == null) {
+        continue;
+      }
       Collection<StoreFileInfo> filteredSfis = null;
-      for (StoreFileInfo sfi: sfis) {
+      for (StoreFileInfo sfi : sfis) {
         // Filter. There is a lag cleaning up compacted reference files. They get cleared
         // after a delay in case outstanding Scanners still have references. Because of this,
         // the listing of the Store content may have straggler reference files. Skip these.

@@ -661,7 +665,7 @@ public class SplitTableRegionProcedure
     }
     if (nbFiles == 0) {
       // no file needs to be splitted.
-      return new Pair<Integer, Integer>(0,0);
+      return new Pair<Integer, Integer>(0, 0);
     }
     // Max #threads is the smaller of the number of storefiles or the default max determined above.
     int maxThreads = Math.min(

@@ -669,12 +673,11 @@ public class SplitTableRegionProcedure
         conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT)),
       nbFiles);
     LOG.info("pid=" + getProcId() + " splitting " + nbFiles + " storefiles, region=" +
       getParentRegion().getShortNameToLog() + ", threads=" + maxThreads);
-    final ExecutorService threadPool = Executors.newFixedThreadPool(
-      maxThreads, Threads.getNamedThreadFactory("StoreFileSplitter-%1$d"));
-    final List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>>(nbFiles);
+    final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreads,
+      Threads.getNamedThreadFactory("StoreFileSplitter-%1$d"));
+    final List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles);

-    TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
     // Split each store file.
     for (Map.Entry<String, Collection<StoreFileInfo>> e : files.entrySet()) {
       byte[] familyName = Bytes.toBytes(e.getKey());

@@ -684,9 +687,9 @@ public class SplitTableRegionProcedure
       for (StoreFileInfo storeFileInfo : storeFiles) {
         // As this procedure is running on master, use CacheConfig.DISABLED means
         // don't cache any block.
-        StoreFileSplitter sfs = new StoreFileSplitter(regionFs, familyName,
-          new HStoreFile(mfs.getFileSystem(), storeFileInfo, conf, CacheConfig.DISABLED,
-            hcd.getBloomFilterType(), true));
+        StoreFileSplitter sfs =
+          new StoreFileSplitter(regionFs, familyName, new HStoreFile(mfs.getFileSystem(),
+            storeFileInfo, conf, CacheConfig.DISABLED, hcd.getBloomFilterType(), true));
         futures.add(threadPool.submit(sfs));
       }
     }

@@ -698,7 +701,7 @@ public class SplitTableRegionProcedure
     // When splits ran on the RegionServer, how-long-to-wait-configuration was named
     // hbase.regionserver.fileSplitTimeout. If set, use its value.
     long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout",
       conf.getLong("hbase.regionserver.fileSplitTimeout", 600000));
     try {
       boolean stillRunning = !threadPool.awaitTermination(fileSplitTimeout, TimeUnit.MILLISECONDS);
       if (stillRunning) {

@@ -707,11 +710,11 @@ public class SplitTableRegionProcedure
         while (!threadPool.isTerminated()) {
           Thread.sleep(50);
         }
-        throw new IOException("Took too long to split the" +
-            " files and create the references, aborting split");
+        throw new IOException(
+          "Took too long to split the" + " files and create the references, aborting split");
       }
     } catch (InterruptedException e) {
-      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
     }

     int daughterA = 0;

@@ -731,9 +734,8 @@ public class SplitTableRegionProcedure

     if (LOG.isDebugEnabled()) {
       LOG.debug("pid=" + getProcId() + " split storefiles for region " +
-          getParentRegion().getShortNameToLog() +
-          " Daughter A: " + daughterA + " storefiles, Daughter B: " +
-          daughterB + " storefiles.");
+        getParentRegion().getShortNameToLog() + " Daughter A: " + daughterA +
+        " storefiles, Daughter B: " + daughterB + " storefiles.");
     }
     return new Pair<Integer, Integer>(daughterA, daughterB);
   }
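The new test below reproduces the bug directly: it creates a table with a single declared family ("cf"), writes an empty store file into a directory for an undeclared family ("wrong_cf") under the region, and then expects both split and merge to complete within 30 seconds rather than failing on the unknown directory.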
TestIgnoreUnknownFamily.java (new file)

@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Testcase for HBASE-22632
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestIgnoreUnknownFamily {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestIgnoreUnknownFamily.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static final byte[] UNKNOWN_FAMILY = Bytes.toBytes("wrong_cf");
+
+  @Rule
+  public TestName name = new TestName();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @After
+  public void tearDownAfterTest() throws IOException {
+    Admin admin = UTIL.getAdmin();
+    for (TableName tableName : admin.listTableNames()) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+  }
+
+  private void addStoreFileToKnownFamily(RegionInfo region) throws IOException {
+    MasterFileSystem mfs = UTIL.getMiniHBaseCluster().getMaster().getMasterFileSystem();
+    Path regionDir =
+      FSUtils.getRegionDirFromRootDir(FSUtils.getRootDir(mfs.getConfiguration()), region);
+    Path familyDir = new Path(regionDir, Bytes.toString(UNKNOWN_FAMILY));
+    StoreFileWriter writer =
+      new StoreFileWriter.Builder(mfs.getConfiguration(), mfs.getFileSystem())
+        .withOutputDir(familyDir).withFileContext(new HFileContext()).build();
+    writer.close();
+  }
+
+  @Test
+  public void testSplit()
+      throws IOException, InterruptedException, ExecutionException, TimeoutException {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    Admin admin = UTIL.getAdmin();
+    admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build());
+    RegionInfo region = admin.getRegions(tableName).get(0);
+    addStoreFileToKnownFamily(region);
+    admin.splitRegionAsync(region.getRegionName(), Bytes.toBytes(0)).get(30, TimeUnit.SECONDS);
+  }
+
+  @Test
+  public void testMerge()
+      throws IOException, InterruptedException, ExecutionException, TimeoutException {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    Admin admin = UTIL.getAdmin();
+    admin.createTable(
+      TableDescriptorBuilder.newBuilder(tableName)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(),
+      new byte[][] { Bytes.toBytes(0) });
+    List<RegionInfo> regions = admin.getRegions(tableName);
+    addStoreFileToKnownFamily(regions.get(0));
+    admin.mergeRegionsAsync(regions.get(0).getEncodedNameAsBytes(),
+      regions.get(1).getEncodedNameAsBytes(), false).get(30, TimeUnit.SECONDS);
+  }
+}
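Assuming the usual Maven Surefire setup of the HBase build (the class lives in org.apache.hadoop.hbase.master.procedure), this test can be run on its own with: mvn test -Dtest=TestIgnoreUnknownFamily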