HADOOP-2485 Make mapfile index interval configurable
git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@607131 13f79535-47bb-0310-9956-ffa450edef68
parent bbae5a575f
commit 164bc44b58
@@ -20,6 +20,8 @@ Trunk (unreleased changes)

  OPTIMIZATIONS
   HADOOP-2479 Save on number of Text object creations
+  HADOOP-2485 Make mapfile index interval configurable (Set default to 32
+   instead of 128)

  BUG FIXES
   HADOOP-2059 In tests, exceptions in min dfs shutdown should not fail test

@@ -201,6 +201,16 @@

   <description>How often a region server runs the split/compaction check.
   </description>
 </property>
+<property>
+  <name>hbase.io.index.interval</name>
+  <value>32</value>
+  <description>The interval at which we record offsets in hbase
+  store files/mapfiles. The default for stock mapfiles is 128. Index
+  files are read into memory, so if there are many of them they could
+  prove a burden. If so, play with the hadoop io.map.index.skip property
+  and skip every nth index member when reading the index back into memory.
+  </description>
+</property>

 <!-- HbaseShell Configurations -->
 <property>
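
For illustration, here is how the new knob can be read back with the stock Hadoop Configuration API. This is a minimal sketch, not code from the patch: the class name is ours, and it assumes hbase-default.xml is on the classpath. The fallback of 128 mirrors the writer code further down.

import org.apache.hadoop.conf.Configuration;

public class IndexIntervalCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // hbase-default.xml is not one of Hadoop's core default resources,
    // so it must be added explicitly.
    conf.addResource("hbase-default.xml");
    // 128 is the stock MapFile interval, used only if the property is unset.
    int interval = conf.getInt("hbase.io.index.interval", 128);
    System.out.println("hbase.io.index.interval = " + interval);
    // The read-side counterpart named in the description above: skip
    // every nth index member when loading the index into memory.
    int skip = conf.getInt("io.map.index.skip", 0);
    System.out.println("io.map.index.skip = " + skip);
  }
}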
@@ -937,11 +937,13 @@ class HStore implements HConstants {

      //
      // Related, looks like 'merging compactions' in BigTable paper interlaces
      // a memcache flush. We don't.
+     int entries = 0;
      try {
        for (Map.Entry<HStoreKey, byte []> es: cache.entrySet()) {
          HStoreKey curkey = es.getKey();
          TextSequence f = HStoreKey.extractFamily(curkey.getColumn());
          if (f.equals(this.familyName)) {
+           entries++;
            out.append(curkey, new ImmutableBytesWritable(es.getValue()));
          }
        }

@@ -967,10 +969,10 @@ class HStore implements HConstants {

        flushedFile.getReader(this.fs, this.bloomFilter));
      this.storefiles.put(flushid, flushedFile);
      if (LOG.isDebugEnabled()) {
-       LOG.debug("Added " + name +
-         " with sequence id " + logCacheFlushId + " and size " +
-         StringUtils.humanReadableInt(flushedFile.length()) + " for " +
-         this.regionName + "/" + this.familyName);
+       LOG.debug("Added " + name + " with " + entries +
+         " entries, sequence id " + logCacheFlushId + ", and size " +
+         StringUtils.humanReadableInt(flushedFile.length()) + " for " +
+         this.regionName + "/" + this.familyName);
      }
    } finally {
      this.lock.writeLock().unlock();

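With the entry count threaded through, the flush-time debug line now says how many cells went into the new store file. It prints along these lines (all values hypothetical):

  Added <mapfile name> with 5823 entries, sequence id 42, and size 1.2m for <region>/<family>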

@@ -654,6 +654,182 @@ public class HStoreFile implements HConstants, WritableComparable {

    return success;
  }

  /**
   * Hbase customizations of MapFile.
   */
  static class HbaseMapFile extends MapFile {

    static class HbaseReader extends MapFile.Reader {
      public HbaseReader(FileSystem fs, String dirName, Configuration conf)
      throws IOException {
        super(fs, dirName, conf);
        // Force reading of the mapfile index by calling midKey.
        // Reading the index will bring the index into memory over
        // here on the client and then close the index file, freeing
        // up socket connection and resources in the datanode.
        // Usually, the first access on a MapFile.Reader will load the
        // index; we force the issue in HStoreFile MapFiles because an
        // access may not happen for some time; meantime we're
        // using up datanode resources. See HADOOP-2341.
        midKey();
      }
    }

    static class HbaseWriter extends MapFile.Writer {
      public HbaseWriter(Configuration conf, FileSystem fs, String dirName,
          Class<Writable> keyClass, Class<Writable> valClass,
          SequenceFile.CompressionType compression)
      throws IOException {
        super(conf, fs, dirName, keyClass, valClass, compression);
        // Default for mapfiles is 128. Makes random reads faster if we
        // have more keys indexed and we're not 'next'-ing around in the
        // mapfile. The key name matches hbase.io.index.interval as
        // defined in hbase-default.xml above.
        setIndexInterval(conf.getInt("hbase.io.index.interval", 128));
      }
    }
  }
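
Why lowering the interval from 128 to 32 matters: MapFile records one index entry for every interval-th key, and the whole index is held in memory once loaded. A back-of-the-envelope sketch (the key count is hypothetical):

public class IndexIntervalSketch {
  public static void main(String[] args) {
    long keys = 1000000L; // hypothetical keys in one store mapfile
    for (int interval : new int[] {128, 32}) {
      // One in-memory index entry per 'interval' appended keys. A random
      // read scans at most 'interval' entries past the binary-searched
      // index position, so a smaller interval buys faster point reads at
      // the cost of a bigger in-memory index.
      System.out.println("interval " + interval + " -> ~"
          + (keys / interval) + " index entries in memory");
    }
  }
}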

  /**
   * On write, all keys are added to a bloom filter. On read, all keys are
   * tested first against the bloom filter. Keys are HStoreKeys. If the
   * passed bloom filter is null, invocations are just passed to the parent.
   */
  static class BloomFilterMapFile extends HbaseMapFile {
    static class Reader extends HbaseReader {
      private final Filter bloomFilter;

      public Reader(FileSystem fs, String dirName, Configuration conf,
          final Filter filter)
      throws IOException {
        super(fs, dirName, conf);
        this.bloomFilter = filter;
      }

      @Override
      public Writable get(WritableComparable key, Writable val)
      throws IOException {
        if (this.bloomFilter == null) {
          return super.get(key, val);
        }
        if (this.bloomFilter.membershipTest(getBloomFilterKey(key))) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("bloom filter reported that key exists");
          }
          return super.get(key, val);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("bloom filter reported that key does not exist");
        }
        return null;
      }

      @Override
      public WritableComparable getClosest(WritableComparable key,
          Writable val)
      throws IOException {
        if (this.bloomFilter == null) {
          return super.getClosest(key, val);
        }
        // Note - the key being passed to us is always an HStoreKey
        if (this.bloomFilter.membershipTest(getBloomFilterKey(key))) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("bloom filter reported that key exists");
          }
          return super.getClosest(key, val);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("bloom filter reported that key does not exist");
        }
        return null;
      }
    }

    static class Writer extends HbaseWriter {
      private final Filter bloomFilter;

      @SuppressWarnings("unchecked")
      public Writer(Configuration conf, FileSystem fs, String dirName,
          Class keyClass, Class valClass,
          SequenceFile.CompressionType compression, final Filter filter)
      throws IOException {
        super(conf, fs, dirName, keyClass, valClass, compression);
        this.bloomFilter = filter;
      }

      @Override
      public void append(WritableComparable key, Writable val)
      throws IOException {
        if (this.bloomFilter != null) {
          this.bloomFilter.add(getBloomFilterKey(key));
        }
        super.append(key, val);
      }
    }
  }

  /**
   * Custom bloom filter key maker.
   * @param key
   * @return Key made of bytes of row and column only.
   * @throws IOException
   */
  static Key getBloomFilterKey(WritableComparable key)
  throws IOException {
    HStoreKey hsk = (HStoreKey)key;
    byte [] bytes = null;
    try {
      bytes = (hsk.getRow().toString() + hsk.getColumn().toString()).
        getBytes(UTF8_ENCODING);
    } catch (UnsupportedEncodingException e) {
      throw new IOException(e.toString());
    }
    return new Key(bytes);
  }
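
One consequence of building the filter key from row and column only is that every version of a cell maps to the same bloom key; the timestamp is dropped. A sketch of the effect, as it would run inside HStoreFile where getBloomFilterKey is visible (the row and column values, and the three-argument HStoreKey constructor, are assumptions):

  HStoreKey k1 = new HStoreKey(new Text("row1"), new Text("info:a"), 1000L);
  HStoreKey k2 = new HStoreKey(new Text("row1"), new Text("info:a"), 2000L);
  // Both serialize to the bytes of "row1" + "info:a", so the bloom filter
  // cannot tell cell versions apart: membershipTest answers for the
  // row/column pair, not for a particular timestamp.
  boolean sameBloomKey =
    getBloomFilterKey(k1).compareTo(getBloomFilterKey(k2)) == 0; // true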

  /**
   * Get reader for the store file map file.
   * Client is responsible for closing file when done.
   * @param fs
   * @param bloomFilter If null, no filtering is done.
   * @return MapFile.Reader
   * @throws IOException
   */
  public synchronized MapFile.Reader getReader(final FileSystem fs,
      final Filter bloomFilter)
  throws IOException {
    return isReference() ?
      new HStoreFile.HalfMapFileReader(fs,
        getMapFilePath(getReference().getEncodedRegionName(),
          getReference().getFileId()).toString(),
        this.conf, getReference().getFileRegion(), getReference().getMidkey(),
        bloomFilter) :
      new BloomFilterMapFile.Reader(fs, getMapFilePath().toString(),
        this.conf, bloomFilter);
  }

  /**
   * Get a store file writer.
   * Client is responsible for closing file when done.
   * @param fs
   * @param compression Pass <code>SequenceFile.CompressionType.NONE</code>
   * for none.
   * @param bloomFilter If null, no filtering is done.
   * @return MapFile.Writer
   * @throws IOException
   */
  public MapFile.Writer getWriter(final FileSystem fs,
      final SequenceFile.CompressionType compression,
      final Filter bloomFilter)
  throws IOException {
    if (isReference()) {
      throw new IOException("Illegal Access: Cannot get a writer on an " +
        "HStoreFile reference");
    }
    return new BloomFilterMapFile.Writer(conf, fs,
      getMapFilePath().toString(), HStoreKey.class,
      ImmutableBytesWritable.class, compression, bloomFilter);
  }
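
Taken together, a flush-style round trip through these factory methods looks roughly like this. It is a sketch only: hsf stands for an HStoreFile instance, fs for its FileSystem, and key, value and bloomFilter are assumed to be in scope (bloomFilter may be null).

  // Write side: append sorted HStoreKey/value pairs, then close.
  MapFile.Writer out = hsf.getWriter(fs,
      SequenceFile.CompressionType.NONE, bloomFilter);
  try {
    out.append(key, new ImmutableBytesWritable(value));
  } finally {
    out.close();
  }

  // Read side: the reader consults the bloom filter (when non-null)
  // before touching the on-disk mapfile at all.
  MapFile.Reader in = hsf.getReader(fs, bloomFilter);
  try {
    ImmutableBytesWritable result =
      (ImmutableBytesWritable) in.get(key, new ImmutableBytesWritable());
  } finally {
    in.close();
  }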

  /**
   * A facade for a {@link MapFile.Reader} that serves up either the top or
   * bottom half of a MapFile (where 'bottom' is the first half of the file

@@ -787,184 +963,6 @@ public class HStoreFile implements HConstants, WritableComparable {

      return super.seek(key);
    }
  }

  /**
   * On write, all keys are added to a bloom filter. On read, all keys are
   * tested first against the bloom filter. Keys are HStoreKeys. If the
   * passed bloom filter is null, invocations are just passed to the parent.
   */
  static class BloomFilterMapFile extends MapFile {
    protected BloomFilterMapFile() {
      super();
    }

    static class Reader extends MapFile.Reader {
      private final Filter bloomFilter;

      /**
       * Constructor
       *
       * @param fs
       * @param dirName
       * @param conf
       * @param filter
       * @throws IOException
       */
      public Reader(FileSystem fs, String dirName, Configuration conf,
          final Filter filter)
      throws IOException {
        super(fs, dirName, conf);
        this.bloomFilter = filter;
        // Force reading of the mapfile index by calling midKey.
        // Reading the index will bring the index into memory over
        // here on the client and then close the index file, freeing
        // up socket connection and resources in the datanode.
        // Usually, the first access on a MapFile.Reader will load the
        // index; we force the issue in HStoreFile MapFiles because an
        // access may not happen for some time; meantime we're
        // using up datanode resources. See HADOOP-2341.
        midKey();
      }

      /** {@inheritDoc} */
      @Override
      public Writable get(WritableComparable key, Writable val)
      throws IOException {
        if (this.bloomFilter == null) {
          return super.get(key, val);
        }
        if (this.bloomFilter.membershipTest(getBloomFilterKey(key))) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("bloom filter reported that key exists");
          }
          return super.get(key, val);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("bloom filter reported that key does not exist");
        }
        return null;
      }

      /** {@inheritDoc} */
      @Override
      public WritableComparable getClosest(WritableComparable key,
          Writable val)
      throws IOException {
        if (this.bloomFilter == null) {
          return super.getClosest(key, val);
        }
        // Note - the key being passed to us is always an HStoreKey
        if (this.bloomFilter.membershipTest(getBloomFilterKey(key))) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("bloom filter reported that key exists");
          }
          return super.getClosest(key, val);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("bloom filter reported that key does not exist");
        }
        return null;
      }
    }

    static class Writer extends MapFile.Writer {
      private final Filter bloomFilter;

      /**
       * Constructor
       *
       * @param conf
       * @param fs
       * @param dirName
       * @param keyClass
       * @param valClass
       * @param compression
       * @param filter
       * @throws IOException
       */
      @SuppressWarnings("unchecked")
      public Writer(Configuration conf, FileSystem fs, String dirName,
          Class keyClass, Class valClass,
          SequenceFile.CompressionType compression, final Filter filter)
      throws IOException {
        super(conf, fs, dirName, keyClass, valClass, compression);
        this.bloomFilter = filter;
      }

      /** {@inheritDoc} */
      @Override
      public void append(WritableComparable key, Writable val)
      throws IOException {
        if (this.bloomFilter != null) {
          this.bloomFilter.add(getBloomFilterKey(key));
        }
        super.append(key, val);
      }
    }
  }

  /**
   * Custom bloom filter key maker.
   * @param key
   * @return Key made of bytes of row and column only.
   * @throws IOException
   */
  static Key getBloomFilterKey(WritableComparable key)
  throws IOException {
    HStoreKey hsk = (HStoreKey)key;
    byte [] bytes = null;
    try {
      bytes = (hsk.getRow().toString() + hsk.getColumn().toString()).
        getBytes(UTF8_ENCODING);
    } catch (UnsupportedEncodingException e) {
      throw new IOException(e.toString());
    }
    return new Key(bytes);
  }

  /**
   * Get reader for the store file map file.
   * Client is responsible for closing file when done.
   * @param fs
   * @param bloomFilter If null, no filtering is done.
   * @return MapFile.Reader
   * @throws IOException
   */
  public synchronized MapFile.Reader getReader(final FileSystem fs,
      final Filter bloomFilter)
  throws IOException {
    return isReference() ?
      new HStoreFile.HalfMapFileReader(fs,
        getMapFilePath(getReference().getEncodedRegionName(),
          getReference().getFileId()).toString(),
        this.conf, getReference().getFileRegion(), getReference().getMidkey(),
        bloomFilter) :
      new BloomFilterMapFile.Reader(fs, getMapFilePath().toString(),
        this.conf, bloomFilter);
  }

  /**
   * Get a store file writer.
   * Client is responsible for closing file when done.
   * @param fs
   * @param compression Pass <code>SequenceFile.CompressionType.NONE</code>
   * for none.
   * @param bloomFilter If null, no filtering is done.
   * @return MapFile.Writer
   * @throws IOException
   */
  public MapFile.Writer getWriter(final FileSystem fs,
      final SequenceFile.CompressionType compression,
      final Filter bloomFilter)
  throws IOException {
    if (isReference()) {
      throw new IOException("Illegal Access: Cannot get a writer on an " +
        "HStoreFile reference");
    }
    return new BloomFilterMapFile.Writer(conf, fs,
      getMapFilePath().toString(), HStoreKey.class,
      ImmutableBytesWritable.class, compression, bloomFilter);
  }

  /**
   * @return Length of the store map file. If a reference, size is