HBASE-19432 Roll the specified writer in HFileOutputFormat2
Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
parent
78a6e0532e
commit
142e6bb9de
|
@ -242,7 +242,7 @@ public class HFileOutputFormat2
|
||||||
Cell kv = cell;
|
Cell kv = cell;
|
||||||
// null input == user explicitly wants to flush
|
// null input == user explicitly wants to flush
|
||||||
if (row == null && kv == null) {
|
if (row == null && kv == null) {
|
||||||
rollWriters();
|
rollWriters(null);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -276,15 +276,13 @@ public class HFileOutputFormat2
|
||||||
configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
|
configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
// If any of the HFiles for the column families has reached
|
|
||||||
// maxsize, we need to roll all the writers
|
|
||||||
if (wl != null && wl.written + length >= maxsize) {
|
if (wl != null && wl.written + length >= maxsize) {
|
||||||
this.rollRequested = true;
|
this.rollRequested = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// This can only happen once a row is finished though
|
// This can only happen once a row is finished though
|
||||||
if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
|
if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
|
||||||
rollWriters();
|
rollWriters(wl);
|
||||||
}
|
}
|
||||||
|
|
||||||
// create a new WAL writer, if necessary
|
// create a new WAL writer, if necessary
|
||||||
|
@ -345,19 +343,27 @@ public class HFileOutputFormat2
|
||||||
this.previousRow = rowKey;
|
this.previousRow = rowKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void rollWriters() throws IOException {
|
private void rollWriters(WriterLength writerLength) throws IOException {
|
||||||
for (WriterLength wl : this.writers.values()) {
|
if (writerLength != null) {
|
||||||
if (wl.writer != null) {
|
closeWriter(writerLength);
|
||||||
LOG.info(
|
} else {
|
||||||
"Writer=" + wl.writer.getPath() + ((wl.written == 0)? "": ", wrote=" + wl.written));
|
for (WriterLength wl : this.writers.values()) {
|
||||||
close(wl.writer);
|
closeWriter(wl);
|
||||||
}
|
}
|
||||||
wl.writer = null;
|
|
||||||
wl.written = 0;
|
|
||||||
}
|
}
|
||||||
this.rollRequested = false;
|
this.rollRequested = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void closeWriter(WriterLength wl) throws IOException {
|
||||||
|
if (wl.writer != null) {
|
||||||
|
LOG.info(
|
||||||
|
"Writer=" + wl.writer.getPath() + ((wl.written == 0)? "": ", wrote=" + wl.written));
|
||||||
|
close(wl.writer);
|
||||||
|
}
|
||||||
|
wl.writer = null;
|
||||||
|
wl.written = 0;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Create a new StoreFile.Writer.
|
* Create a new StoreFile.Writer.
|
||||||
* @param family
|
* @param family
|
||||||
|
|
Loading…
Reference in New Issue