diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java index e1ef754c8c5..301d5e76238 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java @@ -192,7 +192,8 @@ public class KeyManagerImpl implements KeyManager { volumeName, bucketName, keyName); byte[] value = metadataManager.get(keyKey); if (value == null) { - LOG.debug("Key: {} not found", keyKey); + LOG.debug("volume:{} bucket:{} Key:{} not found", + volumeName, bucketName, keyName); throw new KSMException("Key not found", KSMException.ResultCodes.FAILED_KEY_NOT_FOUND); } diff --git a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/Constants.java b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/Constants.java index 45c4172e561..0271d6ce0b2 100644 --- a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/Constants.java +++ b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/Constants.java @@ -31,6 +31,14 @@ public class Constants { public static final String OZONE_USER_DIR = "/user"; + /** Local buffer directory. */ + public static final String BUFFER_DIR_KEY = "fs.ozone.buffer.dir"; + + /** Temporary directory. */ + public static final String BUFFER_TMP_KEY = "hadoop.tmp.dir"; + + public static final String OZONE_URI_DELIMITER = "/"; + private Constants() { } diff --git a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java index ff3340e997e..f6260ffee95 100644 --- a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java +++ b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java @@ -18,13 +18,18 @@ package org.apache.hadoop.fs.ozone; +import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.text.ParseException; import java.util.EnumSet; import java.util.Objects; +import org.apache.hadoop.ozone.web.client.OzoneKey; import org.apache.hadoop.ozone.web.client.OzoneRestClient; +import org.apache.hadoop.ozone.web.utils.OzoneUtils; +import org.apache.http.client.utils.URIBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +42,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.ozone.web.client.OzoneBucket; import org.apache.hadoop.ozone.web.client.OzoneVolume; @@ -48,6 +54,7 @@ import static org.apache.hadoop.fs.ozone.Constants.OZONE_DEFAULT_USER; import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_SCHEME; import static org.apache.hadoop.fs.ozone.Constants.OZONE_USER_DIR; import static org.apache.hadoop.fs.ozone.Constants.OZONE_HTTP_SCHEME; +import static org.apache.hadoop.fs.ozone.Constants.OZONE_URI_DELIMITER; /** * The Ozone Filesystem implementation. @@ -76,13 +83,11 @@ public class OzoneFileSystem extends FileSystem { Objects.requireNonNull(name.getScheme(), "No scheme provided in " + name); assert getScheme().equals(name.getScheme()); - uri = name; Path path = new Path(name.getPath()); String hostStr = name.getAuthority(); String volumeStr = null; String bucketStr = null; - LOG.info("Ozone URI for ozfs initialization is " + uri); while (path != null && !path.isRoot()) { bucketStr = volumeStr; volumeStr = path.getName(); @@ -98,6 +103,10 @@ public class OzoneFileSystem extends FileSystem { } try { + uri = new URIBuilder().setScheme(OZONE_URI_SCHEME).setHost(hostStr) + .setPath(OZONE_URI_DELIMITER + volumeStr + OZONE_URI_DELIMITER + + bucketStr + OZONE_URI_DELIMITER).build(); + LOG.info("Ozone URI for ozfs initialization is " + uri); this.ozone = new OzoneRestClient(OZONE_HTTP_SCHEME + hostStr); try { this.userName = @@ -143,7 +152,16 @@ public class OzoneFileSystem extends FileSystem { @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { - return null; + LOG.trace("open() path:{}", f); + final FileStatus fileStatus = getFileStatus(f); + + if (fileStatus.isDirectory()) { + throw new FileNotFoundException("Can't open directory " + f + " to read"); + } + + return new FSDataInputStream( + new OzoneInputStream(getConf(), uri, bucket, pathToKey(f), + fileStatus.getLen(), bufferSize, statistics)); } @Override @@ -151,7 +169,31 @@ public class OzoneFileSystem extends FileSystem { boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { - return null; + LOG.trace("create() path:{}", f); + final String key = pathToKey(f); + final FileStatus status; + try { + status = getFileStatus(f); + if (status.isDirectory()) { + throw new FileAlreadyExistsException(f + " is a directory"); + } else { + if (!overwrite) { + // path references a file and overwrite is disabled + throw new FileAlreadyExistsException(f + " already exists"); + } + LOG.debug("Overwriting file {}", f); + //TODO: Delete the existing file here + } + } catch (FileNotFoundException ignored) { + // This exception needs to ignored as this means that the file currently + // does not exists and a new file can thus be created. + } + + final OzoneOutputStream stream = + new OzoneOutputStream(getConf(), uri, bucket, key, this.statistics); + // We pass null to FSDataOutputStream so it won't count writes that + // are being buffered to a file + return new FSDataOutputStream(stream, null); } @Override @@ -162,13 +204,22 @@ public class OzoneFileSystem extends FileSystem { short replication, long blockSize, Progressable progress) throws IOException { - return null; + final Path parent = path.getParent(); + if (parent != null) { + // expect this to raise an exception if there is no parent + if (!getFileStatus(parent).isDirectory()) { + throw new FileAlreadyExistsException("Not a directory: " + parent); + } + } + return create(path, permission, flags.contains(CreateFlag.OVERWRITE), + bufferSize, replication, blockSize, progress); } @Override public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException { - return null; + throw new UnsupportedOperationException("append() Not implemented by the " + + getClass().getSimpleName() + " FileSystem implementation"); } @Override @@ -201,9 +252,80 @@ public class OzoneFileSystem extends FileSystem { return false; } + private OzoneKey getKeyStatus(String keyName) { + try { + return bucket.getKeyInfo(keyName); + } catch (OzoneException e) { + LOG.trace("Key:{} does not exists", keyName); + return null; + } + } + + private long getModifiedTime(String modifiedTime, String key) { + try { + return OzoneUtils.formatDate(modifiedTime); + } catch (ParseException pe) { + LOG.error("Invalid time:{} for key:{}", modifiedTime, key, pe); + return 0; + } + } + + private boolean isDirectory(OzoneKey key) { + LOG.trace("key name:{} size:{}", key.getObjectInfo().getKeyName(), + key.getObjectInfo().getSize()); + return key.getObjectInfo().getKeyName().endsWith(OZONE_URI_DELIMITER) + && (key.getObjectInfo().getSize() == 0); + } + @Override public FileStatus getFileStatus(Path f) throws IOException { - return null; + Path qualifiedPath = f.makeQualified(uri, workingDir); + String key = pathToKey(qualifiedPath); + + if (key.length() == 0) { + return new FileStatus(0, true, 1, 0, + getModifiedTime(bucket.getCreatedOn(), OZONE_URI_DELIMITER), + qualifiedPath); + } + + // consider this a file and get key status + OzoneKey meta = getKeyStatus(key); + if (meta == null && !key.endsWith(OZONE_URI_DELIMITER)) { + // if that fails consider this a directory + key += OZONE_URI_DELIMITER; + meta = getKeyStatus(key); + } + + if (meta == null) { + LOG.trace("File:{} not found", f); + throw new FileNotFoundException(f + ": No such file or directory!"); + } else if (isDirectory(meta)) { + return new FileStatus(0, true, 1, 0, + getModifiedTime(meta.getObjectInfo().getModifiedOn(), key), + qualifiedPath); + } else { + return new FileStatus(meta.getObjectInfo().getSize(), false, 1, + getDefaultBlockSize(f), + getModifiedTime(meta.getObjectInfo().getModifiedOn(), key), + qualifiedPath); + } + } + + /** + * Turn a path (relative or otherwise) into an Ozone key. + * + * @param path the path of the file. + * @return the key of the object that represents the file. + */ + private String pathToKey(Path path) { + Objects.requireNonNull(path, "Path can not be null!"); + if (!path.isAbsolute()) { + path = new Path(workingDir, path); + } + // removing leading '/' char + String key = path.toUri().getPath().substring(1); + LOG.trace("path for key:{} is:{}", key, path); + return key; } @Override diff --git a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneInputStream.java b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneInputStream.java new file mode 100644 index 00000000000..07733e55ea0 --- /dev/null +++ b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneInputStream.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.ozone; + +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.net.URI; +import java.util.Objects; + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileSystem.Statistics; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.ozone.web.client.OzoneBucket; +import org.apache.hadoop.ozone.web.exceptions.OzoneException; + +import static org.apache.hadoop.fs.ozone.Constants.BUFFER_TMP_KEY; +import static org.apache.hadoop.fs.ozone.Constants.BUFFER_DIR_KEY; + +/** + * Wraps OzoneInputStream implementation. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class OzoneInputStream extends FSInputStream { + private static final Log LOG = LogFactory.getLog(OzoneInputStream.class); + + private final RandomAccessFile in; + + /** Closed bit. Volatile so reads are non-blocking. */ + private volatile boolean closed = false; + + /** the ozone bucket client. */ + private final OzoneBucket bucket; + + /** The object key. */ + private final String key; + + /** Object content length. */ + private final long contentLen; + + /** file system stats. */ + private final Statistics stats; + + private final URI keyUri; + + OzoneInputStream(Configuration conf, URI fsUri, OzoneBucket bucket, + String key, long contentLen, int bufferSize, Statistics statistics) + throws IOException { + Objects.requireNonNull(bucket, "bucket can not be null!"); + Objects.requireNonNull(key, "kenName can not be null!"); + this.bucket = bucket; + this.key = key; + this.contentLen = contentLen; + this.stats = statistics; + this.keyUri = fsUri.resolve(key); + + if (conf.get(BUFFER_DIR_KEY) == null) { + conf.set(BUFFER_DIR_KEY, conf.get(BUFFER_TMP_KEY) + "/ozone"); + } + final LocalDirAllocator dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY); + final File tmpFile = dirAlloc.createTmpFileForWrite("output-", + LocalDirAllocator.SIZE_UNKNOWN, conf); + try { + LOG.trace("Get Key:" + this.keyUri + " tmp-file:" + tmpFile.toPath()); + bucket.getKey(this.key, tmpFile.toPath()); + in = new RandomAccessFile(tmpFile, "r"); + statistics.incrementReadOps(1); + } catch (OzoneException oe) { + final String msg = "Error when getBytes for key = " + key; + LOG.error(msg, oe); + throw new IOException(msg, oe); + } + } + + @Override + public synchronized void seek(long targetPos) throws IOException { + checkNotClosed(); + // Do not allow negative seek + if (targetPos < 0) { + throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + targetPos); + } + + if (this.contentLen <= 0) { + return; + } + + in.seek(targetPos); + } + + @Override + public synchronized long getPos() throws IOException { + checkNotClosed(); + return in.getFilePointer(); + } + + @Override + public boolean seekToNewSource(long l) throws IOException { + return false; + } + + @Override + public synchronized int read() throws IOException { + int ch = in.read(); + if (stats != null && ch != -1) { + stats.incrementBytesRead(1); + } + return ch; + } + + @Override + public int read(long position, byte[] buffer, int offset, int length) + throws IOException { + Preconditions.checkArgument(buffer != null, "buffer can not be null"); + int numberOfByteRead = super.read(position, buffer, offset, length); + + if (stats != null && numberOfByteRead > 0) { + stats.incrementBytesRead(numberOfByteRead); + } + return numberOfByteRead; + } + + @Override + public synchronized int read(byte[] buffer, int offset, int length) + throws IOException { + Preconditions.checkArgument(buffer != null, "buffer can not be null"); + int numberOfByteRead = in.read(buffer, offset, length); + if (stats != null && numberOfByteRead > 0) { + stats.incrementBytesRead(numberOfByteRead); + } + return numberOfByteRead; + } + + @Override + public synchronized int available() throws IOException { + checkNotClosed(); + + final long remainingInWrapped = contentLen - in.getFilePointer(); + return (remainingInWrapped < Integer.MAX_VALUE) + ? (int)remainingInWrapped + : Integer.MAX_VALUE; + } + + @Override + public synchronized void close() throws IOException { + in.close(); + } + + @Override + public synchronized long skip(long pos) throws IOException { + return in.skipBytes((int) pos); + } + + /** + * Verify that the input stream is open. Non blocking; this gives + * the last state of the volatile {@link #closed} field. + * @throws IOException if the connection is closed. + */ + private void checkNotClosed() throws IOException { + if (closed) { + throw new IOException(this.keyUri + ": " + + FSExceptionMessages.STREAM_IS_CLOSED); + } + } + +} diff --git a/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneOutputStream.java b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneOutputStream.java new file mode 100644 index 00000000000..bf93c9ed2e0 --- /dev/null +++ b/hadoop-tools/hadoop-ozone/src/main/java/org/apache/hadoop/fs/ozone/OzoneOutputStream.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.ozone; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem.Statistics; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.ozone.web.client.OzoneBucket; +import org.apache.hadoop.ozone.web.exceptions.OzoneException; + +import static org.apache.hadoop.fs.ozone.Constants.BUFFER_DIR_KEY; +import static org.apache.hadoop.fs.ozone.Constants.BUFFER_TMP_KEY; + + +/** + * The output stream for Ozone file system. + * + * Data will be buffered on local disk, then uploaded to Ozone in + * {@link #close()} method. + * + * This class is not thread safe. + */ +public class OzoneOutputStream extends OutputStream { + private static final Log LOG = LogFactory.getLog(OzoneOutputStream.class); + private OzoneBucket bucket; + private final String key; + private final URI keyUri; + private Statistics statistics; + private LocalDirAllocator dirAlloc; + private boolean closed; + private File tmpFile; + private BufferedOutputStream backupStream; + + OzoneOutputStream(Configuration conf, URI fsUri, OzoneBucket bucket, + String key, Statistics statistics) throws IOException { + this.bucket = bucket; + this.key = key; + this.keyUri = fsUri.resolve(key); + this.statistics = statistics; + + if (conf.get(BUFFER_DIR_KEY) == null) { + conf.set(BUFFER_DIR_KEY, conf.get(BUFFER_TMP_KEY) + "/ozone"); + } + dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY); + tmpFile = dirAlloc.createTmpFileForWrite("output-", + LocalDirAllocator.SIZE_UNKNOWN, conf); + backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile)); + + closed = false; + } + + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + closed = true; + if (backupStream != null) { + backupStream.close(); + } + try { + LOG.trace("Put tmp-file:" + tmpFile + " to key "+ keyUri); + bucket.putKey(key, tmpFile); + statistics.incrementWriteOps(1); + } catch (OzoneException oe) { + final String msg = "Uploading error: file=" + tmpFile + ", key=" + key; + LOG.error(msg, oe); + throw new IOException(msg, oe); + } finally { + if (!tmpFile.delete()) { + LOG.warn("Can not delete tmpFile: " + tmpFile); + } + } + } + + @Override + public synchronized void flush() throws IOException { + backupStream.flush(); + } + + @Override + public synchronized void write(int b) throws IOException { + backupStream.write(b); + statistics.incrementBytesWritten(1); + } + +} diff --git a/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java b/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java index 469753771a4..24b40d8c112 100644 --- a/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java +++ b/hadoop-tools/hadoop-ozone/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileInterfaces.java @@ -21,6 +21,10 @@ package org.apache.hadoop.fs.ozone; import org.apache.commons.lang.RandomStringUtils; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -32,6 +36,7 @@ import org.apache.hadoop.ozone.web.handlers.UserArgs; import org.apache.hadoop.ozone.web.handlers.VolumeArgs; import org.apache.hadoop.ozone.web.interfaces.StorageHandler; import org.apache.hadoop.ozone.web.utils.OzoneUtils; +import org.apache.hadoop.util.Time; import org.junit.BeforeClass; import org.junit.AfterClass; import org.junit.Test; @@ -96,4 +101,26 @@ public class TestOzoneFileInterfaces { Assert.assertTrue(fs instanceof OzoneFileSystem); Assert.assertEquals(fs.getUri().getScheme(), Constants.OZONE_URI_SCHEME); } + + @Test + public void testOzFsReadWrite() throws IOException { + long currentTime = Time.now(); + int stringLen = 20; + String data = RandomStringUtils.randomAlphanumeric(stringLen); + String filePath = RandomStringUtils.randomAlphanumeric(5); + Path path = new Path("/" + filePath); + try (FSDataOutputStream stream = fs.create(path)) { + stream.writeBytes(data); + } + + FileStatus status = fs.getFileStatus(path); + Assert.assertTrue(status.getModificationTime() < currentTime); + + try (FSDataInputStream inputStream = fs.open(path)) { + byte[] buffer = new byte[stringLen]; + inputStream.readFully(0, buffer); + String out = new String(buffer, 0, buffer.length); + Assert.assertEquals(out, data); + } + } }