HBASE-16981 Expand Mob Compaction Partition policy from daily to weekly, monthly

Support weekly and monthly mob compact partition policies in addition to the existing
daily partition policy.

Signed-off-by: Jingcheng Du <jingchengdu@apache.org>
This commit is contained in:
Huaxiang Sun 2017-02-01 08:20:52 -08:00 committed by Jingcheng Du
parent 63c819efbe
commit 1159296541
10 changed files with 842 additions and 58 deletions

View File

@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.HBaseException; import org.apache.hadoop.hbase.exceptions.HBaseException;
import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression;
@ -41,7 +42,6 @@ import org.apache.hadoop.hbase.util.PrettyPrinter.Unit;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
/** /**
* An HColumnDescriptor contains information about a column family such as the * An HColumnDescriptor contains information about a column family such as the
* number of versions, compression settings, etc. * number of versions, compression settings, etc.
@ -130,6 +130,11 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
public static final String MOB_THRESHOLD = "MOB_THRESHOLD"; public static final String MOB_THRESHOLD = "MOB_THRESHOLD";
public static final byte[] MOB_THRESHOLD_BYTES = Bytes.toBytes(MOB_THRESHOLD); public static final byte[] MOB_THRESHOLD_BYTES = Bytes.toBytes(MOB_THRESHOLD);
public static final long DEFAULT_MOB_THRESHOLD = 100 * 1024; // 100k public static final long DEFAULT_MOB_THRESHOLD = 100 * 1024; // 100k
public static final String MOB_COMPACT_PARTITION_POLICY = "MOB_COMPACT_PARTITION_POLICY";
public static final byte[] MOB_COMPACT_PARTITION_POLICY_BYTES =
Bytes.toBytes(MOB_COMPACT_PARTITION_POLICY);
public static final MobCompactPartitionPolicy DEFAULT_MOB_COMPACT_PARTITION_POLICY =
MobCompactPartitionPolicy.DAILY;
public static final String DFS_REPLICATION = "DFS_REPLICATION"; public static final String DFS_REPLICATION = "DFS_REPLICATION";
public static final short DEFAULT_DFS_REPLICATION = 0; public static final short DEFAULT_DFS_REPLICATION = 0;
@ -276,6 +281,7 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION_KEY))); RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION_KEY)));
RESERVED_KEYWORDS.add(new Bytes(IS_MOB_BYTES)); RESERVED_KEYWORDS.add(new Bytes(IS_MOB_BYTES));
RESERVED_KEYWORDS.add(new Bytes(MOB_THRESHOLD_BYTES)); RESERVED_KEYWORDS.add(new Bytes(MOB_THRESHOLD_BYTES));
RESERVED_KEYWORDS.add(new Bytes(MOB_COMPACT_PARTITION_POLICY_BYTES));
} }
private static final int UNINITIALIZED = -1; private static final int UNINITIALIZED = -1;
@ -438,8 +444,7 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
if (Bytes.compareTo(Bytes.toBytes(HConstants.VERSIONS), key) == 0) { if (Bytes.compareTo(Bytes.toBytes(HConstants.VERSIONS), key) == 0) {
cachedMaxVersions = UNINITIALIZED; cachedMaxVersions = UNINITIALIZED;
} }
values.put(new Bytes(key), values.put(new Bytes(key), new Bytes(value));
new Bytes(value));
return this; return this;
} }
@ -1020,7 +1025,7 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
public static Unit getUnit(String key) { public static Unit getUnit(String key) {
Unit unit; Unit unit;
/* TTL for now, we can add more as we neeed */ /* TTL for now, we can add more as we need */
if (key.equals(HColumnDescriptor.TTL)) { if (key.equals(HColumnDescriptor.TTL)) {
unit = Unit.TIME_INTERVAL; unit = Unit.TIME_INTERVAL;
} else if (key.equals(HColumnDescriptor.MOB_THRESHOLD)) { } else if (key.equals(HColumnDescriptor.MOB_THRESHOLD)) {
@ -1222,6 +1227,28 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
return this; return this;
} }
/**
 * Returns the mob compact partition policy configured on this column family.
 * <p>
 * The policy is read from the {@link #MOB_COMPACT_PARTITION_POLICY} value and matched against
 * the enum constant names after upper-casing it with {@link Locale#ROOT}.
 * @return the configured policy, or {@link #DEFAULT_MOB_COMPACT_PARTITION_POLICY} when no value
 *         is stored
 * @throws IllegalArgumentException if the stored value does not name a
 *         {@link MobCompactPartitionPolicy} constant
 */
public MobCompactPartitionPolicy getMobCompactPartitionPolicy() {
  final String stored = getValue(MOB_COMPACT_PARTITION_POLICY);
  return stored != null
      ? MobCompactPartitionPolicy.valueOf(stored.toUpperCase(Locale.ROOT))
      : DEFAULT_MOB_COMPACT_PARTITION_POLICY;
}
/**
 * Stores the mob compact partition policy for this column family.
 * <p>
 * The policy is persisted under {@link #MOB_COMPACT_PARTITION_POLICY} as its string form,
 * upper-cased with {@link Locale#ROOT}.
 * @param policy the policy to store; must not be null
 * @return this (for chained invocation)
 */
public HColumnDescriptor setMobCompactPartitionPolicy(MobCompactPartitionPolicy policy) {
  final String encoded = policy.toString().toUpperCase(Locale.ROOT);
  return setValue(MOB_COMPACT_PARTITION_POLICY, encoded);
}
/** /**
* @return replication factor set for this CF or {@link #DEFAULT_DFS_REPLICATION} if not set. * @return replication factor set for this CF or {@link #DEFAULT_DFS_REPLICATION} if not set.
* <p> * <p>

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
 * The granularity at which mob files are grouped into partitions for mob compaction.
 * <p>
 * The policy is stored per column family; HColumnDescriptor uses {@link #DAILY} as the default
 * when none is set.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public enum MobCompactPartitionPolicy {
  /**
   * Compact the mob files of one day into one file.
   */
  DAILY,
  /**
   * Compact the mob files within one calendar week into one file.
   * NOTE(review): the week appears to start on Monday (see MobUtils.getFirstDayOfWeek) --
   * confirm if this matters to callers.
   */
  WEEKLY,
  /**
   * Compact the mob files within one calendar month into one file.
   */
  MONTHLY
}

View File

@ -114,12 +114,16 @@ public class TestHColumnDescriptor {
public void testMobValuesInHColumnDescriptorShouldReadable() { public void testMobValuesInHColumnDescriptorShouldReadable() {
boolean isMob = true; boolean isMob = true;
long threshold = 1000; long threshold = 1000;
String policy = "weekly";
String isMobString = PrettyPrinter.format(Bytes.toStringBinary(Bytes.toBytes(isMob)), String isMobString = PrettyPrinter.format(Bytes.toStringBinary(Bytes.toBytes(isMob)),
HColumnDescriptor.getUnit(HColumnDescriptor.IS_MOB)); HColumnDescriptor.getUnit(HColumnDescriptor.IS_MOB));
String thresholdString = PrettyPrinter.format(Bytes.toStringBinary(Bytes.toBytes(threshold)), String thresholdString = PrettyPrinter.format(Bytes.toStringBinary(Bytes.toBytes(threshold)),
HColumnDescriptor.getUnit(HColumnDescriptor.MOB_THRESHOLD)); HColumnDescriptor.getUnit(HColumnDescriptor.MOB_THRESHOLD));
String policyString = PrettyPrinter.format(Bytes.toStringBinary(Bytes.toBytes(policy)),
HColumnDescriptor.getUnit(HColumnDescriptor.MOB_COMPACT_PARTITION_POLICY));
assertEquals(String.valueOf(isMob), isMobString); assertEquals(String.valueOf(isMob), isMobString);
assertEquals(String.valueOf(threshold), thresholdString); assertEquals(String.valueOf(threshold), thresholdString);
assertEquals(String.valueOf(policy), policyString);
} }
@Test @Test

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection; import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -54,6 +55,7 @@ import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.backup.HFileArchiver; import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression;
@ -63,6 +65,8 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.mob.compactions.MobCompactor; import org.apache.hadoop.hbase.mob.compactions.MobCompactor;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor; import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStore;
@ -81,6 +85,8 @@ import org.apache.hadoop.hbase.util.Threads;
public final class MobUtils { public final class MobUtils {
private static final Log LOG = LogFactory.getLog(MobUtils.class); private static final Log LOG = LogFactory.getLog(MobUtils.class);
private final static long WEEKLY_THRESHOLD_MULTIPLIER = 7;
private final static long MONTHLY_THRESHOLD_MULTIPLIER = 4 * WEEKLY_THRESHOLD_MULTIPLIER;
private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT = private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
new ThreadLocal<SimpleDateFormat>() { new ThreadLocal<SimpleDateFormat>() {
@ -122,6 +128,45 @@ public final class MobUtils {
return LOCAL_FORMAT.get().parse(dateString); return LOCAL_FORMAT.get().parse(dateString);
} }
/**
 * Computes midnight on the first day of the month that contains the given date.
 * <p>
 * The supplied calendar is used as a scratch object: its time is overwritten, and the result is
 * expressed in that calendar's time zone.
 * @param calendar Calendar used to perform the computation (mutated by this call)
 * @param date The date whose month is of interest
 * @return The first day of that month at 00:00:00.000
 */
public static Date getFirstDayOfMonth(final Calendar calendar, final Date date) {
  final int[] timeOfDayFields = {
    Calendar.HOUR_OF_DAY, Calendar.MINUTE, Calendar.SECOND, Calendar.MILLISECOND
  };

  calendar.setTime(date);
  // Truncate to the start of the day, then move to day 1 of the month.
  for (int field : timeOfDayFields) {
    calendar.set(field, 0);
  }
  calendar.set(Calendar.DAY_OF_MONTH, 1);

  return calendar.getTime();
}
/**
 * Computes midnight on the Monday of the week that contains the given date.
 * <p>
 * Weeks are treated as starting on Monday. The supplied calendar is used as a scratch object:
 * its time is overwritten and its first-day-of-week setting is changed to Monday. The result is
 * expressed in that calendar's time zone.
 * @param calendar Calendar used to perform the computation (mutated by this call)
 * @param date The date whose week is of interest
 * @return The Monday of that week at 00:00:00.000
 */
public static Date getFirstDayOfWeek(final Calendar calendar, final Date date) {
  final int[] timeOfDayFields = {
    Calendar.HOUR_OF_DAY, Calendar.MINUTE, Calendar.SECOND, Calendar.MILLISECOND
  };

  calendar.setTime(date);
  // Truncate to the start of the day.
  for (int field : timeOfDayFields) {
    calendar.set(field, 0);
  }
  // Make Monday the first day of the week before selecting it, so that a Sunday maps back to
  // the preceding Monday rather than forward to the next one.
  calendar.setFirstDayOfWeek(Calendar.MONDAY);
  calendar.set(Calendar.DAY_OF_WEEK, Calendar.MONDAY);

  return calendar.getTime();
}
/** /**
* Whether the current cell is a mob reference cell. * Whether the current cell is a mob reference cell.
* @param cell The current cell. * @param cell The current cell.
@ -247,8 +292,14 @@ public final class MobUtils {
return; return;
} }
Date expireDate = new Date(current - timeToLive * 1000); Calendar calendar = Calendar.getInstance();
expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate()); calendar.setTimeInMillis(current - timeToLive * 1000);
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
Date expireDate = calendar.getTime();
LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!"); LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!");
FileStatus[] stats = null; FileStatus[] stats = null;
@ -268,14 +319,13 @@ public final class MobUtils {
for (FileStatus file : stats) { for (FileStatus file : stats) {
String fileName = file.getPath().getName(); String fileName = file.getPath().getName();
try { try {
MobFileName mobFileName = null; if (HFileLink.isHFileLink(file.getPath())) {
if (!HFileLink.isHFileLink(file.getPath())) {
mobFileName = MobFileName.create(fileName);
} else {
HFileLink hfileLink = HFileLink.buildFromHFileLinkPattern(conf, file.getPath()); HFileLink hfileLink = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
mobFileName = MobFileName.create(hfileLink.getOriginPath().getName()); fileName = hfileLink.getOriginPath().getName();
} }
Date fileDate = parseDate(mobFileName.getDate());
Date fileDate = parseDate(MobFileName.getDateFromName(fileName));
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Checking file " + fileName); LOG.debug("Checking file " + fileName);
} }
@ -471,8 +521,8 @@ public final class MobUtils {
Compression.Algorithm compression, String startKey, CacheConfig cacheConfig, Compression.Algorithm compression, String startKey, CacheConfig cacheConfig,
Encryption.Context cryptoContext) Encryption.Context cryptoContext)
throws IOException { throws IOException {
MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString() MobFileName mobFileName = MobFileName.create(startKey, date,
.replaceAll("-", "")); UUID.randomUUID().toString().replaceAll("-", ""));
return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression, return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
cacheConfig, cryptoContext); cacheConfig, cryptoContext);
} }
@ -527,8 +577,8 @@ public final class MobUtils {
Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig, Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig,
Encryption.Context cryptoContext) Encryption.Context cryptoContext)
throws IOException { throws IOException {
MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString() MobFileName mobFileName = MobFileName.create(startKey, date,
.replaceAll("-", "")); UUID.randomUUID().toString().replaceAll("-", ""));
return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression, return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
cacheConfig, cryptoContext); cacheConfig, cryptoContext);
} }
@ -854,4 +904,87 @@ public final class MobUtils {
} }
return false; return false;
} }
/**
 * Fills in the date and size threshold of a partition id for one mob file, based on the
 * compaction policy, the file's date and the configured threshold.
 * <p>
 * The rules are:
 * <ul>
 * <li>Non-positive threshold: the id keeps the file's own date and the given threshold; the file
 * is not excluded.</li>
 * <li>Unparseable date: the id keeps the file's own date and the file is excluded.</li>
 * <li>MONTHLY policy: a file dated before the current month goes into the partition of the first
 * day of its month with 4 * 7 * threshold; a file dated before the current week goes into the
 * partition of the first day of its week with 7 * threshold; a file in the current week is
 * excluded.</li>
 * <li>WEEKLY policy: a file dated before the current week goes into the partition of the first
 * day of its week with 7 * threshold; a file in the current week keeps its own date and the
 * plain threshold.</li>
 * <li>DAILY policy: the file keeps its own date and the plain threshold.</li>
 * </ul>
 * Scaled thresholds are capped at {@link Long#MAX_VALUE} instead of overflowing.
 * @param id Partition id to be filled out
 * @param firstDayOfCurrentMonth The first day in the current month (only read for MONTHLY)
 * @param firstDayOfCurrentWeek The first day in the current week (only read for MONTHLY and
 *          WEEKLY)
 * @param dateStr Date string from the mob file
 * @param policy Mob compaction policy
 * @param calendar Calendar object used as scratch space for date arithmetic
 * @param threshold Mob compaction threshold configured
 * @return true if the file needs to be excluded from compaction
 */
public static boolean fillPartitionId(final CompactionPartitionId id,
    final Date firstDayOfCurrentMonth, final Date firstDayOfCurrentWeek, final String dateStr,
    final MobCompactPartitionPolicy policy, final Calendar calendar, final long threshold) {

  id.setThreshold(threshold);
  if (threshold <= 0) {
    id.setDate(dateStr);
    return false;
  }

  final Date date;
  try {
    date = MobUtils.parseDate(dateStr);
  } catch (ParseException e) {
    LOG.warn("Failed to parse date " + dateStr, e);
    id.setDate(dateStr);
    return true;
  }

  final boolean monthly = policy == MobCompactPartitionPolicy.MONTHLY;
  final boolean weekly = policy == MobCompactPartitionPolicy.WEEKLY;

  if (monthly && date.before(firstDayOfCurrentMonth)) {
    // File from a past month: one partition per month, keyed by the month's first day.
    id.setThreshold(scaleThresholdSaturating(threshold, MONTHLY_THRESHOLD_MULTIPLIER));
    id.setDate(MobUtils.formatDate(MobUtils.getFirstDayOfMonth(calendar, date)));
    return false;
  }

  if ((monthly || weekly) && date.before(firstDayOfCurrentWeek)) {
    // File from a past week: one partition per week, keyed by the week's first day.
    id.setThreshold(scaleThresholdSaturating(threshold, WEEKLY_THRESHOLD_MULTIPLIER));
    id.setDate(MobUtils.formatDate(MobUtils.getFirstDayOfWeek(calendar, date)));
    return false;
  }

  // Everything else is partitioned daily. Under the monthly policy the only files that reach
  // this point are those of the current week, and those are excluded from compaction.
  id.setDate(dateStr);
  return monthly;
}

/**
 * Multiplies a threshold by a policy multiplier, returning {@link Long#MAX_VALUE} instead of
 * letting the product overflow.
 * @param threshold The configured threshold; callers pass a positive value
 * @param multiplier The positive multiplier to apply
 * @return multiplier * threshold, or Long.MAX_VALUE when the threshold is too large to scale
 */
private static long scaleThresholdSaturating(final long threshold, final long multiplier) {
  return threshold < (Long.MAX_VALUE / multiplier) ? multiplier * threshold : Long.MAX_VALUE;
}
} }

View File

@ -25,6 +25,7 @@ import java.util.List;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/** /**
@ -98,11 +99,15 @@ public class PartitionedMobCompactionRequest extends MobCompactionRequest {
public static class CompactionPartitionId { public static class CompactionPartitionId {
private String startKey; private String startKey;
private String date; private String date;
private String latestDate;
private long threshold;
public CompactionPartitionId() { public CompactionPartitionId() {
// initialize these fields to empty string // initialize these fields to empty string
this.startKey = ""; this.startKey = "";
this.date = ""; this.date = "";
this.latestDate = "";
this.threshold = 0;
} }
public CompactionPartitionId(String startKey, String date) { public CompactionPartitionId(String startKey, String date) {
@ -111,6 +116,16 @@ public class PartitionedMobCompactionRequest extends MobCompactionRequest {
} }
this.startKey = startKey; this.startKey = startKey;
this.date = date; this.date = date;
this.latestDate = "";
this.threshold = 0;
}
/**
 * Sets the size threshold associated with this partition.
 * @param threshold the threshold value to store
 */
public void setThreshold(final long threshold) {
  this.threshold = threshold;
}
public long getThreshold () {
return this.threshold;
} }
public String getStartKey() { public String getStartKey() {
@ -129,6 +144,14 @@ public class PartitionedMobCompactionRequest extends MobCompactionRequest {
this.date = date; this.date = date;
} }
/**
 * @return the greatest date string recorded so far through {@code updateLatestDate}, or the
 *         empty string if none has been recorded
 */
public String getLatestDate() {
  return this.latestDate;
}
/**
 * Records the given date as the latest date of this partition if it sorts after the one
 * currently held; otherwise leaves the current value untouched.
 * <p>
 * The comparison is a plain string comparison. NOTE(review): this presumably relies on the date
 * strings having a fixed, sortable layout such as yyyyMMdd -- confirm against MobUtils.
 * @param latestDate the candidate date string; must not be null
 */
public void updateLatestDate(final String latestDate) {
  if (this.latestDate.compareTo(latestDate) >= 0) {
    // The stored date is already the same or later.
    return;
  }
  this.latestDate = latestDate;
}
@Override @Override
public int hashCode() { public int hashCode() {
int result = 17; int result = 17;

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.mob.compactions;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
@ -51,6 +52,7 @@ import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.HFileLink;
@ -151,6 +153,19 @@ public class PartitionedMobCompactor extends MobCompactor {
final CompactionPartitionId id = new CompactionPartitionId(); final CompactionPartitionId id = new CompactionPartitionId();
int selectedFileCount = 0; int selectedFileCount = 0;
int irrelevantFileCount = 0; int irrelevantFileCount = 0;
MobCompactPartitionPolicy policy = column.getMobCompactPartitionPolicy();
Calendar calendar = Calendar.getInstance();
Date currentDate = new Date();
Date firstDayOfCurrentMonth = null;
Date firstDayOfCurrentWeek = null;
if (policy == MobCompactPartitionPolicy.MONTHLY) {
firstDayOfCurrentMonth = MobUtils.getFirstDayOfMonth(calendar, currentDate);
firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, currentDate);
} else if (policy == MobCompactPartitionPolicy.WEEKLY) {
firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, currentDate);
}
for (FileStatus file : candidates) { for (FileStatus file : candidates) {
if (!file.isFile()) { if (!file.isFile()) {
@ -170,25 +185,34 @@ public class PartitionedMobCompactor extends MobCompactor {
} }
if (StoreFileInfo.isDelFile(linkedFile.getPath())) { if (StoreFileInfo.isDelFile(linkedFile.getPath())) {
allDelFiles.add(file); allDelFiles.add(file);
} else if (allFiles || (linkedFile.getLen() < mergeableSize)) { } else {
String fileName = linkedFile.getPath().getName();
String date = MobFileName.getDateFromName(fileName);
boolean skipCompaction = MobUtils.fillPartitionId(id, firstDayOfCurrentMonth,
firstDayOfCurrentWeek, date, policy, calendar, mergeableSize);
if (allFiles || (!skipCompaction && (linkedFile.getLen() < id.getThreshold()))) {
// add all files if allFiles is true, // add all files if allFiles is true,
// otherwise add the small files to the merge pool // otherwise add the small files to the merge pool
String fileName = linkedFile.getPath().getName(); // filter out files which are not supposed to be compacted with the
// current policy
id.setStartKey(MobFileName.getStartKeyFromName(fileName)); id.setStartKey(MobFileName.getStartKeyFromName(fileName));
id.setDate(MobFileName.getDateFromName(fileName));
CompactionPartition compactionPartition = filesToCompact.get(id); CompactionPartition compactionPartition = filesToCompact.get(id);
if (compactionPartition == null) { if (compactionPartition == null) {
CompactionPartitionId newId = new CompactionPartitionId(id.getStartKey(), id.getDate()); CompactionPartitionId newId = new CompactionPartitionId(id.getStartKey(), id.getDate());
compactionPartition = new CompactionPartition(newId); compactionPartition = new CompactionPartition(newId);
compactionPartition.addFile(file); compactionPartition.addFile(file);
filesToCompact.put(newId, compactionPartition); filesToCompact.put(newId, compactionPartition);
newId.updateLatestDate(date);
} else { } else {
compactionPartition.addFile(file); compactionPartition.addFile(file);
compactionPartition.getPartitionId().updateLatestDate(date);
} }
selectedFileCount++; selectedFileCount++;
} }
} }
}
/* /*
* If it is not a major mob compaction with del files, and the file number in Partition is 1, * If it is not a major mob compaction with del files, and the file number in Partition is 1,
@ -437,7 +461,7 @@ public class PartitionedMobCompactor extends MobCompactor {
try { try {
try { try {
writer = MobUtils writer = MobUtils
.createWriter(conf, fs, column, partition.getPartitionId().getDate(), tempPath, .createWriter(conf, fs, column, partition.getPartitionId().getLatestDate(), tempPath,
Long.MAX_VALUE, column.getCompactionCompressionType(), Long.MAX_VALUE, column.getCompactionCompressionType(),
partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext); partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext);
cleanupTmpMobFile = true; cleanupTmpMobFile = true;

View File

@ -38,6 +38,8 @@ import java.util.concurrent.TimeUnit;
import javax.crypto.spec.SecretKeySpec; import javax.crypto.spec.SecretKeySpec;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -59,6 +61,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
@ -74,6 +77,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner; import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
@ -86,6 +90,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
@ -95,6 +100,7 @@ import org.junit.experimental.categories.Category;
@Category(LargeTests.class) @Category(LargeTests.class)
public class TestMobCompactor { public class TestMobCompactor {
private static final Log LOG = LogFactory.getLog(TestMobCompactor.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static Configuration conf = null; private static Configuration conf = null;
private TableName tableName; private TableName tableName;
@ -110,6 +116,43 @@ public class TestMobCompactor {
private static final String family2 = "family2"; private static final String family2 = "family2";
private static final String qf1 = "qualifier1"; private static final String qf1 = "qualifier1";
private static final String qf2 = "qualifier2"; private static final String qf2 = "qualifier2";
// Fixed timestamps (epoch milliseconds) for the partition-policy tests. Each name carries the
// calendar date (yyyyMMdd) of the instant, which holds in UTC, plus the intended weekday.
private static final long tsFor20150907Monday = 1441654904000L;
// NOTE(review): 2015-11-20 fell on a Friday, not a Sunday, so the weekday in this name looks
// wrong -- confirm what the tests intend before relying on it.
private static final long tsFor20151120Sunday = 1448051213000L;
private static final long tsFor20151128Saturday = 1448734396000L;
private static final long tsFor20151130Monday = 1448874000000L;
private static final long tsFor20151201Tuesday = 1448960400000L;
private static final long tsFor20151205Saturday = 1449306000000L;
private static final long tsFor20151228Monday = 1451293200000L;
private static final long tsFor20151231Thursday = 1451552400000L;
private static final long tsFor20160101Friday = 1451638800000L;
private static final long tsFor20160103Sunday = 1451844796000L;
// Row keys for the partition-policy tests.
private static final byte[] mobKey01 = Bytes.toBytes("r01");
private static final byte[] mobKey02 = Bytes.toBytes("r02");
private static final byte[] mobKey03 = Bytes.toBytes("r03");
private static final byte[] mobKey04 = Bytes.toBytes("r04");
private static final byte[] mobKey05 = Bytes.toBytes("r05");
// NOTE(review): mobKey06 encodes "r05", the same row as mobKey05 -- possibly a copy/paste slip
// for "r06". Changing it may alter row counts in the tests that use it, so confirm first.
private static final byte[] mobKey06 = Bytes.toBytes("r05");
private static final byte[] mobKey1 = Bytes.toBytes("r1");
private static final byte[] mobKey2 = Bytes.toBytes("r2");
private static final byte[] mobKey3 = Bytes.toBytes("r3");
private static final byte[] mobKey4 = Bytes.toBytes("r4");
private static final byte[] mobKey5 = Bytes.toBytes("r5");
private static final byte[] mobKey6 = Bytes.toBytes("r6");
private static final byte[] mobKey7 = Bytes.toBytes("r7");
private static final byte[] mobKey8 = Bytes.toBytes("r8");
// Cell values for the partition-policy tests. The trailing digit run does not track the
// constant's number from mobValue5 onward (mobValue5 ends in 6s, mobValue6 in 7s, and so on);
// the values are still distinct from one another.
private static final String mobValue0 = "mobValue00000000000000000000000000";
private static final String mobValue1 = "mobValue00000111111111111111111111";
private static final String mobValue2 = "mobValue00000222222222222222222222";
private static final String mobValue3 = "mobValue00000333333333333333333333";
private static final String mobValue4 = "mobValue00000444444444444444444444";
private static final String mobValue5 = "mobValue00000666666666666666666666";
private static final String mobValue6 = "mobValue00000777777777777777777777";
private static final String mobValue7 = "mobValue00000888888888888888888888";
private static final String mobValue8 = "mobValue00000888888888888888888899";
private static byte[] KEYS = Bytes.toBytes("012"); private static byte[] KEYS = Bytes.toBytes("012");
private static int regionNum = KEYS.length; private static int regionNum = KEYS.length;
private static int delRowNum = 1; private static int delRowNum = 1;
@ -128,6 +171,7 @@ public class TestMobCompactor {
TEST_UTIL.getConfiguration().setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0); TEST_UTIL.getConfiguration().setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 1); TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 1);
TEST_UTIL.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 100); TEST_UTIL.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 100);
TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
TEST_UTIL.startMiniCluster(1); TEST_UTIL.startMiniCluster(1);
pool = createThreadPool(TEST_UTIL.getConfiguration()); pool = createThreadPool(TEST_UTIL.getConfiguration());
conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), pool); conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), pool);
@ -159,6 +203,37 @@ public class TestMobCompactor {
bufMut = conn.getBufferedMutator(tableName); bufMut = conn.getBufferedMutator(tableName);
} }
/**
 * Creates the table used by a partition-policy test and points the shared test fields
 * ({@code tableName}, {@code hcd1}, {@code desc}, {@code table}, {@code bufMut}) at it.
 * The single family {@code family1} is MOB-enabled with a threshold of 10 and uses the given
 * compaction partition policy.
 *
 * @param tableNameAsString name of the table to create
 * @param type MOB compaction partition policy to set on the family
 * @throws IOException if creating or opening the table fails
 */
private void setUpForPolicyTest(String tableNameAsString, MobCompactPartitionPolicy type)
    throws IOException {
  tableName = TableName.valueOf(tableNameAsString);

  // Describe the MOB family first ...
  final HColumnDescriptor mobFamily = new HColumnDescriptor(family1);
  hcd1 = mobFamily;
  mobFamily.setMobEnabled(true);
  mobFamily.setMobThreshold(10);
  mobFamily.setMobCompactPartitionPolicy(type);

  // ... then the table that carries it.
  final HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
  desc = tableDescriptor;
  tableDescriptor.addFamily(mobFamily);
  admin.createTable(tableDescriptor);

  table = conn.getTable(tableName);
  bufMut = conn.getBufferedMutator(tableName);
}
/**
 * Switches the MOB compaction partition policy of {@code hcd1} on the existing test table and
 * polls the alter status every 40 ms until no regions are reported as still pending (or no
 * status is returned).
 *
 * @param type the new MOB compaction partition policy
 * @throws Exception if the table modification or the status polling fails
 */
private void alterForPolicyTest(final MobCompactPartitionPolicy type)
    throws Exception {
  hcd1.setMobCompactPartitionPolicy(type);
  desc.modifyFamily(hcd1);
  admin.modifyTable(tableName, desc);

  for (Pair<Integer, Integer> status = admin.getAlterStatus(tableName);
      status != null && status.getFirst() > 0;
      status = admin.getAlterStatus(tableName)) {
    LOG.debug(status.getFirst() + " regions left to update");
    Thread.sleep(40);
  }
  LOG.info("alter status finished");
}
@Test(timeout = 300000) @Test(timeout = 300000)
public void testMinorCompaction() throws Exception { public void testMinorCompaction() throws Exception {
resetConf(); resetConf();
@ -219,6 +294,128 @@ public class TestMobCompactor {
countFiles(tableName, false, family2)); countFiles(tableName, false, family2));
} }
/**
 * Polls the store directory of {@code famStr} in the first region of {@code table} until it
 * contains exactly {@code num} entries.
 * <p>
 * The wait is bounded: the directory is re-listed every 50 ms for at most 60 seconds, so a
 * count that never matches fails the test instead of hanging it.
 *
 * @param table table whose first region is inspected
 * @param famStr column family whose store directory is listed
 * @param num exact number of directory entries to wait for
 * @throws InterruptedException if the thread is interrupted while sleeping between polls
 * @throws IOException if the directory cannot be listed, or the expected count is not reached
 *           within the wait limit
 */
private void waitUntilFilesShowup(final TableName table, final String famStr, final int num)
    throws InterruptedException, IOException {
  final long maxWaitMs = 60000L;
  final long pollIntervalMs = 50L;

  HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(table).get(0);
  FileSystem fs = r.getRegionFileSystem().getFileSystem();
  Path path = r.getRegionFileSystem().getStoreDir(famStr);

  final long deadline = System.currentTimeMillis() + maxWaitMs;
  FileStatus[] fileList = fs.listStatus(path);
  while (fileList.length != num) {
    if (System.currentTimeMillis() >= deadline) {
      throw new IOException("Timed out after " + maxWaitMs + " ms waiting for " + num
          + " files in " + path + "; last listing had " + fileList.length);
    }
    Thread.sleep(pollIntervalMs);
    fileList = fs.listStatus(path);
  }
}
/**
 * Returns the number of entries currently listed in the store directory of {@code famStr}
 * for the first region of {@code table}.
 *
 * @param table table whose first region is inspected
 * @param famStr column family whose store directory is listed
 * @return the length of the directory listing
 * @throws IOException if the directory cannot be listed
 */
private int numberOfMobFiles(final TableName table, final String famStr)
    throws IOException {
  final HRegion firstRegion = TEST_UTIL.getMiniHBaseCluster().getRegions(table).get(0);
  final FileSystem regionFs = firstRegion.getRegionFileSystem().getFileSystem();
  final Path storeDir = firstRegion.getRegionFileSystem().getStoreDir(famStr);
  return regionFs.listStatus(storeDir).length;
}
/**
 * Minor MOB compaction under the WEEKLY policy with the mergeable threshold lowered to 5000:
 * expects six mob files with the listed dates to remain.
 */
@Test
public void testMinorCompactionWithWeeklyPolicy() throws Exception {
  resetConf();
  // change the mob compaction merge size
  conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, 5000);

  final String[] expectedDates =
      { "20150907", "20151120", "20151128", "20151130", "20151205", "20160103" };
  commonPolicyTestLogic("testMinorCompactionWithWeeklyPolicy",
      MobCompactPartitionPolicy.WEEKLY, false, expectedDates.length, expectedDates, true);
}
/**
 * Major MOB compaction under the WEEKLY policy: expects five mob files with the listed dates.
 */
@Test
public void testMajorCompactionWithWeeklyPolicy() throws Exception {
  resetConf();

  final String[] expectedDates = { "20150907", "20151120", "20151128", "20151205", "20160103" };
  commonPolicyTestLogic("testMajorCompactionWithWeeklyPolicy",
      MobCompactPartitionPolicy.WEEKLY, true, expectedDates.length, expectedDates, true);
}
/**
 * Minor MOB compaction under the MONTHLY policy with the mergeable threshold lowered to 5000:
 * expects four mob files with the listed dates to remain.
 */
@Test
public void testMinorCompactionWithMonthlyPolicy() throws Exception {
  resetConf();
  // change the mob compaction merge size
  conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, 5000);

  final String[] expectedDates = { "20150907", "20151130", "20151231", "20160103" };
  commonPolicyTestLogic("testMinorCompactionWithMonthlyPolicy",
      MobCompactPartitionPolicy.MONTHLY, false, expectedDates.length, expectedDates, true);
}
/**
 * Major MOB compaction under the MONTHLY policy: expects four mob files with the listed dates.
 */
@Test
public void testMajorCompactionWithMonthlyPolicy() throws Exception {
  resetConf();

  final String[] expectedDates = { "20150907", "20151130", "20151231", "20160103" };
  commonPolicyTestLogic("testMajorCompactionWithMonthlyPolicy",
      MobCompactPartitionPolicy.MONTHLY, true, expectedDates.length, expectedDates, true);
}
/**
 * Major compaction under WEEKLY on a freshly loaded table, then the same table is altered to
 * MONTHLY and major-compacted again; checks the file dates expected after each step.
 */
@Test
public void testMajorCompactionWithWeeklyFollowedByMonthly() throws Exception {
  final String name = "testMajorCompactionWithWeeklyFollowedByMonthly";
  resetConf();

  // Step 1: create + load, weekly policy.
  final String[] afterWeekly = { "20150907", "20151120", "20151128", "20151205", "20160103" };
  commonPolicyTestLogic(name, MobCompactPartitionPolicy.WEEKLY, true, 5, afterWeekly, true);

  // Step 2: alter the existing table to monthly (no reload).
  final String[] afterMonthly = { "20150907", "20151128", "20151205", "20160103" };
  commonPolicyTestLogic(name, MobCompactPartitionPolicy.MONTHLY, true, 4, afterMonthly, false);
}
/**
 * Major compaction under WEEKLY on a freshly loaded table, then MONTHLY, then WEEKLY again on
 * the same table; the last step expects the same four file dates as the monthly step.
 */
@Test
public void testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByWeekly() throws Exception {
  final String name = "testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByWeekly";
  resetConf();

  // Step 1: create + load, weekly policy.
  final String[] afterWeekly = { "20150907", "20151120", "20151128", "20151205", "20160103" };
  commonPolicyTestLogic(name, MobCompactPartitionPolicy.WEEKLY, true, 5, afterWeekly, true);

  // Step 2: alter to monthly.
  final String[] afterMonthly = { "20150907", "20151128", "20151205", "20160103" };
  commonPolicyTestLogic(name, MobCompactPartitionPolicy.MONTHLY, true, 4, afterMonthly, false);

  // Step 3: alter back to weekly.
  final String[] afterSecondWeekly = { "20150907", "20151128", "20151205", "20160103" };
  commonPolicyTestLogic(name, MobCompactPartitionPolicy.WEEKLY, true, 4, afterSecondWeekly,
      false);
}
/**
 * Major compaction under WEEKLY on a freshly loaded table, then MONTHLY, then DAILY on the
 * same table; the last step expects the same four file dates as the monthly step.
 */
@Test
public void testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByDaily() throws Exception {
  final String name = "testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByDaily";
  resetConf();

  // Step 1: create + load, weekly policy.
  final String[] afterWeekly = { "20150907", "20151120", "20151128", "20151205", "20160103" };
  commonPolicyTestLogic(name, MobCompactPartitionPolicy.WEEKLY, true, 5, afterWeekly, true);

  // Step 2: alter to monthly.
  final String[] afterMonthly = { "20150907", "20151128", "20151205", "20160103" };
  commonPolicyTestLogic(name, MobCompactPartitionPolicy.MONTHLY, true, 4, afterMonthly, false);

  // Step 3: alter to daily.
  final String[] afterDaily = { "20150907", "20151128", "20151205", "20160103" };
  commonPolicyTestLogic(name, MobCompactPartitionPolicy.DAILY, true, 4, afterDaily, false);
}
@Test(timeout = 300000) @Test(timeout = 300000)
public void testCompactionWithHFileLink() throws IOException, InterruptedException { public void testCompactionWithHFileLink() throws IOException, InterruptedException {
resetConf(); resetConf();
@ -716,6 +913,65 @@ public class TestMobCompactor {
admin.flush(tableName); admin.flush(tableName);
} }
/**
 * Loads the fixture data for the partition-policy tests into {@code family1:qf1}.
 * <p>
 * First a batch of 1000 rows keyed {@code "r0" + i} is written with the 2015-11-30 timestamp
 * and {@code mobValue0}. Then nine single-row loads follow, each with its own timestamp, in
 * this order: mobKey06, mobKey1 .. mobKey8. Every load is a separate {@code loadData} call
 * (presumably so each lands in its own flushed file; confirm against {@code loadData}).
 *
 * @param admin admin handle forwarded to {@code loadData}
 * @param table buffered mutator the puts are written through
 * @param tableName table being loaded
 * @throws IOException if any load fails
 */
private void loadDataForPartitionPolicy(Admin admin, BufferedMutator table, TableName tableName)
    throws IOException {
  final byte[] cf = Bytes.toBytes(family1);
  final byte[] cq = Bytes.toBytes(qf1);

  // Bulk batch: 1000 rows sharing one timestamp and one value.
  final int bulkRowCount = 1000;
  final Put[] bulkPuts = new Put[bulkRowCount];
  for (int i = 0; i < bulkRowCount; i++) {
    Put put = new Put(Bytes.toBytes("r0" + i));
    put.addColumn(cf, cq, tsFor20151130Monday, Bytes.toBytes(mobValue0));
    bulkPuts[i] = put;
  }
  // Write through the mutator that was passed in rather than reaching for the bufMut field.
  loadData(admin, table, tableName, bulkPuts);

  // Single-row loads; the three arrays are parallel and the order is the load order.
  final byte[][] rows = {
    mobKey06, mobKey1, mobKey2, mobKey3, mobKey4, mobKey5, mobKey6, mobKey7, mobKey8 };
  final long[] timestamps = {
    tsFor20151128Saturday, tsFor20151201Tuesday, tsFor20151205Saturday,
    tsFor20151228Monday, tsFor20151231Thursday, tsFor20160101Friday,
    tsFor20160103Sunday, tsFor20150907Monday, tsFor20151120Sunday };
  final String[] values = {
    mobValue0, mobValue1, mobValue2, mobValue3, mobValue4, mobValue5, mobValue6, mobValue7,
    mobValue8 };

  for (int i = 0; i < rows.length; i++) {
    Put put = new Put(rows[i]);
    put.addColumn(cf, cq, timestamps[i], Bytes.toBytes(values[i]));
    loadData(admin, table, tableName, new Put[] { put });
  }
}
/** /**
* delete the row, family and cell to create the del file * delete the row, family and cell to create the del file
*/ */
@ -833,4 +1089,127 @@ public class TestMobCompactor {
conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE); MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
} }
/**
 * Verify mob partition policy compaction values.
 * <p>
 * Reads {@code family1:qf1} for each fixture row (mobKey01 .. mobKey06, then mobKey1 ..
 * mobKey8) and asserts that the stored bytes equal the expected value. The first six rows
 * expect {@code mobValue0}; mobKey1 .. mobKey8 expect mobValue1 .. mobValue8. A failure
 * message names the offending row.
 *
 * @throws Exception if a read fails
 */
