HDFS-12572. Ozone: OzoneFileSystem: delete/list status/rename/mkdir APIs. Contributed by Mukul Kumar Singh.
This commit is contained in:
parent
ae2b77a0e5
commit
9445a9267f
|
@ -306,8 +306,10 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager {
|
|||
rangeResult = store.getSequentialRangeKVs(
|
||||
getBucketKey(volumeName, startBucket),
|
||||
maxNumOfBuckets + 1, filter);
|
||||
//Remove start key from result.
|
||||
rangeResult.remove(0);
|
||||
if (!rangeResult.isEmpty()) {
|
||||
//Remove start key from result.
|
||||
rangeResult.remove(0);
|
||||
}
|
||||
} else {
|
||||
rangeResult = store.getSequentialRangeKVs(null, maxNumOfBuckets, filter);
|
||||
}
|
||||
|
@ -350,8 +352,10 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager {
|
|||
rangeResult = store.getSequentialRangeKVs(
|
||||
getDBKeyBytes(volumeName, bucketName, startKey),
|
||||
maxKeys + 1, filter);
|
||||
//Remove start key from result.
|
||||
rangeResult.remove(0);
|
||||
if (!rangeResult.isEmpty()) {
|
||||
//Remove start key from result.
|
||||
rangeResult.remove(0);
|
||||
}
|
||||
} else {
|
||||
rangeResult = store.getSequentialRangeKVs(null, maxKeys, filter);
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.http.entity.ContentType;
|
|||
import org.apache.http.entity.FileEntity;
|
||||
import org.apache.http.entity.InputStreamEntity;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClientBuilder;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -53,6 +54,7 @@ import java.io.FileOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.LinkedList;
|
||||
|
@ -506,6 +508,35 @@ public class OzoneBucket {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* List keys in a bucket with the provided prefix, with paging results.
|
||||
*
|
||||
* @param prefix The prefix of the object keys
|
||||
* @param maxResult max size per response
|
||||
* @param prevKey the previous key for paging
|
||||
*/
|
||||
public List<OzoneKey> listKeys(String prefix, int maxResult, String prevKey)
|
||||
throws OzoneException {
|
||||
HttpGet getRequest = null;
|
||||
try {
|
||||
final URI uri = new URIBuilder(volume.getClient().getEndPointURI())
|
||||
.setPath(OzoneConsts.KSM_KEY_PREFIX + getVolume().getVolumeName() +
|
||||
OzoneConsts.KSM_KEY_PREFIX + getBucketName())
|
||||
.setParameter(Header.OZONE_LIST_QUERY_PREFIX, prefix)
|
||||
.setParameter(Header.OZONE_LIST_QUERY_MAXKEYS,
|
||||
String.valueOf(maxResult))
|
||||
.setParameter(Header.OZONE_LIST_QUERY_PREVKEY, prevKey)
|
||||
.build();
|
||||
final OzoneRestClient client = getVolume().getClient();
|
||||
getRequest = client.getHttpGet(uri.toString());
|
||||
return executeListKeys(getRequest, HttpClientBuilder.create().build());
|
||||
} catch (IOException | URISyntaxException e) {
|
||||
throw new OzoneRestClientException(e.getMessage());
|
||||
} finally {
|
||||
OzoneClientUtils.releaseConnection(getRequest);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute list Key.
|
||||
*
|
||||
|
|
|
@ -36,7 +36,7 @@ public class OzoneExceptionMapper implements ExceptionMapper<OzoneException> {
|
|||
|
||||
@Override
|
||||
public Response toResponse(OzoneException exception) {
|
||||
LOG.info("Returning exception. ex: {}", exception.toJsonString());
|
||||
LOG.debug("Returning exception. ex: {}", exception.toJsonString());
|
||||
MDC.clear();
|
||||
return Response.status((int)exception.getHttpCode())
|
||||
.entity(exception.toJsonString()).build();
|
||||
|
|
|
@ -94,7 +94,7 @@ public abstract class BucketProcessTemplate {
|
|||
BucketArgs args = new BucketArgs(volume, bucket, userArgs);
|
||||
MDC.put(OZONE_RESOURCE, args.getResourceName());
|
||||
Response response = doProcess(args);
|
||||
LOG.info("Success");
|
||||
LOG.debug("Success");
|
||||
MDC.clear();
|
||||
return response;
|
||||
|
||||
|
|
|
@ -89,7 +89,7 @@ public abstract class KeyProcessTemplate {
|
|||
KeyArgs args = new KeyArgs(volume, bucket, key, userArgs);
|
||||
MDC.put(OZONE_RESOURCE, args.getResourceName());
|
||||
Response response = doProcess(args, is, request, headers, info);
|
||||
LOG.info("Success");
|
||||
LOG.debug("Success");
|
||||
MDC.clear();
|
||||
return response;
|
||||
|
||||
|
|
|
@ -298,8 +298,9 @@ public class LevelDBStore implements MetadataStore {
|
|||
* @param startKey a start key.
|
||||
* @param count max number of entries to return.
|
||||
* @param filters customized one or more {@link MetadataKeyFilter}.
|
||||
* @return a list of entries found in the database.
|
||||
* @throws IOException if an invalid startKey is given or other I/O errors.
|
||||
* @return a list of entries found in the database or an empty list if the
|
||||
* startKey is invalid.
|
||||
* @throws IOException if there are I/O errors.
|
||||
* @throws IllegalArgumentException if count is less than 0.
|
||||
*/
|
||||
private List<Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
|
||||
|
@ -321,7 +322,8 @@ public class LevelDBStore implements MetadataStore {
|
|||
dbIter.seekToFirst();
|
||||
} else {
|
||||
if (db.get(startKey) == null) {
|
||||
throw new IOException("Invalid start key, not found in current db.");
|
||||
// Key not found, return empty list
|
||||
return result;
|
||||
}
|
||||
dbIter.seek(startKey);
|
||||
}
|
||||
|
|
|
@ -90,8 +90,9 @@ public interface MetadataStore extends Closeable{
|
|||
* @param startKey a start key.
|
||||
* @param count max number of entries to return.
|
||||
* @param filters customized one or more {@link MetadataKeyFilter}.
|
||||
* @return a list of entries found in the database.
|
||||
* @throws IOException if an invalid startKey is given or other I/O errors.
|
||||
* @return a list of entries found in the database or an empty list if the
|
||||
* startKey is invalid.
|
||||
* @throws IOException if there are I/O errors.
|
||||
* @throws IllegalArgumentException if count is less than 0.
|
||||
*/
|
||||
List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
|
||||
|
|
|
@ -160,7 +160,8 @@ public class RocksDBStore implements MetadataStore {
|
|||
it.seekToFirst();
|
||||
} else {
|
||||
if(get(startKey) == null) {
|
||||
throw new IOException("Invalid start key, not found in current db");
|
||||
// Key not found, return empty list
|
||||
return result;
|
||||
}
|
||||
it.seek(startKey);
|
||||
}
|
||||
|
|
|
@ -334,10 +334,10 @@ public class TestMetadataStore {
|
|||
|
||||
@Test
|
||||
public void testInvalidStartKey() throws IOException {
|
||||
// If startKey is invalid, throws an invalid key exception.
|
||||
expectedException.expect(IOException.class);
|
||||
expectedException.expectMessage("Invalid start key");
|
||||
store.getRangeKVs(getBytes("unknownKey"), MAX_GETRANGE_LENGTH);
|
||||
// If startKey is invalid, the returned list should be empty.
|
||||
List<Map.Entry<byte[], byte[]>> kvs =
|
||||
store.getRangeKVs(getBytes("unknownKey"), MAX_GETRANGE_LENGTH);
|
||||
Assert.assertEquals(kvs.size(), 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -699,13 +699,8 @@ public class TestKeySpaceManager {
|
|||
|
||||
// Provide an invalid bucket name as start key.
|
||||
listBucketArgs = new ListArgs(volArgs, null, 100, "unknown_bucket_name");
|
||||
try {
|
||||
storageHandler.listBuckets(listBucketArgs);
|
||||
Assert.fail("Expecting an error when the given bucket name is invalid.");
|
||||
} catch (Exception e) {
|
||||
Assert.assertTrue(e instanceof IOException);
|
||||
Assert.assertTrue(e.getMessage().contains(Status.INTERNAL_ERROR.name()));
|
||||
}
|
||||
ListBuckets buckets = storageHandler.listBuckets(listBucketArgs);
|
||||
Assert.assertEquals(buckets.getBuckets().size(), 0);
|
||||
|
||||
// Use all arguments.
|
||||
listBucketArgs = new ListArgs(volArgs, "b", 5, "bBucket_7");
|
||||
|
@ -824,14 +819,8 @@ public class TestKeySpaceManager {
|
|||
|
||||
// Provide an invalid key name as start key.
|
||||
listKeyArgs = new ListArgs(bucketArgs, null, 100, "invalid_start_key");
|
||||
try {
|
||||
storageHandler.listKeys(listKeyArgs);
|
||||
Assert.fail("Expecting an error when the given start"
|
||||
+ " key name is invalid.");
|
||||
} catch (IOException e) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
Status.INTERNAL_ERROR.name(), e);
|
||||
}
|
||||
ListKeys keys = storageHandler.listKeys(listKeyArgs);
|
||||
Assert.assertEquals(keys.getKeyList().size(), 0);
|
||||
|
||||
// Provide an invalid maxKeys argument.
|
||||
try {
|
||||
|
|
|
@ -23,7 +23,7 @@ package org.apache.hadoop.fs.ozone;
|
|||
*/
|
||||
public class Constants {
|
||||
|
||||
public static final String OZONE_URI_SCHEME = "ozfs";
|
||||
public static final String OZONE_URI_SCHEME = "o3";
|
||||
|
||||
public static final String OZONE_DEFAULT_USER = "hdfs";
|
||||
|
||||
|
@ -39,6 +39,9 @@ public class Constants {
|
|||
|
||||
public static final String OZONE_URI_DELIMITER = "/";
|
||||
|
||||
/** Page size for Ozone listing operation. */
|
||||
public static final int LISTING_PAGE_SIZE = 1024;
|
||||
|
||||
private Constants() {
|
||||
|
||||
}
|
||||
|
|
|
@ -18,14 +18,26 @@
|
|||
|
||||
package org.apache.hadoop.fs.ozone;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.text.ParseException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
|
||||
import org.apache.hadoop.ozone.web.client.OzoneKey;
|
||||
import org.apache.hadoop.ozone.web.client.OzoneRestClient;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||
|
@ -33,16 +45,10 @@ import org.apache.http.client.utils.URIBuilder;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.ozone.web.client.OzoneBucket;
|
||||
import org.apache.hadoop.ozone.web.client.OzoneVolume;
|
||||
|
@ -55,6 +61,8 @@ import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_SCHEME;
|
|||
import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
|
||||
import static org.apache.hadoop.fs.ozone.Constants.OZONE_HTTP_SCHEME;
|
||||
import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_DELIMITER;
|
||||
import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
|
||||
import static org.apache.hadoop.fs.ozone.Constants.BUFFER_DIR_KEY;
|
||||
|
||||
/**
|
||||
* The Ozone Filesystem implementation.
|
||||
|
@ -181,10 +189,19 @@ public class OzoneFileSystem extends FileSystem {
|
|||
// path references a file and overwrite is disabled
|
||||
throw new FileAlreadyExistsException(f + " already exists");
|
||||
}
|
||||
LOG.debug("Overwriting file {}", f);
|
||||
//TODO: Delete the existing file here
|
||||
LOG.trace("Overwriting file {}", f);
|
||||
deleteObject(key);
|
||||
}
|
||||
} catch (FileNotFoundException ignored) {
|
||||
// check if the parent directory needs to be created
|
||||
Path parent = f.getParent();
|
||||
try {
|
||||
// create all the directories for the parent
|
||||
FileStatus parentStatus = getFileStatus(parent);
|
||||
LOG.trace("parent key:{} status:{}", key, parentStatus);
|
||||
} catch (FileNotFoundException e) {
|
||||
mkdirs(parent);
|
||||
}
|
||||
// This exception needs to ignored as this means that the file currently
|
||||
// does not exists and a new file can thus be created.
|
||||
}
|
||||
|
@ -222,19 +239,221 @@ public class OzoneFileSystem extends FileSystem {
|
|||
+ getClass().getSimpleName() + " FileSystem implementation");
|
||||
}
|
||||
|
||||
private class RenameIterator extends OzoneListingIterator {
|
||||
private final String srcKey;
|
||||
private final String dstKey;
|
||||
|
||||
RenameIterator(Path srcPath, Path dstPath)
|
||||
throws IOException {
|
||||
super(srcPath, true);
|
||||
srcKey = pathToKey(srcPath);
|
||||
dstKey = pathToKey(dstPath);
|
||||
LOG.trace("rename from:{} to:{}", srcKey, dstKey);
|
||||
}
|
||||
|
||||
boolean processKey(String key) throws IOException {
|
||||
String newKeyName = dstKey.concat(key.substring(srcKey.length()));
|
||||
return rename(key, newKeyName);
|
||||
}
|
||||
|
||||
// TODO: currently rename work by copying the file, with changes in KSM,
|
||||
// this operation can be made improved by renaming the keys in KSM directly.
|
||||
private boolean rename(String src, String dst) throws IOException {
|
||||
final LocalDirAllocator dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
|
||||
final File tmpFile = dirAlloc.createTmpFileForWrite("output-",
|
||||
LocalDirAllocator.SIZE_UNKNOWN, getConf());
|
||||
|
||||
try {
|
||||
LOG.trace("rename by copying file from:{} to:{}", src, dst);
|
||||
bucket.getKey(src, tmpFile.toPath());
|
||||
bucket.putKey(dst, tmpFile);
|
||||
return true;
|
||||
} catch (OzoneException oe) {
|
||||
String msg = String.format("Error when renaming key from:%s to:%s",
|
||||
src, dst);
|
||||
LOG.error(msg, oe);
|
||||
throw new IOException(msg, oe);
|
||||
} finally {
|
||||
if (!tmpFile.delete()) {
|
||||
LOG.warn("Can not delete tmpFile: " + tmpFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether the source and destination path are valid and then perform
|
||||
* rename by copying the data from source path to destination path.
|
||||
*
|
||||
* The rename operation is performed by copying data from source key
|
||||
* to destination key. This is done by reading the source key data into a
|
||||
* temporary file and then writing this temporary file to destination key.
|
||||
* The temporary file is deleted after the rename operation.
|
||||
* TODO: Optimize the operation by renaming keys in KSM.
|
||||
*
|
||||
* @param src source path for rename
|
||||
* @param dst destination path for rename
|
||||
* @return true if rename operation succeeded or
|
||||
* if the src and dst have the same path and are of the same type
|
||||
* @throws IOException on I/O errors or if the src/dst paths are invalid.
|
||||
*/
|
||||
@Override
|
||||
public boolean rename(Path src, Path dst) throws IOException {
|
||||
return false;
|
||||
LOG.trace("rename() from:{} to:{}", src, dst);
|
||||
|
||||
if (src.isRoot()) {
|
||||
// Cannot rename root of file system
|
||||
LOG.trace("Cannot rename the root of a filesystem");
|
||||
return false;
|
||||
}
|
||||
|
||||
// Cannot rename a directory to its own subdirectory
|
||||
Path parent = dst.getParent();
|
||||
while (parent != null && !src.equals(parent)) {
|
||||
parent = parent.getParent();
|
||||
}
|
||||
if (parent != null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check if the source exists
|
||||
FileStatus srcStatus;
|
||||
try {
|
||||
srcStatus = getFileStatus(src);
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
// source doesn't exist, return
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check if the destination exists
|
||||
FileStatus dstStatus;
|
||||
try {
|
||||
dstStatus = getFileStatus(dst);
|
||||
} catch (FileNotFoundException fnde) {
|
||||
dstStatus = null;
|
||||
}
|
||||
|
||||
if (dstStatus == null) {
|
||||
// If dst doesn't exist, check whether dst parent dir exists or not
|
||||
// if the parent exists, the source can still be renamed to dst path
|
||||
dstStatus = getFileStatus(dst.getParent());
|
||||
if (!dstStatus.isDirectory()) {
|
||||
throw new IOException(String.format(
|
||||
"Failed to rename %s to %s, %s is a file", src, dst,
|
||||
dst.getParent()));
|
||||
}
|
||||
} else {
|
||||
// if dst exists and source and destination are same,
|
||||
// check both the src and dst are of same type
|
||||
if (srcStatus.getPath().equals(dstStatus.getPath())) {
|
||||
return !srcStatus.isDirectory();
|
||||
} else if (dstStatus.isDirectory()) {
|
||||
// If dst is a directory, rename source as subpath of it.
|
||||
// for example rename /source to /dst will lead to /dst/source
|
||||
dst = new Path(dst, src.getName());
|
||||
FileStatus[] statuses;
|
||||
try {
|
||||
statuses = listStatus(dst);
|
||||
} catch (FileNotFoundException fnde) {
|
||||
statuses = null;
|
||||
}
|
||||
|
||||
if (statuses != null && statuses.length > 0) {
|
||||
// If dst exists and not a directory not empty
|
||||
throw new FileAlreadyExistsException(String.format(
|
||||
"Failed to rename %s to %s, file already exists or not empty!",
|
||||
src, dst));
|
||||
}
|
||||
} else {
|
||||
// If dst is not a directory
|
||||
throw new FileAlreadyExistsException(String.format(
|
||||
"Failed to rename %s to %s, file already exists!", src, dst));
|
||||
}
|
||||
}
|
||||
|
||||
if (srcStatus.isDirectory()) {
|
||||
if (dst.toString().startsWith(src.toString())) {
|
||||
LOG.trace("Cannot rename a directory to a subdirectory of self");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
RenameIterator iterator = new RenameIterator(src, dst);
|
||||
iterator.iterate();
|
||||
return src.equals(dst) || delete(src, true);
|
||||
}
|
||||
|
||||
private class DeleteIterator extends OzoneListingIterator {
|
||||
private boolean recursive;
|
||||
DeleteIterator(Path f, boolean recursive)
|
||||
throws IOException {
|
||||
super(f, recursive);
|
||||
this.recursive = recursive;
|
||||
}
|
||||
|
||||
boolean processKey(String key) throws IOException {
|
||||
if (key.equals("")) {
|
||||
LOG.trace("Skipping deleting root directory");
|
||||
return true;
|
||||
} else {
|
||||
LOG.trace("deleting key:" + key);
|
||||
boolean succeed = deleteObject(key);
|
||||
// if recursive delete is requested ignore the return value of
|
||||
// deleteObject and issue deletes for other keys.
|
||||
return recursive || succeed;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean delete(Path f, boolean recursive) throws IOException {
|
||||
return false;
|
||||
LOG.trace("delete() path:{} recursive:{}", f, recursive);
|
||||
try {
|
||||
DeleteIterator iterator = new DeleteIterator(f, recursive);
|
||||
return iterator.iterate();
|
||||
} catch (FileNotFoundException e) {
|
||||
LOG.error("Couldn't delete {} - does not exist", f);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private class ListStatusIterator extends OzoneListingIterator {
|
||||
private List<FileStatus> statuses = new ArrayList<>(LISTING_PAGE_SIZE);
|
||||
private Path f;
|
||||
|
||||
ListStatusIterator(Path f) throws IOException {
|
||||
super(f, true);
|
||||
this.f = f;
|
||||
}
|
||||
|
||||
boolean processKey(String key) throws IOException {
|
||||
Path keyPath = new Path(OZONE_URI_DELIMITER + key);
|
||||
if (key.equals(getPathKey())) {
|
||||
if (pathIsDirectory()) {
|
||||
return true;
|
||||
} else {
|
||||
statuses.add(getFileStatus(keyPath));
|
||||
return true;
|
||||
}
|
||||
}
|
||||
// left with only subkeys now
|
||||
if (keyPath.getParent().getName().equals(f.getName())) {
|
||||
// skip keys which are for subdirectories of the directory
|
||||
statuses.add(getFileStatus(keyPath));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
FileStatus[] getStatuses() {
|
||||
return statuses.toArray(new FileStatus[statuses.size()]);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus[] listStatus(Path f) throws IOException {
|
||||
return null;
|
||||
LOG.trace("listStatus() path:{}", f);
|
||||
ListStatusIterator iterator = new ListStatusIterator(f);
|
||||
iterator.iterate();
|
||||
return iterator.getStatuses();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -247,38 +466,67 @@ public class OzoneFileSystem extends FileSystem {
|
|||
return workingDir;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether the path is valid and then create directories.
|
||||
* Directory is represented using a key with no value.
|
||||
* All the non-existent parent directories are also created.
|
||||
*
|
||||
* @param path directory path to be created
|
||||
* @return true if directory exists or created successfully.
|
||||
* @throws IOException
|
||||
*/
|
||||
private boolean mkdir(Path path) throws IOException {
|
||||
Path fPart = path;
|
||||
Path prevfPart = null;
|
||||
do {
|
||||
LOG.trace("validating path:{}", fPart);
|
||||
try {
|
||||
FileStatus fileStatus = getFileStatus(fPart);
|
||||
if (fileStatus.isDirectory()) {
|
||||
// If path exists and a directory, exit
|
||||
break;
|
||||
} else {
|
||||
// Found a file here, rollback and delete newly created directories
|
||||
LOG.trace("Found a file with same name as directory, path:{}", fPart);
|
||||
if (prevfPart != null) {
|
||||
delete(prevfPart, true);
|
||||
}
|
||||
throw new FileAlreadyExistsException(String.format(
|
||||
"Can't make directory for path '%s', it is a file.", fPart));
|
||||
}
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
LOG.trace("creating directory for fpart:{}", fPart);
|
||||
String key = pathToKey(fPart);
|
||||
String dirKey = addTrailingSlashIfNeeded(key);
|
||||
if (!createDirectory(dirKey)) {
|
||||
// Directory creation failed here,
|
||||
// rollback and delete newly created directories
|
||||
LOG.trace("Directory creation failed, path:{}", fPart);
|
||||
if (prevfPart != null) {
|
||||
delete(prevfPart, true);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
prevfPart = fPart;
|
||||
fPart = fPart.getParent();
|
||||
} while (fPart != null);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
private OzoneKey getKeyStatus(String keyName) {
|
||||
try {
|
||||
return bucket.getKeyInfo(keyName);
|
||||
} catch (OzoneException e) {
|
||||
LOG.trace("Key:{} does not exists", keyName);
|
||||
return null;
|
||||
LOG.trace("mkdir() path:{} ", f);
|
||||
String key = pathToKey(f);
|
||||
if (StringUtils.isEmpty(key)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private long getModifiedTime(String modifiedTime, String key) {
|
||||
try {
|
||||
return OzoneUtils.formatDate(modifiedTime);
|
||||
} catch (ParseException pe) {
|
||||
LOG.error("Invalid time:{} for key:{}", modifiedTime, key, pe);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isDirectory(OzoneKey key) {
|
||||
LOG.trace("key name:{} size:{}", key.getObjectInfo().getKeyName(),
|
||||
key.getObjectInfo().getSize());
|
||||
return key.getObjectInfo().getKeyName().endsWith(OZONE_URI_DELIMITER)
|
||||
&& (key.getObjectInfo().getSize() == 0);
|
||||
return mkdir(f);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus getFileStatus(Path f) throws IOException {
|
||||
LOG.trace("getFileStatus() path:{}", f);
|
||||
Path qualifiedPath = f.makeQualified(uri, workingDir);
|
||||
String key = pathToKey(qualifiedPath);
|
||||
|
||||
|
@ -289,11 +537,10 @@ public class OzoneFileSystem extends FileSystem {
|
|||
}
|
||||
|
||||
// consider this a file and get key status
|
||||
OzoneKey meta = getKeyStatus(key);
|
||||
if (meta == null && !key.endsWith(OZONE_URI_DELIMITER)) {
|
||||
// if that fails consider this a directory
|
||||
key += OZONE_URI_DELIMITER;
|
||||
meta = getKeyStatus(key);
|
||||
OzoneKey meta = getKeyInfo(key);
|
||||
if (meta == null) {
|
||||
key = addTrailingSlashIfNeeded(key);
|
||||
meta = getKeyInfo(key);
|
||||
}
|
||||
|
||||
if (meta == null) {
|
||||
|
@ -304,6 +551,7 @@ public class OzoneFileSystem extends FileSystem {
|
|||
getModifiedTime(meta.getObjectInfo().getModifiedOn(), key),
|
||||
qualifiedPath);
|
||||
} else {
|
||||
//TODO: Fetch replication count from ratis config
|
||||
return new FileStatus(meta.getObjectInfo().getSize(), false, 1,
|
||||
getDefaultBlockSize(f),
|
||||
getModifiedTime(meta.getObjectInfo().getModifiedOn(), key),
|
||||
|
@ -311,13 +559,102 @@ public class OzoneFileSystem extends FileSystem {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to fetch the key metadata info.
|
||||
* @param key key whose metadata information needs to be fetched
|
||||
* @return metadata info of the key
|
||||
*/
|
||||
private OzoneKey getKeyInfo(String key) {
|
||||
try {
|
||||
return bucket.getKeyInfo(key);
|
||||
} catch (OzoneException e) {
|
||||
LOG.trace("Key:{} does not exists", key);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to get the modified time of the key.
|
||||
* @param key key to fetch the modified time
|
||||
* @return last modified time of the key
|
||||
*/
|
||||
private long getModifiedTime(String modifiedTime, String key) {
|
||||
try {
|
||||
return OzoneUtils.formatDate(modifiedTime);
|
||||
} catch (ParseException pe) {
|
||||
LOG.error("Invalid time:{} for key:{}", modifiedTime, key, pe);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to check if an Ozone key is representing a directory.
|
||||
* @param key key to be checked as a directory
|
||||
* @return true if key is a directory, false otherwise
|
||||
*/
|
||||
private boolean isDirectory(OzoneKey key) {
|
||||
LOG.trace("key name:{} size:{}", key.getObjectInfo().getKeyName(),
|
||||
key.getObjectInfo().getSize());
|
||||
return key.getObjectInfo().getKeyName().endsWith(OZONE_URI_DELIMITER)
|
||||
&& (key.getObjectInfo().getSize() == 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to list entries matching the key name in bucket.
|
||||
* @param dirKey key prefix for listing the keys
|
||||
* @param lastKey last iterated key
|
||||
* @return List of Keys
|
||||
*/
|
||||
List<OzoneKey> listKeys(String dirKey, String lastKey)
|
||||
throws IOException {
|
||||
LOG.trace("list keys dirKey:{} lastKey:{}", dirKey, lastKey);
|
||||
try {
|
||||
return bucket.listKeys(dirKey, LISTING_PAGE_SIZE, lastKey);
|
||||
} catch (OzoneException oe) {
|
||||
LOG.error("list keys failed dirKey:{} lastKey:{}", dirKey, lastKey, oe);
|
||||
throw new IOException("List keys failed " + oe.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to create an directory specified by key name in bucket.
|
||||
* @param keyName key name to be created as directory
|
||||
* @return true if the key is created, false otherwise
|
||||
*/
|
||||
private boolean createDirectory(String keyName) {
|
||||
try {
|
||||
LOG.trace("creating dir for key:{}", keyName);
|
||||
bucket.putKey(keyName, "");
|
||||
return true;
|
||||
} catch (OzoneException oe) {
|
||||
LOG.error("create key failed for key:{}", keyName, oe);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to delete an object specified by key name in bucket.
|
||||
* @param keyName key name to be deleted
|
||||
* @return true if the key is deleted, false otherwise
|
||||
*/
|
||||
private boolean deleteObject(String keyName) {
|
||||
LOG.trace("issuing delete for key" + keyName);
|
||||
try {
|
||||
bucket.deleteKey(keyName);
|
||||
return true;
|
||||
} catch (OzoneException oe) {
|
||||
LOG.error("delete key failed " + oe.getMessage());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Turn a path (relative or otherwise) into an Ozone key.
|
||||
*
|
||||
* @param path the path of the file.
|
||||
* @return the key of the object that represents the file.
|
||||
*/
|
||||
private String pathToKey(Path path) {
|
||||
public String pathToKey(Path path) {
|
||||
Objects.requireNonNull(path, "Path can not be null!");
|
||||
if (!path.isAbsolute()) {
|
||||
path = new Path(workingDir, path);
|
||||
|
@ -328,6 +665,20 @@ public class OzoneFileSystem extends FileSystem {
|
|||
return key;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add trailing delimiter to path if it is already not present.
|
||||
*
|
||||
* @param key the ozone Key which needs to be appended
|
||||
* @return delimiter appended key
|
||||
*/
|
||||
String addTrailingSlashIfNeeded(String key) {
|
||||
if (StringUtils.isNotEmpty(key) && !key.endsWith(OZONE_URI_DELIMITER)) {
|
||||
return key + OZONE_URI_DELIMITER;
|
||||
} else {
|
||||
return key;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "OzoneFileSystem{URI=" + uri + ", "
|
||||
|
@ -336,4 +687,62 @@ public class OzoneFileSystem extends FileSystem {
|
|||
+ "statistics=" + statistics
|
||||
+ "}";
|
||||
}
|
||||
|
||||
private abstract class OzoneListingIterator {
|
||||
private final Path path;
|
||||
private final boolean recursive;
|
||||
private final FileStatus status;
|
||||
private String pathKey;
|
||||
|
||||
OzoneListingIterator(Path path, boolean recursive)
|
||||
throws IOException {
|
||||
this.path = path;
|
||||
this.recursive = recursive;
|
||||
this.status = getFileStatus(path);
|
||||
this.pathKey = pathToKey(path);
|
||||
if (status.isDirectory()) {
|
||||
this.pathKey = addTrailingSlashIfNeeded(pathKey);
|
||||
}
|
||||
}
|
||||
|
||||
abstract boolean processKey(String key) throws IOException;
|
||||
|
||||
// iterates all the keys in the particular path
|
||||
boolean iterate() throws IOException {
|
||||
LOG.trace("Iterating path {} - recursive {}", path, recursive);
|
||||
if (status.isDirectory()) {
|
||||
LOG.trace("Iterating directory:{}", pathKey);
|
||||
String lastKey = pathKey;
|
||||
while (true) {
|
||||
List<OzoneKey> ozoneKeys = listKeys(pathKey, lastKey);
|
||||
LOG.trace("number of sub keys:{}", ozoneKeys.size());
|
||||
if (ozoneKeys.size() == 0) {
|
||||
return processKey(pathKey);
|
||||
} else {
|
||||
if (!recursive) {
|
||||
throw new PathIsNotEmptyDirectoryException(path.toString());
|
||||
} else {
|
||||
for (OzoneKey ozoneKey : ozoneKeys) {
|
||||
lastKey = ozoneKey.getObjectInfo().getKeyName();
|
||||
if (!processKey(lastKey)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOG.trace("iterating file:{}", path);
|
||||
return processKey(pathKey);
|
||||
}
|
||||
}
|
||||
|
||||
String getPathKey() {
|
||||
return pathKey;
|
||||
}
|
||||
|
||||
boolean pathIsDirectory() {
|
||||
return status.isDirectory();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -123,4 +123,19 @@ public class TestOzoneFileInterfaces {
|
|||
Assert.assertEquals(out, data);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDirectory() throws IOException {
|
||||
String dirPath = RandomStringUtils.randomAlphanumeric(5);
|
||||
Path path = new Path("/" + dirPath);
|
||||
Assert.assertTrue(fs.mkdirs(path));
|
||||
|
||||
FileStatus status = fs.getFileStatus(path);
|
||||
Assert.assertTrue(status.isDirectory());
|
||||
Assert.assertEquals(status.getLen(), 0);
|
||||
|
||||
FileStatus[] statusList = fs.listStatus(new Path("/"));
|
||||
Assert.assertEquals(statusList.length, 1);
|
||||
Assert.assertEquals(statusList[0], status);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue