HBASE-8096 [replication] NPE while replicating a log that is acquiring a new block from HDFS

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1467662 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2013-04-13 17:07:14 +00:00
parent 64863bb03e
commit 6c1e484d36
3 changed files with 14 additions and 5 deletions

View File

@ -68,7 +68,11 @@ public class ReplicationHLogReaderManager {
this.reader = HLogFactory.createReader(this.fs, path, this.conf); this.reader = HLogFactory.createReader(this.fs, path, this.conf);
this.lastPath = path; this.lastPath = path;
} else { } else {
try {
this.reader.reset(); this.reader.reset();
} catch (NullPointerException npe) {
throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
}
} }
return this.reader; return this.reader;
} }

View File

@ -622,9 +622,14 @@ public class ReplicationSource extends Thread
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.warn(peerClusterZnode + " Got: ", ioe); LOG.warn(peerClusterZnode + " Got: ", ioe);
this.reader = null; this.reader = null;
// TODO Need a better way to determinate if a file is really gone but if (ioe.getCause() instanceof NullPointerException) {
// Workaround for race condition in HDFS-4380
// which throws a NPE if we open a file before any data node has the most recent block
// Just sleep and retry. Will require re-reading compressed HLogs for compressionContext.
LOG.warn("Got NPE opening reader, will retry.");
} else if (sleepMultiplier == this.maxRetriesMultiplier) {
// TODO Need a better way to determine if a file is really gone but
// TODO without scanning all logs dir // TODO without scanning all logs dir
if (sleepMultiplier == this.maxRetriesMultiplier) {
LOG.warn("Waited too long for this file, considering dumping"); LOG.warn("Waited too long for this file, considering dumping");
return !processEndOfFile(); return !processEndOfFile();
} }

View File

@ -428,7 +428,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH); Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
scanner.close(); scanner.close();
assertEquals(NB_ROWS_IN_BATCH *10, res.length); assertEquals(NB_ROWS_IN_BIG_BATCH, res.length);
scan = new Scan(); scan = new Scan();