HBASE-26271 Cleanup the broken store files under data directory (#3786)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Josh Elser <elserj@apache.org>
Signed-off-by: Wellington Ramos Chevreuil <wchevreuil@apache.org>
parent 8bec26ea91
commit a288365f92
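
This commit adds a new region server chore, BrokenStoreFileCleaner, which removes leftover store files written directly into the data directory by failed flushes or compactions. It only acts on stores whose store file tracker does not require writing to a temporary directory first, and it is disabled by default. Below is a minimal sketch of enabling and tuning it through the configuration keys introduced in BrokenStoreFileCleaner.java; the class name EnableBrokenStoreFileCleanerExample and the concrete values are illustrative only, not part of the commit.

// Illustrative sketch only: the keys come from BrokenStoreFileCleaner in the diff below,
// the values shown simply mirror its defaults.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class EnableBrokenStoreFileCleanerExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Turn the chore on (it defaults to disabled).
    conf.setBoolean("hbase.region.broken.storefilecleaner.enabled", true);
    // Only files older than this TTL are considered for deletion (default 12h).
    conf.setLong("hbase.region.broken.storefilecleaner.ttl", 12L * 60 * 60 * 1000);
    // Initial delay before the first run (default 2h) and the jitter applied to that delay.
    conf.setInt("hbase.region.broken.storefilecleaner.delay", 2 * 60 * 60 * 1000);
    conf.setDouble("hbase.region.broken.storefilecleaner.delay.jitter", 0.25D);
    // How often the chore runs afterwards (default 6h).
    conf.setInt("hbase.region.broken.storefilecleaner.period", 6 * 60 * 60 * 1000);
  }
}

The same keys can also be set in hbase-site.xml; the region server reads them when it constructs the chore (see the HRegionServer hunks below).
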
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;

@@ -286,7 +285,6 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
* </ol>
* @param fd File details
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
* @param throughputController The compaction throughput controller.

@@ -295,7 +293,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
* @return Whether compaction ended; false if it was interrupted for any reason.
*/
@Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException {
long bytesWrittenProgressForLog = 0;

@@ -665,7 +663,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {


@Override
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
protected List<Path> commitWriter(FileDetails fd,
CompactionRequestImpl request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());

@@ -110,7 +110,11 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen
return paths;
}

protected abstract Collection<StoreFileWriter> writers();
/**
* Returns all writers. This is used to prevent deleting currently written storefiles
* during cleanup.
*/
public abstract Collection<StoreFileWriter> writers();

/**
* Subclasses override this method to be called at the end of a successful sequence of append; all

@@ -0,0 +1,202 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This Chore, every time it runs, will clear the unused HFiles in the data
* folder.
*/
@InterfaceAudience.Private
public class BrokenStoreFileCleaner extends ScheduledChore {
private static final Logger LOG = LoggerFactory.getLogger(BrokenStoreFileCleaner.class);
public static final String BROKEN_STOREFILE_CLEANER_ENABLED =
"hbase.region.broken.storefilecleaner.enabled";
public static final boolean DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED = false;
public static final String BROKEN_STOREFILE_CLEANER_TTL =
"hbase.region.broken.storefilecleaner.ttl";
public static final long DEFAULT_BROKEN_STOREFILE_CLEANER_TTL = 1000 * 60 * 60 * 12; //12h
public static final String BROKEN_STOREFILE_CLEANER_DELAY =
"hbase.region.broken.storefilecleaner.delay";
public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY = 1000 * 60 * 60 * 2; //2h
public static final String BROKEN_STOREFILE_CLEANER_DELAY_JITTER =
"hbase.region.broken.storefilecleaner.delay.jitter";
public static final double DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER = 0.25D;
public static final String BROKEN_STOREFILE_CLEANER_PERIOD =
"hbase.region.broken.storefilecleaner.period";
public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD = 1000 * 60 * 60 * 6; //6h

private HRegionServer regionServer;
private final AtomicBoolean enabled = new AtomicBoolean(true);
private long fileTtl;

public BrokenStoreFileCleaner(final int delay, final int period, final Stoppable stopper,
Configuration conf, HRegionServer regionServer) {
super("BrokenStoreFileCleaner", stopper, period, delay);
this.regionServer = regionServer;
setEnabled(
conf.getBoolean(BROKEN_STOREFILE_CLEANER_ENABLED, DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED));
fileTtl = conf.getLong(BROKEN_STOREFILE_CLEANER_TTL, DEFAULT_BROKEN_STOREFILE_CLEANER_TTL);
}

public boolean setEnabled(final boolean enabled) {
return this.enabled.getAndSet(enabled);
}

public boolean getEnabled() {
return this.enabled.get();
}

@Override
public void chore() {
if (getEnabled()) {
long start = EnvironmentEdgeManager.currentTime();
AtomicLong deletedFiles = new AtomicLong(0);
AtomicLong failedDeletes = new AtomicLong(0);
for (HRegion region : regionServer.getRegions()) {
for (HStore store : region.getStores()) {
//only do cleanup in stores not using tmp directories
if (store.getStoreEngine().requireWritingToTmpDirFirst()) {
continue;
}
Path storePath =
new Path(region.getRegionFileSystem().getRegionDir(), store.getColumnFamilyName());

try {
List<FileStatus> fsStoreFiles =
Arrays.asList(region.getRegionFileSystem().fs.listStatus(storePath));
fsStoreFiles.forEach(
file -> cleanFileIfNeeded(file, store, deletedFiles, failedDeletes));
} catch (IOException e) {
LOG.warn("Failed to list files in {}, cleanup is skipped there", storePath);
continue;
}
}
}
LOG.debug(
"BrokenStoreFileCleaner on {} run for: {}ms. It deleted {} files and tried but failed "
+ "to delete {}",
regionServer.getServerName().getServerName(), EnvironmentEdgeManager.currentTime() - start,
deletedFiles.get(), failedDeletes.get());
} else {
LOG.trace("Broken storefile Cleaner chore disabled! Not cleaning.");
}
}

private void cleanFileIfNeeded(FileStatus file, HStore store,
AtomicLong deletedFiles, AtomicLong failedDeletes) {
if(file.isDirectory()){
LOG.trace("This is a Directory {}, skip cleanup", file.getPath());
return;
}

if(!validate(file.getPath())){
LOG.trace("Invalid file {}, skip cleanup", file.getPath());
return;
}

if(!isOldEnough(file)){
LOG.trace("Fresh file {}, skip cleanup", file.getPath());
return;
}

if(isActiveStorefile(file, store)){
LOG.trace("Actively used storefile file {}, skip cleanup", file.getPath());
return;
}

// Compacted files can still have readers and are cleaned by a separate chore, so they have to
// be skipped here
if(isCompactedFile(file, store)){
LOG.trace("Cleanup is done by a different chore for file {}, skip cleanup", file.getPath());
return;
}

if(isCompactionResultFile(file, store)){
LOG.trace("The file is the result of an ongoing compaction {}, skip cleanup", file.getPath());
return;
}

deleteFile(file, store, deletedFiles, failedDeletes);
}

private boolean isCompactionResultFile(FileStatus file, HStore store) {
return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath());
}

// Compacted files can still have readers and are cleaned by a separate chore, so they have to
// be skipped here
private boolean isCompactedFile(FileStatus file, HStore store) {
return store.getStoreEngine().getStoreFileManager().getCompactedfiles().stream()
.anyMatch(sf -> sf.getPath().equals(file.getPath()));
}

private boolean isActiveStorefile(FileStatus file, HStore store) {
return store.getStoreEngine().getStoreFileManager().getStorefiles().stream()
.anyMatch(sf -> sf.getPath().equals(file.getPath()));
}

boolean validate(Path file) {
if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
return true;
}
return StoreFileInfo.validateStoreFileName(file.getName());
}

boolean isOldEnough(FileStatus file){
return file.getModificationTime() + fileTtl < EnvironmentEdgeManager.currentTime();
}

private void deleteFile(FileStatus file, HStore store, AtomicLong deletedFiles,
AtomicLong failedDeletes) {
Path filePath = file.getPath();
LOG.debug("Removing {} from store", filePath);
try {
boolean success = store.getFileSystem().delete(filePath, false);
if (!success) {
failedDeletes.incrementAndGet();
LOG.warn("Attempted to delete:" + filePath
+ ", but couldn't. Attempt to delete on next pass.");
}
else{
deletedFiles.incrementAndGet();
}
} catch (IOException e) {
e = e instanceof RemoteException ?
((RemoteException)e).unwrapRemoteException() : e;
LOG.warn("Error while deleting: " + filePath, e);
}
}

}

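The chore above walks every online region's stores, skips stores whose tracker writes to a temporary directory first, and deletes only files that are old enough, carry a valid store file name, and are neither active, compacted-but-not-yet-archived, nor the target of an in-flight compaction. A minimal sketch (not part of the commit) of triggering a single pass by hand, for example from test code; the helper class and method names are hypothetical, while getBrokenStoreFileCleaner(), setEnabled() and chore() come from this diff.

import org.apache.hadoop.hbase.regionserver.BrokenStoreFileCleaner;
import org.apache.hadoop.hbase.regionserver.HRegionServer;

public class BrokenStoreFileCleanerRunner {
  /** Runs one synchronous cleanup pass using the chore the region server already owns. */
  public static void runOnce(HRegionServer rs) {
    BrokenStoreFileCleaner cleaner = rs.getBrokenStoreFileCleaner();
    cleaner.setEnabled(true); // the chore is disabled by default
    cleaner.chore();          // deletes leftover store files that pass the checks above
  }
}
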
@@ -71,7 +71,7 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
}

@Override
protected Collection<StoreFileWriter> writers() {
public Collection<StoreFileWriter> writers() {
return lowerBoundary2Writer.values();
}

@@ -609,7 +609,7 @@ public class HRegionFileSystem {
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false);
insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
}
return regionDir;
}

@@ -432,6 +432,8 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
*/
final ServerNonceManager nonceManager;

private BrokenStoreFileCleaner brokenStoreFileCleaner;

@InterfaceAudience.Private
CompactedHFilesDischarger compactedFileDischarger;


@@ -1835,6 +1837,9 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
if (this.slowLogTableOpsChore != null) {
choreService.scheduleChore(slowLogTableOpsChore);
}
if (this.brokenStoreFileCleaner != null) {
choreService.scheduleChore(brokenStoreFileCleaner);
}

// Leases is not a Thread. Internally it runs a daemon thread. If it gets
// an unhandled exception, it will just exit.

@@ -1914,6 +1919,22 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
onlyMetaRefresh, this, this);
}

int brokenStoreFileCleanerPeriod = conf.getInt(
BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
int brokenStoreFileCleanerDelay = conf.getInt(
BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
double brokenStoreFileCleanerDelayJitter = conf.getDouble(
BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
double jitterRate = (RandomUtils.nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
this.brokenStoreFileCleaner =
new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
brokenStoreFileCleanerPeriod, this, conf, this);

registerConfigurationObservers();
}


@@ -3488,6 +3509,11 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
}

@InterfaceAudience.Private
public BrokenStoreFileCleaner getBrokenStoreFileCleaner(){
return brokenStoreFileCleaner;
}

@Override
protected void stopChores() {
shutdownChore(nonceManagerChore);

@@ -3498,5 +3524,6 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
shutdownChore(storefileRefresher);
shutdownChore(fsUtilizationChore);
shutdownChore(slowLogTableOpsChore);
shutdownChore(brokenStoreFileCleaner);
}
}

@@ -1156,6 +1156,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
}
}
replaceStoreFiles(filesToCompact, sfs, true);

// This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
// CleanerChore know that compaction is done and the file can be cleaned up if compaction
// has failed.
storeEngine.resetCompactionWriter();

if (cr.isMajor()) {
majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);

@@ -42,9 +42,11 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;

@@ -532,6 +534,25 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
}
}

/**
* Whether the implementation of the used storefile tracker requires you to write to temp
* directory first, i.e, does not allow broken store files under the actual data directory.
*/
public boolean requireWritingToTmpDirFirst() {
return storeFileTracker.requireWritingToTmpDirFirst();
}

/**
* Resets the compaction writer when the new file is committed and used as active storefile.
* This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
* CleanerChore know that compaction is done and the file can be cleaned up if compaction
* has failed. Currently called in
* @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List)
*/
public void resetCompactionWriter(){
compactor.resetWriter();
}

@RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
allowedOnPath = ".*/TestHStore.java")
ReadWriteLock getLock() {

@@ -58,7 +58,7 @@ public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
}

@Override
protected Collection<StoreFileWriter> writers() {
public Collection<StoreFileWriter> writers() {
return existingWriters;
}

@@ -68,7 +68,7 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
}

@Override
protected void abortWriter(T writer) throws IOException {
protected void abortWriter() throws IOException {
FileSystem fs = store.getFileSystem();
for (Path leftoverFile : writer.abortWriters()) {
try {

@@ -79,5 +79,7 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
e);
}
}
//this step signals that the target file is no longer written and can be cleaned up
writer = null;
}
}

@@ -25,9 +25,12 @@ import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELET
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;

@@ -37,6 +40,7 @@ import org.apache.hadoop.hbase.PrivateConstants;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;

@@ -92,6 +96,8 @@ public abstract class Compactor<T extends CellSink> {
private final boolean dropCacheMajor;
private final boolean dropCacheMinor;

protected T writer = null;

//TODO: depending on Store is not good but, realistically, all compactors currently do.
Compactor(Configuration conf, HStore store) {
this.conf = conf;

@@ -324,7 +330,6 @@ public abstract class Compactor<T extends CellSink> {
// Find the smallest read point across all the Scanners.
long smallestReadPoint = getSmallestReadPoint();

T writer = null;
boolean dropCache;
if (request.isMajor() || request.isAllFiles()) {
dropCache = this.dropCacheMajor;

@@ -348,8 +353,13 @@ public abstract class Compactor<T extends CellSink> {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
if (writer != null){
LOG.warn("Writer exists when it should not: " + getCompactionTargets().stream()
.map(n -> n.toString())
.collect(Collectors.joining(", ", "{ ", " }")));
}
writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor());
finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
finished = performCompaction(fd, scanner, smallestReadPoint, cleanSeqId,
throughputController, request.isAllFiles(), request.getFiles().size());
if (!finished) {
throw new InterruptedIOException("Aborting compaction of store " + store + " in region "

@@ -369,24 +379,23 @@ public abstract class Compactor<T extends CellSink> {
Closeables.close(scanner, true);
}
if (!finished && writer != null) {
abortWriter(writer);
abortWriter();
}
}
assert finished : "We should have exited the method on all error paths";
assert writer != null : "Writer should be non-null if no error";
return commitWriter(writer, fd, request);
return commitWriter(fd, request);
}

protected abstract List<Path> commitWriter(T writer, FileDetails fd,
protected abstract List<Path> commitWriter(FileDetails fd,
CompactionRequestImpl request) throws IOException;

protected abstract void abortWriter(T writer) throws IOException;
protected abstract void abortWriter() throws IOException;

/**
* Performs the compaction.
* @param fd FileDetails of cell sink writer
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <=
* smallestReadPoint

@@ -394,7 +403,7 @@ public abstract class Compactor<T extends CellSink> {
* @param numofFilesToCompact the number of files to compact
* @return Whether compaction ended; false if it was interrupted for some reason.
*/
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException {
assert writer instanceof ShipperListener;

@@ -537,4 +546,24 @@ public abstract class Compactor<T extends CellSink> {
return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
dropDeletesFromRow, dropDeletesToRow);
}

public List<Path> getCompactionTargets(){
if (writer == null){
return Collections.emptyList();
}
synchronized (writer){
if (writer instanceof StoreFileWriter){
return Arrays.asList(((StoreFileWriter)writer).getPath());
}
return ((AbstractMultiFileWriter)writer).writers().stream().map(sfw -> sfw.getPath()).collect(
Collectors.toList());
}
}

/**
* Reset the Writer when the new storefiles were successfully added
*/
public void resetWriter(){
writer = null;
}
}

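The Compactor changes above move the in-flight writer into a protected field, drop the writer parameter from commitWriter()/abortWriter()/performCompaction(), and expose getCompactionTargets()/resetWriter() so the BrokenStoreFileCleaner can tell which files an ongoing compaction is still writing. A minimal sketch of what a compactor subclass looks like against the new signatures, modeled on the DefaultCompactor hunk below; the class ExampleSingleFileCompactor is hypothetical and not part of the commit.

// Hypothetical subclass, shown only to illustrate the reworked Compactor API.
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

public class ExampleSingleFileCompactor extends DefaultCompactor {

  public ExampleSingleFileCompactor(Configuration conf, HStore store) {
    super(conf, store);
  }

  @Override
  protected List<Path> commitWriter(FileDetails fd, CompactionRequestImpl request)
      throws IOException {
    // "writer" is the protected field added to Compactor in the hunk above; it is no longer
    // passed in as a parameter. resetWriter()/abortWriter() clear it once the file is tracked.
    List<Path> newFiles = Lists.newArrayList(writer.getPath());
    writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
    writer.close();
    return newFiles;
  }
}

Multi-writer compactors (date-tiered, stripe) follow the same pattern through AbstractMultiOutputCompactor, which now nulls the field out in abortWriter() so the cleaner no longer treats the leftover files as in-flight.
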
@@ -79,8 +79,10 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
}

@Override
protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
protected List<Path> commitWriter(FileDetails fd,
CompactionRequestImpl request) throws IOException {
return writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles());
List<Path> pathList =
writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles());
return pathList;
}
}

@@ -63,7 +63,7 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
}

@Override
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
protected List<Path> commitWriter(FileDetails fd,
CompactionRequestImpl request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());

@@ -72,12 +72,19 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
}

@Override
protected void abortWriter() throws IOException {
abortWriter(writer);
}

protected void abortWriter(StoreFileWriter writer) throws IOException {
Path leftoverFile = writer.getPath();
try {
writer.close();
} catch (IOException e) {
LOG.warn("Failed to close the writer after an unfinished compaction.", e);
} finally {
//this step signals that the target file is no longer written and can be cleaned up
writer = null;
}
try {
store.getFileSystem().delete(leftoverFile, false);

@@ -125,7 +125,7 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
}

@Override
protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd,
protected List<Path> commitWriter(FileDetails fd,
CompactionRequestImpl request) throws IOException {
List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles());
assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";

@@ -95,7 +95,7 @@ class FileBasedStoreFileTracker extends StoreFileTrackerBase {
}

@Override
protected boolean requireWritingToTmpDirFirst() {
public boolean requireWritingToTmpDirFirst() {
return false;
}


@@ -57,7 +57,7 @@ class MigrationStoreFileTracker extends StoreFileTrackerBase {
}

@Override
protected boolean requireWritingToTmpDirFirst() {
public boolean requireWritingToTmpDirFirst() {
// Returns true if either of the two StoreFileTracker returns true.
// For example, if we want to migrate from a tracker implementation which can ignore the broken
// files under data directory to a tracker implementation which can not, if we still allow

@@ -88,4 +88,10 @@ public interface StoreFileTracker {
* @param builder The table descriptor builder for the given table.
*/
TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder);

/**
* Whether the implementation of this tracker requires you to write to temp directory first, i.e,
* does not allow broken store files under the actual data directory.
*/
boolean requireWritingToTmpDirFirst();
}

@@ -173,12 +173,6 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
return builder.build();
}

/**
* Whether the implementation of this tracker requires you to write to temp directory first, i.e,
* does not allow broken store files under the actual data directory.
*/
protected abstract boolean requireWritingToTmpDirFirst();

protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;

protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,

@@ -549,7 +549,7 @@ public class RestoreSnapshotHelper {
" of snapshot=" + snapshotName+
" to region=" + regionInfo.getEncodedName() + " table=" + tableName);
String fileName = restoreStoreFile(familyDir, regionInfo, storeFile, createBackRefs);
//mark the reference file to be added to tracker
// mark the reference file to be added to tracker
filesToTrack.add(new StoreFileInfo(conf, fs,
new Path(familyDir, fileName), true));
}

@@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;

@@ -89,7 +88,7 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
}

@Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException {


@@ -0,0 +1,225 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ MediumTests.class, RegionServerTests.class })
public class TestBrokenStoreFileCleaner {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBrokenStoreFileCleaner.class);

private final HBaseTestingUtil testUtil = new HBaseTestingUtil();
private final static byte[] fam = Bytes.toBytes("cf_1");
private final static byte[] qual1 = Bytes.toBytes("qf_1");
private final static byte[] val = Bytes.toBytes("val");
private final static String junkFileName = "409fad9a751c4e8c86d7f32581bdc156";
TableName tableName;


@Before
public void setUp() throws Exception {
testUtil.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL,
"org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
testUtil.getConfiguration()
.set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_ENABLED, "true");
testUtil.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_TTL, "0");
testUtil.getConfiguration()
.set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD, "15000000");
testUtil.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY, "0");
testUtil.startMiniCluster(1);
}

@After
public void tearDown() throws Exception {
testUtil.deleteTable(tableName);
testUtil.shutdownMiniCluster();
}

@Test
public void testDeletingJunkFile() throws Exception {
tableName = TableName.valueOf(getClass().getSimpleName() + "testDeletingJunkFile");
createTableWithData(tableName);

HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0);
ServerName sn = testUtil.getMiniHBaseCluster()
.getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName());
HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn);
BrokenStoreFileCleaner cleaner = rs.getBrokenStoreFileCleaner();

//create junk file
HStore store = region.getStore(fam);
Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName());
Path junkFilePath = new Path(cfPath, junkFileName);

FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath);
junkFileOS.writeUTF("hello");
junkFileOS.close();

int storeFiles = store.getStorefilesCount();
assertTrue(storeFiles > 0);

//verify the file exists before the chore and is missing afterwards
assertTrue(store.getFileSystem().exists(junkFilePath));
cleaner.chore();
assertFalse(store.getFileSystem().exists(junkFilePath));

//verify no storefile got deleted
int currentStoreFiles = store.getStorefilesCount();
assertEquals(currentStoreFiles, storeFiles);

}

@Test
public void testSkippingCompactedFiles() throws Exception {
tableName = TableName.valueOf(getClass().getSimpleName() + "testSkippningCompactedFiles");
createTableWithData(tableName);

HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0);

ServerName sn = testUtil.getMiniHBaseCluster()
.getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName());
HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn);
BrokenStoreFileCleaner cleaner = rs.getBrokenStoreFileCleaner();

//run major compaction to generate compacted files
region.compact(true);

//make sure there are compacted files
HStore store = region.getStore(fam);
int compactedFiles = store.getCompactedFilesCount();
assertTrue(compactedFiles > 0);

cleaner.chore();

//verify none of the compacted files were deleted
int existingCompactedFiles = store.getCompactedFilesCount();
assertEquals(compactedFiles, existingCompactedFiles);

//verify adding a junk file does not break anything
Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName());
Path junkFilePath = new Path(cfPath, junkFileName);

FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath);
junkFileOS.writeUTF("hello");
junkFileOS.close();

assertTrue(store.getFileSystem().exists(junkFilePath));
cleaner.setEnabled(true);
cleaner.chore();
assertFalse(store.getFileSystem().exists(junkFilePath));

//verify compacted files are still intact
existingCompactedFiles = store.getCompactedFilesCount();
assertEquals(compactedFiles, existingCompactedFiles);
}

@Test
public void testJunkFileTTL() throws Exception {
tableName = TableName.valueOf(getClass().getSimpleName() + "testDeletingJunkFile");
createTableWithData(tableName);

HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0);
ServerName sn = testUtil.getMiniHBaseCluster()
.getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName());
HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn);

//create junk file
HStore store = region.getStore(fam);
Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName());
Path junkFilePath = new Path(cfPath, junkFileName);

FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath);
junkFileOS.writeUTF("hello");
junkFileOS.close();

int storeFiles = store.getStorefilesCount();
assertTrue(storeFiles > 0);

//verify the file exists before the chore
assertTrue(store.getFileSystem().exists(junkFilePath));

//set a 5 sec ttl
rs.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_TTL, "5000");
BrokenStoreFileCleaner cleaner = new BrokenStoreFileCleaner(15000000,
0, rs, rs.getConfiguration(), rs);
cleaner.chore();
//file is still present after chore run
assertTrue(store.getFileSystem().exists(junkFilePath));
Thread.sleep(5000);
cleaner.chore();
assertFalse(store.getFileSystem().exists(junkFilePath));

//verify no storefile got deleted
int currentStoreFiles = store.getStorefilesCount();
assertEquals(currentStoreFiles, storeFiles);
}

private Table createTableWithData(TableName tableName) throws IOException {
Table table = testUtil.createTable(tableName, fam);
try {
for (int i = 1; i < 10; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(fam, qual1, val);
table.put(p);
}
// flush them
testUtil.getAdmin().flush(tableName);
for (int i = 11; i < 20; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(fam, qual1, val);
table.put(p);
}
// flush them
testUtil.getAdmin().flush(tableName);
for (int i = 21; i < 30; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(fam, qual1, val);
table.put(p);
}
// flush them
testUtil.getAdmin().flush(tableName);
} catch (IOException e) {
table.close();
throw e;
}
return table;
}
}

@@ -128,13 +128,13 @@ public class TestCompactorMemLeak {
}

@Override
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
protected List<Path> commitWriter(FileDetails fd,
CompactionRequestImpl request) throws IOException {
HFileWriterImpl writerImpl = (HFileWriterImpl) writer.writer;
Cell cell = writerImpl.getLastCell();
// The cell should be backed by a KeyOnlyKeyValue.
IS_LAST_CELL_ON_HEAP.set(cell instanceof KeyOnlyKeyValue);
return super.commitWriter(writer, fd, request);
return super.commitWriter(fd, request);
}
}
}

@@ -47,7 +47,6 @@ public class TestStoreFileTracker extends DefaultStoreFileTracker {
} else {
LOG.info("ctx.getRegionFileSystem() returned null. Leaving storeId null.");
}

}

@Override