HBASE-11857 Restore ReaderBase.initAfterCompression() and WALCellCodec.create(Configuration, CompressionContext) (Ted Yu)
This commit is contained in:
parent
8362e62222
commit
2668cc4672
|
@ -22,6 +22,7 @@ import java.io.OutputStream;
|
|||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.io.CellOutputStream;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
|
||||
|
||||
|
@ -32,7 +33,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
|
|||
* and without presuming an hfile context. Intent is an Interface that will work for hfile and
|
||||
* rpc.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
|
||||
public interface Codec {
|
||||
// TODO: interfacing with {@link DataBlockEncoder}
|
||||
/**
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.hbase.codec.Codec;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.io.LimitInputStream;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader.Builder;
|
||||
|
@ -54,7 +55,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
|
|||
* which is appended at the end of the WAL. This is empty for now; it can contain some meta
|
||||
* information such as Region level stats, etc in future.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
|
||||
public class ProtobufLogReader extends ReaderBase {
|
||||
private static final Log LOG = LogFactory.getLog(ProtobufLogReader.class);
|
||||
static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
|
||||
|
@ -242,6 +243,11 @@ public class ProtobufLogReader extends ReaderBase {
|
|||
return WALCellCodec.create(conf, cellCodecClsName, compressionContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initAfterCompression() throws IOException {
|
||||
initAfterCompression(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initAfterCompression(String cellCodecClsName) throws IOException {
|
||||
WALCellCodec codec = getCodec(this.conf, cellCodecClsName, this.compressionContext);
|
||||
|
|
|
@ -28,12 +28,13 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.io.util.LRUDictionary;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
|
||||
public abstract class ReaderBase implements HLog.Reader {
|
||||
private static final Log LOG = LogFactory.getLog(ReaderBase.class);
|
||||
protected Configuration conf;
|
||||
|
@ -139,6 +140,11 @@ public abstract class ReaderBase implements HLog.Reader {
|
|||
*/
|
||||
protected abstract String initReader(FSDataInputStream stream) throws IOException;
|
||||
|
||||
/**
|
||||
* Initializes the compression after the shared stuff has been initialized. Called once.
|
||||
*/
|
||||
protected abstract void initAfterCompression() throws IOException;
|
||||
|
||||
/**
|
||||
* Initializes the compression after the shared stuff has been initialized. Called once.
|
||||
* @param cellCodecClsName class name of cell Codec
|
||||
|
|
|
@ -32,12 +32,13 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.SequenceFile.Metadata;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
|
||||
public class SequenceFileLogReader extends ReaderBase {
|
||||
private static final Log LOG = LogFactory.getLog(SequenceFileLogReader.class);
|
||||
|
||||
|
@ -189,6 +190,11 @@ public class SequenceFileLogReader extends ReaderBase {
|
|||
// Nothing to do here
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initAfterCompression() throws IOException {
|
||||
// Nothing to do here
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean hasCompression() {
|
||||
return isWALCompressionEnabled(reader.getMetadata());
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.io.OutputStream;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.codec.BaseDecoder;
|
||||
import org.apache.hadoop.hbase.codec.BaseEncoder;
|
||||
|
@ -46,7 +47,7 @@ import com.google.protobuf.ByteString;
|
|||
* This codec is used at server side for writing cells to WAL as well as for sending edits
|
||||
* as part of the distributed splitting process.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
|
||||
public class WALCellCodec implements Codec {
|
||||
/** Configuration key for the class to use when encoding cells in the WAL */
|
||||
public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";
|
||||
|
@ -102,6 +103,24 @@ public class WALCellCodec implements Codec {
|
|||
{ Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
|
||||
}
|
||||
|
||||
/**
|
||||
* Create and setup a {@link WALCellCodec} from the
|
||||
* CompressionContext.
|
||||
* Cell Codec classname is read from {@link Configuration}.
|
||||
* Fully prepares the codec for use.
|
||||
* @param conf {@link Configuration} to read for the user-specified codec. If none is specified,
|
||||
* uses a {@link WALCellCodec}.
|
||||
* @param compression compression the codec should use
|
||||
* @return a {@link WALCellCodec} ready for use.
|
||||
* @throws UnsupportedOperationException if the codec cannot be instantiated
|
||||
*/
|
||||
public static WALCellCodec create(Configuration conf,
|
||||
CompressionContext compression) throws UnsupportedOperationException {
|
||||
String cellCodecClsName = getWALCellCodecClass(conf);
|
||||
return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[]
|
||||
{ Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
|
||||
}
|
||||
|
||||
public interface ByteStringCompressor {
|
||||
ByteString compress(byte[] data, Dictionary dict) throws IOException;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue