HADOOP-1398. Add HBase in-memory block cache. Contributed by Tom White.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk/src/contrib/hbase@618347 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas White 2008-02-04 17:54:35 +00:00
parent 678403c4fe
commit 5d2ec2506d
20 changed files with 801 additions and 248 deletions

View File

@ -221,6 +221,12 @@
such as hlog. such as hlog.
</description> </description>
</property> </property>
<property>
<name>hbase.hstore.blockCache.blockSize</name>
<value>65536</value>
<description>The size of each block in any block caches.
</description>
</property>
<!-- HbaseShell Configurations --> <!-- HbaseShell Configurations -->
<property> <property>

View File

@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.io.TextSequence;
public class HColumnDescriptor implements WritableComparable { public class HColumnDescriptor implements WritableComparable {
// For future backward compatibility // For future backward compatibility
private static final byte COLUMN_DESCRIPTOR_VERSION = (byte)1; private static final byte COLUMN_DESCRIPTOR_VERSION = (byte)2;
/** Legal family names can only contain 'word characters' and end in a colon. */ /** Legal family names can only contain 'word characters' and end in a colon. */
public static final Pattern LEGAL_FAMILY_NAME = Pattern.compile("\\w+:"); public static final Pattern LEGAL_FAMILY_NAME = Pattern.compile("\\w+:");
@ -76,6 +76,11 @@ public class HColumnDescriptor implements WritableComparable {
*/ */
public static final boolean DEFAULT_IN_MEMORY = false; public static final boolean DEFAULT_IN_MEMORY = false;
/**
* Default setting for whether to use a block cache or not.
*/
public static final boolean DEFAULT_BLOCK_CACHE_ENABLED = false;
/** /**
* Default maximum length of cell contents. * Default maximum length of cell contents.
*/ */
@ -95,6 +100,8 @@ public class HColumnDescriptor implements WritableComparable {
private CompressionType compressionType; private CompressionType compressionType;
// Serve reads from in-memory cache // Serve reads from in-memory cache
private boolean inMemory; private boolean inMemory;
// Serve reads from in-memory block cache
private boolean blockCacheEnabled;
// Maximum value size // Maximum value size
private int maxValueLength; private int maxValueLength;
// True if bloom filter was specified // True if bloom filter was specified
@ -123,6 +130,7 @@ public class HColumnDescriptor implements WritableComparable {
this(columnName == null || columnName.length() <= 0? this(columnName == null || columnName.length() <= 0?
new Text(): new Text(columnName), new Text(): new Text(columnName),
DEFAULT_N_VERSIONS, DEFAULT_COMPRESSION_TYPE, DEFAULT_IN_MEMORY, DEFAULT_N_VERSIONS, DEFAULT_COMPRESSION_TYPE, DEFAULT_IN_MEMORY,
DEFAULT_BLOCK_CACHE_ENABLED,
Integer.MAX_VALUE, DEFAULT_BLOOM_FILTER_DESCRIPTOR); Integer.MAX_VALUE, DEFAULT_BLOOM_FILTER_DESCRIPTOR);
} }
@ -134,6 +142,7 @@ public class HColumnDescriptor implements WritableComparable {
* @param compression Compression type * @param compression Compression type
* @param inMemory If true, column data should be kept in an HRegionServer's * @param inMemory If true, column data should be kept in an HRegionServer's
* cache * cache
* @param blockCacheEnabled If true, MapFile blocks should be cached
* @param maxValueLength Restrict values to &lt;= this value * @param maxValueLength Restrict values to &lt;= this value
* @param bloomFilter Enable the specified bloom filter for this column * @param bloomFilter Enable the specified bloom filter for this column
* *
@ -144,6 +153,7 @@ public class HColumnDescriptor implements WritableComparable {
*/ */
public HColumnDescriptor(final Text name, final int maxVersions, public HColumnDescriptor(final Text name, final int maxVersions,
final CompressionType compression, final boolean inMemory, final CompressionType compression, final boolean inMemory,
final boolean blockCacheEnabled,
final int maxValueLength, final BloomFilterDescriptor bloomFilter) { final int maxValueLength, final BloomFilterDescriptor bloomFilter) {
String familyStr = name.toString(); String familyStr = name.toString();
// Test name if not null (It can be null when deserializing after // Test name if not null (It can be null when deserializing after
@ -165,6 +175,7 @@ public class HColumnDescriptor implements WritableComparable {
} }
this.maxVersions = maxVersions; this.maxVersions = maxVersions;
this.inMemory = inMemory; this.inMemory = inMemory;
this.blockCacheEnabled = blockCacheEnabled;
this.maxValueLength = maxValueLength; this.maxValueLength = maxValueLength;
this.bloomFilter = bloomFilter; this.bloomFilter = bloomFilter;
this.bloomFilterSpecified = this.bloomFilter == null ? false : true; this.bloomFilterSpecified = this.bloomFilter == null ? false : true;
@ -212,6 +223,13 @@ public class HColumnDescriptor implements WritableComparable {
return this.inMemory; return this.inMemory;
} }
/**
* @return True if MapFile blocks should be cached.
*/
public boolean isBlockCacheEnabled() {
return blockCacheEnabled;
}
/** /**
* @return Maximum value length. * @return Maximum value length.
*/ */
@ -234,6 +252,7 @@ public class HColumnDescriptor implements WritableComparable {
return "{name: " + tmp.substring(0, tmp.length() - 1) + return "{name: " + tmp.substring(0, tmp.length() - 1) +
", max versions: " + maxVersions + ", max versions: " + maxVersions +
", compression: " + this.compressionType + ", in memory: " + inMemory + ", compression: " + this.compressionType + ", in memory: " + inMemory +
", block cache enabled: " + blockCacheEnabled +
", max length: " + maxValueLength + ", bloom filter: " + ", max length: " + maxValueLength + ", bloom filter: " +
(bloomFilterSpecified ? bloomFilter.toString() : "none") + "}"; (bloomFilterSpecified ? bloomFilter.toString() : "none") + "}";
} }
@ -251,6 +270,7 @@ public class HColumnDescriptor implements WritableComparable {
result ^= Integer.valueOf(this.maxVersions).hashCode(); result ^= Integer.valueOf(this.maxVersions).hashCode();
result ^= this.compressionType.hashCode(); result ^= this.compressionType.hashCode();
result ^= Boolean.valueOf(this.inMemory).hashCode(); result ^= Boolean.valueOf(this.inMemory).hashCode();
result ^= Boolean.valueOf(this.blockCacheEnabled).hashCode();
result ^= Integer.valueOf(this.maxValueLength).hashCode(); result ^= Integer.valueOf(this.maxValueLength).hashCode();
result ^= Boolean.valueOf(this.bloomFilterSpecified).hashCode(); result ^= Boolean.valueOf(this.bloomFilterSpecified).hashCode();
result ^= Byte.valueOf(this.versionNumber).hashCode(); result ^= Byte.valueOf(this.versionNumber).hashCode();
@ -277,6 +297,10 @@ public class HColumnDescriptor implements WritableComparable {
bloomFilter = new BloomFilterDescriptor(); bloomFilter = new BloomFilterDescriptor();
bloomFilter.readFields(in); bloomFilter.readFields(in);
} }
if (this.versionNumber > 1) {
this.blockCacheEnabled = in.readBoolean();
}
} }
/** {@inheritDoc} */ /** {@inheritDoc} */
@ -292,6 +316,8 @@ public class HColumnDescriptor implements WritableComparable {
if(bloomFilterSpecified) { if(bloomFilterSpecified) {
bloomFilter.write(out); bloomFilter.write(out);
} }
out.writeBoolean(this.blockCacheEnabled);
} }
// Comparable // Comparable
@ -327,6 +353,18 @@ public class HColumnDescriptor implements WritableComparable {
} }
} }
if(result == 0) {
if(this.blockCacheEnabled == other.blockCacheEnabled) {
result = 0;
} else if(this.blockCacheEnabled) {
result = -1;
} else {
result = 1;
}
}
if(result == 0) { if(result == 0) {
result = other.maxValueLength - this.maxValueLength; result = other.maxValueLength - this.maxValueLength;
} }

