HBASE-696 Make bloomfilter true/false and self-sizing
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@676088 13f79535-47bb-0310-9956-ffa450edef68
parent ed58bcb163
commit 36f0d36de9
@@ -14,6 +14,7 @@ Trunk (unreleased changes)
HBASE-521 Improve client scanner interface
HBASE-288 Add in-memory caching of data. Required update of hadoop to
0.17.0-dev.2008-02-07_12-01-58. (Tom White via Stack)
HBASE-696 Make bloomfilter true/false and self-sizing

BUG FIXES
HBASE-574 HBase does not load hadoop native libs (Rong-En Fan via Stack)
@@ -47,6 +47,7 @@ import org.apache.hadoop.io.WritableComparable;
* m/n ln(2).
*
*/
@Deprecated
public class BloomFilterDescriptor implements WritableComparable {
private static final double DEFAULT_NUMBER_OF_HASH_FUNCTIONS = 4.0;
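An editorial aside, not part of the commit: the "m/n ln(2)" fragment in the javadoc above is the standard Bloom filter optimum. With m bits, n inserted keys and k hash functions,

p \approx \left(1 - e^{-kn/m}\right)^{k}, \qquad k_{\mathrm{opt}} = \frac{m}{n}\ln 2, \qquad m = \left\lceil \frac{k\,n}{\ln 2} \right\rceil

so for a fixed k the bit-vector size grows linearly with the expected number of entries; this is the relation the new self-sizing writer further down in this diff applies.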
@@ -152,14 +153,12 @@ public class BloomFilterDescriptor implements WritableComparable {
return value.toString();
}

public BloomFilterType getType() {
return filterType;
}

/** @return the vector size */
public int getVectorSize() {
return vectorSize;
}

/** @return number of hash functions */
public int getNbHash() {
return nbHash;
}
@@ -41,7 +41,8 @@ public class HColumnDescriptor implements WritableComparable {

// Version 3 was when column names becaome byte arrays and when we picked up
// Time-to-live feature. Version 4 was when we moved to byte arrays, HBASE-82.
private static final byte COLUMN_DESCRIPTOR_VERSION = (byte)4;
// Version 5 was when bloom filter descriptors were removed.
private static final byte COLUMN_DESCRIPTOR_VERSION = (byte)5;

/**
* The type of compression.

@@ -96,11 +97,6 @@ public class HColumnDescriptor implements WritableComparable {
*/
public static final int DEFAULT_TTL = HConstants.FOREVER;

/**
* Default bloom filter description.
*/
public static final BloomFilterDescriptor DEFAULT_BLOOMFILTER = null;

// Column family name
private byte [] name;
// Number of versions to keep

@@ -116,9 +112,7 @@ public class HColumnDescriptor implements WritableComparable {
// Time to live of cell contents, in seconds from last timestamp
private int timeToLive = DEFAULT_TTL;
// True if bloom filter was specified
private boolean bloomFilterSpecified = false;
// Descriptor of bloom filter
private BloomFilterDescriptor bloomFilter = DEFAULT_BLOOMFILTER;
private boolean bloomFilter = false;

/**
* Default constructor. Must be present for Writable.

@@ -155,11 +149,9 @@ public class HColumnDescriptor implements WritableComparable {
*/
public HColumnDescriptor(final byte [] columnName) {
this (columnName == null || columnName.length <= 0?
HConstants.EMPTY_BYTE_ARRAY: columnName,
DEFAULT_VERSIONS, DEFAULT_COMPRESSION, DEFAULT_IN_MEMORY,
DEFAULT_BLOCKCACHE,
Integer.MAX_VALUE, DEFAULT_TTL,
DEFAULT_BLOOMFILTER);
HConstants.EMPTY_BYTE_ARRAY: columnName, DEFAULT_VERSIONS,
DEFAULT_COMPRESSION, DEFAULT_IN_MEMORY, DEFAULT_BLOCKCACHE,
Integer.MAX_VALUE, DEFAULT_TTL, false);
}

/**
@@ -182,9 +174,8 @@ public class HColumnDescriptor implements WritableComparable {
*/
public HColumnDescriptor(final byte [] columnName, final int maxVersions,
final CompressionType compression, final boolean inMemory,
final boolean blockCacheEnabled,
final int maxValueLength, final int timeToLive,
final BloomFilterDescriptor bloomFilter) {
final boolean blockCacheEnabled, final int maxValueLength,
final int timeToLive, final boolean bloomFilter) {
isLegalFamilyName(columnName);
this.name = stripColon(columnName);
if (maxVersions <= 0) {
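Editorial illustration, not part of the commit: a minimal sketch of how a caller might use the revised constructor above, whose last parameter is now a plain boolean instead of a BloomFilterDescriptor. The family name and settings are invented for the example.

// Hypothetical usage; values are illustrative only.
HColumnDescriptor family = new HColumnDescriptor(
    Bytes.toBytes("info:"),                   // column family name
    HColumnDescriptor.DEFAULT_VERSIONS,       // max versions to keep
    HColumnDescriptor.CompressionType.NONE,   // no compression
    false,                                    // not kept in memory
    false,                                    // block cache disabled
    Integer.MAX_VALUE,                        // max value length
    HConstants.FOREVER,                       // time-to-live
    true);                                    // bloom filter on; it is sized automatically when files are written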
@@ -198,7 +189,6 @@ public class HColumnDescriptor implements WritableComparable {
this.maxValueLength = maxValueLength;
this.timeToLive = timeToLive;
this.bloomFilter = bloomFilter;
this.bloomFilterSpecified = this.bloomFilter == null ? false : true;
this.compressionType = compression;
}

@@ -295,9 +285,9 @@ public class HColumnDescriptor implements WritableComparable {
}

/**
* @return Bloom filter descriptor or null if none set.
* @return true if a bloom filter is enabled
*/
public BloomFilterDescriptor getBloomFilter() {
public boolean isBloomFilterEnabled() {
return this.bloomFilter;
}

@@ -313,9 +303,7 @@ public class HColumnDescriptor implements WritableComparable {
", " + TTL + " => " +
(timeToLive == HConstants.FOREVER ? "FOREVER" :
Integer.toString(timeToLive)) +
", " + BLOOMFILTER + " => " +
(bloomFilterSpecified ? bloomFilter.toString() : CompressionType.NONE) +
"}";
", " + BLOOMFILTER + " => " + bloomFilter + "}";
}

/** {@inheritDoc} */

@@ -334,11 +322,8 @@ public class HColumnDescriptor implements WritableComparable {
result ^= Boolean.valueOf(this.blockCacheEnabled).hashCode();
result ^= Integer.valueOf(this.maxValueLength).hashCode();
result ^= Integer.valueOf(this.timeToLive).hashCode();
result ^= Boolean.valueOf(this.bloomFilterSpecified).hashCode();
result ^= Boolean.valueOf(this.bloomFilter).hashCode();
result ^= Byte.valueOf(COLUMN_DESCRIPTOR_VERSION).hashCode();
if (this.bloomFilterSpecified) {
result ^= this.bloomFilter.hashCode();
}
return result;
}

@@ -362,13 +347,15 @@ public class HColumnDescriptor implements WritableComparable {
this.compressionType = CompressionType.values()[ordinal];
this.inMemory = in.readBoolean();
this.maxValueLength = in.readInt();
this.bloomFilterSpecified = in.readBoolean();

if(bloomFilterSpecified) {
bloomFilter = new BloomFilterDescriptor();
bloomFilter.readFields(in);
this.bloomFilter = in.readBoolean();
if (this.bloomFilter && versionNumber < 5) {
// If a bloomFilter is enabled and the column descriptor is less than
// version 5, we need to skip over it to read the rest of the column
// descriptor. There are no BloomFilterDescriptors written to disk for
// column descriptors with a version number >= 5
BloomFilterDescriptor junk = new BloomFilterDescriptor();
junk.readFields(in);
}

if (versionNumber > 1) {
this.blockCacheEnabled = in.readBoolean();
}

@@ -386,11 +373,7 @@ public class HColumnDescriptor implements WritableComparable {
out.writeInt(this.compressionType.ordinal());
out.writeBoolean(this.inMemory);
out.writeInt(this.maxValueLength);
out.writeBoolean(this.bloomFilterSpecified);

if(bloomFilterSpecified) {
bloomFilter.write(out);
}
out.writeBoolean(this.bloomFilter);
out.writeBoolean(this.blockCacheEnabled);
out.writeInt(this.timeToLive);
}

@@ -443,21 +426,16 @@ public class HColumnDescriptor implements WritableComparable {
}

if(result == 0) {
if(this.bloomFilterSpecified == other.bloomFilterSpecified) {
if(this.bloomFilter == other.bloomFilter) {
result = 0;

} else if(this.bloomFilterSpecified) {
} else if(this.bloomFilter) {
result = -1;

} else {
result = 1;
}
}

if(result == 0 && this.bloomFilterSpecified) {
result = this.bloomFilter.compareTo(other.bloomFilter);
}

return result;
}
}
@@ -40,9 +40,9 @@ public interface HConstants {

/**
* Current version of file system
* Version 3 added 'historian' family to .META.
* Version 4 supports only one kind of bloom filter
*/
public static final String FILE_SYSTEM_VERSION = "3";
public static final String FILE_SYSTEM_VERSION = "4";

// Configuration parameters
@@ -42,17 +42,17 @@ public class HTableDescriptor implements WritableComparable {
HConstants.ROOT_TABLE_NAME,
new HColumnDescriptor[] { new HColumnDescriptor(HConstants.COLUMN_FAMILY,
1, HColumnDescriptor.CompressionType.NONE, false, false,
Integer.MAX_VALUE, HConstants.FOREVER, null) });
Integer.MAX_VALUE, HConstants.FOREVER, false) });

/** Table descriptor for <code>.META.</code> catalog table */
public static final HTableDescriptor META_TABLEDESC = new HTableDescriptor(
HConstants.META_TABLE_NAME, new HColumnDescriptor[] {
new HColumnDescriptor(HConstants.COLUMN_FAMILY, 1,
HColumnDescriptor.CompressionType.NONE, false, false,
Integer.MAX_VALUE, HConstants.FOREVER, null),
Integer.MAX_VALUE, HConstants.FOREVER, false),
new HColumnDescriptor(HConstants.COLUMN_FAMILY_HISTORIAN,
HConstants.ALL_VERSIONS, HColumnDescriptor.CompressionType.NONE,
false, false, Integer.MAX_VALUE, HConstants.FOREVER, null) });
false, false, Integer.MAX_VALUE, HConstants.FOREVER, false) });

private boolean rootregion = false;
private boolean metaregion = false;
@@ -1,53 +0,0 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;

/** Interface for generic reader for compactions */
interface CompactionReader {

/**
* Closes the reader
* @throws IOException
*/
public void close() throws IOException;

/**
* Get the next key/value pair
*
* @param key
* @param val
* @return true if more data was returned
* @throws IOException
*/
public boolean next(WritableComparable key, Writable val)
throws IOException;

/**
* Resets the reader
* @throws IOException
*/
public void reset() throws IOException;
}
@@ -2147,8 +2147,5 @@ public class HRegion implements HConstants {
throws IOException {
fs.mkdirs(HStoreFile.getMapDir(basedir, encodedRegionName, colFamily));
fs.mkdirs(HStoreFile.getInfoDir(basedir, encodedRegionName, colFamily));
if (tabledesc.getFamily(colFamily).getBloomFilter() != null) {
fs.mkdirs(HStoreFile.getFilterDir(basedir, encodedRegionName, colFamily));
}
}
}
@@ -27,7 +27,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;

@@ -38,12 +37,9 @@ import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.BloomFilterDescriptor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;

@@ -60,10 +56,6 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
import org.onelab.filter.BloomFilter;
import org.onelab.filter.CountingBloomFilter;
import org.onelab.filter.Filter;
import org.onelab.filter.RetouchedBloomFilter;

/**
* HStore maintains a bunch of data files. It is responsible for maintaining

@@ -84,8 +76,6 @@ public class HStore implements HConstants {
private static final Pattern REF_NAME_PARSER =
Pattern.compile("^(\\d+)(?:\\.(.+))?$");

private static final String BLOOMFILTER_FILE_NAME = "filter";

protected final Memcache memcache;
private final Path basedir;
private final HRegionInfo info;
@@ -93,8 +83,6 @@ public class HStore implements HConstants {
private final SequenceFile.CompressionType compression;
final FileSystem fs;
private final HBaseConfiguration conf;
private final Path filterDir;
final Filter bloomFilter;
protected long ttl;

private final long desiredMaxFileSize;

@@ -215,18 +203,6 @@ public class HStore implements HConstants {
fs.mkdirs(infodir);
}

if(family.getBloomFilter() == null) {
this.filterDir = null;
this.bloomFilter = null;
} else {
this.filterDir = HStoreFile.getFilterDir(basedir, info.getEncodedName(),
family.getName());
if (!fs.exists(filterDir)) {
fs.mkdirs(filterDir);
}
this.bloomFilter = loadOrCreateBloomFilter();
}

// Go through the 'mapdir' and 'infodir' together, make sure that all
// MapFiles are in a reliable state. Every entry in 'mapdir' must have a
// corresponding one in 'loginfodir'. Without a corresponding log info

@@ -266,11 +242,12 @@ public class HStore implements HConstants {
if (first) {
// Use a block cache (if configured) for the first reader only
// so as to control memory usage.
r = e.getValue().getReader(this.fs, this.bloomFilter,
r = e.getValue().getReader(this.fs, this.family.isBloomFilterEnabled(),
family.isBlockCacheEnabled());
first = false;
} else {
r = e.getValue().getReader(this.fs, this.bloomFilter);
r = e.getValue().getReader(this.fs, this.family.isBloomFilterEnabled(),
false);
}
this.readers.put(e.getKey(), r);
}
@@ -516,105 +493,6 @@ public class HStore implements HConstants {
this.fs.getFileStatus(f).getLen() == 0;
}

//////////////////////////////////////////////////////////////////////////////
// Bloom filters
//////////////////////////////////////////////////////////////////////////////

/**
* Called by constructor if a bloom filter is enabled for this column family.
* If the HStore already exists, it will read in the bloom filter saved
* previously. Otherwise, it will create a new bloom filter.
*/
private Filter loadOrCreateBloomFilter() throws IOException {
Path filterFile = new Path(filterDir, BLOOMFILTER_FILE_NAME);
Filter bloomFilter = null;
if(fs.exists(filterFile)) {
if (LOG.isDebugEnabled()) {
LOG.debug("loading bloom filter for " + this.storeNameStr);
}

BloomFilterDescriptor.BloomFilterType type =
family.getBloomFilter().getType();

switch(type) {

case BLOOMFILTER:
bloomFilter = new BloomFilter();
break;

case COUNTING_BLOOMFILTER:
bloomFilter = new CountingBloomFilter();
break;

case RETOUCHED_BLOOMFILTER:
bloomFilter = new RetouchedBloomFilter();
break;

default:
throw new IllegalArgumentException("unknown bloom filter type: " +
type);
}
FSDataInputStream in = fs.open(filterFile);
try {
bloomFilter.readFields(in);
} finally {
fs.close();
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("creating bloom filter for " + this.storeNameStr);
}

BloomFilterDescriptor.BloomFilterType type =
family.getBloomFilter().getType();

switch(type) {

case BLOOMFILTER:
bloomFilter = new BloomFilter(family.getBloomFilter().getVectorSize(),
family.getBloomFilter().getNbHash());
break;

case COUNTING_BLOOMFILTER:
bloomFilter =
new CountingBloomFilter(family.getBloomFilter().getVectorSize(),
family.getBloomFilter().getNbHash());
break;

case RETOUCHED_BLOOMFILTER:
bloomFilter =
new RetouchedBloomFilter(family.getBloomFilter().getVectorSize(),
family.getBloomFilter().getNbHash());
}
}
return bloomFilter;
}

/**
* Flushes bloom filter to disk
*
* @throws IOException
*/
private void flushBloomFilter() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("flushing bloom filter for " + this.storeNameStr);
}
FSDataOutputStream out =
fs.create(new Path(filterDir, BLOOMFILTER_FILE_NAME));
try {
bloomFilter.write(out);
} finally {
out.close();
}
if (LOG.isDebugEnabled()) {
LOG.debug("flushed bloom filter for " + this.storeNameStr);
}
}

//////////////////////////////////////////////////////////////////////////////
// End bloom filters
//////////////////////////////////////////////////////////////////////////////

/**
* Adds a value to the memcache
*
@@ -704,7 +582,7 @@ public class HStore implements HConstants {
HStoreFile flushedFile = new HStoreFile(conf, fs, basedir,
info.getEncodedName(), family.getName(), -1L, null);
MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression,
this.bloomFilter);
this.family.isBloomFilterEnabled(), cache.size());

// Here we tried picking up an existing HStoreFile from disk and
// interlacing the memcache flush compacting as we go. The notion was

@@ -746,12 +624,7 @@ public class HStore implements HConstants {
// MapFile. The MapFile is current up to and including the log seq num.
flushedFile.writeInfo(fs, logCacheFlushId);

// C. Flush the bloom filter if any
if (bloomFilter != null) {
flushBloomFilter();
}

// D. Finally, make the new MapFile available.
// C. Finally, make the new MapFile available.
updateReaders(logCacheFlushId, flushedFile);
if(LOG.isDebugEnabled()) {
LOG.debug("Added " + FSUtils.getPath(flushedFile.getMapFilePath()) +

@@ -778,7 +651,8 @@ public class HStore implements HConstants {
Long flushid = Long.valueOf(logCacheFlushId);
// Open the map file reader.
this.readers.put(flushid,
flushedFile.getReader(this.fs, this.bloomFilter));
flushedFile.getReader(this.fs, this.family.isBloomFilterEnabled(),
this.family.isBlockCacheEnabled()));
this.storefiles.put(flushid, flushedFile);
// Tell listeners of the change in readers.
notifyChangedReadersObservers();

@@ -819,21 +693,6 @@ public class HStore implements HConstants {
// Compaction
//////////////////////////////////////////////////////////////////////////////

/*
* @param files
* @return True if any of the files in <code>files</code> are References.
*/
private boolean hasReferences(Collection<HStoreFile> files) {
if (files != null && files.size() > 0) {
for (HStoreFile hsf: files) {
if (hsf.isReference()) {
return true;
}
}
}
return false;
}

/**
* Compact the back-HStores. This method may take some time, so the calling
* thread must be able to block for long periods.
@@ -858,44 +717,69 @@ public class HStore implements HConstants {
StoreSize compact(final boolean force) throws IOException {
synchronized (compactLock) {
long maxId = -1;
int nrows = -1;
List<HStoreFile> filesToCompact = null;
synchronized (storefiles) {
if (this.storefiles.size() <= 0) {
return null;
}
filesToCompact = new ArrayList<HStoreFile>(this.storefiles.values());
if (!force && !hasReferences(filesToCompact) &&
filesToCompact.size() < compactionThreshold) {
return checkSplit();
}
if (!fs.exists(compactionDir) && !fs.mkdirs(compactionDir)) {
LOG.warn("Mkdir on " + compactionDir.toString() + " failed");
return checkSplit();
}

// The max-sequenceID in any of the to-be-compacted TreeMaps is the
// last key of storefiles.
maxId = this.storefiles.lastKey().longValue();
}
if (!force && filesToCompact.size() < compactionThreshold) {
return checkSplit();
}
if (!fs.exists(compactionDir) && !fs.mkdirs(compactionDir)) {
LOG.warn("Mkdir on " + compactionDir.toString() + " failed");
return checkSplit();
}
/*
* We create a new list of MapFile.Reader objects so we don't screw up the
* caching associated with the currently-loaded ones. Our iteration-based
* access pattern is practically designed to ruin the cache.
*/
List<MapFile.Reader> readers = new ArrayList<MapFile.Reader>();
for (HStoreFile file: filesToCompact) {
try {
HStoreFile.BloomFilterMapFile.Reader reader = file.getReader(fs,
this.family.isBloomFilterEnabled(), false);
readers.add(reader);

// Compute the size of the new bloomfilter if needed
if (this.family.isBloomFilterEnabled()) {
nrows += reader.getBloomFilterSize();
}
} catch (IOException e) {
// Add info about which file threw exception. It may not be in the
// exception message so output a message here where we know the
// culprit.
LOG.warn("Failed with " + e.toString() + ": " + file.toString());
closeCompactionReaders(readers);
throw e;
}
}

// Storefiles are keyed by sequence id. The oldest file comes first.
// We need to return out of here a List that has the newest file first.
Collections.reverse(filesToCompact);
Collections.reverse(readers);

// Step through them, writing to the brand-new MapFile
HStoreFile compactedOutputFile = new HStoreFile(conf, fs,
this.compactionDir, info.getEncodedName(), family.getName(),
-1L, null);
if (LOG.isDebugEnabled()) {
LOG.debug("started compaction of " + filesToCompact.size() +
" files " + filesToCompact.toString() + " into " +
LOG.debug("started compaction of " + readers.size() + " files into " +
FSUtils.getPath(compactedOutputFile.getMapFilePath()));
}
MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs,
this.compression, this.bloomFilter);
MapFile.Writer writer = compactedOutputFile.getWriter(this.fs,
this.compression, this.family.isBloomFilterEnabled(), nrows);
try {
compactHStoreFiles(compactedOut, filesToCompact);
compactHStoreFiles(writer, readers);
} finally {
compactedOut.close();
writer.close();
}

// Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
@@ -913,36 +797,17 @@ public class HStore implements HConstants {
}

/*
* Compact passed <code>toCompactFiles</code> into <code>compactedOut</code>.
* We create a new set of MapFile.Reader objects so we don't screw up the
* caching associated with the currently-loaded ones. Our iteration-based
* access pattern is practically designed to ruin the cache.
* Compact a list of MapFile.Readers into MapFile.Writer.
*
* We work by opening a single MapFile.Reader for each file, and iterating
* through them in parallel. We always increment the lowest-ranked one.
* We work by iterating through the readers in parallel. We always increment
* the lowest-ranked one.
* Updates to a single row/column will appear ranked by timestamp. This allows
* us to throw out deleted values or obsolete versions. @param compactedOut
* @param toCompactFiles @throws IOException
* us to throw out deleted values or obsolete versions.
*/
private void compactHStoreFiles(final MapFile.Writer compactedOut,
final List<HStoreFile> toCompactFiles) throws IOException {
final List<MapFile.Reader> readers) throws IOException {

int size = toCompactFiles.size();
CompactionReader[] rdrs = new CompactionReader[size];
int index = 0;
for (HStoreFile hsf: toCompactFiles) {
try {
rdrs[index++] =
new MapFileCompactionReader(hsf.getReader(fs, bloomFilter));
} catch (IOException e) {
// Add info about which file threw exception. It may not be in the
// exception message so output a message here where we know the
// culprit.
LOG.warn("Failed with " + e.toString() + ": " + hsf.toString());
closeCompactionReaders(rdrs);
throw e;
}
}
MapFile.Reader[] rdrs = readers.toArray(new MapFile.Reader[readers.size()]);
try {
HStoreKey[] keys = new HStoreKey[rdrs.length];
ImmutableBytesWritable[] vals = new ImmutableBytesWritable[rdrs.length];

@@ -1035,18 +900,16 @@ public class HStore implements HConstants {
}
}
} finally {
closeCompactionReaders(rdrs);
closeCompactionReaders(readers);
}
}

private void closeCompactionReaders(final CompactionReader [] rdrs) {
for (int i = 0; i < rdrs.length; i++) {
if (rdrs[i] != null) {
try {
rdrs[i].close();
} catch (IOException e) {
LOG.warn("Exception closing reader for " + this.storeNameStr, e);
}
private void closeCompactionReaders(final List<MapFile.Reader> rdrs) {
for (MapFile.Reader r: rdrs) {
try {
r.close();
} catch (IOException e) {
LOG.warn("Exception closing reader for " + this.storeNameStr, e);
}
}
}
@@ -1163,10 +1026,11 @@ public class HStore implements HConstants {
// Add new compacted Reader and store file.
Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
this.readers.put(orderVal,
// Use a block cache (if configured) for this reader since
// Use a block cache (if configured) for this reader since
// it is the only one.
finalCompactedFile.getReader(this.fs, this.bloomFilter, family
.isBlockCacheEnabled()));
finalCompactedFile.getReader(this.fs,
this.family.isBloomFilterEnabled(),
this.family.isBlockCacheEnabled()));
this.storefiles.put(orderVal, finalCompactedFile);
// Tell observers that list of Readers has changed.
notifyChangedReadersObservers();
@@ -27,7 +27,6 @@ import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Random;

import org.apache.commons.logging.Log;

@@ -45,7 +44,7 @@ import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.onelab.filter.Filter;
import org.onelab.filter.BloomFilter;
import org.onelab.filter.Key;

import org.apache.hadoop.hbase.HConstants;
@@ -394,33 +393,13 @@ public class HStoreFile implements HConstants {
* Get reader for the store file map file.
* Client is responsible for closing file when done.
* @param fs
* @param bloomFilter If null, no filtering is done.
* @return MapFile.Reader
* @throws IOException
*/
public synchronized MapFile.Reader getReader(final FileSystem fs,
final Filter bloomFilter)
throws IOException {
if (isReference()) {
return new HStoreFile.HalfMapFileReader(fs,
getMapFilePath(reference).toString(), conf,
reference.getFileRegion(), reference.getMidkey(), bloomFilter);
}
return new BloomFilterMapFile.Reader(fs, getMapFilePath().toString(),
conf, bloomFilter);
}

/**
* Get reader for the store file map file.
* Client is responsible for closing file when done.
* @param fs
* @param bloomFilter If null, no filtering is done.
* @param bloomFilter If true, a bloom filter exists
* @param blockCacheEnabled If true, MapFile blocks should be cached.
* @return MapFile.Reader
* @return BloomFilterMapFile.Reader
* @throws IOException
*/
public synchronized MapFile.Reader getReader(final FileSystem fs,
final Filter bloomFilter, final boolean blockCacheEnabled)
public synchronized BloomFilterMapFile.Reader getReader(final FileSystem fs,
final boolean bloomFilter, final boolean blockCacheEnabled)
throws IOException {
if (isReference()) {
return new HStoreFile.HalfMapFileReader(fs,
@@ -438,20 +417,21 @@ public class HStoreFile implements HConstants {
* @param fs
* @param compression Pass <code>SequenceFile.CompressionType.NONE</code>
* for none.
* @param bloomFilter If null, no filtering is done.
* @param bloomFilter If true, create a bloom filter
* @param nrows number of rows expected. Required if bloomFilter is true.
* @return MapFile.Writer
* @throws IOException
*/
public MapFile.Writer getWriter(final FileSystem fs,
final SequenceFile.CompressionType compression,
final Filter bloomFilter)
final boolean bloomFilter, int nrows)
throws IOException {
if (isReference()) {
throw new IOException("Illegal Access: Cannot get a writer on a" +
"HStoreFile reference");
}
return new BloomFilterMapFile.Writer(conf, fs,
getMapFilePath().toString(), compression, bloomFilter);
getMapFilePath().toString(), compression, bloomFilter, nrows);
}

/**

@@ -472,25 +452,6 @@ public class HStoreFile implements HConstants {
(isReference()? "-" + reference.toString(): "");
}

/**
* Custom bloom filter key maker.
* @param key
* @return Key made of bytes of row and column only.
* @throws IOException
*/
static Key getBloomFilterKey(WritableComparable key)
throws IOException {
HStoreKey hsk = (HStoreKey)key;
byte [] bytes = null;
try {
bytes = (hsk.getRow().toString() + hsk.getColumn().toString()).
getBytes(UTF8_ENCODING);
} catch (UnsupportedEncodingException e) {
throw new IOException(e.toString());
}
return new Key(bytes);
}

static boolean isTopFileRegion(final Range r) {
return r.equals(Range.top);
}

@@ -529,6 +490,7 @@ public class HStoreFile implements HConstants {
* @param f Column family.
* @return the bloom filter directory path
*/
@Deprecated
public static Path getFilterDir(Path dir, int encodedRegionName,
final byte [] f) {
return getFamilySubDir(dir, encodedRegionName, f, HSTORE_FILTER_DIR);
@@ -626,6 +588,15 @@ public class HStoreFile implements HConstants {
static final Class<? extends Writable> VALUE_CLASS =
ImmutableBytesWritable.class;

/**
* Custom bloom filter key maker.
* @param key
* @return Key made of bytes of row only.
*/
protected static Key getBloomFilterKey(WritableComparable key) {
return new Key(((HStoreKey) key).getRow());
}

/**
* A reader capable of reading and caching blocks of the data file.
*/

@@ -718,22 +689,10 @@ public class HStoreFile implements HConstants {
* filter is null, just passes invocation to parent.
*/
static class BloomFilterMapFile extends HbaseMapFile {
static class Reader extends HbaseReader {
private final Filter bloomFilter;
protected static final String BLOOMFILTER_FILE_NAME = "filter";

/**
* @param fs
* @param dirName
* @param conf
* @param filter
* @throws IOException
*/
public Reader(FileSystem fs, String dirName, Configuration conf,
final Filter filter)
throws IOException {
super(fs, dirName, conf);
bloomFilter = filter;
}
static class Reader extends HbaseReader {
private final BloomFilter bloomFilter;

/**
* @param fs

@@ -744,10 +703,31 @@ public class HStoreFile implements HConstants {
* @throws IOException
*/
public Reader(FileSystem fs, String dirName, Configuration conf,
final Filter filter, final boolean blockCacheEnabled)
final boolean filter, final boolean blockCacheEnabled)
throws IOException {
super(fs, dirName, conf, blockCacheEnabled);
bloomFilter = filter;
if (filter) {
this.bloomFilter = loadBloomFilter(fs, dirName);
} else {
this.bloomFilter = null;
}
}

private BloomFilter loadBloomFilter(FileSystem fs, String dirName)
throws IOException {
Path filterFile = new Path(dirName, BLOOMFILTER_FILE_NAME);
if(!fs.exists(filterFile)) {
throw new FileNotFoundException("Could not find bloom filter: " +
filterFile);
}
BloomFilter filter = new BloomFilter();
FSDataInputStream in = fs.open(filterFile);
try {
bloomFilter.readFields(in);
} finally {
fs.close();
}
return filter;
}

/** {@inheritDoc} */
@@ -788,27 +768,65 @@ public class HStoreFile implements HConstants {
}
return null;
}

/* @return size of the bloom filter */
int getBloomFilterSize() {
return bloomFilter == null ? 0 : bloomFilter.getVectorSize();
}
}

static class Writer extends HbaseWriter {
private final Filter bloomFilter;
private static final double DEFAULT_NUMBER_OF_HASH_FUNCTIONS = 4.0;
private final BloomFilter bloomFilter;
private final String dirName;
private final FileSystem fs;

/**
* @param conf
* @param fs
* @param dirName
* @param keyClass
* @param valClass
* @param compression
* @param filter
* @param nrows
* @throws IOException
*/
@SuppressWarnings("unchecked")
public Writer(Configuration conf, FileSystem fs, String dirName,
SequenceFile.CompressionType compression, final Filter filter)
SequenceFile.CompressionType compression, final boolean filter,
int nrows)
throws IOException {
super(conf, fs, dirName, compression);
bloomFilter = filter;
this.dirName = dirName;
this.fs = fs;
if (filter) {
/*
* There is no way to automatically determine the vector size and the
* number of hash functions to use. In particular, bloom filters are
* very sensitive to the number of elements inserted into them. For
* HBase, the number of entries depends on the size of the data stored
* in the column. Currently the default region size is 256MB, so the
* number of entries is approximately
* 256MB / (average value size for column).
*
* If m denotes the number of bits in the Bloom filter (vectorSize),
* n denotes the number of elements inserted into the Bloom filter and
* k represents the number of hash functions used (nbHash), then
* according to Broder and Mitzenmacher,
*
* ( http://www.eecs.harvard.edu/~michaelm/NEWWORK/postscripts/BloomFilterSurvey.pdf )
*
* the probability of false positives is minimized when k is
* approximately m/n ln(2).
*/
this.bloomFilter = new BloomFilter(
(int) DEFAULT_NUMBER_OF_HASH_FUNCTIONS,
(int) Math.ceil(
(DEFAULT_NUMBER_OF_HASH_FUNCTIONS * (1.0 * nrows)) /
Math.log(2.0))
);
} else {
this.bloomFilter = null;
}
}

/** {@inheritDoc} */
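Editorial sketch, not from the commit: the sizing rule the comment above describes, factored into a standalone helper for clarity. It mirrors the arithmetic used by the constructor (the hash-function count k is fixed; the vector size m is derived from the expected row count n so that k stays near the optimal (m/n)·ln 2). The helper name is hypothetical.

// Illustrative helper only; not part of the HBase source.
static int bloomVectorSize(final int expectedRows, final int hashFunctions) {
  // k ~= (m/n) * ln(2)  =>  m ~= k * n / ln(2)
  return (int) Math.ceil((hashFunctions * (double) expectedRows) / Math.log(2.0));
}
// Example: bloomVectorSize(1000, 4) == 5771, i.e. roughly 5.8 bits per expected row.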
@@ -820,6 +838,36 @@ public class HStoreFile implements HConstants {
}
super.append(key, val);
}

/** {@inheritDoc} */
@Override
public synchronized void close() throws IOException {
super.close();
if (this.bloomFilter != null) {
flushBloomFilter();
}
}

/**
* Flushes bloom filter to disk
*
* @throws IOException
*/
private void flushBloomFilter() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("flushing bloom filter for " + this.dirName);
}
FSDataOutputStream out =
fs.create(new Path(dirName, BLOOMFILTER_FILE_NAME));
try {
bloomFilter.write(out);
} finally {
out.close();
}
if (LOG.isDebugEnabled()) {
LOG.debug("flushed bloom filter for " + this.dirName);
}
}
}
}

@@ -841,21 +889,12 @@ public class HStoreFile implements HConstants {
final Configuration conf, final Range r,
final WritableComparable midKey)
throws IOException {
this(fs, dirName, conf, r, midKey, null, false);
this(fs, dirName, conf, r, midKey, false, false);
}

HalfMapFileReader(final FileSystem fs, final String dirName,
final Configuration conf, final Range r,
final WritableComparable midKey, final Filter filter)
throws IOException {
super(fs, dirName, conf, filter);
top = isTopFileRegion(r);
midkey = midKey;
}

HalfMapFileReader(final FileSystem fs, final String dirName,
final Configuration conf, final Range r,
final WritableComparable midKey, final Filter filter,
final WritableComparable midKey, final boolean filter,
final boolean blockCacheEnabled)
throws IOException {
super(fs, dirName, conf, filter, blockCacheEnabled);
@@ -1,53 +0,0 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;

import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;

/** A compaction reader for MapFile */
class MapFileCompactionReader implements CompactionReader {
final MapFile.Reader reader;

MapFileCompactionReader(final MapFile.Reader r) {
this.reader = r;
}

/** {@inheritDoc} */
public void close() throws IOException {
this.reader.close();
}

/** {@inheritDoc} */
public boolean next(WritableComparable key, Writable val)
throws IOException {
return this.reader.next(key, val);
}

/** {@inheritDoc} */
public void reset() throws IOException {
this.reader.reset();
}
}
@@ -92,7 +92,8 @@ implements ChangedReadersObserver {
// Most recent map file should be first
int i = readers.length - 1;
for(HStoreFile curHSF: store.getStorefiles().values()) {
readers[i--] = curHSF.getReader(store.fs, store.bloomFilter);
readers[i--] = curHSF.getReader(store.fs,
store.getFamily().isBloomFilterEnabled(), false);
}

this.keys = new HStoreKey[readers.length];
@@ -413,7 +413,7 @@ public class TableHandler extends GenericHandler {
doElement(outputter, "name", Bytes.toString(e.getName()));
doElement(outputter, "compression", e.getCompression().toString());
doElement(outputter, "bloomfilter",
e.getBloomFilter() == null? "NONE": e.getBloomFilter().toString());
Boolean.toString(e.isBloomFilterEnabled()));
doElement(outputter, "max-versions",
Integer.toString(e.getMaxVersions()));
doElement(outputter, "maximum-cell-size",
@@ -18,7 +18,6 @@

package org.apache.hadoop.hbase.thrift;

import org.apache.hadoop.hbase.BloomFilterDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor.CompressionType;
import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;

@@ -38,19 +37,9 @@ public class ThriftUtilities {
static public HColumnDescriptor colDescFromThrift(ColumnDescriptor in)
throws IllegalArgument {
CompressionType comp = CompressionType.valueOf(in.compression);
BloomFilterDescriptor bloom = null;
boolean bloom = false;
if (in.bloomFilterType.compareTo("NONE") != 0) {
if (in.bloomFilterVectorSize > 0 && in.bloomFilterNbHashes > 0) {
bloom = new BloomFilterDescriptor(BloomFilterDescriptor.BloomFilterType
.valueOf(in.bloomFilterType), in.bloomFilterVectorSize,
in.bloomFilterNbHashes);
} else if (in.bloomFilterVectorSize > 0) {
bloom = new BloomFilterDescriptor(BloomFilterDescriptor.BloomFilterType
.valueOf(in.bloomFilterType), in.bloomFilterVectorSize);
} else {
throw new IllegalArgument(
"must specify number of entries for bloom filter");
}
bloom = true;
}

if (in.name == null || in.name.length <= 0) {

@@ -78,12 +67,7 @@ public class ThriftUtilities {
col.inMemory = in.isInMemory();
col.blockCacheEnabled = in.isBlockCacheEnabled();
col.maxValueLength = in.getMaxValueLength();
BloomFilterDescriptor bloom = in.getBloomFilter();
if (bloom != null) {
col.bloomFilterType = bloom.getType().toString();
col.bloomFilterVectorSize = bloom.getVectorSize();
col.bloomFilterNbHashes = bloom.getNbHash();
}
col.bloomFilterType = Boolean.toString(in.isBloomFilterEnabled());
return col;
}
@@ -389,7 +389,14 @@ public class MetaUtils {
}
}

private void updateMETARegionInfo(HRegion r, final HRegionInfo hri)
/**
* Update COL_REGIONINFO in meta region r with HRegionInfo hri
*
* @param r
* @param hri
* @throws IOException
*/
public void updateMETARegionInfo(HRegion r, final HRegionInfo hri)
throws IOException {
if (LOG.isDebugEnabled()) {
HRegionInfo h = Writables.getHRegionInfoOrNull(
@@ -44,10 +44,12 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

@@ -218,12 +220,16 @@ public class Migrate extends Configured implements Tool {
migrateFromNoVersion(rootFiles);
migrateToV2(rootFiles);
migrateToV3();
migrateToV4();
} else if (version.compareTo("0.1") == 0) {
migrateToV2(getRootDirFiles());
migrateToV3();
migrateToV4();
} else if (version.compareTo("2") == 0) {
migrateToV3();
} else if (version.compareTo("3") == 0) {
migrateToV4();
} else if (version.compareTo("4") == 0) {
// Nothing to do.
} else {
throw new IOException("Unrecognized version: " + version);

@@ -289,6 +295,11 @@ public class Migrate extends Configured implements Tool {
LOG.info("Checking to see if hbase in Filesystem is at version 3.");
addHistorianFamilyToMeta();
}

private void migrateToV4() throws IOException {
LOG.info("Checking to see if hbase in Filesystem is at version 4.");
updateBloomFilters();
}

private FileStatus[] getRootDirFiles() throws IOException {
FileStatus[] stats = fs.listStatus(FSUtils.getRootDir(this.conf));
@@ -496,7 +507,7 @@ public class Migrate extends Configured implements Tool {
utils.addColumn(HConstants.META_TABLE_NAME,
new HColumnDescriptor(HConstants.COLUMN_FAMILY_HISTORIAN,
HConstants.ALL_VERSIONS, HColumnDescriptor.CompressionType.NONE,
false, false, Integer.MAX_VALUE, HConstants.FOREVER, null));
false, false, Integer.MAX_VALUE, HConstants.FOREVER, false));
LOG.info("Historian family added to .META.");
// Flush out the meta edits.
}
@@ -504,6 +515,57 @@ public class Migrate extends Configured implements Tool {
utils.shutdown();
}
}

private void updateBloomFilters() throws IOException {
if (this.migrationNeeded && this.readOnly) {
return;
}
final Path rootDir = FSUtils.getRootDir(conf);
final MetaUtils utils = new MetaUtils(this.conf);
try {
// Scan the root region
utils.scanRootRegion(new MetaUtils.ScannerListener() {
public boolean processRow(HRegionInfo info) throws IOException {
// Scan every meta region
final HRegion metaRegion = utils.getMetaRegion(info);
utils.scanMetaRegion(info, new MetaUtils.ScannerListener() {
public boolean processRow(HRegionInfo tableInfo) throws IOException {
HTableDescriptor desc = tableInfo.getTableDesc();
Path tableDir =
HTableDescriptor.getTableDir(rootDir, desc.getName());
for (HColumnDescriptor column: desc.getFamilies()) {
if (column.isBloomFilterEnabled()) {
// Column has a bloom filter
migrationNeeded = true;

Path filterDir = HStoreFile.getFilterDir(tableDir,
tableInfo.getEncodedName(), column.getName());
if (fs.exists(filterDir)) {
// Filter dir exists
if (readOnly) {
// And if we are only checking to see if a migration is
// needed - it is. We're done.
return false;
}
// Delete the filter
fs.delete(filterDir, true);
// Update the HRegionInfo in meta
utils.updateMETARegionInfo(metaRegion, tableInfo);
}
}
}
return true;
}
});
// Stop scanning if only doing a check and we've determined that a
// migration is needed. Otherwise continue by returning true
return readOnly && migrationNeeded ? false : true;
}
});
} finally {
utils.shutdown();
}
}

@SuppressWarnings("static-access")
private int parseArgs(String[] args) {
@@ -192,6 +192,13 @@ public class BloomFilter extends Filter {
bf.or(this);
return bf;
}//end clone()

/**
* @return size of the the bloomfilter
*/
public int getVectorSize() {
return this.vectorSize;
}

// Writable
@@ -194,13 +194,13 @@ public abstract class HBaseTestCase extends TestCase {
HTableDescriptor htd = new HTableDescriptor(name);
htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME1, versions,
CompressionType.NONE, false, false, Integer.MAX_VALUE,
HConstants.FOREVER, null));
HConstants.FOREVER, false));
htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME2, versions,
CompressionType.NONE, false, false, Integer.MAX_VALUE,
HConstants.FOREVER, null));
HConstants.FOREVER, false));
htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME3, versions,
CompressionType.NONE, false, false, Integer.MAX_VALUE,
HConstants.FOREVER, null));
HConstants.FOREVER, false));
return htd;
}
@@ -145,78 +145,8 @@ public class TestBloomFilters extends HBaseClusterTestCase {
Bytes.toBytes("yzabcdef")
};

/** constructor */
public TestBloomFilters() {
super();
}

/**
* Test that specifies explicit parameters for the bloom filter
* @throws IOException
*/
public void testExplicitParameters() throws IOException {
HTable table = null;

// Setup

HTableDescriptor desc = new HTableDescriptor(getName());
BloomFilterDescriptor bloomFilter =
new BloomFilterDescriptor( // if we insert 1000 values
BloomFilterDescriptor.BloomFilterType.BLOOMFILTER, // plain old bloom filter
12499, // number of bits
4 // number of hash functions
);

desc.addFamily(
new HColumnDescriptor(CONTENTS, // Column name
1, // Max versions
HColumnDescriptor.CompressionType.NONE, // no compression
HColumnDescriptor.DEFAULT_IN_MEMORY, // not in memory
HColumnDescriptor.DEFAULT_BLOCKCACHE,
HColumnDescriptor.DEFAULT_LENGTH,
HColumnDescriptor.DEFAULT_TTL,
bloomFilter
)
);

// Create the table

HBaseAdmin admin = new HBaseAdmin(conf);
admin.createTable(desc);

// Open table

table = new HTable(conf, desc.getName());

// Store some values

for(int i = 0; i < 100; i++) {
byte [] row = rows[i];
String value = row.toString();
BatchUpdate b = new BatchUpdate(row);
b.put(CONTENTS, value.getBytes(HConstants.UTF8_ENCODING));
table.commit(b);
}
try {
// Give cache flusher and log roller a chance to run
// Otherwise we'll never hit the bloom filter, just the memcache
Thread.sleep(conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000) * 2);
} catch (InterruptedException e) {
// ignore
}

for(int i = 0; i < testKeys.length; i++) {
Cell value = table.get(testKeys[i], CONTENTS);
if(value != null && value.getValue().length != 0) {
LOG.info("non existant key: " + testKeys[i] + " returned value: " +
new String(value.getValue(), HConstants.UTF8_ENCODING));
}
}
}

/**
* Test that uses computed for the bloom filter
* Test that uses automatic bloom filter
* @throws IOException
*/
public void testComputedParameters() throws IOException {
@@ -225,14 +155,6 @@ public class TestBloomFilters extends HBaseClusterTestCase {
// Setup

HTableDescriptor desc = new HTableDescriptor(getName());

BloomFilterDescriptor bloomFilter =
new BloomFilterDescriptor(
BloomFilterDescriptor.BloomFilterType.BLOOMFILTER, // plain old bloom filter
1000 // estimated number of entries
);
LOG.info("vector size: " + bloomFilter.vectorSize);

desc.addFamily(
new HColumnDescriptor(CONTENTS, // Column name
1, // Max versions

@@ -241,7 +163,7 @@ public class TestBloomFilters extends HBaseClusterTestCase {
HColumnDescriptor.DEFAULT_BLOCKCACHE,
HColumnDescriptor.DEFAULT_LENGTH,
HColumnDescriptor.DEFAULT_TTL,
bloomFilter
true
)
);
@@ -129,9 +129,9 @@ public class TestHStoreFile extends HBaseTestCase {
JenkinsHash.hash(Bytes.toBytes(getName())),
Bytes.toBytes("colfamily"), 1234567890L, null);
MapFile.Writer writer =
hsf.getWriter(this.fs, SequenceFile.CompressionType.NONE, null);
hsf.getWriter(this.fs, SequenceFile.CompressionType.NONE, false, 0);
writeStoreFile(writer);
MapFile.Reader reader = hsf.getReader(this.fs, null);
MapFile.Reader reader = hsf.getReader(this.fs, false, false);
// Split on a row, not in middle of row. Midkey returned by reader
// may be in middle of row. Create new one with empty column and
// timestamp.

@@ -162,7 +162,7 @@ public class TestHStoreFile extends HBaseTestCase {
otherReference.getMidkey().toString());
// Now confirm that I can read from the reference and that it only gets
// keys from top half of the file.
MapFile.Reader halfReader = refHsf.getReader(this.fs, null);
MapFile.Reader halfReader = refHsf.getReader(this.fs, false, false);
HStoreKey key = new HStoreKey();
ImmutableBytesWritable value = new ImmutableBytesWritable();
boolean first = true;
@@ -80,7 +80,7 @@ public class TestTimestamp extends HBaseClusterTestCase {
HTableDescriptor htd = createTableDescriptor(getName());
htd.addFamily(new HColumnDescriptor(COLUMN, VERSIONS,
CompressionType.NONE, false, false, Integer.MAX_VALUE,
HConstants.FOREVER, null));
HConstants.FOREVER, false));
return createNewHRegion(htd, null, null);
}
}