private void verifyPolicyValues() throws Exception {
  // Parallel arrays: rows[i] must hold expectedValues[i].
  final byte[][] rows = {
    mobKey01, mobKey02, mobKey03, mobKey04, mobKey05, mobKey06,
    mobKey1, mobKey2, mobKey3, mobKey4, mobKey5, mobKey6, mobKey7, mobKey8 };
  final String[] expectedValues = {
    mobValue0, mobValue0, mobValue0, mobValue0, mobValue0, mobValue0,
    mobValue1, mobValue2, mobValue3, mobValue4, mobValue5, mobValue6, mobValue7, mobValue8 };

  final byte[] cf = Bytes.toBytes(family1);
  final byte[] cq = Bytes.toBytes(qf1);

  for (int i = 0; i < rows.length; i++) {
    Result result = table.get(new Get(rows[i]));
    assertTrue("Unexpected value for row " + Bytes.toStringBinary(rows[i]),
        Arrays.equals(result.getValue(cf, cq), Bytes.toBytes(expectedValues[i])));
  }
}
/**
 * Shared body of the partition-policy tests.
 * <p>
 * Either creates and loads a fresh table with the given policy, or alters the policy of the
 * table left by a previous call. It then triggers a MOB compaction, waits for it to finish,
 * runs the HFile cleaner chore, and checks the mob family directory: the number of files must
 * match, every expected date must be found among the file-name dates (each file is matched at
 * most once), and all fixture values must still be readable.
 *
 * @param tableNameAsString table name, used only when {@code setupAndLoadData} is true
 * @param pType MOB compaction partition policy to create or alter the family with
 * @param majorCompact true to request a major MOB compaction, false for a minor one
 * @param expectedFileNumbers expected number of entries in the mob family directory
 * @param expectedFileNames expected date strings taken from the mob file names
 * @param setupAndLoadData true to create and load the table, false to alter the existing one
 * @throws Exception if any step fails
 */
