diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index f657202aa83..a589cceba88 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -20,7 +20,9 @@
 package org.apache.hadoop.hbase;
 
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -35,6 +37,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -2330,9 +2333,16 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
    */
   public static KeyValue iscreate(final InputStream in) throws IOException {
     byte [] intBytes = new byte[Bytes.SIZEOF_INT];
-    int length = in.read(intBytes);
-    if (length == 0) return null;
-    if (length != intBytes.length) throw new IOException("Failed read of int length " + length);
+    int bytesRead = 0;
+    while (bytesRead < intBytes.length) {
+      int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
+      if (n < 0) {
+        if (bytesRead == 0) return null; // EOF at start is ok
+        throw new IOException("Failed read of int, read " + bytesRead + " bytes");
+      }
+      bytesRead += n;
+    }
+    // TODO: perhaps some sanity check is needed here.
     byte [] bytes = new byte[Bytes.toInt(intBytes)];
     IOUtils.readFully(in, bytes, 0, bytes.length);
     return new KeyValue(bytes, 0, bytes.length);
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
index 3b95c53a85f..e5fc2a51848 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hbase.codec;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 
 public abstract class BaseDecoder implements Codec.Decoder {
+  protected static final Log LOG = LogFactory.getLog(BaseDecoder.class);
   protected final InputStream in;
   private boolean hasNext = true;
   private Cell current = null;
@@ -34,14 +38,32 @@ public abstract class BaseDecoder implements Codec.Decoder {
   @Override
   public boolean advance() throws IOException {
     if (!this.hasNext) return this.hasNext;
-    if (this.in.available() <= 0) {
+    if (this.in.available() == 0) {
       this.hasNext = false;
       return this.hasNext;
     }
-    this.current = parseCell();
+    try {
+      this.current = parseCell();
+    } catch (IOException ioEx) {
+      rethrowEofException(ioEx);
+    }
     return this.hasNext;
   }
 
+  private void rethrowEofException(IOException ioEx) throws IOException {
+    boolean isEof = false;
+    try {
+      isEof = this.in.available() == 0;
+    } catch (Throwable t) {
+      LOG.trace("Error getting available for error message - ignoring", t);
+    }
+    if (!isEof) throw ioEx;
+    LOG.error("Partial cell read caused by EOF: " + ioEx);
+    EOFException eofEx = new EOFException("Partial cell read");
+    eofEx.initCause(ioEx);
+    throw eofEx;
+  }
+
   /**
    * @return extract a Cell
    * @throws IOException
@@ -52,4 +74,4 @@ public abstract class BaseDecoder implements Codec.Decoder {
   public Cell current() {
     return this.current;
   }
-}
\ No newline at end of file
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index 12dbb972dcc..213ef7ab041 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -116,44 +116,63 @@ public class ProtobufLogReader extends ReaderBase {
 
   @Override
   protected boolean readNext(HLog.Entry entry) throws IOException {
-    WALKey.Builder builder = WALKey.newBuilder();
-    boolean hasNext = false;
-    try {
-      hasNext = builder.mergeDelimitedFrom(inputStream);
-    } catch (InvalidProtocolBufferException ipbe) {
-      LOG.error("Invalid PB while reading WAL, probably an unexpected EOF, ignoring", ipbe);
-    }
-    if (!hasNext) return false;
-    if (!builder.isInitialized()) {
-      // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
-      //       If we can get the KV count, we could, theoretically, try to get next record.
-      LOG.error("Partial PB while reading WAL, probably an unexpected EOF, ignoring");
-      return false;
-    }
-    WALKey walKey = builder.build();
-    entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
-    try {
-      int expectedCells = walKey.getFollowingKvCount();
-      int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
-      if (expectedCells != actualCells) {
-        throw new EOFException("Unable to read " + expectedCells + " cells, got " + actualCells);
+    while (true) {
+      WALKey.Builder builder = WALKey.newBuilder();
+      boolean hasNext = false;
+      try {
+        hasNext = builder.mergeDelimitedFrom(inputStream);
+      } catch (InvalidProtocolBufferException ipbe) {
+        LOG.error("Invalid PB while reading WAL, probably an unexpected EOF, ignoring", ipbe);
       }
-    } catch (EOFException ex) {
-      LOG.error("EOF while reading KVs, ignoring", ex);
-      return false;
-    } catch (Exception ex) {
-      IOException realEofEx = extractHiddenEofOrRethrow(ex);
-      LOG.error("EOF while reading KVs, ignoring", realEofEx);
-      return false;
+      if (!hasNext) return false;
+      if (!builder.isInitialized()) {
+        // TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
+        //       If we can get the KV count, we could, theoretically, try to get next record.
+        LOG.error("Partial PB while reading WAL, probably an unexpected EOF, ignoring");
+        return false;
+      }
+      WALKey walKey = builder.build();
+      entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
+      if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
+        LOG.warn("WALKey has no KVs that follow it; trying the next one");
+        continue;
+      }
+      int expectedCells = walKey.getFollowingKvCount();
+      long posBefore = this.inputStream.getPos();
+      try {
+        int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
+        if (expectedCells != actualCells) {
+          throw new EOFException("Only read " + actualCells); // other info added in catch
+        }
+      } catch (Exception ex) {
+        String posAfterStr = "";
+        try {
+          posAfterStr = this.inputStream.getPos() + "";
+        } catch (Throwable t) {
+          LOG.trace("Error getting pos for error message - ignoring", t);
+        }
+        String message = " while reading " + expectedCells + " WAL KVs; started reading at "
+            + posBefore + " and read up to " + posAfterStr;
+        IOException realEofEx = extractHiddenEof(ex);
+        if (realEofEx != null) {
+          LOG.error("EOF " + message, realEofEx);
+          return false;
+        }
+        message = "Error " + message;
+        LOG.error(message);
+        throw new IOException(message, ex);
+      }
+      return true;
     }
-    return true;
   }
 
-  private IOException extractHiddenEofOrRethrow(Exception ex) throws IOException {
+  private IOException extractHiddenEof(Exception ex) {
     // There are two problems we are dealing with here. Hadoop stream throws generic exception
     // for EOF, not EOFException; and scanner further hides it inside RuntimeException.
     IOException ioEx = null;
-    if (ex instanceof IOException) {
+    if (ex instanceof EOFException) {
+      return (EOFException)ex;
+    } else if (ex instanceof IOException) {
       ioEx = (IOException)ex;
     } else if (ex instanceof RuntimeException
         && ex.getCause() != null && ex.getCause() instanceof IOException) {
@@ -161,9 +180,9 @@
     }
     if (ioEx != null) {
       if (ioEx.getMessage().contains("EOF")) return ioEx;
-      throw ioEx;
+      return null;
    }
-    throw new IOException(ex);
+    return null;
   }
 
   @Override