HDFS-14311. Multi-threading conflict at layoutVersion when loading block pool storage. Contributed by Yicong Cai.

This commit is contained in:
Wei-Chiu Chuang 2019-08-20 10:08:55 -07:00
parent aa6995fde2
commit 4cb22cd867
1 changed files with 14 additions and 6 deletions

View File

@ -447,22 +447,23 @@ public class DataStorage extends Storage {
StartupOption startOpt, ExecutorService executor) throws IOException { StartupOption startOpt, ExecutorService executor) throws IOException {
final String bpid = nsInfo.getBlockPoolID(); final String bpid = nsInfo.getBlockPoolID();
final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo); final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
Map<StorageLocation, List<Callable<StorageDirectory>>> upgradeCallableMap =
new HashMap<>();
final List<StorageDirectory> success = Lists.newArrayList(); final List<StorageDirectory> success = Lists.newArrayList();
final List<UpgradeTask> tasks = Lists.newArrayList(); final List<UpgradeTask> tasks = Lists.newArrayList();
for (StorageLocation dataDir : dataDirs) { for (StorageLocation dataDir : dataDirs) {
dataDir.makeBlockPoolDir(bpid, null); dataDir.makeBlockPoolDir(bpid, null);
try { try {
final List<Callable<StorageDirectory>> callables = Lists.newArrayList(); final List<Callable<StorageDirectory>> sdCallables =
Lists.newArrayList();
final List<StorageDirectory> dirs = bpStorage.recoverTransitionRead( final List<StorageDirectory> dirs = bpStorage.recoverTransitionRead(
nsInfo, dataDir, startOpt, callables, datanode.getConf()); nsInfo, dataDir, startOpt, sdCallables, datanode.getConf());
if (callables.isEmpty()) { if (sdCallables.isEmpty()) {
for(StorageDirectory sd : dirs) { for(StorageDirectory sd : dirs) {
success.add(sd); success.add(sd);
} }
} else { } else {
for(Callable<StorageDirectory> c : callables) { upgradeCallableMap.put(dataDir, sdCallables);
tasks.add(new UpgradeTask(dataDir, executor.submit(c)));
}
} }
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to add storage directory {} for block pool {}", LOG.warn("Failed to add storage directory {} for block pool {}",
@ -470,6 +471,13 @@ public class DataStorage extends Storage {
} }
} }
for (Map.Entry<StorageLocation, List<Callable<StorageDirectory>>> entry :
upgradeCallableMap.entrySet()) {
for(Callable<StorageDirectory> c : entry.getValue()) {
tasks.add(new UpgradeTask(entry.getKey(), executor.submit(c)));
}
}
if (!tasks.isEmpty()) { if (!tasks.isEmpty()) {
LOG.info("loadBlockPoolSliceStorage: {} upgrade tasks", tasks.size()); LOG.info("loadBlockPoolSliceStorage: {} upgrade tasks", tasks.size());
for(UpgradeTask t : tasks) { for(UpgradeTask t : tasks) {