HDFS-15068. DataNode could meet deadlock if invoke refreshVolumes when register. Contributed by Aiphago.

Signed-off-by: Masatake Iwasaki <iwasakims@apache.org>
(cherry picked from commit 037ec8cfb1)
This commit is contained in:
Masatake Iwasaki 2020-01-04 01:55:27 +09:00 committed by Wei-Chiu Chuang
parent 4af188587f
commit abee7402e4
4 changed files with 133 additions and 79 deletions

View File

@ -419,7 +419,7 @@ class BPOfferService {
reg.getStorageInfo().getClusterID(), "cluster ID"); reg.getStorageInfo().getClusterID(), "cluster ID");
} }
bpRegistration = reg; bpRegistration = reg;
DataNodeFaultInjector.get().delayWhenOfferServiceHoldLock();
dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId()); dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
// Add the initial block token secret keys to the DN's secret manager. // Add the initial block token secret keys to the DN's secret manager.
if (dn.isBlockTokenEnabled) { if (dn.isBlockTokenEnabled) {

View File

@ -745,90 +745,93 @@ public class DataNode extends ReconfigurableBase
* @throws IOException on error. If an IOException is thrown, some new volumes * @throws IOException on error. If an IOException is thrown, some new volumes
* may have been successfully added and removed. * may have been successfully added and removed.
*/ */
private synchronized void refreshVolumes(String newVolumes) throws IOException { private void refreshVolumes(String newVolumes) throws IOException {
Configuration conf = getConf(); // Add volumes for each Namespace
conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes); final List<NamespaceInfo> nsInfos = Lists.newArrayList();
ExecutorService service = null; for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
int numOldDataDirs = dataDirs.size(); nsInfos.add(bpos.getNamespaceInfo());
ChangedVolumes changedVolumes = parseChangedVolumes(newVolumes);
StringBuilder errorMessageBuilder = new StringBuilder();
List<String> effectiveVolumes = Lists.newArrayList();
for (StorageLocation sl : changedVolumes.unchangedLocations) {
effectiveVolumes.add(sl.toString());
} }
synchronized(this) {
try { Configuration conf = getConf();
if (numOldDataDirs + getFSDataset().getNumFailedVolumes() conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
+ changedVolumes.newLocations.size() ExecutorService service = null;
- changedVolumes.deactivateLocations.size() <= 0) { int numOldDataDirs = dataDirs.size();
throw new IOException("Attempt to remove all volumes."); ChangedVolumes changedVolumes = parseChangedVolumes(newVolumes);
} StringBuilder errorMessageBuilder = new StringBuilder();
if (!changedVolumes.newLocations.isEmpty()) { List<String> effectiveVolumes = Lists.newArrayList();
LOG.info("Adding new volumes: {}", for (StorageLocation sl : changedVolumes.unchangedLocations) {
Joiner.on(",").join(changedVolumes.newLocations)); effectiveVolumes.add(sl.toString());
// Add volumes for each Namespace
final List<NamespaceInfo> nsInfos = Lists.newArrayList();
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
nsInfos.add(bpos.getNamespaceInfo());
}
service = Executors
.newFixedThreadPool(changedVolumes.newLocations.size());
List<Future<IOException>> exceptions = Lists.newArrayList();
for (final StorageLocation location : changedVolumes.newLocations) {
exceptions.add(service.submit(new Callable<IOException>() {
@Override
public IOException call() {
try {
data.addVolume(location, nsInfos);
} catch (IOException e) {
return e;
}
return null;
}
}));
}
for (int i = 0; i < changedVolumes.newLocations.size(); i++) {
StorageLocation volume = changedVolumes.newLocations.get(i);
Future<IOException> ioExceptionFuture = exceptions.get(i);
try {
IOException ioe = ioExceptionFuture.get();
if (ioe != null) {
errorMessageBuilder.append(
String.format("FAILED TO ADD: %s: %s%n",
volume, ioe.getMessage()));
LOG.error("Failed to add volume: {}", volume, ioe);
} else {
effectiveVolumes.add(volume.toString());
LOG.info("Successfully added volume: {}", volume);
}
} catch (Exception e) {
errorMessageBuilder.append(
String.format("FAILED to ADD: %s: %s%n", volume,
e.toString()));
LOG.error("Failed to add volume: {}", volume, e);
}
}
} }
try { try {
removeVolumes(changedVolumes.deactivateLocations); if (numOldDataDirs + getFSDataset().getNumFailedVolumes()
} catch (IOException e) { + changedVolumes.newLocations.size()
errorMessageBuilder.append(e.getMessage()); - changedVolumes.deactivateLocations.size() <= 0) {
LOG.error("Failed to remove volume", e); throw new IOException("Attempt to remove all volumes.");
} }
if (!changedVolumes.newLocations.isEmpty()) {
LOG.info("Adding new volumes: {}",
Joiner.on(",").join(changedVolumes.newLocations));
if (errorMessageBuilder.length() > 0) { service = Executors
throw new IOException(errorMessageBuilder.toString()); .newFixedThreadPool(changedVolumes.newLocations.size());
List<Future<IOException>> exceptions = Lists.newArrayList();
for (final StorageLocation location : changedVolumes.newLocations) {
exceptions.add(service.submit(new Callable<IOException>() {
@Override
public IOException call() {
try {
data.addVolume(location, nsInfos);
} catch (IOException e) {
return e;
}
return null;
}
}));
}
for (int i = 0; i < changedVolumes.newLocations.size(); i++) {
StorageLocation volume = changedVolumes.newLocations.get(i);
Future<IOException> ioExceptionFuture = exceptions.get(i);
try {
IOException ioe = ioExceptionFuture.get();
if (ioe != null) {
errorMessageBuilder.append(
String.format("FAILED TO ADD: %s: %s%n",
volume, ioe.getMessage()));
LOG.error("Failed to add volume: {}", volume, ioe);
} else {
effectiveVolumes.add(volume.toString());
LOG.info("Successfully added volume: {}", volume);
}
} catch (Exception e) {
errorMessageBuilder.append(
String.format("FAILED to ADD: %s: %s%n", volume,
e.toString()));
LOG.error("Failed to add volume: {}", volume, e);
}
}
}
try {
removeVolumes(changedVolumes.deactivateLocations);
} catch (IOException e) {
errorMessageBuilder.append(e.getMessage());
LOG.error("Failed to remove volume", e);
}
if (errorMessageBuilder.length() > 0) {
throw new IOException(errorMessageBuilder.toString());
}
} finally {
if (service != null) {
service.shutdown();
}
conf.set(DFS_DATANODE_DATA_DIR_KEY,
Joiner.on(",").join(effectiveVolumes));
dataDirs = getStorageLocations(conf);
} }
} finally {
if (service != null) {
service.shutdown();
}
conf.set(DFS_DATANODE_DATA_DIR_KEY,
Joiner.on(",").join(effectiveVolumes));
dataDirs = getStorageLocations(conf);
} }
} }

View File

@ -95,4 +95,9 @@ public class DataNodeFaultInjector {
* process. * process.
*/ */
public void stripedBlockReconstruction() throws IOException {} public void stripedBlockReconstruction() throws IOException {}
/**
* Used as a hook to inject intercept when BPOfferService hold lock.
*/
public void delayWhenOfferServiceHoldLock() {}
} }

View File

@ -36,6 +36,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -72,6 +73,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.AddBlockPoolException; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.AddBlockPoolException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
@ -404,6 +406,50 @@ public class TestDataNodeVolumeFailure {
assertTrue(dn0.shouldRun()); assertTrue(dn0.shouldRun());
} }
/**
* Test {@link DataNode#refreshVolumes(String)} not deadLock with
* {@link BPOfferService#registrationSucceeded(BPServiceActor,
* DatanodeRegistration)}.
*/
@Test(timeout=10000)
public void testRefreshDeadLock() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
public void delayWhenOfferServiceHoldLock() {
try {
latch.await();
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
DataNode dn = cluster.getDataNodes().get(0);
File volume = cluster.getInstanceStorageDir(0, 0);
String dataDirs = volume.getPath();
List<BPOfferService> allBpOs = dn.getAllBpOs();
BPOfferService service = allBpOs.get(0);
BPServiceActor actor = service.getBPServiceActors().get(0);
DatanodeRegistration bpRegistration = actor.getBpRegistration();
Thread register = new Thread(() -> {
try {
service.registrationSucceeded(actor, bpRegistration);
} catch (IOException e) {
e.printStackTrace();
}
});
register.start();
String newdir = dataDirs + "tmp";
// Make sure service have get writelock
latch.countDown();
String result = dn.reconfigurePropertyImpl(
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newdir);
assertNotNull(result);
}
/** /**
* Test changing the number of volumes does not impact the disk failure * Test changing the number of volumes does not impact the disk failure
* tolerance. * tolerance.