HDDS-1333. OzoneFileSystem can't work with spark/hadoop2.7 because incompatible security classes. Contributed by Elek, Marton. (#653)
This commit is contained in:
parent
3e8669cbe2
commit
7ec6a31eb3
|
@ -78,11 +78,13 @@ And create a custom `core-site.xml`:
|
|||
<configuration>
|
||||
<property>
|
||||
<name>fs.o3fs.impl</name>
|
||||
<value>org.apache.hadoop.fs.ozone.OzoneFileSystem</value>
|
||||
<value>org.apache.hadoop.fs.ozone.BasicOzoneFileSystem</value>
|
||||
</property>
|
||||
</configuration>
|
||||
```
|
||||
|
||||
_Note_: You may also use `org.apache.hadoop.fs.ozone.OzoneFileSystem` without the `Basic` prefix. The `Basic` version doesn't support FS statistics and encryption zones but can work together with older hadoop versions.
|
||||
|
||||
Copy the `ozonefs.jar` file from an ozone distribution (__use the legacy version!__)
|
||||
|
||||
```
|
||||
|
@ -134,7 +136,7 @@ Write down the ozone filesystem uri as it should be used with the spark-submit c
|
|||
|
||||
```
|
||||
kubectl create serviceaccount spark -n yournamespace
|
||||
kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=poc:yournamespace --namespace=yournamespace
|
||||
kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=yournamespace:spark --namespace=yournamespace
|
||||
```
|
||||
## Execute the job
|
||||
|
||||
|
|
|
@ -83,7 +83,7 @@ public class OMFailoverProxyProvider implements
|
|||
/**
|
||||
* Class to store proxy information.
|
||||
*/
|
||||
public final class OMProxyInfo
|
||||
public class OMProxyInfo
|
||||
extends FailoverProxyProvider.ProxyInfo<OzoneManagerProtocolPB> {
|
||||
private InetSocketAddress address;
|
||||
private Text dtService;
|
||||
|
|
|
@ -114,6 +114,7 @@ run cp "${ROOT}/hadoop-ozone/objectstore-service/target/hadoop-ozone-objectstore
|
|||
cp -r "${ROOT}/hadoop-hdds/docs/target/classes/docs" ./
|
||||
|
||||
#Copy docker compose files
|
||||
run cp -p -R "${ROOT}/hadoop-ozone/dist/src/main/compose" .
|
||||
#compose files are preprocessed: properties (eg. project.version) are replaced first by maven.
|
||||
run cp -p -R "${ROOT}/hadoop-ozone/dist/target/compose" .
|
||||
run cp -p -r "${ROOT}/hadoop-ozone/dist/src/main/smoketest" .
|
||||
run cp -p -r "${ROOT}/hadoop-ozone/dist/src/main/blockade" .
|
||||
|
|
|
@ -120,6 +120,28 @@
|
|||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-resources-plugin</artifactId>
|
||||
<version>3.1.0</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>copy-resources</id>
|
||||
<phase>compile</phase>
|
||||
<goals>
|
||||
<goal>copy-resources</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${basedir}/target/compose</outputDirectory>
|
||||
<resources>
|
||||
<resource>
|
||||
<directory>src/main/compose</directory>
|
||||
<filtering>true</filtering>
|
||||
</resource>
|
||||
</resources>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.codehaus.mojo</groupId>
|
||||
<artifactId>exec-maven-plugin</artifactId>
|
||||
|
|
|
@ -49,21 +49,53 @@ services:
|
|||
environment:
|
||||
ENSURE_SCM_INITIALIZED: /data/metadata/scm/current/VERSION
|
||||
command: ["/opt/hadoop/bin/ozone","scm"]
|
||||
hadoop3:
|
||||
hadoop32:
|
||||
image: flokkr/hadoop:3.1.0
|
||||
volumes:
|
||||
- ../..:/opt/ozone
|
||||
env_file:
|
||||
- ./docker-config
|
||||
environment:
|
||||
HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-current*.jar
|
||||
HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-current-@project.version@.jar
|
||||
CORE-SITE.XML_fs.o3fs.impl: org.apache.hadoop.fs.ozone.OzoneFileSystem
|
||||
command: ["watch","-n","100000","ls"]
|
||||
hadoop2:
|
||||
hadoop31:
|
||||
image: flokkr/hadoop:3.1.0
|
||||
volumes:
|
||||
- ../..:/opt/ozone
|
||||
env_file:
|
||||
- ./docker-config
|
||||
environment:
|
||||
HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-legacy-@project.version@.jar
|
||||
CORE-SITE.XML_fs.o3fs.impl: org.apache.hadoop.fs.ozone.OzoneFileSystem
|
||||
command: ["watch","-n","100000","ls"]
|
||||
hadoop29:
|
||||
image: flokkr/hadoop:2.9.0
|
||||
volumes:
|
||||
- ../..:/opt/ozone
|
||||
env_file:
|
||||
- ./docker-config
|
||||
environment:
|
||||
HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-legacy*.jar
|
||||
HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-legacy-@project.version@.jar
|
||||
CORE-SITE.XML_fs.o3fs.impl: org.apache.hadoop.fs.ozone.BasicOzoneFileSystem
|
||||
command: ["watch","-n","100000","ls"]
|
||||
hadoop27:
|
||||
image: flokkr/hadoop:2.7.3
|
||||
volumes:
|
||||
- ../..:/opt/ozone
|
||||
env_file:
|
||||
- ./docker-config
|
||||
environment:
|
||||
HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-legacy-@project.version@.jar
|
||||
CORE-SITE.XML_fs.o3fs.impl: org.apache.hadoop.fs.ozone.BasicOzoneFileSystem
|
||||
command: ["watch","-n","100000","ls"]
|
||||
spark:
|
||||
image: flokkr/spark
|
||||
volumes:
|
||||
- ../..:/opt/ozone
|
||||
env_file:
|
||||
- ./docker-config
|
||||
environment:
|
||||
HADOOP_CLASSPATH: /opt/ozone/share/ozone/lib/hadoop-ozone-filesystem-lib-legacy-@project.version@.jar
|
||||
CORE-SITE.XML_fs.o3fs.impl: org.apache.hadoop.fs.ozone.BasicOzoneFileSystem
|
||||
command: ["watch","-n","100000","ls"]
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
*** Settings ***
|
||||
Documentation Test ozone fs usage from Hdfs and Spark
|
||||
Library OperatingSystem
|
||||
Library String
|
||||
Resource ../../smoketest/env-compose.robot
|
||||
Resource ../../smoketest/commonlib.robot
|
||||
|
||||
*** Variables ***
|
||||
${DATANODE_HOST} datanode
|
||||
|
||||
*** Keywords ***
|
||||
|
||||
Test hadoop dfs
|
||||
[arguments] ${prefix}
|
||||
${random} = Generate Random String 5 [NUMBERS]
|
||||
${result} = Execute on host ${prefix} hdfs dfs -put /opt/hadoop/NOTICE.txt o3fs://bucket1.vol1/${prefix}-${random}
|
||||
${result} = Execute on host ${prefix} hdfs dfs -ls o3fs://bucket1.vol1/
|
||||
Should contain ${result} ${prefix}-${random}
|
||||
|
||||
*** Test Cases ***
|
||||
|
||||
Create bucket and volume to test
|
||||
${result} = Run tests on host scm createbucketenv.robot
|
||||
|
||||
Test hadoop 3.1
|
||||
Test hadoop dfs hadoop31
|
||||
|
||||
Test hadoop 3.2
|
||||
Test hadoop dfs hadoop31
|
||||
|
||||
Test hadoop 2.9
|
||||
Test hadoop dfs hadoop29
|
||||
|
||||
Test hadoop 2.7
|
||||
Test hadoop dfs hadoop27
|
||||
|
||||
Test spark 2.3
|
||||
${legacyjar} = Execute on host spark bash -c 'find /opt/ozone/share/ozone/lib/ -name *legacy*.jar'
|
||||
${postfix} = Generate Random String 5 [NUMBERS]
|
||||
${result} = Execute on host spark /opt/spark/bin/spark-submit --jars ${legacyjar} --class org.apache.spark.examples.DFSReadWriteTest /opt/spark//examples/jars/spark-examples_2.11-2.3.0.jar /opt/spark/README.md o3fs://bucket1.vol1/spark-${postfix}
|
||||
|
|
@ -14,6 +14,8 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
CORE-SITE.XML_fs.o3fs.impl=org.apache.hadoop.fs.ozone.OzoneFileSystem
|
||||
|
||||
OZONE-SITE.XML_ozone.om.address=om
|
||||
OZONE-SITE.XML_ozone.om.http-address=om:9874
|
||||
OZONE-SITE.XML_ozone.scm.names=scm
|
||||
|
|
|
@ -29,6 +29,12 @@ Execute
|
|||
Should Be Equal As Integers ${rc} 0
|
||||
[return] ${output}
|
||||
|
||||
Execute And Ignore Error
|
||||
[arguments] ${command}
|
||||
${rc} ${output} = Run And Return Rc And Output ${command}
|
||||
Log ${output}
|
||||
[return] ${output}
|
||||
|
||||
Execute and checkrc
|
||||
[arguments] ${command} ${expected_error_code}
|
||||
${rc} ${output} = Run And Return Rc And Output ${command}
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
*** Settings ***
|
||||
Documentation Create bucket and volume for any other testings
|
||||
Library OperatingSystem
|
||||
Resource commonlib.robot
|
||||
Test Timeout 2 minute
|
||||
|
||||
|
||||
*** Variables ***
|
||||
${volume} vol1
|
||||
${bucket} bucket1
|
||||
|
||||
|
||||
*** Keywords ***
|
||||
Create volume
|
||||
${result} = Execute ozone sh volume create /${volume} --user hadoop --quota 100TB --root
|
||||
Should not contain ${result} Failed
|
||||
Should contain ${result} Creating Volume: ${volume}
|
||||
Create bucket
|
||||
Execute ozone sh bucket create /${volume}/${bucket}
|
||||
|
||||
*** Test Cases ***
|
||||
Test ozone shell
|
||||
${result} = Execute And Ignore Error ozone sh bucket info /${volume}/${bucket}
|
||||
Run Keyword if "VOLUME_NOT_FOUND" in """${result}""" Create volume
|
||||
Run Keyword if "VOLUME_NOT_FOUND" in """${result}""" Create bucket
|
||||
Run Keyword if "BUCKET_NOT_FOUND" in """${result}""" Create bucket
|
||||
${result} = Execute ozone sh bucket info /${volume}/${bucket}
|
||||
Should not contain ${result} NOT_FOUND
|
|
@ -0,0 +1,32 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
*** Settings ***
|
||||
Documentation High level utilities to execute commands and tests in docker-compose based environments.
|
||||
Resource commonlib.robot
|
||||
|
||||
|
||||
*** Keywords ***
|
||||
|
||||
Run tests on host
|
||||
[arguments] ${host} ${robotfile}
|
||||
${result} = Execute docker-compose exec ${host} robot smoketest/${robotfile}
|
||||
|
||||
Execute on host
|
||||
[arguments] ${host} ${command}
|
||||
${rc} ${output} = Run And Return Rc And Output docker-compose exec ${host} ${command}
|
||||
Log ${output}
|
||||
Should Be Equal As Integers ${rc} 0
|
||||
[return] ${output}
|
|
@ -0,0 +1,371 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.ozone;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.hdds.client.ReplicationFactor;
|
||||
import org.apache.hadoop.hdds.client.ReplicationType;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.client.ObjectStore;
|
||||
import org.apache.hadoop.ozone.client.OzoneBucket;
|
||||
import org.apache.hadoop.ozone.client.OzoneClient;
|
||||
import org.apache.hadoop.ozone.client.OzoneClientFactory;
|
||||
import org.apache.hadoop.ozone.client.OzoneKey;
|
||||
import org.apache.hadoop.ozone.client.OzoneVolume;
|
||||
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
||||
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenRenewer;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Basic Implementation of the OzoneFileSystem calls.
|
||||
* <p>
|
||||
* This is the minimal version which doesn't include any statistics.
|
||||
* <p>
|
||||
* For full featured version use OzoneClientAdapterImpl.
|
||||
*/
|
||||
public class BasicOzoneClientAdapterImpl implements OzoneClientAdapter {
|
||||
|
||||
static final Logger LOG =
|
||||
LoggerFactory.getLogger(BasicOzoneClientAdapterImpl.class);
|
||||
|
||||
private OzoneClient ozoneClient;
|
||||
private ObjectStore objectStore;
|
||||
private OzoneVolume volume;
|
||||
private OzoneBucket bucket;
|
||||
private ReplicationType replicationType;
|
||||
private ReplicationFactor replicationFactor;
|
||||
private boolean securityEnabled;
|
||||
|
||||
/**
|
||||
* Create new OzoneClientAdapter implementation.
|
||||
*
|
||||
* @param volumeStr Name of the volume to use.
|
||||
* @param bucketStr Name of the bucket to use
|
||||
* @throws IOException In case of a problem.
|
||||
*/
|
||||
public BasicOzoneClientAdapterImpl(String volumeStr, String bucketStr)
|
||||
throws IOException {
|
||||
this(createConf(), volumeStr, bucketStr);
|
||||
}
|
||||
|
||||
private static OzoneConfiguration createConf() {
|
||||
ClassLoader contextClassLoader =
|
||||
Thread.currentThread().getContextClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(null);
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
Thread.currentThread().setContextClassLoader(contextClassLoader);
|
||||
return conf;
|
||||
}
|
||||
|
||||
public BasicOzoneClientAdapterImpl(OzoneConfiguration conf, String volumeStr,
|
||||
String bucketStr)
|
||||
throws IOException {
|
||||
this(null, -1, conf, volumeStr, bucketStr);
|
||||
}
|
||||
|
||||
public BasicOzoneClientAdapterImpl(String omHost, int omPort,
|
||||
Configuration hadoopConf, String volumeStr, String bucketStr)
|
||||
throws IOException {
|
||||
|
||||
ClassLoader contextClassLoader =
|
||||
Thread.currentThread().getContextClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(null);
|
||||
OzoneConfiguration conf;
|
||||
if (hadoopConf instanceof OzoneConfiguration) {
|
||||
conf = (OzoneConfiguration) hadoopConf;
|
||||
} else {
|
||||
conf = new OzoneConfiguration(hadoopConf);
|
||||
}
|
||||
|
||||
SecurityConfig secConfig = new SecurityConfig(conf);
|
||||
|
||||
if (secConfig.isSecurityEnabled()) {
|
||||
this.securityEnabled = true;
|
||||
}
|
||||
|
||||
try {
|
||||
String replicationTypeConf =
|
||||
conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
|
||||
OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT);
|
||||
|
||||
int replicationCountConf = conf.getInt(OzoneConfigKeys.OZONE_REPLICATION,
|
||||
OzoneConfigKeys.OZONE_REPLICATION_DEFAULT);
|
||||
|
||||
if (StringUtils.isNotEmpty(omHost) && omPort != -1) {
|
||||
this.ozoneClient =
|
||||
OzoneClientFactory.getRpcClient(omHost, omPort, conf);
|
||||
} else {
|
||||
this.ozoneClient =
|
||||
OzoneClientFactory.getRpcClient(conf);
|
||||
}
|
||||
objectStore = ozoneClient.getObjectStore();
|
||||
this.volume = objectStore.getVolume(volumeStr);
|
||||
this.bucket = volume.getBucket(bucketStr);
|
||||
this.replicationType = ReplicationType.valueOf(replicationTypeConf);
|
||||
this.replicationFactor = ReplicationFactor.valueOf(replicationCountConf);
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(contextClassLoader);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
ozoneClient.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream createInputStream(String key) throws IOException {
|
||||
incrementCounter(Statistic.OBJECTS_READ);
|
||||
return bucket.readKey(key).getInputStream();
|
||||
}
|
||||
|
||||
protected void incrementCounter(Statistic objectsRead) {
|
||||
//noop: Use OzoneClientAdapterImpl which supports statistics.
|
||||
}
|
||||
|
||||
@Override
|
||||
public OzoneFSOutputStream createKey(String key) throws IOException {
|
||||
incrementCounter(Statistic.OBJECTS_CREATED);
|
||||
OzoneOutputStream ozoneOutputStream =
|
||||
bucket.createKey(key, 0, replicationType, replicationFactor,
|
||||
new HashMap<>());
|
||||
return new OzoneFSOutputStream(ozoneOutputStream.getOutputStream());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void renameKey(String key, String newKeyName) throws IOException {
|
||||
incrementCounter(Statistic.OBJECTS_RENAMED);
|
||||
bucket.renameKey(key, newKeyName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to fetch the key metadata info.
|
||||
*
|
||||
* @param keyName key whose metadata information needs to be fetched
|
||||
* @return metadata info of the key
|
||||
*/
|
||||
@Override
|
||||
public BasicKeyInfo getKeyInfo(String keyName) {
|
||||
try {
|
||||
incrementCounter(Statistic.OBJECTS_QUERY);
|
||||
OzoneKey key = bucket.getKey(keyName);
|
||||
return new BasicKeyInfo(
|
||||
keyName,
|
||||
key.getModificationTime(),
|
||||
key.getDataSize()
|
||||
);
|
||||
} catch (IOException e) {
|
||||
LOG.trace("Key:{} does not exist", keyName);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to check if an Ozone key is representing a directory.
|
||||
*
|
||||
* @param key key to be checked as a directory
|
||||
* @return true if key is a directory, false otherwise
|
||||
*/
|
||||
@Override
|
||||
public boolean isDirectory(BasicKeyInfo key) {
|
||||
LOG.trace("key name:{} size:{}", key.getName(),
|
||||
key.getDataSize());
|
||||
return key.getName().endsWith(OZONE_URI_DELIMITER)
|
||||
&& (key.getDataSize() == 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to create an directory specified by key name in bucket.
|
||||
*
|
||||
* @param keyName key name to be created as directory
|
||||
* @return true if the key is created, false otherwise
|
||||
*/
|
||||
@Override
|
||||
public boolean createDirectory(String keyName) {
|
||||
try {
|
||||
LOG.trace("creating dir for key:{}", keyName);
|
||||
incrementCounter(Statistic.OBJECTS_CREATED);
|
||||
bucket.createKey(keyName, 0, replicationType, replicationFactor,
|
||||
new HashMap<>()).close();
|
||||
return true;
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("create key failed for key:{}", keyName, ioe);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to delete an object specified by key name in bucket.
|
||||
*
|
||||
* @param keyName key name to be deleted
|
||||
* @return true if the key is deleted, false otherwise
|
||||
*/
|
||||
@Override
|
||||
public boolean deleteObject(String keyName) {
|
||||
LOG.trace("issuing delete for key" + keyName);
|
||||
try {
|
||||
incrementCounter(Statistic.OBJECTS_DELETED);
|
||||
bucket.deleteKey(keyName);
|
||||
return true;
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("delete key failed " + ioe.getMessage());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCreationTime() {
|
||||
return bucket.getCreationTime();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNextKey(String key) {
|
||||
incrementCounter(Statistic.OBJECTS_LIST);
|
||||
return bucket.listKeys(key).hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<BasicKeyInfo> listKeys(String pathKey) {
|
||||
incrementCounter(Statistic.OBJECTS_LIST);
|
||||
return new IteratorAdapter(bucket.listKeys(pathKey));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<OzoneTokenIdentifier> getDelegationToken(String renewer)
|
||||
throws IOException {
|
||||
if (!securityEnabled) {
|
||||
return null;
|
||||
}
|
||||
Token<OzoneTokenIdentifier> token = ozoneClient.getObjectStore()
|
||||
.getDelegationToken(renewer == null ? null : new Text(renewer));
|
||||
token.setKind(OzoneTokenIdentifier.KIND_NAME);
|
||||
return token;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public KeyProvider getKeyProvider() throws IOException {
|
||||
return objectStore.getKeyProvider();
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI getKeyProviderUri() throws IOException {
|
||||
return objectStore.getKeyProviderUri();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getCanonicalServiceName() {
|
||||
return objectStore.getCanonicalServiceName();
|
||||
}
|
||||
|
||||
/**
|
||||
* Ozone Delegation Token Renewer.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public static class Renewer extends TokenRenewer {
|
||||
|
||||
//Ensure that OzoneConfiguration files are loaded before trying to use
|
||||
// the renewer.
|
||||
static {
|
||||
OzoneConfiguration.activate();
|
||||
}
|
||||
|
||||
public Text getKind() {
|
||||
return OzoneTokenIdentifier.KIND_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean handleKind(Text kind) {
|
||||
return getKind().equals(kind);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isManaged(Token<?> token) throws IOException {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long renew(Token<?> token, Configuration conf)
|
||||
throws IOException, InterruptedException {
|
||||
Token<OzoneTokenIdentifier> ozoneDt =
|
||||
(Token<OzoneTokenIdentifier>) token;
|
||||
OzoneClient ozoneClient =
|
||||
OzoneClientFactory.getRpcClient(conf);
|
||||
return ozoneClient.getObjectStore().renewDelegationToken(ozoneDt);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(Token<?> token, Configuration conf)
|
||||
throws IOException, InterruptedException {
|
||||
Token<OzoneTokenIdentifier> ozoneDt =
|
||||
(Token<OzoneTokenIdentifier>) token;
|
||||
OzoneClient ozoneClient =
|
||||
OzoneClientFactory.getRpcClient(conf);
|
||||
ozoneClient.getObjectStore().cancelDelegationToken(ozoneDt);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adapter to convert OzoneKey to a safe and simple Key implementation.
|
||||
*/
|
||||
public static class IteratorAdapter implements Iterator<BasicKeyInfo> {
|
||||
|
||||
private Iterator<? extends OzoneKey> original;
|
||||
|
||||
public IteratorAdapter(Iterator<? extends OzoneKey> listKeys) {
|
||||
this.original = listKeys;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return original.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public BasicKeyInfo next() {
|
||||
OzoneKey next = original.next();
|
||||
if (next == null) {
|
||||
return null;
|
||||
} else {
|
||||
return new BasicKeyInfo(
|
||||
next.getName(),
|
||||
next.getModificationTime(),
|
||||
next.getDataSize()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,940 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.ozone;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
|
||||
import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
|
||||
import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* The minimal Ozone Filesystem implementation.
|
||||
* <p>
|
||||
* This is a basic version which doesn't extend
|
||||
* KeyProviderTokenIssuer and doesn't include statistics. It can be used
|
||||
* from older hadoop version. For newer hadoop version use the full featured
|
||||
* OzoneFileSystem.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class BasicOzoneFileSystem extends FileSystem {
|
||||
static final Logger LOG =
|
||||
LoggerFactory.getLogger(BasicOzoneFileSystem.class);
|
||||
|
||||
/**
|
||||
* The Ozone client for connecting to Ozone server.
|
||||
*/
|
||||
|
||||
private URI uri;
|
||||
private String userName;
|
||||
private Path workingDir;
|
||||
|
||||
private OzoneClientAdapter adapter;
|
||||
|
||||
private static final Pattern URL_SCHEMA_PATTERN =
|
||||
Pattern.compile("([^\\.]+)\\.([^\\.]+)\\.{0,1}(.*)");
|
||||
|
||||
private static final String URI_EXCEPTION_TEXT = "Ozone file system url " +
|
||||
"should be either one of the two forms: " +
|
||||
"o3fs://bucket.volume/key OR " +
|
||||
"o3fs://bucket.volume.om-host.example.com:5678/key";
|
||||
|
||||
@Override
|
||||
public void initialize(URI name, Configuration conf) throws IOException {
|
||||
super.initialize(name, conf);
|
||||
setConf(conf);
|
||||
Objects.requireNonNull(name.getScheme(), "No scheme provided in " + name);
|
||||
Preconditions.checkArgument(getScheme().equals(name.getScheme()),
|
||||
"Invalid scheme provided in " + name);
|
||||
|
||||
String authority = name.getAuthority();
|
||||
|
||||
Matcher matcher = URL_SCHEMA_PATTERN.matcher(authority);
|
||||
|
||||
if (!matcher.matches()) {
|
||||
throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
|
||||
}
|
||||
String bucketStr = matcher.group(1);
|
||||
String volumeStr = matcher.group(2);
|
||||
String remaining = matcher.groupCount() == 3 ? matcher.group(3) : null;
|
||||
|
||||
String omHost = null;
|
||||
String omPort = String.valueOf(-1);
|
||||
if (!isEmpty(remaining)) {
|
||||
String[] parts = remaining.split(":");
|
||||
if (parts.length != 2) {
|
||||
throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
|
||||
}
|
||||
omHost = parts[0];
|
||||
omPort = parts[1];
|
||||
if (!isNumber(omPort)) {
|
||||
throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
uri = new URIBuilder().setScheme(OZONE_URI_SCHEME)
|
||||
.setHost(authority)
|
||||
.build();
|
||||
LOG.trace("Ozone URI for ozfs initialization is " + uri);
|
||||
|
||||
//isolated is the default for ozonefs-lib-legacy which includes the
|
||||
// /ozonefs.txt, otherwise the default is false. It could be overridden.
|
||||
boolean defaultValue =
|
||||
BasicOzoneFileSystem.class.getClassLoader()
|
||||
.getResource("ozonefs.txt")
|
||||
!= null;
|
||||
|
||||
//Use string here instead of the constant as constant may not be available
|
||||
//on the classpath of a hadoop 2.7
|
||||
boolean isolatedClassloader =
|
||||
conf.getBoolean("ozone.fs.isolated-classloader", defaultValue);
|
||||
|
||||
this.adapter = createAdapter(conf, bucketStr, volumeStr, omHost, omPort,
|
||||
isolatedClassloader);
|
||||
|
||||
try {
|
||||
this.userName =
|
||||
UserGroupInformation.getCurrentUser().getShortUserName();
|
||||
} catch (IOException e) {
|
||||
this.userName = OZONE_DEFAULT_USER;
|
||||
}
|
||||
this.workingDir = new Path(OZONE_USER_DIR, this.userName)
|
||||
.makeQualified(this.uri, this.workingDir);
|
||||
} catch (URISyntaxException ue) {
|
||||
final String msg = "Invalid Ozone endpoint " + name;
|
||||
LOG.error(msg, ue);
|
||||
throw new IOException(msg, ue);
|
||||
}
|
||||
}
|
||||
|
||||
protected OzoneClientAdapter createAdapter(Configuration conf,
|
||||
String bucketStr,
|
||||
String volumeStr, String omHost, String omPort,
|
||||
boolean isolatedClassloader) throws IOException {
|
||||
|
||||
if (isolatedClassloader) {
|
||||
|
||||
return OzoneClientAdapterFactory
|
||||
.createAdapter(volumeStr, bucketStr);
|
||||
|
||||
} else {
|
||||
|
||||
return new BasicOzoneClientAdapterImpl(omHost,
|
||||
Integer.parseInt(omPort), conf,
|
||||
volumeStr, bucketStr);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
try {
|
||||
adapter.close();
|
||||
} finally {
|
||||
super.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI getUri() {
|
||||
return uri;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return OZONE_URI_SCHEME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
|
||||
incrementCounter(Statistic.INVOCATION_OPEN);
|
||||
statistics.incrementWriteOps(1);
|
||||
LOG.trace("open() path:{}", f);
|
||||
final FileStatus fileStatus = getFileStatus(f);
|
||||
final String key = pathToKey(f);
|
||||
if (fileStatus.isDirectory()) {
|
||||
throw new FileNotFoundException("Can't open directory " + f + " to read");
|
||||
}
|
||||
|
||||
return new FSDataInputStream(
|
||||
new OzoneFSInputStream(adapter.createInputStream(key)));
|
||||
}
|
||||
|
||||
protected void incrementCounter(Statistic statistic) {
|
||||
//don't do anyting in this default implementation.
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataOutputStream create(Path f, FsPermission permission,
|
||||
boolean overwrite, int bufferSize,
|
||||
short replication, long blockSize,
|
||||
Progressable progress) throws IOException {
|
||||
LOG.trace("create() path:{}", f);
|
||||
incrementCounter(Statistic.INVOCATION_CREATE);
|
||||
statistics.incrementWriteOps(1);
|
||||
final String key = pathToKey(f);
|
||||
final FileStatus status;
|
||||
try {
|
||||
status = getFileStatus(f);
|
||||
if (status.isDirectory()) {
|
||||
throw new FileAlreadyExistsException(f + " is a directory");
|
||||
} else {
|
||||
if (!overwrite) {
|
||||
// path references a file and overwrite is disabled
|
||||
throw new FileAlreadyExistsException(f + " already exists");
|
||||
}
|
||||
LOG.trace("Overwriting file {}", f);
|
||||
adapter.deleteObject(key);
|
||||
}
|
||||
} catch (FileNotFoundException ignored) {
|
||||
// this means the file is not found
|
||||
}
|
||||
|
||||
// We pass null to FSDataOutputStream so it won't count writes that
|
||||
// are being buffered to a file
|
||||
return new FSDataOutputStream(adapter.createKey(key), statistics);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataOutputStream createNonRecursive(Path path,
|
||||
FsPermission permission,
|
||||
EnumSet<CreateFlag> flags,
|
||||
int bufferSize,
|
||||
short replication,
|
||||
long blockSize,
|
||||
Progressable progress) throws IOException {
|
||||
incrementCounter(Statistic.INVOCATION_CREATE_NON_RECURSIVE);
|
||||
statistics.incrementWriteOps(1);
|
||||
final Path parent = path.getParent();
|
||||
if (parent != null) {
|
||||
// expect this to raise an exception if there is no parent
|
||||
if (!getFileStatus(parent).isDirectory()) {
|
||||
throw new FileAlreadyExistsException("Not a directory: " + parent);
|
||||
}
|
||||
}
|
||||
return create(path, permission, flags.contains(CreateFlag.OVERWRITE),
|
||||
bufferSize, replication, blockSize, progress);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataOutputStream append(Path f, int bufferSize,
|
||||
Progressable progress) throws IOException {
|
||||
throw new UnsupportedOperationException("append() Not implemented by the "
|
||||
+ getClass().getSimpleName() + " FileSystem implementation");
|
||||
}
|
||||
|
||||
private class RenameIterator extends OzoneListingIterator {
|
||||
private final String srcKey;
|
||||
private final String dstKey;
|
||||
|
||||
RenameIterator(Path srcPath, Path dstPath)
|
||||
throws IOException {
|
||||
super(srcPath);
|
||||
srcKey = pathToKey(srcPath);
|
||||
dstKey = pathToKey(dstPath);
|
||||
LOG.trace("rename from:{} to:{}", srcKey, dstKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean processKey(String key) throws IOException {
|
||||
String newKeyName = dstKey.concat(key.substring(srcKey.length()));
|
||||
adapter.renameKey(key, newKeyName);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether the source and destination path are valid and then perform
|
||||
* rename from source path to destination path.
|
||||
* <p>
|
||||
* The rename operation is performed by renaming the keys with src as prefix.
|
||||
* For such keys the prefix is changed from src to dst.
|
||||
*
|
||||
* @param src source path for rename
|
||||
* @param dst destination path for rename
|
||||
* @return true if rename operation succeeded or
|
||||
* if the src and dst have the same path and are of the same type
|
||||
* @throws IOException on I/O errors or if the src/dst paths are invalid.
|
||||
*/
|
||||
@Override
|
||||
public boolean rename(Path src, Path dst) throws IOException {
|
||||
incrementCounter(Statistic.INVOCATION_RENAME);
|
||||
statistics.incrementWriteOps(1);
|
||||
if (src.equals(dst)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
LOG.trace("rename() from:{} to:{}", src, dst);
|
||||
if (src.isRoot()) {
|
||||
// Cannot rename root of file system
|
||||
LOG.trace("Cannot rename the root of a filesystem");
|
||||
return false;
|
||||
}
|
||||
|
||||
// Cannot rename a directory to its own subdirectory
|
||||
Path dstParent = dst.getParent();
|
||||
while (dstParent != null && !src.equals(dstParent)) {
|
||||
dstParent = dstParent.getParent();
|
||||
}
|
||||
Preconditions.checkArgument(dstParent == null,
|
||||
"Cannot rename a directory to its own subdirectory");
|
||||
// Check if the source exists
|
||||
FileStatus srcStatus;
|
||||
try {
|
||||
srcStatus = getFileStatus(src);
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
// source doesn't exist, return
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check if the destination exists
|
||||
FileStatus dstStatus;
|
||||
try {
|
||||
dstStatus = getFileStatus(dst);
|
||||
} catch (FileNotFoundException fnde) {
|
||||
dstStatus = null;
|
||||
}
|
||||
|
||||
if (dstStatus == null) {
|
||||
// If dst doesn't exist, check whether dst parent dir exists or not
|
||||
// if the parent exists, the source can still be renamed to dst path
|
||||
dstStatus = getFileStatus(dst.getParent());
|
||||
if (!dstStatus.isDirectory()) {
|
||||
throw new IOException(String.format(
|
||||
"Failed to rename %s to %s, %s is a file", src, dst,
|
||||
dst.getParent()));
|
||||
}
|
||||
} else {
|
||||
// if dst exists and source and destination are same,
|
||||
// check both the src and dst are of same type
|
||||
if (srcStatus.getPath().equals(dstStatus.getPath())) {
|
||||
return !srcStatus.isDirectory();
|
||||
} else if (dstStatus.isDirectory()) {
|
||||
// If dst is a directory, rename source as subpath of it.
|
||||
// for example rename /source to /dst will lead to /dst/source
|
||||
dst = new Path(dst, src.getName());
|
||||
FileStatus[] statuses;
|
||||
try {
|
||||
statuses = listStatus(dst);
|
||||
} catch (FileNotFoundException fnde) {
|
||||
statuses = null;
|
||||
}
|
||||
|
||||
if (statuses != null && statuses.length > 0) {
|
||||
// If dst exists and not a directory not empty
|
||||
throw new FileAlreadyExistsException(String.format(
|
||||
"Failed to rename %s to %s, file already exists or not empty!",
|
||||
src, dst));
|
||||
}
|
||||
} else {
|
||||
// If dst is not a directory
|
||||
throw new FileAlreadyExistsException(String.format(
|
||||
"Failed to rename %s to %s, file already exists!", src, dst));
|
||||
}
|
||||
}
|
||||
|
||||
if (srcStatus.isDirectory()) {
|
||||
if (dst.toString().startsWith(src.toString() + OZONE_URI_DELIMITER)) {
|
||||
LOG.trace("Cannot rename a directory to a subdirectory of self");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
RenameIterator iterator = new RenameIterator(src, dst);
|
||||
return iterator.iterate();
|
||||
}
|
||||
|
||||
private class DeleteIterator extends OzoneListingIterator {
|
||||
private boolean recursive;
|
||||
|
||||
DeleteIterator(Path f, boolean recursive)
|
||||
throws IOException {
|
||||
super(f);
|
||||
this.recursive = recursive;
|
||||
if (getStatus().isDirectory()
|
||||
&& !this.recursive
|
||||
&& listStatus(f).length != 0) {
|
||||
throw new PathIsNotEmptyDirectoryException(f.toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean processKey(String key) throws IOException {
|
||||
if (key.equals("")) {
|
||||
LOG.trace("Skipping deleting root directory");
|
||||
return true;
|
||||
} else {
|
||||
LOG.trace("deleting key:" + key);
|
||||
boolean succeed = adapter.deleteObject(key);
|
||||
// if recursive delete is requested ignore the return value of
|
||||
// deleteObject and issue deletes for other keys.
|
||||
return recursive || succeed;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes the children of the input dir path by iterating though the
|
||||
* DeleteIterator.
|
||||
*
|
||||
* @param f directory path to be deleted
|
||||
* @return true if successfully deletes all required keys, false otherwise
|
||||
* @throws IOException
|
||||
*/
|
||||
private boolean innerDelete(Path f, boolean recursive) throws IOException {
|
||||
LOG.trace("delete() path:{} recursive:{}", f, recursive);
|
||||
try {
|
||||
DeleteIterator iterator = new DeleteIterator(f, recursive);
|
||||
return iterator.iterate();
|
||||
} catch (FileNotFoundException e) {
|
||||
LOG.debug("Couldn't delete {} - does not exist", f);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean delete(Path f, boolean recursive) throws IOException {
|
||||
incrementCounter(Statistic.INVOCATION_DELETE);
|
||||
statistics.incrementWriteOps(1);
|
||||
LOG.debug("Delete path {} - recursive {}", f, recursive);
|
||||
FileStatus status;
|
||||
try {
|
||||
status = getFileStatus(f);
|
||||
} catch (FileNotFoundException ex) {
|
||||
LOG.warn("delete: Path does not exist: {}", f);
|
||||
return false;
|
||||
}
|
||||
|
||||
String key = pathToKey(f);
|
||||
boolean result;
|
||||
|
||||
if (status.isDirectory()) {
|
||||
LOG.debug("delete: Path is a directory: {}", f);
|
||||
key = addTrailingSlashIfNeeded(key);
|
||||
|
||||
if (key.equals("/")) {
|
||||
LOG.warn("Cannot delete root directory.");
|
||||
return false;
|
||||
}
|
||||
|
||||
result = innerDelete(f, recursive);
|
||||
} else {
|
||||
LOG.debug("delete: Path is a file: {}", f);
|
||||
result = adapter.deleteObject(key);
|
||||
}
|
||||
|
||||
if (result) {
|
||||
// If this delete operation removes all files/directories from the
|
||||
// parent direcotry, then an empty parent directory must be created.
|
||||
Path parent = f.getParent();
|
||||
if (parent != null && !parent.isRoot()) {
|
||||
createFakeDirectoryIfNecessary(parent);
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a fake parent directory key if it does not already exist and no
|
||||
* other child of this parent directory exists.
|
||||
*
|
||||
* @param f path to the fake parent directory
|
||||
* @throws IOException
|
||||
*/
|
||||
private void createFakeDirectoryIfNecessary(Path f) throws IOException {
|
||||
String key = pathToKey(f);
|
||||
if (!key.isEmpty() && !o3Exists(f)) {
|
||||
LOG.debug("Creating new fake directory at {}", f);
|
||||
String dirKey = addTrailingSlashIfNeeded(key);
|
||||
adapter.createDirectory(dirKey);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a file or directory exists corresponding to given path.
|
||||
*
|
||||
* @param f path to file/directory.
|
||||
* @return true if it exists, false otherwise.
|
||||
* @throws IOException
|
||||
*/
|
||||
private boolean o3Exists(final Path f) throws IOException {
|
||||
Path path = makeQualified(f);
|
||||
try {
|
||||
getFileStatus(path);
|
||||
return true;
|
||||
} catch (FileNotFoundException ex) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private class ListStatusIterator extends OzoneListingIterator {
|
||||
// _fileStatuses_ maintains a list of file(s) which is either the input
|
||||
// path itself or a child of the input directory path.
|
||||
private List<FileStatus> fileStatuses = new ArrayList<>(LISTING_PAGE_SIZE);
|
||||
// _subDirStatuses_ maintains a list of sub-dirs of the input directory
|
||||
// path.
|
||||
private Map<Path, FileStatus> subDirStatuses =
|
||||
new HashMap<>(LISTING_PAGE_SIZE);
|
||||
private Path f; // the input path
|
||||
|
||||
ListStatusIterator(Path f) throws IOException {
|
||||
super(f);
|
||||
this.f = f;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the key to the listStatus result if the key corresponds to the
|
||||
* input path or is an immediate child of the input path.
|
||||
*
|
||||
* @param key key to be processed
|
||||
* @return always returns true
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
boolean processKey(String key) throws IOException {
|
||||
Path keyPath = new Path(OZONE_URI_DELIMITER + key);
|
||||
if (key.equals(getPathKey())) {
|
||||
if (pathIsDirectory()) {
|
||||
// if input path is a directory, we add the sub-directories and
|
||||
// files under this directory.
|
||||
return true;
|
||||
} else {
|
||||
addFileStatus(keyPath);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
// Left with only subkeys now
|
||||
// We add only the immediate child files and sub-dirs i.e. we go only
|
||||
// upto one level down the directory tree structure.
|
||||
if (pathToKey(keyPath.getParent()).equals(pathToKey(f))) {
|
||||
// This key is an immediate child. Can be file or directory
|
||||
if (key.endsWith(OZONE_URI_DELIMITER)) {
|
||||
// Key is a directory
|
||||
addSubDirStatus(keyPath);
|
||||
} else {
|
||||
addFileStatus(keyPath);
|
||||
}
|
||||
} else {
|
||||
// This key is not the immediate child of the input directory. So we
|
||||
// traverse the parent tree structure of this key until we get the
|
||||
// immediate child of the input directory.
|
||||
Path immediateChildPath = getImmediateChildPath(keyPath.getParent());
|
||||
if (immediateChildPath != null) {
|
||||
addSubDirStatus(immediateChildPath);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the FileStatus of keyPath to final result of listStatus.
|
||||
*
|
||||
* @param filePath path to the file
|
||||
* @throws FileNotFoundException
|
||||
*/
|
||||
void addFileStatus(Path filePath) throws IOException {
|
||||
fileStatuses.add(getFileStatus(filePath));
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the FileStatus of the subdir to final result of listStatus, if not
|
||||
* already included.
|
||||
*
|
||||
* @param dirPath path to the dir
|
||||
* @throws FileNotFoundException
|
||||
*/
|
||||
void addSubDirStatus(Path dirPath) throws FileNotFoundException {
|
||||
// Check if subdir path is already included in statuses.
|
||||
if (!subDirStatuses.containsKey(dirPath)) {
|
||||
subDirStatuses.put(dirPath, innerGetFileStatusForDir(dirPath));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Traverse the parent directory structure of keyPath to determine the
|
||||
* which parent/ grand-parent/.. is the immediate child of the input path f.
|
||||
*
|
||||
* @param keyPath path whose parent directory structure should be traversed.
|
||||
* @return immediate child path of the input path f.
|
||||
*/
|
||||
Path getImmediateChildPath(Path keyPath) {
|
||||
Path path = keyPath;
|
||||
Path parent = path.getParent();
|
||||
while (parent != null) {
|
||||
if (pathToKey(parent).equals(pathToKey(f))) {
|
||||
return path;
|
||||
}
|
||||
path = parent;
|
||||
parent = path.getParent();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the result of listStatus operation. If the input path is a
|
||||
* file, return the status for only that file. If the input path is a
|
||||
* directory, return the statuses for all the child files and sub-dirs.
|
||||
*/
|
||||
FileStatus[] getStatuses() {
|
||||
List<FileStatus> result = Stream.concat(
|
||||
fileStatuses.stream(), subDirStatuses.values().stream())
|
||||
.collect(Collectors.toList());
|
||||
return result.toArray(new FileStatus[result.size()]);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus[] listStatus(Path f) throws IOException {
|
||||
incrementCounter(Statistic.INVOCATION_LIST_STATUS);
|
||||
statistics.incrementReadOps(1);
|
||||
LOG.trace("listStatus() path:{}", f);
|
||||
ListStatusIterator iterator = new ListStatusIterator(f);
|
||||
iterator.iterate();
|
||||
return iterator.getStatuses();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setWorkingDirectory(Path newDir) {
|
||||
workingDir = newDir;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getWorkingDirectory() {
|
||||
return workingDir;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<?> getDelegationToken(String renewer) throws IOException {
|
||||
return adapter.getDelegationToken(renewer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a canonical service name for this file system. If the URI is logical,
|
||||
* the hostname part of the URI will be returned.
|
||||
*
|
||||
* @return a service string that uniquely identifies this file system.
|
||||
*/
|
||||
@Override
|
||||
public String getCanonicalServiceName() {
|
||||
return adapter.getCanonicalServiceName();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the username of the FS.
|
||||
*
|
||||
* @return the short name of the user who instantiated the FS
|
||||
*/
|
||||
public String getUsername() {
|
||||
return userName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether the path is valid and then create directories.
|
||||
* Directory is represented using a key with no value.
|
||||
* All the non-existent parent directories are also created.
|
||||
*
|
||||
* @param path directory path to be created
|
||||
* @return true if directory exists or created successfully.
|
||||
* @throws IOException
|
||||
*/
|
||||
private boolean mkdir(Path path) throws IOException {
|
||||
Path fPart = path;
|
||||
Path prevfPart = null;
|
||||
do {
|
||||
LOG.trace("validating path:{}", fPart);
|
||||
try {
|
||||
FileStatus fileStatus = getFileStatus(fPart);
|
||||
if (fileStatus.isDirectory()) {
|
||||
// If path exists and a directory, exit
|
||||
break;
|
||||
} else {
|
||||
// Found a file here, rollback and delete newly created directories
|
||||
LOG.trace("Found a file with same name as directory, path:{}", fPart);
|
||||
if (prevfPart != null) {
|
||||
delete(prevfPart, true);
|
||||
}
|
||||
throw new FileAlreadyExistsException(String.format(
|
||||
"Can't make directory for path '%s', it is a file.", fPart));
|
||||
}
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
LOG.trace("creating directory for fpart:{}", fPart);
|
||||
String key = pathToKey(fPart);
|
||||
String dirKey = addTrailingSlashIfNeeded(key);
|
||||
if (!adapter.createDirectory(dirKey)) {
|
||||
// Directory creation failed here,
|
||||
// rollback and delete newly created directories
|
||||
LOG.trace("Directory creation failed, path:{}", fPart);
|
||||
if (prevfPart != null) {
|
||||
delete(prevfPart, true);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
prevfPart = fPart;
|
||||
fPart = fPart.getParent();
|
||||
} while (fPart != null);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
||||
LOG.trace("mkdir() path:{} ", f);
|
||||
String key = pathToKey(f);
|
||||
if (isEmpty(key)) {
|
||||
return false;
|
||||
}
|
||||
return mkdir(f);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus getFileStatus(Path f) throws IOException {
|
||||
incrementCounter(Statistic.INVOCATION_GET_FILE_STATUS);
|
||||
statistics.incrementReadOps(1);
|
||||
LOG.trace("getFileStatus() path:{}", f);
|
||||
Path qualifiedPath = f.makeQualified(uri, workingDir);
|
||||
String key = pathToKey(qualifiedPath);
|
||||
|
||||
if (key.length() == 0) {
|
||||
return new FileStatus(0, true, 1, 0,
|
||||
adapter.getCreationTime(), qualifiedPath);
|
||||
}
|
||||
|
||||
// Check if the key exists
|
||||
BasicKeyInfo ozoneKey = adapter.getKeyInfo(key);
|
||||
if (ozoneKey != null) {
|
||||
LOG.debug("Found exact file for path {}: normal file", f);
|
||||
return new FileStatus(ozoneKey.getDataSize(), false, 1,
|
||||
getDefaultBlockSize(f), ozoneKey.getModificationTime(), 0,
|
||||
FsPermission.getFileDefault(), getUsername(), getUsername(),
|
||||
qualifiedPath);
|
||||
}
|
||||
|
||||
return innerGetFileStatusForDir(f);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the FileStatus for input directory path.
|
||||
* They key corresponding to input path is appended with a trailing slash
|
||||
* to return only the corresponding directory key in the bucket.
|
||||
*
|
||||
* @param f directory path
|
||||
* @return FileStatus for the input directory path
|
||||
* @throws FileNotFoundException
|
||||
*/
|
||||
public FileStatus innerGetFileStatusForDir(Path f)
|
||||
throws FileNotFoundException {
|
||||
Path qualifiedPath = f.makeQualified(uri, workingDir);
|
||||
String key = pathToKey(qualifiedPath);
|
||||
key = addTrailingSlashIfNeeded(key);
|
||||
|
||||
BasicKeyInfo ozoneKey = adapter.getKeyInfo(key);
|
||||
if (ozoneKey != null) {
|
||||
if (adapter.isDirectory(ozoneKey)) {
|
||||
// Key is a directory
|
||||
LOG.debug("Found file (with /) for path {}: fake directory", f);
|
||||
} else {
|
||||
// Key is a file with trailing slash
|
||||
LOG.warn("Found file (with /) for path {}: real file? should not " +
|
||||
"happen", f, key);
|
||||
}
|
||||
return new FileStatus(0, true, 1, 0,
|
||||
ozoneKey.getModificationTime(), 0,
|
||||
FsPermission.getDirDefault(), getUsername(), getUsername(),
|
||||
qualifiedPath);
|
||||
}
|
||||
|
||||
// File or directory corresponding to input path does not exist.
|
||||
// Check if there exists a key prefixed with this key.
|
||||
boolean hasChildren = adapter.hasNextKey(key);
|
||||
if (hasChildren) {
|
||||
return new FileStatus(0, true, 1, 0, 0, 0, FsPermission.getDirDefault(),
|
||||
getUsername(), getUsername(), qualifiedPath);
|
||||
}
|
||||
|
||||
throw new FileNotFoundException(f + ": No such file or directory!");
|
||||
}
|
||||
|
||||
/**
|
||||
* Turn a path (relative or otherwise) into an Ozone key.
|
||||
*
|
||||
* @param path the path of the file.
|
||||
* @return the key of the object that represents the file.
|
||||
*/
|
||||
public String pathToKey(Path path) {
|
||||
Objects.requireNonNull(path, "Path canf not be null!");
|
||||
if (!path.isAbsolute()) {
|
||||
path = new Path(workingDir, path);
|
||||
}
|
||||
// removing leading '/' char
|
||||
String key = path.toUri().getPath().substring(1);
|
||||
LOG.trace("path for key:{} is:{}", key, path);
|
||||
return key;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add trailing delimiter to path if it is already not present.
|
||||
*
|
||||
* @param key the ozone Key which needs to be appended
|
||||
* @return delimiter appended key
|
||||
*/
|
||||
private String addTrailingSlashIfNeeded(String key) {
|
||||
if (!isEmpty(key) && !key.endsWith(OZONE_URI_DELIMITER)) {
|
||||
return key + OZONE_URI_DELIMITER;
|
||||
} else {
|
||||
return key;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "OzoneFileSystem{URI=" + uri + ", "
|
||||
+ "workingDir=" + workingDir + ", "
|
||||
+ "userName=" + userName + ", "
|
||||
+ "statistics=" + statistics
|
||||
+ "}";
|
||||
}
|
||||
|
||||
/**
|
||||
* This class provides an interface to iterate through all the keys in the
|
||||
* bucket prefixed with the input path key and process them.
|
||||
* <p>
|
||||
* Each implementing class should define how the keys should be processed
|
||||
* through the processKey() function.
|
||||
*/
|
||||
private abstract class OzoneListingIterator {
|
||||
private final Path path;
|
||||
private final FileStatus status;
|
||||
private String pathKey;
|
||||
private Iterator<BasicKeyInfo> keyIterator;
|
||||
|
||||
OzoneListingIterator(Path path)
|
||||
throws IOException {
|
||||
this.path = path;
|
||||
this.status = getFileStatus(path);
|
||||
this.pathKey = pathToKey(path);
|
||||
if (status.isDirectory()) {
|
||||
this.pathKey = addTrailingSlashIfNeeded(pathKey);
|
||||
}
|
||||
keyIterator = adapter.listKeys(pathKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* The output of processKey determines if further iteration through the
|
||||
* keys should be done or not.
|
||||
*
|
||||
* @return true if we should continue iteration of keys, false otherwise.
|
||||
* @throws IOException
|
||||
*/
|
||||
abstract boolean processKey(String key) throws IOException;
|
||||
|
||||
/**
|
||||
* Iterates thorugh all the keys prefixed with the input path's key and
|
||||
* processes the key though processKey().
|
||||
* If for any key, the processKey() returns false, then the iteration is
|
||||
* stopped and returned with false indicating that all the keys could not
|
||||
* be processed successfully.
|
||||
*
|
||||
* @return true if all keys are processed successfully, false otherwise.
|
||||
* @throws IOException
|
||||
*/
|
||||
boolean iterate() throws IOException {
|
||||
LOG.trace("Iterating path {}", path);
|
||||
if (status.isDirectory()) {
|
||||
LOG.trace("Iterating directory:{}", pathKey);
|
||||
while (keyIterator.hasNext()) {
|
||||
BasicKeyInfo key = keyIterator.next();
|
||||
LOG.trace("iterating key:{}", key.getName());
|
||||
if (!processKey(key.getName())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
LOG.trace("iterating file:{}", path);
|
||||
return processKey(pathKey);
|
||||
}
|
||||
}
|
||||
|
||||
String getPathKey() {
|
||||
return pathKey;
|
||||
}
|
||||
|
||||
boolean pathIsDirectory() {
|
||||
return status.isDirectory();
|
||||
}
|
||||
|
||||
FileStatus getStatus() {
|
||||
return status;
|
||||
}
|
||||
}
|
||||
|
||||
public OzoneClientAdapter getAdapter() {
|
||||
return adapter;
|
||||
}
|
||||
|
||||
public boolean isEmpty(CharSequence cs) {
|
||||
return cs == null || cs.length() == 0;
|
||||
}
|
||||
|
||||
public boolean isNumber(String number) {
|
||||
try {
|
||||
Integer.parseInt(number);
|
||||
} catch (NumberFormatException ex) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -51,6 +51,7 @@ public class FilteredClassLoader extends URLClassLoader {
|
|||
public FilteredClassLoader(URL[] urls, ClassLoader parent) {
|
||||
super(urls, null);
|
||||
delegatedClasses.add("org.apache.hadoop.fs.ozone.OzoneClientAdapter");
|
||||
delegatedClasses.add("org.apache.hadoop.security.token.Token");
|
||||
delegatedClasses.add("org.apache.hadoop.fs.ozone.BasicKeyInfo");
|
||||
delegatedClasses.add("org.apache.hadoop.fs.ozone.OzoneFSOutputStream");
|
||||
delegatedClasses.add("org.apache.hadoop.fs.ozone.OzoneFSStorageStatistics");
|
||||
|
|
|
@ -27,7 +27,7 @@ import java.net.URI;
|
|||
import java.util.Iterator;
|
||||
|
||||
/**
|
||||
* Lightweight adapter to separte hadoop/ozone classes.
|
||||
* Lightweight adapter to separate hadoop/ozone classes.
|
||||
* <p>
|
||||
* This class contains only the bare minimum Ozone classes in the signature.
|
||||
* It could be loaded by a different classloader because only the objects in
|
||||
|
|
|
@ -25,6 +25,8 @@ import java.util.ArrayList;
|
|||
import java.util.Enumeration;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.fs.StorageStatistics;
|
||||
|
||||
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -44,7 +46,7 @@ public final class OzoneClientAdapterFactory {
|
|||
public static OzoneClientAdapter createAdapter(
|
||||
String volumeStr,
|
||||
String bucketStr) throws IOException {
|
||||
return createAdapter(volumeStr, bucketStr,
|
||||
return createAdapter(volumeStr, bucketStr, true,
|
||||
(aClass) -> (OzoneClientAdapter) aClass
|
||||
.getConstructor(String.class, String.class)
|
||||
.newInstance(
|
||||
|
@ -56,9 +58,8 @@ public final class OzoneClientAdapterFactory {
|
|||
public static OzoneClientAdapter createAdapter(
|
||||
String volumeStr,
|
||||
String bucketStr,
|
||||
OzoneFSStorageStatistics storageStatistics)
|
||||
throws IOException {
|
||||
return createAdapter(volumeStr, bucketStr,
|
||||
StorageStatistics storageStatistics) throws IOException {
|
||||
return createAdapter(volumeStr, bucketStr, false,
|
||||
(aClass) -> (OzoneClientAdapter) aClass
|
||||
.getConstructor(String.class, String.class,
|
||||
OzoneFSStorageStatistics.class)
|
||||
|
@ -72,9 +73,11 @@ public final class OzoneClientAdapterFactory {
|
|||
public static OzoneClientAdapter createAdapter(
|
||||
String volumeStr,
|
||||
String bucketStr,
|
||||
boolean basic,
|
||||
OzoneClientAdapterCreator creator) throws IOException {
|
||||
|
||||
ClassLoader currentClassLoader = OzoneFileSystem.class.getClassLoader();
|
||||
ClassLoader currentClassLoader =
|
||||
OzoneClientAdapterFactory.class.getClassLoader();
|
||||
List<URL> urls = new ArrayList<>();
|
||||
|
||||
findEmbeddedLibsUrl(urls, currentClassLoader);
|
||||
|
@ -99,10 +102,18 @@ public final class OzoneClientAdapterFactory {
|
|||
reflectionUtils.getMethod("getClassByName", String.class)
|
||||
.invoke(null, "org.apache.ratis.grpc.GrpcFactory");
|
||||
|
||||
Class<?> aClass = classLoader
|
||||
.loadClass("org.apache.hadoop.fs.ozone.OzoneClientAdapterImpl");
|
||||
Class<?> adapterClass = null;
|
||||
if (basic) {
|
||||
adapterClass = classLoader
|
||||
.loadClass(
|
||||
"org.apache.hadoop.fs.ozone.BasicOzoneClientAdapterImpl");
|
||||
} else {
|
||||
adapterClass = classLoader
|
||||
.loadClass(
|
||||
"org.apache.hadoop.fs.ozone.OzoneClientAdapterImpl");
|
||||
}
|
||||
OzoneClientAdapter ozoneClientAdapter =
|
||||
creator.createOzoneClientAdapter(aClass);
|
||||
creator.createOzoneClientAdapter(adapterClass);
|
||||
|
||||
Thread.currentThread().setContextClassLoader(contextClassLoader);
|
||||
|
||||
|
@ -134,7 +145,8 @@ public final class OzoneClientAdapterFactory {
|
|||
//marker file is added to the jar to make it easier to find the URL
|
||||
// for the current jar.
|
||||
String markerFile = "ozonefs.txt";
|
||||
ClassLoader currentClassLoader = OzoneFileSystem.class.getClassLoader();
|
||||
ClassLoader currentClassLoader =
|
||||
OzoneClientAdapterFactory.class.getClassLoader();
|
||||
|
||||
URL ozFs = currentClassLoader
|
||||
.getResource(markerFile);
|
||||
|
|
|
@ -17,380 +17,45 @@
|
|||
*/
|
||||
package org.apache.hadoop.fs.ozone;
|
||||
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.hdds.client.ReplicationFactor;
|
||||
import org.apache.hadoop.hdds.client.ReplicationType;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.security.x509.SecurityConfig;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.client.ObjectStore;
|
||||
import org.apache.hadoop.ozone.client.OzoneBucket;
|
||||
import org.apache.hadoop.ozone.client.OzoneClient;
|
||||
import org.apache.hadoop.ozone.client.OzoneClientFactory;
|
||||
import org.apache.hadoop.ozone.client.OzoneKey;
|
||||
import org.apache.hadoop.ozone.client.OzoneVolume;
|
||||
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
||||
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenRenewer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Implementation of the OzoneFileSystem calls.
|
||||
*/
|
||||
public class OzoneClientAdapterImpl implements OzoneClientAdapter {
|
||||
public class OzoneClientAdapterImpl extends BasicOzoneClientAdapterImpl {
|
||||
|
||||
static final Logger LOG =
|
||||
LoggerFactory.getLogger(OzoneClientAdapterImpl.class);
|
||||
|
||||
private OzoneClient ozoneClient;
|
||||
private ObjectStore objectStore;
|
||||
private OzoneVolume volume;
|
||||
private OzoneBucket bucket;
|
||||
private ReplicationType replicationType;
|
||||
private ReplicationFactor replicationFactor;
|
||||
private OzoneFSStorageStatistics storageStatistics;
|
||||
private boolean securityEnabled;
|
||||
/**
|
||||
* Create new OzoneClientAdapter implementation.
|
||||
*
|
||||
* @param volumeStr Name of the volume to use.
|
||||
* @param bucketStr Name of the bucket to use
|
||||
* @param storageStatistics Storage statistic (optional, can be null)
|
||||
* @throws IOException In case of a problem.
|
||||
*/
|
||||
|
||||
public OzoneClientAdapterImpl(String volumeStr, String bucketStr,
|
||||
OzoneFSStorageStatistics storageStatistics) throws IOException {
|
||||
this(createConf(), volumeStr, bucketStr, storageStatistics);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create new OzoneClientAdapter implementation.
|
||||
*
|
||||
* @param volumeStr Name of the volume to use.
|
||||
* @param bucketStr Name of the bucket to use
|
||||
* @throws IOException In case of a problem.
|
||||
*/
|
||||
public OzoneClientAdapterImpl(String volumeStr, String bucketStr)
|
||||
OzoneFSStorageStatistics storageStatistics)
|
||||
throws IOException {
|
||||
this(createConf(), volumeStr, bucketStr, null);
|
||||
super(volumeStr, bucketStr);
|
||||
this.storageStatistics = storageStatistics;
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static OzoneConfiguration createConf() {
|
||||
ClassLoader contextClassLoader =
|
||||
Thread.currentThread().getContextClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(null);
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
Thread.currentThread().setContextClassLoader(contextClassLoader);
|
||||
return conf;
|
||||
}
|
||||
|
||||
public OzoneClientAdapterImpl(OzoneConfiguration conf, String volumeStr,
|
||||
String bucketStr, OzoneFSStorageStatistics storageStatistics)
|
||||
public OzoneClientAdapterImpl(
|
||||
OzoneConfiguration conf, String volumeStr, String bucketStr,
|
||||
OzoneFSStorageStatistics storageStatistics)
|
||||
throws IOException {
|
||||
this(null, -1, conf, volumeStr, bucketStr, storageStatistics);
|
||||
super(conf, volumeStr, bucketStr);
|
||||
this.storageStatistics = storageStatistics;
|
||||
}
|
||||
|
||||
public OzoneClientAdapterImpl(String omHost, int omPort,
|
||||
Configuration hadoopConf, String volumeStr, String bucketStr,
|
||||
OzoneFSStorageStatistics storageStatistics) throws IOException {
|
||||
|
||||
ClassLoader contextClassLoader =
|
||||
Thread.currentThread().getContextClassLoader();
|
||||
Thread.currentThread().setContextClassLoader(null);
|
||||
OzoneConfiguration conf;
|
||||
if (hadoopConf instanceof OzoneConfiguration) {
|
||||
conf = (OzoneConfiguration) hadoopConf;
|
||||
} else {
|
||||
conf = new OzoneConfiguration(hadoopConf);
|
||||
}
|
||||
|
||||
SecurityConfig secConfig = new SecurityConfig(conf);
|
||||
|
||||
if (secConfig.isSecurityEnabled()) {
|
||||
this.securityEnabled = true;
|
||||
}
|
||||
|
||||
try {
|
||||
String replicationTypeConf =
|
||||
conf.get(OzoneConfigKeys.OZONE_REPLICATION_TYPE,
|
||||
OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT);
|
||||
|
||||
int replicationCountConf = conf.getInt(OzoneConfigKeys.OZONE_REPLICATION,
|
||||
OzoneConfigKeys.OZONE_REPLICATION_DEFAULT);
|
||||
|
||||
if (StringUtils.isNotEmpty(omHost) && omPort != -1) {
|
||||
this.ozoneClient =
|
||||
OzoneClientFactory.getRpcClient(omHost, omPort, conf);
|
||||
} else {
|
||||
this.ozoneClient =
|
||||
OzoneClientFactory.getRpcClient(conf);
|
||||
}
|
||||
objectStore = ozoneClient.getObjectStore();
|
||||
this.volume = objectStore.getVolume(volumeStr);
|
||||
this.bucket = volume.getBucket(bucketStr);
|
||||
this.replicationType = ReplicationType.valueOf(replicationTypeConf);
|
||||
this.replicationFactor = ReplicationFactor.valueOf(replicationCountConf);
|
||||
this.storageStatistics = storageStatistics;
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(contextClassLoader);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
ozoneClient.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream createInputStream(String key) throws IOException {
|
||||
if (storageStatistics != null) {
|
||||
storageStatistics.incrementCounter(Statistic.OBJECTS_READ, 1);
|
||||
}
|
||||
return bucket.readKey(key).getInputStream();
|
||||
}
|
||||
|
||||
@Override
|
||||
public OzoneFSOutputStream createKey(String key) throws IOException {
|
||||
if (storageStatistics != null) {
|
||||
storageStatistics.incrementCounter(Statistic.OBJECTS_CREATED, 1);
|
||||
}
|
||||
OzoneOutputStream ozoneOutputStream =
|
||||
bucket.createKey(key, 0, replicationType, replicationFactor,
|
||||
new HashMap<>());
|
||||
return new OzoneFSOutputStream(ozoneOutputStream.getOutputStream());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void renameKey(String key, String newKeyName) throws IOException {
|
||||
if (storageStatistics != null) {
|
||||
storageStatistics.incrementCounter(Statistic.OBJECTS_RENAMED, 1);
|
||||
}
|
||||
bucket.renameKey(key, newKeyName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to fetch the key metadata info.
|
||||
*
|
||||
* @param keyName key whose metadata information needs to be fetched
|
||||
* @return metadata info of the key
|
||||
*/
|
||||
@Override
|
||||
public BasicKeyInfo getKeyInfo(String keyName) {
|
||||
try {
|
||||
if (storageStatistics != null) {
|
||||
storageStatistics.incrementCounter(Statistic.OBJECTS_QUERY, 1);
|
||||
}
|
||||
OzoneKey key = bucket.getKey(keyName);
|
||||
return new BasicKeyInfo(
|
||||
keyName,
|
||||
key.getModificationTime(),
|
||||
key.getDataSize()
|
||||
);
|
||||
} catch (IOException e) {
|
||||
LOG.trace("Key:{} does not exist", keyName);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to check if an Ozone key is representing a directory.
|
||||
*
|
||||
* @param key key to be checked as a directory
|
||||
* @return true if key is a directory, false otherwise
|
||||
*/
|
||||
@Override
|
||||
public boolean isDirectory(BasicKeyInfo key) {
|
||||
LOG.trace("key name:{} size:{}", key.getName(),
|
||||
key.getDataSize());
|
||||
return key.getName().endsWith(OZONE_URI_DELIMITER)
|
||||
&& (key.getDataSize() == 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to create an directory specified by key name in bucket.
|
||||
*
|
||||
* @param keyName key name to be created as directory
|
||||
* @return true if the key is created, false otherwise
|
||||
*/
|
||||
@Override
|
||||
public boolean createDirectory(String keyName) {
|
||||
try {
|
||||
LOG.trace("creating dir for key:{}", keyName);
|
||||
if (storageStatistics != null) {
|
||||
storageStatistics.incrementCounter(Statistic.OBJECTS_CREATED, 1);
|
||||
}
|
||||
bucket.createKey(keyName, 0, replicationType, replicationFactor,
|
||||
new HashMap<>()).close();
|
||||
return true;
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("create key failed for key:{}", keyName, ioe);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to delete an object specified by key name in bucket.
|
||||
*
|
||||
* @param keyName key name to be deleted
|
||||
* @return true if the key is deleted, false otherwise
|
||||
*/
|
||||
@Override
|
||||
public boolean deleteObject(String keyName) {
|
||||
LOG.trace("issuing delete for key" + keyName);
|
||||
try {
|
||||
if (storageStatistics != null) {
|
||||
storageStatistics.incrementCounter(Statistic.OBJECTS_DELETED, 1);
|
||||
}
|
||||
bucket.deleteKey(keyName);
|
||||
return true;
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("delete key failed " + ioe.getMessage());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCreationTime() {
|
||||
return bucket.getCreationTime();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNextKey(String key) {
|
||||
if (storageStatistics != null) {
|
||||
storageStatistics.incrementCounter(Statistic.OBJECTS_LIST, 1);
|
||||
}
|
||||
return bucket.listKeys(key).hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<BasicKeyInfo> listKeys(String pathKey) {
|
||||
if (storageStatistics != null) {
|
||||
storageStatistics.incrementCounter(Statistic.OBJECTS_LIST, 1);
|
||||
}
|
||||
return new IteratorAdapter(bucket.listKeys(pathKey));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<OzoneTokenIdentifier> getDelegationToken(String renewer)
|
||||
OzoneFSStorageStatistics storageStatistics)
|
||||
throws IOException {
|
||||
if (!securityEnabled) {
|
||||
return null;
|
||||
}
|
||||
Token<OzoneTokenIdentifier> token = ozoneClient.getObjectStore()
|
||||
.getDelegationToken(renewer == null ? null : new Text(renewer));
|
||||
token.setKind(OzoneTokenIdentifier.KIND_NAME);
|
||||
return token;
|
||||
|
||||
super(omHost, omPort, hadoopConf, volumeStr, bucketStr);
|
||||
this.storageStatistics = storageStatistics;
|
||||
}
|
||||
|
||||
@Override
|
||||
public KeyProvider getKeyProvider() throws IOException {
|
||||
return objectStore.getKeyProvider();
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI getKeyProviderUri() throws IOException {
|
||||
return objectStore.getKeyProviderUri();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getCanonicalServiceName() {
|
||||
return objectStore.getCanonicalServiceName();
|
||||
}
|
||||
|
||||
/**
|
||||
* Ozone Delegation Token Renewer.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public static class Renewer extends TokenRenewer {
|
||||
|
||||
//Ensure that OzoneConfiguration files are loaded before trying to use
|
||||
// the renewer.
|
||||
static {
|
||||
OzoneConfiguration.activate();
|
||||
}
|
||||
|
||||
public Text getKind() {
|
||||
return OzoneTokenIdentifier.KIND_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean handleKind(Text kind) {
|
||||
return getKind().equals(kind);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isManaged(Token<?> token) throws IOException {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long renew(Token<?> token, Configuration conf)
|
||||
throws IOException, InterruptedException {
|
||||
Token<OzoneTokenIdentifier> ozoneDt =
|
||||
(Token<OzoneTokenIdentifier>) token;
|
||||
OzoneClient ozoneClient =
|
||||
OzoneClientFactory.getRpcClient(conf);
|
||||
return ozoneClient.getObjectStore().renewDelegationToken(ozoneDt);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(Token<?> token, Configuration conf)
|
||||
throws IOException, InterruptedException {
|
||||
Token<OzoneTokenIdentifier> ozoneDt =
|
||||
(Token<OzoneTokenIdentifier>) token;
|
||||
OzoneClient ozoneClient =
|
||||
OzoneClientFactory.getRpcClient(conf);
|
||||
ozoneClient.getObjectStore().cancelDelegationToken(ozoneDt);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adapter to convert OzoneKey to a safe and simple Key implementation.
|
||||
*/
|
||||
public static class IteratorAdapter implements Iterator<BasicKeyInfo> {
|
||||
|
||||
private Iterator<? extends OzoneKey> original;
|
||||
|
||||
public IteratorAdapter(Iterator<? extends OzoneKey> listKeys) {
|
||||
this.original = listKeys;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return original.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public BasicKeyInfo next() {
|
||||
OzoneKey next = original.next();
|
||||
if (next == null) {
|
||||
return null;
|
||||
} else {
|
||||
return new BasicKeyInfo(
|
||||
next.getName(),
|
||||
next.getModificationTime(),
|
||||
next.getDataSize()
|
||||
);
|
||||
}
|
||||
protected void incrementCounter(Statistic objectsRead) {
|
||||
if (storageStatistics != null) {
|
||||
storageStatistics.incrementCounter(objectsRead, 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,53 +18,18 @@
|
|||
|
||||
package org.apache.hadoop.fs.ozone;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.lang3.math.NumberUtils;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
|
||||
import org.apache.hadoop.fs.GlobalStorageStatistics;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.fs.StorageStatistics;
|
||||
import org.apache.hadoop.security.token.DelegationTokenIssuer;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
|
||||
import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER;
|
||||
import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* The Ozone Filesystem implementation.
|
||||
|
@ -76,242 +41,19 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class OzoneFileSystem extends FileSystem
|
||||
public class OzoneFileSystem extends BasicOzoneFileSystem
|
||||
implements KeyProviderTokenIssuer {
|
||||
static final Logger LOG = LoggerFactory.getLogger(OzoneFileSystem.class);
|
||||
|
||||
/**
|
||||
* The Ozone client for connecting to Ozone server.
|
||||
*/
|
||||
|
||||
private URI uri;
|
||||
private String userName;
|
||||
private Path workingDir;
|
||||
|
||||
private OzoneClientAdapter adapter;
|
||||
|
||||
private OzoneFSStorageStatistics storageStatistics;
|
||||
|
||||
private static final Pattern URL_SCHEMA_PATTERN =
|
||||
Pattern.compile("([^\\.]+)\\.([^\\.]+)\\.{0,1}(.*)");
|
||||
|
||||
private static final String URI_EXCEPTION_TEXT = "Ozone file system url " +
|
||||
"should be either one of the two forms: " +
|
||||
"o3fs://bucket.volume/key OR " +
|
||||
"o3fs://bucket.volume.om-host.example.com:5678/key";
|
||||
|
||||
@Override
|
||||
public void initialize(URI name, Configuration conf) throws IOException {
|
||||
super.initialize(name, conf);
|
||||
setConf(conf);
|
||||
Objects.requireNonNull(name.getScheme(), "No scheme provided in " + name);
|
||||
Preconditions.checkArgument(getScheme().equals(name.getScheme()),
|
||||
"Invalid scheme provided in " + name);
|
||||
|
||||
String authority = name.getAuthority();
|
||||
|
||||
Matcher matcher = URL_SCHEMA_PATTERN.matcher(authority);
|
||||
|
||||
if (!matcher.matches()) {
|
||||
throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
|
||||
}
|
||||
String bucketStr = matcher.group(1);
|
||||
String volumeStr = matcher.group(2);
|
||||
String remaining = matcher.groupCount() == 3 ? matcher.group(3) : null;
|
||||
|
||||
String omHost = null;
|
||||
String omPort = String.valueOf(-1);
|
||||
if (StringUtils.isNotEmpty(remaining)) {
|
||||
String[] parts = remaining.split(":");
|
||||
if (parts.length != 2) {
|
||||
throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
|
||||
}
|
||||
omHost = parts[0];
|
||||
omPort = parts[1];
|
||||
if (!NumberUtils.isParsable(omPort)) {
|
||||
throw new IllegalArgumentException(URI_EXCEPTION_TEXT);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
uri = new URIBuilder().setScheme(OZONE_URI_SCHEME)
|
||||
.setHost(authority)
|
||||
.build();
|
||||
LOG.trace("Ozone URI for ozfs initialization is " + uri);
|
||||
|
||||
//isolated is the default for ozonefs-lib-legacy which includes the
|
||||
// /ozonefs.txt, otherwise the default is false. It could be overridden.
|
||||
boolean defaultValue =
|
||||
OzoneFileSystem.class.getClassLoader().getResource("ozonefs.txt")
|
||||
!= null;
|
||||
|
||||
//Use string here instead of the constant as constant may not be available
|
||||
//on the classpath of a hadoop 2.7
|
||||
boolean isolatedClassloader =
|
||||
conf.getBoolean("ozone.fs.isolated-classloader", defaultValue);
|
||||
|
||||
try {
|
||||
//register only to the GlobalStorageStatistics if the class exists.
|
||||
//This is required to support hadoop versions <2.7
|
||||
Class.forName("org.apache.hadoop.fs.GlobalStorageStatistics");
|
||||
storageStatistics = (OzoneFSStorageStatistics)
|
||||
GlobalStorageStatistics.INSTANCE
|
||||
.put(OzoneFSStorageStatistics.NAME,
|
||||
OzoneFSStorageStatistics::new);
|
||||
} catch (ClassNotFoundException e) {
|
||||
//we don't support storage statistics for hadoop2.7 and older
|
||||
}
|
||||
|
||||
if (isolatedClassloader) {
|
||||
try {
|
||||
//register only to the GlobalStorageStatistics if the class exists.
|
||||
//This is required to support hadoop versions <2.7
|
||||
Class.forName("org.apache.hadoop.fs.GlobalStorageStatistics");
|
||||
this.adapter =
|
||||
OzoneClientAdapterFactory.createAdapter(volumeStr, bucketStr,
|
||||
storageStatistics);
|
||||
} catch (ClassNotFoundException e) {
|
||||
this.adapter =
|
||||
OzoneClientAdapterFactory.createAdapter(volumeStr, bucketStr);
|
||||
}
|
||||
} else {
|
||||
|
||||
this.adapter = new OzoneClientAdapterImpl(omHost,
|
||||
Integer.parseInt(omPort), conf,
|
||||
volumeStr, bucketStr, storageStatistics);
|
||||
}
|
||||
|
||||
try {
|
||||
this.userName =
|
||||
UserGroupInformation.getCurrentUser().getShortUserName();
|
||||
} catch (IOException e) {
|
||||
this.userName = OZONE_DEFAULT_USER;
|
||||
}
|
||||
this.workingDir = new Path(OZONE_USER_DIR, this.userName)
|
||||
.makeQualified(this.uri, this.workingDir);
|
||||
} catch (URISyntaxException ue) {
|
||||
final String msg = "Invalid Ozone endpoint " + name;
|
||||
LOG.error(msg, ue);
|
||||
throw new IOException(msg, ue);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
try {
|
||||
adapter.close();
|
||||
} finally {
|
||||
super.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI getUri() {
|
||||
return uri;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return OZONE_URI_SCHEME;
|
||||
}
|
||||
|
||||
Statistics getFsStatistics() {
|
||||
return statistics;
|
||||
}
|
||||
|
||||
OzoneFSStorageStatistics getOzoneFSOpsCountStatistics() {
|
||||
return storageStatistics;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
|
||||
if (storageStatistics != null) {
|
||||
storageStatistics.incrementCounter(Statistic.INVOCATION_OPEN, 1);
|
||||
}
|
||||
statistics.incrementWriteOps(1);
|
||||
LOG.trace("open() path:{}", f);
|
||||
final FileStatus fileStatus = getFileStatus(f);
|
||||
final String key = pathToKey(f);
|
||||
if (fileStatus.isDirectory()) {
|
||||
throw new FileNotFoundException("Can't open directory " + f + " to read");
|
||||
}
|
||||
|
||||
return new FSDataInputStream(
|
||||
new OzoneFSInputStream(adapter.createInputStream(key)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataOutputStream create(Path f, FsPermission permission,
|
||||
boolean overwrite, int bufferSize,
|
||||
short replication, long blockSize,
|
||||
Progressable progress) throws IOException {
|
||||
LOG.trace("create() path:{}", f);
|
||||
if (storageStatistics != null) {
|
||||
storageStatistics.incrementCounter(Statistic.INVOCATION_CREATE, 1);
|
||||
}
|
||||
statistics.incrementWriteOps(1);
|
||||
final String key = pathToKey(f);
|
||||
final FileStatus status;
|
||||
try {
|
||||
status = getFileStatus(f);
|
||||
if (status.isDirectory()) {
|
||||
throw new FileAlreadyExistsException(f + " is a directory");
|
||||
} else {
|
||||
if (!overwrite) {
|
||||
// path references a file and overwrite is disabled
|
||||
throw new FileAlreadyExistsException(f + " already exists");
|
||||
}
|
||||
LOG.trace("Overwriting file {}", f);
|
||||
adapter.deleteObject(key);
|
||||
}
|
||||
} catch (FileNotFoundException ignored) {
|
||||
// this means the file is not found
|
||||
}
|
||||
|
||||
// We pass null to FSDataOutputStream so it won't count writes that
|
||||
// are being buffered to a file
|
||||
return new FSDataOutputStream(adapter.createKey(key), statistics);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataOutputStream createNonRecursive(Path path,
|
||||
FsPermission permission,
|
||||
EnumSet<CreateFlag> flags,
|
||||
int bufferSize,
|
||||
short replication,
|
||||
long blockSize,
|
||||
Progressable progress) throws IOException {
|
||||
if (storageStatistics != null) {
|
||||
storageStatistics.incrementCounter(
|
||||
Statistic.INVOCATION_CREATE_NON_RECURSIVE, 1);
|
||||
}
|
||||
statistics.incrementWriteOps(1);
|
||||
final Path parent = path.getParent();
|
||||
if (parent != null) {
|
||||
// expect this to raise an exception if there is no parent
|
||||
if (!getFileStatus(parent).isDirectory()) {
|
||||
throw new FileAlreadyExistsException("Not a directory: " + parent);
|
||||
}
|
||||
}
|
||||
return create(path, permission, flags.contains(CreateFlag.OVERWRITE),
|
||||
bufferSize, replication, blockSize, progress);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataOutputStream append(Path f, int bufferSize,
|
||||
Progressable progress) throws IOException {
|
||||
throw new UnsupportedOperationException("append() Not implemented by the "
|
||||
+ getClass().getSimpleName() + " FileSystem implementation");
|
||||
}
|
||||
|
||||
@Override
|
||||
public KeyProvider getKeyProvider() throws IOException {
|
||||
return adapter.getKeyProvider();
|
||||
return getAdapter().getKeyProvider();
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI getKeyProviderUri() throws IOException {
|
||||
return adapter.getKeyProviderUri();
|
||||
return getAdapter().getKeyProviderUri();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -324,656 +66,36 @@ public class OzoneFileSystem extends FileSystem
|
|||
return null;
|
||||
}
|
||||
|
||||
private class RenameIterator extends OzoneListingIterator {
|
||||
private final String srcKey;
|
||||
private final String dstKey;
|
||||
StorageStatistics getOzoneFSOpsCountStatistics() {
|
||||
return storageStatistics;
|
||||
}
|
||||
|
||||
RenameIterator(Path srcPath, Path dstPath)
|
||||
throws IOException {
|
||||
super(srcPath);
|
||||
srcKey = pathToKey(srcPath);
|
||||
dstKey = pathToKey(dstPath);
|
||||
LOG.trace("rename from:{} to:{}", srcKey, dstKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean processKey(String key) throws IOException {
|
||||
String newKeyName = dstKey.concat(key.substring(srcKey.length()));
|
||||
adapter.renameKey(key, newKeyName);
|
||||
return true;
|
||||
@Override
|
||||
protected void incrementCounter(Statistic statistic) {
|
||||
if (storageStatistics != null) {
|
||||
storageStatistics.incrementCounter(statistic, 1);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether the source and destination path are valid and then perform
|
||||
* rename from source path to destination path.
|
||||
* <p>
|
||||
* The rename operation is performed by renaming the keys with src as prefix.
|
||||
* For such keys the prefix is changed from src to dst.
|
||||
*
|
||||
* @param src source path for rename
|
||||
* @param dst destination path for rename
|
||||
* @return true if rename operation succeeded or
|
||||
* if the src and dst have the same path and are of the same type
|
||||
* @throws IOException on I/O errors or if the src/dst paths are invalid.
|
||||
*/
|
||||
@Override
|
||||
public boolean rename(Path src, Path dst) throws IOException {
|
||||
if (storageStatistics != null) {
|
||||
storageStatistics.incrementCounter(Statistic.INVOCATION_RENAME, 1);
|
||||
}
|
||||
statistics.incrementWriteOps(1);
|
||||
if (src.equals(dst)) {
|
||||
return true;
|
||||
}
|
||||
protected OzoneClientAdapter createAdapter(Configuration conf,
|
||||
String bucketStr,
|
||||
String volumeStr, String omHost, String omPort,
|
||||
boolean isolatedClassloader) throws IOException {
|
||||
|
||||
LOG.trace("rename() from:{} to:{}", src, dst);
|
||||
if (src.isRoot()) {
|
||||
// Cannot rename root of file system
|
||||
LOG.trace("Cannot rename the root of a filesystem");
|
||||
return false;
|
||||
}
|
||||
this.storageStatistics =
|
||||
(OzoneFSStorageStatistics) GlobalStorageStatistics.INSTANCE
|
||||
.put(OzoneFSStorageStatistics.NAME,
|
||||
OzoneFSStorageStatistics::new);
|
||||
|
||||
// Cannot rename a directory to its own subdirectory
|
||||
Path dstParent = dst.getParent();
|
||||
while (dstParent != null && !src.equals(dstParent)) {
|
||||
dstParent = dstParent.getParent();
|
||||
}
|
||||
Preconditions.checkArgument(dstParent == null,
|
||||
"Cannot rename a directory to its own subdirectory");
|
||||
// Check if the source exists
|
||||
FileStatus srcStatus;
|
||||
try {
|
||||
srcStatus = getFileStatus(src);
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
// source doesn't exist, return
|
||||
return false;
|
||||
}
|
||||
if (isolatedClassloader) {
|
||||
return OzoneClientAdapterFactory.createAdapter(volumeStr, bucketStr,
|
||||
storageStatistics);
|
||||
|
||||
// Check if the destination exists
|
||||
FileStatus dstStatus;
|
||||
try {
|
||||
dstStatus = getFileStatus(dst);
|
||||
} catch (FileNotFoundException fnde) {
|
||||
dstStatus = null;
|
||||
}
|
||||
|
||||
if (dstStatus == null) {
|
||||
// If dst doesn't exist, check whether dst parent dir exists or not
|
||||
// if the parent exists, the source can still be renamed to dst path
|
||||
dstStatus = getFileStatus(dst.getParent());
|
||||
if (!dstStatus.isDirectory()) {
|
||||
throw new IOException(String.format(
|
||||
"Failed to rename %s to %s, %s is a file", src, dst,
|
||||
dst.getParent()));
|
||||
}
|
||||
} else {
|
||||
// if dst exists and source and destination are same,
|
||||
// check both the src and dst are of same type
|
||||
if (srcStatus.getPath().equals(dstStatus.getPath())) {
|
||||
return !srcStatus.isDirectory();
|
||||
} else if (dstStatus.isDirectory()) {
|
||||
// If dst is a directory, rename source as subpath of it.
|
||||
// for example rename /source to /dst will lead to /dst/source
|
||||
dst = new Path(dst, src.getName());
|
||||
FileStatus[] statuses;
|
||||
try {
|
||||
statuses = listStatus(dst);
|
||||
} catch (FileNotFoundException fnde) {
|
||||
statuses = null;
|
||||
}
|
||||
|
||||
if (statuses != null && statuses.length > 0) {
|
||||
// If dst exists and not a directory not empty
|
||||
throw new FileAlreadyExistsException(String.format(
|
||||
"Failed to rename %s to %s, file already exists or not empty!",
|
||||
src, dst));
|
||||
}
|
||||
} else {
|
||||
// If dst is not a directory
|
||||
throw new FileAlreadyExistsException(String.format(
|
||||
"Failed to rename %s to %s, file already exists!", src, dst));
|
||||
}
|
||||
}
|
||||
|
||||
if (srcStatus.isDirectory()) {
|
||||
if (dst.toString().startsWith(src.toString() + OZONE_URI_DELIMITER)) {
|
||||
LOG.trace("Cannot rename a directory to a subdirectory of self");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
RenameIterator iterator = new RenameIterator(src, dst);
|
||||
return iterator.iterate();
|
||||
}
|
||||
|
||||
private class DeleteIterator extends OzoneListingIterator {
|
||||
private boolean recursive;
|
||||
|
||||
DeleteIterator(Path f, boolean recursive)
|
||||
throws IOException {
|
||||
super(f);
|
||||
this.recursive = recursive;
|
||||
if (getStatus().isDirectory()
|
||||
&& !this.recursive
|
||||
&& listStatus(f).length != 0) {
|
||||
throw new PathIsNotEmptyDirectoryException(f.toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean processKey(String key) throws IOException {
|
||||
if (key.equals("")) {
|
||||
LOG.trace("Skipping deleting root directory");
|
||||
return true;
|
||||
} else {
|
||||
LOG.trace("deleting key:" + key);
|
||||
boolean succeed = adapter.deleteObject(key);
|
||||
// if recursive delete is requested ignore the return value of
|
||||
// deleteObject and issue deletes for other keys.
|
||||
return recursive || succeed;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes the children of the input dir path by iterating though the
|
||||
* DeleteIterator.
|
||||
*
|
||||
* @param f directory path to be deleted
|
||||
* @return true if successfully deletes all required keys, false otherwise
|
||||
* @throws IOException
|
||||
*/
|
||||
private boolean innerDelete(Path f, boolean recursive) throws IOException {
|
||||
LOG.trace("delete() path:{} recursive:{}", f, recursive);
|
||||
try {
|
||||
DeleteIterator iterator = new DeleteIterator(f, recursive);
|
||||
return iterator.iterate();
|
||||
} catch (FileNotFoundException e) {
|
||||
LOG.debug("Couldn't delete {} - does not exist", f);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean delete(Path f, boolean recursive) throws IOException {
|
||||
if (storageStatistics != null) {
|
||||
storageStatistics.incrementCounter(Statistic.INVOCATION_DELETE, 1);
|
||||
}
|
||||
statistics.incrementWriteOps(1);
|
||||
LOG.debug("Delete path {} - recursive {}", f, recursive);
|
||||
FileStatus status;
|
||||
try {
|
||||
status = getFileStatus(f);
|
||||
} catch (FileNotFoundException ex) {
|
||||
LOG.warn("delete: Path does not exist: {}", f);
|
||||
return false;
|
||||
}
|
||||
|
||||
String key = pathToKey(f);
|
||||
boolean result;
|
||||
|
||||
if (status.isDirectory()) {
|
||||
LOG.debug("delete: Path is a directory: {}", f);
|
||||
key = addTrailingSlashIfNeeded(key);
|
||||
|
||||
if (key.equals("/")) {
|
||||
LOG.warn("Cannot delete root directory.");
|
||||
return false;
|
||||
}
|
||||
|
||||
result = innerDelete(f, recursive);
|
||||
} else {
|
||||
LOG.debug("delete: Path is a file: {}", f);
|
||||
result = adapter.deleteObject(key);
|
||||
}
|
||||
|
||||
if (result) {
|
||||
// If this delete operation removes all files/directories from the
|
||||
// parent direcotry, then an empty parent directory must be created.
|
||||
Path parent = f.getParent();
|
||||
if (parent != null && !parent.isRoot()) {
|
||||
createFakeDirectoryIfNecessary(parent);
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a fake parent directory key if it does not already exist and no
|
||||
* other child of this parent directory exists.
|
||||
*
|
||||
* @param f path to the fake parent directory
|
||||
* @throws IOException
|
||||
*/
|
||||
private void createFakeDirectoryIfNecessary(Path f) throws IOException {
|
||||
String key = pathToKey(f);
|
||||
if (!key.isEmpty() && !o3Exists(f)) {
|
||||
LOG.debug("Creating new fake directory at {}", f);
|
||||
String dirKey = addTrailingSlashIfNeeded(key);
|
||||
adapter.createDirectory(dirKey);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a file or directory exists corresponding to given path.
|
||||
*
|
||||
* @param f path to file/directory.
|
||||
* @return true if it exists, false otherwise.
|
||||
* @throws IOException
|
||||
*/
|
||||
private boolean o3Exists(final Path f) throws IOException {
|
||||
Path path = makeQualified(f);
|
||||
try {
|
||||
getFileStatus(path);
|
||||
return true;
|
||||
} catch (FileNotFoundException ex) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private class ListStatusIterator extends OzoneListingIterator {
|
||||
// _fileStatuses_ maintains a list of file(s) which is either the input
|
||||
// path itself or a child of the input directory path.
|
||||
private List<FileStatus> fileStatuses = new ArrayList<>(LISTING_PAGE_SIZE);
|
||||
// _subDirStatuses_ maintains a list of sub-dirs of the input directory
|
||||
// path.
|
||||
private Map<Path, FileStatus> subDirStatuses =
|
||||
new HashMap<>(LISTING_PAGE_SIZE);
|
||||
private Path f; // the input path
|
||||
|
||||
ListStatusIterator(Path f) throws IOException {
|
||||
super(f);
|
||||
this.f = f;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the key to the listStatus result if the key corresponds to the
|
||||
* input path or is an immediate child of the input path.
|
||||
*
|
||||
* @param key key to be processed
|
||||
* @return always returns true
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
boolean processKey(String key) throws IOException {
|
||||
Path keyPath = new Path(OZONE_URI_DELIMITER + key);
|
||||
if (key.equals(getPathKey())) {
|
||||
if (pathIsDirectory()) {
|
||||
// if input path is a directory, we add the sub-directories and
|
||||
// files under this directory.
|
||||
return true;
|
||||
} else {
|
||||
addFileStatus(keyPath);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
// Left with only subkeys now
|
||||
// We add only the immediate child files and sub-dirs i.e. we go only
|
||||
// upto one level down the directory tree structure.
|
||||
if (pathToKey(keyPath.getParent()).equals(pathToKey(f))) {
|
||||
// This key is an immediate child. Can be file or directory
|
||||
if (key.endsWith(OZONE_URI_DELIMITER)) {
|
||||
// Key is a directory
|
||||
addSubDirStatus(keyPath);
|
||||
} else {
|
||||
addFileStatus(keyPath);
|
||||
}
|
||||
} else {
|
||||
// This key is not the immediate child of the input directory. So we
|
||||
// traverse the parent tree structure of this key until we get the
|
||||
// immediate child of the input directory.
|
||||
Path immediateChildPath = getImmediateChildPath(keyPath.getParent());
|
||||
if (immediateChildPath != null) {
|
||||
addSubDirStatus(immediateChildPath);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the FileStatus of keyPath to final result of listStatus.
|
||||
*
|
||||
* @param filePath path to the file
|
||||
* @throws FileNotFoundException
|
||||
*/
|
||||
void addFileStatus(Path filePath) throws IOException {
|
||||
fileStatuses.add(getFileStatus(filePath));
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the FileStatus of the subdir to final result of listStatus, if not
|
||||
* already included.
|
||||
*
|
||||
* @param dirPath path to the dir
|
||||
* @throws FileNotFoundException
|
||||
*/
|
||||
void addSubDirStatus(Path dirPath) throws FileNotFoundException {
|
||||
// Check if subdir path is already included in statuses.
|
||||
if (!subDirStatuses.containsKey(dirPath)) {
|
||||
subDirStatuses.put(dirPath, innerGetFileStatusForDir(dirPath));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Traverse the parent directory structure of keyPath to determine the
|
||||
* which parent/ grand-parent/.. is the immediate child of the input path f.
|
||||
*
|
||||
* @param keyPath path whose parent directory structure should be traversed.
|
||||
* @return immediate child path of the input path f.
|
||||
*/
|
||||
Path getImmediateChildPath(Path keyPath) {
|
||||
Path path = keyPath;
|
||||
Path parent = path.getParent();
|
||||
while (parent != null) {
|
||||
if (pathToKey(parent).equals(pathToKey(f))) {
|
||||
return path;
|
||||
}
|
||||
path = parent;
|
||||
parent = path.getParent();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the result of listStatus operation. If the input path is a
|
||||
* file, return the status for only that file. If the input path is a
|
||||
* directory, return the statuses for all the child files and sub-dirs.
|
||||
*/
|
||||
FileStatus[] getStatuses() {
|
||||
List<FileStatus> result = Stream.concat(
|
||||
fileStatuses.stream(), subDirStatuses.values().stream())
|
||||
.collect(Collectors.toList());
|
||||
return result.toArray(new FileStatus[result.size()]);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus[] listStatus(Path f) throws IOException {
|
||||
if (storageStatistics != null) {
|
||||
storageStatistics.incrementCounter(Statistic.INVOCATION_LIST_STATUS, 1);
|
||||
}
|
||||
statistics.incrementReadOps(1);
|
||||
LOG.trace("listStatus() path:{}", f);
|
||||
ListStatusIterator iterator = new ListStatusIterator(f);
|
||||
iterator.iterate();
|
||||
return iterator.getStatuses();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setWorkingDirectory(Path newDir) {
|
||||
workingDir = newDir;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getWorkingDirectory() {
|
||||
return workingDir;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<?> getDelegationToken(String renewer) throws IOException {
|
||||
return adapter.getDelegationToken(renewer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a canonical service name for this file system. If the URI is logical,
|
||||
* the hostname part of the URI will be returned.
|
||||
* @return a service string that uniquely identifies this file system.
|
||||
*/
|
||||
@Override
|
||||
public String getCanonicalServiceName() {
|
||||
return adapter.getCanonicalServiceName();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the username of the FS.
|
||||
*
|
||||
* @return the short name of the user who instantiated the FS
|
||||
*/
|
||||
public String getUsername() {
|
||||
return userName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether the path is valid and then create directories.
|
||||
* Directory is represented using a key with no value.
|
||||
* All the non-existent parent directories are also created.
|
||||
*
|
||||
* @param path directory path to be created
|
||||
* @return true if directory exists or created successfully.
|
||||
* @throws IOException
|
||||
*/
|
||||
private boolean mkdir(Path path) throws IOException {
|
||||
Path fPart = path;
|
||||
Path prevfPart = null;
|
||||
do {
|
||||
LOG.trace("validating path:{}", fPart);
|
||||
try {
|
||||
FileStatus fileStatus = getFileStatus(fPart);
|
||||
if (fileStatus.isDirectory()) {
|
||||
// If path exists and a directory, exit
|
||||
break;
|
||||
} else {
|
||||
// Found a file here, rollback and delete newly created directories
|
||||
LOG.trace("Found a file with same name as directory, path:{}", fPart);
|
||||
if (prevfPart != null) {
|
||||
delete(prevfPart, true);
|
||||
}
|
||||
throw new FileAlreadyExistsException(String.format(
|
||||
"Can't make directory for path '%s', it is a file.", fPart));
|
||||
}
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
LOG.trace("creating directory for fpart:{}", fPart);
|
||||
String key = pathToKey(fPart);
|
||||
String dirKey = addTrailingSlashIfNeeded(key);
|
||||
if (!adapter.createDirectory(dirKey)) {
|
||||
// Directory creation failed here,
|
||||
// rollback and delete newly created directories
|
||||
LOG.trace("Directory creation failed, path:{}", fPart);
|
||||
if (prevfPart != null) {
|
||||
delete(prevfPart, true);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
prevfPart = fPart;
|
||||
fPart = fPart.getParent();
|
||||
} while (fPart != null);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
||||
LOG.trace("mkdir() path:{} ", f);
|
||||
String key = pathToKey(f);
|
||||
if (StringUtils.isEmpty(key)) {
|
||||
return false;
|
||||
}
|
||||
return mkdir(f);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus getFileStatus(Path f) throws IOException {
|
||||
if (storageStatistics != null) {
|
||||
storageStatistics
|
||||
.incrementCounter(Statistic.INVOCATION_GET_FILE_STATUS, 1);
|
||||
}
|
||||
statistics.incrementReadOps(1);
|
||||
LOG.trace("getFileStatus() path:{}", f);
|
||||
Path qualifiedPath = f.makeQualified(uri, workingDir);
|
||||
String key = pathToKey(qualifiedPath);
|
||||
|
||||
if (key.length() == 0) {
|
||||
return new FileStatus(0, true, 1, 0,
|
||||
adapter.getCreationTime(), qualifiedPath);
|
||||
}
|
||||
|
||||
// Check if the key exists
|
||||
BasicKeyInfo ozoneKey = adapter.getKeyInfo(key);
|
||||
if (ozoneKey != null) {
|
||||
LOG.debug("Found exact file for path {}: normal file", f);
|
||||
return new FileStatus(ozoneKey.getDataSize(), false, 1,
|
||||
getDefaultBlockSize(f), ozoneKey.getModificationTime(), 0,
|
||||
FsPermission.getFileDefault(), getUsername(), getUsername(),
|
||||
qualifiedPath);
|
||||
}
|
||||
|
||||
return innerGetFileStatusForDir(f);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the FileStatus for input directory path.
|
||||
* They key corresponding to input path is appended with a trailing slash
|
||||
* to return only the corresponding directory key in the bucket.
|
||||
*
|
||||
* @param f directory path
|
||||
* @return FileStatus for the input directory path
|
||||
* @throws FileNotFoundException
|
||||
*/
|
||||
public FileStatus innerGetFileStatusForDir(Path f)
|
||||
throws FileNotFoundException {
|
||||
Path qualifiedPath = f.makeQualified(uri, workingDir);
|
||||
String key = pathToKey(qualifiedPath);
|
||||
key = addTrailingSlashIfNeeded(key);
|
||||
|
||||
BasicKeyInfo ozoneKey = adapter.getKeyInfo(key);
|
||||
if (ozoneKey != null) {
|
||||
if (adapter.isDirectory(ozoneKey)) {
|
||||
// Key is a directory
|
||||
LOG.debug("Found file (with /) for path {}: fake directory", f);
|
||||
} else {
|
||||
// Key is a file with trailing slash
|
||||
LOG.warn("Found file (with /) for path {}: real file? should not " +
|
||||
"happen", f, key);
|
||||
}
|
||||
return new FileStatus(0, true, 1, 0,
|
||||
ozoneKey.getModificationTime(), 0,
|
||||
FsPermission.getDirDefault(), getUsername(), getUsername(),
|
||||
qualifiedPath);
|
||||
}
|
||||
|
||||
// File or directory corresponding to input path does not exist.
|
||||
// Check if there exists a key prefixed with this key.
|
||||
boolean hasChildren = adapter.hasNextKey(key);
|
||||
if (hasChildren) {
|
||||
return new FileStatus(0, true, 1, 0, 0, 0, FsPermission.getDirDefault(),
|
||||
getUsername(), getUsername(), qualifiedPath);
|
||||
}
|
||||
|
||||
throw new FileNotFoundException(f + ": No such file or directory!");
|
||||
}
|
||||
|
||||
/**
|
||||
* Turn a path (relative or otherwise) into an Ozone key.
|
||||
*
|
||||
* @param path the path of the file.
|
||||
* @return the key of the object that represents the file.
|
||||
*/
|
||||
public String pathToKey(Path path) {
|
||||
Objects.requireNonNull(path, "Path canf not be null!");
|
||||
if (!path.isAbsolute()) {
|
||||
path = new Path(workingDir, path);
|
||||
}
|
||||
// removing leading '/' char
|
||||
String key = path.toUri().getPath().substring(1);
|
||||
LOG.trace("path for key:{} is:{}", key, path);
|
||||
return key;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add trailing delimiter to path if it is already not present.
|
||||
*
|
||||
* @param key the ozone Key which needs to be appended
|
||||
* @return delimiter appended key
|
||||
*/
|
||||
private String addTrailingSlashIfNeeded(String key) {
|
||||
if (StringUtils.isNotEmpty(key) && !key.endsWith(OZONE_URI_DELIMITER)) {
|
||||
return key + OZONE_URI_DELIMITER;
|
||||
} else {
|
||||
return key;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "OzoneFileSystem{URI=" + uri + ", "
|
||||
+ "workingDir=" + workingDir + ", "
|
||||
+ "userName=" + userName + ", "
|
||||
+ "statistics=" + statistics
|
||||
+ "}";
|
||||
}
|
||||
|
||||
/**
|
||||
* This class provides an interface to iterate through all the keys in the
|
||||
* bucket prefixed with the input path key and process them.
|
||||
* <p>
|
||||
* Each implementing class should define how the keys should be processed
|
||||
* through the processKey() function.
|
||||
*/
|
||||
private abstract class OzoneListingIterator {
|
||||
private final Path path;
|
||||
private final FileStatus status;
|
||||
private String pathKey;
|
||||
private Iterator<BasicKeyInfo> keyIterator;
|
||||
|
||||
OzoneListingIterator(Path path)
|
||||
throws IOException {
|
||||
this.path = path;
|
||||
this.status = getFileStatus(path);
|
||||
this.pathKey = pathToKey(path);
|
||||
if (status.isDirectory()) {
|
||||
this.pathKey = addTrailingSlashIfNeeded(pathKey);
|
||||
}
|
||||
keyIterator = adapter.listKeys(pathKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* The output of processKey determines if further iteration through the
|
||||
* keys should be done or not.
|
||||
*
|
||||
* @return true if we should continue iteration of keys, false otherwise.
|
||||
* @throws IOException
|
||||
*/
|
||||
abstract boolean processKey(String key) throws IOException;
|
||||
|
||||
/**
|
||||
* Iterates thorugh all the keys prefixed with the input path's key and
|
||||
* processes the key though processKey().
|
||||
* If for any key, the processKey() returns false, then the iteration is
|
||||
* stopped and returned with false indicating that all the keys could not
|
||||
* be processed successfully.
|
||||
*
|
||||
* @return true if all keys are processed successfully, false otherwise.
|
||||
* @throws IOException
|
||||
*/
|
||||
boolean iterate() throws IOException {
|
||||
LOG.trace("Iterating path {}", path);
|
||||
if (status.isDirectory()) {
|
||||
LOG.trace("Iterating directory:{}", pathKey);
|
||||
while (keyIterator.hasNext()) {
|
||||
BasicKeyInfo key = keyIterator.next();
|
||||
LOG.trace("iterating key:{}", key.getName());
|
||||
if (!processKey(key.getName())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
LOG.trace("iterating file:{}", path);
|
||||
return processKey(pathKey);
|
||||
}
|
||||
}
|
||||
|
||||
String getPathKey() {
|
||||
return pathKey;
|
||||
}
|
||||
|
||||
boolean pathIsDirectory() {
|
||||
return status.isDirectory();
|
||||
}
|
||||
|
||||
FileStatus getStatus() {
|
||||
return status;
|
||||
return new OzoneClientAdapterImpl(omHost,
|
||||
Integer.parseInt(omPort), conf,
|
||||
volumeStr, bucketStr, storageStatistics);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -139,7 +139,7 @@ public class TestOzoneFileInterfaces {
|
|||
fs = FileSystem.get(new URI(rootPath + "/test.txt"), conf);
|
||||
}
|
||||
o3fs = (OzoneFileSystem) fs;
|
||||
statistics = o3fs.getOzoneFSOpsCountStatistics();
|
||||
statistics = (OzoneFSStorageStatistics) o3fs.getOzoneFSOpsCountStatistics();
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
Loading…
Reference in New Issue