diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/ClusterCompactionQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/ClusterCompactionQueues.java
new file mode 100644
index 00000000000..e0096dc3fd0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/ClusterCompactionQueues.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.ServerName;
+
+@InterfaceAudience.Private
+class ClusterCompactionQueues {
+
+  private final Map<ServerName, List<MajorCompactionRequest>> compactionQueues;
+  private final Set<ServerName> compactingServers;
+  private final ReadWriteLock lock;
+  private final int concurrentServers;
+
+  ClusterCompactionQueues(int concurrentServers) {
+    this.concurrentServers = concurrentServers;
+
+    this.compactionQueues = Maps.newHashMap();
+    this.lock = new ReentrantReadWriteLock();
+    this.compactingServers = Sets.newHashSet();
+  }
+
+  void addToCompactionQueue(ServerName serverName, MajorCompactionRequest info) {
+    this.lock.writeLock().lock();
+    try {
+      List<MajorCompactionRequest> result = this.compactionQueues.get(serverName);
+      if (result == null) {
+        result = Lists.newArrayList();
+        compactionQueues.put(serverName, result);
+      }
+      result.add(info);
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+  }
+
+  boolean hasWorkItems() {
+    lock.readLock().lock();
+    try {
+      for (List<MajorCompactionRequest> majorCompactionRequests : this.compactionQueues.values()) {
+        if (!majorCompactionRequests.isEmpty()) {
+          return true;
+        }
+      }
+      return false;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  int getCompactionRequestsLeftToFinish() {
+    lock.readLock().lock();
+    try {
+      int size = 0;
+      for (List<MajorCompactionRequest> queue : compactionQueues.values()) {
+        size += queue.size();
+      }
+      return size;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @VisibleForTesting
+  List<MajorCompactionRequest> getQueue(ServerName serverName) {
+    lock.readLock().lock();
+    try {
+      return compactionQueues.get(serverName);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  MajorCompactionRequest reserveForCompaction(ServerName serverName) {
+    lock.writeLock().lock();
+    try {
+      if (!compactionQueues.get(serverName).isEmpty()) {
+        compactingServers.add(serverName);
+        return compactionQueues.get(serverName).remove(0);
+      }
+      return null;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  void releaseCompaction(ServerName serverName) {
+    lock.writeLock().lock();
+    try {
+      compactingServers.remove(serverName);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  boolean atCapacity() {
+    lock.readLock().lock();
+    try {
+      return compactingServers.size() >= concurrentServers;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  Optional<ServerName> getLargestQueueFromServersNotCompacting() {
+    lock.readLock().lock();
+    try {
+      Sets.SetView<ServerName> difference =
+          Sets.difference(compactionQueues.keySet(), compactingServers);
+      ServerName serverName = null;
+      int maxItems = 0;
+      for (ServerName server : difference) {
+        if (compactionQueues.get(server).size() > maxItems) {
+          maxItems = compactionQueues.get(server).size();
+          serverName = server;
+        }
+      }
+      return Optional.fromNullable(serverName);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
new file mode 100644
index 00000000000..91bdb732158
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+class MajorCompactionRequest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionRequest.class);
+
+  private final Configuration configuration;
+  private final HRegionInfo region;
+  private Set<String> stores;
+  private final long timestamp;
+
+  @VisibleForTesting
+  MajorCompactionRequest(Configuration configuration, HRegionInfo region,
+      Set<String> stores, long timestamp) {
+    this.configuration = configuration;
+    this.region = region;
+    this.stores = stores;
+    this.timestamp = timestamp;
+  }
+
+  static Optional<MajorCompactionRequest> newRequest(Configuration configuration, HRegionInfo info,
+      Set<String> stores, long timestamp) throws IOException {
+    MajorCompactionRequest request =
+        new MajorCompactionRequest(configuration, info, stores, timestamp);
+    return request.createRequest(configuration, stores);
+  }
+
+  HRegionInfo getRegion() {
+    return region;
+  }
+
+  Set<String> getStores() {
+    return stores;
+  }
+
+  void setStores(Set<String> stores) {
+    this.stores = stores;
+  }
+
+  @VisibleForTesting
+  Optional<MajorCompactionRequest> createRequest(Configuration configuration,
+      Set<String> stores) throws IOException {
+    Set<String> familiesToCompact = getStoresRequiringCompaction(stores);
+    MajorCompactionRequest request = null;
+    if (!familiesToCompact.isEmpty()) {
+      request = new MajorCompactionRequest(configuration, region, familiesToCompact, timestamp);
+    }
+    return Optional.fromNullable(request);
+  }
+
+  Set<String> getStoresRequiringCompaction(Set<String> requestedStores) throws IOException {
+    try (Connection connection = getConnection(configuration)) {
+      HRegionFileSystem fileSystem = getFileSystem(connection);
+      Set<String> familiesToCompact = Sets.newHashSet();
+      for (String family : requestedStores) {
+        // do we have any store files?
+        Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
+        if (storeFiles == null) {
+          LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
+              .getRegionInfo().getEncodedName() + ", it has no store files");
+          continue;
+        }
+        // check for reference files
+        if (fileSystem.hasReferences(family) && familyHasReferenceFile(fileSystem, family)) {
+          familiesToCompact.add(family);
+          LOG.info("Including store: " + family + " with: " + storeFiles.size()
+              + " files for compaction for region: " + fileSystem.getRegionInfo().getEncodedName());
+          continue;
+        }
+        // check store file timestamps
+        boolean includeStore = false;
+        for (StoreFileInfo storeFile : storeFiles) {
+          if (storeFile.getModificationTime() < timestamp) {
+            LOG.info("Including store: " + family + " with: " + storeFiles.size()
+                + " files for compaction for region: "
+                + fileSystem.getRegionInfo().getEncodedName());
+            familiesToCompact.add(family);
+            includeStore = true;
+            break;
+          }
+        }
+        if (!includeStore) {
+          LOG.info("Excluding store: " + family + " for compaction for region: " + fileSystem
+              .getRegionInfo().getEncodedName() + ", already compacted");
+        }
+      }
+      return familiesToCompact;
+    }
+  }
+
+  @VisibleForTesting
+  Connection getConnection(Configuration configuration) throws IOException {
+    return ConnectionFactory.createConnection(configuration);
+  }
+
+  private boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family)
+      throws IOException {
+    List<Path> referenceFiles =
+        getReferenceFilePaths(fileSystem.getFileSystem(), fileSystem.getStoreDir(family));
+    for (Path referenceFile : referenceFiles) {
+      FileStatus status = fileSystem.getFileSystem().getFileLinkStatus(referenceFile);
+      if (status.getModificationTime() < timestamp) {
+        LOG.info("Including store: " + family + " for compaction for region: " + fileSystem
+            .getRegionInfo().getEncodedName() + " (reference store files)");
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @VisibleForTesting
+  List<Path> getReferenceFilePaths(FileSystem fileSystem, Path familyDir)
+      throws IOException {
+    return FSUtils.getReferenceFilePaths(fileSystem, familyDir);
+  }
+
+  @VisibleForTesting
+  HRegionFileSystem getFileSystem(Connection connection) throws IOException {
+    Admin admin = connection.getAdmin();
+    return HRegionFileSystem.openRegionFromFileSystem(admin.getConfiguration(),
+        FSUtils.getCurrentFileSystem(admin.getConfiguration()),
+        FSUtils.getTableDir(FSUtils.getRootDir(admin.getConfiguration()), region.getTable()),
+        region, true);
+  }
+
+  @Override
+  public String toString() {
+    return "region: " + region.getEncodedName() + " store(s): " + stores;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
new file mode 100644
index 00000000000..a5e44945edc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class MajorCompactor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MajorCompactor.class);
+  private static final Set<MajorCompactionRequest> ERRORS = Sets.newHashSet();
+
+  private final ClusterCompactionQueues clusterCompactionQueues;
+  private final long timestamp;
+  private final Set<String> storesToCompact;
+  private final ExecutorService executor;
+  private final long sleepForMs;
+  private final Connection connection;
+  private final TableName tableName;
+
+  public MajorCompactor(Configuration conf, TableName tableName, Set<String> storesToCompact,
+      int concurrency, long timestamp, long sleepForMs) throws IOException {
+    this.connection = ConnectionFactory.createConnection(conf);
+    this.tableName = tableName;
+    this.timestamp = timestamp;
+    this.storesToCompact = storesToCompact;
+    this.executor = Executors.newFixedThreadPool(concurrency);
+    this.clusterCompactionQueues = new ClusterCompactionQueues(concurrency);
+    this.sleepForMs = sleepForMs;
+  }
+
+  public void compactAllRegions() throws Exception {
+    List<Future<?>> futures = Lists.newArrayList();
+    while (clusterCompactionQueues.hasWorkItems() || !futuresComplete(futures)) {
+      while (clusterCompactionQueues.atCapacity()) {
+        LOG.debug("Waiting for servers to complete Compactions");
+        Thread.sleep(sleepForMs);
+      }
+      Optional<ServerName> serverToProcess =
+          clusterCompactionQueues.getLargestQueueFromServersNotCompacting();
+      if (serverToProcess.isPresent() && clusterCompactionQueues.hasWorkItems()) {
+        ServerName serverName = serverToProcess.get();
+        // check to see if the region has moved... if so we have to enqueue it again with
+        // the proper serverName
+        MajorCompactionRequest request = clusterCompactionQueues.reserveForCompaction(serverName);
+
+        ServerName currentServer = connection.getRegionLocator(tableName)
+            .getRegionLocation(request.getRegion().getStartKey()).getServerName();
+
+        if (!currentServer.equals(serverName)) {
+          // add it back to the queue with the correct server; it should be picked up in the future
+          LOG.info("Server changed for region: " + request.getRegion().getEncodedName() + " from: "
+              + serverName + " to: " + currentServer + " re-queuing request");
+          clusterCompactionQueues.addToCompactionQueue(currentServer, request);
+          clusterCompactionQueues.releaseCompaction(serverName);
+        } else {
+          LOG.info("Firing off compaction request for server: " + serverName + ", " + request
+              + " total queue size left: " + clusterCompactionQueues
+              .getCompactionRequestsLeftToFinish());
+          futures.add(executor.submit(new Compact(serverName, request)));
+        }
+      } else {
+        // haven't assigned anything, so we sleep.
+        Thread.sleep(sleepForMs);
+      }
+    }
+    LOG.info("All compactions have completed");
+  }
+
+  private boolean futuresComplete(List<Future<?>> futures) {
+    Iterables.removeIf(futures, new Predicate<Future<?>>() {
+      @Override public boolean apply(Future<?> input) {
+        return input.isDone();
+      }
+    });
+    return futures.isEmpty();
+  }
+
+  public void shutdown() throws Exception {
+    executor.shutdown();
+    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+    if (!ERRORS.isEmpty()) {
+      StringBuilder builder =
+          new StringBuilder().append("Major compaction failed, there were: ").append(ERRORS.size())
+              .append(" regions / stores that failed compacting\n")
+              .append("Failed compaction requests\n").append("--------------------------\n")
+              .append(Joiner.on("\n").join(ERRORS));
+      LOG.error(builder.toString());
+    } else {
+      LOG.info("All regions major compacted successfully");
+    }
+    if (connection != null) {
+      connection.close();
+    }
+  }
+
+  @VisibleForTesting
+  void initializeWorkQueues() throws IOException {
+    if (storesToCompact.isEmpty()) {
+      for (HColumnDescriptor a : connection.getTable(tableName).getTableDescriptor()
+          .getFamilies()) {
+        storesToCompact.add(Bytes.toString(a.getName()));
+      }
+      LOG.info("No family specified, will execute for all families");
+    }
+    LOG.info(
+        "Initializing compaction queues for table: " + tableName + " with cf: " + storesToCompact);
+    List<HRegionLocation> regionLocations =
+        connection.getRegionLocator(tableName).getAllRegionLocations();
+    for (HRegionLocation location : regionLocations) {
+      Optional<MajorCompactionRequest> request = MajorCompactionRequest
+          .newRequest(connection.getConfiguration(), location.getRegionInfo(), storesToCompact,
+              timestamp);
+      if (request.isPresent()) {
+        clusterCompactionQueues.addToCompactionQueue(location.getServerName(), request.get());
+      }
+    }
+  }
+
+  class Compact implements Runnable {
+
+    private final ServerName serverName;
+    private final MajorCompactionRequest request;
+
+    Compact(ServerName serverName, MajorCompactionRequest request) {
+      this.serverName = serverName;
+      this.request = request;
+    }
+
+    @Override public void run() {
+      try {
+        compactAndWait(request);
+      } catch (NotServingRegionException e) {
+        // this region has split or merged
+        LOG.warn("Region is invalid, requesting updated regions", e);
+        // let's update the cluster compaction queues with these newly created regions.
+        addNewRegions();
+      } catch (Exception e) {
+        LOG.warn("Error compacting:", e);
+      } finally {
+        clusterCompactionQueues.releaseCompaction(serverName);
+      }
+    }
+
+    void compactAndWait(MajorCompactionRequest request) throws Exception {
+      Admin admin = connection.getAdmin();
+      try {
+        // only make the request if the region is not already major compacting
+        if (!isCompacting(request)) {
+          Set<String> stores = request.getStoresRequiringCompaction(storesToCompact);
+          if (!stores.isEmpty()) {
+            request.setStores(stores);
+            for (String store : request.getStores()) {
+              admin.majorCompactRegion(request.getRegion().getEncodedNameAsBytes(),
+                  Bytes.toBytes(store));
+            }
+          }
+        }
+        while (isCompacting(request)) {
+          Thread.sleep(sleepForMs);
+          LOG.debug("Waiting for compaction to complete for region: " + request.getRegion()
+              .getEncodedName());
+        }
+      } finally {
+        // Make sure to wait for the CompactedFileDischarger chore to do its work
+        int waitForArchive = connection.getConfiguration()
            .getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
+        Thread.sleep(waitForArchive);
+        // check if compaction completed successfully, otherwise put that request back in the
+        // proper queue
+        Set<String> storesRequiringCompaction =
+            request.getStoresRequiringCompaction(storesToCompact);
+        if (!storesRequiringCompaction.isEmpty()) {
+          // this happens when a regionserver is marked as dead, flushes a store file, and
+          // the new regionserver doesn't pick it up because it's accounted for in the WAL replay,
+          // thus you have more store files on the filesystem than the regionserver knows about.
+          boolean regionHasNotMoved = connection.getRegionLocator(tableName)
+              .getRegionLocation(request.getRegion().getStartKey()).getServerName()
+              .equals(serverName);
+          if (regionHasNotMoved) {
+            LOG.error("Not all store files were compacted, this may be due to the regionserver "
+                + "not being aware of all store files. Will not reattempt compacting, " + request);
+            ERRORS.add(request);
+          } else {
+            request.setStores(storesRequiringCompaction);
+            clusterCompactionQueues.addToCompactionQueue(serverName, request);
+            LOG.info("Compaction failed for the following stores: " + storesRequiringCompaction
+                + " region: " + request.getRegion().getEncodedName());
+          }
+        } else {
+          LOG.info("Compaction complete for region: " + request.getRegion().getEncodedName()
+              + " -> cf(s): " + request.getStores());
+        }
+      }
+    }
+  }
+
+  private boolean isCompacting(MajorCompactionRequest request) throws Exception {
+    AdminProtos.GetRegionInfoResponse.CompactionState compactionState = connection.getAdmin()
+        .getCompactionStateForRegion(request.getRegion().getEncodedNameAsBytes());
+    return compactionState.equals(AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR)
+        || compactionState
+        .equals(AdminProtos.GetRegionInfoResponse.CompactionState.MAJOR_AND_MINOR);
+  }
+
+  private void addNewRegions() {
+    try {
+      List<HRegionLocation> locations =
+          connection.getRegionLocator(tableName).getAllRegionLocations();
+      for (HRegionLocation location : locations) {
+        if (location.getRegionInfo().getRegionId() > timestamp) {
+          Optional<MajorCompactionRequest> compactionRequest = MajorCompactionRequest
+              .newRequest(connection.getConfiguration(), location.getRegionInfo(), storesToCompact,
+                  timestamp);
+          if (compactionRequest.isPresent()) {
+            clusterCompactionQueues
+                .addToCompactionQueue(location.getServerName(), compactionRequest.get());
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Options options = new Options();
+    Option tableOption = new Option("table", true, "table name");
+    tableOption.setRequired(true);
+    options.addOption(tableOption);
+
+    Option cfOption = new Option("cf", true, "column families: comma separated eg: a,b,c");
+    cfOption.setOptionalArg(true);
+    options.addOption(cfOption);
+
+    Option serverOption = new Option("servers", true, "Concurrent servers compacting");
+    serverOption.setRequired(true);
+    options.addOption(serverOption);
+
+    options.addOption(new Option("minModTime", true,
+        "Compact if store files have modification time < minModTime"));
+
+    Option zkOption = new Option("zk", true, "zk quorum");
+    zkOption.setOptionalArg(true);
+    options.addOption(zkOption);
+
+    Option rootDirOption = new Option("rootDir", true, "hbase.rootDir");
+    rootDirOption.setOptionalArg(true);
+    options.addOption(rootDirOption);
+
+    Option sleepOption = new Option("sleep", true, "Time to sleepForMs (ms) for checking "
+        + "compaction status per region and available "
+        + "work queues: default 30s");
+    options.addOption(sleepOption);
+
+    Option retryOption = new Option("retries", true, "Max # of retries for a compaction request,"
+        + " defaults to 3");
+    options.addOption(retryOption);
+
+    options.addOption(new Option("dryRun", false,
+        "Dry run, will just output a list of regions that require compaction "
+            + "based on parameters passed"));
+
+    final CommandLineParser cmdLineParser = new BasicParser();
+    CommandLine commandLine = null;
+    try {
+      commandLine = cmdLineParser.parse(options, args);
+    } catch (ParseException parseException) {
+      System.out.println(
+          "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
+              + parseException);
+      printUsage(options);
+      throw parseException;
+    }
+
+    String tableName = commandLine.getOptionValue("table");
+    String cf = commandLine.getOptionValue("cf", null);
+    Set<String> families = Sets.newHashSet();
+    if (cf != null) {
+      Iterables.addAll(families, Splitter.on(",").split(cf));
+    }
+
+    Configuration configuration = HBaseConfiguration.create();
+    int concurrency = Integer.parseInt(commandLine.getOptionValue("servers"));
+    long minModTime = Long.parseLong(
+        commandLine.getOptionValue("minModTime", String.valueOf(System.currentTimeMillis())));
+    String quorum =
+        commandLine.getOptionValue("zk", configuration.get(HConstants.ZOOKEEPER_QUORUM));
+    String rootDir = commandLine.getOptionValue("rootDir", configuration.get(HConstants.HBASE_DIR));
+    long sleep = Long.parseLong(commandLine.getOptionValue("sleep", "30000"));
+
+    configuration.set(HConstants.HBASE_DIR, rootDir);
+    configuration.set(HConstants.ZOOKEEPER_QUORUM, quorum);
+
+    MajorCompactor compactor =
+        new MajorCompactor(configuration, TableName.valueOf(tableName), families, concurrency,
+            minModTime, sleep);
+
+    compactor.initializeWorkQueues();
+    if (!commandLine.hasOption("dryRun")) {
+      compactor.compactAllRegions();
+    }
+    compactor.shutdown();
+  }
+
+  private static void printUsage(final Options options) {
+    String header = "\nUsage instructions\n\n";
+    String footer = "\n";
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp(MajorCompactor.class.getSimpleName(), header, options, footer, true);
+  }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java
new file mode 100644
index 00000000000..de6f5807da3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequestTest.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@Category({SmallTests.class})
+public class MajorCompactionRequestTest {
+
+  private static final HBaseTestingUtility UTILITY = new HBaseTestingUtility();
+  private static final String FAMILY = "a";
+  private Path rootRegionDir;
+  private Path regionStoreDir;
+
+  @Before public void setUp() throws Exception {
+    rootRegionDir = UTILITY.getDataTestDirOnTestFS("MajorCompactionRequestTest");
+    regionStoreDir = new Path(rootRegionDir, FAMILY);
+  }
+
+  @Test public void testStoresNeedingCompaction() throws Exception {
+    // store files older than timestamp
+    List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 5, 10);
+    MajorCompactionRequest request = makeMockRequest(100, storeFiles, false);
+    Optional<MajorCompactionRequest> result =
+        request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
+    assertTrue(result.isPresent());
+
+    // store files newer than timestamp
+    storeFiles = mockStoreFiles(regionStoreDir, 5, 101);
+    request = makeMockRequest(100, storeFiles, false);
+    result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY));
+    assertFalse(result.isPresent());
+  }
+
+  @Test public void testIfWeHaveNewReferenceFilesButOldStoreFiles() throws Exception {
+    // this tests that reference files that are new, but whose referenced files have older
+    // timestamps, still get compacted.
+    TableName table = TableName.valueOf("MajorCompactorTest");
+    HTableDescriptor htd = UTILITY.createTableDescriptor(table, Bytes.toBytes(FAMILY));
+    HRegionInfo hri = new HRegionInfo(htd.getTableName());
+    HRegion region =
+        HBaseTestingUtility.createRegionAndWAL(hri, rootRegionDir, UTILITY.getRandomDir(),
+            UTILITY.getConfiguration(), htd);
+
+    Configuration configuration = mock(Configuration.class);
+    // the reference file timestamp is newer
+    List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 4, 101);
+    List<Path> paths = new ArrayList<>();
+    for (StoreFileInfo storeFile : storeFiles) {
+      Path path = storeFile.getPath();
+      paths.add(path);
+    }
+    // the files that are referenced are older, thus we still compact.
+    HRegionFileSystem fileSystem =
+        mockFileSystem(region.getRegionInfo(), true, storeFiles, 50);
+    MajorCompactionRequest majorCompactionRequest = spy(new MajorCompactionRequest(configuration,
+        region.getRegionInfo(), Sets.newHashSet(FAMILY), 100));
+    doReturn(mock(Connection.class)).when(majorCompactionRequest).getConnection(eq(configuration));
+    doReturn(paths).when(majorCompactionRequest).getReferenceFilePaths(any(FileSystem.class),
+        any(Path.class));
+    doReturn(fileSystem).when(majorCompactionRequest).getFileSystem(any(Connection.class));
+    Set<String> result = majorCompactionRequest.getStoresRequiringCompaction(Sets.newHashSet("a"));
+    assertEquals(FAMILY, Iterables.getOnlyElement(result));
+  }
+
+  private HRegionFileSystem mockFileSystem(HRegionInfo info, boolean hasReferenceFiles,
+      List<StoreFileInfo> storeFiles) throws IOException {
+    Optional<StoreFileInfo> found = Optional.absent();
+    for (StoreFileInfo storeFile : storeFiles) {
+      found = Optional.of(storeFile);
+      break;
+    }
+    long timestamp = found.get().getModificationTime();
+    return mockFileSystem(info, hasReferenceFiles, storeFiles, timestamp);
+  }
+
+  private HRegionFileSystem mockFileSystem(HRegionInfo info, boolean hasReferenceFiles,
+      List<StoreFileInfo> storeFiles, long referenceFileTimestamp) throws IOException {
+    FileSystem fileSystem = mock(FileSystem.class);
+    if (hasReferenceFiles) {
+      FileStatus fileStatus = mock(FileStatus.class);
+      doReturn(referenceFileTimestamp).when(fileStatus).getModificationTime();
+      doReturn(fileStatus).when(fileSystem).getFileLinkStatus(isA(Path.class));
+    }
+    HRegionFileSystem mockSystem = mock(HRegionFileSystem.class);
+    doReturn(info).when(mockSystem).getRegionInfo();
+    doReturn(regionStoreDir).when(mockSystem).getStoreDir(FAMILY);
+    doReturn(hasReferenceFiles).when(mockSystem).hasReferences(anyString());
+    doReturn(storeFiles).when(mockSystem).getStoreFiles(anyString());
+    doReturn(fileSystem).when(mockSystem).getFileSystem();
+    return mockSystem;
+  }
+
+  private List<StoreFileInfo> mockStoreFiles(Path regionStoreDir, int howMany, long timestamp)
+      throws IOException {
+    List<StoreFileInfo> infos = Lists.newArrayList();
+    int i = 0;
+    while (i < howMany) {
+      StoreFileInfo storeFileInfo = mock(StoreFileInfo.class);
+      doReturn(timestamp).doReturn(timestamp).when(storeFileInfo).getModificationTime();
+      doReturn(new Path(regionStoreDir, RandomStringUtils.randomAlphabetic(10))).when(storeFileInfo)
+          .getPath();
+      infos.add(storeFileInfo);
+      i++;
+    }
+    return infos;
+  }
+
+  private MajorCompactionRequest makeMockRequest(long timestamp, List<StoreFileInfo> storeFiles,
+      boolean references) throws IOException {
+    Configuration configuration = mock(Configuration.class);
+    HRegionInfo regionInfo = mock(HRegionInfo.class);
+    when(regionInfo.getEncodedName()).thenReturn("HBase");
+    when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
+    MajorCompactionRequest request =
+        new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a"), timestamp);
+    MajorCompactionRequest spy = spy(request);
+    HRegionFileSystem fileSystem = mockFileSystem(regionInfo, references, storeFiles);
+    doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class));
+    doReturn(mock(Connection.class)).when(spy).getConnection(eq(configuration));
+    return spy;
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTest.java
new file mode 100644
index 00000000000..751a8492676
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util.compaction;
+
+import java.io.IOException;
+import java.util.Random;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+@Category({ MiscTests.class, MediumTests.class })
+public class MajorCompactorTest {
+
+  public static final byte[] FAMILY = Bytes.toBytes("a");
+  private HBaseTestingUtility utility;
+
+  @Before public void setUp() throws Exception {
+    utility = new HBaseTestingUtility();
+    utility.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 10);
+    utility.startMiniCluster();
+  }
+
+  @After public void tearDown() throws Exception {
+    utility.shutdownMiniCluster();
+  }
+
+  @Test public void testCompactingATable() throws Exception {
+    TableName tableName = TableName.valueOf("MajorCompactorTest");
+    utility.createMultiRegionTable(tableName, FAMILY, 5);
+    utility.waitTableAvailable(tableName);
+    Connection connection = utility.getConnection();
+    Table table = connection.getTable(tableName);
+    // write data and flush multiple store files:
+    for (int i = 0; i < 5; i++) {
+      loadRandomRows(table, FAMILY, 50, 100);
+      utility.flush(tableName);
+    }
+    table.close();
+    int numberOfRegions = utility.getHBaseAdmin().getTableRegions(tableName).size();
+    int numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    // each region should hold multiple store files before the major compaction runs
+    assertTrue(numberOfRegions < numHFiles);
+
+    MajorCompactor compactor =
+        new MajorCompactor(utility.getConfiguration(), tableName,
+            Sets.newHashSet(Bytes.toString(FAMILY)), 1, System.currentTimeMillis(), 200);
+    compactor.initializeWorkQueues();
+    compactor.compactAllRegions();
+    compactor.shutdown();
+
+    // verify that the store has been completely major compacted.
+    numberOfRegions = utility.getHBaseAdmin().getTableRegions(tableName).size();
+    numHFiles = utility.getNumHFiles(tableName, FAMILY);
+    assertEquals(numHFiles, numberOfRegions);
+  }
+
+  private void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
+      throws IOException {
+    Random r = new Random();
+    byte[] row = new byte[rowSize];
+    for (int i = 0; i < totalRows; i++) {
+      r.nextBytes(row);
+      Put put = new Put(row);
+      put.addColumn(f, new byte[]{0}, new byte[]{0});
+      t.put(put);
+    }
+  }
+}
\ No newline at end of file
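
Usage: besides the command-line entry point, the tool can be driven programmatically, as the
MajorCompactorTest above does. A minimal sketch mirroring what main() wires up (the table name
"usertable", family "a", concurrency, cutoff, and sleep interval below are illustrative values,
not defaults):

    Configuration conf = HBaseConfiguration.create();
    // compact family "a" of "usertable", one regionserver at a time, treating store files
    // modified before "now" as needing compaction, polling every 30s
    MajorCompactor compactor = new MajorCompactor(conf, TableName.valueOf("usertable"),
        Sets.newHashSet("a"), 1, System.currentTimeMillis(), 30000);
    compactor.initializeWorkQueues();
    compactor.compactAllRegions();
    compactor.shutdown();

The equivalent CLI run, assuming the class is launched through the standard hbase wrapper script:

    hbase org.apache.hadoop.hbase.util.compaction.MajorCompactor -table usertable -cf a -servers 1 -sleep 30000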