HBASE-26271 Cleanup the broken store files under data directory (#3786)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Josh Elser <elserj@apache.org>
Signed-off-by: Wellington Ramos Chevreuil <wchevreuil@apache.org>
This commit is contained in:
BukrosSzabolcs 2021-11-09 17:19:00 +01:00 committed by Josh Elser
parent 8bec26ea91
commit a288365f92
23 changed files with 556 additions and 35 deletions

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore; import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile; import org.apache.hadoop.hbase.regionserver.HStoreFile;
@ -286,7 +285,6 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
* </ol> * </ol>
* @param fd File details * @param fd File details
* @param scanner Where to read from. * @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point. * @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
* @param throughputController The compaction throughput controller. * @param throughputController The compaction throughput controller.
@ -295,7 +293,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
* @return Whether compaction ended; false if it was interrupted for any reason. * @return Whether compaction ended; false if it was interrupted for any reason.
*/ */
@Override @Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException { boolean major, int numofFilesToCompact) throws IOException {
long bytesWrittenProgressForLog = 0; long bytesWrittenProgressForLog = 0;
@ -665,7 +663,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
@Override @Override
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd, protected List<Path> commitWriter(FileDetails fd,
CompactionRequestImpl request) throws IOException { CompactionRequestImpl request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath()); List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles()); writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());

View File

@ -110,7 +110,11 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen
return paths; return paths;
} }
protected abstract Collection<StoreFileWriter> writers(); /**
* Returns all writers. This is used to prevent deleting store files that are currently being
* written during cleanup.
*/
public abstract Collection<StoreFileWriter> writers();
/** /**
* Subclasses override this method to be called at the end of a successful sequence of append; all * Subclasses override this method to be called at the end of a successful sequence of append; all

View File

@ -0,0 +1,202 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * This Chore, every time it runs, will clear the unused HFiles in the data
 * folder.
 * <p>
 * Only stores whose store engine reports that it does not require writing to a tmp directory
 * first are inspected. A file is deleted only if it has a valid store file name, is older than
 * the configured TTL, and is neither an active store file, a compacted file, nor the target of a
 * compaction tracked by the store's compactor.
 */
@InterfaceAudience.Private
public class BrokenStoreFileCleaner extends ScheduledChore {
  private static final Logger LOG = LoggerFactory.getLogger(BrokenStoreFileCleaner.class);

  /** Configuration key that turns the cleaner on or off. */
  public static final String BROKEN_STOREFILE_CLEANER_ENABLED =
    "hbase.region.broken.storefilecleaner.enabled";
  public static final boolean DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED = false;

  /** Configuration key for the minimum age a file must reach before it may be deleted. */
  public static final String BROKEN_STOREFILE_CLEANER_TTL =
    "hbase.region.broken.storefilecleaner.ttl";
  public static final long DEFAULT_BROKEN_STOREFILE_CLEANER_TTL = 1000 * 60 * 60 * 12; //12h

  /** Configuration key for the initial delay of the chore. */
  public static final String BROKEN_STOREFILE_CLEANER_DELAY =
    "hbase.region.broken.storefilecleaner.delay";
  public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY = 1000 * 60 * 60 * 2; //2h

  /** Configuration key for the jitter applied to the initial delay by the creator of the chore. */
  public static final String BROKEN_STOREFILE_CLEANER_DELAY_JITTER =
    "hbase.region.broken.storefilecleaner.delay.jitter";
  public static final double DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER = 0.25D;

  /** Configuration key for the period between two runs of the chore. */
  public static final String BROKEN_STOREFILE_CLEANER_PERIOD =
    "hbase.region.broken.storefilecleaner.period";
  public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD = 1000 * 60 * 60 * 6; //6h

  private final HRegionServer regionServer;
  private final AtomicBoolean enabled = new AtomicBoolean(true);
  private final long fileTtl;

