HBASE-4608 HLog Compression
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1301165 13f79535-47bb-0310-9956-ffa450edef68
parent 5055d4cfff
commit 273804e658
@@ -644,6 +644,10 @@ public final class HConstants {
  /** File permission umask to use when creating hbase data files */
  public static final String DATA_FILE_UMASK_KEY = "hbase.data.umask";

  /** Configuration name of HLog Compression */
  public static final String ENABLE_WAL_COMPRESSION =
      "hbase.regionserver.wal.enablecompression";

  private HConstants() {
    // Can't be instantiated with this ctor.
  }
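The new flag is read by SequenceFileLogWriter.init() further down in this commit. As a minimal, illustrative sketch (not part of the patch), compression could be switched on programmatically before any writer is created:

  // Illustrative only: enable dictionary-based WAL compression.
  Configuration conf = HBaseConfiguration.create();
  conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); // "hbase.regionserver.wal.enablecompression"

Setting the same property in hbase-site.xml should have the equivalent effect.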
@@ -0,0 +1,55 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.wal;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import org.apache.hadoop.classification.InterfaceAudience;

/**
 * Context that holds the various dictionaries for compression in HLog.
 */
@InterfaceAudience.Private
class CompressionContext {
  final Dictionary regionDict;
  final Dictionary tableDict;
  final Dictionary familyDict;
  final Dictionary qualifierDict;
  final Dictionary rowDict;

  public CompressionContext(Class<? extends Dictionary> dictType)
      throws SecurityException, NoSuchMethodException, InstantiationException,
      IllegalAccessException, InvocationTargetException {
    Constructor<? extends Dictionary> dictConstructor =
        dictType.getConstructor();
    regionDict = dictConstructor.newInstance();
    tableDict = dictConstructor.newInstance();
    familyDict = dictConstructor.newInstance();
    qualifierDict = dictConstructor.newInstance();
    rowDict = dictConstructor.newInstance();
  }

  void clear() {
    regionDict.clear();
    tableDict.clear();
    familyDict.clear();
    qualifierDict.clear();
    rowDict.clear();
  }
}
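A hedged usage sketch of the new class (mirroring how the reader and writer below build their contexts; the exception-handling shape is illustrative, not prescribed by the patch):

  try {
    // One dictionary per compressed field: region, table, family, qualifier, row.
    CompressionContext ctx = new CompressionContext(LRUDictionary.class);
    ctx.clear(); // resets all five dictionaries, e.g. before reusing the context for another log
  } catch (Exception e) {
    throw new IOException("Failed to initialize CompressionContext", e);
  }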
@@ -0,0 +1,188 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License
 */

package org.apache.hadoop.hbase.regionserver.wal;
import org.apache.hadoop.classification.InterfaceAudience;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.google.common.base.Preconditions;

/**
 * A set of static functions for running our custom WAL compression/decompression.
 * Also contains a command line tool to compress and uncompress HLogs.
 */
@InterfaceAudience.Private
public class Compressor {
  /**
   * Command line tool to compress and uncompress WALs.
   */
  public static void main(String[] args) throws IOException {
    if (args.length != 2 || args[0].equals("--help") || args[0].equals("-h")) {
      printHelp();
      System.exit(-1);
    }

    Path inputPath = new Path(args[0]);
    Path outputPath = new Path(args[1]);

    transformFile(inputPath, outputPath);
  }

  private static void printHelp() {
    System.err.println("usage: Compressor <input> <output>");
    System.err.println("If <input> HLog is compressed, <output> will be decompressed.");
    System.err.println("If <input> HLog is uncompressed, <output> will be compressed.");
    return;
  }

  private static void transformFile(Path input, Path output)
      throws IOException {
    SequenceFileLogReader in = new SequenceFileLogReader();
    SequenceFileLogWriter out = new SequenceFileLogWriter();

    try {
      Configuration conf = HBaseConfiguration.create();

      FileSystem inFS = input.getFileSystem(conf);
      FileSystem outFS = output.getFileSystem(conf);

      in.init(inFS, input, conf);
      boolean compress = in.reader.isWALCompressionEnabled();

      conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
      out.init(outFS, output, conf);

      Entry e = null;
      while ((e = in.next()) != null) out.append(e);
    } finally {
      in.close();
      out.close();
    }
  }

  /**
   * Reads the next compressed entry and returns it as a byte array
   *
   * @param in the DataInput to read from
   * @param dict the dictionary we use for our read.
   *
   * @return the uncompressed array.
   */
  static byte[] readCompressed(DataInput in, Dictionary dict)
      throws IOException {
    byte status = in.readByte();

    if (status == Dictionary.NOT_IN_DICTIONARY) {
      int length = WritableUtils.readVInt(in);
      // if this isn't in the dictionary, we need to add to the dictionary.
      byte[] arr = new byte[length];
      in.readFully(arr);
      if (dict != null) dict.addEntry(arr, 0, length);
      return arr;
    } else {
      // Status here is the higher-order byte of index of the dictionary entry
      // (when it's not Dictionary.NOT_IN_DICTIONARY -- dictionary indices are
      // shorts).
      short dictIdx = toShort(status, in.readByte());
      byte[] entry = dict.getEntry(dictIdx);
      if (entry == null) {
        throw new IOException("Missing dictionary entry for index "
            + dictIdx);
      }
      return entry;
    }
  }

  /**
   * Reads a compressed entry into an array.
   * The output into the array ends up length-prefixed.
   *
   * @param to the array to write into
   * @param offset array offset to start writing to
   * @param in the DataInput to read from
   * @param dict the dictionary to use for compression
   *
   * @return the length of the uncompressed data
   */
  static int uncompressIntoArray(byte[] to, int offset, DataInput in,
      Dictionary dict) throws IOException {
    byte status = in.readByte();

    if (status == Dictionary.NOT_IN_DICTIONARY) {
      // status byte indicating that data to be read is not in dictionary.
      // if this isn't in the dictionary, we need to add to the dictionary.
      int length = WritableUtils.readVInt(in);
      in.readFully(to, offset, length);
      dict.addEntry(to, offset, length);
      return length;
    } else {
      // the status byte also acts as the higher order byte of the dictionary
      // entry
      short dictIdx = toShort(status, in.readByte());
      byte[] entry = dict.getEntry(dictIdx);
      if (entry == null) {
        throw new IOException("Missing dictionary entry for index "
            + dictIdx);
      }
      // now we write the uncompressed value.
      Bytes.putBytes(to, offset, entry, 0, entry.length);
      return entry.length;
    }
  }

  /**
   * Compresses and writes an array to a DataOutput
   *
   * @param data the array to write.
   * @param out the DataOutput to write into
   * @param dict the dictionary to use for compression
   */
  static void writeCompressed(byte[] data, int offset, int length,
      DataOutput out, Dictionary dict)
      throws IOException {
    short dictIdx = Dictionary.NOT_IN_DICTIONARY;
    if (dict != null) {
      dictIdx = dict.findEntry(data, offset, length);
    }
    if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
      // not in dict
      out.writeByte(Dictionary.NOT_IN_DICTIONARY);
      WritableUtils.writeVInt(out, length);
      out.write(data, offset, length);
    } else {
      out.writeShort(dictIdx);
    }
  }

  static short toShort(byte hi, byte lo) {
    short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
    Preconditions.checkArgument(s >= 0);
    return s;
  }
}
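As an illustrative round-trip of this entry format (essentially what TestCompressor later in this commit does; the array contents are made up, and the static helpers are package-private so a sketch like this only compiles inside the wal package):

  Dictionary dict = new LRUDictionary();
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  DataOutputStream dos = new DataOutputStream(baos);
  byte[] row = Bytes.toBytes("row-1");
  // First write: a dictionary miss, so a status byte, a vint length and the raw bytes go out.
  Compressor.writeCompressed(row, 0, row.length, dos, dict);
  // Second write: the dictionary now holds the entry, so only a two-byte index is written.
  Compressor.writeCompressed(row, 0, row.length, dos, dict);
  dos.close();

  DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
  Dictionary readDict = new LRUDictionary(); // the read side rebuilds an identical dictionary as it goes
  byte[] first = Compressor.readCompressed(dis, readDict);
  byte[] second = Compressor.readCompressed(dis, readDict);
  assert Bytes.equals(row, first) && Bytes.equals(row, second);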
@@ -0,0 +1,71 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.wal;

import org.apache.hadoop.classification.InterfaceAudience;

/**
 * Dictionary interface
 *
 * Dictionary indexes should be either bytes or shorts, only positive. (The
 * first bit is reserved for detecting whether something is compressed or not).
 */
@InterfaceAudience.Private
interface Dictionary {
  static final byte NOT_IN_DICTIONARY = -1;

  /**
   * Gets an entry from the dictionary.
   *
   * @param idx index of the entry
   * @return the entry, or null if non existent
   */
  public byte[] getEntry(short idx);

  /**
   * Finds the index of an entry.
   * If no entry found, we add it.
   *
   * @param data the byte array that we're looking up
   * @param offset Offset into <code>data</code> to add to Dictionary.
   * @param length Length beyond <code>offset</code> that comprises entry; must be > 0.
   * @return the index of the entry, or {@link #NOT_IN_DICTIONARY} if not found
   */
  public short findEntry(byte[] data, int offset, int length);

  /**
   * Adds an entry to the dictionary.
   * Be careful using this method. It will add an entry to the
   * dictionary even if it already has an entry for the same data.
   * Call {{@link #findEntry(byte[], int, int)}} to add without duplicating
   * dictionary entries.
   *
   * @param data the entry to add
   * @param offset Offset into <code>data</code> to add to Dictionary.
   * @param length Length beyond <code>offset</code> that comprises entry; must be > 0.
   * @return the index of the entry
   */

  public short addEntry(byte[] data, int offset, int length);

  /**
   * Flushes the dictionary, empties all values.
   */
  public void clear();
}
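In other words, each compressible field is written in one of two shapes: a literal (the NOT_IN_DICTIONARY marker byte, a vint length, then the raw bytes, which both writer and reader add to their dictionaries) or a two-byte positive short index into the dictionary. The sign bit of the first byte distinguishes the two cases, which is why dictionary indexes must stay positive.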
@@ -1671,6 +1671,15 @@ public class HLog implements Syncable {
      return key;
    }

    /**
     * Set compression context for this entry.
     * @param compressionContext Compression context
     */
    public void setCompressionContext(CompressionContext compressionContext) {
      edit.setCompressionContext(compressionContext);
      key.setCompressionContext(compressionContext);
    }

    @Override
    public String toString() {
      return this.key + "=" + this.edit;
@@ -46,7 +46,40 @@ import org.apache.hadoop.io.WritableUtils;
@InterfaceAudience.Private
public class HLogKey implements WritableComparable<HLogKey> {
  // should be < 0 (@see #readFields(DataInput))
  private static final int VERSION = -1;
  // version 2 supports HLog compression
  enum Version {
    UNVERSIONED(0),
    // Initial number we put on HLogKey when we introduced versioning.
    INITIAL(-1),
    // Version -2 introduced a dictionary compression facility. Only this
    // dictionary-based compression is available in version -2.
    COMPRESSED(-2);

    final int code;
    static final Version[] byCode;
    static {
      byCode = Version.values();
      for (int i = 0; i < byCode.length; i++) {
        if (byCode[i].code != -1 * i) {
          throw new AssertionError("Values in this enum should be descending by one");
        }
      }
    }

    Version(int code) {
      this.code = code;
    }

    boolean atLeast(Version other) {
      return code <= other.code;
    }

    static Version fromCode(int code) {
      return byCode[code * -1];
    }
  }

  private static final Version VERSION = Version.COMPRESSED;

  // The encoded region name.
  private byte [] encodedRegionName;
@@ -57,7 +90,9 @@ public class HLogKey implements WritableComparable<HLogKey> {

  private UUID clusterId;

  /** Writable Consructor -- Do not use. */
  private CompressionContext compressionContext;

  /** Writable Constructor -- Do not use. */
  public HLogKey() {
    this(null, null, 0L, HConstants.LATEST_TIMESTAMP,
        HConstants.DEFAULT_CLUSTER_ID);
@@ -84,6 +119,13 @@ public class HLogKey implements WritableComparable<HLogKey> {
    this.clusterId = clusterId;
  }

  /**
   * @param compressionContext Compression context to use
   */
  public void setCompressionContext(CompressionContext compressionContext) {
    this.compressionContext = compressionContext;
  }

  /** @return encoded region name */
  public byte [] getEncodedRegionName() {
    return encodedRegionName;
@@ -215,9 +257,17 @@ public class HLogKey implements WritableComparable<HLogKey> {

  @Override
  public void write(DataOutput out) throws IOException {
    WritableUtils.writeVInt(out, VERSION);
    Bytes.writeByteArray(out, this.encodedRegionName);
    Bytes.writeByteArray(out, this.tablename);
    WritableUtils.writeVInt(out, VERSION.code);
    if (compressionContext == null) {
      Bytes.writeByteArray(out, this.encodedRegionName);
      Bytes.writeByteArray(out, this.tablename);
    } else {
      Compressor.writeCompressed(this.encodedRegionName, 0,
          this.encodedRegionName.length, out,
          compressionContext.regionDict);
      Compressor.writeCompressed(this.tablename, 0, this.tablename.length, out,
          compressionContext.tableDict);
    }
    out.writeLong(this.logSeqNum);
    out.writeLong(this.writeTime);
    // avoid storing 16 bytes when replication is not enabled
@@ -232,7 +282,7 @@ public class HLogKey implements WritableComparable<HLogKey> {

  @Override
  public void readFields(DataInput in) throws IOException {
    int version = 0;
    Version version = Version.UNVERSIONED;
    // HLogKey was not versioned in the beginning.
    // In order to introduce it now, we make use of the fact
    // that encodedRegionName was written with Bytes.writeByteArray,
@@ -244,16 +294,26 @@ public class HLogKey implements WritableComparable<HLogKey> {
    int len = WritableUtils.readVInt(in);
    if (len < 0) {
      // what we just read was the version
      version = len;
      len = WritableUtils.readVInt(in);
      version = Version.fromCode(len);
      // We only compress V2 of HLogkey.
      // If compression is on, the length is handled by the dictionary
      if (compressionContext == null || !version.atLeast(Version.COMPRESSED)) {
        len = WritableUtils.readVInt(in);
      }
    }
    this.encodedRegionName = new byte[len];
    in.readFully(this.encodedRegionName);
    this.tablename = Bytes.readByteArray(in);
    if (compressionContext == null || !version.atLeast(Version.COMPRESSED)) {
      this.encodedRegionName = new byte[len];
      in.readFully(this.encodedRegionName);
      this.tablename = Bytes.readByteArray(in);
    } else {
      this.encodedRegionName = Compressor.readCompressed(in, compressionContext.regionDict);
      this.tablename = Compressor.readCompressed(in, compressionContext.tableDict);
    }

    this.logSeqNum = in.readLong();
    this.writeTime = in.readLong();
    this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
    if (version < 0) {
    if (version.atLeast(Version.INITIAL)) {
      if (in.readBoolean()) {
        this.clusterId = new UUID(in.readLong(), in.readLong());
      }
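Note how the HLogKey version codes grow downward (UNVERSIONED = 0, INITIAL = -1, COMPRESSED = -2): that is why atLeast() compares with <= and fromCode() indexes byCode with code * -1. On read, a vint that comes back negative is a version marker; a non-negative value is still interpreted the old way, as the length of the encoded region name.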
@@ -0,0 +1,126 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.wal;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;

/**
 * Compression class for {@link KeyValue}s written to the WAL. This is not
 * synchronized, so synchronization should be handled outside.
 *
 * Class only compresses and uncompresses row keys, family names, and the
 * qualifier. More may be added depending on use patterns.
 */
class KeyValueCompression {
  /**
   * Uncompresses a KeyValue from a DataInput and returns it.
   *
   * @param in the DataInput
   * @param readContext the compressionContext to use.
   * @return an uncompressed KeyValue
   * @throws IOException
   */

  public static KeyValue readKV(DataInput in, CompressionContext readContext)
      throws IOException {
    int keylength = WritableUtils.readVInt(in);
    int vlength = WritableUtils.readVInt(in);
    int length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;

    byte[] backingArray = new byte[length];
    int pos = 0;
    pos = Bytes.putInt(backingArray, pos, keylength);
    pos = Bytes.putInt(backingArray, pos, vlength);

    // the row
    int elemLen = Compressor.uncompressIntoArray(backingArray,
        pos + Bytes.SIZEOF_SHORT, in, readContext.rowDict);
    checkLength(elemLen, Short.MAX_VALUE);
    pos = Bytes.putShort(backingArray, pos, (short)elemLen);
    pos += elemLen;

    // family
    elemLen = Compressor.uncompressIntoArray(backingArray,
        pos + Bytes.SIZEOF_BYTE, in, readContext.familyDict);
    checkLength(elemLen, Byte.MAX_VALUE);
    pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
    pos += elemLen;

    // qualifier
    elemLen = Compressor.uncompressIntoArray(backingArray, pos, in,
        readContext.qualifierDict);
    pos += elemLen;

    // the rest
    in.readFully(backingArray, pos, length - pos);

    return new KeyValue(backingArray);
  }

  private static void checkLength(int len, int max) throws IOException {
    if (len < 0 || len > max) {
      throw new IOException(
          "Invalid length for compressed portion of keyvalue: " + len);
    }
  }

  /**
   * Compresses and writes our KV to out, a DataOutput.
   *
   * @param out the DataOutput
   * @param keyVal the KV to compress and write
   * @param writeContext the compressionContext to use.
   * @throws IOException
   */
  public static void writeKV(final DataOutput out, KeyValue keyVal,
      CompressionContext writeContext) throws IOException {
    byte[] backingArray = keyVal.getBuffer();
    int offset = keyVal.getOffset();

    // we first write the KeyValue infrastructure as VInts.
    WritableUtils.writeVInt(out, keyVal.getKeyLength());
    WritableUtils.writeVInt(out, keyVal.getValueLength());

    // now we write the row key, as the row key is likely to be repeated
    // We save space only if we attempt to compress elements with duplicates
    Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getRowOffset(),
        keyVal.getRowLength(), out, writeContext.rowDict);


    // now family, if it exists. if it doesn't, we write a 0 length array.
    Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getFamilyOffset(),
        keyVal.getFamilyLength(), out, writeContext.familyDict);

    // qualifier next
    Compressor.writeCompressed(keyVal.getBuffer(), keyVal.getQualifierOffset(),
        keyVal.getQualifierLength(), out,
        writeContext.qualifierDict);

    // now we write the rest uncompressed
    int pos = keyVal.getTimestampOffset();
    int remainingLength = keyVal.getLength() + offset - (pos);
    out.write(backingArray, pos, remainingLength);
  }
}
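Only the row, family and qualifier portions of each KeyValue go through the dictionaries; the key and value lengths are written as vints, and everything from the timestamp offset onward (timestamp, type and value) is copied uncompressed, so values themselves are not deduplicated.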
@@ -0,0 +1,216 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.wal;

import java.util.HashMap;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.common.base.Preconditions;

/**
 * WALDictionary using an LRU eviction algorithm. Uses a linked list running
 * through a hashtable. Currently has max of 2^15 entries. Will start
 * evicting if exceeds this number. The maximum memory we expect this dictionary
 * to take in the worst case is about:
 * <code>(2 ^ 15) * 5 (Regionname, Row key, CF, Column qual, table) * 100 bytes (these are some big names) = ~16MB</code>.
 * If you want to get silly, even at 1kb entries, it maxes out at 160 megabytes.
 */
@InterfaceAudience.Private
public class LRUDictionary implements Dictionary {
  private final BidirectionalLRUMap backingStore = new BidirectionalLRUMap();

  @Override
  public byte[] getEntry(short idx) {
    return backingStore.get(idx);
  }

  @Override
  public short findEntry(byte[] data, int offset, int length) {
    short ret = backingStore.findIdx(data, offset, length);
    if (ret == NOT_IN_DICTIONARY) {
      addEntry(data, offset, length);
    }
    return ret;
  }

  @Override
  public short addEntry(byte[] data, int offset, int length) {
    if (length <= 0) return NOT_IN_DICTIONARY;
    return backingStore.put(data, offset, length);
  }

  @Override
  public void clear() {
    backingStore.clear();
  }

  /*
   * Internal class used to implement LRU eviction and dual lookup (by key and
   * value).
   *
   * This is not thread safe. Don't use in multi-threaded applications.
   */
  static class BidirectionalLRUMap {
    static final int MAX_SIZE = Short.MAX_VALUE;
    private int currSize = 0;

    // Head and tail of the LRU list.
    private Node head;
    private Node tail;

    private HashMap<Node, Short> nodeToIndex = new HashMap<Node, Short>();
    private Node[] indexToNode = new Node[MAX_SIZE];

    public BidirectionalLRUMap() {
      for (int i = 0; i < MAX_SIZE; i++) {
        indexToNode[i] = new Node();
      }
    }

    private short put(byte[] array, int offset, int length) {
      // We copy the bytes we want, otherwise we might be holding references to
      // massive arrays in our dictionary (or those arrays might change)
      byte[] stored = new byte[length];
      Bytes.putBytes(stored, 0, array, offset, length);

      if (currSize < MAX_SIZE) {
        // There is space to add without evicting.
        indexToNode[currSize].setContents(stored, 0, stored.length);
        setHead(indexToNode[currSize]);
        short ret = (short) currSize++;
        nodeToIndex.put(indexToNode[ret], ret);
        return ret;
      } else {
        short s = nodeToIndex.remove(tail);
        tail.setContents(stored, 0, stored.length);
        // we need to rehash this.
        nodeToIndex.put(tail, s);
        moveToHead(tail);
        return s;
      }
    }

    private short findIdx(byte[] array, int offset, int length) {
      Short s;
      final Node comparisonNode = new Node();
      comparisonNode.setContents(array, offset, length);
      if ((s = nodeToIndex.get(comparisonNode)) != null) {
        moveToHead(indexToNode[s]);
        return s;
      } else {
        return -1;
      }
    }

    private byte[] get(short idx) {
      Preconditions.checkElementIndex(idx, currSize);
      moveToHead(indexToNode[idx]);
      return indexToNode[idx].container;
    }

    private void moveToHead(Node n) {
      if (head == n) {
        // no-op -- it's already the head.
        return;
      }
      // At this point we definitely have prev, since it's not the head.
      assert n.prev != null;
      // Unlink prev.
      n.prev.next = n.next;

      // Unlink next
      if (n.next != null) {
        n.next.prev = n.prev;
      } else {
        assert n == tail;
        tail = n.prev;
      }
      // Node is now removed from the list. Re-add it at the head.
      setHead(n);
    }

    private void setHead(Node n) {
      // assume it's already unlinked from the list at this point.
      n.prev = null;
      n.next = head;
      if (head != null) {
        assert head.prev == null;
        head.prev = n;
      }

      head = n;

      // First entry
      if (tail == null) {
        tail = n;
      }
    }

    private void clear() {
      currSize = 0;
      nodeToIndex.clear();
      tail = null;
      head = null;

      for (Node n : indexToNode) {
        n.container = null;
      }

      for (int i = 0; i < MAX_SIZE; i++) {
        indexToNode[i].next = null;
        indexToNode[i].prev = null;
      }
    }

    private static class Node {
      byte[] container;
      int offset;
      int length;
      Node next; // link towards the tail
      Node prev; // link towards the head

      public Node() {
      }

      private void setContents(byte[] container, int offset, int length) {
        this.container = container;
        this.offset = offset;
        this.length = length;
      }

      @Override
      public int hashCode() {
        return Bytes.hashCode(container, offset, length);
      }

      @Override
      public boolean equals(Object other) {
        if (!(other instanceof Node)) {
          return false;
        }

        Node casted = (Node) other;
        return Bytes.equals(container, offset, length, casted.container,
            casted.offset, casted.length);
      }
    }
  }
}
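One subtlety worth calling out: findEntry() returns NOT_IN_DICTIONARY on a miss but adds the entry as a side effect, so writer and reader dictionaries stay in step as long as they see the same byte stream. A tiny illustrative sketch (names made up):

  LRUDictionary dict = new LRUDictionary();
  byte[] fam = Bytes.toBytes("cf");
  short first = dict.findEntry(fam, 0, fam.length);  // miss: returns Dictionary.NOT_IN_DICTIONARY, but "cf" is now stored
  short second = dict.findEntry(fam, 0, fam.length); // hit: returns the index assigned on the first call
  assert first == Dictionary.NOT_IN_DICTIONARY && second >= 0;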
@@ -22,11 +22,8 @@ package org.apache.hadoop.hbase.regionserver.wal;

import java.io.FilterInputStream;
import java.io.IOException;
import java.lang.Class;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -58,7 +55,6 @@ public class SequenceFileLogReader implements HLog.Reader {
    WALReader(final FileSystem fs, final Path p, final Configuration c)
        throws IOException {
      super(fs, p, c);

    }

    @Override
@@ -69,6 +65,15 @@ public class SequenceFileLogReader implements HLog.Reader {
          bufferSize, length), length);
    }

    /**
     * Call this method after init() has been executed
     *
     * @return whether WAL compression is enabled
     */
    public boolean isWALCompressionEnabled() {
      return SequenceFileLogWriter.isWALCompressionEnabled(this.getMetadata());
    }

    /**
     * Override just so can intercept first call to getPos.
     */
@@ -136,10 +141,15 @@ public class SequenceFileLogReader implements HLog.Reader {

  Configuration conf;
  WALReader reader;

  // Needed logging exceptions
  Path path;
  int edit = 0;
  long entryStart = 0;
  /**
   * Compression context to use reading. Can be null if no compression.
   */
  private CompressionContext compressionContext = null;

  protected Class<? extends HLogKey> keyClass;
@@ -159,19 +169,35 @@ public class SequenceFileLogReader implements HLog.Reader {
    this.keyClass = keyClass;
  }

  @Override
  public void init(FileSystem fs, Path path, Configuration conf)
      throws IOException {
    this.conf = conf;
    this.path = path;
    reader = new WALReader(fs, path, conf);

    // If compression is enabled, new dictionaries are created here.
    boolean compression = reader.isWALCompressionEnabled();
    if (compression) {
      try {
        if (compressionContext == null) {
          compressionContext = new CompressionContext(LRUDictionary.class);
        } else {
          compressionContext.clear();
        }
      } catch (Exception e) {
        throw new IOException("Failed to initialize CompressionContext", e);
      }
    }
  }

  @Override
  public void close() throws IOException {
    try {
      reader.close();
      if (reader != null) {
        this.reader.close();
        this.reader = null;
      }
    } catch (IOException ioe) {
      throw addFileInfoToException(ioe);
    }
@@ -205,6 +231,9 @@ public class SequenceFileLogReader implements HLog.Reader {
    }
    boolean b = false;
    try {
      if (compressionContext != null) {
        e.setCompressionContext(compressionContext);
      }
      b = this.reader.next(e.getKey(), e.getEdit());
    } catch (IOException ioe) {
      throw addFileInfoToException(ioe);
@@ -259,4 +288,4 @@ public class SequenceFileLogReader implements HLog.Reader {

    return ioe;
  }
}
@@ -25,6 +25,7 @@ import java.io.OutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.TreeMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,7 +34,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -45,6 +48,16 @@ import org.apache.hadoop.io.compress.DefaultCodec;
 */
@InterfaceAudience.Private
public class SequenceFileLogWriter implements HLog.Writer {
  static final Text WAL_VERSION_KEY = new Text("version");
  // Let the version be 1. Let absence of a version meta tag be old, version 0.
  // Set this version '1' to be the version that introduces compression,
  // the COMPRESSION_VERSION.
  private static final int COMPRESSION_VERSION = 1;
  static final int VERSION = COMPRESSION_VERSION;
  static final Text WAL_VERSION = new Text("" + VERSION);
  static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
  static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");

  private final Log LOG = LogFactory.getLog(this.getClass());
  // The sequence file we delegate to.
  private SequenceFile.Writer writer;
@@ -54,6 +67,13 @@ public class SequenceFileLogWriter implements HLog.Writer {

  private Class<? extends HLogKey> keyClass;

  /**
   * Context used by our wal dictionary compressor. Null if we're not to do
   * our custom dictionary compression. This custom WAL compression is distinct
   * from sequencefile native compression.
   */
  private CompressionContext compressionContext;

  private Method syncFs = null;
  private Method hflush = null;
@@ -74,9 +94,56 @@ public class SequenceFileLogWriter implements HLog.Writer {
    this.keyClass = keyClass;
  }

  /**
   * Create sequence file Metadata for our WAL file with version and compression
   * type (if any).
   * @param conf
   * @param compress
   * @return Metadata instance.
   */
  private static Metadata createMetadata(final Configuration conf,
      final boolean compress) {
    TreeMap<Text, Text> metaMap = new TreeMap<Text, Text>();
    metaMap.put(WAL_VERSION_KEY, WAL_VERSION);
    if (compress) {
      // Currently we only do one compression type.
      metaMap.put(WAL_COMPRESSION_TYPE_KEY, DICTIONARY_COMPRESSION_TYPE);
    }
    return new Metadata(metaMap);
  }

  /**
   * Call this method after init() has been executed
   *
   * @return whether WAL compression is enabled
   */
  static boolean isWALCompressionEnabled(final Metadata metadata) {
    // Check version is >= VERSION?
    Text txt = metadata.get(WAL_VERSION_KEY);
    if (txt == null || Integer.parseInt(txt.toString()) < COMPRESSION_VERSION) {
      return false;
    }
    // Now check that compression type is present. Currently only one value.
    txt = metadata.get(WAL_COMPRESSION_TYPE_KEY);
    return txt != null && txt.equals(DICTIONARY_COMPRESSION_TYPE);
  }

  @Override
  public void init(FileSystem fs, Path path, Configuration conf)
      throws IOException {
    // Should we do our custom WAL compression?
    boolean compress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false);
    if (compress) {
      try {
        if (this.compressionContext == null) {
          this.compressionContext = new CompressionContext(LRUDictionary.class);
        } else {
          this.compressionContext.clear();
        }
      } catch (Exception e) {
        throw new IOException("Failed to initiate CompressionContext", e);
      }
    }

    if (null == keyClass) {
      keyClass = HLog.getKeyClass(conf);
@@ -101,7 +168,7 @@ public class SequenceFileLogWriter implements HLog.Writer {
            fs.getDefaultBlockSize())),
        Boolean.valueOf(false) /*createParent*/,
        SequenceFile.CompressionType.NONE, new DefaultCodec(),
        new Metadata()
        createMetadata(conf, compress)
        });
    } catch (InvocationTargetException ite) {
      // function was properly called, but threw it's own exception
@@ -123,7 +190,7 @@ public class SequenceFileLogWriter implements HLog.Writer {
          SequenceFile.CompressionType.NONE,
          new DefaultCodec(),
          null,
          new Metadata());
          createMetadata(conf, compress));
    } else {
      LOG.debug("using new createWriter -- HADOOP-6840");
    }
@@ -133,7 +200,8 @@ public class SequenceFileLogWriter implements HLog.Writer {
    this.hflush = getHFlush();
    String msg = "Path=" + path +
      ", syncFs=" + (this.syncFs != null) +
      ", hflush=" + (this.hflush != null);
      ", hflush=" + (this.hflush != null) +
      ", compression=" + compress;
    if (this.syncFs != null || this.hflush != null) {
      LOG.debug(msg);
    } else {
@@ -207,6 +275,7 @@ public class SequenceFileLogWriter implements HLog.Writer {

  @Override
  public void append(HLog.Entry entry) throws IOException {
    entry.setCompressionContext(compressionContext);
    this.writer.append(entry.getKey(), entry.getEdit());
  }
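The writer advertises compression purely through the SequenceFile metadata ("version" of at least 1 plus "compression.type" set to "dictionary"), which is exactly what SequenceFileLogReader.isWALCompressionEnabled() checks above; logs written without those keys are, as far as the code shows, simply read back without a compression context.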
@@ -76,9 +76,15 @@ public class WALEdit implements Writable, HeapSize {
  private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
  private NavigableMap<byte[], Integer> scopes;

  private CompressionContext compressionContext;

  public WALEdit() {
  }

  public void setCompressionContext(final CompressionContext compressionContext) {
    this.compressionContext = compressionContext;
  }

  public void add(KeyValue kv) {
    this.kvs.add(kv);
  }
@@ -116,9 +122,13 @@ public class WALEdit implements Writable, HeapSize {
      // this is new style HLog entry containing multiple KeyValues.
      int numEdits = in.readInt();
      for (int idx = 0; idx < numEdits; idx++) {
        KeyValue kv = new KeyValue();
        kv.readFields(in);
        this.add(kv);
        if (compressionContext != null) {
          this.add(KeyValueCompression.readKV(in, compressionContext));
        } else {
          KeyValue kv = new KeyValue();
          kv.readFields(in);
          this.add(kv);
        }
      }
      int numFamilies = in.readInt();
      if (numFamilies > 0) {
@@ -133,7 +143,7 @@ public class WALEdit implements Writable, HeapSize {
        }
      } else {
      // this is an old style HLog entry. The int that we just
      // read is actually the length of a single KeyValue.
      // read is actually the length of a single KeyValue
      KeyValue kv = new KeyValue();
      kv.readFields(versionOrLength, in);
      this.add(kv);
@@ -146,7 +156,11 @@ public class WALEdit implements Writable, HeapSize {
    out.writeInt(kvs.size());
    // We interleave the two lists for code simplicity
    for (KeyValue kv : kvs) {
      kv.write(out);
      if (compressionContext != null) {
        KeyValueCompression.writeKV(out, kv, compressionContext);
      } else{
        kv.write(out);
      }
    }
    if (scopes == null) {
      out.writeInt(0);
@@ -1460,6 +1460,18 @@ public class Bytes {
    };
  }

  /**
   * @param bytes array to hash
   * @param offset offset to start from
   * @param length length to hash
   * */
  public static int hashCode(byte[] bytes, int offset, int length) {
    int hash = 1;
    for (int i = offset; i < offset + length; i++)
      hash = (31 * hash) + (int) bytes[i];
    return hash;
  }

  /**
   * @param t operands
   * @return Array of byte arrays made from passed array of Text
@@ -0,0 +1,87 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License
 */
package org.apache.hadoop.hbase.regionserver.wal;


import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test our compressor class.
 */
@Category(SmallTests.class)
public class TestCompressor {
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
  }

  @Test
  public void testToShort() {
    short s = 1;
    assertEquals(s, Compressor.toShort((byte)0, (byte)1));
    s <<= 8;
    assertEquals(s, Compressor.toShort((byte)1, (byte)0));
  }

  @Test (expected = IllegalArgumentException.class)
  public void testNegativeToShort() {
    Compressor.toShort((byte)0xff, (byte)0xff);
  }

  @Test
  public void testCompressingWithNullDictionaries() throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(baos);
    byte [] blahBytes = Bytes.toBytes("blah");
    Compressor.writeCompressed(blahBytes, 0, blahBytes.length, dos, null);
    dos.close();
    byte [] dosbytes = baos.toByteArray();
    DataInputStream dis =
        new DataInputStream(new ByteArrayInputStream(dosbytes));
    byte [] product = Compressor.readCompressed(dis, null);
    assertTrue(Bytes.equals(blahBytes, product));
  }

  @Test
  public void testCompressingWithClearDictionaries() throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(baos);
    Dictionary dictionary = new LRUDictionary();
    byte [] blahBytes = Bytes.toBytes("blah");
    Compressor.writeCompressed(blahBytes, 0, blahBytes.length, dos, dictionary);
    dos.close();
    byte [] dosbytes = baos.toByteArray();
    DataInputStream dis =
        new DataInputStream(new ByteArrayInputStream(dosbytes));
    dictionary = new LRUDictionary();
    byte [] product = Compressor.readCompressed(dis, dictionary);
    assertTrue(Bytes.equals(blahBytes, product));
  }
}
@@ -0,0 +1,81 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataOutputBuffer;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import static org.junit.Assert.*;

import com.google.common.collect.Lists;

@Category(SmallTests.class)
public class TestKeyValueCompression {
  private static final byte[] VALUE = Bytes.toBytes("fake value");
  private static final int BUF_SIZE = 256*1024;

  @Test
  public void testCountingKVs() throws Exception {
    List<KeyValue> kvs = Lists.newArrayList();
    for (int i = 0; i < 400; i++) {
      byte[] row = Bytes.toBytes("row" + i);
      byte[] fam = Bytes.toBytes("fam" + i);
      byte[] qual = Bytes.toBytes("qual" + i);
      kvs.add(new KeyValue(row, fam, qual, 12345L, VALUE));
    }

    runTestCycle(kvs);
  }

  @Test
  public void testRepeatingKVs() throws Exception {
    List<KeyValue> kvs = Lists.newArrayList();
    for (int i = 0; i < 400; i++) {
      byte[] row = Bytes.toBytes("row" + (i % 10));
      byte[] fam = Bytes.toBytes("fam" + (i % 127));
      byte[] qual = Bytes.toBytes("qual" + (i % 128));
      kvs.add(new KeyValue(row, fam, qual, 12345L, VALUE));
    }

    runTestCycle(kvs);
  }

  private void runTestCycle(List<KeyValue> kvs) throws Exception {
    CompressionContext ctx = new CompressionContext(LRUDictionary.class);
    DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
    for (KeyValue kv : kvs) {
      KeyValueCompression.writeKV(buf, kv, ctx);
    }

    ctx.clear();
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(
        buf.getData(), 0, buf.getLength()));
    for (KeyValue kv : kvs) {
      KeyValue readBack = KeyValueCompression.readKV(in, ctx);
      assertEquals(kv, readBack);
    }
  }
}
@@ -0,0 +1,155 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.*;

import java.math.BigInteger;
import java.util.Arrays;
import java.util.Random;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests LRUDictionary
 */
@Category(SmallTests.class)
public class TestLRUDictionary {
  LRUDictionary testee;

  @Before
  public void setUp() throws Exception {
    testee = new LRUDictionary();
  }

  @Test
  public void TestContainsNothing() {
    assertTrue(isDictionaryEmpty(testee));
  }

  /**
   * Assert can't add empty array.
   */
  @Test
  public void testPassingEmptyArrayToFindEntry() {
    assertEquals(Dictionary.NOT_IN_DICTIONARY,
        testee.findEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
    assertEquals(Dictionary.NOT_IN_DICTIONARY,
        testee.addEntry(HConstants.EMPTY_BYTE_ARRAY, 0, 0));
  }

  @Test
  public void testPassingSameArrayToAddEntry() {
    // Add random predefined byte array, in this case a random byte array from
    // HConstants. Assert that when we add, we get new index. Thats how it
    // works.
    int len = HConstants.CATALOG_FAMILY.length;
    int index = testee.addEntry(HConstants.CATALOG_FAMILY, 0, len);
    assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
    assertFalse(index == testee.addEntry(HConstants.CATALOG_FAMILY, 0, len));
  }

  @Test
  public void testBasic() {
    Random rand = new Random();
    byte[] testBytes = new byte[10];
    rand.nextBytes(testBytes);

    // Verify that our randomly generated array doesn't exist in the dictionary
    assertEquals(testee.findEntry(testBytes, 0, testBytes.length), -1);

    // now since we looked up an entry, we should have added it to the
    // dictionary, so it isn't empty

    assertFalse(isDictionaryEmpty(testee));

    // Check if we can find it using findEntry
    short t = testee.findEntry(testBytes, 0, testBytes.length);

    // Making sure we do find what we're looking for
    assertTrue(t != -1);

    byte[] testBytesCopy = new byte[20];

    Bytes.putBytes(testBytesCopy, 10, testBytes, 0, testBytes.length);

    // copy byte arrays, make sure that we check that equal byte arrays are
    // equal without just checking the reference
    assertEquals(testee.findEntry(testBytesCopy, 10, testBytes.length), t);

    // make sure the entry retrieved is the same as the one put in
    assertTrue(Arrays.equals(testBytes, testee.getEntry(t)));

    testee.clear();

    // making sure clear clears the dictionary
    assertTrue(isDictionaryEmpty(testee));
  }

  @Test
  public void TestLRUPolicy(){
    //start by filling the dictionary up with byte arrays
    for (int i = 0; i < LRUDictionary.BidirectionalLRUMap.MAX_SIZE; i++) {
      testee.findEntry((BigInteger.valueOf(i)).toByteArray(), 0,
          (BigInteger.valueOf(i)).toByteArray().length);
    }

    // check we have the first element added
    assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
        BigInteger.ZERO.toByteArray().length) != -1);

    // check for an element we know isn't there
    assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0,
        BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) == -1);

    // since we just checked for this element, it should be there now.
    assertTrue(testee.findEntry(BigInteger.valueOf(Integer.MAX_VALUE).toByteArray(), 0,
        BigInteger.valueOf(Integer.MAX_VALUE).toByteArray().length) != -1);

    // test eviction, that the least recently added or looked at element is
    // evicted. We looked at ZERO so it should be in the dictionary still.
    assertTrue(testee.findEntry(BigInteger.ZERO.toByteArray(), 0,
        BigInteger.ZERO.toByteArray().length) != -1);
    // Now go from beyond 1 to the end.
    for(int i = 1; i < LRUDictionary.BidirectionalLRUMap.MAX_SIZE; i++) {
      assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
          BigInteger.valueOf(i).toByteArray().length) == -1);
    }

    // check we can find all of these.
    for (int i = 0; i < LRUDictionary.BidirectionalLRUMap.MAX_SIZE; i++) {
      assertTrue(testee.findEntry(BigInteger.valueOf(i).toByteArray(), 0,
          BigInteger.valueOf(i).toByteArray().length) != -1);
    }
  }

  static private boolean isDictionaryEmpty(LRUDictionary dict) {
    try {
      dict.getEntry((short)0);
      return false;
    } catch (IndexOutOfBoundsException ioobe) {
      return true;
    }
  }
}
@@ -61,7 +61,7 @@ import org.mockito.Mockito;
@Category(MediumTests.class)
public class TestWALReplay {
  public static final Log LOG = LogFactory.getLog(TestWALReplay.class);
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
  private Path hbaseRootDir = null;
  private Path oldLogDir;
@@ -0,0 +1,39 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;

/**
 * Enables compression and runs the TestWALReplay tests.
 */
@Category(MediumTests.class)
public class TestWALReplayCompressed extends TestWALReplay {

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TestWALReplay.setUpBeforeClass();
    Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration();
    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
  }

}