From 8a69dd5b0898a92bd86d75d7535660322e27cf8e Mon Sep 17 00:00:00 2001
From: Enis Soztutar
Date: Wed, 18 Nov 2015 19:22:03 -0800
Subject: [PATCH] HBASE-14468 Compaction improvements: FIFO compaction policy
 (Vladimir Rodionov)

---
 .../hbase/util/TimeOffsetEnvironmentEdge.java      |  49 +++++
 .../apache/hadoop/hbase/master/HMaster.java        |  88 ++++++-
 .../compactions/FIFOCompactionPolicy.java          | 134 ++++++++++
 .../compactions/TestFIFOCompactionPolicy.java      | 234 ++++++++++++++++++
 4 files changed, 501 insertions(+), 4 deletions(-)
 create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/util/TimeOffsetEnvironmentEdge.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java

diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TimeOffsetEnvironmentEdge.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TimeOffsetEnvironmentEdge.java
new file mode 100644
index 00000000000..dba71298630
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TimeOffsetEnvironmentEdge.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class TimeOffsetEnvironmentEdge implements EnvironmentEdge {
+  private long offset;
+
+  public TimeOffsetEnvironmentEdge() {}
+
+  public void setTimeOffset(long off) {
+    this.offset = off;
+  }
+
+  public long getTimeOffset() {
+    return offset;
+  }
+
+  public void increment(long incr) {
+    this.offset += incr;
+  }
+
+  @Override
+  public long currentTime() {
+    return System.currentTimeMillis() + offset;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 9195aa76922..c1f10fbf720 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -93,6 +93,9 @@ import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
 import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
+import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
+import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
+import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
 import org.apache.hadoop.hbase.master.procedure.AddColumnFamilyProcedure;
 import org.apache.hadoop.hbase.master.procedure.CreateNamespaceProcedure;
 import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
@@ -109,9 +112,6 @@ import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
 import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
-import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
-import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
-import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -125,10 +125,14 @@ import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionStateListener;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.Addressing;
@@ -1560,7 +1564,12 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     } catch (IOException e) {
       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
     }
-
+    // Verify compaction policy
+    try {
+      checkCompactionPolicy(conf, htd);
+    } catch (IOException e) {
+      warnOrThrowExceptionForFailure(false, CONF_KEY, e.getMessage(), e);
+    }
     // check that we have at least 1 CF
     if (htd.getColumnFamilies().length == 0) {
       String message = "Table should have at least one column family.";
@@ -1616,6 +1625,77 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     }
   }
 
+  private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd)
+      throws IOException {
+    // FIFO compaction has some requirements.
+    // Note that FIFO compaction ignores periodic major compactions.
+    String className =
+        htd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
+    if (className == null) {
+      className =
+          conf.get(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
+            ExploringCompactionPolicy.class.getName());
+    }
+
+    long majorCompactionPeriod = Long.MAX_VALUE;
+    String sv = htd.getConfigurationValue(HConstants.MAJOR_COMPACTION_PERIOD);
+    if (sv != null) {
+      majorCompactionPeriod = Long.parseLong(sv);
+    } else {
+      majorCompactionPeriod =
+          conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, majorCompactionPeriod);
+    }
+    String splitPolicyClassName = htd.getRegionSplitPolicyClassName();
+    if (splitPolicyClassName == null) {
+      splitPolicyClassName = conf.get(HConstants.HBASE_REGION_SPLIT_POLICY_KEY);
+    }
+
+    int blockingFileCount = HStore.DEFAULT_BLOCKING_STOREFILE_COUNT;
+    sv = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
+    if (sv != null) {
+      blockingFileCount = Integer.parseInt(sv);
+    } else {
+      blockingFileCount = conf.getInt(HStore.BLOCKING_STOREFILES_KEY, blockingFileCount);
+    }
+
+    for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
+      String compactionPolicy =
+          hcd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
+      if (compactionPolicy == null) {
+        compactionPolicy = className;
+      }
+      // Check the effective per-family policy, not the table/cluster default.
+      if (!compactionPolicy.equals(FIFOCompactionPolicy.class.getName())) {
+        continue;
+      }
+      // FIFOCompaction
+      String message = null;
+
+      // 1. Check TTL
+      if (hcd.getTimeToLive() == HColumnDescriptor.DEFAULT_TTL) {
+        message = "Default TTL is not supported for FIFO compaction";
+        throw new IOException(message);
+      }
+
+      // 2. Check min versions
+      if (hcd.getMinVersions() > 0) {
+        message = "MIN_VERSION > 0 is not supported for FIFO compaction";
+        throw new IOException(message);
+      }
+
+      // 3. Check blocking file count
+      String sbfc = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
+      if (sbfc != null) {
+        blockingFileCount = Integer.parseInt(sbfc);
+      }
+      if (blockingFileCount < 1000) {
+        message =
+            "blocking file count '" + HStore.BLOCKING_STOREFILES_KEY + "' " + blockingFileCount
+                + " is below recommended minimum of 1000";
+        throw new IOException(message);
+      }
+    }
+  }
+
   // HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled.
   private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey,
       String message, Exception cause) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java
new file mode 100644
index 00000000000..eace81fe483
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/FIFOCompactionPolicy.java
@@ -0,0 +1,134 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * FIFO compaction policy selects only files in which all cells have expired.
+ * The column family MUST have a non-default TTL. A typical use case is storing
+ * raw data that is post-processed later and can be discarded entirely after a
+ * short period: for example, raw time-series are written to a column family
+ * with FIFO compaction enabled, a periodic task builds time-based roll-up
+ * aggregates and compacted time-series from them, and the original raw data
+ * is then simply allowed to expire.
+ */
+@InterfaceAudience.Private
+public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
+
+  private static final Log LOG = LogFactory.getLog(FIFOCompactionPolicy.class);
+
+  public FIFOCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
+    super(conf, storeConfigInfo);
+  }
+
+  @Override
+  public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
+      List<StoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak,
+      boolean forceMajor) throws IOException {
+    if (forceMajor) {
+      LOG.warn("Major compaction is not supported for FIFO compaction policy. Ignoring the flag.");
Ignore the flag."); + } + boolean isAfterSplit = StoreUtils.hasReferences(candidateFiles); + if(isAfterSplit){ + LOG.info("Split detected, delegate selection to the parent policy."); + return super.selectCompaction(candidateFiles, filesCompacting, isUserCompaction, + mayUseOffPeak, forceMajor); + } + + // Nothing to compact + Collection toCompact = getExpiredStores(candidateFiles, filesCompacting); + CompactionRequest result = new CompactionRequest(toCompact); + return result; + } + + @Override + public boolean isMajorCompaction(Collection filesToCompact) throws IOException { + boolean isAfterSplit = StoreUtils.hasReferences(filesToCompact); + if(isAfterSplit){ + LOG.info("Split detected, delegate to the parent policy."); + return super.isMajorCompaction(filesToCompact); + } + return false; + } + + @Override + public boolean needsCompaction(Collection storeFiles, + List filesCompacting) { + boolean isAfterSplit = StoreUtils.hasReferences(storeFiles); + if(isAfterSplit){ + LOG.info("Split detected, delegate to the parent policy."); + return super.needsCompaction(storeFiles, filesCompacting); + } + return hasExpiredStores(storeFiles); + } + + private boolean hasExpiredStores(Collection files) { + long currentTime = EnvironmentEdgeManager.currentTime(); + for(StoreFile sf: files){ + // Check MIN_VERSIONS is in HStore removeUnneededFiles + Long maxTs = sf.getReader().getMaxTimestamp(); + long maxTtl = storeConfigInfo.getStoreFileTtl(); + if(maxTs == null + || maxTtl == Long.MAX_VALUE + || (currentTime - maxTtl < maxTs)){ + continue; + } else{ + return true; + } + } + return false; + } + + private Collection getExpiredStores(Collection files, + Collection filesCompacting) { + long currentTime = EnvironmentEdgeManager.currentTime(); + Collection expiredStores = new ArrayList(); + for(StoreFile sf: files){ + // Check MIN_VERSIONS is in HStore removeUnneededFiles + Long maxTs = sf.getReader().getMaxTimestamp(); + long maxTtl = storeConfigInfo.getStoreFileTtl(); + if(maxTs == null + || maxTtl == Long.MAX_VALUE + || (currentTime - maxTtl < maxTs)){ + continue; + } else if(filesCompacting == null || filesCompacting.contains(sf) == false){ + expiredStores.add(sf); + } + } + return expiredStores; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java new file mode 100644 index 00000000000..b887da9ec2e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.TimeOffsetEnvironmentEdge;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class })
+public class TestFIFOCompactionPolicy {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
+
+  private final byte[] family = Bytes.toBytes("f");
+
+  private final byte[] qualifier = Bytes.toBytes("q");
+
+  private Store getStoreWithName(TableName tableName) {
+    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
+    for (int i = 0; i < rsts.size(); i++) {
+      HRegionServer hrs = rsts.get(i).getRegionServer();
+      for (Region region : hrs.getOnlineRegions(tableName)) {
+        return region.getStores().iterator().next();
+      }
+    }
+    return null;
+  }
+
+  private Store prepareData() throws IOException {
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+    if (admin.tableExists(tableName)) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
+      FIFOCompactionPolicy.class.getName());
+    desc.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
+      DisabledRegionSplitPolicy.class.getName());
+    HColumnDescriptor colDesc = new HColumnDescriptor(family);
+    colDesc.setTimeToLive(1); // 1 sec
+    desc.addFamily(colDesc);
+
+    admin.createTable(desc);
+    Table table = TEST_UTIL.getConnection().getTable(tableName);
+    Random rand = new Random();
+    TimeOffsetEnvironmentEdge edge =
+        (TimeOffsetEnvironmentEdge) EnvironmentEdgeManager.getDelegate();
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 10; j++) {
+        byte[] value = new byte[128 * 1024];
+        rand.nextBytes(value);
+        table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
+      }
+      admin.flush(tableName);
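+      // Advance the injected test clock just past the 1s TTL so the file we
+      // just flushed is already fully expired on the next iteration.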
+      edge.increment(1001);
+    }
+    return getStoreWithName(tableName);
+  }
+
+  @BeforeClass
+  public static void setEnvironmentEdge() {
+    EnvironmentEdge ee = new TimeOffsetEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(ee);
+  }
+
+  @AfterClass
+  public static void resetEnvironmentEdge() {
+    EnvironmentEdgeManager.reset();
+  }
+
+  @Test
+  public void testPurgeExpiredFiles() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
+
+    TEST_UTIL.startMiniCluster(1);
+    try {
+      Store store = prepareData();
+      assertEquals(10, store.getStorefilesCount());
+      TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
+      while (store.getStorefilesCount() > 1) {
+        Thread.sleep(100);
+      }
+      assertEquals(1, store.getStorefilesCount());
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testSanityCheckTTL() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
+    TEST_UTIL.startMiniCluster(1);
+
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+    String tableName = this.tableName.getNameAsString() + "-TTL";
+    if (admin.tableExists(tableName)) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
+      FIFOCompactionPolicy.class.getName());
+    desc.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
+      DisabledRegionSplitPolicy.class.getName());
+    HColumnDescriptor colDesc = new HColumnDescriptor(family);
+    desc.addFamily(colDesc);
+    try {
+      admin.createTable(desc);
+      Assert.fail();
+    } catch (Exception e) {
+      // expected: default TTL is not allowed with FIFO compaction
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testSanityCheckMinVersion() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
+    TEST_UTIL.startMiniCluster(1);
+
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+    String tableName = this.tableName.getNameAsString() + "-MinVersion";
+    if (admin.tableExists(tableName)) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
+      FIFOCompactionPolicy.class.getName());
+    desc.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
+      DisabledRegionSplitPolicy.class.getName());
+    HColumnDescriptor colDesc = new HColumnDescriptor(family);
+    colDesc.setTimeToLive(1); // 1 sec
+    colDesc.setMinVersions(1);
+    desc.addFamily(colDesc);
+    try {
+      admin.createTable(desc);
+      Assert.fail();
+    } catch (Exception e) {
+      // expected: MIN_VERSIONS > 0 is not allowed with FIFO compaction
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testSanityCheckBlockingStoreFiles() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10);
+    TEST_UTIL.startMiniCluster(1);
+
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+    String tableName = this.tableName.getNameAsString() + "-BlockingStoreFiles";
+    if (admin.tableExists(tableName)) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
+      FIFOCompactionPolicy.class.getName());
+    desc.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
+      DisabledRegionSplitPolicy.class.getName());
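+    // hbase.hstore.blockingStoreFiles is set to 10 above, well below the
+    // required minimum of 1000, so createTable must be rejected by the
+    // master-side FIFO sanity check.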
+    HColumnDescriptor colDesc = new HColumnDescriptor(family);
+    colDesc.setTimeToLive(1); // 1 sec
+    desc.addFamily(colDesc);
+    try {
+      admin.createTable(desc);
+      Assert.fail();
+    } catch (Exception e) {
+      // expected: blocking store file count below the required minimum
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+}
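
A minimal usage sketch, not part of the patch itself: it shows how a client could enable the
new policy when creating a table, mirroring prepareData() in the test above. The table name
"raw_timeseries", the family name "raw", the TTL, and the blockingStoreFiles value are
illustrative; the configuration keys and the 1000-file minimum enforced by
HMaster.checkCompactionPolicy() come from this patch. Referencing FIFOCompactionPolicy.class
directly requires hbase-server on the classpath, as in the test.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HColumnDescriptor;
  import org.apache.hadoop.hbase.HConstants;
  import org.apache.hadoop.hbase.HTableDescriptor;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Admin;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
  import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
  import org.apache.hadoop.hbase.regionserver.HStore;
  import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;

  public class FifoCompactionExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      try (Connection conn = ConnectionFactory.createConnection(conf);
          Admin admin = conn.getAdmin()) {
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("raw_timeseries"));
        // Route compaction selection for this table through the FIFO policy.
        desc.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
          FIFOCompactionPolicy.class.getName());
        // The master-side sanity check requires blockingStoreFiles >= 1000 for FIFO tables.
        desc.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "2000");
        // Optional, but matches the test setup: FIFO stores are typically not split.
        desc.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
          DisabledRegionSplitPolicy.class.getName());
        HColumnDescriptor family = new HColumnDescriptor("raw");
        // A non-default TTL is mandatory; files whose cells have all expired are
        // dropped wholesale instead of being rewritten.
        family.setTimeToLive(24 * 60 * 60); // one day, in seconds
        desc.addFamily(family);
        admin.createTable(desc);
      }
    }
  }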