From de89172244f3f403ec4ab0c38229c779ac4bb6a1 Mon Sep 17 00:00:00 2001 From: anoopsjohn Date: Fri, 30 Oct 2015 21:19:12 +0530 Subject: [PATCH] HBASE-14557 MapReduce WALPlayer issue with NoTagsKeyValue. --- .../org/apache/hadoop/hbase/KeyValueUtil.java | 27 +++++++++++++++++++ .../apache/hadoop/hbase/mapreduce/Import.java | 4 +-- .../mapreduce/LoadIncrementalHFiles.java | 3 +-- .../hadoop/hbase/mapreduce/PutCombiner.java | 2 +- .../hbase/mapreduce/PutSortReducer.java | 2 +- .../hbase/mapreduce/TextSortReducer.java | 2 +- .../hadoop/hbase/mapreduce/WALPlayer.java | 2 +- .../hadoop/hbase/mapreduce/TestWALPlayer.java | 4 +-- 8 files changed, 35 insertions(+), 11 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java index 4412da4017f..eb31b87baf6 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java @@ -544,6 +544,33 @@ public class KeyValueUtil { return cell instanceof KeyValue? (KeyValue)cell: copyToNewKeyValue(cell); } + /** + * @param cell + * @return cell if it is an object of class {@link KeyValue} else we will return a + * new {@link KeyValue} instance made from cell Note: Even if the cell is an object + * of any of the subclass of {@link KeyValue}, we will create a new {@link KeyValue} object + * wrapping same buffer. This API should be used only with MR based tools which expect the type + * to be exactly KeyValue. That is the reason for doing this way. + * + * @deprecated without any replacement. + */ + @Deprecated + public static KeyValue ensureKeyValueTypeForMR(final Cell cell) { + if (cell == null) return null; + if (cell instanceof KeyValue) { + if (cell.getClass().getName().equals(KeyValue.class.getName())) { + return (KeyValue) cell; + } + // Cell is an Object of any of the sub classes of KeyValue. Make a new KeyValue wrapping the + // same byte[] + KeyValue kv = (KeyValue) cell; + KeyValue newKv = new KeyValue(kv.bytes, kv.offset, kv.length); + newKv.setSequenceId(kv.getSequenceId()); + return newKv; + } + return copyToNewKeyValue(cell); + } + @Deprecated public static List ensureKeyValues(List cells) { List lazyList = Lists.transform(cells, new Function() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java index 725eeb138b2..4ebdea0e592 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java @@ -207,7 +207,7 @@ public class Import { // skip if we filtered it out if (kv == null) continue; // TODO get rid of ensureKeyValue - KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)); + KeyValue ret = KeyValueUtil.ensureKeyValueTypeForMR(convertKv(kv, cfRenameMap)); context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret); } } @@ -270,7 +270,7 @@ public class Import { // skip if we filtered it out if (kv == null) continue; // TODO get rid of ensureKeyValue - context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap))); + context.write(row, KeyValueUtil.ensureKeyValueTypeForMR(convertKv(kv, cfRenameMap))); } } } catch (InterruptedException e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 59d93b40f1b..443cd96d9e7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -897,8 +897,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { HFileScanner scanner = halfReader.getScanner(false, false, false); scanner.seekTo(); do { - KeyValue kv = KeyValueUtil.ensureKeyValue(scanner.getKeyValue()); - halfWriter.append(kv); + halfWriter.append(scanner.getKeyValue()); } while (scanner.next()); for (Map.Entry entry : fileInfo.entrySet()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java index 1a0cfdb0623..a127ab2f993 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java @@ -68,7 +68,7 @@ public class PutCombiner extends Reducer { List cells = familyMap.get(entry.getKey()); List kvs = (cells != null) ? (List) cells : null; for (Cell cell : entry.getValue()) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + KeyValue kv = KeyValueUtil.ensureKeyValueTypeForMR(cell); curSize += kv.heapSize(); if (kvs != null) { kvs.add(kv); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java index db9e585811d..1706aecf44c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java @@ -65,7 +65,7 @@ public class PutSortReducer extends Put p = iter.next(); for (List cells: p.getFamilyCellMap().values()) { for (Cell cell: cells) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + KeyValue kv = KeyValueUtil.ensureKeyValueTypeForMR(cell); map.add(kv); curSize += kv.heapSize(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java index 64530e1c637..c367645875b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java @@ -181,7 +181,7 @@ public class TextSortReducer extends parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i), tags); - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + KeyValue kv = KeyValueUtil.ensureKeyValueTypeForMR(cell); kvs.add(kv); curSize += kv.heapSize(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index f481c0b9869..6b5286bd96e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -100,7 +100,7 @@ public class WALPlayer extends Configured implements Tool { // skip all other tables if (Bytes.equals(table, key.getTablename().getName())) { for (Cell cell : value.getCells()) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + KeyValue kv = KeyValueUtil.ensureKeyValueTypeForMR(cell); if (WALEdit.isMetaEditFamily(kv.getFamily())) continue; context.write(new ImmutableBytesWritable(kv.getRow()), kv); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java index 14cafdf5005..d6929a255bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java @@ -154,9 +154,7 @@ public class TestWALPlayer { WALEdit value = mock(WALEdit.class); ArrayList values = new ArrayList(); - KeyValue kv1 = mock(KeyValue.class); - when(kv1.getFamily()).thenReturn(Bytes.toBytes("family")); - when(kv1.getRow()).thenReturn(Bytes.toBytes("row")); + KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("q")); values.add(kv1); when(value.getCells()).thenReturn(values); mapper.setup(context);