  /**
   * @param delay initial delay handed to {@link ScheduledChore}
   * @param period period handed to {@link ScheduledChore}
   * @param stopper stoppable handed to {@link ScheduledChore}
   * @param conf configuration the enabled flag and the file TTL are read from
   * @param regionServer region server whose online regions are scanned on every run
   */
  public BrokenStoreFileCleaner(final int delay, final int period, final Stoppable stopper,
    Configuration conf, HRegionServer regionServer) {
    super("BrokenStoreFileCleaner", stopper, period, delay);
    this.regionServer = regionServer;
    // Set the flag directly instead of going through the overridable setEnabled() so that no
    // subclass code runs on a partially constructed instance.
    this.enabled.set(
      conf.getBoolean(BROKEN_STOREFILE_CLEANER_ENABLED, DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED));
    this.fileTtl = conf.getLong(BROKEN_STOREFILE_CLEANER_TTL, DEFAULT_BROKEN_STOREFILE_CLEANER_TTL);
  }

  /**
   * Enables or disables the cleaner.
   * @param enabled the new state
   * @return the previous state
   */
  public boolean setEnabled(final boolean enabled) {
    return this.enabled.getAndSet(enabled);
  }

  /** Returns whether the cleaner is currently enabled. */
  public boolean getEnabled() {
    return this.enabled.get();
  }

  /**
   * Walks every store of every region returned by the region server and deletes the files that
   * {@link #cleanFileIfNeeded} considers broken. A store whose directory cannot be listed is
   * skipped for this run.
   */
  @Override
  public void chore() {
    if (!getEnabled()) {
      LOG.trace("Broken storefile Cleaner chore disabled! Not cleaning.");
      return;
    }
    long start = EnvironmentEdgeManager.currentTime();
    AtomicLong deletedFiles = new AtomicLong(0);
    AtomicLong failedDeletes = new AtomicLong(0);
    for (HRegion region : regionServer.getRegions()) {
      for (HStore store : region.getStores()) {
        //only do cleanup in stores not using tmp directories
        if (store.getStoreEngine().requireWritingToTmpDirFirst()) {
          continue;
        }
        Path storePath =
          new Path(region.getRegionFileSystem().getRegionDir(), store.getColumnFamilyName());
        try {
          List<FileStatus> fsStoreFiles =
            Arrays.asList(region.getRegionFileSystem().fs.listStatus(storePath));
          fsStoreFiles.forEach(
            file -> cleanFileIfNeeded(file, store, deletedFiles, failedDeletes));
        } catch (IOException e) {
          // Pass the exception as the trailing argument so the cause is not lost.
          LOG.warn("Failed to list files in {}, cleanup is skipped there", storePath, e);
        }
      }
    }
    LOG.debug(
      "BrokenStoreFileCleaner on {} run for: {}ms. It deleted {} files and tried but failed "
        + "to delete {}",
      regionServer.getServerName().getServerName(), EnvironmentEdgeManager.currentTime() - start,
      deletedFiles.get(), failedDeletes.get());
  }

  /**
   * Deletes {@code file} unless one of the guards below says it must be kept.
   * @param file the entry found in the store directory
   * @param store the store the directory belongs to
   * @param deletedFiles counter incremented on a successful delete
   * @param failedDeletes counter incremented on an unsuccessful delete
   */
  private void cleanFileIfNeeded(FileStatus file, HStore store,
    AtomicLong deletedFiles, AtomicLong failedDeletes) {
    if (file.isDirectory()) {
      LOG.trace("This is a Directory {}, skip cleanup", file.getPath());
      return;
    }

    if (!validate(file.getPath())) {
      LOG.trace("Invalid file {}, skip cleanup", file.getPath());
      return;
    }

    if (!isOldEnough(file)) {
      LOG.trace("Fresh file {}, skip cleanup", file.getPath());
      return;
    }

    if (isActiveStorefile(file, store)) {
      LOG.trace("Actively used storefile file {}, skip cleanup", file.getPath());
      return;
    }

    // Compacted files can still have readers and are cleaned by a separate chore, so they have to
    // be skipped here
    if (isCompactedFile(file, store)) {
      LOG.trace("Cleanup is done by a different chore for file {}, skip cleanup", file.getPath());
      return;
    }

    if (isCompactionResultFile(file, store)) {
      LOG.trace("The file is the result of an ongoing compaction {}, skip cleanup", file.getPath());
      return;
    }

    deleteFile(file, store, deletedFiles, failedDeletes);
  }

