HBASE-14557 MapReduce WALPlayer issue with NoTagsKeyValue.
This commit is contained in:
parent
a3842caf77
commit
de89172244
|
@ -544,6 +544,33 @@ public class KeyValueUtil {
|
|||
return cell instanceof KeyValue? (KeyValue)cell: copyToNewKeyValue(cell);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param cell
|
||||
* @return <code>cell</code> if it is an object of class {@link KeyValue} else we will return a
|
||||
* new {@link KeyValue} instance made from <code>cell</code> Note: Even if the cell is an object
|
||||
* of any of the subclass of {@link KeyValue}, we will create a new {@link KeyValue} object
|
||||
* wrapping same buffer. This API should be used only with MR based tools which expect the type
|
||||
* to be exactly KeyValue. That is the reason for doing this way.
|
||||
*
|
||||
* @deprecated without any replacement.
|
||||
*/
|
||||
@Deprecated
|
||||
public static KeyValue ensureKeyValueTypeForMR(final Cell cell) {
|
||||
if (cell == null) return null;
|
||||
if (cell instanceof KeyValue) {
|
||||
if (cell.getClass().getName().equals(KeyValue.class.getName())) {
|
||||
return (KeyValue) cell;
|
||||
}
|
||||
// Cell is an Object of any of the sub classes of KeyValue. Make a new KeyValue wrapping the
|
||||
// same byte[]
|
||||
KeyValue kv = (KeyValue) cell;
|
||||
KeyValue newKv = new KeyValue(kv.bytes, kv.offset, kv.length);
|
||||
newKv.setSequenceId(kv.getSequenceId());
|
||||
return newKv;
|
||||
}
|
||||
return copyToNewKeyValue(cell);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public static List<KeyValue> ensureKeyValues(List<Cell> cells) {
|
||||
List<KeyValue> lazyList = Lists.transform(cells, new Function<Cell, KeyValue>() {
|
||||
|
|
|
@ -207,7 +207,7 @@ public class Import {
|
|||
// skip if we filtered it out
|
||||
if (kv == null) continue;
|
||||
// TODO get rid of ensureKeyValue
|
||||
KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap));
|
||||
KeyValue ret = KeyValueUtil.ensureKeyValueTypeForMR(convertKv(kv, cfRenameMap));
|
||||
context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret);
|
||||
}
|
||||
}
|
||||
|
@ -270,7 +270,7 @@ public class Import {
|
|||
// skip if we filtered it out
|
||||
if (kv == null) continue;
|
||||
// TODO get rid of ensureKeyValue
|
||||
context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
|
||||
context.write(row, KeyValueUtil.ensureKeyValueTypeForMR(convertKv(kv, cfRenameMap)));
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
|
|
|
@ -897,8 +897,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
HFileScanner scanner = halfReader.getScanner(false, false, false);
|
||||
scanner.seekTo();
|
||||
do {
|
||||
KeyValue kv = KeyValueUtil.ensureKeyValue(scanner.getKeyValue());
|
||||
halfWriter.append(kv);
|
||||
halfWriter.append(scanner.getKeyValue());
|
||||
} while (scanner.next());
|
||||
|
||||
for (Map.Entry<byte[],byte[]> entry : fileInfo.entrySet()) {
|
||||
|
|
|
@ -68,7 +68,7 @@ public class PutCombiner<K> extends Reducer<K, Put, K, Put> {
|
|||
List<Cell> cells = familyMap.get(entry.getKey());
|
||||
List<Cell> kvs = (cells != null) ? (List<Cell>) cells : null;
|
||||
for (Cell cell : entry.getValue()) {
|
||||
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
|
||||
KeyValue kv = KeyValueUtil.ensureKeyValueTypeForMR(cell);
|
||||
curSize += kv.heapSize();
|
||||
if (kvs != null) {
|
||||
kvs.add(kv);
|
||||
|
|
|
@ -65,7 +65,7 @@ public class PutSortReducer extends
|
|||
Put p = iter.next();
|
||||
for (List<Cell> cells: p.getFamilyCellMap().values()) {
|
||||
for (Cell cell: cells) {
|
||||
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
|
||||
KeyValue kv = KeyValueUtil.ensureKeyValueTypeForMR(cell);
|
||||
map.add(kv);
|
||||
curSize += kv.heapSize();
|
||||
}
|
||||
|
|
|
@ -181,7 +181,7 @@ public class TextSortReducer extends
|
|||
parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length,
|
||||
parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes,
|
||||
parsed.getColumnOffset(i), parsed.getColumnLength(i), tags);
|
||||
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
|
||||
KeyValue kv = KeyValueUtil.ensureKeyValueTypeForMR(cell);
|
||||
kvs.add(kv);
|
||||
curSize += kv.heapSize();
|
||||
}
|
||||
|
|
|
@ -100,7 +100,7 @@ public class WALPlayer extends Configured implements Tool {
|
|||
// skip all other tables
|
||||
if (Bytes.equals(table, key.getTablename().getName())) {
|
||||
for (Cell cell : value.getCells()) {
|
||||
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
|
||||
KeyValue kv = KeyValueUtil.ensureKeyValueTypeForMR(cell);
|
||||
if (WALEdit.isMetaEditFamily(kv.getFamily())) continue;
|
||||
context.write(new ImmutableBytesWritable(kv.getRow()), kv);
|
||||
}
|
||||
|
|
|
@ -154,9 +154,7 @@ public class TestWALPlayer {
|
|||
|
||||
WALEdit value = mock(WALEdit.class);
|
||||
ArrayList<Cell> values = new ArrayList<Cell>();
|
||||
KeyValue kv1 = mock(KeyValue.class);
|
||||
when(kv1.getFamily()).thenReturn(Bytes.toBytes("family"));
|
||||
when(kv1.getRow()).thenReturn(Bytes.toBytes("row"));
|
||||
KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("q"));
|
||||
values.add(kv1);
|
||||
when(value.getCells()).thenReturn(values);
|
||||
mapper.setup(context);
|
||||
|
|
Loading…
Reference in New Issue