HBASE-16767 Mob compaction needs to clean up temporary files in face of IOExceptions.
1). Fix resource leak issue upon exception during mob compaction. 2). Reorg the code in compactMobFilesInBatch() to make it more readable. Signed-off-by: Jonathan M Hsieh <jmhsieh@apache.org>
This commit is contained in:
parent
932a1964bf
commit
c930bc92f4
|
@ -229,8 +229,8 @@ public class PartitionedMobCompactor extends MobCompactor {
|
|||
}
|
||||
// archive the del files if all the mob files are selected.
|
||||
if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) {
|
||||
LOG.info("After a mob compaction with all files selected, archiving the del files "
|
||||
+ newDelPaths);
|
||||
LOG.info(
|
||||
"After a mob compaction with all files selected, archiving the del files " + newDelPaths);
|
||||
try {
|
||||
MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles);
|
||||
} catch (IOException e) {
|
||||
|
@ -381,7 +381,7 @@ public class PartitionedMobCompactor extends MobCompactor {
|
|||
List<StoreFile> filesToCompact, int batch,
|
||||
Path bulkloadPathOfPartition, Path bulkloadColumnPath,
|
||||
List<Path> newFiles)
|
||||
throws IOException {
|
||||
throws IOException {
|
||||
// open scanner to the selected mob files and del files.
|
||||
StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
|
||||
// the mob files to be compacted, not include the del files.
|
||||
|
@ -392,62 +392,92 @@ public class PartitionedMobCompactor extends MobCompactor {
|
|||
StoreFileWriter writer = null;
|
||||
StoreFileWriter refFileWriter = null;
|
||||
Path filePath = null;
|
||||
Path refFilePath = null;
|
||||
long mobCells = 0;
|
||||
boolean cleanupTmpMobFile = false;
|
||||
boolean cleanupBulkloadDirOfPartition = false;
|
||||
boolean cleanupCommittedMobFile = false;
|
||||
boolean closeReaders= true;
|
||||
|
||||
try {
|
||||
writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(),
|
||||
tempPath, Long.MAX_VALUE, column.getCompactionCompressionType(), partition.getPartitionId()
|
||||
.getStartKey(), compactionCacheConfig, cryptoContext);
|
||||
filePath = writer.getPath();
|
||||
byte[] fileName = Bytes.toBytes(filePath.getName());
|
||||
// create a temp file and open a writer for it in the bulkloadPath
|
||||
refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo
|
||||
.getSecond().longValue(), compactionCacheConfig, cryptoContext);
|
||||
refFilePath = refFileWriter.getPath();
|
||||
List<Cell> cells = new ArrayList<>();
|
||||
boolean hasMore;
|
||||
ScannerContext scannerContext =
|
||||
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
|
||||
do {
|
||||
hasMore = scanner.next(cells, scannerContext);
|
||||
for (Cell cell : cells) {
|
||||
// write the mob cell to the mob file.
|
||||
writer.append(cell);
|
||||
// write the new reference cell to the store file.
|
||||
KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, tableNameTag);
|
||||
refFileWriter.append(reference);
|
||||
mobCells++;
|
||||
try {
|
||||
writer = MobUtils
|
||||
.createWriter(conf, fs, column, partition.getPartitionId().getDate(), tempPath,
|
||||
Long.MAX_VALUE, column.getCompactionCompressionType(),
|
||||
partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext);
|
||||
cleanupTmpMobFile = true;
|
||||
filePath = writer.getPath();
|
||||
byte[] fileName = Bytes.toBytes(filePath.getName());
|
||||
// create a temp file and open a writer for it in the bulkloadPath
|
||||
refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath,
|
||||
fileInfo.getSecond().longValue(), compactionCacheConfig, cryptoContext);
|
||||
cleanupBulkloadDirOfPartition = true;
|
||||
List<Cell> cells = new ArrayList<>();
|
||||
boolean hasMore;
|
||||
ScannerContext scannerContext =
|
||||
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
|
||||
do {
|
||||
hasMore = scanner.next(cells, scannerContext);
|
||||
for (Cell cell : cells) {
|
||||
// write the mob cell to the mob file.
|
||||
writer.append(cell);
|
||||
// write the new reference cell to the store file.
|
||||
KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, tableNameTag);
|
||||
refFileWriter.append(reference);
|
||||
mobCells++;
|
||||
}
|
||||
cells.clear();
|
||||
} while (hasMore);
|
||||
} finally {
|
||||
// close the scanner.
|
||||
scanner.close();
|
||||
|
||||
if (cleanupTmpMobFile) {
|
||||
// append metadata to the mob file, and close the mob file writer.
|
||||
closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
|
||||
}
|
||||
cells.clear();
|
||||
} while (hasMore);
|
||||
|
||||
if (cleanupBulkloadDirOfPartition) {
|
||||
// append metadata and bulkload info to the ref mob file, and close the writer.
|
||||
closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
|
||||
}
|
||||
}
|
||||
|
||||
if (mobCells > 0) {
|
||||
// commit mob file
|
||||
MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
|
||||
cleanupTmpMobFile = false;
|
||||
cleanupCommittedMobFile = true;
|
||||
// bulkload the ref file
|
||||
bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName());
|
||||
cleanupCommittedMobFile = false;
|
||||
newFiles.add(new Path(mobFamilyDir, filePath.getName()));
|
||||
}
|
||||
|
||||
// archive the old mob files, do not archive the del files.
|
||||
try {
|
||||
closeStoreFileReaders(mobFilesToCompact);
|
||||
closeReaders = false;
|
||||
MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to archive the files " + mobFilesToCompact, e);
|
||||
}
|
||||
} finally {
|
||||
// close the scanner.
|
||||
scanner.close();
|
||||
// append metadata to the mob file, and close the mob file writer.
|
||||
closeMobFileWriter(writer, fileInfo.getFirst(), mobCells);
|
||||
// append metadata and bulkload info to the ref mob file, and close the writer.
|
||||
closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime);
|
||||
}
|
||||
if (mobCells > 0) {
|
||||
// commit mob file
|
||||
MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
|
||||
// bulkload the ref file
|
||||
bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName());
|
||||
newFiles.add(new Path(mobFamilyDir, filePath.getName()));
|
||||
} else {
|
||||
// remove the new files
|
||||
// the mob file is empty, delete it instead of committing.
|
||||
deletePath(filePath);
|
||||
// the ref file is empty, delete it instead of committing.
|
||||
deletePath(refFilePath);
|
||||
}
|
||||
// archive the old mob files, do not archive the del files.
|
||||
try {
|
||||
closeStoreFileReaders(mobFilesToCompact);
|
||||
MobUtils
|
||||
.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to archive the files " + mobFilesToCompact, e);
|
||||
if (closeReaders) {
|
||||
closeStoreFileReaders(mobFilesToCompact);
|
||||
}
|
||||
|
||||
if (cleanupTmpMobFile) {
|
||||
deletePath(filePath);
|
||||
}
|
||||
|
||||
if (cleanupBulkloadDirOfPartition) {
|
||||
// delete the bulkload files in bulkloadPath
|
||||
deletePath(bulkloadPathOfPartition);
|
||||
}
|
||||
|
||||
if (cleanupCommittedMobFile) {
|
||||
deletePath(new Path(mobFamilyDir, filePath.getName()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -509,7 +539,7 @@ public class PartitionedMobCompactor extends MobCompactor {
|
|||
writer = MobUtils.createDelFileWriter(conf, fs, column,
|
||||
MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
|
||||
column.getCompactionCompressionType(), HConstants.EMPTY_START_ROW, compactionCacheConfig,
|
||||
cryptoContext);
|
||||
cryptoContext);
|
||||
filePath = writer.getPath();
|
||||
List<Cell> cells = new ArrayList<>();
|
||||
boolean hasMore;
|
||||
|
@ -572,22 +602,15 @@ public class PartitionedMobCompactor extends MobCompactor {
|
|||
* @throws IOException if IO failure is encountered
|
||||
*/
|
||||
private void bulkloadRefFile(Connection connection, Table table, Path bulkloadDirectory,
|
||||
String fileName)
|
||||
throws IOException {
|
||||
String fileName)
|
||||
throws IOException {
|
||||
// bulkload the ref file
|
||||
try {
|
||||
LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
|
||||
bulkload.doBulkLoad(bulkloadDirectory,
|
||||
connection.getAdmin(),
|
||||
table,
|
||||
connection.getRegionLocator(table.getName()));
|
||||
bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table,
|
||||
connection.getRegionLocator(table.getName()));
|
||||
} catch (Exception e) {
|
||||
// delete the committed mob file
|
||||
deletePath(new Path(mobFamilyDir, fileName));
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
// delete the bulkload files in bulkloadPath
|
||||
deletePath(bulkloadDirectory);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -53,8 +53,10 @@ import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.C
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
@ -79,6 +81,9 @@ public class TestPartitionedMobCompactor {
|
|||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
|
||||
// Inject our customized DistributedFileSystem
|
||||
TEST_UTIL.getConfiguration().setClass("fs.hdfs.impl", FaultyDistributedFileSystem.class,
|
||||
DistributedFileSystem.class);
|
||||
TEST_UTIL.startMiniCluster(1);
|
||||
pool = createThreadPool();
|
||||
}
|
||||
|
@ -160,6 +165,51 @@ public class TestPartitionedMobCompactor {
|
|||
testCompactDelFilesAtBatchSize(tableName, 4, 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactFilesWithDstDirFull() throws Exception {
|
||||
String tableName = "testCompactFilesWithDstDirFull";
|
||||
fs = FileSystem.get(conf);
|
||||
FaultyDistributedFileSystem faultyFs = (FaultyDistributedFileSystem)fs;
|
||||
Path testDir = FSUtils.getRootDir(conf);
|
||||
Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
|
||||
basePath = new Path(new Path(mobTestDir, tableName), family);
|
||||
|
||||
try {
|
||||
int count = 2;
|
||||
// create 2 mob files.
|
||||
createStoreFiles(basePath, family, qf, count, Type.Put, true);
|
||||
listFiles();
|
||||
|
||||
TableName tName = TableName.valueOf(tableName);
|
||||
MobCompactor compactor = new PartitionedMobCompactor(conf, faultyFs, tName, hcd, pool);
|
||||
faultyFs.setThrowException(true);
|
||||
try {
|
||||
compactor.compact(allFiles, true);
|
||||
} catch (IOException e) {
|
||||
System.out.println("Expected exception, ignore");
|
||||
}
|
||||
|
||||
// Verify that all the files in tmp directory are cleaned up
|
||||
Path tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME);
|
||||
FileStatus[] ls = faultyFs.listStatus(tempPath);
|
||||
|
||||
// Only .bulkload under this directory
|
||||
assertTrue(ls.length == 1);
|
||||
assertTrue(MobConstants.BULKLOAD_DIR_NAME.equalsIgnoreCase(ls[0].getPath().getName()));
|
||||
|
||||
Path bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path(
|
||||
tName.getNamespaceAsString(), tName.getQualifierAsString())));
|
||||
|
||||
// Nothing in bulkLoad directory
|
||||
FileStatus[] lsBulkload = faultyFs.listStatus(bulkloadPath);
|
||||
assertTrue(lsBulkload.length == 0);
|
||||
|
||||
} finally {
|
||||
faultyFs.setThrowException(false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void testCompactDelFilesAtBatchSize(String tableName, int batchSize,
|
||||
int delfileMaxCount) throws Exception {
|
||||
resetConf();
|
||||
|
@ -289,17 +339,30 @@ public class TestPartitionedMobCompactor {
|
|||
*/
|
||||
private void createStoreFiles(Path basePath, String family, String qualifier, int count,
|
||||
Type type) throws IOException {
|
||||
createStoreFiles(basePath, family, qualifier, count, type, false);
|
||||
}
|
||||
|
||||
private void createStoreFiles(Path basePath, String family, String qualifier, int count,
|
||||
Type type, boolean sameStartKey) throws IOException {
|
||||
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
|
||||
String startKey = "row_";
|
||||
MobFileName mobFileName = null;
|
||||
for (int i = 0; i < count; i++) {
|
||||
byte[] startRow = Bytes.toBytes(startKey + i) ;
|
||||
byte[] startRow;
|
||||
if (sameStartKey) {
|
||||
// When creating multiple files under one partition, suffix needs to be different.
|
||||
startRow = Bytes.toBytes(startKey);
|
||||
mobSuffix = UUID.randomUUID().toString().replaceAll("-", "");
|
||||
delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del";
|
||||
} else {
|
||||
startRow = Bytes.toBytes(startKey + i);
|
||||
}
|
||||
if(type.equals(Type.Delete)) {
|
||||
mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
|
||||
new Date()), delSuffix);
|
||||
}
|
||||
if(type.equals(Type.Put)){
|
||||
mobFileName = MobFileName.create(Bytes.toBytes(startKey + i), MobUtils.formatDate(
|
||||
mobFileName = MobFileName.create(startRow, MobUtils.formatDate(
|
||||
new Date()), mobSuffix);
|
||||
}
|
||||
StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs)
|
||||
|
@ -394,4 +457,27 @@ public class TestPartitionedMobCompactor {
|
|||
conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE,
|
||||
MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE);
|
||||
}
|
||||
|
||||
/**
|
||||
* The customized Distributed File System Implementation
|
||||
*/
|
||||
static class FaultyDistributedFileSystem extends DistributedFileSystem {
|
||||
private volatile boolean throwException = false;
|
||||
|
||||
public FaultyDistributedFileSystem() {
|
||||
super();
|
||||
}
|
||||
|
||||
public void setThrowException(boolean throwException) {
|
||||
this.throwException = throwException;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean rename(Path src, Path dst) throws IOException {
|
||||
if (throwException) {
|
||||
throw new IOException("No more files allowed");
|
||||
}
|
||||
return super.rename(src, dst);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue