HDFS-15221. Add checking of effective filesystem during initializing storage locations. Contributed by Yang Yun.
Parent: 7dda804a1a
Commit: ad40715690
StorageType.java:

@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -100,4 +101,20 @@ public enum StorageType {
     }
     return nonTransientTypes;
   }
+
+  // The configuration key prefix for the different StorageTypes.
+  public static final String CONF_KEY_HEADER =
+      "dfs.datanode.storagetype.";
+
+  /**
+   * Get the configured value for a given StorageType.
+   * @param conf - the Configuration
+   * @param t - the StorageType
+   * @param name - the sub-name of the key
+   * @return the configured value for the storage type key
+   */
+  public static String getConf(Configuration conf,
+      StorageType t, String name) {
+    return conf.get(CONF_KEY_HEADER + t.toString() + "." + name);
+  }
 }
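For illustration (not part of the patch), the new helper simply reads keys of the form dfs.datanode.storagetype.<TYPE>.<name>. A minimal sketch, assuming an ARCHIVE volume that is expected to be ext4 (the value is only illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.StorageType;

    public class StorageTypeConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Hypothetical value: require ARCHIVE volumes to sit on ext4.
        conf.set("dfs.datanode.storagetype.ARCHIVE.filesystem", "ext4");
        // Resolves the key dfs.datanode.storagetype.ARCHIVE.filesystem.
        String fs = StorageType.getConf(conf, StorageType.ARCHIVE, "filesystem");
        System.out.println(fs);   // prints "ext4"
      }
    }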
DataNode.java:

@@ -52,6 +52,7 @@ import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStag
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;
 
 import java.io.BufferedOutputStream;

@@ -59,6 +60,7 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;

@@ -116,7 +118,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
 import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
-import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.*;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.net.DomainPeerServer;

@@ -209,15 +211,7 @@ import org.apache.hadoop.tracing.TraceAdminProtocolPB;
 import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
 import org.apache.hadoop.tracing.TraceUtils;
 import org.apache.hadoop.tracing.TracerConfigurationManager;
-import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.JvmPauseMonitor;
-import org.apache.hadoop.util.ServicePlugin;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.Timer;
-import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.htrace.core.Tracer;
 import org.eclipse.jetty.util.ajax.JSON;
@@ -2774,6 +2768,51 @@ public class DataNode extends ReconfigurableBase
     return makeInstance(dataLocations, conf, resources);
   }
 
+  /**
+   * Get the effective filesystem where the path is located.
+   * DF is a cross-platform helper class that can read volume
+   * information from the current system.
+   * @param path - absolute or fully qualified path
+   * @param conf - the Configuration
+   * @return the effective filesystem of the path
+   */
+  private static String getEffectiveFileSystem(
+      String path, Configuration conf) {
+    try {
+      DF df = new DF(new File(path), conf);
+      return df.getFilesystem();
+    } catch (IOException ioe) {
+      LOG.error("Failed to get filesystem for dir {}", path, ioe);
+    }
+    return null;
+  }
+
+  /**
+   * Sometimes different disks are mounted for different storage types
+   * as the storage locations. It is important to check that each volume
+   * is mounted correctly before initializing the storage locations.
+   * @param conf - Configuration
+   * @param location - Storage location
+   * @return false if a filesystem is configured for the location and it
+   * does not match the effective filesystem.
+   */
+  private static boolean checkFileSystemWithConfigured(
+      Configuration conf, StorageLocation location) {
+    String configFs = StorageType.getConf(
+        conf, location.getStorageType(), "filesystem");
+    if (configFs != null && !configFs.isEmpty()) {
+      String effectiveFs = getEffectiveFileSystem(
+          location.getUri().getPath(), conf);
+      if (effectiveFs == null || !effectiveFs.equals(configFs)) {
+        LOG.error("Filesystem mismatch for storage location {}. " +
+            "Configured is {}, effective is {}.",
+            location.getUri(), configFs, effectiveFs);
+        return false;
+      }
+    }
+    return true;
+  }
+
   public static List<StorageLocation> getStorageLocations(Configuration conf) {
     Collection<String> rawLocations =
         conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);

@@ -2790,8 +2829,9 @@ public class DataNode extends ReconfigurableBase
         // Ignore the exception.
        continue;
       }
-
-      locations.add(location);
+      if (checkFileSystemWithConfigured(conf, location)) {
+        locations.add(location);
+      }
     }
 
     return locations;
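As a standalone sketch of the DF helper that getEffectiveFileSystem() relies on: DF wraps the platform's df command and reports, among other things, the filesystem backing a directory. The /data/archive path below is an assumption for illustration, not from the patch:

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.DF;

    public class DfProbeSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Hypothetical directory; any local path on a mounted volume works.
        DF df = new DF(new File("/data/archive"), conf);
        // getFilesystem() returns the filesystem reported for that volume,
        // which is what the DataNode compares against the configured value.
        System.out.println(df.getFilesystem());
      }
    }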
Storage policy documentation (markdown):

@@ -103,6 +103,9 @@ The effective storage policy can be retrieved by the "[`storagepolicies -getStor
 
 The default storage type of a datanode storage location will be DISK if it does not have a storage type tagged explicitly.
 
+Sometimes users set up the DataNode data directories to point to multiple volumes with different storage types. It is important to check that each volume is mounted correctly before the storage locations are initialized. The user has the option to enforce the expected filesystem for a storage type with the following key:
+
+**dfs.datanode.storagetype.*.filesystem** - replace the '*' with any storage type, for example, dfs.datanode.storagetype.ARCHIVE.filesystem=fuse_filesystem
 
 Storage Policy Based Data Movement
 ----------------------------------
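A minimal end-to-end sketch of the behaviour this key enables (directory names and the ext4 value are assumptions, not from the patch): when the configured filesystem does not match what df reports for a directory, the DataNode drops that storage location while initializing, as the test below also exercises.

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.datanode.DataNode;
    import org.apache.hadoop.hdfs.server.datanode.StorageLocation;

    public class StorageLocationCheckSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Two hypothetical data directories with different storage types.
        conf.set("dfs.datanode.data.dir",
            "[DISK]/data/disk0,[ARCHIVE]/data/archive0");
        // Require the ARCHIVE volume to be an ext4 mount (illustrative value).
        conf.set("dfs.datanode.storagetype.ARCHIVE.filesystem", "ext4");
        // Locations whose effective filesystem mismatches the configured value
        // are skipped (an error is logged); matching ones are kept.
        List<StorageLocation> locations = DataNode.getStorageLocations(conf);
        System.out.println(locations.size());
      }
    }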
TestDataDirs.java:

@@ -22,7 +22,10 @@ import java.io.*;
 import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.util.Shell;
+import org.junit.AssumptionViolatedException;
 import org.junit.Test;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;

@@ -95,4 +98,34 @@ public class TestDataDirs {
     assertThat(locations.get(1).getStorageType(), is(StorageType.DISK));
     assertThat(locations.get(1).getUri(), is(dir1.toURI()));
   }
+
+  @Test
+  public void testDataDirFileSystem() throws Exception {
+    if (Shell.MAC) {
+      throw new AssumptionViolatedException("Not supported on MAC OS");
+    }
+    Configuration conf = new Configuration();
+    String archiveDir = "/home";
+    String location = "[DISK]/dir1,[ARCHIVE]" + archiveDir;
+    conf.set(DFS_DATANODE_DATA_DIR_KEY, location);
+
+    // No filesystem is configured, so behaviour should be unchanged.
+    List<StorageLocation> locations = DataNode.getStorageLocations(conf);
+    assertEquals(2, locations.size());
+
+    // Configure a non-existent filesystem for ARCHIVE;
+    // the archive directory should not be added.
+    conf.set("dfs.datanode.storagetype.ARCHIVE.filesystem",
+        "nothis_filesystem");
+    locations = DataNode.getStorageLocations(conf);
+    assertEquals(1, locations.size());
+
+    // Configure the correct filesystem for ARCHIVE;
+    // the archive directory should be added.
+    DF df = new DF(new File(archiveDir), conf);
+    String fsInfo = df.getFilesystem();
+    conf.set("dfs.datanode.storagetype.ARCHIVE.filesystem", fsInfo);
+    locations = DataNode.getStorageLocations(conf);
+    assertEquals(2, locations.size());
+  }
 }