  /** Returns true if the store's compactor currently lists {@code file} as a compaction target. */
  private boolean isCompactionResultFile(FileStatus file, HStore store) {
    return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath());
  }

  // Compacted files can still have readers and are cleaned by a separate chore, so they have to
  // be skipped here
  private boolean isCompactedFile(FileStatus file, HStore store) {
    return store.getStoreEngine().getStoreFileManager().getCompactedfiles().stream()
      .anyMatch(sf -> sf.getPath().equals(file.getPath()));
  }

  /** Returns true if the store file manager lists {@code file} among its current store files. */
  private boolean isActiveStorefile(FileStatus file, HStore store) {
    return store.getStoreEngine().getStoreFileManager().getStorefiles().stream()
      .anyMatch(sf -> sf.getPath().equals(file.getPath()));
  }

  /**
   * Decides whether {@code file} is something this chore is allowed to consider for cleanup.
   * @return true if the path or its parent is a back-references directory, or if the file name is
   *         accepted by {@link StoreFileInfo#validateStoreFileName(String)}
   */
  boolean validate(Path file) {
    if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
      return true;
    }
    return StoreFileInfo.validateStoreFileName(file.getName());
  }

  /**
   * Returns true if the time elapsed since the file's modification time is strictly greater than
   * the configured TTL.
   */
  boolean isOldEnough(FileStatus file) {
    // Compare the elapsed time against the TTL instead of adding the TTL to the modification
    // time: the sum overflows for a very large TTL (e.g. Long.MAX_VALUE) and would make every
    // file look expired.
    return EnvironmentEdgeManager.currentTime() - file.getModificationTime() > fileTtl;
  }

  /**
   * Deletes a single file (non-recursively) through the store's file system and records the
   * outcome in the given counters. Failures are logged and never propagated.
   */
  private void deleteFile(FileStatus file, HStore store, AtomicLong deletedFiles,
    AtomicLong failedDeletes) {
    Path filePath = file.getPath();
    LOG.debug("Removing {} from store", filePath);
    try {
      boolean success = store.getFileSystem().delete(filePath, false);
      if (!success) {
        failedDeletes.incrementAndGet();
        LOG.warn("Attempted to delete:{}, but couldn't. Attempt to delete on next pass.",
          filePath);
      } else {
        deletedFiles.incrementAndGet();
      }
    } catch (IOException e) {
      // A delete that throws is still a failed delete; count it so the summary log is accurate.
      failedDeletes.incrementAndGet();
      IOException cause = e instanceof RemoteException
        ? ((RemoteException) e).unwrapRemoteException() : e;
      LOG.warn("Error while deleting: {}", filePath, cause);
    }
  }
}

View File

@ -71,7 +71,7 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
} }
@Override @Override
protected Collection<StoreFileWriter> writers() { public Collection<StoreFileWriter> writers() {
return lowerBoundary2Writer.values(); return lowerBoundary2Writer.values();
} }

View File

@ -609,7 +609,7 @@ public class HRegionFileSystem {
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent); writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem( HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false); env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false);
insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs); insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
} }
return regionDir; return regionDir;
} }

View File

