HBASE-27211 Data race in MonitoredTaskImpl could cause split wal failure (#4630)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
Signed-off-by: Xin Sun <ddupgs@gmail.com>
(cherry picked from commit 9ab0b1504f)
Conflicts:
hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
parent 325f90bb40
commit 824785eeeb
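
For context on what the hunks below change: before this patch a MonitoredTask was created first and journaling was switched on afterwards through a plain boolean flag plus a lazily created ArrayList. A concurrent setStatus() could therefore observe journalEnabled == true while the journal list was still null, or append to a list that is not safe for concurrent use, which is how a split-WAL task could fail. The patch moves the decision to construction time and backs the journal with a ConcurrentLinkedQueue held in a final field. The stand-alone sketch below (my own simplified classes, not HBase code) contrasts the two shapes:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Simplified illustration only; RacyTask and SafeTask are hypothetical names, not HBase classes.
public class JournalPublicationSketch {

  // Old shape: the flag and the list are assigned one after the other with no synchronization,
  // so another thread calling setStatus() may see enabled == true while journal still appears
  // null, or may append to an ArrayList that is not safe for concurrent use.
  static class RacyTask {
    private boolean enabled = false;
    private List<String> journal; // lazily created, not thread safe

    void enableJournal() {
      enabled = true;
      journal = new ArrayList<>();
    }

    void setStatus(String status) {
      if (enabled) {
        journal.add(status); // possible NullPointerException or lost update
      }
    }
  }

  // New shape: the choice is made once in the constructor and the queue is published through a
  // final field, so every thread either sees a usable queue or null.
  static class SafeTask {
    private final ConcurrentLinkedQueue<String> journal;

    SafeTask(boolean enableJournal) {
      this.journal = enableJournal ? new ConcurrentLinkedQueue<>() : null;
    }

    void setStatus(String status) {
      if (journal != null) {
        journal.add(status); // ConcurrentLinkedQueue.add is thread safe
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    SafeTask task = new SafeTask(true);
    Thread t1 = new Thread(() -> task.setStatus("status from t1"));
    Thread t2 = new Thread(() -> task.setStatus("status from t2"));
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    System.out.println(task.journal); // both entries, in some order
  }
}
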
TakeSnapshotHandler.java
@@ -126,8 +126,7 @@ public abstract class TakeSnapshotHandler extends EventHandler
     this.verifier = new MasterSnapshotVerifier(masterServices, snapshot, workingDirFs);
     // update the running tasks
     this.status = TaskMonitor.get()
-      .createStatus("Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable);
-    this.status.enableStatusJournal(true);
+      .createStatus("Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable, true);
     this.snapshotManifest =
       SnapshotManifest.create(conf, rootFs, workingDir, snapshot, monitor, status);
   }

MonitoredRPCHandlerImpl.java
@@ -45,7 +45,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl implements Monito
   private Map<String, Object> callInfoMap = new HashMap<>();
 
   public MonitoredRPCHandlerImpl() {
-    super();
+    super(false);
     // in this implementation, WAITING indicates that the handler is not
     // actively servicing an RPC call.
     setState(State.WAITING);
@@ -235,7 +235,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl implements Monito
       return map;
     }
     Map<String, Object> rpcJSON = new HashMap<>();
-    ArrayList paramList = new ArrayList();
+    ArrayList<Object> paramList = new ArrayList<>();
     map.put("rpcCall", rpcJSON);
     rpcJSON.put("queuetimems", getRPCQueueTime());
     rpcJSON.put("starttimems", getRPCStartTime());

MonitoredTask.java
@@ -69,16 +69,11 @@ public interface MonitoredTask extends Cloneable {
 
   void setWarnTime(final long t);
 
-  List<StatusJournalEntry> getStatusJournal();
-
   /**
-   * Enable journal that will store all statuses that have been set along with the time stamps when
-   * they were set.
-   * @param includeCurrentStatus whether to include the current set status in the journal
+   * If journal is enabled, we will store all statuses that have been set along with the time stamps
+   * when they were set. This method will give you all the journals stored so far.
    */
-  void enableStatusJournal(boolean includeCurrentStatus);
-
-  void disableStatusJournal();
+  List<StatusJournalEntry> getStatusJournal();
 
   String prettyPrintJournal();
 

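
With enableStatusJournal/disableStatusJournal gone from the interface, a caller that wants a journal asks for one when the task is created and only reads it afterwards. A minimal usage sketch, assuming the patched hbase-server classes are on the classpath; the demo wiring is mine, while createStatus(description, true), getStatusJournal() and prettyPrintJournal() are the calls this diff introduces or keeps:

import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;

public class JournalReadExample {
  public static void main(String[] args) {
    // Journaling is requested up front; it can no longer be toggled later.
    MonitoredTask task = TaskMonitor.get().createStatus("Demo task", true);
    task.setStatus("step 1");
    task.setStatus("step 2");

    // getStatusJournal() returns a snapshot of everything recorded so far.
    task.getStatusJournal()
      .forEach(e -> System.out.println(e.getStatus() + " at " + e.getTimeStamp()));

    // The same entries, with the delta between consecutive entries appended.
    System.out.println(task.prettyPrintJournal());
  }
}
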
MonitoredTaskImpl.java
@@ -18,15 +18,17 @@
 package org.apache.hadoop.hbase.monitoring;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.GsonUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.gson.Gson;
 
 @InterfaceAudience.Private
@@ -40,22 +42,25 @@ class MonitoredTaskImpl implements MonitoredTask {
   private volatile String description;
 
   protected volatile State state = State.RUNNING;
 
-  private boolean journalEnabled = false;
-  private List<StatusJournalEntry> journal;
+  private final ConcurrentLinkedQueue<StatusJournalEntry> journal;
 
   private static final Gson GSON = GsonUtil.createGson().create();
 
-  public MonitoredTaskImpl() {
+  public MonitoredTaskImpl(boolean enableJournal) {
     startTime = EnvironmentEdgeManager.currentTime();
     statusTime = startTime;
     stateTime = startTime;
     warnTime = startTime;
+    if (enableJournal) {
+      journal = new ConcurrentLinkedQueue<>();
+    } else {
+      journal = null;
+    }
   }
 
-  private static class StatusJournalEntryImpl implements StatusJournalEntry {
-    private long statusTime;
-    private String status;
+  private static final class StatusJournalEntryImpl implements StatusJournalEntry {
+    private final long statusTime;
+    private final String status;
 
     public StatusJournalEntryImpl(String status, long statusTime) {
       this.status = status;
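
Design note on the constructor above: because the queue is created inside the constructor and published through a final field, any thread that can see the task is guaranteed to see either a fully constructed queue or null, and null now plays the role the old journalEnabled = false flag did, so there is no longer a window in which the flag and the collection disagree.
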
@@ -74,11 +79,7 @@ class MonitoredTaskImpl implements MonitoredTask {
 
     @Override
     public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append(status);
-      sb.append(" at ");
-      sb.append(statusTime);
-      return sb.toString();
+      return status + " at " + statusTime;
     }
   }
 
@@ -162,7 +163,7 @@ class MonitoredTaskImpl implements MonitoredTask {
   public void setStatus(String status) {
     this.status = status;
     statusTime = EnvironmentEdgeManager.currentTime();
-    if (journalEnabled) {
+    if (journal != null) {
       journal.add(new StatusJournalEntryImpl(this.status, statusTime));
     }
   }
@@ -240,52 +241,29 @@ class MonitoredTaskImpl implements MonitoredTask {
     if (journal == null) {
       return Collections.emptyList();
     } else {
-      return Collections.unmodifiableList(journal);
+      return ImmutableList.copyOf(journal);
     }
   }
 
-  /**
-   * Enables journaling of this monitored task, the first invocation will lazily initialize the
-   * journal. The journal implementation itself and this method are not thread safe
-   */
-  @Override
-  public void enableStatusJournal(boolean includeCurrentStatus) {
-    if (journalEnabled && journal != null) {
-      return;
-    }
-    journalEnabled = true;
-    if (journal == null) {
-      journal = new ArrayList<StatusJournalEntry>();
-    }
-    if (includeCurrentStatus && status != null) {
-      journal.add(new StatusJournalEntryImpl(status, statusTime));
-    }
-  }
-
-  @Override
-  public void disableStatusJournal() {
-    journalEnabled = false;
-  }
-
   @Override
   public String prettyPrintJournal() {
-    if (!journalEnabled) {
+    if (journal == null) {
       return "";
     }
     StringBuilder sb = new StringBuilder();
-    for (int i = 0; i < journal.size(); i++) {
-      StatusJournalEntry je = journal.get(i);
-      sb.append(je.toString());
-      if (i != 0) {
-        StatusJournalEntry jep = journal.get(i - 1);
-        long delta = je.getTimeStamp() - jep.getTimeStamp();
+    Iterator<StatusJournalEntry> iter = journal.iterator();
+    StatusJournalEntry previousEntry = null;
+    while (iter.hasNext()) {
+      StatusJournalEntry entry = iter.next();
+      sb.append(entry);
+      if (previousEntry != null) {
+        long delta = entry.getTimeStamp() - previousEntry.getTimeStamp();
         if (delta != 0) {
           sb.append(" (+" + delta + " ms)");
         }
       }
       sb.append("\n");
+      previousEntry = entry;
     }
     return sb.toString();
   }
 }

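
To make the journal rendering concrete, here is a small stand-alone sketch (a hypothetical Entry record and made-up timestamps, not HBase code) that applies the same formatting rule as the rewritten prettyPrintJournal(): one entry per line, with the delta to the previous entry appended when it is non-zero:

import java.util.List;

public class PrettyPrintSketch {
  // Hypothetical stand-in for MonitoredTask.StatusJournalEntry.
  record Entry(String status, long timeStamp) {}

  static String prettyPrint(List<Entry> journal) {
    StringBuilder sb = new StringBuilder();
    Entry previous = null;
    for (Entry entry : journal) {
      // Mirrors StatusJournalEntryImpl.toString(): "<status> at <timestamp>".
      sb.append(entry.status()).append(" at ").append(entry.timeStamp());
      if (previous != null) {
        long delta = entry.timeStamp() - previous.timeStamp();
        if (delta != 0) {
          sb.append(" (+").append(delta).append(" ms)");
        }
      }
      sb.append("\n");
      previous = entry;
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    System.out.print(prettyPrint(List.of(
      new Entry("Acquiring readlock on region", 1000),
      new Entry("Flushing stores", 1005),
      new Entry("Finished flush", 1250))));
    // Prints:
    // Acquiring readlock on region at 1000
    // Flushing stores at 1005 (+5 ms)
    // Finished flush at 1250 (+245 ms)
  }
}
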
TaskMonitor.java
@@ -58,7 +58,7 @@ public class TaskMonitor {
   private final int maxTasks;
   private final long rpcWarnTime;
   private final long expirationTime;
-  private final CircularFifoQueue tasks;
+  private final CircularFifoQueue<TaskAndWeakRefPair> tasks;
   private final List<TaskAndWeakRefPair> rpcTasks;
   private final long monitorInterval;
   private Thread monitorThread;
@@ -67,7 +67,7 @@ public class TaskMonitor {
     maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS);
     expirationTime = conf.getLong(EXPIRATION_TIME_KEY, DEFAULT_EXPIRATION_TIME);
     rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME);
-    tasks = new CircularFifoQueue(maxTasks);
+    tasks = new CircularFifoQueue<>(maxTasks);
     rpcTasks = Lists.newArrayList();
     monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, DEFAULT_MONITOR_INTERVAL);
     monitorThread = new Thread(new MonitorRunnable());
@@ -84,8 +84,12 @@ public class TaskMonitor {
     return instance;
   }
 
-  public synchronized MonitoredTask createStatus(String description) {
-    MonitoredTask stat = new MonitoredTaskImpl();
+  public MonitoredTask createStatus(String description) {
+    return createStatus(description, false);
+  }
+
+  public synchronized MonitoredTask createStatus(String description, boolean enableJournal) {
+    MonitoredTask stat = new MonitoredTaskImpl(enableJournal);
     stat.setDescription(description);
     MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
       new Class<?>[] { MonitoredTask.class }, new PassthroughInvocationHandler<>(stat));

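
Note that the one-argument createStatus is kept and now simply forwards to createStatus(description, false), so callers that never journal (for example the compaction and flush statuses in the HRegion hunks below) stay source-compatible; only call sites that previously followed createStatus with enableStatusJournal(true) move to the two-argument form.
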
HRegion.java
@@ -929,7 +929,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * Initialize this region.
    * @param reporter Tickle every so often if initialize is taking a while.
    * @return What the next sequence (edit) id should be.
-   * @throws IOException e
    */
   long initialize(final CancelableProgressable reporter) throws IOException {
 
@@ -939,8 +938,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         + " should have at least one column family.");
     }
 
-    MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
-    status.enableStatusJournal(true);
+    MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this, true);
     long nextSeqId = -1;
     try {
       nextSeqId = initializeRegionInternals(reporter, status);
@@ -1552,8 +1550,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // Only allow one thread to close at a time. Serialize them so dual
     // threads attempting to close will run up against each other.
     MonitoredTask status = TaskMonitor.get().createStatus(
-      "Closing region " + this.getRegionInfo().getEncodedName() + (abort ? " due to abort" : ""));
-    status.enableStatusJournal(true);
+      "Closing region " + this.getRegionInfo().getEncodedName() + (abort ? " due to abort" : ""),
+      true);
     status.setStatus("Waiting for close lock");
     try {
       synchronized (closeLock) {
@@ -2250,7 +2248,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
-    status.enableStatusJournal(false);
     if (this.closed.get()) {
       String msg = "Skipping compaction on " + this + " because closed";
       LOG.debug(msg);
@@ -2387,7 +2384,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
     }
     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
-    status.enableStatusJournal(false);
     status.setStatus("Acquiring readlock on region");
     // block waiting for the lock for flushing cache
     lock.readLock().lock();

WALSplitter.java
@@ -288,8 +288,7 @@ public class WALSplitter {
     int editsCount = 0;
     int editsSkipped = 0;
     MonitoredTask status =
-      TaskMonitor.get().createStatus("Splitting " + wal + " to temporary staging area.");
-    status.enableStatusJournal(true);
+      TaskMonitor.get().createStatus("Splitting " + wal + " to temporary staging area.", true);
     Reader walReader = null;
     this.fileBeingSplit = walStatus;
     long startTS = EnvironmentEdgeManager.currentTime();

TestTaskMonitor.java
@@ -212,23 +212,17 @@ public class TestTaskMonitor {
     TaskMonitor tm = new TaskMonitor(new Configuration());
     MonitoredTask task = tm.createStatus("Test task");
     assertTrue(task.getStatusJournal().isEmpty());
-    task.disableStatusJournal();
     task.setStatus("status1");
     // journal should be empty since it is disabled
     assertTrue(task.getStatusJournal().isEmpty());
-    task.enableStatusJournal(true);
-    // check existing status entered in journal
-    assertEquals("status1", task.getStatusJournal().get(0).getStatus());
-    assertTrue(task.getStatusJournal().get(0).getTimeStamp() > 0);
-    task.disableStatusJournal();
+    task = tm.createStatus("Test task with journal", true);
     task.setStatus("status2");
-    // check status 2 not added since disabled
     assertEquals(1, task.getStatusJournal().size());
-    task.enableStatusJournal(false);
-    // size should still be 1 since we didn't include current status
-    assertEquals(1, task.getStatusJournal().size());
+    assertEquals("status2", task.getStatusJournal().get(0).getStatus());
     task.setStatus("status3");
     assertEquals(2, task.getStatusJournal().size());
     assertEquals("status3", task.getStatusJournal().get(1).getStatus());
     task.prettyPrintJournal();
     tm.shutdown();
   }
 

TestWALSplit.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeFalse;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -945,6 +946,9 @@ public class TestWALSplit {
    */
   @Test
   public void testThreadingSlowWriterSmallBuffer() throws Exception {
+    // The logic of this test has conflict with the limit writers split logic, skip this test for
+    // TestWALSplitBoundedLogWriterCreation
+    assumeFalse(this instanceof TestWALSplitBoundedLogWriterCreation);
     doTestThreading(200, 1024, 50);
   }
 

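
The last two files replace an @Ignore'd override in the subclass with a JUnit assumption in the base test, so the subclass no longer needs to redeclare the test at all. A toy sketch of that pattern (hypothetical class names, not the HBase tests):

import static org.junit.Assume.assumeFalse;

import org.junit.Test;

// Toy illustration of the pattern used above, not the HBase test classes.
public class BaseFeatureTest {

  @Test
  public void testThatConflictsWithBoundedMode() {
    // Reported as skipped (an assumption failure) when run through the subclass below,
    // instead of requiring an @Ignore'd override there.
    assumeFalse(this instanceof BoundedModeTest);
    // ... the actual test body would go here ...
  }

  public static class BoundedModeTest extends BaseFeatureTest {
    // Inherits all tests; the conflicting one skips itself.
  }
}
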
TestWALSplitBoundedLogWriterCreation.java
@@ -19,13 +19,12 @@ package org.apache.hadoop.hbase.wal;
 
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(LargeTests.class)
+@Category({ RegionServerTests.class, LargeTests.class })
 public class TestWALSplitBoundedLogWriterCreation extends TestWALSplit {
 
   @ClassRule
@@ -37,14 +36,4 @@ public class TestWALSplitBoundedLogWriterCreation extends TestWALSplit {
     TestWALSplit.setUpBeforeClass();
     TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);
   }
-
-  /**
-   * The logic of this test has conflict with the limit writers split logic, skip this test
-   */
-  @Override
-  @Test
-  @Ignore
-  public void testThreadingSlowWriterSmallBuffer() throws Exception {
-    super.testThreadingSlowWriterSmallBuffer();
-  }
 }