View File

@ -739,13 +739,23 @@ public class HStore implements HConstants {
// Move maxSeqId on by one. Why here? And not in HRegion? // Move maxSeqId on by one. Why here? And not in HRegion?
this.maxSeqId += 1; this.maxSeqId += 1;
// Finally, start up all the map readers! (There should be just one at this // Finally, start up all the map readers! (There could be more than one
// point, as we've compacted them all.) // since we haven't compacted yet.)
boolean first = true;
for(Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) { for(Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
if (first) {
// Use a block cache (if configured) for the first reader only
// so as to control memory usage.
this.readers.put(e.getKey(),
e.getValue().getReader(this.fs, this.bloomFilter,
family.isBlockCacheEnabled()));
first = false;
} else {
this.readers.put(e.getKey(), this.readers.put(e.getKey(),
e.getValue().getReader(this.fs, this.bloomFilter)); e.getValue().getReader(this.fs, this.bloomFilter));
} }
} }
}
/* /*
* @param hstoreFiles * @param hstoreFiles
@ -1560,7 +1570,10 @@ public class HStore implements HConstants {
// 6. Loading the new TreeMap. // 6. Loading the new TreeMap.
Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs)); Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
this.readers.put(orderVal, this.readers.put(orderVal,
finalCompactedFile.getReader(this.fs, this.bloomFilter)); // Use a block cache (if configured) for this reader since
// it is the only one.
finalCompactedFile.getReader(this.fs, this.bloomFilter,
family.isBlockCacheEnabled()));
this.storefiles.put(orderVal, finalCompactedFile); this.storefiles.put(orderVal, finalCompactedFile);
} catch (IOException e) { } catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e); e = RemoteExceptionHandler.checkIOException(e);

View File

@ -31,9 +31,11 @@ import java.util.Random;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.BlockFSInputStream;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.MapFile; import org.apache.hadoop.io.MapFile;
@ -423,6 +425,29 @@ public class HStoreFile implements HConstants {
conf, bloomFilter); conf, bloomFilter);
} }
/**
* Get reader for the store file map file.
* Client is responsible for closing file when done.
* @param fs
* @param bloomFilter If null, no filtering is done.
* @param blockCacheEnabled If true, MapFile blocks should be cached.
* @return MapFile.Reader
* @throws IOException
*/
public synchronized MapFile.Reader getReader(final FileSystem fs,
final Filter bloomFilter, final boolean blockCacheEnabled)
throws IOException {
if (isReference()) {
return new HStoreFile.HalfMapFileReader(fs,
getMapFilePath(reference).toString(), conf,
reference.getFileRegion(), reference.getMidkey(), bloomFilter,
blockCacheEnabled);
}
return new BloomFilterMapFile.Reader(fs, getMapFilePath().toString(),
conf, bloomFilter, blockCacheEnabled);
}
/** /**
* Get a store file writer. * Get a store file writer.
* Client is responsible for closing file when done. * Client is responsible for closing file when done.
@ -581,7 +606,13 @@ public class HStoreFile implements HConstants {
*/ */
static class HbaseMapFile extends MapFile { static class HbaseMapFile extends MapFile {
/**
* A reader capable of reading and caching blocks of the data file.
*/
static class HbaseReader extends MapFile.Reader { static class HbaseReader extends MapFile.Reader {
private final boolean blockCacheEnabled;
/** /**
* @param fs * @param fs
* @param dirName * @param dirName
@ -590,7 +621,23 @@ public class HStoreFile implements HConstants {
*/ */
public HbaseReader(FileSystem fs, String dirName, Configuration conf) public HbaseReader(FileSystem fs, String dirName, Configuration conf)
throws IOException { throws IOException {
super(fs, dirName, conf); this(fs, dirName, conf, false);
}
/**
* @param fs
* @param dirName
* @param conf
* @param blockCacheEnabled
* @throws IOException
*/
public HbaseReader(FileSystem fs, String dirName, Configuration conf,
boolean blockCacheEnabled)
throws IOException {
super(fs, dirName, null, conf, false); // defer opening streams
this.blockCacheEnabled = blockCacheEnabled;
open(fs, dirName, null, conf);
// Force reading of the mapfile index by calling midKey. // Force reading of the mapfile index by calling midKey.
// Reading the index will bring the index into memory over // Reading the index will bring the index into memory over
// here on the client and then close the index file freeing // here on the client and then close the index file freeing
@ -601,6 +648,28 @@ public class HStoreFile implements HConstants {
// using up datanode resources. See HADOOP-2341. // using up datanode resources. See HADOOP-2341.
midKey(); midKey();
} }
@Override
protected org.apache.hadoop.io.SequenceFile.Reader createDataFileReader(
FileSystem fs, Path dataFile, Configuration conf)
throws IOException {
if (!blockCacheEnabled) {
return super.createDataFileReader(fs, dataFile, conf);
}
LOG.info("Block Cache enabled");
final int blockSize = conf.getInt("hbase.hstore.blockCache.blockSize",
64 * 1024);
return new SequenceFile.Reader(fs, dataFile, conf) {
@Override
protected FSDataInputStream openFile(FileSystem fs, Path file,
int bufferSize, long length) throws IOException {
return new FSDataInputStream(new BlockFSInputStream(
super.openFile(fs, file, bufferSize, length), length,
blockSize));
}
};
}
} }
static class HbaseWriter extends MapFile.Writer { static class HbaseWriter extends MapFile.Writer {
@ -649,6 +718,13 @@ public class HStoreFile implements HConstants {
bloomFilter = filter; bloomFilter = filter;
} }
public Reader(FileSystem fs, String dirName, Configuration conf,
final Filter filter, final boolean blockCacheEnabled)
throws IOException {
super(fs, dirName, conf, blockCacheEnabled);
bloomFilter = filter;
}
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
public Writable get(WritableComparable key, Writable val) public Writable get(WritableComparable key, Writable val)
@ -741,7 +817,7 @@ public class HStoreFile implements HConstants {
final Configuration conf, final Range r, final Configuration conf, final Range r,
final WritableComparable midKey) final WritableComparable midKey)
throws IOException { throws IOException {
this(fs, dirName, conf, r, midKey, null); this(fs, dirName, conf, r, midKey, null, false);
} }
HalfMapFileReader(final FileSystem fs, final String dirName, HalfMapFileReader(final FileSystem fs, final String dirName,
@ -753,6 +829,16 @@ public class HStoreFile implements HConstants {
midkey = midKey; midkey = midKey;
} }
HalfMapFileReader(final FileSystem fs, final String dirName,
final Configuration conf, final Range r,
final WritableComparable midKey, final Filter filter,
final boolean blockCacheEnabled)
throws IOException {
super(fs, dirName, conf, filter, blockCacheEnabled);
top = isTopFileRegion(r);
midkey = midKey;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void checkKey(final WritableComparable key) private void checkKey(final WritableComparable key)
throws IOException { throws IOException {

View File

@ -43,15 +43,15 @@ public class HTableDescriptor implements WritableComparable {
public static final HTableDescriptor rootTableDesc = public static final HTableDescriptor rootTableDesc =
new HTableDescriptor(HConstants.ROOT_TABLE_NAME, new HTableDescriptor(HConstants.ROOT_TABLE_NAME,
new HColumnDescriptor(HConstants.COLUMN_FAMILY, 1, new HColumnDescriptor(HConstants.COLUMN_FAMILY, 1,
HColumnDescriptor.CompressionType.NONE, false, Integer.MAX_VALUE, HColumnDescriptor.CompressionType.NONE, false, false,
null)); Integer.MAX_VALUE, null));
/** table descriptor for meta table */ /** table descriptor for meta table */
public static final HTableDescriptor metaTableDesc = public static final HTableDescriptor metaTableDesc =
new HTableDescriptor(HConstants.META_TABLE_NAME, new HTableDescriptor(HConstants.META_TABLE_NAME,
new HColumnDescriptor(HConstants.COLUMN_FAMILY, 1, new HColumnDescriptor(HConstants.COLUMN_FAMILY, 1,
HColumnDescriptor.CompressionType.NONE, false, Integer.MAX_VALUE, HColumnDescriptor.CompressionType.NONE, false, false,
null)); Integer.MAX_VALUE, null));
private boolean rootregion; private boolean rootregion;
private boolean metaregion; private boolean metaregion;

View File

@ -202,6 +202,8 @@ public class AlterCommand extends SchemaModificationCommand {
.get(spec)).toUpperCase()); .get(spec)).toUpperCase());
} else if (spec.equals("IN_MEMORY")) { } else if (spec.equals("IN_MEMORY")) {
inMemory = (Boolean) columnSpec.get(spec); inMemory = (Boolean) columnSpec.get(spec);
} else if (spec.equals("BLOCK_CACHE_ENABLED")) {
blockCacheEnabled = (Boolean) columnSpec.get(spec);
} else if (spec.equals("BLOOMFILTER")) { } else if (spec.equals("BLOOMFILTER")) {
bloomFilterType = BloomFilterType.valueOf(((String) columnSpec.get(spec)) bloomFilterType = BloomFilterType.valueOf(((String) columnSpec.get(spec))
.toUpperCase()); .toUpperCase());
@ -229,7 +231,8 @@ public class AlterCommand extends SchemaModificationCommand {
column = appendDelimiter(column); column = appendDelimiter(column);
HColumnDescriptor columnDesc = new HColumnDescriptor(new Text(column), HColumnDescriptor columnDesc = new HColumnDescriptor(new Text(column),
maxVersions, compression, inMemory, maxLength, bloomFilterDesc); maxVersions, compression, inMemory, blockCacheEnabled,
maxLength, bloomFilterDesc);
return columnDesc; return columnDesc;
} }
@ -243,6 +246,7 @@ public class AlterCommand extends SchemaModificationCommand {
maxLength = original.getMaxValueLength(); maxLength = original.getMaxValueLength();
compression = original.getCompression(); compression = original.getCompression();
inMemory = original.isInMemory(); inMemory = original.isInMemory();
blockCacheEnabled = original.isBlockCacheEnabled();
bloomFilterDesc = original.getBloomFilter(); bloomFilterDesc = original.getBloomFilter();
} }
} }

View File

@ -118,6 +118,7 @@ TOKEN: /** for HQL statements */
| <BLOCK: "block"> | <BLOCK: "block">
| <RECORD: "record"> | <RECORD: "record">
| <IN_MEMORY: "in_memory"> | <IN_MEMORY: "in_memory">
| <BLOCK_CACHE_ENABLED: "block_cache_enabled">
| <BLOOMFILTER: "bloomfilter"> | <BLOOMFILTER: "bloomfilter">
| <COUNTING_BLOOMFILTER: "counting_bloomfilter"> | <COUNTING_BLOOMFILTER: "counting_bloomfilter">
| <RETOUCHED_BLOOMFILTER: "retouched_bloomfilter"> | <RETOUCHED_BLOOMFILTER: "retouched_bloomfilter">
@ -353,6 +354,11 @@ Map<String, Object> ColumnSpec() :
{ {
columnSpec.put("IN_MEMORY", true); columnSpec.put("IN_MEMORY", true);
} }
|
<BLOCK_CACHE_ENABLED>
{
columnSpec.put("BLOCK_CACHE_ENABLED", true);
}
| |
<BLOOMFILTER> <BLOOMFILTER>
<EQUALS> <EQUALS>

View File

@ -37,6 +37,7 @@ public abstract class SchemaModificationCommand extends BasicCommand {
protected int maxLength; protected int maxLength;
protected HColumnDescriptor.CompressionType compression; protected HColumnDescriptor.CompressionType compression;
protected boolean inMemory; protected boolean inMemory;
protected boolean blockCacheEnabled;
protected BloomFilterDescriptor bloomFilterDesc; protected BloomFilterDescriptor bloomFilterDesc;
protected BloomFilterType bloomFilterType; protected BloomFilterType bloomFilterType;
protected int vectorSize; protected int vectorSize;
@ -52,6 +53,7 @@ public abstract class SchemaModificationCommand extends BasicCommand {
maxLength = HColumnDescriptor.DEFAULT_MAX_VALUE_LENGTH; maxLength = HColumnDescriptor.DEFAULT_MAX_VALUE_LENGTH;
compression = HColumnDescriptor.DEFAULT_COMPRESSION_TYPE; compression = HColumnDescriptor.DEFAULT_COMPRESSION_TYPE;
inMemory = HColumnDescriptor.DEFAULT_IN_MEMORY; inMemory = HColumnDescriptor.DEFAULT_IN_MEMORY;
blockCacheEnabled = HColumnDescriptor.DEFAULT_BLOCK_CACHE_ENABLED;
bloomFilterDesc = HColumnDescriptor.DEFAULT_BLOOM_FILTER_DESCRIPTOR; bloomFilterDesc = HColumnDescriptor.DEFAULT_BLOOM_FILTER_DESCRIPTOR;
} }
@ -76,6 +78,8 @@ public abstract class SchemaModificationCommand extends BasicCommand {
.valueOf(((String) columnSpec.get(spec)).toUpperCase()); .valueOf(((String) columnSpec.get(spec)).toUpperCase());
} else if (spec.equals("IN_MEMORY")) { } else if (spec.equals("IN_MEMORY")) {
inMemory = (Boolean) columnSpec.get(spec); inMemory = (Boolean) columnSpec.get(spec);
} else if (spec.equals("BLOCK_CACHE_ENABLED")) {
blockCacheEnabled = (Boolean) columnSpec.get(spec);
} else if (spec.equals("BLOOMFILTER")) { } else if (spec.equals("BLOOMFILTER")) {
bloomFilterType = BloomFilterType.valueOf(((String) columnSpec.get(spec)) bloomFilterType = BloomFilterType.valueOf(((String) columnSpec.get(spec))
.toUpperCase()); .toUpperCase());
@ -103,7 +107,8 @@ public abstract class SchemaModificationCommand extends BasicCommand {
column = appendDelimiter(column); column = appendDelimiter(column);
HColumnDescriptor columnDesc = new HColumnDescriptor(new Text(column), HColumnDescriptor columnDesc = new HColumnDescriptor(new Text(column),
maxVersions, compression, inMemory, maxLength, bloomFilterDesc); maxVersions, compression, inMemory, blockCacheEnabled,
maxLength, bloomFilterDesc);
return columnDesc; return columnDesc;
} }

View File

@ -75,7 +75,7 @@ public class HQLParser implements HQLParserConstants {
case SELECT: case SELECT:
case ENABLE: case ENABLE:
case DISABLE: case DISABLE:
case 68: case 69:
switch ((jj_ntk==-1)?jj_ntk():jj_ntk) { switch ((jj_ntk==-1)?jj_ntk():jj_ntk) {
case HELP: case HELP:
case ALTER: case ALTER:
@ -100,7 +100,7 @@ public class HQLParser implements HQLParserConstants {
jj_la1[0] = jj_gen; jj_la1[0] = jj_gen;
; ;
} }
jj_consume_token(68); jj_consume_token(69);
break; break;
case 0: case 0:
jj_consume_token(0); jj_consume_token(0);
@ -390,6 +390,7 @@ public class HQLParser implements HQLParserConstants {
case MAX_LENGTH: case MAX_LENGTH:
case COMPRESSION: case COMPRESSION:
case IN_MEMORY: case IN_MEMORY:
case BLOCK_CACHE_ENABLED:
case BLOOMFILTER: case BLOOMFILTER:
case VECTOR_SIZE: case VECTOR_SIZE:
case NUM_HASH: case NUM_HASH:
@ -440,6 +441,10 @@ public class HQLParser implements HQLParserConstants {
jj_consume_token(IN_MEMORY); jj_consume_token(IN_MEMORY);
columnSpec.put("IN_MEMORY", true); columnSpec.put("IN_MEMORY", true);
break; break;
case BLOCK_CACHE_ENABLED:
jj_consume_token(BLOCK_CACHE_ENABLED);
columnSpec.put("BLOCK_CACHE_ENABLED", true);
break;
case BLOOMFILTER: case BLOOMFILTER:
jj_consume_token(BLOOMFILTER); jj_consume_token(BLOOMFILTER);
jj_consume_token(EQUALS); jj_consume_token(EQUALS);
@ -1080,6 +1085,22 @@ public class HQLParser implements HQLParserConstants {
finally { jj_save(0, xla); } finally { jj_save(0, xla); }
} }
final private boolean jj_3_1() {
if (jj_scan_token(ADD)) return true;
if (jj_3R_10()) return true;
return false;
}
final private boolean jj_3R_12() {
Token xsp;
xsp = jj_scanpos;
if (jj_scan_token(67)) {
jj_scanpos = xsp;
if (jj_scan_token(68)) return true;
}
return false;
}
final private boolean jj_3R_11() { final private boolean jj_3R_11() {
if (jj_scan_token(ID)) return true; if (jj_scan_token(ID)) return true;
return false; return false;
@ -1095,22 +1116,6 @@ public class HQLParser implements HQLParserConstants {
return false; return false;
} }
final private boolean jj_3_1() {
if (jj_scan_token(ADD)) return true;
if (jj_3R_10()) return true;
return false;
}
final private boolean jj_3R_12() {
Token xsp;
xsp = jj_scanpos;
if (jj_scan_token(66)) {
jj_scanpos = xsp;
if (jj_scan_token(67)) return true;
}
return false;
}
public HQLParserTokenManager token_source; public HQLParserTokenManager token_source;
SimpleCharStream jj_input_stream; SimpleCharStream jj_input_stream;
public Token token, jj_nt; public Token token, jj_nt;
@ -1133,10 +1138,10 @@ public class HQLParser implements HQLParserConstants {
jj_la1_0 = new int[] {0xf3ffe0,0xf3ffe1,0xf3ffe0,0x0,0x0,0x0,0x0,0x33dbc0,0x33dbc0,0x0,0x600,0x0,0x0,0x0,0x0,0x0,0x0,0x1000,0x0,0x80000000,0x0,0x2000000,0x0,0x3000000,0x8000000,0x3000000,0x80000000,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,}; jj_la1_0 = new int[] {0xf3ffe0,0xf3ffe1,0xf3ffe0,0x0,0x0,0x0,0x0,0x33dbc0,0x33dbc0,0x0,0x600,0x0,0x0,0x0,0x0,0x0,0x0,0x1000,0x0,0x80000000,0x0,0x2000000,0x0,0x3000000,0x8000000,0x3000000,0x80000000,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,};
} }
private static void jj_la1_1() { private static void jj_la1_1() {
jj_la1_1 = new int[] {0x0,0x0,0x0,0x40000000,0xc0000000,0xc0000000,0x40000000,0x40000000,0x40000000,0x40000000,0x0,0x731c000,0xe0000,0xe00000,0x731c000,0x10,0x10,0x18000000,0x0,0x0,0x0,0x0,0xe0002000,0x0,0x0,0x0,0x0,0x1,0x2,0x10,0x0,0xc0002000,0xc0002000,0xc0002000,0x0,0xc0002000,0x10,0x10,0x10,0xc0000000,0x0,0x40000000,}; jj_la1_1 = new int[] {0x0,0x0,0x0,0x80000000,0x80000000,0x80000000,0x80000000,0x80000000,0x80000000,0x80000000,0x0,0xe71c000,0xe0000,0x1c00000,0xe71c000,0x10,0x10,0x30000000,0x0,0x0,0x0,0x0,0xc0002000,0x0,0x0,0x0,0x0,0x1,0x2,0x10,0x0,0x80002000,0x80002000,0x80002000,0x0,0x80002000,0x10,0x10,0x10,0x80000000,0x0,0x80000000,};
} }
private static void jj_la1_2() { private static void jj_la1_2() {
jj_la1_2 = new int[] {0x0,0x10,0x0,0x0,0x1,0x1,0xc,0x0,0x0,0xc,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0xc,0x0,0xc,0x0,0xc,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0xc,0x0,0xc,0x0,0xc,0xc,0x0,0x0,0x0,0x0,0xc,0xc,}; jj_la1_2 = new int[] {0x0,0x20,0x0,0x0,0x3,0x3,0x18,0x0,0x0,0x18,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x18,0x0,0x18,0x0,0x19,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x18,0x1,0x19,0x1,0x18,0x19,0x0,0x0,0x0,0x1,0x18,0x18,};
} }
final private JJCalls[] jj_2_rtns = new JJCalls[1]; final private JJCalls[] jj_2_rtns = new JJCalls[1];
private boolean jj_rescan = false; private boolean jj_rescan = false;
@ -1313,8 +1318,8 @@ public class HQLParser implements HQLParserConstants {
public ParseException generateParseException() { public ParseException generateParseException() {
jj_expentries.removeAllElements(); jj_expentries.removeAllElements();
boolean[] la1tokens = new boolean[69]; boolean[] la1tokens = new boolean[70];
for (int i = 0; i < 69; i++) { for (int i = 0; i < 70; i++) {
la1tokens[i] = false; la1tokens[i] = false;
} }
if (jj_kind >= 0) { if (jj_kind >= 0) {
@ -1336,7 +1341,7 @@ public class HQLParser implements HQLParserConstants {
} }
} }
} }
for (int i = 0; i < 69; i++) { for (int i = 0; i < 70; i++) {
if (la1tokens[i]) { if (la1tokens[i]) {
jj_expentry = new int[1]; jj_expentry = new int[1];
jj_expentry[0] = i; jj_expentry[0] = i;

View File

@ -52,21 +52,22 @@ public interface HQLParserConstants {
int BLOCK = 50; int BLOCK = 50;
int RECORD = 51; int RECORD = 51;
int IN_MEMORY = 52; int IN_MEMORY = 52;
int BLOOMFILTER = 53; int BLOCK_CACHE_ENABLED = 53;
int COUNTING_BLOOMFILTER = 54; int BLOOMFILTER = 54;
int RETOUCHED_BLOOMFILTER = 55; int COUNTING_BLOOMFILTER = 55;
int VECTOR_SIZE = 56; int RETOUCHED_BLOOMFILTER = 56;
int NUM_HASH = 57; int VECTOR_SIZE = 57;
int NUM_ENTRIES = 58; int NUM_HASH = 58;
int ADD = 59; int NUM_ENTRIES = 59;
int CHANGE = 60; int ADD = 60;
int COUNT = 61; int CHANGE = 61;
int ID = 62; int COUNT = 62;
int INTEGER_LITERAL = 63; int ID = 63;
int FLOATING_POINT_LITERAL = 64; int INTEGER_LITERAL = 64;
int EXPONENT = 65; int FLOATING_POINT_LITERAL = 65;
int QUOTED_IDENTIFIER = 66; int EXPONENT = 66;
int STRING_LITERAL = 67; int QUOTED_IDENTIFIER = 67;
int STRING_LITERAL = 68;
int DEFAULT = 0; int DEFAULT = 0;
@ -124,6 +125,7 @@ public interface HQLParserConstants {
"\"block\"", "\"block\"",
"\"record\"", "\"record\"",
"\"in_memory\"", "\"in_memory\"",
"\"block_cache_enabled\"",
"\"bloomfilter\"", "\"bloomfilter\"",
"\"counting_bloomfilter\"", "\"counting_bloomfilter\"",
"\"retouched_bloomfilter\"", "\"retouched_bloomfilter\"",

View File

@ -0,0 +1,206 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import org.apache.commons.collections.map.ReferenceMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.DataInputBuffer;
/**
* An implementation of {@link FSInputStream} that reads the stream in blocks
* of a fixed, configurable size. The blocks are stored in a memory-sensitive cache.
*/
public class BlockFSInputStream extends FSInputStream {
static final Log LOG = LogFactory.getLog(BlockFSInputStream.class);
private final InputStream in;
private final long fileLength;
private final int blockSize;
private final Map<Long, byte[]> blocks;
private boolean closed;
private DataInputBuffer blockStream = new DataInputBuffer();
private long blockEnd = -1;
private long pos = 0;
/**
* @param in
* @param fileLength
* @param blockSize the size of each block in bytes.
*/
@SuppressWarnings("unchecked")
public BlockFSInputStream(InputStream in, long fileLength, int blockSize) {
this.in = in;
if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
throw new IllegalArgumentException(
"In is not an instance of Seekable or PositionedReadable");
}
this.fileLength = fileLength;
this.blockSize = blockSize;
// a memory-sensitive map that has soft references to values
this.blocks = new ReferenceMap() {
private long hits, misses;
@Override
public Object get(Object key) {
Object value = super.get(key);
if (value == null) {
misses++;
} else {
hits++;
}
if (LOG.isDebugEnabled() && ((hits + misses) % 10000) == 0) {
long hitRate = (100 * hits) / (hits + misses);
LOG.info("Hit rate for cache " + hashCode() + ": " + hitRate + "%");
}
return value;
}
};
}
@Override
public synchronized long getPos() throws IOException {
return pos;
}
@Override
public synchronized int available() throws IOException {
return (int) (fileLength - pos);
}
@Override
public synchronized void seek(long targetPos) throws IOException {
if (targetPos > fileLength) {
throw new IOException("Cannot seek after EOF");
}
pos = targetPos;
blockEnd = -1;
}
@Override
public synchronized boolean seekToNewSource(long targetPos)
throws IOException {
return false;
}
@Override
public synchronized int read() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
int result = -1;
if (pos < fileLength) {
if (pos > blockEnd) {
blockSeekTo(pos);
}
result = blockStream.read();
if (result >= 0) {
pos++;
}
}
return result;
}
@Override
public synchronized int read(byte buf[], int off, int len) throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
if (pos < fileLength) {
if (pos > blockEnd) {
blockSeekTo(pos);
}
int realLen = Math.min(len, (int) (blockEnd - pos + 1));
int result = blockStream.read(buf, off, realLen);
if (result >= 0) {
pos += result;
}
return result;
}
return -1;
}
private synchronized void blockSeekTo(long target) throws IOException {
int targetBlock = (int) (target / blockSize);
long targetBlockStart = targetBlock * blockSize;
long targetBlockEnd = Math.min(targetBlockStart + blockSize, fileLength) - 1;
long blockLength = targetBlockEnd - targetBlockStart + 1;
long offsetIntoBlock = target - targetBlockStart;
byte[] block = blocks.get(targetBlockStart);
if (block == null) {
block = new byte[blockSize];
((PositionedReadable) in).readFully(targetBlockStart, block, 0,
(int) blockLength);
blocks.put(targetBlockStart, block);
}
this.pos = target;
this.blockEnd = targetBlockEnd;
this.blockStream.reset(block, (int) offsetIntoBlock,
(int) (blockLength - offsetIntoBlock));
}
@Override
public void close() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
if (blockStream != null) {
blockStream.close();
blockStream = null;
}
super.close();
closed = true;
}
/**
* We don't support marks.
*/
@Override
public boolean markSupported() {
return false;
}
@Override
public void mark(int readLimit) {
// Do nothing
}
@Override
public void reset() throws IOException {
throw new IOException("Mark not supported");
}
}

View File

@ -64,7 +64,8 @@ struct ColumnDescriptor {
5:i32 maxValueLength = 2147483647, 5:i32 maxValueLength = 2147483647,
6:string bloomFilterType = "NONE", 6:string bloomFilterType = "NONE",
7:i32 bloomFilterVectorSize = 0, 7:i32 bloomFilterVectorSize = 0,
8:i32 bloomFilterNbHashes = 0 8:i32 bloomFilterNbHashes = 0,
9:bool blockCacheEnabled = 0
} }
/** /**

View File

@ -58,7 +58,8 @@ public class ThriftUtilities {
throw new IllegalArgument("column name is empty"); throw new IllegalArgument("column name is empty");
} }
HColumnDescriptor col = new HColumnDescriptor(new Text(in.name), HColumnDescriptor col = new HColumnDescriptor(new Text(in.name),
in.maxVersions, comp, in.inMemory, in.maxValueLength, bloom); in.maxVersions, comp, in.inMemory, in.blockCacheEnabled,
in.maxValueLength, bloom);
return col; return col;
} }
@ -76,6 +77,7 @@ public class ThriftUtilities {
col.maxVersions = in.getMaxVersions(); col.maxVersions = in.getMaxVersions();
col.compression = in.getCompression().toString(); col.compression = in.getCompression().toString();
col.inMemory = in.isInMemory(); col.inMemory = in.isInMemory();
col.blockCacheEnabled = in.isBlockCacheEnabled();
col.maxValueLength = in.getMaxValueLength(); col.maxValueLength = in.getMaxValueLength();
BloomFilterDescriptor bloom = in.getBloomFilter(); BloomFilterDescriptor bloom = in.getBloomFilter();
if (bloom != null) { if (bloom != null) {

View File

@ -46,6 +46,7 @@ public class ColumnDescriptor implements TBase, java.io.Serializable {
public String bloomFilterType; public String bloomFilterType;
public int bloomFilterVectorSize; public int bloomFilterVectorSize;
public int bloomFilterNbHashes; public int bloomFilterNbHashes;
public boolean blockCacheEnabled;
public final Isset __isset = new Isset(); public final Isset __isset = new Isset();
public static final class Isset { public static final class Isset {
@ -57,6 +58,7 @@ public class ColumnDescriptor implements TBase, java.io.Serializable {
public boolean bloomFilterType = false; public boolean bloomFilterType = false;
public boolean bloomFilterVectorSize = false; public boolean bloomFilterVectorSize = false;
public boolean bloomFilterNbHashes = false; public boolean bloomFilterNbHashes = false;
public boolean blockCacheEnabled = false;
} }
public ColumnDescriptor() { public ColumnDescriptor() {
@ -74,6 +76,8 @@ public class ColumnDescriptor implements TBase, java.io.Serializable {
this.bloomFilterNbHashes = 0; this.bloomFilterNbHashes = 0;
this.blockCacheEnabled = false;
} }
public ColumnDescriptor( public ColumnDescriptor(
@ -84,7 +88,8 @@ public class ColumnDescriptor implements TBase, java.io.Serializable {
int maxValueLength, int maxValueLength,
String bloomFilterType, String bloomFilterType,
int bloomFilterVectorSize, int bloomFilterVectorSize,
int bloomFilterNbHashes) int bloomFilterNbHashes,
boolean blockCacheEnabled)
{ {
this(); this();
this.name = name; this.name = name;
@ -103,6 +108,8 @@ public class ColumnDescriptor implements TBase, java.io.Serializable {
this.__isset.bloomFilterVectorSize = true; this.__isset.bloomFilterVectorSize = true;
this.bloomFilterNbHashes = bloomFilterNbHashes; this.bloomFilterNbHashes = bloomFilterNbHashes;
this.__isset.bloomFilterNbHashes = true; this.__isset.bloomFilterNbHashes = true;
this.blockCacheEnabled = blockCacheEnabled;
this.__isset.blockCacheEnabled = true;
} }
public void read(TProtocol iprot) throws TException { public void read(TProtocol iprot) throws TException {
@ -180,6 +187,14 @@ public class ColumnDescriptor implements TBase, java.io.Serializable {
TProtocolUtil.skip(iprot, field.type); TProtocolUtil.skip(iprot, field.type);
} }
break; break;
case 9:
if (field.type == TType.BOOL) {
this.blockCacheEnabled = iprot.readBool();
this.__isset.blockCacheEnabled = true;
} else {
TProtocolUtil.skip(iprot, field.type);
}
break;
default: default:
TProtocolUtil.skip(iprot, field.type); TProtocolUtil.skip(iprot, field.type);
break; break;
@ -247,6 +262,12 @@ public class ColumnDescriptor implements TBase, java.io.Serializable {
oprot.writeFieldBegin(field); oprot.writeFieldBegin(field);
oprot.writeI32(this.bloomFilterNbHashes); oprot.writeI32(this.bloomFilterNbHashes);
oprot.writeFieldEnd(); oprot.writeFieldEnd();
field.name = "blockCacheEnabled";
field.type = TType.BOOL;
field.id = 9;
oprot.writeFieldBegin(field);
oprot.writeBool(this.blockCacheEnabled);
oprot.writeFieldEnd();
oprot.writeFieldStop(); oprot.writeFieldStop();
oprot.writeStructEnd(); oprot.writeStructEnd();
} }
@ -269,6 +290,8 @@ public class ColumnDescriptor implements TBase, java.io.Serializable {
sb.append(this.bloomFilterVectorSize); sb.append(this.bloomFilterVectorSize);
sb.append(",bloomFilterNbHashes:"); sb.append(",bloomFilterNbHashes:");
sb.append(this.bloomFilterNbHashes); sb.append(this.bloomFilterNbHashes);
sb.append(",blockCacheEnabled:");
sb.append(this.blockCacheEnabled);
sb.append(")"); sb.append(")");
return sb.toString(); return sb.toString();
} }

View File

@ -184,11 +184,11 @@ public abstract class HBaseTestCase extends TestCase {
final int versions) { final int versions) {
HTableDescriptor htd = new HTableDescriptor(name); HTableDescriptor htd = new HTableDescriptor(name);
htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME1), versions, htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME1), versions,
CompressionType.NONE, false, Integer.MAX_VALUE, null)); CompressionType.NONE, false, false, Integer.MAX_VALUE, null));
htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME2), versions, htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME2), versions,
CompressionType.NONE, false, Integer.MAX_VALUE, null)); CompressionType.NONE, false, false, Integer.MAX_VALUE, null));
htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME3), versions, htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME3), versions,
CompressionType.NONE, false, Integer.MAX_VALUE, null)); CompressionType.NONE, false, false, Integer.MAX_VALUE, null));
return htd; return htd;
} }

View File

@ -169,6 +169,7 @@ public class TestBloomFilters extends HBaseClusterTestCase {
1, // Max versions 1, // Max versions
HColumnDescriptor.CompressionType.NONE, // no compression HColumnDescriptor.CompressionType.NONE, // no compression
HColumnDescriptor.DEFAULT_IN_MEMORY, // not in memory HColumnDescriptor.DEFAULT_IN_MEMORY, // not in memory
HColumnDescriptor.DEFAULT_BLOCK_CACHE_ENABLED,
HColumnDescriptor.DEFAULT_MAX_VALUE_LENGTH, HColumnDescriptor.DEFAULT_MAX_VALUE_LENGTH,
bloomFilter bloomFilter
) )
@ -234,6 +235,7 @@ public class TestBloomFilters extends HBaseClusterTestCase {
1, // Max versions 1, // Max versions
HColumnDescriptor.CompressionType.NONE, // no compression HColumnDescriptor.CompressionType.NONE, // no compression
HColumnDescriptor.DEFAULT_IN_MEMORY, // not in memory HColumnDescriptor.DEFAULT_IN_MEMORY, // not in memory
HColumnDescriptor.DEFAULT_BLOCK_CACHE_ENABLED,
HColumnDescriptor.DEFAULT_MAX_VALUE_LENGTH, HColumnDescriptor.DEFAULT_MAX_VALUE_LENGTH,
bloomFilter bloomFilter
) )

View File

@ -337,7 +337,7 @@ public class TestTimestamp extends HBaseTestCase {
private HRegion createRegion() throws IOException { private HRegion createRegion() throws IOException {
HTableDescriptor htd = createTableDescriptor(getName()); HTableDescriptor htd = createTableDescriptor(getName());
htd.addFamily(new HColumnDescriptor(COLUMN, VERSIONS, htd.addFamily(new HColumnDescriptor(COLUMN, VERSIONS,
CompressionType.NONE, false, Integer.MAX_VALUE, null)); CompressionType.NONE, false, false, Integer.MAX_VALUE, null));
return createNewHRegion(htd, null, null); return createNewHRegion(htd, null, null);
} }
} }

View File

@ -44,8 +44,9 @@ public class TestToString extends TestCase {
HTableDescriptor htd = HTableDescriptor.rootTableDesc; HTableDescriptor htd = HTableDescriptor.rootTableDesc;
System. out.println(htd.toString()); System. out.println(htd.toString());
assertEquals("Table descriptor", "name: -ROOT-, families: {info:={name: " + assertEquals("Table descriptor", "name: -ROOT-, families: {info:={name: " +
"info, max versions: 1, compression: NONE, in memory: false, max " + "info, max versions: 1, compression: NONE, in memory: false, " +
"length: 2147483647, bloom filter: none}}", htd.toString()); "block cache enabled: false, max length: 2147483647, " +
"bloom filter: none}}", htd.toString());
} }
/** /**
@ -57,7 +58,7 @@ public class TestToString extends TestCase {
assertEquals("HRegionInfo", assertEquals("HRegionInfo",
"regionname: -ROOT-,,0, startKey: <>, endKey: <>, encodedName: 70236052, tableDesc: " + "regionname: -ROOT-,,0, startKey: <>, endKey: <>, encodedName: 70236052, tableDesc: " +
"{name: -ROOT-, families: {info:={name: info, max versions: 1, " + "{name: -ROOT-, families: {info:={name: info, max versions: 1, " +
"compression: NONE, in memory: false, max length: 2147483647, bloom " + "compression: NONE, in memory: false, block cache enabled: false, " +
"filter: none}}}", hri.toString()); "max length: 2147483647, bloom filter: none}}}", hri.toString());
} }
} }

View File

@ -0,0 +1,121 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.IOException;
import junit.framework.TestCase;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.io.DataInputBuffer;
public class TestBlockFSInputStream extends TestCase {
static class InMemoryFSInputStream extends FSInputStream {
private byte[] data;
private DataInputBuffer din = new DataInputBuffer();
public InMemoryFSInputStream(byte[] data) {
this.data = data;
din.reset(data, data.length);
}
@Override
public long getPos() throws IOException {
return din.getPosition();
}
@Override
public void seek(long pos) throws IOException {
if (pos > data.length) {
throw new IOException("Cannot seek after EOF");
}
din.reset(data, (int) pos, data.length - (int) pos);
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
@Override
public int read() throws IOException {
return din.read();
}
}
private byte[] data;
private BlockFSInputStream stream;
@Override
protected void setUp() throws Exception {
data = new byte[34];
for (int i = 0; i < data.length; i++) {
data[i] = (byte) i;
}
FSInputStream byteStream = new InMemoryFSInputStream(data);
stream = new BlockFSInputStream(byteStream, 34, 10);
}
public void testReadForwards() throws IOException {
for (int i = 0; i < data.length; i++) {
assertEquals(i, stream.getPos());
assertEquals(i, stream.read());
}
}
public void testReadBackwards() throws IOException {
for (int i = data.length - 1; i >= 0; i--) {
stream.seek(i);
assertEquals(i, stream.getPos());
assertEquals(i, stream.read());
}
}
public void testReadInChunks() throws IOException {
byte[] buf = new byte[data.length];
int chunkLength = 6;
assertEquals(6, stream.read(buf, 0, chunkLength));
assertEquals(4, stream.read(buf, 6, chunkLength));
assertEquals(6, stream.read(buf, 10, chunkLength));
assertEquals(4, stream.read(buf, 16, chunkLength));
assertEquals(6, stream.read(buf, 20, chunkLength));
assertEquals(4, stream.read(buf, 26, chunkLength));
assertEquals(4, stream.read(buf, 30, chunkLength));
assertEquals(0, stream.available());
assertEquals(-1, stream.read());
for (int i = 0; i < buf.length; i++) {
assertEquals(i, buf[i]);
}
}
}