@ -432,6 +432,8 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
*/ */
final ServerNonceManager nonceManager; final ServerNonceManager nonceManager;
private BrokenStoreFileCleaner brokenStoreFileCleaner;
@InterfaceAudience.Private @InterfaceAudience.Private
CompactedHFilesDischarger compactedFileDischarger; CompactedHFilesDischarger compactedFileDischarger;
@ -1835,6 +1837,9 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
if (this.slowLogTableOpsChore != null) { if (this.slowLogTableOpsChore != null) {
choreService.scheduleChore(slowLogTableOpsChore); choreService.scheduleChore(slowLogTableOpsChore);
} }
if (this.brokenStoreFileCleaner != null) {
choreService.scheduleChore(brokenStoreFileCleaner);
}
// Leases is not a Thread. Internally it runs a daemon thread. If it gets // Leases is not a Thread. Internally it runs a daemon thread. If it gets
// an unhandled exception, it will just exit. // an unhandled exception, it will just exit.
@ -1914,6 +1919,22 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
onlyMetaRefresh, this, this); onlyMetaRefresh, this, this);
} }
int brokenStoreFileCleanerPeriod = conf.getInt(
BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
int brokenStoreFileCleanerDelay = conf.getInt(
BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
double brokenStoreFileCleanerDelayJitter = conf.getDouble(
BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
double jitterRate = (RandomUtils.nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
this.brokenStoreFileCleaner =
new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
brokenStoreFileCleanerPeriod, this, conf, this);
registerConfigurationObservers(); registerConfigurationObservers();
} }
@ -3488,6 +3509,11 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false); return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
} }
@InterfaceAudience.Private
public BrokenStoreFileCleaner getBrokenStoreFileCleaner(){
return brokenStoreFileCleaner;
}
@Override @Override
protected void stopChores() { protected void stopChores() {
shutdownChore(nonceManagerChore); shutdownChore(nonceManagerChore);
@ -3498,5 +3524,6 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
shutdownChore(storefileRefresher); shutdownChore(storefileRefresher);
shutdownChore(fsUtilizationChore); shutdownChore(fsUtilizationChore);
shutdownChore(slowLogTableOpsChore); shutdownChore(slowLogTableOpsChore);
shutdownChore(brokenStoreFileCleaner);
} }
} }

View File

@ -1156,6 +1156,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
} }
} }
replaceStoreFiles(filesToCompact, sfs, true); replaceStoreFiles(filesToCompact, sfs, true);
// This step is necessary for the correctness of BrokenStoreFileCleaner. It lets the
// cleaner chore know that the compaction is done, so the file can be cleaned up if the
// compaction has failed.
storeEngine.resetCompactionWriter();
if (cr.isMajor()) { if (cr.isMajor()) {
majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs()); majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize); majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);

View File

@ -42,9 +42,11 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor; import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -532,6 +534,25 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
} }
} }
/**
* Whether the implementation of the used storefile tracker requires you to write to temp
* directory first, i.e, does not allow broken store files under the actual data directory.
*/
public boolean requireWritingToTmpDirFirst() {
return storeFileTracker.requireWritingToTmpDirFirst();
}
/**
* Resets the compaction writer when the new file is committed and used as active storefile.
* This step is necessary for the correctness of BrokenStoreFileCleaner. It lets the
* cleaner chore know that the compaction is done, so the file can be cleaned up if the
* compaction has failed. Currently called in HStore#doCompaction.
* @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List)
*/
public void resetCompactionWriter(){
compactor.resetWriter();
}
@RestrictedApi(explanation = "Should only be called in TestHStore", link = "", @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
allowedOnPath = ".*/TestHStore.java") allowedOnPath = ".*/TestHStore.java")
ReadWriteLock getLock() { ReadWriteLock getLock() {

View File

@ -58,7 +58,7 @@ public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
} }
@Override @Override
protected Collection<StoreFileWriter> writers() { public Collection<StoreFileWriter> writers() {
return existingWriters; return existingWriters;
} }

View File

@ -68,7 +68,7 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
} }
@Override @Override
protected void abortWriter(T writer) throws IOException { protected void abortWriter() throws IOException {
FileSystem fs = store.getFileSystem(); FileSystem fs = store.getFileSystem();
for (Path leftoverFile : writer.abortWriters()) { for (Path leftoverFile : writer.abortWriters()) {
try { try {
@ -79,5 +79,7 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
e); e);
} }
} }
// this step signals that the target file is no longer being written and can be cleaned up
writer = null;
} }
} }

View File

@ -25,9 +25,12 @@ import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELET
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
@ -37,6 +40,7 @@ import org.apache.hadoop.hbase.PrivateConstants;
import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo; import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.CellSink; import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams; import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStore;
@ -92,6 +96,8 @@ public abstract class Compactor<T extends CellSink> {
private final boolean dropCacheMajor; private final boolean dropCacheMajor;
private final boolean dropCacheMinor; private final boolean dropCacheMinor;
protected T writer = null;
//TODO: depending on Store is not good but, realistically, all compactors currently do. //TODO: depending on Store is not good but, realistically, all compactors currently do.
Compactor(Configuration conf, HStore store) { Compactor(Configuration conf, HStore store) {
this.conf = conf; this.conf = conf;
@ -324,7 +330,6 @@ public abstract class Compactor<T extends CellSink> {
// Find the smallest read point across all the Scanners. // Find the smallest read point across all the Scanners.
long smallestReadPoint = getSmallestReadPoint(); long smallestReadPoint = getSmallestReadPoint();
T writer = null;
boolean dropCache; boolean dropCache;
if (request.isMajor() || request.isAllFiles()) { if (request.isMajor() || request.isAllFiles()) {
dropCache = this.dropCacheMajor; dropCache = this.dropCacheMajor;
@ -348,8 +353,13 @@ public abstract class Compactor<T extends CellSink> {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint); smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true; cleanSeqId = true;
} }
if (writer != null){
LOG.warn("Writer exists when it should not: " + getCompactionTargets().stream()
.map(n -> n.toString())
.collect(Collectors.joining(", ", "{ ", " }")));
}
writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor()); writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor());
finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, finished = performCompaction(fd, scanner, smallestReadPoint, cleanSeqId,
throughputController, request.isAllFiles(), request.getFiles().size()); throughputController, request.isAllFiles(), request.getFiles().size());
if (!finished) { if (!finished) {
throw new InterruptedIOException("Aborting compaction of store " + store + " in region " throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
@ -369,24 +379,23 @@ public abstract class Compactor<T extends CellSink> {
Closeables.close(scanner, true); Closeables.close(scanner, true);
} }
if (!finished && writer != null) { if (!finished && writer != null) {
abortWriter(writer); abortWriter();
} }
} }
assert finished : "We should have exited the method on all error paths"; assert finished : "We should have exited the method on all error paths";
assert writer != null : "Writer should be non-null if no error"; assert writer != null : "Writer should be non-null if no error";
return commitWriter(writer, fd, request); return commitWriter(fd, request);
} }
protected abstract List<Path> commitWriter(T writer, FileDetails fd, protected abstract List<Path> commitWriter(FileDetails fd,
CompactionRequestImpl request) throws IOException; CompactionRequestImpl request) throws IOException;
protected abstract void abortWriter(T writer) throws IOException; protected abstract void abortWriter() throws IOException;
/** /**
* Performs the compaction. * Performs the compaction.
* @param fd FileDetails of cell sink writer * @param fd FileDetails of cell sink writer
* @param scanner Where to read from. * @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point. * @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is &lt;= * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is &lt;=
* smallestReadPoint * smallestReadPoint
@ -394,7 +403,7 @@ public abstract class Compactor<T extends CellSink> {
* @param numofFilesToCompact the number of files to compact * @param numofFilesToCompact the number of files to compact
* @return Whether compaction ended; false if it was interrupted for some reason. * @return Whether compaction ended; false if it was interrupted for some reason.
*/ */
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException { boolean major, int numofFilesToCompact) throws IOException {
assert writer instanceof ShipperListener; assert writer instanceof ShipperListener;
@ -537,4 +546,24 @@ public abstract class Compactor<T extends CellSink> {
return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs, return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
dropDeletesFromRow, dropDeletesToRow); dropDeletesFromRow, dropDeletesToRow);
} }
public List<Path> getCompactionTargets(){
if (writer == null){
return Collections.emptyList();
}
synchronized (writer){
if (writer instanceof StoreFileWriter){
return Arrays.asList(((StoreFileWriter)writer).getPath());
}
return ((AbstractMultiFileWriter)writer).writers().stream().map(sfw -> sfw.getPath()).collect(
Collectors.toList());
}
}
/**
* Reset the Writer when the new storefiles were successfully added
*/
public void resetWriter(){
writer = null;
}
} }

