HBASE-3417 CacheOnWrite is using the temporary output path for block names, need to use a more consistent block naming scheme (jgray)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1183137 13f79535-47bb-0310-9956-ffa450edef68
commit ef5476f5ca
parent df9b82c082
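Note: the bug being fixed is that cache-on-write inserted blocks into the block cache under names derived from the writer's temporary output path, so once the file was renamed into place the cached blocks could no longer be found. A schematic sketch of that failure mode; the names and the name-plus-offset key format below are hypothetical, not HBase's actual cache-key layout:

    // Simplified, hypothetical illustration (not HBase's actual cache-key format).
    // Under the old scheme the flushed file got one random name in .tmp and a
    // different random name when moved into the store directory.
    String writeTimeName = "3423432342";               // hypothetical name under .tmp
    String readTimeName = "8431808";                   // hypothetical name after the move
    String keyCachedOnWrite = writeTimeName + "_0";    // block 0, as cached on write
    String keyLookedUpOnRead = readTimeName + "_0";    // block 0, as looked up on read
    assert !keyCachedOnWrite.equals(keyLookedUpOnRead); // write-cached blocks never hit

The diff below makes the name stable from write time onward, so the two keys coincide.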
CHANGES.txt
@@ -356,6 +356,8 @@ Release 0.92.0 - Unreleased
    HBASE-4582 Store.java cleanup (failing TestHeapSize and has warnings)
    HBASE-4556 Fix all incorrect uses of InternalScanner.next(...) (Lars H)
    HBASE-4078 Validate store files after flush/compaction
+   HBASE-3417 CacheOnWrite is using the temporary output path for block
+              names, need to use a more consistent block naming scheme (jgray)
 
 TESTS
    HBASE-4450 test for number of blocks read: to serve as baseline for expected
AbstractHFileWriter.java
@@ -95,7 +95,7 @@ public abstract class AbstractHFileWriter implements HFile.Writer {
 
   /**
    * Name for this object used when logging or in toString. Is either
-   * the result of a toString on stream or else toString of passed file Path.
+   * the result of a toString on stream or else name of passed file Path.
    */
   protected final String name;
 
CacheConfig.java
@@ -97,7 +97,7 @@ public class CacheConfig {
   private final boolean inMemory;
 
   /** Whether data blocks should be cached when new files are written */
-  private final boolean cacheDataOnWrite;
+  private boolean cacheDataOnWrite;
 
   /** Whether index blocks should be cached when new files are written */
   private final boolean cacheIndexesOnWrite;
@@ -106,7 +106,7 @@ public class CacheConfig {
   private final boolean cacheBloomsOnWrite;
 
   /** Whether blocks of a file should be evicted when the file is closed */
-  private final boolean evictOnClose;
+  private boolean evictOnClose;
 
   /** Whether data blocks should be stored in compressed form in the cache */
   private final boolean cacheCompressed;
@@ -227,6 +227,15 @@ public class CacheConfig {
     return isBlockCacheEnabled() && this.cacheDataOnWrite;
   }
 
+  /**
+   * Only used for testing.
+   * @param cacheDataOnWrite whether data blocks should be written to the cache
+   * when an HFile is written
+   */
+  public void setCacheDataOnWrite(boolean cacheDataOnWrite) {
+    this.cacheDataOnWrite = cacheDataOnWrite;
+  }
+
   /**
    * @return true if index blocks should be written to the cache when an HFile
    * is written, false if not
@@ -251,6 +260,15 @@ public class CacheConfig {
     return isBlockCacheEnabled() && this.evictOnClose;
   }
 
+  /**
+   * Only used for testing.
+   * @param evictOnClose whether blocks should be evicted from the cache when an
+   * HFile reader is closed
+   */
+  public void setEvictOnClose(boolean evictOnClose) {
+    this.evictOnClose = evictOnClose;
+  }
+
   /**
    * @return true if blocks should be compressed in the cache, false if not
    */
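The two setters above, together with the Store.getCacheConfig() accessor added further down, give tests a handle to flip cache-on-write behavior at runtime. A minimal usage sketch; the wrapper class and helper-method name here are illustrative, not part of the commit:

    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.regionserver.Store;

    public class CacheOnWriteTestSetup {
      /** Illustrative helper: enable the test-only caching flags on a store. */
      static CacheConfig enableCacheOnWrite(Store store) {
        CacheConfig cacheConf = store.getCacheConfig(); // accessor added in Store.java below
        cacheConf.setCacheDataOnWrite(true); // cache data blocks as HFiles are written
        cacheConf.setEvictOnClose(true);     // drop a file's blocks when its reader closes
        return cacheConf;
      }
    }

The new test at the bottom of this diff does exactly this wiring against a live region.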
Store.java
@@ -469,7 +469,8 @@ public class Store implements HeapSize {
       TimeRangeTracker snapshotTimeRangeTracker,
       MonitoredTask status)
       throws IOException {
-    StoreFile.Writer writer = null;
+    StoreFile.Writer writer;
+    String fileName;
     long flushed = 0;
     // Don't flush if there are no entries.
     if (set.size() == 0) {
@@ -492,6 +493,7 @@ public class Store implements HeapSize {
       // A. Write the map out to the disk
       writer = createWriterInTmp(set.size());
       writer.setTimeRangeTracker(snapshotTimeRangeTracker);
+      fileName = writer.getPath().getName();
       try {
         List<KeyValue> kvs = new ArrayList<KeyValue>();
         boolean hasMore;
@@ -519,7 +521,7 @@ public class Store implements HeapSize {
     }
 
     // Write-out finished successfully, move into the right spot
-    Path dstPath = StoreFile.getUniqueFile(fs, homedir);
+    Path dstPath = new Path(homedir, fileName);
     validateStoreFile(writer.getPath());
     String msg = "Renaming flushed file at " + writer.getPath() + " to " + dstPath;
     LOG.info(msg);
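Taken together, the three Store hunks above fix the flush path: the file name is fixed when the tmp writer is created and reused for the final destination, so blocks cached while writing keep valid names across the rename. A minimal sketch of the invariant, assuming hypothetical paths:

    import org.apache.hadoop.fs.Path;

    public class FlushNameStability {
      public static void main(String[] args) {
        // Hypothetical paths, for illustration only.
        Path tmpPath = new Path("/hbase/t1/r1/.tmp/8d342fa2c13e4889a2f1c8b2");
        Path homedir = new Path("/hbase/t1/r1/family");
        // As in the flush path above: capture the name at write time...
        String fileName = tmpPath.getName();
        // ...and reuse it as the destination of the rename.
        Path dstPath = new Path(homedir, fileName);
        System.out.println(dstPath.getName().equals(tmpPath.getName())); // true
      }
    }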
@@ -1245,15 +1247,17 @@ public class Store implements HeapSize {
     StoreFile result = null;
     if (compactedFile != null) {
       validateStoreFile(compactedFile.getPath());
-      Path p = null;
-      try {
-        p = StoreFile.rename(this.fs, compactedFile.getPath(),
-          StoreFile.getRandomFilename(fs, this.homedir));
-      } catch (IOException e) {
-        LOG.error("Failed move of compacted file " + compactedFile.getPath(), e);
-        return null;
+      // Move the file into the right spot
+      Path origPath = compactedFile.getPath();
+      Path destPath = new Path(homedir, origPath.getName());
+      LOG.info("Renaming compacted file at " + origPath + " to " + destPath);
+      if (!fs.rename(origPath, destPath)) {
+        LOG.error("Failed move of compacted file " + origPath + " to " +
+            destPath);
+        throw new IOException("Failed move of compacted file " + origPath +
+            " to " + destPath);
       }
-      result = new StoreFile(this.fs, p, this.conf, this.cacheConf,
+      result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
           this.family.getBloomFilterType());
       result.createReader();
     }
@@ -1587,7 +1591,7 @@ public class Store implements HeapSize {
     return storeSize;
   }
 
-  void triggerMajorCompaction() {
+  public void triggerMajorCompaction() {
     this.forceMajor = true;
   }
 
@@ -1815,6 +1819,13 @@ public class Store implements HeapSize {
     return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
   }
 
+  /**
+   * Used for tests. Get the cache configuration for this Store.
+   */
+  public CacheConfig getCacheConfig() {
+    return this.cacheConf;
+  }
+
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT + (17 * ClassSize.REFERENCE) +
       (7 * Bytes.SIZEOF_LONG) + (1 * Bytes.SIZEOF_DOUBLE) +
StoreFile.java
@@ -28,8 +28,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Map;
-import java.util.Random;
 import java.util.SortedSet;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -162,14 +162,11 @@ public class StoreFile {
    * this files id. Group 2 the referenced region name, etc.
    */
   private static final Pattern REF_NAME_PARSER =
-    Pattern.compile("^(\\d+)(?:\\.(.+))?$");
+    Pattern.compile("^([0-9a-f]+)(?:\\.(.+))?$");
 
   // StoreFile.Reader
   private volatile Reader reader;
 
-  // Used making file ids.
-  private final static Random rand = new Random();
-
   /**
    * Bloom filter type specified in column family configuration. Does not
    * necessarily correspond to the Bloom filter type present in the HFile.
@@ -662,7 +659,7 @@ public class StoreFile {
       throw new IOException("Expecting " + dir.toString() +
           " to be a directory");
     }
-    return fs.getFileStatus(dir).isDir()? getRandomFilename(fs, dir): dir;
+    return getRandomFilename(fs, dir);
   }
 
   /**
@@ -689,14 +686,8 @@ public class StoreFile {
       final Path dir,
       final String suffix)
       throws IOException {
-    long id = -1;
-    Path p = null;
-    do {
-      id = Math.abs(rand.nextLong());
-      p = new Path(dir, Long.toString(id) +
-        ((suffix == null || suffix.length() <= 0)? "": suffix));
-    } while(fs.exists(p));
-    return p;
+    return new Path(dir, UUID.randomUUID().toString().replaceAll("-", "")
+        + (suffix == null ? "" : suffix));
   }
 
   /**
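Note how the StoreFile pieces line up: getRandomFilename now returns a 32-character lowercase-hex UUID string, which is why REF_NAME_PARSER above switches from ^(\d+) to ^([0-9a-f]+), and why the old Random-plus-exists-check loop can go (UUID collisions are negligible, so no filesystem probe is needed). A standalone sketch, plain JDK only, checking that the new names parse; the class name is illustrative:

    import java.util.UUID;
    import java.util.regex.Pattern;

    public class BlockNameSchemeCheck {
      // Same pattern this commit installs as StoreFile.REF_NAME_PARSER.
      private static final Pattern REF_NAME_PARSER =
          Pattern.compile("^([0-9a-f]+)(?:\\.(.+))?$");

      public static void main(String[] args) {
        // New-style name, generated exactly as in getRandomFilename above.
        String name = UUID.randomUUID().toString().replaceAll("-", "");
        System.out.println(name.length());                           // 32
        System.out.println(REF_NAME_PARSER.matcher(name).matches()); // true
        // Reference files ("<name>.<region>") still parse via group 2.
        System.out.println(
            REF_NAME_PARSER.matcher(name + ".r1").matches());        // true
      }
    }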
@@ -773,7 +764,7 @@ public class StoreFile {
         CacheConfig cacheConf,
         final KVComparator comparator, BloomType bloomType, long maxKeys)
         throws IOException {
-      writer = HFile.getWriterFactory(conf).createWriter(
+      writer = HFile.getWriterFactory(conf, cacheConf).createWriter(
           fs, path, blocksize,
           compress, comparator.getRawComparator());
 
TestFromClientSide.java
@@ -67,6 +67,11 @@ import org.apache.hadoop.hbase.filter.RowFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
@@ -4262,5 +4267,106 @@ public class TestFromClientSide {
     assertEquals(numOfRegions, scanMetrics.countOfRegions.getCurrentIntervalValue());
   }
 
-}
+  /**
+   * Tests that cache on write works all the way up from the client-side.
+   *
+   * Performs inserts, flushes, and compactions, verifying changes in the block
+   * cache along the way.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testCacheOnWriteEvictOnClose() throws Exception {
+    byte [] tableName = Bytes.toBytes("testCOWEOCfromClient");
+    byte [] data = Bytes.toBytes("data");
+    HTable table = TEST_UTIL.createTable(tableName, new byte [][] {FAMILY});
+    // get the block cache and region
+    String regionName = table.getRegionLocations().firstKey().getEncodedName();
+    HRegion region = TEST_UTIL.getRSForFirstRegionInTable(
+        tableName).getFromOnlineRegions(regionName);
+    Store store = region.getStores().values().iterator().next();
+    CacheConfig cacheConf = store.getCacheConfig();
+    cacheConf.setCacheDataOnWrite(true);
+    cacheConf.setEvictOnClose(true);
+    BlockCache cache = cacheConf.getBlockCache();
+
+    // establish baseline stats
+    long startBlockCount = cache.getBlockCount();
+    long startBlockHits = cache.getStats().getHitCount();
+    long startBlockMiss = cache.getStats().getMissCount();
+    // insert data
+    Put put = new Put(ROW);
+    put.add(FAMILY, QUALIFIER, data);
+    table.put(put);
+    assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
+    // data was in memstore so don't expect any changes
+    assertEquals(startBlockCount, cache.getBlockCount());
+    assertEquals(startBlockHits, cache.getStats().getHitCount());
+    assertEquals(startBlockMiss, cache.getStats().getMissCount());
+    // flush the data
+    System.out.println("Flushing cache");
+    region.flushcache();
+    // expect one more block in cache, no change in hits/misses
+    long expectedBlockCount = startBlockCount + 1;
+    long expectedBlockHits = startBlockHits;
+    long expectedBlockMiss = startBlockMiss;
+    assertEquals(expectedBlockCount, cache.getBlockCount());
+    assertEquals(expectedBlockHits, cache.getStats().getHitCount());
+    assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
+    // read the data and expect same blocks, one new hit, no misses
+    assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
+    assertEquals(expectedBlockCount, cache.getBlockCount());
+    assertEquals(++expectedBlockHits, cache.getStats().getHitCount());
+    assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
+    // insert a second column, read the row, no new blocks, one new hit
+    byte [] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
+    byte [] data2 = Bytes.add(data, data);
+    put = new Put(ROW);
+    put.add(FAMILY, QUALIFIER2, data2);
+    table.put(put);
+    Result r = table.get(new Get(ROW));
+    assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
+    assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
+    assertEquals(expectedBlockCount, cache.getBlockCount());
+    assertEquals(++expectedBlockHits, cache.getStats().getHitCount());
+    assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
+    // flush, one new block
+    System.out.println("Flushing cache");
+    region.flushcache();
+    assertEquals(++expectedBlockCount, cache.getBlockCount());
+    assertEquals(expectedBlockHits, cache.getStats().getHitCount());
+    assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
+    // compact, net minus one block, two hits, no misses
+    System.out.println("Compacting");
+    assertEquals(2, store.getNumberOfstorefiles());
+    store.triggerMajorCompaction();
+    region.compactStores();
+    waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
+    assertEquals(1, store.getNumberOfstorefiles());
+    assertEquals(--expectedBlockCount, cache.getBlockCount());
+    expectedBlockHits += 2;
+    assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
+    assertEquals(expectedBlockHits, cache.getStats().getHitCount());
+    // read the row, same blocks, one hit no miss
+    r = table.get(new Get(ROW));
+    assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
+    assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
+    assertEquals(expectedBlockCount, cache.getBlockCount());
+    assertEquals(++expectedBlockHits, cache.getStats().getHitCount());
+    assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
+    // no cache misses!
+    assertEquals(startBlockMiss, cache.getStats().getMissCount());
+  }
+
+  private void waitForStoreFileCount(Store store, int count, int timeout)
+      throws InterruptedException {
+    long start = System.currentTimeMillis();
+    while (start + timeout > System.currentTimeMillis() &&
+        store.getNumberOfstorefiles() != count) {
+      Thread.sleep(100);
+    }
+    System.out.println("start=" + start + ", now=" +
+        System.currentTimeMillis() + ", cur=" + store.getNumberOfstorefiles());
+    assertEquals(count, store.getNumberOfstorefiles());
+  }
+}