HBASE-5878 Use getVisibleLength public api from HdfsDataInputStream from Hadoop-2.

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Ashish Singhi 2015-08-10 16:20:07 -07:00 committed by Andrew Purtell
parent e4106b4c4a
commit 6e8cdec242
1 changed files with 14 additions and 22 deletions

View File

@ -19,21 +19,20 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.FilterInputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.NavigableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.Text;
@ -111,31 +110,24 @@ public class SequenceFileLogReader extends ReaderBase {
if (this.firstGetPosInvocation) {
this.firstGetPosInvocation = false;
long adjust = 0;
HdfsDataInputStream hdfsDataInputStream = null;
try {
Field fIn = FilterInputStream.class.getDeclaredField("in");
fIn.setAccessible(true);
Object realIn = fIn.get(this.in);
// In hadoop 0.22, DFSInputStream is a standalone class. Before this,
// it was an inner class of DFSClient.
if (realIn.getClass().getName().endsWith("DFSInputStream")) {
Method getFileLength = realIn.getClass().
getDeclaredMethod("getFileLength", new Class<?> []{});
getFileLength.setAccessible(true);
long realLength = ((Long)getFileLength.
invoke(realIn, new Object []{})).longValue();
if (this.in.getClass().getName().endsWith("HdfsDataInputStream")
|| this.in.getClass().getName().endsWith("DFSInputStream")) {
hdfsDataInputStream = (HdfsDataInputStream) this.getWrappedStream();
long realLength = hdfsDataInputStream.getVisibleLength();
assert(realLength >= this.length);
adjust = realLength - this.length;
} else {
LOG.info("Input stream class: " + realIn.getClass().getName() +
", not adjusting length");
LOG.info(
"Input stream class: " + this.in.getClass().getName() + ", not adjusting length");
}
} catch (Exception e) {
SequenceFileLogReader.LOG.warn(
"Error while trying to get accurate file length. " +
"Truncation / data loss may occur if RegionServers die.", e);
LOG.warn("Error while trying to get accurate file length. "
+ "Truncation / data loss may occur if RegionServers die.",
e);
throw new IOException(e);
}
return adjust + super.getPos();
}
return super.getPos();