HBASE-20806 Split style journal for flushes and compactions

This commit is contained in:
Abhishek Singh Chouhan 2018-07-06 12:02:26 +05:30
parent 97c3984aa3
commit 59867eeeeb
4 changed files with 135 additions and 1 deletions

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.monitoring;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.yetus.audience.InterfaceAudience;
@ -32,6 +33,12 @@ public interface MonitoredTask extends Cloneable {
ABORTED
}
public interface StatusJournalEntry {
String getStatus();
long getTimeStamp();
}
long getStartTime();
String getDescription();
String getStatus();
@ -51,6 +58,19 @@ public interface MonitoredTask extends Cloneable {
void setDescription(String description);
void setWarnTime(final long t);
List<StatusJournalEntry> getStatusJournal();
/**
* Enable journal that will store all statuses that have been set along with the time stamps when
* they were set.
* @param includeCurrentStatus whether to include the current set status in the journal
*/
void enableStatusJournal(boolean includeCurrentStatus);
void disableStatusJournal();
String prettyPrintJournal();
/**
* Explicitly mark this status as able to be cleaned up,
* even though it might not be complete.

View File

@ -19,10 +19,15 @@
package org.apache.hadoop.hbase.monitoring;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@InterfaceAudience.Private
@ -37,6 +42,9 @@ class MonitoredTaskImpl implements MonitoredTask {
protected volatile State state = State.RUNNING;
private boolean journalEnabled = false;
private List<StatusJournalEntry> journal;
private static final ObjectMapper MAPPER = new ObjectMapper();
public MonitoredTaskImpl() {
@ -46,6 +54,35 @@ class MonitoredTaskImpl implements MonitoredTask {
warnTime = startTime;
}
private static class StatusJournalEntryImpl implements StatusJournalEntry {
private long statusTime;
private String status;
public StatusJournalEntryImpl(String status, long statusTime) {
this.status = status;
this.statusTime = statusTime;
}
@Override
public String getStatus() {
return status;
}
@Override
public long getTimeStamp() {
return statusTime;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(status);
sb.append(" at ");
sb.append(statusTime);
return sb.toString();
}
}
@Override
public synchronized MonitoredTaskImpl clone() {
try {
@ -126,6 +163,9 @@ class MonitoredTaskImpl implements MonitoredTask {
public void setStatus(String status) {
this.status = status;
statusTime = System.currentTimeMillis();
if (journalEnabled) {
journal.add(new StatusJournalEntryImpl(this.status, statusTime));
}
}
protected void setState(State state) {
@ -191,4 +231,47 @@ class MonitoredTaskImpl implements MonitoredTask {
return sb.toString();
}
/**
* Returns the status journal. This implementation of status journal is not thread-safe. Currently
* we use this to track various stages of flushes and compactions where we can use this/pretty
* print for post task analysis, by which time we are already done changing states (writing to
* journal)
*/
@Override
public List<StatusJournalEntry> getStatusJournal() {
if (journal == null) {
return Collections.emptyList();
} else {
return Collections.unmodifiableList(journal);
}
}
/**
* Enables journaling of this monitored task, the first invocation will lazily initialize the
* journal. The journal implementation itself and this method are not thread safe
*/
@Override
public void enableStatusJournal(boolean includeCurrentStatus) {
if (journalEnabled && journal != null) {
return;
}
journalEnabled = true;
if (journal == null) {
journal = new ArrayList<StatusJournalEntry>();
}
if (includeCurrentStatus) {
journal.add(new StatusJournalEntryImpl(status, statusTime));
}
}
@Override
public void disableStatusJournal() {
journalEnabled = false;
}
@Override
public String prettyPrintJournal() {
return StringUtils.join("\n\t", getStatusJournal());
}
}

View File

@ -2111,6 +2111,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
status.enableStatusJournal(false);
if (this.closed.get()) {
String msg = "Skipping compaction on " + this + " because closed";
LOG.debug(msg);
@ -2159,7 +2160,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return true;
} finally {
if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
if (status != null) status.cleanup();
if (status != null) {
LOG.debug("Compaction status journal:\n\t" + status.prettyPrintJournal());
status.cleanup();
}
}
}
@ -2240,6 +2244,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
}
MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
status.enableStatusJournal(false);
status.setStatus("Acquiring readlock on region");
// block waiting for the lock for flushing cache
lock.readLock().lock();
@ -2303,6 +2308,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
} finally {
lock.readLock().unlock();
LOG.debug("Flush status journal:\n\t" + status.prettyPrintJournal());
status.cleanup();
}
}

View File

@ -195,5 +195,30 @@ public class TestTaskMonitor {
assertEquals(3, operationTasks.size());
tm.shutdown();
}
@Test
public void testStatusJournal() {
TaskMonitor tm = new TaskMonitor(new Configuration());
MonitoredTask task = tm.createStatus("Test task");
assertTrue(task.getStatusJournal().isEmpty());
task.disableStatusJournal();
task.setStatus("status1");
// journal should be empty since it is disabled
assertTrue(task.getStatusJournal().isEmpty());
task.enableStatusJournal(true);
// check existing status entered in journal
assertEquals("status1", task.getStatusJournal().get(0).getStatus());
assertTrue(task.getStatusJournal().get(0).getTimeStamp() > 0);
task.disableStatusJournal();
task.setStatus("status2");
// check status 2 not added since disabled
assertEquals(1, task.getStatusJournal().size());
task.enableStatusJournal(false);
// size should still be 1 since we didn't include current status
assertEquals(1, task.getStatusJournal().size());
task.setStatus("status3");
assertEquals("status3", task.getStatusJournal().get(1).getStatus());
tm.shutdown();
}
}