diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java index b500e2d0ebe..017e107c6e7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java @@ -126,8 +126,7 @@ public abstract class TakeSnapshotHandler extends EventHandler this.verifier = new MasterSnapshotVerifier(masterServices, snapshot, workingDirFs); // update the running tasks this.status = TaskMonitor.get() - .createStatus("Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable); - this.status.enableStatusJournal(true); + .createStatus("Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable, true); this.snapshotManifest = SnapshotManifest.create(conf, rootFs, workingDir, snapshot, monitor, status); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java index 68b270b9567..154f3b2e357 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java @@ -45,7 +45,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl implements Monito private Map callInfoMap = new HashMap<>(); public MonitoredRPCHandlerImpl() { - super(); + super(false); // in this implementation, WAITING indicates that the handler is not // actively servicing an RPC call. setState(State.WAITING); @@ -235,7 +235,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl implements Monito return map; } Map rpcJSON = new HashMap<>(); - ArrayList paramList = new ArrayList(); + ArrayList paramList = new ArrayList<>(); map.put("rpcCall", rpcJSON); rpcJSON.put("queuetimems", getRPCQueueTime()); rpcJSON.put("starttimems", getRPCStartTime()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java index 5507e1607b6..08a82a3e9de 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java @@ -69,16 +69,11 @@ public interface MonitoredTask extends Cloneable { void setWarnTime(final long t); - List getStatusJournal(); - /** - * Enable journal that will store all statuses that have been set along with the time stamps when - * they were set. - * @param includeCurrentStatus whether to include the current set status in the journal + * If journal is enabled, we will store all statuses that have been set along with the time stamps + * when they were set. This method will give you all the journals stored so far. */ - void enableStatusJournal(boolean includeCurrentStatus); - - void disableStatusJournal(); + List getStatusJournal(); String prettyPrintJournal(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java index 4fe272bc497..4fee362a735 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java @@ -18,15 +18,17 @@ package org.apache.hadoop.hbase.monitoring; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.GsonUtil; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hbase.thirdparty.com.google.gson.Gson; @InterfaceAudience.Private @@ -40,22 +42,25 @@ class MonitoredTaskImpl implements MonitoredTask { private volatile String description; protected volatile State state = State.RUNNING; - - private boolean journalEnabled = false; - private List journal; + private final ConcurrentLinkedQueue journal; private static final Gson GSON = GsonUtil.createGson().create(); - public MonitoredTaskImpl() { + public MonitoredTaskImpl(boolean enableJournal) { startTime = EnvironmentEdgeManager.currentTime(); statusTime = startTime; stateTime = startTime; warnTime = startTime; + if (enableJournal) { + journal = new ConcurrentLinkedQueue<>(); + } else { + journal = null; + } } - private static class StatusJournalEntryImpl implements StatusJournalEntry { - private long statusTime; - private String status; + private static final class StatusJournalEntryImpl implements StatusJournalEntry { + private final long statusTime; + private final String status; public StatusJournalEntryImpl(String status, long statusTime) { this.status = status; @@ -74,11 +79,7 @@ class MonitoredTaskImpl implements MonitoredTask { @Override public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(status); - sb.append(" at "); - sb.append(statusTime); - return sb.toString(); + return status + " at " + statusTime; } } @@ -162,7 +163,7 @@ class MonitoredTaskImpl implements MonitoredTask { public void setStatus(String status) { this.status = status; statusTime = EnvironmentEdgeManager.currentTime(); - if (journalEnabled) { + if (journal != null) { journal.add(new StatusJournalEntryImpl(this.status, statusTime)); } } @@ -240,52 +241,29 @@ class MonitoredTaskImpl implements MonitoredTask { if (journal == null) { return Collections.emptyList(); } else { - return Collections.unmodifiableList(journal); + return ImmutableList.copyOf(journal); } } - /** - * Enables journaling of this monitored task, the first invocation will lazily initialize the - * journal. The journal implementation itself and this method are not thread safe - */ - @Override - public void enableStatusJournal(boolean includeCurrentStatus) { - if (journalEnabled && journal != null) { - return; - } - journalEnabled = true; - if (journal == null) { - journal = new ArrayList(); - } - if (includeCurrentStatus && status != null) { - journal.add(new StatusJournalEntryImpl(status, statusTime)); - } - } - - @Override - public void disableStatusJournal() { - journalEnabled = false; - } - @Override public String prettyPrintJournal() { - if (!journalEnabled) { + if (journal == null) { return ""; } StringBuilder sb = new StringBuilder(); - for (int i = 0; i < journal.size(); i++) { - StatusJournalEntry je = journal.get(i); - sb.append(je.toString()); - if (i != 0) { - StatusJournalEntry jep = journal.get(i - 1); - long delta = je.getTimeStamp() - jep.getTimeStamp(); + Iterator iter = journal.iterator(); + StatusJournalEntry previousEntry = null; + while (iter.hasNext()) { + StatusJournalEntry entry = iter.next(); + sb.append(entry); + if (previousEntry != null) { + long delta = entry.getTimeStamp() - previousEntry.getTimeStamp(); if (delta != 0) { sb.append(" (+" + delta + " ms)"); } } - sb.append("\n"); + previousEntry = entry; } return sb.toString(); } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java index b18790c6176..c2425cbbc1f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java @@ -58,7 +58,7 @@ public class TaskMonitor { private final int maxTasks; private final long rpcWarnTime; private final long expirationTime; - private final CircularFifoQueue tasks; + private final CircularFifoQueue tasks; private final List rpcTasks; private final long monitorInterval; private Thread monitorThread; @@ -67,7 +67,7 @@ public class TaskMonitor { maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS); expirationTime = conf.getLong(EXPIRATION_TIME_KEY, DEFAULT_EXPIRATION_TIME); rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME); - tasks = new CircularFifoQueue(maxTasks); + tasks = new CircularFifoQueue<>(maxTasks); rpcTasks = Lists.newArrayList(); monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, DEFAULT_MONITOR_INTERVAL); monitorThread = new Thread(new MonitorRunnable()); @@ -84,8 +84,12 @@ public class TaskMonitor { return instance; } - public synchronized MonitoredTask createStatus(String description) { - MonitoredTask stat = new MonitoredTaskImpl(); + public MonitoredTask createStatus(String description) { + return createStatus(description, false); + } + + public synchronized MonitoredTask createStatus(String description, boolean enableJournal) { + MonitoredTask stat = new MonitoredTaskImpl(enableJournal); stat.setDescription(description); MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(stat.getClass().getClassLoader(), new Class[] { MonitoredTask.class }, new PassthroughInvocationHandler<>(stat)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 83bd6f09424..7ee26d2ec92 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -929,7 +929,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Initialize this region. * @param reporter Tickle every so often if initialize is taking a while. * @return What the next sequence (edit) id should be. - * @throws IOException e */ long initialize(final CancelableProgressable reporter) throws IOException { @@ -939,8 +938,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi + " should have at least one column family."); } - MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this); - status.enableStatusJournal(true); + MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this, true); long nextSeqId = -1; try { nextSeqId = initializeRegionInternals(reporter, status); @@ -1552,8 +1550,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Only allow one thread to close at a time. Serialize them so dual // threads attempting to close will run up against each other. MonitoredTask status = TaskMonitor.get().createStatus( - "Closing region " + this.getRegionInfo().getEncodedName() + (abort ? " due to abort" : "")); - status.enableStatusJournal(true); + "Closing region " + this.getRegionInfo().getEncodedName() + (abort ? " due to abort" : ""), + true); status.setStatus("Waiting for close lock"); try { synchronized (closeLock) { @@ -2250,7 +2248,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this); - status.enableStatusJournal(false); if (this.closed.get()) { String msg = "Skipping compaction on " + this + " because closed"; LOG.debug(msg); @@ -2387,7 +2384,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false); } MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this); - status.enableStatusJournal(false); status.setStatus("Acquiring readlock on region"); // block waiting for the lock for flushing cache lock.readLock().lock(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index f1bc9c8dcf7..a6463094bea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -288,8 +288,7 @@ public class WALSplitter { int editsCount = 0; int editsSkipped = 0; MonitoredTask status = - TaskMonitor.get().createStatus("Splitting " + wal + " to temporary staging area."); - status.enableStatusJournal(true); + TaskMonitor.get().createStatus("Splitting " + wal + " to temporary staging area.", true); Reader walReader = null; this.fileBeingSplit = walStatus; long startTS = EnvironmentEdgeManager.currentTime(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java index 00a3cd6431d..f7623c4d803 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java @@ -212,23 +212,17 @@ public class TestTaskMonitor { TaskMonitor tm = new TaskMonitor(new Configuration()); MonitoredTask task = tm.createStatus("Test task"); assertTrue(task.getStatusJournal().isEmpty()); - task.disableStatusJournal(); task.setStatus("status1"); // journal should be empty since it is disabled assertTrue(task.getStatusJournal().isEmpty()); - task.enableStatusJournal(true); - // check existing status entered in journal - assertEquals("status1", task.getStatusJournal().get(0).getStatus()); - assertTrue(task.getStatusJournal().get(0).getTimeStamp() > 0); - task.disableStatusJournal(); + task = tm.createStatus("Test task with journal", true); task.setStatus("status2"); - // check status 2 not added since disabled - assertEquals(1, task.getStatusJournal().size()); - task.enableStatusJournal(false); - // size should still be 1 since we didn't include current status assertEquals(1, task.getStatusJournal().size()); + assertEquals("status2", task.getStatusJournal().get(0).getStatus()); task.setStatus("status3"); + assertEquals(2, task.getStatusJournal().size()); assertEquals("status3", task.getStatusJournal().get(1).getStatus()); + task.prettyPrintJournal(); tm.shutdown(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index 752c434570d..450ed17c731 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeFalse; import java.io.FileNotFoundException; import java.io.IOException; @@ -945,6 +946,9 @@ public class TestWALSplit { */ @Test public void testThreadingSlowWriterSmallBuffer() throws Exception { + // The logic of this test has conflict with the limit writers split logic, skip this test for + // TestWALSplitBoundedLogWriterCreation + assumeFalse(this instanceof TestWALSplitBoundedLogWriterCreation); doTestThreading(200, 1024, 50); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java index 2a9e77ba60b..940248eb6f9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java @@ -19,13 +19,12 @@ package org.apache.hadoop.hbase.wal; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Ignore; -import org.junit.Test; import org.junit.experimental.categories.Category; -@Category(LargeTests.class) +@Category({ RegionServerTests.class, LargeTests.class }) public class TestWALSplitBoundedLogWriterCreation extends TestWALSplit { @ClassRule @@ -37,14 +36,4 @@ public class TestWALSplitBoundedLogWriterCreation extends TestWALSplit { TestWALSplit.setUpBeforeClass(); TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true); } - - /** - * The logic of this test has conflict with the limit writers split logic, skip this test - */ - @Override - @Test - @Ignore - public void testThreadingSlowWriterSmallBuffer() throws Exception { - super.testThreadingSlowWriterSmallBuffer(); - } }