private void commonPolicyTestLogic (final String tableNameAsString,
    final MobCompactPartitionPolicy pType, final boolean majorCompact,
    final int expectedFileNumbers, final String[] expectedFileNames,
    final boolean setupAndLoadData
    ) throws Exception {
  if (setupAndLoadData) {
    setUpForPolicyTest(tableNameAsString, pType);
    loadDataForPartitionPolicy(admin, bufMut, tableName);
  } else {
    alterForPolicyTest(pType);
  }

  if (majorCompact) {
    admin.majorCompact(tableName, hcd1.getName(), CompactType.MOB);
  } else {
    admin.compact(tableName, hcd1.getName(), CompactType.MOB);
  }
  waitUntilMobCompactionFinished(tableName);

  // Run cleaner to make sure that files in archive directory are cleaned up
  TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting();

  // Collect the date part of every file name left in the mob family directory.
  Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, family1);
  FileStatus[] fileList = fs.listStatus(mobDirPath);
  ArrayList<String> fileNames = new ArrayList<>(fileList.length);
  for (FileStatus file : fileList) {
    fileNames.add(MobFileName.getDateFromName(file.getPath().getName()));
  }

  // check the number of files
  assertTrue("Expected " + expectedFileNumbers + " mob files but found " + fileList.length
      + " with dates " + fileNames, fileList.length == expectedFileNumbers);

  // the file names are expected; a matched file is removed so it cannot satisfy two entries
  for (String fileName : expectedFileNames) {
    assertTrue("No unmatched mob file dated " + fileName + "; expected "
        + Arrays.toString(expectedFileNames) + ", still unmatched " + fileNames,
        fileNames.remove(fileName));
  }

  // The files left must be the ones the active policy groups by; also check that there is
  // no data loss.
  verifyPolicyValues();
}
} }

