HDFS-14997. BPServiceActor processes commands from NameNode asynchronously. Contributed by Xiaoqiao He.

(cherry picked from commit b86895485d)
(cherry picked from commit d5501c125f)
This commit is contained in:
Inigo Goiri 2019-12-19 09:34:43 -08:00 committed by He Xiaoqiao
parent 0d0f344d65
commit baa4190eab
No known key found for this signature in database
GPG Key ID: A80CC124E9A0FA63
3 changed files with 155 additions and 37 deletions

View File

@ -32,7 +32,9 @@ import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
@ -115,6 +117,7 @@ class BPServiceActor implements Runnable {
private DatanodeRegistration bpRegistration;
final LinkedList<BPServiceActorAction> bpThreadQueue
= new LinkedList<BPServiceActorAction>();
private final CommandProcessingThread commandProcessingThread;
BPServiceActor(InetSocketAddress nnAddr, InetSocketAddress lifelineNnAddr,
BPOfferService bpos) {
@ -136,6 +139,9 @@ class BPServiceActor implements Runnable {
dnConf.outliersReportIntervalMs);
// get the value of maxDataLength.
this.maxDataLength = dnConf.getMaxDataLength();
commandProcessingThread = new CommandProcessingThread(this);
commandProcessingThread.start();
}
public DatanodeRegistration getBpRegistration() {
@ -676,8 +682,7 @@ class BPServiceActor implements Runnable {
}
long startProcessCommands = monotonicNow();
if (!processCommand(resp.getCommands()))
continue;
commandProcessingThread.enqueue(resp.getCommands());
long endProcessCommands = monotonicNow();
if (endProcessCommands - startProcessCommands > 2000) {
LOG.info("Took " + (endProcessCommands - startProcessCommands)
@ -702,11 +707,11 @@ class BPServiceActor implements Runnable {
cmds = blockReport(fullBlockReportLeaseId);
fullBlockReportLeaseId = 0;
}
processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
commandProcessingThread.enqueue(cmds);
if (!dn.areCacheReportsDisabledForTests()) {
DatanodeCommand cmd = cacheReport();
processCommand(new DatanodeCommand[]{ cmd });
commandProcessingThread.enqueue(cmd);
}
if (sendHeartbeat) {
@ -876,37 +881,6 @@ class BPServiceActor implements Runnable {
return shouldServiceRun && dn.shouldRun();
}
/**
* Process an array of datanode commands
*
* @param cmds an array of datanode commands
* @return true if further processing may be required or false otherwise.
*/
boolean processCommand(DatanodeCommand[] cmds) {
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
try {
if (bpos.processCommandFromActor(cmd, this) == false) {
return false;
}
} catch (RemoteException re) {
String reClass = re.getClassName();
if (UnregisteredNodeException.class.getName().equals(reClass) ||
DisallowedDatanodeException.class.getName().equals(reClass) ||
IncorrectVersionException.class.getName().equals(reClass)) {
LOG.warn(this + " is shutting down", re);
shouldServiceRun = false;
return false;
}
} catch (IOException ioe) {
LOG.warn("Error processing datanode Command", ioe);
}
}
}
return true;
}
/**
* Report a bad block from another DN in this cluster.
*/
@ -1281,4 +1255,102 @@ class BPServiceActor implements Runnable {
return Time.monotonicNow();
}
}
}
/**
* CommandProcessingThread that process commands asynchronously.
*/
class CommandProcessingThread extends Thread {
private final BPServiceActor actor;
private final BlockingQueue<Runnable> queue;
CommandProcessingThread(BPServiceActor actor) {
super("Command processor");
this.actor = actor;
this.queue = new LinkedBlockingQueue<>();
setDaemon(true);
}
@Override
public void run() {
try {
processQueue();
} catch (Throwable t) {
LOG.error("{} encountered fatal exception and exit.", getName(), t);
}
}
/**
* Process commands in queue one by one, and wait until queue not empty.
*/
private void processQueue() {
while (shouldRun()) {
try {
Runnable action = queue.take();
action.run();
dn.getMetrics().incrActorCmdQueueLength(-1);
dn.getMetrics().incrNumProcessedCommands();
} catch (InterruptedException e) {
LOG.error("{} encountered interrupt and exit.", getName());
// ignore unless thread was specifically interrupted.
if (Thread.interrupted()) {
break;
}
}
}
dn.getMetrics().incrActorCmdQueueLength(-1 * queue.size());
queue.clear();
}
/**
* Process an array of datanode commands.
*
* @param cmds an array of datanode commands
* @return true if further processing may be required or false otherwise.
*/
private boolean processCommand(DatanodeCommand[] cmds) {
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
try {
if (!bpos.processCommandFromActor(cmd, actor)) {
return false;
}
} catch (RemoteException re) {
String reClass = re.getClassName();
if (UnregisteredNodeException.class.getName().equals(reClass) ||
DisallowedDatanodeException.class.getName().equals(reClass) ||
IncorrectVersionException.class.getName().equals(reClass)) {
LOG.warn("{} is shutting down", this, re);
shouldServiceRun = false;
return false;
}
} catch (IOException ioe) {
LOG.warn("Error processing datanode Command", ioe);
}
}
}
return true;
}
void enqueue(DatanodeCommand cmd) throws InterruptedException {
if (cmd == null) {
return;
}
queue.put(() -> processCommand(new DatanodeCommand[]{cmd}));
dn.getMetrics().incrActorCmdQueueLength(1);
}
void enqueue(List<DatanodeCommand> cmds) throws InterruptedException {
if (cmds == null) {
return;
}
queue.put(() -> processCommand(
cmds.toArray(new DatanodeCommand[cmds.size()])));
dn.getMetrics().incrActorCmdQueueLength(1);
}
void enqueue(DatanodeCommand[] cmds) throws InterruptedException {
queue.put(() -> processCommand(cmds));
dn.getMetrics().incrActorCmdQueueLength(1);
}
}
}

View File

@ -161,6 +161,10 @@ public class DataNodeMetrics {
private MutableCounterLong ecReconstructionDecodingTimeMillis;
@Metric("Milliseconds spent on write by erasure coding worker")
private MutableCounterLong ecReconstructionWriteTimeMillis;
@Metric("Sum of all BPServiceActors command queue length")
private MutableCounterLong sumOfActorCommandQueueLength;
@Metric("Num of processed commands of all BPServiceActors")
private MutableCounterLong numProcessedCommands;
final MetricsRegistry registry = new MetricsRegistry("datanode");
final String name;
@ -541,4 +545,12 @@ public class DataNodeMetrics {
.value(), totalWriteTime.value(), totalReadTime.value(),
blocksWritten.value(), blocksRead.value(), timeSinceLastReport);
}
public void incrActorCmdQueueLength(int delta) {
sumOfActorCommandQueueLength.incr(delta);
}
public void incrNumProcessedCommands() {
numProcessedCommands.incr();
}
}

View File

@ -18,7 +18,15 @@
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
@ -36,6 +44,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@ -1050,4 +1059,29 @@ public class TestBPOfferService {
bpos.stop();
}
}
}
@Test(timeout = 15000)
public void testCommandProcessingThread() throws Exception {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
List<DataNode> datanodes = cluster.getDataNodes();
assertEquals(datanodes.size(), 1);
DataNode datanode = datanodes.get(0);
// Try to write file and trigger NN send back command to DataNode.
FileSystem fs = cluster.getFileSystem();
Path file = new Path("/test");
DFSTestUtil.createFile(fs, file, 10240L, (short)1, 0L);
MetricsRecordBuilder mrb = getMetrics(datanode.getMetrics().name());
assertTrue("Process command nums is not expected.",
getLongCounter("NumProcessedCommands", mrb) > 0);
assertEquals(0, getLongCounter("SumOfActorCommandQueueLength", mrb));
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}