HBASE-19528 - Major Compaction Tool

Signed-off-by: Michael Stack <stack@apache.org>
Rahul Gidwani 2018-01-31 14:22:33 -08:00 committed by Michael Stack
parent 13cc7f9f50
commit 673e809250
5 changed files with 963 additions and 0 deletions
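
The tool is driven by MajorCompactor.main() in the new MajorCompactor class below. As a quick orientation, here is a condensed sketch of what main() does after parsing its options; the table name, column families, concurrency, and sleep interval are placeholder values, not part of this change:

    // Compact every region of 'usertable' (families a and b), keeping at most 4 servers
    // compacting at once, targeting store files older than "now", polling every 30s.
    Configuration conf = HBaseConfiguration.create();
    MajorCompactor compactor = new MajorCompactor(conf, TableName.valueOf("usertable"),
        Sets.newHashSet("a", "b"), 4, System.currentTimeMillis(), 30000);
    compactor.initializeWorkQueues();   // enqueue a MajorCompactionRequest per region that needs work
    compactor.compactAllRegions();      // drain the per-server queues, re-queuing moved regions
    compactor.shutdown();               // wait for workers, report any failed requests

From the command line the same knobs are exposed as -table and -servers (required) plus -cf, -minModTime, -zk, -rootDir, -sleep, and -dryRun (optional).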

ClusterCompactionQueues.java
@@ -0,0 +1,149 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util.compaction;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.hadoop.hbase.ServerName;
@InterfaceAudience.Private
class ClusterCompactionQueues {
private final Map<ServerName, List<MajorCompactionRequest>> compactionQueues;
private final Set<ServerName> compactingServers;
private final ReadWriteLock lock;
private final int concurrentServers;
ClusterCompactionQueues(int concurrentServers) {
this.concurrentServers = concurrentServers;
this.compactionQueues = Maps.newHashMap();
this.lock = new ReentrantReadWriteLock();
this.compactingServers = Sets.newHashSet();
}
void addToCompactionQueue(ServerName serverName, MajorCompactionRequest info) {
this.lock.writeLock().lock();
try {
List<MajorCompactionRequest> result = this.compactionQueues.get(serverName);
if (result == null) {
result = Lists.newArrayList();
compactionQueues.put(serverName, result);
}
result.add(info);
} finally {
this.lock.writeLock().unlock();
}
}
boolean hasWorkItems() {
lock.readLock().lock();
try {
for (List<MajorCompactionRequest> majorCompactionRequests : this.compactionQueues.values()) {
if (!majorCompactionRequests.isEmpty()) {
return true;
}
}
return false;
} finally {
lock.readLock().unlock();
}
}
int getCompactionRequestsLeftToFinish() {
lock.readLock().lock();
try {
int size = 0;
for (List<MajorCompactionRequest> queue : compactionQueues.values()) {
size += queue.size();
}
return size;
} finally {
lock.readLock().unlock();
}
}
@VisibleForTesting
List<MajorCompactionRequest> getQueue(ServerName serverName) {
lock.readLock().lock();
try {
return compactionQueues.get(serverName);
} finally {
lock.readLock().unlock();
}
}
MajorCompactionRequest reserveForCompaction(ServerName serverName) {
lock.writeLock().lock();
try {
if (!compactionQueues.get(serverName).isEmpty()) {
compactingServers.add(serverName);
return compactionQueues.get(serverName).remove(0);
}
return null;
} finally {
lock.writeLock().unlock();
}
}
void releaseCompaction(ServerName serverName) {
lock.writeLock().lock();
try {
compactingServers.remove(serverName);
} finally {
lock.writeLock().unlock();
}
}
boolean atCapacity() {
lock.readLock().lock();
try {
return compactingServers.size() >= concurrentServers;
} finally {
lock.readLock().unlock();
}
}
Optional<ServerName> getLargestQueueFromServersNotCompacting() {
lock.readLock().lock();
try {
Sets.SetView<ServerName> difference =
Sets.difference(compactionQueues.keySet(), compactingServers);
ServerName serverName = null;
int maxItems = 0;
for (ServerName server : difference) {
if (compactionQueues.get(server).size() > maxItems) {
maxItems = compactionQueues.get(server).size();
serverName = server;
}
}
return Optional.fromNullable(serverName);
} finally {
lock.readLock().unlock();
}
}
}
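
ClusterCompactionQueues is only exercised by MajorCompactor further down in this change; its intended protocol, compressed into a sketch (serverName and request stand in for real values):

    ClusterCompactionQueues queues = new ClusterCompactionQueues(4); // at most 4 servers compacting at once
    queues.addToCompactionQueue(serverName, request);                // work is queued per hosting server
    if (!queues.atCapacity()) {
      Optional<ServerName> next = queues.getLargestQueueFromServersNotCompacting();
      if (next.isPresent()) {
        MajorCompactionRequest work = queues.reserveForCompaction(next.get()); // marks the server busy
        try {
          // submit the major compaction for 'work' here
        } finally {
          queues.releaseCompaction(next.get()); // server becomes eligible for selection again
        }
      }
    }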

MajorCompactionRequest.java
@@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util.compaction;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.FSUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
class MajorCompactionRequest {
private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionRequest.class);
private final Configuration configuration;
private final HRegionInfo region;
private Set<String> stores;
private final long timestamp;
@VisibleForTesting
MajorCompactionRequest(Configuration configuration, HRegionInfo region,
Set<String> stores, long timestamp) {
this.configuration = configuration;
this.region = region;
this.stores = stores;
this.timestamp = timestamp;
}
static Optional<MajorCompactionRequest> newRequest(Configuration configuration, HRegionInfo info,
Set<String> stores, long timestamp) throws IOException {
MajorCompactionRequest request =
new MajorCompactionRequest(configuration, info, stores, timestamp);
return request.createRequest(configuration, stores);
}
HRegionInfo getRegion() {
return region;
}
Set<String> getStores() {
return stores;
}
void setStores(Set<String> stores) {
this.stores = stores;
}
@VisibleForTesting
Optional<MajorCompactionRequest> createRequest(Configuration configuration,
Set<String> stores) throws IOException {
Set<String> familiesToCompact = getStoresRequiringCompaction(stores);
MajorCompactionRequest request = null;
if (!familiesToCompact.isEmpty()) {
request = new MajorCompactionRequest(configuration, region, familiesToCompact, timestamp);
}
return Optional.fromNullable(request);
}
Set<String> getStoresRequiringCompaction(Set<String> requestedStores) throws IOException {
try (Connection connection = getConnection(configuration)) {
HRegionFileSystem fileSystem = getFileSystem(connection);
Set<String> familiesToCompact = Sets.newHashSet();
for (String family : requestedStores) {
// do we have any store files?
Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
if (storeFiles == null) {
LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
.getRegionInfo().getEncodedName(), " has no store files");
continue;
}
// check for reference files
if (fileSystem.hasReferences(family) && familyHasReferenceFile(fileSystem, family)) {
familiesToCompact.add(family);
LOG.info("Including store: " + family + " with: " + storeFiles.size()
+ " files for compaction for region: " + fileSystem.getRegionInfo().getEncodedName());
continue;
}
// check store file timestamps
boolean includeStore = false;
for (StoreFileInfo storeFile : storeFiles) {
if (storeFile.getModificationTime() < timestamp) {
LOG.info("Including store: " + family + " with: " + storeFiles.size()
+ " files for compaction for region: "
+ fileSystem.getRegionInfo().getEncodedName());
familiesToCompact.add(family);
includeStore = true;
break;
}
}
if (!includeStore) {
LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
.getRegionInfo().getEncodedName(), " already compacted");
}
}
return familiesToCompact;
}
}
@VisibleForTesting
Connection getConnection(Configuration configuration) throws IOException {
return ConnectionFactory.createConnection(configuration);
}
private boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family)
throws IOException {
List<Path> referenceFiles =
getReferenceFilePaths(fileSystem.getFileSystem(), fileSystem.getStoreDir(family));
for (Path referenceFile : referenceFiles) {
FileStatus status = fileSystem.getFileSystem().getFileLinkStatus(referenceFile);
if (status.getModificationTime() < timestamp) {
LOG.info("Including store: " + family + " for compaction for region: " + fileSystem
.getRegionInfo().getEncodedName() + " (reference store files)");
return true;
}
}
return false;
}
@VisibleForTesting
List<Path> getReferenceFilePaths(FileSystem fileSystem, Path familyDir)
throws IOException {
return FSUtils.getReferenceFilePaths(fileSystem, familyDir);
}
@VisibleForTesting
HRegionFileSystem getFileSystem(Connection connection) throws IOException {
Admin admin = connection.getAdmin();
return HRegionFileSystem.openRegionFromFileSystem(admin.getConfiguration(),
FSUtils.getCurrentFileSystem(admin.getConfiguration()),
FSUtils.getTableDir(FSUtils.getRootDir(admin.getConfiguration()), region.getTable()),
region, true);
}
@Override
public String toString() {
return "region: " + region.getEncodedName() + " store(s): " + stores;
}
}
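
MajorCompactionRequest.newRequest() is how MajorCompactor seeds its work queues: it only yields a request when at least one of the requested stores still has files older than the cutoff (or reference files over older data). A sketch, with conf, regionInfo, cutoffTimestamp, queues, and serverName standing in for real values:

    Optional<MajorCompactionRequest> maybe = MajorCompactionRequest.newRequest(
        conf, regionInfo, Sets.newHashSet("a"), cutoffTimestamp);
    if (maybe.isPresent()) {
      // some store in this region still needs a major compaction, so queue it
      queues.addToCompactionQueue(serverName, maybe.get());
    }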

MajorCompactor.java
@@ -0,0 +1,372 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util.compaction;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class MajorCompactor {
private static final Logger LOG = LoggerFactory.getLogger(MajorCompactor.class);
private static final Set<MajorCompactionRequest> ERRORS = Sets.newHashSet();
private final ClusterCompactionQueues clusterCompactionQueues;
private final long timestamp;
private final Set<String> storesToCompact;
private final ExecutorService executor;
private final long sleepForMs;
private final Connection connection;
private final TableName tableName;
public MajorCompactor(Configuration conf, TableName tableName, Set<String> storesToCompact,
int concurrency, long timestamp, long sleepForMs) throws IOException {
this.connection = ConnectionFactory.createConnection(conf);
this.tableName = tableName;
this.timestamp = timestamp;
this.storesToCompact = storesToCompact;
this.executor = Executors.newFixedThreadPool(concurrency);
this.clusterCompactionQueues = new ClusterCompactionQueues(concurrency);
this.sleepForMs = sleepForMs;
}
public void compactAllRegions() throws Exception {
List<Future<?>> futures = Lists.newArrayList();
while (clusterCompactionQueues.hasWorkItems() || !futuresComplete(futures)) {
while (clusterCompactionQueues.atCapacity()) {
LOG.debug("Waiting for servers to complete Compactions");
Thread.sleep(sleepForMs);
}
Optional<ServerName> serverToProcess =
clusterCompactionQueues.getLargestQueueFromServersNotCompacting();
if (serverToProcess.isPresent() && clusterCompactionQueues.hasWorkItems()) {
ServerName serverName = serverToProcess.get();
// check to see if the region has moved... if so we have to enqueue it again with
// the proper serverName
MajorCompactionRequest request = clusterCompactionQueues.reserveForCompaction(serverName);
ServerName currentServer = connection.getRegionLocator(tableName)
.getRegionLocation(request.getRegion().getStartKey()).getServerName();
if (!currentServer.equals(serverName)) {
// add it back to the queue with the correct server; it will be picked up in the future.
LOG.info("Server changed for region: " + request.getRegion().getEncodedName() + " from: "
+ serverName + " to: " + currentServer + " re-queuing request");
clusterCompactionQueues.addToCompactionQueue(currentServer, request);
clusterCompactionQueues.releaseCompaction(serverName);
} else {
LOG.info("Firing off compaction request for server: " + serverName + ", " + request
+ " total queue size left: " + clusterCompactionQueues
.getCompactionRequestsLeftToFinish());
futures.add(executor.submit(new Compact(serverName, request)));
}
} else {
// haven't assigned anything so we sleep.
Thread.sleep(sleepForMs);
}
}
LOG.info("All compactions have completed");
}
private boolean futuresComplete(List<Future<?>> futures) {
Iterables.removeIf(futures, new Predicate<Future<?>>() {
@Override public boolean apply(Future<?> input) {
return input.isDone();
}
});
return futures.isEmpty();
}
public void shutdown() throws Exception {
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
if (!ERRORS.isEmpty()) {
StringBuilder builder =
new StringBuilder().append("Major compaction failed, there were: ").append(ERRORS.size())
.append(" regions / stores that failed compacting\n")
.append("Failed compaction requests\n").append("--------------------------\n")
.append(Joiner.on("\n").join(ERRORS));
LOG.error(builder.toString());
} else {
LOG.info("All regions major compacted successfully");
}
if (connection != null) {
connection.close();
}
}
@VisibleForTesting
void initializeWorkQueues() throws IOException {
if (storesToCompact.isEmpty()) {
for (HColumnDescriptor a : connection.getTable(tableName).getTableDescriptor()
.getFamilies()) {
storesToCompact.add(Bytes.toString(a.getName()));
}
LOG.info("No family specified, will execute for all families");
}
LOG.info(
"Initializing compaction queues for table: " + tableName + " with cf: " + storesToCompact);
List<HRegionLocation> regionLocations =
connection.getRegionLocator(tableName).getAllRegionLocations();
for (HRegionLocation location : regionLocations) {
Optional<MajorCompactionRequest> request = MajorCompactionRequest
.newRequest(connection.getConfiguration(), location.getRegionInfo(), storesToCompact,
timestamp);
if (request.isPresent()) {
clusterCompactionQueues.addToCompactionQueue(location.getServerName(), request.get());
}
}
}
class Compact implements Runnable {
private final ServerName serverName;
private final MajorCompactionRequest request;
Compact(ServerName serverName, MajorCompactionRequest request) {
this.serverName = serverName;
this.request = request;
}
@Override public void run() {
try {
compactAndWait(request);
} catch (NotServingRegionException e) {
// this region has split or merged
LOG.warn("Region is invalid, requesting updated regions", e);
// let's update the cluster compaction queues with these newly created regions.
addNewRegions();
} catch (Exception e) {
LOG.warn("Error compacting:", e);
} finally {
clusterCompactionQueues.releaseCompaction(serverName);
}
}
void compactAndWait(MajorCompactionRequest request) throws Exception {
Admin admin = connection.getAdmin();
try {
// only make the request if the region is not already major compacting
if (!isCompacting(request)) {
Set<String> stores = request.getStoresRequiringCompaction(storesToCompact);
if (!stores.isEmpty()) {
request.setStores(stores);
for (String store : request.getStores()) {
admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
Bytes.toBytes(store));
}
}
}
while (isCompacting(request)) {
Thread.sleep(sleepForMs);
LOG.debug("Waiting for compaction to complete for region: " + request.getRegion()
.getEncodedName());
}
} finally {
// Make sure to wait for the CompactedFileDischarger chore to do its work
int waitForArchive = connection.getConfiguration()
.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
Thread.sleep(waitForArchive);
// check if compaction completed successfully, otherwise put that request back in the
// proper queue
Set<String> storesRequiringCompaction =
request.getStoresRequiringCompaction(storesToCompact);
if (!storesRequiringCompaction.isEmpty()) {
// this happens when a regionserver is marked as dead and flushes a store file, but
// the new regionserver doesn't pick it up because it is accounted for in the WAL replay;
// thus there are more store files on the filesystem than the regionserver knows about.
boolean regionHasNotMoved = connection.getRegionLocator(tableName)
.getRegionLocation(request.getRegion().getStartKey()).getServerName()
.equals(serverName);
if (regionHasNotMoved) {
LOG.error("Not all store files were compacted, this may be due to the regionserver not "
+ "being aware of all store files. Will not reattempt compacting, " + request);
ERRORS.add(request);
} else {
request.setStores(storesRequiringCompaction);
clusterCompactionQueues.addToCompactionQueue(serverName, request);
LOG.info("Compaction failed for the following stores: " + storesRequiringCompaction
+ " region: " + request.getRegion().getEncodedName());
}
} else {
LOG.info("Compaction complete for region: " + request.getRegion().getEncodedName()
+ " -> cf(s): " + request.getStores());
}
}
}
}
private boolean isCompacting(MajorCompactionRequest request) throws Exception {
AdminProtos.GetRegionInfoResponse.CompactionState compactionState = connection.getAdmin()
.getCompactionStateForRegion(request.getRegion().getEncodedNameAsBytes());
return compactionState.equals(AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR)
|| compactionState
.equals(AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR_AND_MINOR);
}
private void addNewRegions() {
try {
List<HRegionLocation> locations =
connection.getRegionLocator(tableName).getAllRegionLocations();
for (HRegionLocation location : locations) {
if (location.getRegionInfo().getRegionId() > timestamp) {
Optional<MajorCompactionRequest> compactionRequest = MajorCompactionRequest
.newRequest(connection.getConfiguration(), location.getRegionInfo(), storesToCompact,
timestamp);
if (compactionRequest.isPresent()) {
clusterCompactionQueues
.addToCompactionQueue(location.getServerName(), compactionRequest.get());
}
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) throws Exception {
Options options = new Options();
Option tableOption = new Option("table", true, "table name");
tableOption.setRequired(true);
options.addOption(tableOption);
Option cfOption = new Option("cf", true, "column families: comma separated eg: a,b,c");
cfOption.setOptionalArg(true);
options.addOption(cfOption);
Option serverOption = new Option("servers", true, "Concurrent servers compacting");
serverOption.setRequired(true);
options.addOption(serverOption);
options.addOption(new Option("minModTime", true,
"Compact if store files have modification time < minModTime"));
Option zkOption = new Option("zk", true, "zk quorum");
zkOption.setOptionalArg(true);
options.addOption(zkOption);
Option rootDirOption = new Option("rootDir", true, "hbase.rootDir");
rootDirOption.setOptionalArg(true);
options.addOption(rootDirOption);
Option sleepOption = new Option("sleep", true, "Time to sleep (ms) between checks of "
+ "compaction status per region and available "
+ "work queues: default 30s");
options.addOption(sleepOption);
Option retryOption = new Option("retries", true, "Max # of retries for a compaction request,"
+ " defaults to 3");
options.addOption(retryOption);
options.addOption(new Option("dryRun", false,
"Dry run, will just output a list of regions that require compaction "
+ "based on parameters passed"));
final CommandLineParser cmdLineParser = new BasicParser();
CommandLine commandLine = null;
try {
commandLine = cmdLineParser.parse(options, args);
} catch (ParseException parseException) {
System.out.println(
"ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
+ parseException);
printUsage(options);
throw parseException;
}
String tableName = commandLine.getOptionValue("table");
String cf = commandLine.getOptionValue("cf", null);
Set<String> families = Sets.newHashSet();
if (cf != null) {
Iterables.addAll(families, Splitter.on(",").split(cf));
}
Configuration configuration = HBaseConfiguration.create();
int concurrency = Integer.parseInt(commandLine.getOptionValue("servers"));
long minModTime = Long.parseLong(
commandLine.getOptionValue("minModTime", String.valueOf(System.currentTimeMillis())));
String quorum =
commandLine.getOptionValue("zk", configuration.get(HConstants.ZOOKEEPER_QUORUM));
String rootDir = commandLine.getOptionValue("rootDir", configuration.get(HConstants.HBASE_DIR));
long sleep = Long.parseLong(commandLine.getOptionValue("sleep", "30000"));
configuration.set(HConstants.HBASE_DIR, rootDir);
configuration.set(HConstants.ZOOKEEPER_QUORUM, quorum);
MajorCompactor compactor =
new MajorCompactor(configuration, TableName.valueOf(tableName), families, concurrency,
minModTime, sleep);
compactor.initializeWorkQueues();
if (!commandLine.hasOption("dryRun")) {
compactor.compactAllRegions();
}
compactor.shutdown();
}
private static void printUsage(final Options options) {
String header = "\nUsage instructions\n\n";
String footer = "\n";
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp(MajorCompactor.class.getSimpleName(), header, options, footer, true);
}
}

MajorCompactionRequestTest.java
@@ -0,0 +1,175 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util.compaction;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@Category({SmallTests.class})
public class MajorCompactionRequestTest {
private static final HBaseTestingUtility UTILITY = new HBaseTestingUtility();
private static final String FAMILY = "a";
private Path rootRegionDir;
private Path regionStoreDir;
@Before public void setUp() throws Exception {
rootRegionDir = UTILITY.getDataTestDirOnTestFS("MajorCompactionRequestTest");
regionStoreDir = new Path(rootRegionDir, FAMILY);
}
@Test public void testStoresNeedingCompaction() throws Exception {
// store files older than timestamp
List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 5, 10);
MajorCompactionRequest request = makeMockRequest(100, storeFiles, false);
Optional<MajorCompactionRequest> result =
request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
assertTrue(result.isPresent());
// store files newer than timestamp
storeFiles = mockStoreFiles(regionStoreDir, 5, 101);
request = makeMockRequest(100, storeFiles, false);
result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
assertFalse(result.isPresent());
}
@Test public void testIfWeHaveNewReferenceFilesButOldStoreFiles() throws Exception {
// this tests that reference files that are new, but whose referenced files have older
// timestamps, will still get compacted.
TableName table = TableName.valueOf("MajorCompactorTest");
HTableDescriptor htd = UTILITY.createTableDescriptor(table, Bytes.toBytes(FAMILY));
HRegionInfo hri = new HRegionInfo(htd.getTableName());
HRegion region =
HBaseTestingUtility.createRegionAndWAL(hri, rootRegionDir, UTILITY.getRandomDir(),
UTILITY.getConfiguration(), htd);
Configuration configuration = mock(Configuration.class);
// the reference file timestamp is newer
List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 4, 101);
List<Path> paths = new ArrayList<>();
for (StoreFileInfo storeFile : storeFiles) {
Path path = storeFile.getPath();
paths.add(path);
}
// the files that are referenced are older, thus we still compact.
HRegionFileSystem fileSystem =
mockFileSystem(region.getRegionInfo(), true, storeFiles, 50);
MajorCompactionRequest majorCompactionRequest = spy(new MajorCompactionRequest(configuration,
region.getRegionInfo(), Sets.newHashSet(FAMILY), 100));
doReturn(mock(Connection.class)).when(majorCompactionRequest).getConnection(eq(configuration));
doReturn(paths).when(majorCompactionRequest).getReferenceFilePaths(any(FileSystem.class),
any(Path.class));
doReturn(fileSystem).when(majorCompactionRequest).getFileSystem(any(Connection.class));
Set<String> result = majorCompactionRequest.getStoresRequiringCompaction(Sets.newHashSet("a"));
assertEquals(FAMILY, Iterables.getOnlyElement(result));
}
private HRegionFileSystem mockFileSystem(HRegionInfo info, boolean hasReferenceFiles,
List<StoreFileInfo> storeFiles) throws IOException {
Optional<StoreFileInfo> found = Optional.absent();
for (StoreFileInfo storeFile : storeFiles) {
found = Optional.of(storeFile);
break;
}
long timestamp = found.get().getModificationTime();
return mockFileSystem(info, hasReferenceFiles, storeFiles, timestamp);
}
private HRegionFileSystem mockFileSystem(HRegionInfo info, boolean hasReferenceFiles,
List<StoreFileInfo> storeFiles, long referenceFileTimestamp) throws IOException {
FileSystem fileSystem = mock(FileSystem.class);
if (hasReferenceFiles) {
FileStatus fileStatus = mock(FileStatus.class);
doReturn(referenceFileTimestamp).when(fileStatus).getModificationTime();
doReturn(fileStatus).when(fileSystem).getFileLinkStatus(isA(Path.class));
}
HRegionFileSystem mockSystem = mock(HRegionFileSystem.class);
doReturn(info).when(mockSystem).getRegionInfo();
doReturn(regionStoreDir).when(mockSystem).getStoreDir(FAMILY);
doReturn(hasReferenceFiles).when(mockSystem).hasReferences(anyString());
doReturn(storeFiles).when(mockSystem).getStoreFiles(anyString());
doReturn(fileSystem).when(mockSystem).getFileSystem();
return mockSystem;
}
private List<StoreFileInfo> mockStoreFiles(Path regionStoreDir, int howMany, long timestamp)
throws IOException {
List<StoreFileInfo> infos = Lists.newArrayList();
int i = 0;
while (i < howMany) {
StoreFileInfo storeFileInfo = mock(StoreFileInfo.class);
doReturn(timestamp).doReturn(timestamp).when(storeFileInfo).getModificationTime();
doReturn(new Path(regionStoreDir, RandomStringUtils.randomAlphabetic(10))).when(storeFileInfo)
.getPath();
infos.add(storeFileInfo);
i++;
}
return infos;
}
private MajorCompactionRequest makeMockRequest(long timestamp, List<StoreFileInfo> storeFiles,
boolean references) throws IOException {
Configuration configuration = mock(Configuration.class);
HRegionInfo regionInfo = mock(HRegionInfo.class);
when(regionInfo.getEncodedName()).thenReturn("HBase");
when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
MajorCompactionRequest request =
new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a"), timestamp);
MajorCompactionRequest spy = spy(request);
HRegionFileSystem fileSystem = mockFileSystem(regionInfo, references, storeFiles);
doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class));
doReturn(mock(Connection.class)).when(spy).getConnection(eq(configuration));
return spy;
}
}

MajorCompactorTest.java
@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util.compaction;
import java.io.IOException;
import java.util.Random;
import com.google.common.collect.Sets;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MiscTests.class, MediumTests.class })
public class MajorCompactorTest {
public static final byte[] FAMILY = Bytes.toBytes("a");
private HBaseTestingUtility utility;
@Before public void setUp() throws Exception {
utility = new HBaseTestingUtility();
utility.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 10);
utility.startMiniCluster();
}
@After public void tearDown() throws Exception {
utility.shutdownMiniCluster();
}
@Test public void testCompactingATable() throws Exception {
TableName tableName = TableName.valueOf("MajorCompactorTest");
utility.createMultiRegionTable(tableName, FAMILY, 5);
utility.waitTableAvailable(tableName);
Connection connection = utility.getConnection();
Table table = connection.getTable(tableName);
// write data and flush multiple store files:
for (int i = 0; i < 5; i++) {
loadRandomRows(table, FAMILY, 50, 100);
utility.flush(tableName);
}
table.close();
int numberOfRegions = utility.getHBaseAdmin().getTableRegions(tableName).size();
int numHFiles = utility.getNumHFiles(tableName, FAMILY);
// before major compacting, the table should have more store files than regions.
assertTrue(numberOfRegions < numHFiles);
MajorCompactor compactor =
new MajorCompactor(utility.getConfiguration(), tableName,
Sets.newHashSet(Bytes.toString(FAMILY)), 1, System.currentTimeMillis(), 200);
compactor.initializeWorkQueues();
compactor.compactAllRegions();
compactor.shutdown();
// verify that the table has been completely major compacted: one store file per region.
numberOfRegions = utility.getHBaseAdmin().getTableRegions(tableName).size();
numHFiles = utility.getNumHFiles(tableName, FAMILY);
assertEquals(numHFiles, numberOfRegions);
}
private void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
throws IOException {
Random r = new Random();
byte[] row = new byte[rowSize];
for (int i = 0; i < totalRows; i++) {
r.nextBytes(row);
Put put = new Put(row);
put.addColumn(f, new byte[]{0}, new byte[]{0});
t.put(put);
}
}
}