HBASE-21492 CellCodec Written To WAL Before It's Verified
This commit is contained in:
parent
a53b85ec22
commit
15ebfe42a3
|
@ -364,7 +364,7 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
// No reducers.
|
// No reducers.
|
||||||
job.setNumReduceTasks(0);
|
job.setNumReduceTasks(0);
|
||||||
}
|
}
|
||||||
String codecCls = WALCellCodec.getWALCellCodecClass(conf);
|
String codecCls = WALCellCodec.getWALCellCodecClass(conf).getName();
|
||||||
try {
|
try {
|
||||||
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Class.forName(codecCls));
|
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Class.forName(codecCls));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
@ -80,7 +80,8 @@ public abstract class AbstractProtobufLogWriter {
|
||||||
builder.setWriterClsName(getWriterClassName());
|
builder.setWriterClsName(getWriterClassName());
|
||||||
}
|
}
|
||||||
if (!builder.hasCellCodecClsName()) {
|
if (!builder.hasCellCodecClsName()) {
|
||||||
builder.setCellCodecClsName(WALCellCodec.getWALCellCodecClass(conf));
|
builder.setCellCodecClsName(
|
||||||
|
WALCellCodec.getWALCellCodecClass(conf).getName());
|
||||||
}
|
}
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,6 @@ import java.io.OutputStream;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.PrivateCellUtil;
|
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
@ -82,8 +81,8 @@ public class WALCellCodec implements Codec {
|
||||||
this.compression = compression;
|
this.compression = compression;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String getWALCellCodecClass(Configuration conf) {
|
public static Class<?> getWALCellCodecClass(Configuration conf) {
|
||||||
return conf.get(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
|
return conf.getClass(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -102,7 +101,7 @@ public class WALCellCodec implements Codec {
|
||||||
public static WALCellCodec create(Configuration conf, String cellCodecClsName,
|
public static WALCellCodec create(Configuration conf, String cellCodecClsName,
|
||||||
CompressionContext compression) throws UnsupportedOperationException {
|
CompressionContext compression) throws UnsupportedOperationException {
|
||||||
if (cellCodecClsName == null) {
|
if (cellCodecClsName == null) {
|
||||||
cellCodecClsName = getWALCellCodecClass(conf);
|
cellCodecClsName = getWALCellCodecClass(conf).getName();
|
||||||
}
|
}
|
||||||
return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[]
|
return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[]
|
||||||
{ Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
|
{ Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
|
||||||
|
@ -121,7 +120,7 @@ public class WALCellCodec implements Codec {
|
||||||
*/
|
*/
|
||||||
public static WALCellCodec create(Configuration conf,
|
public static WALCellCodec create(Configuration conf,
|
||||||
CompressionContext compression) throws UnsupportedOperationException {
|
CompressionContext compression) throws UnsupportedOperationException {
|
||||||
String cellCodecClsName = getWALCellCodecClass(conf);
|
String cellCodecClsName = getWALCellCodecClass(conf).getName();
|
||||||
return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[]
|
return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[]
|
||||||
{ Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
|
{ Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,4 +64,15 @@ public class TestCustomWALCellCodec {
|
||||||
assertEquals("Custom codec didn't get initialized with the right compression context!", null,
|
assertEquals("Custom codec didn't get initialized with the right compression context!", null,
|
||||||
codec.context);
|
codec.context);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that a custom {@link WALCellCodec} will fail if provided an invalid
|
||||||
|
* code class.
|
||||||
|
*/
|
||||||
|
@Test(expected = RuntimeException.class)
|
||||||
|
public void testCreatePreparesCodecInvalidClass() throws Exception {
|
||||||
|
Configuration conf = new Configuration(false);
|
||||||
|
conf.setStrings(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, "org.apache.hbase.wal.NoSuchClass");
|
||||||
|
WALCellCodec.create(conf, null, null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue