HDFS-12425. Ozone: OzoneFileSystem: read/write/create/open/getFileInfo APIs. Contributed by Mukul Kumar Singh.

This commit is contained in:
Xiaoyu Yao 2017-09-26 19:26:40 -07:00 committed by Owen O'Malley
parent a1a3ba6529
commit 5e4a6b686c
6 changed files with 470 additions and 8 deletions

View File

@ -192,7 +192,8 @@ public KsmKeyInfo lookupKey(KsmKeyArgs args) throws IOException {
volumeName, bucketName, keyName); volumeName, bucketName, keyName);
byte[] value = metadataManager.get(keyKey); byte[] value = metadataManager.get(keyKey);
if (value == null) { if (value == null) {
LOG.debug("Key: {} not found", keyKey); LOG.debug("volume:{} bucket:{} Key:{} not found",
volumeName, bucketName, keyName);
throw new KSMException("Key not found", throw new KSMException("Key not found",
KSMException.ResultCodes.FAILED_KEY_NOT_FOUND); KSMException.ResultCodes.FAILED_KEY_NOT_FOUND);
} }

View File

@ -31,6 +31,14 @@ public class Constants {
public static final String OZONE_USER_DIR = "/user"; public static final String OZONE_USER_DIR = "/user";
/** Local buffer directory. */
public static final String BUFFER_DIR_KEY = "fs.ozone.buffer.dir";
/** Temporary directory. */
public static final String BUFFER_TMP_KEY = "hadoop.tmp.dir";
public static final String OZONE_URI_DELIMITER = "/";
private Constants() { private Constants() {
} }

View File

@ -18,13 +18,18 @@
package org.apache.hadoop.fs.ozone; package org.apache.hadoop.fs.ozone;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.text.ParseException;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Objects; import java.util.Objects;
import org.apache.hadoop.ozone.web.client.OzoneKey;
import org.apache.hadoop.ozone.web.client.OzoneRestClient; import org.apache.hadoop.ozone.web.client.OzoneRestClient;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -37,6 +42,7 @@
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ozone.web.client.OzoneBucket; import org.apache.hadoop.ozone.web.client.OzoneBucket;
import org.apache.hadoop.ozone.web.client.OzoneVolume; import org.apache.hadoop.ozone.web.client.OzoneVolume;
@ -48,6 +54,7 @@
import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_SCHEME; import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_SCHEME;
import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR; import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR;
import static org.apache.hadoop.fs.ozone.Constants.OZONE_HTTP_SCHEME; import static org.apache.hadoop.fs.ozone.Constants.OZONE_HTTP_SCHEME;
import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_DELIMITER;
/** /**
* The Ozone Filesystem implementation. * The Ozone Filesystem implementation.
@ -76,13 +83,11 @@ public void initialize(URI name, Configuration conf) throws IOException {
Objects.requireNonNull(name.getScheme(), "No scheme provided in " + name); Objects.requireNonNull(name.getScheme(), "No scheme provided in " + name);
assert getScheme().equals(name.getScheme()); assert getScheme().equals(name.getScheme());
uri = name;
Path path = new Path(name.getPath()); Path path = new Path(name.getPath());
String hostStr = name.getAuthority(); String hostStr = name.getAuthority();
String volumeStr = null; String volumeStr = null;
String bucketStr = null; String bucketStr = null;
LOG.info("Ozone URI for ozfs initialization is " + uri);
while (path != null && !path.isRoot()) { while (path != null && !path.isRoot()) {
bucketStr = volumeStr; bucketStr = volumeStr;
volumeStr = path.getName(); volumeStr = path.getName();
@ -98,6 +103,10 @@ public void initialize(URI name, Configuration conf) throws IOException {
} }
try { try {
uri = new URIBuilder().setScheme(OZONE_URI_SCHEME).setHost(hostStr)
.setPath(OZONE_URI_DELIMITER + volumeStr + OZONE_URI_DELIMITER
+ bucketStr + OZONE_URI_DELIMITER).build();
LOG.info("Ozone URI for ozfs initialization is " + uri);
this.ozone = new OzoneRestClient(OZONE_HTTP_SCHEME + hostStr); this.ozone = new OzoneRestClient(OZONE_HTTP_SCHEME + hostStr);
try { try {
this.userName = this.userName =
@ -143,7 +152,16 @@ public String getScheme() {
@Override @Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException { public FSDataInputStream open(Path f, int bufferSize) throws IOException {
return null; LOG.trace("open() path:{}", f);
final FileStatus fileStatus = getFileStatus(f);
if (fileStatus.isDirectory()) {
throw new FileNotFoundException("Can't open directory " + f + " to read");
}
return new FSDataInputStream(
new OzoneInputStream(getConf(), uri, bucket, pathToKey(f),
fileStatus.getLen(), bufferSize, statistics));
} }
@Override @Override
@ -151,7 +169,31 @@ public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, boolean overwrite, int bufferSize,
short replication, long blockSize, short replication, long blockSize,
Progressable progress) throws IOException { Progressable progress) throws IOException {
return null; LOG.trace("create() path:{}", f);
final String key = pathToKey(f);
final FileStatus status;
try {
status = getFileStatus(f);
if (status.isDirectory()) {
throw new FileAlreadyExistsException(f + " is a directory");
} else {
if (!overwrite) {
// path references a file and overwrite is disabled
throw new FileAlreadyExistsException(f + " already exists");
}
LOG.debug("Overwriting file {}", f);
//TODO: Delete the existing file here
}
} catch (FileNotFoundException ignored) {
// This exception needs to ignored as this means that the file currently
// does not exists and a new file can thus be created.
}
final OzoneOutputStream stream =
new OzoneOutputStream(getConf(), uri, bucket, key, this.statistics);
// We pass null to FSDataOutputStream so it won't count writes that
// are being buffered to a file
return new FSDataOutputStream(stream, null);
} }
@Override @Override
@ -162,13 +204,22 @@ public FSDataOutputStream createNonRecursive(Path path,
short replication, short replication,
long blockSize, long blockSize,
Progressable progress) throws IOException { Progressable progress) throws IOException {
return null; final Path parent = path.getParent();
if (parent != null) {
// expect this to raise an exception if there is no parent
if (!getFileStatus(parent).isDirectory()) {
throw new FileAlreadyExistsException("Not a directory: " + parent);
}
}
return create(path, permission, flags.contains(CreateFlag.OVERWRITE),
bufferSize, replication, blockSize, progress);
} }
@Override @Override
public FSDataOutputStream append(Path f, int bufferSize, public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException { Progressable progress) throws IOException {
return null; throw new UnsupportedOperationException("append() Not implemented by the "
+ getClass().getSimpleName() + " FileSystem implementation");
} }
@Override @Override
@ -201,9 +252,80 @@ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
return false; return false;
} }
/**
 * Looks up a key's status in the bucket.
 *
 * @param keyName name of the key to look up.
 * @return the key's info, or {@code null} if it could not be retrieved.
 */
private OzoneKey getKeyStatus(String keyName) {
  try {
    return bucket.getKeyInfo(keyName);
  } catch (OzoneException e) {
    // NOTE(review): every OzoneException is mapped to "not found" here,
    // including transient server errors — confirm that is intended.
    // Log the cause at trace level instead of swallowing it silently.
    LOG.trace("Key:{} does not exist", keyName, e);
    return null;
  }
}
/**
 * Parses an Ozone-formatted date string into epoch milliseconds.
 *
 * @param modifiedTime the date string to parse.
 * @param key the key the timestamp belongs to (used for logging only).
 * @return the parsed time, or 0 when the string cannot be parsed.
 */
private long getModifiedTime(String modifiedTime, String key) {
  long epochMillis = 0;
  try {
    epochMillis = OzoneUtils.formatDate(modifiedTime);
  } catch (ParseException pe) {
    // Fall back to epoch rather than failing the whole status call.
    LOG.error("Invalid time:{} for key:{}", modifiedTime, key, pe);
  }
  return epochMillis;
}
/**
 * Checks whether a key represents a directory.
 *
 * Directories are modeled as zero-length keys whose name ends with the
 * Ozone URI delimiter.
 *
 * @param key the key to inspect.
 * @return true iff the key looks like a directory marker.
 */
private boolean isDirectory(OzoneKey key) {
  final String keyName = key.getObjectInfo().getKeyName();
  final long size = key.getObjectInfo().getSize();
  LOG.trace("key name:{} size:{}", keyName, size);
  return keyName.endsWith(OZONE_URI_DELIMITER) && (size == 0);
}
@Override @Override
public FileStatus getFileStatus(Path f) throws IOException { public FileStatus getFileStatus(Path f) throws IOException {
return null; Path qualifiedPath = f.makeQualified(uri, workingDir);
String key = pathToKey(qualifiedPath);
if (key.length() == 0) {
return new FileStatus(0, true, 1, 0,
getModifiedTime(bucket.getCreatedOn(), OZONE_URI_DELIMITER),
qualifiedPath);
}
// consider this a file and get key status
OzoneKey meta = getKeyStatus(key);
if (meta == null && !key.endsWith(OZONE_URI_DELIMITER)) {
// if that fails consider this a directory
key += OZONE_URI_DELIMITER;
meta = getKeyStatus(key);
}
if (meta == null) {
LOG.trace("File:{} not found", f);
throw new FileNotFoundException(f + ": No such file or directory!");
} else if (isDirectory(meta)) {
return new FileStatus(0, true, 1, 0,
getModifiedTime(meta.getObjectInfo().getModifiedOn(), key),
qualifiedPath);
} else {
return new FileStatus(meta.getObjectInfo().getSize(), false, 1,
getDefaultBlockSize(f),
getModifiedTime(meta.getObjectInfo().getModifiedOn(), key),
qualifiedPath);
}
}
/**
* Turn a path (relative or otherwise) into an Ozone key.
*
* @param path the path of the file.
* @return the key of the object that represents the file.
*/
private String pathToKey(Path path) {
Objects.requireNonNull(path, "Path can not be null!");
if (!path.isAbsolute()) {
path = new Path(workingDir, path);
}
// removing leading '/' char
String key = path.toUri().getPath().substring(1);
LOG.trace("path for key:{} is:{}", key, path);
return key;
} }
@Override @Override

View File

@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.ozone;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.URI;
import java.util.Objects;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.ozone.web.client.OzoneBucket;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import static org.apache.hadoop.fs.ozone.Constants.BUFFER_TMP_KEY;
import static org.apache.hadoop.fs.ozone.Constants.BUFFER_DIR_KEY;
/**
* Wraps OzoneInputStream implementation.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class OzoneInputStream extends FSInputStream {
  private static final Log LOG = LogFactory.getLog(OzoneInputStream.class);

  /** Local buffer file holding the downloaded key; deleted on close(). */
  private final File tmpFile;

  /** Random-access view over the buffered key contents. */
  private final RandomAccessFile in;

  /** Closed bit. Volatile so reads are non-blocking. */
  private volatile boolean closed = false;

  /** the ozone bucket client. */
  private final OzoneBucket bucket;

  /** The object key. */
  private final String key;

  /** Object content length. */
  private final long contentLen;

  /** file system stats; may be null. */
  private final Statistics stats;

  /** Fully resolved key URI, used in error messages. */
  private final URI keyUri;

  /**
   * Downloads the whole key into a local buffer file and opens it for
   * random-access reads.
   *
   * @param conf configuration; BUFFER_DIR_KEY is defaulted from
   *        BUFFER_TMP_KEY + "/ozone" when unset.
   * @param fsUri the file system URI the key is resolved against.
   * @param bucket the bucket containing the key.
   * @param key the key to read.
   * @param contentLen length of the object, used by available()/seek().
   * @param bufferSize unused; kept for interface compatibility.
   * @param statistics file system statistics, may be null.
   * @throws IOException if the key cannot be fetched or buffered locally.
   */
  OzoneInputStream(Configuration conf, URI fsUri, OzoneBucket bucket,
      String key, long contentLen, int bufferSize, Statistics statistics)
      throws IOException {
    Objects.requireNonNull(bucket, "bucket can not be null!");
    // Fixed typo in the message: "kenName" -> "keyName".
    Objects.requireNonNull(key, "keyName can not be null!");
    this.bucket = bucket;
    this.key = key;
    this.contentLen = contentLen;
    this.stats = statistics;
    this.keyUri = fsUri.resolve(key);

    if (conf.get(BUFFER_DIR_KEY) == null) {
      conf.set(BUFFER_DIR_KEY, conf.get(BUFFER_TMP_KEY) + "/ozone");
    }
    final LocalDirAllocator dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
    this.tmpFile = dirAlloc.createTmpFileForWrite("output-",
        LocalDirAllocator.SIZE_UNKNOWN, conf);
    try {
      LOG.trace("Get Key:" + this.keyUri + " tmp-file:" + tmpFile.toPath());
      bucket.getKey(this.key, tmpFile.toPath());
      in = new RandomAccessFile(tmpFile, "r");
      // Guard against a null Statistics, consistently with the read paths.
      if (stats != null) {
        stats.incrementReadOps(1);
      }
    } catch (OzoneException oe) {
      // Don't leak the buffer file when the download fails.
      if (!tmpFile.delete()) {
        LOG.warn("Can not delete tmpFile: " + tmpFile);
      }
      final String msg = "Error when getBytes for key = " + key;
      LOG.error(msg, oe);
      throw new IOException(msg, oe);
    }
  }

  @Override
  public synchronized void seek(long targetPos) throws IOException {
    checkNotClosed();
    // Do not allow negative seek
    if (targetPos < 0) {
      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + targetPos);
    }
    // Zero-length objects: any non-negative seek is a no-op.
    if (this.contentLen <= 0) {
      return;
    }
    in.seek(targetPos);
  }

  @Override
  public synchronized long getPos() throws IOException {
    checkNotClosed();
    return in.getFilePointer();
  }

  @Override
  public boolean seekToNewSource(long l) throws IOException {
    // Only one source (the local buffer file); nothing to switch to.
    return false;
  }

  @Override
  public synchronized int read() throws IOException {
    checkNotClosed();
    int ch = in.read();
    if (stats != null && ch != -1) {
      stats.incrementBytesRead(1);
    }
    return ch;
  }

  @Override
  public int read(long position, byte[] buffer, int offset, int length)
      throws IOException {
    Preconditions.checkArgument(buffer != null, "buffer can not be null");
    // Positioned read: delegates to FSInputStream (seek/read/restore).
    int numberOfByteRead = super.read(position, buffer, offset, length);
    if (stats != null && numberOfByteRead > 0) {
      stats.incrementBytesRead(numberOfByteRead);
    }
    return numberOfByteRead;
  }

  @Override
  public synchronized int read(byte[] buffer, int offset, int length)
      throws IOException {
    checkNotClosed();
    Preconditions.checkArgument(buffer != null, "buffer can not be null");
    int numberOfByteRead = in.read(buffer, offset, length);
    if (stats != null && numberOfByteRead > 0) {
      stats.incrementBytesRead(numberOfByteRead);
    }
    return numberOfByteRead;
  }

  @Override
  public synchronized int available() throws IOException {
    checkNotClosed();
    final long remainingInWrapped = contentLen - in.getFilePointer();
    return (remainingInWrapped < Integer.MAX_VALUE)
        ? (int)remainingInWrapped
        : Integer.MAX_VALUE;
  }

  @Override
  public synchronized void close() throws IOException {
    if (closed) {
      return; // idempotent close
    }
    // Set the closed bit so checkNotClosed() actually fires afterwards.
    closed = true;
    try {
      in.close();
    } finally {
      // Remove the local buffer; otherwise every open() leaks a tmp file.
      if (!tmpFile.delete()) {
        LOG.warn("Can not delete tmpFile: " + tmpFile);
      }
    }
  }

  @Override
  public synchronized long skip(long pos) throws IOException {
    checkNotClosed();
    // Clamp before the narrowing cast: a pos > Integer.MAX_VALUE would
    // otherwise wrap to a negative int and skip nothing.
    return in.skipBytes((int) Math.min(pos, Integer.MAX_VALUE));
  }

  /**
   * Verify that the input stream is open. Non blocking; this gives
   * the last state of the volatile {@link #closed} field.
   * @throws IOException if the connection is closed.
   */
  private void checkNotClosed() throws IOException {
    if (closed) {
      throw new IOException(this.keyUri + ": "
          + FSExceptionMessages.STREAM_IS_CLOSED);
    }
  }
}

