HBASE-22887 Fix HFileOutputFormat2 writer roll (#554)
Signed-off-by: langdamao <lang--lang--lang@163.com>
This commit is contained in:
parent
fff0f33c5a
commit
f40c745499
|
@ -247,9 +247,9 @@ public class HFileOutputFormat2
|
||||||
// Map of families to writers and how much has been output on the writer.
|
// Map of families to writers and how much has been output on the writer.
|
||||||
private final Map<byte[], WriterLength> writers =
|
private final Map<byte[], WriterLength> writers =
|
||||||
new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||||
private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
|
private final Map<byte[], byte[]> previousRows =
|
||||||
|
new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||||
private final long now = EnvironmentEdgeManager.currentTime();
|
private final long now = EnvironmentEdgeManager.currentTime();
|
||||||
private boolean rollRequested = false;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(ImmutableBytesWritable row, V cell)
|
public void write(ImmutableBytesWritable row, V cell)
|
||||||
|
@ -291,12 +291,9 @@ public class HFileOutputFormat2
|
||||||
configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
|
configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (wl != null && wl.written + length >= maxsize) {
|
|
||||||
this.rollRequested = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// This can only happen once a row is finished though
|
// This can only happen once a row is finished though
|
||||||
if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
|
if (wl != null && wl.written + length >= maxsize
|
||||||
|
&& Bytes.compareTo(this.previousRows.get(family), rowKey) != 0) {
|
||||||
rollWriters(wl);
|
rollWriters(wl);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -354,7 +351,7 @@ public class HFileOutputFormat2
|
||||||
wl.written += length;
|
wl.written += length;
|
||||||
|
|
||||||
// Copy the row so we know when a row transitions.
|
// Copy the row so we know when a row transitions.
|
||||||
this.previousRow = rowKey;
|
this.previousRows.put(family, rowKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void rollWriters(WriterLength writerLength) throws IOException {
|
private void rollWriters(WriterLength writerLength) throws IOException {
|
||||||
|
@ -365,7 +362,6 @@ public class HFileOutputFormat2
|
||||||
closeWriter(wl);
|
closeWriter(wl);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.rollRequested = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void closeWriter(WriterLength wl) throws IOException {
|
private void closeWriter(WriterLength wl) throws IOException {
|
||||||
|
|
|
@ -424,7 +424,8 @@ public class TestHFileOutputFormat2 {
|
||||||
// Lower this value or we hit an OutOfMemoryError (OOME) in Eclipse.
|
// Lower this value or we hit an OutOfMemoryError (OOME) in Eclipse.
|
||||||
conf.setInt("mapreduce.task.io.sort.mb", 20);
|
conf.setInt("mapreduce.task.io.sort.mb", 20);
|
||||||
// Write a few files.
|
// Write a few files.
|
||||||
conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
|
long hregionMaxFilesize = 10 * 1024;
|
||||||
|
conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize);
|
||||||
|
|
||||||
Job job = new Job(conf, "testWritingPEData");
|
Job job = new Job(conf, "testWritingPEData");
|
||||||
setupRandomGeneratorMapper(job, false);
|
setupRandomGeneratorMapper(job, false);
|
||||||
|
@ -451,6 +452,26 @@ public class TestHFileOutputFormat2 {
|
||||||
assertTrue(job.waitForCompletion(false));
|
assertTrue(job.waitForCompletion(false));
|
||||||
FileStatus [] files = fs.listStatus(testDir);
|
FileStatus [] files = fs.listStatus(testDir);
|
||||||
assertTrue(files.length > 0);
|
assertTrue(files.length > 0);
|
||||||
|
|
||||||
|
// Check the number and size of the output files.
|
||||||
|
for (byte[] family : FAMILIES) {
|
||||||
|
long kvCount= 0;
|
||||||
|
RemoteIterator<LocatedFileStatus> iterator =
|
||||||
|
fs.listFiles(testDir.suffix("/" + new String(family)), true);
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
LocatedFileStatus keyFileStatus = iterator.next();
|
||||||
|
HFile.Reader reader =
|
||||||
|
HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
|
||||||
|
HFileScanner scanner = reader.getScanner(false, false, false);
|
||||||
|
|
||||||
|
kvCount += reader.getEntries();
|
||||||
|
scanner.seekTo();
|
||||||
|
long perKVSize = scanner.getCell().getSerializedSize();
|
||||||
|
assertTrue("Data size of each file should not be too large.",
|
||||||
|
perKVSize * reader.getEntries() <= hregionMaxFilesize);
|
||||||
|
}
|
||||||
|
assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue