HDFS-8578. On upgrade, Datanode should process all storage/data dirs in parallel. Contributed by vinayakumarb and szetszwo

This commit is contained in:
Tsz-Wo Nicholas Sze 2016-02-22 15:01:15 -08:00
parent 79027309c9
commit 1b139593c5
4 changed files with 161 additions and 48 deletions

View File

@ -33,6 +33,9 @@ Release 2.7.3 - UNRELEASED
HDFS-4946. Allow preferLocalNode in BlockPlacementPolicyDefault to be HDFS-4946. Allow preferLocalNode in BlockPlacementPolicyDefault to be
configurable (James Kinley and Nathan Roberts via kihwal) configurable (James Kinley and Nathan Roberts via kihwal)
HDFS-8578. On upgrade, Datanode should process all storage/data dirs in
parallel. (vinayakumarb and szetszwo via szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8845. DiskChecker should not traverse the entire tree (Chang Li via HDFS-8845. DiskChecker should not traverse the entire tree (Chang Li via

View File

@ -767,6 +767,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.datanode.slow.io.warning.threshold.ms"; "dfs.datanode.slow.io.warning.threshold.ms";
public static final long DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 300; public static final long DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 300;
// Number of parallel threads to load multiple datanode volumes
public static final String DFS_DATANODE_PARALLEL_VOLUME_LOAD_THREADS_NUM_KEY =
"dfs.datanode.parallel.volumes.load.threads.num";
public static final String DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_KEY = public static final String DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_KEY =
"dfs.namenode.inotify.max.events.per.rpc"; "dfs.namenode.inotify.max.events.per.rpc";
public static final int DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT = public static final int DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT =

View File

@ -27,6 +27,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -145,7 +146,8 @@ public class BlockPoolSliceStorage extends Storage {
* @throws IOException * @throws IOException
*/ */
private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo, private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo,
File dataDir, StartupOption startOpt, Configuration conf) File dataDir, StartupOption startOpt,
List<Callable<StorageDirectory>> callables, Configuration conf)
throws IOException { throws IOException {
StorageDirectory sd = new StorageDirectory(dataDir, null, true); StorageDirectory sd = new StorageDirectory(dataDir, null, true);
try { try {
@ -172,19 +174,17 @@ public class BlockPoolSliceStorage extends Storage {
// Each storage directory is treated individually. // Each storage directory is treated individually.
// During startup some of them can upgrade or roll back // During startup some of them can upgrade or roll back
// while others could be up-to-date for the regular startup. // while others could be up-to-date for the regular startup.
if (doTransition(sd, nsInfo, startOpt, conf)) { if (!doTransition(sd, nsInfo, startOpt, callables, conf)) {
return sd;
}
if (getCTime() != nsInfo.getCTime()) { // 3. Check CTime and update successfully loaded storage.
throw new IOException("Datanode CTime (=" + getCTime() if (getCTime() != nsInfo.getCTime()) {
+ ") is not equal to namenode CTime (=" + nsInfo.getCTime() + ")"); throw new IOException("Datanode CTime (=" + getCTime()
+ ") is not equal to namenode CTime (=" + nsInfo.getCTime() + ")");
}
setServiceLayoutVersion(getServiceLayoutVersion());
writeProperties(sd);
} }
// 3. Update successfully loaded storage.
setServiceLayoutVersion(getServiceLayoutVersion());
writeProperties(sd);
return sd; return sd;
} catch (IOException ioe) { } catch (IOException ioe) {
sd.unlock(); sd.unlock();
@ -208,7 +208,8 @@ public class BlockPoolSliceStorage extends Storage {
*/ */
List<StorageDirectory> loadBpStorageDirectories(NamespaceInfo nsInfo, List<StorageDirectory> loadBpStorageDirectories(NamespaceInfo nsInfo,
Collection<File> dataDirs, StartupOption startOpt, Collection<File> dataDirs, StartupOption startOpt,
Configuration conf) throws IOException { List<Callable<StorageDirectory>> callables, Configuration conf)
throws IOException {
List<StorageDirectory> succeedDirs = Lists.newArrayList(); List<StorageDirectory> succeedDirs = Lists.newArrayList();
try { try {
for (File dataDir : dataDirs) { for (File dataDir : dataDirs) {
@ -218,7 +219,7 @@ public class BlockPoolSliceStorage extends Storage {
"attempt to load an used block storage: " + dataDir); "attempt to load an used block storage: " + dataDir);
} }
final StorageDirectory sd = loadStorageDirectory( final StorageDirectory sd = loadStorageDirectory(
nsInfo, dataDir, startOpt, conf); nsInfo, dataDir, startOpt, callables, conf);
succeedDirs.add(sd); succeedDirs.add(sd);
} }
} catch (IOException e) { } catch (IOException e) {
@ -242,11 +243,12 @@ public class BlockPoolSliceStorage extends Storage {
* @throws IOException on error * @throws IOException on error
*/ */
List<StorageDirectory> recoverTransitionRead(NamespaceInfo nsInfo, List<StorageDirectory> recoverTransitionRead(NamespaceInfo nsInfo,
Collection<File> dataDirs, StartupOption startOpt, Configuration conf) Collection<File> dataDirs, StartupOption startOpt,
List<Callable<StorageDirectory>> callables, Configuration conf)
throws IOException { throws IOException {
LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID()); LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
final List<StorageDirectory> loaded = loadBpStorageDirectories( final List<StorageDirectory> loaded = loadBpStorageDirectories(
nsInfo, dataDirs, startOpt, conf); nsInfo, dataDirs, startOpt, callables, conf);
for (StorageDirectory sd : loaded) { for (StorageDirectory sd : loaded) {
addStorageDir(sd); addStorageDir(sd);
} }
@ -353,7 +355,8 @@ public class BlockPoolSliceStorage extends Storage {
* @return true if the new properties has been written. * @return true if the new properties has been written.
*/ */
private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo, private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
StartupOption startOpt, Configuration conf) throws IOException { StartupOption startOpt, List<Callable<StorageDirectory>> callables,
Configuration conf) throws IOException {
if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) { if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) {
Preconditions.checkState(!getTrashRootDir(sd).exists(), Preconditions.checkState(!getTrashRootDir(sd).exists(),
sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " + sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " +
@ -395,7 +398,7 @@ public class BlockPoolSliceStorage extends Storage {
} }
if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION
|| this.cTime < nsInfo.getCTime()) { || this.cTime < nsInfo.getCTime()) {
doUpgrade(sd, nsInfo, conf); // upgrade doUpgrade(sd, nsInfo, callables, conf); // upgrade
return true; return true;
} }
// layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
@ -425,7 +428,9 @@ public class BlockPoolSliceStorage extends Storage {
* @throws IOException on error * @throws IOException on error
*/ */
private void doUpgrade(final StorageDirectory bpSd, private void doUpgrade(final StorageDirectory bpSd,
final NamespaceInfo nsInfo, final Configuration conf) throws IOException { final NamespaceInfo nsInfo,
final List<Callable<StorageDirectory>> callables,
final Configuration conf) throws IOException {
// Upgrading is applicable only to release with federation or after // Upgrading is applicable only to release with federation or after
if (!DataNodeLayoutVersion.supports( if (!DataNodeLayoutVersion.supports(
LayoutVersion.Feature.FEDERATION, layoutVersion)) { LayoutVersion.Feature.FEDERATION, layoutVersion)) {
@ -463,10 +468,21 @@ public class BlockPoolSliceStorage extends Storage {
rename(bpCurDir, bpTmpDir); rename(bpCurDir, bpTmpDir);
final String name = "block pool " + blockpoolID + " at " + bpSd.getRoot(); final String name = "block pool " + blockpoolID + " at " + bpSd.getRoot();
doUgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf); if (callables == null) {
doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf);
} else {
callables.add(new Callable<StorageDirectory>() {
@Override
public StorageDirectory call() throws Exception {
doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV,
conf);
return bpSd;
}
});
}
} }
private void doUgrade(String name, final StorageDirectory bpSd, private void doUpgrade(String name, final StorageDirectory bpSd,
NamespaceInfo nsInfo, final File bpPrevDir, final File bpTmpDir, NamespaceInfo nsInfo, final File bpPrevDir, final File bpTmpDir,
final File bpCurDir, final int oldLV, Configuration conf) final File bpCurDir, final int oldLV, Configuration conf)
throws IOException { throws IOException {

View File

@ -38,10 +38,12 @@ import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -52,6 +54,7 @@ import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -267,8 +270,8 @@ public class DataStorage extends Storage {
} }
private StorageDirectory loadStorageDirectory(DataNode datanode, private StorageDirectory loadStorageDirectory(DataNode datanode,
NamespaceInfo nsInfo, File dataDir, StartupOption startOpt) NamespaceInfo nsInfo, File dataDir, StartupOption startOpt,
throws IOException { List<Callable<StorageDirectory>> callables) throws IOException {
StorageDirectory sd = new StorageDirectory(dataDir, null, false); StorageDirectory sd = new StorageDirectory(dataDir, null, false);
try { try {
StorageState curState = sd.analyzeStorage(startOpt, this); StorageState curState = sd.analyzeStorage(startOpt, this);
@ -294,13 +297,12 @@ public class DataStorage extends Storage {
// Each storage directory is treated individually. // Each storage directory is treated individually.
// During startup some of them can upgrade or roll back // During startup some of them can upgrade or roll back
// while others could be up-to-date for the regular startup. // while others could be up-to-date for the regular startup.
if (doTransition(sd, nsInfo, startOpt, datanode.getConf())) { if (!doTransition(sd, nsInfo, startOpt, callables, datanode.getConf())) {
return sd;
}
// 3. Update successfully loaded storage. // 3. Update successfully loaded storage.
setServiceLayoutVersion(getServiceLayoutVersion()); setServiceLayoutVersion(getServiceLayoutVersion());
writeProperties(sd); writeProperties(sd);
}
return sd; return sd;
} catch (IOException ioe) { } catch (IOException ioe) {
@ -332,7 +334,7 @@ public class DataStorage extends Storage {
} }
StorageDirectory sd = loadStorageDirectory( StorageDirectory sd = loadStorageDirectory(
datanode, nsInfos.get(0), volume, StartupOption.HOTSWAP); datanode, nsInfos.get(0), volume, StartupOption.HOTSWAP, null);
VolumeBuilder builder = VolumeBuilder builder =
new VolumeBuilder(this, sd); new VolumeBuilder(this, sd);
for (NamespaceInfo nsInfo : nsInfos) { for (NamespaceInfo nsInfo : nsInfos) {
@ -343,12 +345,35 @@ public class DataStorage extends Storage {
final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo); final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
final List<StorageDirectory> dirs = bpStorage.loadBpStorageDirectories( final List<StorageDirectory> dirs = bpStorage.loadBpStorageDirectories(
nsInfo, bpDataDirs, StartupOption.HOTSWAP, datanode.getConf()); nsInfo, bpDataDirs, StartupOption.HOTSWAP, null, datanode.getConf());
builder.addBpStorageDirectories(nsInfo.getBlockPoolID(), dirs); builder.addBpStorageDirectories(nsInfo.getBlockPoolID(), dirs);
} }
return builder; return builder;
} }
static int getParallelVolumeLoadThreadsNum(int dataDirs, Configuration conf) {
final String key
= DFSConfigKeys.DFS_DATANODE_PARALLEL_VOLUME_LOAD_THREADS_NUM_KEY;
final int n = conf.getInt(key, dataDirs);
if (n < 1) {
throw new HadoopIllegalArgumentException(key + " = " + n + " < 1");
}
final int min = Math.min(n, dataDirs);
LOG.info("Using " + min + " threads to upgrade data directories ("
+ key + "=" + n + ", dataDirs=" + dataDirs + ")");
return min;
}
static class UpgradeTask {
private final StorageLocation dataDir;
private final Future<StorageDirectory> future;
UpgradeTask(StorageLocation dataDir, Future<StorageDirectory> future) {
this.dataDir = dataDir;
this.future = future;
}
}
/** /**
* Add a list of volumes to be managed by DataStorage. If the volume is empty, * Add a list of volumes to be managed by DataStorage. If the volume is empty,
* format it, otherwise recover it from previous transitions if required. * format it, otherwise recover it from previous transitions if required.
@ -363,32 +388,62 @@ public class DataStorage extends Storage {
synchronized List<StorageDirectory> addStorageLocations(DataNode datanode, synchronized List<StorageDirectory> addStorageLocations(DataNode datanode,
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs, NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
StartupOption startOpt) throws IOException { StartupOption startOpt) throws IOException {
final List<StorageLocation> successLocations = loadDataStorage( final int numThreads = getParallelVolumeLoadThreadsNum(
datanode, nsInfo, dataDirs, startOpt); dataDirs.size(), datanode.getConf());
return loadBlockPoolSliceStorage( final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
datanode, nsInfo, successLocations, startOpt); try {
final List<StorageLocation> successLocations = loadDataStorage(
datanode, nsInfo, dataDirs, startOpt, executor);
return loadBlockPoolSliceStorage(
datanode, nsInfo, successLocations, startOpt, executor);
} finally {
executor.shutdown();
}
} }
private List<StorageLocation> loadDataStorage(DataNode datanode, private List<StorageLocation> loadDataStorage(DataNode datanode,
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs, NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
StartupOption startOpt) throws IOException { StartupOption startOpt, ExecutorService executor) throws IOException {
final List<StorageLocation> success = Lists.newArrayList(); final List<StorageLocation> success = Lists.newArrayList();
final List<UpgradeTask> tasks = Lists.newArrayList();
for (StorageLocation dataDir : dataDirs) { for (StorageLocation dataDir : dataDirs) {
File root = dataDir.getFile(); File root = dataDir.getFile();
if (!containsStorageDir(root)) { if (!containsStorageDir(root)) {
try { try {
// It first ensures the datanode level format is completed. // It first ensures the datanode level format is completed.
final List<Callable<StorageDirectory>> callables
= Lists.newArrayList();
final StorageDirectory sd = loadStorageDirectory( final StorageDirectory sd = loadStorageDirectory(
datanode, nsInfo, root, startOpt); datanode, nsInfo, root, startOpt, callables);
addStorageDir(sd); if (callables.isEmpty()) {
addStorageDir(sd);
success.add(dataDir);
} else {
for(Callable<StorageDirectory> c : callables) {
tasks.add(new UpgradeTask(dataDir, executor.submit(c)));
}
}
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to add storage directory " + dataDir, e); LOG.warn("Failed to add storage directory " + dataDir, e);
continue;
} }
} else { } else {
LOG.info("Storage directory " + dataDir + " has already been used."); LOG.info("Storage directory " + dataDir + " has already been used.");
success.add(dataDir);
}
}
if (!tasks.isEmpty()) {
LOG.info("loadDataStorage: " + tasks.size() + " upgrade tasks");
for(UpgradeTask t : tasks) {
try {
addStorageDir(t.future.get());
success.add(t.dataDir);
} catch (ExecutionException e) {
LOG.warn("Failed to upgrade storage directory " + t.dataDir, e);
} catch (InterruptedException e) {
throw DFSUtilClient.toInterruptedIOException("Task interrupted", e);
}
} }
success.add(dataDir);
} }
return success; return success;
@ -396,10 +451,11 @@ public class DataStorage extends Storage {
private List<StorageDirectory> loadBlockPoolSliceStorage(DataNode datanode, private List<StorageDirectory> loadBlockPoolSliceStorage(DataNode datanode,
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs, NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
StartupOption startOpt) throws IOException { StartupOption startOpt, ExecutorService executor) throws IOException {
final String bpid = nsInfo.getBlockPoolID(); final String bpid = nsInfo.getBlockPoolID();
final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo); final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
final List<StorageDirectory> success = Lists.newArrayList(); final List<StorageDirectory> success = Lists.newArrayList();
final List<UpgradeTask> tasks = Lists.newArrayList();
for (StorageLocation dataDir : dataDirs) { for (StorageLocation dataDir : dataDirs) {
final File curDir = new File(dataDir.getFile(), STORAGE_DIR_CURRENT); final File curDir = new File(dataDir.getFile(), STORAGE_DIR_CURRENT);
List<File> bpDataDirs = new ArrayList<File>(); List<File> bpDataDirs = new ArrayList<File>();
@ -407,10 +463,17 @@ public class DataStorage extends Storage {
try { try {
makeBlockPoolDataDir(bpDataDirs, null); makeBlockPoolDataDir(bpDataDirs, null);
final List<Callable<StorageDirectory>> callables = Lists.newArrayList();
final List<StorageDirectory> dirs = bpStorage.recoverTransitionRead( final List<StorageDirectory> dirs = bpStorage.recoverTransitionRead(
nsInfo, bpDataDirs, startOpt, datanode.getConf()); nsInfo, bpDataDirs, startOpt, callables, datanode.getConf());
for(StorageDirectory sd : dirs) { if (callables.isEmpty()) {
success.add(sd); for(StorageDirectory sd : dirs) {
success.add(sd);
}
} else {
for(Callable<StorageDirectory> c : callables) {
tasks.add(new UpgradeTask(dataDir, executor.submit(c)));
}
} }
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to add storage directory " + dataDir LOG.warn("Failed to add storage directory " + dataDir
@ -418,6 +481,20 @@ public class DataStorage extends Storage {
} }
} }
if (!tasks.isEmpty()) {
LOG.info("loadBlockPoolSliceStorage: " + tasks.size() + " upgrade tasks");
for(UpgradeTask t : tasks) {
try {
success.add(t.future.get());
} catch (ExecutionException e) {
LOG.warn("Failed to upgrade storage directory " + t.dataDir
+ " for block pool " + bpid, e);
} catch (InterruptedException e) {
throw DFSUtilClient.toInterruptedIOException("Task interrupted", e);
}
}
}
return success; return success;
} }
@ -668,7 +745,8 @@ public class DataStorage extends Storage {
* @return true if the new properties has been written. * @return true if the new properties has been written.
*/ */
private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo, private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
StartupOption startOpt, Configuration conf) throws IOException { StartupOption startOpt, List<Callable<StorageDirectory>> callables,
Configuration conf) throws IOException {
if (startOpt == StartupOption.ROLLBACK) { if (startOpt == StartupOption.ROLLBACK) {
doRollback(sd, nsInfo); // rollback if applicable doRollback(sd, nsInfo); // rollback if applicable
} }
@ -710,7 +788,7 @@ public class DataStorage extends Storage {
// simply update the properties. // simply update the properties.
upgradeProperties(sd); upgradeProperties(sd);
} else { } else {
doUpgradePreFederation(sd, nsInfo, conf); doUpgradePreFederation(sd, nsInfo, callables, conf);
} }
return true; // doUgrade already has written properties return true; // doUgrade already has written properties
} }
@ -747,7 +825,9 @@ public class DataStorage extends Storage {
* @param sd storage directory * @param sd storage directory
*/ */
void doUpgradePreFederation(final StorageDirectory sd, void doUpgradePreFederation(final StorageDirectory sd,
final NamespaceInfo nsInfo, final Configuration conf) throws IOException { final NamespaceInfo nsInfo,
final List<Callable<StorageDirectory>> callables,
final Configuration conf) throws IOException {
final int oldLV = getLayoutVersion(); final int oldLV = getLayoutVersion();
LOG.info("Upgrading storage directory " + sd.getRoot() LOG.info("Upgrading storage directory " + sd.getRoot()
+ ".\n old LV = " + oldLV + ".\n old LV = " + oldLV
@ -780,10 +860,20 @@ public class DataStorage extends Storage {
bpStorage.format(curDir, nsInfo); bpStorage.format(curDir, nsInfo);
final File toDir = new File(curBpDir, STORAGE_DIR_CURRENT); final File toDir = new File(curBpDir, STORAGE_DIR_CURRENT);
doUgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf); if (callables == null) {
doUpgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
} else {
callables.add(new Callable<StorageDirectory>() {
@Override
public StorageDirectory call() throws Exception {
doUpgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
return sd;
}
});
}
} }
private void doUgrade(final StorageDirectory sd, private void doUpgrade(final StorageDirectory sd,
final NamespaceInfo nsInfo, final File prevDir, final NamespaceInfo nsInfo, final File prevDir,
final File tmpDir, final File bbwDir, final File toDir, final int oldLV, final File tmpDir, final File bbwDir, final File toDir, final int oldLV,
Configuration conf) throws IOException { Configuration conf) throws IOException {