HBASE-11195 Potentially improve block locality during major compaction for old regions
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
53815afc10
commit
f71b980d37
|
@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
|
||||||
import java.lang.annotation.Retention;
|
import java.lang.annotation.Retention;
|
||||||
import java.lang.annotation.RetentionPolicy;
|
import java.lang.annotation.RetentionPolicy;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -763,10 +764,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
throw new IllegalArgumentException(e);
|
throw new IllegalArgumentException(e);
|
||||||
}
|
}
|
||||||
// Server to handle client requests.
|
// Server to handle client requests.
|
||||||
String hostname = rs.conf.get("hbase.regionserver.ipc.address",
|
String hostname = getHostname(rs.conf);
|
||||||
Strings.domainNamePointerToHostName(DNS.getDefaultHost(
|
|
||||||
rs.conf.get("hbase.regionserver.dns.interface", "default"),
|
|
||||||
rs.conf.get("hbase.regionserver.dns.nameserver", "default"))));
|
|
||||||
|
|
||||||
boolean mode =
|
boolean mode =
|
||||||
rs.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
|
rs.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
|
||||||
|
@ -807,6 +805,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
rs.setName(name);
|
rs.setName(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static String getHostname(Configuration conf) throws UnknownHostException {
|
||||||
|
return conf.get("hbase.regionserver.ipc.address",
|
||||||
|
Strings.domainNamePointerToHostName(DNS.getDefaultHost(
|
||||||
|
conf.get("hbase.regionserver.dns.interface", "default"),
|
||||||
|
conf.get("hbase.regionserver.dns.nameserver", "default"))));
|
||||||
|
}
|
||||||
|
|
||||||
RegionScanner getScanner(long scannerId) {
|
RegionScanner getScanner(long scannerId) {
|
||||||
String scannerIdString = Long.toString(scannerId);
|
String scannerIdString = Long.toString(scannerId);
|
||||||
RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
|
RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
|
||||||
* minFilesToCompact - lower bound on number of files in any minor compaction
|
* minFilesToCompact - lower bound on number of files in any minor compaction
|
||||||
* maxFilesToCompact - upper bound on number of files in any minor compaction
|
* maxFilesToCompact - upper bound on number of files in any minor compaction
|
||||||
* compactionRatio - Ratio used for compaction
|
* compactionRatio - Ratio used for compaction
|
||||||
|
* minLocalityToForceCompact - Locality threshold for a store file to major compact (HBASE-11195)
|
||||||
* <p/>
|
* <p/>
|
||||||
* Set parameter as "hbase.hstore.compaction.<attribute>"
|
* Set parameter as "hbase.hstore.compaction.<attribute>"
|
||||||
*/
|
*/
|
||||||
|
@ -56,6 +57,8 @@ public class CompactionConfiguration {
|
||||||
"hbase.hstore.compaction.max.size";
|
"hbase.hstore.compaction.max.size";
|
||||||
public static final String HBASE_HSTORE_OFFPEAK_END_HOUR = "hbase.offpeak.end.hour";
|
public static final String HBASE_HSTORE_OFFPEAK_END_HOUR = "hbase.offpeak.end.hour";
|
||||||
public static final String HBASE_HSTORE_OFFPEAK_START_HOUR = "hbase.offpeak.start.hour";
|
public static final String HBASE_HSTORE_OFFPEAK_START_HOUR = "hbase.offpeak.start.hour";
|
||||||
|
public static final String HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT =
|
||||||
|
"hbase.hstore.min.locality.to.skip.major.compact";
|
||||||
|
|
||||||
Configuration conf;
|
Configuration conf;
|
||||||
StoreConfigInformation storeConfigInfo;
|
StoreConfigInformation storeConfigInfo;
|
||||||
|
@ -70,6 +73,7 @@ public class CompactionConfiguration {
|
||||||
private final long throttlePoint;
|
private final long throttlePoint;
|
||||||
private final long majorCompactionPeriod;
|
private final long majorCompactionPeriod;
|
||||||
private final float majorCompactionJitter;
|
private final float majorCompactionJitter;
|
||||||
|
private final float minLocalityToForceCompact;
|
||||||
|
|
||||||
CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
|
CompactionConfiguration(Configuration conf, StoreConfigInformation storeConfigInfo) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
|
@ -89,7 +93,7 @@ public class CompactionConfiguration {
|
||||||
majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24*7);
|
majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24*7);
|
||||||
// Make it 0.5 so jitter has us fall evenly either side of when the compaction should run
|
// Make it 0.5 so jitter has us fall evenly either side of when the compaction should run
|
||||||
majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.50F);
|
majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.50F);
|
||||||
|
minLocalityToForceCompact = conf.getFloat(HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT, 0f);
|
||||||
LOG.info(this);
|
LOG.info(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,7 +101,7 @@ public class CompactionConfiguration {
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return String.format(
|
return String.format(
|
||||||
"size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; throttle point %d;"
|
"size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; throttle point %d;"
|
||||||
+ " major period %d, major jitter %f",
|
+ " major period %d, major jitter %f, min locality to compact %f",
|
||||||
minCompactSize,
|
minCompactSize,
|
||||||
maxCompactSize,
|
maxCompactSize,
|
||||||
minFilesToCompact,
|
minFilesToCompact,
|
||||||
|
@ -106,7 +110,8 @@ public class CompactionConfiguration {
|
||||||
offPeakCompactionRatio,
|
offPeakCompactionRatio,
|
||||||
throttlePoint,
|
throttlePoint,
|
||||||
majorCompactionPeriod,
|
majorCompactionPeriod,
|
||||||
majorCompactionJitter);
|
majorCompactionJitter,
|
||||||
|
minLocalityToForceCompact);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -173,4 +178,13 @@ public class CompactionConfiguration {
|
||||||
public float getMajorCompactionJitter() {
|
public float getMajorCompactionJitter() {
|
||||||
return majorCompactionJitter;
|
return majorCompactionJitter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Block locality ratio, the ratio at which we will include old regions with a single
|
||||||
|
* store file for major compaction. Used to improve block locality for regions that
|
||||||
|
* haven't had writes in a while but are still being read.
|
||||||
|
*/
|
||||||
|
public float getMinLocalityToForceCompact() {
|
||||||
|
return minLocalityToForceCompact;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,6 +62,8 @@ public abstract class CompactionPolicy {
|
||||||
this.comConf = new CompactionConfiguration(conf, this.storeConfigInfo);
|
this.comConf = new CompactionConfiguration(conf, this.storeConfigInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return The current compaction configuration settings.
|
* @return The current compaction configuration settings.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
|
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreUtils;
|
import org.apache.hadoop.hbase.regionserver.StoreUtils;
|
||||||
|
@ -298,10 +299,25 @@ public class RatioBasedCompactionPolicy extends CompactionPolicy {
|
||||||
: now - minTimestamp.longValue();
|
: now - minTimestamp.longValue();
|
||||||
if (sf.isMajorCompaction() &&
|
if (sf.isMajorCompaction() &&
|
||||||
(cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
|
(cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
|
||||||
|
float blockLocalityIndex = sf.getHDFSBlockDistribution().getBlockLocalityIndex(
|
||||||
|
RSRpcServices.getHostname(comConf.conf)
|
||||||
|
);
|
||||||
|
if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Major compaction triggered on only store " + this +
|
||||||
|
"; to make hdfs blocks local, current blockLocalityIndex is " +
|
||||||
|
blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() +
|
||||||
|
")");
|
||||||
|
}
|
||||||
|
result = true;
|
||||||
|
} else {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Skipping major compaction of " + this +
|
LOG.debug("Skipping major compaction of " + this +
|
||||||
" because one (major) compacted file only and oldestTime " +
|
" because one (major) compacted file only, oldestTime " +
|
||||||
oldest + "ms is < ttl=" + cfTtl);
|
oldest + "ms is < ttl=" + cfTtl + " and blockLocalityIndex is " +
|
||||||
|
blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() +
|
||||||
|
")");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
|
} else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
|
||||||
LOG.debug("Major compaction triggered on store " + this +
|
LOG.debug("Major compaction triggered on store " + this +
|
||||||
|
|
Loading…
Reference in New Issue