HBASE-27211 Data race in MonitoredTaskImpl could cause split wal failure (#4630)

Signed-off-by: Guanghao Zhang <zghao@apache.org>
Signed-off-by: Xin Sun <ddupgs@gmail.com>
This commit is contained in:
Duo Zhang 2022-07-18 19:25:06 +08:00 committed by GitHub
parent ff8eb59709
commit 9ab0b1504f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 57 additions and 98 deletions

View File

@ -125,9 +125,8 @@ public abstract class TakeSnapshotHandler extends EventHandler
// prepare the verify // prepare the verify
this.verifier = new MasterSnapshotVerifier(masterServices, snapshot, workingDirFs); this.verifier = new MasterSnapshotVerifier(masterServices, snapshot, workingDirFs);
// update the running tasks // update the running tasks
this.status = TaskMonitor.get() this.status = TaskMonitor.get().createStatus(
.createStatus("Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable); "Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable, false, true);
this.status.enableStatusJournal(true);
this.snapshotManifest = this.snapshotManifest =
SnapshotManifest.create(conf, rootFs, workingDir, snapshot, monitor, status); SnapshotManifest.create(conf, rootFs, workingDir, snapshot, monitor, status);
} }

View File

@ -45,7 +45,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl implements Monito
private Map<String, Object> callInfoMap = new HashMap<>(); private Map<String, Object> callInfoMap = new HashMap<>();
public MonitoredRPCHandlerImpl() { public MonitoredRPCHandlerImpl() {
super(); super(false);
// in this implementation, WAITING indicates that the handler is not // in this implementation, WAITING indicates that the handler is not
// actively servicing an RPC call. // actively servicing an RPC call.
setState(State.WAITING); setState(State.WAITING);
@ -235,7 +235,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl implements Monito
return map; return map;
} }
Map<String, Object> rpcJSON = new HashMap<>(); Map<String, Object> rpcJSON = new HashMap<>();
ArrayList paramList = new ArrayList(); ArrayList<Object> paramList = new ArrayList<>();
map.put("rpcCall", rpcJSON); map.put("rpcCall", rpcJSON);
rpcJSON.put("queuetimems", getRPCQueueTime()); rpcJSON.put("queuetimems", getRPCQueueTime());
rpcJSON.put("starttimems", getRPCStartTime()); rpcJSON.put("starttimems", getRPCStartTime());

View File

@ -69,16 +69,11 @@ public interface MonitoredTask extends Cloneable {
void setWarnTime(final long t); void setWarnTime(final long t);
List<StatusJournalEntry> getStatusJournal();
/** /**
* Enable journal that will store all statuses that have been set along with the time stamps when * If journal is enabled, we will store all statuses that have been set along with the time stamps
* they were set. * when they were set. This method will give you all the journals stored so far.
* @param includeCurrentStatus whether to include the current set status in the journal
*/ */
void enableStatusJournal(boolean includeCurrentStatus); List<StatusJournalEntry> getStatusJournal();
void disableStatusJournal();
String prettyPrintJournal(); String prettyPrintJournal();

View File

@ -18,15 +18,17 @@
package org.apache.hadoop.hbase.monitoring; package org.apache.hadoop.hbase.monitoring;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.GsonUtil; import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.gson.Gson; import org.apache.hbase.thirdparty.com.google.gson.Gson;
@InterfaceAudience.Private @InterfaceAudience.Private
@ -40,22 +42,25 @@ class MonitoredTaskImpl implements MonitoredTask {
private volatile String description; private volatile String description;
protected volatile State state = State.RUNNING; protected volatile State state = State.RUNNING;
private final ConcurrentLinkedQueue<StatusJournalEntry> journal;
private boolean journalEnabled = false;
private List<StatusJournalEntry> journal;
private static final Gson GSON = GsonUtil.createGson().create(); private static final Gson GSON = GsonUtil.createGson().create();
public MonitoredTaskImpl() { public MonitoredTaskImpl(boolean enableJournal) {
startTime = EnvironmentEdgeManager.currentTime(); startTime = EnvironmentEdgeManager.currentTime();
statusTime = startTime; statusTime = startTime;
stateTime = startTime; stateTime = startTime;
warnTime = startTime; warnTime = startTime;
if (enableJournal) {
journal = new ConcurrentLinkedQueue<>();
} else {
journal = null;
}
} }
private static class StatusJournalEntryImpl implements StatusJournalEntry { private static final class StatusJournalEntryImpl implements StatusJournalEntry {
private long statusTime; private final long statusTime;
private String status; private final String status;
public StatusJournalEntryImpl(String status, long statusTime) { public StatusJournalEntryImpl(String status, long statusTime) {
this.status = status; this.status = status;
@ -74,11 +79,7 @@ class MonitoredTaskImpl implements MonitoredTask {
@Override @Override
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(); return status + " at " + statusTime;
sb.append(status);
sb.append(" at ");
sb.append(statusTime);
return sb.toString();
} }
} }
@ -162,7 +163,7 @@ class MonitoredTaskImpl implements MonitoredTask {
public void setStatus(String status) { public void setStatus(String status) {
this.status = status; this.status = status;
statusTime = EnvironmentEdgeManager.currentTime(); statusTime = EnvironmentEdgeManager.currentTime();
if (journalEnabled) { if (journal != null) {
journal.add(new StatusJournalEntryImpl(this.status, statusTime)); journal.add(new StatusJournalEntryImpl(this.status, statusTime));
} }
} }
@ -240,52 +241,29 @@ class MonitoredTaskImpl implements MonitoredTask {
if (journal == null) { if (journal == null) {
return Collections.emptyList(); return Collections.emptyList();
} else { } else {
return Collections.unmodifiableList(journal); return ImmutableList.copyOf(journal);
} }
} }
/**
* Enables journaling of this monitored task, the first invocation will lazily initialize the
* journal. The journal implementation itself and this method are not thread safe
*/
@Override
public void enableStatusJournal(boolean includeCurrentStatus) {
if (journalEnabled && journal != null) {
return;
}
journalEnabled = true;
if (journal == null) {
journal = new ArrayList<StatusJournalEntry>();
}
if (includeCurrentStatus && status != null) {
journal.add(new StatusJournalEntryImpl(status, statusTime));
}
}
@Override
public void disableStatusJournal() {
journalEnabled = false;
}
@Override @Override
public String prettyPrintJournal() { public String prettyPrintJournal() {
if (!journalEnabled) { if (journal == null) {
return ""; return "";
} }
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
for (int i = 0; i < journal.size(); i++) { Iterator<StatusJournalEntry> iter = journal.iterator();
StatusJournalEntry je = journal.get(i); StatusJournalEntry previousEntry = null;
sb.append(je.toString()); while (iter.hasNext()) {
if (i != 0) { StatusJournalEntry entry = iter.next();
StatusJournalEntry jep = journal.get(i - 1); sb.append(entry);
long delta = je.getTimeStamp() - jep.getTimeStamp(); if (previousEntry != null) {
long delta = entry.getTimeStamp() - previousEntry.getTimeStamp();
if (delta != 0) { if (delta != 0) {
sb.append(" (+" + delta + " ms)"); sb.append(" (+" + delta + " ms)");
} }
} }
sb.append("\n"); previousEntry = entry;
} }
return sb.toString(); return sb.toString();
} }
} }

View File

@ -58,7 +58,7 @@ public class TaskMonitor {
private final int maxTasks; private final int maxTasks;
private final long rpcWarnTime; private final long rpcWarnTime;
private final long expirationTime; private final long expirationTime;
private final CircularFifoQueue tasks; private final CircularFifoQueue<TaskAndWeakRefPair> tasks;
private final List<TaskAndWeakRefPair> rpcTasks; private final List<TaskAndWeakRefPair> rpcTasks;
private final long monitorInterval; private final long monitorInterval;
private Thread monitorThread; private Thread monitorThread;
@ -67,7 +67,7 @@ public class TaskMonitor {
maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS); maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS);
expirationTime = conf.getLong(EXPIRATION_TIME_KEY, DEFAULT_EXPIRATION_TIME); expirationTime = conf.getLong(EXPIRATION_TIME_KEY, DEFAULT_EXPIRATION_TIME);
rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME); rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME);
tasks = new CircularFifoQueue(maxTasks); tasks = new CircularFifoQueue<>(maxTasks);
rpcTasks = Lists.newArrayList(); rpcTasks = Lists.newArrayList();
monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, DEFAULT_MONITOR_INTERVAL); monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, DEFAULT_MONITOR_INTERVAL);
monitorThread = new Thread(new MonitorRunnable()); monitorThread = new Thread(new MonitorRunnable());
@ -84,12 +84,17 @@ public class TaskMonitor {
return instance; return instance;
} }
public synchronized MonitoredTask createStatus(String description) { public MonitoredTask createStatus(String description) {
return createStatus(description, false); return createStatus(description, false);
} }
public synchronized MonitoredTask createStatus(String description, boolean ignore) { public MonitoredTask createStatus(String description, boolean ignore) {
MonitoredTask stat = new MonitoredTaskImpl(); return createStatus(description, ignore, false);
}
public synchronized MonitoredTask createStatus(String description, boolean ignore,
boolean enableJournal) {
MonitoredTask stat = new MonitoredTaskImpl(enableJournal);
stat.setDescription(description); stat.setDescription(description);
MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(stat.getClass().getClassLoader(), MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
new Class<?>[] { MonitoredTask.class }, new PassthroughInvocationHandler<>(stat)); new Class<?>[] { MonitoredTask.class }, new PassthroughInvocationHandler<>(stat));

View File

@ -931,7 +931,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Initialize this region. * Initialize this region.
* @param reporter Tickle every so often if initialize is taking a while. * @param reporter Tickle every so often if initialize is taking a while.
* @return What the next sequence (edit) id should be. * @return What the next sequence (edit) id should be.
* @throws IOException e
*/ */
long initialize(final CancelableProgressable reporter) throws IOException { long initialize(final CancelableProgressable reporter) throws IOException {
@ -941,8 +940,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
+ " should have at least one column family."); + " should have at least one column family.");
} }
MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this); MonitoredTask status =
status.enableStatusJournal(true); TaskMonitor.get().createStatus("Initializing region " + this, false, true);
long nextSeqId = -1; long nextSeqId = -1;
try { try {
nextSeqId = initializeRegionInternals(reporter, status); nextSeqId = initializeRegionInternals(reporter, status);
@ -1596,8 +1595,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// threads attempting to close will run up against each other. // threads attempting to close will run up against each other.
MonitoredTask status = TaskMonitor.get().createStatus( MonitoredTask status = TaskMonitor.get().createStatus(
"Closing region " + this.getRegionInfo().getEncodedName() + (abort ? " due to abort" : ""), "Closing region " + this.getRegionInfo().getEncodedName() + (abort ? " due to abort" : ""),
ignoreStatus); ignoreStatus, true);
status.enableStatusJournal(true);
status.setStatus("Waiting for close lock"); status.setStatus("Waiting for close lock");
try { try {
synchronized (closeLock) { synchronized (closeLock) {
@ -2318,7 +2316,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this); status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
status.enableStatusJournal(false);
if (this.closed.get()) { if (this.closed.get()) {
String msg = "Skipping compaction on " + this + " because closed"; String msg = "Skipping compaction on " + this + " because closed";
LOG.debug(msg); LOG.debug(msg);
@ -2455,7 +2452,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false); return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
} }
MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this); MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
status.enableStatusJournal(false);
status.setStatus("Acquiring readlock on region"); status.setStatus("Acquiring readlock on region");
// block waiting for the lock for flushing cache // block waiting for the lock for flushing cache
lock.readLock().lock(); lock.readLock().lock();

View File

@ -287,9 +287,8 @@ public class WALSplitter {
boolean cancelled = false; boolean cancelled = false;
int editsCount = 0; int editsCount = 0;
int editsSkipped = 0; int editsSkipped = 0;
MonitoredTask status = MonitoredTask status = TaskMonitor.get()
TaskMonitor.get().createStatus("Splitting " + wal + " to temporary staging area."); .createStatus("Splitting " + wal + " to temporary staging area.", false, true);
status.enableStatusJournal(true);
Reader walReader = null; Reader walReader = null;
this.fileBeingSplit = walStatus; this.fileBeingSplit = walStatus;
long startTS = EnvironmentEdgeManager.currentTime(); long startTS = EnvironmentEdgeManager.currentTime();

View File

@ -212,23 +212,17 @@ public class TestTaskMonitor {
TaskMonitor tm = new TaskMonitor(new Configuration()); TaskMonitor tm = new TaskMonitor(new Configuration());
MonitoredTask task = tm.createStatus("Test task"); MonitoredTask task = tm.createStatus("Test task");
assertTrue(task.getStatusJournal().isEmpty()); assertTrue(task.getStatusJournal().isEmpty());
task.disableStatusJournal();
task.setStatus("status1"); task.setStatus("status1");
// journal should be empty since it is disabled // journal should be empty since it is disabled
assertTrue(task.getStatusJournal().isEmpty()); assertTrue(task.getStatusJournal().isEmpty());
task.enableStatusJournal(true); task = tm.createStatus("Test task with journal", false, true);
// check existing status entered in journal
assertEquals("status1", task.getStatusJournal().get(0).getStatus());
assertTrue(task.getStatusJournal().get(0).getTimeStamp() > 0);
task.disableStatusJournal();
task.setStatus("status2"); task.setStatus("status2");
// check status 2 not added since disabled
assertEquals(1, task.getStatusJournal().size());
task.enableStatusJournal(false);
// size should still be 1 since we didn't include current status
assertEquals(1, task.getStatusJournal().size()); assertEquals(1, task.getStatusJournal().size());
assertEquals("status2", task.getStatusJournal().get(0).getStatus());
task.setStatus("status3"); task.setStatus("status3");
assertEquals(2, task.getStatusJournal().size());
assertEquals("status3", task.getStatusJournal().get(1).getStatus()); assertEquals("status3", task.getStatusJournal().get(1).getStatus());
task.prettyPrintJournal();
tm.shutdown(); tm.shutdown();
} }

View File

@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.junit.Assume.assumeFalse;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
@ -945,6 +946,9 @@ public class TestWALSplit {
*/ */
@Test @Test
public void testThreadingSlowWriterSmallBuffer() throws Exception { public void testThreadingSlowWriterSmallBuffer() throws Exception {
// The logic of this test conflicts with the bounded writer creation logic of WAL splitting, so
// skip this test for TestWALSplitBoundedLogWriterCreation
assumeFalse(this instanceof TestWALSplitBoundedLogWriterCreation);
doTestThreading(200, 1024, 50); doTestThreading(200, 1024, 50);
} }

View File

@ -19,13 +19,12 @@ package org.apache.hadoop.hbase.wal;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@Category(LargeTests.class) @Category({ RegionServerTests.class, LargeTests.class })
public class TestWALSplitBoundedLogWriterCreation extends TestWALSplit { public class TestWALSplitBoundedLogWriterCreation extends TestWALSplit {
@ClassRule @ClassRule
@ -37,14 +36,4 @@ public class TestWALSplitBoundedLogWriterCreation extends TestWALSplit {
TestWALSplit.setUpBeforeClass(); TestWALSplit.setUpBeforeClass();
TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true); TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);
} }
/**
* The logic of this test has conflict with the limit writers split logic, skip this test
*/
@Override
@Test
@Ignore
public void testThreadingSlowWriterSmallBuffer() throws Exception {
super.testThreadingSlowWriterSmallBuffer();
}
} }