HBASE-14468 Compaction improvements: FIFO compaction policy (Vladimir Rodionov)

This commit is contained in:
Enis Soztutar 2015-11-18 19:22:03 -08:00
parent de4f235bf4
commit 8a69dd5b08
4 changed files with 501 additions and 4 deletions

org/apache/hadoop/hbase/util/TimeOffsetEnvironmentEdge.java (new file)

@@ -0,0 +1,49 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@InterfaceAudience.Private
public class TimeOffsetEnvironmentEdge implements EnvironmentEdge {
private long offset;
public TimeOffsetEnvironmentEdge(){}
public void setTimeOffset(long off){
this.offset = off;
}
public long getTimeOffset()
{
return offset;
}
public void increment(long incr)
{
this.offset += incr;
}
@Override
public long currentTime() {
return System.currentTimeMillis() + offset;
}
}
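
The edge is intended to be injected through EnvironmentEdgeManager so that tests can advance the HBase clock deterministically instead of sleeping; TestFIFOCompactionPolicy below uses exactly this pattern. A minimal, self-contained sketch (the class name TimeOffsetEdgeExample is illustrative and not part of the patch):

import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.TimeOffsetEnvironmentEdge;

public class TimeOffsetEdgeExample {
  public static void main(String[] args) {
    // Inject the offset edge so "now" can be advanced without sleeping.
    TimeOffsetEnvironmentEdge edge = new TimeOffsetEnvironmentEdge();
    EnvironmentEdgeManager.injectEdge(edge);

    long before = EnvironmentEdgeManager.currentTime();
    edge.increment(2000);                              // pretend two seconds have passed
    long after = EnvironmentEdgeManager.currentTime(); // roughly before + 2000

    System.out.println((after - before) + " ms of simulated time");
    EnvironmentEdgeManager.reset();                    // restore the default edge when done
  }
}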

org/apache/hadoop/hbase/master/HMaster.java

@@ -93,6 +93,9 @@ import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
import org.apache.hadoop.hbase.master.procedure.AddColumnFamilyProcedure;
import org.apache.hadoop.hbase.master.procedure.CreateNamespaceProcedure;
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
@@ -109,9 +112,6 @@ import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -125,10 +125,14 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionStateListener;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Addressing;
@@ -1560,7 +1564,12 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
} catch (IOException e) {
warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
}
// Verify compaction policy
try{
checkCompactionPolicy(conf, htd);
} catch(IOException e){
warnOrThrowExceptionForFailure(false, CONF_KEY, e.getMessage(), e);
}
// check that we have at least 1 CF
if (htd.getColumnFamilies().length == 0) {
String message = "Table should have at least one column family.";
@@ -1616,6 +1625,77 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
}
}
private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd)
throws IOException {
// FIFO compaction places some requirements on the table/column family configuration.
// Note that FIFO compaction also ignores periodic major compactions.
String className =
htd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
if (className == null) {
className =
conf.get(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
ExploringCompactionPolicy.class.getName());
}
long majorCompactionPeriod = Long.MAX_VALUE;
String sv = htd.getConfigurationValue(HConstants.MAJOR_COMPACTION_PERIOD);
if (sv != null) {
majorCompactionPeriod = Long.parseLong(sv);
} else {
majorCompactionPeriod =
conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, majorCompactionPeriod);
}
String splitPolicyClassName = htd.getRegionSplitPolicyClassName();
if (splitPolicyClassName == null) {
splitPolicyClassName = conf.get(HConstants.HBASE_REGION_SPLIT_POLICY_KEY);
}
int blockingFileCount = HStore.DEFAULT_BLOCKING_STOREFILE_COUNT;
sv = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
if (sv != null) {
blockingFileCount = Integer.parseInt(sv);
} else {
blockingFileCount = conf.getInt(HStore.BLOCKING_STOREFILES_KEY, blockingFileCount);
}
for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
String compactionPolicy =
hcd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
if (compactionPolicy == null) {
compactionPolicy = className;
}
if (!compactionPolicy.equals(FIFOCompactionPolicy.class.getName())) {
continue;
}
// FIFOCompaction
String message = null;
// 1. Check TTL
if (hcd.getTimeToLive() == HColumnDescriptor.DEFAULT_TTL) {
message = "Default TTL is not supported for FIFO compaction";
throw new IOException(message);
}
// 2. Check min versions
if (hcd.getMinVersions() > 0) {
message = "MIN_VERSION > 0 is not supported for FIFO compaction";
throw new IOException(message);
}
// 3. blocking file count
String sbfc = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
if (sbfc != null) {
blockingFileCount = Integer.parseInt(sbfc);
}
if (blockingFileCount < 1000) {
message =
"blocking file count '" + HStore.BLOCKING_STOREFILES_KEY + "' " + blockingFileCount
+ " is below recommended minimum of 1000";
throw new IOException(message);
}
}
}
// HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled.
private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey,
String message, Exception cause) throws IOException {
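
For reference, a table passes the new checkCompactionPolicy() sanity checks only if every FIFO family has a non-default TTL, MIN_VERSIONS of 0, and an effective HStore.BLOCKING_STOREFILES_KEY value of at least 1000. A hedged sketch of a descriptor that satisfies those checks, mirroring the test setup further down (the table and family names are illustrative):

import java.io.IOException;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;

public class FifoTableSetupExample {
  // Sketch only: "raw_events" and "raw" are illustrative names, not from the patch.
  public static void createFifoTable(Admin admin) throws IOException {
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("raw_events"));
    // Per-table compaction and split policy, as in the test below.
    desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
        FIFOCompactionPolicy.class.getName());
    desc.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
        DisabledRegionSplitPolicy.class.getName());
    // The sanity check requires a blocking store file count of at least 1000.
    desc.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "1000");
    HColumnDescriptor cf = new HColumnDescriptor("raw");
    cf.setTimeToLive(24 * 60 * 60); // non-default TTL is mandatory (here: one day)
    cf.setMinVersions(0);           // MIN_VERSIONS must remain 0
    desc.addFamily(cf);
    admin.createTable(desc);
  }
}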

org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java (new file)

@@ -0,0 +1,134 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* FIFO compaction policy selects only files in which all cells have expired.
* The column family MUST have a non-default TTL. A typical use case is storing
* raw data that will be post-processed and then discarded after a fairly short
* time: for example, raw time-series data versus time-based roll-up aggregates.
* The raw time-series are written to a column family with FIFO compaction enabled;
* a periodic task builds the roll-up aggregates and compacted series, after which
* the original raw data can simply be dropped once it expires.
*/
@InterfaceAudience.Private
public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
private static final Log LOG = LogFactory.getLog(FIFOCompactionPolicy.class);
public FIFOCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
super(conf, storeConfigInfo);
}
@Override
public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
List<StoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak,
boolean forceMajor) throws IOException {
if(forceMajor){
LOG.warn("Major compaction is not supported for FIFO compaction policy. Ignore the flag.");
}
boolean isAfterSplit = StoreUtils.hasReferences(candidateFiles);
if(isAfterSplit){
LOG.info("Split detected, delegate selection to the parent policy.");
return super.selectCompaction(candidateFiles, filesCompacting, isUserCompaction,
mayUseOffPeak, forceMajor);
}
// Select only the fully-expired files; the resulting request may be empty,
// in which case there is nothing to compact.
Collection<StoreFile> toCompact = getExpiredStores(candidateFiles, filesCompacting);
CompactionRequest result = new CompactionRequest(toCompact);
return result;
}
@Override
public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
boolean isAfterSplit = StoreUtils.hasReferences(filesToCompact);
if(isAfterSplit){
LOG.info("Split detected, delegate to the parent policy.");
return super.isMajorCompaction(filesToCompact);
}
return false;
}
@Override
public boolean needsCompaction(Collection<StoreFile> storeFiles,
List<StoreFile> filesCompacting) {
boolean isAfterSplit = StoreUtils.hasReferences(storeFiles);
if(isAfterSplit){
LOG.info("Split detected, delegate to the parent policy.");
return super.needsCompaction(storeFiles, filesCompacting);
}
return hasExpiredStores(storeFiles);
}
private boolean hasExpiredStores(Collection<StoreFile> files) {
long currentTime = EnvironmentEdgeManager.currentTime();
for(StoreFile sf: files){
// The MIN_VERSIONS check is handled in HStore.removeUnneededFiles().
Long maxTs = sf.getReader().getMaxTimestamp();
long maxTtl = storeConfigInfo.getStoreFileTtl();
if(maxTs == null
|| maxTtl == Long.MAX_VALUE
|| (currentTime - maxTtl < maxTs)){
continue;
} else{
return true;
}
}
return false;
}
private Collection<StoreFile> getExpiredStores(Collection<StoreFile> files,
Collection<StoreFile> filesCompacting) {
long currentTime = EnvironmentEdgeManager.currentTime();
Collection<StoreFile> expiredStores = new ArrayList<StoreFile>();
for(StoreFile sf: files){
// The MIN_VERSIONS check is handled in HStore.removeUnneededFiles().
Long maxTs = sf.getReader().getMaxTimestamp();
long maxTtl = storeConfigInfo.getStoreFileTtl();
if(maxTs == null
|| maxTtl == Long.MAX_VALUE
|| (currentTime - maxTtl < maxTs)){
continue;
} else if(filesCompacting == null || filesCompacting.contains(sf) == false){
expiredStores.add(sf);
}
}
return expiredStores;
}
}
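
The expiry test in hasExpiredStores() and getExpiredStores() above treats a file as droppable only when a TTL is configured, the file reports a max timestamp, and that timestamp is older than now minus the TTL, i.e. currentTime - maxTtl >= maxTs. A small standalone sketch of the same predicate (class and method names are illustrative):

public class FifoExpiryCheckExample {
  /**
   * Mirrors the condition used in FIFOCompactionPolicy: a store file is fully
   * expired only when a TTL is configured, the file reports a max timestamp,
   * and that timestamp is older than (now - TTL). Files with no timestamp
   * metadata or no TTL are conservatively kept.
   */
  static boolean isFullyExpired(Long maxTimestampMs, long ttlMs, long nowMs) {
    if (maxTimestampMs == null || ttlMs == Long.MAX_VALUE) {
      return false; // no timestamp info or no TTL configured: never FIFO-expired
    }
    return nowMs - ttlMs >= maxTimestampMs;
  }

  public static void main(String[] args) {
    long now = 100_000L;
    System.out.println(isFullyExpired(10_000L, 50_000L, now)); // true: newest cell is past the TTL
    System.out.println(isFullyExpired(60_000L, 50_000L, now)); // false: newest cell still inside the TTL
  }
}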

org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java (new file)

@@ -0,0 +1,234 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.TimeOffsetEnvironmentEdge;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class })
public class TestFIFOCompactionPolicy {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
private final byte[] family = Bytes.toBytes("f");
private final byte[] qualifier = Bytes.toBytes("q");
private Store getStoreWithName(TableName tableName) {
MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
HRegionServer hrs = rsts.get(i).getRegionServer();
for (Region region : hrs.getOnlineRegions(tableName)) {
return region.getStores().iterator().next();
}
}
return null;
}
private Store prepareData() throws IOException {
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
if (admin.tableExists(tableName)) {
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
FIFOCompactionPolicy.class.getName());
desc.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
DisabledRegionSplitPolicy.class.getName());
HColumnDescriptor colDesc = new HColumnDescriptor(family);
colDesc.setTimeToLive(1); // 1 sec
desc.addFamily(colDesc);
admin.createTable(desc);
Table table = TEST_UTIL.getConnection().getTable(tableName);
Random rand = new Random();
TimeOffsetEnvironmentEdge edge =
(TimeOffsetEnvironmentEdge) EnvironmentEdgeManager.getDelegate();
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
byte[] value = new byte[128 * 1024];
rand.nextBytes(value);
table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
}
admin.flush(tableName);
edge.increment(1001);
}
return getStoreWithName(tableName);
}
@BeforeClass
public static void setEnvironmentEdge()
{
EnvironmentEdge ee = new TimeOffsetEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(ee);
}
@AfterClass
public static void resetEnvironmentEdge()
{
EnvironmentEdgeManager.reset();
}
@Test
public void testPurgeExpiredFiles() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
TEST_UTIL.startMiniCluster(1);
try {
Store store = prepareData();
assertEquals(10, store.getStorefilesCount());
TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
while (store.getStorefilesCount() > 1) {
Thread.sleep(100);
}
assertTrue(store.getStorefilesCount() == 1);
} finally {
TEST_UTIL.shutdownMiniCluster();
}
}
@Test
public void testSanityCheckTTL() throws Exception
{
Configuration conf = TEST_UTIL.getConfiguration();
conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
TEST_UTIL.startMiniCluster(1);
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
String tableName = this.tableName.getNameAsString()+"-TTL";
if (admin.tableExists(tableName)) {
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
FIFOCompactionPolicy.class.getName());
desc.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
DisabledRegionSplitPolicy.class.getName());
HColumnDescriptor colDesc = new HColumnDescriptor(family);
desc.addFamily(colDesc);
try{
admin.createTable(desc);
Assert.fail();
}catch(Exception e){
}finally{
TEST_UTIL.shutdownMiniCluster();
}
}
@Test
public void testSanityCheckMinVersion() throws Exception
{
Configuration conf = TEST_UTIL.getConfiguration();
conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
TEST_UTIL.startMiniCluster(1);
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
String tableName = this.tableName.getNameAsString()+"-MinVersion";
if (admin.tableExists(tableName)) {
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
FIFOCompactionPolicy.class.getName());
desc.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
DisabledRegionSplitPolicy.class.getName());
HColumnDescriptor colDesc = new HColumnDescriptor(family);
colDesc.setTimeToLive(1); // 1 sec
colDesc.setMinVersions(1);
desc.addFamily(colDesc);
try{
admin.createTable(desc);
Assert.fail();
}catch(Exception e){
}finally{
TEST_UTIL.shutdownMiniCluster();
}
}
@Test
public void testSanityCheckBlockingStoreFiles() throws Exception
{
Configuration conf = TEST_UTIL.getConfiguration();
conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10);
TEST_UTIL.startMiniCluster(1);
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
String tableName = this.tableName.getNameAsString() + "-BlockingStoreFiles";
if (admin.tableExists(tableName)) {
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
FIFOCompactionPolicy.class.getName());
desc.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
DisabledRegionSplitPolicy.class.getName());
HColumnDescriptor colDesc = new HColumnDescriptor(family);
colDesc.setTimeToLive(1); // 1 sec
desc.addFamily(colDesc);
try{
admin.createTable(desc);
Assert.fail();
}catch(Exception e){
}finally{
TEST_UTIL.shutdownMiniCluster();
}
}
}