diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java index f0eefd7016e..a715549250e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.monitoring; import java.io.IOException; +import java.util.List; import java.util.Map; import org.apache.yetus.audience.InterfaceAudience; @@ -32,6 +33,12 @@ public interface MonitoredTask extends Cloneable { ABORTED } + public interface StatusJournalEntry { + String getStatus(); + + long getTimeStamp(); + } + long getStartTime(); String getDescription(); String getStatus(); @@ -51,6 +58,19 @@ public interface MonitoredTask extends Cloneable { void setDescription(String description); void setWarnTime(final long t); + List getStatusJournal(); + + /** + * Enable journal that will store all statuses that have been set along with the time stamps when + * they were set. + * @param includeCurrentStatus whether to include the current set status in the journal + */ + void enableStatusJournal(boolean includeCurrentStatus); + + void disableStatusJournal(); + + String prettyPrintJournal(); + /** * Explicitly mark this status as able to be cleaned up, * even though it might not be complete. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java index bedb5e28fc3..0fa638f1a20 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java @@ -19,10 +19,15 @@ package org.apache.hadoop.hbase.monitoring; import com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; @InterfaceAudience.Private @@ -37,6 +42,9 @@ class MonitoredTaskImpl implements MonitoredTask { protected volatile State state = State.RUNNING; + private boolean journalEnabled = false; + private List journal; + private static final ObjectMapper MAPPER = new ObjectMapper(); public MonitoredTaskImpl() { @@ -46,6 +54,35 @@ class MonitoredTaskImpl implements MonitoredTask { warnTime = startTime; } + private static class StatusJournalEntryImpl implements StatusJournalEntry { + private long statusTime; + private String status; + + public StatusJournalEntryImpl(String status, long statusTime) { + this.status = status; + this.statusTime = statusTime; + } + + @Override + public String getStatus() { + return status; + } + + @Override + public long getTimeStamp() { + return statusTime; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(status); + sb.append(" at "); + sb.append(statusTime); + return sb.toString(); + } + } + @Override public synchronized MonitoredTaskImpl clone() { try { @@ -126,6 +163,9 @@ class MonitoredTaskImpl implements MonitoredTask { public void setStatus(String status) { this.status = status; statusTime = System.currentTimeMillis(); + if (journalEnabled) { + journal.add(new StatusJournalEntryImpl(this.status, statusTime)); + } } protected void setState(State state) { @@ -191,4 +231,47 @@ class MonitoredTaskImpl implements MonitoredTask { return sb.toString(); } + /** + * Returns the status journal. This implementation of status journal is not thread-safe. Currently + * we use this to track various stages of flushes and compactions where we can use this/pretty + * print for post task analysis, by which time we are already done changing states (writing to + * journal) + */ + @Override + public List getStatusJournal() { + if (journal == null) { + return Collections.emptyList(); + } else { + return Collections.unmodifiableList(journal); + } + } + + /** + * Enables journaling of this monitored task, the first invocation will lazily initialize the + * journal. The journal implementation itself and this method are not thread safe + */ + @Override + public void enableStatusJournal(boolean includeCurrentStatus) { + if (journalEnabled && journal != null) { + return; + } + journalEnabled = true; + if (journal == null) { + journal = new ArrayList(); + } + if (includeCurrentStatus) { + journal.add(new StatusJournalEntryImpl(status, statusTime)); + } + } + + @Override + public void disableStatusJournal() { + journalEnabled = false; + } + + @Override + public String prettyPrintJournal() { + return StringUtils.join("\n\t", getStatusJournal()); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 02ae7d87b65..ed6275633a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2111,6 +2111,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this); + status.enableStatusJournal(false); if (this.closed.get()) { String msg = "Skipping compaction on " + this + " because closed"; LOG.debug(msg); @@ -2159,7 +2160,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return true; } finally { if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction); - if (status != null) status.cleanup(); + if (status != null) { + LOG.debug("Compaction status journal:\n\t" + status.prettyPrintJournal()); + status.cleanup(); + } } } @@ -2240,6 +2244,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false); } MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this); + status.enableStatusJournal(false); status.setStatus("Acquiring readlock on region"); // block waiting for the lock for flushing cache lock.readLock().lock(); @@ -2303,6 +2308,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } finally { lock.readLock().unlock(); + LOG.debug("Flush status journal:\n\t" + status.prettyPrintJournal()); status.cleanup(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java index a2fe42a263b..38c03083889 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java @@ -195,5 +195,30 @@ public class TestTaskMonitor { assertEquals(3, operationTasks.size()); tm.shutdown(); } + + @Test + public void testStatusJournal() { + TaskMonitor tm = new TaskMonitor(new Configuration()); + MonitoredTask task = tm.createStatus("Test task"); + assertTrue(task.getStatusJournal().isEmpty()); + task.disableStatusJournal(); + task.setStatus("status1"); + // journal should be empty since it is disabled + assertTrue(task.getStatusJournal().isEmpty()); + task.enableStatusJournal(true); + // check existing status entered in journal + assertEquals("status1", task.getStatusJournal().get(0).getStatus()); + assertTrue(task.getStatusJournal().get(0).getTimeStamp() > 0); + task.disableStatusJournal(); + task.setStatus("status2"); + // check status 2 not added since disabled + assertEquals(1, task.getStatusJournal().size()); + task.enableStatusJournal(false); + // size should still be 1 since we didn't include current status + assertEquals(1, task.getStatusJournal().size()); + task.setStatus("status3"); + assertEquals("status3", task.getStatusJournal().get(1).getStatus()); + tm.shutdown(); + } }