HBASE-20806 Split style journal for flushes and compactions
This commit is contained in:
parent
8a394ade4c
commit
ae22d57201
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.hbase.monitoring;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -32,6 +33,12 @@ public interface MonitoredTask extends Cloneable {
|
|||
ABORTED;
|
||||
}
|
||||
|
||||
public interface StatusJournalEntry {
|
||||
String getStatus();
|
||||
|
||||
long getTimeStamp();
|
||||
}
|
||||
|
||||
long getStartTime();
|
||||
String getDescription();
|
||||
String getStatus();
|
||||
|
@ -51,6 +58,19 @@ public interface MonitoredTask extends Cloneable {
|
|||
void setDescription(String description);
|
||||
void setWarnTime(final long t);
|
||||
|
||||
List<StatusJournalEntry> getStatusJournal();
|
||||
|
||||
/**
|
||||
* Enable journal that will store all statuses that have been set along with the time stamps when
|
||||
* they were set.
|
||||
* @param includeCurrentStatus whether to include the current set status in the journal
|
||||
*/
|
||||
void enableStatusJournal(boolean includeCurrentStatus);
|
||||
|
||||
void disableStatusJournal();
|
||||
|
||||
String prettyPrintJournal();
|
||||
|
||||
/**
|
||||
* Explicitly mark this status as able to be cleaned up,
|
||||
* even though it might not be complete.
|
||||
|
|
|
@ -19,10 +19,15 @@
|
|||
package org.apache.hadoop.hbase.monitoring;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
|
@ -37,6 +42,9 @@ class MonitoredTaskImpl implements MonitoredTask {
|
|||
|
||||
protected volatile State state = State.RUNNING;
|
||||
|
||||
private boolean journalEnabled = false;
|
||||
private List<StatusJournalEntry> journal;
|
||||
|
||||
private static final ObjectMapper MAPPER = new ObjectMapper();
|
||||
|
||||
public MonitoredTaskImpl() {
|
||||
|
@ -46,6 +54,35 @@ class MonitoredTaskImpl implements MonitoredTask {
|
|||
warnTime = startTime;
|
||||
}
|
||||
|
||||
private static class StatusJournalEntryImpl implements StatusJournalEntry {
|
||||
private long statusTime;
|
||||
private String status;
|
||||
|
||||
public StatusJournalEntryImpl(String status, long statusTime) {
|
||||
this.status = status;
|
||||
this.statusTime = statusTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getTimeStamp() {
|
||||
return statusTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(status);
|
||||
sb.append(" at ");
|
||||
sb.append(statusTime);
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized MonitoredTaskImpl clone() {
|
||||
try {
|
||||
|
@ -126,6 +163,9 @@ class MonitoredTaskImpl implements MonitoredTask {
|
|||
public void setStatus(String status) {
|
||||
this.status = status;
|
||||
statusTime = System.currentTimeMillis();
|
||||
if (journalEnabled) {
|
||||
journal.add(new StatusJournalEntryImpl(this.status, statusTime));
|
||||
}
|
||||
}
|
||||
|
||||
protected void setState(State state) {
|
||||
|
@ -191,4 +231,47 @@ class MonitoredTaskImpl implements MonitoredTask {
|
|||
return sb.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the status journal. This implementation of status journal is not thread-safe. Currently
|
||||
* we use this to track various stages of flushes and compactions where we can use this/pretty
|
||||
* print for post task analysis, by which time we are already done changing states (writing to
|
||||
* journal)
|
||||
*/
|
||||
@Override
|
||||
public List<StatusJournalEntry> getStatusJournal() {
|
||||
if (journal == null) {
|
||||
return Collections.emptyList();
|
||||
} else {
|
||||
return Collections.unmodifiableList(journal);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Enables journaling of this monitored task, the first invocation will lazily initialize the
|
||||
* journal. The journal implementation itself and this method are not thread safe
|
||||
*/
|
||||
@Override
|
||||
public void enableStatusJournal(boolean includeCurrentStatus) {
|
||||
if (journalEnabled && journal != null) {
|
||||
return;
|
||||
}
|
||||
journalEnabled = true;
|
||||
if (journal == null) {
|
||||
journal = new ArrayList<StatusJournalEntry>();
|
||||
}
|
||||
if (includeCurrentStatus) {
|
||||
journal.add(new StatusJournalEntryImpl(status, statusTime));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disableStatusJournal() {
|
||||
journalEnabled = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String prettyPrintJournal() {
|
||||
return StringUtils.join("\n\t", getStatusJournal());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -2081,6 +2081,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
|
||||
status.enableStatusJournal(false);
|
||||
if (this.closed.get()) {
|
||||
String msg = "Skipping compaction on " + this + " because closed";
|
||||
LOG.debug(msg);
|
||||
|
@ -2129,7 +2130,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
return true;
|
||||
} finally {
|
||||
if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
|
||||
if (status != null) status.cleanup();
|
||||
if (status != null) {
|
||||
LOG.debug("Compaction status journal:\n\t" + status.prettyPrintJournal());
|
||||
status.cleanup();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2210,6 +2214,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
|
||||
}
|
||||
MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
|
||||
status.enableStatusJournal(false);
|
||||
status.setStatus("Acquiring readlock on region");
|
||||
// block waiting for the lock for flushing cache
|
||||
lock.readLock().lock();
|
||||
|
@ -2273,6 +2278,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
LOG.debug("Flush status journal:\n\t" + status.prettyPrintJournal());
|
||||
status.cleanup();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -195,5 +195,30 @@ public class TestTaskMonitor {
|
|||
assertEquals(3, operationTasks.size());
|
||||
tm.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStatusJournal() {
|
||||
TaskMonitor tm = new TaskMonitor(new Configuration());
|
||||
MonitoredTask task = tm.createStatus("Test task");
|
||||
assertTrue(task.getStatusJournal().isEmpty());
|
||||
task.disableStatusJournal();
|
||||
task.setStatus("status1");
|
||||
// journal should be empty since it is disabled
|
||||
assertTrue(task.getStatusJournal().isEmpty());
|
||||
task.enableStatusJournal(true);
|
||||
// check existing status entered in journal
|
||||
assertEquals("status1", task.getStatusJournal().get(0).getStatus());
|
||||
assertTrue(task.getStatusJournal().get(0).getTimeStamp() > 0);
|
||||
task.disableStatusJournal();
|
||||
task.setStatus("status2");
|
||||
// check status 2 not added since disabled
|
||||
assertEquals(1, task.getStatusJournal().size());
|
||||
task.enableStatusJournal(false);
|
||||
// size should still be 1 since we didn't include current status
|
||||
assertEquals(1, task.getStatusJournal().size());
|
||||
task.setStatus("status3");
|
||||
assertEquals("status3", task.getStatusJournal().get(1).getStatus());
|
||||
tm.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue