HDDS-1333. OzoneFileSystem can't work with spark/hadoop2.7 because incompatible security classes. Contributed by Elek, Marton. (#653)

(cherry picked from commit 7ec6a31eb3)
This commit is contained in:
Ajay Kumar 2019-04-05 09:05:59 -07:00
parent 260d843b25
commit 0356b3699d
19 changed files with 1520 additions and 1155 deletions

View File

@ -78,11 +78,13 @@ And create a custom `core-site.xml`:
<configuration>
<property>
<name>fs.o3fs.impl</name>
<value>org.apache.hadoop.fs.ozone.OzoneFileSystem</value>
<value>org.apache.hadoop.fs.ozone.BasicOzoneFileSystem</value>
</property>
</configuration>
```
_Note_: You may also use `org.apache.hadoop.fs.ozone.OzoneFileSystem` without the `Basic` prefix. The `Basic` version doesn't support FS statistics and encryption zones but can work together with older hadoop versions.
Copy the `ozonefs.jar` file from an ozone distribution (__use the legacy version!__)
```
@ -134,7 +136,7 @@ Write down the ozone filesystem uri as it should be used with the spark-submit c
```
kubectl create serviceaccount spark -n yournamespace
kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=poc:yournamespace --namespace=yournamespace
kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=yournamespace:spark --namespace=yournamespace
```
## Execute the job

View File

