HBASE-25562 ReplicationSourceWALReader log and handle exception immediately without retrying (#2943)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org> Signed-off-by: stack <stack@apache.org> Signed-off-by: shahrs87
This commit is contained in:
parent
627ac016cc
commit
c1a9e8973a
|
@ -150,14 +150,13 @@ class ReplicationSourceWALReader extends Thread {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (IOException e) { // stream related
|
} catch (IOException e) { // stream related
|
||||||
if (sleepMultiplier < maxRetriesMultiplier) {
|
if (!handleEofException(e)) {
|
||||||
LOG.debug("Failed to read stream of replication entries: " + e);
|
LOG.warn("Failed to read stream of replication entries", e);
|
||||||
sleepMultiplier++;
|
if (sleepMultiplier < maxRetriesMultiplier) {
|
||||||
} else {
|
sleepMultiplier ++;
|
||||||
LOG.error("Failed to read stream of replication entries", e);
|
}
|
||||||
handleEofException(e);
|
Threads.sleep(sleepForRetries * sleepMultiplier);
|
||||||
}
|
}
|
||||||
Threads.sleep(sleepForRetries * sleepMultiplier);
|
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.trace("Interrupted while sleeping between WAL reads");
|
LOG.trace("Interrupted while sleeping between WAL reads");
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
|
@ -244,10 +243,13 @@ class ReplicationSourceWALReader extends Thread {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if we get an EOF due to a zero-length log, and there are other logs in queue
|
/**
|
||||||
// (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
|
* if we get an EOF due to a zero-length log, and there are other logs in queue
|
||||||
// enabled, then dump the log
|
* (highly likely we've closed the current log), and autorecovery is
|
||||||
private void handleEofException(IOException e) {
|
* enabled, then dump the log
|
||||||
|
* @return true only the IOE can be handled
|
||||||
|
*/
|
||||||
|
private boolean handleEofException(IOException e) {
|
||||||
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
|
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
|
||||||
// Dump the log even if logQueue size is 1 if the source is from recovered Source
|
// Dump the log even if logQueue size is 1 if the source is from recovered Source
|
||||||
// since we don't add current log to recovered source queue so it is safe to remove.
|
// since we don't add current log to recovered source queue so it is safe to remove.
|
||||||
|
@ -255,14 +257,16 @@ class ReplicationSourceWALReader extends Thread {
|
||||||
(source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
|
(source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
|
||||||
try {
|
try {
|
||||||
if (fs.getFileStatus(queue.peek()).getLen() == 0) {
|
if (fs.getFileStatus(queue.peek()).getLen() == 0) {
|
||||||
LOG.warn("Forcing removal of 0 length log in queue: " + queue.peek());
|
LOG.warn("Forcing removal of 0 length log in queue: {}", queue.peek());
|
||||||
logQueue.remove(walGroupId);
|
logQueue.remove(walGroupId);
|
||||||
currentPosition = 0;
|
currentPosition = 0;
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.warn("Couldn't get file length information about log " + queue.peek());
|
LOG.warn("Couldn't get file length information about log {}", queue.peek());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Path getCurrentPath() {
|
public Path getCurrentPath() {
|
||||||
|
|
Loading…
Reference in New Issue