View File

@ -19,13 +19,8 @@
package org.apache.hadoop.hbase.mob.compactions; package org.apache.hadoop.hbase.mob.compactions;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.text.ParseException;
import java.util.Collection; import java.util.*;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
@ -33,12 +28,15 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
import org.apache.hadoop.hbase.regionserver.*; import org.apache.hadoop.hbase.regionserver.*;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
@ -63,9 +61,11 @@ import org.junit.experimental.categories.Category;
@Category(LargeTests.class) @Category(LargeTests.class)
public class TestPartitionedMobCompactor { public class TestPartitionedMobCompactor {
private static final Log LOG = LogFactory.getLog(TestPartitionedMobCompactor.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static String family = "family"; private final static String family = "family";
private final static String qf = "qf"; private final static String qf = "qf";
private final long DAY_IN_MS = 1000 * 60 * 60 * 24;
private HColumnDescriptor hcd = new HColumnDescriptor(family); private HColumnDescriptor hcd = new HColumnDescriptor(family);
private Configuration conf = TEST_UTIL.getConfiguration(); private Configuration conf = TEST_UTIL.getConfiguration();
private CacheConfig cacheConf = new CacheConfig(conf); private CacheConfig cacheConf = new CacheConfig(conf);
@ -103,6 +103,109 @@ public class TestPartitionedMobCompactor {
delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del"; delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
} }
/**
 * WEEKLY policy, files dated today, default mergeable threshold: expects ALL_FILES.
 */
@Test
public void testCompactionSelectAllFilesWeeklyPolicy() throws Exception {
  final long mergeSize = MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD;
  testCompactionAtMergeSize("testCompactionSelectAllFilesWeeklyPolicy", mergeSize,
      CompactionType.ALL_FILES, false, false, new Date(), MobCompactPartitionPolicy.WEEKLY, 1);
}
/**
 * WEEKLY policy, files dated today, mergeable threshold 4000: expects PART_FILES.
 */
@Test
public void testCompactionSelectPartFilesWeeklyPolicy() throws Exception {
  final long mergeSize = 4000;
  testCompactionAtMergeSize("testCompactionSelectPartFilesWeeklyPolicy", mergeSize,
      CompactionType.PART_FILES, false, false, new Date(), MobCompactPartitionPolicy.WEEKLY, 1);
}
/**
 * WEEKLY policy, files dated seven days ago, mergeable threshold 700 scaled by 7 when
 * computing the expected start keys: expects PART_FILES.
 */
@Test
public void testCompactionSelectPartFilesWeeklyPolicyWithPastWeek() throws Exception {
  final long sevenDaysAgoMs = System.currentTimeMillis() - (7 * DAY_IN_MS);
  testCompactionAtMergeSize("testCompactionSelectPartFilesWeeklyPolicyWithPastWeek", 700,
      CompactionType.PART_FILES, false, false, new Date(sevenDaysAgoMs),
      MobCompactPartitionPolicy.WEEKLY, 7);
}
/**
 * WEEKLY policy, files dated seven days ago, mergeable threshold 3000 scaled by 7 when
 * computing the expected start keys: expects ALL_FILES.
 */
@Test
public void testCompactionSelectAllFilesWeeklyPolicyWithPastWeek() throws Exception {
  final long sevenDaysAgoMs = System.currentTimeMillis() - (7 * DAY_IN_MS);
  testCompactionAtMergeSize("testCompactionSelectAllFilesWeeklyPolicyWithPastWeek", 3000,
      CompactionType.ALL_FILES, false, false, new Date(sevenDaysAgoMs),
      MobCompactPartitionPolicy.WEEKLY, 7);
}
/**
 * MONTHLY policy, files dated seven days ago, default mergeable threshold scaled by 7 when
 * computing the expected start keys: expects ALL_FILES.
 */
@Test
public void testCompactionSelectAllFilesMonthlyPolicy() throws Exception {
  final long sevenDaysAgoMs = System.currentTimeMillis() - (7 * DAY_IN_MS);
  final long mergeSize = MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD;
  testCompactionAtMergeSize("testCompactionSelectAllFilesMonthlyPolicy", mergeSize,
      CompactionType.ALL_FILES, false, false, new Date(sevenDaysAgoMs),
      MobCompactPartitionPolicy.MONTHLY, 7);
}
/**
 * MONTHLY policy, files dated today, default mergeable threshold: runs the selection check
 * with PART_FILES as the expected type.
 */
@Test
public void testCompactionSelectNoFilesWithinCurrentWeekMonthlyPolicy() throws Exception {
  final long mergeSize = MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD;
  testCompactionAtMergeSize("testCompactionSelectNoFilesWithinCurrentWeekMonthlyPolicy",
      mergeSize, CompactionType.PART_FILES, false, false, new Date(),
      MobCompactPartitionPolicy.MONTHLY, 1);
}
/**
 * MONTHLY policy, files dated today, mergeable threshold 4000: expects PART_FILES.
 */
@Test
public void testCompactionSelectPartFilesMonthlyPolicy() throws Exception {
  final long mergeSize = 4000;
  testCompactionAtMergeSize("testCompactionSelectPartFilesMonthlyPolicy", mergeSize,
      CompactionType.PART_FILES, false, false, new Date(), MobCompactPartitionPolicy.MONTHLY, 1);
}
/**
 * MONTHLY policy with files dated seven days ago and a mergeable threshold of 700.
 * <p>
 * Seven days ago may already belong to the previous calendar month (for example when the test
 * runs on the first of a month). In that case the expectation switches to ALL_FILES and the
 * threshold multiplier becomes 28 instead of 7; otherwise PART_FILES with multiplier 7.
 */
@Test
public void testCompactionSelectPartFilesMonthlyPolicyWithPastWeek() throws Exception {
  final Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS));
  final Date firstDayOfCurrentMonth =
      MobUtils.getFirstDayOfMonth(Calendar.getInstance(), new Date());

