1) Fix a resource leak that occurred when an exception was thrown during mob compaction. 2) Reorganize the code in compactMobFilesInBatch() to make it more readable.

Signed-off-by: Jonathan M Hsieh <jmhsieh@apache.org>
This commit is contained in:
Huaxiang Sun 2016-10-07 15:47:06 -07:00 committed by Jonathan M Hsieh
parent 723d56153f
commit c7cae6be3d
2 changed files with 178 additions and 69 deletions

View File

@ -229,8 +229,8 @@ public class PartitionedMobCompactor extends MobCompactor {
}
// archive the del files if all the mob files are selected.
if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
LOG.info("After a mob compaction with all files selected, archiving the del files "
+ newDelPaths);
LOG.info(
"After a mob compaction with all files selected, archiving the del files " + newDelPaths);
try {
MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
} catch (IOException e) {
@ -381,7 +381,7 @@ public class PartitionedMobCompactor extends MobCompactor {
List<StoreFile> filesToCompact, int batch,
Path bulkloadPathOfPartition, Path bulkloadColumnPath,
List<Path> newFiles)
throws IOException {
throws IOException {
// open scanner to the selected mob files and del files.
StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
// the mob files to be compacted, not include the del files.
@ -392,62 +392,92 @@ public class PartitionedMobCompactor extends MobCompactor {
StoreFileWriter writer = null;
StoreFileWriter refFileWriter = null;
Path filePath = null;
Path refFilePath = null;
long mobCells = 0;
boolean cleanupTmpMobFile = false;
boolean cleanupBulkloadDirOfPartition = false;
boolean cleanupCommittedMobFile = false;
boolean closeReaders= true;
try {
writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(),
tempPath, Long.MAX_VALUE, column.getCompactionCompressionType(), partition.getPartitionId()
.getStartKey(), compactionCacheConfig, cryptoContext);
filePath = writer.getPath();
byte[] fileName = Bytes.toBytes(filePath.getName());
// create a temp file and open a writer for it in the bulkloadPath
refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo
.getSecond().longValue(), compactionCacheConfig, cryptoContext);
refFilePath = refFileWriter.getPath();
List<Cell> cells = new ArrayList<>();
boolean hasMore;
ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
do {
hasMore = scanner.next(cells, scannerContext);
for (Cell cell : cells) {
// write the mob cell to the mob file.
writer.append(cell);
// write the new reference cell to the store file.
KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, tableNameTag);
refFileWriter.append(reference);
mobCells++;
try {
writer = MobUtils
.createWriter(conf, fs, column, partition.getPartitionId().getDate(), tempPath,
Long.MAX_VALUE, column.getCompactionCompressionType(),
partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext);
cleanupTmpMobFile = true;
filePath = writer.getPath();
byte[] fileName = Bytes.toBytes(filePath.getName());
// create a temp file and open a writer for it in the bulkloadPath
refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath,
fileInfo.getSecond().longValue(), compactionCacheConfig, cryptoContext);
cleanupBulkloadDirOfPartition = true;
List<Cell> cells = new ArrayList<>();
boolean hasMore;
ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
do {
hasMore = scanner.next(cells, scannerContext);
for (Cell cell : cells) {
// write the mob cell to the mob file.
writer.append(cell);
// write the new reference cell to the store file.
KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, tableNameTag);
refFileWriter.append(reference);
mobCells++;
}
cells.clear();
} while (hasMore);
} finally {
// close the scanner.
scanner.close();
if (cleanupTmpMobFile) {
// append metadata to the mob file, and close the mob file writer.
closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
}
cells.clear();
} while (hasMore);
if (cleanupBulkloadDirOfPartition) {
// append metadata and bulkload info to the ref mob file, and close the writer.
closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
}
}
if (mobCells > 0) {
// commit mob file
MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
cleanupTmpMobFile = false;
cleanupCommittedMobFile = true;
// bulkload the ref file
bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName());
cleanupCommittedMobFile = false;
newFiles.add(new Path(mobFamilyDir, filePath.getName()));
}
// archive the old mob files, do not archive the del files.
try {
closeStoreFileReaders(mobFilesToCompact);
closeReaders = false;
MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
} catch (IOException e) {
LOG.error("Failed to archive the files " + mobFilesToCompact, e);
}
} finally {
// close the scanner.
scanner.close();
// append metadata to the mob file, and close the mob file writer.
closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
// append metadata and bulkload info to the ref mob file, and close the writer.
closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
}
if (mobCells > 0) {
// commit mob file
MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
// bulkload the ref file
bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName());
newFiles.add(new Path(mobFamilyDir, filePath.getName()));
} else {
// remove the new files
// the mob file is empty, delete it instead of committing.
deletePath(filePath);
// the ref file is empty, delete it instead of committing.
deletePath(refFilePath);
}
// archive the old mob files, do not archive the del files.
try {
closeStoreFileReaders(mobFilesToCompact);
MobUtils
.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
} catch (IOException e) {
LOG.error("Failed to archive the files " + mobFilesToCompact, e);
if (closeReaders) {
closeStoreFileReaders(mobFilesToCompact);
}
if (cleanupTmpMobFile) {
deletePath(filePath);
}
if (cleanupBulkloadDirOfPartition) {
// delete the bulkload files in bulkloadPath
deletePath(bulkloadPathOfPartition);
}
if (cleanupCommittedMobFile) {
deletePath(new Path(mobFamilyDir, filePath.getName()));
}
}
}
@ -509,7 +539,7 @@ public class PartitionedMobCompactor extends MobCompactor {
writer = MobUtils.createDelFileWriter(conf, fs, column,
MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
column.getCompactionCompressionType(), HConstants.EMPTY_START_ROW, compactionCacheConfig,
cryptoContext);
cryptoContext);
filePath = writer.getPath();
List<Cell> cells = new ArrayList<>();
boolean hasMore;
@ -572,22 +602,15 @@ public class PartitionedMobCompactor extends MobCompactor {
* @throws IOException if IO failure is encountered
*/
private void bulkloadRefFile(Connection connection, Table table, Path bulkloadDirectory,
String fileName)
throws IOException {
String fileName)
throws IOException {
// bulkload the ref file
try {
LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
bulkload.doBulkLoad(bulkloadDirectory,
connection.getAdmin(),
table,
connection.getRegionLocator(table.getName()));
bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table,
connection.getRegionLocator(table.getName()));
} catch (Exception e) {
// delete the committed mob file
deletePath(new Path(mobFamilyDir, fileName));
throw new IOException(e);
} finally {
// delete the bulkload files in bulkloadPath
deletePath(bulkloadDirectory);
}
}

