From f40c745499659cb6c06f2f5b98996f035beee046 Mon Sep 17 00:00:00 2001
From: langdamao
Date: Wed, 9 Oct 2019 07:42:54 +0800
Subject: [PATCH] HBASE-22887 Fix HFileOutputFormat2 writer roll (#554)

Signed-off-by: langdamao
---
 .../hbase/mapreduce/HFileOutputFormat2.java   | 14 ++++-------
 .../mapreduce/TestHFileOutputFormat2.java     | 23 ++++++++++++++++++-
 2 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 32531e6da1a..ebdf9cdbaa5 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -247,9 +247,9 @@ public class HFileOutputFormat2
       // Map of families to writers and how much has been output on the writer.
       private final Map<byte[], WriterLength> writers =
           new TreeMap<>(Bytes.BYTES_COMPARATOR);
-      private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
+      private final Map<byte[], byte[]> previousRows =
+          new TreeMap<>(Bytes.BYTES_COMPARATOR);
       private final long now = EnvironmentEdgeManager.currentTime();
-      private boolean rollRequested = false;
 
       @Override
       public void write(ImmutableBytesWritable row, V cell)
@@ -291,12 +291,9 @@ public class HFileOutputFormat2
           configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
         }
 
-        if (wl != null && wl.written + length >= maxsize) {
-          this.rollRequested = true;
-        }
-
-        // This can only happen once a row is finished though
-        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
+        if (wl != null && wl.written + length >= maxsize
+            && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0) {
           rollWriters(wl);
         }
 
@@ -354,7 +351,7 @@ public class HFileOutputFormat2
         wl.written += length;
 
         // Copy the row so we know when a row transition.
-        this.previousRow = rowKey;
+        this.previousRows.put(family, rowKey);
       }
 
       private void rollWriters(WriterLength writerLength) throws IOException {
@@ -365,7 +362,6 @@ public class HFileOutputFormat2
             closeWriter(wl);
           }
         }
-        this.rollRequested = false;
       }
 
       private void closeWriter(WriterLength wl) throws IOException {
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 92600f8687b..d4c3802fb3c 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -424,7 +424,8 @@ public class TestHFileOutputFormat2 {
     // Set down this value or we OOME in eclipse.
     conf.setInt("mapreduce.task.io.sort.mb", 20);
     // Write a few files.
-    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
+    long hregionMaxFilesize = 10 * 1024;
+    conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize);
 
     Job job = new Job(conf, "testWritingPEData");
     setupRandomGeneratorMapper(job, false);
@@ -451,6 +452,26 @@ public class TestHFileOutputFormat2 {
     assertTrue(job.waitForCompletion(false));
     FileStatus [] files = fs.listStatus(testDir);
     assertTrue(files.length > 0);
+
+    // Check output file num and size.
+    for (byte[] family : FAMILIES) {
+      long kvCount = 0;
+      RemoteIterator<LocatedFileStatus> iterator =
+          fs.listFiles(testDir.suffix("/" + new String(family)), true);
+      while (iterator.hasNext()) {
+        LocatedFileStatus keyFileStatus = iterator.next();
+        HFile.Reader reader =
+            HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
+        HFileScanner scanner = reader.getScanner(false, false, false);
+
+        kvCount += reader.getEntries();
+        scanner.seekTo();
+        long perKVSize = scanner.getCell().getSerializedSize();
+        assertTrue("Data size of each file should not be too large.",
+            perKVSize * reader.getEntries() <= hregionMaxFilesize);
+      }
+      assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount);
+    }
   }
 
   /**
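
A note on why the roll must wait for a row boundary: before this patch the RecordWriter kept a single previousRow and a single rollRequested flag shared by all column families, so a size-triggered roll of one family's writer was gated on row transitions observed across every family, and a writer could keep growing past HREGION_MAX_FILESIZE. The patch tracks the last row key per family and folds the whole decision into one per-family check: roll only when this writer is over maxsize and the incoming cell starts a new row for this family, since rolling mid-row would split a row across HFiles. The sketch below models just that decision logic; it is not HBase code, and the names FamilyRollTracker, shouldRoll, and recordWrite are invented for illustration.

import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

public class FamilyRollTracker {
  // Last row key written per column family; stands in for the patch's
  // Map<byte[], byte[]> previousRows (keyed with Bytes.BYTES_COMPARATOR in HBase).
  private final Map<byte[], byte[]> previousRows = new TreeMap<>(Arrays::compare);

  // Roll only when this family's writer is over the limit AND the incoming
  // cell begins a new row for this family. In HFileOutputFormat2 this check
  // runs only once the family's writer exists, so the map lookup is non-null there.
  boolean shouldRoll(byte[] family, byte[] rowKey, long written, long cellLength, long maxSize) {
    return written + cellLength >= maxSize
        && !Arrays.equals(previousRows.get(family), rowKey);
  }

  // Mirrors the patch's previousRows.put(family, rowKey) after each write.
  void recordWrite(byte[] family, byte[] rowKey) {
    previousRows.put(family, rowKey);
  }

  public static void main(String[] args) {
    FamilyRollTracker tracker = new FamilyRollTracker();
    byte[] cf = "info".getBytes();
    tracker.recordWrite(cf, "row-1".getBytes());
    // Over the limit but still on row-1: must not roll (would split the row).
    System.out.println(tracker.shouldRoll(cf, "row-1".getBytes(), 1024, 100, 1000)); // false
    // Over the limit and row-2 begins: safe to roll now.
    System.out.println(tracker.shouldRoll(cf, "row-2".getBytes(), 1024, 100, 1000)); // true
  }
}

The test hunk above verifies the same property end to end: with HREGION_MAX_FILESIZE forced down to 10 KB, every HFile written for a family must stay within that bound (perKVSize * reader.getEntries() <= hregionMaxFilesize) while the entry counts across that family's files still sum to ROWSPERSPLIT.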