HBASE-16811 Remove mob sweep job.

Change-Id: If849fdcafb8af4a9765be1eb80da77f8b3f29a1a
Apekshit Sharma 2016-10-11 13:45:08 -07:00
parent ec87b4bfe2
commit eb52e26822
10 changed files with 0 additions and 2116 deletions

hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MemStoreWrapper.java

@@ -1,188 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob.mapreduce;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.MemStore;
import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Reducer.Context;
/**
* The wrapper of a DefaultMemStore.
* This wrapper is used in the sweep reducer to buffer and sort the cells written from
* the invalid and small mob files.
* It's flushed when it's full; the mob data are written to mob files, and their file names
* are written back to the store files of HBase.
* This memstore is used to sort the cells in mob files.
* In a reducer of the sweep tool, the mob files are grouped by the same prefix (start key and
* date); in each group, the reducer iterates the files and reads their cells into a new and
* bigger mob file.
* The cells in the same mob file are ordered, but cells across mob files are not.
* So we need this MemStoreWrapper to sort the cells that come from different mob files before
* flushing them to disk; when the memstore is big enough it's flushed as a new mob file.
*/
@InterfaceAudience.Private
public class MemStoreWrapper {
private static final Log LOG = LogFactory.getLog(MemStoreWrapper.class);
private MemStore memstore;
private long flushSize;
private CompactionPartitionId partitionId;
private Context context;
private Configuration conf;
private BufferedMutator table;
private HColumnDescriptor hcd;
private Path mobFamilyDir;
private FileSystem fs;
private CacheConfig cacheConfig;
private Encryption.Context cryptoContext = Encryption.Context.NONE;
public MemStoreWrapper(Context context, FileSystem fs, BufferedMutator table,
HColumnDescriptor hcd, MemStore memstore, CacheConfig cacheConfig) throws IOException {
this.memstore = memstore;
this.context = context;
this.fs = fs;
this.table = table;
this.hcd = hcd;
this.conf = context.getConfiguration();
this.cacheConfig = cacheConfig;
flushSize = this.conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE,
MobConstants.DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE);
mobFamilyDir = MobUtils.getMobFamilyPath(conf, table.getName(), hcd.getNameAsString());
cryptoContext = EncryptionUtil.createEncryptionContext(conf, hcd);
}
public void setPartitionId(CompactionPartitionId partitionId) {
this.partitionId = partitionId;
}
/**
* Flushes the memstore if the size is large enough.
* @throws IOException
*/
private void flushMemStoreIfNecessary() throws IOException {
if (memstore.heapSize() >= flushSize) {
flushMemStore();
}
}
/**
* Unconditionally flushes the memstore.
* @throws IOException
*/
public void flushMemStore() throws IOException {
MemStoreSnapshot snapshot = memstore.snapshot();
internalFlushCache(snapshot);
memstore.clearSnapshot(snapshot.getId());
}
/**
* Flushes the snapshot of the memstore.
* Writes the mob data to mob files, and writes the names of these mob files back to HBase.
* @param snapshot The snapshot of the memstore.
* @throws IOException
*/
private void internalFlushCache(final MemStoreSnapshot snapshot)
throws IOException {
if (snapshot.getCellsCount() == 0) {
return;
}
// generate the files into a temp directory.
String tempPathString = context.getConfiguration().get(SweepJob.WORKING_FILES_DIR_KEY);
StoreFileWriter mobFileWriter = MobUtils.createWriter(conf, fs, hcd, partitionId.getDate(),
new Path(tempPathString), snapshot.getCellsCount(), hcd.getCompactionCompression(),
partitionId.getStartKey(), cacheConfig, cryptoContext);
String relativePath = mobFileWriter.getPath().getName();
LOG.info("Create files under a temp directory " + mobFileWriter.getPath().toString());
byte[] referenceValue = Bytes.toBytes(relativePath);
KeyValueScanner scanner = snapshot.getScanner();
Cell cell = null;
while (null != (cell = scanner.next())) {
mobFileWriter.append(cell);
}
scanner.close();
// Write out the log sequence number that corresponds to this output
// hfile. The hfile is current up to and including logCacheFlushId.
mobFileWriter.appendMetadata(Long.MAX_VALUE, false, snapshot.getCellsCount());
mobFileWriter.close();
MobUtils.commitFile(conf, fs, mobFileWriter.getPath(), mobFamilyDir, cacheConfig);
context.getCounter(SweepCounter.FILE_AFTER_MERGE_OR_CLEAN).increment(1);
// write reference/fileName back to the store files of HBase.
scanner = snapshot.getScanner();
scanner.seek(CellUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
cell = null;
Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE,
Bytes.toBytes(this.table.getName().toString()));
long updatedCount = 0;
while (null != (cell = scanner.next())) {
KeyValue reference = MobUtils.createMobRefKeyValue(cell, referenceValue, tableNameTag);
Put put =
new Put(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength());
put.add(reference);
table.mutate(put);
updatedCount++;
}
table.flush();
context.getCounter(SweepCounter.RECORDS_UPDATED).increment(updatedCount);
scanner.close();
}
/**
* Adds a Cell into the memstore.
* @param cell The Cell to be added.
* @throws IOException
*/
public void addToMemstore(Cell cell) throws IOException {
memstore.add(cell);
// flush the memstore if it's full.
flushMemStoreIfNecessary();
}
}
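
A minimal usage sketch of the buffer-and-flush pattern above, mirroring the calls the sweep reducer makes into this class; the context, file system, mutator, column descriptor and cache config are assumed to be initialized elsewhere, and startKey, date and cellsFromSmallMobFiles are hypothetical names:

// Hypothetical wiring, following how SweepReducer drives MemStoreWrapper.
MemStoreWrapper wrapper = new MemStoreWrapper(context, fs, table, hcd,
    new DefaultMemStore(), cacheConfig);
wrapper.setPartitionId(new CompactionPartitionId(startKey, date));
for (Cell cell : cellsFromSmallMobFiles) {
  // buffers and sorts the cell; flushes automatically once heapSize() >= flushSize
  wrapper.addToMemstore(cell);
}
// flush whatever remains as a final mob file
wrapper.flushMemStore();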

hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/MobFilePathHashPartitioner.java

@@ -1,41 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob.mapreduce;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* The partitioner for the sweep job.
* The key is a mob file name. We bucket by date.
*/
@InterfaceAudience.Private
public class MobFilePathHashPartitioner extends Partitioner<Text, KeyValue> {
@Override
public int getPartition(Text fileName, KeyValue kv, int numPartitions) {
MobFileName mobFileName = MobFileName.create(fileName.toString());
String date = mobFileName.getDate();
int hash = date.hashCode();
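// Mask off the sign bit so a negative hashCode still maps to a non-negative partition index.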
return (hash & Integer.MAX_VALUE) % numPartitions;
}
}

hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJob.java

@@ -1,604 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob.mapreduce;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NoTagsKeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.serializer.JavaSerialization;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zookeeper.KeeperException;
/**
* The sweep job.
* Runs a MapReduce job that merges the smaller mob files into bigger ones and cleans up the
* unused ones.
*/
@InterfaceAudience.Private
public class SweepJob {
private final FileSystem fs;
private final Configuration conf;
private static final Log LOG = LogFactory.getLog(SweepJob.class);
static final String SWEEP_JOB_ID = "hbase.mob.sweep.job.id";
static final String SWEEP_JOB_SERVERNAME = "hbase.mob.sweep.job.servername";
static final String SWEEP_JOB_TABLE_NODE = "hbase.mob.sweep.job.table.node";
static final String WORKING_DIR_KEY = "hbase.mob.sweep.job.dir";
static final String WORKING_ALLNAMES_FILE_KEY = "hbase.mob.sweep.job.all.file";
static final String WORKING_VISITED_DIR_KEY = "hbase.mob.sweep.job.visited.dir";
static final String WORKING_ALLNAMES_DIR = "all";
static final String WORKING_VISITED_DIR = "visited";
public static final String WORKING_FILES_DIR_KEY = "mob.sweep.job.files.dir";
//the MOB_SWEEP_JOB_DELAY is ONE_DAY by default. Its value is only changed when testing.
public static final String MOB_SWEEP_JOB_DELAY = "hbase.mob.sweep.job.delay";
protected static final long ONE_DAY = 24 * 60 * 60 * 1000;
private long compactionStartTime = EnvironmentEdgeManager.currentTime();
public final static String CREDENTIALS_LOCATION = "credentials_location";
private CacheConfig cacheConfig;
static final int SCAN_CACHING = 10000;
private TableLockManager tableLockManager;
public SweepJob(Configuration conf, FileSystem fs) {
this.conf = conf;
this.fs = fs;
// disable the block cache.
Configuration copyOfConf = new Configuration(conf);
copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
cacheConfig = new CacheConfig(copyOfConf);
}
static ServerName getCurrentServerName(Configuration conf) throws IOException {
String hostname = conf.get(
"hbase.regionserver.ipc.address",
Strings.domainNamePointerToHostName(DNS.getDefaultHost(
conf.get("hbase.regionserver.dns.interface", "default"),
conf.get("hbase.regionserver.dns.nameserver", "default"))));
int port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
// Creation of a HSA will force a resolve.
InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
if (initialIsa.getAddress() == null) {
throw new IllegalArgumentException("Failed resolve of " + initialIsa);
}
return ServerName.valueOf(initialIsa.getHostName(), initialIsa.getPort(),
EnvironmentEdgeManager.currentTime());
}
/**
* Runs MapReduce to do the sweeping on the mob files.
* There's a MobReferenceOnlyFilter so that the mappers only get the cells that have mob
* references from 'normal' regions' rows.
* Runs of the sweep tool on the same column family are mutually exclusive, and an HBase major
* compaction and the sweep tool on the same column family are mutually exclusive too.
* The synchronization is done through ZooKeeper.
* So at the beginning of the run, we need to make sure this sweep tool is the only one running
* on this column family, and that no major compaction of this column family is in progress.
* @param tn The current table name.
* @param family The descriptor of the current column family.
* @return 0 upon success, 3 if bailing out because another compaction is currently happening,
* or 4 if the MR job was unsuccessful
*
* @throws IOException
* @throws ClassNotFoundException
* @throws InterruptedException
* @throws KeeperException
*/
public int sweep(TableName tn, HColumnDescriptor family) throws IOException,
ClassNotFoundException, InterruptedException, KeeperException {
Configuration conf = new Configuration(this.conf);
// check whether the current user is the same as the owner of the hbase root directory
String currentUserName = UserGroupInformation.getCurrentUser().getShortUserName();
FileStatus[] hbaseRootFileStat = fs.listStatus(new Path(conf.get(HConstants.HBASE_DIR)));
if (hbaseRootFileStat.length > 0) {
String owner = hbaseRootFileStat[0].getOwner();
if (!owner.equals(currentUserName)) {
String errorMsg = "The current user[" + currentUserName
+ "] doesn't have hbase root credentials."
+ " Please make sure the user is the root of the target HBase";
LOG.error(errorMsg);
throw new IOException(errorMsg);
}
} else {
LOG.error("The target HBase doesn't exist");
throw new IOException("The target HBase doesn't exist");
}
String familyName = family.getNameAsString();
String id = "SweepJob" + UUID.randomUUID().toString().replace("-", "");
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, id, new DummyMobAbortable());
try {
ServerName serverName = getCurrentServerName(conf);
tableLockManager = TableLockManager.createTableLockManager(conf, zkw, serverName);
TableName lockName = MobUtils.getTableLockName(tn);
TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
String tableName = tn.getNameAsString();
// Try to obtain the lock. Use this lock to synchronize all the queries.
try {
lock.acquire();
} catch (Exception e) {
LOG.warn("Can not lock the table " + tableName
+ ". The major compaction in HBase may be in-progress or another sweep job is running."
+ " Please re-run the job.");
return 3;
}
Job job = null;
try {
Scan scan = new Scan();
scan.addFamily(family.getName());
// Do not retrieve the mob data when scanning
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
scan.setCaching(SCAN_CACHING);
scan.setCacheBlocks(false);
scan.setMaxVersions(family.getMaxVersions());
conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
conf.set(SWEEP_JOB_ID, id);
conf.set(SWEEP_JOB_SERVERNAME, serverName.toString());
String tableLockNode = ZKUtil.joinZNode(zkw.znodePaths.tableLockZNode,
lockName.getNameAsString());
conf.set(SWEEP_JOB_TABLE_NODE, tableLockNode);
job = prepareJob(tn, familyName, scan, conf);
job.getConfiguration().set(TableInputFormat.SCAN_COLUMN_FAMILY, familyName);
// Record the compaction start time.
// In the sweep tool, only a mob file whose modification time is older than
// (startTime - delay) will be handled by this tool.
// The delay is one day. It can be configured as well, but that is only done
// in tests.
job.getConfiguration().setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
compactionStartTime);
job.setPartitionerClass(MobFilePathHashPartitioner.class);
submit(job, tn, familyName);
if (job.waitForCompletion(true)) {
// Archive the unused mob files.
removeUnusedFiles(job, tn, family);
} else {
System.err.println("Job was not successful");
return 4;
}
} finally {
try {
cleanup(job, tn, familyName);
} finally {
try {
lock.release();
} catch (IOException e) {
LOG.error("Failed to release the table lock " + tableName, e);
}
}
}
} finally {
zkw.close();
}
return 0;
}
/**
* Prepares a map reduce job.
* @param tn The current table name.
* @param familyName The current family name.
* @param scan The current scan.
* @param conf The current configuration.
* @return A map reduce job.
* @throws IOException
*/
private Job prepareJob(TableName tn, String familyName, Scan scan, Configuration conf)
throws IOException {
Job job = Job.getInstance(conf);
job.setJarByClass(SweepMapper.class);
TableMapReduceUtil.initTableMapperJob(tn.getNameAsString(), scan,
SweepMapper.class, Text.class, Writable.class, job);
job.setInputFormatClass(TableInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NoTagsKeyValue.class);
job.setReducerClass(SweepReducer.class);
job.setOutputFormatClass(NullOutputFormat.class);
String jobName = getCustomJobName(this.getClass().getSimpleName(), tn, familyName);
job.setJobName(jobName);
if (StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
String fileLoc = conf.get(CREDENTIALS_LOCATION);
Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
job.getCredentials().addAll(cred);
}
return job;
}
/**
* Gets a customized job name.
* It's className-mapperClassName-reducerClassName-tableName-familyName.
* @param className The current class name.
* @param tableName The current table name.
* @param familyName The current family name.
* @return The customized job name.
*/
private static String getCustomJobName(String className, TableName tableName, String familyName) {
StringBuilder name = new StringBuilder();
name.append(className);
name.append('-').append(SweepMapper.class.getSimpleName());
name.append('-').append(SweepReducer.class.getSimpleName());
name.append('-').append(tableName.getNamespaceAsString());
name.append('-').append(tableName.getQualifierAsString());
name.append('-').append(familyName);
return name.toString();
}
/**
* Submits a job.
* @param job The current job.
* @param tn The current table name.
* @param familyName The current family name.
* @throws IOException
*/
private void submit(Job job, TableName tn, String familyName) throws IOException {
// delete the temp directory of the mob files in case of a failure in a previous
// execution.
Path tempDir =
new Path(MobUtils.getMobHome(job.getConfiguration()), MobConstants.TEMP_DIR_NAME);
Path mobCompactionTempDir =
new Path(tempDir, MobConstants.MOB_SWEEP_TOOL_COMPACTION_TEMP_DIR_NAME);
Path workingPath = MobUtils.getCompactionWorkingPath(mobCompactionTempDir, job.getJobName());
job.getConfiguration().set(WORKING_DIR_KEY, workingPath.toString());
// delete the working directory in case it was not deleted by the last run.
fs.delete(workingPath, true);
// create the working directory.
fs.mkdirs(workingPath);
// create a sequence file which contains the names of all the existing files.
Path workingPathOfFiles = new Path(workingPath, "files");
Path workingPathOfNames = new Path(workingPath, "names");
job.getConfiguration().set(WORKING_FILES_DIR_KEY, workingPathOfFiles.toString());
Path allFileNamesPath = new Path(workingPathOfNames, WORKING_ALLNAMES_DIR);
job.getConfiguration().set(WORKING_ALLNAMES_FILE_KEY, allFileNamesPath.toString());
Path visitedFileNamesPath = new Path(workingPathOfNames, WORKING_VISITED_DIR);
job.getConfiguration().set(WORKING_VISITED_DIR_KEY, visitedFileNamesPath.toString());
// create a directory where the files that contain the names of visited mob files are saved.
fs.mkdirs(visitedFileNamesPath);
Path mobStorePath = MobUtils.getMobFamilyPath(job.getConfiguration(), tn, familyName);
// Find all the files whose creation time is older than one day.
// Write those file names to a file.
// In each reducer there's a writer that writes the visited file names to a file which is
// saved in WORKING_VISITED_DIR.
// After the job is finished, compare those files, then find out the unused mob files and
// archive them.
FileStatus[] files = fs.listStatus(mobStorePath);
Set<String> fileNames = new TreeSet<String>();
long mobCompactionDelay = job.getConfiguration().getLong(MOB_SWEEP_JOB_DELAY, ONE_DAY);
for (FileStatus fileStatus : files) {
if (fileStatus.isFile() && !HFileLink.isHFileLink(fileStatus.getPath())) {
if (compactionStartTime - fileStatus.getModificationTime() > mobCompactionDelay) {
// only record the potentially unused files older than one day.
fileNames.add(fileStatus.getPath().getName());
}
}
}
FSDataOutputStream fout = null;
SequenceFile.Writer writer = null;
try {
// create a file that includes all the existing mob files whose creation time is older than
// (now - oneDay)
fout = fs.create(allFileNamesPath, true);
// write the names to a sequence file
writer = SequenceFile.createWriter(job.getConfiguration(), fout, String.class, String.class,
CompressionType.NONE, null);
for (String fileName : fileNames) {
writer.append(fileName, MobConstants.EMPTY_STRING);
}
writer.hflush();
} finally {
if (writer != null) {
IOUtils.closeStream(writer);
}
if (fout != null) {
IOUtils.closeStream(fout);
}
}
}
/**
* Gets the unused mob files.
* Compares the file which contains the names of all the existing mob files against the
* visited files, and finds out the unused mob files.
* @param conf The current configuration.
* @return The unused mob files.
* @throws IOException
*/
List<String> getUnusedFiles(Configuration conf) throws IOException {
// find out the unused files and archive them
Path allFileNamesPath = new Path(conf.get(WORKING_ALLNAMES_FILE_KEY));
SequenceFile.Reader allNamesReader = null;
MergeSortReader visitedNamesReader = null;
List<String> toBeArchived = new ArrayList<String>();
try {
allNamesReader = new SequenceFile.Reader(fs, allFileNamesPath, conf);
visitedNamesReader = new MergeSortReader(fs, conf,
new Path(conf.get(WORKING_VISITED_DIR_KEY)));
String nextAll = (String) allNamesReader.next((String) null);
String nextVisited = visitedNamesReader.next();
do {
if (nextAll != null) {
if (nextVisited != null) {
int compare = nextAll.compareTo(nextVisited);
if (compare < 0) {
toBeArchived.add(nextAll);
nextAll = (String) allNamesReader.next((String) null);
} else if (compare > 0) {
nextVisited = visitedNamesReader.next();
} else {
nextAll = (String) allNamesReader.next((String) null);
nextVisited = visitedNamesReader.next();
}
} else {
toBeArchived.add(nextAll);
nextAll = (String) allNamesReader.next((String) null);
}
} else {
break;
}
} while (nextAll != null || nextVisited != null);
} finally {
if (allNamesReader != null) {
IOUtils.closeStream(allNamesReader);
}
if (visitedNamesReader != null) {
visitedNamesReader.close();
}
}
return toBeArchived;
}
/**
* Archives unused mob files.
* @param job The current job.
* @param tn The current table name.
* @param hcd The descriptor of the current column family.
* @throws IOException
*/
private void removeUnusedFiles(Job job, TableName tn, HColumnDescriptor hcd) throws IOException {
// find out the unused files and archive them
List<StoreFile> storeFiles = new ArrayList<StoreFile>();
List<String> toBeArchived = getUnusedFiles(job.getConfiguration());
// archive them
Path mobStorePath = MobUtils
.getMobFamilyPath(job.getConfiguration(), tn, hcd.getNameAsString());
for (String archiveFileName : toBeArchived) {
Path path = new Path(mobStorePath, archiveFileName);
storeFiles.add(new StoreFile(fs, path, job.getConfiguration(), cacheConfig, BloomType.NONE));
}
if (!storeFiles.isEmpty()) {
try {
MobUtils.removeMobFiles(job.getConfiguration(), fs, tn,
FSUtils.getTableDir(MobUtils.getMobHome(conf), tn), hcd.getName(), storeFiles);
LOG.info(storeFiles.size() + " unused MOB files are removed");
} catch (Exception e) {
LOG.error("Failed to archive the store files " + storeFiles, e);
}
}
}
/**
* Deletes the working directory.
* @param job The current job.
* @param familyName The family to cleanup
*/
private void cleanup(Job job, TableName tn, String familyName) {
if (job != null) {
// delete the working directory
Path workingPath = new Path(job.getConfiguration().get(WORKING_DIR_KEY));
try {
fs.delete(workingPath, true);
} catch (IOException e) {
LOG.warn("Failed to delete the working directory after sweeping store " + familyName
+ " in the table " + tn.getNameAsString(), e);
}
}
}
/**
* A result with index.
*/
private static class IndexedResult implements Comparable<IndexedResult> {
private int index;
private String value;
public IndexedResult(int index, String value) {
this.index = index;
this.value = value;
}
public int getIndex() {
return this.index;
}
public String getValue() {
return this.value;
}
@Override
public int compareTo(IndexedResult o) {
if (this.value == null && o.getValue() == null) {
return 0;
} else if (o.value == null) {
return 1;
} else if (this.value == null) {
return -1;
} else {
return this.value.compareTo(o.value);
}
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (!(obj instanceof IndexedResult)) {
return false;
}
return compareTo((IndexedResult) obj) == 0;
}
@Override
public int hashCode() {
return value == null ? 0 : value.hashCode();
}
}
/**
* Merge sort reader.
* It merges and sorts the readers over different sequence files into one stream where
* the results are read in order.
*/
private static class MergeSortReader {
private List<SequenceFile.Reader> readers = new ArrayList<SequenceFile.Reader>();
private PriorityQueue<IndexedResult> results = new PriorityQueue<IndexedResult>();
public MergeSortReader(FileSystem fs, Configuration conf, Path path) throws IOException {
if (fs.exists(path)) {
FileStatus[] files = fs.listStatus(path);
int index = 0;
for (FileStatus file : files) {
if (file.isFile()) {
SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf);
String key = (String) reader.next((String) null);
if (key != null) {
results.add(new IndexedResult(index, key));
readers.add(reader);
index++;
}
}
}
}
}
public String next() throws IOException {
IndexedResult result = results.poll();
if (result != null) {
SequenceFile.Reader reader = readers.get(result.getIndex());
String key = (String) reader.next((String) null);
if (key != null) {
results.add(new IndexedResult(result.getIndex(), key));
}
return result.getValue();
}
return null;
}
public void close() {
for (SequenceFile.Reader reader : readers) {
if (reader != null) {
IOUtils.closeStream(reader);
}
}
}
}
/**
* The counter used in sweep job.
*/
public enum SweepCounter {
/**
* How many files are read.
*/
INPUT_FILE_COUNT,
/**
* How many files need to be merged or cleaned.
*/
FILE_TO_BE_MERGE_OR_CLEAN,
/**
* How many files are left after merging.
*/
FILE_AFTER_MERGE_OR_CLEAN,
/**
* How many records are updated.
*/
RECORDS_UPDATED,
}
public static class DummyMobAbortable implements Abortable {
private boolean abort = false;
public void abort(String why, Throwable e) {
abort = true;
}
public boolean isAborted() {
return abort;
}
}
}
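
getUnusedFiles above computes a sorted set difference, walking the stream of all file names and the merged stream of visited names in lockstep. A self-contained sketch of the same two-pointer logic over plain in-memory lists (names and data are made up):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SortedDifferenceSketch {
  // Returns the entries of "all" that never appear in "visited"; both lists must be sorted.
  static List<String> unused(List<String> all, List<String> visited) {
    List<String> result = new ArrayList<>();
    int i = 0, j = 0;
    while (i < all.size()) {
      if (j >= visited.size() || all.get(i).compareTo(visited.get(j)) < 0) {
        result.add(all.get(i++)); // in "all" only: unused, a candidate for archiving
      } else if (all.get(i).compareTo(visited.get(j)) > 0) {
        j++; // visited name with no match in "all": skip it
      } else {
        i++; j++; // present in both: the file is still in use
      }
    }
    return result;
  }
  public static void main(String[] args) {
    // prints [d20150907f1, d20150907f3]
    System.out.println(unused(
        Arrays.asList("d20150907f1", "d20150907f2", "d20150907f3"),
        Arrays.asList("d20150907f2")));
  }
}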

hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepJobNodeTracker.java

@@ -1,100 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob.mapreduce;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.TableLock;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
/**
* Tracker of the sweep tool node in ZooKeeper.
* The sweep tool node is an ephemeral one; when the process dies this node is deleted, but
* at that time the MR job might still be running, and if another sweep job were started, two
* MR jobs for the same column family would run at the same time.
* This tracker watches this ephemeral node; if it's gone, or it was not created by the
* sweep job that owns the current MR, the current process is aborted.
*/
@InterfaceAudience.Private
public class SweepJobNodeTracker extends ZooKeeperListener {
private String parentNode;
private String lockNodePrefix;
private String owner;
private String lockNode;
public SweepJobNodeTracker(ZooKeeperWatcher watcher, String parentNode, String owner) {
super(watcher);
this.parentNode = parentNode;
this.owner = owner;
this.lockNodePrefix = ZKUtil.joinZNode(parentNode, "write-");
}
/**
* Registers the watcher on the sweep job node.
* If there's no such sweep job node, or it was not created by the sweep job that
* owns the current MR, the current process is aborted.
* This assumes the table lock uses ZooKeeper. It's a workaround that is only used
* by the sweep tool, and the sweep tool will be removed once the mob file compaction
* is finished.
*/
public void start() throws KeeperException {
watcher.registerListener(this);
List<String> children = ZKUtil.listChildrenNoWatch(watcher, parentNode);
if (children != null && !children.isEmpty()) {
// there are locks
TreeSet<String> sortedChildren = new TreeSet<String>();
sortedChildren.addAll(children);
// find all the write locks
SortedSet<String> tails = sortedChildren.tailSet(lockNodePrefix);
if (!tails.isEmpty()) {
for (String tail : tails) {
String path = ZKUtil.joinZNode(parentNode, tail);
byte[] data = ZKUtil.getDataAndWatch(watcher, path);
TableLock lock = TableLockManager.fromBytes(data);
ServerName serverName = lock.getLockOwner();
org.apache.hadoop.hbase.ServerName sn = org.apache.hadoop.hbase.ServerName.valueOf(
serverName.getHostName(), serverName.getPort(), serverName.getStartCode());
// compare the server names (host, port and start code) to make sure the lock was created
// by the sweep job that owns the current MR
if (owner.equals(sn.toString())) {
lockNode = path;
return;
}
}
}
}
System.exit(1);
}
@Override
public void nodeDeleted(String path) {
// If the lock node is deleted, abort the current process.
if (path.equals(lockNode)) {
System.exit(1);
}
}
}
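
The guard above relies on HBase-internal ZKUtil and TableLock plumbing. The underlying idea, watching a znode and aborting the process when it disappears, can be sketched with the plain ZooKeeper client; the connection string and znode path are hypothetical:

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class EphemeralGuardSketch {
  public static void main(String[] args) throws Exception {
    ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });
    String lockNode = "/hbase/table-lock/ns:table/write-0000000001"; // hypothetical path
    // Abort if the lock node is deleted later...
    Stat stat = zk.exists(lockNode, event -> {
      if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
        System.exit(1);
      }
    });
    // ...or if it is already gone.
    if (stat == null) {
      System.exit(1);
    }
  }
}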

hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepMapper.java

@@ -1,87 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob.mapreduce;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException;
/**
* The mapper of a sweep job.
* Takes the rows from the table and their results and maps them to {@literal <filename:Text,
* mobValue:KeyValue>} where mobValue is the actual cell in HBase.
*/
@InterfaceAudience.Private
public class SweepMapper extends TableMapper<Text, KeyValue> {
private ZooKeeperWatcher zkw = null;
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
String id = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME);
String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE);
zkw = new ZooKeeperWatcher(context.getConfiguration(), id,
new DummyMobAbortable());
try {
SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner);
tracker.start();
} catch (KeeperException e) {
throw new IOException(e);
}
}
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
if (zkw != null) {
zkw.close();
}
}
@Override
public void map(ImmutableBytesWritable r, Result columns, Context context) throws IOException,
InterruptedException {
if (columns == null) {
return;
}
Cell[] cells = columns.rawCells();
if (cells == null || cells.length == 0) {
return;
}
for (Cell c : cells) {
if (MobUtils.hasValidMobRefCellValue(c)) {
String fileName = MobUtils.getMobFileName(c);
context.write(new Text(fileName), KeyValueUtil.ensureKeyValue(c));
}
}
}
}

hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/SweepReducer.java

@@ -1,471 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob.mapreduce;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFile;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.zookeeper.KeeperException;
/**
* The reducer of a sweep job.
* This reducer merges the small mob files into bigger ones, and writes the
* names of visited mob files to a sequence file which is used by the sweep job to delete
* the unused mob files.
* The key of the input is a file name; the value is a collection of KeyValues
* (where the value format of each KeyValue is valueLength + fileName) in HBase.
* In this reducer, we can know how many cells exist in HBase for a given mob file.
* If existCellSize/mobFileSize < compactionRatio, this mob
* file needs to be merged.
*/
@InterfaceAudience.Private
public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
private static final Log LOG = LogFactory.getLog(SweepReducer.class);
private SequenceFile.Writer writer = null;
private MemStoreWrapper memstore;
private Configuration conf;
private FileSystem fs;
private Path familyDir;
private CacheConfig cacheConfig;
private long compactionBegin;
private BufferedMutator table;
private HColumnDescriptor family;
private long mobCompactionDelay;
private Path mobTableDir;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
this.conf = context.getConfiguration();
Connection c = ConnectionFactory.createConnection(this.conf);
this.fs = FileSystem.get(conf);
// the MOB_SWEEP_JOB_DELAY is ONE_DAY by default. Its value is only changed when testing.
mobCompactionDelay = conf.getLong(SweepJob.MOB_SWEEP_JOB_DELAY, SweepJob.ONE_DAY);
String tableName = conf.get(TableInputFormat.INPUT_TABLE);
String familyName = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
TableName tn = TableName.valueOf(tableName);
this.familyDir = MobUtils.getMobFamilyPath(conf, tn, familyName);
Admin admin = c.getAdmin();
try {
family = admin.getTableDescriptor(tn).getFamily(Bytes.toBytes(familyName));
if (family == null) {
// this column family might have been removed.
throw new InvalidFamilyOperationException("Column family '" + familyName
+ "' does not exist. It might be removed.");
}
} finally {
try {
admin.close();
} catch (IOException e) {
LOG.warn("Failed to close the HBaseAdmin", e);
}
}
// disable the block cache.
Configuration copyOfConf = new Configuration(conf);
copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
this.cacheConfig = new CacheConfig(copyOfConf);
table = c.getBufferedMutator(new BufferedMutatorParams(tn).writeBufferSize(1*1024*1024));
memstore = new MemStoreWrapper(context, fs, table, family, new DefaultMemStore(), cacheConfig);
// The start time of the sweep tool.
// Only the mob files whose creation time is older than (startTime - oneDay) will be handled
// by the reducer, since handling the latest mob files would bring inconsistency.
this.compactionBegin = conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE, 0);
mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tn);
}
private SweepPartition createPartition(CompactionPartitionId id, Context context)
throws IOException {
return new SweepPartition(id, context);
}
@Override
public void run(Context context) throws IOException, InterruptedException {
String jobId = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME);
String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE);
ZooKeeperWatcher zkw = new ZooKeeperWatcher(context.getConfiguration(), jobId,
new DummyMobAbortable());
FSDataOutputStream fout = null;
try {
SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner);
tracker.start();
setup(context);
// create a sequence file that contains all the visited file names in this reducer.
String dir = this.conf.get(SweepJob.WORKING_VISITED_DIR_KEY);
Path nameFilePath = new Path(dir, UUID.randomUUID().toString()
.replace("-", MobConstants.EMPTY_STRING));
fout = fs.create(nameFilePath, true);
writer = SequenceFile.createWriter(context.getConfiguration(), fout, String.class,
String.class, CompressionType.NONE, null);
CompactionPartitionId id;
SweepPartition partition = null;
// the mob files which have the same start key and date are in the same partition.
while (context.nextKey()) {
Text key = context.getCurrentKey();
String keyString = key.toString();
id = createPartitionId(keyString);
if (null == partition || !id.equals(partition.getId())) {
// It's the first mob file of a new partition.
if (null != partition) {
// this mob file is in a different partition from the previous one,
// so close the previous partition first.
partition.close();
}
// create a new one
partition = createPartition(id, context);
}
if (partition != null) {
// run the partition
partition.execute(key, context.getValues());
}
}
if (null != partition) {
partition.close();
}
writer.hflush();
} catch (KeeperException e) {
throw new IOException(e);
} finally {
cleanup(context);
zkw.close();
if (writer != null) {
IOUtils.closeStream(writer);
}
if (fout != null) {
IOUtils.closeStream(fout);
}
if (table != null) {
try {
table.close();
} catch (IOException e) {
LOG.warn(e);
}
}
}
}
/**
* The mob files which have the same start key and date are in the same partition.
* The files in the same partition are merged together into bigger ones.
*/
public class SweepPartition {
private final CompactionPartitionId id;
private final Context context;
private boolean memstoreUpdated = false;
private boolean mergeSmall = false;
private final Map<String, MobFileStatus> fileStatusMap = new HashMap<String, MobFileStatus>();
private final List<Path> toBeDeleted = new ArrayList<Path>();
public SweepPartition(CompactionPartitionId id, Context context) throws IOException {
this.id = id;
this.context = context;
memstore.setPartitionId(id);
init();
}
public CompactionPartitionId getId() {
return this.id;
}
/**
* Prepares the map of files.
*
* @throws IOException
*/
private void init() throws IOException {
FileStatus[] fileStats = listStatus(familyDir, id.getStartKey());
if (null == fileStats) {
return;
}
int smallFileCount = 0;
float compactionRatio = conf.getFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO,
MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO);
long compactionMergeableSize = conf.getLong(
MobConstants.MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE,
MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE);
// list the files. Just merge the hfiles, don't merge the hfile links.
// prepare the map of mob files. The key is the file name, the value is the file status.
for (FileStatus fileStat : fileStats) {
MobFileStatus mobFileStatus = null;
if (!HFileLink.isHFileLink(fileStat.getPath())) {
mobFileStatus = new MobFileStatus(fileStat, compactionRatio, compactionMergeableSize);
if (mobFileStatus.needMerge()) {
smallFileCount++;
}
// key is file name (not hfile name), value is hfile status.
fileStatusMap.put(fileStat.getPath().getName(), mobFileStatus);
}
}
if (smallFileCount >= 2) {
// merge the files only when there is more than one file in the same partition.
this.mergeSmall = true;
}
}
/**
* Flushes the data into mob files and store files, and archives the small
* files after they're merged.
* @throws IOException
*/
public void close() throws IOException {
if (null == id) {
return;
}
// flush the remaining key values into mob files
if (memstoreUpdated) {
memstore.flushMemStore();
}
List<StoreFile> storeFiles = new ArrayList<StoreFile>(toBeDeleted.size());
// delete small files after compaction
for (Path path : toBeDeleted) {
LOG.info("[In Partition close] Delete the file " + path + " in partition close");
storeFiles.add(new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE));
}
if (!storeFiles.isEmpty()) {
try {
MobUtils.removeMobFiles(conf, fs, table.getName(), mobTableDir, family.getName(),
storeFiles);
context.getCounter(SweepCounter.FILE_TO_BE_MERGE_OR_CLEAN).increment(storeFiles.size());
} catch (IOException e) {
LOG.error("Failed to archive the store files " + storeFiles, e);
}
storeFiles.clear();
}
fileStatusMap.clear();
}
/**
* Merges the small mob files into bigger ones.
* @param fileName The current mob file name.
* @param values The collection of KeyValues in this mob file.
* @throws IOException
*/
public void execute(Text fileName, Iterable<KeyValue> values) throws IOException {
if (null == values) {
return;
}
MobFileName mobFileName = MobFileName.create(fileName.toString());
LOG.info("[In reducer] The file name: " + fileName.toString());
MobFileStatus mobFileStat = fileStatusMap.get(mobFileName.getFileName());
if (null == mobFileStat) {
LOG.info("[In reducer] Cannot find the file, probably this record is obsolete");
return;
}
// only handle the files that are older than one day.
if (compactionBegin - mobFileStat.getFileStatus().getModificationTime()
<= mobCompactionDelay) {
return;
}
// write the hfile name
writer.append(mobFileName.getFileName(), MobConstants.EMPTY_STRING);
Set<Cell> kvs = new HashSet<Cell>();
for (KeyValue kv : values) {
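// Per the class javadoc, a mob reference value is the length of the original mob value
// (an int) followed by the mob file name, so the leading int is the still-valid data size.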
if (kv.getValueLength() > Bytes.SIZEOF_INT) {
mobFileStat.addValidSize(Bytes.toInt(kv.getValueArray(), kv.getValueOffset(),
Bytes.SIZEOF_INT));
}
kvs.add(kv);
}
// If the mob file is an invalid one or a small one, merge it into new/bigger ones.
if (mobFileStat.needClean() || (mergeSmall && mobFileStat.needMerge())) {
context.getCounter(SweepCounter.INPUT_FILE_COUNT).increment(1);
MobFile file = MobFile.create(fs,
new Path(familyDir, mobFileName.getFileName()), conf, cacheConfig);
StoreFileScanner scanner = null;
file.open();
try {
scanner = file.getScanner();
scanner.seek(CellUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY));
Cell cell;
while (null != (cell = scanner.next())) {
if (kvs.contains(cell)) {
// write the KeyValue existing in HBase to the memstore.
memstore.addToMemstore(cell);
memstoreUpdated = true;
}
}
} finally {
if (scanner != null) {
scanner.close();
}
file.close();
}
toBeDeleted.add(mobFileStat.getFileStatus().getPath());
}
}
/**
* Lists the files with the same prefix.
* @param p The file path.
* @param prefix The prefix.
* @return The files with the same prefix.
* @throws IOException
*/
private FileStatus[] listStatus(Path p, String prefix) throws IOException {
return fs.listStatus(p, new PathPrefixFilter(prefix));
}
}
static class PathPrefixFilter implements PathFilter {
private final String prefix;
public PathPrefixFilter(String prefix) {
this.prefix = prefix;
}
public boolean accept(Path path) {
return path.getName().startsWith(prefix, 0);
}
}
/**
* Creates the partition id.
* @param fileNameAsString The current file name, in string.
* @return The partition id.
*/
private CompactionPartitionId createPartitionId(String fileNameAsString) {
MobFileName fileName = MobFileName.create(fileNameAsString);
return new CompactionPartitionId(fileName.getStartKey(), fileName.getDate());
}
/**
* The mob file status used in the sweep reducer.
*/
private static class MobFileStatus {
private FileStatus fileStatus;
private int validSize;
private long size;
private float compactionRatio = MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO;
private long compactionMergeableSize =
MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE;
/**
* @param fileStatus The current FileStatus.
* @param compactionRatio The invalid ratio.
* If there are too many cells deleted in a mob file, it's regarded as invalid,
* and needs to be written to a new one.
* If existingCellSize/fileSize < compactionRatio, it's regarded as an invalid one.
* @param compactionMergeableSize If the size of a mob file is less
* than this value, it's regarded as a small file and needs to be merged
*/
public MobFileStatus(FileStatus fileStatus, float compactionRatio,
long compactionMergeableSize) {
this.fileStatus = fileStatus;
this.size = fileStatus.getLen();
validSize = 0;
this.compactionRatio = compactionRatio;
this.compactionMergeableSize = compactionMergeableSize;
}
/**
* Add size to this file.
* @param size The size to be added.
*/
public void addValidSize(int size) {
this.validSize += size;
}
/**
* Whether the mob file needs to be cleaned.
* If there are too many cells deleted in this mob file, it needs to be cleaned.
* @return True if it needs to be cleaned.
*/
public boolean needClean() {
return validSize < compactionRatio * size;
}
/**
* Whether the mob file needs to be merged.
* If this mob file is too small, it needs to be merged.
* @return True if it needs to be merged.
*/
public boolean needMerge() {
return this.size < compactionMergeableSize;
}
/**
* Gets the file status.
* @return The file status.
*/
public FileStatus getFileStatus() {
return fileStatus;
}
}
}
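
A worked example of the two MobFileStatus thresholds above, with made-up sizes; the ratio and mergeable-size values here are assumptions for illustration, the real defaults live in MobConstants:

public class MobFileStatusExample {
  public static void main(String[] args) {
    long fileSize = 100L * 1024 * 1024;      // 100 MB mob file on disk
    long validSize = 30L * 1024 * 1024;      // 30 MB of cells still referenced from HBase
    float compactionRatio = 0.5f;            // assumed invalid-ratio threshold
    long mergeableSize = 128L * 1024 * 1024; // assumed small-file threshold
    // needClean: 30 MB < 0.5 * 100 MB, so the file is mostly dead data and gets rewritten.
    System.out.println("needClean = " + (validSize < compactionRatio * fileSize));
    // needMerge: 100 MB < 128 MB, so the file counts as small and is a merge candidate.
    System.out.println("needMerge = " + (fileSize < mergeableSize));
  }
}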

hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java

@@ -1,126 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.KeeperException;
/**
* The sweep tool. It deletes the mob files that are not used and merges the small mob files
* into bigger ones. Each run of this sweep tool handles only one column family. Runs on the
* same column family are mutually exclusive, and a major compaction and the sweep tool on the
* same column family are mutually exclusive too.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Sweeper extends Configured implements Tool {
/**
* Sweeps the mob files on one column family. It deletes the unused mob files and merges
* the small mob files into bigger ones.
* @param tableName The current table name in string format.
* @param familyName The column family name.
* @return 0 if successful, 2 if the job aborted with an exception, 3 if unable to start due to
* another compaction, 4 if the MR job was unsuccessful
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
* @throws KeeperException
*/
int sweepFamily(String tableName, String familyName) throws IOException, InterruptedException,
ClassNotFoundException, KeeperException {
Configuration conf = getConf();
// make sure the target HBase exists.
HBaseAdmin.available(conf);
Connection connection = ConnectionFactory.createConnection(getConf());
Admin admin = connection.getAdmin();
try {
FileSystem fs = FileSystem.get(conf);
TableName tn = TableName.valueOf(tableName);
HTableDescriptor htd = admin.getTableDescriptor(tn);
HColumnDescriptor family = htd.getFamily(Bytes.toBytes(familyName));
if (family == null || !family.isMobEnabled()) {
throw new IOException("Column family " + familyName + " is not a MOB column family");
}
SweepJob job = new SweepJob(conf, fs);
// Run the sweeping
return job.sweep(tn, family);
} catch (Exception e) {
System.err.println("Job aborted due to exception " + e);
return 2; // job failed
} finally {
try {
admin.close();
} catch (IOException e) {
System.out.println("Failed to close the HBaseAdmin: " + e.getMessage());
}
try {
connection.close();
} catch (IOException e) {
System.out.println("Failed to close the connection: " + e.getMessage());
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
int ret = ToolRunner.run(conf, new Sweeper(), args);
System.exit(ret);
}
private void printUsage() {
System.err.println("Usage:\n" + "--------------------------\n" + Sweeper.class.getName()
+ " tableName familyName");
System.err.println(" tableName The table name");
System.err.println(" familyName The column family name");
}
/**
   * Runs the sweep tool with the given arguments.
   * @return 0 on success, 1 for bad arguments, 2 if the job aborted with an exception,
   * 3 if unable to start because another compaction is in progress, 4 if the MR job was
   * unsuccessful.
*/
  @Override
  public int run(String[] args) throws Exception {
if (args.length != 2) {
printUsage();
return 1;
}
String table = args[0];
String family = args[1];
return sweepFamily(table, family);
}
}


@@ -1,167 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob.mapreduce;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.serializer.JavaSerialization;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestMobSweepJob {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
TEST_UTIL.startMiniCluster();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
private void writeFileNames(FileSystem fs, Configuration conf, Path path,
      String[] fileNames) throws IOException {
// write the names to a sequence file
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
String.class, String.class);
try {
      for (String fileName : fileNames) {
writer.append(fileName, MobConstants.EMPTY_STRING);
}
} finally {
IOUtils.closeStream(writer);
}
}
@Test
  public void testSweeperJobWithoutUnusedFile() throws Exception {
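    // A file is "unused" when it appears in the all-names list but in none of the reducers'
    // visited lists. Here every file 1-6 is visited by some reducer, so nothing is archived.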
FileSystem fs = TEST_UTIL.getTestFileSystem();
Configuration configuration = new Configuration(
TEST_UTIL.getConfiguration());
    Path visitedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
        "/hbase/mobcompaction/SweepJob/working/names/0/visited");
    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
        "/hbase/mobcompaction/SweepJob/working/names/0/all");
    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
        visitedFileNamesPath.toString());
    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
        allFileNamesPath.toString());
    writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1",
        "2", "3", "4", "5", "6"});
    Path r0 = new Path(visitedFileNamesPath, "r0");
    writeFileNames(fs, configuration, r0, new String[] { "1",
        "2", "3"});
    Path r1 = new Path(visitedFileNamesPath, "r1");
    writeFileNames(fs, configuration, r1, new String[] { "1", "4", "5"});
    Path r2 = new Path(visitedFileNamesPath, "r2");
    writeFileNames(fs, configuration, r2, new String[] { "2", "3", "6"});
SweepJob sweepJob = new SweepJob(configuration, fs);
List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
assertEquals(0, toBeArchived.size());
}
@Test
public void testSweeperJobWithUnusedFile() throws Exception {
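    // Files 4 and 6 appear in the all-names list but in no reducer's visited list, so the
    // job reports them as unused and eligible for archiving.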
FileSystem fs = TEST_UTIL.getTestFileSystem();
Configuration configuration = new Configuration(
TEST_UTIL.getConfiguration());
    Path visitedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
        "/hbase/mobcompaction/SweepJob/working/names/1/visited");
    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
        "/hbase/mobcompaction/SweepJob/working/names/1/all");
    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
        visitedFileNamesPath.toString());
    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
        allFileNamesPath.toString());
    writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1",
        "2", "3", "4", "5", "6"});
    Path r0 = new Path(visitedFileNamesPath, "r0");
    writeFileNames(fs, configuration, r0, new String[] { "1",
        "2", "3"});
    Path r1 = new Path(visitedFileNamesPath, "r1");
    writeFileNames(fs, configuration, r1, new String[] { "1", "5"});
    Path r2 = new Path(visitedFileNamesPath, "r2");
    writeFileNames(fs, configuration, r2, new String[] { "2", "3"});
SweepJob sweepJob = new SweepJob(configuration, fs);
List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
assertEquals(2, toBeArchived.size());
assertArrayEquals(new String[]{"4", "6"}, toBeArchived.toArray(new String[0]));
}
@Test
public void testSweeperJobWithRedundantFile() throws Exception {
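    // A reducer's visited list may name a file ("7") that is absent from the all-names
    // list; such redundant entries are ignored and nothing is archived.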
FileSystem fs = TEST_UTIL.getTestFileSystem();
Configuration configuration = new Configuration(
TEST_UTIL.getConfiguration());
    Path visitedFileNamesPath = new Path(MobUtils.getMobHome(configuration),
        "/hbase/mobcompaction/SweepJob/working/names/2/visited");
    Path allFileNamesPath = new Path(MobUtils.getMobHome(configuration),
        "/hbase/mobcompaction/SweepJob/working/names/2/all");
    configuration.set(SweepJob.WORKING_VISITED_DIR_KEY,
        visitedFileNamesPath.toString());
    configuration.set(SweepJob.WORKING_ALLNAMES_FILE_KEY,
        allFileNamesPath.toString());
    writeFileNames(fs, configuration, allFileNamesPath, new String[] { "1",
        "2", "3", "4", "5", "6"});
    Path r0 = new Path(visitedFileNamesPath, "r0");
    writeFileNames(fs, configuration, r0, new String[] { "1",
        "2", "3"});
    Path r1 = new Path(visitedFileNamesPath, "r1");
    writeFileNames(fs, configuration, r1, new String[] { "1", "5", "6", "7"});
    Path r2 = new Path(visitedFileNamesPath, "r2");
    writeFileNames(fs, configuration, r2, new String[] { "2", "3", "4"});
SweepJob sweepJob = new SweepJob(configuration, fs);
List<String> toBeArchived = sweepJob.getUnusedFiles(configuration);
assertEquals(0, toBeArchived.size());
}
}


@@ -1,116 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob.mapreduce;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@Category(SmallTests.class)
public class TestMobSweepMapper {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
  public void testMap() throws Exception {
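    // The value of a mob reference cell is a length prefix followed by the mob file name;
    // the 4-character prefix below stands in for those length bytes. The mapper is
    // expected to strip the prefix and emit only the file name as the output key.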
String prefix = "0000";
final String fileName = "19691231f2cd014ea28f42788214560a21a44cef";
final String mobFilePath = prefix + fileName;
ImmutableBytesWritable r = new ImmutableBytesWritable(Bytes.toBytes("r"));
final KeyValue[] kvList = new KeyValue[1];
kvList[0] = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"),
Bytes.toBytes("column"), Bytes.toBytes(mobFilePath));
Result columns = mock(Result.class);
when(columns.rawCells()).thenReturn(kvList);
Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
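    // The sweep tool runs under the table's write lock so it cannot race with a major
    // compaction; the test takes the same lock before driving the mapper directly.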
ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable());
TableName tn = TableName.valueOf("testSweepMapper");
TableName lockName = MobUtils.getTableLockName(tn);
String znode = ZKUtil.joinZNode(zkw.znodePaths.tableLockZNode, lockName.getNameAsString());
configuration.set(SweepJob.SWEEP_JOB_ID, "1");
configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode);
ServerName serverName = SweepJob.getCurrentServerName(configuration);
configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString());
TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw,
serverName);
TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
lock.acquire();
try {
Mapper<ImmutableBytesWritable, Result, Text, KeyValue>.Context ctx =
mock(Mapper.Context.class);
when(ctx.getConfiguration()).thenReturn(configuration);
SweepMapper map = new SweepMapper();
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Text text = (Text) invocation.getArguments()[0];
KeyValue kv = (KeyValue) invocation.getArguments()[1];
          assertEquals(fileName, Bytes.toString(text.getBytes(), 0, text.getLength()));
assertEquals(0, Bytes.compareTo(kv.getKey(), kvList[0].getKey()));
return null;
}
}).when(ctx).write(any(Text.class), any(KeyValue.class));
map.map(r, columns, ctx);
} finally {
lock.release();
}
}
}


@@ -1,216 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob.mapreduce;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.serializer.JavaSerialization;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.counters.GenericCounter;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Matchers;
@Category(MediumTests.class)
public class TestMobSweepReducer {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static String tableName = "testSweepReducer";
private final static String row = "row";
private final static String family = "family";
private final static String qf = "qf";
private static BufferedMutator table;
private static Admin admin;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
TEST_UTIL.startMiniCluster(1);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@SuppressWarnings("deprecation")
@Before
public void setUp() throws Exception {
HTableDescriptor desc = new HTableDescriptor(tableName);
HColumnDescriptor hcd = new HColumnDescriptor(family);
hcd.setMobEnabled(true);
hcd.setMobThreshold(3L);
hcd.setMaxVersions(4);
desc.addFamily(hcd);
admin = TEST_UTIL.getHBaseAdmin();
admin.createTable(desc);
table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
.getBufferedMutator(TableName.valueOf(tableName));
}
@After
public void tearDown() throws Exception {
admin.disableTable(TableName.valueOf(tableName));
admin.deleteTable(TableName.valueOf(tableName));
admin.close();
}
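  // Reads every key from the given sequence file; the sweep job records the names of the
  // mob files it visited as keys in such files.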
private List<String> getKeyFromSequenceFile(FileSystem fs, Path path,
Configuration conf) throws Exception {
List<String> list = new ArrayList<String>();
SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
String next = (String) reader.next((String) null);
while (next != null) {
list.add(next);
next = (String) reader.next((String) null);
}
reader.close();
return list;
}
@Test
public void testRun() throws Exception {
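    // Flush two mob cells into a single mob file, then drive SweepReducer with a mocked
    // context: the reducer should rewrite the cells into a new mob file, archive the old
    // one, and record the visited file name in a sequence file.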
TableName tn = TableName.valueOf(tableName);
byte[] mobValueBytes = new byte[100];
    // get the path where the mob files are located
Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, family);
Put put = new Put(Bytes.toBytes(row));
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
Put put2 = new Put(Bytes.toBytes(row + "ignore"));
put2.addColumn(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
table.mutate(put);
table.mutate(put2);
table.flush();
admin.flush(tn);
FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
    // check that exactly one mob file has been generated
assertEquals(1, fileStatuses.length);
String mobFile1 = fileStatuses[0].getPath().getName();
Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
configuration.setFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO, 0.6f);
configuration.setStrings(TableInputFormat.INPUT_TABLE, tableName);
configuration.setStrings(TableInputFormat.SCAN_COLUMN_FAMILY, family);
configuration.setStrings(SweepJob.WORKING_FILES_DIR_KEY, "compactionFileDir");
configuration.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
JavaSerialization.class.getName());
configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, "compactionVisitedDir");
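    // Push the sweep start date a day into the future so the freshly flushed mob file
    // falls inside the sweep window.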
configuration.setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
System.currentTimeMillis() + 24 * 3600 * 1000);
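    // Take the table's write lock, mirroring the sweep tool's mutual exclusion with
    // major compactions, before running the reducer directly.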
ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable());
TableName lockName = MobUtils.getTableLockName(tn);
String znode = ZKUtil.joinZNode(zkw.znodePaths.tableLockZNode, lockName.getNameAsString());
configuration.set(SweepJob.SWEEP_JOB_ID, "1");
configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode);
ServerName serverName = SweepJob.getCurrentServerName(configuration);
configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString());
TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw,
serverName);
TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
lock.acquire();
try {
// use the same counter when mocking
Counter counter = new GenericCounter();
Reducer<Text, KeyValue, Writable, Writable>.Context ctx = mock(Reducer.Context.class);
when(ctx.getConfiguration()).thenReturn(configuration);
when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter);
when(ctx.nextKey()).thenReturn(true).thenReturn(false);
when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1));
byte[] refBytes = Bytes.toBytes(mobFile1);
long valueLength = refBytes.length;
byte[] newValue = Bytes.add(Bytes.toBytes(valueLength), refBytes);
KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qf), 1,
KeyValue.Type.Put, newValue);
List<KeyValue> list = new ArrayList<KeyValue>();
list.add(kv2);
when(ctx.getValues()).thenReturn(list);
SweepReducer reducer = new SweepReducer();
reducer.run(ctx);
} finally {
lock.release();
}
    FileStatus[] fileStatuses2 = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
    // a new mob file has been generated and the old one archived
    assertEquals(1, fileStatuses2.length);
    String mobFile2 = fileStatuses2[0].getPath().getName();
    assertFalse(mobFile2.equalsIgnoreCase(mobFile1));
    // verify the visited file names recorded in the sequence files
String workingPath = configuration.get(SweepJob.WORKING_VISITED_DIR_KEY);
FileStatus[] statuses = TEST_UTIL.getTestFileSystem().listStatus(new Path(workingPath));
Set<String> files = new TreeSet<String>();
for (FileStatus st : statuses) {
files.addAll(getKeyFromSequenceFile(TEST_UTIL.getTestFileSystem(),
st.getPath(), configuration));
}
assertEquals(1, files.size());
    assertTrue(files.contains(mobFile1));
}
}