HBASE-21883 Enhancements to Major Compaction tool
Signed-off-by: Andrew Purtell <apurtell@apache.org>
commit 32250e55ba
parent 2c7fdb39ce
@@ -0,0 +1,131 @@ RSGroupMajorCompactionTTL.java (new file)
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.rsgroup;

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.compaction.MajorCompactorTTL;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;

/**
 * This script takes an rsgroup as argument and compacts part/all of regions of that table
 * based on the table's TTL.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class RSGroupMajorCompactionTTL extends MajorCompactorTTL {

  private static final Logger LOG = LoggerFactory.getLogger(RSGroupMajorCompactionTTL.class);

  @VisibleForTesting
  RSGroupMajorCompactionTTL() {
    super();
  }

  public int compactTTLRegionsOnGroup(Configuration conf, String rsgroup, int concurrency,
      long sleep, int numServers, int numRegions, boolean dryRun, boolean skipWait)
      throws Exception {

    Connection conn = ConnectionFactory.createConnection(conf);
    RSGroupAdmin rsGroupAdmin = new RSGroupAdminClient(conn);

    RSGroupInfo rsGroupInfo = rsGroupAdmin.getRSGroupInfo(rsgroup);
    if (rsGroupInfo == null) {
      LOG.error("Invalid rsgroup specified: " + rsgroup);
      throw new IllegalArgumentException("Invalid rsgroup specified: " + rsgroup);
    }

    for (TableName tableName : rsGroupInfo.getTables()) {
      int status = compactRegionsTTLOnTable(conf, tableName.getNameAsString(), concurrency, sleep,
          numServers, numRegions, dryRun, skipWait);
      if (status != 0) {
        LOG.error("Failed to compact table: " + tableName);
        return status;
      }
    }
    return 0;
  }

  protected Options getOptions() {
    Options options = getCommonOptions();

    options.addOption(
        Option.builder("rsgroup")
            .required()
            .desc("Tables of rsgroup to be compacted")
            .hasArg()
            .build()
    );

    return options;
  }

  @Override
  public int run(String[] args) throws Exception {
    Options options = getOptions();

    final CommandLineParser cmdLineParser = new DefaultParser();
    CommandLine commandLine;
    try {
      commandLine = cmdLineParser.parse(options, args);
    } catch (ParseException parseException) {
      System.out.println(
          "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
              + parseException);
      printUsage(options);
      return -1;
    }
    if (commandLine == null) {
      System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args));
      printUsage(options);
      return -1;
    }

    String rsgroup = commandLine.getOptionValue("rsgroup");
    int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1"));
    int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1"));
    int concurrency = Integer.parseInt(commandLine.getOptionValue("servers", "1"));
    long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000)));
    boolean dryRun = commandLine.hasOption("dryRun");
    boolean skipWait = commandLine.hasOption("skipWait");
    Configuration conf = getConf();

    return compactTTLRegionsOnGroup(conf, rsgroup, concurrency, sleep, numServers, numRegions,
        dryRun, skipWait);
  }

  public static void main(String[] args) throws Exception {
    ToolRunner.run(HBaseConfiguration.create(), new RSGroupMajorCompactionTTL(), args);
  }
}
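The class above is a thin Tool wrapper, so the simplest way to exercise it is through its main() (or ToolRunner). A minimal usage sketch follows; it is not part of the patch, and the rsgroup name and option values are placeholders only:

// Hypothetical usage sketch, not part of this commit.
public class RSGroupMajorCompactionTTLExample {
  public static void main(String[] args) throws Exception {
    // Equivalent CLI: hbase org.apache.hadoop.hbase.rsgroup.RSGroupMajorCompactionTTL \
    //   -rsgroup my_group -servers 2 -numservers 2 -numregions 10 -skipWait
    // Every table in the rsgroup whose column families have a finite TTL gets its
    // eligible regions major compacted; "my_group" and the limits are made up.
    org.apache.hadoop.hbase.rsgroup.RSGroupMajorCompactionTTL.main(new String[] {
        "-rsgroup", "my_group", "-servers", "2",
        "-numservers", "2", "-numregions", "10", "-skipWait" });
  }
}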
@@ -0,0 +1,106 @@ TestRSGroupMajorCompactionTTL.java (new file)
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.rsgroup;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.util.compaction.TestMajorCompactorTTL;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

public class TestRSGroupMajorCompactionTTL extends TestMajorCompactorTTL {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestRSGroupMajorCompactionTTL.class);

  private final static int NUM_SLAVES_BASE = 6;

  @Before
  @Override
  public void setUp() throws Exception {
    utility = new HBaseTestingUtility();
    Configuration conf = utility.getConfiguration();
    conf.set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, RSGroupBasedLoadBalancer.class.getName());
    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, RSGroupAdminEndpoint.class.getName());
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_SLAVES_BASE);
    conf.setInt("hbase.hfile.compaction.discharger.interval", 10);
    utility.startMiniCluster(NUM_SLAVES_BASE);
    MiniHBaseCluster cluster = utility.getHBaseCluster();
    final HMaster master = cluster.getMaster();

    // wait for balancer to come online
    utility.waitFor(60000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() {
        return master.isInitialized() &&
            ((RSGroupBasedLoadBalancer) master.getLoadBalancer()).isOnline();
      }
    });
    admin = utility.getAdmin();
  }

  @After
  @Override
  public void tearDown() throws Exception {
    utility.shutdownMiniCluster();
  }

  @Test
  public void testCompactingTables() throws Exception {
    List<TableName> tableNames = Lists.newArrayList();
    for (int i = 0; i < 10; i++) {
      tableNames.add(createTable(name.getMethodName() + "___" + i));
    }

    // Delay a bit, so we can set the table TTL to 5 seconds
    Thread.sleep(10 * 1000);

    for (TableName tableName : tableNames) {
      int numberOfRegions = admin.getRegions(tableName).size();
      int numHFiles = utility.getNumHFiles(tableName, FAMILY);
      // we should have a table with more store files than we would before we major compacted.
      assertTrue(numberOfRegions < numHFiles);
      modifyTTL(tableName);
    }

    RSGroupMajorCompactionTTL compactor = new RSGroupMajorCompactionTTL();
    compactor.compactTTLRegionsOnGroup(utility.getConfiguration(),
        RSGroupInfo.DEFAULT_GROUP, 1, 200, -1, -1, false, false);

    for (TableName tableName : tableNames) {
      int numberOfRegions = admin.getRegions(tableName).size();
      int numHFiles = utility.getNumHFiles(tableName, FAMILY);
      assertEquals(numberOfRegions, numHFiles);
    }
  }
}
@@ -44,25 +44,27 @@ class MajorCompactionRequest {
 
   private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionRequest.class);
 
-  private final Configuration configuration;
-  private final RegionInfo region;
+  protected final Configuration configuration;
+  protected final RegionInfo region;
   private Set<String> stores;
-  private final long timestamp;
+
+  MajorCompactionRequest(Configuration configuration, RegionInfo region) {
+    this.configuration = configuration;
+    this.region = region;
+  }
 
   @VisibleForTesting
   MajorCompactionRequest(Configuration configuration, RegionInfo region,
-      Set<String> stores, long timestamp) {
-    this.configuration = configuration;
-    this.region = region;
+      Set<String> stores) {
+    this(configuration, region);
     this.stores = stores;
-    this.timestamp = timestamp;
   }
 
   static Optional<MajorCompactionRequest> newRequest(Configuration configuration, RegionInfo info,
       Set<String> stores, long timestamp) throws IOException {
     MajorCompactionRequest request =
-        new MajorCompactionRequest(configuration, info, stores, timestamp);
-    return request.createRequest(configuration, stores);
+        new MajorCompactionRequest(configuration, info, stores);
+    return request.createRequest(configuration, stores, timestamp);
   }
 
   RegionInfo getRegion() {
@@ -79,53 +81,66 @@ class MajorCompactionRequest {
 
   @VisibleForTesting
   Optional<MajorCompactionRequest> createRequest(Configuration configuration,
-      Set<String> stores) throws IOException {
-    Set<String> familiesToCompact = getStoresRequiringCompaction(stores);
+      Set<String> stores, long timestamp) throws IOException {
+    Set<String> familiesToCompact = getStoresRequiringCompaction(stores, timestamp);
     MajorCompactionRequest request = null;
     if (!familiesToCompact.isEmpty()) {
-      request = new MajorCompactionRequest(configuration, region, familiesToCompact, timestamp);
+      request = new MajorCompactionRequest(configuration, region, familiesToCompact);
     }
     return Optional.ofNullable(request);
   }
 
-  Set<String> getStoresRequiringCompaction(Set<String> requestedStores) throws IOException {
+  Set<String> getStoresRequiringCompaction(Set<String> requestedStores, long timestamp)
+      throws IOException {
     try(Connection connection = getConnection(configuration)) {
       HRegionFileSystem fileSystem = getFileSystem(connection);
       Set<String> familiesToCompact = Sets.newHashSet();
       for (String family : requestedStores) {
+        if (shouldCFBeCompacted(fileSystem, family, timestamp)) {
+          familiesToCompact.add(family);
+        }
+      }
+      return familiesToCompact;
+    }
+  }
+
+  boolean shouldCFBeCompacted(HRegionFileSystem fileSystem, String family, long ts)
+      throws IOException {
     // do we have any store files?
     Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
     if (storeFiles == null) {
      LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
          .getRegionInfo().getEncodedName(), " has no store files");
-      continue;
+      return false;
    }
    // check for reference files
-    if (fileSystem.hasReferences(family) && familyHasReferenceFile(fileSystem, family)) {
-      familiesToCompact.add(family);
+    if (fileSystem.hasReferences(family) && familyHasReferenceFile(fileSystem, family, ts)) {
      LOG.info("Including store: " + family + " with: " + storeFiles.size()
          + " files for compaction for region: " + fileSystem.getRegionInfo().getEncodedName());
-      continue;
+      return true;
    }
    // check store file timestamps
-    boolean includeStore = false;
+    boolean includeStore = this.shouldIncludeStore(fileSystem, family, storeFiles, ts);
+    if (!includeStore) {
+      LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
+          .getRegionInfo().getEncodedName() + " already compacted");
+    }
+    return includeStore;
+  }
+
+  protected boolean shouldIncludeStore(HRegionFileSystem fileSystem, String family,
+      Collection<StoreFileInfo> storeFiles, long ts) throws IOException {
    for (StoreFileInfo storeFile : storeFiles) {
-      if (storeFile.getModificationTime() < timestamp) {
+      if (storeFile.getModificationTime() < ts) {
        LOG.info("Including store: " + family + " with: " + storeFiles.size()
            + " files for compaction for region: "
            + fileSystem.getRegionInfo().getEncodedName());
-        familiesToCompact.add(family);
-        includeStore = true;
-        break;
      }
+        return true;
    }
-    if (!includeStore) {
-      LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
-          .getRegionInfo().getEncodedName(), " already compacted");
-    }
-      }
-      return familiesToCompact;
-    }
+    return false;
   }
 
   @VisibleForTesting
@@ -133,13 +148,13 @@ class MajorCompactionRequest {
     return ConnectionFactory.createConnection(configuration);
   }
 
-  private boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family)
+  protected boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family, long ts)
       throws IOException {
     List<Path> referenceFiles =
         getReferenceFilePaths(fileSystem.getFileSystem(), fileSystem.getStoreDir(family));
     for (Path referenceFile : referenceFiles) {
       FileStatus status = fileSystem.getFileSystem().getFileLinkStatus(referenceFile);
-      if (status.getModificationTime() < timestamp) {
+      if (status.getModificationTime() < ts) {
         LOG.info("Including store: " + family + " for compaction for region: " + fileSystem
             .getRegionInfo().getEncodedName() + " (reference store files)");
         return true;
@@ -0,0 +1,109 @@ MajorCompactionTTLRequest.java (new file)
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util.compaction;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

/**
 * This request helps determine if a region has to be compacted based on table's TTL.
 */
@InterfaceAudience.Private
public class MajorCompactionTTLRequest extends MajorCompactionRequest {

  private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionTTLRequest.class);

  MajorCompactionTTLRequest(Configuration conf, RegionInfo region) {
    super(conf, region);
  }

  static Optional<MajorCompactionRequest> newRequest(Configuration conf, RegionInfo info,
      TableDescriptor htd) throws IOException {
    MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(conf, info);
    return request.createRequest(conf, htd);
  }

  @VisibleForTesting
  private Optional<MajorCompactionRequest> createRequest(Configuration conf, TableDescriptor htd)
      throws IOException {
    Map<String, Long> familiesToCompact = getStoresRequiringCompaction(htd);
    MajorCompactionRequest request = null;
    if (!familiesToCompact.isEmpty()) {
      LOG.debug("Compaction families for region: " + region + " CF: " + familiesToCompact.keySet());
      request = new MajorCompactionTTLRequest(conf, region);
    }
    return Optional.ofNullable(request);
  }

  Map<String, Long> getStoresRequiringCompaction(TableDescriptor htd) throws IOException {
    try(Connection connection = getConnection(configuration)) {
      HRegionFileSystem fileSystem = getFileSystem(connection);
      Map<String, Long> familyTTLMap = Maps.newHashMap();
      for (ColumnFamilyDescriptor descriptor : htd.getColumnFamilies()) {
        long ts = getColFamilyCutoffTime(descriptor);
        // If the table's TTL is forever, lets not compact any of the regions.
        if (ts > 0 && shouldCFBeCompacted(fileSystem, descriptor.getNameAsString(), ts)) {
          familyTTLMap.put(descriptor.getNameAsString(), ts);
        }
      }
      return familyTTLMap;
    }
  }

  // If the CF has no TTL, return -1, else return the current time - TTL.
  private long getColFamilyCutoffTime(ColumnFamilyDescriptor colDesc) {
    if (colDesc.getTimeToLive() == HConstants.FOREVER) {
      return -1;
    }
    return System.currentTimeMillis() - (colDesc.getTimeToLive() * 1000L);
  }

  @Override
  protected boolean shouldIncludeStore(HRegionFileSystem fileSystem, String family,
      Collection<StoreFileInfo> storeFiles, long ts) throws IOException {

    for (StoreFileInfo storeFile : storeFiles) {
      // Lets only compact when all files are older than TTL
      if (storeFile.getModificationTime() >= ts) {
        LOG.info("There is atleast one file in store: " + family + " file: " + storeFile.getPath()
            + " with timestamp " + storeFile.getModificationTime()
            + " for region: " + fileSystem.getRegionInfo().getEncodedName()
            + " older than TTL: " + ts);
        return false;
      }
    }
    return true;
  }
}
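The cutoff comment above ("current time - TTL") is easy to sanity-check: a store qualifies only when every one of its store files was last modified before now minus the family's TTL. A small illustration with made-up numbers, mirroring getColFamilyCutoffTime() and shouldIncludeStore() but not taken from the patch:

// Illustrative arithmetic only; the TTL and file age are hypothetical.
public class TtlCutoffExample {
  public static void main(String[] args) {
    long ttlSeconds = 5L * 24 * 60 * 60;                 // hypothetical CF TTL: 5 days
    long now = System.currentTimeMillis();
    long cutoff = now - ttlSeconds * 1000L;              // cutoff = now - TTL
    long fileModTime = now - 6L * 24 * 60 * 60 * 1000;   // a file last written 6 days ago
    boolean compactable = fileModTime < cutoff;          // true: the file is entirely past its TTL
    System.out.println("compactable = " + compactable);
  }
}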
@@ -18,7 +18,10 @@ package org.apache.hadoop.hbase.util.compaction; (MajorCompactor.java)
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -26,7 +29,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
@@ -38,15 +43,19 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.CompactionState;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
 import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
@@ -57,18 +66,24 @@ import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
 
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
-public class MajorCompactor {
+public class MajorCompactor extends Configured implements Tool {
 
   private static final Logger LOG = LoggerFactory.getLogger(MajorCompactor.class);
-  private static final Set<MajorCompactionRequest> ERRORS = ConcurrentHashMap.newKeySet();
+  protected static final Set<MajorCompactionRequest> ERRORS = ConcurrentHashMap.newKeySet();
 
-  private final ClusterCompactionQueues clusterCompactionQueues;
-  private final long timestamp;
-  private final Set<String> storesToCompact;
-  private final ExecutorService executor;
-  private final long sleepForMs;
-  private final Connection connection;
-  private final TableName tableName;
+  protected ClusterCompactionQueues clusterCompactionQueues;
+  private long timestamp;
+  protected Set<String> storesToCompact;
+  protected ExecutorService executor;
+  protected long sleepForMs;
+  protected Connection connection;
+  protected TableName tableName;
+  private int numServers = -1;
+  private int numRegions = -1;
+  private boolean skipWait = false;
+
+  MajorCompactor() {
+  }
 
   public MajorCompactor(Configuration conf, TableName tableName, Set<String> storesToCompact,
       int concurrency, long timestamp, long sleepForMs) throws IOException {
@@ -149,15 +164,83 @@ public class MajorCompactor {
     }
     LOG.info(
         "Initializing compaction queues for table: " + tableName + " with cf: " + storesToCompact);
+
+    Map<ServerName, List<RegionInfo>> snRegionMap = getServerRegionsMap();
+    /*
+     * If numservers is specified, stop inspecting regions beyond the numservers, it will serve
+     * to throttle and won't end up scanning all the regions in the event there are not many
+     * regions to compact based on the criteria.
+     */
+    for (ServerName sn : getServersToCompact(snRegionMap.keySet())) {
+      List<RegionInfo> regions = snRegionMap.get(sn);
+      LOG.debug("Table: " + tableName + " Server: " + sn + " No of regions: " + regions.size());
+
+      /*
+       * If the tool is run periodically, then we could shuffle the regions and provide
+       * some random order to select regions. Helps if numregions is specified.
+       */
+      Collections.shuffle(regions);
+      int regionsToCompact = numRegions;
+      for (RegionInfo hri : regions) {
+        if (numRegions > 0 && regionsToCompact <= 0) {
+          LOG.debug("Reached region limit for server: " + sn);
+          break;
+        }
+
+        Optional<MajorCompactionRequest> request = getMajorCompactionRequest(hri);
+        if (request.isPresent()) {
+          LOG.debug("Adding region " + hri + " to queue " + sn + " for compaction");
+          clusterCompactionQueues.addToCompactionQueue(sn, request.get());
+          if (numRegions > 0) {
+            regionsToCompact--;
+          }
+        }
+      }
+    }
+  }
+
+  protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri)
+      throws IOException {
+    return MajorCompactionRequest.newRequest(connection.getConfiguration(), hri, storesToCompact,
+        timestamp);
+  }
+
+  private Collection<ServerName> getServersToCompact(Set<ServerName> snSet) {
+    if(numServers < 0 || snSet.size() <= numServers) {
+      return snSet;
+    } else {
+      List<ServerName> snList = Lists.newArrayList(snSet);
+      Collections.shuffle(snList);
+      return snList.subList(0, numServers);
+    }
+  }
+
+  private Map<ServerName, List<RegionInfo>> getServerRegionsMap() throws IOException {
+    Map<ServerName, List<RegionInfo>> snRegionMap = Maps.newHashMap();
     List<HRegionLocation> regionLocations =
         connection.getRegionLocator(tableName).getAllRegionLocations();
-    for (HRegionLocation location : regionLocations) {
-      Optional<MajorCompactionRequest> request = MajorCompactionRequest
-          .newRequest(connection.getConfiguration(), location.getRegion(), storesToCompact,
-              timestamp);
-      request.ifPresent(majorCompactionRequest -> clusterCompactionQueues
-          .addToCompactionQueue(location.getServerName(), majorCompactionRequest));
+    for (HRegionLocation regionLocation : regionLocations) {
+      ServerName sn = regionLocation.getServerName();
+      RegionInfo hri = regionLocation.getRegion();
+      if (!snRegionMap.containsKey(sn)) {
+        snRegionMap.put(sn, Lists.newArrayList());
+      }
+      snRegionMap.get(sn).add(hri);
     }
+    return snRegionMap;
+  }
+
+  public void setNumServers(int numServers) {
+    this.numServers = numServers;
+  }
+
+  public void setNumRegions(int numRegions) {
+    this.numRegions = numRegions;
+  }
+
+  public void setSkipWait(boolean skipWait) {
+    this.skipWait = skipWait;
   }
 
   class Compact implements Runnable {
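The two block comments added to initializeWorkQueues above describe the throttling knobs. As a rough illustration with made-up numbers (not from the patch), when both limits are positive a single run enqueues at most numservers * numregions compaction requests:

// Illustrative only: upper bound on regions queued per run when both limits are set
// (negative values mean "no limit", which was the previous behaviour).
public class ThrottleBoundExample {
  public static void main(String[] args) {
    int numServers = 2;   // hypothetical -numservers value
    int numRegions = 5;   // hypothetical -numregions value
    System.out.println("max queued regions = " + (numServers * numRegions)); // 10
  }
}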
@@ -190,29 +273,36 @@ public class MajorCompactor {
       try {
         // only make the request if the region is not already major compacting
         if (!isCompacting(request)) {
-          Set<String> stores = request.getStoresRequiringCompaction(storesToCompact);
+          Set<String> stores = getStoresRequiringCompaction(request);
           if (!stores.isEmpty()) {
             request.setStores(stores);
             for (String store : request.getStores()) {
-              admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
-                  Bytes.toBytes(store));
+              compactRegionOnServer(request, admin, store);
             }
           }
         }
+
+        /*
+         * In some scenarios like compacting TTLed regions, the compaction itself won't take time
+         * and hence we can skip the wait. An external tool will also be triggered frequently and
+         * the next run can identify region movements and compact them.
+         */
+        if (!skipWait) {
          while (isCompacting(request)) {
            Thread.sleep(sleepForMs);
            LOG.debug("Waiting for compaction to complete for region: " + request.getRegion()
                .getEncodedName());
          }
+        }
       } finally {
+        if (!skipWait) {
         // Make sure to wait for the CompactedFileDischarger chore to do its work
         int waitForArchive = connection.getConfiguration()
             .getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
         Thread.sleep(waitForArchive);
         // check if compaction completed successfully, otherwise put that request back in the
         // proper queue
-        Set<String> storesRequiringCompaction =
-            request.getStoresRequiringCompaction(storesToCompact);
+        Set<String> storesRequiringCompaction = getStoresRequiringCompaction(request);
         if (!storesRequiringCompaction.isEmpty()) {
           // this happens, when a region server is marked as dead, flushes a store file and
           // the new regionserver doesn't pick it up because its accounted for in the WAL replay,
@@ -221,8 +311,10 @@ public class MajorCompactor {
               .getRegionLocation(request.getRegion().getStartKey()).getServerName()
               .equals(serverName);
           if (regionHasNotMoved) {
-            LOG.error("Not all store files were compacted, this may be due to the regionserver not "
-                + "being aware of all store files. Will not reattempt compacting, " + request);
+            LOG.error(
+                "Not all store files were compacted, this may be due to the regionserver not "
+                    + "being aware of all store files. Will not reattempt compacting, "
+                    + request);
             ERRORS.add(request);
           } else {
             request.setStores(storesRequiringCompaction);
@@ -238,6 +330,13 @@ public class MajorCompactor {
         }
       }
     }
+
+    private void compactRegionOnServer(MajorCompactionRequest request, Admin admin, String store)
+        throws IOException {
+      admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
+          Bytes.toBytes(store));
+    }
+  }
 
   private boolean isCompacting(MajorCompactionRequest request) throws Exception {
     CompactionState compactionState = connection.getAdmin()
         .getCompactionStateForRegion(request.getRegion().getEncodedNameAsBytes());
@@ -263,22 +362,14 @@ public class MajorCompactor {
     }
   }
 
-  public static void main(String[] args) throws Exception {
+  protected Set<String> getStoresRequiringCompaction(MajorCompactionRequest request)
+      throws IOException {
+    return request.getStoresRequiringCompaction(storesToCompact, timestamp);
+  }
+
+  protected Options getCommonOptions() {
     Options options = new Options();
-    options.addOption(
-        Option.builder("table")
-            .required()
-            .desc("table name")
-            .hasArg()
-            .build()
-    );
-    options.addOption(
-        Option.builder("cf")
-            .optionalArg(true)
-            .desc("column families: comma separated eg: a,b,c")
-            .hasArg()
-            .build()
-    );
     options.addOption(
         Option.builder("servers")
             .required()
@@ -327,6 +418,49 @@ public class MajorCompactor {
             .build()
     );
+
+    options.addOption(
+        Option.builder("skipWait")
+            .desc("Skip waiting after triggering compaction.")
+            .hasArg(false)
+            .build()
+    );
+
+    options.addOption(
+        Option.builder("numservers")
+            .optionalArg(true)
+            .desc("Number of servers to compact in this run, defaults to all")
+            .hasArg()
+            .build()
+    );
+
+    options.addOption(
+        Option.builder("numregions")
+            .optionalArg(true)
+            .desc("Number of regions to compact per server, defaults to all")
+            .hasArg()
+            .build()
+    );
+    return options;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Options options = getCommonOptions();
+    options.addOption(
+        Option.builder("table")
+            .required()
+            .desc("table name")
+            .hasArg()
+            .build()
+    );
+    options.addOption(
+        Option.builder("cf")
+            .optionalArg(true)
+            .desc("column families: comma separated eg: a,b,c")
+            .hasArg()
+            .build()
+    );
+
     final CommandLineParser cmdLineParser = new DefaultParser();
     CommandLine commandLine = null;
     try {
@@ -336,12 +470,12 @@ public class MajorCompactor {
           "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
               + parseException);
       printUsage(options);
-      return;
+      return -1;
     }
     if (commandLine == null) {
       System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args));
       printUsage(options);
-      return;
+      return -1;
     }
     String tableName = commandLine.getOptionValue("table");
     String cf = commandLine.getOptionValue("cf", null);
@@ -350,8 +484,7 @@ public class MajorCompactor {
       Iterables.addAll(families, Splitter.on(",").split(cf));
     }
 
-    Configuration configuration = HBaseConfiguration.create();
+    Configuration configuration = getConf();
     int concurrency = Integer.parseInt(commandLine.getOptionValue("servers"));
     long minModTime = Long.parseLong(
         commandLine.getOptionValue("minModTime", String.valueOf(System.currentTimeMillis())));
@@ -360,25 +493,35 @@ public class MajorCompactor {
     String rootDir = commandLine.getOptionValue("rootDir", configuration.get(HConstants.HBASE_DIR));
     long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000)));
 
+    int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1"));
+    int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1"));
+
     configuration.set(HConstants.HBASE_DIR, rootDir);
     configuration.set(HConstants.ZOOKEEPER_QUORUM, quorum);
 
     MajorCompactor compactor =
         new MajorCompactor(configuration, TableName.valueOf(tableName), families, concurrency,
             minModTime, sleep);
+    compactor.setNumServers(numServers);
+    compactor.setNumRegions(numRegions);
+    compactor.setSkipWait(commandLine.hasOption("skipWait"));
+
     compactor.initializeWorkQueues();
     if (!commandLine.hasOption("dryRun")) {
       compactor.compactAllRegions();
     }
     compactor.shutdown();
+    return ERRORS.size();
   }
 
-  private static void printUsage(final Options options) {
+  protected static void printUsage(final Options options) {
     String header = "\nUsage instructions\n\n";
     String footer = "\n";
     HelpFormatter formatter = new HelpFormatter();
     formatter.printHelp(MajorCompactor.class.getSimpleName(), header, options, footer, true);
   }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(HBaseConfiguration.create(), new MajorCompactor(), args);
+  }
 }
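With MajorCompactor now extending Configured and implementing Tool, run() returns ERRORS.size(), so automation can key off the exit status. A hedged invocation sketch follows; it is not part of the patch, and the table name and option values are placeholders:

// Hypothetical usage sketch, not part of this commit.
public class MajorCompactorExample {
  public static void main(String[] args) throws Exception {
    // Equivalent CLI: hbase org.apache.hadoop.hbase.util.compaction.MajorCompactor \
    //   -table example_table -servers 4 -numservers 2 -numregions 20 -dryRun
    // -dryRun only builds the work queues; nothing is actually compacted.
    org.apache.hadoop.hbase.util.compaction.MajorCompactor.main(new String[] {
        "-table", "example_table", "-servers", "4",
        "-numservers", "2", "-numregions", "20", "-dryRun" });
  }
}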
@@ -0,0 +1,175 @@ MajorCompactorTTL.java (new file)
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util.compaction;

import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;

/**
 * This tool compacts a table's regions that are beyond it's TTL. It helps to save disk space and
 * regions become empty as a result of compaction.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class MajorCompactorTTL extends MajorCompactor {

  private static final Logger LOG = LoggerFactory.getLogger(MajorCompactorTTL.class);

  private TableDescriptor htd;

  @VisibleForTesting
  public MajorCompactorTTL(Configuration conf, TableDescriptor htd, int concurrency,
      long sleepForMs) throws IOException {
    this.connection = ConnectionFactory.createConnection(conf);
    this.htd = htd;
    this.tableName = htd.getTableName();
    this.storesToCompact = Sets.newHashSet(); // Empty set so all stores will be compacted
    this.sleepForMs = sleepForMs;
    this.executor = Executors.newFixedThreadPool(concurrency);
    this.clusterCompactionQueues = new ClusterCompactionQueues(concurrency);
  }

  protected MajorCompactorTTL() {
    super();
  }

  @Override
  protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri)
      throws IOException {
    return MajorCompactionTTLRequest.newRequest(connection.getConfiguration(), hri, htd);
  }

  @Override
  protected Set<String> getStoresRequiringCompaction(MajorCompactionRequest request)
      throws IOException {
    return ((MajorCompactionTTLRequest)request).getStoresRequiringCompaction(htd).keySet();
  }

  public int compactRegionsTTLOnTable(Configuration conf, String table, int concurrency,
      long sleep, int numServers, int numRegions, boolean dryRun, boolean skipWait)
      throws Exception {

    Connection conn = ConnectionFactory.createConnection(conf);
    TableName tableName = TableName.valueOf(table);

    TableDescriptor htd = conn.getAdmin().getDescriptor(tableName);
    if (!doesAnyColFamilyHaveTTL(htd)) {
      LOG.info("No TTL present for CF of table: " + tableName + ", skipping compaction");
      return 0;
    }

    LOG.info("Major compacting table " + tableName + " based on TTL");
    MajorCompactor compactor = new MajorCompactorTTL(conf, htd, concurrency, sleep);
    compactor.setNumServers(numServers);
    compactor.setNumRegions(numRegions);
    compactor.setSkipWait(skipWait);

    compactor.initializeWorkQueues();
    if (!dryRun) {
      compactor.compactAllRegions();
    }
    compactor.shutdown();
    return ERRORS.size();
  }

  private boolean doesAnyColFamilyHaveTTL(TableDescriptor htd) {
    for (ColumnFamilyDescriptor descriptor : htd.getColumnFamilies()) {
      if (descriptor.getTimeToLive() != HConstants.FOREVER) {
        return true;
      }
    }
    return false;
  }

  private Options getOptions() {
    Options options = getCommonOptions();

    options.addOption(
        Option.builder("table")
            .required()
            .desc("table name")
            .hasArg()
            .build()
    );

    return options;
  }

  @Override
  public int run(String[] args) throws Exception {
    Options options = getOptions();

    final CommandLineParser cmdLineParser = new DefaultParser();
    CommandLine commandLine;
    try {
      commandLine = cmdLineParser.parse(options, args);
    } catch (ParseException parseException) {
      System.out.println(
          "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
              + parseException);
      printUsage(options);
      return -1;
    }
    if (commandLine == null) {
      System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args));
      printUsage(options);
      return -1;
    }

    String table = commandLine.getOptionValue("table");
    int numServers = Integer.parseInt(commandLine.getOptionValue("numservers", "-1"));
    int numRegions = Integer.parseInt(commandLine.getOptionValue("numregions", "-1"));
    int concurrency = Integer.parseInt(commandLine.getOptionValue("servers", "1"));
    long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000)));
    boolean dryRun = commandLine.hasOption("dryRun");
    boolean skipWait = commandLine.hasOption("skipWait");

    return compactRegionsTTLOnTable(HBaseConfiguration.create(), table, concurrency, sleep,
        numServers, numRegions, dryRun, skipWait);
  }

  public static void main(String[] args) throws Exception {
    ToolRunner.run(HBaseConfiguration.create(), new MajorCompactorTTL(), args);
  }
}
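Since the TTL compactor also runs as a standard Tool, a single-table run can be sketched as below. This is not part of the patch; the table name and option values are placeholders, and -skipWait is shown because compactions of fully expired regions finish quickly and usually do not need the post-compaction wait:

// Hypothetical usage sketch, not part of this commit.
public class MajorCompactorTTLExample {
  public static void main(String[] args) throws Exception {
    // Equivalent CLI: hbase org.apache.hadoop.hbase.util.compaction.MajorCompactorTTL \
    //   -table example_table -servers 4 -skipWait
    // Only column families with a finite TTL are considered; tables without TTL are skipped.
    org.apache.hadoop.hbase.util.compaction.MajorCompactorTTL.main(new String[] {
        "-table", "example_table", "-servers", "4", "-skipWait" });
  }
}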
@ -17,11 +17,24 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.util.compaction;
|
package org.apache.hadoop.hbase.util.compaction;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
|
import static org.mockito.ArgumentMatchers.isA;
|
||||||
|
import static org.mockito.Matchers.anyString;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.lang3.RandomStringUtils;
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
@@ -39,24 +52,13 @@ import org.apache.hadoop.hbase.regionserver.HRegionFileSystem
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 
 @Category({SmallTests.class})
 public class TestMajorCompactionRequest {
@@ -64,10 +66,10 @@ public class TestMajorCompactionRequest {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestMajorCompactionRequest.class);
 
-  private static final HBaseTestingUtility UTILITY = new HBaseTestingUtility();
-  private static final String FAMILY = "a";
-  private Path rootRegionDir;
-  private Path regionStoreDir;
+  protected static final HBaseTestingUtility UTILITY = new HBaseTestingUtility();
+  protected static final String FAMILY = "a";
+  protected Path rootRegionDir;
+  protected Path regionStoreDir;
 
   @Before public void setUp() throws Exception {
     rootRegionDir = UTILITY.getDataTestDirOnTestFS("TestMajorCompactionRequest");
@@ -77,15 +79,15 @@
   @Test public void testStoresNeedingCompaction() throws Exception {
     // store files older than timestamp
     List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 5, 10);
-    MajorCompactionRequest request = makeMockRequest(100, storeFiles, false);
+    MajorCompactionRequest request = makeMockRequest(storeFiles, false);
     Optional<MajorCompactionRequest> result =
-        request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
+        request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
     assertTrue(result.isPresent());
 
     // store files newer than timestamp
     storeFiles = mockStoreFiles(regionStoreDir, 5, 101);
-    request = makeMockRequest(100, storeFiles, false);
-    result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
+    request = makeMockRequest(storeFiles, false);
+    result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
     assertFalse(result.isPresent());
   }
@@ -106,16 +108,17 @@ public class TestMajorCompactionRequest {
     HRegionFileSystem fileSystem =
         mockFileSystem(region.getRegionInfo(), true, storeFiles, 50);
     MajorCompactionRequest majorCompactionRequest = spy(new MajorCompactionRequest(configuration,
-        region.getRegionInfo(), Sets.newHashSet(FAMILY), 100));
+        region.getRegionInfo(), Sets.newHashSet(FAMILY)));
     doReturn(mock(Connection.class)).when(majorCompactionRequest).getConnection(eq(configuration));
     doReturn(paths).when(majorCompactionRequest).getReferenceFilePaths(any(FileSystem.class),
         any(Path.class));
     doReturn(fileSystem).when(majorCompactionRequest).getFileSystem(any(Connection.class));
-    Set<String> result = majorCompactionRequest.getStoresRequiringCompaction(Sets.newHashSet("a"));
+    Set<String> result =
+        majorCompactionRequest.getStoresRequiringCompaction(Sets.newHashSet("a"), 100);
     assertEquals(FAMILY, Iterables.getOnlyElement(result));
   }
 
-  private HRegionFileSystem mockFileSystem(RegionInfo info, boolean hasReferenceFiles,
+  protected HRegionFileSystem mockFileSystem(RegionInfo info, boolean hasReferenceFiles,
       List<StoreFileInfo> storeFiles) throws IOException {
     long timestamp = storeFiles.stream().findFirst().get().getModificationTime();
     return mockFileSystem(info, hasReferenceFiles, storeFiles, timestamp);
@@ -138,7 +141,7 @@ public class TestMajorCompactionRequest {
     return mockSystem;
   }
 
-  private List<StoreFileInfo> mockStoreFiles(Path regionStoreDir, int howMany, long timestamp)
+  protected List<StoreFileInfo> mockStoreFiles(Path regionStoreDir, int howMany, long timestamp)
       throws IOException {
     List<StoreFileInfo> infos = Lists.newArrayList();
     int i = 0;
@@ -153,14 +156,14 @@
     return infos;
   }
 
-  private MajorCompactionRequest makeMockRequest(long timestamp, List<StoreFileInfo> storeFiles,
+  private MajorCompactionRequest makeMockRequest(List<StoreFileInfo> storeFiles,
       boolean references) throws IOException {
     Configuration configuration = mock(Configuration.class);
     RegionInfo regionInfo = mock(RegionInfo.class);
     when(regionInfo.getEncodedName()).thenReturn("HBase");
     when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
     MajorCompactionRequest request =
-        new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a"), timestamp);
+        new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a"));
     MajorCompactionRequest spy = spy(request);
     HRegionFileSystem fileSystem = mockFileSystem(regionInfo, references, storeFiles);
     doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class));
@@ -0,0 +1,100 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util.compaction;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.List;
import java.util.Optional;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

@Category({SmallTests.class})
public class TestMajorCompactionTTLRequest extends TestMajorCompactionRequest {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMajorCompactionTTLRequest.class);

  @Before
  @Override
  public void setUp() throws Exception {
    rootRegionDir = UTILITY.getDataTestDirOnTestFS("TestMajorCompactionTTLRequest");
    regionStoreDir = new Path(rootRegionDir, FAMILY);
  }

  @Test
  public void testStoresNeedingCompaction() throws Exception {
    // store files older than timestamp 10
    List<StoreFileInfo> storeFiles1 = mockStoreFiles(regionStoreDir, 5, 10);
    // store files older than timestamp 100
    List<StoreFileInfo> storeFiles2 = mockStoreFiles(regionStoreDir, 5, 100);
    List<StoreFileInfo> storeFiles = Lists.newArrayList(storeFiles1);
    storeFiles.addAll(storeFiles2);

    MajorCompactionTTLRequest request = makeMockRequest(storeFiles);
    // All files are <= 100, so region should not be compacted.
    Optional<MajorCompactionRequest> result =
        request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 10);
    assertFalse(result.isPresent());

    // All files are <= 100, so region should not be compacted yet.
    result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
    assertFalse(result.isPresent());

    // All files are <= 100, so they should be considered for compaction
    result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 101);
    assertTrue(result.isPresent());
  }

  private MajorCompactionTTLRequest makeMockRequest(List<StoreFileInfo> storeFiles)
      throws IOException {
    Configuration configuration = mock(Configuration.class);
    RegionInfo regionInfo = mock(RegionInfo.class);
    when(regionInfo.getEncodedName()).thenReturn("HBase");
    when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
    MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(configuration, regionInfo);
    MajorCompactionTTLRequest spy = spy(request);
    HRegionFileSystem fileSystem = mockFileSystem(regionInfo, false, storeFiles);
    doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class));
    doReturn(mock(Connection.class)).when(spy).getConnection(eq(configuration));
    return spy;
  }
}
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.util.compaction;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -42,7 +43,8 @@ public class TestMajorCompactor {
       HBaseClassTestRule.forClass(TestMajorCompactor.class);
 
   public static final byte[] FAMILY = Bytes.toBytes("a");
-  private HBaseTestingUtility utility;
+  protected HBaseTestingUtility utility;
+  protected Admin admin;
 
   @Before public void setUp() throws Exception {
     utility = new HBaseTestingUtility();
@@ -0,0 +1,119 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util.compaction;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category({ MiscTests.class, MediumTests.class })
public class TestMajorCompactorTTL extends TestMajorCompactor {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMajorCompactorTTL.class);

  @Rule
  public TestName name = new TestName();

  @Before
  @Override
  public void setUp() throws Exception {
    utility = new HBaseTestingUtility();
    utility.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 10);
    utility.startMiniCluster();
    admin = utility.getAdmin();
  }

  @After
  @Override
  public void tearDown() throws Exception {
    utility.shutdownMiniCluster();
  }

  @Test
  public void testCompactingATable() throws Exception {
    TableName tableName = createTable(name.getMethodName());

    // Delay a bit, so we can set the table TTL to 5 seconds
    Thread.sleep(10 * 1000);

    int numberOfRegions = admin.getRegions(tableName).size();
    int numHFiles = utility.getNumHFiles(tableName, FAMILY);
    // we should have a table with more store files than we would before we major compacted.
    assertTrue(numberOfRegions < numHFiles);
    modifyTTL(tableName);

    MajorCompactorTTL compactor = new MajorCompactorTTL(utility.getConfiguration(),
        admin.getDescriptor(tableName), 1, 200);
    compactor.initializeWorkQueues();
    compactor.compactAllRegions();
    compactor.shutdown();

    // verify that the store has been completely major compacted.
    numberOfRegions = admin.getRegions(tableName).size();
    numHFiles = utility.getNumHFiles(tableName, FAMILY);
    assertEquals(numberOfRegions, numHFiles);
  }

  protected void modifyTTL(TableName tableName) throws IOException, InterruptedException {
    // Set the TTL to 5 secs, so all the files just written above will get cleaned up on compact.
    admin.disableTable(tableName);
    utility.waitTableDisabled(tableName.getName());
    TableDescriptor descriptor = admin.getDescriptor(tableName);
    ColumnFamilyDescriptor colDesc = descriptor.getColumnFamily(FAMILY);
    ColumnFamilyDescriptorBuilder cFDB = ColumnFamilyDescriptorBuilder.newBuilder(colDesc);
    cFDB.setTimeToLive(5);
    admin.modifyColumnFamily(tableName, cFDB.build());
    admin.enableTable(tableName);
    utility.waitTableEnabled(tableName);
  }

  protected TableName createTable(String name) throws IOException, InterruptedException {
    TableName tableName = TableName.valueOf(name);
    utility.createMultiRegionTable(tableName, FAMILY, 5);
    utility.waitTableAvailable(tableName);
    Connection connection = utility.getConnection();
    Table table = connection.getTable(tableName);
    // write data and flush multiple store files:
    for (int i = 0; i < 5; i++) {
      utility.loadRandomRows(table, FAMILY, 50, 100);
      utility.flush(tableName);
    }
    table.close();
    return tableName;
  }
}