HBASE-6847 HBASE-6649 broke replication (Devaraj Das via JD)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1388161 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2012-09-20 18:59:37 +00:00
parent 55b87a3c96
commit f05e7d917a
1 changed files with 15 additions and 8 deletions

View File

@ -342,10 +342,6 @@ public class ReplicationSource extends Thread
} }
} finally { } finally {
try { try {
// if current path is null, it means we processEndOfFile hence
if (this.currentPath != null && !gotIOE) {
this.position = this.reader.getPosition();
}
if (this.reader != null) { if (this.reader != null) {
this.reader.close(); this.reader.close();
} }
@ -396,7 +392,8 @@ public class ReplicationSource extends Thread
if (this.position != 0) { if (this.position != 0) {
this.reader.seek(this.position); this.reader.seek(this.position);
} }
HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]); long startPosition = this.position;
HLog.Entry entry = readNextAndSetPosition();
while (entry != null) { while (entry != null) {
WALEdit edit = entry.getEdit(); WALEdit edit = entry.getEdit();
this.metrics.incrLogEditsRead(); this.metrics.incrLogEditsRead();
@ -425,13 +422,13 @@ public class ReplicationSource extends Thread
} }
} }
// Stop if too many entries or too big // Stop if too many entries or too big
if ((this.reader.getPosition() - this.position) if ((this.reader.getPosition() - startPosition)
>= this.replicationQueueSizeCapacity || >= this.replicationQueueSizeCapacity ||
currentNbEntries >= this.replicationQueueNbCapacity) { currentNbEntries >= this.replicationQueueNbCapacity) {
break; break;
} }
try { try {
entry = this.reader.next(entriesArray[currentNbEntries]); entry = readNextAndSetPosition();
} catch (IOException ie) { } catch (IOException ie) {
LOG.debug("Break on IOE: " + ie.getMessage()); LOG.debug("Break on IOE: " + ie.getMessage());
break; break;
@ -439,12 +436,22 @@ public class ReplicationSource extends Thread
} }
LOG.debug("currentNbOperations:" + currentNbOperations + LOG.debug("currentNbOperations:" + currentNbOperations +
" and seenEntries:" + seenEntries + " and seenEntries:" + seenEntries +
" and size: " + (this.reader.getPosition() - this.position)); " and size: " + (this.reader.getPosition() - startPosition));
// If we didn't get anything and the queue has an object, it means we // If we didn't get anything and the queue has an object, it means we
// hit the end of the file for sure // hit the end of the file for sure
return seenEntries == 0 && processEndOfFile(); return seenEntries == 0 && processEndOfFile();
} }
private HLog.Entry readNextAndSetPosition() throws IOException {
HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]);
// Store the position so that in the future the reader can start
// reading from here. If the above call to next() throws an
// exception, the position won't be changed and retry will happen
// from the last known good position
this.position = this.reader.getPosition();
return entry;
}
private void connectToPeers() { private void connectToPeers() {
// Connect to peer cluster first, unless we have to stop // Connect to peer cluster first, unless we have to stop
while (this.isActive() && this.currentPeers.size() == 0) { while (this.isActive() && this.currentPeers.size() == 0) {