  final boolean fallsInPastMonth = dateLastWeek.before(firstDayOfCurrentMonth);
  final CompactionType expectedType =
      fallsInPastMonth ? CompactionType.ALL_FILES : CompactionType.PART_FILES;
  final long mergeSizeMultiFactor = fallsInPastMonth ? 7 * 4 : 7;

  testCompactionAtMergeSize("testCompactionSelectPartFilesMonthlyPolicyWithPastWeek", 700,
      expectedType, false, false, dateLastWeek, MobCompactPartitionPolicy.MONTHLY,
      mergeSizeMultiFactor);
}
/**
 * MONTHLY policy, files dated seven days ago, mergeable threshold 3000 scaled by 7 when
 * computing the expected start keys: expects ALL_FILES.
 */
@Test
public void testCompactionSelectAllFilesMonthlyPolicyWithPastWeek() throws Exception {
  final long sevenDaysAgoMs = System.currentTimeMillis() - (7 * DAY_IN_MS);
  testCompactionAtMergeSize("testCompactionSelectAllFilesMonthlyPolicyWithPastWeek", 3000,
      CompactionType.ALL_FILES, false, false, new Date(sevenDaysAgoMs),
      MobCompactPartitionPolicy.MONTHLY, 7);
}
/**
 * MONTHLY policy, files dated five weeks ago, mergeable threshold 200 scaled by 28 when
 * computing the expected start keys: expects PART_FILES.
 */
