HBASE-18649 Deprecate KV Usage in MR to move to Cells in 3.0 (ram)
This commit is contained in:
parent 16d483f900
commit 0a24178d06
@@ -24,22 +24,21 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

@@ -47,6 +46,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;

/**
* A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles

@@ -70,24 +70,15 @@ public class MapReduceHFileSplitterJob extends Configured implements Tool {

/**
* A mapper that just writes out cells. This one can be used together with
* {@link KeyValueSortReducer}
* {@link CellSortReducer}
*/
static class HFileCellMapper extends
Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
Mapper<NullWritable, Cell, ImmutableBytesWritable, Cell> {

@Override
public void map(NullWritable key, KeyValue value, Context context) throws IOException,
InterruptedException {
// Convert value to KeyValue if subclass
if (!value.getClass().equals(KeyValue.class)) {
value =
new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(),
value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(),
value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(),
value.getValueOffset(), value.getValueLength());
}
context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value);
public void map(NullWritable key, Cell value, Context context)
throws IOException, InterruptedException {
context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), new MapReduceCell(value));
}

@Override

@@ -119,14 +110,14 @@ public class MapReduceHFileSplitterJob extends Configured implements Tool {
LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
TableName tableName = TableName.valueOf(tabName);
job.setMapperClass(HFileCellMapper.class);
job.setReducerClass(KeyValueSortReducer.class);
job.setReducerClass(CellSortReducer.class);
Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputValueClass(KeyValue.class);
job.setMapOutputValueClass(MapReduceCell.class);
try (Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(tableName);
RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
}
LOG.debug("success configuring load incremental job");
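The pattern above generalizes to any MapReduce job that used to shuffle KeyValues: emit the row key plus a MapReduceCell wrapper, declare MapReduceCell as the map output value class, and let CellSortReducer do the sorting. A minimal sketch of such a user job follows; the mapper name, table name and paths are placeholders, not part of this commit, and the class names (MapReduceCell, CellSortReducer, HFileOutputFormat2) are the ones this commit introduces or keeps.

    // Sketch only: wiring a cell-based bulk-load job after this change.
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "cell-bulkload-example");
    job.setMapperClass(MyCellEmittingMapper.class);         // hypothetical user mapper emitting Cells
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(MapReduceCell.class);         // was KeyValue.class before this change
    job.setReducerClass(CellSortReducer.class);              // was KeyValueSortReducer
    FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles-out"));   // placeholder path
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf("example_table"));          // placeholder table
        RegionLocator locator = conn.getRegionLocator(TableName.valueOf("example_table"))) {
      HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
    }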

@@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.KeyValue.COLUMN_FAMILY_DELIMITER;
import static org.apache.hadoop.hbase.KeyValue.getDelimiter;
import static org.apache.hadoop.hbase.KeyValue.COLUMN_FAMILY_DELIM_ARRAY;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

@@ -1465,9 +1466,12 @@ public final class CellUtil {
}

/**
* Estimate based on keyvalue's serialization format.
* Estimate based on keyvalue's serialization format in the RPC layer. Note that there is an extra
* SIZEOF_INT added to the size here that indicates the actual length of the cell for cases where
* cell's are serialized in a contiguous format (For eg in RPCs).
* @param cell
* @return Estimate of the <code>cell</code> size in bytes.
* @return Estimate of the <code>cell</code> size in bytes plus an extra SIZEOF_INT indicating the
* actual cell length.
*/
public static int estimatedSerializedSizeOf(final Cell cell) {
if (cell instanceof ExtendedCell) {

@@ -1764,7 +1768,7 @@ public final class CellUtil {
* @param out
* @throws IOException
*/
public static void writeFlatKey(Cell cell, DataOutputStream out) throws IOException {
public static void writeFlatKey(Cell cell, DataOutput out) throws IOException {
short rowLen = cell.getRowLength();
byte fLen = cell.getFamilyLength();
int qLen = cell.getQualifierLength();

@@ -1790,6 +1794,69 @@ public final class CellUtil {
out.writeByte(cell.getTypeByte());
}

/**
* Deep clones the given cell if the cell supports deep cloning
* @param cell the cell to be cloned
* @return the cloned cell
* @throws CloneNotSupportedException
*/
public static Cell deepClone(Cell cell) throws CloneNotSupportedException {
if (cell instanceof ExtendedCell) {
return ((ExtendedCell) cell).deepClone();
}
throw new CloneNotSupportedException();
}

/**
* Writes the cell to the given OutputStream
* @param cell the cell to be written
* @param out the outputstream
* @param withTags if tags are to be written or not
* @return the total bytes written
* @throws IOException
*/
public static int writeCell(Cell cell, OutputStream out, boolean withTags) throws IOException {
if (cell instanceof ExtendedCell) {
return ((ExtendedCell) cell).write(out, withTags);
} else {
ByteBufferUtils.putInt(out, CellUtil.estimatedSerializedSizeOfKey(cell));
ByteBufferUtils.putInt(out, cell.getValueLength());
CellUtil.writeFlatKey(cell, out);
CellUtil.writeValue(out, cell, cell.getValueLength());
int tagsLength = cell.getTagsLength();
if (withTags) {
byte[] len = new byte[Bytes.SIZEOF_SHORT];
Bytes.putAsShort(len, 0, tagsLength);
out.write(len);
if (tagsLength > 0) {
CellUtil.writeTags(out, cell, tagsLength);
}
}
int lenWritten = (2 * Bytes.SIZEOF_INT) + CellUtil.estimatedSerializedSizeOfKey(cell)
+ cell.getValueLength();
if (withTags) {
lenWritten += Bytes.SIZEOF_SHORT + tagsLength;
}
return lenWritten;
}
}

/**
* Writes a cell to the buffer at the given offset
* @param cell the cell to be written
* @param buf the buffer to which the cell has to be wrriten
* @param offset the offset at which the cell should be written
*/
public static void writeCellToBuffer(Cell cell, ByteBuffer buf, int offset) {
if (cell instanceof ExtendedCell) {
((ExtendedCell) cell).write(buf, offset);
} else {
// Using the KVUtil
byte[] bytes = KeyValueUtil.copyToNewByteArray(cell);
ByteBufferUtils.copyFromArrayToBuffer(buf, offset, bytes, 0, bytes.length);
}
}

public static int writeFlatKey(Cell cell, OutputStream out) throws IOException {
short rowLen = cell.getRowLength();
byte fLen = cell.getFamilyLength();

@@ -1844,7 +1911,7 @@ public final class CellUtil {
public static void writeRowSkippingBytes(DataOutputStream out, Cell cell, short rlength,
int commonPrefix) throws IOException {
if (cell instanceof ByteBufferCell) {
ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getRowByteBuffer(),
ByteBufferUtils.copyBufferToStream((DataOutput)out, ((ByteBufferCell) cell).getRowByteBuffer(),
((ByteBufferCell) cell).getRowPosition() + commonPrefix, rlength - commonPrefix);
} else {
out.write(cell.getRowArray(), cell.getRowOffset() + commonPrefix, rlength - commonPrefix);

@@ -1894,7 +1961,7 @@ public final class CellUtil {
public static void writeQualifierSkippingBytes(DataOutputStream out, Cell cell,
int qlength, int commonPrefix) throws IOException {
if (cell instanceof ByteBufferCell) {
ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getQualifierByteBuffer(),
ByteBufferUtils.copyBufferToStream((DataOutput)out, ((ByteBufferCell) cell).getQualifierByteBuffer(),
((ByteBufferCell) cell).getQualifierPosition() + commonPrefix, qlength - commonPrefix);
} else {
out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonPrefix,
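A short sketch of what the new CellUtil.writeCell gives callers: a KeyValue-format serialization (key length, value length, key, value, optional tags) that can be read back as a KeyValue. This mirrors the testWriteCell case added later in this commit; the row, family, qualifier and value below are just example data, and the asserts state the expected outcome rather than anything verified here.

    // Sketch: round-tripping a Cell through the new CellUtil.writeCell (KeyValue wire format).
    Cell cell = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"), Bytes.toBytes("q"),
        1234L, Bytes.toBytes("value"));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int written = CellUtil.writeCell(cell, out, true);   // returns the number of bytes written
    KeyValue roundTripped = new KeyValue(out.toByteArray());
    assert CellUtil.equals(cell, roundTripped);          // expected to hold, as in testWriteCell
    assert written == out.size();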

@@ -17,6 +17,7 @@
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

@@ -262,7 +263,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder {
ByteBufferUtils.putCompressedInt(out, kLength);
ByteBufferUtils.putCompressedInt(out, vLength);
ByteBufferUtils.putCompressedInt(out, 0);
CellUtil.writeFlatKey(cell, out);
CellUtil.writeFlatKey(cell, (DataOutput)out);
// Write the value part
CellUtil.writeValue(out, cell, cell.getValueLength());
} else {

@@ -17,6 +17,7 @@
package org.apache.hadoop.hbase.io.encoding;

import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

@@ -59,7 +60,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder {
ByteBufferUtils.putCompressedInt(out, klength);
ByteBufferUtils.putCompressedInt(out, vlength);
ByteBufferUtils.putCompressedInt(out, 0);
CellUtil.writeFlatKey(cell, out);
CellUtil.writeFlatKey(cell, (DataOutput)out);
} else {
// find a common prefix and skip it
int common = CellUtil.findCommonPrefixInFlatKey(cell, state.prevCell, true, true);
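For readers wondering about the explicit (DataOutput) cast in the two encoder hunks above: after this change CellUtil has both a writeFlatKey(Cell, DataOutput) and a writeFlatKey(Cell, OutputStream) overload (see the CellUtil hunks earlier), and a DataOutputStream argument matches both, so the unqualified call would presumably be ambiguous. A minimal sketch of the situation, assuming only those two signatures:

    // The two overloads now present in CellUtil, per the hunks above:
    //   public static void writeFlatKey(Cell cell, DataOutput out) throws IOException
    //   public static int  writeFlatKey(Cell cell, OutputStream out) throws IOException
    Cell cell = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
        1L, Bytes.toBytes("v"));
    DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
    // CellUtil.writeFlatKey(cell, out);            // DataOutputStream is both types: ambiguous
    CellUtil.writeFlatKey(cell, (DataOutput) out);  // the cast selects the DataOutput overload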

@@ -20,6 +20,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

@@ -193,6 +194,28 @@ public final class ByteBufferUtils {
}
}

/**
* Copy data from a buffer to an output stream. Does not update the position
* in the buffer.
* @param out the output stream to write bytes to
* @param in the buffer to read bytes from
* @param offset the offset in the buffer (from the buffer's array offset)
* to start copying bytes from
* @param length the number of bytes to copy
*/
public static void copyBufferToStream(DataOutput out, ByteBuffer in, int offset, int length)
throws IOException {
if (out instanceof ByteBufferWriter) {
((ByteBufferWriter) out).write(in, offset, length);
} else if (in.hasArray()) {
out.write(in.array(), in.arrayOffset() + offset, length);
} else {
for (int i = 0; i < length; ++i) {
out.write(toByte(in, offset + i));
}
}
}

public static int putLong(OutputStream out, final long value,
final int fitInBytes) throws IOException {
long tmpValue = value;
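A small usage sketch of the DataOutput overload added above: when the DataOutput is a ByteBufferWriter the copy goes through write(ByteBuffer, int, int), otherwise it falls back to the backing array or a byte-by-byte loop, exactly as the method body shows. The buffer contents below are arbitrary example data.

    // Sketch: copying part of a ByteBuffer to a DataOutput without touching the buffer's position.
    ByteBuffer buf = ByteBuffer.wrap(Bytes.toBytes("hello world"));
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    ByteBufferUtils.copyBufferToStream(out, buf, 6, 5);   // copies "world"
    out.flush();
    // buf.position() is unchanged; bos now holds the five copied bytes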

@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;

@@ -488,4 +490,119 @@ public class TestCellUtil {
bbCell = new ByteBufferKeyValue(buffer, 0, buffer.remaining());
assertEquals(bd, CellUtil.getValueAsBigDecimal(bbCell));
}

@Test
public void testWriteCell() throws IOException {
byte[] r = Bytes.toBytes("row1");
byte[] f = Bytes.toBytes("cf1");
byte[] q1 = Bytes.toBytes("qual1");
byte[] q2 = Bytes.toBytes("qual2");
byte[] v = Bytes.toBytes("val1");
byte[] tags = Bytes.toBytes("tag1");
KeyValue kv = new KeyValue(r, f, q1, 0, q1.length, 1234L, Type.Put, v, 0, v.length, tags);
NonExtendedCell nonExtCell = new NonExtendedCell(kv);
ByteArrayOutputStream os = new ByteArrayOutputStream();
int writeCell = CellUtil.writeCell(nonExtCell, os, true);
byte[] byteArray = os.toByteArray();
KeyValue res = new KeyValue(byteArray);
assertTrue(CellUtil.equals(kv, res));
}

private static class NonExtendedCell implements Cell {
private KeyValue kv;

public NonExtendedCell(KeyValue kv) {
this.kv = kv;
}

@Override
public byte[] getRowArray() {
return this.kv.getRowArray();
}

@Override
public int getRowOffset() {
return this.kv.getRowOffset();
}

@Override
public short getRowLength() {
return this.kv.getRowLength();
}

@Override
public byte[] getFamilyArray() {
return this.kv.getFamilyArray();
}

@Override
public int getFamilyOffset() {
return this.kv.getFamilyOffset();
}

@Override
public byte getFamilyLength() {
return this.kv.getFamilyLength();
}

@Override
public byte[] getQualifierArray() {
return this.kv.getQualifierArray();
}

@Override
public int getQualifierOffset() {
return this.kv.getQualifierOffset();
}

@Override
public int getQualifierLength() {
return this.kv.getQualifierLength();
}

@Override
public long getTimestamp() {
return this.kv.getTimestamp();
}

@Override
public byte getTypeByte() {
return this.kv.getTypeByte();
}

@Override
public long getSequenceId() {
return this.kv.getSequenceId();
}

@Override
public byte[] getValueArray() {
return this.kv.getValueArray();
}

@Override
public int getValueOffset() {
return this.kv.getValueOffset();
}

@Override
public int getValueLength() {
return this.kv.getValueLength();
}

@Override
public byte[] getTagsArray() {
return this.kv.getTagsArray();
}

@Override
public int getTagsOffset() {
return this.kv.getTagsOffset();
}

@Override
public int getTagsLength() {
return this.kv.getTagsLength();
}
}
}

@@ -23,31 +23,35 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;

@InterfaceAudience.Public
public class KeyValueSerialization implements Serialization<KeyValue> {
public class CellSerialization implements Serialization<Cell> {
@Override
public boolean accept(Class<?> c) {
return KeyValue.class.isAssignableFrom(c);
return Cell.class.isAssignableFrom(c);
}

@Override
public KeyValueDeserializer getDeserializer(Class<KeyValue> t) {
return new KeyValueDeserializer();
public CellDeserializer getDeserializer(Class<Cell> t) {
return new CellDeserializer();
}

@Override
public KeyValueSerializer getSerializer(Class<KeyValue> c) {
return new KeyValueSerializer();
public CellSerializer getSerializer(Class<Cell> c) {
return new CellSerializer();
}

public static class KeyValueDeserializer implements Deserializer<KeyValue> {
public static class CellDeserializer implements Deserializer<Cell> {
private DataInputStream dis;

@Override

@@ -56,7 +60,7 @@ public class KeyValueSerialization implements Serialization<KeyValue> {
}

@Override
public KeyValue deserialize(KeyValue ignore) throws IOException {
public KeyValue deserialize(Cell ignore) throws IOException {
// I can't overwrite the passed in KV, not from a proto kv, not just yet. TODO
return KeyValueUtil.create(this.dis);
}

@@ -67,7 +71,7 @@ public class KeyValueSerialization implements Serialization<KeyValue> {
}
}

public static class KeyValueSerializer implements Serializer<KeyValue> {
public static class CellSerializer implements Serializer<Cell> {
private DataOutputStream dos;

@Override

@@ -81,8 +85,9 @@ public class KeyValueSerialization implements Serialization<KeyValue> {
}

@Override
public void serialize(KeyValue kv) throws IOException {
KeyValueUtil.write(kv, this.dos);
public void serialize(Cell kv) throws IOException {
dos.writeInt(CellUtil.estimatedSerializedSizeOf(kv) - Bytes.SIZEOF_INT);
CellUtil.writeCell(kv, dos, true);
}
}
}
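CellSerialization is picked up by Hadoop's serialization factory through the io.serializations configuration key, the same way KeyValueSerialization was; the hunks later in this commit for HFileOutputFormat2, ImportTsv and TableMapReduceUtil do exactly this. A minimal sketch, assuming an already-created Job named job:

    // Sketch: registering CellSerialization so Cell values can cross the MR shuffle.
    Configuration conf = job.getConfiguration();
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        CellSerialization.class.getName());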

@@ -18,39 +18,42 @@
*/
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.TreeSet;

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.MapReduceCell;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Emits sorted KeyValues.
* Reads in all KeyValues from passed Iterator, sorts them, then emits
* KeyValues in sorted order. If lots of columns per row, it will use lots of
* Emits sorted Cells.
* Reads in all Cells from passed Iterator, sorts them, then emits
* Cells in sorted order. If lots of columns per row, it will use lots of
* memory sorting.
* @see HFileOutputFormat2
*/
@InterfaceAudience.Public
public class KeyValueSortReducer
extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
protected void reduce(ImmutableBytesWritable row, Iterable<KeyValue> kvs,
Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
public class CellSortReducer
extends Reducer<ImmutableBytesWritable, Cell, ImmutableBytesWritable, Cell> {
protected void reduce(ImmutableBytesWritable row, Iterable<Cell> kvs,
Reducer<ImmutableBytesWritable, Cell, ImmutableBytesWritable, Cell>.Context context)
throws java.io.IOException, InterruptedException {
TreeSet<KeyValue> map = new TreeSet<>(CellComparator.COMPARATOR);
for (KeyValue kv: kvs) {
TreeSet<Cell> map = new TreeSet<>(CellComparator.COMPARATOR);
for (Cell kv : kvs) {
try {
map.add(kv.clone());
map.add(CellUtil.deepClone(kv));
} catch (CloneNotSupportedException e) {
throw new java.io.IOException(e);
throw new IOException(e);
}
}
context.setStatus("Read " + map.getClass());
int index = 0;
for (Cell kv: map) {
context.write(row, new MapReduceCell(kv));
if (++index % 100 == 0) context.setStatus("Wrote " + index);
}
}

@@ -139,7 +139,7 @@ public class CopyTable extends Configured implements Tool {
job.setNumReduceTasks(0);

if (bulkload) {
TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null,
TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.CellImporter.class, null,
null, job);

// We need to split the inputs by destination tables so that output of Map can be bulk-loaded.

@@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.MapReduceCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

@@ -90,7 +91,6 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;

/**

@@ -233,14 +233,13 @@ public class HFileOutputFormat2
private final Map<byte[], WriterLength> writers =
new TreeMap<>(Bytes.BYTES_COMPARATOR);
private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
private final byte[] now = Bytes.toBytes(EnvironmentEdgeManager.currentTime());
private final long now = EnvironmentEdgeManager.currentTime();
private boolean rollRequested = false;

@Override
public void write(ImmutableBytesWritable row, V cell)
throws IOException {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);

Cell kv = cell;
// null input == user explicitly wants to flush
if (row == null && kv == null) {
rollWriters();

@@ -248,7 +247,7 @@ public class HFileOutputFormat2
}

byte[] rowKey = CellUtil.cloneRow(kv);
long length = kv.getLength();
int length = (CellUtil.estimatedSerializedSizeOf(kv)) - Bytes.SIZEOF_INT;
byte[] family = CellUtil.cloneFamily(kv);
byte[] tableNameBytes = null;
if (writeMultipleTables) {

@@ -337,7 +336,8 @@ public class HFileOutputFormat2
}

// we now have the proper WAL writer. full steam ahead
kv.updateLatestStamp(this.now);
// TODO : Currently in SettableTimeStamp but this will also move to ExtendedCell
CellUtil.updateLatestStamp(cell, this.now);
wl.writer.append(kv);
wl.written += length;

@@ -578,10 +578,11 @@ public class HFileOutputFormat2
configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
}

static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo, Class<? extends OutputFormat<?, ?>> cls) throws IOException {
static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
Class<? extends OutputFormat<?, ?>> cls) throws IOException {
Configuration conf = job.getConfiguration();
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
job.setOutputValueClass(MapReduceCell.class);
job.setOutputFormatClass(cls);

if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {

@@ -595,8 +596,9 @@ public class HFileOutputFormat2
// Based on the configured map output class, set the correct reducer to properly
// sort the incoming values.
// TODO it would be nice to pick one or the other of these formats.
if (KeyValue.class.equals(job.getMapOutputValueClass())) {
job.setReducerClass(KeyValueSortReducer.class);
if (KeyValue.class.equals(job.getMapOutputValueClass())
|| MapReduceCell.class.equals(job.getMapOutputValueClass())) {
job.setReducerClass(CellSortReducer.class);
} else if (Put.class.equals(job.getMapOutputValueClass())) {
job.setReducerClass(PutSortReducer.class);
} else if (Text.class.equals(job.getMapOutputValueClass())) {

@@ -607,7 +609,7 @@ public class HFileOutputFormat2

conf.setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName());
CellSerialization.class.getName());

if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
LOG.info("bulkload locality sensitive enabled");

@@ -655,7 +657,7 @@ public class HFileOutputFormat2
Configuration conf = job.getConfiguration();

job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
job.setOutputValueClass(MapReduceCell.class);
job.setOutputFormatClass(HFileOutputFormat2.class);

ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
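The recurring CellUtil.estimatedSerializedSizeOf(kv) - Bytes.SIZEOF_INT idiom above (and earlier in CellSerialization) exists because estimatedSerializedSizeOf now includes an extra 4-byte length prefix used on the RPC path; subtracting SIZEOF_INT is intended to give back the bare KeyValue-format length that kv.getLength() used to supply. A small illustrative sketch, with example values and an assertion that is expected (not verified here) to hold for a plain on-heap KeyValue:

    // Sketch: the serialized-size bookkeeping used by the new HFileOutputFormat2 write path.
    Cell cell = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
        1L, Bytes.toBytes("v"));
    int withPrefix = CellUtil.estimatedSerializedSizeOf(cell);   // includes the extra SIZEOF_INT
    int bare = withPrefix - Bytes.SIZEOF_INT;                    // what the writer accounts as "length"
    // For an ordinary KeyValue the bare size should match the KeyValue's own length:
    assert bare == ((KeyValue) cell).getLength();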

@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MapReduceCell;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.RawComparator;

@@ -96,11 +97,11 @@ public class Import extends Configured implements Tool {

private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

public static class KeyValueWritableComparablePartitioner
extends Partitioner<KeyValueWritableComparable, KeyValue> {
private static KeyValueWritableComparable[] START_KEYS = null;
public static class CellWritableComparablePartitioner
extends Partitioner<CellWritableComparable, Cell> {
private static CellWritableComparable[] START_KEYS = null;
@Override
public int getPartition(KeyValueWritableComparable key, KeyValue value,
public int getPartition(CellWritableComparable key, Cell value,
int numPartitions) {
for (int i = 0; i < START_KEYS.length; ++i) {
if (key.compareTo(START_KEYS[i]) <= 0) {

@@ -112,27 +113,29 @@ public class Import extends Configured implements Tool {

}

public static class KeyValueWritableComparable
implements WritableComparable<KeyValueWritableComparable> {
public static class CellWritableComparable
implements WritableComparable<CellWritableComparable> {

private KeyValue kv = null;
private Cell kv = null;

static {
// register this comparator
WritableComparator.define(KeyValueWritableComparable.class,
new KeyValueWritableComparator());
WritableComparator.define(CellWritableComparable.class,
new CellWritableComparator());
}

public KeyValueWritableComparable() {
public CellWritableComparable() {
}

public KeyValueWritableComparable(KeyValue kv) {
public CellWritableComparable(Cell kv) {
this.kv = kv;
}

@Override
public void write(DataOutput out) throws IOException {
KeyValue.write(kv, out);
out.writeInt(CellUtil.estimatedSerializedSizeOfKey(kv));
out.writeInt(0);
CellUtil.writeFlatKey(kv, out);
}

@Override

@@ -143,18 +146,18 @@ public class Import extends Configured implements Tool {
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
justification="This is wrong, yes, but we should be purging Writables, not fixing them")
public int compareTo(KeyValueWritableComparable o) {
return CellComparator.COMPARATOR.compare(this.kv, ((KeyValueWritableComparable)o).kv);
public int compareTo(CellWritableComparable o) {
return CellComparator.COMPARATOR.compare(this.kv, ((CellWritableComparable)o).kv);
}

public static class KeyValueWritableComparator extends WritableComparator {
public static class CellWritableComparator extends WritableComparator {

@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
KeyValueWritableComparable kv1 = new KeyValueWritableComparable();
CellWritableComparable kv1 = new CellWritableComparable();
kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
KeyValueWritableComparable kv2 = new KeyValueWritableComparable();
CellWritableComparable kv2 = new CellWritableComparable();
kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
return compare(kv1, kv2);
} catch (IOException e) {

@@ -166,18 +169,19 @@ public class Import extends Configured implements Tool {

}

public static class KeyValueReducer
public static class CellReducer
extends
Reducer<KeyValueWritableComparable, KeyValue, ImmutableBytesWritable, KeyValue> {
Reducer<CellWritableComparable, Cell, ImmutableBytesWritable, Cell> {
protected void reduce(
KeyValueWritableComparable row,
Iterable<KeyValue> kvs,
Reducer<KeyValueWritableComparable,
KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
CellWritableComparable row,
Iterable<Cell> kvs,
Reducer<CellWritableComparable,
Cell, ImmutableBytesWritable, Cell>.Context context)
throws java.io.IOException, InterruptedException {
int index = 0;
for (KeyValue kv : kvs) {
context.write(new ImmutableBytesWritable(kv.getRowArray()), kv);
for (Cell kv : kvs) {
context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)),
new MapReduceCell(kv));
if (++index % 100 == 0)
context.setStatus("Wrote " + index + " KeyValues, "
+ "and the rowkey whose is being wrote is " + Bytes.toString(kv.getRowArray()));

@@ -185,11 +189,11 @@ public class Import extends Configured implements Tool {
}
}

public static class KeyValueSortImporter
extends TableMapper<KeyValueWritableComparable, KeyValue> {
public static class CellSortImporter
extends TableMapper<CellWritableComparable, Cell> {
private Map<byte[], byte[]> cfRenameMap;
private Filter filter;
private static final Log LOG = LogFactory.getLog(KeyValueImporter.class);
private static final Log LOG = LogFactory.getLog(CellImporter.class);

/**
* @param row The current table row key.

@@ -213,9 +217,8 @@ public class Import extends Configured implements Tool {
kv = filterKv(filter, kv);
// skip if we filtered it out
if (kv == null) continue;
// TODO get rid of ensureKeyValue
KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap));
context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret);
Cell ret = convertKv(kv, cfRenameMap);
context.write(new CellWritableComparable(ret), ret);
}
}
} catch (InterruptedException e) {

@@ -236,13 +239,13 @@ public class Import extends Configured implements Tool {
if (startKeys.length != reduceNum) {
throw new IOException("Region split after job initialization");
}
KeyValueWritableComparable[] startKeyWraps =
new KeyValueWritableComparable[startKeys.length - 1];
CellWritableComparable[] startKeyWraps =
new CellWritableComparable[startKeys.length - 1];
for (int i = 1; i < startKeys.length; ++i) {
startKeyWraps[i - 1] =
new KeyValueWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
new CellWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
}
KeyValueWritableComparablePartitioner.START_KEYS = startKeyWraps;
CellWritableComparablePartitioner.START_KEYS = startKeyWraps;
}
}
}

@@ -252,10 +255,10 @@ public class Import extends Configured implements Tool {
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
justification="Writables are going away and this has been this way forever")
public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
public static class CellImporter extends TableMapper<ImmutableBytesWritable, Cell> {
private Map<byte[], byte[]> cfRenameMap;
private Filter filter;
private static final Log LOG = LogFactory.getLog(KeyValueImporter.class);
private static final Log LOG = LogFactory.getLog(CellImporter.class);

/**
* @param row The current table row key.

@@ -279,8 +282,7 @@ public class Import extends Configured implements Tool {
kv = filterKv(filter, kv);
// skip if we filtered it out
if (kv == null) continue;
// TODO get rid of ensureKeyValue
context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
context.write(row, new MapReduceCell(convertKv(kv, cfRenameMap)));
}
}
} catch (InterruptedException e) {

@@ -505,21 +507,21 @@ public class Import extends Configured implements Tool {
if(cfRenameMap != null) {
// If there's a rename mapping for this CF, create a new KeyValue
byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
if(newCfName != null) {
kv = new KeyValue(kv.getRowArray(), // row buffer
kv.getRowOffset(), // row offset
kv.getRowLength(), // row length
newCfName, // CF buffer
0, // CF offset
newCfName.length, // CF length
kv.getQualifierArray(), // qualifier buffer
kv.getQualifierOffset(), // qualifier offset
kv.getQualifierLength(), // qualifier length
kv.getTimestamp(), // timestamp
KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
kv.getValueArray(), // value buffer
kv.getValueOffset(), // value offset
kv.getValueLength()); // value length
if (newCfName != null) {
kv = new KeyValue(kv.getRowArray(), // row buffer
kv.getRowOffset(), // row offset
kv.getRowLength(), // row length
newCfName, // CF buffer
0, // CF offset
newCfName.length, // CF length
kv.getQualifierArray(), // qualifier buffer
kv.getQualifierOffset(), // qualifier offset
kv.getQualifierLength(), // qualifier length
kv.getTimestamp(), // timestamp
KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
kv.getValueArray(), // value buffer
kv.getValueOffset(), // value offset
kv.getValueLength()); // value length
}
}
return kv;

@@ -626,35 +628,35 @@ public class Import extends Configured implements Tool {
Table table = conn.getTable(tableName);
RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
job.setMapperClass(KeyValueSortImporter.class);
job.setReducerClass(KeyValueReducer.class);
job.setMapperClass(CellSortImporter.class);
job.setReducerClass(CellReducer.class);
Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputKeyClass(KeyValueWritableComparable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setMapOutputKeyClass(CellWritableComparable.class);
job.setMapOutputValueClass(MapReduceCell.class);
job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class",
KeyValueWritableComparable.KeyValueWritableComparator.class,
CellWritableComparable.CellWritableComparator.class,
RawComparator.class);
Path partitionsPath =
new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
FileSystem fs = FileSystem.get(job.getConfiguration());
fs.deleteOnExit(partitionsPath);
job.setPartitionerClass(KeyValueWritableComparablePartitioner.class);
job.setPartitionerClass(CellWritableComparablePartitioner.class);
job.setNumReduceTasks(regionLocator.getStartKeys().length);
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
}
} else if (hfileOutPath != null) {
LOG.info("writing to hfiles for bulk load.");
job.setMapperClass(KeyValueImporter.class);
job.setMapperClass(CellImporter.class);
try (Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(tableName);
RegionLocator regionLocator = conn.getRegionLocator(tableName)){
job.setReducerClass(KeyValueSortReducer.class);
job.setReducerClass(CellSortReducer.class);
Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setMapOutputValueClass(MapReduceCell.class);
HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
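The new CellWritableComparable serializes only the cell's key (key length, a zero value length, then the flat key), which is enough for the raw comparator to order map outputs without materializing full cells. A hedged sketch of that ordering, using arbitrary example row keys and only classes that appear in this commit:

    // Sketch: CellWritableComparable orders by cell key, as used by the bulk-load partitioner.
    Cell a = KeyValueUtil.createFirstOnRow(Bytes.toBytes("aaa"));
    Cell b = KeyValueUtil.createFirstOnRow(Bytes.toBytes("bbb"));
    Import.CellWritableComparable wa = new Import.CellWritableComparable(a);
    Import.CellWritableComparable wb = new Import.CellWritableComparable(b);
    assert wa.compareTo(wb) < 0;   // CellComparator.COMPARATOR ordering on the wrapped cells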

@@ -584,7 +584,7 @@ public class ImportTsv extends Configured implements Tool {
job.getConfiguration().setStrings("io.serializations",
job.getConfiguration().get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName());
CellSerialization.class.getName());
}
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),

@@ -48,7 +48,7 @@ import org.apache.hadoop.util.StringUtils;
* Puts in sorted order. If lots of columns per row, it will use lots of
* memory sorting.
* @see HFileOutputFormat2
* @see KeyValueSortReducer
* @see CellSortReducer
*/
@InterfaceAudience.Public
public class PutSortReducer extends

@@ -208,7 +208,7 @@ public class TableMapReduceUtil {
conf.set(TableInputFormat.SCAN, convertScanToString(scan));
conf.setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName());
CellSerialization.class.getName());
if (addDependencyJars) {
addDependencyJars(job);
}

@@ -45,7 +45,7 @@ import org.apache.hadoop.util.StringUtils;
/**
* Emits Sorted KeyValues. Parse the passed text and creates KeyValues. Sorts them before emit.
* @see HFileOutputFormat2
* @see KeyValueSortReducer
* @see CellSortReducer
* @see PutSortReducer
*/
@InterfaceAudience.Public

@@ -31,10 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;

@@ -44,8 +41,9 @@ import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MapReduceCell;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

@@ -53,6 +51,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;

/**
* A tool to replay WAL files as a M/R job.

@@ -94,10 +93,10 @@ public class WALPlayer extends Configured implements Tool {

/**
* A mapper that just writes out KeyValues.
* This one can be used together with {@link KeyValueSortReducer}
* This one can be used together with {@link CellSortReducer}
*/
static class WALKeyValueMapper
extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> {
extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Cell> {
private byte[] table;

@Override

@@ -108,11 +107,11 @@ public class WALPlayer extends Configured implements Tool {
// skip all other tables
if (Bytes.equals(table, key.getTablename().getName())) {
for (Cell cell : value.getCells()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
if (WALEdit.isMetaEditFamily(kv)) {
if (WALEdit.isMetaEditFamily(cell)) {
continue;
}
context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)), kv);
context.write(new ImmutableBytesWritable(CellUtil.cloneRow(cell)),
new MapReduceCell(cell));
}
}
} catch (InterruptedException e) {

@@ -300,10 +299,10 @@ public class WALPlayer extends Configured implements Tool {
}
TableName tableName = TableName.valueOf(tables[0]);
job.setMapperClass(WALKeyValueMapper.class);
job.setReducerClass(KeyValueSortReducer.class);
job.setReducerClass(CellSortReducer.class);
Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputValueClass(KeyValue.class);
job.setMapOutputValueClass(MapReduceCell.class);
try (Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(tableName);
RegionLocator regionLocator = conn.getRegionLocator(tableName)) {

@@ -0,0 +1,270 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.ByteBufferCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.yetus.audience.InterfaceAudience;

/**
* A wrapper for a cell to be used with mapreduce, as the output value class for mappers/reducers.
*/
@InterfaceAudience.Private
public class MapReduceCell extends ByteBufferCell implements ExtendedCell {

private final Cell cell;

public MapReduceCell(Cell cell) {
this.cell = cell;
}

@Override
public byte[] getRowArray() {
return this.cell.getRowArray();
}

@Override
public int getRowOffset() {
return this.cell.getRowOffset();
}

@Override
public short getRowLength() {
return this.cell.getRowLength();
}

@Override
public byte[] getFamilyArray() {
return this.cell.getFamilyArray();
}

@Override
public int getFamilyOffset() {
return this.cell.getFamilyOffset();
}

@Override
public byte getFamilyLength() {
return this.cell.getFamilyLength();
}

@Override
public byte[] getQualifierArray() {
return this.cell.getQualifierArray();
}

@Override
public int getQualifierOffset() {
return this.cell.getQualifierOffset();
}

@Override
public int getQualifierLength() {
return this.cell.getQualifierLength();
}

@Override
public long getTimestamp() {
return this.cell.getTimestamp();
}

@Override
public byte getTypeByte() {
return this.cell.getTypeByte();
}

@Override
public long getSequenceId() {
return this.cell.getSequenceId();
}

@Override
public byte[] getValueArray() {
return this.cell.getValueArray();
}

@Override
public int getValueOffset() {
return this.cell.getValueOffset();
}

@Override
public int getValueLength() {
return this.cell.getValueLength();
}

@Override
public byte[] getTagsArray() {
return this.cell.getTagsArray();
}

@Override
public int getTagsOffset() {
return this.cell.getTagsOffset();
}

@Override
public int getTagsLength() {
return this.cell.getTagsLength();
}

@Override
public ByteBuffer getRowByteBuffer() {
if (cell instanceof ByteBufferCell) {
return ((ByteBufferCell) this.cell).getRowByteBuffer();
} else {
return ByteBuffer.wrap(CellUtil.cloneRow(this.cell));
}
}

@Override
public int getRowPosition() {
if (cell instanceof ByteBufferCell) {
return ((ByteBufferCell) this.cell).getRowPosition();
} else {
return 0;
}
}

@Override
public ByteBuffer getFamilyByteBuffer() {
if (cell instanceof ByteBufferCell) {
return ((ByteBufferCell) this.cell).getFamilyByteBuffer();
} else {
return ByteBuffer.wrap(CellUtil.cloneFamily(this.cell));
}
}

@Override
public int getFamilyPosition() {
if (cell instanceof ByteBufferCell) {
return ((ByteBufferCell) this.cell).getFamilyPosition();
} else {
return 0;
}
}

@Override
public ByteBuffer getQualifierByteBuffer() {
if (cell instanceof ByteBufferCell) {
return ((ByteBufferCell) this.cell).getQualifierByteBuffer();
} else {
return ByteBuffer.wrap(CellUtil.cloneQualifier(this.cell));
}
}

@Override
public int getQualifierPosition() {
if (cell instanceof ByteBufferCell) {
return ((ByteBufferCell) this.cell).getQualifierPosition();
} else {
return 0;
}
}

@Override
public ByteBuffer getValueByteBuffer() {
if (cell instanceof ByteBufferCell) {
return ((ByteBufferCell) this.cell).getValueByteBuffer();
} else {
return ByteBuffer.wrap(CellUtil.cloneValue(this.cell));
}
}

@Override
public int getValuePosition() {
if (cell instanceof ByteBufferCell) {
return ((ByteBufferCell) this.cell).getValuePosition();
} else {
return 0;
}
}

@Override
public ByteBuffer getTagsByteBuffer() {
if (cell instanceof ByteBufferCell) {
return ((ByteBufferCell) this.cell).getTagsByteBuffer();
} else {
return ByteBuffer.wrap(CellUtil.cloneTags(this.cell));
}
}

@Override
public int getTagsPosition() {
if (cell instanceof ByteBufferCell) {
return ((ByteBufferCell) this.cell).getTagsPosition();
} else {
return 0;
}
}

@Override
public String toString() {
return this.cell.toString();
}

@Override
public void setSequenceId(long seqId) throws IOException {
CellUtil.setSequenceId(cell, seqId);
}

@Override
public void setTimestamp(long ts) throws IOException {
CellUtil.setTimestamp(cell, ts);
}

@Override
public void setTimestamp(byte[] ts, int tsOffset) throws IOException {
CellUtil.setTimestamp(cell, ts, tsOffset);
}

@Override
public long heapSize() {
return CellUtil.estimatedHeapSizeOf(cell);
}

@Override
public int write(OutputStream out, boolean withTags) throws IOException {
return CellUtil.writeCell(cell, out, withTags);
}

@Override
public int getSerializedSize(boolean withTags) {
return CellUtil.estimatedSerializedSizeOf(cell) - Bytes.SIZEOF_INT;
}

@Override
public void write(ByteBuffer buf, int offset) {
CellUtil.writeCellToBuffer(cell, buf, offset);
}

@Override
public ExtendedCell deepClone() {
try {
return (ExtendedCell) CellUtil.deepClone(cell);
} catch (CloneNotSupportedException e) {
throw new RuntimeException(e);
}
}
}
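MapReduceCell is a thin adapter: every accessor delegates to the wrapped cell, the ByteBuffer accessors fall back to copy-on-demand buffers for on-heap cells, and write/getSerializedSize/deepClone route through the new CellUtil helpers, so any Cell can travel as a MapReduce output value. A small sketch with example data; the asserts describe the expected behavior of the class as shown above:

    // Sketch: wrapping an ordinary KeyValue so it can be emitted as a mapreduce value.
    Cell kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("cf"), Bytes.toBytes("q"),
        42L, Bytes.toBytes("v"));
    MapReduceCell wrapped = new MapReduceCell(kv);
    // Accessors delegate to the wrapped cell:
    assert Bytes.equals(CellUtil.cloneRow(wrapped), Bytes.toBytes("row"));
    // On-heap cells get copy-on-demand ByteBuffers with position 0:
    ByteBuffer row = wrapped.getRowByteBuffer();
    assert wrapped.getRowPosition() == 0 && row.remaining() == 3;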

@@ -440,12 +440,12 @@ public class TestHFileOutputFormat2 {
// Set start and end rows for partitioner.
SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
job.setReducerClass(KeyValueSortReducer.class);
job.setReducerClass(CellSortReducer.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
job.setNumReduceTasks(4);
job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName());
CellSerialization.class.getName());

FileOutputFormat.setOutputPath(job, testDir);
assertTrue(job.waitForCompletion(false));

@@ -764,7 +764,7 @@ public class TestHFileOutputFormat2 {
job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName());
CellSerialization.class.getName());
setupRandomGeneratorMapper(job, putSortReducer);
if (tableInfo.size() > 1) {
MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);

@@ -41,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.util.MapReduceCell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;

@@ -63,7 +64,7 @@ import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter;
import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;

@@ -664,7 +665,7 @@ public class TestImportExport {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testKeyValueImporter() throws Throwable {
KeyValueImporter importer = new KeyValueImporter();
CellImporter importer = new CellImporter();
Configuration configuration = new Configuration();
Context ctx = mock(Context.class);
when(ctx.getConfiguration()).thenReturn(configuration);

@@ -674,12 +675,12 @@ public class TestImportExport {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
KeyValue key = (KeyValue) invocation.getArguments()[1];
MapReduceCell key = (MapReduceCell) invocation.getArguments()[1];
assertEquals("Key", Bytes.toString(writer.get()));
assertEquals("row", Bytes.toString(CellUtil.cloneRow(key)));
return null;
}
}).when(ctx).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
}).when(ctx).write(any(ImmutableBytesWritable.class), any(MapReduceCell.class));

importer.setup(ctx);
Result value = mock(Result.class);

@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.hbase.util.MapReduceCell;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.mapreduce.Mapper;

@@ -168,7 +169,7 @@ public class TestWALPlayer {
WALKey key = mock(WALKey.class);
when(key.getTablename()).thenReturn(TableName.valueOf("table"));
@SuppressWarnings("unchecked")
Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue>.Context context = mock(Context.class);
Mapper<WALKey, WALEdit, ImmutableBytesWritable, Cell>.Context context = mock(Context.class);
when(context.getConfiguration()).thenReturn(configuration);

WALEdit value = mock(WALEdit.class);

@@ -184,12 +185,12 @@ public class TestWALPlayer {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
KeyValue key = (KeyValue) invocation.getArguments()[1];
MapReduceCell key = (MapReduceCell) invocation.getArguments()[1];
assertEquals("row", Bytes.toString(writer.get()));
assertEquals("row", Bytes.toString(CellUtil.cloneRow(key)));
return null;
}
}).when(context).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
}).when(context).write(any(ImmutableBytesWritable.class), any(MapReduceCell.class));

mapper.map(key, value, context);