From 77fd690c05441eeab5c686af07239c9f9170db64 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Tue, 20 Aug 2019 10:08:55 -0700 Subject: [PATCH] HDFS-14311. Multi-threading conflict at layoutVersion when loading block pool storage. Contributed by Yicong Cai. (cherry picked from commit fbe87eddbc30fe5191c008b496fb83e51ef4ee4a) --- .../hdfs/server/datanode/DataStorage.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java index a85ae32fad4..2645e07ed94 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java @@ -445,22 +445,23 @@ public class DataStorage extends Storage { StartupOption startOpt, ExecutorService executor) throws IOException { final String bpid = nsInfo.getBlockPoolID(); final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo); + Map>> upgradeCallableMap = + new HashMap<>(); final List success = Lists.newArrayList(); final List tasks = Lists.newArrayList(); for (StorageLocation dataDir : dataDirs) { dataDir.makeBlockPoolDir(bpid, null); try { - final List> callables = Lists.newArrayList(); + final List> sdCallables = + Lists.newArrayList(); final List dirs = bpStorage.recoverTransitionRead( - nsInfo, dataDir, startOpt, callables, datanode.getConf()); - if (callables.isEmpty()) { + nsInfo, dataDir, startOpt, sdCallables, datanode.getConf()); + if (sdCallables.isEmpty()) { for(StorageDirectory sd : dirs) { success.add(sd); } } else { - for(Callable c : callables) { - tasks.add(new UpgradeTask(dataDir, executor.submit(c))); - } + upgradeCallableMap.put(dataDir, sdCallables); } } catch (IOException e) { LOG.warn("Failed to add storage directory {} for block pool {}", @@ -468,6 +469,13 @@ public class DataStorage extends Storage { } } + for (Map.Entry>> entry : + upgradeCallableMap.entrySet()) { + for(Callable c : entry.getValue()) { + tasks.add(new UpgradeTask(entry.getKey(), executor.submit(c))); + } + } + if (!tasks.isEmpty()) { LOG.info("loadBlockPoolSliceStorage: {} upgrade tasks", tasks.size()); for(UpgradeTask t : tasks) {