@Test
public void testCompactionSelectPartFilesMonthlyPolicyWithPastMonth() throws Exception {
  // back 5 weeks, it is going to be a past month
  final long fiveWeeksAgoMs = System.currentTimeMillis() - (7 * 5 * DAY_IN_MS);
  testCompactionAtMergeSize("testCompactionSelectPartFilesMonthlyPolicyWithPastMonth", 200,
      CompactionType.PART_FILES, false, false, new Date(fiveWeeksAgoMs),
      MobCompactPartitionPolicy.MONTHLY, 28);
}
/**
 * MONTHLY policy, files dated five weeks ago, mergeable threshold 750 scaled by 28 when
 * computing the expected start keys: expects ALL_FILES.
 */
@Test
public void testCompactionSelectAllFilesMonthlyPolicyWithPastMonth() throws Exception {
  // back 5 weeks, it is going to be a past month
  final long fiveWeeksAgoMs = System.currentTimeMillis() - (7 * 5 * DAY_IN_MS);
  testCompactionAtMergeSize("testCompactionSelectAllFilesMonthlyPolicyWithPastMonth", 750,
      CompactionType.ALL_FILES, false, false, new Date(fiveWeeksAgoMs),
      MobCompactPartitionPolicy.MONTHLY, 28);
}
@Test @Test
public void testCompactionSelectWithAllFiles() throws Exception { public void testCompactionSelectWithAllFiles() throws Exception {
String tableName = "testCompactionSelectWithAllFiles"; String tableName = "testCompactionSelectWithAllFiles";
@ -121,7 +224,6 @@ public class TestPartitionedMobCompactor {
CompactionType.PART_FILES, false); CompactionType.PART_FILES, false);
} }
@Test @Test
public void testCompactionSelectWithPartFiles() throws Exception { public void testCompactionSelectWithPartFiles() throws Exception {
String tableName = "testCompactionSelectWithPartFiles"; String tableName = "testCompactionSelectWithPartFiles";
@ -144,34 +246,76 @@ public class TestPartitionedMobCompactor {
final long mergeSize, final CompactionType type, final boolean isForceAllFiles, final long mergeSize, final CompactionType type, final boolean isForceAllFiles,
final boolean createDelFiles) final boolean createDelFiles)
throws Exception { throws Exception {
Date date = new Date();
testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, createDelFiles, date);
}
/**
 * Convenience overload that runs the merge-size selection check with the DAILY partition
 * policy and a merge-size multiplier of 1.
 *
 * @param tableName name of the test table
 * @param mergeSize mergeable threshold to configure
 * @param type compaction type the selection is expected to produce
 * @param isForceAllFiles whether all files are forced into the selection
 * @param createDelFiles whether del files are created alongside the mob files
 * @param date date used for the generated mob file names
 * @throws Exception if the delegated check fails
 */
private void testCompactionAtMergeSize(final String tableName,
    final long mergeSize, final CompactionType type, final boolean isForceAllFiles,
    final boolean createDelFiles, final Date date)
    throws Exception {
  final MobCompactPartitionPolicy dailyPolicy = MobCompactPartitionPolicy.DAILY;
  final long noScaling = 1;
  testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, createDelFiles, date,
      dailyPolicy, noScaling);
}
private void testCompactionAtMergeSize(final String tableName,
final long mergeSize, final CompactionType type, final boolean isForceAllFiles,
final boolean createDelFiles, final Date date, final MobCompactPartitionPolicy policy,
final long mergeSizeMultiFactor)
throws Exception {
resetConf(); resetConf();
init(tableName); init(tableName);
int count = 10; int count = 10;
// create 10 mob files. // create 10 mob files.
createStoreFiles(basePath, family, qf, count, Type.Put); createStoreFiles(basePath, family, qf, count, Type.Put, date);
if (createDelFiles) { if (createDelFiles) {
// create 10 del files // create 10 del files
createStoreFiles(basePath, family, qf, count, Type.Delete); createStoreFiles(basePath, family, qf, count, Type.Delete, date);
} }
Calendar calendar = Calendar.getInstance();
Date firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, new Date());
listFiles(); listFiles();
List<String> expectedStartKeys = new ArrayList<>(); List<String> expectedStartKeys = new ArrayList<>();
for(FileStatus file : mobFiles) { for(FileStatus file : mobFiles) {
if(file.getLen() < mergeSize) { if(file.getLen() < mergeSize * mergeSizeMultiFactor) {
String fileName = file.getPath().getName(); String fileName = file.getPath().getName();
String startKey = fileName.substring(0, 32); String startKey = fileName.substring(0, 32);
// If the policy is monthly and files are in current week, they will be skipped
// in minor compcation.
boolean skipCompaction = false;
if (policy == MobCompactPartitionPolicy.MONTHLY) {
String fileDateStr = MobFileName.getDateFromName(fileName);
Date fileDate;
try {
fileDate = MobUtils.parseDate(fileDateStr);
} catch (ParseException e) {
LOG.warn("Failed to parse date " + fileDateStr, e);
fileDate = new Date();
}
if (!fileDate.before(firstDayOfCurrentWeek)) {
skipCompaction = true;
}
}
// If it is not an major mob compaction and del files are there, // If it is not an major mob compaction and del files are there,
// these mob files wont be compacted. // these mob files wont be compacted.
if (isForceAllFiles || !createDelFiles) { if (isForceAllFiles || (!createDelFiles && !skipCompaction)) {
expectedStartKeys.add(startKey); expectedStartKeys.add(startKey);
} }
} }
} }
// Set the policy
this.hcd.setMobCompactPartitionPolicy(policy);
// set the mob compaction mergeable threshold // set the mob compaction mergeable threshold
conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize); conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize);
testSelectFiles(tableName, type, isForceAllFiles, expectedStartKeys); testSelectFiles(tableName, type, isForceAllFiles, expectedStartKeys);
// go back to the default daily policy
this.hcd.setMobCompactPartitionPolicy(MobCompactPartitionPolicy.DAILY);
} }
@Test @Test
@ -205,7 +349,7 @@ public class TestPartitionedMobCompactor {
try { try {
int count = 2; int count = 2;
// create 2 mob files. // create 2 mob files.
createStoreFiles(basePath, family, qf, count, Type.Put, true); createStoreFiles(basePath, family, qf, count, Type.Put, true, new Date());
listFiles(); listFiles();
TableName tName = TableName.valueOf(tableName); TableName tName = TableName.valueOf(tableName);
@ -243,9 +387,9 @@ public class TestPartitionedMobCompactor {
resetConf(); resetConf();
init(tableName); init(tableName);
// create 20 mob files. // create 20 mob files.
createStoreFiles(basePath, family, qf, 20, Type.Put); createStoreFiles(basePath, family, qf, 20, Type.Put, new Date());
// create 13 del files // create 13 del files
createStoreFiles(basePath, family, qf, 13, Type.Delete); createStoreFiles(basePath, family, qf, 13, Type.Delete, new Date());
listFiles(); listFiles();
// set the max del file count // set the max del file count
@ -366,12 +510,12 @@ public class TestPartitionedMobCompactor {
* @type the key type * @type the key type
*/ */
private void createStoreFiles(Path basePath, String family, String qualifier, int count, private void createStoreFiles(Path basePath, String family, String qualifier, int count,
Type type) throws IOException { Type type, final Date date) throws IOException {
createStoreFiles(basePath, family, qualifier, count, type, false); createStoreFiles(basePath, family, qualifier, count, type, false, date);
} }
private void createStoreFiles(Path basePath, String family, String qualifier, int count, private void createStoreFiles(Path basePath, String family, String qualifier, int count,
Type type, boolean sameStartKey) throws IOException { Type type, boolean sameStartKey, final Date date) throws IOException {
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
String startKey = "row_"; String startKey = "row_";
MobFileName mobFileName = null; MobFileName mobFileName = null;
@ -386,12 +530,10 @@ public class TestPartitionedMobCompactor {
startRow = Bytes.toBytes(startKey + i); startRow = Bytes.toBytes(startKey + i);
} }
if(type.equals(Type.Delete)) { if(type.equals(Type.Delete)) {
mobFileName = MobFileName.create(startRow, MobUtils.formatDate( mobFileName = MobFileName.create(startRow, MobUtils.formatDate(date), delSuffix);
new Date()), delSuffix);
} }
if(type.equals(Type.Put)){ if(type.equals(Type.Put)){
mobFileName = MobFileName.create(startRow, MobUtils.formatDate( mobFileName = MobFileName.create(startRow, MobUtils.formatDate(date), mobSuffix);
new Date()), mobSuffix);
} }
StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs) StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs)
.withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName())).build(); .withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName())).build();

