diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index ae1088ace17..7e5a038cbab 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -421,9 +421,10 @@ public class WALProcedureStore extends ProcedureStoreBase { } // Create new state-log - if (!rollWriter(flushLogId + 1)) { + long newFlushLogId = flushLogId + 1; + if (!rollWriter(newFlushLogId)) { // someone else has already created this log - LOG.debug("Someone else has already created log {}. Retrying.", flushLogId); + LOG.debug("Someone else has already created log {}. Retrying.", newFlushLogId); continue; } @@ -1042,8 +1043,9 @@ public class WALProcedureStore extends ProcedureStoreBase { } // Create new state-log - if (!rollWriter(flushLogId + 1)) { - LOG.warn("someone else has already created log {}", flushLogId); + long newFlushLogId = flushLogId + 1; + if (!rollWriter(newFlushLogId)) { + LOG.warn("someone else has already created log {}", newFlushLogId); return false; } @@ -1100,7 +1102,8 @@ public class WALProcedureStore extends ProcedureStoreBase { startPos = newStream.getPos(); } catch (IOException ioe) { LOG.warn("Encountered exception writing header", ioe); - newStream.close(); + // Close and delete the incomplete file + closeAndDeleteIncompleteFile(newStream, newLogFile); return false; } @@ -1165,6 +1168,29 @@ public class WALProcedureStore extends ProcedureStoreBase { stream = null; } + private void closeAndDeleteIncompleteFile(FSDataOutputStream newStream, Path newLogFile) { + // Close the FS + try { + newStream.close(); + } catch (IOException e) { + LOG.error("Exception occured while closing the file {}", newLogFile, e); + } + + // Delete the incomplete file + try { + if (!fs.delete(newLogFile, false)) { + LOG.warn( + "Failed to delete the log file {}, increasing the log id by 1 for the next roll attempt", + newLogFile); + flushLogId++; + } + } catch (IOException e) { + LOG.warn("Exception occured while deleting the file {}", newLogFile, e); + flushLogId++; + LOG.info("Increased the log id to {}", flushLogId); + } + } + // ========================================================================== // Log Files cleaner helpers // ==========================================================================