View File

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.ozone;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.ozone.web.client.OzoneBucket;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import static org.apache.hadoop.fs.ozone.Constants.BUFFER_DIR_KEY;
import static org.apache.hadoop.fs.ozone.Constants.BUFFER_TMP_KEY;
/**
* The output stream for Ozone file system.
*
* Data will be buffered on local disk, then uploaded to Ozone in
* {@link #close()} method.
*
* This class is not thread safe.
*/
public class OzoneOutputStream extends OutputStream {
  private static final Log LOG = LogFactory.getLog(OzoneOutputStream.class);

  /** Destination bucket client. */
  private final OzoneBucket bucket;

  /** The object key the buffered data is uploaded to on close(). */
  private final String key;

  /** Fully resolved key URI, used for logging and error messages. */
  private final URI keyUri;

  /** File system statistics; may be null. */
  private final Statistics statistics;

  /** Allocator that picks the local buffer directory. */
  private final LocalDirAllocator dirAlloc;

  /** Closed bit; guarded by the stream's monitor. */
  private boolean closed;

  /** Local file buffering all writes until close(). */
  private final File tmpFile;

  /** Buffered stream over {@link #tmpFile}. */
  private final BufferedOutputStream backupStream;

  /**
   * Creates the output stream and the local buffer file behind it.
   *
   * @param conf configuration; BUFFER_DIR_KEY is defaulted from
   *        BUFFER_TMP_KEY + "/ozone" when unset.
   * @param fsUri the file system URI the key is resolved against.
   * @param bucket destination bucket.
   * @param key destination key.
   * @param statistics file system statistics, may be null.
   * @throws IOException if the local buffer file cannot be created.
   */
  OzoneOutputStream(Configuration conf, URI fsUri, OzoneBucket bucket,
      String key, Statistics statistics) throws IOException {
    this.bucket = bucket;
    this.key = key;
    this.keyUri = fsUri.resolve(key);
    this.statistics = statistics;

    if (conf.get(BUFFER_DIR_KEY) == null) {
      conf.set(BUFFER_DIR_KEY, conf.get(BUFFER_TMP_KEY) + "/ozone");
    }
    dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
    tmpFile = dirAlloc.createTmpFileForWrite("output-",
        LocalDirAllocator.SIZE_UNKNOWN, conf);
    backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile));
    closed = false;
  }

  @Override
  public synchronized void close() throws IOException {
    if (closed) {
      return; // idempotent close
    }
    closed = true;
    // Flush and close the local buffer before uploading it.
    backupStream.close();
    try {
      LOG.trace("Put tmp-file:" + tmpFile + " to key "+ keyUri);
      bucket.putKey(key, tmpFile);
      // Guard against a null Statistics, consistently with OzoneInputStream.
      if (statistics != null) {
        statistics.incrementWriteOps(1);
      }
    } catch (OzoneException oe) {
      final String msg = "Uploading error: file=" + tmpFile + ", key=" + key;
      LOG.error(msg, oe);
      throw new IOException(msg, oe);
    } finally {
      if (!tmpFile.delete()) {
        LOG.warn("Can not delete tmpFile: " + tmpFile);
      }
    }
  }

  @Override
  public synchronized void flush() throws IOException {
    // No-op after close: FilterOutputStream wrappers (FSDataOutputStream)
    // call flush() from close(), so throwing here would break double-close.
    if (closed) {
      return;
    }
    backupStream.flush();
  }

  @Override
  public synchronized void write(int b) throws IOException {
    checkNotClosed();
    backupStream.write(b);
    if (statistics != null) {
      statistics.incrementBytesWritten(1);
    }
  }

  @Override
  public synchronized void write(byte[] b, int off, int len)
      throws IOException {
    // Bulk override: avoids byte-at-a-time stat accounting through write(int).
    checkNotClosed();
    backupStream.write(b, off, len);
    if (statistics != null) {
      statistics.incrementBytesWritten(len);
    }
  }

  /**
   * Verify that the stream is still open.
   * @throws IOException if the stream has been closed.
   */
  private void checkNotClosed() throws IOException {
    if (closed) {
      throw new IOException(keyUri + ": stream is closed");
    }
  }
}

View File

@ -21,6 +21,10 @@
import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler; import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.MiniOzoneCluster;
@ -32,6 +36,7 @@
import org.apache.hadoop.ozone.web.handlers.VolumeArgs; import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler; import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.utils.OzoneUtils; import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.util.Time;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Test; import org.junit.Test;
@ -96,4 +101,26 @@ public void testFileSystemInit() throws IOException {
Assert.assertTrue(fs instanceof OzoneFileSystem); Assert.assertTrue(fs instanceof OzoneFileSystem);
Assert.assertEquals(fs.getUri().getScheme(), Constants.OZONE_URI_SCHEME); Assert.assertEquals(fs.getUri().getScheme(), Constants.OZONE_URI_SCHEME);
} }
@Test
public void testOzFsReadWrite() throws IOException {
  long currentTime = Time.now();
  int stringLen = 20;
  String data = RandomStringUtils.randomAlphanumeric(stringLen);
  String filePath = RandomStringUtils.randomAlphanumeric(5);
  Path path = new Path("/" + filePath);
  // Write the random payload through the Ozone file system.
  try (FSDataOutputStream stream = fs.create(path)) {
    stream.writeBytes(data);
  }

  FileStatus status = fs.getFileStatus(path);
  // NOTE(review): asserting the modification time is BEFORE the test's
  // start time looks inverted (a freshly created file should normally be
  // newer) — confirm whether server-side clock semantics require this.
  Assert.assertTrue(status.getModificationTime() < currentTime);

  // Read the payload back and verify it round-trips.
  try (FSDataInputStream inputStream = fs.open(path)) {
    byte[] buffer = new byte[stringLen];
    inputStream.readFully(0, buffer);
    String out = new String(buffer, 0, buffer.length);
    // JUnit convention: expected value first, actual second.
    Assert.assertEquals(data, out);
  }
}
} }