HBASE-1215 Migration. Pull in classes needed to read old file types; remove old classes no longer needed from the io package (e.g. SequenceFile and MapFile)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@795230 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2009-07-17 21:23:11 +00:00
parent 6f242ea63f
commit 1f68460276
26 changed files with 3732 additions and 4635 deletions


@@ -49,8 +49,8 @@ public interface HConstants {
* Version 6 enables blockcaching on catalog tables.
* Version 7 introduces hfile -- hbase 0.19 to 0.20.
*/
public static final String FILE_SYSTEM_VERSION = "6";
// public static final String FILE_SYSTEM_VERSION = "7";
// public static final String FILE_SYSTEM_VERSION = "6";
public static final String FILE_SYSTEM_VERSION = "7";
// Configuration parameters
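
The version bump above is what triggers the on-disk migration: a cluster whose stored version string still reads "6" must be migrated before 0.20-era code will serve it. Below is a minimal sketch of that kind of version gate, using only plain string comparison; it is illustrative, not the actual HBase migration code, and readVersionString() stands in for however the deployment reads the stored version.

// Illustrative only; not part of this commit. checkVersion() gates startup on the
// stored file-system version; readVersionString() is a hypothetical helper.
public final class VersionCheckSketch {
  static final String FILE_SYSTEM_VERSION = "7";

  static void checkVersion(String onDiskVersion) {
    if (!FILE_SYSTEM_VERSION.equals(onDiskVersion)) {
      throw new IllegalStateException("File system is at version " + onDiskVersion
          + " but this code expects version " + FILE_SYSTEM_VERSION
          + "; run the migration first");
    }
  }
}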


@@ -1,284 +0,0 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hbase.util.SoftValueMap;
import org.apache.hadoop.io.DataInputBuffer;
/**
* An implementation of {@link FSInputStream} that reads the stream in blocks
* of a fixed, configurable size. The blocks are stored in a memory-sensitive
* cache. A scheduled task runs periodically to clean cleared soft
* references out of the reference queue.
*/
public class BlockFSInputStream extends FSInputStream {
static final Log LOG = LogFactory.getLog(BlockFSInputStream.class);
/*
* Set up scheduled execution of cleanup of soft references. Run with one
* thread for now. May need more when many files. Should be an option but
* also want BlockFSInputStream to be self-contained.
*/
private static final ScheduledExecutorService EXECUTOR =
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("BlockFSInputStreamReferenceQueueChecker");
return t;
}
});
/*
* The registration of this object in EXECUTOR.
*/
private final ScheduledFuture<?> registration;
private final InputStream in;
private final long fileLength;
private final int blockSize;
private final SoftValueMap<Long, byte[]> blocks;
private boolean closed;
private DataInputBuffer blockStream = new DataInputBuffer();
private long blockEnd = -1;
private long pos = 0;
/**
* @param in
* @param fileLength
* @param blockSize the size of each block in bytes.
*/
public BlockFSInputStream(InputStream in, long fileLength, int blockSize) {
this.in = in;
if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
throw new IllegalArgumentException(
"In is not an instance of Seekable or PositionedReadable");
}
this.fileLength = fileLength;
this.blockSize = blockSize;
// A memory-sensitive map that has soft references to values
this.blocks = new SoftValueMap<Long, byte []>() {
private long hits, misses;
@Override
public byte [] get(Object key) {
byte [] value = super.get(key);
if (value == null) {
misses++;
} else {
hits++;
}
if (LOG.isDebugEnabled() && ((hits + misses) % 10000) == 0) {
long hitRate = (100 * hits) / (hits + misses);
LOG.debug("Hit rate for cache " + hashCode() + ": " + hitRate + "%");
}
return value;
}
};
// Register a Runnable that runs checkReferences on a period.
final int hashcode = hashCode();
this.registration = EXECUTOR.scheduleWithFixedDelay(new Runnable() {
public void run() {
int cleared = checkReferences();
if (LOG.isDebugEnabled() && cleared > 0) {
LOG.debug("Checker cleared " + cleared + " in " + hashcode);
}
}
}, 1, 1, TimeUnit.SECONDS);
}
/**
* @see org.apache.hadoop.fs.FSInputStream#getPos()
*/
@Override
public synchronized long getPos() {
return pos;
}
/**
* @see java.io.InputStream#available()
*/
@Override
public synchronized int available() {
return (int) (fileLength - pos);
}
/**
* @see org.apache.hadoop.fs.FSInputStream#seek(long)
*/
@Override
public synchronized void seek(long targetPos) throws IOException {
if (targetPos > fileLength) {
throw new IOException("Cannot seek after EOF");
}
pos = targetPos;
blockEnd = -1;
}
/**
* @see org.apache.hadoop.fs.FSInputStream#seekToNewSource(long)
*/
@Override
public synchronized boolean seekToNewSource(long targetPos)
throws IOException {
return false;
}
/**
* @see java.io.InputStream#read()
*/
@Override
public synchronized int read() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
int result = -1;
if (pos < fileLength) {
if (pos > blockEnd) {
blockSeekTo(pos);
}
result = blockStream.read();
if (result >= 0) {
pos++;
}
}
return result;
}
/**
* @see java.io.InputStream#read(byte[], int, int)
*/
@Override
public synchronized int read(byte buf[], int off, int len) throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
if (pos < fileLength) {
if (pos > blockEnd) {
blockSeekTo(pos);
}
int realLen = Math.min(len, (int) (blockEnd - pos + 1));
int result = blockStream.read(buf, off, realLen);
if (result >= 0) {
pos += result;
}
return result;
}
return -1;
}
private synchronized void blockSeekTo(long target) throws IOException {
long targetBlock = target/blockSize;
long targetBlockStart = targetBlock * blockSize;
long targetBlockEnd = Math.min(targetBlockStart + blockSize, fileLength) - 1;
long blockLength = targetBlockEnd - targetBlockStart + 1;
long offsetIntoBlock = target - targetBlockStart;
byte[] block = blocks.get(Long.valueOf(targetBlockStart));
if (block == null) {
block = new byte[blockSize];
((PositionedReadable) in).readFully(targetBlockStart, block, 0,
(int) blockLength);
blocks.put(Long.valueOf(targetBlockStart), block);
}
this.pos = target;
this.blockEnd = targetBlockEnd;
this.blockStream.reset(block, (int) offsetIntoBlock,
(int) (blockLength - offsetIntoBlock));
}
/**
* @see java.io.InputStream#close()
*/
@Override
public void close() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
if (!this.registration.cancel(false)) {
LOG.warn("Failed cancel of " + this.registration);
}
int cleared = checkReferences();
if (LOG.isDebugEnabled() && cleared > 0) {
LOG.debug("Close cleared " + cleared + " in " + hashCode());
}
if (blockStream != null) {
blockStream.close();
blockStream = null;
}
super.close();
closed = true;
}
/**
* We don't support marks.
*/
@Override
public boolean markSupported() {
return false;
}
/**
* @see java.io.InputStream#mark(int)
*/
@Override
public void mark(int readLimit) {
// Do nothing
}
/**
* @see java.io.InputStream#reset()
*/
@Override
public void reset() throws IOException {
throw new IOException("Mark not supported");
}
/**
* Call periodically to clear the soft reference ReferenceQueue.
* @return Count of references cleared.
*/
public synchronized int checkReferences() {
if (this.closed) {
return 0;
}
return this.blocks.checkReferences();
}
}
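
For context on what is being deleted here: BlockFSInputStream wrapped a seekable, positioned-readable stream and cached fixed-size blocks in a soft-reference map, with a background task clearing collected entries. A minimal usage sketch follows (illustrative only, not part of this commit; the path and block size are arbitrary):

// Illustrative sketch of how BlockFSInputStream was used; not part of this commit.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.BlockFSInputStream;

public class BlockReadSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path p = new Path(args[0]);
    long len = fs.getFileStatus(p).getLen();
    FSDataInputStream raw = fs.open(p);          // Seekable + PositionedReadable
    // Wrap with 64KB blocks; each read pulls a whole block into the
    // soft-reference cache, so repeated seeks into the same region are cheap.
    BlockFSInputStream in = new BlockFSInputStream(raw, len, 64 * 1024);
    try {
      in.seek(0);
      int first = in.read();
      System.out.println("first byte: " + first);
    } finally {
      in.close();                                 // cancels the cleanup task
      raw.close();
    }
  }
}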


@@ -1,135 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.*;
/** A reusable {@link DataOutput} implementation that writes to an in-memory
* buffer.
*
* <p>This is a copy of Hadoop SequenceFile code brought local so we can fix bugs;
* e.g. hbase-1097</p>
*
* <p>This saves memory over creating a new DataOutputStream and
* ByteArrayOutputStream each time data is written.
*
* <p>Typical usage is something like the following:<pre>
*
* DataOutputBuffer buffer = new DataOutputBuffer();
* while (... loop condition ...) {
* buffer.reset();
* ... write buffer using DataOutput methods ...
* byte[] data = buffer.getData();
* int dataLength = buffer.getLength();
* ... write data to its ultimate destination ...
* }
* </pre>
*
*/
public class DataOutputBuffer extends DataOutputStream {
private static class Buffer extends ByteArrayOutputStream {
public byte[] getData() { return buf; }
public int getLength() { return count; }
// Keep the initial buffer around so can put it back in place on reset.
private final byte [] initialBuffer;
public Buffer() {
super();
this.initialBuffer = this.buf;
}
public Buffer(int size) {
super(size);
this.initialBuffer = this.buf;
}
public void write(DataInput in, int len) throws IOException {
int newcount = count + len;
if (newcount > buf.length) {
byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
System.arraycopy(buf, 0, newbuf, 0, count);
buf = newbuf;
}
in.readFully(buf, count, len);
count = newcount;
}
@Override
public synchronized void reset() {
// Reset the buffer so we don't keep around the shape of the biggest
// value ever read.
this.buf = this.initialBuffer;
super.reset();
}
}
private Buffer buffer;
/** Constructs a new empty buffer. */
public DataOutputBuffer() {
this(new Buffer());
}
public DataOutputBuffer(int size) {
this(new Buffer(size));
}
private DataOutputBuffer(Buffer buffer) {
super(buffer);
this.buffer = buffer;
}
/** Returns the current contents of the buffer.
* Data is only valid to {@link #getLength()}.
* @return byte[]
*/
public byte[] getData() { return buffer.getData(); }
/** Returns the length of the valid data currently in the buffer.
* @return int
*/
public int getLength() { return buffer.getLength(); }
/** Resets the buffer to empty.
* @return DataOutputBuffer
*/
public DataOutputBuffer reset() {
this.written = 0;
buffer.reset();
return this;
}
/** Writes bytes from a DataInput directly into the buffer.
* @param in
* @param length
* @throws IOException
*/
public void write(DataInput in, int length) throws IOException {
buffer.write(in, length);
}
/** Write to a file stream
* @param out
* @throws IOException
*/
public void writeTo(OutputStream out) throws IOException {
buffer.writeTo(out);
}
}
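
A short sketch of the reuse pattern described in the class comment above, plus the detail the local copy adds: reset() swaps the original backing array back in, so a single oversized record does not pin a large buffer for the life of the writer. Illustrative only, not part of this commit:

// Illustrative reuse of the deleted local DataOutputBuffer; not part of this commit.
import java.io.IOException;
import org.apache.hadoop.hbase.io.DataOutputBuffer;

public class BufferReuseSketch {
  public static void main(String[] args) throws IOException {
    DataOutputBuffer buffer = new DataOutputBuffer();
    for (int i = 0; i < 3; i++) {
      buffer.reset();                       // also restores the initial backing array
      buffer.writeInt(i);
      buffer.writeUTF("record-" + i);
      byte[] data = buffer.getData();       // valid only up to getLength()
      int dataLength = buffer.getLength();
      System.out.println("record " + i + " serialized to " + dataLength
          + " of " + data.length + " buffered bytes");
    }
  }
}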


@@ -1,781 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
import java.io.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.hbase.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
/** A file-based map from keys to values.
*
* <p>This is a copy of the Hadoop MapFile code, brought local (together with
* SequenceFile) so we can fix bugs; e.g. hbase-1097</p>
*
* <p>A map is a directory containing two files, the <code>data</code> file,
* containing all keys and values in the map, and a smaller <code>index</code>
* file, containing a fraction of the keys. The fraction is determined by
* {@link Writer#getIndexInterval()}.
*
* <p>The index file is read entirely into memory. Thus key implementations
* should try to keep themselves small.
*
* <p>Map files are created by adding entries in-order. To maintain a large
* database, perform updates by copying the previous version of a database and
* merging in a sorted change list, to create a new version of the database in
* a new file. Sorting large change lists can be done with {@link
* SequenceFile.Sorter}.
*/
public class MapFile {
protected static final Log LOG = LogFactory.getLog(MapFile.class);
/** The name of the index file. */
public static final String INDEX_FILE_NAME = "index";
/** The name of the data file. */
public static final String DATA_FILE_NAME = "data";
protected MapFile() {} // no public ctor
/** Writes a new map. */
public static class Writer implements java.io.Closeable {
private SequenceFile.Writer data;
private SequenceFile.Writer index;
final private static String INDEX_INTERVAL = "io.map.index.interval";
private int indexInterval = 128;
private long size;
private LongWritable position = new LongWritable();
// the following fields are used only for checking key order
private WritableComparator comparator;
private DataInputBuffer inBuf = new DataInputBuffer();
private DataOutputBuffer outBuf = new DataOutputBuffer();
private WritableComparable lastKey;
/** Create the named map for keys of the named class.
* @param conf
* @param fs
* @param dirName
* @param keyClass
* @param valClass
* @throws IOException
*/
public Writer(Configuration conf, FileSystem fs, String dirName,
Class<? extends WritableComparable> keyClass, Class valClass)
throws IOException {
this(conf, fs, dirName,
WritableComparator.get(keyClass), valClass,
SequenceFile.getCompressionType(conf));
}
/** Create the named map for keys of the named class.
* @param conf
* @param fs
* @param dirName
* @param keyClass
* @param valClass
* @param compress
* @param progress
* @throws IOException
*/
public Writer(Configuration conf, FileSystem fs, String dirName,
Class<? extends WritableComparable> keyClass, Class valClass,
CompressionType compress, Progressable progress)
throws IOException {
this(conf, fs, dirName, WritableComparator.get(keyClass), valClass,
compress, progress);
}
/** Create the named map for keys of the named class. */
public Writer(Configuration conf, FileSystem fs, String dirName,
Class<? extends WritableComparable> keyClass, Class valClass,
CompressionType compress, CompressionCodec codec,
Progressable progress)
throws IOException {
this(conf, fs, dirName, WritableComparator.get(keyClass), valClass,
compress, codec, progress);
}
/** Create the named map for keys of the named class.
* @param conf
* @param fs
* @param dirName
* @param keyClass
* @param valClass
* @param compress
* @throws IOException
*/
public Writer(Configuration conf, FileSystem fs, String dirName,
Class<? extends WritableComparable> keyClass, Class valClass,
CompressionType compress)
throws IOException {
this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, compress);
}
/** Create the named map using the named key comparator.
* @param conf
* @param fs
* @param dirName
* @param comparator
* @param valClass
* @throws IOException
*/
public Writer(Configuration conf, FileSystem fs, String dirName,
WritableComparator comparator, Class valClass)
throws IOException {
this(conf, fs, dirName, comparator, valClass,
SequenceFile.getCompressionType(conf));
}
/** Create the named map using the named key comparator.
* @param conf
* @param fs
* @param dirName
* @param comparator
* @param valClass
* @param compress
* @throws IOException
*/
public Writer(Configuration conf, FileSystem fs, String dirName,
WritableComparator comparator, Class valClass,
SequenceFile.CompressionType compress)
throws IOException {
this(conf, fs, dirName, comparator, valClass, compress, null);
}
/** Create the named map using the named key comparator.
* @param conf
* @param fs
* @param dirName
* @param comparator
* @param valClass
* @param compress
* @param progress
* @throws IOException
*/
public Writer(Configuration conf, FileSystem fs, String dirName,
WritableComparator comparator, Class valClass,
SequenceFile.CompressionType compress,
Progressable progress)
throws IOException {
this(conf, fs, dirName, comparator, valClass,
compress, new DefaultCodec(), progress);
}
/** Create the named map using the named key comparator.
* @param conf
* @param fs
* @param dirName
* @param comparator
* @param valClass
* @param compress
* @param codec
* @param progress
* @throws IOException
*/
public Writer(Configuration conf, FileSystem fs, String dirName,
WritableComparator comparator, Class valClass,
SequenceFile.CompressionType compress, CompressionCodec codec,
Progressable progress)
throws IOException {
this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);
this.comparator = comparator;
this.lastKey = comparator.newKey();
Path dir = new Path(dirName);
if (!fs.mkdirs(dir)) {
throw new IOException("Mkdirs failed to create directory " + dir.toString());
}
Path dataFile = new Path(dir, DATA_FILE_NAME);
Path indexFile = new Path(dir, INDEX_FILE_NAME);
Class keyClass = comparator.getKeyClass();
this.data =
SequenceFile.createWriter
(fs, conf, dataFile, keyClass, valClass, compress, codec, progress);
this.index =
SequenceFile.createWriter
(fs, conf, indexFile, keyClass, LongWritable.class,
CompressionType.BLOCK, progress);
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.io.StoreFileWriter#getIndexInterval()
*/
public int getIndexInterval() { return indexInterval; }
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.io.StoreFileWriter#setIndexInterval(int)
*/
public void setIndexInterval(int interval) { indexInterval = interval; }
/** Sets the index interval and stores it in conf
* @param conf
* @param interval
* @see #getIndexInterval()
*/
public static void setIndexInterval(Configuration conf, int interval) {
conf.setInt(INDEX_INTERVAL, interval);
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.io.StoreFileWriter#close()
*/
public synchronized void close() throws IOException {
data.close();
index.close();
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.io.StoreFileWriter#append(org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable)
*/
public synchronized void append(WritableComparable key, Writable val)
throws IOException {
checkKey(key);
if (size % indexInterval == 0) { // add an index entry
position.set(data.getLength()); // point to current eof
index.append(key, position);
}
data.append(key, val); // append key/value to data
size++;
}
private void checkKey(WritableComparable key) throws IOException {
// check that keys are well-ordered
if (size != 0 && comparator.compare(lastKey, key) > 0)
throw new IOException("key out of order: "+key+" after "+lastKey);
// update lastKey with a copy of key by writing and reading
outBuf.reset();
key.write(outBuf); // write new key
inBuf.reset(outBuf.getData(), outBuf.getLength());
lastKey.readFields(inBuf); // read into lastKey
}
}
/** Provide access to an existing map. */
public static class Reader implements java.io.Closeable {
/** Number of index entries to skip between each entry. Zero by default.
* Setting this to values larger than zero can facilitate opening large map
* files using less memory. */
private int INDEX_SKIP = 0;
private WritableComparator comparator;
private WritableComparable nextKey;
private long seekPosition = -1;
private int seekIndex = -1;
private long firstPosition;
// the data, on disk
private SequenceFile.Reader data;
private SequenceFile.Reader index;
// whether the index Reader was closed
private boolean indexClosed = false;
// the index, in memory
private int count = -1;
private WritableComparable[] keys;
private long[] positions;
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.io.StoreFileReader#getKeyClass()
*/
public Class<?> getKeyClass() { return data.getKeyClass(); }
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.io.StoreFileReader#getValueClass()
*/
public Class<?> getValueClass() { return data.getValueClass(); }
/** Construct a map reader for the named map.
* @param fs
* @param dirName
* @param conf
* @throws IOException
*/
public Reader(FileSystem fs, String dirName, Configuration conf) throws IOException {
this(fs, dirName, null, conf);
INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
}
/** Construct a map reader for the named map using the named comparator.
* @param fs
* @param dirName
* @param comparator
* @param conf
* @throws IOException
*/
public Reader(FileSystem fs, String dirName, WritableComparator comparator, Configuration conf)
throws IOException {
this(fs, dirName, comparator, conf, true);
}
/**
* Hook to allow subclasses to defer opening streams until further
* initialization is complete.
* @see #createDataFileReader(FileSystem, Path, Configuration)
*/
protected Reader(FileSystem fs, String dirName,
WritableComparator comparator, Configuration conf, boolean open)
throws IOException {
if (open) {
open(fs, dirName, comparator, conf);
}
}
protected synchronized void open(FileSystem fs, String dirName,
WritableComparator comparator, Configuration conf) throws IOException {
Path dir = new Path(dirName);
Path dataFile = new Path(dir, DATA_FILE_NAME);
Path indexFile = new Path(dir, INDEX_FILE_NAME);
// open the data
this.data = createDataFileReader(fs, dataFile, conf);
this.firstPosition = data.getPosition();
if (comparator == null)
this.comparator = WritableComparator.get(data.getKeyClass().asSubclass(WritableComparable.class));
else
this.comparator = comparator;
// open the index
this.index = new SequenceFile.Reader(fs, indexFile, conf);
}
/**
* Override this method to specialize the type of
* {@link SequenceFile.Reader} returned.
*/
protected SequenceFile.Reader createDataFileReader(FileSystem fs,
Path dataFile, Configuration conf) throws IOException {
return new SequenceFile.Reader(fs, dataFile, conf);
}
private void readIndex() throws IOException {
// read the index entirely into memory
if (this.keys != null)
return;
this.count = 0;
this.keys = new WritableComparable[1024];
this.positions = new long[1024];
try {
int skip = INDEX_SKIP;
LongWritable position = new LongWritable();
WritableComparable lastKey = null;
while (true) {
WritableComparable k = comparator.newKey();
if (!index.next(k, position))
break;
// check order to make sure comparator is compatible
if (lastKey != null && comparator.compare(lastKey, k) > 0)
throw new IOException("key out of order: "+k+" after "+lastKey);
lastKey = k;
if (skip > 0) {
skip--;
continue; // skip this entry
}
skip = INDEX_SKIP; // reset skip
if (count == keys.length) { // time to grow arrays
int newLength = (keys.length*3)/2;
WritableComparable[] newKeys = new WritableComparable[newLength];
long[] newPositions = new long[newLength];
System.arraycopy(keys, 0, newKeys, 0, count);
System.arraycopy(positions, 0, newPositions, 0, count);
keys = newKeys;
positions = newPositions;
}
keys[count] = k;
positions[count] = position.get();
count++;
}
} catch (EOFException e) {
LOG.warn("Unexpected EOF reading " + index +
" at entry #" + count + ". Ignoring.");
} finally {
indexClosed = true;
index.close();
}
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.io.StoreFileReader#reset()
*/
public synchronized void reset() throws IOException {
data.seek(firstPosition);
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.io.StoreFileReader#midKey()
*/
public synchronized WritableComparable midKey() throws IOException {
readIndex();
int pos = ((count - 1) / 2); // middle of the index
if (pos < 0) {
throw new IOException("MapFile empty");
}
return keys[pos];
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.io.StoreFileReader#finalKey(org.apache.hadoop.io.WritableComparable)
*/
public synchronized void finalKey(WritableComparable key)
throws IOException {
long originalPosition = data.getPosition(); // save position
try {
readIndex(); // make sure index is valid
if (count > 0) {
data.seek(positions[count-1]); // skip to last indexed entry
} else {
reset(); // start at the beginning
}
while (data.next(key)) {} // scan to eof
} finally {
data.seek(originalPosition); // restore position
}
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.io.StoreFileReader#seek(org.apache.hadoop.io.WritableComparable)
*/
public synchronized boolean seek(WritableComparable key) throws IOException {
return seekInternal(key) == 0;
}
/**
* Positions the reader at the named key, or if none such exists, at the
* first entry after the named key.
*
* @return 0 - exact match found
* < 0 - positioned at next record
* 1 - no more records in file
*/
private synchronized int seekInternal(WritableComparable key)
throws IOException {
return seekInternal(key, false);
}
/**
* Positions the reader at the named key, or if none such exists, at the
* key that falls just before or just after, depending on how the
* <code>before</code> parameter is set.
*
* @param before - IF true, and <code>key</code> does not exist, position
* file at entry that falls just before <code>key</code>. Otherwise,
* position file at record that sorts just after.
* @return 0 - exact match found
* < 0 - positioned at next record
* 1 - no more records in file
*/
private synchronized int seekInternal(WritableComparable key,
final boolean before)
throws IOException {
readIndex(); // make sure index is read
if (seekIndex != -1 // seeked before
&& seekIndex+1 < count
&& comparator.compare(key, keys[seekIndex+1])<0 // before next indexed
&& comparator.compare(key, nextKey)
>= 0) { // but after last seeked
// do nothing
} else {
seekIndex = binarySearch(key);
if (seekIndex < 0) // decode insertion point
seekIndex = -seekIndex-2;
if (seekIndex == -1) // belongs before first entry
seekPosition = firstPosition; // use beginning of file
else
seekPosition = positions[seekIndex]; // else use index
}
data.seek(seekPosition);
if (nextKey == null)
nextKey = comparator.newKey();
// If we're looking for the key before, we need to keep track
// of the position we got the current key as well as the position
// of the key before it.
long prevPosition = -1;
long curPosition = seekPosition;
while (data.next(nextKey)) {
int c = comparator.compare(key, nextKey);
if (c <= 0) { // at or beyond desired
if (before && c != 0) {
if (prevPosition == -1) {
// We're on the first record of this index block
// and we've already passed the search key. Therefore
// we must be at the beginning of the file, so seek
// to the beginning of this block and return c
data.seek(curPosition);
} else {
// We have a previous record to back up to
data.seek(prevPosition);
data.next(nextKey);
// now that we've rewound, the search key must be greater than this key
return 1;
}
}
return c;
}
if (before) {
prevPosition = curPosition;
curPosition = data.getPosition();
}
}
return 1;
}
private int binarySearch(WritableComparable key) {
int low = 0;
int high = count-1;
while (low <= high) {
int mid = (low + high) >>> 1;
WritableComparable midVal = keys[mid];
int cmp = comparator.compare(midVal, key);
if (cmp < 0)
low = mid + 1;
else if (cmp > 0)
high = mid - 1;
else
return mid; // key found
}
return -(low + 1); // key not found.
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.io.StoreFileReader#next(org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable)
*/
public synchronized boolean next(WritableComparable key, Writable val)
throws IOException {
return data.next(key, val);
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.io.StoreFileReader#get(org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable)
*/
public synchronized Writable get(WritableComparable key, Writable val)
throws IOException {
if (seek(key)) {
data.getCurrentValue(val);
return val;
}
return null;
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.io.StoreFileReader#getClosest(org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable)
*/
public synchronized WritableComparable getClosest(WritableComparable key,
Writable val)
throws IOException {
return getClosest(key, val, false);
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.io.StoreFileReader#getClosest(org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable, boolean)
*/
public synchronized WritableComparable getClosest(WritableComparable key,
Writable val, final boolean before)
throws IOException {
int c = seekInternal(key, before);
// If we didn't get an exact match, and we ended up in the wrong
// direction relative to the query key, return null since we
// must be at the beginning or end of the file.
if ((!before && c > 0) ||
(before && c < 0)) {
return null;
}
data.getCurrentValue(val);
return nextKey;
}
/* (non-Javadoc)
* @see org.apache.hadoop.hbase.io.StoreFileReader#close()
*/
public synchronized void close() throws IOException {
if (!indexClosed) {
index.close();
}
data.close();
}
}
/** Renames an existing map directory.
* @param fs
* @param oldName
* @param newName
* @throws IOException
*/
public static void rename(FileSystem fs, String oldName, String newName)
throws IOException {
Path oldDir = new Path(oldName);
Path newDir = new Path(newName);
if (!fs.rename(oldDir, newDir)) {
throw new IOException("Could not rename " + oldDir + " to " + newDir);
}
}
/** Deletes the named map file.
* @param fs
* @param name
* @throws IOException
*/
public static void delete(FileSystem fs, String name) throws IOException {
Path dir = new Path(name);
Path data = new Path(dir, DATA_FILE_NAME);
Path index = new Path(dir, INDEX_FILE_NAME);
fs.delete(data, true);
fs.delete(index, true);
fs.delete(dir, true);
}
/**
* This method attempts to fix a corrupt MapFile by re-creating its index.
* @param fs filesystem
* @param dir directory containing the MapFile data and index
* @param keyClass key class (has to be a subclass of Writable)
* @param valueClass value class (has to be a subclass of Writable)
* @param dryrun do not perform any changes, just report what needs to be done
* @param conf
* @return number of valid entries in this MapFile, or -1 if no fixing was needed
* @throws Exception
*/
public static long fix(FileSystem fs, Path dir,
Class<? extends Writable> keyClass,
Class<? extends Writable> valueClass, boolean dryrun,
Configuration conf) throws Exception {
String dr = (dryrun ? "[DRY RUN ] " : "");
Path data = new Path(dir, DATA_FILE_NAME);
Path index = new Path(dir, INDEX_FILE_NAME);
int indexInterval = 128;
if (!fs.exists(data)) {
// there's nothing we can do to fix this!
throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");
}
if (fs.exists(index)) {
// no fixing needed
return -1;
}
SequenceFile.Reader dataReader = new SequenceFile.Reader(fs, data, conf);
if (!dataReader.getKeyClass().equals(keyClass)) {
throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +
", got " + dataReader.getKeyClass().getName());
}
if (!dataReader.getValueClass().equals(valueClass)) {
throw new Exception(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName() +
", got " + dataReader.getValueClass().getName());
}
long cnt = 0L;
Writable key = ReflectionUtils.newInstance(keyClass, conf);
Writable value = ReflectionUtils.newInstance(valueClass, conf);
SequenceFile.Writer indexWriter = null;
if (!dryrun) indexWriter = SequenceFile.createWriter(fs, conf, index, keyClass, LongWritable.class);
try {
long pos = 0L;
LongWritable position = new LongWritable();
while(dataReader.next(key, value)) {
cnt++;
if (cnt % indexInterval == 0) {
position.set(pos);
if (!dryrun) indexWriter.append(key, position);
}
pos = dataReader.getPosition();
}
} catch(Throwable t) {
// truncated data file. swallow it.
}
dataReader.close();
if (!dryrun) indexWriter.close();
return cnt;
}
public static void main(String[] args) throws Exception {
String usage = "Usage: MapFile inFile outFile";
if (args.length != 2) {
System.err.println(usage);
System.exit(-1);
}
String in = args[0];
String out = args[1];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
MapFile.Reader reader = new MapFile.Reader(fs, in, conf);
MapFile.Writer writer =
new MapFile.Writer(conf, fs, out,
reader.getKeyClass().asSubclass(WritableComparable.class),
reader.getValueClass());
WritableComparable key =
ReflectionUtils.newInstance(reader.getKeyClass().asSubclass(WritableComparable.class), conf);
Writable value =
ReflectionUtils.newInstance(reader.getValueClass().asSubclass(Writable.class), conf);
while (reader.next(key, value)) // copy all entries
writer.append(key, value);
writer.close();
}
}
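
For context on the class being deleted: MapFile pairs a sorted data file with a sparse in-memory index, so a lookup binary-searches the index and then scans forward in the data file from the nearest indexed position. A round-trip sketch follows (illustrative only, not part of this commit; the path and Text key/value types are arbitrary choices):

// Illustrative round trip through the deleted local MapFile copy; not part of this commit.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    String dir = "/tmp/mapfile-sketch";

    // Keys must be appended in sorted order; every Nth key also lands in the index file.
    MapFile.Writer writer = new MapFile.Writer(conf, fs, dir, Text.class, Text.class);
    for (int i = 0; i < 1000; i++) {
      writer.append(new Text(String.format("row-%05d", i)), new Text("value-" + i));
    }
    writer.close();

    // The reader loads the small index into memory, binary-searches it, then
    // scans forward in the data file to the requested key.
    MapFile.Reader reader = new MapFile.Reader(fs, dir, conf);
    Text value = new Text();
    reader.get(new Text("row-00042"), value);                             // exact lookup
    Text closest = (Text) reader.getClosest(new Text("row-00042x"), value); // key at/after
    System.out.println(value + " / closest key: " + closest);
    reader.close();
  }
}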

File diff suppressed because it is too large


@@ -0,0 +1,738 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.migration.nineteen;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hbase.ColumnNameParseException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* A Key for a stored row.
*/
public class HStoreKey implements WritableComparable<HStoreKey>, HeapSize {
/**
* Colon character in UTF-8
*/
public static final char COLUMN_FAMILY_DELIMITER = ':';
private byte [] row = HConstants.EMPTY_BYTE_ARRAY;
private byte [] column = HConstants.EMPTY_BYTE_ARRAY;
private long timestamp = Long.MAX_VALUE;
/*
* regionInfo is only used as a hack to compare HSKs.
* It is not serialized. See https://issues.apache.org/jira/browse/HBASE-832
*/
private HRegionInfo regionInfo = null;
/**
* Estimated size tax paid for each instance of HSK. Estimate based on
* study of jhat and jprofiler numbers.
*/
// jprofiler says the shallow size is 48 bytes. Add to it the cost of two
// byte arrays and then something for the HRI hosting.
public static final int ESTIMATED_HEAP_TAX = 48;
/** Default constructor used in conjunction with Writable interface */
public HStoreKey() {
super();
}
/**
* Create an HStoreKey specifying only the row
* The column defaults to the empty string, the time stamp defaults to
* Long.MAX_VALUE and the table defaults to empty string
*
* @param row - row key
*/
public HStoreKey(final byte [] row) {
this(row, Long.MAX_VALUE);
}
/**
* Create an HStoreKey specifying only the row
* The column defaults to the empty string, the time stamp defaults to
* Long.MAX_VALUE and the table defaults to empty string
*
* @param row - row key
*/
public HStoreKey(final String row) {
this(row, Long.MAX_VALUE);
}
/**
* Create an HStoreKey specifying the row and region info.
* The column defaults to the empty string and the timestamp to LATEST_TIMESTAMP
*
* @param row row key
* @param hri
*/
public HStoreKey(final byte [] row, final HRegionInfo hri) {
this(row, HConstants.EMPTY_BYTE_ARRAY, hri);
}
/**
* Create an HStoreKey specifying the row and timestamp
* The column and table names default to the empty string
*
* @param row row key
* @param timestamp timestamp value
* @param hri HRegionInfo
*/
public HStoreKey(final byte [] row, long timestamp, final HRegionInfo hri) {
this(row, HConstants.EMPTY_BYTE_ARRAY, timestamp, hri);
}
/**
* Create an HStoreKey specifying the row and timestamp
* The column and table names default to the empty string
*
* @param row row key
* @param timestamp timestamp value
*/
public HStoreKey(final byte [] row, long timestamp) {
this(row, HConstants.EMPTY_BYTE_ARRAY, timestamp);
}
/**
* Create an HStoreKey specifying the row and timestamp
* The column and table names default to the empty string
*
* @param row row key
* @param timestamp timestamp value
*/
public HStoreKey(final String row, long timestamp) {
this (row, "", timestamp, new HRegionInfo());
}
/**
* Create an HStoreKey specifying the row and column names
* The timestamp defaults to LATEST_TIMESTAMP
* and table name defaults to the empty string
*
* @param row row key
* @param column column key
*/
public HStoreKey(final String row, final String column) {
this(row, column, HConstants.LATEST_TIMESTAMP, new HRegionInfo());
}
/**
* Create an HStoreKey specifying the row and column names
* The timestamp defaults to LATEST_TIMESTAMP
* and table name defaults to the empty string
*
* @param row row key
* @param column column key
*/
public HStoreKey(final byte [] row, final byte [] column) {
this(row, column, HConstants.LATEST_TIMESTAMP);
}
/**
* Create an HStoreKey specifying the row, column names and table name
* The timestamp defaults to LATEST_TIMESTAMP
*
* @param row row key
* @param column column key
* @param regionInfo region info
*/
public HStoreKey(final byte [] row,
final byte [] column, final HRegionInfo regionInfo) {
this(row, column, HConstants.LATEST_TIMESTAMP, regionInfo);
}
/**
* Create an HStoreKey specifying all the fields
* Does not make copies of the passed byte arrays. Presumes the passed
* arrays immutable.
* @param row row key
* @param column column key
* @param timestamp timestamp value
* @param regionInfo region info
*/
public HStoreKey(final String row,
final String column, long timestamp, final HRegionInfo regionInfo) {
this (Bytes.toBytes(row), Bytes.toBytes(column),
timestamp, regionInfo);
}
/**
* Create an HStoreKey specifying all the fields with unspecified table
* Does not make copies of the passed byte arrays. Presumes the passed
* arrays immutable.
* @param row row key
* @param column column key
* @param timestamp timestamp value
*/
public HStoreKey(final byte [] row, final byte [] column, long timestamp) {
this(row, column, timestamp, null);
}
/**
* Create an HStoreKey specifying all the fields with specified table
* Does not make copies of the passed byte arrays. Presumes the passed
* arrays immutable.
* @param row row key
* @param column column key
* @param timestamp timestamp value
* @param regionInfo region info
*/
public HStoreKey(final byte [] row,
final byte [] column, long timestamp, final HRegionInfo regionInfo) {
// Does not make copies; presumes the passed arrays are immutable.
this.row = row;
this.column = column;
this.timestamp = timestamp;
this.regionInfo = regionInfo;
}
/**
* Constructs a new HStoreKey from another
*
* @param other the source key
*/
public HStoreKey(HStoreKey other) {
this(other.getRow(), other.getColumn(), other.getTimestamp(),
other.getHRegionInfo());
}
/**
* Change the value of the row key
*
* @param newrow new row key value
*/
public void setRow(byte [] newrow) {
this.row = newrow;
}
/**
* Change the value of the column in this key
*
* @param c new column family value
*/
public void setColumn(byte [] c) {
this.column = c;
}
/**
* Change the value of the timestamp field
*
* @param timestamp new timestamp value
*/
public void setVersion(long timestamp) {
this.timestamp = timestamp;
}
/**
* Set the value of this HStoreKey from the supplied key
*
* @param k key value to copy
*/
public void set(HStoreKey k) {
this.row = k.getRow();
this.column = k.getColumn();
this.timestamp = k.getTimestamp();
}
/** @return value of row key */
public byte [] getRow() {
return row;
}
/** @return value of column */
public byte [] getColumn() {
return this.column;
}
/** @return value of timestamp */
public long getTimestamp() {
return this.timestamp;
}
/** @return value of regioninfo */
public HRegionInfo getHRegionInfo() {
return this.regionInfo;
}
/**
* @param hri
*/
public void setHRegionInfo(final HRegionInfo hri) {
this.regionInfo = hri;
}
/**
* Compares the row and column of two keys
* @param other Key to compare against. Compares row and column.
* @return True if same row and column.
* @see #matchesWithoutColumn(HStoreKey)
* @see #matchesRowFamily(HStoreKey)
*/
public boolean matchesRowCol(HStoreKey other) {
return HStoreKey.equalsTwoRowKeys(getHRegionInfo(), getRow(), other.getRow()) &&
Bytes.equals(getColumn(), other.getColumn());
}
/**
* Compares the row and timestamp of two keys
*
* @param other Key to compare against. Compares row and timestamp.
*
* @return True if same row and timestamp is greater than <code>other</code>
* @see #matchesRowCol(HStoreKey)
* @see #matchesRowFamily(HStoreKey)
*/
public boolean matchesWithoutColumn(HStoreKey other) {
return equalsTwoRowKeys(getHRegionInfo(), getRow(), other.getRow()) &&
getTimestamp() >= other.getTimestamp();
}
/**
* Compares the row and column family of two keys
*
* @param that Key to compare against. Compares row and column family
*
* @return true if same row and column family
* @see #matchesRowCol(HStoreKey)
* @see #matchesWithoutColumn(HStoreKey)
*/
public boolean matchesRowFamily(HStoreKey that) {
int delimiterIndex = getFamilyDelimiterIndex(getColumn());
return equalsTwoRowKeys(getHRegionInfo(), getRow(), that.getRow()) &&
Bytes.compareTo(getColumn(), 0, delimiterIndex, that.getColumn(), 0,
delimiterIndex) == 0;
}
@Override
public String toString() {
return Bytes.toString(this.row) + "/" + Bytes.toString(this.column) + "/" +
timestamp;
}
@Override
public boolean equals(Object obj) {
HStoreKey other = (HStoreKey)obj;
// Do a quick check.
if (this.row.length != other.row.length ||
this.column.length != other.column.length ||
this.timestamp != other.timestamp) {
return false;
}
return compareTo(other) == 0;
}
@Override
public int hashCode() {
int result = Bytes.hashCode(getRow());
result ^= Bytes.hashCode(getColumn());
result ^= getTimestamp();
return result;
}
// Comparable
public int compareTo(final HStoreKey o) {
return compareTo(this.regionInfo, this, o);
}
static int compareTo(final HRegionInfo hri, final HStoreKey left,
final HStoreKey right) {
// We can be passed null
if (left == null && right == null) return 0;
if (left == null) return -1;
if (right == null) return 1;
int result = compareTwoRowKeys(hri, left.getRow(), right.getRow());
if (result != 0) {
return result;
}
result = left.getColumn() == null && right.getColumn() == null? 0:
left.getColumn() == null && right.getColumn() != null? -1:
left.getColumn() != null && right.getColumn() == null? 1:
Bytes.compareTo(left.getColumn(), right.getColumn());
if (result != 0) {
return result;
}
// The below older timestamps sorting ahead of newer timestamps looks
// wrong but it is intentional. This way, newer timestamps are first
// found when we iterate over a memcache and newer versions are the
// first we trip over when reading from a store file.
if (left.getTimestamp() < right.getTimestamp()) {
result = 1;
} else if (left.getTimestamp() > right.getTimestamp()) {
result = -1;
}
// Because of HBASE-877, our BeforeThisStoreKey trick no longer works in
// mapfiles and so instead we need to do this weird check here below.
return result == 0 && left instanceof BeforeThisStoreKey? -1:
result == 0 && right instanceof BeforeThisStoreKey? 1:
result;
}
/**
* @param column
* @return New byte array that holds <code>column</code> family prefix only
* (Does not include the colon DELIMITER).
* @throws ColumnNameParseException
* @see #parseColumn(byte[])
*/
public static byte [] getFamily(final byte [] column)
throws ColumnNameParseException {
int index = getFamilyDelimiterIndex(column);
if (index <= 0) {
throw new ColumnNameParseException("Missing ':' delimiter between " +
"column family and qualifier in the passed column name <" +
Bytes.toString(column) + ">");
}
byte [] result = new byte[index];
System.arraycopy(column, 0, result, 0, index);
return result;
}
/**
* @param column
* @return Return hash of family portion of passed column.
*/
public static Integer getFamilyMapKey(final byte [] column) {
int index = getFamilyDelimiterIndex(column);
// If index < 0, presume passed column is a family name absent colon
// delimiter
return Bytes.mapKey(column, index > 0? index: column.length);
}
/**
* @param family
* @param column
* @return True if <code>column</code> has a family of <code>family</code>.
*/
public static boolean matchingFamily(final byte [] family,
final byte [] column) {
// Make sure index of the ':' is at same offset.
int index = getFamilyDelimiterIndex(column);
if (index != family.length) {
return false;
}
return Bytes.compareTo(family, 0, index, column, 0, index) == 0;
}
/**
* @param family
* @return Return <code>family</code> plus the family delimiter.
*/
public static byte [] addDelimiter(final byte [] family) {
// Manufacture key by adding delimiter to the passed in colFamily.
byte [] familyPlusDelimiter = new byte [family.length + 1];
System.arraycopy(family, 0, familyPlusDelimiter, 0, family.length);
familyPlusDelimiter[family.length] = HStoreKey.COLUMN_FAMILY_DELIMITER;
return familyPlusDelimiter;
}
/**
* @param column
* @return New byte array that holds <code>column</code> qualifier suffix.
* @see #parseColumn(byte[])
*/
public static byte [] getQualifier(final byte [] column) {
int index = getFamilyDelimiterIndex(column);
int len = column.length - (index + 1);
byte [] result = new byte[len];
System.arraycopy(column, index + 1, result, 0, len);
return result;
}
/**
* @param c Column name
* @return Return array of size two whose first element has the family
* prefix of passed column <code>c</code> and whose second element is the
* column qualifier.
* @throws ColumnNameParseException
*/
public static byte [][] parseColumn(final byte [] c)
throws ColumnNameParseException {
byte [][] result = new byte [2][];
int index = getFamilyDelimiterIndex(c);
if (index == -1) {
throw new ColumnNameParseException("Impossible column name: " + Bytes.toString(c));
}
result[0] = new byte [index];
System.arraycopy(c, 0, result[0], 0, index);
int len = c.length - (index + 1);
result[1] = new byte[len];
System.arraycopy(c, index + 1 /*Skip delimiter*/, result[1], 0,
len);
return result;
}
/**
* @param b
* @return Index of the family-qualifier colon delimiter character in passed
* buffer.
*/
public static int getFamilyDelimiterIndex(final byte [] b) {
if (b == null) {
throw new NullPointerException();
}
int result = -1;
for (int i = 0; i < b.length; i++) {
if (b[i] == COLUMN_FAMILY_DELIMITER) {
result = i;
break;
}
}
return result;
}
/**
* Returns row and column bytes out of an HStoreKey.
* @param hsk Store key.
* @return byte array encoding of HStoreKey
*/
public static byte[] getBytes(final HStoreKey hsk) {
return Bytes.add(hsk.getRow(), hsk.getColumn());
}
/**
* Utility method to compare two row keys.
* This is required because of the meta delimiters.
* This is a hack.
* @param regionInfo
* @param rowA
* @param rowB
* @return value of the comparison
*/
public static int compareTwoRowKeys(HRegionInfo regionInfo,
byte[] rowA, byte[] rowB) {
if (regionInfo != null && regionInfo.isMetaRegion()) {
byte[][] keysA = stripStartKeyMeta(rowA);
byte[][] KeysB = stripStartKeyMeta(rowB);
int rowCompare = Bytes.compareTo(keysA[0], KeysB[0]);
if(rowCompare == 0)
rowCompare = Bytes.compareTo(keysA[1], KeysB[1]);
return rowCompare;
}
return Bytes.compareTo(rowA, rowB);
}
/**
* Utility method to check if two row keys are equal.
* This is required because of the meta delimiters
* This is a hack
* @param regionInfo
* @param rowA
* @param rowB
* @return if it's equal
*/
public static boolean equalsTwoRowKeys(HRegionInfo regionInfo,
byte[] rowA, byte[] rowB) {
return ((rowA == null) && (rowB == null)) ? true:
(rowA == null) || (rowB == null) || (rowA.length != rowB.length) ? false:
compareTwoRowKeys(regionInfo,rowA,rowB) == 0;
}
private static byte[][] stripStartKeyMeta(byte[] rowKey) {
int offset = -1;
for (int i = rowKey.length - 1; i > 0; i--) {
if (rowKey[i] == HConstants.META_ROW_DELIMITER) {
offset = i;
break;
}
}
byte [] row = rowKey;
byte [] timestamp = HConstants.EMPTY_BYTE_ARRAY;
if (offset != -1) {
row = new byte[offset];
System.arraycopy(rowKey, 0, row, 0, offset);
timestamp = new byte[rowKey.length - offset - 1];
System.arraycopy(rowKey, offset+1, timestamp, 0,rowKey.length - offset - 1);
}
byte[][] elements = new byte[2][];
elements[0] = row;
elements[1] = timestamp;
return elements;
}
// Writable
public void write(DataOutput out) throws IOException {
Bytes.writeByteArray(out, this.row);
Bytes.writeByteArray(out, this.column);
out.writeLong(timestamp);
}
public void readFields(DataInput in) throws IOException {
this.row = Bytes.readByteArray(in);
this.column = Bytes.readByteArray(in);
this.timestamp = in.readLong();
}
public long heapSize() {
return getRow().length + Bytes.ESTIMATED_HEAP_TAX +
getColumn().length + Bytes.ESTIMATED_HEAP_TAX +
ESTIMATED_HEAP_TAX;
}
/**
* Passed as comparator for memcache and for store files. See HBASE-868.
*/
public static class HStoreKeyWritableComparator extends WritableComparator {
private final HRegionInfo hri;
/** @param hri */
public HStoreKeyWritableComparator(final HRegionInfo hri) {
super(HStoreKey.class);
this.hri = hri;
}
@SuppressWarnings("unchecked")
@Override
public int compare(final WritableComparable left,
final WritableComparable right) {
return compareTo(this.hri, (HStoreKey)left, (HStoreKey)right);
}
}
/**
* Pass this class into {@link org.apache.hadoop.io.MapFile}.getClosest when
* searching for the key that comes BEFORE this one but NOT this one. This
* class will return < 0 when asked to compare against itself rather than 0.
* This is a hack for case where getClosest returns a deleted key and we want
* to get the previous. Can't unless we use this class; it'll just keep
* returning us the deleted key (getClosest gets exact or nearest before when
* you pass true argument). TODO: Throw this class away when MapFile has
* a real 'previous' method. See HBASE-751.
*/
public static class BeforeThisStoreKey extends HStoreKey {
private final HStoreKey beforeThisKey;
/**
* @param beforeThisKey
*/
public BeforeThisStoreKey(final HStoreKey beforeThisKey) {
super();
this.beforeThisKey = beforeThisKey;
}
@Override
public int compareTo(final HStoreKey o) {
int result = this.beforeThisKey.compareTo(o);
return result == 0? -1: result;
}
@Override
public boolean equals(Object obj) {
return false;
}
@Override
public byte[] getColumn() {
return this.beforeThisKey.getColumn();
}
@Override
public byte[] getRow() {
return this.beforeThisKey.getRow();
}
@Override
public long heapSize() {
return this.beforeThisKey.heapSize();
}
@Override
public long getTimestamp() {
return this.beforeThisKey.getTimestamp();
}
@Override
public int hashCode() {
return this.beforeThisKey.hashCode();
}
@Override
public boolean matchesRowCol(HStoreKey other) {
return this.beforeThisKey.matchesRowCol(other);
}
@Override
public boolean matchesRowFamily(HStoreKey that) {
return this.beforeThisKey.matchesRowFamily(that);
}
@Override
public boolean matchesWithoutColumn(HStoreKey other) {
return this.beforeThisKey.matchesWithoutColumn(other);
}
@Override
public void readFields(DataInput in) throws IOException {
this.beforeThisKey.readFields(in);
}
@Override
public void set(HStoreKey k) {
this.beforeThisKey.set(k);
}
@Override
public void setColumn(byte[] c) {
this.beforeThisKey.setColumn(c);
}
@Override
public void setRow(byte[] newrow) {
this.beforeThisKey.setRow(newrow);
}
@Override
public void setVersion(long timestamp) {
this.beforeThisKey.setVersion(timestamp);
}
@Override
public String toString() {
return this.beforeThisKey.toString();
}
@Override
public void write(DataOutput out) throws IOException {
this.beforeThisKey.write(out);
}
@Override
public HRegionInfo getHRegionInfo() {
return this.beforeThisKey.getHRegionInfo();
}
@Override
public void setHRegionInfo(final HRegionInfo hri) {
this.beforeThisKey.setHRegionInfo(hri);
}
}
}
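
A small sketch of the ordering this class implements: rows and columns sort ascending, but timestamps sort descending so newer cells are encountered first, and BeforeThisStoreKey never compares equal to the key it wraps. Illustrative only, not part of this commit:

// Illustrative only; demonstrates the ordering HStoreKey.compareTo implements.
import org.apache.hadoop.hbase.migration.nineteen.HStoreKey;
import org.apache.hadoop.hbase.util.Bytes;

public class HStoreKeyOrderSketch {
  public static void main(String[] args) {
    byte[] row = Bytes.toBytes("row1");
    byte[] col = Bytes.toBytes("family:qualifier");

    HStoreKey newer = new HStoreKey(row, col, 2000L);
    HStoreKey older = new HStoreKey(row, col, 1000L);

    // Newer timestamps sort first, so this prints a negative value.
    System.out.println(newer.compareTo(older));

    // BeforeThisStoreKey never compares equal to the key it wraps; it is used
    // with getClosest(..., before=true) to land strictly before a given key.
    HStoreKey probe = new HStoreKey.BeforeThisStoreKey(newer);
    System.out.println(probe.compareTo(newer));   // -1, never 0
  }
}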


@@ -0,0 +1,249 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.migration.nineteen.io;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.util.Hash;
import org.apache.hadoop.hbase.migration.nineteen.HStoreKey;
import org.apache.hadoop.hbase.migration.nineteen.onelab.filter.BloomFilter;
import org.apache.hadoop.hbase.migration.nineteen.onelab.filter.Key;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
/**
* On write, all keys are added to a bloom filter. On read, all keys are
* tested first against bloom filter. Keys are HStoreKey. If passed bloom
* filter is null, just passes invocation to parent.
*/
// TODO: fix generic warnings inherited from the MapFile methods
@SuppressWarnings("unchecked")
public class BloomFilterMapFile extends HBaseMapFile {
@SuppressWarnings("hiding")
static final Log LOG = LogFactory.getLog(BloomFilterMapFile.class);
protected static final String BLOOMFILTER_FILE_NAME = "filter";
public static class Reader extends HBaseReader {
private final BloomFilter bloomFilter;
/**
* @param fs
* @param dirName
* @param conf
* @param filter
* @param blockCacheEnabled
* @param hri
* @throws IOException
*/
public Reader(FileSystem fs, String dirName, Configuration conf,
final boolean filter, final boolean blockCacheEnabled,
HRegionInfo hri)
throws IOException {
super(fs, dirName, conf, blockCacheEnabled, hri);
if (filter) {
this.bloomFilter = loadBloomFilter(fs, dirName);
} else {
this.bloomFilter = null;
}
}
private BloomFilter loadBloomFilter(FileSystem fs, String dirName)
throws IOException {
Path filterFile = new Path(dirName, BLOOMFILTER_FILE_NAME);
if(!fs.exists(filterFile)) {
LOG.warn("FileNotFound: " + filterFile + "; proceeding without");
return null;
}
BloomFilter filter = new BloomFilter();
FSDataInputStream in = fs.open(filterFile);
try {
filter.readFields(in);
} finally {
in.close();
}
return filter;
}
@Override
public Writable get(WritableComparable key, Writable val)
throws IOException {
if (bloomFilter == null) {
return super.get(key, val);
}
if(bloomFilter.membershipTest(getBloomFilterKey(key))) {
if (LOG.isDebugEnabled()) {
LOG.debug("bloom filter reported that key exists");
}
return super.get(key, val);
}
if (LOG.isDebugEnabled()) {
LOG.debug("bloom filter reported that key does not exist");
}
return null;
}
@Override
public WritableComparable getClosest(WritableComparable key,
Writable val) throws IOException {
if (bloomFilter == null) {
return super.getClosest(key, val);
}
// Note - the key being passed to us is always a HStoreKey
if(bloomFilter.membershipTest(getBloomFilterKey(key))) {
if (LOG.isDebugEnabled()) {
LOG.debug("bloom filter reported that key exists");
}
return super.getClosest(key, val);
}
if (LOG.isDebugEnabled()) {
LOG.debug("bloom filter reported that key does not exist");
}
return null;
}
/**
* @return size of the bloom filter
*/
public int getBloomFilterSize() {
return bloomFilter == null ? 0 : bloomFilter.getVectorSize();
}
}
public static class Writer extends HBaseWriter {
private static final double DEFAULT_NUMBER_OF_HASH_FUNCTIONS = 4.0;
private final BloomFilter bloomFilter;
private final String dirName;
private final FileSystem fs;
/**
* @param conf
* @param fs
* @param dirName
* @param compression
* @param filter
* @param nrows
* @param hri
* @throws IOException
*/
public Writer(Configuration conf, FileSystem fs, String dirName,
SequenceFile.CompressionType compression, final boolean filter,
int nrows, final HRegionInfo hri)
throws IOException {
super(conf, fs, dirName, compression, hri);
this.dirName = dirName;
this.fs = fs;
if (filter) {
/*
* There is no way to automatically determine the vector size and the
* number of hash functions to use. In particular, bloom filters are
* very sensitive to the number of elements inserted into them. For
* HBase, the number of entries depends on the size of the data stored
* in the column. Currently the default region size is 256MB, so the
* number of entries is approximately
* 256MB / (average value size for column).
*
* If m denotes the number of bits in the Bloom filter (vectorSize),
* n denotes the number of elements inserted into the Bloom filter and
* k represents the number of hash functions used (nbHash), then
* according to Broder and Mitzenmacher,
*
* ( http://www.eecs.harvard.edu/~michaelm/NEWWORK/postscripts/BloomFilterSurvey.pdf )
*
* the probability of false positives is minimized when k is
* approximately m/n ln(2).
*
* If we fix the number of hash functions and know the number of
* entries, then the optimal vector size m = (k * n) / ln(2)
*/
BloomFilter f = null;
try {
f = new BloomFilter(
(int) Math.ceil(
(DEFAULT_NUMBER_OF_HASH_FUNCTIONS * (1.0 * nrows)) /
Math.log(2.0)),
(int) DEFAULT_NUMBER_OF_HASH_FUNCTIONS,
Hash.getHashType(conf)
);
} catch (IllegalArgumentException e) {
LOG.warn("Failed creating bloomfilter; proceeding without", e);
}
this.bloomFilter = f;
} else {
this.bloomFilter = null;
}
}
@Override
public void append(WritableComparable key, Writable val)
throws IOException {
if (bloomFilter != null) {
bloomFilter.add(getBloomFilterKey(key));
}
super.append(key, val);
}
@Override
public synchronized void close() throws IOException {
super.close();
if (this.bloomFilter != null) {
flushBloomFilter();
}
}
/**
* Flushes bloom filter to disk
*
* @throws IOException
*/
private void flushBloomFilter() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("flushing bloom filter for " + this.dirName);
}
FSDataOutputStream out =
fs.create(new Path(dirName, BLOOMFILTER_FILE_NAME));
try {
bloomFilter.write(out);
} finally {
out.close();
}
if (LOG.isDebugEnabled()) {
LOG.debug("flushed bloom filter for " + this.dirName);
}
}
}
/**
* Custom bloom filter key maker.
* @param key
* @return Key made of bytes of row only.
*/
protected static Key getBloomFilterKey(WritableComparable key) {
return new Key(((HStoreKey) key).getRow());
}
}
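
The sizing comment in the Writer above reduces to vectorSize = ceil((k * n) / ln 2) bits for k hash functions and n expected entries. Below is a minimal standalone sketch of that arithmetic only; the class name and row count are made up and it is not part of this commit's files.

// Hypothetical sketch, not part of the commit: reproduces the Writer's
// vector-size arithmetic, m = ceil((k * n) / ln 2).
public class BloomSizingSketch {
  public static void main(String[] args) {
    double k = 4.0;   // mirrors DEFAULT_NUMBER_OF_HASH_FUNCTIONS above
    int n = 1000000;  // hypothetical row count (the 'nrows' argument)
    int vectorSize = (int) Math.ceil((k * n) / Math.log(2.0));
    // ~5,770,781 bits, i.e. roughly 705KB of bit vector for a million rows
    System.out.println("vectorSize = " + vectorSize + " bits");
  }
}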

View File

@@ -17,16 +17,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
package org.apache.hadoop.hbase.migration.nineteen.io;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.migration.nineteen.HStoreKey;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
/**
@@ -72,10 +73,10 @@ public class HBaseMapFile extends MapFile {
public HBaseReader(FileSystem fs, String dirName, Configuration conf,
boolean blockCacheEnabled, HRegionInfo hri)
throws IOException {
super(fs, dirName, new HStoreKey.HStoreKeyComparator(),
super(fs, dirName, new HStoreKey.HStoreKeyWritableComparator(hri),
conf, false); // defer opening streams
this.blockCacheEnabled = blockCacheEnabled;
open(fs, dirName, new HStoreKey.HStoreKeyComparator(), conf);
open(fs, dirName, new HStoreKey.HStoreKeyWritableComparator(hri), conf);
// Force reading of the mapfile index by calling midKey. Reading the
// index will bring the index into memory over here on the client and
@@ -86,29 +87,8 @@ public class HBaseMapFile extends MapFile {
// resources (See HADOOP-2341). midKey() goes to index. Does not seek.
midKey();
}
@Override
protected org.apache.hadoop.hbase.io.SequenceFile.Reader createDataFileReader(
FileSystem fs, Path dataFile, Configuration conf)
throws IOException {
if (!blockCacheEnabled) {
return super.createDataFileReader(fs, dataFile, conf);
}
final int blockSize = conf.getInt("hbase.hstore.blockCache.blockSize",
64 * 1024);
return new SequenceFile.Reader(fs, dataFile, conf) {
@Override
protected FSDataInputStream openFile(FileSystem fs, Path file,
int bufferSize, long length)
throws IOException {
return new FSDataInputStream(new BlockFSInputStream(
super.openFile(fs, file, bufferSize, length), length,
blockSize));
}
};
}
}
public static class HBaseWriter extends MapFile.Writer {
/**
* @param conf
@@ -121,7 +101,7 @@ public class HBaseMapFile extends MapFile {
public HBaseWriter(Configuration conf, FileSystem fs, String dirName,
SequenceFile.CompressionType compression, final HRegionInfo hri)
throws IOException {
super(conf, fs, dirName, new HStoreKey.HStoreKeyComparator(),
super(conf, fs, dirName, new HStoreKey.HStoreKeyWritableComparator(hri),
VALUE_CLASS, compression);
// Default for mapfiles is 128. Makes random reads faster if we
// have more keys indexed and we're not 'next'-ing around in the

View File

@@ -17,15 +17,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io;
package org.apache.hadoop.hbase.migration.nineteen.io;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.Reference.Range;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.migration.nineteen.HStoreKey;
import org.apache.hadoop.hbase.migration.nineteen.io.Reference.Range;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -44,7 +48,9 @@ import org.apache.hadoop.io.WritableComparable;
* <p>This file is not splittable. Calls to {@link #midKey()} return null.
*/
//TODO: fix generic warnings inherited from MapFile methods
public class HalfMapFileReader extends HBaseMapFile.HBaseReader {
public class HalfMapFileReader extends BloomFilterMapFile.Reader {
private static final Log LOG = LogFactory.getLog(HalfMapFileReader.class);
private final boolean top;
private final HStoreKey midkey;
private boolean firstNextCall = true;
@@ -63,7 +69,7 @@ public class HalfMapFileReader extends HBaseMapFile.HBaseReader {
final WritableComparable<HStoreKey> mk,
final HRegionInfo hri)
throws IOException {
this(fs, dirName, conf, r, mk, false, hri);
this(fs, dirName, conf, r, mk, false, false, hri);
}
/**
@@ -72,23 +78,25 @@ public class HalfMapFileReader extends HBaseMapFile.HBaseReader {
* @param conf
* @param r
* @param mk
* @param filter
* @param blockCacheEnabled
* @param hri
* @throws IOException
*/
public HalfMapFileReader(final FileSystem fs, final String dirName,
final Configuration conf, final Range r,
final WritableComparable<HStoreKey> mk,
final WritableComparable<HStoreKey> mk, final boolean filter,
final boolean blockCacheEnabled,
final HRegionInfo hri)
throws IOException {
super(fs, dirName, conf, blockCacheEnabled, hri);
super(fs, dirName, conf, filter, blockCacheEnabled, hri);
// This is not the actual midkey for this half-file; it's just the border
// around which we split top and bottom. Have to look in files to find
// actual last and first keys for bottom and top halves. Half-files don't
// have an actual midkey themselves. No midkey is how we indicate file is
// not splittable.
this.midkey = new HStoreKey((HStoreKey)mk);
this.midkey.setHRegionInfo(hri);
// Is it top or bottom half?
this.top = Reference.isTopFileRegion(r);
}
@@ -144,6 +152,8 @@ public class HalfMapFileReader extends HBaseMapFile.HBaseReader {
// greater.
closest = (key.compareTo(this.midkey) < 0)?
this.midkey: super.getClosest(key, val);
// we know that we just went past the midkey
firstNextCall = false;
} else {
// We're serving bottom of the file.
if (key.compareTo(this.midkey) < 0) {
@@ -188,7 +198,12 @@ public class HalfMapFileReader extends HBaseMapFile.HBaseReader {
}
}
boolean result = super.next(key, val);
if (!top && key.compareTo(midkey) >= 0) {
int cmpresult = key.compareTo(midkey);
if (top && cmpresult < 0) {
LOG.error("BUG BUG BUG. HalfMapFileReader wanted to return key out of range. DANGER");
throw new IOException("BUG BUG BUG. HalfMapFileReader wanted to return key out of range. DANGER");
} else if (!top && cmpresult >= 0) {
result = false;
}
return result;
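
The next() hunk above enforces the half-file boundary: a top-half reader must never emit a key that sorts before the midkey, and a bottom-half reader stops once it reaches the midkey. A small sketch of that rule over plain Strings, with hypothetical class and method names rather than the HStoreKey plumbing above:

// Hypothetical sketch of the half-file boundary rule; not part of the commit.
public class HalfRangeRule {
  static boolean serve(boolean top, String midkey, String key) throws java.io.IOException {
    int cmp = key.compareTo(midkey);
    if (top && cmp < 0) {
      // Mirrors the defensive check added above: a top half must never leak
      // a key that sorts before the midkey.
      throw new java.io.IOException("key out of range for top half: " + key);
    }
    return top || cmp < 0;  // a bottom half stops once the midkey is reached
  }
  public static void main(String[] args) throws Exception {
    System.out.println(serve(false, "m", "a"));  // true: below the midkey, bottom serves it
    System.out.println(serve(false, "m", "m"));  // false: bottom stops at the midkey
    System.out.println(serve(true, "m", "z"));   // true: at or past the midkey, top serves it
  }
}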

View File

@@ -0,0 +1,117 @@
/**
*
*/
package org.apache.hadoop.hbase.migration.nineteen.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hbase.migration.nineteen.HStoreKey;
import org.apache.hadoop.io.Writable;
/**
* A reference to a part of a store file. The file referenced usually lives
* under a different region. The part referenced is usually the top or bottom
* half of the file. References are made at region split time. Being lazy
* about copying data between the parent of the split and the split daughters
* makes splitting faster.
*
* <p>References work with {@link HalfMapFileReader}. References know how to
* write out the reference format in the file system and are what's juggled when
* references are mixed in with direct store files. The
* {@link HalfMapFileReader} is used to read the referred-to file.
*
* <p>References to store files located over in some other region look like
* this in the file system
* <code>1278437856009925445.hbaserepository,qAReLZD-OyQORZWq_vqR1k==,959247014679548184</code>:
* i.e. an id followed by the name of the referenced region. The data
* ('mapfiles') of references are empty. The accompanying <code>info</code> file
* contains the <code>midkey</code> that demarks top and bottom of the
* referenced storefile, the id of the remote store we're referencing and
* whether we're to serve the top or bottom region of the remote store file.
* Note, a region is itself not splittable if it has instances of store file
* references. References are cleaned up by compactions.
*/
public class Reference implements Writable {
// TODO: see if it makes sense making a ReferenceMapFile whose Writer is this
// class and whose Reader is the {@link HalfMapFileReader}.
private int encodedRegionName;
private long fileid;
private Range region;
private HStoreKey midkey;
/**
* For split HStoreFiles, it specifies if the file covers the lower half or
* the upper half of the key range
*/
public static enum Range {
/** HStoreFile contains upper half of key range */
top,
/** HStoreFile contains lower half of key range */
bottom
}
public Reference(final int ern, final long fid, final HStoreKey m,
final Range fr) {
this.encodedRegionName = ern;
this.fileid = fid;
this.region = fr;
this.midkey = m;
}
public Reference() {
this(-1, -1, null, Range.bottom);
}
public long getFileId() {
return fileid;
}
public Range getFileRegion() {
return region;
}
public HStoreKey getMidkey() {
return midkey;
}
public int getEncodedRegionName() {
return this.encodedRegionName;
}
@Override
public String toString() {
return encodedRegionName + "/" + fileid + "/" + region;
}
// Make it serializable.
public void write(DataOutput out) throws IOException {
// Write out the encoded region name as a String. Doing it as a String
// keeps a Reference's serialization backward compatible with
// pre-HBASE-82 serializations. The alternative is rewriting all
// info files in hbase (Serialized References are written into the
// 'info' file that accompanies HBase Store files).
out.writeUTF(Integer.toString(encodedRegionName));
out.writeLong(fileid);
// Write true if we're doing top of the file.
out.writeBoolean(isTopFileRegion(region));
this.midkey.write(out);
}
public void readFields(DataInput in) throws IOException {
this.encodedRegionName = Integer.parseInt(in.readUTF());
fileid = in.readLong();
boolean tmp = in.readBoolean();
// If true, set region to top.
region = tmp? Range.top: Range.bottom;
midkey = new HStoreKey();
midkey.readFields(in);
}
public static boolean isTopFileRegion(final Range r) {
return r.equals(Range.top);
}
}
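
write() and readFields() above pin down the on-disk layout of a Reference: the encoded region name as a UTF string (kept as a String for pre-HBASE-82 compatibility), the file id as a long, a boolean for top versus bottom, and finally the serialized midkey. A round-trip sketch of just the primitive fields, with made-up values and the midkey Writable elided:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch of the Reference serialization layout; not part of the commit.
public class ReferenceLayoutSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeUTF(Integer.toString(1234567));  // encoded region name, written as a String
    out.writeLong(959247014679548184L);       // made-up file id
    out.writeBoolean(true);                   // true == Range.top
    // ...midkey.write(out) follows here in the real format...
    out.close();

    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray()));
    int encodedRegionName = Integer.parseInt(in.readUTF());
    long fileid = in.readLong();
    boolean top = in.readBoolean();
    System.out.println(encodedRegionName + "/" + fileid + "/" + (top ? "top" : "bottom"));
  }
}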

View File

@@ -0,0 +1,240 @@
/**
*
* Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org)
* All rights reserved.
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the distribution.
* - Neither the name of the University Catholique de Louvain - UCL
* nor the names of its contributors may be used to endorse or
* promote products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.migration.nineteen.onelab.filter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.BitSet;
import org.apache.hadoop.hbase.util.Hash;
/**
* Implements a <i>Bloom filter</i>, as defined by Bloom in 1970.
* <p>
* The Bloom filter is a data structure that was introduced in 1970 and that has been adopted by
* the networking research community in the past decade thanks to the bandwidth efficiencies that it
* offers for the transmission of set membership information between networked hosts. A sender encodes
* the information into a bit vector, the Bloom filter, that is more compact than a conventional
* representation. Computation and space costs for construction are linear in the number of elements.
* The receiver uses the filter to test whether various elements are members of the set. Though the
* filter will occasionally return a false positive, it will never return a false negative. When creating
* the filter, the sender can choose its desired point in a trade-off between the false positive rate and the size.
*
* contract <a href="http://www.one-lab.org">European Commission One-Lab Project 034819</a>.
*
* @version 1.0 - 2 Feb. 07
*
* @see org.onelab.filter.Filter The general behavior of a filter
*
* @see <a href="http://portal.acm.org/citation.cfm?id=362692&dl=ACM&coll=portal">Space/Time Trade-Offs in Hash Coding with Allowable Errors</a>
*/
public class BloomFilter extends Filter {
private static final byte[] bitvalues = new byte[] {
(byte)0x01,
(byte)0x02,
(byte)0x04,
(byte)0x08,
(byte)0x10,
(byte)0x20,
(byte)0x40,
(byte)0x80
};
/** The bit vector. */
BitSet bits;
/** Default constructor - use with readFields */
public BloomFilter() {
super();
}
/**
* Constructor
* @param vectorSize The vector size of <i>this</i> filter.
* @param nbHash The number of hash function to consider.
* @param hashType type of the hashing function (see {@link Hash}).
*/
public BloomFilter(int vectorSize, int nbHash, int hashType){
super(vectorSize, nbHash, hashType);
bits = new BitSet(this.vectorSize);
}//end constructor
@Override
public void add(Key key) {
if(key == null) {
throw new NullPointerException("key cannot be null");
}
int[] h = hash.hash(key);
hash.clear();
for(int i = 0; i < nbHash; i++) {
bits.set(h[i]);
}
}//end add()
@Override
public void and(Filter filter){
if(filter == null
|| !(filter instanceof BloomFilter)
|| filter.vectorSize != this.vectorSize
|| filter.nbHash != this.nbHash) {
throw new IllegalArgumentException("filters cannot be and-ed");
}
this.bits.and(((BloomFilter) filter).bits);
}//end and()
@Override
public boolean membershipTest(Key key){
if(key == null) {
throw new NullPointerException("key cannot be null");
}
int[] h = hash.hash(key);
hash.clear();
for(int i = 0; i < nbHash; i++) {
if(!bits.get(h[i])) {
return false;
}
}
return true;
}//end membershipTest()
@Override
public void not(){
bits.flip(0, vectorSize - 1);
}//end not()
@Override
public void or(Filter filter){
if(filter == null
|| !(filter instanceof BloomFilter)
|| filter.vectorSize != this.vectorSize
|| filter.nbHash != this.nbHash) {
throw new IllegalArgumentException("filters cannot be or-ed");
}
bits.or(((BloomFilter) filter).bits);
}//end or()
@Override
public void xor(Filter filter){
if(filter == null
|| !(filter instanceof BloomFilter)
|| filter.vectorSize != this.vectorSize
|| filter.nbHash != this.nbHash) {
throw new IllegalArgumentException("filters cannot be xor-ed");
}
bits.xor(((BloomFilter) filter).bits);
}//end xor()
@Override
public String toString(){
return bits.toString();
}//end toString()
@Override
public Object clone(){
BloomFilter bf = new BloomFilter(vectorSize, nbHash, hashType);
bf.or(this);
return bf;
}//end clone()
/**
* @return size of the bloomfilter
*/
public int getVectorSize() {
return this.vectorSize;
}
// Writable
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
byte[] bytes = new byte[getNBytes()];
for(int i = 0, byteIndex = 0, bitIndex = 0; i < vectorSize; i++, bitIndex++) {
if (bitIndex == 8) {
bitIndex = 0;
byteIndex++;
}
if (bitIndex == 0) {
bytes[byteIndex] = 0;
}
if (bits.get(i)) {
bytes[byteIndex] |= bitvalues[bitIndex];
}
}
out.write(bytes);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
bits = new BitSet(this.vectorSize);
byte[] bytes = new byte[getNBytes()];
in.readFully(bytes);
for(int i = 0, byteIndex = 0, bitIndex = 0; i < vectorSize; i++, bitIndex++) {
if (bitIndex == 8) {
bitIndex = 0;
byteIndex++;
}
if ((bytes[byteIndex] & bitvalues[bitIndex]) != 0) {
bits.set(i);
}
}
}
/* @return number of bytes needed to hold bit vector */
private int getNBytes() {
return (vectorSize + 7) / 8;
}
}//end class
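
add() and membershipTest() above hash a key into nbHash bit positions and set or test those bits, while write()/readFields() pack the BitSet into (vectorSize + 7) / 8 bytes. A short usage sketch of the class as declared in this file; it assumes the migration package is on the classpath, and the sizes and row keys are made up:

import org.apache.hadoop.hbase.migration.nineteen.onelab.filter.BloomFilter;
import org.apache.hadoop.hbase.migration.nineteen.onelab.filter.Key;
import org.apache.hadoop.hbase.util.Hash;

// Hypothetical usage sketch: no false negatives, occasional false positives.
public class BloomFilterSketch {
  public static void main(String[] args) {
    BloomFilter filter = new BloomFilter(8 * 1024, 4, Hash.JENKINS_HASH);
    filter.add(new Key("row-0001".getBytes()));
    // Always true for a key that was added...
    System.out.println(filter.membershipTest(new Key("row-0001".getBytes())));
    // ...and usually, but not always, false for one that was not.
    System.out.println(filter.membershipTest(new Key("row-9999".getBytes())));
  }
}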

View File

@@ -0,0 +1,314 @@
/**
*
* Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org)
* All rights reserved.
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the distribution.
* - Neither the name of the University Catholique de Louvain - UCL
* nor the names of its contributors may be used to endorse or
* promote products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.migration.nineteen.onelab.filter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays; //TODO: remove
import org.apache.hadoop.hbase.util.Hash;
/**
* Implements a <i>counting Bloom filter</i>, as defined by Fan et al. in a ToN
* 2000 paper.
* <p>
* A counting Bloom filter is an improvement to a standard Bloom filter as it
* allows dynamic additions and deletions of set membership information. This
* is achieved through the use of a counting vector instead of a bit vector.
*
* contract <a href="http://www.one-lab.org">European Commission One-Lab Project 034819</a>.
*
* @version 1.1 - 19 Jan. 08
*
* @see org.onelab.filter.Filter The general behavior of a filter
*
* @see <a href="http://portal.acm.org/citation.cfm?id=343571.343572">Summary cache: a scalable wide-area web cache sharing protocol</a>
*/
public final class CountingBloomFilter extends Filter {
/** Storage for the counting buckets */
private long[] buckets;
/** We are using 4bit buckets, so each bucket can count to 15 */
private final static long BUCKET_MAX_VALUE = 15;
/** Default constructor - use with readFields */
public CountingBloomFilter() {}
/**
* Constructor
* @param vectorSize The vector size of <i>this</i> filter.
* @param nbHash The number of hash function to consider.
* @param hashType type of the hashing function (see {@link Hash}).
*/
public CountingBloomFilter(int vectorSize, int nbHash, int hashType){
super(vectorSize, nbHash, hashType);
buckets = new long[buckets2words(vectorSize)];
}//end constructor
/** returns the number of 64 bit words it would take to hold vectorSize buckets */
private static int buckets2words(int vectorSize) {
return ((vectorSize - 1) >>> 4) + 1;
}
@Override
public void add(Key key) {
if(key == null) {
throw new NullPointerException("key can not be null");
}
int[] h = hash.hash(key);
hash.clear();
for(int i = 0; i < nbHash; i++) {
// find the bucket
int wordNum = h[i] >> 4; // div 16
int bucketShift = (h[i] & 0x0f) << 2; // (mod 16) * 4
long bucketMask = 15L << bucketShift;
long bucketValue = (buckets[wordNum] & bucketMask) >>> bucketShift;
// only increment if the count in the bucket is less than BUCKET_MAX_VALUE
if(bucketValue < BUCKET_MAX_VALUE) {
// increment by 1
buckets[wordNum] = (buckets[wordNum] & ~bucketMask) | ((bucketValue + 1) << bucketShift);
}
}
}//end add()
/**
* Removes a specified key from <i>this</i> counting Bloom filter.
* <p>
* <b>Invariant</b>: nothing happens if the specified key does not belong to <i>this</i> counter Bloom filter.
* @param key The key to remove.
*/
public void delete(Key key) {
if(key == null) {
throw new NullPointerException("Key may not be null");
}
if(!membershipTest(key)) {
throw new IllegalArgumentException("Key is not a member");
}
int[] h = hash.hash(key);
hash.clear();
for(int i = 0; i < nbHash; i++) {
// find the bucket
int wordNum = h[i] >> 4; // div 16
int bucketShift = (h[i] & 0x0f) << 2; // (mod 16) * 4
long bucketMask = 15L << bucketShift;
long bucketValue = (buckets[wordNum] & bucketMask) >>> bucketShift;
// only decrement if the count in the bucket is between 0 and BUCKET_MAX_VALUE
if(bucketValue >= 1 && bucketValue < BUCKET_MAX_VALUE) {
// decrement by 1
buckets[wordNum] = (buckets[wordNum] & ~bucketMask) | ((bucketValue - 1) << bucketShift);
}
}
}//end delete
@Override
public void and(Filter filter){
if(filter == null
|| !(filter instanceof CountingBloomFilter)
|| filter.vectorSize != this.vectorSize
|| filter.nbHash != this.nbHash) {
throw new IllegalArgumentException("filters cannot be and-ed");
}
CountingBloomFilter cbf = (CountingBloomFilter)filter;
int sizeInWords = buckets2words(vectorSize);
for(int i = 0; i < sizeInWords; i++) {
this.buckets[i] &= cbf.buckets[i];
}
}//end and()
@Override
public boolean membershipTest(Key key){
if(key == null) {
throw new NullPointerException("Key may not be null");
}
int[] h = hash.hash(key);
hash.clear();
for(int i = 0; i < nbHash; i++) {
// find the bucket
int wordNum = h[i] >> 4; // div 16
int bucketShift = (h[i] & 0x0f) << 2; // (mod 16) * 4
long bucketMask = 15L << bucketShift;
if((buckets[wordNum] & bucketMask) == 0) {
return false;
}
}
return true;
}//end membershipTest()
/**
* This method calculates an approximate count of the key, i.e. how many
* times the key was added to the filter. This allows the filter to be
* used as an approximate <code>key -&gt; count</code> map.
* <p>NOTE: due to the bucket size of this filter, inserting the same
* key more than 15 times will cause an overflow at all filter positions
* associated with this key, and it will significantly increase the error
* rate for this and other keys. For this reason the filter can only be
* used to store small count values <code>0 &lt;= N &lt;&lt; 15</code>.
* @param key key to be tested
* @return 0 if the key is not present. Otherwise, a positive value v will
* be returned such that <code>v == count</code> with probability equal to the
* error rate of this filter, and <code>v &gt; count</code> otherwise.
* Additionally, if the filter experienced an underflow as a result of
* {@link #delete(Key)} operation, the return value may be lower than the
* <code>count</code> with the probability of the false negative rate of such
* filter.
*/
public int approximateCount(Key key) {
int res = Integer.MAX_VALUE;
int[] h = hash.hash(key);
hash.clear();
for (int i = 0; i < nbHash; i++) {
// find the bucket
int wordNum = h[i] >> 4; // div 16
int bucketShift = (h[i] & 0x0f) << 2; // (mod 16) * 4
long bucketMask = 15L << bucketShift;
long bucketValue = (buckets[wordNum] & bucketMask) >>> bucketShift;
if (bucketValue < res) res = (int)bucketValue;
}
if (res != Integer.MAX_VALUE) {
return res;
} else {
return 0;
}
}
@Override
public void not(){
throw new UnsupportedOperationException("not() is undefined for "
+ this.getClass().getName());
}//end not()
@Override
public void or(Filter filter){
if(filter == null
|| !(filter instanceof CountingBloomFilter)
|| filter.vectorSize != this.vectorSize
|| filter.nbHash != this.nbHash) {
throw new IllegalArgumentException("filters cannot be or-ed");
}
CountingBloomFilter cbf = (CountingBloomFilter)filter;
int sizeInWords = buckets2words(vectorSize);
for(int i = 0; i < sizeInWords; i++) {
this.buckets[i] |= cbf.buckets[i];
}
}//end or()
@Override
@SuppressWarnings("unused")
public void xor(Filter filter){
throw new UnsupportedOperationException("xor() is undefined for "
+ this.getClass().getName());
}//end xor()
@Override
public String toString(){
StringBuilder res = new StringBuilder();
for(int i = 0; i < vectorSize; i++) {
if(i > 0) {
res.append(" ");
}
int wordNum = i >> 4; // div 16
int bucketShift = (i & 0x0f) << 2; // (mod 16) * 4
long bucketMask = 15L << bucketShift;
long bucketValue = (buckets[wordNum] & bucketMask) >>> bucketShift;
res.append(bucketValue);
}
return res.toString();
}//end toString()
@Override
public Object clone(){
CountingBloomFilter cbf = new CountingBloomFilter(vectorSize, nbHash, hashType);
cbf.buckets = this.buckets.clone();
return cbf;
}//end clone()
// Writable
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
int sizeInWords = buckets2words(vectorSize);
for(int i = 0; i < sizeInWords; i++) {
out.writeLong(buckets[i]);
}
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
int sizeInWords = buckets2words(vectorSize);
buckets = new long[sizeInWords];
for(int i = 0; i < sizeInWords; i++) {
buckets[i] = in.readLong();
}
}
}//end class
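
The bucket arithmetic above packs sixteen 4-bit counters into each long: h >> 4 selects the word, (h & 0x0f) << 2 gives the bit offset inside it, and 15L shifted to that offset is the counter mask. A worked example for one made-up hash position:

// Hypothetical worked example of the 4-bit bucket math used by add(),
// delete() and membershipTest() above; not part of the commit.
public class BucketMathSketch {
  public static void main(String[] args) {
    int h = 37;                          // made-up hash position
    int wordNum = h >> 4;                // 37 / 16 = 2: third long in buckets[]
    int bucketShift = (h & 0x0f) << 2;   // (37 mod 16) * 4 = 20: bit offset in that long
    long bucketMask = 15L << bucketShift;
    System.out.println(wordNum + " " + bucketShift + " " + Long.toHexString(bucketMask));
    // Prints "2 20 f00000": the counter for position 37 lives in bits 20..23 of buckets[2].
  }
}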

View File

@@ -0,0 +1,303 @@
/**
*
* Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org)
* All rights reserved.
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the distribution.
* - Neither the name of the University Catholique de Louvain - UCL
* nor the names of its contributors may be used to endorse or
* promote products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.migration.nineteen.onelab.filter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hbase.util.Hash;
/**
* Implements a <i>dynamic Bloom filter</i>, as defined in the INFOCOM 2006 paper.
* <p>
* A dynamic Bloom filter (DBF) makes use of a <code>s * m</code> bit matrix but
* each of the <code>s</code> rows is a standard Bloom filter. The creation
* process of a DBF is iterative. At the start, the DBF is a <code>1 * m</code>
* bit matrix, i.e., it is composed of a single standard Bloom filter.
* It assumes that <code>n<sub>r</sub></code> elements are recorded in the
* initial bit vector, where <code>n<sub>r</sub> <= n</code> (<code>n</code> is
* the cardinality of the set <code>A</code> to record in the filter).
* <p>
* As the size of <code>A</code> grows during the execution of the application,
* several keys must be inserted in the DBF. When inserting a key into the DBF,
* one must first get an active Bloom filter in the matrix. A Bloom filter is
* active when the number of recorded keys, <code>n<sub>r</sub></code>, is
* strictly less than the current cardinality of <code>A</code>, <code>n</code>.
* If an active Bloom filter is found, the key is inserted and
* <code>n<sub>r</sub></code> is incremented by one. On the other hand, if there
* is no active Bloom filter, a new one is created (i.e., a new row is added to
* the matrix) according to the current size of <code>A</code> and the element
* is added in this new Bloom filter and the <code>n<sub>r</sub></code> value of
* this new Bloom filter is set to one. A given key is said to belong to the
* DBF if the <code>k</code> positions are set to one in one of the matrix rows.
*
* contract <a href="http://www.one-lab.org">European Commission One-Lab Project 034819</a>.
*
* @version 1.0 - 6 Feb. 07
*
* @see org.onelab.filter.Filter The general behavior of a filter
* @see org.onelab.filter.BloomFilter A Bloom filter
*
* @see <a href="http://www.cse.fau.edu/~jie/research/publications/Publication_files/infocom2006.pdf">Theory and Network Applications of Dynamic Bloom Filters</a>
*/
public class DynamicBloomFilter extends Filter {
/**
* Threshold for the maximum number of keys to record in a dynamic Bloom filter row.
*/
private int nr;
/**
* The number of keys recorded in the current standard active Bloom filter.
*/
private int currentNbRecord;
/**
* The matrix of Bloom filter.
*/
private BloomFilter[] matrix;
/**
* Zero-args constructor for the serialization.
*/
public DynamicBloomFilter() { }
/**
* Constructor.
* <p>
* Builds an empty Dynamic Bloom filter.
* @param vectorSize The number of bits in the vector.
* @param nbHash The number of hash function to consider.
* @param hashType type of the hashing function (see {@link Hash}).
* @param nr The threshold for the maximum number of keys to record in a dynamic Bloom filter row.
*/
public DynamicBloomFilter(int vectorSize, int nbHash, int hashType, int nr) {
super(vectorSize, nbHash, hashType);
this.nr = nr;
this.currentNbRecord = 0;
matrix = new BloomFilter[1];
matrix[0] = new BloomFilter(this.vectorSize, this.nbHash, this.hashType);
}//end constructor
@Override
public void add(Key key){
if(key == null) {
throw new NullPointerException("Key can not be null");
}
BloomFilter bf = getActiveStandardBF();
if(bf == null){
addRow();
bf = matrix[matrix.length - 1];
currentNbRecord = 0;
}
bf.add(key);
currentNbRecord++;
}//end add()
@Override
public void and(Filter filter) {
if(filter == null
|| !(filter instanceof DynamicBloomFilter)
|| filter.vectorSize != this.vectorSize
|| filter.nbHash != this.nbHash) {
throw new IllegalArgumentException("filters cannot be and-ed");
}
DynamicBloomFilter dbf = (DynamicBloomFilter)filter;
if(dbf.matrix.length != this.matrix.length || dbf.nr != this.nr) {
throw new IllegalArgumentException("filters cannot be and-ed");
}
for(int i = 0; i < matrix.length; i++) {
matrix[i].and(dbf.matrix[i]);
}
}//end and()
@Override
public boolean membershipTest(Key key){
if(key == null) {
return true;
}
for(int i = 0; i < matrix.length; i++) {
if(matrix[i].membershipTest(key)) {
return true;
}
}
return false;
}//end membershipTest()
@Override
public void not(){
for(int i = 0; i < matrix.length; i++) {
matrix[i].not();
}
}//end not()
@Override
public void or(Filter filter){
if(filter == null
|| !(filter instanceof DynamicBloomFilter)
|| filter.vectorSize != this.vectorSize
|| filter.nbHash != this.nbHash) {
throw new IllegalArgumentException("filters cannot be or-ed");
}
DynamicBloomFilter dbf = (DynamicBloomFilter)filter;
if(dbf.matrix.length != this.matrix.length || dbf.nr != this.nr) {
throw new IllegalArgumentException("filters cannot be or-ed");
}
for(int i = 0; i < matrix.length; i++) {
matrix[i].or(dbf.matrix[i]);
}
}//end or()
@Override
public void xor(Filter filter){
if(filter == null
|| !(filter instanceof DynamicBloomFilter)
|| filter.vectorSize != this.vectorSize
|| filter.nbHash != this.nbHash) {
throw new IllegalArgumentException("filters cannot be xor-ed");
}
DynamicBloomFilter dbf = (DynamicBloomFilter)filter;
if(dbf.matrix.length != this.matrix.length || dbf.nr != this.nr) {
throw new IllegalArgumentException("filters cannot be xor-ed");
}
for(int i = 0; i<matrix.length; i++) {
matrix[i].xor(dbf.matrix[i]);
}
}//end xor()
@Override
public String toString(){
StringBuilder res = new StringBuilder();
for(int i=0; i<matrix.length; i++) {
res.append(matrix[i]);
res.append('\n'); // newline between rows; Character.LINE_SEPARATOR is a category constant, not a newline
}
return res.toString();
}//end toString()
@Override
public Object clone(){
DynamicBloomFilter dbf = new DynamicBloomFilter(vectorSize, nbHash, hashType, nr);
dbf.currentNbRecord = this.currentNbRecord;
dbf.matrix = new BloomFilter[this.matrix.length];
for(int i = 0; i < this.matrix.length; i++) {
dbf.matrix[i] = (BloomFilter)this.matrix[i].clone();
}
return dbf;
}//end clone()
// Writable
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeInt(nr);
out.writeInt(currentNbRecord);
out.writeInt(matrix.length);
for (int i = 0; i < matrix.length; i++) {
matrix[i].write(out);
}
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
nr = in.readInt();
currentNbRecord = in.readInt();
int len = in.readInt();
matrix = new BloomFilter[len];
for (int i = 0; i < matrix.length; i++) {
matrix[i] = new BloomFilter();
matrix[i].readFields(in);
}
}
/**
* Adds a new row to <i>this</i> dynamic Bloom filter.
*/
private void addRow(){
BloomFilter[] tmp = new BloomFilter[matrix.length + 1];
for(int i = 0; i < matrix.length; i++) {
tmp[i] = (BloomFilter)matrix[i].clone();
}
tmp[tmp.length-1] = new BloomFilter(vectorSize, nbHash, hashType);
matrix = tmp;
}//end addRow()
/**
* Returns the active standard Bloom filter in <i>this</i> dynamic Bloom filter.
* @return BloomFilter The active standard Bloom filter, or <code>null</code>
* if the current row already holds <code>nr</code> keys.
*/
private BloomFilter getActiveStandardBF() {
if(currentNbRecord >= nr) {
return null;
}
return matrix[matrix.length - 1];
}//end getActiveStandardBF()
}//end class
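
A dynamic Bloom filter grows by appending a fresh BloomFilter row whenever the active row already holds nr keys, and membership is the OR over all rows. A brief usage sketch, assuming the migration package is built; the sizes and keys are made up:

import org.apache.hadoop.hbase.migration.nineteen.onelab.filter.DynamicBloomFilter;
import org.apache.hadoop.hbase.migration.nineteen.onelab.filter.Key;
import org.apache.hadoop.hbase.util.Hash;

// Hypothetical usage sketch: with nr == 2, the third add() forces a new row.
public class DynamicBloomFilterSketch {
  public static void main(String[] args) {
    DynamicBloomFilter dbf = new DynamicBloomFilter(1024, 4, Hash.JENKINS_HASH, 2);
    dbf.add(new Key("a".getBytes()));
    dbf.add(new Key("b".getBytes()));
    dbf.add(new Key("c".getBytes()));  // active row is full, so a second row is appended
    System.out.println(dbf.membershipTest(new Key("b".getBytes())));  // true
  }
}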

View File

@@ -0,0 +1,216 @@
/**
*
* Copyright (c) 2005, European Commission project OneLab under contract 034819
* (http://www.one-lab.org)
*
* All rights reserved.
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the distribution.
* - Neither the name of the University Catholique de Louvain - UCL
* nor the names of its contributors may be used to endorse or
* promote products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.migration.nineteen.onelab.filter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.hbase.util.Hash;
import org.apache.hadoop.io.Writable;
/**
* Defines the general behavior of a filter.
* <p>
* A filter is a data structure which aims at offering a lossy summary of a set <code>A</code>. The
* key idea is to map entries of <code>A</code> (also called <i>keys</i>) into several positions
* in a vector through the use of several hash functions.
* <p>
* Typically, a filter will be implemented as a Bloom filter (or a Bloom filter extension).
* <p>
* It must be extended in order to define the real behavior.
*
* @see org.onelab.filter.Filter The general behavior of a filter
*
* @version 1.0 - 2 Feb. 07
*
* @see org.onelab.filter.Key The general behavior of a key
* @see org.onelab.filter.HashFunction A hash function
*/
public abstract class Filter implements Writable {
private static final int VERSION = -1; // negative to accommodate the old, unversioned format
/** The vector size of <i>this</i> filter. */
protected int vectorSize;
/** The hash function used to map a key to several positions in the vector. */
protected HashFunction hash;
/** The number of hash function to consider. */
protected int nbHash;
/** Type of hashing function to use. */
protected int hashType;
protected Filter() {}
/**
* Constructor.
* @param vectorSize The vector size of <i>this</i> filter.
* @param nbHash The number of hash functions to consider.
* @param hashType type of the hashing function (see {@link Hash}).
*/
protected Filter(int vectorSize, int nbHash, int hashType) {
this.vectorSize = vectorSize;
this.nbHash = nbHash;
this.hashType = hashType;
this.hash = new HashFunction(this.vectorSize, this.nbHash, this.hashType);
}//end constructor
/**
* Adds a key to <i>this</i> filter.
* @param key The key to add.
*/
public abstract void add(Key key);
/**
* Determines whether a specified key belongs to <i>this</i> filter.
* @param key The key to test.
* @return boolean True if the specified key belongs to <i>this</i> filter.
* False otherwise.
*/
public abstract boolean membershipTest(Key key);
/**
* Performs a logical AND between <i>this</i> filter and a specified filter.
* <p>
* <b>Invariant</b>: The result is assigned to <i>this</i> filter.
* @param filter The filter to AND with.
*/
public abstract void and(Filter filter);
/**
* Performs a logical OR between <i>this</i> filter and a specified filter.
* <p>
* <b>Invariant</b>: The result is assigned to <i>this</i> filter.
* @param filter The filter to OR with.
*/
public abstract void or(Filter filter);
/**
* Performs a logical XOR between <i>this</i> filter and a specified filter.
* <p>
* <b>Invariant</b>: The result is assigned to <i>this</i> filter.
* @param filter The filter to XOR with.
*/
public abstract void xor(Filter filter);
/**
* Performs a logical NOT on <i>this</i> filter.
* <p>
* The result is assigned to <i>this</i> filter.
*/
public abstract void not();
/**
* Adds a list of keys to <i>this</i> filter.
* @param keys The list of keys.
*/
public void add(List<Key> keys){
if(keys == null) {
throw new IllegalArgumentException("ArrayList<Key> may not be null");
}
for(Key key: keys) {
add(key);
}
}//end add()
/**
* Adds a collection of keys to <i>this</i> filter.
* @param keys The collection of keys.
*/
public void add(Collection<Key> keys){
if(keys == null) {
throw new IllegalArgumentException("Collection<Key> may not be null");
}
for(Key key: keys) {
add(key);
}
}//end add()
/**
* Adds an array of keys to <i>this</i> filter.
* @param keys The array of keys.
*/
public void add(Key[] keys){
if(keys == null) {
throw new IllegalArgumentException("Key[] may not be null");
}
for(int i = 0; i < keys.length; i++) {
add(keys[i]);
}
}//end add()
// Writable interface
public void write(DataOutput out) throws IOException {
out.writeInt(VERSION);
out.writeInt(this.nbHash);
out.writeByte(this.hashType);
out.writeInt(this.vectorSize);
}
public void readFields(DataInput in) throws IOException {
int ver = in.readInt();
if (ver > 0) { // old unversioned format
this.nbHash = ver;
this.hashType = Hash.JENKINS_HASH;
} else if (ver == VERSION) {
this.nbHash = in.readInt();
this.hashType = in.readByte();
} else {
throw new IOException("Unsupported version: " + ver);
}
this.vectorSize = in.readInt();
this.hash = new HashFunction(this.vectorSize, this.nbHash, this.hashType);
}
}//end class
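
Filter.write() and readFields() above give every subclass a common header: a negative VERSION marker, nbHash, the hash type byte, then vectorSize, with the old unversioned format recognized by a positive first int. A round-trip sketch through that contract using the BloomFilter defined earlier in this commit; the class name and sizes are made up:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.hbase.migration.nineteen.onelab.filter.BloomFilter;
import org.apache.hadoop.hbase.migration.nineteen.onelab.filter.Key;
import org.apache.hadoop.hbase.util.Hash;

// Hypothetical round-trip sketch: Filter.write() emits the header, the
// subclass emits its bit vector, and readFields() reverses both.
public class FilterRoundTripSketch {
  public static void main(String[] args) throws IOException {
    BloomFilter original = new BloomFilter(256, 4, Hash.JENKINS_HASH);
    original.add(new Key("row".getBytes()));

    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    original.write(new DataOutputStream(buf));

    BloomFilter copy = new BloomFilter();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
    System.out.println(copy.membershipTest(new Key("row".getBytes())));  // true
  }
}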

View File

@@ -0,0 +1,127 @@
/**
*
* Copyright (c) 2005, European Commission project OneLab under contract 034819
* (http://www.one-lab.org)
*
* All rights reserved.
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the distribution.
* - Neither the name of the University Catholique de Louvain - UCL
* nor the names of its contributors may be used to endorse or
* promote products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.migration.nineteen.onelab.filter;
import org.apache.hadoop.hbase.util.Hash;
/**
* Implements a hash object that returns a certain number of hashed values.
* <p>
* It is based on the SHA-1 algorithm.
*
* @see org.onelab.filter.Filter The general behavior of a filter
*
* @version 1.0 - 2 Feb. 07
*
* @see org.onelab.filter.Key The general behavior of a key being stored in a filter
* @see org.onelab.filter.Filter The general behavior of a filter
*
* @see <a href="http://www.itl.nist.gov/fipspubs/fip180-1.htm">SHA-1 algorithm</a>
*/
public final class HashFunction {
/** The number of hashed values. */
private int nbHash;
/** The exclusive upper bound on returned hash values. */
private int maxValue;
/** Hashing algorithm to use. */
private Hash hashFunction;
/**
* Constructor.
* <p>
* Builds a hash function that returns a given number of hashed values, each strictly below a given bound.
* @param maxValue The exclusive upper bound on returned values.
* @param nbHash The number of resulting hashed values.
* @param hashType type of the hashing function (see {@link Hash}).
*/
public HashFunction(int maxValue, int nbHash, int hashType) {
if(maxValue <= 0) {
throw new IllegalArgumentException("maxValue must be > 0");
}
if(nbHash <= 0) {
throw new IllegalArgumentException("nbHash must be > 0");
}
this.maxValue = maxValue;
this.nbHash = nbHash;
this.hashFunction = Hash.getInstance(hashType);
if (this.hashFunction == null)
throw new IllegalArgumentException("hashType must be known");
}//end constructor
/** Clears <i>this</i> hash function. A NOOP */
public void clear() {
}
/**
* Hashes a specified key into several integers.
* @param k The specified key.
* @return The array of hashed values.
*/
public int[] hash(Key k){
byte[] b = k.getBytes();
if(b == null) {
throw new NullPointerException("buffer reference is null");
}
if(b.length == 0) {
throw new IllegalArgumentException("key length must be > 0");
}
int[] result = new int[nbHash];
for (int i = 0, initval = 0; i < nbHash; i++) {
initval = hashFunction.hash(b, initval);
result[i] = Math.abs(initval) % maxValue;
}
return result;
}//end hash()
}//end class
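
hash() above chains the configured hash function, feeding each round's result back in as the next seed and reducing it modulo maxValue, so a single key yields nbHash positions. A brief usage sketch, assuming the migration package is built; the parameters and key are made up:

import org.apache.hadoop.hbase.migration.nineteen.onelab.filter.HashFunction;
import org.apache.hadoop.hbase.migration.nineteen.onelab.filter.Key;
import org.apache.hadoop.hbase.util.Hash;

// Hypothetical usage sketch: four chained hashes of one key, each in [0, 1024).
public class HashFunctionSketch {
  public static void main(String[] args) {
    HashFunction fn = new HashFunction(1024, 4, Hash.JENKINS_HASH);
    int[] positions = fn.hash(new Key("row-0001".getBytes()));
    for (int p : positions) {
      System.out.println(p);
    }
  }
}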

View File

@@ -0,0 +1,176 @@
/**
*
* Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org)
* All rights reserved.
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the distribution.
* - Neither the name of the University Catholique de Louvain - UCL
* nor the names of its contributors may be used to endorse or
* promote products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.migration.nineteen.onelab.filter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
* The general behavior of a key that must be stored in a filter.
*
* @see org.onelab.filter.Filter The general behavior of a filter
*/
public class Key implements WritableComparable {
/** Byte value of key */
byte[] bytes;
/**
* The weight associated to <i>this</i> key.
* <p>
* <b>Invariant</b>: if it is not specified, each instance of
* <code>Key</code> will have a default weight of 1.0
*/
double weight;
/** default constructor - use with readFields */
public Key() {}
/**
* Constructor.
* <p>
* Builds a key with a default weight.
* @param value The byte value of <i>this</i> key.
*/
public Key(byte[] value) {
this(value, 1.0);
}//end constructor
/**
* Constructor.
* <p>
* Builds a key with a specified weight.
* @param value The value of <i>this</i> key.
* @param weight The weight associated to <i>this</i> key.
*/
public Key(byte[] value, double weight) {
set(value, weight);
}//end constructor
/**
* @param value
* @param weight
*/
public void set(byte[] value, double weight) {
if(value == null) {
throw new IllegalArgumentException("value can not be null");
}
this.bytes = value;
this.weight = weight;
}
/** @return byte[] The value of <i>this</i> key. */
public byte[] getBytes() {
return this.bytes;
}
/** @return Returns the weight associated to <i>this</i> key. */
public double getWeight(){
return weight;
}//end getWeight()
/**
* Increments the weight of <i>this</i> key with a specified value.
* @param weight The increment.
*/
public void incrementWeight(double weight){
this.weight += weight;
}//end incrementWeight()
/** Increments the weight of <i>this</i> key by one. */
public void incrementWeight(){
this.weight++;
}//end incrementWeight()
@Override
public boolean equals(Object o) {
return this.compareTo(o) == 0;
}
@Override
public int hashCode() {
int result = 0;
for(int i = 0; i < bytes.length; i++) {
result ^= Byte.valueOf(bytes[i]).hashCode();
}
result ^= Double.valueOf(weight).hashCode();
return result;
}
// Writable
public void write(DataOutput out) throws IOException {
out.writeInt(bytes.length);
out.write(bytes);
out.writeDouble(weight);
}
public void readFields(DataInput in) throws IOException {
this.bytes = new byte[in.readInt()];
in.readFully(this.bytes);
weight = in.readDouble();
}
// Comparable
public int compareTo(Object o) {
Key other = (Key)o;
int result = this.bytes.length - other.getBytes().length;
for(int i = 0; result == 0 && i < bytes.length; i++) {
result = this.bytes[i] - other.bytes[i];
}
if(result == 0) {
result = Double.valueOf(this.weight - other.weight).intValue();
}
return result;
}
}//end class
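For reference, a minimal usage sketch of this Key class (the row bytes and weight values below are illustrative only, not taken from the migration code):

Key k = new Key("row-0001".getBytes());   // default weight of 1.0
k.incrementWeight();                      // weight is now 2.0
k.incrementWeight(0.5);                   // weight is now 2.5
double w = k.getWeight();                 // 2.5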

View File

@ -0,0 +1,91 @@
/**
*
* Copyright (c) 2005, European Commission project OneLab under contract 034819
* (http://www.one-lab.org)
*
* All rights reserved.
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the distribution.
* - Neither the name of the University Catholique de Louvain - UCL
* nor the names of its contributors may be used to endorse or
* promote products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.migration.nineteen.onelab.filter;
/**
* Defines the different remove schemes for retouched Bloom filters.
*
* Developed under the <a href="http://www.one-lab.org">European Commission One-Lab Project 034819</a> contract.
*
* @version 1.0 - 7 Feb. 07
*/
public interface RemoveScheme {
/**
* Random selection.
* <p>
* The idea is to randomly select a bit to reset.
*/
public final static short RANDOM = 0;
/**
* MinimumFN Selection.
* <p>
* The idea is to select the bit to reset that will generate the minimum
* number of false negatives.
*/
public final static short MINIMUM_FN = 1;
/**
* MaximumFP Selection.
* <p>
* The idea is to select the bit to reset that will remove the maximum number
* of false positives.
*/
public final static short MAXIMUM_FP = 2;
/**
* Ratio Selection.
* <p>
* The idea is to select the bit to reset that will, at the same time, remove
* the maximum number of false positives while minimizing the number of false
* negatives generated.
*/
public final static short RATIO = 3;
}//end interface
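The constants above are intended to be passed as the scheme argument of RetouchedBloomFilter.selectiveClearing(Key, short); a one-line sketch with an illustrative choice:

short scheme = RemoveScheme.MINIMUM_FN;   // favour generating as few false negatives as possible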

View File

@ -0,0 +1,450 @@
/**
*
* Copyright (c) 2005, European Commission project OneLab under contract 034819 (http://www.one-lab.org)
* All rights reserved.
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the distribution.
* - Neither the name of the University Catholique de Louvain - UCL
* nor the names of its contributors may be used to endorse or
* promote products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.migration.nineteen.onelab.filter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.hbase.util.Hash;
/**
* Implements a <i>retouched Bloom filter</i>, as defined in the CoNEXT 2006 paper.
* <p>
* It allows the removal of selected false positives at the cost of introducing
* random false negatives, and with the benefit of eliminating some random false
* positives at the same time.
*
* Developed under the <a href="http://www.one-lab.org">European Commission One-Lab Project 034819</a> contract.
*
* @version 1.0 - 7 Feb. 07
*
* @see org.onelab.filter.Filter The general behavior of a filter
* @see org.onelab.filter.BloomFilter A Bloom filter
* @see org.onelab.filter.RemoveScheme The different selective clearing algorithms
*
* @see <a href="http://www-rp.lip6.fr/site_npa/site_rp/_publications/740-rbf_cameraready.pdf">Retouched Bloom Filters: Allowing Networked Applications to Trade Off Selected False Positives Against False Negatives</a>
*/
public final class RetouchedBloomFilter extends BloomFilter
implements RemoveScheme {
/**
* KeyList vector (or ElementList Vector, as defined in the paper) of false positives.
*/
List<Key>[] fpVector;
/**
* KeyList vector of keys recorded in the filter.
*/
List<Key>[] keyVector;
/**
* Ratio vector.
*/
double[] ratio;
private Random rand;
/** Default constructor - use with readFields */
public RetouchedBloomFilter() {}
/**
* Constructor
* @param vectorSize The vector size of <i>this</i> filter.
* @param nbHash The number of hash function to consider.
* @param hashType type of the hashing function (see {@link Hash}).
*/
public RetouchedBloomFilter(int vectorSize, int nbHash, int hashType) {
super(vectorSize, nbHash, hashType);
this.rand = null;
createVector();
}//end constructor
@Override
public void add(Key key){
if(key == null) {
throw new NullPointerException("key can not be null");
}
int[] h = hash.hash(key);
hash.clear();
for(int i = 0; i < nbHash; i++) {
bits.set(h[i]);
keyVector[h[i]].add(key);
}//end for - i
}//end add()
/**
* Adds a false positive information to <i>this</i> retouched Bloom filter.
* <p>
* <b>Invariant</b>: if the false positive is <code>null</code>, nothing happens.
* @param key The false positive key to add.
*/
public void addFalsePositive(Key key){
if(key == null) {
throw new NullPointerException("key can not be null");
}
int[] h = hash.hash(key);
hash.clear();
for(int i = 0; i < nbHash; i++) {
fpVector[h[i]].add(key);
}
}//end addFalsePositive()
/**
* Adds a collection of false positive information to <i>this</i> retouched Bloom filter.
* @param coll The collection of false positive.
*/
public void addFalsePositive(Collection<Key> coll) {
if(coll == null) {
throw new NullPointerException("Collection<Key> can not be null");
}
for(Key k: coll) {
addFalsePositive(k);
}
}//end addFalsePositive()
/**
* Adds a list of false positive information to <i>this</i> retouched Bloom filter.
* @param keys The list of false positive.
*/
public void addFalsePositive(List<Key> keys){
if(keys == null) {
throw new NullPointerException("ArrayList<Key> can not be null");
}
for(Key k: keys) {
addFalsePositive(k);
}
}//end addFalsePositive()
/**
* Adds an array of false positive information to <i>this</i> retouched Bloom filter.
* @param keys The array of false positive.
*/
public void addFalsePositive(Key[] keys){
if(keys == null) {
throw new NullPointerException("Key[] can not be null");
}
for(int i = 0; i < keys.length; i++) {
addFalsePositive(keys[i]);
}
}//end addFalsePositive()
/**
* Performs the selective clearing for a given key.
* @param k The false positive key to remove from <i>this</i> retouched Bloom filter.
* @param scheme The selective clearing scheme to apply.
*/
public void selectiveClearing(Key k, short scheme) {
if(k == null) {
throw new NullPointerException("Key can not be null");
}
if(!membershipTest(k)) {
throw new IllegalArgumentException("Key is not a member");
}
int index = 0;
int[] h = hash.hash(k);
switch(scheme) {
case RANDOM:
index = randomRemove();
break;
case MINIMUM_FN:
index = minimumFnRemove(h);
break;
case MAXIMUM_FP:
index = maximumFpRemove(h);
break;
case RATIO:
index = ratioRemove(h);
break;
default:
throw new AssertionError("Undefined selective clearing scheme");
}//end switch
clearBit(index);
}//end selectiveClearing()
private int randomRemove() {
if(rand == null) {
rand = new Random();
}
return rand.nextInt(nbHash);
}//end randomRemove()
/**
* Chooses the bit position that minimizes the number of false negatives generated.
* @param h The different bit positions.
* @return int The position that minimizes the number of false negatives generated.
*/
private int minimumFnRemove(int[] h) {
int minIndex = Integer.MAX_VALUE;
double minValue = Double.MAX_VALUE;
for(int i = 0; i < nbHash; i++) {
double keyWeight = getWeight(keyVector[h[i]]);
if(keyWeight < minValue) {
minIndex = h[i];
minValue = keyWeight;
}
}//end for - i
return minIndex;
}//end minimumFnRemove()
/**
* Chooses the bit position that maximizes the number of false positives removed.
* @param h The different bit positions.
* @return int The position that maximizes the number of false positives removed.
*/
private int maximumFpRemove(int[] h){
int maxIndex = Integer.MIN_VALUE;
double maxValue = Double.MIN_VALUE;
for(int i = 0; i < nbHash; i++) {
double fpWeight = getWeight(fpVector[h[i]]);
if(fpWeight > maxValue) {
maxValue = fpWeight;
maxIndex = h[i];
}
}
return maxIndex;
}//end maximumFpRemove()
/**
* Chooses the bit position that minimizes the number of false negatives generated
* while maximizing the number of false positives removed.
* @param h The different bit positions.
* @return int The position that minimizes the number of false negatives generated
* while maximizing the number of false positives removed.
*/
private int ratioRemove(int[] h){
computeRatio();
int minIndex = Integer.MAX_VALUE;
double minValue = Double.MAX_VALUE;
for(int i = 0; i < nbHash; i++) {
if(ratio[h[i]] < minValue) {
minValue = ratio[h[i]];
minIndex = h[i];
}
}//end for - i
return minIndex;
}//end ratioRemove()
/**
* Clears a specified bit in the bit vector and keeps the KeyList vectors up to date.
* @param index The position of the bit to clear.
*/
private void clearBit(int index){
if(index < 0 || index >= vectorSize) {
throw new ArrayIndexOutOfBoundsException(index);
}
List<Key> kl = keyVector[index];
List<Key> fpl = fpVector[index];
// update key list
int listSize = kl.size();
for(int i = 0; i < listSize && !kl.isEmpty(); i++) {
removeKey(kl.get(0), keyVector);
}
kl.clear();
keyVector[index].clear();
//update false positive list
listSize = fpl.size();
for(int i = 0; i < listSize && !fpl.isEmpty(); i++) {
removeKey(fpl.get(0), fpVector);
}
fpl.clear();
fpVector[index].clear();
//update ratio
ratio[index] = 0.0;
//update bit vector
bits.clear(index);
}//end clearBit()
/**
* Removes a given key from <i>this</i> filter.
* @param k The key to remove.
* @param vector The counting vector associated to the key.
*/
private void removeKey(Key k, List<Key>[] vector) {
if(k == null) {
throw new NullPointerException("Key can not be null");
}
if(vector == null) {
throw new NullPointerException("ArrayList<Key>[] can not be null");
}
int[] h = hash.hash(k);
hash.clear();
for(int i = 0; i < nbHash; i++) {
vector[h[i]].remove(k);
}
}//end removeKey()
/**
* Computes the ratio A/FP.
*/
private void computeRatio() {
for(int i = 0; i < vectorSize; i++) {
double keyWeight = getWeight(keyVector[i]);
double fpWeight = getWeight(fpVector[i]);
if(keyWeight > 0 && fpWeight > 0) {
ratio[i] = keyWeight/fpWeight;
}
}//end for - i
}//end computeRatio()
private double getWeight(List<Key> keyList) {
double weight = 0.0;
for(Key k: keyList) {
weight += k.getWeight();
}
return weight;
}
/**
* Creates and initialises the various vectors.
*/
@SuppressWarnings("unchecked")
private void createVector() {
fpVector = new List[vectorSize];
keyVector = new List[vectorSize];
ratio = new double[vectorSize];
for(int i = 0; i < vectorSize; i++) {
fpVector[i] = Collections.synchronizedList(new ArrayList<Key>());
keyVector[i] = Collections.synchronizedList(new ArrayList<Key>());
ratio[i] = 0.0;
}//end for -i
}//end createVector()
// Writable
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
for(int i = 0; i < fpVector.length; i++) {
List<Key> list = fpVector[i];
out.writeInt(list.size());
for(Key k: list) {
k.write(out);
}
}
for(int i = 0; i < keyVector.length; i++) {
List<Key> list = keyVector[i];
out.writeInt(list.size());
for(Key k: list) {
k.write(out);
}
}
for(int i = 0; i < ratio.length; i++) {
out.writeDouble(ratio[i]);
}
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
createVector();
for(int i = 0; i < fpVector.length; i++) {
List<Key> list = fpVector[i];
int size = in.readInt();
for(int j = 0; j < size; j++) {
Key k = new Key();
k.readFields(in);
list.add(k);
}
}
for(int i = 0; i < keyVector.length; i++) {
List<Key> list = keyVector[i];
int size = in.readInt();
for(int j = 0; j < size; j++) {
Key k = new Key();
k.readFields(in);
list.add(k);
}
}
for(int i = 0; i < ratio.length; i++) {
ratio[i] = in.readDouble();
}
}
}//end class
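A rough usage sketch of the retouched filter, assuming the JENKINS_HASH constant from org.apache.hadoop.hbase.util.Hash, the membershipTest(Key) method inherited from BloomFilter, and illustrative sizing values:

RetouchedBloomFilter rbf =
    new RetouchedBloomFilter(8 * 1024, 4, Hash.JENKINS_HASH);
Key member = new Key("present".getBytes());
Key falsePositive = new Key("reported-but-never-added".getBytes());
rbf.add(member);
// Suppose a later lookup wrongly reports falsePositive as present;
// record it and selectively clear it with the RATIO scheme.
if (rbf.membershipTest(falsePositive)) {
  rbf.addFalsePositive(falsePositive);
  rbf.selectiveClearing(falsePositive, RetouchedBloomFilter.RATIO);
}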

View File

@ -0,0 +1,558 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.migration.nineteen.regionserver;
import java.io.DataInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.migration.nineteen.io.BloomFilterMapFile;
import org.apache.hadoop.hbase.migration.nineteen.io.HalfMapFileReader;
import org.apache.hadoop.hbase.migration.nineteen.io.Reference;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
/**
* A HStore data file. HStores usually have one or more of these files. They
* are produced by flushing the memcache to disk.
*
* <p>Each HStore maintains a bunch of different data files. The filename is a
* mix of the parent dir, the region name, the column name, and a file
* identifier. The name may also be a reference to a store file located
* elsewhere. This class handles all that path-building stuff for you.
*
* <p>An HStoreFile usually tracks 4 things: its parent dir, the region
* identifier, the column family, and the file identifier. If you know those
* four things, you know how to obtain the right HStoreFile. HStoreFiles may
* also reference store files in another region serving either from
* the top-half of the remote file or from the bottom-half. Such references
* are made when splitting regions so that splits complete quickly.
*
* <p>Plain HStoreFiles are named for a randomly generated id as in:
* <code>1278437856009925445</code> A file by this name is made in both the
* <code>mapfiles</code> and <code>info</code> subdirectories of a
* HStore columnfamily directory: E.g. If the column family is 'anchor:', then
* under the region directory there is a subdirectory named 'anchor' within
* which is a 'mapfiles' and 'info' subdirectory. In each will be found a
* file named something like <code>1278437856009925445</code>, one to hold the
* data in 'mapfiles' and one under 'info' that holds the sequence id for this
* store file.
*
* <p>References to store files located over in some other region look like
* this:
* <code>1278437856009925445.hbaserepository,qAReLZD-OyQORZWq_vqR1k==,959247014679548184</code>:
* i.e. an id followed by the name of the referenced region. The data
* ('mapfiles') of HStoreFile references are empty. The accompanying
* <code>info</code> file contains the
* midkey, the id of the remote store we're referencing and whether we're
* to serve the top or bottom region of the remote store file. Note, a region
* is not splitable if it has instances of store file references (References
* are cleaned up by compactions).
*
* <p>When merging or splitting HRegions, we might want to modify one of the
* params for an HStoreFile (effectively moving it elsewhere).
*/
public class HStoreFile implements HConstants {
static final Log LOG = LogFactory.getLog(HStoreFile.class.getName());
static final byte INFO_SEQ_NUM = 0;
static final byte MAJOR_COMPACTION = INFO_SEQ_NUM + 1;
static final String HSTORE_DATFILE_DIR = "mapfiles";
static final String HSTORE_INFO_DIR = "info";
static final String HSTORE_FILTER_DIR = "filter";
private final static Random rand = new Random();
private final Path basedir;
private final int encodedRegionName;
private final byte [] colFamily;
private final long fileId;
private final HBaseConfiguration conf;
private final FileSystem fs;
private final Reference reference;
private final HRegionInfo hri;
/* If true, this file was product of a major compaction.
*/
private boolean majorCompaction = false;
private long indexLength;
/**
* Constructor that fully initializes the object
* @param conf Configuration object
* @param basedir qualified path that is parent of region directory
* @param colFamily name of the column family
* @param fileId file identifier
* @param ref Reference to another HStoreFile.
* @param hri The region info for this file (HACK HBASE-868). TODO: Fix.
* @throws IOException
*/
HStoreFile(HBaseConfiguration conf, FileSystem fs, Path basedir,
final HRegionInfo hri, byte [] colFamily, long fileId,
final Reference ref)
throws IOException {
this(conf, fs, basedir, hri, colFamily, fileId, ref, false);
}
/**
* Constructor that fully initializes the object
* @param conf Configuration object
* @param basedir qualified path that is parent of region directory
* @param colFamily name of the column family
* @param fileId file identifier
* @param ref Reference to another HStoreFile.
* @param hri The region info for this file (HACK HBASE-868). TODO: Fix.
* @param mc True if this file was the result of a major compaction.
* @throws IOException
*/
HStoreFile(HBaseConfiguration conf, FileSystem fs, Path basedir,
final HRegionInfo hri, byte [] colFamily, long fileId,
final Reference ref, final boolean mc)
throws IOException {
this.conf = conf;
this.fs = fs;
this.basedir = basedir;
this.encodedRegionName = hri.getEncodedName();
this.colFamily = colFamily;
this.hri = hri;
long id = fileId;
if (id == -1) {
Path mapdir = HStoreFile.getMapDir(basedir, encodedRegionName, colFamily);
Path testpath = null;
do {
id = Math.abs(rand.nextLong());
testpath = new Path(mapdir, createHStoreFilename(id, -1));
} while(fs.exists(testpath));
}
this.fileId = id;
// If a reference, construction does not write the pointer files. That's
// done by invocations of writeReferenceFiles(hsf, fs). Happens at split.
this.reference = ref;
this.majorCompaction = mc;
}
/** @return True if this file is a reference to a store file in another region. */
boolean isReference() {
return reference != null;
}
Reference getReference() {
return reference;
}
int getEncodedRegionName() {
return this.encodedRegionName;
}
/** @return the column family */
byte [] getColFamily() {
return colFamily;
}
/** @return the file identifier */
long getFileId() {
return fileId;
}
// Build full filenames from those components
/** @return path for MapFile */
Path getMapFilePath() {
if (isReference()) {
return getMapFilePath(encodedRegionName, fileId,
reference.getEncodedRegionName());
}
return getMapFilePath(this.encodedRegionName, fileId);
}
private Path getMapFilePath(final Reference r) {
if (r == null) {
return getMapFilePath();
}
return getMapFilePath(r.getEncodedRegionName(), r.getFileId());
}
private Path getMapFilePath(final int encodedName, final long fid) {
return getMapFilePath(encodedName, fid, HRegionInfo.NO_HASH);
}
private Path getMapFilePath(final int encodedName, final long fid,
final int ern) {
return new Path(HStoreFile.getMapDir(basedir, encodedName, colFamily),
createHStoreFilename(fid, ern));
}
/** @return path for info file */
Path getInfoFilePath() {
if (isReference()) {
return getInfoFilePath(encodedRegionName, fileId,
reference.getEncodedRegionName());
}
return getInfoFilePath(encodedRegionName, fileId);
}
private Path getInfoFilePath(final int encodedName, final long fid) {
return getInfoFilePath(encodedName, fid, HRegionInfo.NO_HASH);
}
private Path getInfoFilePath(final int encodedName, final long fid,
final int ern) {
return new Path(HStoreFile.getInfoDir(basedir, encodedName, colFamily),
createHStoreFilename(fid, ern));
}
// File handling
/*
* Split by making two new store files that reference top and bottom regions
* of the original store file.
* @param dstA the file which will contain keys from the start of the source
* @param dstB the file which will contain keys from midkey to the end of the source
* @param fs file system
* @throws IOException
*/
void splitStoreFile(final HStoreFile dstA, final HStoreFile dstB,
final FileSystem fs)
throws IOException {
dstA.writeReferenceFiles(fs);
dstB.writeReferenceFiles(fs);
}
void writeReferenceFiles(final FileSystem fs)
throws IOException {
createOrFail(fs, getMapFilePath());
writeSplitInfo(fs);
}
/*
* If reference, create and write the remote store file id, the midkey and
* whether we're going against the top file region of the referent out to
* the info file.
* @param fs
* @throws IOException
*/
private void writeSplitInfo(final FileSystem fs) throws IOException {
Path p = getInfoFilePath();
if (fs.exists(p)) {
throw new IOException("File already exists " + p.toString());
}
FSDataOutputStream out = fs.create(p);
try {
reference.write(out);
} finally {
out.close();
}
}
/**
* @see #writeSplitInfo(FileSystem fs)
*/
static Reference readSplitInfo(final Path p, final FileSystem fs)
throws IOException {
FSDataInputStream in = fs.open(p);
try {
Reference r = new Reference();
r.readFields(in);
return r;
} finally {
in.close();
}
}
private void createOrFail(final FileSystem fs, final Path p)
throws IOException {
if (fs.exists(p)) {
throw new IOException("File already exists " + p.toString());
}
if (!fs.createNewFile(p)) {
throw new IOException("Failed create of " + p);
}
}
/**
* Reads in an info file
*
* @param filesystem file system
* @return The sequence id contained in the info file
* @throws IOException
*/
long loadInfo(final FileSystem filesystem) throws IOException {
Path p = null;
if (isReference()) {
p = getInfoFilePath(reference.getEncodedRegionName(),
this.reference.getFileId());
} else {
p = getInfoFilePath();
}
long length = filesystem.getFileStatus(p).getLen();
boolean hasMoreThanSeqNum = length > (Byte.SIZE + Bytes.SIZEOF_LONG);
DataInputStream in = new DataInputStream(filesystem.open(p));
try {
byte flag = in.readByte();
if (flag == INFO_SEQ_NUM) {
if (hasMoreThanSeqNum) {
flag = in.readByte();
if (flag == MAJOR_COMPACTION) {
this.majorCompaction = in.readBoolean();
}
}
return in.readLong();
}
throw new IOException("Cannot process log file: " + p);
} finally {
in.close();
}
}
/**
* Writes the file-identifier to disk
*
* @param filesystem file system
* @param infonum file id
* @throws IOException
*/
void writeInfo(final FileSystem filesystem, final long infonum)
throws IOException {
writeInfo(filesystem, infonum, false);
}
/**
* Writes the file-identifier to disk
*
* @param filesystem file system
* @param infonum file id
* @param mc True if this file is product of a major compaction
* @throws IOException
*/
void writeInfo(final FileSystem filesystem, final long infonum,
final boolean mc)
throws IOException {
Path p = getInfoFilePath();
FSDataOutputStream out = filesystem.create(p);
try {
out.writeByte(INFO_SEQ_NUM);
out.writeLong(infonum);
if (mc) {
// Set whether major compaction flag on this file.
this.majorCompaction = mc;
out.writeByte(MAJOR_COMPACTION);
out.writeBoolean(mc);
}
} finally {
out.close();
}
}
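// For orientation, the on-disk layout writeInfo() produces is:
//   byte    INFO_SEQ_NUM marker
//   long    sequence id
//   byte    MAJOR_COMPACTION marker   (only when mc is true)
//   boolean major compaction flag     (only when mc is true)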
/**
* Delete store map files.
* @throws IOException
*/
public void delete() throws IOException {
fs.delete(getMapFilePath(), true);
fs.delete(getInfoFilePath(), true);
}
/**
* Renames the mapfiles and info directories under the passed
* <code>hsf</code> directory.
* @param fs
* @param hsf
* @return True if succeeded.
* @throws IOException
*/
public boolean rename(final FileSystem fs, final HStoreFile hsf)
throws IOException {
Path src = getMapFilePath();
if (!fs.exists(src)) {
throw new FileNotFoundException(src.toString());
}
boolean success = fs.rename(src, hsf.getMapFilePath());
if (!success) {
LOG.warn("Failed rename of " + src + " to " + hsf.getMapFilePath());
} else {
src = getInfoFilePath();
if (!fs.exists(src)) {
throw new FileNotFoundException(src.toString());
}
success = fs.rename(src, hsf.getInfoFilePath());
if (!success) {
LOG.warn("Failed rename of " + src + " to " + hsf.getInfoFilePath());
}
}
return success;
}
/**
* Get reader for the store file map file.
* Client is responsible for closing file when done.
* @param fs
* @param bloomFilter If true, a bloom filter exists
* @param blockCacheEnabled If true, MapFile blocks should be cached.
* @return BloomFilterMapFile.Reader
* @throws IOException
*/
public synchronized BloomFilterMapFile.Reader getReader(final FileSystem fs,
final boolean bloomFilter, final boolean blockCacheEnabled)
throws IOException {
if (isReference()) {
return new HalfMapFileReader(fs,
getMapFilePath(reference).toString(), conf,
reference.getFileRegion(), reference.getMidkey(), bloomFilter,
blockCacheEnabled, this.hri);
}
return new BloomFilterMapFile.Reader(fs, getMapFilePath().toString(),
conf, bloomFilter, blockCacheEnabled, this.hri);
}
/**
* Get a store file writer.
* Client is responsible for closing file when done.
* @param fs
* @param compression Pass <code>SequenceFile.CompressionType.NONE</code>
* for none.
* @param bloomFilter If true, create a bloom filter
* @param nrows number of rows expected. Required if bloomFilter is true.
* @return MapFile.Writer
* @throws IOException
*/
public MapFile.Writer getWriter(final FileSystem fs,
final SequenceFile.CompressionType compression,
final boolean bloomFilter, int nrows)
throws IOException {
if (isReference()) {
throw new IOException("Illegal Access: Cannot get a writer on a" +
"HStoreFile reference");
}
return new BloomFilterMapFile.Writer(conf, fs,
getMapFilePath().toString(), compression, bloomFilter, nrows, this.hri);
}
/**
* @return Length of the store map file. If a reference, the size is an
* approximation.
* @throws IOException
*/
public long length() throws IOException {
Path p = new Path(getMapFilePath(reference), MapFile.DATA_FILE_NAME);
long l = p.getFileSystem(conf).getFileStatus(p).getLen();
return (isReference())? l / 2: l;
}
/**
* @return Length of the store map file index.
* @throws IOException
*/
public synchronized long indexLength() throws IOException {
if (indexLength == 0) {
Path p = new Path(getMapFilePath(reference), MapFile.INDEX_FILE_NAME);
indexLength = p.getFileSystem(conf).getFileStatus(p).getLen();
}
return indexLength;
}
@Override
public String toString() {
return encodedRegionName + "/" + Bytes.toString(colFamily) + "/" + fileId +
(isReference()? "-" + reference.toString(): "");
}
/**
* @return True if this file was made by a major compaction.
*/
public boolean isMajorCompaction() {
return this.majorCompaction;
}
private static String createHStoreFilename(final long fid,
final int encodedRegionName) {
return Long.toString(fid) +
((encodedRegionName != HRegionInfo.NO_HASH)?
"." + encodedRegionName : "");
}
/**
* @param dir Base directory
* @param encodedRegionName Encoding of region name.
* @param f Column family.
* @return path for map file directory
*/
public static Path getMapDir(Path dir, int encodedRegionName,
final byte [] f) {
return getFamilySubDir(dir, encodedRegionName, f, HSTORE_DATFILE_DIR);
}
/**
* @param dir Base directory
* @param encodedRegionName Encoding of region name.
* @param f Column family.
* @return the info directory path
*/
public static Path getInfoDir(Path dir, int encodedRegionName, byte [] f) {
return getFamilySubDir(dir, encodedRegionName, f, HSTORE_INFO_DIR);
}
/**
* @param dir Base directory
* @param encodedRegionName Encoding of region name.
* @param f Column family.
* @return the bloom filter directory path
*/
@Deprecated
public static Path getFilterDir(Path dir, int encodedRegionName,
final byte [] f) {
return getFamilySubDir(dir, encodedRegionName, f, HSTORE_FILTER_DIR);
}
/*
* @param base Base directory
* @param encodedRegionName Encoding of region name.
* @param f Column family.
* @param subdir Subdirectory to create under column family/store directory.
* @return
*/
private static Path getFamilySubDir(final Path base,
final int encodedRegionName, final byte [] f, final String subdir) {
return new Path(base, new Path(Integer.toString(encodedRegionName),
new Path(Bytes.toString(f), subdir)));
}
}
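A small sketch of the static path helpers described in the class comment above, using made-up table, region and family values:

Path tableDir = new Path("/hbase/mytable");
int encodedRegionName = 70236052;              // illustrative encoded region name
byte [] family = Bytes.toBytes("anchor");
// => /hbase/mytable/70236052/anchor/mapfiles and /hbase/mytable/70236052/anchor/info
Path mapDir  = HStoreFile.getMapDir(tableDir, encodedRegionName, family);
Path infoDir = HStoreFile.getInfoDir(tableDir, encodedRegionName, family);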

View File

@ -0,0 +1,24 @@
/*
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
Provides classes from old hbase versions used when migrating data.
The nineteen package holds classes from hbase 0.19.
*/
package org.apache.hadoop.hbase.migration.nineteen;

View File

@ -1241,12 +1241,12 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
Integer lid = getLock(lockid, row);
byte [] now = Bytes.toBytes(System.currentTimeMillis());
try {
for(Map.Entry<byte[], List<KeyValue>> entry :
put.getFamilyMap().entrySet()) {
for (Map.Entry<byte[], List<KeyValue>> entry :
put.getFamilyMap().entrySet()) {
byte [] family = entry.getKey();
checkFamily(family);
List<KeyValue> puts = entry.getValue();
if(updateKeys(puts, now)) {
if (updateKeys(puts, now)) {
put(family, puts, writeToWAL);
}
}

View File

@ -50,7 +50,7 @@ import org.apache.hadoop.hbase.KeyValue.KeyComparator;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;

View File

@ -281,22 +281,22 @@ public class FSUtils {
final Path hbaseRootDir)
throws IOException {
// Presumes any directory under hbase.rootdir is a table.
FileStatus [] directories = fs.listStatus(hbaseRootDir, new DirFilter(fs));
for (int i = 0; i < directories.length; i++) {
FileStatus [] tableDirs = fs.listStatus(hbaseRootDir, new DirFilter(fs));
for (int i = 0; i < tableDirs.length; i++) {
// Skip the .log directory. All others should be tables. Inside a table,
// there are compaction.dir directories to skip. Otherwise, all else
// should be regions. Then in each region, should only be family
// directories. Under each of these, should be one file only.
Path d = directories[i].getPath();
Path d = tableDirs[i].getPath();
if (d.getName().equals(HConstants.HREGION_LOGDIR_NAME)) continue;
FileStatus [] tablesubdirectories = fs.listStatus(d, new DirFilter(fs));
for (int j = 0; j < tablesubdirectories.length; j++) {
Path dd = tablesubdirectories[j].getPath();
FileStatus [] regionDirs = fs.listStatus(d, new DirFilter(fs));
for (int j = 0; j < regionDirs.length; j++) {
Path dd = regionDirs[j].getPath();
if (dd.equals(HConstants.HREGION_COMPACTIONDIR_NAME)) continue;
// Else its a region name. Now look in region for families.
FileStatus [] familydirectories = fs.listStatus(dd, new DirFilter(fs));
for (int k = 0; k < familydirectories.length; k++) {
Path family = familydirectories[k].getPath();
FileStatus [] familyDirs = fs.listStatus(dd, new DirFilter(fs));
for (int k = 0; k < familyDirs.length; k++) {
Path family = familyDirs[k].getPath();
// Now in family make sure only one file.
FileStatus [] familyStatus = fs.listStatus(family);
if (familyStatus.length > 1) {
@ -320,8 +320,8 @@ public class FSUtils {
public static boolean isPre020FileLayout(final FileSystem fs,
final Path hbaseRootDir)
throws IOException {
Path mapfiles = new Path(new Path(new Path(hbaseRootDir, "-ROOT-"),
"70236052"), "mapfiles");
Path mapfiles = new Path(new Path(new Path(new Path(hbaseRootDir, "-ROOT-"),
"70236052"), "info"), "mapfiles");
return fs.exists(mapfiles);
}
@ -340,22 +340,38 @@ public class FSUtils {
final Path hbaseRootDir)
throws IOException {
// Presumes any directory under hbase.rootdir is a table.
FileStatus [] directories = fs.listStatus(hbaseRootDir, new DirFilter(fs));
for (int i = 0; i < directories.length; i++) {
FileStatus [] tableDirs = fs.listStatus(hbaseRootDir, new DirFilter(fs));
for (int i = 0; i < tableDirs.length; i++) {
// Inside a table, there are compaction.dir directories to skip.
// Otherwise, all else should be regions. Then in each region, should
// only be family directories. Under each of these, should be a mapfile
// and info directory and in these only one file.
Path d = directories[i].getPath();
Path d = tableDirs[i].getPath();
if (d.getName().equals(HConstants.HREGION_LOGDIR_NAME)) continue;
FileStatus [] tablesubdirectories = fs.listStatus(d, new DirFilter(fs));
for (int j = 0; j < tablesubdirectories.length; j++) {
Path dd = tablesubdirectories[j].getPath();
FileStatus [] regionDirs = fs.listStatus(d, new DirFilter(fs));
for (int j = 0; j < regionDirs.length; j++) {
Path dd = regionDirs[j].getPath();
if (dd.equals(HConstants.HREGION_COMPACTIONDIR_NAME)) continue;
// Else its a region name. Now look in region for families.
FileStatus [] familydirectories = fs.listStatus(dd, new DirFilter(fs));
for (int k = 0; k < familydirectories.length; k++) {
Path family = familydirectories[k].getPath();
FileStatus [] familyDirs = fs.listStatus(dd, new DirFilter(fs));
for (int k = 0; k < familyDirs.length; k++) {
Path family = familyDirs[k].getPath();
FileStatus [] infoAndMapfile = fs.listStatus(family);
// Assert that only info and mapfile in family dir.
if (infoAndMapfile.length != 0 && infoAndMapfile.length != 2) {
LOG.debug(family.toString() +
" has more than just info and mapfile: " + infoAndMapfile.length);
return false;
}
// Make sure directory named info or mapfile.
for (int ll = 0; ll < 2; ll++) {
if (infoAndMapfile[ll].getPath().getName().equals("info") ||
infoAndMapfile[ll].getPath().getName().equals("mapfiles"))
continue;
LOG.debug("Unexpected directory name: " +
infoAndMapfile[ll].getPath());
return false;
}
// Now in family, there are 'mapfile' and 'info' subdirs. Just
// look in the 'mapfile' subdir.
FileStatus [] familyStatus =

View File

@ -20,12 +20,14 @@
package org.apache.hadoop.hbase.util;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
@ -36,6 +38,7 @@ import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.FSUtils.DirFilter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@ -224,7 +227,7 @@ public class Migrate extends Configured implements Tool {
System.out.println(msg);
throw new IOException(msg);
}
rewrite(fs, hbaseRootDir);
rewrite(hbaseRootDir);
}
LOG.info("Checking filesystem is major compacted");
// Below check is good for both making sure that we are major compacted
@ -285,10 +288,57 @@ set to control the master's address (not mandatory).
/*
* Rewrite all under hbase root dir.
* Presumes that {@link FSUtils#isMajorCompactedPre020(FileSystem, Path)}
* has been run before this method is called.
* @param fs
* @param hbaseRootDir
* @throws IOException
*/
private void rewrite(final FileSystem fs, final Path hbaseRootDir) {
private void rewrite(final Path hbaseRootDir) throws IOException {
FileStatus [] tableDirs = fs.listStatus(hbaseRootDir, new DirFilter(fs));
for (int i = 0; i < tableDirs.length; i++) {
// Inside a table, there are compaction.dir directories to skip.
// Otherwise, all else should be regions. Then in each region, should
// only be family directories. Under each of these, should be a mapfile
// and info directory and in these only one file.
Path d = tableDirs[i].getPath();
if (d.getName().equals(HConstants.HREGION_LOGDIR_NAME)) continue;
FileStatus [] regionDirs = fs.listStatus(d, new DirFilter(fs));
for (int j = 0; j < regionDirs.length; j++) {
Path dd = regionDirs[j].getPath();
if (dd.equals(HConstants.HREGION_COMPACTIONDIR_NAME)) continue;
// Else its a region name. Now look in region for families.
FileStatus [] familyDirs = fs.listStatus(dd, new DirFilter(fs));
for (int k = 0; k < familyDirs.length; k++) {
Path family = familyDirs[k].getPath();
Path mfdir = new Path(family, "mapfiles");
FileStatus [] mfs = fs.listStatus(mfdir);
if (mfs.length > 1) {
throw new IOException("Should only be one directory in: " + mfdir);
}
Path mf = mfs[0].getPath();
Path infofile = new Path(new Path(family, "info"), mf.getName());
rewrite(this.fs, mf, infofile);
}
}
}
}
/**
* Rewrite the passed mapfile.
* @param fs
* @param mapfiledir
* @param infofile
* @throws IOException
*/
public static void rewrite(final FileSystem fs, final Path mapfiledir,
final Path infofile)
throws IOException {
if (!fs.exists(mapfiledir)) {
throw new FileNotFoundException(mapfiledir.toString());
}
if (!fs.exists(infofile)) {
throw new FileNotFoundException(infofile.toString());
}
}

View File

@ -30,7 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.MapFile;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableComparable;

View File

@ -58,14 +58,14 @@ public class MigrationTest extends HBaseTestCase {
*/
public void testMigration() throws IOException {
Path rootdir = getUnitTestdir(getName());
FileSystem fs = FileSystem.get(this.conf);
Path hbasedir = loadTestData(fs, rootdir);
assertTrue(fs.exists(hbasedir));
Migrate migrator = new Migrate(this.conf);
Path qualified = fs.makeQualified(hbasedir);
String uri = qualified.toString();
this.conf.set("hbase.rootdir", uri);
migrator.run(new String [] {"upgrade"});
int result = migrator.run(new String [] {"upgrade"});
assertEquals(0, result);
}
/*