HBASE-19675 Miscellaneous HStore Class Improvements.
* Use HashSet instead of List for a variable which is only used for lookups * Remove logging code guards in favor of slf4j parameters * Use CollectionsUtils.isEmpty() consistently * Small check-style fixes
This commit is contained in:
parent
741466a882
commit
a47afc84cd
|
@ -49,6 +49,8 @@ import java.util.function.ToLongFunction;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.LongStream;
|
import java.util.stream.LongStream;
|
||||||
|
|
||||||
|
import org.apache.commons.collections.CollectionUtils;
|
||||||
|
import org.apache.commons.collections4.IterableUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -256,8 +258,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
// used by ScanQueryMatcher
|
// used by ScanQueryMatcher
|
||||||
long timeToPurgeDeletes =
|
long timeToPurgeDeletes =
|
||||||
Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
|
Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
|
||||||
LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
|
LOG.trace("Time to purge deletes set to {}ms in store {}", timeToPurgeDeletes, this);
|
||||||
"ms in store " + this);
|
|
||||||
// Get TTL
|
// Get TTL
|
||||||
long ttl = determineTTLFromFamily(family);
|
long ttl = determineTTLFromFamily(family);
|
||||||
// Why not just pass a HColumnDescriptor in here altogether? Even if have
|
// Why not just pass a HColumnDescriptor in here altogether? Even if have
|
||||||
|
@ -289,7 +290,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this,
|
this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this,
|
||||||
this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
|
this.getHRegion().getRegionServicesForStores(), inMemoryCompaction });
|
||||||
}
|
}
|
||||||
LOG.info("Memstore class name is " + className);
|
LOG.info("Memstore class name is {}", className);
|
||||||
this.offPeakHours = OffPeakHours.getInstance(conf);
|
this.offPeakHours = OffPeakHours.getInstance(conf);
|
||||||
|
|
||||||
// Setting up cache configuration for this family
|
// Setting up cache configuration for this family
|
||||||
|
@ -302,8 +303,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
this.compactionCheckMultiplier = conf.getInt(
|
this.compactionCheckMultiplier = conf.getInt(
|
||||||
COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
|
COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
|
||||||
if (this.compactionCheckMultiplier <= 0) {
|
if (this.compactionCheckMultiplier <= 0) {
|
||||||
LOG.error("Compaction check period multiplier must be positive, setting default: "
|
LOG.error("Compaction check period multiplier must be positive, setting default: {}",
|
||||||
+ DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
|
DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
|
||||||
this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
|
this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -317,7 +318,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
|
|
||||||
// Initialize checksum type from name. The names are CRC32, CRC32C, etc.
|
// Initialize checksum type from name. The names are CRC32, CRC32C, etc.
|
||||||
this.checksumType = getChecksumType(conf);
|
this.checksumType = getChecksumType(conf);
|
||||||
// initilize bytes per checksum
|
// Initialize bytes per checksum
|
||||||
this.bytesPerChecksum = getBytesPerChecksum(conf);
|
this.bytesPerChecksum = getBytesPerChecksum(conf);
|
||||||
flushRetriesNumber = conf.getInt(
|
flushRetriesNumber = conf.getInt(
|
||||||
"hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
|
"hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
|
||||||
|
@ -519,8 +520,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files) throws IOException {
|
private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files) throws IOException {
|
||||||
if (files == null || files.isEmpty()) {
|
if (CollectionUtils.isEmpty(files)) {
|
||||||
return new ArrayList<>();
|
return Collections.emptyList();
|
||||||
}
|
}
|
||||||
// initialize the thread pool for opening store files in parallel..
|
// initialize the thread pool for opening store files in parallel..
|
||||||
ThreadPoolExecutor storeFileOpenerThreadPool =
|
ThreadPoolExecutor storeFileOpenerThreadPool =
|
||||||
|
@ -545,9 +546,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
long length = storeFile.getReader().length();
|
long length = storeFile.getReader().length();
|
||||||
this.storeSize += length;
|
this.storeSize += length;
|
||||||
this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
|
this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("loaded {}", storeFile);
|
||||||
LOG.debug("loaded " + storeFile.toStringDetailed());
|
|
||||||
}
|
|
||||||
results.add(storeFile);
|
results.add(storeFile);
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
@ -565,9 +564,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
cacheConf != null? cacheConf.shouldEvictOnClose(): true;
|
cacheConf != null? cacheConf.shouldEvictOnClose(): true;
|
||||||
for (HStoreFile file : results) {
|
for (HStoreFile file : results) {
|
||||||
try {
|
try {
|
||||||
if (file != null) file.closeStoreFile(evictOnClose);
|
if (file != null) {
|
||||||
|
file.closeStoreFile(evictOnClose);
|
||||||
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn(e.getMessage());
|
LOG.warn("Could not close store file", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
throw ioe;
|
throw ioe;
|
||||||
|
@ -745,11 +746,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
Preconditions.checkState(lk.isPresent(), "Last key can not be null");
|
Preconditions.checkState(lk.isPresent(), "Last key can not be null");
|
||||||
byte[] lastKey = CellUtil.cloneRow(lk.get());
|
byte[] lastKey = CellUtil.cloneRow(lk.get());
|
||||||
|
|
||||||
LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) +
|
if (LOG.isDebugEnabled()) {
|
||||||
" last=" + Bytes.toStringBinary(lastKey));
|
LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) +
|
||||||
LOG.debug("Region bounds: first=" +
|
" last=" + Bytes.toStringBinary(lastKey));
|
||||||
Bytes.toStringBinary(getRegionInfo().getStartKey()) +
|
LOG.debug("Region bounds: first=" +
|
||||||
" last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
|
Bytes.toStringBinary(getRegionInfo().getStartKey()) +
|
||||||
|
" last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
|
||||||
|
}
|
||||||
|
|
||||||
if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
|
if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
|
||||||
throw new WrongRegionException(
|
throw new WrongRegionException(
|
||||||
|
@ -759,13 +762,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
|
|
||||||
if(reader.length() > conf.getLong(HConstants.HREGION_MAX_FILESIZE,
|
if(reader.length() > conf.getLong(HConstants.HREGION_MAX_FILESIZE,
|
||||||
HConstants.DEFAULT_MAX_FILE_SIZE)) {
|
HConstants.DEFAULT_MAX_FILE_SIZE)) {
|
||||||
LOG.warn("Trying to bulk load hfile " + srcPath.toString() + " with size: " +
|
LOG.warn("Trying to bulk load hfile " + srcPath + " with size: " +
|
||||||
reader.length() + " bytes can be problematic as it may lead to oversplitting.");
|
reader.length() + " bytes can be problematic as it may lead to oversplitting.");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (verifyBulkLoads) {
|
if (verifyBulkLoads) {
|
||||||
long verificationStartTime = EnvironmentEdgeManager.currentTime();
|
long verificationStartTime = EnvironmentEdgeManager.currentTime();
|
||||||
LOG.info("Full verification started for bulk load hfile: " + srcPath.toString());
|
LOG.info("Full verification started for bulk load hfile: {}", srcPath);
|
||||||
Cell prevCell = null;
|
Cell prevCell = null;
|
||||||
HFileScanner scanner = reader.getScanner(false, false, false);
|
HFileScanner scanner = reader.getScanner(false, false, false);
|
||||||
scanner.seekTo();
|
scanner.seekTo();
|
||||||
|
@ -828,8 +831,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
HStoreFile sf = createStoreFileAndReader(dstPath);
|
HStoreFile sf = createStoreFileAndReader(dstPath);
|
||||||
bulkLoadHFile(sf);
|
bulkLoadHFile(sf);
|
||||||
|
|
||||||
LOG.info("Successfully loaded store file " + srcPath + " into store " + this
|
LOG.info("Successfully loaded store file {} into store {} (new location: {})",
|
||||||
+ " (new location: " + dstPath + ")");
|
srcPath, this, dstPath);
|
||||||
|
|
||||||
return dstPath;
|
return dstPath;
|
||||||
}
|
}
|
||||||
|
@ -880,7 +883,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
Collection<HStoreFile> compactedfiles =
|
Collection<HStoreFile> compactedfiles =
|
||||||
storeEngine.getStoreFileManager().clearCompactedFiles();
|
storeEngine.getStoreFileManager().clearCompactedFiles();
|
||||||
// clear the compacted files
|
// clear the compacted files
|
||||||
if (compactedfiles != null && !compactedfiles.isEmpty()) {
|
if (CollectionUtils.isNotEmpty(compactedfiles)) {
|
||||||
removeCompactedfiles(compactedfiles);
|
removeCompactedfiles(compactedfiles);
|
||||||
}
|
}
|
||||||
if (!result.isEmpty()) {
|
if (!result.isEmpty()) {
|
||||||
|
@ -924,7 +927,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
}
|
}
|
||||||
if (ioe != null) throw ioe;
|
if (ioe != null) throw ioe;
|
||||||
}
|
}
|
||||||
LOG.info("Closed " + this);
|
LOG.info("Closed {}", this);
|
||||||
return result;
|
return result;
|
||||||
} finally {
|
} finally {
|
||||||
this.lock.writeLock().unlock();
|
this.lock.writeLock().unlock();
|
||||||
|
@ -978,7 +981,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
}
|
}
|
||||||
return pathNames;
|
return pathNames;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
|
LOG.warn("Failed validating store file {}, retrying num={}", lastPathName, i, e);
|
||||||
if (e instanceof IOException) {
|
if (e instanceof IOException) {
|
||||||
lastException = (IOException) e;
|
lastException = (IOException) e;
|
||||||
} else {
|
} else {
|
||||||
|
@ -986,7 +989,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Failed flushing store file, retrying num=" + i, e);
|
LOG.warn("Failed flushing store file, retrying num={}", i, e);
|
||||||
lastException = e;
|
lastException = e;
|
||||||
}
|
}
|
||||||
if (lastException != null && i < (flushRetriesNumber - 1)) {
|
if (lastException != null && i < (flushRetriesNumber - 1)) {
|
||||||
|
@ -1518,7 +1521,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
// being in the store's folder) or they may be missing due to a compaction.
|
// being in the store's folder) or they may be missing due to a compaction.
|
||||||
|
|
||||||
String familyName = this.getColumnFamilyName();
|
String familyName = this.getColumnFamilyName();
|
||||||
List<String> inputFiles = new ArrayList<>(compactionInputs.size());
|
Set<String> inputFiles = new HashSet<>();
|
||||||
for (String compactionInput : compactionInputs) {
|
for (String compactionInput : compactionInputs) {
|
||||||
Path inputPath = fs.getStoreFilePath(familyName, compactionInput);
|
Path inputPath = fs.getStoreFilePath(familyName, compactionInput);
|
||||||
inputFiles.add(inputPath.getName());
|
inputFiles.add(inputPath.getName());
|
||||||
|
@ -1631,7 +1634,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
|
for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
|
||||||
// TODO: what are these reader checks all over the place?
|
// TODO: what are these reader checks all over the place?
|
||||||
if (sf.getReader() == null) {
|
if (sf.getReader() == null) {
|
||||||
LOG.debug("StoreFile " + sf + " has null Reader");
|
LOG.debug("StoreFile {} has null Reader", sf);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1715,16 +1718,20 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
this.lock.readLock().unlock();
|
this.lock.readLock().unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()
|
if (LOG.isDebugEnabled()) {
|
||||||
+ ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
|
LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()
|
||||||
+ (request.isAllFiles() ? " (all files)" : ""));
|
+ ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
|
||||||
|
+ (request.isAllFiles() ? " (all files)" : ""));
|
||||||
|
}
|
||||||
this.region.reportCompactionRequestStart(request.isMajor());
|
this.region.reportCompactionRequestStart(request.isMajor());
|
||||||
return Optional.of(compaction);
|
return Optional.of(compaction);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Adds the files to compacting files. filesCompacting must be locked. */
|
/** Adds the files to compacting files. filesCompacting must be locked. */
|
||||||
private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {
|
private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {
|
||||||
if (filesToAdd == null) return;
|
if (CollectionUtils.isEmpty(filesToAdd)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
// Check that we do not try to compact the same StoreFile twice.
|
// Check that we do not try to compact the same StoreFile twice.
|
||||||
if (!Collections.disjoint(filesCompacting, filesToAdd)) {
|
if (!Collections.disjoint(filesCompacting, filesToAdd)) {
|
||||||
Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
|
Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
|
||||||
|
@ -1736,7 +1743,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
private void removeUnneededFiles() throws IOException {
|
private void removeUnneededFiles() throws IOException {
|
||||||
if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return;
|
if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return;
|
||||||
if (getColumnFamilyDescriptor().getMinVersions() > 0) {
|
if (getColumnFamilyDescriptor().getMinVersions() > 0) {
|
||||||
LOG.debug("Skipping expired store file removal due to min version being " +
|
LOG.debug("Skipping expired store file removal due to min version being {}",
|
||||||
getColumnFamilyDescriptor().getMinVersions());
|
getColumnFamilyDescriptor().getMinVersions());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1754,9 +1761,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
} finally {
|
} finally {
|
||||||
this.lock.readLock().unlock();
|
this.lock.readLock().unlock();
|
||||||
}
|
}
|
||||||
if (delSfs == null || delSfs.isEmpty()) return;
|
|
||||||
|
|
||||||
Collection<HStoreFile> newFiles = new ArrayList<>(); // No new files.
|
if (CollectionUtils.isEmpty(delSfs)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files.
|
||||||
writeCompactionWalRecord(delSfs, newFiles);
|
writeCompactionWalRecord(delSfs, newFiles);
|
||||||
replaceStoreFiles(delSfs, newFiles);
|
replaceStoreFiles(delSfs, newFiles);
|
||||||
completeCompaction(delSfs);
|
completeCompaction(delSfs);
|
||||||
|
@ -1790,7 +1800,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
try {
|
try {
|
||||||
storeFile = createStoreFileAndReader(path);
|
storeFile = createStoreFileAndReader(path);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Failed to open store file : " + path + ", keeping it in tmp location", e);
|
LOG.error("Failed to open store file : {}, keeping it in tmp location", path, e);
|
||||||
throw e;
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
if (storeFile != null) {
|
if (storeFile != null) {
|
||||||
|
@ -1821,7 +1831,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
|
for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
|
||||||
StoreFileReader r = hsf.getReader();
|
StoreFileReader r = hsf.getReader();
|
||||||
if (r == null) {
|
if (r == null) {
|
||||||
LOG.warn("StoreFile " + hsf + " has a null Reader");
|
LOG.warn("StoreFile {} has a null Reader", hsf);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
this.storeSize += r.length();
|
this.storeSize += r.length();
|
||||||
|
@ -1849,9 +1859,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
// Not split-able if we find a reference store file present in the store.
|
// Not split-able if we find a reference store file present in the store.
|
||||||
boolean result = !hasReferences();
|
boolean result = !hasReferences();
|
||||||
if (!result) {
|
if (!result) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("Not splittable; has references: {}", this);
|
||||||
LOG.trace("Not splittable; has references: " + this);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -1869,14 +1877,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
assert !this.getRegionInfo().isMetaRegion();
|
assert !this.getRegionInfo().isMetaRegion();
|
||||||
// Not split-able if we find a reference store file present in the store.
|
// Not split-able if we find a reference store file present in the store.
|
||||||
if (hasReferences()) {
|
if (hasReferences()) {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("Not splittable; has references: {}", this);
|
||||||
LOG.trace("Not splittable; has references: " + this);
|
|
||||||
}
|
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
return this.storeEngine.getStoreFileManager().getSplitPoint();
|
return this.storeEngine.getStoreFileManager().getSplitPoint();
|
||||||
} catch(IOException e) {
|
} catch(IOException e) {
|
||||||
LOG.warn("Failed getting store size for " + this, e);
|
LOG.warn("Failed getting store size for {}", this, e);
|
||||||
} finally {
|
} finally {
|
||||||
this.lock.readLock().unlock();
|
this.lock.readLock().unlock();
|
||||||
}
|
}
|
||||||
|
@ -1959,10 +1965,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
for (HStoreFile file : getStorefiles()) {
|
for (HStoreFile file : getStorefiles()) {
|
||||||
name2File.put(file.getFileInfo().getActiveFileName(), file);
|
name2File.put(file.getFileInfo().getActiveFileName(), file);
|
||||||
}
|
}
|
||||||
if (getCompactedFiles() != null) {
|
Collection<HStoreFile> compactedFiles = getCompactedFiles();
|
||||||
for (HStoreFile file : getCompactedFiles()) {
|
for (HStoreFile file : IterableUtils.emptyIfNull(compactedFiles)) {
|
||||||
name2File.put(file.getFileInfo().getActiveFileName(), file);
|
name2File.put(file.getFileInfo().getActiveFileName(), file);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
List<HStoreFile> filesToReopen = new ArrayList<>();
|
List<HStoreFile> filesToReopen = new ArrayList<>();
|
||||||
for (KeyValueScanner kvs : currentFileScanners) {
|
for (KeyValueScanner kvs : currentFileScanners) {
|
||||||
|
@ -2000,7 +2005,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
private LongStream getStoreFileAgeStream() {
|
private LongStream getStoreFileAgeStream() {
|
||||||
return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> {
|
return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> {
|
||||||
if (sf.getReader() == null) {
|
if (sf.getReader() == null) {
|
||||||
LOG.warn("StoreFile " + sf + " has a null Reader");
|
LOG.warn("StoreFile {} has a null Reader", sf);
|
||||||
return false;
|
return false;
|
||||||
} else {
|
} else {
|
||||||
return true;
|
return true;
|
||||||
|
@ -2056,7 +2061,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
private long getStorefilesSize(Predicate<HStoreFile> predicate) {
|
private long getStorefilesSize(Predicate<HStoreFile> predicate) {
|
||||||
return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> {
|
return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> {
|
||||||
if (sf.getReader() == null) {
|
if (sf.getReader() == null) {
|
||||||
LOG.warn("StoreFile " + sf + " has a null Reader");
|
LOG.warn("StoreFile {} has a null Reader", sf);
|
||||||
return false;
|
return false;
|
||||||
} else {
|
} else {
|
||||||
return true;
|
return true;
|
||||||
|
@ -2067,7 +2072,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
private long getStoreFileFieldSize(ToLongFunction<StoreFileReader> f) {
|
private long getStoreFileFieldSize(ToLongFunction<StoreFileReader> f) {
|
||||||
return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> {
|
return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> {
|
||||||
if (sf.getReader() == null) {
|
if (sf.getReader() == null) {
|
||||||
LOG.warn("StoreFile " + sf + " has a null Reader");
|
LOG.warn("StoreFile {} has a null Reader", sf);
|
||||||
return false;
|
return false;
|
||||||
} else {
|
} else {
|
||||||
return true;
|
return true;
|
||||||
|
@ -2197,7 +2202,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean commit(MonitoredTask status) throws IOException {
|
public boolean commit(MonitoredTask status) throws IOException {
|
||||||
if (this.tempFiles == null || this.tempFiles.isEmpty()) {
|
if (CollectionUtils.isEmpty(this.tempFiles)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
List<HStoreFile> storeFiles = new ArrayList<>(this.tempFiles.size());
|
List<HStoreFile> storeFiles = new ArrayList<>(this.tempFiles.size());
|
||||||
|
@ -2207,7 +2212,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
outputFileSize += sf.getReader().length();
|
outputFileSize += sf.getReader().length();
|
||||||
storeFiles.add(sf);
|
storeFiles.add(sf);
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
LOG.error("Failed to commit store file " + storeFilePath, ex);
|
LOG.error("Failed to commit store file {}", storeFilePath, ex);
|
||||||
// Try to delete the files we have committed before.
|
// Try to delete the files we have committed before.
|
||||||
for (HStoreFile sf : storeFiles) {
|
for (HStoreFile sf : storeFiles) {
|
||||||
Path pathToDelete = sf.getPath();
|
Path pathToDelete = sf.getPath();
|
||||||
|
@ -2215,7 +2220,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
sf.deleteStoreFile();
|
sf.deleteStoreFile();
|
||||||
} catch (IOException deleteEx) {
|
} catch (IOException deleteEx) {
|
||||||
LOG.error(HBaseMarkers.FATAL, "Failed to delete store file we committed, "
|
LOG.error(HBaseMarkers.FATAL, "Failed to delete store file we committed, "
|
||||||
+ "halting " + pathToDelete, ex);
|
+ "halting {}", pathToDelete, ex);
|
||||||
Runtime.getRuntime().halt(1);
|
Runtime.getRuntime().halt(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2288,10 +2293,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void abort() throws IOException {
|
public void abort() throws IOException {
|
||||||
if (snapshot == null) {
|
if (snapshot != null) {
|
||||||
return;
|
HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId());
|
||||||
}
|
}
|
||||||
HStore.this.updateStorefiles(new ArrayList<>(0), snapshot.getId());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2461,19 +2465,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
try {
|
try {
|
||||||
Collection<HStoreFile> compactedfiles =
|
Collection<HStoreFile> compactedfiles =
|
||||||
this.getStoreEngine().getStoreFileManager().getCompactedfiles();
|
this.getStoreEngine().getStoreFileManager().getCompactedfiles();
|
||||||
if (compactedfiles != null && compactedfiles.size() != 0) {
|
if (CollectionUtils.isNotEmpty(compactedfiles)) {
|
||||||
// Do a copy under read lock
|
// Do a copy under read lock
|
||||||
copyCompactedfiles = new ArrayList<>(compactedfiles);
|
copyCompactedfiles = new ArrayList<>(compactedfiles);
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("No compacted files to archive");
|
||||||
LOG.trace("No compacted files to archive");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
lock.readLock().unlock();
|
lock.readLock().unlock();
|
||||||
}
|
}
|
||||||
if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {
|
if (CollectionUtils.isNotEmpty(copyCompactedfiles)) {
|
||||||
removeCompactedfiles(copyCompactedfiles);
|
removeCompactedfiles(copyCompactedfiles);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -2494,25 +2495,21 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
try {
|
try {
|
||||||
StoreFileReader r = file.getReader();
|
StoreFileReader r = file.getReader();
|
||||||
if (r == null) {
|
if (r == null) {
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("The file {} was closed but still not archived", file);
|
||||||
LOG.debug("The file " + file + " was closed but still not archived.");
|
|
||||||
}
|
|
||||||
filesToRemove.add(file);
|
filesToRemove.add(file);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (file.isCompactedAway() && !file.isReferencedInReads()) {
|
if (file.isCompactedAway() && !file.isReferencedInReads()) {
|
||||||
// Even if deleting fails we need not bother as any new scanners won't be
|
// Even if deleting fails we need not bother as any new scanners won't be
|
||||||
// able to use the compacted file as the status is already compactedAway
|
// able to use the compacted file as the status is already compactedAway
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("Closing and archiving the file {}", file);
|
||||||
LOG.trace("Closing and archiving the file " + file.getPath());
|
|
||||||
}
|
|
||||||
r.close(true);
|
r.close(true);
|
||||||
// Just close and return
|
// Just close and return
|
||||||
filesToRemove.add(file);
|
filesToRemove.add(file);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error(
|
LOG.error("Exception while trying to close the compacted store file {}",
|
||||||
"Exception while trying to close the compacted store file " + file.getPath().getName());
|
file.getPath(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2521,9 +2518,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
// The secondary region does not move the files to archive. Any active reads from
|
// The secondary region does not move the files to archive. Any active reads from
|
||||||
// the secondary region will still work because the file as such has active readers on it.
|
// the secondary region will still work because the file as such has active readers on it.
|
||||||
if (!filesToRemove.isEmpty()) {
|
if (!filesToRemove.isEmpty()) {
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Moving the files {} to archive", filesToRemove);
|
||||||
LOG.debug("Moving the files " + filesToRemove + " to archive");
|
|
||||||
}
|
|
||||||
// Only if this is successful it has to be removed
|
// Only if this is successful it has to be removed
|
||||||
try {
|
try {
|
||||||
this.fs.removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(), filesToRemove);
|
this.fs.removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(), filesToRemove);
|
||||||
|
@ -2561,9 +2556,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
||||||
}
|
}
|
||||||
|
|
||||||
private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException {
|
private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException {
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("Clearing the compacted file {} from this store", filesToRemove);
|
||||||
LOG.trace("Clearing the compacted file " + filesToRemove + " from this store");
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
lock.writeLock().lock();
|
lock.writeLock().lock();
|
||||||
this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);
|
this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);
|
||||||
|
|
Loading…
Reference in New Issue