diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java index 29b7e8a1db0..33aecc0b294 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java @@ -229,8 +229,8 @@ public class PartitionedMobCompactor extends MobCompactor { } // archive the del files if all the mob files are selected. if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) { - LOG.info("After a mob compaction with all files selected, archiving the del files " - + newDelPaths); + LOG.info( + "After a mob compaction with all files selected, archiving the del files " + newDelPaths); try { MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles); } catch (IOException e) { @@ -381,7 +381,7 @@ public class PartitionedMobCompactor extends MobCompactor { List filesToCompact, int batch, Path bulkloadPathOfPartition, Path bulkloadColumnPath, List newFiles) - throws IOException { + throws IOException { // open scanner to the selected mob files and del files. StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES); // the mob files to be compacted, not include the del files. @@ -392,62 +392,92 @@ public class PartitionedMobCompactor extends MobCompactor { StoreFileWriter writer = null; StoreFileWriter refFileWriter = null; Path filePath = null; - Path refFilePath = null; long mobCells = 0; + boolean cleanupTmpMobFile = false; + boolean cleanupBulkloadDirOfPartition = false; + boolean cleanupCommittedMobFile = false; + boolean closeReaders= true; + try { - writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(), - tempPath, Long.MAX_VALUE, column.getCompactionCompressionType(), partition.getPartitionId() - .getStartKey(), compactionCacheConfig, cryptoContext); - filePath = writer.getPath(); - byte[] fileName = Bytes.toBytes(filePath.getName()); - // create a temp file and open a writer for it in the bulkloadPath - refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo - .getSecond().longValue(), compactionCacheConfig, cryptoContext); - refFilePath = refFileWriter.getPath(); - List cells = new ArrayList<>(); - boolean hasMore; - ScannerContext scannerContext = - ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); - do { - hasMore = scanner.next(cells, scannerContext); - for (Cell cell : cells) { - // write the mob cell to the mob file. - writer.append(cell); - // write the new reference cell to the store file. - KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, tableNameTag); - refFileWriter.append(reference); - mobCells++; + try { + writer = MobUtils + .createWriter(conf, fs, column, partition.getPartitionId().getDate(), tempPath, + Long.MAX_VALUE, column.getCompactionCompressionType(), + partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext); + cleanupTmpMobFile = true; + filePath = writer.getPath(); + byte[] fileName = Bytes.toBytes(filePath.getName()); + // create a temp file and open a writer for it in the bulkloadPath + refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, + fileInfo.getSecond().longValue(), compactionCacheConfig, cryptoContext); + cleanupBulkloadDirOfPartition = true; + List cells = new ArrayList<>(); + boolean hasMore; + ScannerContext scannerContext = + ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); + do { + hasMore = scanner.next(cells, scannerContext); + for (Cell cell : cells) { + // write the mob cell to the mob file. + writer.append(cell); + // write the new reference cell to the store file. + KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, tableNameTag); + refFileWriter.append(reference); + mobCells++; + } + cells.clear(); + } while (hasMore); + } finally { + // close the scanner. + scanner.close(); + + if (cleanupTmpMobFile) { + // append metadata to the mob file, and close the mob file writer. + closeMobFileWriter(writer, fileInfo.getFirst(), mobCells); } - cells.clear(); - } while (hasMore); + + if (cleanupBulkloadDirOfPartition) { + // append metadata and bulkload info to the ref mob file, and close the writer. + closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime); + } + } + + if (mobCells > 0) { + // commit mob file + MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig); + cleanupTmpMobFile = false; + cleanupCommittedMobFile = true; + // bulkload the ref file + bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName()); + cleanupCommittedMobFile = false; + newFiles.add(new Path(mobFamilyDir, filePath.getName())); + } + + // archive the old mob files, do not archive the del files. + try { + closeStoreFileReaders(mobFilesToCompact); + closeReaders = false; + MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact); + } catch (IOException e) { + LOG.error("Failed to archive the files " + mobFilesToCompact, e); + } } finally { - // close the scanner. - scanner.close(); - // append metadata to the mob file, and close the mob file writer. - closeMobFileWriter(writer, fileInfo.getFirst(), mobCells); - // append metadata and bulkload info to the ref mob file, and close the writer. - closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime); - } - if (mobCells > 0) { - // commit mob file - MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig); - // bulkload the ref file - bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName()); - newFiles.add(new Path(mobFamilyDir, filePath.getName())); - } else { - // remove the new files - // the mob file is empty, delete it instead of committing. - deletePath(filePath); - // the ref file is empty, delete it instead of committing. - deletePath(refFilePath); - } - // archive the old mob files, do not archive the del files. - try { - closeStoreFileReaders(mobFilesToCompact); - MobUtils - .removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact); - } catch (IOException e) { - LOG.error("Failed to archive the files " + mobFilesToCompact, e); + if (closeReaders) { + closeStoreFileReaders(mobFilesToCompact); + } + + if (cleanupTmpMobFile) { + deletePath(filePath); + } + + if (cleanupBulkloadDirOfPartition) { + // delete the bulkload files in bulkloadPath + deletePath(bulkloadPathOfPartition); + } + + if (cleanupCommittedMobFile) { + deletePath(new Path(mobFamilyDir, filePath.getName())); + } } } @@ -509,7 +539,7 @@ public class PartitionedMobCompactor extends MobCompactor { writer = MobUtils.createDelFileWriter(conf, fs, column, MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE, column.getCompactionCompressionType(), HConstants.EMPTY_START_ROW, compactionCacheConfig, - cryptoContext); + cryptoContext); filePath = writer.getPath(); List cells = new ArrayList<>(); boolean hasMore; @@ -572,22 +602,15 @@ public class PartitionedMobCompactor extends MobCompactor { * @throws IOException if IO failure is encountered */ private void bulkloadRefFile(Connection connection, Table table, Path bulkloadDirectory, - String fileName) - throws IOException { + String fileName) + throws IOException { // bulkload the ref file try { LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf); - bulkload.doBulkLoad(bulkloadDirectory, - connection.getAdmin(), - table, - connection.getRegionLocator(table.getName())); + bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table, + connection.getRegionLocator(table.getName())); } catch (Exception e) { - // delete the committed mob file - deletePath(new Path(mobFamilyDir, fileName)); throw new IOException(e); - } finally { - // delete the bulkload files in bulkloadPath - deletePath(bulkloadDirectory); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java index 7970d62571d..7da85447713 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java @@ -53,8 +53,10 @@ import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.C import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.junit.AfterClass; import org.junit.Assert; +import static org.junit.Assert.assertTrue; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -79,6 +81,9 @@ public class TestPartitionedMobCompactor { @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); + // Inject our customized DistributedFileSystem + TEST_UTIL.getConfiguration().setClass("fs.hdfs.impl", FaultyDistributedFileSystem.class, + DistributedFileSystem.class); TEST_UTIL.startMiniCluster(1); pool = createThreadPool(); } @@ -160,6 +165,51 @@ public class TestPartitionedMobCompactor { testCompactDelFilesAtBatchSize(tableName, 4, 2); } + @Test + public void testCompactFilesWithDstDirFull() throws Exception { + String tableName = "testCompactFilesWithDstDirFull"; + fs = FileSystem.get(conf); + FaultyDistributedFileSystem faultyFs = (FaultyDistributedFileSystem)fs; + Path testDir = FSUtils.getRootDir(conf); + Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME); + basePath = new Path(new Path(mobTestDir, tableName), family); + + try { + int count = 2; + // create 2 mob files. + createStoreFiles(basePath, family, qf, count, Type.Put, true); + listFiles(); + + TableName tName = TableName.valueOf(tableName); + MobCompactor compactor = new PartitionedMobCompactor(conf, faultyFs, tName, hcd, pool); + faultyFs.setThrowException(true); + try { + compactor.compact(allFiles, true); + } catch (IOException e) { + System.out.println("Expected exception, ignore"); + } + + // Verify that all the files in tmp directory are cleaned up + Path tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME); + FileStatus[] ls = faultyFs.listStatus(tempPath); + + // Only .bulkload under this directory + assertTrue(ls.length == 1); + assertTrue(MobConstants.BULKLOAD_DIR_NAME.equalsIgnoreCase(ls[0].getPath().getName())); + + Path bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path( + tName.getNamespaceAsString(), tName.getQualifierAsString()))); + + // Nothing in bulkLoad directory + FileStatus[] lsBulkload = faultyFs.listStatus(bulkloadPath); + assertTrue(lsBulkload.length == 0); + + } finally { + faultyFs.setThrowException(false); + } + } + + private void testCompactDelFilesAtBatchSize(String tableName, int batchSize, int delfileMaxCount) throws Exception { resetConf(); @@ -289,17 +339,30 @@ public class TestPartitionedMobCompactor { */ private void createStoreFiles(Path basePath, String family, String qualifier, int count, Type type) throws IOException { + createStoreFiles(basePath, family, qualifier, count, type, false); + } + + private void createStoreFiles(Path basePath, String family, String qualifier, int count, + Type type, boolean sameStartKey) throws IOException { HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); String startKey = "row_"; MobFileName mobFileName = null; for (int i = 0; i < count; i++) { - byte[] startRow = Bytes.toBytes(startKey + i) ; + byte[] startRow; + if (sameStartKey) { + // When creating multiple files under one partition, suffix needs to be different. + startRow = Bytes.toBytes(startKey); + mobSuffix = UUID.randomUUID().toString().replaceAll("-", ""); + delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del"; + } else { + startRow = Bytes.toBytes(startKey + i); + } if(type.equals(Type.Delete)) { mobFileName = MobFileName.create(startRow, MobUtils.formatDate( new Date()), delSuffix); } if(type.equals(Type.Put)){ - mobFileName = MobFileName.create(Bytes.toBytes(startKey + i), MobUtils.formatDate( + mobFileName = MobFileName.create(startRow, MobUtils.formatDate( new Date()), mobSuffix); } StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs) @@ -394,4 +457,27 @@ public class TestPartitionedMobCompactor { conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE); } + + /** + * The customized Distributed File System Implementation + */ + static class FaultyDistributedFileSystem extends DistributedFileSystem { + private volatile boolean throwException = false; + + public FaultyDistributedFileSystem() { + super(); + } + + public void setThrowException(boolean throwException) { + this.throwException = throwException; + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + if (throwException) { + throw new IOException("No more files allowed"); + } + return super.rename(src, dst); + } + } }