SOLR-11473: Make HDFSDirectoryFactory support other prefixes (besides hdfs:/)

Signed-off-by: Kevin Risden <krisden@apache.org>
Kevin Risden 2019-03-29 17:56:31 -04:00
parent 03a3562f78
commit ae95487db6
6 changed files with 73 additions and 35 deletions

CHANGES.txt

@@ -72,6 +72,8 @@ New Features
   a back-compat check of the .system collection to notify users of potential compatibility issues after
   upgrades or schema changes. (ab)
 
+* SOLR-11473: Make HDFSDirectoryFactory support other prefixes (besides hdfs:/) (Kevin Risden)
+
 Bug Fixes
 ----------------------
@@ -116,8 +118,7 @@ Bug Fixes
 * SOLR-13351: Workaround for VELOCITY-908 (Kevin Risden)
 * SOLR-13349: High CPU usage in Solr due to Java 8 bug (Erick Erickson)
-* SOLR-13349: High CPU usage in Solr due to Java 8 bug (Erick Erickson)
 
 Improvements
 ----------------------

HdfsDirectoryFactory.java

@@ -192,7 +192,7 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory implements Sol
   protected Directory create(String path, LockFactory lockFactory, DirContext dirContext) throws IOException {
     assert params != null : "init must be called before create";
     log.info("creating directory factory for path {}", path);
-    Configuration conf = getConf();
+    Configuration conf = getConf(new Path(path));
 
     if (metrics == null) {
       metrics = MetricsHolder.metrics;
@@ -333,16 +333,21 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory implements Sol
     }
   }
 
-  public Configuration getConf() {
+  public Configuration getConf(Path path) {
     Configuration conf = new Configuration();
     confDir = getConfig(CONFIG_DIRECTORY, null);
     HdfsUtil.addHdfsResources(conf, confDir);
-    conf.setBoolean("fs.hdfs.impl.disable.cache", true);
+
+    if(path != null) {
+      String fsScheme = path.toUri().getScheme();
+      if(fsScheme != null) {
+        conf.setBoolean("fs." + fsScheme + ".impl.disable.cache", true);
+      }
+    }
     return conf;
   }
 
-  protected synchronized void removeDirectory(final CacheValue cacheValue)
-      throws IOException {
+  protected synchronized void removeDirectory(final CacheValue cacheValue) {
     FileSystem fileSystem = getCachedFileSystem(cacheValue.path);
     try {
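
Note: the effect of the new getConf(Path) is that Hadoop's shared FileSystem cache is disabled per scheme instead of only for hdfs. A minimal standalone sketch (not part of the commit; the class name is illustrative) of which property key each path yields:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class SchemeCacheKeyDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    for (String p : new String[] {
        "hdfs://namenode:8020/solr/core1/data",
        "s3a://bucket/solr/core1/data",
        "file:///var/solr/data"}) {
      String fsScheme = new Path(p).toUri().getScheme();
      if (fsScheme != null) {
        // yields fs.hdfs.impl.disable.cache, fs.s3a.impl.disable.cache,
        // fs.file.impl.disable.cache respectively
        conf.setBoolean("fs." + fsScheme + ".impl.disable.cache", true);
      }
    }
    // A scheme-less path has a null URI scheme, so no flag is set for it and
    // the shared FileSystem cache stays enabled (as with getConf(null)).
    System.out.println(new Path("/solr/core1").toUri().getScheme()); // null
  }
}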
@@ -359,7 +364,10 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory implements Sol
   @Override
   public boolean isAbsolute(String path) {
-    return path.startsWith("hdfs:/");
+    if(path.startsWith("/")) {
+      return false;
+    }
+    return new Path(path).isAbsolute();
   }
 
   @Override
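
Note: the explicit leading-slash guard matters because Hadoop's Path treats a bare "/foo" as absolute, while the old check (path.startsWith("hdfs:/")) did not; without the guard, slash-rooted local paths would start being reported as absolute. A hedged sketch of the distinction (illustrative, not from the commit):

import org.apache.hadoop.fs.Path;

public class IsAbsoluteDemo {
  public static void main(String[] args) {
    // Hadoop considers a bare slash path absolute...
    System.out.println(new Path("/var/solr/data").isAbsolute());    // true
    // ...but the old Solr check did not, hence the startsWith("/") short-circuit.
    System.out.println("/var/solr/data".startsWith("hdfs:/"));      // false
    // Any scheme-qualified URI now qualifies, not just hdfs:
    System.out.println(new Path("s3a://bucket/core").isAbsolute()); // true
    System.out.println(new Path("relative/dir").isAbsolute());      // false
  }
}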
@@ -435,10 +443,11 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory implements Sol
     }
   }
 
-  private FileSystem getCachedFileSystem(String path) {
+  private FileSystem getCachedFileSystem(String pathStr) {
     try {
       // no need to close the fs, the cache will do it
-      return tmpFsCache.get(path, () -> FileSystem.get(new Path(path).toUri(), getConf()));
+      Path path = new Path(pathStr);
+      return tmpFsCache.get(pathStr, () -> FileSystem.get(path.toUri(), getConf(path)));
     } catch (ExecutionException e) {
       throw new RuntimeException(e);
     }
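
Note: tmpFsCache appears to be a Guava cache memoizing one FileSystem handle per path string; with Hadoop's own cache disabled per scheme above, this factory-level cache is what keeps handles shared and closeable in one place. A minimal sketch of the same pattern (the class and field names are assumptions, not the commit's actual code):

import java.util.concurrent.ExecutionException;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsCacheSketch {
  private final Cache<String, FileSystem> fsCache = CacheBuilder.newBuilder().build();

  public FileSystem getCached(String pathStr, Configuration conf) {
    try {
      // Cache.get(key, loader) computes at most once per key and wraps
      // loader failures in ExecutionException.
      Path path = new Path(pathStr);
      return fsCache.get(pathStr, () -> FileSystem.get(path.toUri(), conf));
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    }
  }
}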
@@ -462,7 +471,7 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory implements Sol
     synchronized (HdfsDirectoryFactory.class) {
       if (kerberosInit == null) {
         kerberosInit = Boolean.TRUE;
-        final Configuration conf = getConf();
+        final Configuration conf = getConf(null);
         final String authVal = conf.get(HADOOP_SECURITY_AUTHENTICATION);
         final String kerberos = "kerberos";
         if (authVal != null && !authVal.equals(kerberos)) {
@@ -471,7 +480,7 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory implements Sol
               + " connect to HDFS via kerberos");
         }
         // let's avoid modifying the supplied configuration, just to be conservative
-        final Configuration ugiConf = new Configuration(getConf());
+        final Configuration ugiConf = new Configuration(getConf(null));
         ugiConf.set(HADOOP_SECURITY_AUTHENTICATION, kerberos);
         UserGroupInformation.setConfiguration(ugiConf);
         log.info(
@@ -585,8 +594,8 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory implements Sol
   // perform an atomic rename if possible
   public void renameWithOverwrite(Directory dir, String fileName, String toName) throws IOException {
     String hdfsDirPath = getPath(dir);
-    FileContext fileContext = FileContext.getFileContext(getConf());
-    fileContext.rename(new Path(hdfsDirPath + "/" + fileName), new Path(hdfsDirPath + "/" + toName), Options.Rename.OVERWRITE);
+    FileContext fileContext = FileContext.getFileContext(getConf(new Path(hdfsDirPath)));
+    fileContext.rename(new Path(hdfsDirPath, fileName), new Path(hdfsDirPath, toName), Options.Rename.OVERWRITE);
   }
 
   @Override
@@ -600,7 +609,7 @@ public class HdfsDirectoryFactory extends CachingDirectoryFactory implements Sol
       Path dir2 = ((HdfsDirectory) baseToDir).getHdfsDirPath();
       Path file1 = new Path(dir1, fileName);
       Path file2 = new Path(dir2, fileName);
-      FileContext fileContext = FileContext.getFileContext(getConf());
+      FileContext fileContext = FileContext.getFileContext(getConf(dir1));
       fileContext.rename(file1, file2);
       return;
     }

HdfsBackupRepository.java

@@ -65,18 +65,6 @@ public class HdfsBackupRepository implements BackupRepository {
       }
     }
 
-    // We don't really need this factory instance. But we want to initialize it here to
-    // make sure that all HDFS related initialization is at one place (and not duplicated here).
-    factory = new HdfsDirectoryFactory();
-    factory.init(args);
-    this.hdfsConfig = factory.getConf();
-
-    // Configure the umask mode if specified.
-    if (args.get(HDFS_UMASK_MODE_PARAM) != null) {
-      String umaskVal = (String)args.get(HDFS_UMASK_MODE_PARAM);
-      this.hdfsConfig.set(FsPermission.UMASK_LABEL, umaskVal);
-    }
-
     String hdfsSolrHome = (String) Objects.requireNonNull(args.get(HdfsDirectoryFactory.HDFS_HOME),
         "Please specify " + HdfsDirectoryFactory.HDFS_HOME + " property.");
     Path path = new Path(hdfsSolrHome);
@@ -85,6 +73,18 @@ public class HdfsBackupRepository implements BackupRepository {
       path = path.getParent();
     }
 
+    // We don't really need this factory instance. But we want to initialize it here to
+    // make sure that all HDFS related initialization is at one place (and not duplicated here).
+    factory = new HdfsDirectoryFactory();
+    factory.init(args);
+    this.hdfsConfig = factory.getConf(new Path(hdfsSolrHome));
+
+    // Configure the umask mode if specified.
+    if (args.get(HDFS_UMASK_MODE_PARAM) != null) {
+      String umaskVal = (String)args.get(HDFS_UMASK_MODE_PARAM);
+      this.hdfsConfig.set(FsPermission.UMASK_LABEL, umaskVal);
+    }
+
     try {
       this.fileSystem = FileSystem.get(this.baseHdfsPath.toUri(), this.hdfsConfig);
     } catch (IOException e) {

CheckHdfsIndex.java

@@ -52,13 +52,14 @@ public class CheckHdfsIndex {
       System.out.println("\nIgnoring specified -dir-impl, instead using " + HdfsDirectory.class.getSimpleName());
     }
-    System.out.println("\nOpening index @ " + opts.getIndexPath() + "\n");
+    Path indexPath = new Path(opts.getIndexPath());
+    System.out.println("\nOpening index @ " + indexPath + "\n");
 
     Directory directory;
     try {
-      directory = new HdfsDirectory(new Path(opts.getIndexPath()), getConf());
+      directory = new HdfsDirectory(indexPath, getConf(indexPath));
     } catch (IOException e) {
-      System.out.println("ERROR: could not open hdfs directory \"" + opts.getIndexPath() + "\"; exiting");
+      System.out.println("ERROR: could not open hdfs directory \"" + indexPath + "\"; exiting");
       e.printStackTrace(System.out);
       return 1;
     }
@@ -69,11 +70,15 @@ public class CheckHdfsIndex {
     }
   }
 
-  private static Configuration getConf() {
+  private static Configuration getConf(Path path) {
     Configuration conf = new Configuration();
     String confDir = System.getProperty(HdfsDirectoryFactory.CONFIG_DIRECTORY);
     HdfsUtil.addHdfsResources(conf, confDir);
-    conf.setBoolean("fs.hdfs.impl.disable.cache", true);
+
+    String fsScheme = path.toUri().getScheme();
+    if(fsScheme != null) {
+      conf.setBoolean("fs." + fsScheme + ".impl.disable.cache", true);
+    }
     return conf;
   }
 }
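
Note: with the Configuration derived from the supplied index path, CheckHdfsIndex can presumably be pointed at any Hadoop-supported scheme rather than only hdfs. A hypothetical invocation (the classpath variable and bucket name are placeholders, not from the commit):

java -cp "$SOLR_CLASSPATH" org.apache.solr.index.hdfs.CheckHdfsIndex s3a://my-bucket/solr/core1/data/index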

HdfsUpdateLog.java

@@ -75,12 +75,16 @@ public class HdfsUpdateLog extends UpdateLog {
     log.info("Initializing HdfsUpdateLog: tlogDfsReplication={}", tlogDfsReplication);
   }
 
-  private Configuration getConf() {
+  private Configuration getConf(Path path) {
     Configuration conf = new Configuration();
     if (confDir != null) {
       HdfsUtil.addHdfsResources(conf, confDir);
     }
-    conf.setBoolean("fs.hdfs.impl.disable.cache", true);
+
+    String fsScheme = path.toUri().getScheme();
+    if(fsScheme != null) {
+      conf.setBoolean("fs." + fsScheme + ".impl.disable.cache", true);
+    }
     return conf;
   }
@@ -112,7 +116,8 @@ public class HdfsUpdateLog extends UpdateLog {
     }
 
     try {
-      fs = FileSystem.get(new Path(dataDir).toUri(), getConf());
+      Path dataDirPath = new Path(dataDir);
+      fs = FileSystem.get(dataDirPath.toUri(), getConf(dataDirPath));
     } catch (IOException e) {
       throw new SolrException(ErrorCode.SERVER_ERROR, e);
     }

HdfsDirectoryFactoryTest.java

@@ -20,12 +20,15 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Random;
 
+import com.carrotsearch.randomizedtesting.generators.RandomStrings;
+import com.google.common.base.Strings;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.lucene.store.Directory;
@@ -232,4 +235,19 @@ public class HdfsDirectoryFactoryTest extends SolrTestCaseJ4 {
       }
     }
   }
+
+  @Test
+  public void testIsAbsolute() throws Exception {
+    try(HdfsDirectoryFactory hdfsFactory = new HdfsDirectoryFactory()) {
+      String relativePath = Strings.repeat(
+          RandomStrings.randomAsciiAlphanumOfLength(random(), random().nextInt(10) + 1) + '/',
+          random().nextInt(5) + 1);
+      assertFalse(hdfsFactory.isAbsolute(relativePath));
+      assertFalse(hdfsFactory.isAbsolute("/" + relativePath));
+
+      for(String rootPrefix : Arrays.asList("file://", "hdfs://", "s3a://", "foo://")) {
+        assertTrue(hdfsFactory.isAbsolute(rootPrefix + relativePath));
+      }
+    }
+  }
 }
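
Note: the test pins down the new semantics: randomized relative paths and their slash-rooted variants are non-absolute, while any scheme-qualified prefix is absolute, including an unregistered scheme like foo://, since only the URI syntax is inspected, never the FileSystem registry. To run just this test under the ant build Lucene/Solr used at the time (invocation from memory, treat it as a sketch):

ant test -Dtestcase=HdfsDirectoryFactoryTest -Dtests.method=testIsAbsolute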