HBASE-975 Improve MapFile performance for start and end key
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@711155 13f79535-47bb-0310-9956-ffa450edef68
parent 97fbf7da76
commit d6997a949f
@ -96,6 +96,7 @@ Release 0.19.0 - Unreleased
instead (requires hadoop 0.19)
HBASE-81 When a scanner lease times out, throw a more "user friendly" exception
HBASE-978 Remove BloomFilterDescriptor. It is no longer used.
HBASE-975 Improve MapFile performance for start and end key

NEW FEATURES
HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters]
@ -225,7 +225,6 @@ public class HTable {
|
|||
* @return Array of region starting row keys
|
||||
* @throws IOException
|
||||
*/
|
||||
@SuppressWarnings("null")
|
||||
public byte[][] getStartKeys() throws IOException {
|
||||
final List<byte[]> keyList = new ArrayList<byte[]>();
|
||||
|
||||
|
@ -1154,7 +1153,6 @@ public class HTable {
|
|||
private byte[][] columns;
|
||||
private byte [] startRow;
|
||||
protected long scanTime;
|
||||
@SuppressWarnings("hiding")
|
||||
private boolean closed = false;
|
||||
private HRegionInfo currentRegion = null;
|
||||
private ScannerCallable callable = null;
|
||||
|
|
|
@ -59,7 +59,6 @@ public class BlockFSInputStream extends FSInputStream {
|
|||
* @param fileLength
|
||||
* @param blockSize the size of each block in bytes.
|
||||
*/
|
||||
@SuppressWarnings({"unchecked", "serial"})
|
||||
public BlockFSInputStream(InputStream in, long fileLength, int blockSize) {
|
||||
this.in = in;
|
||||
if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
|
||||
|
@ -157,12 +156,12 @@ public class BlockFSInputStream extends FSInputStream {
|
|||
long blockLength = targetBlockEnd - targetBlockStart + 1;
|
||||
long offsetIntoBlock = target - targetBlockStart;
|
||||
|
||||
byte[] block = blocks.get(targetBlockStart);
|
||||
byte[] block = blocks.get(Long.valueOf(targetBlockStart));
|
||||
if (block == null) {
|
||||
block = new byte[blockSize];
|
||||
((PositionedReadable) in).readFully(targetBlockStart, block, 0,
|
||||
(int) blockLength);
|
||||
blocks.put(targetBlockStart, block);
|
||||
blocks.put(Long.valueOf(targetBlockStart), block);
|
||||
}
|
||||
|
||||
this.pos = target;
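For illustration only (not part of this commit): the hunk above replaces autoboxed lookups with explicit Long.valueOf(...) keys in the per-block cache. The sketch below shows the same read-and-cache pattern in isolation; the Map type, the offset arithmetic, and the byte[]-backed data source are assumptions made for the example, while blocks, blockSize, and the explicit boxing mirror the lines above.

// Sketch of the block-aligned read-and-cache pattern used above.
import java.util.HashMap;
import java.util.Map;

class BlockCacheSketch {
  private final Map<Long, byte[]> blocks = new HashMap<Long, byte[]>();
  private final int blockSize;

  BlockCacheSketch(final int blockSize) {
    this.blockSize = blockSize;
  }

  byte[] blockFor(final long target, final byte[] wholeFile) {
    long targetBlockStart = (target / blockSize) * blockSize;       // block-aligned offset
    long blockLength = Math.min(blockSize, wholeFile.length - targetBlockStart);
    byte[] block = blocks.get(Long.valueOf(targetBlockStart));      // explicit boxing, as in the hunk
    if (block == null) {
      block = new byte[(int) blockLength];
      System.arraycopy(wholeFile, (int) targetBlockStart, block, 0, (int) blockLength);
      blocks.put(Long.valueOf(targetBlockStart), block);            // cache for later seeks
    }
    return block;
  }
}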
|
||||
|
|
|
@ -0,0 +1,231 @@
|
|||
/**
|
||||
* Copyright 2008 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.util.Hash;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableComparable;
|
||||
import org.onelab.filter.BloomFilter;
|
||||
|
||||
/**
|
||||
* On write, all keys are added to a bloom filter. On read, all keys are
|
||||
* tested first against bloom filter. Keys are HStoreKey. If passed bloom
|
||||
* filter is null, just passes invocation to parent.
|
||||
*/
|
||||
public class BloomFilterMapFile extends HBaseMapFile {
|
||||
private static final Log LOG = LogFactory.getLog(BloomFilterMapFile.class);
|
||||
protected static final String BLOOMFILTER_FILE_NAME = "filter";
|
||||
|
||||
public static class Reader extends HBaseReader {
|
||||
private final BloomFilter bloomFilter;
|
||||
|
||||
/**
|
||||
* @param fs
|
||||
* @param dirName
|
||||
* @param conf
|
||||
* @param filter
|
||||
* @param blockCacheEnabled
|
||||
* @param hri
|
||||
* @throws IOException
|
||||
*/
|
||||
public Reader(FileSystem fs, String dirName, Configuration conf,
|
||||
final boolean filter, final boolean blockCacheEnabled,
|
||||
HRegionInfo hri)
|
||||
throws IOException {
|
||||
super(fs, dirName, conf, blockCacheEnabled, hri);
|
||||
if (filter) {
|
||||
this.bloomFilter = loadBloomFilter(fs, dirName);
|
||||
} else {
|
||||
this.bloomFilter = null;
|
||||
}
|
||||
}
|
||||
|
||||
private BloomFilter loadBloomFilter(FileSystem fs, String dirName)
|
||||
throws IOException {
|
||||
Path filterFile = new Path(dirName, BLOOMFILTER_FILE_NAME);
|
||||
if(!fs.exists(filterFile)) {
|
||||
throw new FileNotFoundException("Could not find bloom filter: " +
|
||||
filterFile);
|
||||
}
|
||||
BloomFilter filter = new BloomFilter();
|
||||
FSDataInputStream in = fs.open(filterFile);
|
||||
try {
|
||||
filter.readFields(in);
|
||||
} finally {
|
||||
in.close();
|
||||
}
|
||||
return filter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Writable get(WritableComparable key, Writable val)
|
||||
throws IOException {
|
||||
if (bloomFilter == null) {
|
||||
return super.get(key, val);
|
||||
}
|
||||
if(bloomFilter.membershipTest(getBloomFilterKey(key))) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("bloom filter reported that key exists");
|
||||
}
|
||||
return super.get(key, val);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("bloom filter reported that key does not exist");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public WritableComparable getClosest(WritableComparable key,
|
||||
Writable val) throws IOException {
|
||||
if (bloomFilter == null) {
|
||||
return super.getClosest(key, val);
|
||||
}
|
||||
// Note - the key being passed to us is always a HStoreKey
|
||||
if(bloomFilter.membershipTest(getBloomFilterKey(key))) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("bloom filter reported that key exists");
|
||||
}
|
||||
return super.getClosest(key, val);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("bloom filter reported that key does not exist");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return size of the bloom filter
|
||||
*/
|
||||
public int getBloomFilterSize() {
|
||||
return bloomFilter == null ? 0 : bloomFilter.getVectorSize();
|
||||
}
|
||||
}
|
||||
|
||||
public static class Writer extends HBaseWriter {
|
||||
private static final double DEFAULT_NUMBER_OF_HASH_FUNCTIONS = 4.0;
|
||||
private final BloomFilter bloomFilter;
|
||||
private final String dirName;
|
||||
private final FileSystem fs;
|
||||
|
||||
/**
|
||||
* @param conf
|
||||
* @param fs
|
||||
* @param dirName
|
||||
* @param compression
|
||||
* @param filter
|
||||
* @param nrows
|
||||
* @param hri
|
||||
* @throws IOException
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public Writer(Configuration conf, FileSystem fs, String dirName,
|
||||
SequenceFile.CompressionType compression, final boolean filter,
|
||||
int nrows, final HRegionInfo hri)
|
||||
throws IOException {
|
||||
super(conf, fs, dirName, compression, hri);
|
||||
this.dirName = dirName;
|
||||
this.fs = fs;
|
||||
if (filter) {
|
||||
/*
|
||||
* There is no way to automatically determine the vector size and the
|
||||
* number of hash functions to use. In particular, bloom filters are
|
||||
* very sensitive to the number of elements inserted into them. For
|
||||
* HBase, the number of entries depends on the size of the data stored
|
||||
* in the column. Currently the default region size is 256MB, so the
|
||||
* number of entries is approximately
|
||||
* 256MB / (average value size for column).
|
||||
*
|
||||
* If m denotes the number of bits in the Bloom filter (vectorSize),
|
||||
* n denotes the number of elements inserted into the Bloom filter and
|
||||
* k represents the number of hash functions used (nbHash), then
|
||||
* according to Broder and Mitzenmacher,
|
||||
*
|
||||
* ( http://www.eecs.harvard.edu/~michaelm/NEWWORK/postscripts/BloomFilterSurvey.pdf )
|
||||
*
|
||||
* the probability of false positives is minimized when k is
|
||||
* approximately m/n ln(2).
|
||||
*
|
||||
* If we fix the number of hash functions and know the number of
|
||||
* entries, then the optimal vector size m = (k * n) / ln(2)
|
||||
*/
|
||||
this.bloomFilter = new BloomFilter(
|
||||
(int) Math.ceil(
|
||||
(DEFAULT_NUMBER_OF_HASH_FUNCTIONS * (1.0 * nrows)) /
|
||||
Math.log(2.0)),
|
||||
(int) DEFAULT_NUMBER_OF_HASH_FUNCTIONS,
|
||||
Hash.getHashType(conf)
|
||||
);
|
||||
} else {
|
||||
this.bloomFilter = null;
|
||||
}
|
||||
}
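For illustration (not in the commit): the sizing comment above fixes k at 4 hash functions and derives the bit-vector size as m = ceil(k * n / ln 2). A small worked sketch of that arithmetic follows; the row count is an arbitrary example value.

// Worked example of the sizing rule above: k = 4 hash functions,
// m = ceil(k * n / ln 2) bits for n expected rows.
public class BloomSizingExample {
  static final double K = 4.0;  // mirrors DEFAULT_NUMBER_OF_HASH_FUNCTIONS

  static int vectorSize(final int nrows) {
    return (int) Math.ceil((K * nrows) / Math.log(2.0));  // Math.log is the natural log
  }

  public static void main(String[] args) {
    // 4 * 1,000,000 / ln 2 ~= 5,770,781 bits, roughly 0.7 MB of filter.
    System.out.println("vectorSize = " + vectorSize(1000000));
  }
}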
|
||||
|
||||
@Override
|
||||
public void append(WritableComparable key, Writable val)
|
||||
throws IOException {
|
||||
if (bloomFilter != null) {
|
||||
bloomFilter.add(getBloomFilterKey(key));
|
||||
}
|
||||
super.append(key, val);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void close() throws IOException {
|
||||
super.close();
|
||||
if (this.bloomFilter != null) {
|
||||
flushBloomFilter();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Flushes bloom filter to disk
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private void flushBloomFilter() throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("flushing bloom filter for " + this.dirName);
|
||||
}
|
||||
FSDataOutputStream out =
|
||||
fs.create(new Path(dirName, BLOOMFILTER_FILE_NAME));
|
||||
try {
|
||||
bloomFilter.write(out);
|
||||
} finally {
|
||||
out.close();
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("flushed bloom filter for " + this.dirName);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,219 @@
|
|||
/**
|
||||
* Copyright 2008 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HStoreKey;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.io.MapFile;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableComparable;
|
||||
import org.onelab.filter.Key;
|
||||
|
||||
/**
|
||||
* Hbase customizations of MapFile.
|
||||
*/
|
||||
public class HBaseMapFile extends MapFile {
|
||||
private static final Log LOG = LogFactory.getLog(HBaseMapFile.class);
|
||||
public static final Class<? extends Writable> VALUE_CLASS =
|
||||
ImmutableBytesWritable.class;
|
||||
|
||||
/**
|
||||
* Custom bloom filter key maker.
|
||||
* @param key
|
||||
* @return Key made of bytes of row only.
|
||||
*/
|
||||
protected static Key getBloomFilterKey(WritableComparable key) {
|
||||
return new Key(((HStoreKey) key).getRow());
|
||||
}
|
||||
|
||||
/**
|
||||
* A reader capable of reading and caching blocks of the data file.
|
||||
*/
|
||||
public static class HBaseReader extends MapFile.Reader {
|
||||
private final boolean blockCacheEnabled;
|
||||
private final HStoreKey firstKey;
|
||||
private final HStoreKey finalKey;
|
||||
private final String dirName;
|
||||
|
||||
/**
|
||||
* @param fs
|
||||
* @param dirName
|
||||
* @param conf
|
||||
* @param hri
|
||||
* @throws IOException
|
||||
*/
|
||||
public HBaseReader(FileSystem fs, String dirName, Configuration conf,
|
||||
HRegionInfo hri)
|
||||
throws IOException {
|
||||
this(fs, dirName, conf, false, hri);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param fs
|
||||
* @param dirName
|
||||
* @param conf
|
||||
* @param blockCacheEnabled
|
||||
* @param hri
|
||||
* @throws IOException
|
||||
*/
|
||||
public HBaseReader(FileSystem fs, String dirName, Configuration conf,
|
||||
boolean blockCacheEnabled, HRegionInfo hri)
|
||||
throws IOException {
|
||||
super(fs, dirName, new HStoreKey.HStoreKeyWritableComparator(hri),
|
||||
conf, false); // defer opening streams
|
||||
this.dirName = dirName;
|
||||
this.blockCacheEnabled = blockCacheEnabled;
|
||||
open(fs, dirName, new HStoreKey.HStoreKeyWritableComparator(hri), conf);
|
||||
|
||||
// Force reading of the mapfile index by calling midKey.
|
||||
// Reading the index will bring the index into memory over
|
||||
// here on the client and then close the index file freeing
|
||||
// up socket connection and resources in the datanode.
|
||||
// Usually, the first access on a MapFile.Reader will load the
// index; force the issue here for HStoreFile MapFiles because an
// access may not happen for some time, and meantime we're
// using up datanode resources. See HADOOP-2341.
|
||||
// Of note, midKey just goes to index. Does not seek.
|
||||
midKey();
|
||||
|
||||
// Read in the first and last key. Cache them. Make sure we are at start
|
||||
// of the file.
|
||||
reset();
|
||||
HStoreKey key = new HStoreKey();
|
||||
super.next(key, new ImmutableBytesWritable());
|
||||
key.setHRegionInfo(hri);
|
||||
this.firstKey = key;
|
||||
// Set us back to start of file. Call to finalKey restores whatever
|
||||
// the previous position.
|
||||
reset();
|
||||
|
||||
// Get final key.
|
||||
key = new HStoreKey();
|
||||
super.finalKey(key);
|
||||
key.setHRegionInfo(hri);
|
||||
this.finalKey = key;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected org.apache.hadoop.io.SequenceFile.Reader createDataFileReader(
|
||||
FileSystem fs, Path dataFile, Configuration conf)
|
||||
throws IOException {
|
||||
if (!blockCacheEnabled) {
|
||||
return super.createDataFileReader(fs, dataFile, conf);
|
||||
}
|
||||
final int blockSize = conf.getInt("hbase.hstore.blockCache.blockSize",
|
||||
64 * 1024);
|
||||
return new SequenceFile.Reader(fs, dataFile, conf) {
|
||||
@Override
|
||||
protected FSDataInputStream openFile(FileSystem fs, Path file,
|
||||
int bufferSize, long length) throws IOException {
|
||||
|
||||
return new FSDataInputStream(new BlockFSInputStream(
|
||||
super.openFile(fs, file, bufferSize, length), length,
|
||||
blockSize));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void finalKey(final WritableComparable fk)
|
||||
throws IOException {
|
||||
Writables.copyWritable(this.finalKey, fk);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param hsk
|
||||
* @return True if the file *could* contain <code>hsk</code> and false if
|
||||
* outside the bounds of this file's start and end keys.
|
||||
*/
|
||||
public boolean containsKey(final HStoreKey hsk) {
|
||||
return this.firstKey.compareTo(hsk) <= 0 &&
|
||||
this.finalKey.compareTo(hsk) >= 0;
|
||||
}
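This cached key range is the heart of HBASE-975: a caller can rule a whole MapFile in or out without seeking into it. A hedged caller-side sketch follows (not the actual HStore code path); findInStore, the readers list, and the early-out loop are hypothetical scaffolding, and the imports already in this file plus java.util.List are assumed.

// Hypothetical caller: skip any file whose cached [firstKey, finalKey] range
// cannot contain the wanted key, instead of seeking into every MapFile.
static Writable findInStore(final List<HBaseMapFile.HBaseReader> readers,
    final HStoreKey wanted, final Writable value) throws IOException {
  for (HBaseMapFile.HBaseReader reader : readers) {
    if (!reader.containsKey(wanted)) {
      continue;  // ruled out from the cached start/end keys; no disk seek
    }
    Writable hit = reader.get(wanted, value);  // inherited MapFile.Reader#get
    if (hit != null) {
      return hit;
    }
  }
  return null;
}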
|
||||
|
||||
public String toString() {
|
||||
HStoreKey midkey = null;
|
||||
try {
|
||||
midkey = (HStoreKey)midKey();
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Failed get of midkey", ioe);
|
||||
}
|
||||
return "dirName=" + this.dirName + ", firstKey=" +
|
||||
this.firstKey.toString() + ", midKey=" + midkey +
|
||||
", finalKey=" + this.finalKey;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return First key in this file. Can be null around construction time.
|
||||
*/
|
||||
public HStoreKey getFirstKey() {
|
||||
return this.firstKey;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Final key in file. Can be null around construction time.
|
||||
*/
|
||||
public HStoreKey getFinalKey() {
|
||||
return this.finalKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized WritableComparable getClosest(WritableComparable key,
|
||||
Writable value, boolean before)
|
||||
throws IOException {
|
||||
if ((!before && ((HStoreKey)key).compareTo(this.finalKey) > 0) ||
|
||||
(before && ((HStoreKey)key).compareTo(this.firstKey) < 0)) {
|
||||
return null;
|
||||
}
|
||||
return super.getClosest(key, value, before);
|
||||
}
|
||||
}
|
||||
|
||||
public static class HBaseWriter extends MapFile.Writer {
|
||||
/**
|
||||
* @param conf
|
||||
* @param fs
|
||||
* @param dirName
|
||||
* @param compression
|
||||
* @param hri
|
||||
* @throws IOException
|
||||
*/
|
||||
public HBaseWriter(Configuration conf, FileSystem fs, String dirName,
|
||||
SequenceFile.CompressionType compression, final HRegionInfo hri)
|
||||
throws IOException {
|
||||
super(conf, fs, dirName, new HStoreKey.HStoreKeyWritableComparator(hri),
|
||||
VALUE_CLASS, compression);
|
||||
// Default for mapfiles is 128. Makes random reads faster if we
|
||||
// have more keys indexed and we're not 'next'-ing around in the
|
||||
// mapfile.
|
||||
setIndexInterval(conf.getInt("hbase.io.index.interval", 128));
|
||||
}
|
||||
}
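For illustration (not in the commit): lowering the interval indexes more keys, trading a larger in-memory index for fewer skip-ahead reads on random access. A sketch of tuning it through the same property the constructor reads; fs, dirName, and hri are assumed to be in scope, and HBaseConfiguration is assumed as the usual way to pick up HBase defaults.

// Sketch: index every 32nd key instead of the 128 defaulted above.
Configuration conf = new HBaseConfiguration();   // assumed config bootstrap
conf.setInt("hbase.io.index.interval", 32);      // property read by HBaseWriter
HBaseMapFile.HBaseWriter writer = new HBaseMapFile.HBaseWriter(
    conf, fs, dirName, SequenceFile.CompressionType.NONE, hri);
// ... append sorted HStoreKey/ImmutableBytesWritable pairs, then writer.close();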
|
||||
}
|
|
@ -0,0 +1,228 @@
|
|||
/**
|
||||
* Copyright 2008 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HStoreKey;
|
||||
import org.apache.hadoop.hbase.io.Reference.Range;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.io.MapFile;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableComparable;
|
||||
|
||||
/**
|
||||
* A facade for a {@link MapFile.Reader} that serves up either the top or
|
||||
* bottom half of a MapFile where 'bottom' is the first half of the file
|
||||
* containing the keys that sort lowest and 'top' is the second half of the
|
||||
* file with keys that sort greater than those of the bottom half. The top
|
||||
* includes the split file's midkey, or the key that follows it if the midkey
* does not exist in the file.
|
||||
*
|
||||
* <p>This type works in tandem with the {@link Reference} type. This class
|
||||
* is used reading while Reference is used writing.
|
||||
*
|
||||
* <p>This file is not splitable. Calls to {@link #midKey()} return null.
|
||||
*/
|
||||
public class HalfMapFileReader extends BloomFilterMapFile.Reader {
|
||||
private final boolean top;
|
||||
private final HStoreKey midkey;
|
||||
private boolean firstNextCall = true;
|
||||
private final WritableComparable<HStoreKey> firstKey;
|
||||
private final WritableComparable<HStoreKey> finalKey;
|
||||
|
||||
public HalfMapFileReader(final FileSystem fs, final String dirName,
|
||||
final Configuration conf, final Range r,
|
||||
final WritableComparable<HStoreKey> mk,
|
||||
final HRegionInfo hri)
|
||||
throws IOException {
|
||||
this(fs, dirName, conf, r, mk, false, false, hri);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public HalfMapFileReader(final FileSystem fs, final String dirName,
|
||||
final Configuration conf, final Range r,
|
||||
final WritableComparable<HStoreKey> mk, final boolean filter,
|
||||
final boolean blockCacheEnabled,
|
||||
final HRegionInfo hri)
|
||||
throws IOException {
|
||||
super(fs, dirName, conf, filter, blockCacheEnabled, hri);
|
||||
// This is not the actual midkey for this half-file; it's just the border
|
||||
// around which we split top and bottom. Have to look in files to find
|
||||
// actual last and first keys for bottom and top halves. Half-files don't
|
||||
// have an actual midkey themselves. No midkey is how we indicate file is
|
||||
// not splittable.
|
||||
this.midkey = new HStoreKey((HStoreKey)mk);
|
||||
this.midkey.setHRegionInfo(hri);
|
||||
// Is it top or bottom half?
|
||||
this.top = Reference.isTopFileRegion(r);
|
||||
// Firstkey comes back null if passed midkey is lower than first key in file
|
||||
// and it's a bottom half HalfMapFileReader, OR, if midkey is higher than
|
||||
// the final key in the backing file. In either case, it means this half
|
||||
// file is empty.
|
||||
this.firstKey = this.top?
|
||||
super.getClosest(this.midkey, new ImmutableBytesWritable()):
|
||||
super.getFirstKey().compareTo(this.midkey) > 0?
|
||||
null: super.getFirstKey();
|
||||
this.finalKey = this.top?
|
||||
super.getFinalKey():
|
||||
super.getClosest(new HStoreKey.BeforeThisStoreKey(this.midkey),
|
||||
new ImmutableBytesWritable(), true);
|
||||
}
|
||||
|
||||
/*
|
||||
* Check key is not bleeding into wrong half of the file.
|
||||
* @param key
|
||||
* @throws IOException
|
||||
*/
|
||||
private void checkKey(final WritableComparable<HStoreKey> key)
|
||||
throws IOException {
|
||||
if (top) {
|
||||
if (key.compareTo(midkey) < 0) {
|
||||
throw new IOException("Illegal Access: Key is less than midKey of " +
|
||||
"backing mapfile");
|
||||
}
|
||||
} else if (key.compareTo(midkey) >= 0) {
|
||||
throw new IOException("Illegal Access: Key is greater than or equal " +
|
||||
"to midKey of backing mapfile");
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public synchronized void finalKey(WritableComparable key)
|
||||
throws IOException {
|
||||
Writables.copyWritable(this.finalKey, key);
|
||||
return;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public synchronized Writable get(WritableComparable key, Writable val)
|
||||
throws IOException {
|
||||
checkKey(key);
|
||||
return super.get(key, val);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public synchronized WritableComparable getClosest(WritableComparable key,
|
||||
Writable val)
|
||||
throws IOException {
|
||||
WritableComparable closest = null;
|
||||
if (top) {
|
||||
// If top, the lowest possible key is first key. Do not have to check
|
||||
// what comes back from super getClosest. Will return exact match or
|
||||
// greater.
|
||||
closest = (key.compareTo(getFirstKey()) < 0)?
|
||||
getClosest(getFirstKey(), val): super.getClosest(key, val);
|
||||
} else {
|
||||
// We're serving bottom of the file.
|
||||
if (key.compareTo(this.midkey) < 0) {
|
||||
// Check key is within range for bottom.
|
||||
closest = super.getClosest(key, val);
|
||||
// midkey was made against largest store file at time of split. Smaller
|
||||
// store files could have anything in them. Check return value is
|
||||
// not beyond the midkey (getClosest returns exact match or next
|
||||
// after).
|
||||
if (closest != null && closest.compareTo(this.midkey) >= 0) {
|
||||
// Don't let this value out.
|
||||
closest = null;
|
||||
}
|
||||
}
|
||||
// Else, key is > midkey so let out closest = null.
|
||||
}
|
||||
return closest;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unused", "unchecked"})
|
||||
@Override
|
||||
public synchronized WritableComparable midKey() throws IOException {
|
||||
// Returns null to indicate file is not splitable.
|
||||
return null;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public synchronized boolean next(WritableComparable key, Writable val)
|
||||
throws IOException {
|
||||
if (firstNextCall) {
|
||||
firstNextCall = false;
|
||||
if (isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
// Seek and fill by calling getClosest on first key.
|
||||
if (this.firstKey != null) {
|
||||
Writables.copyWritable(this.firstKey, key);
|
||||
WritableComparable nearest = getClosest(key, val);
|
||||
if (!key.equals(nearest)) {
|
||||
throw new IOException("Keys don't match and should: " +
|
||||
key.toString() + ", " + nearest.toString());
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
boolean result = super.next(key, val);
|
||||
if (!top && key.compareTo(midkey) >= 0) {
|
||||
result = false;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private boolean isEmpty() {
|
||||
return this.firstKey == null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void reset() throws IOException {
|
||||
if (top) {
|
||||
firstNextCall = true;
|
||||
// I don't think this is needed. seek(this.firstKey);
|
||||
return;
|
||||
}
|
||||
super.reset();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public synchronized boolean seek(WritableComparable key)
|
||||
throws IOException {
|
||||
checkKey(key);
|
||||
return super.seek(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HStoreKey getFirstKey() {
|
||||
return (HStoreKey)this.firstKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HStoreKey getFinalKey() {
|
||||
return (HStoreKey)this.finalKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return super.toString() + ", half=" + (top? "top": "bottom");
|
||||
}
|
||||
}
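A hedged usage sketch (not part of the commit) of the reader above: scanning only the bottom half of a parent region's MapFile after a split. fs, conf, parentMapFile, midKey, and daughterInfo are assumed to exist in the caller; bloom filtering and block caching are left off for brevity.

// Every key returned by next() here sorts strictly below midKey, because the
// bottom half stops handing out entries once it reaches the split point.
HalfMapFileReader bottom = new HalfMapFileReader(fs, parentMapFile, conf,
    Reference.Range.bottom, midKey, daughterInfo);
try {
  HStoreKey key = new HStoreKey();
  ImmutableBytesWritable value = new ImmutableBytesWritable();
  while (bottom.next(key, value)) {
    // process key/value from the lower half only
  }
} finally {
  bottom.close();
}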
|
|
@ -79,7 +79,7 @@ public class ImmutableBytesWritable implements WritableComparable {
|
|||
* Get the data from the BytesWritable.
|
||||
* @return The data is only valid between 0 and getSize() - 1.
|
||||
*/
|
||||
public byte[] get() {
|
||||
public byte [] get() {
|
||||
if (this.bytes == null) {
|
||||
throw new IllegalStateException("Uninitialized. Null constructor " +
"called w/o accompanying readFields invocation");
|
||||
|
|
|
@ -0,0 +1,117 @@
|
|||
/**
|
||||
*
|
||||
*/
|
||||
package org.apache.hadoop.hbase.io;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.HStoreKey;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
/**
|
||||
* A reference to a part of a store file. The file referenced usually lives
|
||||
* under a different region. The part referenced is usually the top or bottom
|
||||
* half of the file. References are made at region split time. Being lazy
|
||||
* about copying data between the parent of the split and the split daughters
|
||||
* makes splitting faster.
|
||||
*
|
||||
* <p>References work with {@link HalfMapFileReader}. References know how to
|
||||
* write out the reference format in the file system and are what's juggled when
|
||||
* references are mixed in with direct store files. The
|
||||
* {@link HalfMapFileReader} is used reading the referred to file.
|
||||
*
|
||||
* <p>References to store files located over in some other region look like
|
||||
* this in the file system
|
||||
* <code>1278437856009925445.hbaserepository,qAReLZD-OyQORZWq_vqR1k==,959247014679548184</code>:
|
||||
* i.e. an id followed by the name of the referenced region. The data
|
||||
* ('mapfiles') of references are empty. The accompanying <code>info</code> file
|
||||
* contains the <code>midkey</code> that demarks top and bottom of the
|
||||
* referenced storefile, the id of the remote store we're referencing and
|
||||
* whether we're to serve the top or bottom region of the remote store file.
|
||||
* Note, a region is itself not splitable if it has instances of store file
|
||||
* references. References are cleaned up by compactions.
|
||||
*/
|
||||
public class Reference implements Writable {
|
||||
// TODO: see if it makes sense making a ReferenceMapFile whose Writer is this
|
||||
// class and whose Reader is the {@link HalfMapFileReader}.
|
||||
|
||||
private int encodedRegionName;
|
||||
private long fileid;
|
||||
private Range region;
|
||||
private HStoreKey midkey;
|
||||
|
||||
/**
|
||||
* For split HStoreFiles, it specifies if the file covers the lower half or
|
||||
* the upper half of the key range
|
||||
*/
|
||||
public static enum Range {
|
||||
/** HStoreFile contains upper half of key range */
|
||||
top,
|
||||
/** HStoreFile contains lower half of key range */
|
||||
bottom
|
||||
}
|
||||
|
||||
public Reference(final int ern, final long fid, final HStoreKey m,
|
||||
final Range fr) {
|
||||
this.encodedRegionName = ern;
|
||||
this.fileid = fid;
|
||||
this.region = fr;
|
||||
this.midkey = m;
|
||||
}
|
||||
|
||||
public Reference() {
|
||||
this(-1, -1, null, Range.bottom);
|
||||
}
|
||||
|
||||
public long getFileId() {
|
||||
return fileid;
|
||||
}
|
||||
|
||||
public Range getFileRegion() {
|
||||
return region;
|
||||
}
|
||||
|
||||
public HStoreKey getMidkey() {
|
||||
return midkey;
|
||||
}
|
||||
|
||||
public int getEncodedRegionName() {
|
||||
return this.encodedRegionName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return encodedRegionName + "/" + fileid + "/" + region;
|
||||
}
|
||||
|
||||
// Make it serializable.
|
||||
|
||||
public void write(DataOutput out) throws IOException {
|
||||
// Write out the encoded region name as a String. Doing it as a String
|
||||
// keeps a Reference's serialization backward compatible with
// pre-HBASE-82 serializations. Alternative is rewriting all
|
||||
// info files in hbase (Serialized References are written into the
|
||||
// 'info' file that accompanies HBase Store files).
|
||||
out.writeUTF(Integer.toString(encodedRegionName));
|
||||
out.writeLong(fileid);
|
||||
// Write true if we're doing top of the file.
|
||||
out.writeBoolean(isTopFileRegion(region));
|
||||
this.midkey.write(out);
|
||||
}
|
||||
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
this.encodedRegionName = Integer.parseInt(in.readUTF());
|
||||
fileid = in.readLong();
|
||||
boolean tmp = in.readBoolean();
|
||||
// If true, set region to top.
|
||||
region = tmp? Range.top: Range.bottom;
|
||||
midkey = new HStoreKey();
|
||||
midkey.readFields(in);
|
||||
}
|
||||
|
||||
public static boolean isTopFileRegion(final Range r) {
|
||||
return r.equals(Range.top);
|
||||
}
|
||||
}
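A round-trip sketch (not in the commit) of the serialization laid out in write() and readFields() above: UTF-encoded region name, file id, a top/bottom flag, then the midkey. The HStoreKey row constructor and the Bytes helper are assumptions made for the example, as are the java.io stream imports.

Reference original = new Reference(1234567890, 42L,
    new HStoreKey(Bytes.toBytes("row-5000")), Reference.Range.top);

ByteArrayOutputStream buf = new ByteArrayOutputStream();
original.write(new DataOutputStream(buf));   // region name as UTF, long id, boolean, midkey

Reference copy = new Reference();
copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
// copy.getEncodedRegionName() == 1234567890 and copy.getFileRegion() == Range.top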
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright 2007 The Apache Software Foundation
|
||||
* Copyright 2008 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
|
@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.io.BatchOperation;
|
|||
import org.apache.hadoop.hbase.io.BatchUpdate;
|
||||
import org.apache.hadoop.hbase.io.Cell;
|
||||
import org.apache.hadoop.hbase.io.HbaseMapWritable;
|
||||
import org.apache.hadoop.hbase.io.Reference;
|
||||
import org.apache.hadoop.hbase.io.RowResult;
|
||||
import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -775,15 +776,15 @@ public class HRegion implements HConstants {
|
|||
// Split each store file.
|
||||
for(HStoreFile h: hstoreFilesToSplit) {
|
||||
// A reference to the bottom half of the hsf store file.
|
||||
HStoreFile.Reference aReference = new HStoreFile.Reference(
|
||||
Reference aReference = new Reference(
|
||||
this.regionInfo.getEncodedName(), h.getFileId(),
|
||||
new HStoreKey(midKey, this.regionInfo), HStoreFile.Range.bottom);
|
||||
new HStoreKey(midKey, this.regionInfo), Reference.Range.bottom);
|
||||
HStoreFile a = new HStoreFile(this.conf, fs, splits,
|
||||
regionAInfo, h.getColFamily(), -1, aReference);
|
||||
// Reference to top half of the hsf store file.
|
||||
HStoreFile.Reference bReference = new HStoreFile.Reference(
|
||||
Reference bReference = new Reference(
|
||||
this.regionInfo.getEncodedName(), h.getFileId(),
|
||||
new HStoreKey(midKey, this.regionInfo), HStoreFile.Range.top);
|
||||
new HStoreKey(midKey, this.regionInfo), Reference.Range.top);
|
||||
HStoreFile b = new HStoreFile(this.conf, fs, splits,
|
||||
regionBInfo, h.getColFamily(), -1, bReference);
|
||||
h.splitStoreFile(a, b, this.fs);
|
||||
|
|
|
@ -49,13 +49,15 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
|||
import org.apache.hadoop.hbase.HStoreKey;
|
||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||
import org.apache.hadoop.hbase.filter.RowFilterInterface;
|
||||
import org.apache.hadoop.hbase.io.BloomFilterMapFile;
|
||||
import org.apache.hadoop.hbase.io.Cell;
|
||||
import org.apache.hadoop.hbase.io.HBaseMapFile;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.io.Reference;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.io.MapFile;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
|
@ -191,9 +193,11 @@ public class HStore implements HConstants {
|
|||
}
|
||||
this.desiredMaxFileSize = maxFileSize;
|
||||
|
||||
this.majorCompactionTime = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 86400000);
|
||||
this.majorCompactionTime =
|
||||
conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 86400000);
|
||||
if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) {
|
||||
String strCompactionTime = family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
|
||||
String strCompactionTime =
|
||||
family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
|
||||
this.majorCompactionTime = (new Long(strCompactionTime)).longValue();
|
||||
}
|
||||
|
||||
|
@ -209,16 +213,10 @@ public class HStore implements HConstants {
|
|||
this.compression = SequenceFile.CompressionType.NONE;
|
||||
}
|
||||
|
||||
Path mapdir = HStoreFile.getMapDir(basedir, info.getEncodedName(),
|
||||
family.getName());
|
||||
if (!fs.exists(mapdir)) {
|
||||
fs.mkdirs(mapdir);
|
||||
}
|
||||
Path infodir = HStoreFile.getInfoDir(basedir, info.getEncodedName(),
|
||||
family.getName());
|
||||
if (!fs.exists(infodir)) {
|
||||
fs.mkdirs(infodir);
|
||||
}
|
||||
Path mapdir = checkdir(HStoreFile.getMapDir(basedir, info.getEncodedName(),
|
||||
family.getName()));
|
||||
Path infodir = checkdir(HStoreFile.getInfoDir(basedir, info.getEncodedName(),
|
||||
family.getName()));
|
||||
|
||||
// Go through the 'mapdir' and 'infodir' together, make sure that all
|
||||
// MapFiles are in a reliable state. Every entry in 'mapdir' must have a
|
||||
|
@ -232,27 +230,19 @@ public class HStore implements HConstants {
|
|||
Bytes.toString(this.storeName) + ", max sequence id " + this.maxSeqId);
|
||||
}
|
||||
|
||||
try {
|
||||
doReconstructionLog(reconstructionLog, maxSeqId, reporter);
|
||||
} catch (EOFException e) {
|
||||
// Presume we got here because of lack of HADOOP-1700; for now keep going
|
||||
// but this is probably not what we want long term. If we got here there
|
||||
// has been data-loss
|
||||
LOG.warn("Exception processing reconstruction log " + reconstructionLog +
|
||||
" opening " + this.storeName +
|
||||
" -- continuing. Probably lack-of-HADOOP-1700 causing DATA LOSS!", e);
|
||||
} catch (IOException e) {
|
||||
// Presume we got here because of some HDFS issue. Don't just keep going.
|
||||
// Fail to open the HStore. Probably means we'll fail over and over
|
||||
// again until human intervention but alternative has us skipping logs
|
||||
// and losing edits: HBASE-642.
|
||||
LOG.warn("Exception processing reconstruction log " + reconstructionLog +
|
||||
" opening " + this.storeName, e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
// Finally, start up all the map readers! (There could be more than one
|
||||
// since we haven't compacted yet.)
|
||||
// Do reconstruction log.
|
||||
runReconstructionLog(reconstructionLog, this.maxSeqId, reporter);
|
||||
|
||||
// Finally, start up all the map readers!
|
||||
setupReaders();
|
||||
}
|
||||
|
||||
/*
|
||||
* Setup the mapfile readers for this store. There could be more than one
|
||||
* since we haven't compacted yet.
|
||||
* @throws IOException
|
||||
*/
|
||||
private void setupReaders() throws IOException {
|
||||
boolean first = true;
|
||||
for(Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
|
||||
MapFile.Reader r = null;
|
||||
|
@ -270,6 +260,18 @@ public class HStore implements HConstants {
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* @param dir If doesn't exist, create it.
|
||||
* @return Passed <code>dir</code>.
|
||||
* @throws IOException
|
||||
*/
|
||||
private Path checkdir(final Path dir) throws IOException {
|
||||
if (!fs.exists(dir)) {
|
||||
fs.mkdirs(dir);
|
||||
}
|
||||
return dir;
|
||||
}
|
||||
|
||||
HColumnDescriptor getFamily() {
|
||||
return this.family;
|
||||
}
|
||||
|
@ -278,6 +280,36 @@ public class HStore implements HConstants {
|
|||
return this.maxSeqId;
|
||||
}
|
||||
|
||||
/*
|
||||
* Run reconstruction log
|
||||
* @param reconstructionLog
|
||||
* @param msid
|
||||
* @param reporter
|
||||
* @throws IOException
|
||||
*/
|
||||
private void runReconstructionLog(final Path reconstructionLog,
|
||||
final long msid, final Progressable reporter)
|
||||
throws IOException {
|
||||
try {
|
||||
doReconstructionLog(reconstructionLog, msid, reporter);
|
||||
} catch (EOFException e) {
|
||||
// Presume we got here because of lack of HADOOP-1700; for now keep going
|
||||
// but this is probably not what we want long term. If we got here there
|
||||
// has been data-loss
|
||||
LOG.warn("Exception processing reconstruction log " + reconstructionLog +
|
||||
" opening " + this.storeName +
|
||||
" -- continuing. Probably lack-of-HADOOP-1700 causing DATA LOSS!", e);
|
||||
} catch (IOException e) {
|
||||
// Presume we got here because of some HDFS issue. Don't just keep going.
|
||||
// Fail to open the HStore. Probably means we'll fail over and over
|
||||
// again until human intervention but alternative has us skipping logs
|
||||
// and losing edits: HBASE-642.
|
||||
LOG.warn("Exception processing reconstruction log " + reconstructionLog +
|
||||
" opening " + this.storeName, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Read the reconstructionLog to see whether we need to build a brand-new
|
||||
* MapFile out of non-flushed log entries.
|
||||
|
@ -396,7 +428,7 @@ public class HStore implements HConstants {
|
|||
long fid = Long.parseLong(m.group(1));
|
||||
|
||||
HStoreFile curfile = null;
|
||||
HStoreFile.Reference reference = null;
|
||||
Reference reference = null;
|
||||
if (isReference) {
|
||||
reference = HStoreFile.readSplitInfo(p, fs);
|
||||
}
|
||||
|
@ -437,7 +469,7 @@ public class HStore implements HConstants {
|
|||
// TODO: This is going to fail if we are to rebuild a file from
|
||||
// meta because it won't have right comparator: HBASE-848.
|
||||
long count = MapFile.fix(this.fs, mapfile, HStoreKey.class,
|
||||
HStoreFile.HbaseMapFile.VALUE_CLASS, false, this.conf);
|
||||
HBaseMapFile.VALUE_CLASS, false, this.conf);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Fixed index on " + mapfile.toString() + "; had " +
|
||||
count + " entries");
|
||||
|
@ -448,21 +480,33 @@ public class HStore implements HConstants {
|
|||
continue;
|
||||
}
|
||||
}
|
||||
storeSize += curfile.length();
|
||||
long length = curfile.length();
|
||||
storeSize += length;
|
||||
|
||||
// TODO: Confirm referent exists.
|
||||
|
||||
// Found map and sympathetic info file. Add this hstorefile to result.
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("loaded " + FSUtils.getPath(p) + ", isReference=" +
|
||||
isReference + ", sequence id=" + storeSeqId);
|
||||
isReference + ", sequence id=" + storeSeqId + ", length=" + length);
|
||||
}
|
||||
results.put(Long.valueOf(storeSeqId), curfile);
|
||||
// Keep list of sympathetic data mapfiles for cleaning info dir in next
|
||||
// section. Make sure path is fully qualified for compare.
|
||||
mapfiles.add(this.fs.makeQualified(mapfile));
|
||||
}
|
||||
|
||||
cleanDataFiles(mapfiles, mapdir);
|
||||
return results;
|
||||
}
|
||||
|
||||
/*
|
||||
* If no info file delete the sympathetic data file.
|
||||
* @param mapfiles List of mapfiles.
|
||||
* @param mapdir Directory to check.
|
||||
* @throws IOException
|
||||
*/
|
||||
private void cleanDataFiles(final List<Path> mapfiles, final Path mapdir)
|
||||
throws IOException {
|
||||
// List paths by experience returns fully qualified names -- at least when
|
||||
// running on a mini hdfs cluster.
|
||||
FileStatus [] datfiles = fs.listStatus(mapdir);
|
||||
|
@ -474,7 +518,6 @@ public class HStore implements HConstants {
|
|||
fs.delete(p, true);
|
||||
}
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -732,19 +775,20 @@ public class HStore implements HConstants {
|
|||
* @param dir
|
||||
* @throws IOException
|
||||
*/
|
||||
private static long getLowestTimestamp(FileSystem fs, Path dir) throws IOException {
|
||||
FileStatus[] stats = fs.listStatus(dir);
|
||||
if (stats == null || stats.length == 0) {
|
||||
return 0l;
|
||||
}
|
||||
long lowTimestamp = Long.MAX_VALUE;
|
||||
for (int i = 0; i < stats.length; i++) {
|
||||
long timestamp = stats[i].getModificationTime();
|
||||
if (timestamp < lowTimestamp){
|
||||
lowTimestamp = timestamp;
|
||||
}
|
||||
}
|
||||
return lowTimestamp;
|
||||
private static long getLowestTimestamp(FileSystem fs, Path dir)
|
||||
throws IOException {
|
||||
FileStatus[] stats = fs.listStatus(dir);
|
||||
if (stats == null || stats.length == 0) {
|
||||
return 0l;
|
||||
}
|
||||
long lowTimestamp = Long.MAX_VALUE;
|
||||
for (int i = 0; i < stats.length; i++) {
|
||||
long timestamp = stats[i].getModificationTime();
|
||||
if (timestamp < lowTimestamp){
|
||||
lowTimestamp = timestamp;
|
||||
}
|
||||
}
|
||||
return lowTimestamp;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -768,12 +812,11 @@ public class HStore implements HConstants {
|
|||
* @return mid key if a split is needed, null otherwise
|
||||
* @throws IOException
|
||||
*/
|
||||
StoreSize compact(boolean majorCompaction) throws IOException {
|
||||
StoreSize compact(final boolean majorCompaction) throws IOException {
|
||||
boolean forceSplit = this.info.shouldSplit(false);
|
||||
boolean doMajorCompaction = majorCompaction;
|
||||
synchronized (compactLock) {
|
||||
long maxId = -1;
|
||||
int nrows = -1;
|
||||
List<HStoreFile> filesToCompact = null;
|
||||
synchronized (storefiles) {
|
||||
if (this.storefiles.size() <= 0) {
|
||||
|
@ -791,18 +834,7 @@ public class HStore implements HConstants {
|
|||
// compacting below. Only check if doMajorCompaction is not true.
|
||||
long lastMajorCompaction = 0L;
|
||||
if (!doMajorCompaction) {
|
||||
Path mapdir = HStoreFile.getMapDir(basedir, info.getEncodedName(), family.getName());
|
||||
long lowTimestamp = getLowestTimestamp(fs, mapdir);
|
||||
lastMajorCompaction = System.currentTimeMillis() - lowTimestamp;
|
||||
if (lowTimestamp < (System.currentTimeMillis() - majorCompactionTime) &&
|
||||
lowTimestamp > 0l) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Major compaction triggered on store: " + this.storeNameStr +
|
||||
". Time since last major compaction: " +
|
||||
((System.currentTimeMillis() - lowTimestamp)/1000) + " seconds");
|
||||
}
|
||||
doMajorCompaction = true;
|
||||
}
|
||||
doMajorCompaction = isMajorCompaction();
|
||||
}
|
||||
if (!doMajorCompaction && !hasReferences(filesToCompact) &&
|
||||
filesToCompact.size() < compactionThreshold) {
|
||||
|
@ -813,7 +845,7 @@ public class HStore implements HConstants {
|
|||
return checkSplit(forceSplit);
|
||||
}
|
||||
|
||||
// HBASE-745, preparing all store file size for incremental compacting
|
||||
// HBASE-745, preparing all store file sizes for incremental compacting
|
||||
// selection.
|
||||
int countOfFiles = filesToCompact.size();
|
||||
long totalSize = 0;
|
||||
|
@ -866,25 +898,7 @@ public class HStore implements HConstants {
|
|||
* based access pattern is practically designed to ruin the cache.
|
||||
*/
|
||||
List<MapFile.Reader> rdrs = new ArrayList<MapFile.Reader>();
|
||||
for (HStoreFile file: filesToCompact) {
|
||||
try {
|
||||
HStoreFile.BloomFilterMapFile.Reader reader =
|
||||
file.getReader(fs, false, false);
|
||||
rdrs.add(reader);
|
||||
|
||||
// Compute the size of the new bloomfilter if needed
|
||||
if (this.family.isBloomfilter()) {
|
||||
nrows += reader.getBloomFilterSize();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// Add info about which file threw exception. It may not be in the
|
||||
// exception message so output a message here where we know the
|
||||
// culprit.
|
||||
LOG.warn("Failed with " + e.toString() + ": " + file.toString());
|
||||
closeCompactionReaders(rdrs);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
int nrows = createReaders(rdrs, filesToCompact);
|
||||
|
||||
// Step through them, writing to the brand-new MapFile
|
||||
HStoreFile compactedOutputFile = new HStoreFile(conf, fs,
|
||||
|
@ -915,8 +929,74 @@ public class HStore implements HConstants {
|
|||
}
|
||||
}
|
||||
return checkSplit(forceSplit);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* @return True if we should run a major compaction.
|
||||
*/
|
||||
private boolean isMajorCompaction() throws IOException {
|
||||
boolean result = false;
|
||||
Path mapdir = HStoreFile.getMapDir(this.basedir, this.info.getEncodedName(),
|
||||
this.family.getName());
|
||||
long lowTimestamp = getLowestTimestamp(fs, mapdir);
|
||||
if (lowTimestamp < (System.currentTimeMillis() - this.majorCompactionTime) &&
|
||||
lowTimestamp > 0l) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Major compaction triggered on store: " +
|
||||
this.storeNameStr + ". Time since last major compaction: " +
|
||||
((System.currentTimeMillis() - lowTimestamp)/1000) + " seconds");
|
||||
}
|
||||
result = true;
|
||||
}
|
||||
return result;
|
||||
}
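For illustration (not in the commit), a worked check of the trigger above with example numbers; the 36-hour figure is arbitrary.

// Oldest mapfile older than the major compaction period -> compact.
long majorCompactionTime = 86400000L;              // default period: 24 hours
long now = System.currentTimeMillis();
long lowTimestamp = now - (36L * 60 * 60 * 1000);  // oldest mapfile written 36 hours ago
boolean trigger = lowTimestamp > 0L
    && lowTimestamp < (now - majorCompactionTime); // 36h old vs 24h period -> true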
|
||||
|
||||
/*
|
||||
* Create readers for the passed in list of HStoreFiles and add them to
|
||||
* <code>readers</code> list.
|
||||
* @param readers Add Readers here.
|
||||
* @param files List of HSFs to make Readers for.
|
||||
* @return Count of rows for bloom filter sizing. Returns -1 if no bloom
|
||||
* filter wanted.
|
||||
*/
|
||||
private int createReaders(final List<MapFile.Reader> rs,
|
||||
final List<HStoreFile> files)
|
||||
throws IOException {
|
||||
/* We create a new list of MapFile.Reader objects so we don't screw up
|
||||
* the caching associated with the currently-loaded ones. Our iteration-
|
||||
* based access pattern is practically designed to ruin the cache.
|
||||
*/
|
||||
int nrows = -1;
|
||||
for (HStoreFile file: files) {
|
||||
try {
|
||||
// TODO: Readers are opened without block-cache enabled. Means we don't
|
||||
// get the prefetch that makes the read faster. But we don't want to
|
||||
// enable block-cache for these readers that are about to be closed.
|
||||
// The compaction of soon-to-be closed readers will probably force out
|
||||
// blocks that may be needed servicing real-time requests whereas
|
||||
// compaction runs in background. TODO: We know we're going to read
|
||||
// this file straight through. Leverage this fact. Use a big buffer
|
||||
// client side to speed things up or read it all up into memory one file
|
||||
// at a time or pull local and memory-map the file but leave the writer
|
||||
// up in hdfs?
|
||||
BloomFilterMapFile.Reader reader = file.getReader(fs, false, false);
|
||||
rs.add(reader);
|
||||
// Compute the size of the new bloomfilter if needed
|
||||
if (this.family.isBloomfilter()) {
|
||||
nrows += reader.getBloomFilterSize();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
// Add info about which file threw exception. It may not be in the
|
||||
// exception message so output a message here where we know the
|
||||
// culprit.
|
||||
LOG.warn("Failed with " + e.toString() + ": " + file.toString());
|
||||
closeCompactionReaders(rs);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
return nrows;
|
||||
}
|
||||
|
||||
/*
|
||||
* Compact a list of MapFile.Readers into MapFile.Writer.
|
||||
*
|
||||
|
@ -1166,7 +1246,6 @@ public class HStore implements HConstants {
|
|||
// time
|
||||
getFullFromMapFile(map, key, columns, deletes, results);
|
||||
}
|
||||
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
}
|
||||
|
@ -1203,7 +1282,7 @@ public class HStore implements HConstants {
|
|||
// recent delete timestamp, record it for later
|
||||
if (!deletes.containsKey(readcol)
|
||||
|| deletes.get(readcol).longValue() < readkey.getTimestamp()) {
|
||||
deletes.put(readcol, readkey.getTimestamp());
|
||||
deletes.put(readcol, Long.valueOf(readkey.getTimestamp()));
|
||||
}
|
||||
} else if (!(deletes.containsKey(readcol)
|
||||
&& deletes.get(readcol).longValue() >= readkey.getTimestamp()) ) {
|
||||
|
@ -1377,7 +1456,8 @@ public class HStore implements HConstants {
|
|||
* @return Matching keys.
|
||||
* @throws IOException
|
||||
*/
|
||||
public List<HStoreKey> getKeys(final HStoreKey origin, final int versions, final long now)
|
||||
public List<HStoreKey> getKeys(final HStoreKey origin, final int versions,
|
||||
final long now)
|
||||
throws IOException {
|
||||
// This code below is very close to the body of the get method. Any
|
||||
// changes in the flow below should also probably be done in get. TODO:
|
||||
|
@ -1788,6 +1868,9 @@ public class HStore implements HConstants {
|
|||
* @throws IOException
|
||||
*/
|
||||
private HStoreKey getFinalKey(final MapFile.Reader mf) throws IOException {
|
||||
if (mf instanceof HBaseMapFile.HBaseReader) {
|
||||
return ((HBaseMapFile.HBaseReader)mf).getFinalKey();
|
||||
}
|
||||
HStoreKey finalKey = new HStoreKey();
|
||||
mf.finalKey(finalKey);
|
||||
finalKey.setHRegionInfo(this.info);
|
||||
|
@ -1839,10 +1922,10 @@ public class HStore implements HConstants {
|
|||
|
||||
/**
|
||||
* Determines if HStore can be split
|
||||
*
|
||||
* @param force Whether to force a split or not.
|
||||
* @return a StoreSize if store can be split, null otherwise
|
||||
*/
|
||||
StoreSize checkSplit(boolean force) {
|
||||
StoreSize checkSplit(final boolean force) {
|
||||
if (this.storefiles.size() <= 0) {
|
||||
return null;
|
||||
}
|
||||
|
@ -1859,39 +1942,35 @@ public class HStore implements HConstants {
|
|||
synchronized (storefiles) {
|
||||
for (Map.Entry<Long, HStoreFile> e: storefiles.entrySet()) {
|
||||
HStoreFile curHSF = e.getValue();
|
||||
if (splitable) {
|
||||
splitable = !curHSF.isReference();
|
||||
if (!splitable) {
|
||||
// RETURN IN MIDDLE OF FUNCTION!!! If not splitable, just return.
|
||||
return null;
|
||||
}
|
||||
}
|
||||
long size = curHSF.length();
|
||||
if (size > maxSize) {
|
||||
// This is the largest one so far
|
||||
maxSize = size;
|
||||
mapIndex = e.getKey();
|
||||
}
|
||||
if (splitable) {
|
||||
splitable = !curHSF.isReference();
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!splitable) {
|
||||
return null;
|
||||
}
|
||||
MapFile.Reader r = this.readers.get(mapIndex);
|
||||
|
||||
// seek back to the beginning of mapfile
|
||||
r.reset();
|
||||
|
||||
// get the first and last keys
|
||||
HStoreKey firstKey = new HStoreKey();
|
||||
HStoreKey lastKey = new HStoreKey();
|
||||
Writable value = new ImmutableBytesWritable();
|
||||
r.next(firstKey, value);
|
||||
r.finalKey(lastKey);
|
||||
// Cast to HbaseReader.
|
||||
HBaseMapFile.HBaseReader r =
|
||||
(HBaseMapFile.HBaseReader)this.readers.get(mapIndex);
|
||||
|
||||
// get the midkey
|
||||
HStoreKey mk = (HStoreKey)r.midKey();
|
||||
if (mk != null) {
|
||||
// if the midkey is the same as the first and last keys, then we cannot
|
||||
// (ever) split this region.
|
||||
if (HStoreKey.equalsTwoRowKeys(info, mk.getRow(), firstKey.getRow()) &&
|
||||
HStoreKey.equalsTwoRowKeys(info, mk.getRow(), lastKey.getRow())) {
|
||||
if (HStoreKey.equalsTwoRowKeys(info, mk.getRow(),
|
||||
r.getFirstKey().getRow()) &&
|
||||
HStoreKey.equalsTwoRowKeys(info, mk.getRow(),
|
||||
r.getFinalKey().getRow())) {
|
||||
return null;
|
||||
}
|
||||
return new StoreSize(maxSize, mk.getRow());
|
||||
|
@ -1967,6 +2046,9 @@ public class HStore implements HConstants {
|
|||
return this.storefiles.size();
|
||||
}
|
||||
|
||||
/*
|
||||
* Datastructure that holds size and key.
|
||||
*/
|
||||
class StoreSize {
|
||||
private final long size;
|
||||
private final byte[] key;
|
||||
|
@ -1988,4 +2070,4 @@ public class HStore implements HConstants {
|
|||
HRegionInfo getHRegionInfo() {
|
||||
return this.info;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,19 +19,13 @@
*/
package org.apache.hadoop.hbase.regionserver;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;

@ -39,18 +33,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.BlockFSInputStream;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.BloomFilterMapFile;
import org.apache.hadoop.hbase.io.HalfMapFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Hash;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.onelab.filter.BloomFilter;
import org.onelab.filter.Key;

/**
* A HStore data file. HStores usually have one or more of these files. They

@ -99,17 +87,6 @@ public class HStoreFile implements HConstants {
static final String HSTORE_INFO_DIR = "info";
static final String HSTORE_FILTER_DIR = "filter";

/**
* For split HStoreFiles, specifies if the file covers the lower half or
* the upper half of the key range
*/
public static enum Range {
/** HStoreFile contains upper half of key range */
top,
/** HStoreFile contains lower half of key range */
bottom
}

private final static Random rand = new Random();

private final Path basedir;

@ -154,8 +131,7 @@ public class HStoreFile implements HConstants {
this.fileId = id;

// If a reference, construction does not write the pointer files. That's
// done by invocations of writeReferenceFiles(hsf, fs). Happens at fast
// split time.
// done by invocations of writeReferenceFiles(hsf, fs). Happens at split.
this.reference = ref;
}

@ -287,11 +263,11 @@ public class HStoreFile implements HConstants {
/**
* @see #writeSplitInfo(FileSystem fs)
*/
static HStoreFile.Reference readSplitInfo(final Path p, final FileSystem fs)
static Reference readSplitInfo(final Path p, final FileSystem fs)
throws IOException {
FSDataInputStream in = fs.open(p);
try {
HStoreFile.Reference r = new HStoreFile.Reference();
Reference r = new Reference();
r.readFields(in);
return r;
} finally {

@ -319,7 +295,8 @@ public class HStoreFile implements HConstants {
long loadInfo(FileSystem fs) throws IOException {
Path p = null;
if (isReference()) {
p = getInfoFilePath(reference.getEncodedRegionName(), reference.getFileId());
p = getInfoFilePath(reference.getEncodedRegionName(),
this.reference.getFileId());
} else {
p = getInfoFilePath();
}

@ -405,7 +382,7 @@ public class HStoreFile implements HConstants {
final boolean bloomFilter, final boolean blockCacheEnabled)
throws IOException {
if (isReference()) {
return new HStoreFile.HalfMapFileReader(fs,
return new HalfMapFileReader(fs,
getMapFilePath(reference).toString(), conf,
reference.getFileRegion(), reference.getMidkey(), bloomFilter,
blockCacheEnabled, this.hri);

@ -453,10 +430,6 @@ public class HStoreFile implements HConstants {
return encodedRegionName + "/" + Bytes.toString(colFamily) + "/" + fileId +
(isReference()? "-" + reference.toString(): "");
}

static boolean isTopFileRegion(final Range r) {
return r.equals(Range.top);
}

private static String createHStoreFilename(final long fid,
final int encodedRegionName) {

@ -510,525 +483,4 @@ public class HStoreFile implements HConstants {
return new Path(base, new Path(Integer.toString(encodedRegionName),
new Path(Bytes.toString(f), subdir)));
}

/*
* Data structure to hold reference to a store file over in another region.
*/
static class Reference implements Writable {
private int encodedRegionName;
private long fileid;
private Range region;
private HStoreKey midkey;

Reference(final int ern, final long fid, final HStoreKey m,
final Range fr) {
this.encodedRegionName = ern;
this.fileid = fid;
this.region = fr;
this.midkey = m;
}

Reference() {
this(-1, -1, null, Range.bottom);
}

long getFileId() {
return fileid;
}

Range getFileRegion() {
return region;
}

HStoreKey getMidkey() {
return midkey;
}

int getEncodedRegionName() {
return this.encodedRegionName;
}

@Override
public String toString() {
return encodedRegionName + "/" + fileid + "/" + region;
}

// Make it serializable.

public void write(DataOutput out) throws IOException {
// Write out the encoded region name as a String. Doing it as a String
// keeps a Reference's serialization backward compatible with
// pre-HBASE-82 serializations. Alternative is rewriting all
// info files in hbase (Serialized References are written into the
// 'info' file that accompanies HBase Store files).
out.writeUTF(Integer.toString(encodedRegionName));
out.writeLong(fileid);
// Write true if we're doing top of the file.
out.writeBoolean(isTopFileRegion(region));
midkey.write(out);
}

public void readFields(DataInput in) throws IOException {
this.encodedRegionName = Integer.parseInt(in.readUTF());
fileid = in.readLong();
boolean tmp = in.readBoolean();
// If true, set region to top.
region = tmp? Range.top: Range.bottom;
midkey = new HStoreKey();
midkey.readFields(in);
}
}

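As the write()/readFields() pair above shows, a serialized Reference is just the encoded region name written as a UTF string, the file id as a long, a top-versus-bottom boolean, and finally the midkey. A rough sketch of producing that layout with plain java.io streams follows; the field values are placeholders, and the trailing HStoreKey bytes are omitted because its serialization is defined elsewhere.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class ReferenceLayoutSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        // Same field order as Reference.write(): region name as UTF (kept as a
        // String for pre-HBASE-82 compatibility), file id, top/bottom flag.
        out.writeUTF(Integer.toString(1234));   // placeholder encoded region name
        out.writeLong(9876543210L);             // placeholder file id
        out.writeBoolean(true);                 // true corresponds to Range.top
        // midkey.write(out) would follow here; elided in this sketch.
        out.flush();
        System.out.println("header bytes without midkey: " + bytes.size());
      }
    }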
/**
* Hbase customizations of MapFile.
*/
static class HbaseMapFile extends MapFile {
static final Class<? extends Writable> VALUE_CLASS =
ImmutableBytesWritable.class;

/**
* Custom bloom filter key maker.
* @param key
* @return Key made of bytes of row only.
*/
protected static Key getBloomFilterKey(WritableComparable key) {
return new Key(((HStoreKey) key).getRow());
}

/**
* A reader capable of reading and caching blocks of the data file.
*/
static class HbaseReader extends MapFile.Reader {

private final boolean blockCacheEnabled;

/**
* @param fs
* @param dirName
* @param conf
* @param hri
* @throws IOException
*/
public HbaseReader(FileSystem fs, String dirName, Configuration conf,
HRegionInfo hri)
throws IOException {
this(fs, dirName, conf, false, hri);
}

/**
* @param fs
* @param dirName
* @param conf
* @param blockCacheEnabled
* @param hri
* @throws IOException
*/
public HbaseReader(FileSystem fs, String dirName, Configuration conf,
boolean blockCacheEnabled, HRegionInfo hri)
throws IOException {
super(fs, dirName, new HStoreKey.HStoreKeyWritableComparator(hri),
conf, false); // defer opening streams
this.blockCacheEnabled = blockCacheEnabled;
open(fs, dirName, new HStoreKey.HStoreKeyWritableComparator(hri), conf);

// Force reading of the mapfile index by calling midKey.
// Reading the index will bring the index into memory over
// here on the client and then close the index file freeing
// up socket connection and resources in the datanode.
// Usually, the first access on a MapFile.Reader will load the
// index; force the issue in HStoreFile MapFiles because an
// access may not happen for some time; meantime we're
// using up datanode resources. See HADOOP-2341.
midKey();
}

@Override
protected org.apache.hadoop.io.SequenceFile.Reader createDataFileReader(
FileSystem fs, Path dataFile, Configuration conf)
throws IOException {
if (!blockCacheEnabled) {
return super.createDataFileReader(fs, dataFile, conf);
}
LOG.info("Block Cache enabled");
final int blockSize = conf.getInt("hbase.hstore.blockCache.blockSize",
64 * 1024);
return new SequenceFile.Reader(fs, dataFile, conf) {
@Override
protected FSDataInputStream openFile(FileSystem fs, Path file,
int bufferSize, long length) throws IOException {

return new FSDataInputStream(new BlockFSInputStream(
super.openFile(fs, file, bufferSize, length), length,
blockSize));
}
};
}
}

static class HbaseWriter extends MapFile.Writer {
/**
* @param conf
* @param fs
* @param dirName
* @param compression
* @param hri
* @throws IOException
*/
public HbaseWriter(Configuration conf, FileSystem fs, String dirName,
SequenceFile.CompressionType compression, final HRegionInfo hri)
throws IOException {
super(conf, fs, dirName, new HStoreKey.HStoreKeyWritableComparator(hri),
VALUE_CLASS, compression);
// Default for mapfiles is 128. Makes random reads faster if we
// have more keys indexed and we're not 'next'-ing around in the
// mapfile.
setIndexInterval(conf.getInt("hbase.io.index.interval", 128));
}
}
}

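The setIndexInterval call above controls how sparse the in-memory MapFile index is: with the default of 128, only every 128th key is indexed, so a random read binary-searches the index and then scans forward at most 127 entries. A back-of-the-envelope sketch of the memory-versus-latency trade-off follows; the entry count is an assumed figure, not a number from the patch.

    public class IndexIntervalSketch {
      public static void main(String[] args) {
        long entries = 1_000_000L;          // assumed number of keys in one MapFile
        int[] intervals = {32, 128, 1024};  // candidate hbase.io.index.interval values
        for (int interval : intervals) {
          long indexedKeys = (entries + interval - 1) / interval;
          // Smaller interval: more keys held in memory but a shorter scan after
          // the binary search; larger interval: the reverse.
          System.out.printf("interval=%d -> ~%d in-memory index entries, " +
              "<=%d entries scanned per random read%n",
              interval, indexedKeys, interval - 1);
        }
      }
    }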
/**
* On write, all keys are added to a bloom filter. On read, all keys are
* tested first against bloom filter. Keys are HStoreKey. If passed bloom
* filter is null, just passes invocation to parent.
*/
static class BloomFilterMapFile extends HbaseMapFile {
protected static final String BLOOMFILTER_FILE_NAME = "filter";

static class Reader extends HbaseReader {
private final BloomFilter bloomFilter;

/**
* @param fs
* @param dirName
* @param conf
* @param filter
* @param blockCacheEnabled
* @param hri
* @throws IOException
*/
public Reader(FileSystem fs, String dirName, Configuration conf,
final boolean filter, final boolean blockCacheEnabled,
HRegionInfo hri)
throws IOException {
super(fs, dirName, conf, blockCacheEnabled, hri);
if (filter) {
this.bloomFilter = loadBloomFilter(fs, dirName);
} else {
this.bloomFilter = null;
}
}

private BloomFilter loadBloomFilter(FileSystem fs, String dirName)
throws IOException {
Path filterFile = new Path(dirName, BLOOMFILTER_FILE_NAME);
if(!fs.exists(filterFile)) {
throw new FileNotFoundException("Could not find bloom filter: " +
filterFile);
}
BloomFilter filter = new BloomFilter();
FSDataInputStream in = fs.open(filterFile);
try {
filter.readFields(in);
} finally {
in.close();
}
return filter;
}

@Override
public Writable get(WritableComparable key, Writable val)
throws IOException {
if (bloomFilter == null) {
return super.get(key, val);
}
if(bloomFilter.membershipTest(getBloomFilterKey(key))) {
if (LOG.isDebugEnabled()) {
LOG.debug("bloom filter reported that key exists");
}
return super.get(key, val);
}
if (LOG.isDebugEnabled()) {
LOG.debug("bloom filter reported that key does not exist");
}
return null;
}

@Override
public WritableComparable getClosest(WritableComparable key,
Writable val) throws IOException {
if (bloomFilter == null) {
return super.getClosest(key, val);
}
// Note - the key being passed to us is always a HStoreKey
if(bloomFilter.membershipTest(getBloomFilterKey(key))) {
if (LOG.isDebugEnabled()) {
LOG.debug("bloom filter reported that key exists");
}
return super.getClosest(key, val);
}
if (LOG.isDebugEnabled()) {
LOG.debug("bloom filter reported that key does not exist");
}
return null;
}

/* @return size of the bloom filter */
int getBloomFilterSize() {
return bloomFilter == null ? 0 : bloomFilter.getVectorSize();
}
}

static class Writer extends HbaseWriter {
private static final double DEFAULT_NUMBER_OF_HASH_FUNCTIONS = 4.0;
private final BloomFilter bloomFilter;
private final String dirName;
private final FileSystem fs;

/**
* @param conf
* @param fs
* @param dirName
* @param compression
* @param filter
* @param nrows
* @param hri
* @throws IOException
*/
@SuppressWarnings("unchecked")
public Writer(Configuration conf, FileSystem fs, String dirName,
SequenceFile.CompressionType compression, final boolean filter,
int nrows, final HRegionInfo hri)
throws IOException {
super(conf, fs, dirName, compression, hri);
this.dirName = dirName;
this.fs = fs;
if (filter) {
/*
* There is no way to automatically determine the vector size and the
* number of hash functions to use. In particular, bloom filters are
* very sensitive to the number of elements inserted into them. For
* HBase, the number of entries depends on the size of the data stored
* in the column. Currently the default region size is 256MB, so the
* number of entries is approximately
* 256MB / (average value size for column).
*
* If m denotes the number of bits in the Bloom filter (vectorSize),
* n denotes the number of elements inserted into the Bloom filter and
* k represents the number of hash functions used (nbHash), then
* according to Broder and Mitzenmacher,
*
* ( http://www.eecs.harvard.edu/~michaelm/NEWWORK/postscripts/BloomFilterSurvey.pdf )
*
* the probability of false positives is minimized when k is
* approximately m/n ln(2).
*
* If we fix the number of hash functions and know the number of
* entries, then the optimal vector size m = (k * n) / ln(2)
*/
this.bloomFilter = new BloomFilter(
(int) Math.ceil(
(DEFAULT_NUMBER_OF_HASH_FUNCTIONS * (1.0 * nrows)) /
Math.log(2.0)),
(int) DEFAULT_NUMBER_OF_HASH_FUNCTIONS,
Hash.getHashType(conf)
);
} else {
this.bloomFilter = null;
}
}

@Override
public void append(WritableComparable key, Writable val)
throws IOException {
if (bloomFilter != null) {
bloomFilter.add(getBloomFilterKey(key));
}
super.append(key, val);
}

@Override
public synchronized void close() throws IOException {
super.close();
if (this.bloomFilter != null) {
flushBloomFilter();
}
}

/**
* Flushes bloom filter to disk
*
* @throws IOException
*/
private void flushBloomFilter() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("flushing bloom filter for " + this.dirName);
}
FSDataOutputStream out =
fs.create(new Path(dirName, BLOOMFILTER_FILE_NAME));
try {
bloomFilter.write(out);
} finally {
out.close();
}
if (LOG.isDebugEnabled()) {
LOG.debug("flushed bloom filter for " + this.dirName);
}
}
}
}

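Plugging concrete numbers into the sizing comment above: with the hash-function count fixed at 4, the Writer allocates ceil(k * n / ln 2) bits. A small worked sketch follows; the row count is an assumption for illustration, not a figure from the patch.

    public class BloomSizingSketch {
      public static void main(String[] args) {
        double k = 4.0;             // DEFAULT_NUMBER_OF_HASH_FUNCTIONS above
        long nrows = 1_000_000L;    // assumed entries for one store file
        // Same expression as the Writer constructor: m = ceil(k * n / ln 2).
        int vectorSize = (int) Math.ceil((k * (1.0 * nrows)) / Math.log(2.0));
        System.out.println("bits allocated: " + vectorSize);
        System.out.println("bytes: " + (vectorSize + 7) / 8); // roughly 0.7 MB
      }
    }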
/**
* A facade for a {@link MapFile.Reader} that serves up either the top or
* bottom half of a MapFile (where 'bottom' is the first half of the file
* containing the keys that sort lowest and 'top' is the second half of the
* file with keys that sort greater than those of the bottom half).
* Subclasses BloomFilterMapFile.Reader in case the backing MapFile has an
* associated bloom filter.
*
* <p>This file is not splitable. Calls to {@link #midKey()} return null.
*/
static class HalfMapFileReader extends BloomFilterMapFile.Reader {
private final boolean top;
private final WritableComparable midkey;
private boolean firstNextCall = true;

HalfMapFileReader(final FileSystem fs, final String dirName,
final Configuration conf, final Range r,
final WritableComparable midKey,
final HRegionInfo hri)
throws IOException {
this(fs, dirName, conf, r, midKey, false, false, hri);
}

HalfMapFileReader(final FileSystem fs, final String dirName,
final Configuration conf, final Range r,
final WritableComparable midKey, final boolean filter,
final boolean blockCacheEnabled,
final HRegionInfo hri)
throws IOException {
super(fs, dirName, conf, filter, blockCacheEnabled, hri);
top = isTopFileRegion(r);
midkey = midKey;
}

@SuppressWarnings("unchecked")
private void checkKey(final WritableComparable key)
throws IOException {
if (top) {
if (key.compareTo(midkey) < 0) {
throw new IOException("Illegal Access: Key is less than midKey of " +
"backing mapfile");
}
} else if (key.compareTo(midkey) >= 0) {
throw new IOException("Illegal Access: Key is greater than or equal " +
"to midKey of backing mapfile");
}
}

@Override
public synchronized void finalKey(WritableComparable key)
throws IOException {
if (top) {
super.finalKey(key);
} else {
reset();
Writable value = new ImmutableBytesWritable();
WritableComparable k = super.getClosest(midkey, value, true);
ByteArrayOutputStream byteout = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(byteout);
k.write(out);
ByteArrayInputStream bytein =
new ByteArrayInputStream(byteout.toByteArray());
DataInputStream in = new DataInputStream(bytein);
key.readFields(in);
}
}

@Override
public synchronized Writable get(WritableComparable key, Writable val)
throws IOException {
checkKey(key);
return super.get(key, val);
}

@SuppressWarnings("unchecked")
@Override
public synchronized WritableComparable getClosest(WritableComparable key,
Writable val)
throws IOException {
WritableComparable closest = null;
if (top) {
// If top, the lowest possible key is midkey. Do not have to check
// what comes back from super getClosest. Will return exact match or
// greater.
closest = (key.compareTo(this.midkey) < 0)?
this.midkey: super.getClosest(key, val);
} else {
// We're serving bottom of the file.
if (key.compareTo(this.midkey) < 0) {
// Check key is within range for bottom.
closest = super.getClosest(key, val);
// midkey was made against largest store file at time of split. Smaller
// store files could have anything in them. Check return value is
// not beyond the midkey (getClosest returns exact match or next
// after).
if (closest != null && closest.compareTo(this.midkey) >= 0) {
// Don't let this value out.
closest = null;
}
}
// Else, key is > midkey so let out closest = null.
}
return closest;
}

@SuppressWarnings("unused")
@Override
public synchronized WritableComparable midKey() throws IOException {
// Returns null to indicate file is not splitable.
return null;
}

@SuppressWarnings("unchecked")
@Override
public synchronized boolean next(WritableComparable key, Writable val)
throws IOException {
if (firstNextCall) {
firstNextCall = false;
if (this.top) {
// Seek to midkey. Midkey may not exist in this file. That should be
// fine. Then we'll either be positioned at end or start of file.
WritableComparable nearest = getClosest(midkey, val);
// Now copy the mid key into the passed key.
if (nearest != null) {
Writables.copyWritable(nearest, key);
return true;
}
return false;
}
}
boolean result = super.next(key, val);
if (!top && key.compareTo(midkey) >= 0) {
result = false;
}
return result;
}

@Override
public synchronized void reset() throws IOException {
if (top) {
firstNextCall = true;
seek(midkey);
return;
}
super.reset();
}

@Override
public synchronized boolean seek(WritableComparable key)
throws IOException {
checkKey(key);
return super.seek(key);
}
}
}
}

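Taken together, the HalfMapFileReader above lets two daughter regions serve one un-split MapFile: the bottom reader stops at the midkey and the top reader starts there. The following is a hedged usage sketch that mirrors the constructor calls in the test changes below; it assumes the post-HBASE-975 classes in org.apache.hadoop.hbase.io are visible to the caller, and the path, configuration, and midkey values are placeholders.

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HStoreKey;
    import org.apache.hadoop.hbase.io.HalfMapFileReader;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.io.Reference;

    public class HalfReaderSketch {
      public static void main(String[] args) throws IOException {
        HBaseConfiguration conf = new HBaseConfiguration();
        FileSystem fs = FileSystem.get(conf);
        Path mapfile = new Path("/hbase/tablename/1234/family/5678"); // placeholder path
        HStoreKey midkey = new HStoreKey("mmm");                      // placeholder split point
        HStoreKey key = new HStoreKey();
        ImmutableBytesWritable value = new ImmutableBytesWritable();

        // Bottom half: next() walks from the start of the backing MapFile and
        // stops once it reaches the midkey.
        HalfMapFileReader bottom = new HalfMapFileReader(fs, mapfile.toString(),
            conf, Reference.Range.bottom, midkey, null);
        while (bottom.next(key, value)) {
          // every key returned here sorts below "mmm"
        }
        bottom.close();

        // Top half: the first next() seeks to the midkey, then walks to the end.
        HalfMapFileReader top = new HalfMapFileReader(fs, mapfile.toString(),
            conf, Reference.Range.top, midkey, null);
        while (top.next(key, value)) {
          // every key returned here sorts at or above "mmm"
        }
        top.close();
      }
    }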
@ -21,24 +21,22 @@ package org.apache.hadoop.hbase;

import java.io.PrintWriter;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.util.ReflectionUtils;

/**
* Abstract base class for HBase cluster junit tests. Spins up an hbase
* cluster in setup and tears it down again in tearDown.
*/
public abstract class HBaseClusterTestCase extends HBaseTestCase {
private static final Log LOG =
LogFactory.getLog(HBaseClusterTestCase.class.getName());

private static final Log LOG = LogFactory.getLog(HBaseClusterTestCase.class);
protected MiniHBaseCluster cluster;
protected MiniDFSCluster dfsCluster;
protected int regionServers;

@ -85,7 +83,6 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
/**
* Actually start the MiniHBase instance.
*/
@SuppressWarnings("unused")
protected void hBaseClusterSetup() throws Exception {
// start the mini cluster
this.cluster = new MiniHBaseCluster(conf, regionServers);

@ -93,7 +90,7 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
// We need to sleep because we cannot open a HTable when the cluster
// is not ready
Thread.sleep(5000);
HTable meta = new HTable(conf, ".META.");
new HTable(conf, HConstants.META_TABLE_NAME);
}

/**

@ -112,12 +109,12 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {

// mangle the conf so that the fs parameter points to the minidfs we
// just started up
FileSystem fs = dfsCluster.getFileSystem();
conf.set("fs.default.name", fs.getUri().toString());
Path parentdir = fs.getHomeDirectory();
FileSystem filesystem = dfsCluster.getFileSystem();
conf.set("fs.default.name", filesystem.getUri().toString());
Path parentdir = filesystem.getHomeDirectory();
conf.set(HConstants.HBASE_DIR, parentdir.toString());
fs.mkdirs(parentdir);
FSUtils.setVersion(fs, parentdir);
filesystem.mkdirs(parentdir);
FSUtils.setVersion(filesystem, parentdir);
}

// do the super setup now. if we had done it first, then we would have

@ -29,7 +29,10 @@ import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.HBaseMapFile;
import org.apache.hadoop.hbase.io.HalfMapFileReader;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;

@ -114,6 +117,25 @@ public class TestHStoreFile extends HBaseTestCase {
}
}

public void testContains() throws Exception {
HStoreFile hsf = new HStoreFile(this.conf, this.fs, this.dir,
HRegionInfo.FIRST_META_REGIONINFO,
Bytes.toBytes("colfamily"), 1234567890L, null);
MapFile.Writer writer =
hsf.getWriter(this.fs, SequenceFile.CompressionType.NONE, false, 0);
writeStoreFile(writer);
MapFile.Reader r = hsf.getReader(this.fs, false, false);
HBaseMapFile.HBaseReader reader =
(HBaseMapFile.HBaseReader)r;
// Store file should contain 'aa' and 'bb' but not 'AA' nor 'ZZ'.
assertTrue(reader.containsKey(new HStoreKey("aa", "bb")));
assertTrue(reader.containsKey(new HStoreKey("bb")));
assertTrue(reader.containsKey(new HStoreKey("zz")));
assertFalse(reader.containsKey(new HStoreKey("AA")));
assertFalse(reader.containsKey(new HStoreKey("{{")));
assertFalse(reader.containsKey(new HStoreKey()));
}

/**
* Test that our mechanism of writing store files in one region to reference
* store files in other regions works.

@ -137,9 +159,9 @@ public class TestHStoreFile extends HBaseTestCase {
reader.finalKey(hsk);
byte [] finalKey = hsk.getRow();
// Make a reference for the bottom half of the just written file.
HStoreFile.Reference reference =
new HStoreFile.Reference(hsf.getEncodedRegionName(), hsf.getFileId(),
midkey, HStoreFile.Range.top);
Reference reference =
new Reference(hsf.getEncodedRegionName(), hsf.getFileId(),
midkey, Reference.Range.top);
HStoreFile refHsf = new HStoreFile(this.conf, this.fs,
new Path(DIR, getName()),
HRegionInfo.FIRST_META_REGIONINFO,

@ -149,7 +171,7 @@ public class TestHStoreFile extends HBaseTestCase {
refHsf.writeReferenceFiles(this.fs);
assertTrue(this.fs.exists(refHsf.getMapFilePath()));
assertTrue(this.fs.exists(refHsf.getInfoFilePath()));
HStoreFile.Reference otherReference =
Reference otherReference =
HStoreFile.readSplitInfo(refHsf.getInfoFilePath(), this.fs);
assertEquals(reference.getEncodedRegionName(),
otherReference.getEncodedRegionName());

@ -170,6 +192,13 @@ public class TestHStoreFile extends HBaseTestCase {
}
}
assertTrue(Bytes.equals(key.getRow(), finalKey));
// Assert contains works properly.
HBaseMapFile.HBaseReader hbaseMapfileHalfReader =
(HBaseMapFile.HBaseReader)halfReader;
assertTrue(hbaseMapfileHalfReader.containsKey(midkey));
assertTrue(hbaseMapfileHalfReader.containsKey(new HStoreKey(finalKey)));
assertFalse(hbaseMapfileHalfReader.containsKey(new HStoreKey("aa")));
assertFalse(hbaseMapfileHalfReader.containsKey(new HStoreKey("{{")));
}

/**

@ -223,8 +252,8 @@ public class TestHStoreFile extends HBaseTestCase {
// Now make two HalfMapFiles and assert they can read the full backing
// file, one from the top and the other from the bottom.
// Test bottom half first.
bottom = new HStoreFile.HalfMapFileReader(this.fs, p.toString(),
this.conf, HStoreFile.Range.bottom, midkey, null);
bottom = new HalfMapFileReader(this.fs, p.toString(),
this.conf, Reference.Range.bottom, midkey, null);
boolean first = true;
while (bottom.next(key, value)) {
previous = key.toString();

@ -238,34 +267,29 @@ public class TestHStoreFile extends HBaseTestCase {
LOG.info("Last in bottom: " + previous.toString());
}
// Now test reading from the top.
top = new HStoreFile.HalfMapFileReader(this.fs, p.toString(), this.conf,
HStoreFile.Range.top, midkey, null);
top = new HalfMapFileReader(this.fs, p.toString(), this.conf,
Reference.Range.top, midkey, null);
first = true;
while (top.next(key, value)) {
assertTrue(key.compareTo((HStoreKey)midkey) >= 0);
if (first) {
first = false;
assertTrue(Bytes.equals(((HStoreKey)midkey).getRow(),
key.getRow()));
LOG.info("First in top: " + key.toString());
}
}
LOG.info("Last in top: " + key.toString());
top.getClosest(midkey, value);
// Assert value is same as key.
assertTrue(Bytes.equals(value.get(), ((HStoreKey)midkey).getRow()));

// Next test using a midkey that does not exist in the file.
// First, do a key that is < than first key. Ensure splits behave
// properly.
WritableComparable badkey = new HStoreKey(" ");
bottom = new HStoreFile.HalfMapFileReader(this.fs, p.toString(),
this.conf, HStoreFile.Range.bottom, badkey, null);
bottom = new HalfMapFileReader(this.fs, p.toString(),
this.conf, Reference.Range.bottom, badkey, null);
// When badkey is < than the bottom, should return no values.
assertFalse(bottom.next(key, value));
// Now read from the top.
top = new HStoreFile.HalfMapFileReader(this.fs, p.toString(), this.conf,
HStoreFile.Range.top, badkey, null);
top = new HalfMapFileReader(this.fs, p.toString(), this.conf,
Reference.Range.top, badkey, null);
first = true;
while (top.next(key, value)) {
assertTrue(key.compareTo((HStoreKey)badkey) >= 0);

@ -286,8 +310,8 @@ public class TestHStoreFile extends HBaseTestCase {

// Test when badkey is > than last key in file ('||' > 'zz').
badkey = new HStoreKey("|||");
bottom = new HStoreFile.HalfMapFileReader(this.fs, p.toString(),
this.conf, HStoreFile.Range.bottom, badkey, null);
bottom = new HalfMapFileReader(this.fs, p.toString(),
this.conf, Reference.Range.bottom, badkey, null);
first = true;
while (bottom.next(key, value)) {
if (first) {

@ -305,8 +329,8 @@ public class TestHStoreFile extends HBaseTestCase {
assertTrue(tmp.charAt(i) == 'z');
}
// Now look at top. Should not return any values.
top = new HStoreFile.HalfMapFileReader(this.fs, p.toString(), this.conf,
HStoreFile.Range.top, badkey, null);
top = new HalfMapFileReader(this.fs, p.toString(), this.conf,
Reference.Range.top, badkey, null);
assertFalse(top.next(key, value));

} finally {

@ -338,13 +362,13 @@ public class TestHStoreFile extends HBaseTestCase {
// First, do a key that is < than first key. Ensure splits behave
// properly.
HStoreKey midkey = new HStoreKey(" ");
bottom = new HStoreFile.HalfMapFileReader(this.fs, p.toString(),
this.conf, HStoreFile.Range.bottom, midkey, null);
bottom = new HalfMapFileReader(this.fs, p.toString(),
this.conf, Reference.Range.bottom, midkey, null);
// When midkey is < than the bottom, should return no values.
assertFalse(bottom.next(key, value));
// Now read from the top.
top = new HStoreFile.HalfMapFileReader(this.fs, p.toString(),
this.conf, HStoreFile.Range.top, midkey, null);
top = new HalfMapFileReader(this.fs, p.toString(),
this.conf, Reference.Range.top, midkey, null);
boolean first = true;
while (top.next(key, value)) {
assertTrue(key.compareTo(midkey) >= 0);

@ -359,8 +383,8 @@ public class TestHStoreFile extends HBaseTestCase {

// Test when midkey is > than last key in file ('||' > 'zz').
midkey = new HStoreKey("|||");
bottom = new HStoreFile.HalfMapFileReader(this.fs, p.toString(),
this.conf, HStoreFile.Range.bottom, midkey, null);
bottom = new HalfMapFileReader(this.fs, p.toString(),
this.conf, Reference.Range.bottom, midkey, null);
first = true;
while (bottom.next(key, value)) {
if (first) {

@ -372,8 +396,8 @@ public class TestHStoreFile extends HBaseTestCase {
LOG.info("Last bottom when key > top: " + key.toString());
assertEquals("zz", Bytes.toString(key.getRow()));
// Now look at top. Should not return any values.
top = new HStoreFile.HalfMapFileReader(this.fs, p.toString(),
this.conf, HStoreFile.Range.top, midkey, null);
top = new HalfMapFileReader(this.fs, p.toString(),
this.conf, Reference.Range.top, midkey, null);
assertFalse(top.next(key, value));
} finally {
if (top != null) {

@ -103,7 +103,7 @@ public class TestSplit extends HBaseClusterTestCase {
Thread splitThread = new Thread() {
public void run() {
try {
HRegion [] regions = split(regionForThread, midkey);
split(regionForThread, midkey);
} catch (IOException e) {
fail("Unexpected exception " + e);
}

@ -259,4 +259,4 @@ public class TestSplit extends HBaseClusterTestCase {
assertEquals(regions.length, 2);
return regions;
}
}
}