HBASE-8498 PB WAL reading is broken due to some partial reads
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1480511 13f79535-47bb-0310-9956-ffa450edef68
parent c6f4c60ac4
commit be3568fb5b
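Note on the underlying bug (illustration only, not part of the commit): InputStream.read(byte[]) may legally return fewer bytes than requested, so reading the 4-byte length prefix of a serialized KeyValue with a single read() call can come back short and misparse everything that follows. Below is a minimal sketch of the read-fully pattern the patch adopts in KeyValue.iscreate; the class and method names (ReadFullyExample, readInt) are made up for the example.

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public final class ReadFullyExample {
  /** Reads a 4-byte big-endian int, looping because read() may return a partial buffer. */
  static Integer readInt(InputStream in) throws IOException {
    byte[] buf = new byte[4];
    int bytesRead = 0;
    while (bytesRead < buf.length) {
      int n = in.read(buf, bytesRead, buf.length - bytesRead);
      if (n < 0) {
        if (bytesRead == 0) return null; // clean EOF before any bytes were read
        throw new EOFException("Partial read: got only " + bytesRead + " of 4 bytes");
      }
      bytesRead += n;
    }
    return ((buf[0] & 0xff) << 24) | ((buf[1] & 0xff) << 16)
        | ((buf[2] & 0xff) << 8) | (buf[3] & 0xff);
  }
}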
org/apache/hadoop/hbase/KeyValue.java
@@ -20,7 +20,9 @@
 package org.apache.hadoop.hbase;
 
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -35,6 +37,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -2330,9 +2333,16 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
    */
   public static KeyValue iscreate(final InputStream in) throws IOException {
     byte [] intBytes = new byte[Bytes.SIZEOF_INT];
-    int length = in.read(intBytes);
-    if (length == 0) return null;
-    if (length != intBytes.length) throw new IOException("Failed read of int length " + length);
+    int bytesRead = 0;
+    while (bytesRead < intBytes.length) {
+      int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
+      if (n < 0) {
+        if (bytesRead == 0) return null; // EOF at start is ok
+        throw new IOException("Failed read of int, read " + bytesRead + " bytes");
+      }
+      bytesRead += n;
+    }
+    // TODO: perhaps some sanity check is needed here.
     byte [] bytes = new byte[Bytes.toInt(intBytes)];
     IOUtils.readFully(in, bytes, 0, bytes.length);
     return new KeyValue(bytes, 0, bytes.length);
org/apache/hadoop/hbase/codec/BaseDecoder.java
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hbase.codec;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 
 public abstract class BaseDecoder implements Codec.Decoder {
+  protected static final Log LOG = LogFactory.getLog(BaseDecoder.class);
   protected final InputStream in;
   private boolean hasNext = true;
   private Cell current = null;
@@ -34,14 +38,32 @@ public abstract class BaseDecoder implements Codec.Decoder {
   @Override
   public boolean advance() throws IOException {
     if (!this.hasNext) return this.hasNext;
-    if (this.in.available() <= 0) {
+    if (this.in.available() == 0) {
       this.hasNext = false;
       return this.hasNext;
     }
+    try {
       this.current = parseCell();
+    } catch (IOException ioEx) {
+      rethrowEofException(ioEx);
+    }
     return this.hasNext;
   }
 
+  private void rethrowEofException(IOException ioEx) throws IOException {
+    boolean isEof = false;
+    try {
+      isEof = this.in.available() == 0;
+    } catch (Throwable t) {
+      LOG.trace("Error getting available for error message - ignoring", t);
+    }
+    if (!isEof) throw ioEx;
+    LOG.error("Partial cell read caused by EOF: " + ioEx);
+    EOFException eofEx = new EOFException("Partial cell read");
+    eofEx.initCause(ioEx);
+    throw eofEx;
+  }
+
   /**
    * @return extract a Cell
    * @throws IOException
org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -116,6 +116,7 @@ public class ProtobufLogReader extends ReaderBase {
 
   @Override
   protected boolean readNext(HLog.Entry entry) throws IOException {
+    while (true) {
     WALKey.Builder builder = WALKey.newBuilder();
     boolean hasNext = false;
     try {
@@ -132,28 +133,46 @@ public class ProtobufLogReader extends ReaderBase {
     }
     WALKey walKey = builder.build();
     entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
-    try {
+    if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
+      LOG.warn("WALKey has no KVs that follow it; trying the next one");
+      continue;
+    }
     int expectedCells = walKey.getFollowingKvCount();
+    long posBefore = this.inputStream.getPos();
+    try {
       int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
       if (expectedCells != actualCells) {
-        throw new EOFException("Unable to read " + expectedCells + " cells, got " + actualCells);
+        throw new EOFException("Only read " + actualCells); // other info added in catch
       }
-    } catch (EOFException ex) {
-      LOG.error("EOF while reading KVs, ignoring", ex);
-      return false;
     } catch (Exception ex) {
-      IOException realEofEx = extractHiddenEofOrRethrow(ex);
-      LOG.error("EOF while reading KVs, ignoring", realEofEx);
+      String posAfterStr = "<unknown>";
+      try {
+        posAfterStr = this.inputStream.getPos() + "";
+      } catch (Throwable t) {
+        LOG.trace("Error getting pos for error message - ignoring", t);
+      }
+      String message = " while reading " + expectedCells + " WAL KVs; started reading at "
+          + posBefore + " and read up to " + posAfterStr;
+      IOException realEofEx = extractHiddenEof(ex);
+      if (realEofEx != null) {
+        LOG.error("EOF " + message, realEofEx);
         return false;
       }
+      message = "Error " + message;
+      LOG.error(message);
+      throw new IOException(message, ex);
+    }
     return true;
   }
+  }
 
-  private IOException extractHiddenEofOrRethrow(Exception ex) throws IOException {
+  private IOException extractHiddenEof(Exception ex) {
     // There are two problems we are dealing with here. Hadoop stream throws generic exception
     // for EOF, not EOFException; and scanner further hides it inside RuntimeException.
     IOException ioEx = null;
-    if (ex instanceof IOException) {
+    if (ex instanceof EOFException) {
+      return (EOFException)ex;
+    } else if (ex instanceof IOException) {
       ioEx = (IOException)ex;
     } else if (ex instanceof RuntimeException
         && ex.getCause() != null && ex.getCause() instanceof IOException) {
@@ -161,9 +180,9 @@ public class ProtobufLogReader extends ReaderBase {
     }
     if (ioEx != null) {
       if (ioEx.getMessage().contains("EOF")) return ioEx;
-      throw ioEx;
+      return null;
     }
-    throw new IOException(ex);
+    return null;
   }
 
   @Override