View File

@ -79,8 +79,10 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
} }
@Override @Override
protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd, protected List<Path> commitWriter(FileDetails fd,
CompactionRequestImpl request) throws IOException { CompactionRequestImpl request) throws IOException {
return writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles()); List<Path> pathList =
writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles());
return pathList;
} }
} }

View File

@ -63,7 +63,7 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
} }
@Override @Override
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd, protected List<Path> commitWriter(FileDetails fd,
CompactionRequestImpl request) throws IOException { CompactionRequestImpl request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath()); List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles()); writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
@ -72,12 +72,19 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
} }
@Override @Override
protected void abortWriter() throws IOException {
abortWriter(writer);
}
protected void abortWriter(StoreFileWriter writer) throws IOException { protected void abortWriter(StoreFileWriter writer) throws IOException {
Path leftoverFile = writer.getPath(); Path leftoverFile = writer.getPath();
try { try {
writer.close(); writer.close();
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to close the writer after an unfinished compaction.", e); LOG.warn("Failed to close the writer after an unfinished compaction.", e);
} finally {
// this step signals that the target file is no longer being written and can be cleaned up
writer = null;
} }
try { try {
store.getFileSystem().delete(leftoverFile, false); store.getFileSystem().delete(leftoverFile, false);

View File

@ -125,7 +125,7 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
} }
@Override @Override
protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd, protected List<Path> commitWriter(FileDetails fd,
CompactionRequestImpl request) throws IOException { CompactionRequestImpl request) throws IOException {
List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles()); List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles());
assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata."; assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";

View File

@ -95,7 +95,7 @@ class FileBasedStoreFileTracker extends StoreFileTrackerBase {
} }
@Override @Override
protected boolean requireWritingToTmpDirFirst() { public boolean requireWritingToTmpDirFirst() {
return false; return false;
} }

View File

@ -57,7 +57,7 @@ class MigrationStoreFileTracker extends StoreFileTrackerBase {
} }
@Override @Override
protected boolean requireWritingToTmpDirFirst() { public boolean requireWritingToTmpDirFirst() {
// Returns true if either of the two StoreFileTracker returns true. // Returns true if either of the two StoreFileTracker returns true.
// For example, if we want to migrate from a tracker implementation which can ignore the broken // For example, if we want to migrate from a tracker implementation which can ignore the broken
// files under data directory to a tracker implementation which can not, if we still allow // files under data directory to a tracker implementation which can not, if we still allow

View File

@ -88,4 +88,10 @@ public interface StoreFileTracker {
* @param builder The table descriptor builder for the given table. * @param builder The table descriptor builder for the given table.
*/ */
TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder); TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder);
/**
* Whether the implementation of this tracker requires you to write to temp directory first, i.e,
* does not allow broken store files under the actual data directory.
*/
boolean requireWritingToTmpDirFirst();
} }

View File

