HBASE-11857 Restore ReaderBase.initAfterCompression() and WALCellCodec.create(Configuration, CompressionContext) (Ted Yu)

This commit is contained in:
Andrew Purtell 2014-08-29 16:53:22 -07:00
parent 2ceb875957
commit cdfc96f294
5 changed files with 43 additions and 5 deletions

View File

@ -22,6 +22,7 @@ import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.io.CellOutputStream; import org.apache.hadoop.hbase.io.CellOutputStream;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
@ -32,7 +33,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
* and without presuming an hfile context. Intent is an Interface that will work for hfile and * and without presuming an hfile context. Intent is an Interface that will work for hfile and
* rpc. * rpc.
*/ */
@InterfaceAudience.Private @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
public interface Codec { public interface Codec {
// TODO: interfacing with {@link DataBlockEncoder} // TODO: interfacing with {@link DataBlockEncoder}
/** /**

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.io.LimitInputStream; import org.apache.hadoop.hbase.io.LimitInputStream;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader.Builder; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader.Builder;
@ -54,7 +55,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
* which is appended at the end of the WAL. This is empty for now; it can contain some meta * which is appended at the end of the WAL. This is empty for now; it can contain some meta
* information such as Region level stats, etc in future. * information such as Region level stats, etc in future.
*/ */
@InterfaceAudience.Private @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
public class ProtobufLogReader extends ReaderBase { public class ProtobufLogReader extends ReaderBase {
private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class); private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class);
static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL"); static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
@ -242,6 +243,11 @@ public class ProtobufLogReader extends ReaderBase {
return WALCellCodec.create(conf, cellCodecClsName, compressionContext); return WALCellCodec.create(conf, cellCodecClsName, compressionContext);
} }
@Override
protected void initAfterCompression() throws IOException {
initAfterCompression(null);
}
@Override @Override
protected void initAfterCompression(String cellCodecClsName) throws IOException { protected void initAfterCompression(String cellCodecClsName) throws IOException {
WALCellCodec codec = getCodec(this.conf, cellCodecClsName, this.compressionContext); WALCellCodec codec = getCodec(this.conf, cellCodecClsName, this.compressionContext);

View File

@ -28,12 +28,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.util.LRUDictionary; import org.apache.hadoop.hbase.io.util.LRUDictionary;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
@InterfaceAudience.Private @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
public abstract class ReaderBase implements HLog.Reader { public abstract class ReaderBase implements HLog.Reader {
private static final Log LOG = LogFactory.getLog(ReaderBase.class); private static final Log LOG = LogFactory.getLog(ReaderBase.class);
protected Configuration conf; protected Configuration conf;
@ -139,6 +140,11 @@ public abstract class ReaderBase implements HLog.Reader {
*/ */
protected abstract String initReader(FSDataInputStream stream) throws IOException; protected abstract String initReader(FSDataInputStream stream) throws IOException;
/**
* Initializes the compression after the shared stuff has been initialized. Called once.
*/
protected abstract void initAfterCompression() throws IOException;
/** /**
* Initializes the compression after the shared stuff has been initialized. Called once. * Initializes the compression after the shared stuff has been initialized. Called once.
* @param cellCodecClsName class name of cell Codec * @param cellCodecClsName class name of cell Codec

View File

@ -32,12 +32,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Metadata; import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
@InterfaceAudience.Private @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
public class SequenceFileLogReader extends ReaderBase { public class SequenceFileLogReader extends ReaderBase {
private static final Log LOG = LogFactory.getLog(SequenceFileLogReader.class); private static final Log LOG = LogFactory.getLog(SequenceFileLogReader.class);
@ -189,6 +190,11 @@ public class SequenceFileLogReader extends ReaderBase {
// Nothing to do here // Nothing to do here
} }
@Override
protected void initAfterCompression() throws IOException {
// Nothing to do here
}
@Override @Override
protected boolean hasCompression() { protected boolean hasCompression() {
return isWALCompressionEnabled(reader.getMetadata()); return isWALCompressionEnabled(reader.getMetadata());

View File

@ -25,6 +25,7 @@ import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.BaseDecoder; import org.apache.hadoop.hbase.codec.BaseDecoder;
import org.apache.hadoop.hbase.codec.BaseEncoder; import org.apache.hadoop.hbase.codec.BaseEncoder;
@ -46,7 +47,7 @@ import com.google.protobuf.ByteString;
* This codec is used at server side for writing cells to WAL as well as for sending edits * This codec is used at server side for writing cells to WAL as well as for sending edits
* as part of the distributed splitting process. * as part of the distributed splitting process.
*/ */
@InterfaceAudience.Private @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
public class WALCellCodec implements Codec { public class WALCellCodec implements Codec {
/** Configuration key for the class to use when encoding cells in the WAL */ /** Configuration key for the class to use when encoding cells in the WAL */
public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec"; public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";
@ -102,6 +103,24 @@ public class WALCellCodec implements Codec {
{ Configuration.class, CompressionContext.class }, new Object[] { conf, compression }); { Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
} }
/**
* Create and setup a {@link WALCellCodec} from the
* CompressionContext.
* Cell Codec classname is read from {@link Configuration}.
* Fully prepares the codec for use.
* @param conf {@link Configuration} to read for the user-specified codec. If none is specified,
* uses a {@link WALCellCodec}.
* @param compression compression the codec should use
* @return a {@link WALCellCodec} ready for use.
* @throws UnsupportedOperationException if the codec cannot be instantiated
*/
public static WALCellCodec create(Configuration conf,
CompressionContext compression) throws UnsupportedOperationException {
String cellCodecClsName = getWALCellCodecClass(conf);
return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[]
{ Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
}
public interface ByteStringCompressor { public interface ByteStringCompressor {
ByteString compress(byte[] data, Dictionary dict) throws IOException; ByteString compress(byte[] data, Dictionary dict) throws IOException;
} }