HBASE-16062 Improper error handling in WAL Reader/Writer creation (Vladimir Rodionov)

This commit is contained in:
tedyu 2016-06-20 07:34:26 -07:00
parent 76419df21d
commit 095a82584e
2 changed files with 38 additions and 21 deletions

View File

@ -55,12 +55,20 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
// Configuration already does caching for the Class lookup. // Configuration already does caching for the Class lookup.
Class<? extends Writer> logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl", Class<? extends Writer> logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
ProtobufLogWriter.class, Writer.class); ProtobufLogWriter.class, Writer.class);
Writer writer = null;
try { try {
Writer writer = logWriterClass.newInstance(); writer = logWriterClass.newInstance();
writer.init(fs, path, conf, overwritable); writer.init(fs, path, conf, overwritable);
return writer; return writer;
} catch (Exception e) { } catch (Exception e) {
LOG.debug("Error instantiating log writer.", e); LOG.debug("Error instantiating log writer.", e);
if (writer != null) {
try{
writer.close();
} catch(IOException ee){
LOG.error("cannot close log writer", ee);
}
}
throw new IOException("cannot get log writer", e); throw new IOException("cannot get log writer", e);
} }
} }

View File

@ -277,9 +277,8 @@ public class WALFactory {
return createReader(fs, path, reporter, true); return createReader(fs, path, reporter, true);
} }
public Reader createReader(final FileSystem fs, final Path path, public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter,
CancelableProgressable reporter, boolean allowCustom) boolean allowCustom) throws IOException {
throws IOException {
Class<? extends AbstractFSWALProvider.Reader> lrClass = Class<? extends AbstractFSWALProvider.Reader> lrClass =
allowCustom ? logReaderClass : ProtobufLogReader.class; allowCustom ? logReaderClass : ProtobufLogReader.class;
@ -291,11 +290,12 @@ public class WALFactory {
long openTimeout = timeoutMillis + startWaiting; long openTimeout = timeoutMillis + startWaiting;
int nbAttempt = 0; int nbAttempt = 0;
FSDataInputStream stream = null; FSDataInputStream stream = null;
AbstractFSWALProvider.Reader reader = null;
while (true) { while (true) {
try { try {
if (lrClass != ProtobufLogReader.class) { if (lrClass != ProtobufLogReader.class) {
// User is overriding the WAL reader, let them. // User is overriding the WAL reader, let them.
AbstractFSWALProvider.Reader reader = lrClass.newInstance(); reader = lrClass.newInstance();
reader.init(fs, path, conf, null); reader.init(fs, path, conf, null);
return reader; return reader;
} else { } else {
@ -305,26 +305,36 @@ public class WALFactory {
// rid of the old reader entirely, we need to handle 0-size files differently from // rid of the old reader entirely, we need to handle 0-size files differently from
// merely non-PB files. // merely non-PB files.
byte[] magic = new byte[ProtobufLogReader.PB_WAL_MAGIC.length]; byte[] magic = new byte[ProtobufLogReader.PB_WAL_MAGIC.length];
boolean isPbWal = (stream.read(magic) == magic.length) boolean isPbWal =
(stream.read(magic) == magic.length)
&& Arrays.equals(magic, ProtobufLogReader.PB_WAL_MAGIC); && Arrays.equals(magic, ProtobufLogReader.PB_WAL_MAGIC);
AbstractFSWALProvider.Reader reader = reader = isPbWal ? new ProtobufLogReader() : new SequenceFileLogReader();
isPbWal ? new ProtobufLogReader() : new SequenceFileLogReader();
reader.init(fs, path, conf, stream); reader.init(fs, path, conf, stream);
return reader; return reader;
} }
} catch (IOException e) { } catch (IOException e) {
try {
if (stream != null) { if (stream != null) {
try {
stream.close(); stream.close();
} catch (IOException exception) {
LOG.warn("Could not close AbstractFSWALProvider.Reader" + exception.getMessage());
LOG.debug("exception details", exception);
} }
}
if (reader != null) {
try {
reader.close();
} catch (IOException exception) { } catch (IOException exception) {
LOG.warn("Could not close FSDataInputStream" + exception.getMessage()); LOG.warn("Could not close FSDataInputStream" + exception.getMessage());
LOG.debug("exception details", exception); LOG.debug("exception details", exception);
} }
}
String msg = e.getMessage(); String msg = e.getMessage();
if (msg != null && (msg.contains("Cannot obtain block length") if (msg != null
|| msg.contains("Could not obtain the last block") && (msg.contains("Cannot obtain block length")
|| msg.matches("Blocklist for [^ ]* has changed.*"))) { || msg.contains("Could not obtain the last block") || msg
.matches("Blocklist for [^ ]* has changed.*"))) {
if (++nbAttempt == 1) { if (++nbAttempt == 1) {
LOG.warn("Lease should have recovered. This is not expected. Will retry", e); LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
} }
@ -333,8 +343,7 @@ public class WALFactory {
} }
if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) { if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
LOG.error("Can't open after " + nbAttempt + " attempts and " LOG.error("Can't open after " + nbAttempt + " attempts and "
+ (EnvironmentEdgeManager.currentTime() - startWaiting) + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
+ "ms " + " for " + path);
} else { } else {
try { try {
Thread.sleep(nbAttempt < 3 ? 500 : 1000); Thread.sleep(nbAttempt < 3 ? 500 : 1000);