diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 05374b10309..0d9f56459ab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -758,10 +759,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
throw new IllegalArgumentException(e);
}
// Server to handle client requests.
- String hostname = rs.conf.get("hbase.regionserver.ipc.address",
- Strings.domainNamePointerToHostName(DNS.getDefaultHost(
- rs.conf.get("hbase.regionserver.dns.interface", "default"),
- rs.conf.get("hbase.regionserver.dns.nameserver", "default"))));
+ String hostname = getHostname(rs.conf);
boolean mode =
rs.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
@@ -802,6 +800,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
rs.setName(name);
}
+ public static String getHostname(Configuration conf) throws UnknownHostException {
+ return conf.get("hbase.regionserver.ipc.address",
+ Strings.domainNamePointerToHostName(DNS.getDefaultHost(
+ conf.get("hbase.regionserver.dns.interface", "default"),
+ conf.get("hbase.regionserver.dns.nameserver", "default"))));
+ }
+
RegionScanner getScanner(long scannerId) {
String scannerIdString = Long.toString(scannerId);
RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
index 8d8fcd07339..0de154c7022 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
* minFilesToCompact - lower bound on number of files in any minor compaction
* maxFilesToCompact - upper bound on number of files in any minor compaction
* compactionRatio - Ratio used for compaction
+ * minLocalityToForceCompact - Locality threshold for a store file to major compact (HBASE-11195)
*
* Set parameter as "hbase.hstore.compaction."
*/
@@ -56,6 +57,8 @@ public class CompactionConfiguration {
"hbase.hstore.compaction.max.size";
public static final String HBASE_HSTORE_OFFPEAK_END_HOUR = "hbase.offpeak.end.hour";
public static final String HBASE_HSTORE_OFFPEAK_START_HOUR = "hbase.offpeak.start.hour";
+ public static final String HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT =
+ "hbase.hstore.min.locality.to.skip.major.compact";
Configuration conf;
StoreConfigInformation storeConfigInfo;
@@ -70,6 +73,7 @@ public class CompactionConfiguration {
private final long throttlePoint;
private final long majorCompactionPeriod;
private final float majorCompactionJitter;
+ private final float minLocalityToForceCompact;
CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
this.conf = conf;
@@ -89,7 +93,7 @@ public class CompactionConfiguration {
majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24*7);
// Make it 0.5 so jitter has us fall evenly either side of when the compaction should run
majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.50F);
-
+ minLocalityToForceCompact = conf.getFloat(HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT, 0f);
LOG.info(this);
}
@@ -97,7 +101,7 @@ public class CompactionConfiguration {
public String toString() {
return String.format(
"size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; throttle point %d;"
- + " major period %d, major jitter %f",
+ + " major period %d, major jitter %f, min locality to compact %f",
minCompactSize,
maxCompactSize,
minFilesToCompact,
@@ -106,7 +110,8 @@ public class CompactionConfiguration {
offPeakCompactionRatio,
throttlePoint,
majorCompactionPeriod,
- majorCompactionJitter);
+ majorCompactionJitter,
+ minLocalityToForceCompact);
}
/**
@@ -173,4 +178,13 @@ public class CompactionConfiguration {
public float getMajorCompactionJitter() {
return majorCompactionJitter;
}
+
+ /**
+ * @return Block locality ratio, the ratio at which we will include old regions with a single
+ * store file for major compaction. Used to improve block locality for regions that
+ * haven't had writes in a while but are still being read.
+ */
+ public float getMinLocalityToForceCompact() {
+ return minLocalityToForceCompact;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
index d31c9b89609..3b24189b6b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
@@ -62,6 +62,8 @@ public abstract class CompactionPolicy {
this.comConf = new CompactionConfiguration(conf, this.storeConfigInfo);
}
+
+
/**
* @return The current compaction configuration settings.
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
index c58ff147f5e..0db5c00ed39 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
@@ -298,10 +299,25 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
: now - minTimestamp.longValue();
if (sf.isMajorCompaction() &&
(cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping major compaction of " + this +
- " because one (major) compacted file only and oldestTime " +
- oldest + "ms is < ttl=" + cfTtl);
+ float blockLocalityIndex = sf.getHDFSBlockDistribution().getBlockLocalityIndex(
+ RSRpcServices.getHostname(comConf.conf)
+ );
+ if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Major compaction triggered on only store " + this +
+ "; to make hdfs blocks local, current blockLocalityIndex is " +
+ blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() +
+ ")");
+ }
+ result = true;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping major compaction of " + this +
+ " because one (major) compacted file only, oldestTime " +
+ oldest + "ms is < ttl=" + cfTtl + " and blockLocalityIndex is " +
+ blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() +
+ ")");
+ }
}
} else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
LOG.debug("Major compaction triggered on store " + this +