From d4145db6abba119e289d3aee50f308fab1d9bcf5 Mon Sep 17 00:00:00 2001 From: chenglei Date: Fri, 21 Oct 2022 17:53:19 +0800 Subject: [PATCH] HBASE-27433 DefaultMobStoreCompactor should delete MobStoreFile cleanly when compaction is failed (#4840) --- .../hbase/mob/DefaultMobStoreCompactor.java | 49 +++- .../mob/TestMobCompactionWithException.java | 254 ++++++++++++++++++ 2 files changed, 295 insertions(+), 8 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithException.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index e46eb0f6209..9c9985ba8d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@ -354,6 +354,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize(); Cell mobCell = null; + List committedMobWriterFileNames = new ArrayList<>(); try { mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker()); @@ -425,8 +426,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { if (len > maxMobFileSize) { LOG.debug("Closing output MOB File, length={} file={}, store={}", len, mobFileWriter.getPath().getName(), getStoreInfo()); - commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major); - mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker()); + mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major, + request, committedMobWriterFileNames); fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); mobCells = 0; } @@ -469,8 +470,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { // file compression yet) long len = mobFileWriter.getPos(); if (len > maxMobFileSize) { - commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major); - mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker()); + mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major, + request, committedMobWriterFileNames); fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); mobCells = 0; } @@ -521,8 +522,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { if (ioOptimizedMode) { long len = mobFileWriter.getPos(); if (len > maxMobFileSize) { - commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major); - mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker()); + mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major, request, + committedMobWriterFileNames); fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); mobCells = 0; } @@ -561,6 +562,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { } cells.clear(); } while (hasMore); + // Commit last MOB writer + commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major); finished = true; } catch (InterruptedException e) { progress.cancel(); @@ -582,11 +585,10 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { LOG.debug("Aborting writer for {} because of a compaction failure, Store {}", mobFileWriter.getPath(), getStoreInfo()); abortWriter(mobFileWriter); + deleteCommittedMobFiles(committedMobWriterFileNames); } } - // Commit last MOB writer - commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major); mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob); mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob); mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob); @@ -671,4 +673,35 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { clearThreadLocals(); return newFiles; } + + private StoreFileWriter switchToNewMobWriter(StoreFileWriter mobFileWriter, FileDetails fd, + long mobCells, boolean major, CompactionRequestImpl request, + List committedMobWriterFileNames) throws IOException { + commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major); + committedMobWriterFileNames.add(mobFileWriter.getPath().getName()); + return newMobWriter(fd, major, request.getWriterCreationTracker()); + } + + private void deleteCommittedMobFiles(List fileNames) { + if (fileNames.isEmpty()) { + return; + } + Path mobColumnFamilyPath = + MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName()); + for (String fileName : fileNames) { + if (fileName == null) { + continue; + } + Path path = new Path(mobColumnFamilyPath, fileName); + try { + if (store.getFileSystem().exists(path)) { + store.getFileSystem().delete(path, false); + } + } catch (IOException e) { + LOG.warn("Failed to delete the mob file {} for an failed mob compaction.", path, e); + } + } + + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithException.java new file mode 100644 index 00000000000..7aaedf170ac --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithException.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mob; + +import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY; +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.CellSink; +import org.apache.hadoop.hbase.regionserver.HMobStore; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.RegionAsTable; +import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; +import org.apache.hadoop.hbase.regionserver.compactions.Compactor; +import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category(MediumTests.class) +public class TestMobCompactionWithException { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMobCompactionWithException.class); + + @Rule + public TestName name = new TestName(); + static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionWithException.class.getName()); + private final static HBaseTestingUtility HTU = new HBaseTestingUtility(); + private static Configuration conf = null; + + private HRegion region = null; + private TableDescriptor tableDescriptor; + private ColumnFamilyDescriptor columnFamilyDescriptor; + private FileSystem fs; + + private static final byte[] COLUMN_FAMILY = fam1; + private final byte[] STARTROW = Bytes.toBytes(START_KEY); + private static volatile boolean testException = false; + private static int rowCount = 100; + private Table table; + + @BeforeClass + public static void setUp() throws Exception { + conf = HTU.getConfiguration(); + conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE); + conf.set(MobStoreEngine.MOB_COMPACTOR_CLASS_KEY, MyMobStoreCompactor.class.getName()); + + } + + @After + public void tearDown() throws Exception { + region.close(); + this.table.close(); + fs.delete(HTU.getDataTestDir(), true); + } + + private void createTable(long mobThreshold) throws IOException { + + this.columnFamilyDescriptor = + ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_FAMILY).setMobEnabled(true) + .setMobThreshold(mobThreshold).setMaxVersions(1).setBlocksize(500).build(); + this.tableDescriptor = + TableDescriptorBuilder.newBuilder(TableName.valueOf(TestMobUtils.getTableName(name))) + .setColumnFamily(columnFamilyDescriptor).build(); + RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(); + region = HBaseTestingUtility.createRegionAndWAL(regionInfo, HTU.getDataTestDir(), conf, + tableDescriptor, new MobFileCache(conf)); + this.table = new RegionAsTable(region); + fs = FileSystem.get(conf); + } + + /** + * This test is for HBASE-27433. + */ + @Test + public void testMobStoreFileDeletedWhenCompactException() throws Exception { + this.createTable(200); + byte[] dummyData = makeDummyData(1000); // larger than mob threshold + for (int i = 0; i < rowCount; i++) { + Put p = createPut(i, dummyData); + table.put(p); + region.flush(true); + } + + int storeFileCountBeforeCompact = countStoreFiles(); + int mobFileCountBeforeCompact = countMobFiles(); + long mobFileByteSize = getMobFileByteSize(); + + List stores = region.getStores(); + assertTrue(stores.size() == 1); + HMobStore mobStore = (HMobStore) stores.get(0); + Compactor compactor = mobStore.getStoreEngine().getCompactor(); + MyMobStoreCompactor myMobStoreCompactor = (MyMobStoreCompactor) compactor; + myMobStoreCompactor.setMobFileMaxByteSize(mobFileByteSize + 100); + testException = true; + try { + try { + + // Force major compaction + mobStore.triggerMajorCompaction(); + Optional context = mobStore.requestCompaction(HStore.PRIORITY_USER, + CompactionLifeCycleTracker.DUMMY, User.getCurrent()); + assertTrue(context.isPresent()); + region.compact(context.get(), mobStore, NoLimitThroughputController.INSTANCE, + User.getCurrent()); + + fail(); + } catch (IOException e) { + assertTrue(e != null); + } + } finally { + testException = false; + } + + // When compaction is failed,the count of StoreFile and MobStoreFile should be the same as + // before compaction. + assertEquals("After compaction: store files", storeFileCountBeforeCompact, countStoreFiles()); + assertEquals("After compaction: mob file count", mobFileCountBeforeCompact, countMobFiles()); + } + + private int countStoreFiles() throws IOException { + HStore store = region.getStore(COLUMN_FAMILY); + return store.getStorefilesCount(); + } + + private int countMobFiles() throws IOException { + Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(), + columnFamilyDescriptor.getNameAsString()); + if (fs.exists(mobDirPath)) { + FileStatus[] files = HTU.getTestFileSystem().listStatus(mobDirPath); + return files.length; + } + return 0; + } + + private long getMobFileByteSize() throws IOException { + Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(), + columnFamilyDescriptor.getNameAsString()); + if (fs.exists(mobDirPath)) { + FileStatus[] files = HTU.getTestFileSystem().listStatus(mobDirPath); + if (files.length > 0) { + return files[0].getLen(); + } + } + return 0; + } + + private Put createPut(int rowIdx, byte[] dummyData) throws IOException { + Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx))); + p.setDurability(Durability.SKIP_WAL); + p.addColumn(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData); + return p; + } + + private byte[] makeDummyData(int size) { + byte[] dummyData = new byte[size]; + Bytes.random(dummyData); + return dummyData; + } + + public static class MyMobStoreCompactor extends DefaultMobStoreCompactor { + public MyMobStoreCompactor(Configuration conf, HStore store) { + super(conf, store); + + } + + public void setMobFileMaxByteSize(long maxByteSize) { + this.conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, maxByteSize); + } + + @Override + protected boolean performCompaction(FileDetails fd, final InternalScanner scanner, + CellSink writer, long smallestReadPoint, boolean cleanSeqId, + ThroughputController throughputController, CompactionRequestImpl request, + CompactionProgress progress) throws IOException { + + InternalScanner wrappedScanner = new InternalScanner() { + + private int count = -1; + + @Override + public boolean next(List result, ScannerContext scannerContext) throws IOException { + count++; + if (count == rowCount - 1 && testException) { + count = 0; + throw new IOException("Inject Error"); + } + return scanner.next(result, scannerContext); + } + + @Override + public void close() throws IOException { + scanner.close(); + } + }; + return super.performCompaction(fd, wrappedScanner, writer, smallestReadPoint, cleanSeqId, + throughputController, request, progress); + } + } +}