diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java index 0d557aa52a4..1597a06d27f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java @@ -28,6 +28,7 @@ import java.util.Set; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.HBaseException; import org.apache.hadoop.hbase.io.compress.Compression; @@ -41,7 +42,6 @@ import org.apache.hadoop.hbase.util.PrettyPrinter.Unit; import com.google.common.base.Preconditions; - /** * An HColumnDescriptor contains information about a column family such as the * number of versions, compression settings, etc. 
@@ -130,6 +130,11 @@ public class HColumnDescriptor implements Comparable { public static final String MOB_THRESHOLD = "MOB_THRESHOLD"; public static final byte[] MOB_THRESHOLD_BYTES = Bytes.toBytes(MOB_THRESHOLD); public static final long DEFAULT_MOB_THRESHOLD = 100 * 1024; // 100k + public static final String MOB_COMPACT_PARTITION_POLICY = "MOB_COMPACT_PARTITION_POLICY"; + public static final byte[] MOB_COMPACT_PARTITION_POLICY_BYTES = + Bytes.toBytes(MOB_COMPACT_PARTITION_POLICY); + public static final MobCompactPartitionPolicy DEFAULT_MOB_COMPACT_PARTITION_POLICY = + MobCompactPartitionPolicy.DAILY; public static final String DFS_REPLICATION = "DFS_REPLICATION"; public static final short DEFAULT_DFS_REPLICATION = 0; @@ -276,6 +281,7 @@ public class HColumnDescriptor implements Comparable { RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION_KEY))); RESERVED_KEYWORDS.add(new Bytes(IS_MOB_BYTES)); RESERVED_KEYWORDS.add(new Bytes(MOB_THRESHOLD_BYTES)); + RESERVED_KEYWORDS.add(new Bytes(MOB_COMPACT_PARTITION_POLICY_BYTES)); } private static final int UNINITIALIZED = -1; @@ -438,8 +444,7 @@ public class HColumnDescriptor implements Comparable { if (Bytes.compareTo(Bytes.toBytes(HConstants.VERSIONS), key) == 0) { cachedMaxVersions = UNINITIALIZED; } - values.put(new Bytes(key), - new Bytes(value)); + values.put(new Bytes(key), new Bytes(value)); return this; } @@ -1020,7 +1025,7 @@ public class HColumnDescriptor implements Comparable { public static Unit getUnit(String key) { Unit unit; - /* TTL for now, we can add more as we neeed */ + /* TTL for now, we can add more as we need */ if (key.equals(HColumnDescriptor.TTL)) { unit = Unit.TIME_INTERVAL; } else if (key.equals(HColumnDescriptor.MOB_THRESHOLD)) { @@ -1222,6 +1227,28 @@ public class HColumnDescriptor implements Comparable { return this; } + /** + * Get the mob compact partition policy for this family + * @return MobCompactPartitionPolicy + */ + public MobCompactPartitionPolicy 
getMobCompactPartitionPolicy() { + String policy = getValue(MOB_COMPACT_PARTITION_POLICY); + if (policy == null) { + return DEFAULT_MOB_COMPACT_PARTITION_POLICY; + } + + return MobCompactPartitionPolicy.valueOf(policy.toUpperCase(Locale.ROOT)); + } + + /** + * Set the mob compact partition policy for the family. + * @param policy policy type + * @return this (for chained invocation) + */ + public HColumnDescriptor setMobCompactPartitionPolicy(MobCompactPartitionPolicy policy) { + return setValue(MOB_COMPACT_PARTITION_POLICY, policy.toString().toUpperCase(Locale.ROOT)); + } + /** * @return replication factor set for this CF or {@link #DEFAULT_DFS_REPLICATION} if not set. *

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MobCompactPartitionPolicy.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MobCompactPartitionPolicy.java new file mode 100644 index 00000000000..f5505726705 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MobCompactPartitionPolicy.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Enum describing the mob compact partition policy types. 
+ */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public enum MobCompactPartitionPolicy { + /** + * Compact daily mob files into one file + */ + DAILY, + /** + * Compact mob files within one calendar week into one file + */ + WEEKLY, + /** + * Compact mob files within one calendar month into one file + */ + MONTHLY +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java index c53fff2fe14..cabf55745d9 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestHColumnDescriptor.java @@ -114,12 +114,16 @@ public class TestHColumnDescriptor { public void testMobValuesInHColumnDescriptorShouldReadable() { boolean isMob = true; long threshold = 1000; + String policy = "weekly"; String isMobString = PrettyPrinter.format(Bytes.toStringBinary(Bytes.toBytes(isMob)), HColumnDescriptor.getUnit(HColumnDescriptor.IS_MOB)); String thresholdString = PrettyPrinter.format(Bytes.toStringBinary(Bytes.toBytes(threshold)), HColumnDescriptor.getUnit(HColumnDescriptor.MOB_THRESHOLD)); + String policyString = PrettyPrinter.format(Bytes.toStringBinary(Bytes.toBytes(policy)), + HColumnDescriptor.getUnit(HColumnDescriptor.MOB_COMPACT_PARTITION_POLICY)); assertEquals(String.valueOf(isMob), isMobString); assertEquals(String.valueOf(threshold), thresholdString); + assertEquals(String.valueOf(policy), policyString); } @Test diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java index 678bea7c5bd..3b0b990f7e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import 
java.util.ArrayList; +import java.util.Calendar; import java.util.Collection; import java.util.Date; import java.util.List; @@ -54,6 +55,7 @@ import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.backup.HFileArchiver; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.compress.Compression; @@ -63,6 +65,8 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.mob.compactions.MobCompactor; +import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest; +import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId; import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HStore; @@ -81,6 +85,8 @@ import org.apache.hadoop.hbase.util.Threads; public final class MobUtils { private static final Log LOG = LogFactory.getLog(MobUtils.class); + private final static long WEEKLY_THRESHOLD_MULTIPLIER = 7; + private final static long MONTHLY_THRESHOLD_MULTIPLIER = 4 * WEEKLY_THRESHOLD_MULTIPLIER; private static final ThreadLocal LOCAL_FORMAT = new ThreadLocal() { @@ -122,6 +128,45 @@ public final class MobUtils { return LOCAL_FORMAT.get().parse(dateString); } + /** + * Get the first day of the input date's month + * @param calendar Calendar object + * @param date The date to find out its first day of that month + * @return The first day in the month + */ + public static Date getFirstDayOfMonth(final Calendar calendar, final Date date) { + + calendar.setTime(date); + calendar.set(Calendar.HOUR_OF_DAY, 0); + 
calendar.set(Calendar.MINUTE, 0); + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MILLISECOND, 0); + calendar.set(Calendar.DAY_OF_MONTH, 1); + + Date firstDayInMonth = calendar.getTime(); + return firstDayInMonth; + } + + /** + * Get the first day of the input date's week + * @param calendar Calendar object + * @param date The date to find out its first day of that week + * @return The first day in the week + */ + public static Date getFirstDayOfWeek(final Calendar calendar, final Date date) { + + calendar.setTime(date); + calendar.set(Calendar.HOUR_OF_DAY, 0); + calendar.set(Calendar.MINUTE, 0); + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MILLISECOND, 0); + calendar.setFirstDayOfWeek(Calendar.MONDAY); + calendar.set(Calendar.DAY_OF_WEEK, Calendar.MONDAY); + + Date firstDayInWeek = calendar.getTime(); + return firstDayInWeek; + } + /** * Whether the current cell is a mob reference cell. * @param cell The current cell. @@ -247,8 +292,14 @@ public final class MobUtils { return; } - Date expireDate = new Date(current - timeToLive * 1000); - expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate()); + Calendar calendar = Calendar.getInstance(); + calendar.setTimeInMillis(current - timeToLive * 1000); + calendar.set(Calendar.HOUR_OF_DAY, 0); + calendar.set(Calendar.MINUTE, 0); + calendar.set(Calendar.SECOND, 0); + + Date expireDate = calendar.getTime(); + LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!"); FileStatus[] stats = null; @@ -268,14 +319,13 @@ public final class MobUtils { for (FileStatus file : stats) { String fileName = file.getPath().getName(); try { - MobFileName mobFileName = null; - if (!HFileLink.isHFileLink(file.getPath())) { - mobFileName = MobFileName.create(fileName); - } else { + if (HFileLink.isHFileLink(file.getPath())) { HFileLink hfileLink = HFileLink.buildFromHFileLinkPattern(conf, file.getPath()); - mobFileName = 
MobFileName.create(hfileLink.getOriginPath().getName()); + fileName = hfileLink.getOriginPath().getName(); } - Date fileDate = parseDate(mobFileName.getDate()); + + Date fileDate = parseDate(MobFileName.getDateFromName(fileName)); + if (LOG.isDebugEnabled()) { LOG.debug("Checking file " + fileName); } @@ -471,10 +521,10 @@ public final class MobUtils { Compression.Algorithm compression, String startKey, CacheConfig cacheConfig, Encryption.Context cryptoContext) throws IOException { - MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString() - .replaceAll("-", "")); + MobFileName mobFileName = MobFileName.create(startKey, date, + UUID.randomUUID().toString().replaceAll("-", "")); return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression, - cacheConfig, cryptoContext); + cacheConfig, cryptoContext); } /** @@ -527,8 +577,8 @@ public final class MobUtils { Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig, Encryption.Context cryptoContext) throws IOException { - MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString() - .replaceAll("-", "")); + MobFileName mobFileName = MobFileName.create(startKey, date, + UUID.randomUUID().toString().replaceAll("-", "")); return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression, cacheConfig, cryptoContext); } @@ -710,7 +760,7 @@ public final class MobUtils { HColumnDescriptor hcd, ExecutorService pool, boolean allFiles, LockManager.MasterLock lock) throws IOException { String className = conf.get(MobConstants.MOB_COMPACTOR_CLASS_KEY, - PartitionedMobCompactor.class.getName()); + PartitionedMobCompactor.class.getName()); // instantiate the mob compactor. 
MobCompactor compactor = null; try { @@ -741,7 +791,7 @@ public final class MobUtils { */ public static ExecutorService createMobCompactorThreadPool(Configuration conf) { int maxThreads = conf.getInt(MobConstants.MOB_COMPACTION_THREADS_MAX, - MobConstants.DEFAULT_MOB_COMPACTION_THREADS_MAX); + MobConstants.DEFAULT_MOB_COMPACTION_THREADS_MAX); if (maxThreads == 0) { maxThreads = 1; } @@ -854,4 +904,87 @@ public final class MobUtils { } return false; } + + /** + * Fill out partition id based on compaction policy and date, threshold... + * @param id Partition id to be filled out + * @param firstDayOfCurrentMonth The first day in the current month + * @param firstDayOfCurrentWeek The first day in the current week + * @param dateStr Date string from the mob file + * @param policy Mob compaction policy + * @param calendar Calendar object + * @param threshold Mob compaction threshold configured + * @return true if the file needs to be excluded from compaction + */ + public static boolean fillPartitionId(final CompactionPartitionId id, + final Date firstDayOfCurrentMonth, final Date firstDayOfCurrentWeek, final String dateStr, + final MobCompactPartitionPolicy policy, final Calendar calendar, final long threshold) { + + boolean skipCompcation = false; + id.setThreshold(threshold); + if (threshold <= 0) { + id.setDate(dateStr); + return skipCompcation; + } + + long finalThreshold; + Date date; + try { + date = MobUtils.parseDate(dateStr); + } catch (ParseException e) { + LOG.warn("Failed to parse date " + dateStr, e); + id.setDate(dateStr); + return true; + } + + /* The algorithm works as follows: + * For monthly policy: + * 1). If the file's date is in past months, apply 4 * 7 * threshold + * 2). If the file's date is in past weeks, apply 7 * threshold + * 3). If the file's date is in current week, exclude it from the compaction + * For weekly policy: + * 1). If the file's date is in past weeks, apply 7 * threshold + * 2). 
If the file's date is in the current week, apply threshold + * For daily policy: + * 1). apply threshold + */ + if (policy == MobCompactPartitionPolicy.MONTHLY) { + if (date.before(firstDayOfCurrentMonth)) { + // Check overflow + if (threshold < (Long.MAX_VALUE / MONTHLY_THRESHOLD_MULTIPLIER)) { + finalThreshold = MONTHLY_THRESHOLD_MULTIPLIER * threshold; + } else { + finalThreshold = Long.MAX_VALUE; + } + id.setThreshold(finalThreshold); + + // set to the date for the first day of that month + id.setDate(MobUtils.formatDate(MobUtils.getFirstDayOfMonth(calendar, date))); + return skipCompcation; + } + } + + if ((policy == MobCompactPartitionPolicy.MONTHLY) || + (policy == MobCompactPartitionPolicy.WEEKLY)) { + // Check if it needs to apply weekly multiplier + if (date.before(firstDayOfCurrentWeek)) { + // Check overflow + if (threshold < (Long.MAX_VALUE / WEEKLY_THRESHOLD_MULTIPLIER)) { + finalThreshold = WEEKLY_THRESHOLD_MULTIPLIER * threshold; + } else { + finalThreshold = Long.MAX_VALUE; + } + id.setThreshold(finalThreshold); + + id.setDate(MobUtils.formatDate(MobUtils.getFirstDayOfWeek(calendar, date))); + return skipCompcation; + } else if (policy == MobCompactPartitionPolicy.MONTHLY) { + skipCompcation = true; + } + } + + // Rest is daily + id.setDate(dateStr); + return skipCompcation; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java index 665b5e21599..33351491a2c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.mob.MobConstants; import 
org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** @@ -98,11 +99,15 @@ public class PartitionedMobCompactionRequest extends MobCompactionRequest { public static class CompactionPartitionId { private String startKey; private String date; + private String latestDate; + private long threshold; public CompactionPartitionId() { // initialize these fields to empty string this.startKey = ""; this.date = ""; + this.latestDate = ""; + this.threshold = 0; } public CompactionPartitionId(String startKey, String date) { @@ -111,6 +116,16 @@ public class PartitionedMobCompactionRequest extends MobCompactionRequest { } this.startKey = startKey; this.date = date; + this.latestDate = ""; + this.threshold = 0; + } + + public void setThreshold (final long threshold) { + this.threshold = threshold; + } + + public long getThreshold () { + return this.threshold; } public String getStartKey() { @@ -129,6 +144,14 @@ public class PartitionedMobCompactionRequest extends MobCompactionRequest { this.date = date; } + public String getLatestDate () { return this.latestDate; } + + public void updateLatestDate(final String latestDate) { + if (this.latestDate.compareTo(latestDate) < 0) { + this.latestDate = latestDate; + } + } + @Override public int hashCode() { int result = 17; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java index 6fb1107fb00..a0823d747fa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.mob.compactions; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; +import java.util.Calendar; import java.util.Collection; import java.util.Collections; import java.util.Date; @@ 
-51,6 +52,7 @@ import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.HFileLink; @@ -151,6 +153,19 @@ public class PartitionedMobCompactor extends MobCompactor { final CompactionPartitionId id = new CompactionPartitionId(); int selectedFileCount = 0; int irrelevantFileCount = 0; + MobCompactPartitionPolicy policy = column.getMobCompactPartitionPolicy(); + + Calendar calendar = Calendar.getInstance(); + Date currentDate = new Date(); + Date firstDayOfCurrentMonth = null; + Date firstDayOfCurrentWeek = null; + + if (policy == MobCompactPartitionPolicy.MONTHLY) { + firstDayOfCurrentMonth = MobUtils.getFirstDayOfMonth(calendar, currentDate); + firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, currentDate); + } else if (policy == MobCompactPartitionPolicy.WEEKLY) { + firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, currentDate); + } for (FileStatus file : candidates) { if (!file.isFile()) { @@ -170,23 +185,32 @@ public class PartitionedMobCompactor extends MobCompactor { } if (StoreFileInfo.isDelFile(linkedFile.getPath())) { allDelFiles.add(file); - } else if (allFiles || (linkedFile.getLen() < mergeableSize)) { - // add all files if allFiles is true, - // otherwise add the small files to the merge pool + } else { String fileName = linkedFile.getPath().getName(); - id.setStartKey(MobFileName.getStartKeyFromName(fileName)); - id.setDate(MobFileName.getDateFromName(fileName)); - CompactionPartition compactionPartition = filesToCompact.get(id); - if (compactionPartition == null) { - CompactionPartitionId newId = new CompactionPartitionId(id.getStartKey(), id.getDate()); - compactionPartition = new 
CompactionPartition(newId); + String date = MobFileName.getDateFromName(fileName); + boolean skipCompaction = MobUtils.fillPartitionId(id, firstDayOfCurrentMonth, + firstDayOfCurrentWeek, date, policy, calendar, mergeableSize); + if (allFiles || (!skipCompaction && (linkedFile.getLen() < id.getThreshold()))) { + // add all files if allFiles is true, + // otherwise add the small files to the merge pool + // filter out files which are not supposed to be compacted with the + // current policy - compactionPartition.addFile(file); - filesToCompact.put(newId, compactionPartition); - } else { - compactionPartition.addFile(file); + id.setStartKey(MobFileName.getStartKeyFromName(fileName)); + CompactionPartition compactionPartition = filesToCompact.get(id); + if (compactionPartition == null) { + CompactionPartitionId newId = new CompactionPartitionId(id.getStartKey(), id.getDate()); + compactionPartition = new CompactionPartition(newId); + compactionPartition.addFile(file); + filesToCompact.put(newId, compactionPartition); + newId.updateLatestDate(date); + } else { + compactionPartition.addFile(file); + compactionPartition.getPartitionId().updateLatestDate(date); + } + + selectedFileCount++; } - selectedFileCount++; } } @@ -437,7 +461,7 @@ public class PartitionedMobCompactor extends MobCompactor { try { try { writer = MobUtils - .createWriter(conf, fs, column, partition.getPartitionId().getDate(), tempPath, + .createWriter(conf, fs, column, partition.getPartitionId().getLatestDate(), tempPath, Long.MAX_VALUE, column.getCompactionCompressionType(), partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext); cleanupTmpMobFile = true; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java index 0739271389c..a4d984d7f0a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java 
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java @@ -38,6 +38,8 @@ import java.util.concurrent.TimeUnit; import javax.crypto.spec.SecretKeySpec; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -59,6 +61,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -74,6 +77,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner; import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.mob.MobFileName; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -86,6 +90,7 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.junit.AfterClass; import org.junit.Assert; @@ -95,6 +100,7 @@ import org.junit.experimental.categories.Category; @Category(LargeTests.class) public class TestMobCompactor { + private static final Log LOG = LogFactory.getLog(TestMobCompactor.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static Configuration conf = null; private TableName tableName; @@ 
-110,6 +116,43 @@ public class TestMobCompactor { private static final String family2 = "family2"; private static final String qf1 = "qualifier1"; private static final String qf2 = "qualifier2"; + + private static final long tsFor20150907Monday = 1441654904000L; + + private static final long tsFor20151120Sunday = 1448051213000L; + private static final long tsFor20151128Saturday = 1448734396000L; + private static final long tsFor20151130Monday = 1448874000000L; + private static final long tsFor20151201Tuesday = 1448960400000L; + private static final long tsFor20151205Saturday = 1449306000000L; + private static final long tsFor20151228Monday = 1451293200000L; + private static final long tsFor20151231Thursday = 1451552400000L; + private static final long tsFor20160101Friday = 1451638800000L; + private static final long tsFor20160103Sunday = 1451844796000L; + + private static final byte[] mobKey01 = Bytes.toBytes("r01"); + private static final byte[] mobKey02 = Bytes.toBytes("r02"); + private static final byte[] mobKey03 = Bytes.toBytes("r03"); + private static final byte[] mobKey04 = Bytes.toBytes("r04"); + private static final byte[] mobKey05 = Bytes.toBytes("r05"); + private static final byte[] mobKey06 = Bytes.toBytes("r05"); + private static final byte[] mobKey1 = Bytes.toBytes("r1"); + private static final byte[] mobKey2 = Bytes.toBytes("r2"); + private static final byte[] mobKey3 = Bytes.toBytes("r3"); + private static final byte[] mobKey4 = Bytes.toBytes("r4"); + private static final byte[] mobKey5 = Bytes.toBytes("r5"); + private static final byte[] mobKey6 = Bytes.toBytes("r6"); + private static final byte[] mobKey7 = Bytes.toBytes("r7"); + private static final byte[] mobKey8 = Bytes.toBytes("r8"); + private static final String mobValue0 = "mobValue00000000000000000000000000"; + private static final String mobValue1 = "mobValue00000111111111111111111111"; + private static final String mobValue2 = "mobValue00000222222222222222222222"; + private static final 
String mobValue3 = "mobValue00000333333333333333333333"; + private static final String mobValue4 = "mobValue00000444444444444444444444"; + private static final String mobValue5 = "mobValue00000666666666666666666666"; + private static final String mobValue6 = "mobValue00000777777777777777777777"; + private static final String mobValue7 = "mobValue00000888888888888888888888"; + private static final String mobValue8 = "mobValue00000888888888888888888899"; + private static byte[] KEYS = Bytes.toBytes("012"); private static int regionNum = KEYS.length; private static int delRowNum = 1; @@ -123,11 +166,12 @@ public class TestMobCompactor { TEST_UTIL.getConfiguration() .setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, 5000); TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, - KeyProviderForTesting.class.getName()); + KeyProviderForTesting.class.getName()); TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase"); TEST_UTIL.getConfiguration().setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0); TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 1); TEST_UTIL.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 100); + TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true); TEST_UTIL.startMiniCluster(1); pool = createThreadPool(TEST_UTIL.getConfiguration()); conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), pool); @@ -159,6 +203,37 @@ public class TestMobCompactor { bufMut = conn.getBufferedMutator(tableName); } + // Set up for mob compaction policy testing + private void setUpForPolicyTest(String tableNameAsString, MobCompactPartitionPolicy type) + throws IOException { + tableName = TableName.valueOf(tableNameAsString); + hcd1 = new HColumnDescriptor(family1); + hcd1.setMobEnabled(true); + hcd1.setMobThreshold(10); + hcd1.setMobCompactPartitionPolicy(type); + desc = new HTableDescriptor(tableName); + desc.addFamily(hcd1); + 
admin.createTable(desc); + table = conn.getTable(tableName); + bufMut = conn.getBufferedMutator(tableName); + } + + // alter mob compaction policy + private void alterForPolicyTest(final MobCompactPartitionPolicy type) + throws Exception { + + hcd1.setMobCompactPartitionPolicy(type); + desc.modifyFamily(hcd1); + admin.modifyTable(tableName, desc); + Pair st; + + while (null != (st = admin.getAlterStatus(tableName)) && st.getFirst() > 0) { + LOG.debug(st.getFirst() + " regions left to update"); + Thread.sleep(40); + } + LOG.info("alter status finished"); + } + @Test(timeout = 300000) public void testMinorCompaction() throws Exception { resetConf(); @@ -219,6 +294,128 @@ public class TestMobCompactor { countFiles(tableName, false, family2)); } + private void waitUntilFilesShowup(final TableName table, final String famStr, final int num) + throws InterruptedException, IOException { + + HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(table).get(0); + + // Make sure that it is flushed. + FileSystem fs = r.getRegionFileSystem().getFileSystem(); + Path path = r.getRegionFileSystem().getStoreDir(famStr); + + + FileStatus[] fileList = fs.listStatus(path); + + while (fileList.length != num) { + Thread.sleep(50); + fileList = fs.listStatus(path); + } + } + + private int numberOfMobFiles(final TableName table, final String famStr) + throws IOException { + + HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(table).get(0); + + // Make sure that it is flushed. 
+ FileSystem fs = r.getRegionFileSystem().getFileSystem(); + Path path = r.getRegionFileSystem().getStoreDir(famStr); + + FileStatus[] fileList = fs.listStatus(path); + + return fileList.length; + } + + @Test + public void testMinorCompactionWithWeeklyPolicy() throws Exception { + resetConf(); + int mergeSize = 5000; + // change the mob compaction merge size + conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize); + + commonPolicyTestLogic("testMinorCompactionWithWeeklyPolicy", + MobCompactPartitionPolicy.WEEKLY, false, 6, + new String[] { "20150907", "20151120", "20151128", "20151130", "20151205", "20160103" }, + true); + } + + @Test + public void testMajorCompactionWithWeeklyPolicy() throws Exception { + resetConf(); + + commonPolicyTestLogic("testMajorCompactionWithWeeklyPolicy", + MobCompactPartitionPolicy.WEEKLY, true, 5, + new String[] { "20150907", "20151120", "20151128", "20151205", "20160103" }, true); + } + + @Test + public void testMinorCompactionWithMonthlyPolicy() throws Exception { + resetConf(); + int mergeSize = 5000; + // change the mob compaction merge size + conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize); + + commonPolicyTestLogic("testMinorCompactionWithMonthlyPolicy", + MobCompactPartitionPolicy.MONTHLY, false, 4, + new String[] { "20150907", "20151130", "20151231", "20160103" }, true); + } + + @Test + public void testMajorCompactionWithMonthlyPolicy() throws Exception { + resetConf(); + + commonPolicyTestLogic("testMajorCompactionWithMonthlyPolicy", + MobCompactPartitionPolicy.MONTHLY, true, 4, + new String[] {"20150907", "20151130", "20151231", "20160103"}, true); + } + + @Test + public void testMajorCompactionWithWeeklyFollowedByMonthly() throws Exception { + resetConf(); + + commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthly", + MobCompactPartitionPolicy.WEEKLY, true, 5, + new String[] { "20150907", "20151120", "20151128", "20151205", "20160103" }, true); + + 
commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthly", + MobCompactPartitionPolicy.MONTHLY, true, 4, + new String[] {"20150907", "20151128", "20151205", "20160103" }, false); + } + + @Test + public void testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByWeekly() throws Exception { + resetConf(); + + commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByWeekly", + MobCompactPartitionPolicy.WEEKLY, true, 5, + new String[] { "20150907", "20151120", "20151128", "20151205", "20160103" }, true); + + commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByWeekly", + MobCompactPartitionPolicy.MONTHLY, true, 4, + new String[] { "20150907", "20151128", "20151205", "20160103" }, false); + + commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByWeekly", + MobCompactPartitionPolicy.WEEKLY, true, 4, + new String[] { "20150907", "20151128", "20151205", "20160103" }, false); + } + + @Test + public void testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByDaily() throws Exception { + resetConf(); + + commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByDaily", + MobCompactPartitionPolicy.WEEKLY, true, 5, + new String[] { "20150907", "20151120", "20151128", "20151205", "20160103" }, true); + + commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByDaily", + MobCompactPartitionPolicy.MONTHLY, true, 4, + new String[] { "20150907", "20151128", "20151205", "20160103" }, false); + + commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByDaily", + MobCompactPartitionPolicy.DAILY, true, 4, + new String[] { "20150907", "20151128", "20151205", "20160103" }, false); + } + @Test(timeout = 300000) public void testCompactionWithHFileLink() throws IOException, InterruptedException { resetConf(); @@ -716,6 +913,65 @@ public class TestMobCompactor { admin.flush(tableName); } + private void loadDataForPartitionPolicy(Admin admin, 
BufferedMutator table, TableName tableName) + throws IOException { + + Put[] pArray = new Put[1000]; + + for (int i = 0; i < 1000; i ++) { + Put put0 = new Put(Bytes.toBytes("r0" + i)); + put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151130Monday, Bytes.toBytes(mobValue0)); + pArray[i] = put0; + } + loadData(admin, bufMut, tableName, pArray); + + Put put06 = new Put(mobKey06); + put06.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151128Saturday, Bytes.toBytes(mobValue0)); + + loadData(admin, bufMut, tableName, new Put[] { put06 }); + + Put put1 = new Put(mobKey1); + put1.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151201Tuesday, + Bytes.toBytes(mobValue1)); + loadData(admin, bufMut, tableName, new Put[] { put1 }); + + Put put2 = new Put(mobKey2); + put2.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151205Saturday, + Bytes.toBytes(mobValue2)); + loadData(admin, bufMut, tableName, new Put[] { put2 }); + + Put put3 = new Put(mobKey3); + put3.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151228Monday, + Bytes.toBytes(mobValue3)); + loadData(admin, bufMut, tableName, new Put[] { put3 }); + + Put put4 = new Put(mobKey4); + put4.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151231Thursday, + Bytes.toBytes(mobValue4)); + loadData(admin, bufMut, tableName, new Put[] { put4 }); + + Put put5 = new Put(mobKey5); + put5.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20160101Friday, + Bytes.toBytes(mobValue5)); + loadData(admin, bufMut, tableName, new Put[] { put5 }); + + Put put6 = new Put(mobKey6); + put6.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20160103Sunday, + Bytes.toBytes(mobValue6)); + loadData(admin, bufMut, tableName, new Put[] { put6 }); + + Put put7 = new Put(mobKey7); + put7.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20150907Monday, + Bytes.toBytes(mobValue7)); + loadData(admin, bufMut, tableName, new Put[] { put7 }); + + Put 
put8 = new Put(mobKey8); + put8.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151120Sunday, + Bytes.toBytes(mobValue8)); + loadData(admin, bufMut, tableName, new Put[] { put8 }); + } + + /** * delete the row, family and cell to create the del file */ @@ -833,4 +1089,127 @@ public class TestMobCompactor { conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE); } -} + + /** + * Verify mob partition policy compaction values. + */ + private void verifyPolicyValues() throws Exception { + Get get = new Get(mobKey01); + Result result = table.get(get); + assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), + Bytes.toBytes(mobValue0))); + + get = new Get(mobKey02); + result = table.get(get); + assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), + Bytes.toBytes(mobValue0))); + + get = new Get(mobKey03); + result = table.get(get); + assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), + Bytes.toBytes(mobValue0))); + + get = new Get(mobKey04); + result = table.get(get); + assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), + Bytes.toBytes(mobValue0))); + + get = new Get(mobKey05); + result = table.get(get); + assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), + Bytes.toBytes(mobValue0))); + + get = new Get(mobKey06); + result = table.get(get); + assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), + Bytes.toBytes(mobValue0))); + + get = new Get(mobKey1); + result = table.get(get); + assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), + Bytes.toBytes(mobValue1))); + + get = new Get(mobKey2); + result = table.get(get); + assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), + Bytes.toBytes(mobValue2))); + + get = new Get(mobKey3); + result = table.get(get); 
+ assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), + Bytes.toBytes(mobValue3))); + + get = new Get(mobKey4); + result = table.get(get); + assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), + Bytes.toBytes(mobValue4))); + + get = new Get(mobKey5); + result = table.get(get); + assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), + Bytes.toBytes(mobValue5))); + + get = new Get(mobKey6); + result = table.get(get); + assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), + Bytes.toBytes(mobValue6))); + + get = new Get(mobKey7); + result = table.get(get); + assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), + Bytes.toBytes(mobValue7))); + + get = new Get(mobKey8); + result = table.get(get); + assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), + Bytes.toBytes(mobValue8))); + } + + private void commonPolicyTestLogic (final String tableNameAsString, + final MobCompactPartitionPolicy pType, final boolean majorCompact, + final int expectedFileNumbers, final String[] expectedFileNames, + final boolean setupAndLoadData + ) throws Exception { + if (setupAndLoadData) { + setUpForPolicyTest(tableNameAsString, pType); + + loadDataForPartitionPolicy(admin, bufMut, tableName); + } else { + alterForPolicyTest(pType); + } + + if (majorCompact) { + admin.majorCompact(tableName, hcd1.getName(), CompactType.MOB); + } else { + admin.compact(tableName, hcd1.getName(), CompactType.MOB); + } + + waitUntilMobCompactionFinished(tableName); + + // Run cleaner to make sure that files in archive directory are cleaned up + TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting(); + + //check the number of files + Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, family1); + FileStatus[] fileList = fs.listStatus(mobDirPath); + + assertTrue(fileList.length == 
expectedFileNumbers); + + // the file names are expected + ArrayList fileNames = new ArrayList<>(expectedFileNumbers); + for (FileStatus file : fileList) { + fileNames.add(MobFileName.getDateFromName(file.getPath().getName())); + } + int index = 0; + for (String fileName : expectedFileNames) { + index = fileNames.indexOf(fileName); + assertTrue(index >= 0); + fileNames.remove(index); + } + + // Check daily mob files are removed from the mobdir, and only weekly mob files are there. + // Also check that there is no data loss. + + verifyPolicyValues(); + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java index c112289b486..0aa9426ba2b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java @@ -19,13 +19,8 @@ package org.apache.hadoop.hbase.mob.compactions; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Random; -import java.util.UUID; +import java.text.ParseException; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; @@ -33,12 +28,15 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; import 
org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy; import org.apache.hadoop.hbase.regionserver.*; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.client.Scan; @@ -63,9 +61,11 @@ import org.junit.experimental.categories.Category; @Category(LargeTests.class) public class TestPartitionedMobCompactor { + private static final Log LOG = LogFactory.getLog(TestPartitionedMobCompactor.class); private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final static String family = "family"; private final static String qf = "qf"; + private final long DAY_IN_MS = 1000 * 60 * 60 * 24; private HColumnDescriptor hcd = new HColumnDescriptor(family); private Configuration conf = TEST_UTIL.getConfiguration(); private CacheConfig cacheConf = new CacheConfig(conf); @@ -103,6 +103,109 @@ public class TestPartitionedMobCompactor { delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del"; } + @Test + public void testCompactionSelectAllFilesWeeklyPolicy() throws Exception { + String tableName = "testCompactionSelectAllFilesWeeklyPolicy"; + testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD, + CompactionType.ALL_FILES, false, false, new Date(), MobCompactPartitionPolicy.WEEKLY, 1); + } + + @Test + public void testCompactionSelectPartFilesWeeklyPolicy() throws Exception { + String tableName = "testCompactionSelectPartFilesWeeklyPolicy"; + testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, false, false, + new Date(), MobCompactPartitionPolicy.WEEKLY, 1); + } + + @Test + public void testCompactionSelectPartFilesWeeklyPolicyWithPastWeek() throws Exception { + String tableName = "testCompactionSelectPartFilesWeeklyPolicyWithPastWeek"; + Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS)); + testCompactionAtMergeSize(tableName, 700, CompactionType.PART_FILES, false, false, 
dateLastWeek, + MobCompactPartitionPolicy.WEEKLY, 7); + } + + @Test + public void testCompactionSelectAllFilesWeeklyPolicyWithPastWeek() throws Exception { + String tableName = "testCompactionSelectAllFilesWeeklyPolicyWithPastWeek"; + Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS)); + testCompactionAtMergeSize(tableName, 3000, CompactionType.ALL_FILES, + false, false, dateLastWeek, MobCompactPartitionPolicy.WEEKLY, 7); + } + + @Test + public void testCompactionSelectAllFilesMonthlyPolicy() throws Exception { + String tableName = "testCompactionSelectAllFilesMonthlyPolicy"; + Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS)); + testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD, + CompactionType.ALL_FILES, false, false, dateLastWeek, + MobCompactPartitionPolicy.MONTHLY, 7); + } + + @Test + public void testCompactionSelectNoFilesWithinCurrentWeekMonthlyPolicy() throws Exception { + String tableName = "testCompactionSelectNoFilesWithinCurrentWeekMonthlyPolicy"; + testCompactionAtMergeSize(tableName, MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD, + CompactionType.PART_FILES, false, false, new Date(), MobCompactPartitionPolicy.MONTHLY, 1); + } + + @Test + public void testCompactionSelectPartFilesMonthlyPolicy() throws Exception { + String tableName = "testCompactionSelectPartFilesMonthlyPolicy"; + testCompactionAtMergeSize(tableName, 4000, CompactionType.PART_FILES, false, false, + new Date(), MobCompactPartitionPolicy.MONTHLY, 1); + } + + @Test + public void testCompactionSelectPartFilesMonthlyPolicyWithPastWeek() throws Exception { + String tableName = "testCompactionSelectPartFilesMonthlyPolicyWithPastWeek"; + Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS)); + Calendar calendar = Calendar.getInstance(); + Date firstDayOfCurrentMonth = MobUtils.getFirstDayOfMonth(calendar, new Date()); + CompactionType type = CompactionType.PART_FILES; + 
long mergeSizeMultiFactor = 7; + + + // The dateLastWeek may not really be last week, suppose that it runs at 2/1/2017, it is going + // to be last month and the monthly policy is going to be applied here. + if (dateLastWeek.before(firstDayOfCurrentMonth)) { + type = CompactionType.ALL_FILES; + mergeSizeMultiFactor *= 4; + } + + testCompactionAtMergeSize(tableName, 700, type, false, false, dateLastWeek, + MobCompactPartitionPolicy.MONTHLY, mergeSizeMultiFactor); + } + + @Test + public void testCompactionSelectAllFilesMonthlyPolicyWithPastWeek() throws Exception { + String tableName = "testCompactionSelectAllFilesMonthlyPolicyWithPastWeek"; + Date dateLastWeek = new Date(System.currentTimeMillis() - (7 * DAY_IN_MS)); + + testCompactionAtMergeSize(tableName, 3000, CompactionType.ALL_FILES, + false, false, dateLastWeek, MobCompactPartitionPolicy.MONTHLY, 7); + } + + @Test + public void testCompactionSelectPartFilesMonthlyPolicyWithPastMonth() throws Exception { + String tableName = "testCompactionSelectPartFilesMonthlyPolicyWithPastMonth"; + + // back 5 weeks, it is going to be a past month + Date dateLastMonth = new Date(System.currentTimeMillis() - (7 * 5 * DAY_IN_MS)); + testCompactionAtMergeSize(tableName, 200, CompactionType.PART_FILES, false, false, dateLastMonth, + MobCompactPartitionPolicy.MONTHLY, 28); + } + + @Test + public void testCompactionSelectAllFilesMonthlyPolicyWithPastMonth() throws Exception { + String tableName = "testCompactionSelectAllFilesMonthlyPolicyWithPastMonth"; + + // back 5 weeks, it is going to be a past month + Date dateLastMonth = new Date(System.currentTimeMillis() - (7 * 5 * DAY_IN_MS)); + testCompactionAtMergeSize(tableName, 750, CompactionType.ALL_FILES, + false, false, dateLastMonth, MobCompactPartitionPolicy.MONTHLY, 28); + } + @Test public void testCompactionSelectWithAllFiles() throws Exception { String tableName = "testCompactionSelectWithAllFiles"; @@ -121,7 +224,6 @@ public class TestPartitionedMobCompactor { 
CompactionType.PART_FILES, false); } - @Test public void testCompactionSelectWithPartFiles() throws Exception { String tableName = "testCompactionSelectWithPartFiles"; @@ -144,34 +246,76 @@ public class TestPartitionedMobCompactor { final long mergeSize, final CompactionType type, final boolean isForceAllFiles, final boolean createDelFiles) throws Exception { + Date date = new Date(); + testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, createDelFiles, date); + } + + private void testCompactionAtMergeSize(final String tableName, + final long mergeSize, final CompactionType type, final boolean isForceAllFiles, + final boolean createDelFiles, final Date date) + throws Exception { + testCompactionAtMergeSize(tableName, mergeSize, type, isForceAllFiles, createDelFiles, date, + MobCompactPartitionPolicy.DAILY, 1); + } + + private void testCompactionAtMergeSize(final String tableName, + final long mergeSize, final CompactionType type, final boolean isForceAllFiles, + final boolean createDelFiles, final Date date, final MobCompactPartitionPolicy policy, + final long mergeSizeMultiFactor) + throws Exception { resetConf(); init(tableName); int count = 10; // create 10 mob files. 
- createStoreFiles(basePath, family, qf, count, Type.Put); + createStoreFiles(basePath, family, qf, count, Type.Put, date); if (createDelFiles) { // create 10 del files - createStoreFiles(basePath, family, qf, count, Type.Delete); + createStoreFiles(basePath, family, qf, count, Type.Delete, date); } + Calendar calendar = Calendar.getInstance(); + Date firstDayOfCurrentWeek = MobUtils.getFirstDayOfWeek(calendar, new Date()); + listFiles(); List expectedStartKeys = new ArrayList<>(); for(FileStatus file : mobFiles) { - if(file.getLen() < mergeSize) { + if(file.getLen() < mergeSize * mergeSizeMultiFactor) { String fileName = file.getPath().getName(); String startKey = fileName.substring(0, 32); + // If the policy is monthly and files are in the current week, they will be skipped + // in minor compaction. + boolean skipCompaction = false; + if (policy == MobCompactPartitionPolicy.MONTHLY) { + String fileDateStr = MobFileName.getDateFromName(fileName); + Date fileDate; + try { + fileDate = MobUtils.parseDate(fileDateStr); + } catch (ParseException e) { + LOG.warn("Failed to parse date " + fileDateStr, e); + fileDate = new Date(); + } + if (!fileDate.before(firstDayOfCurrentWeek)) { + skipCompaction = true; + } + } + // If it is not an major mob compaction and del files are there, // these mob files wont be compacted. - if (isForceAllFiles || !createDelFiles) { + if (isForceAllFiles || (!createDelFiles && !skipCompaction)) { expectedStartKeys.add(startKey); } } } + + // Set the policy + this.hcd.setMobCompactPartitionPolicy(policy); // set the mob compaction mergeable threshold conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize); testSelectFiles(tableName, type, isForceAllFiles, expectedStartKeys); + // go back to the default daily policy + this.hcd.setMobCompactPartitionPolicy(MobCompactPartitionPolicy.DAILY); } @Test @@ -205,7 +349,7 @@ public class TestPartitionedMobCompactor { try { int count = 2; // create 2 mob files. 
- createStoreFiles(basePath, family, qf, count, Type.Put, true); + createStoreFiles(basePath, family, qf, count, Type.Put, true, new Date()); listFiles(); TableName tName = TableName.valueOf(tableName); @@ -243,9 +387,9 @@ public class TestPartitionedMobCompactor { resetConf(); init(tableName); // create 20 mob files. - createStoreFiles(basePath, family, qf, 20, Type.Put); + createStoreFiles(basePath, family, qf, 20, Type.Put, new Date()); // create 13 del files - createStoreFiles(basePath, family, qf, 13, Type.Delete); + createStoreFiles(basePath, family, qf, 13, Type.Delete, new Date()); listFiles(); // set the max del file count @@ -366,12 +510,12 @@ public class TestPartitionedMobCompactor { * @type the key type */ private void createStoreFiles(Path basePath, String family, String qualifier, int count, - Type type) throws IOException { - createStoreFiles(basePath, family, qualifier, count, type, false); + Type type, final Date date) throws IOException { + createStoreFiles(basePath, family, qualifier, count, type, false, date); } private void createStoreFiles(Path basePath, String family, String qualifier, int count, - Type type, boolean sameStartKey) throws IOException { + Type type, boolean sameStartKey, final Date date) throws IOException { HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); String startKey = "row_"; MobFileName mobFileName = null; @@ -386,12 +530,10 @@ public class TestPartitionedMobCompactor { startRow = Bytes.toBytes(startKey + i); } if(type.equals(Type.Delete)) { - mobFileName = MobFileName.create(startRow, MobUtils.formatDate( - new Date()), delSuffix); + mobFileName = MobFileName.create(startRow, MobUtils.formatDate(date), delSuffix); } if(type.equals(Type.Put)){ - mobFileName = MobFileName.create(startRow, MobUtils.formatDate( - new Date()), mobSuffix); + mobFileName = MobFileName.create(startRow, MobUtils.formatDate(date), mobSuffix); } StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(conf, 
cacheConf, fs) .withFileContext(meta).withFilePath(new Path(basePath, mobFileName.getFileName())).build(); diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index 94b9d3eb29f..a1a9336644c 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -878,6 +878,15 @@ module Hbase storage_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY).upcase family.setStoragePolicy(storage_policy) end + if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MOB_COMPACT_PARTITION_POLICY) + mob_partition_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::MOB_COMPACT_PARTITION_POLICY).upcase + unless org.apache.hadoop.hbase.client.MobCompactPartitionPolicy.constants.include?(mob_partition_policy) + raise(ArgumentError, "MOB_COMPACT_PARTITION_POLICY #{mob_partition_policy} is not supported. Use one of " + org.apache.hadoop.hbase.client.MobCompactPartitionPolicy.constants.join(" ")) + else + family.setMobCompactPartitionPolicy(org.apache.hadoop.hbase.client.MobCompactPartitionPolicy.valueOf(mob_partition_policy)) + end + end + set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA] set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION] diff --git a/hbase-shell/src/main/ruby/shell/commands/create.rb b/hbase-shell/src/main/ruby/shell/commands/create.rb index ee14455a29d..4812048062b 100644 --- a/hbase-shell/src/main/ruby/shell/commands/create.rb +++ b/hbase-shell/src/main/ruby/shell/commands/create.rb @@ -38,6 +38,7 @@ Create a table with namespace=default and table qualifier=t1 hbase> create 't1', 'f1', 'f2', 'f3' hbase> create 't1', {NAME => 'f1', VERSIONS => 1, TTL => 2592000, BLOCKCACHE => true} hbase> create 't1', {NAME => 'f1', CONFIGURATION => {'hbase.hstore.blockingStoreFiles' => '10'}} + hbase> create 't1', {NAME => 'f1', IS_MOB => true, MOB_THRESHOLD => 1000000, MOB_COMPACT_PARTITION_POLICY => 'weekly'} Table 
configuration options can be put at the end. Examples: