HBASE-21883 Enhancements to Major Compaction tool

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Thiruvel Thirumoolan 2019-03-26 19:43:02 -07:00 committed by Andrew Purtell
parent d3cf01d006
commit 36b4c0fc6f
10 changed files with 1023 additions and 137 deletions

RSGroupMajorCompactionTTL.java (new file)

@@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.rsgroup;
import com.google.common.annotations.VisibleForTesting;
import java.util.Arrays;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.compaction.MajorCompactorTTL;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This tool takes an rsgroup as an argument and compacts part or all of the regions of the
* tables in that rsgroup, based on each table's TTL.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class RSGroupMajorCompactionTTL extends MajorCompactorTTL {
private static final Logger LOG = LoggerFactory.getLogger(RSGroupMajorCompactionTTL.class);
@VisibleForTesting
RSGroupMajorCompactionTTL() {
super();
}
public int compactTTLRegionsOnGroup(Configuration conf, String rsgroup, int concurrency,
long sleep, int numServers, int numRegions, boolean dryRun, boolean skipWait)
throws Exception {
Connection conn = ConnectionFactory.createConnection(conf);
RSGroupAdmin rsGroupAdmin = new RSGroupAdminClient(conn);
RSGroupInfo rsGroupInfo = rsGroupAdmin.getRSGroupInfo(rsgroup);
if (rsGroupInfo == null) {
LOG.error("Invalid rsgroup specified: " + rsgroup);
throw new IllegalArgumentException("Invalid rsgroup specified: " + rsgroup);
}
for (TableName tableName : rsGroupInfo.getTables()) {
int status = compactRegionsTTLOnTable(conf, tableName.getNameAsString(), concurrency, sleep,
numServers, numRegions, dryRun, skipWait);
if (status != 0) {
LOG.error("Failed to compact table: " + tableName);
return status;
}
}
return 0;
}
protected Options getOptions() {
Options options = getCommonOptions();
Option rsGroupOption = new Option("rsgroup", true, "Tables of rsgroup to be compacted");
rsGroupOption.setRequired(true);
options.addOption(rsGroupOption);
return options;
}
@Override
public int run(String[] args) throws Exception {
Options options = getOptions();
final CommandLineParser cmdLineParser = new BasicParser();
CommandLine commandLine;
try {
commandLine = cmdLineParser.parse(options, args);
} catch (ParseException parseException) {
System.err.println("ERROR: Unable to parse command-line arguments " + Arrays.toString(args)
+ " due to: " + parseException);
printUsage(options);
throw parseException;
}
String rsgroup = commandLine.getOptionValue("rsgroup");
int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1"));
int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1"));
int concurrency = Integer.parseInt(commandLine.getOptionValue("servers", "1"));
long sleep = Long.parseLong(commandLine.getOptionValue("sleep", "30000"));
boolean dryRun = commandLine.hasOption("dryRun");
boolean skipWait = commandLine.hasOption("skipWait");
Configuration conf = HBaseConfiguration.create();
return compactTTLRegionsOnGroup(conf, rsgroup, concurrency, sleep, numServers, numRegions,
dryRun, skipWait);
}
public static void main(String[] args) throws Exception {
ToolRunner.run(HBaseConfiguration.create(), new RSGroupMajorCompactionTTL(), args);
}
}
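As a usage illustration (not part of the commit): the tool registers itself with Hadoop's ToolRunner in main(), so it can also be driven from another class. A minimal sketch, assuming the driver is compiled into the same org.apache.hadoop.hbase.rsgroup package (the no-arg constructor is package-private) and that the rsgroup name and option values below are placeholders:

package org.apache.hadoop.hbase.rsgroup;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.util.ToolRunner;

public class RSGroupTTLCompactionExample {
  public static void main(String[] args) throws Exception {
    // -rsgroup and -servers are the required options; the rest fall back to the
    // defaults parsed in run() (numservers/numregions = -1, sleep = 30000 ms).
    int exitCode = ToolRunner.run(HBaseConfiguration.create(), new RSGroupMajorCompactionTTL(),
        new String[] { "-rsgroup", "my_group", "-servers", "1", "-dryRun" });
    System.exit(exitCode);
  }
}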

TestRSGroupMajorCompactionTTL.java (new file)

@@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.rsgroup;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.Lists;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.util.compaction.TestMajorCompactorTTL;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestRSGroupMajorCompactionTTL extends TestMajorCompactorTTL {
private final static int NUM_SLAVES_BASE = 6;
@Before
public void setUp() throws Exception {
utility = new HBaseTestingUtility();
Configuration conf = utility.getConfiguration();
conf.set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, RSGroupBasedLoadBalancer.class.getName());
conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, RSGroupAdminEndpoint.class.getName());
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_SLAVES_BASE);
conf.setInt("hbase.hfile.compaction.discharger.interval", 10);
utility.startMiniCluster(NUM_SLAVES_BASE);
MiniHBaseCluster cluster = utility.getHBaseCluster();
final HMaster master = cluster.getMaster();
//wait for balancer to come online
utility.waitFor(60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() {
return master.isInitialized() &&
((RSGroupBasedLoadBalancer) master.getLoadBalancer()).isOnline();
}
});
admin = utility.getHBaseAdmin();
}
@After
public void tearDown() throws Exception {
utility.shutdownMiniCluster();
}
@Test
public void testCompactingTables() throws Exception {
List<TableName> tableNames = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
tableNames.add(createTable(name.getMethodName() + "___" + i));
}
// Delay a bit, so the data just written is already older than the 5 second TTL set below
Thread.sleep(10 * 1000);
for (TableName tableName : tableNames) {
int numberOfRegions = admin.getTableRegions(tableName).size();
int numHFiles = utility.getNumHFiles(tableName, FAMILY);
// Before major compaction, each table should have more store files than regions.
assertTrue(numberOfRegions < numHFiles);
modifyTTL(tableName);
}
RSGroupMajorCompactionTTL compactor = new RSGroupMajorCompactionTTL();
compactor.compactTTLRegionsOnGroup(utility.getConfiguration(),
RSGroupInfo.DEFAULT_GROUP, 1, 200, -1, -1, false, false);
for (TableName tableName : tableNames) {
int numberOfRegions = admin.getTableRegions(tableName).size();
int numHFiles = utility.getNumHFiles(tableName, FAMILY);
assertEquals(numberOfRegions, numHFiles);
}
}
}

MajorCompactionRequest.java (modified)
@@ -44,25 +44,27 @@ class MajorCompactionRequest {
 
   private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionRequest.class);
 
-  private final Configuration configuration;
-  private final HRegionInfo region;
+  protected final Configuration configuration;
+  protected final HRegionInfo region;
   private Set<String> stores;
-  private final long timestamp;
+
+  MajorCompactionRequest(Configuration configuration, HRegionInfo region) {
+    this.configuration = configuration;
+    this.region = region;
+  }
 
   @VisibleForTesting
   MajorCompactionRequest(Configuration configuration, HRegionInfo region,
-      Set<String> stores, long timestamp) {
-    this.configuration = configuration;
-    this.region = region;
+      Set<String> stores) {
+    this(configuration, region);
     this.stores = stores;
-    this.timestamp = timestamp;
   }
 
   static Optional<MajorCompactionRequest> newRequest(Configuration configuration, HRegionInfo info,
       Set<String> stores, long timestamp) throws IOException {
     MajorCompactionRequest request =
-        new MajorCompactionRequest(configuration, info, stores, timestamp);
-    return request.createRequest(configuration, stores);
+        new MajorCompactionRequest(configuration, info, stores);
+    return request.createRequest(configuration, stores, timestamp);
   }
 
   HRegionInfo getRegion() {
@@ -79,53 +81,66 @@ class MajorCompactionRequest {
 
   @VisibleForTesting
   Optional<MajorCompactionRequest> createRequest(Configuration configuration,
-      Set<String> stores) throws IOException {
-    Set<String> familiesToCompact = getStoresRequiringCompaction(stores);
+      Set<String> stores, long timestamp) throws IOException {
+    Set<String> familiesToCompact = getStoresRequiringCompaction(stores, timestamp);
     MajorCompactionRequest request = null;
     if (!familiesToCompact.isEmpty()) {
-      request = new MajorCompactionRequest(configuration, region, familiesToCompact, timestamp);
+      request = new MajorCompactionRequest(configuration, region, familiesToCompact);
     }
     return Optional.fromNullable(request);
   }
 
-  Set<String> getStoresRequiringCompaction(Set<String> requestedStores) throws IOException {
+  Set<String> getStoresRequiringCompaction(Set<String> requestedStores, long timestamp)
+      throws IOException {
     try(Connection connection = getConnection(configuration)) {
       HRegionFileSystem fileSystem = getFileSystem(connection);
       Set<String> familiesToCompact = Sets.newHashSet();
       for (String family : requestedStores) {
+        if (shouldCFBeCompacted(fileSystem, family, timestamp)) {
+          familiesToCompact.add(family);
+        }
+      }
+      return familiesToCompact;
+    }
+  }
+
+  boolean shouldCFBeCompacted(HRegionFileSystem fileSystem, String family, long ts)
+      throws IOException {
     // do we have any store files?
     Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
     if (storeFiles == null) {
       LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
           .getRegionInfo().getEncodedName(), " has no store files");
-      continue;
+      return false;
     }
     // check for reference files
-    if (fileSystem.hasReferences(family) && familyHasReferenceFile(fileSystem, family)) {
-      familiesToCompact.add(family);
+    if (fileSystem.hasReferences(family) && familyHasReferenceFile(fileSystem, family, ts)) {
       LOG.info("Including store: " + family + " with: " + storeFiles.size()
           + " files for compaction for region: " + fileSystem.getRegionInfo().getEncodedName());
-      continue;
+      return true;
     }
     // check store file timestamps
-    boolean includeStore = false;
+    boolean includeStore = this.shouldIncludeStore(fileSystem, family, storeFiles, ts);
+    if (!includeStore) {
+      LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
+          .getRegionInfo().getEncodedName() + " already compacted");
+    }
+    return includeStore;
+  }
+
+  protected boolean shouldIncludeStore(HRegionFileSystem fileSystem, String family,
+      Collection<StoreFileInfo> storeFiles, long ts) throws IOException {
     for (StoreFileInfo storeFile : storeFiles) {
-      if (storeFile.getModificationTime() < timestamp) {
+      if (storeFile.getModificationTime() < ts) {
         LOG.info("Including store: " + family + " with: " + storeFiles.size()
             + " files for compaction for region: "
            + fileSystem.getRegionInfo().getEncodedName());
-        familiesToCompact.add(family);
-        includeStore = true;
-        break;
+        return true;
      }
    }
-    if (!includeStore) {
-      LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
-          .getRegionInfo().getEncodedName(), " already compacted");
-    }
-      }
-      return familiesToCompact;
-    }
+    return false;
   }
 }
@@ -133,13 +148,13 @@ class MajorCompactionRequest {
     return ConnectionFactory.createConnection(configuration);
   }
 
-  private boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family)
+  protected boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family, long ts)
       throws IOException {
     List<Path> referenceFiles =
         getReferenceFilePaths(fileSystem.getFileSystem(), fileSystem.getStoreDir(family));
     for (Path referenceFile : referenceFiles) {
       FileStatus status = fileSystem.getFileSystem().getFileLinkStatus(referenceFile);
-      if (status.getModificationTime() < timestamp) {
+      if (status.getModificationTime() < ts) {
         LOG.info("Including store: " + family + " for compaction for region: " + fileSystem
             .getRegionInfo().getEncodedName() + " (reference store files)");
         return true;

MajorCompactionTTLRequest.java (new file)

@@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util.compaction;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This request helps determine whether a region has to be compacted based on the table's TTL.
*/
@InterfaceAudience.Private
public class MajorCompactionTTLRequest extends MajorCompactionRequest {
private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionTTLRequest.class);
MajorCompactionTTLRequest(Configuration conf, HRegionInfo region) {
super(conf, region);
}
static Optional<MajorCompactionRequest> newRequest(Configuration conf, HRegionInfo info,
HTableDescriptor htd) throws IOException {
MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(conf, info);
return request.createRequest(conf, htd);
}
@VisibleForTesting
private Optional<MajorCompactionRequest> createRequest(Configuration conf, HTableDescriptor htd)
throws IOException {
Map<String, Long> familiesToCompact = getStoresRequiringCompaction(htd);
MajorCompactionRequest request = null;
if (!familiesToCompact.isEmpty()) {
LOG.debug("Compaction families for region: " + region + " CF: " + familiesToCompact.keySet());
request = new MajorCompactionTTLRequest(conf, region);
}
return Optional.fromNullable(request);
}
Map<String, Long> getStoresRequiringCompaction(HTableDescriptor htd) throws IOException {
try(Connection connection = getConnection(configuration)) {
HRegionFileSystem fileSystem = getFileSystem(connection);
Map<String, Long> familyTTLMap = Maps.newHashMap();
for (HColumnDescriptor descriptor : htd.getColumnFamilies()) {
long ts = getColFamilyCutoffTime(descriptor);
// If the table's TTL is FOREVER, do not compact any of the regions.
if (ts > 0 && shouldCFBeCompacted(fileSystem, descriptor.getNameAsString(), ts)) {
familyTTLMap.put(descriptor.getNameAsString(), ts);
}
}
return familyTTLMap;
}
}
// If the CF has no TTL, return -1, else return the current time - TTL.
private long getColFamilyCutoffTime(HColumnDescriptor colDesc) {
if (colDesc.getTimeToLive() == HConstants.FOREVER) {
return -1;
}
return System.currentTimeMillis() - (colDesc.getTimeToLive() * 1000L);
}
@Override
protected boolean shouldIncludeStore(HRegionFileSystem fileSystem, String family,
Collection<StoreFileInfo> storeFiles, long ts) throws IOException {
for (StoreFileInfo storeFile : storeFiles) {
// Only compact when all files in the store are older than the TTL cutoff
if (storeFile.getModificationTime() >= ts) {
LOG.info("There is at least one file in store: " + family + " file: " + storeFile.getPath()
+ " with timestamp " + storeFile.getModificationTime()
+ " for region: " + fileSystem.getRegionInfo().getEncodedName()
+ " newer than the TTL cutoff: " + ts);
return false;
}
}
return true;
}
}
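The per-family cutoff computed by getColFamilyCutoffTime() is simply the current time minus the TTL (in milliseconds), and shouldIncludeStore() only admits a store whose files are all older than that cutoff. A small standalone sketch of that decision rule, with made-up TTL and timestamps (an illustration, not code from the patch):

import java.util.Arrays;
import java.util.List;

public class TtlCutoffExample {
  // Mirrors getColFamilyCutoffTime(): no TTL means "never compact on TTL grounds".
  static long cutoff(long nowMs, int ttlSeconds, boolean ttlIsForever) {
    return ttlIsForever ? -1 : nowMs - ttlSeconds * 1000L;
  }

  // Mirrors shouldIncludeStore(): every file must be older than the cutoff,
  // otherwise the store is skipped for this run.
  static boolean allFilesExpired(List<Long> fileModTimes, long cutoffMs) {
    for (long modTime : fileModTimes) {
      if (modTime >= cutoffMs) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    long now = 1_000_000L;
    long cut = cutoff(now, 5, false); // TTL of 5 seconds -> cutoff = now - 5000 = 995000
    System.out.println(allFilesExpired(Arrays.asList(990_000L, 994_000L), cut)); // true
    System.out.println(allFilesExpired(Arrays.asList(990_000L, 999_000L), cut)); // false
  }
}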

MajorCompactor.java (modified)
@@ -18,7 +18,10 @@ package org.apache.hadoop.hbase.util.compaction;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -31,6 +34,7 @@ import com.google.common.base.Predicate;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
@@ -40,10 +44,12 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.ServerName;
@@ -54,22 +60,30 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
-public class MajorCompactor {
+public class MajorCompactor extends Configured implements Tool {
 
   private static final Logger LOG = LoggerFactory.getLogger(MajorCompactor.class);
-  private static final Set<MajorCompactionRequest> ERRORS = Sets.newHashSet();
+  protected static final Set<MajorCompactionRequest> ERRORS = Sets.newHashSet();
 
-  private final ClusterCompactionQueues clusterCompactionQueues;
-  private final long timestamp;
-  private final Set<String> storesToCompact;
-  private final ExecutorService executor;
-  private final long sleepForMs;
-  private final Connection connection;
-  private final TableName tableName;
+  protected ClusterCompactionQueues clusterCompactionQueues;
+  private long timestamp;
+  protected Set<String> storesToCompact;
+  protected ExecutorService executor;
+  protected long sleepForMs;
+  protected Connection connection;
+  protected TableName tableName;
+
+  private int numServers = -1;
+  private int numRegions = -1;
+  private boolean skipWait = false;
+
+  MajorCompactor() {
+  }
 
   public MajorCompactor(Configuration conf, TableName tableName, Set<String> storesToCompact,
       int concurrency, long timestamp, long sleepForMs) throws IOException {
@@ -157,16 +171,83 @@ public class MajorCompactor {
     }
     LOG.info(
         "Initializing compaction queues for table: " + tableName + " with cf: " + storesToCompact);
+    Map<ServerName, List<HRegionInfo>> snRegionMap = getServerRegionsMap();
+    /*
+     * If numservers is specified, stop inspecting regions beyond the numservers, it will serve
+     * to throttle and won't end up scanning all the regions in the event there are not many
+     * regions to compact based on the criteria.
+     */
+    for (ServerName sn : getServersToCompact(snRegionMap.keySet())) {
+      List<HRegionInfo> regions = snRegionMap.get(sn);
+      LOG.debug("Table: " + tableName + " Server: " + sn + " No of regions: " + regions.size());
+
+      /*
+       * If the tool is run periodically, then we could shuffle the regions and provide
+       * some random order to select regions. Helps if numregions is specified.
+       */
+      Collections.shuffle(regions);
+      int regionsToCompact = numRegions;
+      for (HRegionInfo hri : regions) {
+        if (numRegions > 0 && regionsToCompact <= 0) {
+          LOG.debug("Reached region limit for server: " + sn);
+          break;
+        }
+
+        Optional<MajorCompactionRequest> request = getMajorCompactionRequest(hri);
+        if (request.isPresent()) {
+          LOG.debug("Adding region " + hri + " to queue " + sn + " for compaction");
+          clusterCompactionQueues.addToCompactionQueue(sn, request.get());
+          if (numRegions > 0) {
+            regionsToCompact--;
+          }
+        }
+      }
+    }
+  }
+
+  protected Optional<MajorCompactionRequest> getMajorCompactionRequest(HRegionInfo hri)
+      throws IOException {
+    return MajorCompactionRequest.newRequest(connection.getConfiguration(), hri, storesToCompact,
+        timestamp);
+  }
+
+  private Collection<ServerName> getServersToCompact(Set<ServerName> snSet) {
+    if(numServers < 0 || snSet.size() <= numServers) {
+      return snSet;
+    } else {
+      List<ServerName> snList = Lists.newArrayList(snSet);
+      Collections.shuffle(snList);
+      return snList.subList(0, numServers);
+    }
+  }
+
+  private Map<ServerName, List<HRegionInfo>> getServerRegionsMap() throws IOException {
+    Map<ServerName, List<HRegionInfo>> snRegionMap = Maps.newHashMap();
     List<HRegionLocation> regionLocations =
         connection.getRegionLocator(tableName).getAllRegionLocations();
-    for (HRegionLocation location : regionLocations) {
-      Optional<MajorCompactionRequest> request = MajorCompactionRequest
-          .newRequest(connection.getConfiguration(), location.getRegionInfo(), storesToCompact,
-              timestamp);
-      if (request.isPresent()) {
-        clusterCompactionQueues.addToCompactionQueue(location.getServerName(), request.get());
+    for (HRegionLocation regionLocation : regionLocations) {
+      ServerName sn = regionLocation.getServerName();
+      HRegionInfo hri = regionLocation.getRegionInfo();
+      if (!snRegionMap.containsKey(sn)) {
+        snRegionMap.put(sn, Lists.<HRegionInfo>newArrayList());
       }
+      snRegionMap.get(sn).add(hri);
     }
+    return snRegionMap;
+  }
+
+  public void setNumServers(int numServers) {
+    this.numServers = numServers;
+  }
+
+  public void setNumRegions(int numRegions) {
+    this.numRegions = numRegions;
+  }
+
+  public void setSkipWait(boolean skipWait) {
+    this.skipWait = skipWait;
   }
 
   class Compact implements Runnable {
@@ -199,29 +280,36 @@ public class MajorCompactor {
       try {
         // only make the request if the region is not already major compacting
         if (!isCompacting(request)) {
-          Set<String> stores = request.getStoresRequiringCompaction(storesToCompact);
+          Set<String> stores = getStoresRequiringCompaction(request);
          if (!stores.isEmpty()) {
            request.setStores(stores);
            for (String store : request.getStores()) {
-              admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
-                  Bytes.toBytes(store));
+              compactRegionOnServer(request, admin, store);
            }
          }
        }
+        /*
+         * In some scenarios like compacting TTLed regions, the compaction itself won't take time
+         * and hence we can skip the wait. An external tool will also be triggered frequently and
+         * the next run can identify region movements and compact them.
+         */
+        if (!skipWait) {
          while (isCompacting(request)) {
            Thread.sleep(sleepForMs);
            LOG.debug("Waiting for compaction to complete for region: " + request.getRegion()
                .getEncodedName());
          }
+        }
      } finally {
+        if (!skipWait) {
          // Make sure to wait for the CompactedFileDischarger chore to do its work
          int waitForArchive = connection.getConfiguration()
              .getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
          Thread.sleep(waitForArchive);
          // check if compaction completed successfully, otherwise put that request back in the
          // proper queue
-          Set<String> storesRequiringCompaction =
-              request.getStoresRequiringCompaction(storesToCompact);
+          Set<String> storesRequiringCompaction = getStoresRequiringCompaction(request);
          if (!storesRequiringCompaction.isEmpty()) {
            // this happens, when a region server is marked as dead, flushes a store file and
            // the new regionserver doesn't pick it up because its accounted for in the WAL replay,
@@ -230,8 +318,10 @@ public class MajorCompactor {
              .getRegionLocation(request.getRegion().getStartKey()).getServerName()
              .equals(serverName);
          if (regionHasNotMoved) {
-            LOG.error("Not all store files were compacted, this may be due to the regionserver not "
-                + "being aware of all store files. Will not reattempt compacting, " + request);
+            LOG.error(
+                "Not all store files were compacted, this may be due to the regionserver not "
+                    + "being aware of all store files. Will not reattempt compacting, "
+                    + request);
            ERRORS.add(request);
          } else {
            request.setStores(storesRequiringCompaction);
@@ -247,6 +337,13 @@ public class MajorCompactor {
      }
    }
 
+    private void compactRegionOnServer(MajorCompactionRequest request, Admin admin, String store)
+        throws IOException {
+      admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
+          Bytes.toBytes(store));
+    }
+  }
+
   private boolean isCompacting(MajorCompactionRequest request) throws Exception {
     AdminProtos.GetRegionInfoResponse.CompactionState compactionState = connection.getAdmin()
         .getCompactionStateForRegion(request.getRegion().getEncodedNameAsBytes());
@@ -261,9 +358,8 @@ public class MajorCompactor {
         connection.getRegionLocator(tableName).getAllRegionLocations();
     for (HRegionLocation location : locations) {
       if (location.getRegionInfo().getRegionId() > timestamp) {
-        Optional<MajorCompactionRequest> compactionRequest = MajorCompactionRequest
-            .newRequest(connection.getConfiguration(), location.getRegionInfo(), storesToCompact,
-                timestamp);
+        Optional<MajorCompactionRequest> compactionRequest =
+            getMajorCompactionRequest(location.getRegionInfo());
         if (compactionRequest.isPresent()) {
           clusterCompactionQueues
               .addToCompactionQueue(location.getServerName(), compactionRequest.get());
@@ -275,8 +371,44 @@ public class MajorCompactor {
     }
   }
 
-  public static void main(String[] args) throws Exception {
+  protected Set<String> getStoresRequiringCompaction(MajorCompactionRequest request)
+      throws IOException {
+    return request.getStoresRequiringCompaction(storesToCompact, timestamp);
+  }
+
+  protected Options getCommonOptions() {
     Options options = new Options();
+
+    Option serverOption = new Option("servers", true, "Concurrent servers compacting");
+    serverOption.setRequired(true);
+    options.addOption(serverOption);
+
+    Option sleepOption = new Option("sleep", true, "Time to sleepForMs (ms) for checking "
+        + "compaction status per region and available "
+        + "work queues: default 30s");
+    options.addOption(sleepOption);
+
+    Option retryOption = new Option("retries", true, "Max # of retries for a compaction request,"
+        + " defaults to 3");
+    options.addOption(retryOption);
+
+    options.addOption(new Option("dryRun", false, "Dry run, will just output a list of regions"
+        + " that require compaction based on parameters passed"));
+
+    options.addOption(new Option("skipWait", false, "Skip waiting after triggering compaction."));
+
+    Option numServersOption = new Option("numservers", true, "Number of servers to compact in "
+        + "this run, defaults to all");
+    options.addOption(numServersOption);
+
+    Option numRegionsOption = new Option("numregions", true, "Number of regions to compact per"
+        + "server, defaults to all");
+    options.addOption(numRegionsOption);
+    return options;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Options options = getCommonOptions();
 
     Option tableOption = new Option("table", true, "table name");
     tableOption.setRequired(true);
     options.addOption(tableOption);
@@ -285,10 +417,6 @@ public class MajorCompactor {
     cfOption.setOptionalArg(true);
     options.addOption(cfOption);
 
-    Option serverOption = new Option("servers", true, "Concurrent servers compacting");
-    serverOption.setRequired(true);
-    options.addOption(serverOption);
-
     options.addOption(new Option("minModTime", true,
         "Compact if store files have modification time < minModTime"));
@@ -300,25 +428,6 @@ public class MajorCompactor {
     rootDirOption.setOptionalArg(true);
     options.addOption(rootDirOption);
 
-    Option sleepOption = new Option("sleep", true, "Time to sleepForMs (ms) for checking "
-        + "compaction status per region and available "
-        + "work queues: default 30s");
-    options.addOption(sleepOption);
-
-    Option retryOption = new Option("retries", true, "Max # of retries for a compaction request,"
-        + " defaults to 3");
-    options.addOption(
-        retryOption
-    );
-
-    options.addOption(
-        new Option(
-            "dryRun",
-            false,
-            "Dry run, will just output a list of regions that require compaction "
-                + "based on parameters passed")
-    );
-
     final CommandLineParser cmdLineParser = new BasicParser();
     CommandLine commandLine = null;
     try {
@@ -338,7 +447,6 @@ public class MajorCompactor {
       Iterables.addAll(families, Splitter.on(",").split(cf));
     }
     Configuration configuration = HBaseConfiguration.create();
-
     int concurrency = Integer.parseInt(commandLine.getOptionValue("servers"));
     long minModTime = Long.parseLong(
@@ -348,25 +456,35 @@ public class MajorCompactor {
     String rootDir = commandLine.getOptionValue("rootDir", configuration.get(HConstants.HBASE_DIR));
 
     long sleep = Long.parseLong(commandLine.getOptionValue("sleep", "30000"));
+    int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1"));
+    int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1"));
 
     configuration.set(HConstants.HBASE_DIR, rootDir);
     configuration.set(HConstants.ZOOKEEPER_QUORUM, quorum);
 
     MajorCompactor compactor =
         new MajorCompactor(configuration, TableName.valueOf(tableName), families, concurrency,
             minModTime, sleep);
+    compactor.setNumServers(numServers);
+    compactor.setNumRegions(numRegions);
+    compactor.setSkipWait(commandLine.hasOption("skipWait"));
 
     compactor.initializeWorkQueues();
     if (!commandLine.hasOption("dryRun")) {
       compactor.compactAllRegions();
     }
     compactor.shutdown();
+    return ERRORS.size();
   }
 
-  private static void printUsage(final Options options) {
+  protected static void printUsage(final Options options) {
     String header = "\nUsage instructions\n\n";
     String footer = "\n";
     HelpFormatter formatter = new HelpFormatter();
     formatter.printHelp(MajorCompactor.class.getSimpleName(), header, options, footer, true);
   }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(HBaseConfiguration.create(), new MajorCompactor(), args);
+  }
 }

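Taken together, the Tool conversion plus the new numservers/numregions/skipWait options mean a single run of the compactor can be throttled. A sketch of an invocation through ToolRunner (illustrative only, not part of the commit; it assumes the driver sits in the org.apache.hadoop.hbase.util.compaction package because the no-arg constructor is package-private, and the table, family, and limits are placeholders):

package org.apache.hadoop.hbase.util.compaction;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.util.ToolRunner;

public class MajorCompactorDriver {
  public static void main(String[] args) throws Exception {
    // run() returns ERRORS.size(), so a non-zero exit code means some regions failed to compact.
    int failures = ToolRunner.run(HBaseConfiguration.create(), new MajorCompactor(),
        new String[] { "-table", "usertable", "-cf", "a", "-servers", "2",
            "-numservers", "4", "-numregions", "50", "-skipWait" });
    System.exit(failures);
  }
}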
MajorCompactorTTL.java (new file)

@@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util.compaction;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.Executors;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This tool compacts a table's regions whose data is beyond its TTL. It helps save disk space,
* and regions whose data has entirely expired become empty as a result of the compaction.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class MajorCompactorTTL extends MajorCompactor {
private static final Logger LOG = LoggerFactory.getLogger(MajorCompactorTTL.class);
private HTableDescriptor htd;
@VisibleForTesting
public MajorCompactorTTL(Configuration conf, HTableDescriptor htd, int concurrency,
long sleepForMs) throws IOException {
this.connection = ConnectionFactory.createConnection(conf);
this.htd = htd;
this.tableName = htd.getTableName();
this.storesToCompact = Sets.newHashSet(); // Empty set so all stores will be compacted
this.sleepForMs = sleepForMs;
this.executor = Executors.newFixedThreadPool(concurrency);
this.clusterCompactionQueues = new ClusterCompactionQueues(concurrency);
}
protected MajorCompactorTTL() {
super();
}
@Override
protected Optional<MajorCompactionRequest> getMajorCompactionRequest(HRegionInfo hri)
throws IOException {
return MajorCompactionTTLRequest.newRequest(connection.getConfiguration(), hri, htd);
}
@Override
protected Set<String> getStoresRequiringCompaction(MajorCompactionRequest request)
throws IOException {
return ((MajorCompactionTTLRequest)request).getStoresRequiringCompaction(htd).keySet();
}
public int compactRegionsTTLOnTable(Configuration conf, String table, int concurrency,
long sleep, int numServers, int numRegions, boolean dryRun, boolean skipWait)
throws Exception {
Connection conn = ConnectionFactory.createConnection(conf);
TableName tableName = TableName.valueOf(table);
HTableDescriptor htd = conn.getAdmin().getTableDescriptor(tableName);
if (!doesAnyColFamilyHaveTTL(htd)) {
LOG.info("No TTL present for CF of table: " + tableName + ", skipping compaction");
return 0;
}
LOG.info("Major compacting table " + tableName + " based on TTL");
MajorCompactor compactor = new MajorCompactorTTL(conf, htd, concurrency, sleep);
compactor.setNumServers(numServers);
compactor.setNumRegions(numRegions);
compactor.setSkipWait(skipWait);
compactor.initializeWorkQueues();
if (!dryRun) {
compactor.compactAllRegions();
}
compactor.shutdown();
return ERRORS.size();
}
private boolean doesAnyColFamilyHaveTTL(HTableDescriptor htd) {
for (HColumnDescriptor descriptor : htd.getColumnFamilies()) {
if (descriptor.getTimeToLive() != HConstants.FOREVER) {
return true;
}
}
return false;
}
private Options getOptions() {
Options options = getCommonOptions();
Option tableOption = new Option("table", true, "Table to be compacted");
tableOption.setRequired(true);
options.addOption(tableOption);
return options;
}
@Override
public int run(String[] args) throws Exception {
Options options = getOptions();
final CommandLineParser cmdLineParser = new BasicParser();
CommandLine commandLine;
try {
commandLine = cmdLineParser.parse(options, args);
} catch (ParseException parseException) {
System.err.println("ERROR: Unable to parse command-line arguments " + Arrays.toString(args)
+ " due to: " + parseException);
printUsage(options);
throw parseException;
}
String table = commandLine.getOptionValue("table");
int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1"));
int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1"));
int concurrency = Integer.parseInt(commandLine.getOptionValue("servers", "1"));
long sleep = Long.parseLong(commandLine.getOptionValue("sleep", "30000"));
boolean dryRun = commandLine.hasOption("dryRun");
boolean skipWait = commandLine.hasOption("skipWait");
return compactRegionsTTLOnTable(HBaseConfiguration.create(), table, concurrency, sleep,
numServers, numRegions, dryRun, skipWait);
}
public static void main(String[] args) throws Exception {
ToolRunner.run(HBaseConfiguration.create(), new MajorCompactorTTL(), args);
}
}
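For a single table, the whole flow above is wrapped by compactRegionsTTLOnTable(), which opens its own connection, skips tables whose families all have TTL = FOREVER, and honors the dry-run and skip-wait switches. A hypothetical driver (again assuming it lives in the org.apache.hadoop.hbase.util.compaction package, since the no-arg constructor is protected, and that "usertable" is a placeholder):

package org.apache.hadoop.hbase.util.compaction;

import org.apache.hadoop.hbase.HBaseConfiguration;

public class MajorCompactorTTLDriver {
  public static void main(String[] args) throws Exception {
    // concurrency = 1 worker, poll every 200 ms, no server/region caps,
    // not a dry run, and wait for each compaction to finish.
    int failures = new MajorCompactorTTL().compactRegionsTTLOnTable(
        HBaseConfiguration.create(), "usertable", 1, 200, -1, -1, false, false);
    System.exit(failures);
  }
}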

MajorCompactionRequestTest.java (modified)
@@ -59,10 +59,10 @@ import static org.mockito.Mockito.when;
 
 @Category({SmallTests.class})
 public class MajorCompactionRequestTest {
 
-  private static final HBaseTestingUtility UTILITY = new HBaseTestingUtility();
-  private static final String FAMILY = "a";
-  private Path rootRegionDir;
-  private Path regionStoreDir;
+  protected static final HBaseTestingUtility UTILITY = new HBaseTestingUtility();
+  protected static final String FAMILY = "a";
+  protected Path rootRegionDir;
+  protected Path regionStoreDir;
 
   @Before public void setUp() throws Exception {
     rootRegionDir = UTILITY.getDataTestDirOnTestFS("MajorCompactionRequestTest");
@@ -72,15 +72,15 @@ public class MajorCompactionRequestTest {
   @Test public void testStoresNeedingCompaction() throws Exception {
     // store files older than timestamp
     List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 5, 10);
-    MajorCompactionRequest request = makeMockRequest(100, storeFiles, false);
+    MajorCompactionRequest request = makeMockRequest(storeFiles, false);
     Optional<MajorCompactionRequest> result =
-        request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
+        request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
     assertTrue(result.isPresent());
 
     // store files newer than timestamp
     storeFiles = mockStoreFiles(regionStoreDir, 5, 101);
-    request = makeMockRequest(100, storeFiles, false);
-    result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
+    request = makeMockRequest(storeFiles, false);
+    result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
     assertFalse(result.isPresent());
   }
@@ -106,12 +106,13 @@ public class MajorCompactionRequestTest {
     HRegionFileSystem fileSystem =
         mockFileSystem(region.getRegionInfo(), true, storeFiles, 50);
     MajorCompactionRequest majorCompactionRequest = spy(new MajorCompactionRequest(configuration,
-        region.getRegionInfo(), Sets.newHashSet(FAMILY), 100));
+        region.getRegionInfo()));
     doReturn(mock(Connection.class)).when(majorCompactionRequest).getConnection(eq(configuration));
     doReturn(paths).when(majorCompactionRequest).getReferenceFilePaths(any(FileSystem.class),
         any(Path.class));
     doReturn(fileSystem).when(majorCompactionRequest).getFileSystem(any(Connection.class));
-    Set<String> result = majorCompactionRequest.getStoresRequiringCompaction(Sets.newHashSet("a"));
+    Set<String> result = majorCompactionRequest
+        .getStoresRequiringCompaction(Sets.newHashSet("a"), 100);
     assertEquals(FAMILY, Iterables.getOnlyElement(result));
   }
@@ -143,7 +144,7 @@ public class MajorCompactionRequestTest {
     return mockSystem;
   }
 
-  private List<StoreFileInfo> mockStoreFiles(Path regionStoreDir, int howMany, long timestamp)
+  List<StoreFileInfo> mockStoreFiles(Path regionStoreDir, int howMany, long timestamp)
       throws IOException {
     List<StoreFileInfo> infos = Lists.newArrayList();
     int i = 0;
@@ -158,14 +159,14 @@ public class MajorCompactionRequestTest {
     return infos;
   }
 
-  private MajorCompactionRequest makeMockRequest(long timestamp, List<StoreFileInfo> storeFiles,
+  private MajorCompactionRequest makeMockRequest(List<StoreFileInfo> storeFiles,
       boolean references) throws IOException {
     Configuration configuration = mock(Configuration.class);
     HRegionInfo regionInfo = mock(HRegionInfo.class);
     when(regionInfo.getEncodedName()).thenReturn("HBase");
     when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
     MajorCompactionRequest request =
-        new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a"), timestamp);
+        new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a"));
     MajorCompactionRequest spy = spy(request);
     HRegionFileSystem fileSystem = mockFileSystem(regionInfo, references, storeFiles);
     doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class));

MajorCompactorTest.java (modified)
@@ -23,6 +23,7 @@ import com.google.common.collect.Sets;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -40,7 +41,8 @@ import org.junit.experimental.categories.Category;
 public class MajorCompactorTest {
 
   public static final byte[] FAMILY = Bytes.toBytes("a");
-  private HBaseTestingUtility utility;
+  protected HBaseTestingUtility utility;
+  protected HBaseAdmin admin;
 
   @Before public void setUp() throws Exception {
     utility = new HBaseTestingUtility();
@@ -82,7 +84,7 @@ public class MajorCompactorTest {
     assertEquals(numHFiles, numberOfRegions);
   }
 
-  private void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
+  protected void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
       throws IOException {
     Random r = new Random();
     byte[] row = new byte[rowSize];

TestMajorCompactionTTLRequest.java (new file)

@@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util.compaction;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.List;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({SmallTests.class})
public class TestMajorCompactionTTLRequest {
private static final HBaseTestingUtility UTILITY = new HBaseTestingUtility();
private static final String FAMILY = "a";
private Path regionStoreDir;
@Before
public void setUp() throws Exception {
Path rootRegionDir = UTILITY.getDataTestDirOnTestFS("TestMajorCompactionTTLRequest");
regionStoreDir = new Path(rootRegionDir, FAMILY);
}
@Test
public void testStoresNeedingCompaction() throws Exception {
// store files older than timestamp 10
List<StoreFileInfo> storeFiles1 = mockStoreFiles(regionStoreDir, 5, 10);
// store files older than timestamp 100
List<StoreFileInfo> storeFiles2 = mockStoreFiles(regionStoreDir, 5, 100);
List<StoreFileInfo> storeFiles = Lists.newArrayList(storeFiles1);
storeFiles.addAll(storeFiles2);
MajorCompactionTTLRequest request = makeMockRequest(storeFiles);
// All files are <= 100, so region should not be compacted.
Optional<MajorCompactionRequest> result =
request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 10);
assertFalse(result.isPresent());
// All files are <= 100, so region should not be compacted yet.
result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
assertFalse(result.isPresent());
// All files are <= 100, so they should be considered for compaction
result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 101);
assertTrue(result.isPresent());
}
private MajorCompactionTTLRequest makeMockRequest(List<StoreFileInfo> storeFiles)
throws IOException {
Configuration configuration = mock(Configuration.class);
HRegionInfo regionInfo = mock(HRegionInfo.class);
when(regionInfo.getEncodedName()).thenReturn("HBase");
when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(configuration, regionInfo);
MajorCompactionTTLRequest spy = spy(request);
HRegionFileSystem fileSystem = mockFileSystem(regionInfo, false, storeFiles);
doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class));
doReturn(mock(Connection.class)).when(spy).getConnection(eq(configuration));
return spy;
}
private HRegionFileSystem mockFileSystem(HRegionInfo info, boolean hasReferenceFiles,
List<StoreFileInfo> storeFiles) throws IOException {
Optional<StoreFileInfo> found = Optional.absent();
for (StoreFileInfo storeFile : storeFiles) {
found = Optional.of(storeFile);
break;
}
long timestamp = found.get().getModificationTime();
return mockFileSystem(info, hasReferenceFiles, storeFiles, timestamp);
}
private List<StoreFileInfo> mockStoreFiles(Path regionStoreDir, int howMany, long timestamp)
throws IOException {
List<StoreFileInfo> infos = Lists.newArrayList();
int i = 0;
while (i < howMany) {
StoreFileInfo storeFileInfo = mock(StoreFileInfo.class);
doReturn(timestamp).doReturn(timestamp).when(storeFileInfo).getModificationTime();
doReturn(new Path(regionStoreDir, RandomStringUtils.randomAlphabetic(10))).when(storeFileInfo)
.getPath();
infos.add(storeFileInfo);
i++;
}
return infos;
}
private HRegionFileSystem mockFileSystem(HRegionInfo info, boolean hasReferenceFiles,
List<StoreFileInfo> storeFiles, long referenceFileTimestamp) throws IOException {
FileSystem fileSystem = mock(FileSystem.class);
if (hasReferenceFiles) {
FileStatus fileStatus = mock(FileStatus.class);
doReturn(referenceFileTimestamp).when(fileStatus).getModificationTime();
doReturn(fileStatus).when(fileSystem).getFileLinkStatus(isA(Path.class));
}
HRegionFileSystem mockSystem = mock(HRegionFileSystem.class);
doReturn(info).when(mockSystem).getRegionInfo();
doReturn(regionStoreDir).when(mockSystem).getStoreDir(FAMILY);
doReturn(hasReferenceFiles).when(mockSystem).hasReferences(anyString());
doReturn(storeFiles).when(mockSystem).getStoreFiles(anyString());
doReturn(fileSystem).when(mockSystem).getFileSystem();
return mockSystem;
}
}

TestMajorCompactorTTL.java (new file)

@@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util.compaction;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
@Category({ MiscTests.class, MediumTests.class })
public class TestMajorCompactorTTL extends MajorCompactorTest {
@Rule
public TestName name = new TestName();
@Before
public void setUp() throws Exception {
utility = new HBaseTestingUtility();
utility.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 10);
utility.startMiniCluster();
admin = utility.getHBaseAdmin();
}
@After
public void tearDown() throws Exception {
utility.shutdownMiniCluster();
}
@Test
public void testCompactingATable() throws Exception {
TableName tableName = createTable(name.getMethodName());
// Delay a bit, so the data just written is already older than the 5 second TTL set below
Thread.sleep(10 * 1000);
int numberOfRegions = utility.getHBaseAdmin().getTableRegions(tableName).size();
int numHFiles = utility.getNumHFiles(tableName, FAMILY);
// Before major compaction, the table should have more store files than regions.
assertTrue(numberOfRegions < numHFiles);
modifyTTL(tableName);
MajorCompactorTTL compactor = new MajorCompactorTTL(utility.getConfiguration(),
admin.getTableDescriptor(tableName), 1, 200);
compactor.initializeWorkQueues();
compactor.compactAllRegions();
compactor.shutdown();
// verify that the store has been completely major compacted.
numberOfRegions = utility.getHBaseAdmin().getTableRegions(tableName).size();
numHFiles = utility.getNumHFiles(tableName, FAMILY);
assertEquals(numberOfRegions, numHFiles);
}
protected void modifyTTL(TableName tableName) throws IOException, InterruptedException {
// Set the TTL to 5 secs, so all the files just written above will get cleaned up on compact.
admin.disableTable(tableName);
utility.waitTableDisabled(tableName.getName());
HTableDescriptor descriptor = admin.getTableDescriptor(tableName);
HColumnDescriptor colDesc = descriptor.getFamily(FAMILY);
colDesc.setTimeToLive(5);
admin.modifyColumn(tableName, colDesc);
admin.enableTable(tableName);
utility.waitTableEnabled(tableName);
}
protected TableName createTable(String name) throws IOException, InterruptedException {
TableName tableName = TableName.valueOf(name);
utility.createMultiRegionTable(tableName, FAMILY, 5);
utility.waitTableAvailable(tableName);
Connection connection = utility.getConnection();
Table table = connection.getTable(tableName);
// write data and flush multiple store files:
for (int i = 0; i < 5; i++) {
loadRandomRows(table, FAMILY, 50, 100);
utility.flush(tableName);
}
table.close();
return tableName;
}
}