HBASE-14501 NPE in replication with TDE

Conflicts:
	hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
	hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
This commit is contained in:
Enis Soztutar 2015-10-12 20:37:34 -07:00
parent 5dba2c71f7
commit bbbb52cfbf
6 changed files with 42 additions and 22 deletions

View File

@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.util.Bytes.len;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -2497,8 +2498,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* Create a KeyValue reading from the raw InputStream.
* Named <code>iscreate</code> so doesn't clash with {@link #create(DataInput)}
* @param in
* @return Created KeyValue OR if we find a length of zero, we will return null which
* can be useful marking a stream as done.
* @return Created KeyValue or throws an exception
* @throws IOException
* @deprecated Use {@link KeyValueUtil#iscreate(InputStream, boolean)}
*/

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase;
import java.io.DataInput;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -187,7 +188,7 @@ public class KeyValueUtil {
* position to the start of the next KeyValue. Does not allocate a new array or copy data.
* @param bb
* @param includesMvccVersion
* @param includesTags
* @param includesTags
*/
public static KeyValue nextShallowCopy(final ByteBuffer bb, final boolean includesMvccVersion,
boolean includesTags) {
@ -251,7 +252,7 @@ public class KeyValueUtil {
return createFirstOnRow(CellUtil.cloneRow(in), CellUtil.cloneFamily(in),
CellUtil.cloneQualifier(in), in.getTimestamp() - 1);
}
/**
* Create a KeyValue for the specified row, family and qualifier that would be
@ -546,6 +547,7 @@ public class KeyValueUtil {
@Deprecated
public static List<KeyValue> ensureKeyValues(List<Cell> cells) {
List<KeyValue> lazyList = Lists.transform(cells, new Function<Cell, KeyValue>() {
@Override
public KeyValue apply(Cell arg0) {
return KeyValueUtil.ensureKeyValue(arg0);
}
@ -571,8 +573,9 @@ public class KeyValueUtil {
while (bytesRead < intBytes.length) {
int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
if (n < 0) {
if (bytesRead == 0)
return null; // EOF at start is ok
if (bytesRead == 0) {
throw new EOFException();
}
throw new IOException("Failed read of int, read " + bytesRead + " bytes");
}
bytesRead += n;

View File

@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.codec;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import javax.annotation.Nonnull;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -32,27 +35,41 @@ import org.apache.hadoop.hbase.Cell;
@InterfaceAudience.Private
public abstract class BaseDecoder implements Codec.Decoder {
protected static final Log LOG = LogFactory.getLog(BaseDecoder.class);
protected final InputStream in;
private boolean hasNext = true;
protected final PBIS in;
private Cell current = null;
protected static class PBIS extends PushbackInputStream {
public PBIS(InputStream in, int size) {
super(in, size);
}
public void resetBuf(int size) {
this.buf = new byte[size];
this.pos = size;
}
}
public BaseDecoder(final InputStream in) {
this.in = in;
this.in = new PBIS(in, 1);
}
@Override
public boolean advance() throws IOException {
if (!this.hasNext) return this.hasNext;
if (this.in.available() == 0) {
this.hasNext = false;
return this.hasNext;
int firstByte = in.read();
if (firstByte == -1) {
return false;
} else {
in.unread(firstByte);
}
try {
this.current = parseCell();
} catch (IOException ioEx) {
in.resetBuf(1); // reset the buffer in case the underlying stream is read from upper layers
rethrowEofException(ioEx);
}
return this.hasNext;
return true;
}
private void rethrowEofException(IOException ioEx) throws IOException {
@ -72,9 +89,12 @@ public abstract class BaseDecoder implements Codec.Decoder {
}
/**
* @return extract a Cell
* Extract a Cell.
* @return a parsed Cell or throws an Exception. EOFException or a generic IOException maybe
* thrown if EOF is reached prematurely. Does not return null.
* @throws IOException
*/
@Nonnull
protected abstract Cell parseCell() throws IOException;
@Override

View File

@ -78,6 +78,7 @@ public class CellCodec implements Codec {
super(in);
}
@Override
protected Cell parseCell() throws IOException {
byte [] row = readByteArray(this.in);
byte [] family = readByteArray(in);

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -84,12 +83,8 @@ public class SecureWALCellCodec extends WALCellCodec {
return super.parseCell();
}
int ivLength = 0;
try {
ivLength = StreamUtils.readRawVarint32(in);
} catch (EOFException e) {
// EOF at start is OK
return null;
}
ivLength = StreamUtils.readRawVarint32(in);
// TODO: An IV length of 0 could signify an unwrapped cell, when the
// encoder supports that just read the remainder in directly

View File

@ -858,6 +858,7 @@ public class ReplicationSource extends Thread
if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
distinctRowKeys++;
}
lastCell = cells.get(i);
}
return distinctRowKeys;
}