HBASE-3696 isMajorCompaction() check triggers lots of listStatus DFS RPC calls from HBase
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1085560 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
be4fbee16b
commit
edd036b045
|
@ -102,6 +102,8 @@ Release 0.91.0 - Unreleased
|
|||
HBASE-3541 REST Multi Gets (Elliott Clark via Stack)
|
||||
HBASE-3052 Add ability to have multiple ZK servers in a quorum in
|
||||
MiniZooKeeperCluster for test writing (Liyin Tang via Stack)
|
||||
HBASE-3696 isMajorCompaction() check triggers lots of listStatus DFS RPC
|
||||
calls from HBase (Liyin Tang via Stack)
|
||||
|
||||
TASK
|
||||
HBASE-3559 Move report of split to master OFF the heartbeat channel
|
||||
|
|
|
@ -725,23 +725,11 @@ public class Store implements HeapSize {
|
|||
* @param dir
|
||||
* @throws IOException
|
||||
*/
|
||||
private static long getLowestTimestamp(FileSystem fs,
|
||||
final List<StoreFile> candidates) throws IOException {
|
||||
public static long getLowestTimestamp(final List<StoreFile> candidates)
|
||||
throws IOException {
|
||||
long minTs = Long.MAX_VALUE;
|
||||
if (candidates.isEmpty()) {
|
||||
return minTs;
|
||||
}
|
||||
Path[] p = new Path[candidates.size()];
|
||||
for (int i = 0; i < candidates.size(); ++i) {
|
||||
p[i] = candidates.get(i).getPath();
|
||||
}
|
||||
|
||||
FileStatus[] stats = fs.listStatus(p);
|
||||
if (stats == null || stats.length == 0) {
|
||||
return minTs;
|
||||
}
|
||||
for (FileStatus s : stats) {
|
||||
minTs = Math.min(minTs, s.getModificationTime());
|
||||
for (StoreFile storeFile : candidates) {
|
||||
minTs = Math.min(minTs, storeFile.getModificationTimeStamp());
|
||||
}
|
||||
return minTs;
|
||||
}
|
||||
|
@ -781,7 +769,7 @@ public class Store implements HeapSize {
|
|||
return result;
|
||||
}
|
||||
// TODO: Use better method for determining stamp of last major (HBASE-2990)
|
||||
long lowTimestamp = getLowestTimestamp(fs, filesToCompact);
|
||||
long lowTimestamp = getLowestTimestamp(filesToCompact);
|
||||
long now = System.currentTimeMillis();
|
||||
if (lowTimestamp > 0l && lowTimestamp < (now - this.majorCompactionTime)) {
|
||||
// Major compaction time has elapsed.
|
||||
|
|
|
@ -40,6 +40,7 @@ import java.util.regex.Pattern;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
|
@ -171,6 +172,8 @@ public class StoreFile {
|
|||
private final Configuration conf;
|
||||
private final BloomType bloomType;
|
||||
|
||||
// the last modification time stamp
|
||||
private long modificationTimeStamp = 0L;
|
||||
|
||||
/**
|
||||
* Constructor, loads a reader and it's indices, etc. May allocate a
|
||||
|
@ -207,6 +210,14 @@ public class StoreFile {
|
|||
this.bloomType = BloomType.NONE;
|
||||
LOG.info("Ignoring bloom filter check for file (disabled in config)");
|
||||
}
|
||||
|
||||
// cache the modification time stamp of this store file
|
||||
FileStatus[] stats = fs.listStatus(p);
|
||||
if (stats != null && stats.length == 1) {
|
||||
this.modificationTimeStamp = stats[0].getModificationTime();
|
||||
} else {
|
||||
this.modificationTimeStamp = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -296,6 +307,10 @@ public class StoreFile {
|
|||
return this.sequenceid;
|
||||
}
|
||||
|
||||
public long getModificationTimeStamp() {
|
||||
return modificationTimeStamp;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the highest sequence ID found across all storefiles in
|
||||
* the given list. Store files that were created by a mapreduce
|
||||
|
|
|
@ -132,6 +132,54 @@ public class TestStore extends TestCase {
|
|||
store = new Store(basedir, region, hcd, fs, conf);
|
||||
}
|
||||
|
||||
/**
 * Verifies that {@code Store.getLowestTimestamp} (which reads the timestamp
 * cached on each StoreFile) agrees with the timestamps obtained directly
 * from the FileSystem via listStatus, both after flushes and after a
 * compaction.
 *
 * @throws Exception on any test failure
 */
public void testLowestModificationTime() throws Exception {
  Configuration conf = HBaseConfiguration.create();
  FileSystem fs = FileSystem.get(conf);
  // Initialize region
  init(getName(), conf);

  // Create several store files by adding data and flushing once per round.
  int storeFileNum = 4;
  for (int i = 1; i <= storeFileNum; i++) {
    LOG.info("Adding some data for the store file #"+i);
    this.store.add(new KeyValue(row, family, qf1, i, (byte[])null));
    this.store.add(new KeyValue(row, family, qf2, i, (byte[])null));
    this.store.add(new KeyValue(row, family, qf3, i, (byte[])null));
    flush(i);
  }
  // after flush; check the lowest time stamp
  // Cached value from the store files must match what the FS reports.
  long lowestTimeStampFromStore =
      Store.getLowestTimestamp(store.getStorefiles());
  long lowestTimeStampFromFS =
      getLowestTimeStampFromFS(fs,store.getStorefiles());
  assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);

  // after compact; check the lowest time stamp
  // Compaction rewrites store files, so the cached timestamps must be
  // refreshed and still agree with the FileSystem.
  store.compact();
  lowestTimeStampFromStore = Store.getLowestTimestamp(store.getStorefiles());
  lowestTimeStampFromFS = getLowestTimeStampFromFS(fs,store.getStorefiles());
  assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);
}
|
||||
|
||||
private static long getLowestTimeStampFromFS(FileSystem fs,
|
||||
final List<StoreFile> candidates) throws IOException {
|
||||
long minTs = Long.MAX_VALUE;
|
||||
if (candidates.isEmpty()) {
|
||||
return minTs;
|
||||
}
|
||||
Path[] p = new Path[candidates.size()];
|
||||
for (int i = 0; i < candidates.size(); ++i) {
|
||||
p[i] = candidates.get(i).getPath();
|
||||
}
|
||||
|
||||
FileStatus[] stats = fs.listStatus(p);
|
||||
if (stats == null || stats.length == 0) {
|
||||
return minTs;
|
||||
}
|
||||
for (FileStatus s : stats) {
|
||||
minTs = Math.min(minTs, s.getModificationTime());
|
||||
}
|
||||
return minTs;
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
// Get tests
|
||||
|
|
Loading…
Reference in New Issue