View File

@ -53,8 +53,10 @@ import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.C
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.junit.AfterClass;
import org.junit.Assert;
import static org.junit.Assert.assertTrue;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -79,6 +81,9 @@ public class TestPartitionedMobCompactor {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// Use HFile format version 3 for the mini cluster (required for the mob tests here
// — presumably because mob cells rely on v3 features; confirm against MOB docs).
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
// Inject our customized DistributedFileSystem so individual tests can force
// rename() failures via FaultyDistributedFileSystem.setThrowException(true).
TEST_UTIL.getConfiguration().setClass("fs.hdfs.impl", FaultyDistributedFileSystem.class,
DistributedFileSystem.class);
// Single-node cluster is enough; the tests only exercise compaction logic.
TEST_UTIL.startMiniCluster(1);
pool = createThreadPool();
}
@ -160,6 +165,51 @@ public class TestPartitionedMobCompactor {
testCompactDelFilesAtBatchSize(tableName, 4, 2);
}
/**
 * Simulates a failure while committing compacted mob files (e.g. destination
 * directory full) by making every FileSystem.rename() throw, then verifies that
 * the compactor cleans up after itself: the mob temp directory must contain only
 * the .bulkload subdirectory, and that subdirectory must be empty.
 */
@Test
public void testCompactFilesWithDstDirFull() throws Exception {
String tableName = "testCompactFilesWithDstDirFull";
// fs is the FaultyDistributedFileSystem injected via "fs.hdfs.impl" in setup.
fs = FileSystem.get(conf);
FaultyDistributedFileSystem faultyFs = (FaultyDistributedFileSystem)fs;
Path testDir = FSUtils.getRootDir(conf);
Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
basePath = new Path(new Path(mobTestDir, tableName), family);
try {
int count = 2;
// create 2 mob files.
createStoreFiles(basePath, family, qf, count, Type.Put, true);
listFiles();
TableName tName = TableName.valueOf(tableName);
MobCompactor compactor = new PartitionedMobCompactor(conf, faultyFs, tName, hcd, pool);
// From here on, every rename() on faultyFs throws IOException.
faultyFs.setThrowException(true);
try {
compactor.compact(allFiles, true);
} catch (IOException e) {
// NOTE(review): consider asserting the exception was actually thrown,
// otherwise a silently-succeeding compaction would still pass this test.
System.out.println("Expected exception, ignore");
}
// Verify that all the files in tmp directory are cleaned up
Path tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
FileStatus[] ls = faultyFs.listStatus(tempPath);
// Only .bulkload under this directory
assertTrue(ls.length == 1);
assertTrue(MobConstants.BULKLOAD_DIR_NAME.equalsIgnoreCase(ls[0].getPath().getName()));
Path bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
tName.getNamespaceAsString(), tName.getQualifierAsString())));
// Nothing in bulkLoad directory
FileStatus[] lsBulkload = faultyFs.listStatus(bulkloadPath);
assertTrue(lsBulkload.length == 0);
} finally {
// Always reset the fault injection so later tests see a healthy filesystem.
faultyFs.setThrowException(false);
}
}
private void testCompactDelFilesAtBatchSize(String tableName, int batchSize,
int delfileMaxCount) throws Exception {
resetConf();
@ -289,17 +339,30 @@ public class TestPartitionedMobCompactor {
*/
/**
 * Creates {@code count} store files with distinct start keys (row_0, row_1, ...).
 * Delegates to the extended overload with {@code sameStartKey=false}.
 */
private void createStoreFiles(Path basePath, String family, String qualifier, int count,
Type type) throws IOException {
createStoreFiles(basePath, family, qualifier, count, type, false);
}
private void createStoreFiles(Path basePath, String family, String qualifier, int count,
Type type, boolean sameStartKey) throws IOException {
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
String startKey = "row_";
MobFileName mobFileName = null;
for (int i = 0; i < count; i++) {
byte[] startRow = Bytes.toBytes(startKey + i) ;
byte[] startRow;
if (sameStartKey) {
// When creating multiple files under one partition, suffix needs to be different.
startRow = Bytes.toBytes(startKey);
mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
} else {
startRow = Bytes.toBytes(startKey + i);
}
if(type.equals(Type.Delete)) {
mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
new Date()), delSuffix);
}
if(type.equals(Type.Put)){
mobFileName = MobFileName.create(Bytes.toBytes(startKey + i), MobUtils.formatDate(
mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
new Date()), mobSuffix);
}
StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs)
@ -394,4 +457,27 @@ public class TestPartitionedMobCompactor {
conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
}
/**
 * A {@link DistributedFileSystem} whose {@code rename} can be made to fail on
 * demand, used to simulate commit failures (e.g. a full destination directory)
 * during mob compaction. Injected via the "fs.hdfs.impl" configuration key.
 */
static class FaultyDistributedFileSystem extends DistributedFileSystem {
// volatile: toggled from test code while the cluster uses this fs concurrently.
private volatile boolean throwException = false;
public FaultyDistributedFileSystem() {
super();
}
// Enables/disables fault injection; tests must reset this to false when done.
public void setThrowException(boolean throwException) {
this.throwException = throwException;
}
// NOTE(review): only this rename(Path, Path) variant is overridden; if the code
// under test uses the Options-based rename, it would bypass the fault — confirm.
@Override
public boolean rename(Path src, Path dst) throws IOException {
if (throwException) {
throw new IOException("No more files allowed");
}
return super.rename(src, dst);
}
}
}