From eb52e26822125028587201728f19caff4ae69976 Mon Sep 17 00:00:00 2001
From: Apekshit Sharma
Date: Tue, 11 Oct 2016 13:45:08 -0700
Subject: [PATCH] HBASE-16811 Remove mob sweep job.

Change-Id: If849fdcafb8af4a9765be1eb80da77f8b3f29a1a
---
 .../hbase/mob/mapreduce/MemStoreWrapper.java  | 188 ------
 .../mapreduce/MobFilePathHashPartitioner.java |  41 --
 .../hadoop/hbase/mob/mapreduce/SweepJob.java  | 604 ------------------
 .../mob/mapreduce/SweepJobNodeTracker.java    | 100 ---
 .../hbase/mob/mapreduce/SweepMapper.java      |  87 ---
 .../hbase/mob/mapreduce/SweepReducer.java     | 471 --------------
 .../hadoop/hbase/mob/mapreduce/Sweeper.java   | 126 ----
 .../hbase/mob/mapreduce/TestMobSweepJob.java  | 167 -----
 .../mob/mapreduce/TestMobSweepMapper.java     | 116 ----
 .../mob/mapreduce/TestMobSweepReducer.java    | 216 -------
 10 files changed, 2116 deletions(-)
 delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
 delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java
 delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
 delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
 delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
 delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
 delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
 delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
 delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
 delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
deleted file mode 100644
index 2dea5bbcefe..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ArrayBackedTag;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.TagType;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.crypto.Encryption;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.mob.MobConstants;
-import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
-import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
-import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
-import org.apache.hadoop.hbase.regionserver.MemStore;
-import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
-import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
-import org.apache.hadoop.hbase.security.EncryptionUtil;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Reducer.Context;
-
-/**
- * The wrapper of a DefaultMemStore.
- * This wrapper is used in the sweep reducer to buffer and sort the cells written from
- * the invalid and small mob files.
- * It's flushed when it's full; the mob data are written to the mob files, and their file names
- * are written back to the store files of HBase.
- * This memstore is used to sort the cells in mob files.
- * In a reducer of the sweep tool, the mob files are grouped by the same prefix (start key and
- * date); in each group, the reducer iterates the files and reads the cells into a new and
- * bigger mob file.
- * The cells in the same mob file are ordered, but cells across mob files are not.
- * So we need this MemStoreWrapper to sort the cells coming from different mob files before
- * flushing them to disk; when the memstore is big enough it's flushed as a new mob file.
- */
-@InterfaceAudience.Private
-public class MemStoreWrapper {
-
-  private static final Log LOG = LogFactory.getLog(MemStoreWrapper.class);
-
-  private MemStore memstore;
-  private long flushSize;
-  private CompactionPartitionId partitionId;
-  private Context context;
-  private Configuration conf;
-  private BufferedMutator table;
-  private HColumnDescriptor hcd;
-  private Path mobFamilyDir;
-  private FileSystem fs;
-  private CacheConfig cacheConfig;
-  private Encryption.Context cryptoContext = Encryption.Context.NONE;
-
-  public MemStoreWrapper(Context context, FileSystem fs, BufferedMutator table,
-      HColumnDescriptor hcd, MemStore memstore, CacheConfig cacheConfig) throws IOException {
-    this.memstore = memstore;
-    this.context = context;
-    this.fs = fs;
-    this.table = table;
-    this.hcd = hcd;
-    this.conf = context.getConfiguration();
-    this.cacheConfig = cacheConfig;
-    flushSize = this.conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE,
-        MobConstants.DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE);
-    mobFamilyDir = MobUtils.getMobFamilyPath(conf, table.getName(), hcd.getNameAsString());
-    cryptoContext = EncryptionUtil.createEncryptionContext(conf, hcd);
-  }
-
-  public void setPartitionId(CompactionPartitionId partitionId) {
-    this.partitionId = partitionId;
-  }
-
-  /**
-   * Flushes the memstore if the size is large enough.
-   * @throws IOException
-   */
-  private void flushMemStoreIfNecessary() throws IOException {
-    if (memstore.heapSize() >= flushSize) {
-      flushMemStore();
-    }
-  }
-
-  /**
-   * Flushes the memstore anyway.
-   * @throws IOException
-   */
-  public void flushMemStore() throws IOException {
-    MemStoreSnapshot snapshot = memstore.snapshot();
-    internalFlushCache(snapshot);
-    memstore.clearSnapshot(snapshot.getId());
-  }
-
-  /**
-   * Flushes the snapshot of the memstore.
-   * Flushes the mob data to the mob files, and flushes the names of these mob files to HBase.
-   * @param snapshot The snapshot of the memstore.
-   * @throws IOException
-   */
-  private void internalFlushCache(final MemStoreSnapshot snapshot) throws IOException {
-    if (snapshot.getCellsCount() == 0) {
-      return;
-    }
-    // generate the files into a temp directory.
-    String tempPathString = context.getConfiguration().get(SweepJob.WORKING_FILES_DIR_KEY);
-    StoreFileWriter mobFileWriter = MobUtils.createWriter(conf, fs, hcd, partitionId.getDate(),
-        new Path(tempPathString), snapshot.getCellsCount(), hcd.getCompactionCompression(),
-        partitionId.getStartKey(), cacheConfig, cryptoContext);
-
-    String relativePath = mobFileWriter.getPath().getName();
-    LOG.info("Create files under a temp directory " + mobFileWriter.getPath().toString());
-
-    byte[] referenceValue = Bytes.toBytes(relativePath);
-    KeyValueScanner scanner = snapshot.getScanner();
-    Cell cell = null;
-    while (null != (cell = scanner.next())) {
-      mobFileWriter.append(cell);
-    }
-    scanner.close();
-    // Write out the log sequence number that corresponds to this output
-    // hfile. The hfile is current up to and including logCacheFlushId.
-    mobFileWriter.appendMetadata(Long.MAX_VALUE, false, snapshot.getCellsCount());
-    mobFileWriter.close();
-
-    MobUtils.commitFile(conf, fs, mobFileWriter.getPath(), mobFamilyDir, cacheConfig);
-    context.getCounter(SweepCounter.FILE_AFTER_MERGE_OR_CLEAN).increment(1);
-    // write reference/fileName back to the store files of HBase.
-    scanner = snapshot.getScanner();
-    scanner.seek(CellUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
-    cell = null;
-    Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE,
-        Bytes.toBytes(this.table.getName().toString()));
-    long updatedCount = 0;
-    while (null != (cell = scanner.next())) {
-      KeyValue reference = MobUtils.createMobRefKeyValue(cell, referenceValue, tableNameTag);
-      Put put =
-          new Put(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength());
-      put.add(reference);
-      table.mutate(put);
-      updatedCount++;
-    }
-    table.flush();
-    context.getCounter(SweepCounter.RECORDS_UPDATED).increment(updatedCount);
-    scanner.close();
-  }
-
-  /**
-   * Adds a Cell into the memstore.
-   * @param cell The Cell to be added.
-   * @throws IOException
-   */
-  public void addToMemstore(Cell cell) throws IOException {
-    memstore.add(cell);
-    // flush the memstore if it's full.
-    flushMemStoreIfNecessary();
-  }
-
-}
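For readers without the deleted class in front of them: MemStoreWrapper is an instance of a generic buffer-sort-flush pattern. The sketch below is a minimal standalone illustration of that pattern, not HBase API; the class and parameter names are invented for the example, and a real memstore additionally tracks heap overhead and tolerates duplicate keys.

    import java.util.TreeSet;
    import java.util.function.Consumer;

    // Buffer items in sorted order and flush them to a sink once a size
    // threshold is hit, the way MemStoreWrapper buffers cells before
    // writing a new mob file.
    public final class SortedFlushBuffer<T extends Comparable<T>> {
      private final TreeSet<T> buffer = new TreeSet<>();
      private final long flushThreshold; // plays the role of the memstore flush size
      private final Consumer<Iterable<T>> sink; // plays the role of the mob file writer
      private long bufferedSize = 0;

      public SortedFlushBuffer(long flushThreshold, Consumer<Iterable<T>> sink) {
        this.flushThreshold = flushThreshold;
        this.sink = sink;
      }

      // Counterpart of addToMemstore(): insert, then flush if the buffer is full.
      public void add(T item, long itemSize) {
        buffer.add(item);
        bufferedSize += itemSize;
        if (bufferedSize >= flushThreshold) {
          flush();
        }
      }

      // Counterpart of flushMemStore(): drain the sorted contents to the sink.
      public void flush() {
        if (!buffer.isEmpty()) {
          sink.accept(new TreeSet<>(buffer)); // the sink sees items in sorted order
          buffer.clear();
          bufferedSize = 0;
        }
      }

      public static void main(String[] args) {
        SortedFlushBuffer<String> b =
            new SortedFlushBuffer<>(2, batch -> System.out.println("flush: " + batch));
        b.add("c", 1);
        b.add("a", 1); // threshold reached, prints "flush: [a, c]"
      }
    }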
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java
deleted file mode 100644
index 0dbacfb8ea4..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob.mapreduce;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.mob.MobFileName;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Partitioner;
-
-/**
- * The partitioner for the sweep job.
- * The key is a mob file name. We bucket by date.
- */
-@InterfaceAudience.Private
-public class MobFilePathHashPartitioner extends Partitioner<Text, KeyValue> {
-
-  @Override
-  public int getPartition(Text fileName, KeyValue kv, int numPartitions) {
-    MobFileName mobFileName = MobFileName.create(fileName.toString());
-    String date = mobFileName.getDate();
-    int hash = date.hashCode();
-    return (hash & Integer.MAX_VALUE) % numPartitions;
-  }
-}
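The whole class fits in one method, so the arithmetic is worth spelling out. `hashCode()` can be negative in Java; masking with Integer.MAX_VALUE clears the sign bit so the modulo always yields a valid partition index, and because the bucket is derived from the date alone, every record for a given date (and hence for a given mob file) lands in the same reducer. A standalone demonstration of just that arithmetic, with no Hadoop types:

    public class DateHashBucketDemo {
      // Maps an arbitrary date string to a bucket in [0, numPartitions).
      static int bucketFor(String date, int numPartitions) {
        int hash = date.hashCode(); // may be negative
        // Masking with Integer.MAX_VALUE clears the sign bit, so the modulo
        // result is always in [0, numPartitions) even for negative hash codes.
        return (hash & Integer.MAX_VALUE) % numPartitions;
      }

      public static void main(String[] args) {
        // Two records with the same date always land in the same partition.
        System.out.println(bucketFor("20161011", 8) == bucketFor("20161011", 8)); // true
        System.out.println(bucketFor("20161012", 8)); // some index in [0, 8)
      }
    }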
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
deleted file mode 100644
index 0b3ca292524..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java
+++ /dev/null
@@ -1,604 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob.mapreduce;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.UUID;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.NoTagsKeyValue;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.HFileLink;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.master.TableLockManager;
-import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
-import org.apache.hadoop.hbase.mob.MobConstants;
-import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.DNS;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Strings;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.serializer.JavaSerialization;
-import org.apache.hadoop.io.serializer.WritableSerialization;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * The sweep job.
- * Runs MapReduce to merge the smaller mob files into bigger ones and clean the unused ones.
- */
-@InterfaceAudience.Private
-public class SweepJob {
-
-  private final FileSystem fs;
-  private final Configuration conf;
-  private static final Log LOG = LogFactory.getLog(SweepJob.class);
-  static final String SWEEP_JOB_ID = "hbase.mob.sweep.job.id";
-  static final String SWEEP_JOB_SERVERNAME = "hbase.mob.sweep.job.servername";
-  static final String SWEEP_JOB_TABLE_NODE = "hbase.mob.sweep.job.table.node";
-  static final String WORKING_DIR_KEY = "hbase.mob.sweep.job.dir";
-  static final String WORKING_ALLNAMES_FILE_KEY = "hbase.mob.sweep.job.all.file";
-  static final String WORKING_VISITED_DIR_KEY = "hbase.mob.sweep.job.visited.dir";
-  static final String WORKING_ALLNAMES_DIR = "all";
-  static final String WORKING_VISITED_DIR = "visited";
-  public static final String WORKING_FILES_DIR_KEY = "mob.sweep.job.files.dir";
-  // the MOB_SWEEP_JOB_DELAY is ONE_DAY by default. Its value is only changed when testing.
-  public static final String MOB_SWEEP_JOB_DELAY = "hbase.mob.sweep.job.delay";
-  protected static final long ONE_DAY = 24 * 60 * 60 * 1000;
-  private long compactionStartTime = EnvironmentEdgeManager.currentTime();
-  public final static String CREDENTIALS_LOCATION = "credentials_location";
-  private CacheConfig cacheConfig;
-  static final int SCAN_CACHING = 10000;
-  private TableLockManager tableLockManager;
-
-  public SweepJob(Configuration conf, FileSystem fs) {
-    this.conf = conf;
-    this.fs = fs;
-    // disable the block cache.
-    Configuration copyOfConf = new Configuration(conf);
-    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
-    cacheConfig = new CacheConfig(copyOfConf);
-  }
-
-  static ServerName getCurrentServerName(Configuration conf) throws IOException {
-    String hostname = conf.get(
-        "hbase.regionserver.ipc.address",
-        Strings.domainNamePointerToHostName(DNS.getDefaultHost(
-            conf.get("hbase.regionserver.dns.interface", "default"),
-            conf.get("hbase.regionserver.dns.nameserver", "default"))));
-    int port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
-    // Creation of a HSA will force a resolve.
-    InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
-    if (initialIsa.getAddress() == null) {
-      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
-    }
-    return ServerName.valueOf(initialIsa.getHostName(), initialIsa.getPort(),
-        EnvironmentEdgeManager.currentTime());
-  }
-
-  /**
-   * Runs MapReduce to do the sweeping on the mob files.
-   * There's a MobReferenceOnlyFilter so that the mappers only get the cells that have mob
-   * references from 'normal' regions' rows.
-   * Runs of the sweep tool on the same column family are mutually exclusive,
-   * and an HBase major compaction and a run of the sweep tool on the same column family are
-   * mutually exclusive too.
-   * The synchronization is done through ZooKeeper.
-   * So at the beginning of a run, we need to make sure this sweep tool is the only one
-   * currently running on this column family, and that no major compaction on this column
-   * family is in progress.
-   * @param tn The current table name.
-   * @param family The descriptor of the current column family.
-   * @return 0 upon success, 3 if bailing out because another compaction is currently happening,
-   *   or 4 if the MR job was unsuccessful.
-   *
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws InterruptedException
-   * @throws KeeperException
-   */
-  public int sweep(TableName tn, HColumnDescriptor family) throws IOException,
-      ClassNotFoundException, InterruptedException, KeeperException {
-    Configuration conf = new Configuration(this.conf);
-    // check whether the current user is the same as the owner of the hbase root dir
-    String currentUserName = UserGroupInformation.getCurrentUser().getShortUserName();
-    FileStatus[] hbaseRootFileStat = fs.listStatus(new Path(conf.get(HConstants.HBASE_DIR)));
-    if (hbaseRootFileStat.length > 0) {
-      String owner = hbaseRootFileStat[0].getOwner();
-      if (!owner.equals(currentUserName)) {
-        String errorMsg = "The current user[" + currentUserName
-            + "] doesn't have hbase root credentials."
-            + " Please make sure the user is the root of the target HBase";
-        LOG.error(errorMsg);
-        throw new IOException(errorMsg);
-      }
-    } else {
-      LOG.error("The target HBase doesn't exist");
-      throw new IOException("The target HBase doesn't exist");
-    }
-    String familyName = family.getNameAsString();
-    String id = "SweepJob" + UUID.randomUUID().toString().replace("-", "");
-    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, id, new DummyMobAbortable());
-    try {
-      ServerName serverName = getCurrentServerName(conf);
-      tableLockManager = TableLockManager.createTableLockManager(conf, zkw, serverName);
-      TableName lockName = MobUtils.getTableLockName(tn);
-      TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
-      String tableName = tn.getNameAsString();
-      // Try to obtain the lock. Use this lock to synchronize all the queries.
-      try {
-        lock.acquire();
-      } catch (Exception e) {
-        LOG.warn("Cannot lock the table " + tableName
-            + ". The major compaction in HBase may be in-progress or another sweep job is running."
-            + " Please re-run the job.");
-        return 3;
-      }
-      Job job = null;
-      try {
-        Scan scan = new Scan();
-        scan.addFamily(family.getName());
-        // Do not retrieve the mob data when scanning
-        scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
-        scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
-        scan.setCaching(SCAN_CACHING);
-        scan.setCacheBlocks(false);
-        scan.setMaxVersions(family.getMaxVersions());
-        conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
-            JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
-        conf.set(SWEEP_JOB_ID, id);
-        conf.set(SWEEP_JOB_SERVERNAME, serverName.toString());
-        String tableLockNode = ZKUtil.joinZNode(zkw.znodePaths.tableLockZNode,
-            lockName.getNameAsString());
-        conf.set(SWEEP_JOB_TABLE_NODE, tableLockNode);
-        job = prepareJob(tn, familyName, scan, conf);
-        job.getConfiguration().set(TableInputFormat.SCAN_COLUMN_FAMILY, familyName);
-        // Record the compaction start time.
-        // In the sweep tool, only a mob file whose modification time is older than
-        // (startTime - delay) could be handled by this tool.
-        // The delay is one day. It could be configured as well, but this is only used
-        // in the test.
-        job.getConfiguration().setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
-            compactionStartTime);
-
-        job.setPartitionerClass(MobFilePathHashPartitioner.class);
-        submit(job, tn, familyName);
-        if (job.waitForCompletion(true)) {
-          // Archive the unused mob files.
-          removeUnusedFiles(job, tn, family);
-        } else {
-          System.err.println("Job was not successful");
-          return 4;
-        }
-      } finally {
-        try {
-          cleanup(job, tn, familyName);
-        } finally {
-          try {
-            lock.release();
-          } catch (IOException e) {
-            LOG.error("Failed to release the table lock " + tableName, e);
-          }
-        }
-      }
-    } finally {
-      zkw.close();
-    }
-    return 0;
-  }
-
-  /**
-   * Prepares a map reduce job.
-   * @param tn The current table name.
-   * @param familyName The current family name.
-   * @param scan The current scan.
-   * @param conf The current configuration.
-   * @return A map reduce job.
-   * @throws IOException
-   */
-  private Job prepareJob(TableName tn, String familyName, Scan scan, Configuration conf)
-      throws IOException {
-    Job job = Job.getInstance(conf);
-    job.setJarByClass(SweepMapper.class);
-    TableMapReduceUtil.initTableMapperJob(tn.getNameAsString(), scan,
-        SweepMapper.class, Text.class, Writable.class, job);
-
-    job.setInputFormatClass(TableInputFormat.class);
-    job.setMapOutputKeyClass(Text.class);
-    job.setMapOutputValueClass(NoTagsKeyValue.class);
-    job.setReducerClass(SweepReducer.class);
-    job.setOutputFormatClass(NullOutputFormat.class);
-    String jobName = getCustomJobName(this.getClass().getSimpleName(), tn, familyName);
-    job.setJobName(jobName);
-    if (StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
-      String fileLoc = conf.get(CREDENTIALS_LOCATION);
-      Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
-      job.getCredentials().addAll(cred);
-    }
-    return job;
-  }
-
-  /**
-   * Gets a customized job name.
-   * It's className-mapperClassName-reducerClassName-tableName-familyName.
-   * @param className The current class name.
-   * @param tableName The current table name.
-   * @param familyName The current family name.
-   * @return The customized job name.
-   */
-  private static String getCustomJobName(String className, TableName tableName,
-      String familyName) {
-    StringBuilder name = new StringBuilder();
-    name.append(className);
-    name.append('-').append(SweepMapper.class.getSimpleName());
-    name.append('-').append(SweepReducer.class.getSimpleName());
-    name.append('-').append(tableName.getNamespaceAsString());
-    name.append('-').append(tableName.getQualifierAsString());
-    name.append('-').append(familyName);
-    return name.toString();
-  }
-
-  /**
-   * Submits a job.
-   * @param job The current job.
-   * @param tn The current table name.
-   * @param familyName The current family name.
-   * @throws IOException
-   */
-  private void submit(Job job, TableName tn, String familyName) throws IOException {
-    // delete the temp directory of the mob files in case of a failure in the previous
-    // execution.
-    Path tempDir =
-        new Path(MobUtils.getMobHome(job.getConfiguration()), MobConstants.TEMP_DIR_NAME);
-    Path mobCompactionTempDir =
-        new Path(tempDir, MobConstants.MOB_SWEEP_TOOL_COMPACTION_TEMP_DIR_NAME);
-    Path workingPath = MobUtils.getCompactionWorkingPath(mobCompactionTempDir, job.getJobName());
-    job.getConfiguration().set(WORKING_DIR_KEY, workingPath.toString());
-    // delete the working directory in case it was not deleted by the last run.
-    fs.delete(workingPath, true);
-    // create the working directory.
-    fs.mkdirs(workingPath);
-    // create a sequence file which contains the names of all the existing files.
-    Path workingPathOfFiles = new Path(workingPath, "files");
-    Path workingPathOfNames = new Path(workingPath, "names");
-    job.getConfiguration().set(WORKING_FILES_DIR_KEY, workingPathOfFiles.toString());
-    Path allFileNamesPath = new Path(workingPathOfNames, WORKING_ALLNAMES_DIR);
-    job.getConfiguration().set(WORKING_ALLNAMES_FILE_KEY, allFileNamesPath.toString());
-    Path vistiedFileNamesPath = new Path(workingPathOfNames, WORKING_VISITED_DIR);
-    job.getConfiguration().set(WORKING_VISITED_DIR_KEY, vistiedFileNamesPath.toString());
-    // create a directory where the files containing the names of visited mob files are saved.
-    fs.mkdirs(vistiedFileNamesPath);
-    Path mobStorePath = MobUtils.getMobFamilyPath(job.getConfiguration(), tn, familyName);
-    // Find all the files whose creation time is older than one day.
-    // Write those file names to a file.
-    // Each reducer has a writer that writes the visited file names to a file which is saved
-    // in WORKING_VISITED_DIR.
-    // After the job is finished, compare those files, then find out the unused mob files and
-    // archive them.
-    FileStatus[] files = fs.listStatus(mobStorePath);
-    Set<String> fileNames = new TreeSet<>();
-    long mobCompactionDelay = job.getConfiguration().getLong(MOB_SWEEP_JOB_DELAY, ONE_DAY);
-    for (FileStatus fileStatus : files) {
-      if (fileStatus.isFile() && !HFileLink.isHFileLink(fileStatus.getPath())) {
-        if (compactionStartTime - fileStatus.getModificationTime() > mobCompactionDelay) {
-          // only record the potentially unused files older than one day.
-          fileNames.add(fileStatus.getPath().getName());
-        }
-      }
-    }
-    FSDataOutputStream fout = null;
-    SequenceFile.Writer writer = null;
-    try {
-      // create a file that includes all the existing mob files whose creation time is older
-      // than (now - oneDay)
-      fout = fs.create(allFileNamesPath, true);
-      // write the names to a sequence file
-      writer = SequenceFile.createWriter(job.getConfiguration(), fout, String.class,
-          String.class, CompressionType.NONE, null);
-      for (String fileName : fileNames) {
-        writer.append(fileName, MobConstants.EMPTY_STRING);
-      }
-      writer.hflush();
-    } finally {
-      if (writer != null) {
-        IOUtils.closeStream(writer);
-      }
-      if (fout != null) {
-        IOUtils.closeStream(fout);
-      }
-    }
-  }
-
-  /**
-   * Gets the unused mob files.
-   * Compares the file which contains all the existing mob files with the visited files,
-   * and finds out the unused mob files so they can be archived.
-   * @param conf The current configuration.
-   * @return The unused mob files.
-   * @throws IOException
-   */
-  List<String> getUnusedFiles(Configuration conf) throws IOException {
-    // find out the unused files and archive them
-    Path allFileNamesPath = new Path(conf.get(WORKING_ALLNAMES_FILE_KEY));
-    SequenceFile.Reader allNamesReader = null;
-    MergeSortReader visitedNamesReader = null;
-    List<String> toBeArchived = new ArrayList<>();
-    try {
-      allNamesReader = new SequenceFile.Reader(fs, allFileNamesPath, conf);
-      visitedNamesReader = new MergeSortReader(fs, conf,
-          new Path(conf.get(WORKING_VISITED_DIR_KEY)));
-      String nextAll = (String) allNamesReader.next((String) null);
-      String nextVisited = visitedNamesReader.next();
-      do {
-        if (nextAll != null) {
-          if (nextVisited != null) {
-            int compare = nextAll.compareTo(nextVisited);
-            if (compare < 0) {
-              toBeArchived.add(nextAll);
-              nextAll = (String) allNamesReader.next((String) null);
-            } else if (compare > 0) {
-              nextVisited = visitedNamesReader.next();
-            } else {
-              nextAll = (String) allNamesReader.next((String) null);
-              nextVisited = visitedNamesReader.next();
-            }
-          } else {
-            toBeArchived.add(nextAll);
-            nextAll = (String) allNamesReader.next((String) null);
-          }
-        } else {
-          break;
-        }
-      } while (nextAll != null || nextVisited != null);
-    } finally {
-      if (allNamesReader != null) {
-        IOUtils.closeStream(allNamesReader);
-      }
-      if (visitedNamesReader != null) {
-        visitedNamesReader.close();
-      }
-    }
-    return toBeArchived;
-  }
-
-  /**
-   * Archives unused mob files.
-   * @param job The current job.
-   * @param tn The current table name.
-   * @param hcd The descriptor of the current column family.
-   * @throws IOException
-   */
-  private void removeUnusedFiles(Job job, TableName tn, HColumnDescriptor hcd)
-      throws IOException {
-    // find out the unused files and archive them
-    List<StoreFile> storeFiles = new ArrayList<>();
-    List<String> toBeArchived = getUnusedFiles(job.getConfiguration());
-    // archive them
-    Path mobStorePath = MobUtils
-        .getMobFamilyPath(job.getConfiguration(), tn, hcd.getNameAsString());
-    for (String archiveFileName : toBeArchived) {
-      Path path = new Path(mobStorePath, archiveFileName);
-      storeFiles.add(new StoreFile(fs, path, job.getConfiguration(), cacheConfig,
-          BloomType.NONE));
-    }
-    if (!storeFiles.isEmpty()) {
-      try {
-        MobUtils.removeMobFiles(job.getConfiguration(), fs, tn,
-            FSUtils.getTableDir(MobUtils.getMobHome(conf), tn), hcd.getName(), storeFiles);
-        LOG.info(storeFiles.size() + " unused MOB files are removed");
-      } catch (Exception e) {
-        LOG.error("Failed to archive the store files " + storeFiles, e);
-      }
-    }
-  }
-
-  /**
-   * Deletes the working directory.
-   * @param job The current job.
-   * @param familyName The family to cleanup
-   */
-  private void cleanup(Job job, TableName tn, String familyName) {
-    if (job != null) {
-      // delete the working directory
-      Path workingPath = new Path(job.getConfiguration().get(WORKING_DIR_KEY));
-      try {
-        fs.delete(workingPath, true);
-      } catch (IOException e) {
-        LOG.warn("Failed to delete the working directory after sweeping store " + familyName
-            + " in the table " + tn.getNameAsString(), e);
-      }
-    }
-  }
-
-  /**
-   * A result with an index.
-   */
-  private static class IndexedResult implements Comparable<IndexedResult> {
-    private int index;
-    private String value;
-
-    public IndexedResult(int index, String value) {
-      this.index = index;
-      this.value = value;
-    }
-
-    public int getIndex() {
-      return this.index;
-    }
-
-    public String getValue() {
-      return this.value;
-    }
-
-    @Override
-    public int compareTo(IndexedResult o) {
-      if (this.value == null && o.getValue() == null) {
-        return 0;
-      } else if (o.value == null) {
-        return 1;
-      } else if (this.value == null) {
-        return -1;
-      } else {
-        return this.value.compareTo(o.value);
-      }
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
-      }
-      if (obj == null) {
-        return false;
-      }
-      if (!(obj instanceof IndexedResult)) {
-        return false;
-      }
-      return compareTo((IndexedResult) obj) == 0;
-    }
-
-    @Override
-    public int hashCode() {
-      return value.hashCode();
-    }
-  }
-
-  /**
-   * Merge sort reader.
-   * It merges and sorts the readers of different sequence files into one, so that
-   * the results are read in order.
-   */
-  private static class MergeSortReader {
-
-    private List<SequenceFile.Reader> readers = new ArrayList<>();
-    private PriorityQueue<IndexedResult> results = new PriorityQueue<>();
-
-    public MergeSortReader(FileSystem fs, Configuration conf, Path path) throws IOException {
-      if (fs.exists(path)) {
-        FileStatus[] files = fs.listStatus(path);
-        int index = 0;
-        for (FileStatus file : files) {
-          if (file.isFile()) {
-            SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf);
-            String key = (String) reader.next((String) null);
-            if (key != null) {
-              results.add(new IndexedResult(index, key));
-              readers.add(reader);
-              index++;
-            }
-          }
-        }
-      }
-    }
-
-    public String next() throws IOException {
-      IndexedResult result = results.poll();
-      if (result != null) {
-        SequenceFile.Reader reader = readers.get(result.getIndex());
-        String key = (String) reader.next((String) null);
-        if (key != null) {
-          results.add(new IndexedResult(result.getIndex(), key));
-        }
-        return result.getValue();
-      }
-      return null;
-    }
-
-    public void close() {
-      for (SequenceFile.Reader reader : readers) {
-        if (reader != null) {
-          IOUtils.closeStream(reader);
-        }
-      }
-    }
-  }
-
-  /**
-   * The counters used in the sweep job.
-   */
-  public enum SweepCounter {
-
-    /**
-     * How many files are read.
-     */
-    INPUT_FILE_COUNT,
-
-    /**
-     * How many files need to be merged or cleaned.
-     */
-    FILE_TO_BE_MERGE_OR_CLEAN,
-
-    /**
-     * How many files are left after merging.
-     */
-    FILE_AFTER_MERGE_OR_CLEAN,
-
-    /**
-     * How many records are updated.
-     */
-    RECORDS_UPDATED,
-  }
-
-  public static class DummyMobAbortable implements Abortable {
-
-    private boolean abort = false;
-
-    public void abort(String why, Throwable e) {
-      abort = true;
-    }
-
-    public boolean isAborted() {
-      return abort;
-    }
-
-  }
-}
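The heart of SweepJob is getUnusedFiles(): a linear, two-pointer walk over two lexicographically sorted name streams ("all candidates" versus "visited by some reducer"), collecting the names present only in the first. The "visited" side is itself produced by MergeSortReader, a PriorityQueue-based k-way merge of the per-reducer files, which is what keeps that stream sorted. Distilled to plain Java iterators, a sketch of the same algorithm without the SequenceFile plumbing:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class SortedDifferenceDemo {
      // Returns the names present in the sorted "all" stream but absent from the
      // sorted "visited" stream -- the files the sweep job may archive.
      static List<String> unused(Iterator<String> all, Iterator<String> visited) {
        List<String> result = new ArrayList<>();
        String a = all.hasNext() ? all.next() : null;
        String v = visited.hasNext() ? visited.next() : null;
        while (a != null) {
          if (v == null || a.compareTo(v) < 0) {
            result.add(a); // never visited, so unused
            a = all.hasNext() ? all.next() : null;
          } else if (a.compareTo(v) > 0) {
            v = visited.hasNext() ? visited.next() : null; // catch the other stream up
          } else {
            a = all.hasNext() ? all.next() : null; // visited, keep the file
            v = visited.hasNext() ? visited.next() : null;
          }
        }
        return result;
      }

      public static void main(String[] args) {
        Iterator<String> all = Arrays.asList("f1", "f2", "f3", "f4").iterator();
        Iterator<String> visited = Arrays.asList("f2", "f4").iterator();
        System.out.println(unused(all, visited)); // [f1, f3]
      }
    }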
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
deleted file mode 100644
index ee389f57609..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob.mapreduce;
-
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.master.TableLockManager;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.TableLock;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * Tracker of the sweep tool node in ZooKeeper.
- * The sweep tool node is an ephemeral one: when the process dies this node is deleted,
- * but at that time the MR job might still be running, and if another sweep job is started,
- * two MR jobs for the same column family would run at the same time.
- * This tracker watches this ephemeral node; if it's gone or it was not created by the
- * sweep job that owns the current MR job, the current process is aborted.
- */
-@InterfaceAudience.Private
-public class SweepJobNodeTracker extends ZooKeeperListener {
-
-  private String parentNode;
-  private String lockNodePrefix;
-  private String owner;
-  private String lockNode;
-
-  public SweepJobNodeTracker(ZooKeeperWatcher watcher, String parentNode, String owner) {
-    super(watcher);
-    this.parentNode = parentNode;
-    this.owner = owner;
-    this.lockNodePrefix = ZKUtil.joinZNode(parentNode, "write-");
-  }
-
-  /**
-   * Registers the watcher on the sweep job node.
-   * If there's no such sweep job node, or it was not created by the sweep job that
-   * owns the current MR job, the current process is aborted.
-   * This assumes the table lock uses ZooKeeper. It's a workaround that is only used
-   * in the sweep tool, and the sweep tool will be removed after the mob file compaction
-   * is finished.
-   */
-  public void start() throws KeeperException {
-    watcher.registerListener(this);
-    List<String> children = ZKUtil.listChildrenNoWatch(watcher, parentNode);
-    if (children != null && !children.isEmpty()) {
-      // there are locks
-      TreeSet<String> sortedChildren = new TreeSet<>();
-      sortedChildren.addAll(children);
-      // find all the write locks
-      SortedSet<String> tails = sortedChildren.tailSet(lockNodePrefix);
-      if (!tails.isEmpty()) {
-        for (String tail : tails) {
-          String path = ZKUtil.joinZNode(parentNode, tail);
-          byte[] data = ZKUtil.getDataAndWatch(watcher, path);
-          TableLock lock = TableLockManager.fromBytes(data);
-          ServerName serverName = lock.getLockOwner();
-          org.apache.hadoop.hbase.ServerName sn = org.apache.hadoop.hbase.ServerName.valueOf(
-              serverName.getHostName(), serverName.getPort(), serverName.getStartCode());
-          // compare the server names (host, port and start code), make sure the lock is created
-          if (owner.equals(sn.toString())) {
-            lockNode = path;
-            return;
-          }
-        }
-      }
-    }
-    System.exit(1);
-  }
-
-  @Override
-  public void nodeDeleted(String path) {
-    // If the lock node is deleted, abort the current process.
-    if (path.equals(lockNode)) {
-      System.exit(1);
-    }
-  }
-}
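The tracker's contract is easy to miss in the class above: verify the lock znode belongs to this job, arm a watch on it, and kill the process if the node vanishes. A minimal standalone sketch of the same arm-then-abort pattern against the raw ZooKeeper client; the connect string, session timeout, and znode path here are hypothetical placeholders:

    import java.io.IOException;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;

    // Abort the process when a previously verified lock znode disappears.
    public class LockNodeWatcher implements Watcher {
      private final String lockNode;

      LockNodeWatcher(String lockNode) {
        this.lockNode = lockNode;
      }

      @Override
      public void process(WatchedEvent event) {
        // Mirrors SweepJobNodeTracker.nodeDeleted(): lock gone means owner gone.
        if (event.getType() == Event.EventType.NodeDeleted
            && lockNode.equals(event.getPath())) {
          System.exit(1);
        }
      }

      public static void main(String[] args)
          throws IOException, KeeperException, InterruptedException {
        String lockNode = "/locks/write-0000000001"; // hypothetical path
        LockNodeWatcher watcher = new LockNodeWatcher(lockNode);
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, watcher); // hypothetical quorum
        if (zk.exists(lockNode, watcher) == null) { // also arms the deletion watch
          System.exit(1); // the lock is already gone; refuse to start
        }
        // ... do the real work while the watch stays armed ...
      }
    }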
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
deleted file mode 100644
index 3376046b4d4..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableMapper;
-import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.io.Text;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * The mapper of a sweep job.
- * Takes the rows from the table and their results and maps them to
- * {@literal <fileName, mobValue>} where mobValue is the actual cell in HBase.
- */
-@InterfaceAudience.Private
-public class SweepMapper extends TableMapper<Text, KeyValue> {
-
-  private ZooKeeperWatcher zkw = null;
-
-  @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
-    String id = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
-    String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME);
-    String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE);
-    zkw = new ZooKeeperWatcher(context.getConfiguration(), id, new DummyMobAbortable());
-    try {
-      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner);
-      tracker.start();
-    } catch (KeeperException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  protected void cleanup(Context context) throws IOException, InterruptedException {
-    if (zkw != null) {
-      zkw.close();
-    }
-  }
-
-  @Override
-  public void map(ImmutableBytesWritable r, Result columns, Context context) throws IOException,
-      InterruptedException {
-    if (columns == null) {
-      return;
-    }
-    Cell[] cells = columns.rawCells();
-    if (cells == null || cells.length == 0) {
-      return;
-    }
-    for (Cell c : cells) {
-      if (MobUtils.hasValidMobRefCellValue(c)) {
-        String fileName = MobUtils.getMobFileName(c);
-        context.write(new Text(fileName), KeyValueUtil.ensureKeyValue(c));
-      }
-    }
-  }
-}
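SweepMapper follows a common TableMapper shape: one input record per row, one output record per qualifying cell, with a grouping key chosen so the reducer sees related cells together. A generic sketch of that shape follows; the key derivation here (column qualifier) is a made-up stand-in, where SweepMapper derived the mob file name instead so each reducer saw every reference to one mob file.

    import java.io.IOException;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    // Emit one (groupKey, 1) record per cell of every scanned row.
    public class CellPerRecordMapper extends TableMapper<Text, LongWritable> {
      private static final LongWritable ONE = new LongWritable(1);

      @Override
      public void map(ImmutableBytesWritable row, Result columns, Context context)
          throws IOException, InterruptedException {
        if (columns == null || columns.rawCells() == null) {
          return; // defensive, as in the original mapper
        }
        for (Cell c : columns.rawCells()) {
          // Hypothetical grouping key: the column qualifier.
          Text key = new Text(Bytes.toString(c.getQualifierArray(),
              c.getQualifierOffset(), c.getQualifierLength()));
          context.write(key, ONE);
        }
      }
    }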
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
deleted file mode 100644
index b6b4f67fc1a..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java
+++ /dev/null
@@ -1,471 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.InvalidFamilyOperationException;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.BufferedMutatorParams;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.io.HFileLink;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.mob.MobConstants;
-import org.apache.hadoop.hbase.mob.MobFile;
-import org.apache.hadoop.hbase.mob.MobFileName;
-import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
-import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
-import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * The reducer of a sweep job.
- * This reducer merges the small mob files into bigger ones, and writes the visited
- * names of mob files to a sequence file which is used by the sweep job to delete
- * the unused mob files.
- * The key of the input is a file name, the value is a collection of KeyValues
- * (the value format of a KeyValue is valueLength + fileName) in HBase.
- * In this reducer, we can know how many cells exist in HBase for a mob file.
- * If the existCellSize/mobFileSize < compactionRatio, this mob
- * file needs to be merged.
- */
-@InterfaceAudience.Private
-public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
-
-  private static final Log LOG = LogFactory.getLog(SweepReducer.class);
-
-  private SequenceFile.Writer writer = null;
-  private MemStoreWrapper memstore;
-  private Configuration conf;
-  private FileSystem fs;
-
-  private Path familyDir;
-  private CacheConfig cacheConfig;
-  private long compactionBegin;
-  private BufferedMutator table;
-  private HColumnDescriptor family;
-  private long mobCompactionDelay;
-  private Path mobTableDir;
-
-  @Override
-  protected void setup(Context context) throws IOException, InterruptedException {
-    this.conf = context.getConfiguration();
-    Connection c = ConnectionFactory.createConnection(this.conf);
-    this.fs = FileSystem.get(conf);
-    // the MOB_SWEEP_JOB_DELAY is ONE_DAY by default. Its value is only changed when testing.
-    mobCompactionDelay = conf.getLong(SweepJob.MOB_SWEEP_JOB_DELAY, SweepJob.ONE_DAY);
-    String tableName = conf.get(TableInputFormat.INPUT_TABLE);
-    String familyName = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
-    TableName tn = TableName.valueOf(tableName);
-    this.familyDir = MobUtils.getMobFamilyPath(conf, tn, familyName);
-    Admin admin = c.getAdmin();
-    try {
-      family = admin.getTableDescriptor(tn).getFamily(Bytes.toBytes(familyName));
-      if (family == null) {
-        // this column family might be removed, directly return.
-        throw new InvalidFamilyOperationException("Column family '" + familyName
-            + "' does not exist. It might be removed.");
-      }
-    } finally {
-      try {
-        admin.close();
-      } catch (IOException e) {
-        LOG.warn("Failed to close the HBaseAdmin", e);
-      }
-    }
-    // disable the block cache.
-    Configuration copyOfConf = new Configuration(conf);
-    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
-    this.cacheConfig = new CacheConfig(copyOfConf);
-
-    table = c.getBufferedMutator(new BufferedMutatorParams(tn).writeBufferSize(1*1024*1024));
-    memstore = new MemStoreWrapper(context, fs, table, family, new DefaultMemStore(),
-        cacheConfig);
-
-    // The start time of the sweep tool.
-    // Only the mob files whose creation time is older than startTime-oneDay will be handled by
-    // the reducer since it brings inconsistency to handle the latest mob files.
-    this.compactionBegin = conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE, 0);
-    mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tn);
-  }
-
-  private SweepPartition createPartition(CompactionPartitionId id, Context context)
-      throws IOException {
-    return new SweepPartition(id, context);
-  }
-
-  @Override
-  public void run(Context context) throws IOException, InterruptedException {
-    String jobId = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
-    String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME);
-    String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE);
-    ZooKeeperWatcher zkw = new ZooKeeperWatcher(context.getConfiguration(), jobId,
-        new DummyMobAbortable());
-    FSDataOutputStream fout = null;
-    try {
-      SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner);
-      tracker.start();
-      setup(context);
-      // create a sequence file that contains all the visited file names in this reducer.
-      String dir = this.conf.get(SweepJob.WORKING_VISITED_DIR_KEY);
-      Path nameFilePath = new Path(dir, UUID.randomUUID().toString()
-          .replace("-", MobConstants.EMPTY_STRING));
-      fout = fs.create(nameFilePath, true);
-      writer = SequenceFile.createWriter(context.getConfiguration(), fout, String.class,
-          String.class, CompressionType.NONE, null);
-      CompactionPartitionId id;
-      SweepPartition partition = null;
-      // the mob files which have the same start key and date are in the same partition.
-      while (context.nextKey()) {
-        Text key = context.getCurrentKey();
-        String keyString = key.toString();
-        id = createPartitionId(keyString);
-        if (null == partition || !id.equals(partition.getId())) {
-          // It's the first mob file in the current partition.
-          if (null != partition) {
-            // this mob file is in a different partition from the previous mob file.
-            // directly close.
-            partition.close();
-          }
-          // create a new one
-          partition = createPartition(id, context);
-        }
-        if (partition != null) {
-          // run the partition
-          partition.execute(key, context.getValues());
-        }
-      }
-      if (null != partition) {
-        partition.close();
-      }
-      writer.hflush();
-    } catch (KeeperException e) {
-      throw new IOException(e);
-    } finally {
-      cleanup(context);
-      zkw.close();
-      if (writer != null) {
-        IOUtils.closeStream(writer);
-      }
-      if (fout != null) {
-        IOUtils.closeStream(fout);
-      }
-      if (table != null) {
-        try {
-          table.close();
-        } catch (IOException e) {
-          LOG.warn(e);
-        }
-      }
-    }
-
-  }
-
-  /**
-   * The mob files which have the same start key and date are in the same partition.
-   * The files in the same partition are merged together into bigger ones.
-   */
-  public class SweepPartition {
-
-    private final CompactionPartitionId id;
-    private final Context context;
-    private boolean memstoreUpdated = false;
-    private boolean mergeSmall = false;
-    private final Map<String, MobFileStatus> fileStatusMap = new HashMap<>();
-    private final List<Path> toBeDeleted = new ArrayList<>();
-
-    public SweepPartition(CompactionPartitionId id, Context context) throws IOException {
-      this.id = id;
-      this.context = context;
-      memstore.setPartitionId(id);
-      init();
-    }
-
-    public CompactionPartitionId getId() {
-      return this.id;
-    }
-
-    /**
-     * Prepares the map of files.
-     *
-     * @throws IOException
-     */
-    private void init() throws IOException {
-      FileStatus[] fileStats = listStatus(familyDir, id.getStartKey());
-      if (null == fileStats) {
-        return;
-      }
-
-      int smallFileCount = 0;
-      float compactionRatio = conf.getFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO,
-          MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO);
-      long compactionMergeableSize = conf.getLong(
-          MobConstants.MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE,
-          MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE);
-      // list the files. Just merge the hfiles, don't merge the hfile links.
-      // prepare the map of mob files. The key is the file name, the value is the file status.
-      for (FileStatus fileStat : fileStats) {
-        MobFileStatus mobFileStatus = null;
-        if (!HFileLink.isHFileLink(fileStat.getPath())) {
-          mobFileStatus = new MobFileStatus(fileStat, compactionRatio, compactionMergeableSize);
-          if (mobFileStatus.needMerge()) {
-            smallFileCount++;
-          }
-          // key is file name (not hfile name), value is hfile status.
-          fileStatusMap.put(fileStat.getPath().getName(), mobFileStatus);
-        }
-      }
-      if (smallFileCount >= 2) {
-        // merge the files only when there is more than one file in the same partition.
-        this.mergeSmall = true;
-      }
-    }
-
-    /**
-     * Flushes the data into mob files and store files, and archives the small
-     * files after they're merged.
-     * @throws IOException
-     */
-    public void close() throws IOException {
-      if (null == id) {
-        return;
-      }
-      // flush the remaining key values into mob files
-      if (memstoreUpdated) {
-        memstore.flushMemStore();
-      }
-      List<StoreFile> storeFiles = new ArrayList<>(toBeDeleted.size());
-      // delete small files after compaction
-      for (Path path : toBeDeleted) {
-        LOG.info("[In Partition close] Delete the file " + path + " in partition close");
-        storeFiles.add(new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE));
-      }
-      if (!storeFiles.isEmpty()) {
-        try {
-          MobUtils.removeMobFiles(conf, fs, table.getName(), mobTableDir, family.getName(),
-              storeFiles);
-          context.getCounter(SweepCounter.FILE_TO_BE_MERGE_OR_CLEAN).increment(
-              storeFiles.size());
-        } catch (IOException e) {
-          LOG.error("Failed to archive the store files " + storeFiles, e);
-        }
-        storeFiles.clear();
-      }
-      fileStatusMap.clear();
-    }
-
-    /**
-     * Merges the small mob files into bigger ones.
-     * @param fileName The current mob file name.
-     * @param values The collection of KeyValues in this mob file.
-     * @throws IOException
-     */
-    public void execute(Text fileName, Iterable<KeyValue> values) throws IOException {
-      if (null == values) {
-        return;
-      }
-      MobFileName mobFileName = MobFileName.create(fileName.toString());
-      LOG.info("[In reducer] The file name: " + fileName.toString());
-      MobFileStatus mobFileStat = fileStatusMap.get(mobFileName.getFileName());
-      if (null == mobFileStat) {
-        LOG.info("[In reducer] Cannot find the file, probably this record is obsolete");
-        return;
-      }
-      // only handle the files that are older than one day.
-      if (compactionBegin - mobFileStat.getFileStatus().getModificationTime()
-          <= mobCompactionDelay) {
-        return;
-      }
-      // write the hfile name
-      writer.append(mobFileName.getFileName(), MobConstants.EMPTY_STRING);
-      Set<KeyValue> kvs = new HashSet<>();
-      for (KeyValue kv : values) {
-        if (kv.getValueLength() > Bytes.SIZEOF_INT) {
-          mobFileStat.addValidSize(Bytes.toInt(kv.getValueArray(), kv.getValueOffset(),
-              Bytes.SIZEOF_INT));
-        }
-        kvs.add(kv);
-      }
-      // If the mob file is an invalid one or a small one, merge it into new/bigger ones.
-      if (mobFileStat.needClean() || (mergeSmall && mobFileStat.needMerge())) {
-        context.getCounter(SweepCounter.INPUT_FILE_COUNT).increment(1);
-        MobFile file = MobFile.create(fs,
-            new Path(familyDir, mobFileName.getFileName()), conf, cacheConfig);
-        StoreFileScanner scanner = null;
-        file.open();
-        try {
-          scanner = file.getScanner();
-          scanner.seek(CellUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY));
-          Cell cell;
-          while (null != (cell = scanner.next())) {
-            if (kvs.contains(cell)) {
-              // write the KeyValue existing in HBase to the memstore.
-              memstore.addToMemstore(cell);
-              memstoreUpdated = true;
-            }
-          }
-        } finally {
-          if (scanner != null) {
-            scanner.close();
-          }
-          file.close();
-        }
-        toBeDeleted.add(mobFileStat.getFileStatus().getPath());
-      }
-    }
-
-    /**
-     * Lists the files with the same prefix.
-     * @param p The file path.
-     * @param prefix The prefix.
-     * @return The files with the same prefix.
-     * @throws IOException
-     */
-    private FileStatus[] listStatus(Path p, String prefix) throws IOException {
-      return fs.listStatus(p, new PathPrefixFilter(prefix));
-    }
-  }
-
-  static class PathPrefixFilter implements PathFilter {
-
-    private final String prefix;
-
-    public PathPrefixFilter(String prefix) {
-      this.prefix = prefix;
-    }
-
-    public boolean accept(Path path) {
-      return path.getName().startsWith(prefix, 0);
-    }
-
-  }
-
-  /**
-   * Creates the partition id.
-   * @param fileNameAsString The current file name, as a string.
-   * @return The partition id.
-   */
-  private CompactionPartitionId createPartitionId(String fileNameAsString) {
-    MobFileName fileName = MobFileName.create(fileNameAsString);
-    return new CompactionPartitionId(fileName.getStartKey(), fileName.getDate());
-  }
-
-  /**
-   * The mob file status used in the sweep reducer.
-   */
-  private static class MobFileStatus {
-    private FileStatus fileStatus;
-    private int validSize;
-    private long size;
-
-    private float compactionRatio = MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO;
-    private long compactionMergeableSize =
-        MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE;
-
-    /**
-     * @param fileStatus The current FileStatus.
-     * @param compactionRatio the invalid ratio.
-     *   If there are too many cells deleted in a mob file, it's regarded as invalid,
-     *   and needs to be written to a new one.
-     *   If existingCellSize/fileSize < compactionRatio, it's regarded as an invalid one.
-     * @param compactionMergeableSize If the size of a mob file is less
-     *   than this value, it's regarded as a small file and needs to be merged.
-     */
-    public MobFileStatus(FileStatus fileStatus, float compactionRatio,
-        long compactionMergeableSize) {
-      this.fileStatus = fileStatus;
-      this.size = fileStatus.getLen();
-      validSize = 0;
-      this.compactionRatio = compactionRatio;
-      this.compactionMergeableSize = compactionMergeableSize;
-    }
-
-    /**
-     * Adds size to this file.
-     * @param size The size to be added.
-     */
-    public void addValidSize(int size) {
-      this.validSize += size;
-    }
-
-    /**
-     * Whether the mob file needs to be cleaned.
-     * If there are too many cells deleted in this mob file, it needs to be cleaned.
-     * @return True if it needs to be cleaned.
-     */
-    public boolean needClean() {
-      return validSize < compactionRatio * size;
-    }
-
-    /**
-     * Whether the mob file needs to be merged.
-     * If this mob file is too small, it needs to be merged.
-     * @return True if it needs to be merged.
-     */
-    public boolean needMerge() {
-      return this.size < compactionMergeableSize;
-    }
-
-    /**
-     * Gets the file status.
-     * @return The file status.
-     */
-    public FileStatus getFileStatus() {
-      return fileStatus;
-    }
-  }
-}
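Two thresholds drive everything in the reducer above, and they are easy to check in isolation: a file is "invalid" when too little of it is still referenced, and "small" when the whole file is under the mergeable size. A standalone restatement of MobFileStatus's two predicates; the threshold values in main are made up for the demo:

    public class MobFileTriageDemo {
      // "Invalid": too few live bytes relative to file size -> rewrite the live cells.
      static boolean needClean(long validSize, long fileSize, float compactionRatio) {
        return validSize < compactionRatio * fileSize;
      }

      // "Small": the whole file is under the mergeable threshold -> merge it.
      static boolean needMerge(long fileSize, long mergeableSize) {
        return fileSize < mergeableSize;
      }

      public static void main(String[] args) {
        // A 100 MB file of which only 20 MB of values are still referenced:
        System.out.println(needClean(20L << 20, 100L << 20, 0.5f)); // true -> clean it
        // A 2 MB file against a 128 MB mergeable threshold:
        System.out.println(needMerge(2L << 20, 128L << 20)); // true -> merge it
      }
    }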
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
deleted file mode 100644
index c27e8aecf1b..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * The sweep tool. It deletes the mob files that are not used and merges the small mob files
- * into bigger ones. Each run of this sweep tool handles only one column family. Runs on the
- * same column family are mutually exclusive, and the sweep tool is likewise mutually exclusive
- * with a major compaction on that column family.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class Sweeper extends Configured implements Tool {
-
-  /**
-   * Sweeps the mob files on one column family. It deletes the unused mob files and merges
-   * the small mob files into bigger ones.
-   * @param tableName The current table name in string format.
-   * @param familyName The column family name.
-   * @return 0 if success, 2 if the job aborted with an exception, 3 if unable to start due to
-   *   another compaction, 4 if the MR job was unsuccessful.
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws ClassNotFoundException
-   * @throws KeeperException
-   */
-  int sweepFamily(String tableName, String familyName) throws IOException, InterruptedException,
-      ClassNotFoundException, KeeperException {
-    Configuration conf = getConf();
-    // make sure the target HBase exists.
-    HBaseAdmin.available(conf);
-    Connection connection = ConnectionFactory.createConnection(getConf());
-    Admin admin = connection.getAdmin();
-    try {
-      FileSystem fs = FileSystem.get(conf);
-      TableName tn = TableName.valueOf(tableName);
-      HTableDescriptor htd = admin.getTableDescriptor(tn);
-      HColumnDescriptor family = htd.getFamily(Bytes.toBytes(familyName));
-      if (family == null || !family.isMobEnabled()) {
-        throw new IOException("Column family " + familyName + " is not a MOB column family");
-      }
-      SweepJob job = new SweepJob(conf, fs);
-      // Run the sweeping
-      return job.sweep(tn, family);
-    } catch (Exception e) {
-      System.err.println("Job aborted due to exception " + e);
-      return 2; // job failed
-    } finally {
-      try {
-        admin.close();
-      } catch (IOException e) {
-        System.out.println("Failed to close the HBaseAdmin: " + e.getMessage());
-      }
-      try {
-        connection.close();
-      } catch (IOException e) {
-        System.out.println("Failed to close the connection: " + e.getMessage());
-      }
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    Configuration conf = HBaseConfiguration.create();
-    int ret = ToolRunner.run(conf, new Sweeper(), args);
-    System.exit(ret);
-  }
-
-  private void printUsage() {
-    System.err.println("Usage:\n" + "--------------------------\n" + Sweeper.class.getName()
-        + " tableName familyName");
-    System.err.println(" tableName The table name");
-    System.err.println(" familyName The column family name");
-  }
-
-  /**
-   * Main method for the tool.
-   * @return 0 if success, 1 for bad args, 2 if the job aborted with an exception,
-   *   3 if unable to start due to another compaction, 4 if the MR job was unsuccessful.
-   */
-  public int run(String[] args) throws Exception {
-    if (args.length != 2) {
-      printUsage();
-      return 1;
-    }
-    String table = args[0];
-    String family = args[1];
-    return sweepFamily(table, family);
-  }
-}
\ No newline at end of file
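The removed Sweeper was a standard Hadoop Tool, so a caller could drive it through ToolRunner and branch on the exit codes its javadoc documents. A hypothetical driver sketch, not part of this patch; the table and family names are placeholders, and Sweeper is assumed to still be on the classpath in the same package:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.util.ToolRunner;

// Illustrative driver mapping the documented exit codes of the removed tool.
public class SweeperDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // args: <tableName> <familyName>, exactly as printUsage() documents
    int ret = ToolRunner.run(conf, new Sweeper(), new String[] { "mytable", "mobcf" });
    switch (ret) {
      case 0: System.out.println("sweep succeeded"); break;
      case 1: System.out.println("bad arguments"); break;
      case 2: System.out.println("job aborted with an exception"); break;
      case 3: System.out.println("another compaction or sweep holds the lock"); break;
      case 4: System.out.println("MapReduce job was unsuccessful"); break;
      default: System.out.println("unexpected code " + ret);
    }
  }
}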
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
deleted file mode 100644
index 666e193a27e..00000000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepJob.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob.mapreduce;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.mob.MobConstants;
-import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.serializer.JavaSerialization;
-import org.apache.hadoop.io.serializer.WritableSerialization;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category(MediumTests.class)
-public class TestMobSweepJob {
-
-  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
-        JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
-    TEST_UTIL.startMiniCluster();
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  private void writeFileNames(FileSystem fs, Configuration conf, Path path,
-      String[] fileNames) throws IOException {
-    // write the names to a sequence file
-    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
-        String.class, String.class);
-    try {
-      for (String fileName : fileNames) {
-        writer.append(fileName, MobConstants.EMPTY_STRING);
-      }
-    } finally {
-      IOUtils.closeStream(writer);
-    }
-  }
-
-  @Test
-  public void testSweeperJobWithOutUnusedFile() throws Exception {
-    FileSystem fs = TEST_UTIL.getTestFileSystem();
-    Configuration configuration = new Configuration(
-        TEST_UTIL.getConfiguration());
-    Path visitedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
-        "/hbase/mobcompaction/SweepJob/working/names/0/visited");
-    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
-        "/hbase/mobcompaction/SweepJob/working/names/0/all");
-    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
-        visitedFileNamesPath.toString());
-    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
-        allFileNamesPath.toString());
-
-    writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1",
-        "2", "3", "4", "5", "6"});
-
-    Path r0 = new Path(visitedFileNamesPath, "r0");
-    writeFileNames(fs, configuration, r0, new String[] { "1",
-        "2", "3"});
-    Path r1 = new Path(visitedFileNamesPath, "r1");
-    writeFileNames(fs, configuration, r1, new String[] { "1", "4", "5"});
-    Path r2 = new Path(visitedFileNamesPath, "r2");
-    writeFileNames(fs, configuration, r2, new String[] { "2", "3", "6"});
-
-    SweepJob sweepJob = new SweepJob(configuration, fs);
-    List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
-
-    assertEquals(0, toBeArchived.size());
-  }
-
-  @Test
-  public void testSweeperJobWithUnusedFile() throws Exception {
-    FileSystem fs = TEST_UTIL.getTestFileSystem();
-    Configuration configuration = new Configuration(
-        TEST_UTIL.getConfiguration());
-    Path visitedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
-        "/hbase/mobcompaction/SweepJob/working/names/1/visited");
-    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
-        "/hbase/mobcompaction/SweepJob/working/names/1/all");
-    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
-        visitedFileNamesPath.toString());
-    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
-        allFileNamesPath.toString());
-
-    writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1",
-        "2", "3", "4", "5", "6"});
-
-    Path r0 = new Path(visitedFileNamesPath, "r0");
-    writeFileNames(fs, configuration, r0, new String[] { "1",
-        "2", "3"});
-    Path r1 = new Path(visitedFileNamesPath, "r1");
-    writeFileNames(fs, configuration, r1, new String[] { "1", "5"});
-    Path r2 = new Path(visitedFileNamesPath, "r2");
-    writeFileNames(fs, configuration, r2, new String[] { "2", "3"});
-
-    SweepJob sweepJob = new SweepJob(configuration, fs);
-    List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
-
-    assertEquals(2, toBeArchived.size());
-    assertArrayEquals(new String[]{"4", "6"}, toBeArchived.toArray(new String[0]));
-  }
-
-  @Test
-  public void testSweeperJobWithRedundantFile() throws Exception {
-    FileSystem fs = TEST_UTIL.getTestFileSystem();
-    Configuration configuration = new Configuration(
-        TEST_UTIL.getConfiguration());
-    Path visitedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
-        "/hbase/mobcompaction/SweepJob/working/names/2/visited");
-    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
-        "/hbase/mobcompaction/SweepJob/working/names/2/all");
-    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
-        visitedFileNamesPath.toString());
-    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
-        allFileNamesPath.toString());
-
-    writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1",
-        "2", "3", "4", "5", "6"});
-
-    Path r0 = new Path(visitedFileNamesPath, "r0");
-    writeFileNames(fs, configuration, r0, new String[] { "1",
-        "2", "3"});
-    Path r1 = new Path(visitedFileNamesPath, "r1");
-    writeFileNames(fs, configuration, r1, new String[] { "1", "5", "6", "7"});
-    Path r2 = new Path(visitedFileNamesPath, "r2");
-    writeFileNames(fs, configuration, r2, new String[] { "2", "3", "4"});
-
-    SweepJob sweepJob = new SweepJob(configuration, fs);
-    List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
-
-    assertEquals(0, toBeArchived.size());
-  }
-}
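The three tests above pin down SweepJob.getUnusedFiles as a set difference: every name recorded in the all-names file that no reducer visited is unused. A hedged sketch of that computation, simplified to in-memory collections where the real job read SequenceFiles; the class and method names are illustrative:

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch of the unused-file computation the tests exercise:
// unused = allNames \ union(visitedNames per reducer).
public class UnusedFiles {
  static List<String> unused(List<String> allNames, List<List<String>> visitedPerReducer) {
    Set<String> visited = new LinkedHashSet<>();
    for (List<String> names : visitedPerReducer) {
      visited.addAll(names);
    }
    return allNames.stream().filter(n -> !visited.contains(n)).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> all = Arrays.asList("1", "2", "3", "4", "5", "6");
    // mirrors testSweeperJobWithUnusedFile: reducers saw {1,2,3}, {1,5}, {2,3}
    System.out.println(unused(all, Arrays.asList(
        Arrays.asList("1", "2", "3"),
        Arrays.asList("1", "5"),
        Arrays.asList("2", "3")))); // prints [4, 6]
  }
}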
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
deleted file mode 100644
index 44131ba6189..00000000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepMapper.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob.mapreduce;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.master.TableLockManager;
-import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
-import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-@Category(SmallTests.class)
-public class TestMobSweepMapper {
-  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL.startMiniCluster();
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  @Test
-  public void testMap() throws Exception {
-    String prefix = "0000";
-    final String fileName = "19691231f2cd014ea28f42788214560a21a44cef";
-    final String mobFilePath = prefix + fileName;
-
-    ImmutableBytesWritable r = new ImmutableBytesWritable(Bytes.toBytes("r"));
-    final KeyValue[] kvList = new KeyValue[1];
-    kvList[0] = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"),
-        Bytes.toBytes("column"), Bytes.toBytes(mobFilePath));
-
-    Result columns = mock(Result.class);
-    when(columns.rawCells()).thenReturn(kvList);
-
-    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
-    ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable());
-    TableName tn = TableName.valueOf("testSweepMapper");
-    TableName lockName = MobUtils.getTableLockName(tn);
-    String znode = ZKUtil.joinZNode(zkw.znodePaths.tableLockZNode, lockName.getNameAsString());
-    configuration.set(SweepJob.SWEEP_JOB_ID, "1");
-    configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode);
-    ServerName serverName = SweepJob.getCurrentServerName(configuration);
-    configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString());
-
-    TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw,
-        serverName);
-    TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
-    lock.acquire();
-    try {
-      Mapper<ImmutableBytesWritable, Result, Text, KeyValue>.Context ctx =
-          mock(Mapper.Context.class);
-      when(ctx.getConfiguration()).thenReturn(configuration);
-      SweepMapper map = new SweepMapper();
-      doAnswer(new Answer<Void>() {
-
-        @Override
-        public Void answer(InvocationOnMock invocation) throws Throwable {
-          Text text = (Text) invocation.getArguments()[0];
-          KeyValue kv = (KeyValue) invocation.getArguments()[1];
-
-          assertEquals(Bytes.toString(text.getBytes(), 0,
-              text.getLength()), fileName);
-          assertEquals(0, Bytes.compareTo(kv.getKey(), kvList[0].getKey()));
-
-          return null;
-        }
-      }).when(ctx).write(any(Text.class), any(KeyValue.class));
-
-      map.map(r, columns, ctx);
-    } finally {
-      lock.release();
-    }
-  }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
deleted file mode 100644
index abe0d3a6feb..00000000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/mapreduce/TestMobSweepReducer.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob.mapreduce;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.master.TableLockManager;
-import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
-import org.apache.hadoop.hbase.mob.MobConstants;
-import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
-import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.serializer.JavaSerialization;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.counters.GenericCounter;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.Matchers;
-
-@Category(MediumTests.class)
-public class TestMobSweepReducer {
-
-  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  private final static String tableName = "testSweepReducer";
-  private final static String row = "row";
-  private final static String family = "family";
-  private final static String qf = "qf";
-  private static BufferedMutator table;
-  private static Admin admin;
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
-
-    TEST_UTIL.startMiniCluster(1);
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  @SuppressWarnings("deprecation")
-  @Before
-  public void setUp() throws Exception {
-    HTableDescriptor desc = new HTableDescriptor(tableName);
-    HColumnDescriptor hcd = new HColumnDescriptor(family);
-    hcd.setMobEnabled(true);
-    hcd.setMobThreshold(3L);
-    hcd.setMaxVersions(4);
-    desc.addFamily(hcd);
-
-    admin = TEST_UTIL.getHBaseAdmin();
-    admin.createTable(desc);
-    table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
-        .getBufferedMutator(TableName.valueOf(tableName));
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    admin.disableTable(TableName.valueOf(tableName));
-    admin.deleteTable(TableName.valueOf(tableName));
-    admin.close();
-  }
-
-  private List<String> getKeyFromSequenceFile(FileSystem fs, Path path,
-      Configuration conf) throws Exception {
-    List<String> list = new ArrayList<String>();
-    SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
-
-    String next = (String) reader.next((String) null);
-    while (next != null) {
-      list.add(next);
-      next = (String) reader.next((String) null);
-    }
-    reader.close();
-    return list;
-  }
-
-  @Test
-  public void testRun() throws Exception {
-
-    TableName tn = TableName.valueOf(tableName);
-    byte[] mobValueBytes = new byte[100];
-
-    // get the path where the mob files lie
-    Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, family);
-
-    Put put = new Put(Bytes.toBytes(row));
-    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
-    Put put2 = new Put(Bytes.toBytes(row + "ignore"));
-    put2.addColumn(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
-    table.mutate(put);
-    table.mutate(put2);
-    table.flush();
-    admin.flush(tn);
-
-    FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
-    // check the generation of a mob file
-    assertEquals(1, fileStatuses.length);
-
-    String mobFile1 = fileStatuses[0].getPath().getName();
-
-    Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
-    configuration.setFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO, 0.6f);
-    configuration.setStrings(TableInputFormat.INPUT_TABLE, tableName);
-    configuration.setStrings(TableInputFormat.SCAN_COLUMN_FAMILY, family);
-    configuration.setStrings(SweepJob.WORKING_VISITED_DIR_KEY, "jobWorkingNamesDir");
-    configuration.setStrings(SweepJob.WORKING_FILES_DIR_KEY, "compactionFileDir");
-    configuration.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
-        JavaSerialization.class.getName());
-    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, "compactionVisitedDir");
-    configuration.setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
-        System.currentTimeMillis() + 24 * 3600 * 1000);
-
-    ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable());
-    TableName lockName = MobUtils.getTableLockName(tn);
-    String znode = ZKUtil.joinZNode(zkw.znodePaths.tableLockZNode,
-        lockName.getNameAsString());
-    configuration.set(SweepJob.SWEEP_JOB_ID, "1");
-    configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode);
-    ServerName serverName = SweepJob.getCurrentServerName(configuration);
-    configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString());
-
-    TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw,
-        serverName);
-    TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
-    lock.acquire();
-    try {
-      // use the same counter when mocking
-      Counter counter = new GenericCounter();
-      Reducer<Text, KeyValue, Writable, Writable>.Context ctx = mock(Reducer.Context.class);
-      when(ctx.getConfiguration()).thenReturn(configuration);
-      when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter);
-      when(ctx.nextKey()).thenReturn(true).thenReturn(false);
-      when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1));
-
-      byte[] refBytes = Bytes.toBytes(mobFile1);
-      long valueLength = refBytes.length;
-      byte[] newValue = Bytes.add(Bytes.toBytes(valueLength), refBytes);
-      KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qf), 1,
-          KeyValue.Type.Put, newValue);
-      List<KeyValue> list = new ArrayList<KeyValue>();
-      list.add(kv2);
-
-      when(ctx.getValues()).thenReturn(list);
-
-      SweepReducer reducer = new SweepReducer();
-      reducer.run(ctx);
-    } finally {
-      lock.release();
-    }
-    FileStatus[] fileStatuses2 = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
-    String mobFile2 = fileStatuses2[0].getPath().getName();
-    // a new mob file is generated, the old one has been archived
-    assertEquals(1, fileStatuses2.length);
-    assertEquals(false, mobFile2.equalsIgnoreCase(mobFile1));
-
-    // test the sequence file
-    String workingPath = configuration.get(SweepJob.WORKING_VISITED_DIR_KEY);
-    FileStatus[] statuses = TEST_UTIL.getTestFileSystem().listStatus(new Path(workingPath));
-    Set<String> files = new TreeSet<String>();
-    for (FileStatus st : statuses) {
-      files.addAll(getKeyFromSequenceFile(TEST_UTIL.getTestFileSystem(),
-          st.getPath(), configuration));
-    }
-    assertEquals(1, files.size());
-    assertEquals(true, files.contains(mobFile1));
-  }
-}
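Both the mapper and reducer tests above construct MOB reference cells by hand: the cell value holds the referenced value's length followed by the mob file name. A hedged sketch of that layout, assuming the 4-byte int prefix that MemStoreWrapper's sweep code reads via Bytes.SIZEOF_INT; the class and helper names are illustrative:

import org.apache.hadoop.hbase.util.Bytes;

// Sketch of the MOB reference-value layout exercised by the tests:
// [valueLength: 4-byte int][mob file name bytes]. The sweep code recovers
// the valid size from the prefix and the mob file name from the tail.
public class MobRefValue {
  static byte[] encode(int valueLength, String mobFileName) {
    return Bytes.add(Bytes.toBytes(valueLength), Bytes.toBytes(mobFileName));
  }

  static int decodeValueLength(byte[] refValue) {
    return Bytes.toInt(refValue, 0, Bytes.SIZEOF_INT);
  }

  static String decodeFileName(byte[] refValue) {
    return Bytes.toString(refValue, Bytes.SIZEOF_INT, refValue.length - Bytes.SIZEOF_INT);
  }

  public static void main(String[] args) {
    byte[] ref = encode(100, "19691231f2cd014ea28f42788214560a21a44cef");
    System.out.println(decodeValueLength(ref)); // 100
    System.out.println(decodeFileName(ref));    // the mob file name
  }
}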