@ -83,7 +83,7 @@ public class OMFailoverProxyProvider implements
/**
* Class to store proxy information.
*/
public final class OMProxyInfo
public class OMProxyInfo
extends FailoverProxyProvider.ProxyInfo<OzoneManagerProtocolPB> {
private InetSocketAddress address;
private Text dtService;

View File

@ -116,6 +116,7 @@ run cp "${ROOT}/hadoop-ozone/objectstore-service/target/hadoop-ozone-objectstore
cp -r "${ROOT}/hadoop-hdds/docs/target/classes/docs" ./
#Copy docker compose files
run cp -p -R "${ROOT}/hadoop-ozone/dist/src/main/compose" .
#compose files are preprocessed: properties (eg. project.version) are replaced first by maven.
run cp -p -R "${ROOT}/hadoop-ozone/dist/target/compose" .
run cp -p -r "${ROOT}/hadoop-ozone/dist/src/main/smoketest" .
run cp -p -r "${ROOT}/hadoop-ozone/dist/src/main/blockade" .

View File

@ -127,6 +127,28 @@
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<id>copy-resources</id>
<phase>compile</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${basedir}/target/compose</outputDirectory>
<resources>
<resource>
<directory>src/main/compose</directory>
<filtering>true</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>

View File

@ -49,21 +49,53 @@ services:
environment:
ENSURE_SCM_INITIALIZED: /data/metadata/scm/current/VERSION
command: ["/opt/hadoop/bin/ozone","scm"]
hadoop3:
hadoop32:
image: flokkr/hadoop:3.1.0
volumes:
- ../..:/opt/ozone
env_file:
- ./docker-config
environment:
HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-current*.jar
HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-current-@project.version@.jar
CORE-SITE.XML_fs.o3fs.impl: org.apache.hadoop.fs.ozone.OzoneFileSystem
command: ["watch","-n","100000","ls"]
hadoop2:
hadoop31:
image: flokkr/hadoop:3.1.0
volumes:
- ../..:/opt/ozone
env_file:
- ./docker-config
environment:
HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-legacy-@project.version@.jar
CORE-SITE.XML_fs.o3fs.impl: org.apache.hadoop.fs.ozone.OzoneFileSystem
command: ["watch","-n","100000","ls"]
hadoop29:
image: flokkr/hadoop:2.9.0
volumes:
- ../..:/opt/ozone
env_file:
- ./docker-config
environment:
HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-legacy*.jar
HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-legacy-@project.version@.jar
CORE-SITE.XML_fs.o3fs.impl: org.apache.hadoop.fs.ozone.BasicOzoneFileSystem
command: ["watch","-n","100000","ls"]
hadoop27:
image: flokkr/hadoop:2.7.3
volumes:
- ../..:/opt/ozone
env_file:
- ./docker-config
environment:
HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-legacy-@project.version@.jar
CORE-SITE.XML_fs.o3fs.impl: org.apache.hadoop.fs.ozone.BasicOzoneFileSystem
command: ["watch","-n","100000","ls"]
spark:
image: flokkr/spark
volumes:
- ../..:/opt/ozone
env_file:
- ./docker-config
environment:
HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-legacy-@project.version@.jar
CORE-SITE.XML_fs.o3fs.impl: org.apache.hadoop.fs.ozone.BasicOzoneFileSystem
command: ["watch","-n","100000","ls"]

View File

@ -0,0 +1,56 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
*** Settings ***
Documentation Test ozone fs usage from Hdfs and Spark
Library OperatingSystem
Library String
Resource ../../smoketest/env-compose.robot
Resource ../../smoketest/commonlib.robot
*** Variables ***
${DATANODE_HOST} datanode
*** Keywords ***
Test hadoop dfs
[arguments] ${prefix}
${random} = Generate Random String 5 [NUMBERS]
${result} = Execute on host ${prefix} hdfs dfs -put /opt/hadoop/NOTICE.txt o3fs://bucket1.vol1/${prefix}-${random}
${result} = Execute on host ${prefix} hdfs dfs -ls o3fs://bucket1.vol1/
Should contain ${result} ${prefix}-${random}
*** Test Cases ***
Create bucket and volume to test
${result} = Run tests on host scm createbucketenv.robot
Test hadoop 3.1
Test hadoop dfs hadoop31
Test hadoop 3.2
Test hadoop dfs hadoop31
Test hadoop 2.9
Test hadoop dfs hadoop29
Test hadoop 2.7
Test hadoop dfs hadoop27
Test spark 2.3
${legacyjar} = Execute on host spark bash -c 'find /opt/ozone/share/ozone/lib/ -name *legacy*.jar'
${postfix} = Generate Random String 5 [NUMBERS]
${result} = Execute on host spark /opt/spark/bin/spark-submit --jars ${legacyjar} --class org.apache.spark.examples.DFSReadWriteTest /opt/spark//examples/jars/spark-examples_2.11-2.3.0.jar /opt/spark/README.md o3fs://bucket1.vol1/spark-${postfix}

View File

@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
CORE-SITE.XML_fs.o3fs.impl=org.apache.hadoop.fs.ozone.OzoneFileSystem
OZONE-SITE.XML_ozone.om.address=om
OZONE-SITE.XML_ozone.om.http-address=om:9874
OZONE-SITE.XML_ozone.scm.names=scm

View File

@ -29,6 +29,12 @@ Execute
Should Be Equal As Integers ${rc} 0
[return] ${output}
Execute And Ignore Error
[arguments] ${command}
${rc} ${output} = Run And Return Rc And Output ${command}
Log ${output}
[return] ${output}
Execute and checkrc
[arguments] ${command} ${expected_error_code}
${rc} ${output} = Run And Return Rc And Output ${command}

View File

@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
*** Settings ***
Documentation Create bucket and volume for any other testings
Library OperatingSystem
Resource commonlib.robot
Test Timeout 2 minute
*** Variables ***
${volume} vol1
${bucket} bucket1
*** Keywords ***
Create volume
${result} = Execute ozone sh volume create /${volume} --user hadoop --quota 100TB --root
Should not contain ${result} Failed
Should contain ${result} Creating Volume: ${volume}
Create bucket
Execute ozone sh bucket create /${volume}/${bucket}
*** Test Cases ***
Test ozone shell
${result} = Execute And Ignore Error ozone sh bucket info /${volume}/${bucket}
Run Keyword if "VOLUME_NOT_FOUND" in """${result}""" Create volume
Run Keyword if "VOLUME_NOT_FOUND" in """${result}""" Create bucket
Run Keyword if "BUCKET_NOT_FOUND" in """${result}""" Create bucket
${result} = Execute ozone sh bucket info /${volume}/${bucket}
Should not contain ${result} NOT_FOUND

View File

@ -0,0 +1,32 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
*** Settings ***
Documentation High level utilities to execute commands and tests in docker-compose based environments.
Resource commonlib.robot
*** Keywords ***
Run tests on host
[arguments] ${host} ${robotfile}
${result} = Execute docker-compose exec ${host} robot smoketest/${robotfile}
Execute on host
[arguments] ${host} ${command}
${rc} ${output} = Run And Return Rc And Output docker-compose exec ${host} ${command}
Log ${output}
Should Be Equal As Integers ${rc} 0
[return] ${output}

View File

@ -0,0 +1,361 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.ozone;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.FileAlreadyExistsException;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Basic Implementation of the OzoneFileSystem calls.
* <p>
* This is the minimal version which doesn't include any statistics.
* <p>
* For full featured version use OzoneClientAdapterImpl.
*/
public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
static final Logger LOG =
LoggerFactory.getLogger(BasicOzoneClientAdapterImpl.class);
private OzoneClient ozoneClient;
private ObjectStore objectStore;
private OzoneVolume volume;
private OzoneBucket bucket;
private ReplicationType replicationType;
private ReplicationFactor replicationFactor;
private boolean securityEnabled;
/**
* Create new OzoneClientAdapter implementation.
*
* @param volumeStr Name of the volume to use.
* @param bucketStr Name of the bucket to use
* @throws IOException In case of a problem.
*/
public BasicOzoneClientAdapterImpl(String volumeStr, String bucketStr)
throws IOException {
this(createConf(), volumeStr, bucketStr);
}
private static OzoneConfiguration createConf() {
ClassLoader contextClassLoader =
Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(null);
OzoneConfiguration conf = new OzoneConfiguration();
Thread.currentThread().setContextClassLoader(contextClassLoader);
return conf;
}
public BasicOzoneClientAdapterImpl(OzoneConfiguration conf, String volumeStr,
String bucketStr)
throws IOException {
this(null, -1, conf, volumeStr, bucketStr);
}
public BasicOzoneClientAdapterImpl(String omHost, int omPort,
Configuration hadoopConf, String volumeStr, String bucketStr)
throws IOException {
ClassLoader contextClassLoader =
Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(null);
OzoneConfiguration conf;
if (hadoopConf instanceof OzoneConfiguration) {
conf = (OzoneConfiguration) hadoopConf;
} else {
conf = new OzoneConfiguration(hadoopConf);
}
SecurityConfig secConfig = new SecurityConfig(conf);
if (secConfig.isSecurityEnabled()) {
this.securityEnabled = true;
}
try {
String replicationTypeConf =
conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT);
int replicationCountConf = conf.getInt(OzoneConfigKeys.OZONE_REPLICATION,
OzoneConfigKeys.OZONE_REPLICATION_DEFAULT);
if (StringUtils.isNotEmpty(omHost) && omPort != -1) {
this.ozoneClient =
OzoneClientFactory.getRpcClient(omHost, omPort, conf);
} else {
this.ozoneClient =
OzoneClientFactory.getRpcClient(conf);
}
objectStore = ozoneClient.getObjectStore();
this.volume = objectStore.getVolume(volumeStr);
this.bucket = volume.getBucket(bucketStr);
this.replicationType = ReplicationType.valueOf(replicationTypeConf);
this.replicationFactor = ReplicationFactor.valueOf(replicationCountConf);
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
@Override
public void close() throws IOException {
ozoneClient.close();
}
@Override
public InputStream readFile(String key) throws IOException {
incrementCounter(Statistic.OBJECTS_READ);
try {
return bucket.readFile(key).getInputStream();
} catch (OMException ex) {
if (ex.getResult() == OMException.ResultCodes.FILE_NOT_FOUND
|| ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {
throw new FileNotFoundException(
ex.getResult().name() + ": " + ex.getMessage());
} else {
throw ex;
}
}
}
protected void incrementCounter(Statistic objectsRead) {
//noop: Use OzoneClientAdapterImpl which supports statistics.
}
@Override
public OzoneFSOutputStream createFile(String key, boolean overWrite,
boolean recursive) throws IOException {
incrementCounter(Statistic.OBJECTS_CREATED);
try {
OzoneOutputStream ozoneOutputStream = bucket
.createFile(key, 0, replicationType, replicationFactor, overWrite,
recursive);
return new OzoneFSOutputStream(ozoneOutputStream.getOutputStream());
} catch (OMException ex) {
if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
|| ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {
throw new FileAlreadyExistsException(
ex.getResult().name() + ": " + ex.getMessage());
} else {
throw ex;
}
}
}
@Override
public void renameKey(String key, String newKeyName) throws IOException {
incrementCounter(Statistic.OBJECTS_RENAMED);
bucket.renameKey(key, newKeyName);
}
/**
* Helper method to create an directory specified by key name in bucket.
*
* @param keyName key name to be created as directory
* @return true if the key is created, false otherwise
*/
@Override
public boolean createDirectory(String keyName) throws IOException {
LOG.trace("creating dir for key:{}", keyName);
incrementCounter(Statistic.OBJECTS_CREATED);
try {
bucket.createDirectory(keyName);
} catch (OMException e) {
if (e.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS) {
throw new FileAlreadyExistsException(e.getMessage());
}
throw e;
}
return true;
}
/**
* Helper method to delete an object specified by key name in bucket.
*
* @param keyName key name to be deleted
* @return true if the key is deleted, false otherwise
*/
@Override
public boolean deleteObject(String keyName) {
LOG.trace("issuing delete for key" + keyName);
try {
incrementCounter(Statistic.OBJECTS_DELETED);
bucket.deleteKey(keyName);
return true;
} catch (IOException ioe) {
LOG.error("delete key failed " + ioe.getMessage());
return false;
}
}
public OzoneFileStatus getFileStatus(String pathKey) throws IOException {
try {
incrementCounter(Statistic.OBJECTS_QUERY);
return bucket.getFileStatus(pathKey);
} catch (OMException e) {
if (e.getResult() == OMException.ResultCodes.FILE_NOT_FOUND) {
throw new
FileNotFoundException(pathKey + ": No such file or directory!");
}
throw e;
}
}
@Override
public Iterator<BasicKeyInfo> listKeys(String pathKey) {
incrementCounter(Statistic.OBJECTS_LIST);
return new IteratorAdapter(bucket.listKeys(pathKey));
}
@Override
public Token<OzoneTokenIdentifier> getDelegationToken(String renewer)
throws IOException {
if (!securityEnabled) {
return null;
}
Token<OzoneTokenIdentifier> token = ozoneClient.getObjectStore()
.getDelegationToken(renewer == null ? null : new Text(renewer));
token.setKind(OzoneTokenIdentifier.KIND_NAME);
return token;
}
@Override
public KeyProvider getKeyProvider() throws IOException {
return objectStore.getKeyProvider();
}
@Override
public URI getKeyProviderUri() throws IOException {
return objectStore.getKeyProviderUri();
}
@Override
public String getCanonicalServiceName() {
return objectStore.getCanonicalServiceName();
}
/**
* Ozone Delegation Token Renewer.
*/
@InterfaceAudience.Private
public static class Renewer extends TokenRenewer {
//Ensure that OzoneConfiguration files are loaded before trying to use
// the renewer.
static {
OzoneConfiguration.activate();
}
public Text getKind() {
return OzoneTokenIdentifier.KIND_NAME;
}
@Override
public boolean handleKind(Text kind) {
return getKind().equals(kind);
}
@Override
public boolean isManaged(Token<?> token) throws IOException {
return true;
}
@Override
public long renew(Token<?> token, Configuration conf)
throws IOException, InterruptedException {
Token<OzoneTokenIdentifier> ozoneDt =
(Token<OzoneTokenIdentifier>) token;
OzoneClient ozoneClient =
OzoneClientFactory.getRpcClient(conf);
return ozoneClient.getObjectStore().renewDelegationToken(ozoneDt);
}
@Override
public void cancel(Token<?> token, Configuration conf)
throws IOException, InterruptedException {
Token<OzoneTokenIdentifier> ozoneDt =
(Token<OzoneTokenIdentifier>) token;
OzoneClient ozoneClient =
OzoneClientFactory.getRpcClient(conf);
ozoneClient.getObjectStore().cancelDelegationToken(ozoneDt);
}
}
/**
* Adapter to convert OzoneKey to a safe and simple Key implementation.
*/
public static class IteratorAdapter implements Iterator<BasicKeyInfo> {
private Iterator<? extends OzoneKey> original;
public IteratorAdapter(Iterator<? extends OzoneKey> listKeys) {
this.original = listKeys;
}
@Override
public boolean hasNext() {
return original.hasNext();
}
@Override
public BasicKeyInfo next() {
OzoneKey next = original.next();
if (next == null) {
return null;
} else {
return new BasicKeyInfo(
next.getName(),
next.getModificationTime(),
next.getDataSize()
);
}
}
}
}

View File

@ -0,0 +1,890 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.ozone;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
import com.google.common.base.Preconditions;
import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The minimal Ozone Filesystem implementation.
* <p>
* This is a basic version which doesn't extend
* KeyProviderTokenIssuer and doesn't include statistics. It can be used
* from older hadoop version. For newer hadoop version use the full featured
* OzoneFileSystem.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BasicOzoneFileSystem extends FileSystem {
static final Logger LOG =
LoggerFactory.getLogger(BasicOzoneFileSystem.class);
/**
* The Ozone client for connecting to Ozone server.
*/
private URI uri;
private String userName;
private Path workingDir;
private OzoneClientAdapter adapter;
private static final Pattern URL_SCHEMA_PATTERN =
Pattern.compile("([^\\.]+)\\.([^\\.]+)\\.{0,1}(.*)");
private static final String URI_EXCEPTION_TEXT = "Ozone file system url " +
"should be either one of the two forms: " +
"o3fs://bucket.volume/key OR " +
"o3fs://bucket.volume.om-host.example.com:5678/key";
@Override
public void initialize(URI name, Configuration conf) throws IOException {
super.initialize(name, conf);
setConf(conf);
Objects.requireNonNull(name.getScheme(), "No scheme provided in " + name);
Preconditions.checkArgument(getScheme().equals(name.getScheme()),
"Invalid scheme provided in " + name);
String authority = name.getAuthority();
Matcher matcher = URL_SCHEMA_PATTERN.matcher(authority);
if (!matcher.matches()) {
throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
}
String bucketStr = matcher.group(1);
String volumeStr = matcher.group(2);
String remaining = matcher.groupCount() == 3 ? matcher.group(3) : null;
String omHost = null;
String omPort = String.valueOf(-1);
if (!isEmpty(remaining)) {
String[] parts = remaining.split(":");
if (parts.length != 2) {
throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
}
omHost = parts[0];
omPort = parts[1];
if (!isNumber(omPort)) {
throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
}
}
try {
uri = new URIBuilder().setScheme(OZONE_URI_SCHEME)
.setHost(authority)
.build();
LOG.trace("Ozone URI for ozfs initialization is " + uri);
//isolated is the default for ozonefs-lib-legacy which includes the
// /ozonefs.txt, otherwise the default is false. It could be overridden.
boolean defaultValue =
BasicOzoneFileSystem.class.getClassLoader()
.getResource("ozonefs.txt")
!= null;
//Use string here instead of the constant as constant may not be available
//on the classpath of a hadoop 2.7
boolean isolatedClassloader =
conf.getBoolean("ozone.fs.isolated-classloader", defaultValue);
this.adapter = createAdapter(conf, bucketStr, volumeStr, omHost, omPort,
isolatedClassloader);
try {
this.userName =
UserGroupInformation.getCurrentUser().getShortUserName();
} catch (IOException e) {
this.userName = OZONE_DEFAULT_USER;
}
this.workingDir = new Path(OZONE_USER_DIR, this.userName)
.makeQualified(this.uri, this.workingDir);
} catch (URISyntaxException ue) {
final String msg = "Invalid Ozone endpoint " + name;
LOG.error(msg, ue);
throw new IOException(msg, ue);
}
}
protected OzoneClientAdapter createAdapter(Configuration conf,
String bucketStr,
String volumeStr, String omHost, String omPort,
boolean isolatedClassloader) throws IOException {
if (isolatedClassloader) {
return OzoneClientAdapterFactory
.createAdapter(volumeStr, bucketStr);
} else {
return new BasicOzoneClientAdapterImpl(omHost,
Integer.parseInt(omPort), conf,
volumeStr, bucketStr);
}
}
@Override
public void close() throws IOException {
try {
adapter.close();
} finally {
super.close();
}
}
@Override
public URI getUri() {
return uri;
}
@Override
public String getScheme() {
return OZONE_URI_SCHEME;
}
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
incrementCounter(Statistic.INVOCATION_OPEN);
statistics.incrementWriteOps(1);
LOG.trace("open() path:{}", f);
final FileStatus fileStatus = getFileStatus(f);
final String key = pathToKey(f);
if (fileStatus.isDirectory()) {
throw new FileNotFoundException("Can't open directory " + f + " to read");
}
return new FSDataInputStream(
new OzoneFSInputStream(adapter.readFile(key)));
}
protected void incrementCounter(Statistic statistic) {
//don't do anyting in this default implementation.
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize,
short replication, long blockSize,
Progressable progress) throws IOException {
LOG.trace("create() path:{}", f);
incrementCounter(Statistic.INVOCATION_CREATE);
statistics.incrementWriteOps(1);
final String key = pathToKey(f);
final FileStatus status;
try {
status = getFileStatus(f);
if (status.isDirectory()) {
throw new FileAlreadyExistsException(f + " is a directory");
} else {
if (!overwrite) {
// path references a file and overwrite is disabled
throw new FileAlreadyExistsException(f + " already exists");
}
LOG.trace("Overwriting file {}", f);
adapter.deleteObject(key);
}
} catch (FileNotFoundException ignored) {
// this means the file is not found
}
// We pass null to FSDataOutputStream so it won't count writes that
// are being buffered to a file
return createOutputStream(key, overwrite, true);
}
@Override
public FSDataOutputStream createNonRecursive(Path path,
FsPermission permission,
EnumSet<CreateFlag> flags,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException {
incrementCounter(Statistic.INVOCATION_CREATE_NON_RECURSIVE);
statistics.incrementWriteOps(1);
final String key = pathToKey(path);
final Path parent = path.getParent();
if (parent != null) {
// expect this to raise an exception if there is no parent
if (!getFileStatus(parent).isDirectory()) {
throw new FileAlreadyExistsException("Not a directory: " + parent);
}
}
return createOutputStream(key, flags.contains(CreateFlag.OVERWRITE), false);
}
private FSDataOutputStream createOutputStream(String key, boolean overwrite,
boolean recursive) throws IOException {
return new FSDataOutputStream(adapter.createFile(key, overwrite, recursive),
statistics);
}
@Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
throw new UnsupportedOperationException("append() Not implemented by the "
+ getClass().getSimpleName() + " FileSystem implementation");
}
private class RenameIterator extends OzoneListingIterator {
private final String srcKey;
private final String dstKey;
RenameIterator(Path srcPath, Path dstPath)
throws IOException {
super(srcPath);
srcKey = pathToKey(srcPath);
dstKey = pathToKey(dstPath);
LOG.trace("rename from:{} to:{}", srcKey, dstKey);
}
@Override
boolean processKey(String key) throws IOException {
String newKeyName = dstKey.concat(key.substring(srcKey.length()));
adapter.renameKey(key, newKeyName);
return true;
}
}
/**
* Check whether the source and destination path are valid and then perform
* rename from source path to destination path.
* <p>
* The rename operation is performed by renaming the keys with src as prefix.
* For such keys the prefix is changed from src to dst.
*
* @param src source path for rename
* @param dst destination path for rename
* @return true if rename operation succeeded or
* if the src and dst have the same path and are of the same type
* @throws IOException on I/O errors or if the src/dst paths are invalid.
*/
@Override
public boolean rename(Path src, Path dst) throws IOException {
incrementCounter(Statistic.INVOCATION_RENAME);
statistics.incrementWriteOps(1);
if (src.equals(dst)) {
return true;
}
LOG.trace("rename() from:{} to:{}", src, dst);
if (src.isRoot()) {
// Cannot rename root of file system
LOG.trace("Cannot rename the root of a filesystem");
return false;
}
// Cannot rename a directory to its own subdirectory
Path dstParent = dst.getParent();
while (dstParent != null && !src.equals(dstParent)) {
dstParent = dstParent.getParent();
}
Preconditions.checkArgument(dstParent == null,
"Cannot rename a directory to its own subdirectory");
// Check if the source exists
FileStatus srcStatus;
try {
srcStatus = getFileStatus(src);
} catch (FileNotFoundException fnfe) {
// source doesn't exist, return
return false;
}
// Check if the destination exists
FileStatus dstStatus;
try {
dstStatus = getFileStatus(dst);
} catch (FileNotFoundException fnde) {
dstStatus = null;
}
if (dstStatus == null) {
// If dst doesn't exist, check whether dst parent dir exists or not
// if the parent exists, the source can still be renamed to dst path
dstStatus = getFileStatus(dst.getParent());
if (!dstStatus.isDirectory()) {
throw new IOException(String.format(
"Failed to rename %s to %s, %s is a file", src, dst,
dst.getParent()));
}
} else {
// if dst exists and source and destination are same,
// check both the src and dst are of same type
if (srcStatus.getPath().equals(dstStatus.getPath())) {
return !srcStatus.isDirectory();
} else if (dstStatus.isDirectory()) {
// If dst is a directory, rename source as subpath of it.
// for example rename /source to /dst will lead to /dst/source
dst = new Path(dst, src.getName());
FileStatus[] statuses;
try {
statuses = listStatus(dst);
} catch (FileNotFoundException fnde) {
statuses = null;
}
if (statuses != null && statuses.length > 0) {
// If dst exists and not a directory not empty
throw new FileAlreadyExistsException(String.format(
"Failed to rename %s to %s, file already exists or not empty!",
src, dst));
}
} else {
// If dst is not a directory
throw new FileAlreadyExistsException(String.format(
"Failed to rename %s to %s, file already exists!", src, dst));
}
}
if (srcStatus.isDirectory()) {
if (dst.toString().startsWith(src.toString() + OZONE_URI_DELIMITER)) {
LOG.trace("Cannot rename a directory to a subdirectory of self");
return false;
}
}
RenameIterator iterator = new RenameIterator(src, dst);
return iterator.iterate();
}
private class DeleteIterator extends OzoneListingIterator {
private boolean recursive;
DeleteIterator(Path f, boolean recursive)
throws IOException {
super(f);
this.recursive = recursive;
if (getStatus().isDirectory()
&& !this.recursive
&& listStatus(f).length != 0) {
throw new PathIsNotEmptyDirectoryException(f.toString());
}
}
@Override
boolean processKey(String key) throws IOException {
if (key.equals("")) {
LOG.trace("Skipping deleting root directory");
return true;
} else {
LOG.trace("deleting key:" + key);
boolean succeed = adapter.deleteObject(key);
// if recursive delete is requested ignore the return value of
// deleteObject and issue deletes for other keys.
return recursive || succeed;
}
}
}
/**
* Deletes the children of the input dir path by iterating though the
* DeleteIterator.
*
* @param f directory path to be deleted
* @return true if successfully deletes all required keys, false otherwise
* @throws IOException
*/
private boolean innerDelete(Path f, boolean recursive) throws IOException {
LOG.trace("delete() path:{} recursive:{}", f, recursive);
try {
DeleteIterator iterator = new DeleteIterator(f, recursive);
return iterator.iterate();
} catch (FileNotFoundException e) {
LOG.debug("Couldn't delete {} - does not exist", f);
return false;
}
}
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
incrementCounter(Statistic.INVOCATION_DELETE);
statistics.incrementWriteOps(1);
LOG.debug("Delete path {} - recursive {}", f, recursive);
FileStatus status;
try {
status = getFileStatus(f);
} catch (FileNotFoundException ex) {
LOG.warn("delete: Path does not exist: {}", f);
return false;
}
String key = pathToKey(f);
boolean result;
if (status.isDirectory()) {
LOG.debug("delete: Path is a directory: {}", f);
key = addTrailingSlashIfNeeded(key);
if (key.equals("/")) {
LOG.warn("Cannot delete root directory.");
return false;
}
result = innerDelete(f, recursive);
} else {
LOG.debug("delete: Path is a file: {}", f);
result = adapter.deleteObject(key);
}
if (result) {
// If this delete operation removes all files/directories from the
// parent direcotry, then an empty parent directory must be created.
Path parent = f.getParent();
if (parent != null && !parent.isRoot()) {
createFakeDirectoryIfNecessary(parent);
}
}
return result;
}
/**
* Create a fake parent directory key if it does not already exist and no
* other child of this parent directory exists.
*
* @param f path to the fake parent directory
* @throws IOException
*/
private void createFakeDirectoryIfNecessary(Path f) throws IOException {
String key = pathToKey(f);
if (!key.isEmpty() && !o3Exists(f)) {
LOG.debug("Creating new fake directory at {}", f);
String dirKey = addTrailingSlashIfNeeded(key);
adapter.createDirectory(dirKey);
}
}
/**
* Check if a file or directory exists corresponding to given path.
*
* @param f path to file/directory.
* @return true if it exists, false otherwise.
* @throws IOException
*/
private boolean o3Exists(final Path f) throws IOException {
Path path = makeQualified(f);
try {
getFileStatus(path);
return true;
} catch (FileNotFoundException ex) {
return false;
}
}
private class ListStatusIterator extends OzoneListingIterator {
// _fileStatuses_ maintains a list of file(s) which is either the input
// path itself or a child of the input directory path.
private List<FileStatus> fileStatuses = new ArrayList<>(LISTING_PAGE_SIZE);
// _subDirStatuses_ maintains a list of sub-dirs of the input directory
// path.
private Map<Path, FileStatus> subDirStatuses =
new HashMap<>(LISTING_PAGE_SIZE);
private Path f; // the input path
ListStatusIterator(Path f) throws IOException {
super(f);
this.f = f;
}
/**
* Add the key to the listStatus result if the key corresponds to the
* input path or is an immediate child of the input path.
*
* @param key key to be processed
* @return always returns true
* @throws IOException
*/
@Override
boolean processKey(String key) throws IOException {
Path keyPath = new Path(OZONE_URI_DELIMITER + key);
if (key.equals(getPathKey())) {
if (pathIsDirectory()) {
// if input path is a directory, we add the sub-directories and
// files under this directory.
return true;
} else {
addFileStatus(keyPath);
return true;
}
}
// Left with only subkeys now
// We add only the immediate child files and sub-dirs i.e. we go only
// upto one level down the directory tree structure.
if (pathToKey(keyPath.getParent()).equals(pathToKey(f))) {
// This key is an immediate child. Can be file or directory
if (key.endsWith(OZONE_URI_DELIMITER)) {
// Key is a directory
addSubDirStatus(keyPath);
} else {
addFileStatus(keyPath);
}
} else {
// This key is not the immediate child of the input directory. So we
// traverse the parent tree structure of this key until we get the
// immediate child of the input directory.
Path immediateChildPath = getImmediateChildPath(keyPath.getParent());
if (immediateChildPath != null) {
addSubDirStatus(immediateChildPath);
}
}
return true;
}
/**
* Adds the FileStatus of keyPath to final result of listStatus.
*
* @param filePath path to the file
* @throws FileNotFoundException
*/
void addFileStatus(Path filePath) throws IOException {
fileStatuses.add(getFileStatus(filePath));
}
/**
* Adds the FileStatus of the subdir to final result of listStatus, if not
* already included.
*
* @param dirPath path to the dir
* @throws FileNotFoundException
*/
void addSubDirStatus(Path dirPath) throws IOException {
// Check if subdir path is already included in statuses.
if (!subDirStatuses.containsKey(dirPath)) {
subDirStatuses.put(dirPath, getFileStatus(dirPath));
}
}
/**
* Traverse the parent directory structure of keyPath to determine the
* which parent/ grand-parent/.. is the immediate child of the input path f.
*
* @param keyPath path whose parent directory structure should be traversed.
* @return immediate child path of the input path f.
*/
Path getImmediateChildPath(Path keyPath) {
Path path = keyPath;
Path parent = path.getParent();
while (parent != null) {
if (pathToKey(parent).equals(pathToKey(f))) {
return path;
}
path = parent;
parent = path.getParent();
}
return null;
}
/**
* Return the result of listStatus operation. If the input path is a
* file, return the status for only that file. If the input path is a
* directory, return the statuses for all the child files and sub-dirs.
*/
FileStatus[] getStatuses() {
List<FileStatus> result = Stream.concat(
fileStatuses.stream(), subDirStatuses.values().stream())
.collect(Collectors.toList());
return result.toArray(new FileStatus[result.size()]);
}
}
@Override
public FileStatus[] listStatus(Path f) throws IOException {
incrementCounter(Statistic.INVOCATION_LIST_STATUS);
statistics.incrementReadOps(1);
LOG.trace("listStatus() path:{}", f);
ListStatusIterator iterator = new ListStatusIterator(f);
iterator.iterate();
return iterator.getStatuses();
}
@Override
public void setWorkingDirectory(Path newDir) {
workingDir = newDir;
}
@Override
public Path getWorkingDirectory() {
return workingDir;
}
@Override
public Token<?> getDelegationToken(String renewer) throws IOException {
return adapter.getDelegationToken(renewer);
}
/**
* Get a canonical service name for this file system. If the URI is logical,
* the hostname part of the URI will be returned.
*
* @return a service string that uniquely identifies this file system.
*/
@Override
public String getCanonicalServiceName() {
return adapter.getCanonicalServiceName();
}
/**
* Get the username of the FS.
*
* @return the short name of the user who instantiated the FS
*/
public String getUsername() {
return userName;
}
/**
* Check whether the path is valid and then create directories.
* Directory is represented using a key with no value.
* All the non-existent parent directories are also created.
*
* @param path directory path to be created
* @return true if directory exists or created successfully.
* @throws IOException
*/
private boolean mkdir(Path path) throws IOException {
Path fPart = path;
Path prevfPart = null;
do {
LOG.trace("validating path:{}", fPart);
try {
FileStatus fileStatus = getFileStatus(fPart);
if (fileStatus.isDirectory()) {
// If path exists and a directory, exit
break;
} else {
// Found a file here, rollback and delete newly created directories
LOG.trace("Found a file with same name as directory, path:{}", fPart);
if (prevfPart != null) {
delete(prevfPart, true);
}
throw new FileAlreadyExistsException(String.format(
"Can't make directory for path '%s', it is a file.", fPart));
}
} catch (FileNotFoundException fnfe) {
LOG.trace("creating directory for fpart:{}", fPart);
String key = pathToKey(fPart);
String dirKey = addTrailingSlashIfNeeded(key);
if (!adapter.createDirectory(dirKey)) {
// Directory creation failed here,
// rollback and delete newly created directories
LOG.trace("Directory creation failed, path:{}", fPart);
if (prevfPart != null) {
delete(prevfPart, true);
}
return false;
}
}
prevfPart = fPart;
fPart = fPart.getParent();
} while (fPart != null);
return true;
}
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
LOG.trace("mkdir() path:{} ", f);
String key = pathToKey(f);
if (isEmpty(key)) {
return false;
}
return mkdir(f);
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
incrementCounter(Statistic.INVOCATION_GET_FILE_STATUS);
statistics.incrementReadOps(1);
LOG.trace("getFileStatus() path:{}", f);
Path qualifiedPath = f.makeQualified(uri, workingDir);
String key = pathToKey(qualifiedPath);
return adapter.getFileStatus(key)
.makeQualified(uri, qualifiedPath, getUsername(), getUsername());
}
/**
* Turn a path (relative or otherwise) into an Ozone key.
*
* @param path the path of the file.
* @return the key of the object that represents the file.
*/
public String pathToKey(Path path) {
Objects.requireNonNull(path, "Path canf not be null!");
if (!path.isAbsolute()) {
path = new Path(workingDir, path);
}
// removing leading '/' char
String key = path.toUri().getPath().substring(1);
LOG.trace("path for key:{} is:{}", key, path);
return key;
}
/**
* Add trailing delimiter to path if it is already not present.
*
* @param key the ozone Key which needs to be appended
* @return delimiter appended key
*/
private String addTrailingSlashIfNeeded(String key) {
if (!isEmpty(key) && !key.endsWith(OZONE_URI_DELIMITER)) {
return key + OZONE_URI_DELIMITER;
} else {
return key;
}
}
@Override
public String toString() {
return "OzoneFileSystem{URI=" + uri + ", "
+ "workingDir=" + workingDir + ", "
+ "userName=" + userName + ", "
+ "statistics=" + statistics
+ "}";
}
/**
* This class provides an interface to iterate through all the keys in the
* bucket prefixed with the input path key and process them.
* <p>
* Each implementing class should define how the keys should be processed
* through the processKey() function.
*/
private abstract class OzoneListingIterator {
private final Path path;
private final FileStatus status;
private String pathKey;
private Iterator<BasicKeyInfo> keyIterator;
OzoneListingIterator(Path path)
throws IOException {
this.path = path;
this.status = getFileStatus(path);
this.pathKey = pathToKey(path);
if (status.isDirectory()) {
this.pathKey = addTrailingSlashIfNeeded(pathKey);
}
keyIterator = adapter.listKeys(pathKey);
}
/**
* The output of processKey determines if further iteration through the
* keys should be done or not.
*
* @return true if we should continue iteration of keys, false otherwise.
* @throws IOException
*/
abstract boolean processKey(String key) throws IOException;
/**
* Iterates thorugh all the keys prefixed with the input path's key and
* processes the key though processKey().
* If for any key, the processKey() returns false, then the iteration is
* stopped and returned with false indicating that all the keys could not
* be processed successfully.
*
* @return true if all keys are processed successfully, false otherwise.
* @throws IOException
*/
boolean iterate() throws IOException {
LOG.trace("Iterating path {}", path);
if (status.isDirectory()) {
LOG.trace("Iterating directory:{}", pathKey);
while (keyIterator.hasNext()) {
BasicKeyInfo key = keyIterator.next();
LOG.trace("iterating key:{}", key.getName());
if (!processKey(key.getName())) {
return false;
}
}
return true;
} else {
LOG.trace("iterating file:{}", path);
return processKey(pathKey);
}
}
String getPathKey() {
return pathKey;
}
boolean pathIsDirectory() {
return status.isDirectory();
}
FileStatus getStatus() {
return status;
}
}
public OzoneClientAdapter getAdapter() {
return adapter;
}
public boolean isEmpty(CharSequence cs) {
return cs == null || cs.length() == 0;
}
public boolean isNumber(String number) {
try {
Integer.parseInt(number);
} catch (NumberFormatException ex) {
return false;
}
return true;
}
}

