HBASE-8702: Make WALEditCodec pluggable
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1492407 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ba8801c2ff
commit
e9eff5e624
|
@ -175,7 +175,7 @@ public class ProtobufLogReader extends ReaderBase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void initAfterCompression() throws IOException {
|
protected void initAfterCompression() throws IOException {
|
||||||
WALCellCodec codec = new WALCellCodec(this.compressionContext);
|
WALCellCodec codec = WALCellCodec.create(this.conf, this.compressionContext);
|
||||||
this.cellDecoder = codec.getDecoder(this.inputStream);
|
this.cellDecoder = codec.getDecoder(this.inputStream);
|
||||||
if (this.hasCompression) {
|
if (this.hasCompression) {
|
||||||
this.byteStringUncompressor = codec.getByteStringUncompressor();
|
this.byteStringUncompressor = codec.getByteStringUncompressor();
|
||||||
|
|
|
@ -80,7 +80,7 @@ public class ProtobufLogWriter implements HLog.Writer {
|
||||||
output.write(ProtobufLogReader.PB_WAL_MAGIC);
|
output.write(ProtobufLogReader.PB_WAL_MAGIC);
|
||||||
WALHeader.newBuilder().setHasCompression(doCompress).build().writeDelimitedTo(output);
|
WALHeader.newBuilder().setHasCompression(doCompress).build().writeDelimitedTo(output);
|
||||||
|
|
||||||
WALCellCodec codec = new WALCellCodec(this.compressionContext);
|
WALCellCodec codec = WALCellCodec.create(conf, this.compressionContext);
|
||||||
this.cellEncoder = codec.getEncoder(this.output);
|
this.cellEncoder = codec.getEncoder(this.output);
|
||||||
if (doCompress) {
|
if (doCompress) {
|
||||||
this.compressor = codec.getByteStringCompressor();
|
this.compressor = codec.getByteStringCompressor();
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.codec.BaseDecoder;
|
import org.apache.hadoop.hbase.codec.BaseDecoder;
|
||||||
|
@ -29,6 +30,7 @@ import org.apache.hadoop.hbase.codec.BaseEncoder;
|
||||||
import org.apache.hadoop.hbase.codec.Codec;
|
import org.apache.hadoop.hbase.codec.Codec;
|
||||||
import org.apache.hadoop.hbase.codec.KeyValueCodec;
|
import org.apache.hadoop.hbase.codec.KeyValueCodec;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
|
@ -39,6 +41,9 @@ import com.google.protobuf.ByteString;
|
||||||
* This is a pure coincidence... they are independent and don't have to be compatible.
|
* This is a pure coincidence... they are independent and don't have to be compatible.
|
||||||
*/
|
*/
|
||||||
public class WALCellCodec implements Codec {
|
public class WALCellCodec implements Codec {
|
||||||
|
/** Configuration key for the class to use when encoding cells in the WAL */
|
||||||
|
public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";
|
||||||
|
|
||||||
private final CompressionContext compression;
|
private final CompressionContext compression;
|
||||||
private final ByteStringUncompressor statelessUncompressor = new ByteStringUncompressor() {
|
private final ByteStringUncompressor statelessUncompressor = new ByteStringUncompressor() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -47,10 +52,33 @@ public class WALCellCodec implements Codec {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
public WALCellCodec(CompressionContext compression) {
|
/**
|
||||||
|
* Default constructor - <b>all subclasses must implement a constructor with this signature </b>
|
||||||
|
* if they are to be dynamically loaded from the {@link Configuration}.
|
||||||
|
* @param conf configuration to configure <tt>this</tt>
|
||||||
|
* @param compression compression the codec should support, can be <tt>null</tt> to indicate no
|
||||||
|
* compression
|
||||||
|
*/
|
||||||
|
public WALCellCodec(Configuration conf, CompressionContext compression) {
|
||||||
this.compression = compression;
|
this.compression = compression;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create and setup a {@link WALCellCodec} from the {@link Configuration} and CompressionContext,
|
||||||
|
* if they have been specified. Fully prepares the codec for use.
|
||||||
|
* @param conf {@link Configuration} to read for the user-specified codec. If none is specified,
|
||||||
|
* uses a {@link WALCellCodec}.
|
||||||
|
* @param compression compression the codec should use
|
||||||
|
* @return a {@link WALCellCodec} ready for use.
|
||||||
|
* @throws UnsupportedOperationException if the codec cannot be instantiated
|
||||||
|
*/
|
||||||
|
public static WALCellCodec create(Configuration conf, CompressionContext compression)
|
||||||
|
throws UnsupportedOperationException {
|
||||||
|
String className = conf.get(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
|
||||||
|
return ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { Configuration.class,
|
||||||
|
CompressionContext.class }, new Object[] { conf, compression });
|
||||||
|
}
|
||||||
|
|
||||||
public interface ByteStringCompressor {
|
public interface ByteStringCompressor {
|
||||||
ByteString compress(byte[] data, Dictionary dict) throws IOException;
|
ByteString compress(byte[] data, Dictionary dict) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,62 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.SmallTests;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that we can create, load, setup our own custom codec
|
||||||
|
*/
|
||||||
|
@Category(SmallTests.class)
|
||||||
|
public class TestCustomWALCellCodec {
|
||||||
|
|
||||||
|
public static class CustomWALCellCodec extends WALCellCodec {
|
||||||
|
public Configuration conf;
|
||||||
|
public CompressionContext context;
|
||||||
|
|
||||||
|
public CustomWALCellCodec(Configuration conf, CompressionContext compression) {
|
||||||
|
super(conf, compression);
|
||||||
|
this.conf = conf;
|
||||||
|
this.context = compression;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that a custom {@link WALCellCodec} will be completely setup when it is instantiated via
|
||||||
|
* {@link WALCellCodec}
|
||||||
|
* @throws Exception on failure
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCreatePreparesCodec() throws Exception {
|
||||||
|
Configuration conf = new Configuration(false);
|
||||||
|
conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, CustomWALCellCodec.class,
|
||||||
|
WALCellCodec.class);
|
||||||
|
CustomWALCellCodec codec = (CustomWALCellCodec) WALCellCodec.create(conf, null);
|
||||||
|
assertEquals("Custom codec didn't get initialized with the right configuration!", conf,
|
||||||
|
codec.conf);
|
||||||
|
assertEquals("Custom codec didn't get initialized with the right compression context!", null,
|
||||||
|
codec.context);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue