Revert "1). Fix resource leak issue upon exception during mob compaction. 2). Reorg the code in compactMobFilesInBatch() to make it more readable."

This reverts commit c7cae6be3d.

Missing JIRA ID
This commit is contained in:
Sean Busbey 2016-10-10 14:47:46 -05:00
parent 3c35a722d9
commit 932a1964bf
2 changed files with 69 additions and 178 deletions

View File

@ -229,8 +229,8 @@ public class PartitionedMobCompactor extends MobCompactor {
}
// archive the del files if all the mob files are selected.
if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
LOG.info(
"After a mob compaction with all files selected, archiving the del files " + newDelPaths);
LOG.info("After a mob compaction with all files selected, archiving the del files "
+ newDelPaths);
try {
MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
} catch (IOException e) {
@ -381,7 +381,7 @@ public class PartitionedMobCompactor extends MobCompactor {
List<StoreFile> filesToCompact, int batch,
Path bulkloadPathOfPartition, Path bulkloadColumnPath,
List<Path> newFiles)
throws IOException {
throws IOException {
// open scanner to the selected mob files and del files.
StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
// the mob files to be compacted, not include the del files.
@ -392,92 +392,62 @@ public class PartitionedMobCompactor extends MobCompactor {
StoreFileWriter writer = null;
StoreFileWriter refFileWriter = null;
Path filePath = null;
Path refFilePath = null;
long mobCells = 0;
boolean cleanupTmpMobFile = false;
boolean cleanupBulkloadDirOfPartition = false;
boolean cleanupCommittedMobFile = false;
boolean closeReaders= true;
try {
try {
writer = MobUtils
.createWriter(conf, fs, column, partition.getPartitionId().getDate(), tempPath,
Long.MAX_VALUE, column.getCompactionCompressionType(),
partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext);
cleanupTmpMobFile = true;
filePath = writer.getPath();
byte[] fileName = Bytes.toBytes(filePath.getName());
// create a temp file and open a writer for it in the bulkloadPath
refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath,
fileInfo.getSecond().longValue(), compactionCacheConfig, cryptoContext);
cleanupBulkloadDirOfPartition = true;
List<Cell> cells = new ArrayList<>();
boolean hasMore;
ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
do {
hasMore = scanner.next(cells, scannerContext);
for (Cell cell : cells) {
// write the mob cell to the mob file.
writer.append(cell);
// write the new reference cell to the store file.
KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, tableNameTag);
refFileWriter.append(reference);
mobCells++;
}
cells.clear();
} while (hasMore);
} finally {
// close the scanner.
scanner.close();
if (cleanupTmpMobFile) {
// append metadata to the mob file, and close the mob file writer.
closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(),
tempPath, Long.MAX_VALUE, column.getCompactionCompressionType(), partition.getPartitionId()
.getStartKey(), compactionCacheConfig, cryptoContext);
filePath = writer.getPath();
byte[] fileName = Bytes.toBytes(filePath.getName());
// create a temp file and open a writer for it in the bulkloadPath
refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo
.getSecond().longValue(), compactionCacheConfig, cryptoContext);
refFilePath = refFileWriter.getPath();
List<Cell> cells = new ArrayList<>();
boolean hasMore;
ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
do {
hasMore = scanner.next(cells, scannerContext);
for (Cell cell : cells) {
// write the mob cell to the mob file.
writer.append(cell);
// write the new reference cell to the store file.
KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, tableNameTag);
refFileWriter.append(reference);
mobCells++;
}
if (cleanupBulkloadDirOfPartition) {
// append metadata and bulkload info to the ref mob file, and close the writer.
closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
}
}
if (mobCells > 0) {
// commit mob file
MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
cleanupTmpMobFile = false;
cleanupCommittedMobFile = true;
// bulkload the ref file
bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName());
cleanupCommittedMobFile = false;
newFiles.add(new Path(mobFamilyDir, filePath.getName()));
}
// archive the old mob files, do not archive the del files.
try {
closeStoreFileReaders(mobFilesToCompact);
closeReaders = false;
MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
} catch (IOException e) {
LOG.error("Failed to archive the files " + mobFilesToCompact, e);
}
cells.clear();
} while (hasMore);
} finally {
if (closeReaders) {
closeStoreFileReaders(mobFilesToCompact);
}
if (cleanupTmpMobFile) {
deletePath(filePath);
}
if (cleanupBulkloadDirOfPartition) {
// delete the bulkload files in bulkloadPath
deletePath(bulkloadPathOfPartition);
}
if (cleanupCommittedMobFile) {
deletePath(new Path(mobFamilyDir, filePath.getName()));
}
// close the scanner.
scanner.close();
// append metadata to the mob file, and close the mob file writer.
closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
// append metadata and bulkload info to the ref mob file, and close the writer.
closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
}
if (mobCells > 0) {
// commit mob file
MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
// bulkload the ref file
bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName());
newFiles.add(new Path(mobFamilyDir, filePath.getName()));
} else {
// remove the new files
// the mob file is empty, delete it instead of committing.
deletePath(filePath);
// the ref file is empty, delete it instead of committing.
deletePath(refFilePath);
}
// archive the old mob files, do not archive the del files.
try {
closeStoreFileReaders(mobFilesToCompact);
MobUtils
.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
} catch (IOException e) {
LOG.error("Failed to archive the files " + mobFilesToCompact, e);
}
}
@ -539,7 +509,7 @@ public class PartitionedMobCompactor extends MobCompactor {
writer = MobUtils.createDelFileWriter(conf, fs, column,
MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
column.getCompactionCompressionType(), HConstants.EMPTY_START_ROW, compactionCacheConfig,
cryptoContext);
cryptoContext);
filePath = writer.getPath();
List<Cell> cells = new ArrayList<>();
boolean hasMore;
@ -602,15 +572,22 @@ public class PartitionedMobCompactor extends MobCompactor {
* @throws IOException if IO failure is encountered
*/
private void bulkloadRefFile(Connection connection, Table table, Path bulkloadDirectory,
String fileName)
throws IOException {
String fileName)
throws IOException {
// bulkload the ref file
try {
LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table,
connection.getRegionLocator(table.getName()));
bulkload.doBulkLoad(bulkloadDirectory,
connection.getAdmin(),
table,
connection.getRegionLocator(table.getName()));
} catch (Exception e) {
// delete the committed mob file
deletePath(new Path(mobFamilyDir, fileName));
throw new IOException(e);
} finally {
// delete the bulkload files in bulkloadPath
deletePath(bulkloadDirectory);
}
}

View File

@ -53,10 +53,8 @@ import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.C
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.junit.AfterClass;
import org.junit.Assert;
import static org.junit.Assert.assertTrue;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -81,9 +79,6 @@ public class TestPartitionedMobCompactor {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
// Inject our customized DistributedFileSystem
TEST_UTIL.getConfiguration().setClass("fs.hdfs.impl", FaultyDistributedFileSystem.class,
DistributedFileSystem.class);
TEST_UTIL.startMiniCluster(1);
pool = createThreadPool();
}
@ -165,51 +160,6 @@ public class TestPartitionedMobCompactor {
testCompactDelFilesAtBatchSize(tableName, 4, 2);
}
@Test
public void testCompactFilesWithDstDirFull() throws Exception {
String tableName = "testCompactFilesWithDstDirFull";
fs = FileSystem.get(conf);
FaultyDistributedFileSystem faultyFs = (FaultyDistributedFileSystem)fs;
Path testDir = FSUtils.getRootDir(conf);
Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
basePath = new Path(new Path(mobTestDir, tableName), family);
try {
int count = 2;
// create 2 mob files.
createStoreFiles(basePath, family, qf, count, Type.Put, true);
listFiles();
TableName tName = TableName.valueOf(tableName);
MobCompactor compactor = new PartitionedMobCompactor(conf, faultyFs, tName, hcd, pool);
faultyFs.setThrowException(true);
try {
compactor.compact(allFiles, true);
} catch (IOException e) {
System.out.println("Expected exception, ignore");
}
// Verify that all the files in tmp directory are cleaned up
Path tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
FileStatus[] ls = faultyFs.listStatus(tempPath);
// Only .bulkload under this directory
assertTrue(ls.length == 1);
assertTrue(MobConstants.BULKLOAD_DIR_NAME.equalsIgnoreCase(ls[0].getPath().getName()));
Path bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
tName.getNamespaceAsString(), tName.getQualifierAsString())));
// Nothing in bulkLoad directory
FileStatus[] lsBulkload = faultyFs.listStatus(bulkloadPath);
assertTrue(lsBulkload.length == 0);
} finally {
faultyFs.setThrowException(false);
}
}
private void testCompactDelFilesAtBatchSize(String tableName, int batchSize,
int delfileMaxCount) throws Exception {
resetConf();
@ -339,30 +289,17 @@ public class TestPartitionedMobCompactor {
*/
private void createStoreFiles(Path basePath, String family, String qualifier, int count,
Type type) throws IOException {
createStoreFiles(basePath, family, qualifier, count, type, false);
}
private void createStoreFiles(Path basePath, String family, String qualifier, int count,
Type type, boolean sameStartKey) throws IOException {
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
String startKey = "row_";
MobFileName mobFileName = null;
for (int i = 0; i < count; i++) {
byte[] startRow;
if (sameStartKey) {
// When creating multiple files under one partition, suffix needs to be different.
startRow = Bytes.toBytes(startKey);
mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
} else {
startRow = Bytes.toBytes(startKey + i);
}
byte[] startRow = Bytes.toBytes(startKey + i) ;
if(type.equals(Type.Delete)) {
mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
new Date()), delSuffix);
}
if(type.equals(Type.Put)){
mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
mobFileName = MobFileName.create(Bytes.toBytes(startKey + i), MobUtils.formatDate(
new Date()), mobSuffix);
}
StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs)
@ -457,27 +394,4 @@ public class TestPartitionedMobCompactor {
conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
}
/**
* The customized Distributed File System Implementation
*/
static class FaultyDistributedFileSystem extends DistributedFileSystem {
private volatile boolean throwException = false;
public FaultyDistributedFileSystem() {
super();
}
public void setThrowException(boolean throwException) {
this.throwException = throwException;
}
@Override
public boolean rename(Path src, Path dst) throws IOException {
if (throwException) {
throw new IOException("No more files allowed");
}
return super.rename(src, dst);
}
}
}