HBASE-13673 WALProcedureStore procedure is chatty

This commit is contained in:
Srikanth Srungarapu 2015-05-18 19:37:29 -07:00
parent 92e66ef522
commit bc189d0497
1 changed files with 43 additions and 30 deletions

View File

@ -141,7 +141,7 @@ public class WALProcedureStore implements ProcedureStore {
try { try {
syncLoop(); syncLoop();
} catch (IOException e) { } catch (IOException e) {
LOG.error("got an exception from the sync-loop", e); LOG.error("Got an exception from the sync-loop", e);
sendAbortProcessSignal(); sendAbortProcessSignal();
} }
} }
@ -219,21 +219,23 @@ public class WALProcedureStore implements ProcedureStore {
// Create new state-log // Create new state-log
if (!rollWriter(flushLogId)) { if (!rollWriter(flushLogId)) {
// someone else has already created this log if (LOG.isDebugEnabled()) {
LOG.debug("someone else has already created log " + flushLogId); LOG.debug("Someone else has already created log: " + flushLogId);
}
continue; continue;
} }
// We have the lease on the log // We have the lease on the log
oldLogs = getLogFiles(); oldLogs = getLogFiles();
if (getMaxLogId(oldLogs) > flushLogId) { if (getMaxLogId(oldLogs) > flushLogId) {
// Someone else created new logs if (LOG.isDebugEnabled()) {
LOG.debug("someone else created new logs. expected maxLogId < " + flushLogId); LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId);
}
logs.getLast().removeFile(); logs.getLast().removeFile();
continue; continue;
} }
LOG.info("lease acquired flushLogId=" + flushLogId); LOG.info("Lease acquired for flushLogId: " + flushLogId);
break; break;
} }
} }
@ -246,7 +248,9 @@ public class WALProcedureStore implements ProcedureStore {
// Nothing to do, If we have only the current log. // Nothing to do, If we have only the current log.
if (logs.size() == 1) { if (logs.size() == 1) {
LOG.debug("No state logs to replay"); if (LOG.isDebugEnabled()) {
LOG.debug("No state logs to replay.");
}
return null; return null;
} }
@ -282,7 +286,7 @@ public class WALProcedureStore implements ProcedureStore {
@Override @Override
public void insert(final Procedure proc, final Procedure[] subprocs) { public void insert(final Procedure proc, final Procedure[] subprocs) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("insert " + proc + " subproc=" + Arrays.toString(subprocs)); LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs));
} }
ByteSlot slot = acquireSlot(); ByteSlot slot = acquireSlot();
@ -302,7 +306,7 @@ public class WALProcedureStore implements ProcedureStore {
// We are not able to serialize the procedure. // We are not able to serialize the procedure.
// this is a code error, and we are not able to go on. // this is a code error, and we are not able to go on.
LOG.fatal("Unable to serialize one of the procedure: proc=" + proc + LOG.fatal("Unable to serialize one of the procedure: proc=" + proc +
" subprocs=" + Arrays.toString(subprocs), e); ", subprocs=" + Arrays.toString(subprocs), e);
throw new RuntimeException(e); throw new RuntimeException(e);
} finally { } finally {
releaseSlot(slot); releaseSlot(slot);
@ -317,7 +321,7 @@ public class WALProcedureStore implements ProcedureStore {
@Override @Override
public void update(final Procedure proc) { public void update(final Procedure proc) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("update " + proc); LOG.trace("Update " + proc);
} }
ByteSlot slot = acquireSlot(); ByteSlot slot = acquireSlot();
@ -354,7 +358,7 @@ public class WALProcedureStore implements ProcedureStore {
@Override @Override
public void delete(final long procId) { public void delete(final long procId) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("delete " + procId); LOG.trace("Delete " + procId);
} }
ByteSlot slot = acquireSlot(); ByteSlot slot = acquireSlot();
@ -469,9 +473,9 @@ public class WALProcedureStore implements ProcedureStore {
long syncWaitMs = System.currentTimeMillis() - syncWaitSt; long syncWaitMs = System.currentTimeMillis() - syncWaitSt;
if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) { if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) {
float rollSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f; float rollSec = (System.currentTimeMillis() - lastRollTs) / 1000.0f;
LOG.trace("sync wait " + StringUtils.humanTimeDiff(syncWaitMs) + LOG.trace("Sync wait " + StringUtils.humanTimeDiff(syncWaitMs) +
" slotIndex=" + slotIndex + ", slotIndex=" + slotIndex +
" totalSynced=" + StringUtils.humanSize(totalSynced.get()) + ", totalSynced=" + StringUtils.humanSize(totalSynced.get()) +
" " + StringUtils.humanSize(totalSynced.get() / rollSec) + "/sec"); " " + StringUtils.humanSize(totalSynced.get() / rollSec) + "/sec");
} }
@ -499,7 +503,7 @@ public class WALProcedureStore implements ProcedureStore {
break; break;
} catch (Throwable e) { } catch (Throwable e) {
if (++retry == MAX_RETRIES_BEFORE_ABORT) { if (++retry == MAX_RETRIES_BEFORE_ABORT) {
LOG.error("sync slot failed, abort.", e); LOG.error("Sync slot failed, abort.", e);
sendAbortProcessSignal(); sendAbortProcessSignal();
} }
} }
@ -524,7 +528,7 @@ public class WALProcedureStore implements ProcedureStore {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Sync slots=" + count + '/' + slots.length + LOG.trace("Sync slots=" + count + '/' + slots.length +
" flushed=" + StringUtils.humanSize(totalSynced)); ", flushed=" + StringUtils.humanSize(totalSynced));
} }
return totalSynced; return totalSynced;
} }
@ -581,7 +585,9 @@ public class WALProcedureStore implements ProcedureStore {
} finally { } finally {
lock.unlock(); lock.unlock();
} }
LOG.info("Roll new state log: " + logId); if (LOG.isDebugEnabled()) {
LOG.debug("Roll new state log: " + logId);
}
return true; return true;
} }
@ -603,24 +609,31 @@ public class WALProcedureStore implements ProcedureStore {
} }
private void removeAllLogs(long lastLogId) { private void removeAllLogs(long lastLogId) {
LOG.info("Remove all state logs with ID less then " + lastLogId); if (logs.size() <= 1) {
while (!logs.isEmpty()) { assert logs.size() == 1: "Expected at least one active log to be running.";
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Remove all state logs with ID less than " + lastLogId);
}
do {
ProcedureWALFile log = logs.getFirst(); ProcedureWALFile log = logs.getFirst();
if (lastLogId < log.getLogId()) { if (lastLogId < log.getLogId()) {
break; break;
} }
removeLogFile(log); removeLogFile(log);
} } while(!logs.isEmpty());
} }
private boolean removeLogFile(final ProcedureWALFile log) { private boolean removeLogFile(final ProcedureWALFile log) {
try { try {
LOG.debug("remove log: " + log); if (LOG.isDebugEnabled()) {
LOG.debug("Remove log: " + log);
}
log.removeFile(); log.removeFile();
logs.remove(log); logs.remove(log);
} catch (IOException e) { } catch (IOException e) {
LOG.error("unable to remove log " + log, e); LOG.error("Unable to remove log: " + log, e);
return false; return false;
} }
return true; return true;
@ -666,7 +679,7 @@ public class WALProcedureStore implements ProcedureStore {
} }
}); });
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
LOG.warn("log directory not found: " + e.getMessage()); LOG.warn("Log directory not found: " + e.getMessage());
return null; return null;
} }
} }
@ -723,16 +736,17 @@ public class WALProcedureStore implements ProcedureStore {
private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException { private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException {
ProcedureWALFile log = new ProcedureWALFile(fs, logFile); ProcedureWALFile log = new ProcedureWALFile(fs, logFile);
if (logFile.getLen() == 0) { if (logFile.getLen() == 0) {
LOG.warn("Remove uninitialized log " + logFile); LOG.warn("Remove uninitialized log: " + logFile);
log.removeFile(); log.removeFile();
return null; return null;
} }
if (LOG.isDebugEnabled()) {
LOG.debug("opening state-log: " + logFile); LOG.debug("Opening state-log: " + logFile);
}
try { try {
log.open(); log.open();
} catch (ProcedureWALFormat.InvalidWALDataException e) { } catch (ProcedureWALFormat.InvalidWALDataException e) {
LOG.warn("Remove uninitialized log " + logFile, e); LOG.warn("Remove uninitialized log: " + logFile, e);
log.removeFile(); log.removeFile();
return null; return null;
} catch (IOException e) { } catch (IOException e) {
@ -745,8 +759,7 @@ public class WALProcedureStore implements ProcedureStore {
try { try {
log.readTrailer(); log.readTrailer();
} catch (IOException e) { } catch (IOException e) {
// unfinished compacted log throw it away LOG.warn("Unfinished compacted log: " + logFile, e);
LOG.warn("Unfinished compacted log " + logFile, e);
log.removeFile(); log.removeFile();
return null; return null;
} }