View File

@ -878,6 +878,15 @@ module Hbase
storage_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY).upcase storage_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY).upcase
family.setStoragePolicy(storage_policy) family.setStoragePolicy(storage_policy)
end end
if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MOB_COMPACT_PARTITION_POLICY)
  # Validate the requested MOB compaction partition policy against the enum's constants,
  # then apply it to the family.
  mob_partition_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::MOB_COMPACT_PARTITION_POLICY).upcase
  # Module#constants yields Symbols on Ruby 1.9+ (Strings on 1.8); compare on strings so the
  # check works on either runtime.
  supported_policies = org.apache.hadoop.hbase.client.MobCompactPartitionPolicy.constants.map { |c| c.to_s }
  unless supported_policies.include?(mob_partition_policy)
    raise(ArgumentError, "MOB_COMPACT_PARTITION_POLICY #{mob_partition_policy} is not supported. Use one of " + supported_policies.join(" "))
  end
  family.setMobCompactPartitionPolicy(org.apache.hadoop.hbase.client.MobCompactPartitionPolicy.valueOf(mob_partition_policy))
end
set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA] set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA]
set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION] set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]

View File

@ -38,6 +38,7 @@ Create a table with namespace=default and table qualifier=t1
hbase> create 't1', 'f1', 'f2', 'f3' hbase> create 't1', 'f1', 'f2', 'f3'
hbase> create 't1', {NAME => 'f1', VERSIONS => 1, TTL => 2592000, BLOCKCACHE => true} hbase> create 't1', {NAME => 'f1', VERSIONS => 1, TTL => 2592000, BLOCKCACHE => true}
hbase> create 't1', {NAME => 'f1', CONFIGURATION => {'hbase.hstore.blockingStoreFiles' => '10'}} hbase> create 't1', {NAME => 'f1', CONFIGURATION => {'hbase.hstore.blockingStoreFiles' => '10'}}
hbase> create 't1', {NAME => 'f1', IS_MOB => true, MOB_THRESHOLD => 1000000, MOB_COMPACT_PARTITION_POLICY => 'weekly'}
Table configuration options can be put at the end. Table configuration options can be put at the end.
Examples: Examples: