HBASE-26680 Close and do not write trailer for the broken WAL writer (#4174)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Author: Xiaolin Ha
Date: 2022-03-16 19:58:45 +08:00 (committed by GitHub)
parent 5b28d76652
commit c7773adeed
4 changed files with 59 additions and 35 deletions

AbstractProtobufLogWriter.java

@@ -166,38 +166,43 @@ public abstract class AbstractProtobufLogWriter {
   public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
       long blocksize, StreamSlowMonitor monitor) throws IOException,
       StreamLacksCapabilityException {
-    this.conf = conf;
-    boolean doCompress = initializeCompressionContext(conf, path);
-    this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
-    int bufferSize = CommonFSUtils.getDefaultBufferSize(fs);
-    short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
-      CommonFSUtils.getDefaultReplication(fs, path));
+    try {
+      this.conf = conf;
+      boolean doCompress = initializeCompressionContext(conf, path);
+      this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
+      int bufferSize = CommonFSUtils.getDefaultBufferSize(fs);
+      short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
+        CommonFSUtils.getDefaultReplication(fs, path));
 
-    initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor);
+      initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor);
 
-    boolean doTagCompress = doCompress &&
-      conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
-    boolean doValueCompress = doCompress &&
-      conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
-    WALHeader.Builder headerBuilder = WALHeader.newBuilder()
-      .setHasCompression(doCompress)
-      .setHasTagCompression(doTagCompress)
-      .setHasValueCompression(doValueCompress);
-    if (doValueCompress) {
-      headerBuilder.setValueCompressionAlgorithm(
-        CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
-    }
-    length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC,
-      buildWALHeader(conf, headerBuilder)));
+      boolean doTagCompress =
+        doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
+      boolean doValueCompress =
+        doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
+      WALHeader.Builder headerBuilder =
+        WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress)
+          .setHasValueCompression(doValueCompress);
+      if (doValueCompress) {
+        headerBuilder.setValueCompressionAlgorithm(
+          CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
+      }
+      length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC,
+        buildWALHeader(conf, headerBuilder)));
 
-    initAfterHeader(doCompress);
+      initAfterHeader(doCompress);
 
-    // instantiate trailer to default value.
-    trailer = WALTrailer.newBuilder().build();
+      // instantiate trailer to default value.
+      trailer = WALTrailer.newBuilder().build();
 
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Initialized protobuf WAL={}, compression={}, tagCompression={}" +
-        ", valueCompression={}", path, doCompress, doTagCompress, doValueCompress);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Initialized protobuf WAL={}, compression={}, tagCompression={}"
+          + ", valueCompression={}", path, doCompress, doTagCompress, doValueCompress);
+      }
+    } catch (Exception e) {
+      LOG.warn("Init output failed, path={}", path, e);
+      closeOutput();
+      throw e;
     }
   }

@@ -265,6 +270,11 @@ public abstract class AbstractProtobufLogWriter {
       short replication, long blockSize, StreamSlowMonitor monitor)
       throws IOException, StreamLacksCapabilityException;
 
+  /**
+   * simply close the output, do not need to write trailer like the Writer.close
+   */
+  protected abstract void closeOutput();
+
   /**
    * return the file length after written.
    */
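
The core of the change is visible in these two hunks: `init()` now owns the cleanup of its own partially constructed output instead of leaving a broken writer for callers to close. Below is a minimal sketch of that pattern; the names (`BrokenSafeWriter`, `openStream`, `writeHeader`) are invented for illustration and are not HBase APIs.

```java
import java.io.IOException;
import java.io.OutputStream;

// Illustrative only: a writer whose init() cleans up after itself on failure.
abstract class BrokenSafeWriter {
  protected OutputStream output;

  /** Opens the output and writes the header; on failure, closes the broken output itself. */
  public void init(String path) throws IOException {
    try {
      output = openStream(path);   // may succeed...
      writeHeader();               // ...while a later step can still fail
    } catch (Exception e) {
      // The writer is unusable: release the half-open stream here rather than
      // requiring every caller to remember to close it, then rethrow.
      closeOutput();
      throw e;
    }
  }

  /** Just close the raw output; a broken writer never writes a trailer. */
  protected abstract void closeOutput();

  protected abstract OutputStream openStream(String path) throws IOException;

  protected abstract void writeHeader() throws IOException;
}
```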

AsyncProtobufLogWriter.java

@@ -47,7 +47,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
-
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;

@@ -197,6 +196,17 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
     this.asyncOutputWrapper = new OutputStreamWrapper(output);
   }
 
+  @Override
+  protected void closeOutput() {
+    if (this.output != null) {
+      try {
+        this.output.close();
+      } catch (IOException e) {
+        LOG.warn("Close output failed", e);
+      }
+    }
+  }
+
   private long writeWALMetadata(Consumer<CompletableFuture<Long>> action) throws IOException {
     CompletableFuture<Long> future = new CompletableFuture<>();
     action.accept(future);

ProtobufLogWriter.java

@@ -131,6 +131,17 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
     }
   }
 
+  @Override
+  protected void closeOutput() {
+    if (this.output != null) {
+      try {
+        this.output.close();
+      } catch (IOException e) {
+        LOG.warn("Close output failed", e);
+      }
+    }
+  }
+
   @Override
   protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
     output.write(magic);

FSHLogProvider.java

@@ -90,13 +90,6 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
       } else {
         LOG.debug("Error instantiating log writer.", e);
       }
-      if (writer != null) {
-        try{
-          writer.close();
-        } catch(IOException ee){
-          LOG.error("cannot close log writer", ee);
-        }
-      }
       throw new IOException("cannot get log writer", e);
     }
   }
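
This last hunk shows the caller-side payoff: once a failed `init()` closes its own output, the provider's extra writer-close block becomes redundant. Continuing the sketch above, a factory in the same style might look like this; `WriterFactory` and `FileBackedWriter` are invented names, not the real FSHLogProvider code.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Factory sketch: on a failed init() it only wraps and rethrows.
final class WriterFactory {
  static BrokenSafeWriter createWriter(String path) throws IOException {
    BrokenSafeWriter writer = new FileBackedWriter();
    try {
      writer.init(path);
      return writer;
    } catch (Exception e) {
      // No writer.close() here any more; a failed init() already closed its output.
      throw new IOException("cannot get log writer", e);
    }
  }
}

// A trivial concrete writer so the factory sketch type-checks.
final class FileBackedWriter extends BrokenSafeWriter {
  @Override
  protected OutputStream openStream(String path) throws IOException {
    return new FileOutputStream(path);
  }

  @Override
  protected void writeHeader() throws IOException {
    output.write("HDR".getBytes());
  }

  @Override
  protected void closeOutput() {
    if (output != null) {
      try {
        output.close();
      } catch (IOException e) {
        // best effort: the writer is already broken
      }
    }
  }
}
```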