HBASE-25562 ReplicationSourceWALReader log and handle exception immediately without retrying (#2943)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
Signed-off-by: stack <stack@apache.org>
Signed-off-by: shahrs87
This commit is contained in:
XinSun 2021-02-20 10:20:54 +08:00 committed by GitHub
parent 88057d8ab6
commit ed90a14995
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 17 additions and 13 deletions

View File

@ -150,14 +150,13 @@ class ReplicationSourceWALReader extends Thread {
} }
} }
} catch (IOException e) { // stream related } catch (IOException e) { // stream related
if (sleepMultiplier < maxRetriesMultiplier) { if (!handleEofException(e)) {
LOG.debug("Failed to read stream of replication entries: " + e); LOG.warn("Failed to read stream of replication entries", e);
sleepMultiplier++; if (sleepMultiplier < maxRetriesMultiplier) {
} else { sleepMultiplier ++;
LOG.error("Failed to read stream of replication entries", e); }
handleEofException(e); Threads.sleep(sleepForRetries * sleepMultiplier);
} }
Threads.sleep(sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads"); LOG.trace("Interrupted while sleeping between WAL reads");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -245,10 +244,13 @@ class ReplicationSourceWALReader extends Thread {
} }
} }
// if we get an EOF due to a zero-length log, and there are other logs in queue /**
// (highly likely we've closed the current log), we've hit the max retries, and autorecovery is * if we get an EOF due to a zero-length log, and there are other logs in queue
// enabled, then dump the log * (highly likely we've closed the current log), and autorecovery is
private void handleEofException(IOException e) { * enabled, then dump the log
* @return true only the IOE can be handled
*/
private boolean handleEofException(IOException e) {
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId); PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
// Dump the log even if logQueue size is 1 if the source is from recovered Source // Dump the log even if logQueue size is 1 if the source is from recovered Source
// since we don't add current log to recovered source queue so it is safe to remove. // since we don't add current log to recovered source queue so it is safe to remove.
@ -256,14 +258,16 @@ class ReplicationSourceWALReader extends Thread {
(source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) { (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
try { try {
if (fs.getFileStatus(queue.peek()).getLen() == 0) { if (fs.getFileStatus(queue.peek()).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: " + queue.peek()); LOG.warn("Forcing removal of 0 length log in queue: {}", queue.peek());
logQueue.remove(walGroupId); logQueue.remove(walGroupId);
currentPosition = 0; currentPosition = 0;
return true;
} }
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.warn("Couldn't get file length information about log " + queue.peek()); LOG.warn("Couldn't get file length information about log {}", queue.peek());
} }
} }
return false;
} }
public Path getCurrentPath() { public Path getCurrentPath() {