diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java index 33aecc0b294..29b7e8a1db0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java @@ -229,8 +229,8 @@ public class PartitionedMobCompactor extends MobCompactor { } // archive the del files if all the mob files are selected. if (request.type == CompactionType.ALL_FILES && !newDelPaths.isEmpty()) { - LOG.info( - "After a mob compaction with all files selected, archiving the del files " + newDelPaths); + LOG.info("After a mob compaction with all files selected, archiving the del files " + + newDelPaths); try { MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), newDelFiles); } catch (IOException e) { @@ -381,7 +381,7 @@ public class PartitionedMobCompactor extends MobCompactor { List filesToCompact, int batch, Path bulkloadPathOfPartition, Path bulkloadColumnPath, List newFiles) - throws IOException { + throws IOException { // open scanner to the selected mob files and del files. StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES); // the mob files to be compacted, not include the del files. @@ -392,92 +392,62 @@ public class PartitionedMobCompactor extends MobCompactor { StoreFileWriter writer = null; StoreFileWriter refFileWriter = null; Path filePath = null; + Path refFilePath = null; long mobCells = 0; - boolean cleanupTmpMobFile = false; - boolean cleanupBulkloadDirOfPartition = false; - boolean cleanupCommittedMobFile = false; - boolean closeReaders= true; - try { - try { - writer = MobUtils - .createWriter(conf, fs, column, partition.getPartitionId().getDate(), tempPath, - Long.MAX_VALUE, column.getCompactionCompressionType(), - partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext); - cleanupTmpMobFile = true; - filePath = writer.getPath(); - byte[] fileName = Bytes.toBytes(filePath.getName()); - // create a temp file and open a writer for it in the bulkloadPath - refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, - fileInfo.getSecond().longValue(), compactionCacheConfig, cryptoContext); - cleanupBulkloadDirOfPartition = true; - List cells = new ArrayList<>(); - boolean hasMore; - ScannerContext scannerContext = - ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); - do { - hasMore = scanner.next(cells, scannerContext); - for (Cell cell : cells) { - // write the mob cell to the mob file. - writer.append(cell); - // write the new reference cell to the store file. - KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, tableNameTag); - refFileWriter.append(reference); - mobCells++; - } - cells.clear(); - } while (hasMore); - } finally { - // close the scanner. - scanner.close(); - - if (cleanupTmpMobFile) { - // append metadata to the mob file, and close the mob file writer. - closeMobFileWriter(writer, fileInfo.getFirst(), mobCells); + writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(), + tempPath, Long.MAX_VALUE, column.getCompactionCompressionType(), partition.getPartitionId() + .getStartKey(), compactionCacheConfig, cryptoContext); + filePath = writer.getPath(); + byte[] fileName = Bytes.toBytes(filePath.getName()); + // create a temp file and open a writer for it in the bulkloadPath + refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo + .getSecond().longValue(), compactionCacheConfig, cryptoContext); + refFilePath = refFileWriter.getPath(); + List cells = new ArrayList<>(); + boolean hasMore; + ScannerContext scannerContext = + ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); + do { + hasMore = scanner.next(cells, scannerContext); + for (Cell cell : cells) { + // write the mob cell to the mob file. + writer.append(cell); + // write the new reference cell to the store file. + KeyValue reference = MobUtils.createMobRefKeyValue(cell, fileName, tableNameTag); + refFileWriter.append(reference); + mobCells++; } - - if (cleanupBulkloadDirOfPartition) { - // append metadata and bulkload info to the ref mob file, and close the writer. - closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime); - } - } - - if (mobCells > 0) { - // commit mob file - MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig); - cleanupTmpMobFile = false; - cleanupCommittedMobFile = true; - // bulkload the ref file - bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName()); - cleanupCommittedMobFile = false; - newFiles.add(new Path(mobFamilyDir, filePath.getName())); - } - - // archive the old mob files, do not archive the del files. - try { - closeStoreFileReaders(mobFilesToCompact); - closeReaders = false; - MobUtils.removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact); - } catch (IOException e) { - LOG.error("Failed to archive the files " + mobFilesToCompact, e); - } + cells.clear(); + } while (hasMore); } finally { - if (closeReaders) { - closeStoreFileReaders(mobFilesToCompact); - } - - if (cleanupTmpMobFile) { - deletePath(filePath); - } - - if (cleanupBulkloadDirOfPartition) { - // delete the bulkload files in bulkloadPath - deletePath(bulkloadPathOfPartition); - } - - if (cleanupCommittedMobFile) { - deletePath(new Path(mobFamilyDir, filePath.getName())); - } + // close the scanner. + scanner.close(); + // append metadata to the mob file, and close the mob file writer. + closeMobFileWriter(writer, fileInfo.getFirst(), mobCells); + // append metadata and bulkload info to the ref mob file, and close the writer. + closeRefFileWriter(refFileWriter, fileInfo.getFirst(), request.selectionTime); + } + if (mobCells > 0) { + // commit mob file + MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig); + // bulkload the ref file + bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName()); + newFiles.add(new Path(mobFamilyDir, filePath.getName())); + } else { + // remove the new files + // the mob file is empty, delete it instead of committing. + deletePath(filePath); + // the ref file is empty, delete it instead of committing. + deletePath(refFilePath); + } + // archive the old mob files, do not archive the del files. + try { + closeStoreFileReaders(mobFilesToCompact); + MobUtils + .removeMobFiles(conf, fs, tableName, mobTableDir, column.getName(), mobFilesToCompact); + } catch (IOException e) { + LOG.error("Failed to archive the files " + mobFilesToCompact, e); } } @@ -539,7 +509,7 @@ public class PartitionedMobCompactor extends MobCompactor { writer = MobUtils.createDelFileWriter(conf, fs, column, MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE, column.getCompactionCompressionType(), HConstants.EMPTY_START_ROW, compactionCacheConfig, - cryptoContext); + cryptoContext); filePath = writer.getPath(); List cells = new ArrayList<>(); boolean hasMore; @@ -602,15 +572,22 @@ public class PartitionedMobCompactor extends MobCompactor { * @throws IOException if IO failure is encountered */ private void bulkloadRefFile(Connection connection, Table table, Path bulkloadDirectory, - String fileName) - throws IOException { + String fileName) + throws IOException { // bulkload the ref file try { LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf); - bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table, - connection.getRegionLocator(table.getName())); + bulkload.doBulkLoad(bulkloadDirectory, + connection.getAdmin(), + table, + connection.getRegionLocator(table.getName())); } catch (Exception e) { + // delete the committed mob file + deletePath(new Path(mobFamilyDir, fileName)); throw new IOException(e); + } finally { + // delete the bulkload files in bulkloadPath + deletePath(bulkloadDirectory); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java index 7da85447713..7970d62571d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java @@ -53,10 +53,8 @@ import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.C import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.junit.AfterClass; import org.junit.Assert; -import static org.junit.Assert.assertTrue; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -81,9 +79,6 @@ public class TestPartitionedMobCompactor { @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3); - // Inject our customized DistributedFileSystem - TEST_UTIL.getConfiguration().setClass("fs.hdfs.impl", FaultyDistributedFileSystem.class, - DistributedFileSystem.class); TEST_UTIL.startMiniCluster(1); pool = createThreadPool(); } @@ -165,51 +160,6 @@ public class TestPartitionedMobCompactor { testCompactDelFilesAtBatchSize(tableName, 4, 2); } - @Test - public void testCompactFilesWithDstDirFull() throws Exception { - String tableName = "testCompactFilesWithDstDirFull"; - fs = FileSystem.get(conf); - FaultyDistributedFileSystem faultyFs = (FaultyDistributedFileSystem)fs; - Path testDir = FSUtils.getRootDir(conf); - Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME); - basePath = new Path(new Path(mobTestDir, tableName), family); - - try { - int count = 2; - // create 2 mob files. - createStoreFiles(basePath, family, qf, count, Type.Put, true); - listFiles(); - - TableName tName = TableName.valueOf(tableName); - MobCompactor compactor = new PartitionedMobCompactor(conf, faultyFs, tName, hcd, pool); - faultyFs.setThrowException(true); - try { - compactor.compact(allFiles, true); - } catch (IOException e) { - System.out.println("Expected exception, ignore"); - } - - // Verify that all the files in tmp directory are cleaned up - Path tempPath = new Path(MobUtils.getMobHome(conf), MobConstants.TEMP_DIR_NAME); - FileStatus[] ls = faultyFs.listStatus(tempPath); - - // Only .bulkload under this directory - assertTrue(ls.length == 1); - assertTrue(MobConstants.BULKLOAD_DIR_NAME.equalsIgnoreCase(ls[0].getPath().getName())); - - Path bulkloadPath = new Path(tempPath, new Path(MobConstants.BULKLOAD_DIR_NAME, new Path( - tName.getNamespaceAsString(), tName.getQualifierAsString()))); - - // Nothing in bulkLoad directory - FileStatus[] lsBulkload = faultyFs.listStatus(bulkloadPath); - assertTrue(lsBulkload.length == 0); - - } finally { - faultyFs.setThrowException(false); - } - } - - private void testCompactDelFilesAtBatchSize(String tableName, int batchSize, int delfileMaxCount) throws Exception { resetConf(); @@ -339,30 +289,17 @@ public class TestPartitionedMobCompactor { */ private void createStoreFiles(Path basePath, String family, String qualifier, int count, Type type) throws IOException { - createStoreFiles(basePath, family, qualifier, count, type, false); - } - - private void createStoreFiles(Path basePath, String family, String qualifier, int count, - Type type, boolean sameStartKey) throws IOException { HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build(); String startKey = "row_"; MobFileName mobFileName = null; for (int i = 0; i < count; i++) { - byte[] startRow; - if (sameStartKey) { - // When creating multiple files under one partition, suffix needs to be different. - startRow = Bytes.toBytes(startKey); - mobSuffix = UUID.randomUUID().toString().replaceAll("-", ""); - delSuffix = UUID.randomUUID().toString().replaceAll("-", "") + "_del"; - } else { - startRow = Bytes.toBytes(startKey + i); - } + byte[] startRow = Bytes.toBytes(startKey + i) ; if(type.equals(Type.Delete)) { mobFileName = MobFileName.create(startRow, MobUtils.formatDate( new Date()), delSuffix); } if(type.equals(Type.Put)){ - mobFileName = MobFileName.create(startRow, MobUtils.formatDate( + mobFileName = MobFileName.create(Bytes.toBytes(startKey + i), MobUtils.formatDate( new Date()), mobSuffix); } StoreFileWriter mobFileWriter = new StoreFileWriter.Builder(conf, cacheConf, fs) @@ -457,27 +394,4 @@ public class TestPartitionedMobCompactor { conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE); } - - /** - * The customized Distributed File System Implementation - */ - static class FaultyDistributedFileSystem extends DistributedFileSystem { - private volatile boolean throwException = false; - - public FaultyDistributedFileSystem() { - super(); - } - - public void setThrowException(boolean throwException) { - this.throwException = throwException; - } - - @Override - public boolean rename(Path src, Path dst) throws IOException { - if (throwException) { - throw new IOException("No more files allowed"); - } - return super.rename(src, dst); - } - } }