From 4cb22cd867a9295efc815dc95525b5c3e5960ea6 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Tue, 20 Aug 2019 10:08:55 -0700 Subject: [PATCH] HDFS-14311. Multi-threading conflict at layoutVersion when loading block pool storage. Contributed by Yicong Cai. --- .../hdfs/server/datanode/DataStorage.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java index a803c0b0b98..2447fd71372 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java @@ -447,22 +447,23 @@ public class DataStorage extends Storage { StartupOption startOpt, ExecutorService executor) throws IOException { final String bpid = nsInfo.getBlockPoolID(); final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo); + Map>> upgradeCallableMap = + new HashMap<>(); final List success = Lists.newArrayList(); final List tasks = Lists.newArrayList(); for (StorageLocation dataDir : dataDirs) { dataDir.makeBlockPoolDir(bpid, null); try { - final List> callables = Lists.newArrayList(); + final List> sdCallables = + Lists.newArrayList(); final List dirs = bpStorage.recoverTransitionRead( - nsInfo, dataDir, startOpt, callables, datanode.getConf()); - if (callables.isEmpty()) { + nsInfo, dataDir, startOpt, sdCallables, datanode.getConf()); + if (sdCallables.isEmpty()) { for(StorageDirectory sd : dirs) { success.add(sd); } } else { - for(Callable c : callables) { - tasks.add(new UpgradeTask(dataDir, executor.submit(c))); - } + upgradeCallableMap.put(dataDir, sdCallables); } } catch (IOException e) { LOG.warn("Failed to add storage directory {} for block pool {}", @@ -470,6 +471,13 @@ public class DataStorage extends Storage { } } + for (Map.Entry>> entry : + upgradeCallableMap.entrySet()) { + for(Callable c : entry.getValue()) { + tasks.add(new UpgradeTask(entry.getKey(), executor.submit(c))); + } + } + if (!tasks.isEmpty()) { LOG.info("loadBlockPoolSliceStorage: {} upgrade tasks", tasks.size()); for(UpgradeTask t : tasks) {