From f71b980d3715c94478930b614e63250ebb678899 Mon Sep 17 00:00:00 2001 From: rahulgidwani Date: Mon, 19 Jan 2015 17:31:59 -0800 Subject: [PATCH] HBASE-11195 Potentially improve block locality during major compaction for old regions Signed-off-by: Andrew Purtell --- .../hbase/regionserver/RSRpcServices.java | 13 ++++++---- .../compactions/CompactionConfiguration.java | 20 +++++++++++++--- .../compactions/CompactionPolicy.java | 2 ++ .../RatioBasedCompactionPolicy.java | 24 +++++++++++++++---- 4 files changed, 48 insertions(+), 11 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 814cbc138de..7f3b7cb3d74 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -23,6 +23,7 @@ import java.io.InterruptedIOException; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -763,10 +764,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, throw new IllegalArgumentException(e); } // Server to handle client requests. - String hostname = rs.conf.get("hbase.regionserver.ipc.address", - Strings.domainNamePointerToHostName(DNS.getDefaultHost( - rs.conf.get("hbase.regionserver.dns.interface", "default"), - rs.conf.get("hbase.regionserver.dns.nameserver", "default")))); + String hostname = getHostname(rs.conf); boolean mode = rs.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED); @@ -807,6 +805,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, rs.setName(name); } + public static String getHostname(Configuration conf) throws UnknownHostException { + return conf.get("hbase.regionserver.ipc.address", + Strings.domainNamePointerToHostName(DNS.getDefaultHost( + conf.get("hbase.regionserver.dns.interface", "default"), + conf.get("hbase.regionserver.dns.nameserver", "default")))); + } + RegionScanner getScanner(long scannerId) { String scannerIdString = Long.toString(scannerId); RegionScannerHolder scannerHolder = scanners.get(scannerIdString); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java index 8d8fcd07339..0de154c7022 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; * minFilesToCompact - lower bound on number of files in any minor compaction * maxFilesToCompact - upper bound on number of files in any minor compaction * compactionRatio - Ratio used for compaction + * minLocalityToForceCompact - Locality threshold for a store file to major compact (HBASE-11195) *

* Set parameter as "hbase.hstore.compaction." */ @@ -56,6 +57,8 @@ public class CompactionConfiguration { "hbase.hstore.compaction.max.size"; public static final String HBASE_HSTORE_OFFPEAK_END_HOUR = "hbase.offpeak.end.hour"; public static final String HBASE_HSTORE_OFFPEAK_START_HOUR = "hbase.offpeak.start.hour"; + public static final String HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT = + "hbase.hstore.min.locality.to.skip.major.compact"; Configuration conf; StoreConfigInformation storeConfigInfo; @@ -70,6 +73,7 @@ public class CompactionConfiguration { private final long throttlePoint; private final long majorCompactionPeriod; private final float majorCompactionJitter; + private final float minLocalityToForceCompact; CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) { this.conf = conf; @@ -89,7 +93,7 @@ public class CompactionConfiguration { majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24*7); // Make it 0.5 so jitter has us fall evenly either side of when the compaction should run majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.50F); - + minLocalityToForceCompact = conf.getFloat(HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT, 0f); LOG.info(this); } @@ -97,7 +101,7 @@ public class CompactionConfiguration { public String toString() { return String.format( "size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; throttle point %d;" - + " major period %d, major jitter %f", + + " major period %d, major jitter %f, min locality to compact %f", minCompactSize, maxCompactSize, minFilesToCompact, @@ -106,7 +110,8 @@ public class CompactionConfiguration { offPeakCompactionRatio, throttlePoint, majorCompactionPeriod, - majorCompactionJitter); + majorCompactionJitter, + minLocalityToForceCompact); } /** @@ -173,4 +178,13 @@ public class CompactionConfiguration { public float getMajorCompactionJitter() { return majorCompactionJitter; } + + /** + * @return Block locality ratio, the ratio at which we will include old regions with a single + * store file for major compaction. Used to improve block locality for regions that + * haven't had writes in a while but are still being read. + */ + public float getMinLocalityToForceCompact() { + return minLocalityToForceCompact; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java index d31c9b89609..3b24189b6b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java @@ -62,6 +62,8 @@ public abstract class CompactionPolicy { this.comConf = new CompactionConfiguration(conf, this.storeConfigInfo); } + + /** * @return The current compaction configuration settings. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java index c58ff147f5e..0db5c00ed39 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java @@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreUtils; @@ -298,10 +299,25 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy { : now - minTimestamp.longValue(); if (sf.isMajorCompaction() && (cfTtl == HConstants.FOREVER || oldest < cfTtl)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping major compaction of " + this + - " because one (major) compacted file only and oldestTime " + - oldest + "ms is < ttl=" + cfTtl); + float blockLocalityIndex = sf.getHDFSBlockDistribution().getBlockLocalityIndex( + RSRpcServices.getHostname(comConf.conf) + ); + if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Major compaction triggered on only store " + this + + "; to make hdfs blocks local, current blockLocalityIndex is " + + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + + ")"); + } + result = true; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping major compaction of " + this + + " because one (major) compacted file only, oldestTime " + + oldest + "ms is < ttl=" + cfTtl + " and blockLocalityIndex is " + + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + + ")"); + } } } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) { LOG.debug("Major compaction triggered on store " + this +