@ -173,12 +173,6 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
return builder.build(); return builder.build();
} }
/**
* Whether the implementation of this tracker requires you to write to temp directory first, i.e,
* does not allow broken store files under the actual data directory.
*/
protected abstract boolean requireWritingToTmpDirFirst();
protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException; protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;
protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles, protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,

View File

@ -549,7 +549,7 @@ public class RestoreSnapshotHelper {
" of snapshot=" + snapshotName+ " of snapshot=" + snapshotName+
" to region=" + regionInfo.getEncodedName() + " table=" + tableName); " to region=" + regionInfo.getEncodedName() + " table=" + tableName);
String fileName = restoreStoreFile(familyDir, regionInfo, storeFile, createBackRefs); String fileName = restoreStoreFile(familyDir, regionInfo, storeFile, createBackRefs);
//mark the reference file to be added to tracker // mark the reference file to be added to tracker
filesToTrack.add(new StoreFileInfo(conf, fs, filesToTrack.add(new StoreFileInfo(conf, fs,
new Path(familyDir, fileName), true)); new Path(familyDir, fileName), true));
} }

View File

@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException; import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@ -89,7 +88,7 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
} }
@Override @Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException { boolean major, int numofFilesToCompact) throws IOException {

View File

@ -0,0 +1,225 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class, RegionServerTests.class })
public class TestBrokenStoreFileCleaner {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBrokenStoreFileCleaner.class);
private final HBaseTestingUtil testUtil = new HBaseTestingUtil();
private final static byte[] fam = Bytes.toBytes("cf_1");
private final static byte[] qual1 = Bytes.toBytes("qf_1");
private final static byte[] val = Bytes.toBytes("val");
private final static String junkFileName = "409fad9a751c4e8c86d7f32581bdc156";
TableName tableName;
@Before
public void setUp() throws Exception {
testUtil.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL,
"org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
testUtil.getConfiguration()
.set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_ENABLED, "true");
testUtil.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_TTL, "0");
testUtil.getConfiguration()
.set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD, "15000000");
testUtil.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY, "0");
testUtil.startMiniCluster(1);
}
@After
public void tearDown() throws Exception {
testUtil.deleteTable(tableName);
testUtil.shutdownMiniCluster();
}
@Test
public void testDeletingJunkFile() throws Exception {
tableName = TableName.valueOf(getClass().getSimpleName() + "testDeletingJunkFile");
createTableWithData(tableName);
HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0);
ServerName sn = testUtil.getMiniHBaseCluster()
.getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName());
HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn);
BrokenStoreFileCleaner cleaner = rs.getBrokenStoreFileCleaner();
//create junk file
HStore store = region.getStore(fam);
Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName());
Path junkFilePath = new Path(cfPath, junkFileName);
FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath);
junkFileOS.writeUTF("hello");
junkFileOS.close();
int storeFiles = store.getStorefilesCount();
assertTrue(storeFiles > 0);
//verify the file exist before the chore and missing afterwards
assertTrue(store.getFileSystem().exists(junkFilePath));
cleaner.chore();
assertFalse(store.getFileSystem().exists(junkFilePath));
//verify no storefile got deleted
int currentStoreFiles = store.getStorefilesCount();
assertEquals(currentStoreFiles, storeFiles);
}
@Test
public void testSkippingCompactedFiles() throws Exception {
tableName = TableName.valueOf(getClass().getSimpleName() + "testSkippningCompactedFiles");
createTableWithData(tableName);
HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0);
ServerName sn = testUtil.getMiniHBaseCluster()
.getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName());
HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn);
BrokenStoreFileCleaner cleaner = rs.getBrokenStoreFileCleaner();
//run major compaction to generate compaced files
region.compact(true);
//make sure there are compacted files
HStore store = region.getStore(fam);
int compactedFiles = store.getCompactedFilesCount();
assertTrue(compactedFiles > 0);
cleaner.chore();
//verify none of the compacted files were deleted
int existingCompactedFiles = store.getCompactedFilesCount();
assertEquals(compactedFiles, existingCompactedFiles);
//verify adding a junk file does not break anything
Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName());
Path junkFilePath = new Path(cfPath, junkFileName);
FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath);
junkFileOS.writeUTF("hello");
junkFileOS.close();
assertTrue(store.getFileSystem().exists(junkFilePath));
cleaner.setEnabled(true);
cleaner.chore();
assertFalse(store.getFileSystem().exists(junkFilePath));
//verify compacted files are still intact
existingCompactedFiles = store.getCompactedFilesCount();
assertEquals(compactedFiles, existingCompactedFiles);
}
@Test
public void testJunkFileTTL() throws Exception {
tableName = TableName.valueOf(getClass().getSimpleName() + "testDeletingJunkFile");
createTableWithData(tableName);
HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0);
ServerName sn = testUtil.getMiniHBaseCluster()
.getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName());
HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn);
//create junk file
HStore store = region.getStore(fam);
Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName());
Path junkFilePath = new Path(cfPath, junkFileName);
FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath);
junkFileOS.writeUTF("hello");
junkFileOS.close();
int storeFiles = store.getStorefilesCount();
assertTrue(storeFiles > 0);
//verify the file exist before the chore
assertTrue(store.getFileSystem().exists(junkFilePath));
//set a 5 sec ttl
rs.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_TTL, "5000");
BrokenStoreFileCleaner cleaner = new BrokenStoreFileCleaner(15000000,
0, rs, rs.getConfiguration(), rs);
cleaner.chore();
//file is still present after chore run
assertTrue(store.getFileSystem().exists(junkFilePath));
Thread.sleep(5000);
cleaner.chore();
assertFalse(store.getFileSystem().exists(junkFilePath));
//verify no storefile got deleted
int currentStoreFiles = store.getStorefilesCount();
assertEquals(currentStoreFiles, storeFiles);
}
private Table createTableWithData(TableName tableName) throws IOException {
Table table = testUtil.createTable(tableName, fam);
try {
for (int i = 1; i < 10; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(fam, qual1, val);
table.put(p);
}
// flush them
testUtil.getAdmin().flush(tableName);
for (int i = 11; i < 20; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(fam, qual1, val);
table.put(p);
}
// flush them
testUtil.getAdmin().flush(tableName);
for (int i = 21; i < 30; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(fam, qual1, val);
table.put(p);
}
// flush them
testUtil.getAdmin().flush(tableName);
} catch (IOException e) {
table.close();
throw e;
}
return table;
}
}

View File

@ -128,13 +128,13 @@ public class TestCompactorMemLeak {
} }
@Override @Override
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd, protected List<Path> commitWriter(FileDetails fd,
CompactionRequestImpl request) throws IOException { CompactionRequestImpl request) throws IOException {
HFileWriterImpl writerImpl = (HFileWriterImpl) writer.writer; HFileWriterImpl writerImpl = (HFileWriterImpl) writer.writer;
Cell cell = writerImpl.getLastCell(); Cell cell = writerImpl.getLastCell();
// The cell should be backend with an KeyOnlyKeyValue. // The cell should be backend with an KeyOnlyKeyValue.
IS_LAST_CELL_ON_HEAP.set(cell instanceof KeyOnlyKeyValue); IS_LAST_CELL_ON_HEAP.set(cell instanceof KeyOnlyKeyValue);
return super.commitWriter(writer, fd, request); return super.commitWriter(fd, request);
} }
} }
} }

View File

@ -47,7 +47,6 @@ public class TestStoreFileTracker extends DefaultStoreFileTracker {
} else { } else {
LOG.info("ctx.getRegionFileSystem() returned null. Leaving storeId null."); LOG.info("ctx.getRegionFileSystem() returned null. Leaving storeId null.");
} }
} }
@Override @Override