View File

@ -51,6 +51,7 @@ public class FilteredClassLoader extends URLClassLoader {
public FilteredClassLoader(URL[] urls, ClassLoader parent) {
super(urls, null);
delegatedClasses.add("org.apache.hadoop.fs.ozone.OzoneClientAdapter");
delegatedClasses.add("org.apache.hadoop.security.token.Token");
delegatedClasses.add("org.apache.hadoop.fs.ozone.BasicKeyInfo");
delegatedClasses.add("org.apache.hadoop.fs.ozone.OzoneFSOutputStream");
delegatedClasses.add("org.apache.hadoop.fs.ozone.OzoneFSStorageStatistics");

View File

@ -29,7 +29,7 @@ import java.net.URI;
import java.util.Iterator;
/**
* Lightweight adapter to separte hadoop/ozone classes.
* Lightweight adapter to separate hadoop/ozone classes.
* <p>
* This class contains only the bare minimum Ozone classes in the signature.
* It could be loaded by a different classloader because only the objects in

View File

@ -25,6 +25,8 @@ import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import org.apache.hadoop.fs.StorageStatistics;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -44,7 +46,7 @@ public final class OzoneClientAdapterFactory {
public static OzoneClientAdapter createAdapter(
String volumeStr,
String bucketStr) throws IOException {
return createAdapter(volumeStr, bucketStr,
return createAdapter(volumeStr, bucketStr, true,
(aClass) -> (OzoneClientAdapter) aClass
.getConstructor(String.class, String.class)
.newInstance(
@ -56,9 +58,8 @@ public final class OzoneClientAdapterFactory {
public static OzoneClientAdapter createAdapter(
String volumeStr,
String bucketStr,
OzoneFSStorageStatistics storageStatistics)
throws IOException {
return createAdapter(volumeStr, bucketStr,
StorageStatistics storageStatistics) throws IOException {
return createAdapter(volumeStr, bucketStr, false,
(aClass) -> (OzoneClientAdapter) aClass
.getConstructor(String.class, String.class,
OzoneFSStorageStatistics.class)
@ -72,9 +73,11 @@ public final class OzoneClientAdapterFactory {
public static OzoneClientAdapter createAdapter(
String volumeStr,
String bucketStr,
boolean basic,
OzoneClientAdapterCreator creator) throws IOException {
ClassLoader currentClassLoader = OzoneFileSystem.class.getClassLoader();
ClassLoader currentClassLoader =
OzoneClientAdapterFactory.class.getClassLoader();
List<URL> urls = new ArrayList<>();
findEmbeddedLibsUrl(urls, currentClassLoader);
@ -99,10 +102,18 @@ public final class OzoneClientAdapterFactory {
reflectionUtils.getMethod("getClassByName", String.class)
.invoke(null, "org.apache.ratis.grpc.GrpcFactory");
Class<?> aClass = classLoader
.loadClass("org.apache.hadoop.fs.ozone.OzoneClientAdapterImpl");
Class<?> adapterClass = null;
if (basic) {
adapterClass = classLoader
.loadClass(
"org.apache.hadoop.fs.ozone.BasicOzoneClientAdapterImpl");
} else {
adapterClass = classLoader
.loadClass(
"org.apache.hadoop.fs.ozone.OzoneClientAdapterImpl");
}
OzoneClientAdapter ozoneClientAdapter =
creator.createOzoneClientAdapter(aClass);
creator.createOzoneClientAdapter(adapterClass);
Thread.currentThread().setContextClassLoader(contextClassLoader);
@ -134,7 +145,8 @@ public final class OzoneClientAdapterFactory {
//marker file is added to the jar to make it easier to find the URL
// for the current jar.
String markerFile = "ozonefs.txt";
ClassLoader currentClassLoader = OzoneFileSystem.class.getClassLoader();
ClassLoader currentClassLoader =
OzoneClientAdapterFactory.class.getClassLoader();
URL ozFs = currentClassLoader
.getResource(markerFile);

View File

@ -17,368 +17,44 @@
*/
package org.apache.hadoop.fs.ozone;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Iterator;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of the OzoneFileSystem calls.
*/
public class OzoneClientAdapterImpl implements OzoneClientAdapter {
public class OzoneClientAdapterImpl extends BasicOzoneClientAdapterImpl {
static final Logger LOG =
LoggerFactory.getLogger(OzoneClientAdapterImpl.class);
private OzoneClient ozoneClient;
private ObjectStore objectStore;
private OzoneVolume volume;
private OzoneBucket bucket;
private ReplicationType replicationType;
private ReplicationFactor replicationFactor;
private OzoneFSStorageStatistics storageStatistics;
private boolean securityEnabled;
/**
* Create new OzoneClientAdapter implementation.
*
* @param volumeStr Name of the volume to use.
* @param bucketStr Name of the bucket to use
* @param storageStatistics Storage statistic (optional, can be null)
* @throws IOException In case of a problem.
*/
public OzoneClientAdapterImpl(String volumeStr, String bucketStr,
OzoneFSStorageStatistics storageStatistics) throws IOException {
this(createConf(), volumeStr, bucketStr, storageStatistics);
}
/**
* Create new OzoneClientAdapter implementation.
*
* @param volumeStr Name of the volume to use.
* @param bucketStr Name of the bucket to use
* @throws IOException In case of a problem.
*/
public OzoneClientAdapterImpl(String volumeStr, String bucketStr)
OzoneFSStorageStatistics storageStatistics)
throws IOException {
this(createConf(), volumeStr, bucketStr, null);
super(volumeStr, bucketStr);
this.storageStatistics = storageStatistics;
}
private static OzoneConfiguration createConf() {
ClassLoader contextClassLoader =
Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(null);
OzoneConfiguration conf = new OzoneConfiguration();
Thread.currentThread().setContextClassLoader(contextClassLoader);
return conf;
}
public OzoneClientAdapterImpl(OzoneConfiguration conf, String volumeStr,
String bucketStr, OzoneFSStorageStatistics storageStatistics)
public OzoneClientAdapterImpl(
OzoneConfiguration conf, String volumeStr, String bucketStr,
OzoneFSStorageStatistics storageStatistics)
throws IOException {
this(null, -1, conf, volumeStr, bucketStr, storageStatistics);
super(conf, volumeStr, bucketStr);
this.storageStatistics = storageStatistics;
}
public OzoneClientAdapterImpl(String omHost, int omPort,
Configuration hadoopConf, String volumeStr, String bucketStr,
OzoneFSStorageStatistics storageStatistics) throws IOException {
ClassLoader contextClassLoader =
Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(null);
OzoneConfiguration conf;
if (hadoopConf instanceof OzoneConfiguration) {
conf = (OzoneConfiguration) hadoopConf;
} else {
conf = new OzoneConfiguration(hadoopConf);
}
SecurityConfig secConfig = new SecurityConfig(conf);
if (secConfig.isSecurityEnabled()) {
this.securityEnabled = true;
}
try {
String replicationTypeConf =
conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT);
int replicationCountConf = conf.getInt(OzoneConfigKeys.OZONE_REPLICATION,
OzoneConfigKeys.OZONE_REPLICATION_DEFAULT);
if (StringUtils.isNotEmpty(omHost) && omPort != -1) {
this.ozoneClient =
OzoneClientFactory.getRpcClient(omHost, omPort, conf);
} else {
this.ozoneClient =
OzoneClientFactory.getRpcClient(conf);
}
objectStore = ozoneClient.getObjectStore();
this.volume = objectStore.getVolume(volumeStr);
this.bucket = volume.getBucket(bucketStr);
this.replicationType = ReplicationType.valueOf(replicationTypeConf);
this.replicationFactor = ReplicationFactor.valueOf(replicationCountConf);
this.storageStatistics = storageStatistics;
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
@Override
public void close() throws IOException {
ozoneClient.close();
}
@Override
public InputStream readFile(String key) throws IOException {
if (storageStatistics != null) {
storageStatistics.incrementCounter(Statistic.OBJECTS_READ, 1);
}
try {
return bucket.readFile(key).getInputStream();
} catch (OMException ex) {
if (ex.getResult() == OMException.ResultCodes.FILE_NOT_FOUND
|| ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {
throw new FileNotFoundException(
ex.getResult().name() + ": " + ex.getMessage());
} else {
throw ex;
}
}
}
@Override
public OzoneFSOutputStream createFile(String key, boolean overWrite,
boolean recursive) throws IOException {
if (storageStatistics != null) {
storageStatistics.incrementCounter(Statistic.OBJECTS_CREATED, 1);
}
try {
OzoneOutputStream ozoneOutputStream = bucket
.createFile(key, 0, replicationType, replicationFactor, overWrite,
recursive);
return new OzoneFSOutputStream(ozoneOutputStream.getOutputStream());
} catch (OMException ex) {
if (ex.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS
|| ex.getResult() == OMException.ResultCodes.NOT_A_FILE) {
throw new FileAlreadyExistsException(
ex.getResult().name() + ": " + ex.getMessage());
} else {
throw ex;
}
}
}
@Override
public void renameKey(String key, String newKeyName) throws IOException {
if (storageStatistics != null) {
storageStatistics.incrementCounter(Statistic.OBJECTS_RENAMED, 1);
}
bucket.renameKey(key, newKeyName);
}
/**
* Helper method to create an directory specified by key name in bucket.
*
* @param keyName key name to be created as directory
* @return true if the key is created, false otherwise
*/
@Override
public boolean createDirectory(String keyName) throws IOException {
LOG.trace("creating dir for key:{}", keyName);
if (storageStatistics != null) {
storageStatistics.incrementCounter(Statistic.OBJECTS_CREATED, 1);
}
try {
bucket.createDirectory(keyName);
} catch (OMException e) {
if (e.getResult() == OMException.ResultCodes.FILE_ALREADY_EXISTS) {
throw new FileAlreadyExistsException(e.getMessage());
}
throw e;
}
return true;
}
/**
* Helper method to delete an object specified by key name in bucket.
*
* @param keyName key name to be deleted
* @return true if the key is deleted, false otherwise
*/
@Override
public boolean deleteObject(String keyName) {
LOG.trace("issuing delete for key" + keyName);
try {
if (storageStatistics != null) {
storageStatistics.incrementCounter(Statistic.OBJECTS_DELETED, 1);
}
bucket.deleteKey(keyName);
return true;
} catch (IOException ioe) {
LOG.error("delete key failed " + ioe.getMessage());
return false;
}
}
public OzoneFileStatus getFileStatus(String pathKey) throws IOException {
try {
if (storageStatistics != null) {
storageStatistics.incrementCounter(Statistic.OBJECTS_QUERY, 1);
}
return bucket.getFileStatus(pathKey);
} catch (OMException e) {
if (e.getResult() == OMException.ResultCodes.FILE_NOT_FOUND) {
throw new
FileNotFoundException(pathKey + ": No such file or directory!");
}
throw e;
}
}
@Override
public Iterator<BasicKeyInfo> listKeys(String pathKey) {
if (storageStatistics != null) {
storageStatistics.incrementCounter(Statistic.OBJECTS_LIST, 1);
}
return new IteratorAdapter(bucket.listKeys(pathKey));
}
@Override
public Token<OzoneTokenIdentifier> getDelegationToken(String renewer)
OzoneFSStorageStatistics storageStatistics)
throws IOException {
if (!securityEnabled) {
return null;
}
Token<OzoneTokenIdentifier> token = ozoneClient.getObjectStore()
.getDelegationToken(renewer == null ? null : new Text(renewer));
token.setKind(OzoneTokenIdentifier.KIND_NAME);
return token;
super(omHost, omPort, hadoopConf, volumeStr, bucketStr);
this.storageStatistics = storageStatistics;
}
@Override
public KeyProvider getKeyProvider() throws IOException {
return objectStore.getKeyProvider();
}
@Override
public URI getKeyProviderUri() throws IOException {
return objectStore.getKeyProviderUri();
}
@Override
public String getCanonicalServiceName() {
return objectStore.getCanonicalServiceName();
}
/**
* Ozone Delegation Token Renewer.
*/
@InterfaceAudience.Private
public static class Renewer extends TokenRenewer {
//Ensure that OzoneConfiguration files are loaded before trying to use
// the renewer.
static {
OzoneConfiguration.activate();
}
public Text getKind() {
return OzoneTokenIdentifier.KIND_NAME;
}
@Override
public boolean handleKind(Text kind) {
return getKind().equals(kind);
}
@Override
public boolean isManaged(Token<?> token) throws IOException {
return true;
}
@Override
public long renew(Token<?> token, Configuration conf)
throws IOException, InterruptedException {
Token<OzoneTokenIdentifier> ozoneDt =
(Token<OzoneTokenIdentifier>) token;
OzoneClient ozoneClient =
OzoneClientFactory.getRpcClient(conf);
return ozoneClient.getObjectStore().renewDelegationToken(ozoneDt);
}
@Override
public void cancel(Token<?> token, Configuration conf)
throws IOException, InterruptedException {
Token<OzoneTokenIdentifier> ozoneDt =
(Token<OzoneTokenIdentifier>) token;
OzoneClient ozoneClient =
OzoneClientFactory.getRpcClient(conf);
ozoneClient.getObjectStore().cancelDelegationToken(ozoneDt);
}
}
/**
* Adapter to convert OzoneKey to a safe and simple Key implementation.
*/
public static class IteratorAdapter implements Iterator<BasicKeyInfo> {
private Iterator<? extends OzoneKey> original;
public IteratorAdapter(Iterator<? extends OzoneKey> listKeys) {
this.original = listKeys;
}
@Override
public boolean hasNext() {
return original.hasNext();
}
@Override
public BasicKeyInfo next() {
OzoneKey next = original.next();
if (next == null) {
return null;
} else {
return new BasicKeyInfo(
next.getName(),
next.getModificationTime(),
next.getDataSize()
);
}
protected void incrementCounter(Statistic objectsRead) {
if (storageStatistics != null) {
storageStatistics.incrementCounter(objectsRead, 1);
}
}
}

View File

@ -18,53 +18,18 @@
package org.apache.hadoop.fs.ozone;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The Ozone Filesystem implementation.
@ -76,225 +41,19 @@ import org.slf4j.LoggerFactory;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class OzoneFileSystem extends FileSystem
public class OzoneFileSystem extends BasicOzoneFileSystem
implements KeyProviderTokenIssuer {
static final Logger LOG = LoggerFactory.getLogger(OzoneFileSystem.class);
/**
* The Ozone client for connecting to Ozone server.
*/
private URI uri;
private String userName;
private Path workingDir;
private OzoneClientAdapter adapter;
private OzoneFSStorageStatistics storageStatistics;
private static final Pattern URL_SCHEMA_PATTERN =
Pattern.compile("([^\\.]+)\\.([^\\.]+)\\.{0,1}(.*)");
private static final String URI_EXCEPTION_TEXT = "Ozone file system url " +
"should be either one of the two forms: " +
"o3fs://bucket.volume/key OR " +
"o3fs://bucket.volume.om-host.example.com:5678/key";
@Override
public void initialize(URI name, Configuration conf) throws IOException {
super.initialize(name, conf);
setConf(conf);
Objects.requireNonNull(name.getScheme(), "No scheme provided in " + name);
Preconditions.checkArgument(getScheme().equals(name.getScheme()),
"Invalid scheme provided in " + name);
String authority = name.getAuthority();
Matcher matcher = URL_SCHEMA_PATTERN.matcher(authority);
if (!matcher.matches()) {
throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
}
String bucketStr = matcher.group(1);
String volumeStr = matcher.group(2);
String remaining = matcher.groupCount() == 3 ? matcher.group(3) : null;
String omHost = null;
String omPort = String.valueOf(-1);
if (StringUtils.isNotEmpty(remaining)) {
String[] parts = remaining.split(":");
if (parts.length != 2) {
throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
}
omHost = parts[0];
omPort = parts[1];
if (!NumberUtils.isParsable(omPort)) {
throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
}
}
try {
uri = new URIBuilder().setScheme(OZONE_URI_SCHEME)
.setHost(authority)
.build();
LOG.trace("Ozone URI for ozfs initialization is " + uri);
//isolated is the default for ozonefs-lib-legacy which includes the
// /ozonefs.txt, otherwise the default is false. It could be overridden.
boolean defaultValue =
OzoneFileSystem.class.getClassLoader().getResource("ozonefs.txt")
!= null;
//Use string here instead of the constant as constant may not be available
//on the classpath of a hadoop 2.7
boolean isolatedClassloader =
conf.getBoolean("ozone.fs.isolated-classloader", defaultValue);
try {
//register only to the GlobalStorageStatistics if the class exists.
//This is required to support hadoop versions <2.7
Class.forName("org.apache.hadoop.fs.GlobalStorageStatistics");
storageStatistics = (OzoneFSStorageStatistics)
GlobalStorageStatistics.INSTANCE
.put(OzoneFSStorageStatistics.NAME,
OzoneFSStorageStatistics::new);
} catch (ClassNotFoundException e) {
//we don't support storage statistics for hadoop2.7 and older
}
if (isolatedClassloader) {
try {
//register only to the GlobalStorageStatistics if the class exists.
//This is required to support hadoop versions <2.7
Class.forName("org.apache.hadoop.fs.GlobalStorageStatistics");
this.adapter =
OzoneClientAdapterFactory.createAdapter(volumeStr, bucketStr,
storageStatistics);
} catch (ClassNotFoundException e) {
this.adapter =
OzoneClientAdapterFactory.createAdapter(volumeStr, bucketStr);
}
} else {
this.adapter = new OzoneClientAdapterImpl(omHost,
Integer.parseInt(omPort), conf,
volumeStr, bucketStr, storageStatistics);
}
try {
this.userName =
UserGroupInformation.getCurrentUser().getShortUserName();
} catch (IOException e) {
this.userName = OZONE_DEFAULT_USER;
}
this.workingDir = new Path(OZONE_USER_DIR, this.userName)
.makeQualified(this.uri, this.workingDir);
} catch (URISyntaxException ue) {
final String msg = "Invalid Ozone endpoint " + name;
LOG.error(msg, ue);
throw new IOException(msg, ue);
}
}
@Override
public void close() throws IOException {
try {
adapter.close();
} finally {
super.close();
}
}
@Override
public URI getUri() {
return uri;
}
@Override
public String getScheme() {
return OZONE_URI_SCHEME;
}
Statistics getFsStatistics() {
return statistics;
}
OzoneFSStorageStatistics getOzoneFSOpsCountStatistics() {
return storageStatistics;
}
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
if (storageStatistics != null) {
storageStatistics.incrementCounter(Statistic.INVOCATION_OPEN, 1);
}
statistics.incrementWriteOps(1);
LOG.trace("open() path:{}", f);
final String key = pathToKey(f);
return new FSDataInputStream(new OzoneFSInputStream(adapter.readFile(key)));
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize,
short replication, long blockSize,
Progressable progress) throws IOException {
LOG.trace("create() path:{}", f);
if (storageStatistics != null) {
storageStatistics.incrementCounter(Statistic.INVOCATION_CREATE, 1);
}
statistics.incrementWriteOps(1);
final String key = pathToKey(f);
// We pass null to FSDataOutputStream so it won't count writes that
// are being buffered to a file
return createOutputStream(key, overwrite, true);
}
@Override
public FSDataOutputStream createNonRecursive(Path path,
FsPermission permission,
EnumSet<CreateFlag> flags,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException {
if (storageStatistics != null) {
storageStatistics.incrementCounter(
Statistic.INVOCATION_CREATE_NON_RECURSIVE, 1);
}
statistics.incrementWriteOps(1);
final String key = pathToKey(path);
final Path parent = path.getParent();
if (parent != null) {
// expect this to raise an exception if there is no parent
if (!getFileStatus(parent).isDirectory()) {
throw new FileAlreadyExistsException("Not a directory: " + parent);
}
}
return createOutputStream(key, flags.contains(CreateFlag.OVERWRITE), false);
}
private FSDataOutputStream createOutputStream(String key, boolean overwrite,
boolean recursive) throws IOException {
return new FSDataOutputStream(adapter.createFile(key, overwrite, recursive),
statistics);
}
@Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
throw new UnsupportedOperationException("append() Not implemented by the "
+ getClass().getSimpleName() + " FileSystem implementation");
}
@Override
public KeyProvider getKeyProvider() throws IOException {
return adapter.getKeyProvider();
return getAdapter().getKeyProvider();
}
@Override
public URI getKeyProviderUri() throws IOException {
return adapter.getKeyProviderUri();
return getAdapter().getKeyProviderUri();
}
@Override
@ -307,566 +66,36 @@ public class OzoneFileSystem extends FileSystem
return null;
}
private class RenameIterator extends OzoneListingIterator {
private final String srcKey;
private final String dstKey;
RenameIterator(Path srcPath, Path dstPath)
throws IOException {
super(srcPath);
srcKey = pathToKey(srcPath);
dstKey = pathToKey(dstPath);
LOG.trace("rename from:{} to:{}", srcKey, dstKey);
StorageStatistics getOzoneFSOpsCountStatistics() {
return storageStatistics;
}
@Override
boolean processKey(String key) throws IOException {
String newKeyName = dstKey.concat(key.substring(srcKey.length()));
adapter.renameKey(key, newKeyName);
return true;
}
}
/**
* Check whether the source and destination path are valid and then perform
* rename from source path to destination path.
* <p>
* The rename operation is performed by renaming the keys with src as prefix.
* For such keys the prefix is changed from src to dst.
*
* @param src source path for rename
* @param dst destination path for rename
* @return true if rename operation succeeded or
* if the src and dst have the same path and are of the same type
* @throws IOException on I/O errors or if the src/dst paths are invalid.
*/
@Override
public boolean rename(Path src, Path dst) throws IOException {
protected void incrementCounter(Statistic statistic) {
if (storageStatistics != null) {
storageStatistics.incrementCounter(Statistic.INVOCATION_RENAME, 1);
storageStatistics.incrementCounter(statistic, 1);
}
statistics.incrementWriteOps(1);
if (src.equals(dst)) {
return true;
}
LOG.trace("rename() from:{} to:{}", src, dst);
if (src.isRoot()) {
// Cannot rename root of file system
LOG.trace("Cannot rename the root of a filesystem");
return false;
}
@Override
protected OzoneClientAdapter createAdapter(Configuration conf,
String bucketStr,
String volumeStr, String omHost, String omPort,
boolean isolatedClassloader) throws IOException {
// Cannot rename a directory to its own subdirectory
Path dstParent = dst.getParent();
while (dstParent != null && !src.equals(dstParent)) {
dstParent = dstParent.getParent();
}
Preconditions.checkArgument(dstParent == null,
"Cannot rename a directory to its own subdirectory");
// Check if the source exists
FileStatus srcStatus;
try {
srcStatus = getFileStatus(src);
} catch (FileNotFoundException fnfe) {
// source doesn't exist, return
return false;
}
this.storageStatistics =
(OzoneFSStorageStatistics) GlobalStorageStatistics.INSTANCE
.put(OzoneFSStorageStatistics.NAME,
OzoneFSStorageStatistics::new);
// Check if the destination exists
FileStatus dstStatus;
try {
dstStatus = getFileStatus(dst);
} catch (FileNotFoundException fnde) {
dstStatus = null;
}
if (isolatedClassloader) {
return OzoneClientAdapterFactory.createAdapter(volumeStr, bucketStr,
storageStatistics);
if (dstStatus == null) {
// If dst doesn't exist, check whether dst parent dir exists or not
// if the parent exists, the source can still be renamed to dst path
dstStatus = getFileStatus(dst.getParent());
if (!dstStatus.isDirectory()) {
throw new IOException(String.format(
"Failed to rename %s to %s, %s is a file", src, dst,
dst.getParent()));
}
} else {
// if dst exists and source and destination are same,
// check both the src and dst are of same type
if (srcStatus.getPath().equals(dstStatus.getPath())) {
return !srcStatus.isDirectory();
} else if (dstStatus.isDirectory()) {
// If dst is a directory, rename source as subpath of it.
// for example rename /source to /dst will lead to /dst/source
dst = new Path(dst, src.getName());
FileStatus[] statuses;
try {
statuses = listStatus(dst);
} catch (FileNotFoundException fnde) {
statuses = null;
}
if (statuses != null && statuses.length > 0) {
// If dst exists and not a directory not empty
throw new FileAlreadyExistsException(String.format(
"Failed to rename %s to %s, file already exists or not empty!",
src, dst));
}
} else {
// If dst is not a directory
throw new FileAlreadyExistsException(String.format(
"Failed to rename %s to %s, file already exists!", src, dst));
}
}
if (srcStatus.isDirectory()) {
if (dst.toString().startsWith(src.toString() + OZONE_URI_DELIMITER)) {
LOG.trace("Cannot rename a directory to a subdirectory of self");
return false;
}
}
RenameIterator iterator = new RenameIterator(src, dst);
return iterator.iterate();
}
private class DeleteIterator extends OzoneListingIterator {
private boolean recursive;
DeleteIterator(Path f, boolean recursive)
throws IOException {
super(f);
this.recursive = recursive;
if (getStatus().isDirectory()
&& !this.recursive
&& listStatus(f).length != 0) {
throw new PathIsNotEmptyDirectoryException(f.toString());
}
}
@Override
boolean processKey(String key) throws IOException {
if (key.equals("")) {
LOG.trace("Skipping deleting root directory");
return true;
} else {
LOG.trace("deleting key:" + key);
boolean succeed = adapter.deleteObject(key);
// if recursive delete is requested ignore the return value of
// deleteObject and issue deletes for other keys.
return recursive || succeed;
}
}
}
/**
* Deletes the children of the input dir path by iterating though the
* DeleteIterator.
*
* @param f directory path to be deleted
* @return true if successfully deletes all required keys, false otherwise
* @throws IOException
*/
private boolean innerDelete(Path f, boolean recursive) throws IOException {
LOG.trace("delete() path:{} recursive:{}", f, recursive);
try {
DeleteIterator iterator = new DeleteIterator(f, recursive);
return iterator.iterate();
} catch (FileNotFoundException e) {
LOG.debug("Couldn't delete {} - does not exist", f);
return false;
}
}
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
if (storageStatistics != null) {
storageStatistics.incrementCounter(Statistic.INVOCATION_DELETE, 1);
}
statistics.incrementWriteOps(1);
LOG.debug("Delete path {} - recursive {}", f, recursive);
FileStatus status;
try {
status = getFileStatus(f);
} catch (FileNotFoundException ex) {
LOG.warn("delete: Path does not exist: {}", f);
return false;
}
String key = pathToKey(f);
boolean result;
if (status.isDirectory()) {
LOG.debug("delete: Path is a directory: {}", f);
key = addTrailingSlashIfNeeded(key);
if (key.equals("/")) {
LOG.warn("Cannot delete root directory.");
return false;
}
result = innerDelete(f, recursive);
} else {
LOG.debug("delete: Path is a file: {}", f);
result = adapter.deleteObject(key);
}
if (result) {
// If this delete operation removes all files/directories from the
// parent direcotry, then an empty parent directory must be created.
Path parent = f.getParent();
if (parent != null && !parent.isRoot()) {
createFakeDirectoryIfNecessary(parent);
}
}
return result;
}
/**
* Create a fake parent directory key if it does not already exist and no
* other child of this parent directory exists.
*
* @param f path to the fake parent directory
* @throws IOException
*/
private void createFakeDirectoryIfNecessary(Path f) throws IOException {
String key = pathToKey(f);
if (!key.isEmpty() && !o3Exists(f)) {
LOG.debug("Creating new fake directory at {}", f);
String dirKey = addTrailingSlashIfNeeded(key);
adapter.createDirectory(dirKey);
}
}
/**
* Check if a file or directory exists corresponding to given path.
*
* @param f path to file/directory.
* @return true if it exists, false otherwise.
* @throws IOException
*/
private boolean o3Exists(final Path f) throws IOException {
Path path = makeQualified(f);
try {
getFileStatus(path);
return true;
} catch (FileNotFoundException ex) {
return false;
}
}
private class ListStatusIterator extends OzoneListingIterator {
// _fileStatuses_ maintains a list of file(s) which is either the input
// path itself or a child of the input directory path.
private List<FileStatus> fileStatuses = new ArrayList<>(LISTING_PAGE_SIZE);
// _subDirStatuses_ maintains a list of sub-dirs of the input directory
// path.
private Map<Path, FileStatus> subDirStatuses =
new HashMap<>(LISTING_PAGE_SIZE);
private Path f; // the input path
ListStatusIterator(Path f) throws IOException {
super(f);
this.f = f;
}
/**
* Add the key to the listStatus result if the key corresponds to the
* input path or is an immediate child of the input path.
*
* @param key key to be processed
* @return always returns true
* @throws IOException
*/
@Override
boolean processKey(String key) throws IOException {
Path keyPath = new Path(OZONE_URI_DELIMITER + key);
if (key.equals(getPathKey())) {
if (pathIsDirectory()) {
// if input path is a directory, we add the sub-directories and
// files under this directory.
return true;
} else {
addFileStatus(keyPath);
return true;
}
}
// Left with only subkeys now
// We add only the immediate child files and sub-dirs i.e. we go only
// upto one level down the directory tree structure.
if (pathToKey(keyPath.getParent()).equals(pathToKey(f))) {
// This key is an immediate child. Can be file or directory
if (key.endsWith(OZONE_URI_DELIMITER)) {
// Key is a directory
addSubDirStatus(keyPath);
} else {
addFileStatus(keyPath);
}
} else {
// This key is not the immediate child of the input directory. So we
// traverse the parent tree structure of this key until we get the
// immediate child of the input directory.
Path immediateChildPath = getImmediateChildPath(keyPath.getParent());
if (immediateChildPath != null) {
addSubDirStatus(immediateChildPath);
}
}
return true;
}
/**
* Adds the FileStatus of keyPath to final result of listStatus.
*
* @param filePath path to the file
* @throws FileNotFoundException
*/
void addFileStatus(Path filePath) throws IOException {
fileStatuses.add(getFileStatus(filePath));
}
/**
* Adds the FileStatus of the subdir to final result of listStatus, if not
* already included.
*
* @param dirPath path to the dir
* @throws FileNotFoundException
*/
void addSubDirStatus(Path dirPath) throws IOException {
// Check if subdir path is already included in statuses.
if (!subDirStatuses.containsKey(dirPath)) {
subDirStatuses.put(dirPath, getFileStatus(dirPath));
}
}
/**
* Traverse the parent directory structure of keyPath to determine the
* which parent/ grand-parent/.. is the immediate child of the input path f.
*
* @param keyPath path whose parent directory structure should be traversed.
* @return immediate child path of the input path f.
*/
Path getImmediateChildPath(Path keyPath) {
Path path = keyPath;
Path parent = path.getParent();
while (parent != null) {
if (pathToKey(parent).equals(pathToKey(f))) {
return path;
}
path = parent;
parent = path.getParent();
}
return null;
}
/**
* Return the result of listStatus operation. If the input path is a
* file, return the status for only that file. If the input path is a
* directory, return the statuses for all the child files and sub-dirs.
*/
FileStatus[] getStatuses() {
List<FileStatus> result = Stream.concat(
fileStatuses.stream(), subDirStatuses.values().stream())
.collect(Collectors.toList());
return result.toArray(new FileStatus[result.size()]);
}
}
@Override
public FileStatus[] listStatus(Path f) throws IOException {
if (storageStatistics != null) {
storageStatistics.incrementCounter(Statistic.INVOCATION_LIST_STATUS, 1);
}
statistics.incrementReadOps(1);
LOG.trace("listStatus() path:{}", f);
ListStatusIterator iterator = new ListStatusIterator(f);
iterator.iterate();
return iterator.getStatuses();
}
@Override
public void setWorkingDirectory(Path newDir) {
workingDir = newDir;
}
@Override
public Path getWorkingDirectory() {
return workingDir;
}
@Override
public Token<?> getDelegationToken(String renewer) throws IOException {
return adapter.getDelegationToken(renewer);
}
/**
* Get a canonical service name for this file system. If the URI is logical,
* the hostname part of the URI will be returned.
* @return a service string that uniquely identifies this file system.
*/
@Override
public String getCanonicalServiceName() {
return adapter.getCanonicalServiceName();
}
/**
* Get the username of the FS.
*
* @return the short name of the user who instantiated the FS
*/
public String getUsername() {
return userName;
}
/**
* Check whether the path is valid and then create directories.
* Directory is represented using a key with no value.
*
* @param path directory path to be created
* @return true if directory exists or created successfully.
* @throws IOException
*/
private boolean mkdir(Path path) throws IOException {
String key = pathToKey(path);
adapter.createDirectory(key);
return true;
}
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
LOG.trace("mkdir() path:{} ", f);
String key = pathToKey(f);
if (StringUtils.isEmpty(key)) {
return false;
}
return mkdir(f);
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
if (storageStatistics != null) {
storageStatistics
.incrementCounter(Statistic.INVOCATION_GET_FILE_STATUS, 1);
}
statistics.incrementReadOps(1);
LOG.trace("getFileStatus() path:{}", f);
Path qualifiedPath = f.makeQualified(uri, workingDir);
String key = pathToKey(qualifiedPath);
return adapter.getFileStatus(key)
.makeQualified(uri, qualifiedPath, getUsername(), getUsername());
}
/**
* Turn a path (relative or otherwise) into an Ozone key.
*
* @param path the path of the file.
* @return the key of the object that represents the file.
*/
public String pathToKey(Path path) {
Objects.requireNonNull(path, "Path canf not be null!");
if (!path.isAbsolute()) {
path = new Path(workingDir, path);
}
// removing leading '/' char
String key = path.toUri().getPath().substring(1);
LOG.trace("path for key:{} is:{}", key, path);
return key;
}
/**
* Add trailing delimiter to path if it is already not present.
*
* @param key the ozone Key which needs to be appended
* @return delimiter appended key
*/
private String addTrailingSlashIfNeeded(String key) {
if (StringUtils.isNotEmpty(key) && !key.endsWith(OZONE_URI_DELIMITER)) {
return key + OZONE_URI_DELIMITER;
} else {
return key;
}
}
@Override
public String toString() {
return "OzoneFileSystem{URI=" + uri + ", "
+ "workingDir=" + workingDir + ", "
+ "userName=" + userName + ", "
+ "statistics=" + statistics
+ "}";
}
/**
* This class provides an interface to iterate through all the keys in the
* bucket prefixed with the input path key and process them.
* <p>
* Each implementing class should define how the keys should be processed
* through the processKey() function.
*/
private abstract class OzoneListingIterator {
private final Path path;
private final FileStatus status;
private String pathKey;
private Iterator<BasicKeyInfo> keyIterator;
OzoneListingIterator(Path path)
throws IOException {
this.path = path;
this.status = getFileStatus(path);
this.pathKey = pathToKey(path);
if (status.isDirectory()) {
this.pathKey = addTrailingSlashIfNeeded(pathKey);
}
keyIterator = adapter.listKeys(pathKey);
}
/**
* The output of processKey determines if further iteration through the
* keys should be done or not.
*
* @return true if we should continue iteration of keys, false otherwise.
* @throws IOException
*/
abstract boolean processKey(String key) throws IOException;
/**
* Iterates thorugh all the keys prefixed with the input path's key and
* processes the key though processKey().
* If for any key, the processKey() returns false, then the iteration is
* stopped and returned with false indicating that all the keys could not
* be processed successfully.
*
* @return true if all keys are processed successfully, false otherwise.
* @throws IOException
*/
boolean iterate() throws IOException {
LOG.trace("Iterating path {}", path);
if (status.isDirectory()) {
LOG.trace("Iterating directory:{}", pathKey);
while (keyIterator.hasNext()) {
BasicKeyInfo key = keyIterator.next();
LOG.trace("iterating key:{}", key.getName());
if (!processKey(key.getName())) {
return false;
}
}
return true;
} else {
LOG.trace("iterating file:{}", path);
return processKey(pathKey);
}
}
String getPathKey() {
return pathKey;
}
boolean pathIsDirectory() {
return status.isDirectory();
}
FileStatus getStatus() {
return status;
return new OzoneClientAdapterImpl(omHost,
Integer.parseInt(omPort), conf,
volumeStr, bucketStr, storageStatistics);
}
}
}

View File

@ -146,7 +146,7 @@ public class TestOzoneFileInterfaces {
fs = FileSystem.get(new URI(rootPath + "/test.txt"), conf);
}
o3fs = (OzoneFileSystem) fs;
statistics = o3fs.getOzoneFSOpsCountStatistics();
statistics = (OzoneFSStorageStatistics) o3fs.getOzoneFSOpsCountStatistics();
}
@After