HBASE-26680 Close and do not write trailer for the broken WAL writer (#4174)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
2edb9fdf4d
commit
8cce0d3302
|
@ -151,26 +151,32 @@ public abstract class AbstractProtobufLogWriter {
|
|||
|
||||
public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
|
||||
long blocksize) throws IOException, StreamLacksCapabilityException {
|
||||
this.conf = conf;
|
||||
boolean doCompress = initializeCompressionContext(conf, path);
|
||||
this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
|
||||
int bufferSize = CommonFSUtils.getDefaultBufferSize(fs);
|
||||
short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
|
||||
CommonFSUtils.getDefaultReplication(fs, path));
|
||||
try {
|
||||
this.conf = conf;
|
||||
boolean doCompress = initializeCompressionContext(conf, path);
|
||||
this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
|
||||
int bufferSize = CommonFSUtils.getDefaultBufferSize(fs);
|
||||
short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
|
||||
CommonFSUtils.getDefaultReplication(fs, path));
|
||||
|
||||
initOutput(fs, path, overwritable, bufferSize, replication, blocksize);
|
||||
initOutput(fs, path, overwritable, bufferSize, replication, blocksize);
|
||||
|
||||
boolean doTagCompress = doCompress
|
||||
&& conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
|
||||
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, buildWALHeader(conf,
|
||||
WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))));
|
||||
boolean doTagCompress =
|
||||
doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
|
||||
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, buildWALHeader(conf,
|
||||
WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))));
|
||||
|
||||
initAfterHeader(doCompress);
|
||||
initAfterHeader(doCompress);
|
||||
|
||||
// instantiate trailer to default value.
|
||||
trailer = WALTrailer.newBuilder().build();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress);
|
||||
// instantiate trailer to default value.
|
||||
trailer = WALTrailer.newBuilder().build();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Init output failed, path={}", path, e);
|
||||
closeOutput();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -237,6 +243,11 @@ public abstract class AbstractProtobufLogWriter {
|
|||
protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
|
||||
short replication, long blockSize) throws IOException, StreamLacksCapabilityException;
|
||||
|
||||
/**
|
||||
* simply close the output, do not need to write trailer like the Writer.close
|
||||
*/
|
||||
protected abstract void closeOutput();
|
||||
|
||||
/**
|
||||
* return the file length after written.
|
||||
*/
|
||||
|
|
|
@ -195,6 +195,17 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
|
|||
this.asyncOutputWrapper = new OutputStreamWrapper(output);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void closeOutput() {
|
||||
if (this.output != null) {
|
||||
try {
|
||||
this.output.close();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Close output failed", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private long writeWALMetadata(Consumer<CompletableFuture<Long>> action) throws IOException {
|
||||
CompletableFuture<Long> future = new CompletableFuture<>();
|
||||
action.accept(future);
|
||||
|
|
|
@ -117,6 +117,17 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void closeOutput() {
|
||||
if (this.output != null) {
|
||||
try {
|
||||
this.output.close();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Close output failed", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
|
||||
output.write(magic);
|
||||
|
|
|
@ -88,13 +88,6 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
|
|||
} else {
|
||||
LOG.debug("Error instantiating log writer.", e);
|
||||
}
|
||||
if (writer != null) {
|
||||
try{
|
||||
writer.close();
|
||||
} catch(IOException ee){
|
||||
LOG.error("cannot close log writer", ee);
|
||||
}
|
||||
}
|
||||
throw new IOException("cannot get log writer", e);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue