HBASE-7603 refactor storefile management in HStore in order to support things like LevelDB-style compactions (Sergey)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1444262 13f79535-47bb-0310-9956-ffa450edef68
Zhihong Yu 2013-02-08 22:22:08 +00:00
parent 68fb6c6877
commit df04846da5
17 changed files with 442 additions and 215 deletions
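At a high level, the patch replaces HStore's hand-rolled volatile ImmutableList<StoreFile> bookkeeping with calls through a new StoreFileManager interface, so that alternative file layouts (such as a LevelDB-style leveled one) can be plugged in without touching HStore. The sketch below is a minimal model of that call pattern, not the HBase code itself: String stands in for StoreFile, natural ordering stands in for the SEQ_ID comparator, and the signatures are simplified (the real interface and default implementation appear in the diffs below).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Simplified stand-in for the new abstraction; String plays the role of
// StoreFile, and natural ordering stands in for StoreFile.Comparators.SEQ_ID.
interface FileManagerSketch {
  void loadFiles(List<String> files);                   // initial store open
  void insertNewFile(String f);                         // memstore flush / bulk load
  void addCompactionResults(Collection<String> compacted, Collection<String> results);
  Collection<String> getStorefiles();                   // snapshot for readers
}

// Flat, seqNum-sorted default, mirroring DefaultStoreFileManager below.
class DefaultSketch implements FileManagerSketch {
  private volatile List<String> files = Collections.emptyList();

  public void loadFiles(List<String> fs) { sortAndSet(new ArrayList<>(fs)); }

  public void insertNewFile(String f) {
    List<String> next = new ArrayList<>(files);
    next.add(f);
    sortAndSet(next);
  }

  public void addCompactionResults(Collection<String> compacted, Collection<String> results) {
    List<String> next = new ArrayList<>(files);
    next.removeAll(compacted);
    next.addAll(results);
    sortAndSet(next);
  }

  public Collection<String> getStorefiles() { return files; }

  private void sortAndSet(List<String> next) {
    Collections.sort(next);                      // keep seqNum order
    files = Collections.unmodifiableList(next);  // atomic snapshot swap
  }
}
```

With this split, HStore still takes the store lock around mutations, but it no longer sorts or clones file lists itself; it only delegates.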

View File

@ -0,0 +1,132 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
/**
* Default implementation of StoreFileManager. Not thread-safe.
*/
@InterfaceAudience.Private
class DefaultStoreFileManager implements StoreFileManager {
static final Log LOG = LogFactory.getLog(DefaultStoreFileManager.class);
private final KVComparator kvComparator;
/**
* List of store files inside this store. This is an immutable list that
* is atomically replaced when its contents change.
*/
private volatile ImmutableList<StoreFile> storefiles = null;
public DefaultStoreFileManager(KVComparator kvComparator) {
this.kvComparator = kvComparator;
}
@Override
public void loadFiles(List<StoreFile> storeFiles) {
sortAndSetStoreFiles(storeFiles);
}
@Override
public final Collection<StoreFile> getStorefiles() {
return storefiles;
}
@Override
public void insertNewFile(StoreFile sf) {
ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
newFiles.add(sf);
sortAndSetStoreFiles(newFiles);
}
@Override
public ImmutableCollection<StoreFile> clearFiles() {
ImmutableList<StoreFile> result = storefiles;
storefiles = ImmutableList.of();
return result;
}
@Override
public final int getStorefileCount() {
return storefiles.size();
}
@Override
public void addCompactionResults(
Collection<StoreFile> compactedFiles, Collection<StoreFile> results) {
ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles);
newStoreFiles.removeAll(compactedFiles);
if (!results.isEmpty()) {
newStoreFiles.addAll(results);
}
sortAndSetStoreFiles(newStoreFiles);
}
@Override
public final Iterator<StoreFile> getCandidateFilesForRowKeyBefore(final KeyValue targetKey) {
return new ArrayList<StoreFile>(Lists.reverse(this.storefiles)).iterator();
}
@Override
public Iterator<StoreFile> updateCandidateFilesForRowKeyBefore(
Iterator<StoreFile> candidateFiles, final KeyValue targetKey, final KeyValue candidate) {
// Default store has nothing useful to do here.
// TODO: move this comment when implementing Level:
// Level store can trim the list by range, removing all the files which cannot have
// any useful candidates less than "candidate".
return candidateFiles;
}
@Override
public final byte[] getSplitPoint() throws IOException {
if (this.storefiles.isEmpty()) {
return null;
}
return StoreUtils.getLargestFile(this.storefiles).getFileSplitPoint(this.kvComparator);
}
@Override
public final Collection<StoreFile> getFilesForScanOrGet(boolean isGet,
byte[] startRow, byte[] stopRow) {
// We cannot provide any useful input and already have the files sorted by seqNum.
return getStorefiles();
}
private void sortAndSetStoreFiles(List<StoreFile> storeFiles) {
Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID);
storefiles = ImmutableList.copyOf(storeFiles);
}
}
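DefaultStoreFileManager keeps the concurrency story the old HStore code had: the volatile field holds an immutable list, readers take the current snapshot without locking, and writers (which in HStore run under the store's write lock) build a new list and swap the reference atomically. Below is a small, runnable demonstration of that copy-on-write property, using plain Strings and invented hfile names:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class SnapshotDemo {
  private volatile List<String> files = Collections.emptyList();

  void insert(String f) {                          // writer: copy, mutate, swap
    List<String> next = new ArrayList<>(files);
    next.add(f);
    Collections.sort(next);
    files = Collections.unmodifiableList(next);
  }

  Collection<String> snapshot() { return files; }  // reader: no lock needed

  public static void main(String[] args) {
    SnapshotDemo d = new SnapshotDemo();
    d.insert("hfile-1");
    Collection<String> before = d.snapshot();      // a scanner's view
    d.insert("hfile-2");                           // a concurrent flush
    System.out.println(before);                    // [hfile-1] -- unchanged
    System.out.println(d.snapshot());              // [hfile-1, hfile-2]
  }
}
```

An earlier snapshot is never disturbed by later inserts, which is what lets scanners keep their file list while flushes and compactions proceed.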

View File

@ -671,10 +671,7 @@ public class HRegion implements HeapSize { // , Writable{
*/
public boolean hasReferences() {
for (Store store : this.stores.values()) {
for (StoreFile sf : store.getStorefiles()) {
// Found a reference, return.
if (sf.isReference()) return true;
}
if (store.hasReferences()) return true;
}
return false;
}
@ -1026,24 +1023,22 @@ public class HRegion implements HeapSize { // , Writable{
ThreadPoolExecutor storeCloserThreadPool =
getStoreOpenAndCloseThreadPool("StoreCloserThread-"
+ this.regionInfo.getRegionNameAsString());
CompletionService<ImmutableList<StoreFile>> completionService =
new ExecutorCompletionService<ImmutableList<StoreFile>>(
storeCloserThreadPool);
CompletionService<Collection<StoreFile>> completionService =
new ExecutorCompletionService<Collection<StoreFile>>(storeCloserThreadPool);
// close each store in parallel
for (final Store store : stores.values()) {
completionService
.submit(new Callable<ImmutableList<StoreFile>>() {
public ImmutableList<StoreFile> call() throws IOException {
.submit(new Callable<Collection<StoreFile>>() {
public Collection<StoreFile> call() throws IOException {
return store.close();
}
});
}
try {
for (int i = 0; i < stores.size(); i++) {
Future<ImmutableList<StoreFile>> future = completionService
.take();
ImmutableList<StoreFile> storeFileList = future.get();
Future<Collection<StoreFile>> future = completionService.take();
Collection<StoreFile> storeFileList = future.get();
result.addAll(storeFileList);
}
} catch (InterruptedException e) {
@ -3042,8 +3037,7 @@ public class HRegion implements HeapSize { // , Writable{
throw new IllegalArgumentException("No column family : " +
new String(column) + " available");
}
List<StoreFile> storeFiles = store.getStorefiles();
for (StoreFile storeFile: storeFiles) {
for (StoreFile storeFile: store.getStorefiles()) {
storeFileNames.add(storeFile.getPath().toString());
}
}

View File

@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.SortedSet;
@ -78,6 +79,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@ -132,12 +134,7 @@ public class HStore implements Store {
private ScanInfo scanInfo;
/*
* List of store files inside this store. This is an immutable list that
* is atomically replaced when its contents change.
*/
private volatile ImmutableList<StoreFile> storefiles = null;
private StoreFileManager storeFileManager;
final List<StoreFile> filesCompacting = Lists.newArrayList();
// All access must be synchronized.
@ -221,7 +218,9 @@ public class HStore implements Store {
HStore.closeCheckInterval = conf.getInt(
"hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
}
this.storefiles = sortAndClone(loadStoreFiles());
this.storeFileManager = new DefaultStoreFileManager(this.comparator);
this.storeFileManager.loadFiles(loadStoreFiles());
// Initialize checksum type from name. The names are CRC32, CRC32C, etc.
this.checksumType = getChecksumType(conf);
@ -341,7 +340,7 @@ public class HStore implements Store {
}
/**
* @return The maximum sequence id in all store files.
* @return The maximum sequence id in all store files. Used for log replay.
*/
long getMaxSequenceId(boolean includeBulkFiles) {
return StoreFile.getMaxSequenceIdInList(this.getStorefiles(), includeBulkFiles);
@ -529,8 +528,8 @@ public class HStore implements Store {
* @return All store files.
*/
@Override
public List<StoreFile> getStorefiles() {
return this.storefiles;
public Collection<StoreFile> getStorefiles() {
return this.storeFileManager.getStorefiles();
}
@Override
@ -633,11 +632,9 @@ public class HStore implements Store {
// Append the new storefile into the list
this.lock.writeLock().lock();
try {
ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
newFiles.add(sf);
this.storefiles = sortAndClone(newFiles);
this.storeFileManager.insertNewFile(sf);
} finally {
// We need the lock, as long as we are updating the storefiles
// We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
// notifyChangeReadersObservers. See HBASE-4485 for a possible
// deadlock scenario that could have happened if continue to hold
@ -660,13 +657,11 @@ public class HStore implements Store {
}
@Override
public ImmutableList<StoreFile> close() throws IOException {
public ImmutableCollection<StoreFile> close() throws IOException {
this.lock.writeLock().lock();
try {
ImmutableList<StoreFile> result = storefiles;
// Clear so metrics doesn't find them.
storefiles = ImmutableList.of();
ImmutableCollection<StoreFile> result = storeFileManager.clearFiles();
if (!result.isEmpty()) {
// initialize the thread pool for closing store files in parallel.
@ -963,7 +958,7 @@ public class HStore implements Store {
}
/*
* Change storefiles adding into place the Reader produced by this new flush.
* Change storeFiles adding into place the Reader produced by this new flush.
* @param sf
* @param set That was used to make the passed file <code>p</code>.
* @throws IOException
@ -974,13 +969,10 @@ public class HStore implements Store {
throws IOException {
this.lock.writeLock().lock();
try {
ArrayList<StoreFile> newList = new ArrayList<StoreFile>(storefiles);
newList.add(sf);
storefiles = sortAndClone(newList);
this.storeFileManager.insertNewFile(sf);
this.memstore.clearSnapshot(set);
} finally {
// We need the lock, as long as we are updating the storefiles
// We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
// notifyChangeReadersObservers. See HBASE-4485 for a possible
// deadlock scenario that could have happened if continue to hold
@ -1010,14 +1002,13 @@ public class HStore implements Store {
* @return all scanners for this store
*/
protected List<KeyValueScanner> getScanners(boolean cacheBlocks,
boolean isGet,
boolean isCompaction,
ScanQueryMatcher matcher) throws IOException {
List<StoreFile> storeFiles;
boolean isGet, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
byte[] stopRow) throws IOException {
Collection<StoreFile> storeFilesToScan;
List<KeyValueScanner> memStoreScanners;
this.lock.readLock().lock();
try {
storeFiles = this.getStorefiles();
storeFilesToScan = this.storeFileManager.getFilesForScanOrGet(isGet, startRow, stopRow);
memStoreScanners = this.memstore.getScanners();
} finally {
this.lock.readLock().unlock();
@ -1029,7 +1020,7 @@ public class HStore implements Store {
// but now we get them in ascending order, which I think is
// actually more correct, since memstore get put at the end.
List<StoreFileScanner> sfScanners = StoreFileScanner
.getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction, matcher);
.getScannersForStoreFiles(storeFilesToScan, cacheBlocks, isGet, isCompaction, matcher);
List<KeyValueScanner> scanners =
new ArrayList<KeyValueScanner>(sfScanners.size()+1);
scanners.addAll(sfScanners);
@ -1149,15 +1140,21 @@ public class HStore implements Store {
return sfs;
}
@Override
public void compactRecentForTesting(int N) throws IOException {
/**
* This method tries to compact N recent files for testing.
* Note that because compacting "recent" files only makes sense for some policies
* (e.g. the default one), this method assumes the default policy is used; it does not
* consult the policy, but builds the compaction candidate list itself.
* @param N Number of files.
*/
public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
List<StoreFile> filesToCompact;
boolean isMajor;
this.lock.readLock().lock();
try {
synchronized (filesCompacting) {
filesToCompact = Lists.newArrayList(storefiles);
filesToCompact = Lists.newArrayList(storeFileManager.getStorefiles());
if (!filesCompacting.isEmpty()) {
// exclude all files older than the newest file we're currently
// compacting. this allows us to preserve contiguity (HBASE-2856)
@ -1172,7 +1169,7 @@ public class HStore implements Store {
}
filesToCompact = filesToCompact.subList(count - N, count);
isMajor = (filesToCompact.size() == storefiles.size());
isMajor = (filesToCompact.size() == storeFileManager.getStorefileCount());
filesCompacting.addAll(filesToCompact);
Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
}
@ -1200,7 +1197,7 @@ public class HStore implements Store {
@Override
public boolean hasReferences() {
return StoreUtils.hasReferences(this.storefiles);
return StoreUtils.hasReferences(this.storeFileManager.getStorefiles());
}
@Override
@ -1210,15 +1207,14 @@ public class HStore implements Store {
@Override
public boolean isMajorCompaction() throws IOException {
for (StoreFile sf : this.storefiles) {
for (StoreFile sf : this.storeFileManager.getStorefiles()) {
// TODO: what are these reader checks all over the place?
if (sf.getReader() == null) {
LOG.debug("StoreFile " + sf + " has null Reader");
return false;
}
}
List<StoreFile> candidates = new ArrayList<StoreFile>(this.storefiles);
return compactionPolicy.isMajorCompaction(candidates);
return compactionPolicy.isMajorCompaction(this.storeFileManager.getStorefiles());
}
public CompactionRequest requestCompaction() throws IOException {
@ -1235,8 +1231,8 @@ public class HStore implements Store {
this.lock.readLock().lock();
try {
synchronized (filesCompacting) {
// candidates = all storefiles not already in compaction queue
List<StoreFile> candidates = Lists.newArrayList(storefiles);
// candidates = all StoreFiles not already in compaction queue
List<StoreFile> candidates = Lists.newArrayList(storeFileManager.getStorefiles());
if (!filesCompacting.isEmpty()) {
// exclude all files older than the newest file we're currently
// compacting. this allows us to preserve contiguity (HBASE-2856)
@ -1280,9 +1276,8 @@ public class HStore implements Store {
filesCompacting.addAll(filesToCompact.getFilesToCompact());
Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
// major compaction iff all StoreFiles are included
boolean isMajor =
(filesToCompact.getFilesToCompact().size() == this.storefiles.size());
(filesToCompact.getFilesToCompact().size() == this.getStorefilesCount());
if (isMajor) {
// since we're enqueuing a major, update the compaction wait interval
this.forceMajor = false;
@ -1378,25 +1373,22 @@ public class HStore implements Store {
this.family.getBloomFilterType(), this.dataBlockEncoder);
result.createReader();
}
try {
this.lock.writeLock().lock();
try {
// Change this.storefiles so it reflects new state but do not
// Change this.storeFiles so it reflects new state but do not
// delete old store files until we have sent out notification of
// change in case old files are still being accessed by outstanding
// scanners.
ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles);
newStoreFiles.removeAll(compactedFiles);
filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock()
// If a StoreFile result, move it into place. May be null.
List<StoreFile> results = new ArrayList<StoreFile>(1);
if (result != null) {
newStoreFiles.add(result);
results.add(result);
}
this.storefiles = sortAndClone(newStoreFiles);
this.storeFileManager.addCompactionResults(compactedFiles, results);
filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock()
} finally {
// We need the lock, as long as we are updating the storefiles
// We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
// notifyChangeReadersObservers. See HBASE-4485 for a possible
// deadlock scenario that could have happened if continue to hold
@ -1423,7 +1415,7 @@ public class HStore implements Store {
// 4. Compute new store size
this.storeSize = 0L;
this.totalUncompressedBytes = 0L;
for (StoreFile hsf : this.storefiles) {
for (StoreFile hsf : this.storeFileManager.getStorefiles()) {
StoreFile.Reader r = hsf.getReader();
if (r == null) {
LOG.warn("StoreFile " + hsf + " has a null Reader");
@ -1435,21 +1427,6 @@ public class HStore implements Store {
return result;
}
public ImmutableList<StoreFile> sortAndClone(List<StoreFile> storeFiles) {
Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID);
ImmutableList<StoreFile> newList = ImmutableList.copyOf(storeFiles);
return newList;
}
// ////////////////////////////////////////////////////////////////////////////
// Accessors.
// (This is the only section that is directly useful!)
//////////////////////////////////////////////////////////////////////////////
@Override
public int getNumberOfStoreFiles() {
return this.storefiles.size();
}
/*
* @param wantedVersions How many versions were asked for.
* @return wantedVersions or this family's {@link HConstants#VERSIONS}.
@ -1486,10 +1463,18 @@ public class HStore implements Store {
// First go to the memstore. Pick up deletes and candidates.
this.memstore.getRowKeyAtOrBefore(state);
// Check if match, if we got a candidate on the asked for 'kv' row.
// Process each store file. Run through from newest to oldest.
for (StoreFile sf : Lists.reverse(storefiles)) {
// Update the candidate keys from the current map file
rowAtOrBeforeFromStoreFile(sf, state);
// Process each relevant store file. Run through from newest to oldest.
Iterator<StoreFile> sfIterator =
this.storeFileManager.getCandidateFilesForRowKeyBefore(state.getTargetKey());
while (sfIterator.hasNext()) {
StoreFile sf = sfIterator.next();
sfIterator.remove(); // Remove sf from iterator.
boolean haveNewCandidate = rowAtOrBeforeFromStoreFile(sf, state);
if (haveNewCandidate) {
// TODO: we may have an optimization here which stops the search if we find exact match.
sfIterator = this.storeFileManager.updateCandidateFilesForRowKeyBefore(sfIterator,
state.getTargetKey(), state.getCandidate());
}
}
return state.getCandidate();
} finally {
@ -1502,22 +1487,23 @@ public class HStore implements Store {
* @param f
* @param state
* @throws IOException
* @return True iff the candidate has been updated in the state.
*/
private void rowAtOrBeforeFromStoreFile(final StoreFile f,
private boolean rowAtOrBeforeFromStoreFile(final StoreFile f,
final GetClosestRowBeforeTracker state)
throws IOException {
StoreFile.Reader r = f.getReader();
if (r == null) {
LOG.warn("StoreFile " + f + " has a null Reader");
return;
return false;
}
if (r.getEntries() == 0) {
LOG.warn("StoreFile " + f + " is a empty store file");
return;
return false;
}
// TODO: Cache these keys rather than make each time?
byte [] fk = r.getFirstKey();
if (fk == null) return;
if (fk == null) return false;
KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
byte [] lk = r.getLastKey();
KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
@ -1525,7 +1511,7 @@ public class HStore implements Store {
if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
// If last key in file is not of the target table, no candidates in this
// file. Return.
if (!state.isTargetTable(lastKV)) return;
if (!state.isTargetTable(lastKV)) return false;
// If the row we're looking for is past the end of file, set search key to
// last key. TODO: Cache last and first key rather than make each time.
firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
@ -1533,10 +1519,10 @@ public class HStore implements Store {
// Get a scanner that caches blocks and that uses pread.
HFileScanner scanner = r.getScanner(true, true, false);
// Seek scanner. If can't seek it, return.
if (!seekToScanner(scanner, firstOnRow, firstKV)) return;
if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
// If we found candidate on firstOnRow, just return. THIS WILL NEVER HAPPEN!
// Unlikely that there'll be an instance of actual first row in table.
if (walkForwardInSingleRow(scanner, firstOnRow, state)) return;
if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
// If here, need to start backing up.
while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
firstOnRow.getKeyLength())) {
@ -1546,10 +1532,11 @@ public class HStore implements Store {
// Make new first on row.
firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
// Seek scanner. If can't seek it, break.
if (!seekToScanner(scanner, firstOnRow, firstKV)) break;
if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
// If we find something, break;
if (walkForwardInSingleRow(scanner, firstOnRow, state)) break;
if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
}
return false;
}
/*
@ -1608,17 +1595,12 @@ public class HStore implements Store {
public boolean canSplit() {
this.lock.readLock().lock();
try {
// Not splitable if we find a reference store file present in the store.
for (StoreFile sf : storefiles) {
if (sf.isReference()) {
if (LOG.isDebugEnabled()) {
LOG.debug(sf + " is not splittable");
}
return false;
}
// Not splittable if we find a reference store file present in the store.
boolean result = !hasReferences();
if (!result && LOG.isDebugEnabled()) {
LOG.debug("Cannot split region due to reference files being there");
}
return true;
return result;
} finally {
this.lock.readLock().unlock();
}
@ -1628,64 +1610,14 @@ public class HStore implements Store {
public byte[] getSplitPoint() {
this.lock.readLock().lock();
try {
// sanity checks
if (this.storefiles.isEmpty()) {
return null;
}
// Should already be enforced by the split policy!
assert !this.region.getRegionInfo().isMetaRegion();
// Not splitable if we find a reference store file present in the store.
long maxSize = 0L;
StoreFile largestSf = null;
for (StoreFile sf : storefiles) {
if (sf.isReference()) {
// Should already be enforced since we return false in this case
assert false : "getSplitPoint() called on a region that can't split!";
return null;
}
StoreFile.Reader r = sf.getReader();
if (r == null) {
LOG.warn("Storefile " + sf + " Reader is null");
continue;
}
long size = r.length();
if (size > maxSize) {
// This is the largest one so far
maxSize = size;
largestSf = sf;
}
}
StoreFile.Reader r = largestSf.getReader();
if (r == null) {
LOG.warn("Storefile " + largestSf + " Reader is null");
// Not splittable if we find a reference store file present in the store.
if (hasReferences()) {
assert false : "getSplitPoint() called on a region that can't split!";
return null;
}
// Get first, last, and mid keys. Midkey is the key that starts block
// in middle of hfile. Has column and timestamp. Need to return just
// the row we want to split on as midkey.
byte [] midkey = r.midkey();
if (midkey != null) {
KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
byte [] fk = r.getFirstKey();
KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
byte [] lk = r.getLastKey();
KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
// if the midkey is the same as the first or last keys, then we cannot
// (ever) split this region.
if (this.comparator.compareRows(mk, firstKey) == 0 ||
this.comparator.compareRows(mk, lastKey) == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("cannot split because midkey is the same as first or " +
"last row");
}
return null;
}
return mk.getRow();
}
return this.storeFileManager.getSplitPoint();
} catch(IOException e) {
LOG.warn("Failed getting store size for " + this, e);
} finally {
@ -1741,7 +1673,7 @@ public class HStore implements Store {
@Override
public int getStorefilesCount() {
return this.storefiles.size();
return this.storeFileManager.getStorefileCount();
}
@Override
@ -1752,7 +1684,7 @@ public class HStore implements Store {
@Override
public long getStorefilesSize() {
long size = 0;
for (StoreFile s: storefiles) {
for (StoreFile s: this.storeFileManager.getStorefiles()) {
StoreFile.Reader r = s.getReader();
if (r == null) {
LOG.warn("StoreFile " + s + " has a null Reader");
@ -1766,7 +1698,7 @@ public class HStore implements Store {
@Override
public long getStorefilesIndexSize() {
long size = 0;
for (StoreFile s: storefiles) {
for (StoreFile s: this.storeFileManager.getStorefiles()) {
StoreFile.Reader r = s.getReader();
if (r == null) {
LOG.warn("StoreFile " + s + " has a null Reader");
@ -1780,7 +1712,7 @@ public class HStore implements Store {
@Override
public long getTotalStaticIndexSize() {
long size = 0;
for (StoreFile s : storefiles) {
for (StoreFile s : this.storeFileManager.getStorefiles()) {
size += s.getReader().getUncompressedDataIndexSize();
}
return size;
@ -1789,7 +1721,7 @@ public class HStore implements Store {
@Override
public long getTotalStaticBloomSize() {
long size = 0;
for (StoreFile s : storefiles) {
for (StoreFile s : this.storeFileManager.getStorefiles()) {
StoreFile.Reader r = s.getReader();
size += r.getTotalBloomSize();
}
@ -1811,7 +1743,7 @@ public class HStore implements Store {
if(priority == Store.PRIORITY_USER) {
return Store.PRIORITY_USER;
} else {
return this.blockingStoreFileCount - this.storefiles.size();
return this.blockingStoreFileCount - this.storeFileManager.getStorefileCount();
}
}
@ -1923,7 +1855,8 @@ public class HStore implements Store {
@Override
public boolean needsCompaction() {
return compactionPolicy.needsCompaction(storefiles.size() - filesCompacting.size());
return compactionPolicy.needsCompaction(
this.storeFileManager.getStorefileCount() - filesCompacting.size());
}
@Override
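A behavioral note on the getRowKeyAtOrBefore change visible above: the old code walked every file newest to oldest unconditionally; now rowAtOrBeforeFromStoreFile reports whether it improved the candidate, and after each improvement the manager may trim the remaining candidate files. The default manager returns the iterator unchanged, but the hook is where a range-aware layout could cut work. A simplified sketch of the loop contract follows, with invented helper types (String for StoreFile, byte[] rows instead of KeyValues):

```java
import java.util.Iterator;

// Sketch of the loop contract in HStore.getRowKeyAtOrBefore (simplified).
final class RowBeforeLoopSketch {
  interface ManagerSketch {
    Iterator<String> candidateFiles(byte[] targetRow);          // newest first
    Iterator<String> updateCandidates(Iterator<String> remaining,
                                      byte[] targetRow, byte[] bestSoFar);
  }

  // Analogue of rowAtOrBeforeFromStoreFile: returns a candidate row strictly
  // better than bestSoFar (which may be null), or null if the file has none.
  interface FileProbe {
    byte[] rowAtOrBefore(String file, byte[] targetRow, byte[] bestSoFar);
  }

  static byte[] search(ManagerSketch mgr, FileProbe probe, byte[] targetRow) {
    byte[] best = null;
    Iterator<String> it = mgr.candidateFiles(targetRow);
    while (it.hasNext()) {
      String sf = it.next();
      it.remove();                              // file consumed, as in the patch
      byte[] found = probe.rowAtOrBefore(sf, targetRow, best);
      if (found != null) {                      // candidate improved:
        best = found;                           // let the manager trim the rest
        it = mgr.updateCandidates(it, targetRow, best);
      }
    }
    return best;
  }
}
```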

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.NavigableSet;
@ -52,7 +53,7 @@ public interface Store extends HeapSize, StoreConfigInformation {
// General Accessors
public KeyValue.KVComparator getComparator();
public List<StoreFile> getStorefiles();
public Collection<StoreFile> getStorefiles();
/**
* Close all the readers. We don't need to worry about subsequent requests because the HRegion
@ -60,7 +61,7 @@ public interface Store extends HeapSize, StoreConfigInformation {
* @return the {@link StoreFile StoreFiles} that were previously being used.
* @throws IOException on failure
*/
public ImmutableList<StoreFile> close() throws IOException;
public Collection<StoreFile> close() throws IOException;
/**
* Return a scanner for both the memstore and the HStore files. Assumes we are not in a
@ -208,11 +209,6 @@ public interface Store extends HeapSize, StoreConfigInformation {
*/
public HFileDataBlockEncoder getDataBlockEncoder();
/**
* @return the number of files in this store
*/
public int getNumberOfStoreFiles();
/** @return aggregate size of all HStores used in the last compaction */
public long getLastCompactSize();
@ -256,13 +252,6 @@ public interface Store extends HeapSize, StoreConfigInformation {
// Test-helper methods
/**
* Compact the most recent N files. Used in testing.
* @param N number of files to compact. Must be less than or equal to current number of files.
* @throws IOException on failure
*/
public void compactRecentForTesting(int N) throws IOException;
/**
* Used for tests.
* @return cache configuration for this Store.

View File

@ -942,6 +942,38 @@ public class StoreFile {
getReader().timeRangeTracker.minimumTimestamp;
}
/**
* Gets the approximate mid-point of this file that is optimal for use in splitting it.
* @param comparator Comparator used to compare KVs.
* @return The split point row, or null if splitting is not possible or the reader is null.
*/
byte[] getFileSplitPoint(KVComparator comparator) throws IOException {
if (this.reader == null) {
LOG.warn("Storefile " + this + " Reader is null; cannot get split point");
return null;
}
// Get first, last, and mid keys. Midkey is the key that starts block
// in middle of hfile. Has column and timestamp. Need to return just
// the row we want to split on as midkey.
byte [] midkey = this.reader.midkey();
if (midkey != null) {
KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
byte [] fk = this.reader.getFirstKey();
KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
byte [] lk = this.reader.getLastKey();
KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
// if the midkey is the same as the first or last keys, we cannot (ever) split this region.
if (comparator.compareRows(mk, firstKey) == 0 || comparator.compareRows(mk, lastKey) == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("cannot split because midkey is the same as first or last row");
}
return null;
}
return mk.getRow();
}
return null;
}
/**
* A StoreFile writer. Use this to read/write HBase Store Files. It is package
* local because it is an implementation detail of the HBase regionserver.

View File

@ -0,0 +1,123 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import com.google.common.collect.ImmutableCollection;
/**
* Manages the store files and basic metadata about them that determines the logical structure
* (e.g. what files to return for a scan, how to determine the split point, and so on).
* Does NOT affect the physical structure of files in HDFS.
* Example alternative structures: the default list of files sorted by seqNum; a LevelDB-style
* one sorted by level and seqNum.
*
* Implementations are assumed not to be thread-safe.
*/
@InterfaceAudience.Private
interface StoreFileManager {
/**
* Loads the initial store files into an empty StoreFileManager.
* @param storeFiles The files to load.
*/
public abstract void loadFiles(List<StoreFile> storeFiles);
/**
* Adds a new file, from either a MemStore flush or a bulk load, into the structure.
* @param sf New store file.
*/
public abstract void insertNewFile(StoreFile sf);
/**
* Adds compaction results into the structure.
* @param compactedFiles The input files for the compaction.
* @param results The resulting files for the compaction.
*/
public abstract void addCompactionResults(
Collection<StoreFile> compactedFiles, Collection<StoreFile> results);
/**
* Clears all the files currently in use and returns them.
* @return The files previously in use.
*/
public abstract ImmutableCollection<StoreFile> clearFiles();
/**
* Gets a snapshot of the store files currently in use. Can be used for things like metrics
* and checks; callers should not assume anything about relations between store files in the list.
* @return The list of StoreFiles.
*/
public abstract Collection<StoreFile> getStorefiles();
/**
* Returns the number of files currently in use.
* @return The number of files.
*/
public abstract int getStorefileCount();
/**
* Gets the store files to scan for a Scan or Get request.
* @param isGet Whether it's a get.
* @param startRow Start row of the request.
* @param stopRow Stop row of the request.
* @return The list of files that are to be read for this request.
*/
public abstract Collection<StoreFile> getFilesForScanOrGet(boolean isGet,
byte[] startRow, byte[] stopRow);
/**
* Gets the initial, full list of candidate store files to check for a row-key-before request.
* @param targetKey The key that is the basis of the search.
* @return The files that may contain a key less than or equal to targetKey, ordered from
* newest to oldest, preferring files likely to contain the target key.
*/
public abstract Iterator<StoreFile> getCandidateFilesForRowKeyBefore(
KeyValue targetKey);
/**
* Updates the candidate list for a row-key-before search. Based on the candidates remaining
* to check from getCandidateFilesForRowKeyBefore, the target key, and the current candidate,
* implementations may trim and reorder the list to drop files where a better candidate cannot
* be found.
* @param candidateFiles The candidate files not yet checked for better candidates - return
* value from {@link #getCandidateFilesForRowKeyBefore(KeyValue)},
* with some files already removed.
* @param targetKey The key to search for.
* @param candidate The current best candidate found.
* @return The list to replace candidateFiles.
*/
public abstract Iterator<StoreFile> updateCandidateFilesForRowKeyBefore(
Iterator<StoreFile> candidateFiles, KeyValue targetKey, KeyValue candidate);
/**
* Gets the split point for the split of this set of store files (approx. middle).
* @return The mid-point, or null if no split is possible.
* @throws IOException
*/
public abstract byte[] getSplitPoint() throws IOException;
}
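For contrast with the flat default, here is a purely hypothetical sketch of the kind of implementation this interface is designed to admit; nothing like it ships in this commit. If files carry their first and last row, getFilesForScanOrGet can prune files whose range cannot intersect the request, which a flat seqNum-sorted list can never express. RangeFile and all names below are invented for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical only: a file annotated with its first/last row, so a Get or
// bounded Scan can skip files whose row range cannot match. Rows compare as
// unsigned lexicographic byte strings, as in HBase.
final class RangeFile {
  final String name;
  final byte[] firstRow, lastRow;

  RangeFile(String name, byte[] firstRow, byte[] lastRow) {
    this.name = name;
    this.firstRow = firstRow;
    this.lastRow = lastRow;
  }

  // stopRow is treated as inclusive here; returning one extra file for an
  // exclusive scan bound is a safe over-approximation for a pruning filter.
  boolean overlaps(byte[] startRow, byte[] stopRow) {
    boolean beforeStop = stopRow.length == 0
        || Arrays.compareUnsigned(firstRow, stopRow) <= 0;
    boolean afterStart = Arrays.compareUnsigned(lastRow, startRow) >= 0;
    return beforeStop && afterStart;
  }
}

final class RangePruningSketch {
  private final List<RangeFile> files = new ArrayList<>();

  void add(RangeFile f) { files.add(f); }

  // Analogue of getFilesForScanOrGet: keep only files whose range can
  // intersect the request. A Get touches a single row.
  Collection<RangeFile> getFilesForScanOrGet(boolean isGet,
                                             byte[] startRow, byte[] stopRow) {
    byte[] stop = isGet ? startRow : stopRow;
    List<RangeFile> out = new ArrayList<>();
    for (RangeFile f : files) {
      if (f.overlaps(startRow, stop)) {
        out.add(f);
      }
    }
    return out;
  }
}
```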

View File

@ -206,7 +206,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
final boolean isCompaction = false;
return selectScannersFrom(store.getScanners(cacheBlocks, isGet,
isCompaction, matcher));
isCompaction, matcher, scan.getStartRow(), scan.getStopRow()));
}
/**

View File

@ -29,18 +29,19 @@ public class StoreUtils {
/**
* Creates a deterministic hash code for store file collection.
*/
public static Integer getDeterministicRandomSeed(final List<StoreFile> files) {
public static Integer getDeterministicRandomSeed(final Collection<StoreFile> files) {
if (files != null && !files.isEmpty()) {
return files.get(0).getPath().getName().hashCode();
return files.iterator().next().getPath().getName().hashCode();
}
return null;
}
/**
* Determines whether any files in the collection are references.
* @param files The files.
*/
public static boolean hasReferences(final Collection<StoreFile> files) {
if (files != null && files.size() > 0) {
if (files != null) {
for (StoreFile hsf: files) {
if (hsf.isReference()) {
return true;
@ -53,7 +54,7 @@ public class StoreUtils {
/**
* Gets lowest timestamp from candidate StoreFiles
*/
public static long getLowestTimestamp(final List<StoreFile> candidates)
public static long getLowestTimestamp(final Collection<StoreFile> candidates)
throws IOException {
long minTs = Long.MAX_VALUE;
for (StoreFile storeFile : candidates) {
@ -61,4 +62,24 @@ public class StoreUtils {
}
return minTs;
}
/**
* Gets the largest file (with reader) out of the list of files.
* @param candidates The files to choose from.
* @return The largest file; null if no file has a reader.
*/
static StoreFile getLargestFile(final Collection<StoreFile> candidates) {
long maxSize = -1L;
StoreFile largestSf = null;
for (StoreFile sf : candidates) {
StoreFile.Reader r = sf.getReader();
if (r == null) continue;
long size = r.length();
if (size > maxSize) {
maxSize = size;
largestSf = sf;
}
}
return largestSf;
}
}

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
@ -66,7 +67,7 @@ public abstract class CompactionPolicy extends Configured {
* @return True if we should run a major compaction.
*/
public abstract boolean isMajorCompaction(
final List<StoreFile> filesToCompact) throws IOException;
final Collection<StoreFile> filesToCompact) throws IOException;
/**
* @param compactionSize Total size of some compaction

View File

@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Random;
@ -293,7 +294,7 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
* @param filesToCompact Files to compact. Can be null.
* @return True if we should run a major compaction.
*/
public boolean isMajorCompaction(final List<StoreFile> filesToCompact)
public boolean isMajorCompaction(final Collection<StoreFile> filesToCompact)
throws IOException {
boolean result = false;
long mcTime = getNextMajorCompactTime(filesToCompact);
@ -308,7 +309,7 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
long cfTtl = this.store.getStoreFileTtl();
if (filesToCompact.size() == 1) {
// Single file
StoreFile sf = filesToCompact.get(0);
StoreFile sf = filesToCompact.iterator().next();
Long minTimestamp = sf.getMinimumTimestamp();
long oldest = (minTimestamp == null)
? Long.MIN_VALUE
@ -337,7 +338,7 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
return result;
}
public long getNextMajorCompactTime(final List<StoreFile> filesToCompact) {
public long getNextMajorCompactTime(final Collection<StoreFile> filesToCompact) {
// default = 24hrs
long ret = comConf.getMajorCompactionPeriod();
if (ret > 0) {
@ -390,4 +391,4 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
}
return (currentHour >= startHour || currentHour < endHour);
}
}
}

View File

@ -4807,11 +4807,11 @@ public class TestFromClientSide {
assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
// compact, net minus two blocks, two hits, no misses
System.out.println("Compacting");
assertEquals(2, store.getNumberOfStoreFiles());
assertEquals(2, store.getStorefilesCount());
store.triggerMajorCompaction();
region.compactStores();
waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
assertEquals(1, store.getNumberOfStoreFiles());
assertEquals(1, store.getStorefilesCount());
expectedBlockCount -= 2; // evicted two blocks, cached none
assertEquals(expectedBlockCount, cache.getBlockCount());
expectedBlockHits += 2;
@ -4832,12 +4832,12 @@ public class TestFromClientSide {
throws InterruptedException {
long start = System.currentTimeMillis();
while (start + timeout > System.currentTimeMillis() &&
store.getNumberOfStoreFiles() != count) {
store.getStorefilesCount() != count) {
Thread.sleep(100);
}
System.out.println("start=" + start + ", now=" +
System.currentTimeMillis() + ", cur=" + store.getNumberOfStoreFiles());
assertEquals(count, store.getNumberOfStoreFiles());
System.currentTimeMillis() + ", cur=" + store.getStorefilesCount());
assertEquals(count, store.getStorefilesCount());
}
@Test

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -151,7 +152,8 @@ public class TestScannerSelectionUsingTTL {
// Exercise both compaction codepaths.
if (explicitCompaction) {
region.getStore(FAMILY_BYTES).compactRecentForTesting(totalNumFiles);
HStore store = (HStore)region.getStore(FAMILY_BYTES);
store.compactRecentForTestingAssumingDefaultPolicy(totalNumFiles);
} else {
region.compactStores();
}

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Collection;
import java.util.TreeMap;
import java.util.List;
@ -195,7 +196,7 @@ public class TestLoadIncrementalHFiles {
loader.doBulkLoad(dir, table);
// Get the store files
List<StoreFile> files = util.getHBaseCluster().
Collection<StoreFile> files = util.getHBaseCluster().
getRegions(TABLE).get(0).getStore(FAMILY).getStorefiles();
for (StoreFile file: files) {
// the sequenceId gets initialized during createReader

View File

@ -23,6 +23,7 @@ import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -314,7 +315,7 @@ public class TestCompaction extends HBaseTestCase {
// ensure that major compaction time is deterministic
DefaultCompactionPolicy c = (DefaultCompactionPolicy)s.compactionPolicy;
List<StoreFile> storeFiles = s.getStorefiles();
Collection<StoreFile> storeFiles = s.getStorefiles();
long mcTime = c.getNextMajorCompactTime(storeFiles);
for (int i = 0; i < 10; ++i) {
assertEquals(mcTime, c.getNextMajorCompactTime(storeFiles));
@ -439,7 +440,7 @@ public class TestCompaction extends HBaseTestCase {
Store store2 = this.r.stores.get(fam2);
int numFiles1 = store2.getStorefiles().size();
assertTrue("Was expecting to see 4 store files", numFiles1 > compactionThreshold); // > 3
store2.compactRecentForTesting(compactionThreshold); // = 3
((HStore)store2).compactRecentForTestingAssumingDefaultPolicy(compactionThreshold); // = 3
int numFiles2 = store2.getStorefiles().size();
// Check that we did compact
assertTrue("Number of store files should go down", numFiles1 > numFiles2);
@ -587,7 +588,7 @@ public class TestCompaction extends HBaseTestCase {
}
HStore store = (HStore) r.getStore(COLUMN_FAMILY);
List<StoreFile> storeFiles = store.getStorefiles();
Collection<StoreFile> storeFiles = store.getStorefiles();
Compactor tool = store.compactionPolicy.getCompactor();
List<Path> newFiles =

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -3256,7 +3257,7 @@ public class TestHRegion extends HBaseTestCase {
}
//before compaction
HStore store = (HStore) region.getStore(fam1);
List<StoreFile> storeFiles = store.getStorefiles();
Collection<StoreFile> storeFiles = store.getStorefiles();
for (StoreFile storefile : storeFiles) {
StoreFile.Reader reader = storefile.getReader();
reader.loadFileInfo();

View File

@ -163,16 +163,9 @@ public class TestSplitTransaction {
* Pass a reference store
*/
@Test public void testPrepareWithRegionsWithReference() throws IOException {
// create a mock that will act as a reference StoreFile
StoreFile storeFileMock = Mockito.mock(StoreFile.class);
when(storeFileMock.isReference()).thenReturn(true);
// add the mock to the parent stores
HStore storeMock = Mockito.mock(HStore.class);
List<StoreFile> storeFileList = new ArrayList<StoreFile>(1);
storeFileList.add(storeFileMock);
when(storeMock.getStorefiles()).thenReturn(storeFileList);
when(storeMock.close()).thenReturn(ImmutableList.copyOf(storeFileList));
when(storeMock.hasReferences()).thenReturn(true);
when(storeMock.close()).thenReturn(ImmutableList.<StoreFile>of());
this.parent.stores.put(Bytes.toBytes(""), storeMock);
SplitTransaction st = new SplitTransaction(this.parent, GOOD_SPLIT_ROW);

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.lang.ref.SoftReference;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@ -269,14 +270,16 @@ public class TestStore extends TestCase {
}
private static long getLowestTimeStampFromFS(FileSystem fs,
final List<StoreFile> candidates) throws IOException {
final Collection<StoreFile> candidates) throws IOException {
long minTs = Long.MAX_VALUE;
if (candidates.isEmpty()) {
return minTs;
}
Path[] p = new Path[candidates.size()];
for (int i = 0; i < candidates.size(); ++i) {
p[i] = candidates.get(i).getPath();
int i = 0;
for (StoreFile sf : candidates) {
p[i] = sf.getPath();
++i;
}
FileStatus[] stats = fs.listStatus(p);
@ -305,7 +308,7 @@ public class TestStore extends TestCase {
flush(1);
// Now put in place an empty store file. It's a little tricky. Have to
// do it manually with a hacked-in sequence id.
StoreFile f = this.store.getStorefiles().get(0);
StoreFile f = this.store.getStorefiles().iterator().next();
Path storedir = f.getPath().getParent();
long seqid = f.getMaxSequenceId();
Configuration c = HBaseConfiguration.create();