HBASE-26271 Cleanup the broken store files under data directory (#3786)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Josh Elser <elserj@apache.org>
Signed-off-by: Wellington Ramos Chevreuil <wchevreuil@apache.org>
parent c4325ff088
commit 68252e17cb
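The chore introduced below is disabled by default and is tuned entirely through the new hbase.region.broken.storefilecleaner.* settings defined in BrokenStoreFileCleaner further down. A minimal sketch of setting them in code; only the key names and default values come from this patch, while the wrapper class and main method are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class EnableBrokenStoreFileCleaner {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // the cleaner ships disabled; DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED is false
        conf.setBoolean("hbase.region.broken.storefilecleaner.enabled", true);
        // only touch leftovers older than the TTL, measured against file modification time (default 12h)
        conf.setLong("hbase.region.broken.storefilecleaner.ttl", 12 * 60 * 60 * 1000L);
        // run every 6h, first run roughly 2h after startup, spread by up to 25% jitter (patch defaults)
        conf.setInt("hbase.region.broken.storefilecleaner.period", 6 * 60 * 60 * 1000);
        conf.setInt("hbase.region.broken.storefilecleaner.delay", 2 * 60 * 60 * 1000);
        conf.setDouble("hbase.region.broken.storefilecleaner.delay.jitter", 0.25D);
      }
    }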
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
-import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -52,6 +51,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
 /**
  * Compact passed set of files in the mob-enabled column family.
  */
@@ -154,7 +155,6 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
    * the scanner to filter the deleted cells.
    * @param fd File details
    * @param scanner Where to read from.
-   * @param writer Where to write to.
    * @param smallestReadPoint Smallest read point.
    * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
    * @param throughputController The compaction throughput controller.
@@ -163,7 +163,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
    * @return Whether compaction ended; false if it was interrupted for any reason.
    */
   @Override
-  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
       long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
       boolean major, int numofFilesToCompact) throws IOException {
     long bytesWrittenProgressForCloseCheck = 0;
@@ -369,4 +369,14 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
     progress.complete();
     return true;
   }
+
+
+  @Override
+  protected List<Path> commitWriter(FileDetails fd,
+      CompactionRequestImpl request) throws IOException {
+    List<Path> newFiles = Lists.newArrayList(writer.getPath());
+    writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
+    writer.close();
+    return newFiles;
+  }
 }
@@ -110,7 +110,11 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen
     return paths;
   }
 
-  protected abstract Collection<StoreFileWriter> writers();
+  /**
+   * Returns all writers. This is used to prevent deleting currently writen storefiles
+   * during cleanup.
+   */
+  public abstract Collection<StoreFileWriter> writers();
 
   /**
    * Subclasses override this method to be called at the end of a successful sequence of append; all
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This Chore, every time it runs, will clear the unsused HFiles in the data
+ * folder.
+ */
+@InterfaceAudience.Private
+public class BrokenStoreFileCleaner extends ScheduledChore {
+  private static final Logger LOG = LoggerFactory.getLogger(BrokenStoreFileCleaner.class);
+  public static final String BROKEN_STOREFILE_CLEANER_ENABLED =
+      "hbase.region.broken.storefilecleaner.enabled";
+  public static final boolean DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED = false;
+  public static final String BROKEN_STOREFILE_CLEANER_TTL =
+      "hbase.region.broken.storefilecleaner.ttl";
+  public static final long DEFAULT_BROKEN_STOREFILE_CLEANER_TTL = 1000 * 60 * 60 * 12; //12h
+  public static final String BROKEN_STOREFILE_CLEANER_DELAY =
+      "hbase.region.broken.storefilecleaner.delay";
+  public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY = 1000 * 60 * 60 * 2; //2h
+  public static final String BROKEN_STOREFILE_CLEANER_DELAY_JITTER =
+      "hbase.region.broken.storefilecleaner.delay.jitter";
+  public static final double DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER = 0.25D;
+  public static final String BROKEN_STOREFILE_CLEANER_PERIOD =
+      "hbase.region.broken.storefilecleaner.period";
+  public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD = 1000 * 60 * 60 * 6; //6h
+
+  private HRegionServer regionServer;
+  private final AtomicBoolean enabled = new AtomicBoolean(true);
+  private long fileTtl;
+
+  public BrokenStoreFileCleaner(final int delay, final int period, final Stoppable stopper,
+      Configuration conf, HRegionServer regionServer) {
+    super("BrokenStoreFileCleaner", stopper, period, delay);
+    this.regionServer = regionServer;
+    setEnabled(
+        conf.getBoolean(BROKEN_STOREFILE_CLEANER_ENABLED, DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED));
+    fileTtl = conf.getLong(BROKEN_STOREFILE_CLEANER_TTL, DEFAULT_BROKEN_STOREFILE_CLEANER_TTL);
+  }
+
+  public boolean setEnabled(final boolean enabled) {
+    return this.enabled.getAndSet(enabled);
+  }
+
+  public boolean getEnabled() {
+    return this.enabled.get();
+  }
+
+  @Override
+  public void chore() {
+    if (getEnabled()) {
+      long start = EnvironmentEdgeManager.currentTime();
+      AtomicLong deletedFiles = new AtomicLong(0);
+      AtomicLong failedDeletes = new AtomicLong(0);
+      for (HRegion region : regionServer.getRegions()) {
+        for (HStore store : region.getStores()) {
+          //only do cleanup in stores not using tmp directories
+          if (store.getStoreEngine().requireWritingToTmpDirFirst()) {
+            continue;
+          }
+          Path storePath =
+              new Path(region.getRegionFileSystem().getRegionDir(), store.getColumnFamilyName());
+
+          try {
+            List<FileStatus> fsStoreFiles =
+                Arrays.asList(region.getRegionFileSystem().fs.listStatus(storePath));
+            fsStoreFiles.forEach(
+                file -> cleanFileIfNeeded(file, store, deletedFiles, failedDeletes));
+          } catch (IOException e) {
+            LOG.warn("Failed to list files in {}, cleanup is skipped there",storePath);
+            continue;
+          }
+        }
+      }
+      LOG.debug(
+          "BrokenStoreFileCleaner on {} run for: {}ms. It deleted {} files and tried but failed "
+              + "to delete {}",
+          regionServer.getServerName().getServerName(), EnvironmentEdgeManager.currentTime() - start,
+          deletedFiles.get(), failedDeletes.get());
+    } else {
+      LOG.trace("Broken storefile Cleaner chore disabled! Not cleaning.");
+    }
+  }
+
+  private void cleanFileIfNeeded(FileStatus file, HStore store,
+      AtomicLong deletedFiles, AtomicLong failedDeletes) {
+    if(file.isDirectory()){
+      LOG.trace("This is a Directory {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    if(!validate(file.getPath())){
+      LOG.trace("Invalid file {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    if(!isOldEnough(file)){
+      LOG.trace("Fresh file {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    if(isActiveStorefile(file, store)){
+      LOG.trace("Actively used storefile file {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    // Compacted files can still have readers and are cleaned by a separate chore, so they have to
+    // be skipped here
+    if(isCompactedFile(file, store)){
+      LOG.trace("Cleanup is done by a different chore for file {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    if(isCompactionResultFile(file, store)){
+      LOG.trace("The file is the result of an ongoing compaction {}, skip cleanup", file.getPath());
+      return;
+    }
+
+    deleteFile(file, store, deletedFiles, failedDeletes);
+  }
+
+  private boolean isCompactionResultFile(FileStatus file, HStore store) {
+    return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath());
+  }
+
+  // Compacted files can still have readers and are cleaned by a separate chore, so they have to
+  // be skipped here
+  private boolean isCompactedFile(FileStatus file, HStore store) {
+    return store.getStoreEngine().getStoreFileManager().getCompactedfiles().stream()
+        .anyMatch(sf -> sf.getPath().equals(file.getPath()));
+  }
+
+  private boolean isActiveStorefile(FileStatus file, HStore store) {
+    return store.getStoreEngine().getStoreFileManager().getStorefiles().stream()
+        .anyMatch(sf -> sf.getPath().equals(file.getPath()));
+  }
+
+  boolean validate(Path file) {
+    if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
+      return true;
+    }
+    return StoreFileInfo.validateStoreFileName(file.getName());
+  }
+
+  boolean isOldEnough(FileStatus file){
+    return file.getModificationTime() + fileTtl < EnvironmentEdgeManager.currentTime();
+  }
+
+  private void deleteFile(FileStatus file, HStore store, AtomicLong deletedFiles,
+      AtomicLong failedDeletes) {
+    Path filePath = file.getPath();
+    LOG.debug("Removing {} from store", filePath);
+    try {
+      boolean success = store.getFileSystem().delete(filePath, false);
+      if (!success) {
+        failedDeletes.incrementAndGet();
+        LOG.warn("Attempted to delete:" + filePath
+            + ", but couldn't. Attempt to delete on next pass.");
+      }
+      else{
+        deletedFiles.incrementAndGet();
+      }
+    } catch (IOException e) {
+      e = e instanceof RemoteException ?
+          ((RemoteException)e).unwrapRemoteException() : e;
+      LOG.warn("Error while deleting: " + filePath, e);
+    }
+  }
+
+}
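A short usage sketch of the class above, mirroring what the new TestBrokenStoreFileCleaner later in this patch does; the running HRegionServer rs is assumed (for example one from a mini cluster):

    // assumption: rs is a live HRegionServer, e.g. obtained from HBaseTestingUtility
    BrokenStoreFileCleaner cleaner = rs.getBrokenStoreFileCleaner();
    cleaner.setEnabled(true); // flips the AtomicBoolean guard checked at the top of chore()
    cleaner.chore();          // one synchronous cleanup pass over every region and store on this server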
@@ -71,7 +71,7 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
   }
 
   @Override
-  protected Collection<StoreFileWriter> writers() {
+  public Collection<StoreFileWriter> writers() {
     return lowerBoundary2Writer.values();
   }
 
@@ -609,7 +609,7 @@ public class HRegionFileSystem {
       writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
       HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
         env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false);
       insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
     }
     return regionDir;
   }
@@ -546,6 +546,8 @@ public class HRegionServer extends Thread implements
    */
   protected final ConfigurationManager configurationManager;
 
+  private BrokenStoreFileCleaner brokenStoreFileCleaner;
+
   @InterfaceAudience.Private
   CompactedHFilesDischarger compactedFileDischarger;
 
@@ -2156,6 +2158,9 @@ public class HRegionServer extends Thread implements
     if (this.slowLogTableOpsChore != null) {
       choreService.scheduleChore(slowLogTableOpsChore);
     }
+    if (this.brokenStoreFileCleaner != null) {
+      choreService.scheduleChore(brokenStoreFileCleaner);
+    }
 
     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
     // an unhandled exception, it will just exit.
@@ -2236,6 +2241,22 @@ public class HRegionServer extends Thread implements
       this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
           onlyMetaRefresh, this, this);
     }
+
+    int brokenStoreFileCleanerPeriod = conf.getInt(
+        BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
+        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
+    int brokenStoreFileCleanerDelay = conf.getInt(
+        BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
+        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
+    double brokenStoreFileCleanerDelayJitter = conf.getDouble(
+        BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
+        BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
+    double jitterRate = (RandomUtils.nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
+    long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
+    this.brokenStoreFileCleaner =
+      new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
+        brokenStoreFileCleanerPeriod, this, conf, this);
+
     registerConfigurationObservers();
   }
 
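The jitter above spreads the first run of the chore across region servers. A worked sketch of its range, assuming the random draw is uniform in [0, 1): with the patch defaults of a 2h delay and 0.25 jitter, jitterRate falls in [-0.125, 0.125], so the effective initial delay lands between 1h45m and 2h15m. Illustrative arithmetic only, not part of the patch:

    public class JitterBounds {
      public static void main(String[] args) {
        int delayMs = 2 * 60 * 60 * 1000; // DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY
        double jitter = 0.25D;            // DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER
        long minDelay = delayMs + Math.round(delayMs * (0.0 - 0.5) * jitter);
        long maxDelay = delayMs + Math.round(delayMs * (1.0 - 0.5) * jitter);
        System.out.println(minDelay + " ms to " + maxDelay + " ms"); // 6300000 ms to 8100000 ms
      }
    }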
@@ -4027,4 +4048,9 @@ public class HRegionServer extends Thread implements
   public MetaRegionLocationCache getMetaRegionLocationCache() {
     return this.metaRegionLocationCache;
   }
+
+  @InterfaceAudience.Private
+  public BrokenStoreFileCleaner getBrokenStoreFileCleaner(){
+    return brokenStoreFileCleaner;
+  }
 }
@@ -1156,6 +1156,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
         }
       }
       replaceStoreFiles(filesToCompact, sfs, true);
+
+      // This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
+      // CleanerChore know that compaction is done and the file can be cleaned up if compaction
+      // have failed.
+      storeEngine.resetCompactionWriter();
+
       if (cr.isMajor()) {
         majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
         majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
@@ -42,9 +42,11 @@ import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
 import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -532,6 +534,25 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
     }
   }
 
+  /**
+   * Whether the implementation of the used storefile tracker requires you to write to temp
+   * directory first, i.e, does not allow broken store files under the actual data directory.
+   */
+  public boolean requireWritingToTmpDirFirst() {
+    return storeFileTracker.requireWritingToTmpDirFirst();
+  }
+
+  /**
+   * Resets the compaction writer when the new file is committed and used as active storefile.
+   * This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
+   * CleanerChore know that compaction is done and the file can be cleaned up if compaction
+   * have failed. Currently called in
+   * @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List)
+   */
+  public void resetCompactionWriter(){
+    compactor.resetWriter();
+  }
+
   @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
       allowedOnPath = ".*/TestHStore.java")
   ReadWriteLock getLock() {
@@ -58,7 +58,7 @@ public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
   }
 
   @Override
-  protected Collection<StoreFileWriter> writers() {
+  public Collection<StoreFileWriter> writers() {
     return existingWriters;
   }
 
@@ -68,7 +68,7 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
   }
 
   @Override
-  protected void abortWriter(T writer) throws IOException {
+  protected void abortWriter() throws IOException {
     FileSystem fs = store.getFileSystem();
     for (Path leftoverFile : writer.abortWriters()) {
       try {
@@ -79,5 +79,7 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
           e);
       }
     }
+    //this step signals that the target file is no longer writen and can be cleaned up
+    writer = null;
   }
 }
@@ -25,9 +25,12 @@ import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELET
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -36,6 +39,7 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileInfo;
+import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -91,6 +95,8 @@ public abstract class Compactor<T extends CellSink> {
   private final boolean dropCacheMajor;
   private final boolean dropCacheMinor;
 
+  protected T writer = null;
+
   //TODO: depending on Store is not good but, realistically, all compactors currently do.
   Compactor(Configuration conf, HStore store) {
     this.conf = conf;
@@ -323,7 +329,6 @@ public abstract class Compactor<T extends CellSink> {
     // Find the smallest read point across all the Scanners.
     long smallestReadPoint = getSmallestReadPoint();
 
-    T writer = null;
     boolean dropCache;
     if (request.isMajor() || request.isAllFiles()) {
       dropCache = this.dropCacheMajor;
@@ -347,8 +352,13 @@ public abstract class Compactor<T extends CellSink> {
         smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
         cleanSeqId = true;
       }
+      if (writer != null){
+        LOG.warn("Writer exists when it should not: " + getCompactionTargets().stream()
+          .map(n -> n.toString())
+          .collect(Collectors.joining(", ", "{ ", " }")));
+      }
       writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor());
-      finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
+      finished = performCompaction(fd, scanner, smallestReadPoint, cleanSeqId,
         throughputController, request.isAllFiles(), request.getFiles().size());
       if (!finished) {
         throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
@@ -368,24 +378,23 @@ public abstract class Compactor<T extends CellSink> {
         Closeables.close(scanner, true);
       }
       if (!finished && writer != null) {
-        abortWriter(writer);
+        abortWriter();
       }
     }
     assert finished : "We should have exited the method on all error paths";
     assert writer != null : "Writer should be non-null if no error";
-    return commitWriter(writer, fd, request);
+    return commitWriter(fd, request);
   }
 
-  protected abstract List<Path> commitWriter(T writer, FileDetails fd,
+  protected abstract List<Path> commitWriter(FileDetails fd,
       CompactionRequestImpl request) throws IOException;
 
-  protected abstract void abortWriter(T writer) throws IOException;
+  protected abstract void abortWriter() throws IOException;
 
   /**
    * Performs the compaction.
    * @param fd FileDetails of cell sink writer
    * @param scanner Where to read from.
-   * @param writer Where to write to.
    * @param smallestReadPoint Smallest read point.
    * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <=
    *          smallestReadPoint
@@ -393,7 +402,7 @@ public abstract class Compactor<T extends CellSink> {
    * @param numofFilesToCompact the number of files to compact
    * @return Whether compaction ended; false if it was interrupted for some reason.
    */
-  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
       long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
       boolean major, int numofFilesToCompact) throws IOException {
     assert writer instanceof ShipperListener;
@@ -536,4 +545,24 @@ public abstract class Compactor<T extends CellSink> {
     return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
       dropDeletesFromRow, dropDeletesToRow);
   }
+
+  public List<Path> getCompactionTargets(){
+    if (writer == null){
+      return Collections.emptyList();
+    }
+    synchronized (writer){
+      if (writer instanceof StoreFileWriter){
+        return Arrays.asList(((StoreFileWriter)writer).getPath());
+      }
+      return ((AbstractMultiFileWriter)writer).writers().stream().map(sfw -> sfw.getPath()).collect(
+        Collectors.toList());
+    }
+  }
+
+  /**
+   * Reset the Writer when the new storefiles were successfully added
+   */
+  public void resetWriter(){
+    writer = null;
+  }
 }
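These two additions are the compactor's half of the handshake with the cleaner chore: getCompactionTargets() exposes the paths currently being written, and resetWriter() (reached through StoreEngine.resetCompactionWriter() from HStore) clears that view once the new files are committed. A sketch of the consuming side, with the store and file variables assumed as in the chore's per-file check:

    // a file still listed as a compaction target must not be treated as broken
    List<Path> targets = store.getStoreEngine().getCompactor().getCompactionTargets();
    boolean inFlight = targets.contains(file.getPath());
    // after HStore calls storeEngine.resetCompactionWriter(), the list is empty again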
@@ -79,8 +79,10 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
   }
 
   @Override
-  protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
+  protected List<Path> commitWriter(FileDetails fd,
       CompactionRequestImpl request) throws IOException {
-    return writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles());
+    List<Path> pathList =
+      writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles());
+    return pathList;
   }
 }
@@ -63,7 +63,7 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
   }
 
   @Override
-  protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
+  protected List<Path> commitWriter(FileDetails fd,
       CompactionRequestImpl request) throws IOException {
     List<Path> newFiles = Lists.newArrayList(writer.getPath());
     writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
@@ -72,12 +72,19 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
   }
 
   @Override
+  protected void abortWriter() throws IOException {
+    abortWriter(writer);
+  }
+
   protected void abortWriter(StoreFileWriter writer) throws IOException {
     Path leftoverFile = writer.getPath();
     try {
       writer.close();
     } catch (IOException e) {
       LOG.warn("Failed to close the writer after an unfinished compaction.", e);
+    } finally {
+      //this step signals that the target file is no longer writen and can be cleaned up
+      writer = null;
     }
     try {
       store.getFileSystem().delete(leftoverFile, false);
@@ -125,7 +125,7 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
   }
 
   @Override
-  protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd,
+  protected List<Path> commitWriter(FileDetails fd,
       CompactionRequestImpl request) throws IOException {
     List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles());
     assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
@@ -95,7 +95,7 @@ class FileBasedStoreFileTracker extends StoreFileTrackerBase {
   }
 
   @Override
-  protected boolean requireWritingToTmpDirFirst() {
+  public boolean requireWritingToTmpDirFirst() {
     return false;
   }
 
@@ -57,7 +57,7 @@ class MigrationStoreFileTracker extends StoreFileTrackerBase {
   }
 
   @Override
-  protected boolean requireWritingToTmpDirFirst() {
+  public boolean requireWritingToTmpDirFirst() {
     // Returns true if either of the two StoreFileTracker returns true.
     // For example, if we want to migrate from a tracker implementation which can ignore the broken
     // files under data directory to a tracker implementation which can not, if we still allow
@@ -88,4 +88,10 @@ public interface StoreFileTracker {
    * @param builder The table descriptor builder for the given table.
    */
   TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder);
+
+  /**
+   * Whether the implementation of this tracker requires you to write to temp directory first, i.e,
+   * does not allow broken store files under the actual data directory.
+   */
+  boolean requireWritingToTmpDirFirst();
 }
@@ -173,12 +173,6 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
     return builder.build();
   }
 
-  /**
-   * Whether the implementation of this tracker requires you to write to temp directory first, i.e,
-   * does not allow broken store files under the actual data directory.
-   */
-  protected abstract boolean requireWritingToTmpDirFirst();
-
   protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;
 
   protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
@@ -549,7 +549,7 @@ public class RestoreSnapshotHelper {
           " of snapshot=" + snapshotName+
           " to region=" + regionInfo.getEncodedName() + " table=" + tableName);
         String fileName = restoreStoreFile(familyDir, regionInfo, storeFile, createBackRefs);
-        //mark the reference file to be added to tracker
+        // mark the reference file to be added to tracker
         filesToTrack.add(new StoreFileInfo(conf, fs,
           new Path(familyDir, fileName), true));
       }
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, RegionServerTests.class })
+public class TestBrokenStoreFileCleaner {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestBrokenStoreFileCleaner.class);
+
+  private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
+  private final static byte[] fam = Bytes.toBytes("cf_1");
+  private final static byte[] qual1 = Bytes.toBytes("qf_1");
+  private final static byte[] val = Bytes.toBytes("val");
+  private final static String junkFileName = "409fad9a751c4e8c86d7f32581bdc156";
+  TableName tableName;
+
+
+  @Before
+  public void setUp() throws Exception {
+    testUtil.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL,
+        "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
+    testUtil.getConfiguration()
+        .set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_ENABLED, "true");
+    testUtil.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_TTL, "0");
+    testUtil.getConfiguration()
+        .set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD, "15000000");
+    testUtil.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY, "0");
+    testUtil.startMiniCluster(1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    testUtil.deleteTable(tableName);
+    testUtil.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testDeletingJunkFile() throws Exception {
+    tableName = TableName.valueOf(getClass().getSimpleName() + "testDeletingJunkFile");
+    createTableWithData(tableName);
+
+    HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0);
+    ServerName sn = testUtil.getMiniHBaseCluster()
+        .getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName());
+    HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn);
+    BrokenStoreFileCleaner cleaner = rs.getBrokenStoreFileCleaner();
+
+    //create junk file
+    HStore store = region.getStore(fam);
+    Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName());
+    Path junkFilePath = new Path(cfPath, junkFileName);
+
+    FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath);
+    junkFileOS.writeUTF("hello");
+    junkFileOS.close();
+
+    int storeFiles = store.getStorefilesCount();
+    assertTrue(storeFiles > 0);
+
+    //verify the file exist before the chore and missing afterwards
+    assertTrue(store.getFileSystem().exists(junkFilePath));
+    cleaner.chore();
+    assertFalse(store.getFileSystem().exists(junkFilePath));
+
+    //verify no storefile got deleted
+    int currentStoreFiles = store.getStorefilesCount();
+    assertEquals(currentStoreFiles, storeFiles);
+
+  }
+
+  @Test
+  public void testSkippingCompactedFiles() throws Exception {
+    tableName = TableName.valueOf(getClass().getSimpleName() + "testSkippningCompactedFiles");
+    createTableWithData(tableName);
+
+    HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0);
+
+    ServerName sn = testUtil.getMiniHBaseCluster()
+        .getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName());
+    HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn);
+    BrokenStoreFileCleaner cleaner = rs.getBrokenStoreFileCleaner();
+
+    //run major compaction to generate compaced files
+    region.compact(true);
+
+    //make sure there are compacted files
+    HStore store = region.getStore(fam);
+    int compactedFiles = store.getCompactedFilesCount();
+    assertTrue(compactedFiles > 0);
+
+    cleaner.chore();
+
+    //verify none of the compacted files were deleted
+    int existingCompactedFiles = store.getCompactedFilesCount();
+    assertEquals(compactedFiles, existingCompactedFiles);
+
+    //verify adding a junk file does not break anything
+    Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName());
+    Path junkFilePath = new Path(cfPath, junkFileName);
+
+    FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath);
+    junkFileOS.writeUTF("hello");
+    junkFileOS.close();
+
+    assertTrue(store.getFileSystem().exists(junkFilePath));
+    cleaner.setEnabled(true);
+    cleaner.chore();
+    assertFalse(store.getFileSystem().exists(junkFilePath));
+
+    //verify compacted files are still intact
+    existingCompactedFiles = store.getCompactedFilesCount();
+    assertEquals(compactedFiles, existingCompactedFiles);
+  }
+
+  @Test
+  public void testJunkFileTTL() throws Exception {
+    tableName = TableName.valueOf(getClass().getSimpleName() + "testDeletingJunkFile");
+    createTableWithData(tableName);
+
+    HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0);
+    ServerName sn = testUtil.getMiniHBaseCluster()
+        .getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName());
+    HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn);
+
+    //create junk file
+    HStore store = region.getStore(fam);
+    Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName());
+    Path junkFilePath = new Path(cfPath, junkFileName);
+
+    FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath);
+    junkFileOS.writeUTF("hello");
+    junkFileOS.close();
+
+    int storeFiles = store.getStorefilesCount();
+    assertTrue(storeFiles > 0);
+
+    //verify the file exist before the chore
+    assertTrue(store.getFileSystem().exists(junkFilePath));
+
+    //set a 5 sec ttl
+    rs.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_TTL, "5000");
+    BrokenStoreFileCleaner cleaner = new BrokenStoreFileCleaner(15000000,
+        0, rs, rs.getConfiguration(), rs);
+    cleaner.chore();
+    //file is still present after chore run
+    assertTrue(store.getFileSystem().exists(junkFilePath));
+    Thread.sleep(5000);
+    cleaner.chore();
+    assertFalse(store.getFileSystem().exists(junkFilePath));
+
+    //verify no storefile got deleted
+    int currentStoreFiles = store.getStorefilesCount();
+    assertEquals(currentStoreFiles, storeFiles);
+  }
+
+  private Table createTableWithData(TableName tableName) throws IOException {
+    Table table = testUtil.createTable(tableName, fam);
+    try {
+      for (int i = 1; i < 10; i++) {
+        Put p = new Put(Bytes.toBytes("row" + i));
+        p.addColumn(fam, qual1, val);
+        table.put(p);
+      }
+      // flush them
+      testUtil.getAdmin().flush(tableName);
+      for (int i = 11; i < 20; i++) {
+        Put p = new Put(Bytes.toBytes("row" + i));
+        p.addColumn(fam, qual1, val);
+        table.put(p);
+      }
+      // flush them
+      testUtil.getAdmin().flush(tableName);
+      for (int i = 21; i < 30; i++) {
+        Put p = new Put(Bytes.toBytes("row" + i));
+        p.addColumn(fam, qual1, val);
+        table.put(p);
+      }
+      // flush them
+      testUtil.getAdmin().flush(tableName);
+    } catch (IOException e) {
+      table.close();
+      throw e;
+    }
+    return table;
+  }
+}
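The test above pins the tracker to FileBasedStoreFileTracker, which commits files directly into the data directory, so the chore actually has something to scan; it can be run on its own with the usual Maven surefire filter (-Dtest=TestBrokenStoreFileCleaner), which is standard Maven behavior rather than anything added by this patch.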
@@ -128,13 +128,13 @@ public class TestCompactorMemLeak {
     }
 
     @Override
-    protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
+    protected List<Path> commitWriter(FileDetails fd,
         CompactionRequestImpl request) throws IOException {
       HFileWriterImpl writerImpl = (HFileWriterImpl) writer.writer;
       Cell cell = writerImpl.getLastCell();
       // The cell should be backend with an KeyOnlyKeyValue.
       IS_LAST_CELL_ON_HEAP.set(cell instanceof KeyOnlyKeyValue);
-      return super.commitWriter(writer, fd, request);
+      return super.commitWriter(fd, request);
     }
   }
 }
@@ -47,7 +47,6 @@ public class TestStoreFileTracker extends DefaultStoreFileTracker {
     } else {
       LOG.info("ctx.getRegionFileSystem() returned null. Leaving storeId null.");
     }
-
   }
 
   @Override