diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupMajorCompactionTTL.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupMajorCompactionTTL.java
new file mode 100644
index 00000000000..d1b375181cc
--- /dev/null
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupMajorCompactionTTL.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rsgroup;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.compaction.MajorCompactorTTL;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
+
+/**
+ * This tool takes an rsgroup as argument and major compacts part or all of the regions of the
+ * tables belonging to that rsgroup, based on each table's TTL.
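+ * <p>
+ * For illustration only (the group name and tuning values here are made up), a run that major
+ * compacts the tables of an rsgroup named {@code my_group} with 5 worker threads, skipping the
+ * post-compaction wait, might look like:
+ * <pre>
+ * hbase org.apache.hadoop.hbase.rsgroup.RSGroupMajorCompactionTTL \
+ *     -rsgroup my_group -servers 5 -skipWait
+ * </pre>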
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class RSGroupMajorCompactionTTL extends MajorCompactorTTL {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RSGroupMajorCompactionTTL.class);
+
+  @VisibleForTesting
+  RSGroupMajorCompactionTTL() {
+    super();
+  }
+
+  public int compactTTLRegionsOnGroup(Configuration conf, String rsgroup, int concurrency,
+      long sleep, int numServers, int numRegions, boolean dryRun, boolean skipWait)
+      throws Exception {
+
+    try (Connection conn = ConnectionFactory.createConnection(conf)) {
+      RSGroupAdmin rsGroupAdmin = new RSGroupAdminClient(conn);
+
+      RSGroupInfo rsGroupInfo = rsGroupAdmin.getRSGroupInfo(rsgroup);
+      if (rsGroupInfo == null) {
+        LOG.error("Invalid rsgroup specified: " + rsgroup);
+        throw new IllegalArgumentException("Invalid rsgroup specified: " + rsgroup);
+      }
+
+      for (TableName tableName : rsGroupInfo.getTables()) {
+        int status = compactRegionsTTLOnTable(conf, tableName.getNameAsString(), concurrency,
+            sleep, numServers, numRegions, dryRun, skipWait);
+        if (status != 0) {
+          LOG.error("Failed to compact table: " + tableName);
+          return status;
+        }
+      }
+      return 0;
+    }
+  }
+
+  protected Options getOptions() {
+    Options options = getCommonOptions();
+
+    options.addOption(
+        Option.builder("rsgroup")
+            .required()
+            .desc("The rsgroup whose tables should be major compacted")
+            .hasArg()
+            .build()
+    );
+
+    return options;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Options options = getOptions();
+
+    final CommandLineParser cmdLineParser = new DefaultParser();
+    CommandLine commandLine;
+    try {
+      commandLine = cmdLineParser.parse(options, args);
+    } catch (ParseException parseException) {
+      System.out.println(
+          "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
+              + parseException);
+      printUsage(options);
+      return -1;
+    }
+    if (commandLine == null) {
+      System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args));
+      printUsage(options);
+      return -1;
+    }
+
+    String rsgroup = commandLine.getOptionValue("rsgroup");
+    int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1"));
+    int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1"));
+    int concurrency = Integer.parseInt(commandLine.getOptionValue("servers", "1"));
+    long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000)));
+    boolean dryRun = commandLine.hasOption("dryRun");
+    boolean skipWait = commandLine.hasOption("skipWait");
+    Configuration conf = getConf();
+
+    return compactTTLRegionsOnGroup(conf, rsgroup, concurrency, sleep, numServers, numRegions,
+        dryRun, skipWait);
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(HBaseConfiguration.create(), new RSGroupMajorCompactionTTL(), args);
+  }
+}
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupMajorCompactionTTL.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupMajorCompactionTTL.java
new file mode 100644
index 00000000000..9b3dcbbc39e
--- /dev/null
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupMajorCompactionTTL.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rsgroup;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.compaction.TestMajorCompactorTTL;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+@Category({ MediumTests.class })
+public class TestRSGroupMajorCompactionTTL extends TestMajorCompactorTTL {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestRSGroupMajorCompactionTTL.class);
+
+  private final static int NUM_SLAVES_BASE = 6;
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    utility = new HBaseTestingUtility();
+    Configuration conf = utility.getConfiguration();
+    conf.set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, RSGroupBasedLoadBalancer.class.getName());
+    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, RSGroupAdminEndpoint.class.getName());
+    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_SLAVES_BASE);
+    conf.setInt("hbase.hfile.compaction.discharger.interval", 10);
+    utility.startMiniCluster(NUM_SLAVES_BASE);
+    MiniHBaseCluster cluster = utility.getHBaseCluster();
+    final HMaster master = cluster.getMaster();
+
+    // wait for the balancer to come online
+    utility.waitFor(60000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() {
+        return master.isInitialized() &&
+            ((RSGroupBasedLoadBalancer) master.getLoadBalancer()).isOnline();
+      }
+    });
+    admin = utility.getAdmin();
+  }
+
+  @After
+  @Override
+  public void tearDown() throws Exception {
+    utility.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testCompactingTables() throws Exception {
+    List<TableName> tableNames = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      tableNames.add(createTable(name.getMethodName() + "___" + i));
+    }
+
+    // Wait a bit, so the store files age past the 5-second TTL we set below
+    Thread.sleep(10 * 1000);
+
+    for (TableName tableName : tableNames) {
+      int numberOfRegions = admin.getRegions(tableName).size();
+      int numHFiles = utility.getNumHFiles(tableName, FAMILY);
+      // There should be more store files than regions before the major compaction.
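+      // (createTable makes 5 regions and flushes 5 times per table, so expect roughly
+      // 25 store files against 5 regions here)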
+      assertTrue(numberOfRegions < numHFiles);
+      modifyTTL(tableName);
+    }
+
+    RSGroupMajorCompactionTTL compactor = new RSGroupMajorCompactionTTL();
+    compactor.compactTTLRegionsOnGroup(utility.getConfiguration(),
+        RSGroupInfo.DEFAULT_GROUP, 1, 200, -1, -1, false, false);
+
+    for (TableName tableName : tableNames) {
+      int numberOfRegions = admin.getRegions(tableName).size();
+      int numHFiles = utility.getNumHFiles(tableName, FAMILY);
+      assertEquals(numberOfRegions, numHFiles);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
index 51b2b9d83d3..cf5fcd95128 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
@@ -44,25 +44,27 @@ class MajorCompactionRequest {
 
   private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionRequest.class);
 
-  private final Configuration configuration;
-  private final RegionInfo region;
+  protected final Configuration configuration;
+  protected final RegionInfo region;
   private Set<String> stores;
-  private final long timestamp;
+
+  MajorCompactionRequest(Configuration configuration, RegionInfo region) {
+    this.configuration = configuration;
+    this.region = region;
+  }
 
   @VisibleForTesting
   MajorCompactionRequest(Configuration configuration, RegionInfo region,
-      Set<String> stores, long timestamp) {
-    this.configuration = configuration;
-    this.region = region;
+      Set<String> stores) {
+    this(configuration, region);
     this.stores = stores;
-    this.timestamp = timestamp;
   }
 
   static Optional<MajorCompactionRequest> newRequest(Configuration configuration, RegionInfo info,
       Set<String> stores, long timestamp) throws IOException {
     MajorCompactionRequest request =
-        new MajorCompactionRequest(configuration, info, stores, timestamp);
-    return request.createRequest(configuration, stores);
+        new MajorCompactionRequest(configuration, info, stores);
+    return request.createRequest(configuration, stores, timestamp);
   }
 
   RegionInfo getRegion() {
@@ -79,67 +81,80 @@ class MajorCompactionRequest {
 
   @VisibleForTesting
   Optional<MajorCompactionRequest> createRequest(Configuration configuration,
-      Set<String> stores) throws IOException {
-    Set<String> familiesToCompact = getStoresRequiringCompaction(stores);
+      Set<String> stores, long timestamp) throws IOException {
+    Set<String> familiesToCompact = getStoresRequiringCompaction(stores, timestamp);
     MajorCompactionRequest request = null;
     if (!familiesToCompact.isEmpty()) {
-      request = new MajorCompactionRequest(configuration, region, familiesToCompact, timestamp);
+      request = new MajorCompactionRequest(configuration, region, familiesToCompact);
     }
     return Optional.ofNullable(request);
  }
 
-  Set<String> getStoresRequiringCompaction(Set<String> requestedStores) throws IOException {
+  Set<String> getStoresRequiringCompaction(Set<String> requestedStores, long timestamp)
+      throws IOException {
     try(Connection connection = getConnection(configuration)) {
       HRegionFileSystem fileSystem = getFileSystem(connection);
       Set<String> familiesToCompact = Sets.newHashSet();
       for (String family : requestedStores) {
-        // do we have any store files?
-        Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
-        if (storeFiles == null) {
-          LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
-              .getRegionInfo().getEncodedName(), " has no store files");
-          continue;
-        }
-        // check for reference files
-        if (fileSystem.hasReferences(family) && familyHasReferenceFile(fileSystem, family)) {
+        if (shouldCFBeCompacted(fileSystem, family, timestamp)) {
           familiesToCompact.add(family);
-          LOG.info("Including store: " + family + " with: " + storeFiles.size()
-              + " files for compaction for region: " + fileSystem.getRegionInfo().getEncodedName());
-          continue;
-        }
-        // check store file timestamps
-        boolean includeStore = false;
-        for (StoreFileInfo storeFile : storeFiles) {
-          if (storeFile.getModificationTime() < timestamp) {
-            LOG.info("Including store: " + family + " with: " + storeFiles.size()
-                + " files for compaction for region: "
-                + fileSystem.getRegionInfo().getEncodedName());
-            familiesToCompact.add(family);
-            includeStore = true;
-            break;
-          }
-        }
-        if (!includeStore) {
-          LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
-              .getRegionInfo().getEncodedName(), " already compacted");
         }
       }
       return familiesToCompact;
     }
   }
 
+  boolean shouldCFBeCompacted(HRegionFileSystem fileSystem, String family, long ts)
+      throws IOException {
+
+    // do we have any store files?
+    Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
+    if (storeFiles == null) {
+      LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
+          .getRegionInfo().getEncodedName() + " has no store files");
+      return false;
+    }
+    // check for reference files
+    if (fileSystem.hasReferences(family) && familyHasReferenceFile(fileSystem, family, ts)) {
+      LOG.info("Including store: " + family + " with: " + storeFiles.size()
+          + " files for compaction for region: " + fileSystem.getRegionInfo().getEncodedName());
+      return true;
+    }
+    // check store file timestamps
+    boolean includeStore = this.shouldIncludeStore(fileSystem, family, storeFiles, ts);
+    if (!includeStore) {
+      LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
+          .getRegionInfo().getEncodedName() + " already compacted");
+    }
+    return includeStore;
+  }
+
+  protected boolean shouldIncludeStore(HRegionFileSystem fileSystem, String family,
+      Collection<StoreFileInfo> storeFiles, long ts) throws IOException {
+
+    for (StoreFileInfo storeFile : storeFiles) {
+      if (storeFile.getModificationTime() < ts) {
+        LOG.info("Including store: " + family + " with: " + storeFiles.size()
+            + " files for compaction for region: "
+            + fileSystem.getRegionInfo().getEncodedName());
+        return true;
+      }
+    }
+    return false;
+  }
+
   @VisibleForTesting
   Connection getConnection(Configuration configuration) throws IOException {
     return ConnectionFactory.createConnection(configuration);
   }
 
-  private boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family)
+  protected boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family, long ts)
       throws IOException {
     List<Path> referenceFiles =
         getReferenceFilePaths(fileSystem.getFileSystem(), fileSystem.getStoreDir(family));
     for (Path referenceFile : referenceFiles) {
       FileStatus status = fileSystem.getFileSystem().getFileLinkStatus(referenceFile);
-      if (status.getModificationTime() < timestamp) {
+      if (status.getModificationTime() < ts) {
         LOG.info("Including store: " + family + " for compaction for region: " + fileSystem
            .getRegionInfo().getEncodedName() + " (reference store files)");
         return true;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionTTLRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionTTLRequest.java
new file mode 100644
index 00000000000..4d2b341ecd5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionTTLRequest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+
+/**
+ * This request helps determine whether a region has to be compacted based on the table's TTL.
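+ * <p>
+ * The cutoff time for a column family is {@code now - TTL * 1000} (the TTL is in seconds); for
+ * example, a family with a one-day TTL of 86400 gets a cutoff of exactly one day ago. Absent
+ * reference files, the family is selected only when every one of its store files is older than
+ * that cutoff.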
+ */
+@InterfaceAudience.Private
+public class MajorCompactionTTLRequest extends MajorCompactionRequest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionTTLRequest.class);
+
+  MajorCompactionTTLRequest(Configuration conf, RegionInfo region) {
+    super(conf, region);
+  }
+
+  static Optional<MajorCompactionRequest> newRequest(Configuration conf, RegionInfo info,
+      TableDescriptor htd) throws IOException {
+    MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(conf, info);
+    return request.createRequest(conf, htd);
+  }
+
+  @VisibleForTesting
+  private Optional<MajorCompactionRequest> createRequest(Configuration conf, TableDescriptor htd)
+      throws IOException {
+    Map<String, Long> familiesToCompact = getStoresRequiringCompaction(htd);
+    MajorCompactionRequest request = null;
+    if (!familiesToCompact.isEmpty()) {
+      LOG.debug("Compaction families for region: " + region + " CF: "
+          + familiesToCompact.keySet());
+      request = new MajorCompactionTTLRequest(conf, region);
+    }
+    return Optional.ofNullable(request);
+  }
+
+  Map<String, Long> getStoresRequiringCompaction(TableDescriptor htd) throws IOException {
+    try (Connection connection = getConnection(configuration)) {
+      HRegionFileSystem fileSystem = getFileSystem(connection);
+      Map<String, Long> familyTTLMap = Maps.newHashMap();
+      for (ColumnFamilyDescriptor descriptor : htd.getColumnFamilies()) {
+        long ts = getColFamilyCutoffTime(descriptor);
+        // If the CF's TTL is forever, let's not compact any of its regions.
+        if (ts > 0 && shouldCFBeCompacted(fileSystem, descriptor.getNameAsString(), ts)) {
+          familyTTLMap.put(descriptor.getNameAsString(), ts);
+        }
+      }
+      return familyTTLMap;
+    }
+  }
+
+  // If the CF has no TTL, return -1; else return the current time minus the TTL.
+  private long getColFamilyCutoffTime(ColumnFamilyDescriptor colDesc) {
+    if (colDesc.getTimeToLive() == HConstants.FOREVER) {
+      return -1;
+    }
+    return System.currentTimeMillis() - (colDesc.getTimeToLive() * 1000L);
+  }
+
+  @Override
+  protected boolean shouldIncludeStore(HRegionFileSystem fileSystem, String family,
+      Collection<StoreFileInfo> storeFiles, long ts) throws IOException {
+
+    for (StoreFileInfo storeFile : storeFiles) {
+      // Let's only compact when all files are older than the TTL cutoff
+      if (storeFile.getModificationTime() >= ts) {
+        LOG.info("There is at least one file in store: " + family + " file: "
+            + storeFile.getPath() + " with timestamp " + storeFile.getModificationTime()
+            + " for region: " + fileSystem.getRegionInfo().getEncodedName()
+            + " newer than the TTL cutoff: " + ts);
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
index f7cac226e21..151b49286c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
@@ -18,7 +18,10 @@ package org.apache.hadoop.hbase.util.compaction;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -26,7 +29,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
@@ -38,15 +43,19 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.CompactionState;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
 import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
@@ -57,18 +66,24 @@ import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
 
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
-public class MajorCompactor {
+public class MajorCompactor extends Configured implements Tool {
 
   private static final Logger LOG = LoggerFactory.getLogger(MajorCompactor.class);
-  private static final Set<MajorCompactionRequest> ERRORS = ConcurrentHashMap.newKeySet();
+  protected static final Set<MajorCompactionRequest> ERRORS = ConcurrentHashMap.newKeySet();
 
-  private final ClusterCompactionQueues clusterCompactionQueues;
-  private final long timestamp;
-  private final Set<String> storesToCompact;
-  private final ExecutorService executor;
-  private final long sleepForMs;
-  private final Connection connection;
-  private final TableName tableName;
+  protected ClusterCompactionQueues clusterCompactionQueues;
+  private long timestamp;
+  protected Set<String> storesToCompact;
+  protected ExecutorService executor;
+  protected long sleepForMs;
+  protected Connection connection;
+  protected TableName tableName;
+  private int numServers = -1;
+  private int numRegions = -1;
+  private boolean skipWait = false;
+
+  MajorCompactor() {
+  }
 
   public MajorCompactor(Configuration conf, TableName tableName, Set<String> storesToCompact,
       int concurrency, long timestamp, long sleepForMs) throws IOException {
@@ -149,15 +164,83 @@ public class MajorCompactor {
     }
     LOG.info(
         "Initializing compaction queues for table: " + tableName + " with cf: " + storesToCompact);
+
+    Map<ServerName, List<RegionInfo>> snRegionMap = getServerRegionsMap();
+    /*
+     * If numservers is specified, only inspect regions on that many servers. This throttles the
+     * run and avoids scanning every region in the cluster when only a limited amount of
+     * compaction work is wanted per invocation.
+     */
+    for (ServerName sn : getServersToCompact(snRegionMap.keySet())) {
+      List<RegionInfo> regions = snRegionMap.get(sn);
+      LOG.debug("Table: " + tableName + " Server: " + sn + " No of regions: " + regions.size());
+
+      /*
+       * Shuffle the regions so that periodic runs with numregions set pick a random subset on
+       * each invocation rather than repeatedly inspecting the same regions.
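+       * For example, with -numservers 2 and -numregions 10, a single run queues at most
+       * 20 regions.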
+       */
+      Collections.shuffle(regions);
+      int regionsToCompact = numRegions;
+      for (RegionInfo hri : regions) {
+        if (numRegions > 0 && regionsToCompact <= 0) {
+          LOG.debug("Reached region limit for server: " + sn);
+          break;
+        }
+
+        Optional<MajorCompactionRequest> request = getMajorCompactionRequest(hri);
+        if (request.isPresent()) {
+          LOG.debug("Adding region " + hri + " to queue " + sn + " for compaction");
+          clusterCompactionQueues.addToCompactionQueue(sn, request.get());
+          if (numRegions > 0) {
+            regionsToCompact--;
+          }
+        }
+      }
+    }
+  }
+
+  protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri)
+      throws IOException {
+    return MajorCompactionRequest.newRequest(connection.getConfiguration(), hri, storesToCompact,
+        timestamp);
+  }
+
+  private Collection<ServerName> getServersToCompact(Set<ServerName> snSet) {
+    if (numServers < 0 || snSet.size() <= numServers) {
+      return snSet;
+    } else {
+      List<ServerName> snList = Lists.newArrayList(snSet);
+      Collections.shuffle(snList);
+      return snList.subList(0, numServers);
+    }
+  }
+
+  private Map<ServerName, List<RegionInfo>> getServerRegionsMap() throws IOException {
+    Map<ServerName, List<RegionInfo>> snRegionMap = Maps.newHashMap();
     List<HRegionLocation> regionLocations =
         connection.getRegionLocator(tableName).getAllRegionLocations();
-    for (HRegionLocation location : regionLocations) {
-      Optional<MajorCompactionRequest> request = MajorCompactionRequest
-          .newRequest(connection.getConfiguration(), location.getRegion(), storesToCompact,
-              timestamp);
-      request.ifPresent(majorCompactionRequest -> clusterCompactionQueues
-          .addToCompactionQueue(location.getServerName(), majorCompactionRequest));
+    for (HRegionLocation regionLocation : regionLocations) {
+      ServerName sn = regionLocation.getServerName();
+      RegionInfo hri = regionLocation.getRegion();
+      if (!snRegionMap.containsKey(sn)) {
+        snRegionMap.put(sn, Lists.newArrayList());
+      }
+      snRegionMap.get(sn).add(hri);
     }
+    return snRegionMap;
+  }
+
+  public void setNumServers(int numServers) {
+    this.numServers = numServers;
+  }
+
+  public void setNumRegions(int numRegions) {
+    this.numRegions = numRegions;
+  }
+
+  public void setSkipWait(boolean skipWait) {
+    this.skipWait = skipWait;
   }
 
   class Compact implements Runnable {
@@ -190,52 +273,68 @@ public class MajorCompactor {
       try {
         // only make the request if the region is not already major compacting
         if (!isCompacting(request)) {
-          Set<String> stores = request.getStoresRequiringCompaction(storesToCompact);
+          Set<String> stores = getStoresRequiringCompaction(request);
           if (!stores.isEmpty()) {
             request.setStores(stores);
             for (String store : request.getStores()) {
-              admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
-                  Bytes.toBytes(store));
+              compactRegionOnServer(request, admin, store);
             }
           }
         }
-        while (isCompacting(request)) {
-          Thread.sleep(sleepForMs);
-          LOG.debug("Waiting for compaction to complete for region: " + request.getRegion()
-              .getEncodedName());
+
+        /*
+         * In some scenarios, such as compacting TTLed regions, the compaction itself completes
+         * quickly, so we can skip the wait. If an external scheduler triggers this tool
+         * frequently, the next run will pick up any regions that have moved and compact them.
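+         * For example, an hourly cron run passing -skipWait just queues the compaction
+         * requests and exits; the next run re-evaluates whatever is still uncompacted.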
+         */
+        if (!skipWait) {
+          while (isCompacting(request)) {
+            Thread.sleep(sleepForMs);
+            LOG.debug("Waiting for compaction to complete for region: " + request.getRegion()
+                .getEncodedName());
+          }
         }
       } finally {
-        // Make sure to wait for the CompactedFileDischarger chore to do its work
-        int waitForArchive = connection.getConfiguration()
-            .getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
-        Thread.sleep(waitForArchive);
-        // check if compaction completed successfully, otherwise put that request back in the
-        // proper queue
-        Set<String> storesRequiringCompaction =
-            request.getStoresRequiringCompaction(storesToCompact);
-        if (!storesRequiringCompaction.isEmpty()) {
-          // this happens, when a region server is marked as dead, flushes a store file and
-          // the new regionserver doesn't pick it up because its accounted for in the WAL replay,
-          // thus you have more store files on the filesystem than the regionserver knows about.
-          boolean regionHasNotMoved = connection.getRegionLocator(tableName)
-              .getRegionLocation(request.getRegion().getStartKey()).getServerName()
-              .equals(serverName);
-          if (regionHasNotMoved) {
-            LOG.error("Not all store files were compacted, this may be due to the regionserver not "
-                + "being aware of all store files. Will not reattempt compacting, " + request);
-            ERRORS.add(request);
+        if (!skipWait) {
+          // Make sure to wait for the CompactedFileDischarger chore to do its work
+          int waitForArchive = connection.getConfiguration()
+              .getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
+          Thread.sleep(waitForArchive);
+          // check if compaction completed successfully, otherwise put that request back in the
+          // proper queue
+          Set<String> storesRequiringCompaction = getStoresRequiringCompaction(request);
+          if (!storesRequiringCompaction.isEmpty()) {
+            // This happens when a region server is marked as dead and flushes a store file, but
+            // the new regionserver doesn't pick the file up because it's accounted for in the
+            // WAL replay; thus you have more store files on the filesystem than the
+            // regionserver knows about.
+            boolean regionHasNotMoved = connection.getRegionLocator(tableName)
+                .getRegionLocation(request.getRegion().getStartKey()).getServerName()
+                .equals(serverName);
+            if (regionHasNotMoved) {
+              LOG.error(
+                  "Not all store files were compacted, this may be due to the regionserver not "
+                      + "being aware of all store files. Will not reattempt compacting, "
+                      + request);
+              ERRORS.add(request);
+            } else {
+              request.setStores(storesRequiringCompaction);
+              clusterCompactionQueues.addToCompactionQueue(serverName, request);
+              LOG.info("Compaction failed for the following stores: " + storesRequiringCompaction
+                  + " region: " + request.getRegion().getEncodedName());
+            }
           } else {
-            request.setStores(storesRequiringCompaction);
-            clusterCompactionQueues.addToCompactionQueue(serverName, request);
-            LOG.info("Compaction failed for the following stores: " + storesRequiringCompaction
-                + " region: " + request.getRegion().getEncodedName());
+            LOG.info("Compaction complete for region: " + request.getRegion().getEncodedName()
+                + " -> cf(s): " + request.getStores());
           }
-        } else {
-          LOG.info("Compaction complete for region: " + request.getRegion().getEncodedName()
-              + " -> cf(s): " + request.getStores());
         }
       }
     }
+
+    private void compactRegionOnServer(MajorCompactionRequest request, Admin admin, String store)
+        throws IOException {
+      admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
+          Bytes.toBytes(store));
+    }
   }
 
   private boolean isCompacting(MajorCompactionRequest request) throws Exception {
@@ -263,22 +362,14 @@ public class MajorCompactor {
     }
   }
 
-  public static void main(String[] args) throws Exception {
+  protected Set<String> getStoresRequiringCompaction(MajorCompactionRequest request)
+      throws IOException {
+    return request.getStoresRequiringCompaction(storesToCompact, timestamp);
+  }
+
+  protected Options getCommonOptions() {
     Options options = new Options();
-    options.addOption(
-        Option.builder("table")
-            .required()
-            .desc("table name")
-            .hasArg()
-            .build()
-    );
-    options.addOption(
-        Option.builder("cf")
-            .optionalArg(true)
-            .desc("column families: comma separated eg: a,b,c")
-            .hasArg()
-            .build()
-    );
+
     options.addOption(
         Option.builder("servers")
             .required()
@@ -327,6 +418,49 @@ public class MajorCompactor {
             .build()
     );
 
+    options.addOption(
+        Option.builder("skipWait")
+            .desc("Skip waiting after triggering compaction.")
+            .hasArg(false)
+            .build()
+    );
+
+    options.addOption(
+        Option.builder("numservers")
+            .optionalArg(true)
+            .desc("Number of servers to compact in this run, defaults to all")
+            .hasArg()
+            .build()
+    );
+
+    options.addOption(
+        Option.builder("numregions")
+            .optionalArg(true)
+            .desc("Number of regions to compact per server, defaults to all")
+            .hasArg()
+            .build()
+    );
+    return options;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Options options = getCommonOptions();
+    options.addOption(
+        Option.builder("table")
+            .required()
+            .desc("table name")
+            .hasArg()
+            .build()
+    );
+    options.addOption(
+        Option.builder("cf")
+            .optionalArg(true)
+            .desc("column families: comma separated eg: a,b,c")
+            .hasArg()
+            .build()
+    );
+
     final CommandLineParser cmdLineParser = new DefaultParser();
     CommandLine commandLine = null;
     try {
@@ -336,12 +470,12 @@ public class MajorCompactor {
           "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
              + parseException);
       printUsage(options);
-      return;
+      return -1;
     }
     if (commandLine == null) {
       System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args));
       printUsage(options);
-      return;
+      return -1;
     }
     String tableName = commandLine.getOptionValue("table");
     String cf = commandLine.getOptionValue("cf", null);
@@ -350,8 +484,7 @@ public class MajorCompactor {
       Iterables.addAll(families, Splitter.on(",").split(cf));
     }
 
-
-    Configuration configuration = HBaseConfiguration.create();
+    Configuration configuration = getConf();
     int concurrency = Integer.parseInt(commandLine.getOptionValue("servers"));
     long minModTime = Long.parseLong(
         commandLine.getOptionValue("minModTime", String.valueOf(System.currentTimeMillis())));
@@ -360,25 +493,35 @@ public class MajorCompactor {
     String rootDir = commandLine.getOptionValue("rootDir", configuration.get(HConstants.HBASE_DIR));
     long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000)));
+    int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1"));
+    int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1"));
+
     configuration.set(HConstants.HBASE_DIR, rootDir);
     configuration.set(HConstants.ZOOKEEPER_QUORUM, quorum);
 
     MajorCompactor compactor =
         new MajorCompactor(configuration, TableName.valueOf(tableName), families, concurrency,
             minModTime, sleep);
+    compactor.setNumServers(numServers);
+    compactor.setNumRegions(numRegions);
+    compactor.setSkipWait(commandLine.hasOption("skipWait"));
 
     compactor.initializeWorkQueues();
     if (!commandLine.hasOption("dryRun")) {
       compactor.compactAllRegions();
     }
     compactor.shutdown();
+    return ERRORS.size();
   }
 
-  private static void printUsage(final Options options) {
+  protected static void printUsage(final Options options) {
     String header = "\nUsage instructions\n\n";
     String footer = "\n";
     HelpFormatter formatter = new HelpFormatter();
     formatter.printHelp(MajorCompactor.class.getSimpleName(), header, options, footer, true);
   }
 
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(HBaseConfiguration.create(), new MajorCompactor(), args);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTTL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTTL.java
new file mode 100644
index 00000000000..321cbe03386
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTTL.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
+
+/**
+ * This tool major compacts a table's regions whose data is beyond its TTL. It saves disk space,
+ * since the regions become empty once the expired data is compacted away.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class MajorCompactorTTL extends MajorCompactor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MajorCompactorTTL.class);
+
+  private TableDescriptor htd;
+
+  @VisibleForTesting
+  public MajorCompactorTTL(Configuration conf, TableDescriptor htd, int concurrency,
+      long sleepForMs) throws IOException {
+    this.connection = ConnectionFactory.createConnection(conf);
+    this.htd = htd;
+    this.tableName = htd.getTableName();
+    this.storesToCompact = Sets.newHashSet(); // Empty set so all stores will be compacted
+    this.sleepForMs = sleepForMs;
+    this.executor = Executors.newFixedThreadPool(concurrency);
+    this.clusterCompactionQueues = new ClusterCompactionQueues(concurrency);
+  }
+
+  protected MajorCompactorTTL() {
+    super();
+  }
+
+  @Override
+  protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri)
+      throws IOException {
+    return MajorCompactionTTLRequest.newRequest(connection.getConfiguration(), hri, htd);
+  }
+
+  @Override
+  protected Set<String> getStoresRequiringCompaction(MajorCompactionRequest request)
+      throws IOException {
+    return ((MajorCompactionTTLRequest) request).getStoresRequiringCompaction(htd).keySet();
+  }
+
+  public int compactRegionsTTLOnTable(Configuration conf, String table, int concurrency,
+      long sleep, int numServers, int numRegions, boolean dryRun, boolean skipWait)
+      throws Exception {
+
+    TableName tableName = TableName.valueOf(table);
+    TableDescriptor htd;
+    try (Connection conn = ConnectionFactory.createConnection(conf);
+        Admin admin = conn.getAdmin()) {
+      htd = admin.getDescriptor(tableName);
+    }
+    if (!doesAnyColFamilyHaveTTL(htd)) {
+      LOG.info("No TTL present for any CF of table: " + tableName + ", skipping compaction");
+      return 0;
+    }
+
+    LOG.info("Major compacting table " + tableName + " based on TTL");
+    MajorCompactor compactor = new MajorCompactorTTL(conf, htd, concurrency, sleep);
+    compactor.setNumServers(numServers);
+    compactor.setNumRegions(numRegions);
+    compactor.setSkipWait(skipWait);
+
+    compactor.initializeWorkQueues();
+    if (!dryRun) {
+      compactor.compactAllRegions();
+    }
+    compactor.shutdown();
+    return ERRORS.size();
+  }
+
+  private boolean doesAnyColFamilyHaveTTL(TableDescriptor htd) {
+    for (ColumnFamilyDescriptor descriptor : htd.getColumnFamilies()) {
+      if (descriptor.getTimeToLive() != HConstants.FOREVER) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private Options getOptions() {
+    Options options = getCommonOptions();
+
+    options.addOption(
+        Option.builder("table")
+            .required()
+            .desc("table name")
+            .hasArg()
+            .build()
+    );
+
+    return options;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Options options = getOptions();
+
+    final CommandLineParser cmdLineParser = new DefaultParser();
+    CommandLine commandLine;
+    try {
+      commandLine = cmdLineParser.parse(options, args);
+    } catch (ParseException parseException) {
+      System.out.println(
+          "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
+              + parseException);
+      printUsage(options);
+      return -1;
+    }
+    if (commandLine == null) {
+      System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args));
+      printUsage(options);
+      return -1;
+    }
+
+    String table = commandLine.getOptionValue("table");
+    int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1"));
+    int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1"));
+    int concurrency = Integer.parseInt(commandLine.getOptionValue("servers", "1"));
+    long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000)));
+    boolean dryRun = commandLine.hasOption("dryRun");
+    boolean skipWait = commandLine.hasOption("skipWait");
+
+    return compactRegionsTTLOnTable(getConf(), table, concurrency, sleep, numServers, numRegions,
+        dryRun, skipWait);
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(HBaseConfiguration.create(), new MajorCompactorTTL(), args);
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionRequest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionRequest.java
index adecd5c6b04..c125c6e9889 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionRequest.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionRequest.java
@@ -17,11 +17,24 @@
  */
 package org.apache.hadoop.hbase.util.compaction;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
+
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -39,24 +52,13 @@ import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 
 @Category({SmallTests.class})
 public class TestMajorCompactionRequest {
@@ -64,10 +66,10 @@ public class TestMajorCompactionRequest {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestMajorCompactionRequest.class);
 
-  private static final HBaseTestingUtility UTILITY = new HBaseTestingUtility();
-  private static final String FAMILY = "a";
-  private Path rootRegionDir;
-  private Path regionStoreDir;
+  protected static final HBaseTestingUtility UTILITY = new HBaseTestingUtility();
+  protected static final String FAMILY = "a";
+  protected Path rootRegionDir;
+  protected Path regionStoreDir;
 
   @Before
   public void setUp() throws Exception {
     rootRegionDir = UTILITY.getDataTestDirOnTestFS("TestMajorCompactionRequest");
@@ -77,15 +79,15 @@ public class TestMajorCompactionRequest {
   @Test
   public void testStoresNeedingCompaction() throws Exception {
     // store files older than timestamp
     List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 5, 10);
-    MajorCompactionRequest request = makeMockRequest(100, storeFiles, false);
+    MajorCompactionRequest request = makeMockRequest(storeFiles, false);
     Optional<MajorCompactionRequest> result =
-        request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
+        request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
     assertTrue(result.isPresent());
 
     // store files newer than timestamp
     storeFiles = mockStoreFiles(regionStoreDir, 5, 101);
-    request = makeMockRequest(100, storeFiles, false);
-    result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
+    request = makeMockRequest(storeFiles, false);
+    result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
     assertFalse(result.isPresent());
   }
 
@@ -106,16 +108,17 @@ public class TestMajorCompactionRequest {
     HRegionFileSystem fileSystem = mockFileSystem(region.getRegionInfo(), true, storeFiles, 50);
     MajorCompactionRequest majorCompactionRequest = spy(new MajorCompactionRequest(configuration,
-        region.getRegionInfo(), Sets.newHashSet(FAMILY), 100));
+        region.getRegionInfo(), Sets.newHashSet(FAMILY)));
     doReturn(mock(Connection.class)).when(majorCompactionRequest).getConnection(eq(configuration));
     doReturn(paths).when(majorCompactionRequest).getReferenceFilePaths(any(FileSystem.class),
         any(Path.class));
     doReturn(fileSystem).when(majorCompactionRequest).getFileSystem(any(Connection.class));
-    Set<String> result = majorCompactionRequest.getStoresRequiringCompaction(Sets.newHashSet("a"));
+    Set<String> result =
+        majorCompactionRequest.getStoresRequiringCompaction(Sets.newHashSet("a"), 100);
     assertEquals(FAMILY, Iterables.getOnlyElement(result));
   }
 
-  private HRegionFileSystem mockFileSystem(RegionInfo info, boolean hasReferenceFiles,
+  protected HRegionFileSystem mockFileSystem(RegionInfo info, boolean hasReferenceFiles,
       List<StoreFileInfo> storeFiles) throws IOException {
     long timestamp = storeFiles.stream().findFirst().get().getModificationTime();
     return mockFileSystem(info, hasReferenceFiles, storeFiles, timestamp);
@@ -138,7 +141,7 @@ public class TestMajorCompactionRequest {
     return mockSystem;
   }
 
-  private List<StoreFileInfo> mockStoreFiles(Path regionStoreDir, int howMany, long timestamp)
+  protected List<StoreFileInfo> mockStoreFiles(Path regionStoreDir, int howMany, long timestamp)
       throws IOException {
     List<StoreFileInfo> infos = Lists.newArrayList();
     int i = 0;
@@ -153,14 +156,14 @@ public class TestMajorCompactionRequest {
     return infos;
   }
 
-  private MajorCompactionRequest makeMockRequest(long timestamp, List<StoreFileInfo> storeFiles,
+  private MajorCompactionRequest makeMockRequest(List<StoreFileInfo> storeFiles,
       boolean references) throws IOException {
     Configuration configuration = mock(Configuration.class);
     RegionInfo regionInfo = mock(RegionInfo.class);
     when(regionInfo.getEncodedName()).thenReturn("HBase");
     when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
     MajorCompactionRequest request =
-        new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a"), timestamp);
+        new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a"));
     MajorCompactionRequest spy = spy(request);
     HRegionFileSystem fileSystem = mockFileSystem(regionInfo, references, storeFiles);
     doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionTTLRequest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionTTLRequest.java
new file mode 100644
index 00000000000..f15b887b0c0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionTTLRequest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.compaction;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@Category({SmallTests.class})
+public class TestMajorCompactionTTLRequest extends TestMajorCompactionRequest {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestMajorCompactionTTLRequest.class);
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    rootRegionDir = UTILITY.getDataTestDirOnTestFS("TestMajorCompactionTTLRequest");
+    regionStoreDir = new Path(rootRegionDir, FAMILY);
+  }
+
+  @Test
+  public void testStoresNeedingCompaction() throws Exception {
+    // 5 store files with modification time 10
+    List<StoreFileInfo> storeFiles1 = mockStoreFiles(regionStoreDir, 5, 10);
+    // 5 store files with modification time 100
+    List<StoreFileInfo> storeFiles2 = mockStoreFiles(regionStoreDir, 5, 100);
+    List<StoreFileInfo> storeFiles = Lists.newArrayList(storeFiles1);
+    storeFiles.addAll(storeFiles2);
+
+    MajorCompactionTTLRequest request = makeMockRequest(storeFiles);
+    // With a cutoff of 10, no file is strictly older than the cutoff, so nothing to compact.
+    Optional<MajorCompactionRequest> result =
+        request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 10);
+    assertFalse(result.isPresent());
+
+    // With a cutoff of 100, the files stamped 100 still block compaction, so not compacted yet.
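+    // (shouldIncludeStore treats modificationTime >= cutoff as blocking, hence the strict check)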
+    result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
+    assertFalse(result.isPresent());
+
+    // With a cutoff of 101, every file is strictly older, so the family should be compacted.
+    result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 101);
+    assertTrue(result.isPresent());
+  }
+
+  private MajorCompactionTTLRequest makeMockRequest(List<StoreFileInfo> storeFiles)
+      throws IOException {
+    Configuration configuration = mock(Configuration.class);
+    RegionInfo regionInfo = mock(RegionInfo.class);
+    when(regionInfo.getEncodedName()).thenReturn("HBase");
+    when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
+    MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(configuration, regionInfo);
+    MajorCompactionTTLRequest spy = spy(request);
+    HRegionFileSystem fileSystem = mockFileSystem(regionInfo, false, storeFiles);
+    doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class));
+    doReturn(mock(Connection.class)).when(spy).getConnection(eq(configuration));
+    return spy;
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactor.java
index ccf01462bd4..7ea80b1226a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactor.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.util.compaction;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -42,7 +43,8 @@ public class TestMajorCompactor {
       HBaseClassTestRule.forClass(TestMajorCompactor.class);
 
   public static final byte[] FAMILY = Bytes.toBytes("a");
-  private HBaseTestingUtility utility;
+  protected HBaseTestingUtility utility;
+  protected Admin admin;
 
   @Before
   public void setUp() throws Exception {
     utility = new HBaseTestingUtility();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactorTTL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactorTTL.java
new file mode 100644
index 00000000000..44abda61a6e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactorTTL.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.compaction;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ MiscTests.class, MediumTests.class })
+public class TestMajorCompactorTTL extends TestMajorCompactor {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestMajorCompactorTTL.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    utility = new HBaseTestingUtility();
+    utility.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 10);
+    utility.startMiniCluster();
+    admin = utility.getAdmin();
+  }
+
+  @After
+  @Override
+  public void tearDown() throws Exception {
+    utility.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testCompactingATable() throws Exception {
+    TableName tableName = createTable(name.getMethodName());
+
+    // Wait a bit, so the store files age past the 5-second TTL we set below
+    Thread.sleep(10 * 1000);
+
+    int numberOfRegions = admin.getRegions(tableName).size();
+    int numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    // There should be more store files than regions before the major compaction.
+    assertTrue(numberOfRegions < numHFiles);
+    modifyTTL(tableName);
+
+    MajorCompactorTTL compactor = new MajorCompactorTTL(utility.getConfiguration(),
+        admin.getDescriptor(tableName), 1, 200);
+    compactor.initializeWorkQueues();
+    compactor.compactAllRegions();
+    compactor.shutdown();
+
+    // verify that the store has been completely major compacted.
+    numberOfRegions = admin.getRegions(tableName).size();
+    numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    assertEquals(numberOfRegions, numHFiles);
+  }
+
+  protected void modifyTTL(TableName tableName) throws IOException, InterruptedException {
+    // Set the TTL to 5 seconds, so all the files written above become eligible for cleanup on
+    // compaction.
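+    // Disabling and re-enabling the table forces every region to reopen and pick up the new TTL.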
+    admin.disableTable(tableName);
+    utility.waitTableDisabled(tableName.getName());
+    TableDescriptor descriptor = admin.getDescriptor(tableName);
+    ColumnFamilyDescriptor colDesc = descriptor.getColumnFamily(FAMILY);
+    ColumnFamilyDescriptorBuilder cFDB = ColumnFamilyDescriptorBuilder.newBuilder(colDesc);
+    cFDB.setTimeToLive(5);
+    admin.modifyColumnFamily(tableName, cFDB.build());
+    admin.enableTable(tableName);
+    utility.waitTableEnabled(tableName);
+  }
+
+  protected TableName createTable(String name) throws IOException, InterruptedException {
+    TableName tableName = TableName.valueOf(name);
+    utility.createMultiRegionTable(tableName, FAMILY, 5);
+    utility.waitTableAvailable(tableName);
+    Connection connection = utility.getConnection();
+    Table table = connection.getTable(tableName);
+    // write data and flush multiple store files:
+    for (int i = 0; i < 5; i++) {
+      utility.loadRandomRows(table, FAMILY, 50, 100);
+      utility.flush(tableName);
+    }
+    table.close();
+    return tableName;
+  }
+}
\ No newline at end of file
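Note on usage: the tests above drive MajorCompactorTTL programmatically instead of going through
ToolRunner. A minimal sketch of that pattern, assuming a running cluster (the table name and the
concurrency/sleep values here are illustrative, not prescribed), is:

    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
      TableDescriptor htd = admin.getDescriptor(TableName.valueOf("my_table"));
      // one worker thread, poll the compaction state every 200 ms
      MajorCompactorTTL compactor = new MajorCompactorTTL(conf, htd, 1, 200);
      compactor.initializeWorkQueues();
      compactor.compactAllRegions();
      compactor.shutdown();
    }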