HBASE-19528 - Major Compaction Tool
This commit is contained in:
parent
7c318cead9
commit
4b3b627abe
|
@ -0,0 +1,137 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util.compaction;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
class ClusterCompactionQueues {
|
||||
|
||||
private final Map<ServerName, List<MajorCompactionRequest>> compactionQueues;
|
||||
private final Set<ServerName> compactingServers;
|
||||
private final ReadWriteLock lock;
|
||||
private final int concurrentServers;
|
||||
|
||||
ClusterCompactionQueues(int concurrentServers) {
|
||||
this.concurrentServers = concurrentServers;
|
||||
|
||||
this.compactionQueues = Maps.newHashMap();
|
||||
this.lock = new ReentrantReadWriteLock();
|
||||
this.compactingServers = Sets.newHashSet();
|
||||
}
|
||||
|
||||
void addToCompactionQueue(ServerName serverName, MajorCompactionRequest info) {
|
||||
this.lock.writeLock().lock();
|
||||
try {
|
||||
List<MajorCompactionRequest> result = this.compactionQueues.get(serverName);
|
||||
if (result == null) {
|
||||
result = Lists.newArrayList();
|
||||
compactionQueues.put(serverName, result);
|
||||
}
|
||||
result.add(info);
|
||||
} finally {
|
||||
this.lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
boolean hasWorkItems() {
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
return !this.compactionQueues.values().stream().allMatch(List::isEmpty);
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
int getCompactionRequestsLeftToFinish() {
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
int size = 0;
|
||||
for (List<MajorCompactionRequest> queue : compactionQueues.values()) {
|
||||
size += queue.size();
|
||||
}
|
||||
return size;
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting List<MajorCompactionRequest> getQueue(ServerName serverName) {
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
return compactionQueues.get(serverName);
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
MajorCompactionRequest reserveForCompaction(ServerName serverName) {
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
if (!compactionQueues.get(serverName).isEmpty()) {
|
||||
compactingServers.add(serverName);
|
||||
return compactionQueues.get(serverName).remove(0);
|
||||
}
|
||||
return null;
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void releaseCompaction(ServerName serverName) {
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
compactingServers.remove(serverName);
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
boolean atCapacity() {
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
return compactingServers.size() >= concurrentServers;
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
Optional<ServerName> getLargestQueueFromServersNotCompacting() {
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
return compactionQueues.entrySet().stream()
|
||||
.filter(entry -> !compactingServers.contains(entry.getKey()))
|
||||
.max(Map.Entry.comparingByValue(
|
||||
(o1, o2) -> Integer.compare(o1.size(), o2.size()))).map(Map.Entry::getKey);
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,171 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util.compaction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
class MajorCompactionRequest {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionRequest.class);
|
||||
|
||||
private final Configuration configuration;
|
||||
private final RegionInfo region;
|
||||
private Set<String> stores;
|
||||
private final long timestamp;
|
||||
|
||||
@VisibleForTesting
|
||||
MajorCompactionRequest(Configuration configuration, RegionInfo region,
|
||||
Set<String> stores, long timestamp) {
|
||||
this.configuration = configuration;
|
||||
this.region = region;
|
||||
this.stores = stores;
|
||||
this.timestamp = timestamp;
|
||||
}
|
||||
|
||||
static Optional<MajorCompactionRequest> newRequest(Configuration configuration, RegionInfo info,
|
||||
Set<String> stores, long timestamp) throws IOException {
|
||||
MajorCompactionRequest request =
|
||||
new MajorCompactionRequest(configuration, info, stores, timestamp);
|
||||
return request.createRequest(configuration, stores);
|
||||
}
|
||||
|
||||
RegionInfo getRegion() {
|
||||
return region;
|
||||
}
|
||||
|
||||
Set<String> getStores() {
|
||||
return stores;
|
||||
}
|
||||
|
||||
void setStores(Set<String> stores) {
|
||||
this.stores = stores;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Optional<MajorCompactionRequest> createRequest(Configuration configuration,
|
||||
Set<String> stores) throws IOException {
|
||||
Set<String> familiesToCompact = getStoresRequiringCompaction(stores);
|
||||
MajorCompactionRequest request = null;
|
||||
if (!familiesToCompact.isEmpty()) {
|
||||
request = new MajorCompactionRequest(configuration, region, familiesToCompact, timestamp);
|
||||
}
|
||||
return Optional.ofNullable(request);
|
||||
}
|
||||
|
||||
Set<String> getStoresRequiringCompaction(Set<String> requestedStores) throws IOException {
|
||||
try(Connection connection = getConnection(configuration)) {
|
||||
HRegionFileSystem fileSystem = getFileSystem(connection);
|
||||
Set<String> familiesToCompact = Sets.newHashSet();
|
||||
for (String family : requestedStores) {
|
||||
// do we have any store files?
|
||||
Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
|
||||
if (storeFiles == null) {
|
||||
LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
|
||||
.getRegionInfo().getEncodedName(), " has no store files");
|
||||
continue;
|
||||
}
|
||||
// check for reference files
|
||||
if (fileSystem.hasReferences(family) && familyHasReferenceFile(fileSystem, family)) {
|
||||
familiesToCompact.add(family);
|
||||
LOG.info("Including store: " + family + " with: " + storeFiles.size()
|
||||
+ " files for compaction for region: " + fileSystem.getRegionInfo().getEncodedName());
|
||||
continue;
|
||||
}
|
||||
// check store file timestamps
|
||||
boolean includeStore = false;
|
||||
for (StoreFileInfo storeFile : storeFiles) {
|
||||
if (storeFile.getModificationTime() < timestamp) {
|
||||
LOG.info("Including store: " + family + " with: " + storeFiles.size()
|
||||
+ " files for compaction for region: "
|
||||
+ fileSystem.getRegionInfo().getEncodedName());
|
||||
familiesToCompact.add(family);
|
||||
includeStore = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!includeStore) {
|
||||
LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
|
||||
.getRegionInfo().getEncodedName(), " already compacted");
|
||||
}
|
||||
}
|
||||
return familiesToCompact;
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Connection getConnection(Configuration configuration) throws IOException {
|
||||
return ConnectionFactory.createConnection(configuration);
|
||||
}
|
||||
|
||||
private boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family)
|
||||
throws IOException {
|
||||
List<Path> referenceFiles =
|
||||
getReferenceFilePaths(fileSystem.getFileSystem(), fileSystem.getStoreDir(family));
|
||||
for (Path referenceFile : referenceFiles) {
|
||||
FileStatus status = fileSystem.getFileSystem().getFileLinkStatus(referenceFile);
|
||||
if (status.getModificationTime() < timestamp) {
|
||||
LOG.info("Including store: " + family + " for compaction for region: " + fileSystem
|
||||
.getRegionInfo().getEncodedName() + " (reference store files)");
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
List<Path> getReferenceFilePaths(FileSystem fileSystem, Path familyDir)
|
||||
throws IOException {
|
||||
return FSUtils.getReferenceFilePaths(fileSystem, familyDir);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
HRegionFileSystem getFileSystem(Connection connection) throws IOException {
|
||||
Admin admin = connection.getAdmin();
|
||||
return HRegionFileSystem.openRegionFromFileSystem(admin.getConfiguration(),
|
||||
FSUtils.getCurrentFileSystem(admin.getConfiguration()),
|
||||
FSUtils.getTableDir(FSUtils.getRootDir(admin.getConfiguration()), region.getTable()),
|
||||
region, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "region: " + region.getEncodedName() + " store(s): " + stores;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,379 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.util.compaction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.CommandLineParser;
|
||||
import org.apache.commons.cli.DefaultParser;
|
||||
import org.apache.commons.cli.HelpFormatter;
|
||||
import org.apache.commons.cli.Option;
|
||||
import org.apache.commons.cli.Options;
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.CompactionState;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
|
||||
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
|
||||
public class MajorCompactor {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MajorCompactor.class);
|
||||
private static final Set<MajorCompactionRequest> ERRORS = ConcurrentHashMap.newKeySet();
|
||||
|
||||
private final ClusterCompactionQueues clusterCompactionQueues;
|
||||
private final long timestamp;
|
||||
private final Set<String> storesToCompact;
|
||||
private final ExecutorService executor;
|
||||
private final long sleepForMs;
|
||||
private final Connection connection;
|
||||
private final TableName tableName;
|
||||
|
||||
public MajorCompactor(Configuration conf, TableName tableName, Set<String> storesToCompact,
|
||||
int concurrency, long timestamp, long sleepForMs) throws IOException {
|
||||
this.connection = ConnectionFactory.createConnection(conf);
|
||||
this.tableName = tableName;
|
||||
this.timestamp = timestamp;
|
||||
this.storesToCompact = storesToCompact;
|
||||
this.executor = Executors.newFixedThreadPool(concurrency);
|
||||
this.clusterCompactionQueues = new ClusterCompactionQueues(concurrency);
|
||||
this.sleepForMs = sleepForMs;
|
||||
}
|
||||
|
||||
public void compactAllRegions() throws Exception {
|
||||
List<Future<?>> futures = Lists.newArrayList();
|
||||
while (clusterCompactionQueues.hasWorkItems() || !futuresComplete(futures)) {
|
||||
while (clusterCompactionQueues.atCapacity()) {
|
||||
LOG.debug("Waiting for servers to complete Compactions");
|
||||
Thread.sleep(sleepForMs);
|
||||
}
|
||||
Optional<ServerName> serverToProcess =
|
||||
clusterCompactionQueues.getLargestQueueFromServersNotCompacting();
|
||||
if (serverToProcess.isPresent() && clusterCompactionQueues.hasWorkItems()) {
|
||||
ServerName serverName = serverToProcess.get();
|
||||
// check to see if the region has moved... if so we have to enqueue it again with
|
||||
// the proper serverName
|
||||
MajorCompactionRequest request = clusterCompactionQueues.reserveForCompaction(serverName);
|
||||
|
||||
ServerName currentServer = connection.getRegionLocator(tableName)
|
||||
.getRegionLocation(request.getRegion().getStartKey()).getServerName();
|
||||
|
||||
if (!currentServer.equals(serverName)) {
|
||||
// add it back to the queue with the correct server it should be picked up in the future.
|
||||
LOG.info("Server changed for region: " + request.getRegion().getEncodedName() + " from: "
|
||||
+ serverName + " to: " + currentServer + " re-queuing request");
|
||||
clusterCompactionQueues.addToCompactionQueue(currentServer, request);
|
||||
clusterCompactionQueues.releaseCompaction(serverName);
|
||||
} else {
|
||||
LOG.info("Firing off compaction request for server: " + serverName + ", " + request
|
||||
+ " total queue size left: " + clusterCompactionQueues
|
||||
.getCompactionRequestsLeftToFinish());
|
||||
futures.add(executor.submit(new Compact(serverName, request)));
|
||||
}
|
||||
} else {
|
||||
// haven't assigned anything so we sleep.
|
||||
Thread.sleep(sleepForMs);
|
||||
}
|
||||
}
|
||||
LOG.info("All compactions have completed");
|
||||
}
|
||||
|
||||
private boolean futuresComplete(List<Future<?>> futures) {
|
||||
futures.removeIf(Future::isDone);
|
||||
return futures.isEmpty();
|
||||
}
|
||||
|
||||
public void shutdown() throws Exception {
|
||||
executor.shutdown();
|
||||
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
|
||||
if (!ERRORS.isEmpty()) {
|
||||
StringBuilder builder =
|
||||
new StringBuilder().append("Major compaction failed, there were: ").append(ERRORS.size())
|
||||
.append(" regions / stores that failed compacting\n")
|
||||
.append("Failed compaction requests\n").append("--------------------------\n")
|
||||
.append(Joiner.on("\n").join(ERRORS));
|
||||
LOG.error(builder.toString());
|
||||
}
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
LOG.info("All regions major compacted successfully");
|
||||
}
|
||||
|
||||
@VisibleForTesting void initializeWorkQueues() throws IOException {
|
||||
if (storesToCompact.isEmpty()) {
|
||||
connection.getTable(tableName).getDescriptor().getColumnFamilyNames()
|
||||
.forEach(a -> storesToCompact.add(Bytes.toString(a)));
|
||||
LOG.info("No family specified, will execute for all families");
|
||||
}
|
||||
LOG.info(
|
||||
"Initializing compaction queues for table: " + tableName + " with cf: " + storesToCompact);
|
||||
List<HRegionLocation> regionLocations =
|
||||
connection.getRegionLocator(tableName).getAllRegionLocations();
|
||||
for (HRegionLocation location : regionLocations) {
|
||||
Optional<MajorCompactionRequest> request = MajorCompactionRequest
|
||||
.newRequest(connection.getConfiguration(), location.getRegion(), storesToCompact,
|
||||
timestamp);
|
||||
request.ifPresent(majorCompactionRequest -> clusterCompactionQueues
|
||||
.addToCompactionQueue(location.getServerName(), majorCompactionRequest));
|
||||
}
|
||||
}
|
||||
|
||||
class Compact implements Runnable {
|
||||
|
||||
private final ServerName serverName;
|
||||
private final MajorCompactionRequest request;
|
||||
|
||||
Compact(ServerName serverName, MajorCompactionRequest request) {
|
||||
this.serverName = serverName;
|
||||
this.request = request;
|
||||
}
|
||||
|
||||
@Override public void run() {
|
||||
try {
|
||||
compactAndWait(request);
|
||||
} catch (NotServingRegionException e) {
|
||||
// this region has split or merged
|
||||
LOG.warn("Region is invalid, requesting updated regions", e);
|
||||
// lets updated the cluster compaction queues with these newly created regions.
|
||||
addNewRegions();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Error compacting:", e);
|
||||
} finally {
|
||||
clusterCompactionQueues.releaseCompaction(serverName);
|
||||
}
|
||||
}
|
||||
|
||||
void compactAndWait(MajorCompactionRequest request) throws Exception {
|
||||
Admin admin = connection.getAdmin();
|
||||
try {
|
||||
// only make the request if the region is not already major compacting
|
||||
if (!isCompacting(request)) {
|
||||
Set<String> stores = request.getStoresRequiringCompaction(storesToCompact);
|
||||
if (!stores.isEmpty()) {
|
||||
request.setStores(stores);
|
||||
for (String store : request.getStores()) {
|
||||
admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
|
||||
Bytes.toBytes(store));
|
||||
}
|
||||
}
|
||||
}
|
||||
while (isCompacting(request)) {
|
||||
Thread.sleep(sleepForMs);
|
||||
LOG.debug("Waiting for compaction to complete for region: " + request.getRegion()
|
||||
.getEncodedName());
|
||||
}
|
||||
} finally {
|
||||
// Make sure to wait for the CompactedFileDischarger chore to do its work
|
||||
int waitForArchive = connection.getConfiguration()
|
||||
.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
|
||||
Thread.sleep(waitForArchive);
|
||||
// check if compaction completed successfully, otherwise put that request back in the
|
||||
// proper queue
|
||||
Set<String> storesRequiringCompaction =
|
||||
request.getStoresRequiringCompaction(storesToCompact);
|
||||
if (!storesRequiringCompaction.isEmpty()) {
|
||||
// this happens, when a region server is marked as dead, flushes a store file and
|
||||
// the new regionserver doesn't pick it up because its accounted for in the WAL replay,
|
||||
// thus you have more store files on the filesystem than the regionserver knows about.
|
||||
boolean regionHasNotMoved = connection.getRegionLocator(tableName)
|
||||
.getRegionLocation(request.getRegion().getStartKey()).getServerName()
|
||||
.equals(serverName);
|
||||
if (regionHasNotMoved) {
|
||||
LOG.error("Not all store files were compacted, this may be due to the regionserver not "
|
||||
+ "being aware of all store files. Will not reattempt compacting, " + request);
|
||||
ERRORS.add(request);
|
||||
} else {
|
||||
request.setStores(storesRequiringCompaction);
|
||||
clusterCompactionQueues.addToCompactionQueue(serverName, request);
|
||||
LOG.info("Compaction failed for the following stores: " + storesRequiringCompaction
|
||||
+ " region: " + request.getRegion().getEncodedName());
|
||||
}
|
||||
} else {
|
||||
LOG.info("Compaction complete for region: " + request.getRegion().getEncodedName()
|
||||
+ " -> cf(s): " + request.getStores());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isCompacting(MajorCompactionRequest request) throws Exception {
|
||||
CompactionState compactionState = connection.getAdmin()
|
||||
.getCompactionStateForRegion(request.getRegion().getEncodedNameAsBytes());
|
||||
return compactionState.equals(CompactionState.MAJOR) || compactionState
|
||||
.equals(CompactionState.MAJOR_AND_MINOR);
|
||||
}
|
||||
|
||||
private void addNewRegions() {
|
||||
try {
|
||||
List<HRegionLocation> locations =
|
||||
connection.getRegionLocator(tableName).getAllRegionLocations();
|
||||
for (HRegionLocation location : locations) {
|
||||
if (location.getRegion().getRegionId() > timestamp) {
|
||||
Optional<MajorCompactionRequest> compactionRequest = MajorCompactionRequest
|
||||
.newRequest(connection.getConfiguration(), location.getRegion(), storesToCompact,
|
||||
timestamp);
|
||||
compactionRequest.ifPresent(request -> clusterCompactionQueues
|
||||
.addToCompactionQueue(location.getServerName(), request));
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Options options = new Options();
|
||||
options.addOption(
|
||||
Option.builder("table")
|
||||
.required()
|
||||
.desc("table name")
|
||||
.hasArg()
|
||||
.build()
|
||||
);
|
||||
options.addOption(
|
||||
Option.builder("cf")
|
||||
.optionalArg(true)
|
||||
.desc("column families: comma separated eg: a,b,c")
|
||||
.hasArg()
|
||||
.build()
|
||||
);
|
||||
options.addOption(
|
||||
Option.builder("servers")
|
||||
.required()
|
||||
.desc("Concurrent servers compacting")
|
||||
.hasArg()
|
||||
.build()
|
||||
);
|
||||
options.addOption(
|
||||
Option.builder("minModTime").
|
||||
desc("Compact if store files have modification time < minModTime")
|
||||
.hasArg()
|
||||
.build()
|
||||
);
|
||||
options.addOption(
|
||||
Option.builder("zk")
|
||||
.optionalArg(true)
|
||||
.desc("zk quorum")
|
||||
.hasArg()
|
||||
.build()
|
||||
);
|
||||
options.addOption(
|
||||
Option.builder("rootDir")
|
||||
.optionalArg(true)
|
||||
.desc("hbase.rootDir")
|
||||
.hasArg()
|
||||
.build()
|
||||
);
|
||||
options.addOption(
|
||||
Option.builder("sleep")
|
||||
.desc("Time to sleepForMs (ms) for checking compaction status per region and available "
|
||||
+ "work queues: default 30s")
|
||||
.hasArg()
|
||||
.build()
|
||||
);
|
||||
options.addOption(
|
||||
Option.builder("retries")
|
||||
.desc("Max # of retries for a compaction request," + " defaults to 3")
|
||||
.hasArg()
|
||||
.build()
|
||||
);
|
||||
options.addOption(
|
||||
Option.builder("dryRun")
|
||||
.desc("Dry run, will just output a list of regions that require compaction based on "
|
||||
+ "parameters passed")
|
||||
.hasArg(false)
|
||||
.build()
|
||||
);
|
||||
|
||||
final CommandLineParser cmdLineParser = new DefaultParser();
|
||||
CommandLine commandLine = null;
|
||||
try {
|
||||
commandLine = cmdLineParser.parse(options, args);
|
||||
} catch (ParseException parseException) {
|
||||
System.out.println(
|
||||
"ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
|
||||
+ parseException);
|
||||
printUsage(options);
|
||||
|
||||
}
|
||||
String tableName = commandLine.getOptionValue("table");
|
||||
String cf = commandLine.getOptionValue("cf", null);
|
||||
Set<String> families = Sets.newHashSet();
|
||||
if (cf != null) {
|
||||
Iterables.addAll(families, Splitter.on(",").split(cf));
|
||||
}
|
||||
|
||||
|
||||
Configuration configuration = HBaseConfiguration.create();
|
||||
int concurrency = Integer.parseInt(commandLine.getOptionValue("servers"));
|
||||
long minModTime = Long.parseLong(
|
||||
commandLine.getOptionValue("minModTime", String.valueOf(System.currentTimeMillis())));
|
||||
String quorum =
|
||||
commandLine.getOptionValue("zk", configuration.get(HConstants.ZOOKEEPER_QUORUM));
|
||||
String rootDir = commandLine.getOptionValue("rootDir", configuration.get(HConstants.HBASE_DIR));
|
||||
long sleep = Long.valueOf(commandLine.getOptionValue("sleep", Long.toString(30000)));
|
||||
|
||||
configuration.set(HConstants.HBASE_DIR, rootDir);
|
||||
configuration.set(HConstants.ZOOKEEPER_QUORUM, quorum);
|
||||
|
||||
MajorCompactor compactor =
|
||||
new MajorCompactor(configuration, TableName.valueOf(tableName), families, concurrency,
|
||||
minModTime, sleep);
|
||||
|
||||
compactor.initializeWorkQueues();
|
||||
if (!commandLine.hasOption("dryRun")) {
|
||||
compactor.compactAllRegions();
|
||||
}
|
||||
compactor.shutdown();
|
||||
}
|
||||
|
||||
private static void printUsage(final Options options) {
|
||||
String header = "\nUsage instructions\n\n";
|
||||
String footer = "\n";
|
||||
HelpFormatter formatter = new HelpFormatter();
|
||||
formatter.printHelp(MajorCompactor.class.getSimpleName(), header, options, footer, true);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,166 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.util.compaction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.commons.lang.RandomStringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.isA;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@Category({SmallTests.class})
|
||||
public class MajorCompactionRequestTest {
|
||||
|
||||
private static final HBaseTestingUtility UTILITY = new HBaseTestingUtility();
|
||||
private static final String FAMILY = "a";
|
||||
private Path rootRegionDir;
|
||||
private Path regionStoreDir;
|
||||
|
||||
@Before public void setUp() throws Exception {
|
||||
rootRegionDir = UTILITY.getDataTestDirOnTestFS("MajorCompactionRequestTest");
|
||||
regionStoreDir = new Path(rootRegionDir, FAMILY);
|
||||
}
|
||||
|
||||
@Test public void testStoresNeedingCompaction() throws Exception {
|
||||
// store files older than timestamp
|
||||
List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 5, 10);
|
||||
MajorCompactionRequest request = makeMockRequest(100, storeFiles, false);
|
||||
Optional<MajorCompactionRequest> result =
|
||||
request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
|
||||
assertTrue(result.isPresent());
|
||||
|
||||
// store files newer than timestamp
|
||||
storeFiles = mockStoreFiles(regionStoreDir, 5, 101);
|
||||
request = makeMockRequest(100, storeFiles, false);
|
||||
result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
|
||||
assertFalse(result.isPresent());
|
||||
}
|
||||
|
||||
@Test public void testIfWeHaveNewReferenceFilesButOldStoreFiles() throws Exception {
|
||||
// this tests that reference files that are new, but have older timestamps for the files
|
||||
// they reference still will get compacted.
|
||||
TableName table = TableName.valueOf("MajorCompactorTest");
|
||||
TableDescriptor htd = UTILITY.createTableDescriptor(table, Bytes.toBytes(FAMILY));
|
||||
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
|
||||
HRegion region =
|
||||
HBaseTestingUtility.createRegionAndWAL(hri, rootRegionDir, UTILITY.getConfiguration(), htd);
|
||||
|
||||
Configuration configuration = mock(Configuration.class);
|
||||
// the reference file timestamp is newer
|
||||
List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 4, 101);
|
||||
List<Path> paths = storeFiles.stream().map(StoreFileInfo::getPath).collect(Collectors.toList());
|
||||
// the files that are referenced are older, thus we still compact.
|
||||
HRegionFileSystem fileSystem =
|
||||
mockFileSystem(region.getRegionInfo(), true, storeFiles, 50);
|
||||
MajorCompactionRequest majorCompactionRequest = spy(new MajorCompactionRequest(configuration,
|
||||
region.getRegionInfo(), Sets.newHashSet(FAMILY), 100));
|
||||
doReturn(mock(Connection.class)).when(majorCompactionRequest).getConnection(eq(configuration));
|
||||
doReturn(paths).when(majorCompactionRequest).getReferenceFilePaths(any(FileSystem.class),
|
||||
any(Path.class));
|
||||
doReturn(fileSystem).when(majorCompactionRequest).getFileSystem(any(Connection.class));
|
||||
Set<String> result = majorCompactionRequest.getStoresRequiringCompaction(Sets.newHashSet("a"));
|
||||
assertEquals(FAMILY, Iterables.getOnlyElement(result));
|
||||
}
|
||||
|
||||
private HRegionFileSystem mockFileSystem(RegionInfo info, boolean hasReferenceFiles,
|
||||
List<StoreFileInfo> storeFiles) throws IOException {
|
||||
long timestamp = storeFiles.stream().findFirst().get().getModificationTime();
|
||||
return mockFileSystem(info, hasReferenceFiles, storeFiles, timestamp);
|
||||
}
|
||||
|
||||
private HRegionFileSystem mockFileSystem(RegionInfo info, boolean hasReferenceFiles,
|
||||
List<StoreFileInfo> storeFiles, long referenceFileTimestamp) throws IOException {
|
||||
FileSystem fileSystem = mock(FileSystem.class);
|
||||
if (hasReferenceFiles) {
|
||||
FileStatus fileStatus = mock(FileStatus.class);
|
||||
doReturn(referenceFileTimestamp).when(fileStatus).getModificationTime();
|
||||
doReturn(fileStatus).when(fileSystem).getFileLinkStatus(isA(Path.class));
|
||||
}
|
||||
HRegionFileSystem mockSystem = mock(HRegionFileSystem.class);
|
||||
doReturn(info).when(mockSystem).getRegionInfo();
|
||||
doReturn(regionStoreDir).when(mockSystem).getStoreDir(FAMILY);
|
||||
doReturn(hasReferenceFiles).when(mockSystem).hasReferences(anyString());
|
||||
doReturn(storeFiles).when(mockSystem).getStoreFiles(anyString());
|
||||
doReturn(fileSystem).when(mockSystem).getFileSystem();
|
||||
return mockSystem;
|
||||
}
|
||||
|
||||
private List<StoreFileInfo> mockStoreFiles(Path regionStoreDir, int howMany, long timestamp)
|
||||
throws IOException {
|
||||
List<StoreFileInfo> infos = Lists.newArrayList();
|
||||
int i = 0;
|
||||
while (i < howMany) {
|
||||
StoreFileInfo storeFileInfo = mock(StoreFileInfo.class);
|
||||
doReturn(timestamp).doReturn(timestamp).when(storeFileInfo).getModificationTime();
|
||||
doReturn(new Path(regionStoreDir, RandomStringUtils.randomAlphabetic(10))).when(storeFileInfo)
|
||||
.getPath();
|
||||
infos.add(storeFileInfo);
|
||||
i++;
|
||||
}
|
||||
return infos;
|
||||
}
|
||||
|
||||
private MajorCompactionRequest makeMockRequest(long timestamp, List<StoreFileInfo> storeFiles,
|
||||
boolean references) throws IOException {
|
||||
Configuration configuration = mock(Configuration.class);
|
||||
RegionInfo regionInfo = mock(RegionInfo.class);
|
||||
when(regionInfo.getEncodedName()).thenReturn("HBase");
|
||||
when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
|
||||
MajorCompactionRequest request =
|
||||
new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a"), timestamp);
|
||||
MajorCompactionRequest spy = spy(request);
|
||||
HRegionFileSystem fileSystem = mockFileSystem(regionInfo, references, storeFiles);
|
||||
doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class));
|
||||
doReturn(mock(Connection.class)).when(spy).getConnection(eq(configuration));
|
||||
return spy;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util.compaction;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
|
||||
import org.junit.After;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
|
||||
@Category({ MiscTests.class, MediumTests.class })
|
||||
public class MajorCompactorTest {
|
||||
|
||||
public static final byte[] FAMILY = Bytes.toBytes("a");
|
||||
private HBaseTestingUtility utility;
|
||||
|
||||
@Before public void setUp() throws Exception {
|
||||
utility = new HBaseTestingUtility();
|
||||
utility.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 10);
|
||||
utility.startMiniCluster();
|
||||
}
|
||||
|
||||
@After public void tearDown() throws Exception {
|
||||
utility.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test public void testCompactingATable() throws Exception {
|
||||
TableName tableName = TableName.valueOf("MajorCompactorTest");
|
||||
utility.createMultiRegionTable(tableName, FAMILY, 5);
|
||||
utility.waitTableAvailable(tableName);
|
||||
Connection connection = utility.getConnection();
|
||||
Table table = connection.getTable(tableName);
|
||||
// write data and flush multiple store files:
|
||||
for (int i = 0; i < 5; i++) {
|
||||
utility.loadRandomRows(table, FAMILY, 50, 100);
|
||||
utility.flush(tableName);
|
||||
}
|
||||
table.close();
|
||||
int numberOfRegions = utility.getAdmin().getRegions(tableName).size();
|
||||
int numHFiles = utility.getNumHFiles(tableName, FAMILY);
|
||||
// we should have a table with more store files than we would before we major compacted.
|
||||
assertTrue(numberOfRegions < numHFiles);
|
||||
|
||||
MajorCompactor compactor =
|
||||
new MajorCompactor(utility.getConfiguration(), tableName,
|
||||
Sets.newHashSet(Bytes.toString(FAMILY)), 1, System.currentTimeMillis(), 200);
|
||||
compactor.initializeWorkQueues();
|
||||
compactor.compactAllRegions();
|
||||
compactor.shutdown();
|
||||
|
||||
// verify that the store has been completely major compacted.
|
||||
numberOfRegions = utility.getAdmin().getRegions(tableName).size();
|
||||
numHFiles = utility.getNumHFiles(tableName, FAMILY);
|
||||
assertEquals(numHFiles, numberOfRegions);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue