HBASE-27433 DefaultMobStoreCompactor should delete MobStoreFile cleanly when compaction is failed (#4840)
This commit is contained in:
parent
c590bbf43d
commit
d4145db6ab
|
@ -354,6 +354,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
||||||
(long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
|
(long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
|
||||||
|
|
||||||
Cell mobCell = null;
|
Cell mobCell = null;
|
||||||
|
List<String> committedMobWriterFileNames = new ArrayList<>();
|
||||||
try {
|
try {
|
||||||
|
|
||||||
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
|
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
|
||||||
|
@ -425,8 +426,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
||||||
if (len > maxMobFileSize) {
|
if (len > maxMobFileSize) {
|
||||||
LOG.debug("Closing output MOB File, length={} file={}, store={}", len,
|
LOG.debug("Closing output MOB File, length={} file={}, store={}", len,
|
||||||
mobFileWriter.getPath().getName(), getStoreInfo());
|
mobFileWriter.getPath().getName(), getStoreInfo());
|
||||||
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
|
mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major,
|
||||||
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
|
request, committedMobWriterFileNames);
|
||||||
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
|
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
|
||||||
mobCells = 0;
|
mobCells = 0;
|
||||||
}
|
}
|
||||||
|
@ -469,8 +470,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
||||||
// file compression yet)
|
// file compression yet)
|
||||||
long len = mobFileWriter.getPos();
|
long len = mobFileWriter.getPos();
|
||||||
if (len > maxMobFileSize) {
|
if (len > maxMobFileSize) {
|
||||||
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
|
mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major,
|
||||||
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
|
request, committedMobWriterFileNames);
|
||||||
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
|
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
|
||||||
mobCells = 0;
|
mobCells = 0;
|
||||||
}
|
}
|
||||||
|
@ -521,8 +522,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
||||||
if (ioOptimizedMode) {
|
if (ioOptimizedMode) {
|
||||||
long len = mobFileWriter.getPos();
|
long len = mobFileWriter.getPos();
|
||||||
if (len > maxMobFileSize) {
|
if (len > maxMobFileSize) {
|
||||||
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
|
mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major, request,
|
||||||
mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
|
committedMobWriterFileNames);
|
||||||
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
|
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
|
||||||
mobCells = 0;
|
mobCells = 0;
|
||||||
}
|
}
|
||||||
|
@ -561,6 +562,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
||||||
}
|
}
|
||||||
cells.clear();
|
cells.clear();
|
||||||
} while (hasMore);
|
} while (hasMore);
|
||||||
|
// Commit last MOB writer
|
||||||
|
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
|
||||||
finished = true;
|
finished = true;
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
progress.cancel();
|
progress.cancel();
|
||||||
|
@ -582,11 +585,10 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
||||||
LOG.debug("Aborting writer for {} because of a compaction failure, Store {}",
|
LOG.debug("Aborting writer for {} because of a compaction failure, Store {}",
|
||||||
mobFileWriter.getPath(), getStoreInfo());
|
mobFileWriter.getPath(), getStoreInfo());
|
||||||
abortWriter(mobFileWriter);
|
abortWriter(mobFileWriter);
|
||||||
|
deleteCommittedMobFiles(committedMobWriterFileNames);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit last MOB writer
|
|
||||||
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
|
|
||||||
mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
|
mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
|
||||||
mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
|
mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
|
||||||
mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
|
mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
|
||||||
|
@ -671,4 +673,35 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
|
||||||
clearThreadLocals();
|
clearThreadLocals();
|
||||||
return newFiles;
|
return newFiles;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private StoreFileWriter switchToNewMobWriter(StoreFileWriter mobFileWriter, FileDetails fd,
|
||||||
|
long mobCells, boolean major, CompactionRequestImpl request,
|
||||||
|
List<String> committedMobWriterFileNames) throws IOException {
|
||||||
|
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
|
||||||
|
committedMobWriterFileNames.add(mobFileWriter.getPath().getName());
|
||||||
|
return newMobWriter(fd, major, request.getWriterCreationTracker());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void deleteCommittedMobFiles(List<String> fileNames) {
|
||||||
|
if (fileNames.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Path mobColumnFamilyPath =
|
||||||
|
MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
|
||||||
|
for (String fileName : fileNames) {
|
||||||
|
if (fileName == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Path path = new Path(mobColumnFamilyPath, fileName);
|
||||||
|
try {
|
||||||
|
if (store.getFileSystem().exists(path)) {
|
||||||
|
store.getFileSystem().delete(path, false);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Failed to delete the mob file {} for an failed mob compaction.", path, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,254 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.mob;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
|
||||||
|
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.Durability;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.CellSink;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HMobStore;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RegionAsTable;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.ScannerContext;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
|
||||||
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@Category(MediumTests.class)
|
||||||
|
public class TestMobCompactionWithException {
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestMobCompactionWithException.class);
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TestName name = new TestName();
|
||||||
|
static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionWithException.class.getName());
|
||||||
|
private final static HBaseTestingUtility HTU = new HBaseTestingUtility();
|
||||||
|
private static Configuration conf = null;
|
||||||
|
|
||||||
|
private HRegion region = null;
|
||||||
|
private TableDescriptor tableDescriptor;
|
||||||
|
private ColumnFamilyDescriptor columnFamilyDescriptor;
|
||||||
|
private FileSystem fs;
|
||||||
|
|
||||||
|
private static final byte[] COLUMN_FAMILY = fam1;
|
||||||
|
private final byte[] STARTROW = Bytes.toBytes(START_KEY);
|
||||||
|
private static volatile boolean testException = false;
|
||||||
|
private static int rowCount = 100;
|
||||||
|
private Table table;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
conf = HTU.getConfiguration();
|
||||||
|
conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
|
||||||
|
conf.set(MobStoreEngine.MOB_COMPACTOR_CLASS_KEY, MyMobStoreCompactor.class.getName());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
region.close();
|
||||||
|
this.table.close();
|
||||||
|
fs.delete(HTU.getDataTestDir(), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createTable(long mobThreshold) throws IOException {
|
||||||
|
|
||||||
|
this.columnFamilyDescriptor =
|
||||||
|
ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_FAMILY).setMobEnabled(true)
|
||||||
|
.setMobThreshold(mobThreshold).setMaxVersions(1).setBlocksize(500).build();
|
||||||
|
this.tableDescriptor =
|
||||||
|
TableDescriptorBuilder.newBuilder(TableName.valueOf(TestMobUtils.getTableName(name)))
|
||||||
|
.setColumnFamily(columnFamilyDescriptor).build();
|
||||||
|
RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
|
||||||
|
region = HBaseTestingUtility.createRegionAndWAL(regionInfo, HTU.getDataTestDir(), conf,
|
||||||
|
tableDescriptor, new MobFileCache(conf));
|
||||||
|
this.table = new RegionAsTable(region);
|
||||||
|
fs = FileSystem.get(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test is for HBASE-27433.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMobStoreFileDeletedWhenCompactException() throws Exception {
|
||||||
|
this.createTable(200);
|
||||||
|
byte[] dummyData = makeDummyData(1000); // larger than mob threshold
|
||||||
|
for (int i = 0; i < rowCount; i++) {
|
||||||
|
Put p = createPut(i, dummyData);
|
||||||
|
table.put(p);
|
||||||
|
region.flush(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
int storeFileCountBeforeCompact = countStoreFiles();
|
||||||
|
int mobFileCountBeforeCompact = countMobFiles();
|
||||||
|
long mobFileByteSize = getMobFileByteSize();
|
||||||
|
|
||||||
|
List<HStore> stores = region.getStores();
|
||||||
|
assertTrue(stores.size() == 1);
|
||||||
|
HMobStore mobStore = (HMobStore) stores.get(0);
|
||||||
|
Compactor<?> compactor = mobStore.getStoreEngine().getCompactor();
|
||||||
|
MyMobStoreCompactor myMobStoreCompactor = (MyMobStoreCompactor) compactor;
|
||||||
|
myMobStoreCompactor.setMobFileMaxByteSize(mobFileByteSize + 100);
|
||||||
|
testException = true;
|
||||||
|
try {
|
||||||
|
try {
|
||||||
|
|
||||||
|
// Force major compaction
|
||||||
|
mobStore.triggerMajorCompaction();
|
||||||
|
Optional<CompactionContext> context = mobStore.requestCompaction(HStore.PRIORITY_USER,
|
||||||
|
CompactionLifeCycleTracker.DUMMY, User.getCurrent());
|
||||||
|
assertTrue(context.isPresent());
|
||||||
|
region.compact(context.get(), mobStore, NoLimitThroughputController.INSTANCE,
|
||||||
|
User.getCurrent());
|
||||||
|
|
||||||
|
fail();
|
||||||
|
} catch (IOException e) {
|
||||||
|
assertTrue(e != null);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
testException = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// When compaction is failed,the count of StoreFile and MobStoreFile should be the same as
|
||||||
|
// before compaction.
|
||||||
|
assertEquals("After compaction: store files", storeFileCountBeforeCompact, countStoreFiles());
|
||||||
|
assertEquals("After compaction: mob file count", mobFileCountBeforeCompact, countMobFiles());
|
||||||
|
}
|
||||||
|
|
||||||
|
private int countStoreFiles() throws IOException {
|
||||||
|
HStore store = region.getStore(COLUMN_FAMILY);
|
||||||
|
return store.getStorefilesCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
private int countMobFiles() throws IOException {
|
||||||
|
Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
|
||||||
|
columnFamilyDescriptor.getNameAsString());
|
||||||
|
if (fs.exists(mobDirPath)) {
|
||||||
|
FileStatus[] files = HTU.getTestFileSystem().listStatus(mobDirPath);
|
||||||
|
return files.length;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private long getMobFileByteSize() throws IOException {
|
||||||
|
Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableDescriptor.getTableName(),
|
||||||
|
columnFamilyDescriptor.getNameAsString());
|
||||||
|
if (fs.exists(mobDirPath)) {
|
||||||
|
FileStatus[] files = HTU.getTestFileSystem().listStatus(mobDirPath);
|
||||||
|
if (files.length > 0) {
|
||||||
|
return files[0].getLen();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Put createPut(int rowIdx, byte[] dummyData) throws IOException {
|
||||||
|
Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)));
|
||||||
|
p.setDurability(Durability.SKIP_WAL);
|
||||||
|
p.addColumn(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] makeDummyData(int size) {
|
||||||
|
byte[] dummyData = new byte[size];
|
||||||
|
Bytes.random(dummyData);
|
||||||
|
return dummyData;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class MyMobStoreCompactor extends DefaultMobStoreCompactor {
|
||||||
|
public MyMobStoreCompactor(Configuration conf, HStore store) {
|
||||||
|
super(conf, store);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMobFileMaxByteSize(long maxByteSize) {
|
||||||
|
this.conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, maxByteSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean performCompaction(FileDetails fd, final InternalScanner scanner,
|
||||||
|
CellSink writer, long smallestReadPoint, boolean cleanSeqId,
|
||||||
|
ThroughputController throughputController, CompactionRequestImpl request,
|
||||||
|
CompactionProgress progress) throws IOException {
|
||||||
|
|
||||||
|
InternalScanner wrappedScanner = new InternalScanner() {
|
||||||
|
|
||||||
|
private int count = -1;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
|
||||||
|
count++;
|
||||||
|
if (count == rowCount - 1 && testException) {
|
||||||
|
count = 0;
|
||||||
|
throw new IOException("Inject Error");
|
||||||
|
}
|
||||||
|
return scanner.next(result, scannerContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
scanner.close();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
return super.performCompaction(fd, wrappedScanner, writer, smallestReadPoint, cleanSeqId,
|
||||||
|
throughputController, request, progress);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue