diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupMajorCompactionTTL.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupMajorCompactionTTL.java
new file mode 100644
index 00000000000..335720e2690
--- /dev/null
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupMajorCompactionTTL.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rsgroup;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Arrays;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.compaction.MajorCompactorTTL;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This tool takes an rsgroup as an argument and major compacts part/all of the regions of the
+ * tables in that rsgroup, based on each table's TTL.
+ */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) +public class RSGroupMajorCompactionTTL extends MajorCompactorTTL { + + private static final Logger LOG = LoggerFactory.getLogger(RSGroupMajorCompactionTTL.class); + + @VisibleForTesting + RSGroupMajorCompactionTTL() { + super(); + } + + public int compactTTLRegionsOnGroup(Configuration conf, String rsgroup, int concurrency, + long sleep, int numServers, int numRegions, boolean dryRun, boolean skipWait) + throws Exception { + + Connection conn = ConnectionFactory.createConnection(conf); + RSGroupAdmin rsGroupAdmin = new RSGroupAdminClient(conn); + + RSGroupInfo rsGroupInfo = rsGroupAdmin.getRSGroupInfo(rsgroup); + if (rsGroupInfo == null) { + LOG.error("Invalid rsgroup specified: " + rsgroup); + throw new IllegalArgumentException("Invalid rsgroup specified: " + rsgroup); + } + + for (TableName tableName : rsGroupInfo.getTables()) { + int status = compactRegionsTTLOnTable(conf, tableName.getNameAsString(), concurrency, sleep, + numServers, numRegions, dryRun, skipWait); + if (status != 0) { + LOG.error("Failed to compact table: " + tableName); + return status; + } + } + return 0; + } + + protected Options getOptions() { + Options options = getCommonOptions(); + + Option rsGroupOption = new Option("rsgroup", true, "Tables of rsgroup to be compacted"); + rsGroupOption.setRequired(true); + options.addOption(rsGroupOption); + + return options; + } + + @Override + public int run(String[] args) throws Exception { + Options options = getOptions(); + + final CommandLineParser cmdLineParser = new BasicParser(); + CommandLine commandLine; + try { + commandLine = cmdLineParser.parse(options, args); + } catch (ParseException parseException) { + System.err.println("ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + + " due to: " + parseException); + printUsage(options); + throw parseException; + } + + String rsgroup = commandLine.getOptionValue("rsgroup"); + int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1")); + int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1")); + int concurrency = Integer.parseInt(commandLine.getOptionValue("servers", "1")); + long sleep = Long.parseLong(commandLine.getOptionValue("sleep", "30000")); + boolean dryRun = commandLine.hasOption("dryRun"); + boolean skipWait = commandLine.hasOption("skipWait"); + Configuration conf = HBaseConfiguration.create(); + + return compactTTLRegionsOnGroup(conf, rsgroup, concurrency, sleep, numServers, numRegions, + dryRun, skipWait); + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(HBaseConfiguration.create(), new RSGroupMajorCompactionTTL(), args); + } +} diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupMajorCompactionTTL.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupMajorCompactionTTL.java new file mode 100644 index 00000000000..58625fd41ac --- /dev/null +++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupMajorCompactionTTL.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.rsgroup;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.util.compaction.TestMajorCompactorTTL;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRSGroupMajorCompactionTTL extends TestMajorCompactorTTL {
+
+  private final static int NUM_SLAVES_BASE = 6;
+
+  @Before
+  public void setUp() throws Exception {
+    utility = new HBaseTestingUtility();
+    Configuration conf = utility.getConfiguration();
+    conf.set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, RSGroupBasedLoadBalancer.class.getName());
+    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, RSGroupAdminEndpoint.class.getName());
+    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_SLAVES_BASE);
+    conf.setInt("hbase.hfile.compaction.discharger.interval", 10);
+    utility.startMiniCluster(NUM_SLAVES_BASE);
+    MiniHBaseCluster cluster = utility.getHBaseCluster();
+    final HMaster master = cluster.getMaster();
+
+    // wait for balancer to come online
+    utility.waitFor(60000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() {
+        return master.isInitialized() &&
+            ((RSGroupBasedLoadBalancer) master.getLoadBalancer()).isOnline();
+      }
+    });
+    admin = utility.getHBaseAdmin();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    utility.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testCompactingTables() throws Exception {
+    List<TableName> tableNames = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      tableNames.add(createTable(name.getMethodName() + "___" + i));
+    }
+
+    // Delay a bit, so the store files just written are older than the 5 second TTL set below
+    Thread.sleep(10 * 1000);
+
+    for (TableName tableName : tableNames) {
+      int numberOfRegions = admin.getTableRegions(tableName).size();
+      int numHFiles = utility.getNumHFiles(tableName, FAMILY);
+      // before major compaction, each table should have more store files than regions
+ assertTrue(numberOfRegions < numHFiles); + modifyTTL(tableName); + } + + RSGroupMajorCompactionTTL compactor = new RSGroupMajorCompactionTTL(); + compactor.compactTTLRegionsOnGroup(utility.getConfiguration(), + RSGroupInfo.DEFAULT_GROUP, 1, 200, -1, -1, false, false); + + for (TableName tableName : tableNames) { + int numberOfRegions = admin.getTableRegions(tableName).size(); + int numHFiles = utility.getNumHFiles(tableName, FAMILY); + assertEquals(numberOfRegions, numHFiles); + } + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java index 91bdb732158..bae287a27a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java @@ -44,25 +44,27 @@ class MajorCompactionRequest { private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionRequest.class); - private final Configuration configuration; - private final HRegionInfo region; + protected final Configuration configuration; + protected final HRegionInfo region; private Set stores; - private final long timestamp; + + MajorCompactionRequest(Configuration configuration, HRegionInfo region) { + this.configuration = configuration; + this.region = region; + } @VisibleForTesting MajorCompactionRequest(Configuration configuration, HRegionInfo region, - Set stores, long timestamp) { - this.configuration = configuration; - this.region = region; + Set stores) { + this(configuration, region); this.stores = stores; - this.timestamp = timestamp; } static Optional newRequest(Configuration configuration, HRegionInfo info, Set stores, long timestamp) throws IOException { MajorCompactionRequest request = - new MajorCompactionRequest(configuration, info, stores, timestamp); - return request.createRequest(configuration, stores); + new MajorCompactionRequest(configuration, info, stores); + return request.createRequest(configuration, stores, timestamp); } HRegionInfo getRegion() { @@ -79,67 +81,80 @@ class MajorCompactionRequest { @VisibleForTesting Optional createRequest(Configuration configuration, - Set stores) throws IOException { - Set familiesToCompact = getStoresRequiringCompaction(stores); + Set stores, long timestamp) throws IOException { + Set familiesToCompact = getStoresRequiringCompaction(stores, timestamp); MajorCompactionRequest request = null; if (!familiesToCompact.isEmpty()) { - request = new MajorCompactionRequest(configuration, region, familiesToCompact, timestamp); + request = new MajorCompactionRequest(configuration, region, familiesToCompact); } return Optional.fromNullable(request); } - Set getStoresRequiringCompaction(Set requestedStores) throws IOException { + Set getStoresRequiringCompaction(Set requestedStores, long timestamp) + throws IOException { try(Connection connection = getConnection(configuration)) { HRegionFileSystem fileSystem = getFileSystem(connection); Set familiesToCompact = Sets.newHashSet(); for (String family : requestedStores) { - // do we have any store files? 
-        Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
-        if (storeFiles == null) {
-          LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
-              .getRegionInfo().getEncodedName(), " has no store files");
-          continue;
-        }
-        // check for reference files
-        if (fileSystem.hasReferences(family) && familyHasReferenceFile(fileSystem, family)) {
+        if (shouldCFBeCompacted(fileSystem, family, timestamp)) {
           familiesToCompact.add(family);
-          LOG.info("Including store: " + family + " with: " + storeFiles.size()
-              + " files for compaction for region: " + fileSystem.getRegionInfo().getEncodedName());
-          continue;
-        }
-        // check store file timestamps
-        boolean includeStore = false;
-        for (StoreFileInfo storeFile : storeFiles) {
-          if (storeFile.getModificationTime() < timestamp) {
-            LOG.info("Including store: " + family + " with: " + storeFiles.size()
-                + " files for compaction for region: "
-                + fileSystem.getRegionInfo().getEncodedName());
-            familiesToCompact.add(family);
-            includeStore = true;
-            break;
-          }
-        }
-        if (!includeStore) {
-          LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
-              .getRegionInfo().getEncodedName(), " already compacted");
         }
       }
       return familiesToCompact;
     }
   }
 
+  boolean shouldCFBeCompacted(HRegionFileSystem fileSystem, String family, long ts)
+      throws IOException {
+
+    // do we have any store files?
+    Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
+    if (storeFiles == null) {
+      LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
+          .getRegionInfo().getEncodedName() + " as it has no store files");
+      return false;
+    }
+    // check for reference files
+    if (fileSystem.hasReferences(family) && familyHasReferenceFile(fileSystem, family, ts)) {
+      LOG.info("Including store: " + family + " with: " + storeFiles.size()
+          + " files for compaction for region: " + fileSystem.getRegionInfo().getEncodedName());
+      return true;
+    }
+    // check store file timestamps
+    boolean includeStore = this.shouldIncludeStore(fileSystem, family, storeFiles, ts);
+    if (!includeStore) {
+      LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
+          .getRegionInfo().getEncodedName() + " already compacted");
+    }
+    return includeStore;
+  }
+
+  protected boolean shouldIncludeStore(HRegionFileSystem fileSystem, String family,
+      Collection<StoreFileInfo> storeFiles, long ts) throws IOException {
+
+    for (StoreFileInfo storeFile : storeFiles) {
+      if (storeFile.getModificationTime() < ts) {
+        LOG.info("Including store: " + family + " with: " + storeFiles.size()
+            + " files for compaction for region: "
+            + fileSystem.getRegionInfo().getEncodedName());
+        return true;
+      }
+    }
+    return false;
+  }
+
   @VisibleForTesting
   Connection getConnection(Configuration configuration) throws IOException {
     return ConnectionFactory.createConnection(configuration);
   }
 
-  private boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family)
+  protected boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family, long ts)
       throws IOException {
     List<Path> referenceFiles =
         getReferenceFilePaths(fileSystem.getFileSystem(), fileSystem.getStoreDir(family));
     for (Path referenceFile : referenceFiles) {
       FileStatus status = fileSystem.getFileSystem().getFileLinkStatus(referenceFile);
-      if (status.getModificationTime() < timestamp) {
+      if (status.getModificationTime() < ts) {
         LOG.info("Including store: " + family + " for compaction for region: " + fileSystem
             .getRegionInfo().getEncodedName() + " (reference store files)");
         return true;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionTTLRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionTTLRequest.java
new file mode 100644
index 00000000000..060bb8a8bf5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionTTLRequest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.compaction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This request helps determine whether a region has to be compacted based on the table's TTL.
+ */
+@InterfaceAudience.Private
+public class MajorCompactionTTLRequest extends MajorCompactionRequest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionTTLRequest.class);
+
+  MajorCompactionTTLRequest(Configuration conf, HRegionInfo region) {
+    super(conf, region);
+  }
+
+  static Optional<MajorCompactionRequest> newRequest(Configuration conf, HRegionInfo info,
+      HTableDescriptor htd) throws IOException {
+    MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(conf, info);
+    return request.createRequest(conf, htd);
+  }
+
+  @VisibleForTesting
+  private Optional<MajorCompactionRequest> createRequest(Configuration conf, HTableDescriptor htd)
+      throws IOException {
+    Map<String, Long> familiesToCompact = getStoresRequiringCompaction(htd);
+    MajorCompactionRequest request = null;
+    if (!familiesToCompact.isEmpty()) {
+      LOG.debug("Compaction families for region: " + region + " CF: " + familiesToCompact.keySet());
+      request = new MajorCompactionTTLRequest(conf, region);
+    }
+    return Optional.fromNullable(request);
+  }
+
+  Map<String, Long> getStoresRequiringCompaction(HTableDescriptor htd) throws IOException {
+    try (Connection connection = getConnection(configuration)) {
+      HRegionFileSystem fileSystem = getFileSystem(connection);
+      Map<String, Long> familyTTLMap = Maps.newHashMap();
+      for (HColumnDescriptor descriptor : htd.getColumnFamilies()) {
+        long ts = getColFamilyCutoffTime(descriptor);
+        // If the table's TTL is forever, let's not compact any of the regions.
+        if (ts > 0 && shouldCFBeCompacted(fileSystem, descriptor.getNameAsString(), ts)) {
+          familyTTLMap.put(descriptor.getNameAsString(), ts);
+        }
+      }
+      return familyTTLMap;
+    }
+  }
+
+  // If the CF has no TTL, return -1, else return the current time - TTL.
+  private long getColFamilyCutoffTime(HColumnDescriptor colDesc) {
+    if (colDesc.getTimeToLive() == HConstants.FOREVER) {
+      return -1;
+    }
+    return System.currentTimeMillis() - (colDesc.getTimeToLive() * 1000L);
+  }
+
+  @Override
+  protected boolean shouldIncludeStore(HRegionFileSystem fileSystem, String family,
+      Collection<StoreFileInfo> storeFiles, long ts) throws IOException {
+
+    for (StoreFileInfo storeFile : storeFiles) {
+      // Let's only compact when all files are older than the TTL cutoff
+      if (storeFile.getModificationTime() >= ts) {
+        LOG.info("There is at least one file in store: " + family + " file: " + storeFile.getPath()
+            + " with timestamp " + storeFile.getModificationTime()
+            + " for region: " + fileSystem.getRegionInfo().getEncodedName()
+            + " newer than the TTL cutoff: " + ts);
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
index a5e44945edc..96faee8c240 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
@@ -18,7 +18,10 @@ package org.apache.hadoop.hbase.util.compaction;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -31,6 +34,7 @@ import com.google.common.base.Predicate;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
@@ -40,10 +44,12 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.ServerName;
@@ -54,22 +60,30 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
-public class MajorCompactor {
+public class MajorCompactor extends Configured implements Tool {
 
   private static final Logger LOG = LoggerFactory.getLogger(MajorCompactor.class);
-  private static final Set<MajorCompactionRequest> ERRORS = Sets.newHashSet();
+  protected static final Set<MajorCompactionRequest> ERRORS = Sets.newHashSet();
 
-  private final ClusterCompactionQueues clusterCompactionQueues;
-  private final long timestamp;
-  private final Set<String> storesToCompact;
-  private final ExecutorService executor;
-  private final long sleepForMs;
-  private final Connection connection;
-  private final TableName tableName;
+  protected ClusterCompactionQueues clusterCompactionQueues;
+  private long timestamp;
+  protected Set<String> storesToCompact;
+  protected ExecutorService executor;
+  protected long sleepForMs;
+  protected Connection connection;
+  protected TableName tableName;
+  private int numServers = -1;
+  private int numRegions = -1;
+  private boolean skipWait = false;
+
+  MajorCompactor() {
+  }
 
   public MajorCompactor(Configuration conf, TableName tableName, Set<String> storesToCompact,
       int concurrency, long timestamp, long sleepForMs) throws IOException {
@@ -157,18 +171,85 @@
     }
     LOG.info(
         "Initializing compaction queues for table: " + tableName + " with cf: " + storesToCompact);
-    List<HRegionLocation> regionLocations =
-        connection.getRegionLocator(tableName).getAllRegionLocations();
-    for (HRegionLocation location : regionLocations) {
-      Optional<MajorCompactionRequest> request = MajorCompactionRequest
-          .newRequest(connection.getConfiguration(), location.getRegionInfo(), storesToCompact,
-              timestamp);
-      if (request.isPresent()) {
-        clusterCompactionQueues.addToCompactionQueue(location.getServerName(), request.get());
+
+    Map<ServerName, List<HRegionInfo>> snRegionMap = getServerRegionsMap();
+    /*
+     * If numservers is specified, stop inspecting regions once that many servers have been
+     * queued. This throttles the tool and avoids scanning every region in the event that only
+     * a few of them need compaction based on the criteria.
+     */
+    for (ServerName sn : getServersToCompact(snRegionMap.keySet())) {
+      List<HRegionInfo> regions = snRegionMap.get(sn);
+      LOG.debug("Table: " + tableName + " Server: " + sn + " No of regions: " + regions.size());
+
+      /*
+       * If the tool is run periodically, shuffle the regions so each run visits them in a
+       * different, random order. This helps when numregions caps the regions per server.
+       */
+      Collections.shuffle(regions);
+      int regionsToCompact = numRegions;
+      for (HRegionInfo hri : regions) {
+        if (numRegions > 0 && regionsToCompact <= 0) {
+          LOG.debug("Reached region limit for server: " + sn);
+          break;
+        }
+
+        Optional<MajorCompactionRequest> request = getMajorCompactionRequest(hri);
+        if (request.isPresent()) {
+          LOG.debug("Adding region " + hri + " to queue " + sn + " for compaction");
+          clusterCompactionQueues.addToCompactionQueue(sn, request.get());
+          if (numRegions > 0) {
+            regionsToCompact--;
+          }
+        }
       }
     }
   }
 
+  protected Optional<MajorCompactionRequest> getMajorCompactionRequest(HRegionInfo hri)
+      throws IOException {
+    return MajorCompactionRequest.newRequest(connection.getConfiguration(), hri, storesToCompact,
+        timestamp);
+  }
+
+  private Collection<ServerName> getServersToCompact(Set<ServerName> snSet) {
+    if (numServers < 0 || snSet.size() <= numServers) {
+      return snSet;
+    } else {
+      List<ServerName> snList = Lists.newArrayList(snSet);
+      Collections.shuffle(snList);
+      return snList.subList(0, numServers);
+    }
+  }
+
+  private Map<ServerName, List<HRegionInfo>> getServerRegionsMap() throws IOException {
+    Map<ServerName, List<HRegionInfo>> snRegionMap = Maps.newHashMap();
+    List<HRegionLocation> regionLocations =
+        connection.getRegionLocator(tableName).getAllRegionLocations();
+    for (HRegionLocation regionLocation : regionLocations) {
+      ServerName sn = regionLocation.getServerName();
+      HRegionInfo hri = regionLocation.getRegionInfo();
+      if (!snRegionMap.containsKey(sn)) {
+        snRegionMap.put(sn, Lists.<HRegionInfo>newArrayList());
+      }
+      snRegionMap.get(sn).add(hri);
+    }
+    return snRegionMap;
+  }
+
+  public void setNumServers(int numServers) {
+    this.numServers = numServers;
+  }
+
+  public void setNumRegions(int numRegions) {
+    this.numRegions = numRegions;
+  }
+
+  public void setSkipWait(boolean skipWait) {
+    this.skipWait = skipWait;
+  }
+
   class Compact implements Runnable {
 
     private final ServerName serverName;
@@ -199,52 +280,68 @@
       try {
         // only make the request if the region is not already major compacting
         if (!isCompacting(request)) {
-          Set<String> stores = request.getStoresRequiringCompaction(storesToCompact);
+          Set<String> stores = getStoresRequiringCompaction(request);
           if (!stores.isEmpty()) {
             request.setStores(stores);
             for (String store : request.getStores()) {
-              admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
-                  Bytes.toBytes(store));
+              compactRegionOnServer(request, admin, store);
             }
           }
         }
-        while (isCompacting(request)) {
-          Thread.sleep(sleepForMs);
-          LOG.debug("Waiting for compaction to complete for region: " + request.getRegion()
-              .getEncodedName());
+
+        /*
+         * In some scenarios, such as compacting TTLed regions, the compaction itself completes
+         * quickly, so we can skip the wait. An external tool that triggers this frequently will
+         * also pick up any region movements on its next run and compact them then.
+         */
+        if (!skipWait) {
+          while (isCompacting(request)) {
+            Thread.sleep(sleepForMs);
+            LOG.debug("Waiting for compaction to complete for region: " + request.getRegion()
+                .getEncodedName());
+          }
+        }
       } finally {
-        // Make sure to wait for the CompactedFileDischarger chore to do its work
-        int waitForArchive = connection.getConfiguration()
-            .getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
-        Thread.sleep(waitForArchive);
-        // check if compaction completed successfully, otherwise put that request back in the
-        // proper queue
-        Set<String> storesRequiringCompaction =
-            request.getStoresRequiringCompaction(storesToCompact);
-        if (!storesRequiringCompaction.isEmpty()) {
-          // this happens, when a region server is marked as dead, flushes a store file and
-          // the new regionserver doesn't pick it up because its accounted for in the WAL replay,
-          // thus you have more store files on the filesystem than the regionserver knows about.
-          boolean regionHasNotMoved = connection.getRegionLocator(tableName)
-              .getRegionLocation(request.getRegion().getStartKey()).getServerName()
-              .equals(serverName);
-          if (regionHasNotMoved) {
-            LOG.error("Not all store files were compacted, this may be due to the regionserver not "
-                + "being aware of all store files. Will not reattempt compacting, " + request);
-            ERRORS.add(request);
+        if (!skipWait) {
+          // Make sure to wait for the CompactedFileDischarger chore to do its work
+          int waitForArchive = connection.getConfiguration()
+              .getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
+          Thread.sleep(waitForArchive);
+          // check if compaction completed successfully, otherwise put that request back in the
+          // proper queue
+          Set<String> storesRequiringCompaction = getStoresRequiringCompaction(request);
+          if (!storesRequiringCompaction.isEmpty()) {
+            // this happens when a region server is marked as dead, flushes a store file and
+            // the new regionserver doesn't pick it up because it's accounted for in the WAL
+            // replay, thus you have more store files on the filesystem than the regionserver
+            // knows about.
+            boolean regionHasNotMoved = connection.getRegionLocator(tableName)
+                .getRegionLocation(request.getRegion().getStartKey()).getServerName()
+                .equals(serverName);
+            if (regionHasNotMoved) {
+              LOG.error(
+                  "Not all store files were compacted, this may be due to the regionserver not "
                      + "being aware of all store files. Will not reattempt compacting, "
                      + request);
+              ERRORS.add(request);
+            } else {
+              request.setStores(storesRequiringCompaction);
+              clusterCompactionQueues.addToCompactionQueue(serverName, request);
+              LOG.info("Compaction failed for the following stores: " + storesRequiringCompaction
+                  + " region: " + request.getRegion().getEncodedName());
+            }
+          } else {
-            request.setStores(storesRequiringCompaction);
-            clusterCompactionQueues.addToCompactionQueue(serverName, request);
-            LOG.info("Compaction failed for the following stores: " + storesRequiringCompaction
-                + " region: " + request.getRegion().getEncodedName());
+            LOG.info("Compaction complete for region: " + request.getRegion().getEncodedName()
+                + " -> cf(s): " + request.getStores());
           }
-        } else {
-          LOG.info("Compaction complete for region: " + request.getRegion().getEncodedName()
-              + " -> cf(s): " + request.getStores());
         }
       }
     }
+
+    private void compactRegionOnServer(MajorCompactionRequest request, Admin admin, String store)
+        throws IOException {
+      admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
+          Bytes.toBytes(store));
+    }
   }
 
   private boolean isCompacting(MajorCompactionRequest request) throws Exception {
@@ -261,9 +358,8 @@
         connection.getRegionLocator(tableName).getAllRegionLocations();
     for (HRegionLocation location : locations) {
       if (location.getRegionInfo().getRegionId() > timestamp) {
-        Optional<MajorCompactionRequest> compactionRequest = MajorCompactionRequest
-            .newRequest(connection.getConfiguration(), location.getRegionInfo(), storesToCompact,
-                timestamp);
+        Optional<MajorCompactionRequest> compactionRequest =
+            getMajorCompactionRequest(location.getRegionInfo());
         if (compactionRequest.isPresent()) {
           clusterCompactionQueues
               .addToCompactionQueue(location.getServerName(), compactionRequest.get());
@@ -275,8 +371,44 @@
     }
   }
 
-  public static void main(String[] args) throws Exception {
+  protected Set<String> getStoresRequiringCompaction(MajorCompactionRequest request)
+      throws IOException {
+    return request.getStoresRequiringCompaction(storesToCompact, timestamp);
+  }
+
+  protected Options getCommonOptions() {
     Options options = new Options();
+    Option serverOption = new Option("servers", true, "Concurrent servers compacting");
+    serverOption.setRequired(true);
+    options.addOption(serverOption);
+
+    Option sleepOption = new Option("sleep", true, "Time to sleepForMs (ms) for checking "
+        + "compaction status per region and available "
+        + "work queues: default 30s");
+    options.addOption(sleepOption);
+
+    Option retryOption = new Option("retries", true, "Max # of retries for a compaction request,"
+        + " defaults to 3");
+    options.addOption(retryOption);
+
+    options.addOption(new Option("dryRun", false, "Dry run, will just output a list of regions"
+        + " that require compaction based on parameters passed"));
+
+    options.addOption(new Option("skipWait", false, "Skip waiting after triggering compaction."));
+
+    Option numServersOption = new Option("numservers", true, "Number of servers to compact in "
+        + "this run, defaults to all");
+    options.addOption(numServersOption);
+
+    Option numRegionsOption = new Option("numregions", true, "Number of regions to compact per "
+        + "server, defaults to all");
+    options.addOption(numRegionsOption);
+    return options;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Options options = getCommonOptions();
     Option tableOption = new Option("table", true, "table name");
     tableOption.setRequired(true);
     options.addOption(tableOption);
@@ -285,10 +417,6 @@ public class
MajorCompactor { cfOption.setOptionalArg(true); options.addOption(cfOption); - Option serverOption = new Option("servers", true, "Concurrent servers compacting"); - serverOption.setRequired(true); - options.addOption(serverOption); - options.addOption(new Option("minModTime", true, "Compact if store files have modification time < minModTime")); @@ -300,25 +428,6 @@ public class MajorCompactor { rootDirOption.setOptionalArg(true); options.addOption(rootDirOption); - Option sleepOption = new Option("sleep", true, "Time to sleepForMs (ms) for checking " - + "compaction status per region and available " - + "work queues: default 30s"); - options.addOption(sleepOption); - - Option retryOption = new Option("retries", true, "Max # of retries for a compaction request," - + " defaults to 3"); - options.addOption( - retryOption - ); - - options.addOption( - new Option( - "dryRun", - false, - "Dry run, will just output a list of regions that require compaction " - + "based on parameters passed") - ); - final CommandLineParser cmdLineParser = new BasicParser(); CommandLine commandLine = null; try { @@ -338,7 +447,6 @@ public class MajorCompactor { Iterables.addAll(families, Splitter.on(",").split(cf)); } - Configuration configuration = HBaseConfiguration.create(); int concurrency = Integer.parseInt(commandLine.getOptionValue("servers")); long minModTime = Long.parseLong( @@ -348,25 +456,35 @@ public class MajorCompactor { String rootDir = commandLine.getOptionValue("rootDir", configuration.get(HConstants.HBASE_DIR)); long sleep = Long.parseLong(commandLine.getOptionValue("sleep", "30000")); + int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1")); + int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1")); + configuration.set(HConstants.HBASE_DIR, rootDir); configuration.set(HConstants.ZOOKEEPER_QUORUM, quorum); MajorCompactor compactor = new MajorCompactor(configuration, TableName.valueOf(tableName), families, concurrency, minModTime, sleep); + compactor.setNumServers(numServers); + compactor.setNumRegions(numRegions); + compactor.setSkipWait(commandLine.hasOption("skipWait")); compactor.initializeWorkQueues(); if (!commandLine.hasOption("dryRun")) { compactor.compactAllRegions(); } compactor.shutdown(); + return ERRORS.size(); } - private static void printUsage(final Options options) { + protected static void printUsage(final Options options) { String header = "\nUsage instructions\n\n"; String footer = "\n"; HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(MajorCompactor.class.getSimpleName(), header, options, footer, true); } + public static void main(String[] args) throws Exception { + ToolRunner.run(HBaseConfiguration.create(), new MajorCompactor(), args); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTTL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTTL.java new file mode 100644 index 00000000000..c37b8bdfc9a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTTL.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.compaction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This tool major compacts a table's regions whose store files have aged beyond its TTL. It helps
+ * reclaim disk space, and the affected regions become empty as a result of the compaction.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class MajorCompactorTTL extends MajorCompactor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MajorCompactorTTL.class);
+
+  private HTableDescriptor htd;
+
+  @VisibleForTesting
+  public MajorCompactorTTL(Configuration conf, HTableDescriptor htd, int concurrency,
+      long sleepForMs) throws IOException {
+    this.connection = ConnectionFactory.createConnection(conf);
+    this.htd = htd;
+    this.tableName = htd.getTableName();
+    this.storesToCompact = Sets.newHashSet(); // Empty set so all stores will be compacted
+    this.sleepForMs = sleepForMs;
+    this.executor = Executors.newFixedThreadPool(concurrency);
+    this.clusterCompactionQueues = new ClusterCompactionQueues(concurrency);
+  }
+
+  protected MajorCompactorTTL() {
+    super();
+  }
+
+  @Override
+  protected Optional<MajorCompactionRequest> getMajorCompactionRequest(HRegionInfo hri)
+      throws IOException {
+    return MajorCompactionTTLRequest.newRequest(connection.getConfiguration(), hri, htd);
+  }
+
+  @Override
+  protected Set<String> getStoresRequiringCompaction(MajorCompactionRequest request)
+      throws IOException {
+    return ((MajorCompactionTTLRequest) request).getStoresRequiringCompaction(htd).keySet();
+  }
+
+  public int compactRegionsTTLOnTable(Configuration conf, String table, int concurrency,
+      long sleep, int numServers, int numRegions, boolean dryRun, boolean skipWait)
+      throws Exception {
+
+    TableName tableName = TableName.valueOf(table);
+    HTableDescriptor htd;
+    // Close this short-lived connection once we have the descriptor; the compactor opens its own.
+    try (Connection conn = ConnectionFactory.createConnection(conf)) {
+      htd = conn.getAdmin().getTableDescriptor(tableName);
+    }
+    if (!doesAnyColFamilyHaveTTL(htd)) {
+      LOG.info("No TTL present for CF of table: " + tableName + ", skipping compaction");
+      return 0;
+    }
+
+    LOG.info("Major compacting table " + tableName + " based on TTL");
+    MajorCompactor compactor = new MajorCompactorTTL(conf, htd, concurrency, sleep);
+    compactor.setNumServers(numServers);
+    compactor.setNumRegions(numRegions);
+    compactor.setSkipWait(skipWait);
+
+    compactor.initializeWorkQueues();
+    if (!dryRun) {
+      compactor.compactAllRegions();
+    }
+    compactor.shutdown();
+    return ERRORS.size();
+  }
+
+  private boolean doesAnyColFamilyHaveTTL(HTableDescriptor htd) {
+    for (HColumnDescriptor descriptor : htd.getColumnFamilies()) {
+      if (descriptor.getTimeToLive() != HConstants.FOREVER) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private Options getOptions() {
+    Options options = getCommonOptions();
+
+    Option tableOption = new Option("table", true, "Table to be compacted");
+    tableOption.setRequired(true);
+    options.addOption(tableOption);
+
+    return options;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Options options = getOptions();
+
+    final CommandLineParser cmdLineParser = new BasicParser();
+    CommandLine commandLine;
+    try {
+      commandLine = cmdLineParser.parse(options, args);
+    } catch (ParseException parseException) {
+      System.err.println("ERROR: Unable to parse command-line arguments " + Arrays.toString(args)
+          + " due to: " + parseException);
+      printUsage(options);
+      throw parseException;
+    }
+
+    String table = commandLine.getOptionValue("table");
+    int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1"));
+    int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1"));
+    int concurrency = Integer.parseInt(commandLine.getOptionValue("servers", "1"));
+    long sleep = Long.parseLong(commandLine.getOptionValue("sleep", "30000"));
+    boolean dryRun = commandLine.hasOption("dryRun");
+
boolean skipWait = commandLine.hasOption("skipWait"); + + return compactRegionsTTLOnTable(HBaseConfiguration.create(), table, concurrency, sleep, + numServers, numRegions, dryRun, skipWait); + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(HBaseConfiguration.create(), new MajorCompactorTTL(), args); + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java index de6f5807da3..122f6647cb2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java @@ -59,10 +59,10 @@ import static org.mockito.Mockito.when; @Category({SmallTests.class}) public class MajorCompactionRequestTest { - private static final HBaseTestingUtility UTILITY = new HBaseTestingUtility(); - private static final String FAMILY = "a"; - private Path rootRegionDir; - private Path regionStoreDir; + protected static final HBaseTestingUtility UTILITY = new HBaseTestingUtility(); + protected static final String FAMILY = "a"; + protected Path rootRegionDir; + protected Path regionStoreDir; @Before public void setUp() throws Exception { rootRegionDir = UTILITY.getDataTestDirOnTestFS("MajorCompactionRequestTest"); @@ -72,15 +72,15 @@ public class MajorCompactionRequestTest { @Test public void testStoresNeedingCompaction() throws Exception { // store files older than timestamp List storeFiles = mockStoreFiles(regionStoreDir, 5, 10); - MajorCompactionRequest request = makeMockRequest(100, storeFiles, false); + MajorCompactionRequest request = makeMockRequest(storeFiles, false); Optional result = - request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY)); + request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100); assertTrue(result.isPresent()); // store files newer than timestamp storeFiles = mockStoreFiles(regionStoreDir, 5, 101); - request = makeMockRequest(100, storeFiles, false); - result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY)); + request = makeMockRequest(storeFiles, false); + result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100); assertFalse(result.isPresent()); } @@ -106,12 +106,13 @@ public class MajorCompactionRequestTest { HRegionFileSystem fileSystem = mockFileSystem(region.getRegionInfo(), true, storeFiles, 50); MajorCompactionRequest majorCompactionRequest = spy(new MajorCompactionRequest(configuration, - region.getRegionInfo(), Sets.newHashSet(FAMILY), 100)); + region.getRegionInfo())); doReturn(mock(Connection.class)).when(majorCompactionRequest).getConnection(eq(configuration)); doReturn(paths).when(majorCompactionRequest).getReferenceFilePaths(any(FileSystem.class), any(Path.class)); doReturn(fileSystem).when(majorCompactionRequest).getFileSystem(any(Connection.class)); - Set result = majorCompactionRequest.getStoresRequiringCompaction(Sets.newHashSet("a")); + Set result = majorCompactionRequest + .getStoresRequiringCompaction(Sets.newHashSet("a"), 100); assertEquals(FAMILY, Iterables.getOnlyElement(result)); } @@ -143,7 +144,7 @@ public class MajorCompactionRequestTest { return mockSystem; } - private List mockStoreFiles(Path regionStoreDir, int howMany, long timestamp) + List mockStoreFiles(Path regionStoreDir, int howMany, long 
timestamp) throws IOException { List infos = Lists.newArrayList(); int i = 0; @@ -158,14 +159,14 @@ public class MajorCompactionRequestTest { return infos; } - private MajorCompactionRequest makeMockRequest(long timestamp, List storeFiles, + private MajorCompactionRequest makeMockRequest(List storeFiles, boolean references) throws IOException { Configuration configuration = mock(Configuration.class); HRegionInfo regionInfo = mock(HRegionInfo.class); when(regionInfo.getEncodedName()).thenReturn("HBase"); when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo")); MajorCompactionRequest request = - new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a"), timestamp); + new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a")); MajorCompactionRequest spy = spy(request); HRegionFileSystem fileSystem = mockFileSystem(regionInfo, references, storeFiles); doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTest.java index 751a8492676..3b3e3f2fe13 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTest.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.Sets; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -40,7 +41,8 @@ import org.junit.experimental.categories.Category; public class MajorCompactorTest { public static final byte[] FAMILY = Bytes.toBytes("a"); - private HBaseTestingUtility utility; + protected HBaseTestingUtility utility; + protected HBaseAdmin admin; @Before public void setUp() throws Exception { utility = new HBaseTestingUtility(); @@ -82,7 +84,7 @@ public class MajorCompactorTest { assertEquals(numHFiles, numberOfRegions); } - private void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows) + protected void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows) throws IOException { Random r = new Random(); byte[] row = new byte[rowSize]; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionTTLRequest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionTTLRequest.java new file mode 100644 index 00000000000..0454dc54abd --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionTTLRequest.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.compaction;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({SmallTests.class})
+public class TestMajorCompactionTTLRequest {
+
+  private static final HBaseTestingUtility UTILITY = new HBaseTestingUtility();
+  private static final String FAMILY = "a";
+  private Path regionStoreDir;
+
+  @Before
+  public void setUp() throws Exception {
+    Path rootRegionDir = UTILITY.getDataTestDirOnTestFS("TestMajorCompactionTTLRequest");
+    regionStoreDir = new Path(rootRegionDir, FAMILY);
+  }
+
+  @Test
+  public void testStoresNeedingCompaction() throws Exception {
+    // 5 store files with modification time 10
+    List<StoreFileInfo> storeFiles1 = mockStoreFiles(regionStoreDir, 5, 10);
+    // 5 store files with modification time 100
+    List<StoreFileInfo> storeFiles2 = mockStoreFiles(regionStoreDir, 5, 100);
+    List<StoreFileInfo> storeFiles = Lists.newArrayList(storeFiles1);
+    storeFiles.addAll(storeFiles2);
+
+    MajorCompactionTTLRequest request = makeMockRequest(storeFiles);
+    // No file is strictly older than a cutoff of 10, so the region should not be compacted.
+    Optional<MajorCompactionRequest> result =
+        request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 10);
+    assertFalse(result.isPresent());
+
+    // Files at timestamp 100 are not strictly older than a cutoff of 100, so still no compaction.
+ result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100); + assertFalse(result.isPresent()); + + // All files are <= 100, so they should be considered for compaction + result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 101); + assertTrue(result.isPresent()); + } + + private MajorCompactionTTLRequest makeMockRequest(List storeFiles) + throws IOException { + Configuration configuration = mock(Configuration.class); + HRegionInfo regionInfo = mock(HRegionInfo.class); + when(regionInfo.getEncodedName()).thenReturn("HBase"); + when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo")); + MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(configuration, regionInfo); + MajorCompactionTTLRequest spy = spy(request); + HRegionFileSystem fileSystem = mockFileSystem(regionInfo, false, storeFiles); + doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class)); + doReturn(mock(Connection.class)).when(spy).getConnection(eq(configuration)); + return spy; + } + + private HRegionFileSystem mockFileSystem(HRegionInfo info, boolean hasReferenceFiles, + List storeFiles) throws IOException { + Optional found = Optional.absent(); + for (StoreFileInfo storeFile : storeFiles) { + found = Optional.of(storeFile); + break; + } + long timestamp = found.get().getModificationTime(); + return mockFileSystem(info, hasReferenceFiles, storeFiles, timestamp); + } + + private List mockStoreFiles(Path regionStoreDir, int howMany, long timestamp) + throws IOException { + List infos = Lists.newArrayList(); + int i = 0; + while (i < howMany) { + StoreFileInfo storeFileInfo = mock(StoreFileInfo.class); + doReturn(timestamp).doReturn(timestamp).when(storeFileInfo).getModificationTime(); + doReturn(new Path(regionStoreDir, RandomStringUtils.randomAlphabetic(10))).when(storeFileInfo) + .getPath(); + infos.add(storeFileInfo); + i++; + } + return infos; + } + + private HRegionFileSystem mockFileSystem(HRegionInfo info, boolean hasReferenceFiles, + List storeFiles, long referenceFileTimestamp) throws IOException { + FileSystem fileSystem = mock(FileSystem.class); + if (hasReferenceFiles) { + FileStatus fileStatus = mock(FileStatus.class); + doReturn(referenceFileTimestamp).when(fileStatus).getModificationTime(); + doReturn(fileStatus).when(fileSystem).getFileLinkStatus(isA(Path.class)); + } + HRegionFileSystem mockSystem = mock(HRegionFileSystem.class); + doReturn(info).when(mockSystem).getRegionInfo(); + doReturn(regionStoreDir).when(mockSystem).getStoreDir(FAMILY); + doReturn(hasReferenceFiles).when(mockSystem).hasReferences(anyString()); + doReturn(storeFiles).when(mockSystem).getStoreFiles(anyString()); + doReturn(fileSystem).when(mockSystem).getFileSystem(); + return mockSystem; + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactorTTL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactorTTL.java new file mode 100644 index 00000000000..44a61a904e2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactorTTL.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.compaction;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ MiscTests.class, MediumTests.class })
+public class TestMajorCompactorTTL extends MajorCompactorTest {
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Before
+  public void setUp() throws Exception {
+    utility = new HBaseTestingUtility();
+    utility.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 10);
+    utility.startMiniCluster();
+    admin = utility.getHBaseAdmin();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    utility.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testCompactingATable() throws Exception {
+    TableName tableName = createTable(name.getMethodName());
+
+    // Delay a bit, so the store files just written are older than the 5 second TTL set below
+    Thread.sleep(10 * 1000);
+
+    int numberOfRegions = utility.getHBaseAdmin().getTableRegions(tableName).size();
+    int numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    // before major compaction the table should have more store files than regions
+    assertTrue(numberOfRegions < numHFiles);
+    modifyTTL(tableName);
+
+    MajorCompactorTTL compactor = new MajorCompactorTTL(utility.getConfiguration(),
+        admin.getTableDescriptor(tableName), 1, 200);
+    compactor.initializeWorkQueues();
+    compactor.compactAllRegions();
+    compactor.shutdown();
+
+    // verify that the store has been completely major compacted.
+    numberOfRegions = utility.getHBaseAdmin().getTableRegions(tableName).size();
+    numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    assertEquals(numberOfRegions, numHFiles);
+  }
+
+  protected void modifyTTL(TableName tableName) throws IOException, InterruptedException {
+    // Set the TTL to 5 secs, so all the files just written above will get cleaned up on compact.
+ admin.disableTable(tableName); + utility.waitTableDisabled(tableName.getName()); + HTableDescriptor descriptor = admin.getTableDescriptor(tableName); + HColumnDescriptor colDesc = descriptor.getFamily(FAMILY); + colDesc.setTimeToLive(5); + admin.modifyColumn(tableName, colDesc); + admin.enableTable(tableName); + utility.waitTableEnabled(tableName); + } + + protected TableName createTable(String name) throws IOException, InterruptedException { + TableName tableName = TableName.valueOf(name); + utility.createMultiRegionTable(tableName, FAMILY, 5); + utility.waitTableAvailable(tableName); + Connection connection = utility.getConnection(); + Table table = connection.getTable(tableName); + // write data and flush multiple store files: + for (int i = 0; i < 5; i++) { + loadRandomRows(table, FAMILY, 50, 100); + utility.flush(tableName); + } + table.close(); + return tableName; + } +} \ No newline at end of file
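
A note for reviewers trying the patch out: both new tools extend the Hadoop Tool machinery and expose public main() entry points, so they can be driven from a script or from another JVM. A minimal driver sketch follows; the class name TtlCompactionDriver, the table name "usertable", the rsgroup name "my_group", and the flag values are illustrative placeholders, not part of this patch:

import org.apache.hadoop.hbase.rsgroup.RSGroupMajorCompactionTTL;
import org.apache.hadoop.hbase.util.compaction.MajorCompactorTTL;

public class TtlCompactionDriver {
  public static void main(String[] args) throws Exception {
    // Major compact TTL-expired stores of one table: compact on 2 servers at a
    // time, queue at most 10 regions per server, and skip the per-region wait.
    MajorCompactorTTL.main(new String[] {
        "-table", "usertable", "-servers", "2", "-numregions", "10", "-skipWait" });

    // The same pass over every table in an rsgroup.
    RSGroupMajorCompactionTTL.main(new String[] {
        "-rsgroup", "my_group", "-servers", "2", "-skipWait" });
  }
}

Since run() returns ERRORS.size(), a caller that wants the number of failed compaction requests can invoke the tools through ToolRunner.run() instead of main() and inspect the returned value.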