HADOOP-8886. Remove KFS support. Contributed by Eli Collins

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1395820 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Eli Collins 2012-10-09 00:16:26 +00:00
parent 38e1a5a5d6
commit 2115bd2ec7
14 changed files with 2 additions and 1390 deletions

View File

@ -7,6 +7,8 @@ Trunk (Unreleased)
HADOOP-8124. Remove the deprecated FSDataOutputStream constructor,
FSDataOutputStream.sync() and Syncable.sync(). (szetszwo)
HADOOP-8886. Remove KFS support. (eli)
NEW FEATURES
HADOOP-8469. Make NetworkTopology class pluggable. (Junping Du via

View File

@ -175,18 +175,6 @@
<Bug pattern="ES_COMPARING_STRINGS_WITH_EQ" />
</Match>
<Match>
<Class name="org.apache.hadoop.fs.kfs.KFSOutputStream" />
<Field name="path" />
<Bug pattern="URF_UNREAD_FIELD" />
</Match>
<Match>
<Class name="org.apache.hadoop.fs.kfs.KosmosFileSystem" />
<Method name="initialize" />
<Bug pattern="DM_EXIT" />
</Match>
<Match>
<Class name="org.apache.hadoop.io.Closeable" />
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />

View File

@ -194,11 +194,6 @@
<artifactId>avro</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>net.sf.kosmosfs</groupId>
<artifactId>kfs</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.ant</groupId>
<artifactId>ant</artifactId>

View File

@ -1,59 +0,0 @@
/**
*
* Licensed under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*
* We need to provide the ability to test the code in fs/kfs without really
* having a KFS deployment. In particular, the glue code that wraps
* around calls to KfsAccess object. This is accomplished by defining a
* filesystem implementation interface:
* -- for testing purposes, a dummy implementation of this interface
* will suffice; as long as the dummy implementation is close enough
* to doing what KFS does, we are good.
* -- for deployment purposes with KFS, this interface is
* implemented by the KfsImpl object.
*/
package org.apache.hadoop.fs.kfs;
import java.io.*;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Progressable;
/**
 * Filesystem implementation interface that the KFS glue code is written
 * against. A real deployment backs it with calls to KFS; tests can back it
 * with a dummy implementation.
 *
 * Methods returning {@code int} report a status code; the callers in this
 * package treat {@code 0} as success.
 */
interface IFSImpl {

  /** @return whether an entry exists at {@code path} */
  boolean exists(String path) throws IOException;

  /** @return whether {@code path} is a directory */
  boolean isDirectory(String path) throws IOException;

  /** @return whether {@code path} is a regular file */
  boolean isFile(String path) throws IOException;

  /** @return the names of the entries found in directory {@code path} */
  String[] readdir(String path) throws IOException;

  /** @return one {@link FileStatus} per entry of directory {@code path} */
  FileStatus[] readdirplus(Path path) throws IOException;

  /** Creates the directory {@code path}. */
  int mkdirs(String path) throws IOException;

  /** Renames {@code source} to {@code dest}. */
  int rename(String source, String dest) throws IOException;

  /** Removes the directory {@code path}. */
  int rmdir(String path) throws IOException;

  /** Removes the file {@code path}. */
  int remove(String path) throws IOException;

  /** @return the size of the file at {@code path} */
  long filesize(String path) throws IOException;

  /** @return the replication factor of {@code path} */
  short getReplication(String path) throws IOException;

  /** Requests the given replication factor for {@code path}. */
  short setReplication(String path, short replication) throws IOException;

  /**
   * @return one array of location hints per chunk of {@code path} covering
   *         the range that starts at {@code start} and spans {@code len}
   */
  String[][] getDataLocation(String path, long start, long len)
      throws IOException;

  /** @return the modification time of {@code path} */
  long getModificationTime(String path) throws IOException;

  /** Opens {@code path} for writing as a new file. */
  FSDataOutputStream create(String path, short replication, int bufferSize,
      Progressable progress) throws IOException;

  /** Opens {@code path} for reading. */
  FSDataInputStream open(String path, int bufferSize) throws IOException;

  /** Opens {@code path} for appending. */
  FSDataOutputStream append(String path, int bufferSize,
      Progressable progress) throws IOException;
}

View File

@ -1,47 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.kfs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CommonConfigurationKeys;
/**
 * Holder for the configuration key names, and their default values, that
 * belong to the kfs file system.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class KFSConfigKeys extends CommonConfigurationKeys {

  /** Block size key and its default (64 MB). */
  public static final String KFS_BLOCK_SIZE_KEY = "kfs.blocksize";
  public static final long KFS_BLOCK_SIZE_DEFAULT = 64 * 1024 * 1024;

  /**
   * Replication factor key and its default.
   * NOTE(review): core-default.xml ships kfs.replication=3 while this
   * default is 1 -- confirm which value is intended.
   */
  public static final String KFS_REPLICATION_KEY = "kfs.replication";
  public static final short KFS_REPLICATION_DEFAULT = 1;

  /** Stream buffer size key and its default, in bytes. */
  public static final String KFS_STREAM_BUFFER_SIZE_KEY = "kfs.stream-buffer-size";
  public static final int KFS_STREAM_BUFFER_SIZE_DEFAULT = 4096;

  /** Bytes-per-checksum key and its default. */
  public static final String KFS_BYTES_PER_CHECKSUM_KEY = "kfs.bytes-per-checksum";
  public static final int KFS_BYTES_PER_CHECKSUM_DEFAULT = 512;

  /** Client write packet size key and its default (64 KB). */
  public static final String KFS_CLIENT_WRITE_PACKET_SIZE_KEY = "kfs.client-write-packet-size";
  public static final int KFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64 * 1024;
}

View File

@ -1,171 +0,0 @@
/**
*
* Licensed under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*
* Provide the implementation of KFS which turns into calls to KfsAccess.
*/
package org.apache.hadoop.fs.kfs;
import java.io.*;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.kosmix.kosmosfs.access.KfsAccess;
import org.kosmix.kosmosfs.access.KfsFileAttr;
import org.apache.hadoop.util.Progressable;
/**
 * {@link IFSImpl} implementation that forwards every operation to a
 * {@link KfsAccess} client connected to a KFS meta server.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
class KFSImpl implements IFSImpl {
  private KfsAccess kfsAccess = null;
  private FileSystem.Statistics statistics;

  /**
   * Connects to the given meta server without collecting statistics.
   *
   * @deprecated use {@link #KFSImpl(String, int, FileSystem.Statistics)}
   */
  @Deprecated
  public KFSImpl(String metaServerHost, int metaServerPort
                 ) throws IOException {
    this(metaServerHost, metaServerPort, null);
  }

  /**
   * Connects to the given meta server.
   *
   * @param metaServerHost host of the KFS meta server
   * @param metaServerPort port of the KFS meta server
   * @param stats statistics object handed to the streams this class opens;
   *              may be {@code null}
   */
  public KFSImpl(String metaServerHost, int metaServerPort,
                 FileSystem.Statistics stats) throws IOException {
    kfsAccess = new KfsAccess(metaServerHost, metaServerPort);
    statistics = stats;
  }

  @Override
  public boolean exists(String path) throws IOException {
    return kfsAccess.kfs_exists(path);
  }

  @Override
  public boolean isDirectory(String path) throws IOException {
    return kfsAccess.kfs_isDirectory(path);
  }

  @Override
  public boolean isFile(String path) throws IOException {
    return kfsAccess.kfs_isFile(path);
  }

  @Override
  public String[] readdir(String path) throws IOException {
    return kfsAccess.kfs_readdir(path);
  }

  /**
   * Lists a directory, returning one {@link FileStatus} per entry except
   * the "." and ".." pseudo-entries.
   *
   * @param path directory to list
   * @return the entries, or {@code null} when KFS returns no listing
   */
  @Override
  public FileStatus[] readdirplus(Path path) throws IOException {
    String srep = path.toUri().getPath();
    KfsFileAttr[] fattr = kfsAccess.kfs_readdirplus(srep);

    if (fattr == null) {
      return null;
    }

    // First pass: size the result array, ignoring "." and "..".
    int numEntries = 0;
    for (KfsFileAttr attr : fattr) {
      if (!isSelfOrParentEntry(attr.filename)) {
        numEntries++;
      }
    }

    // Second pass: convert the remaining attributes.
    FileStatus[] fstatus = new FileStatus[numEntries];
    int j = 0;
    for (KfsFileAttr attr : fattr) {
      if (isSelfOrParentEntry(attr.filename)) {
        continue;
      }
      Path fn = new Path(path, attr.filename);

      if (attr.isDirectory) {
        fstatus[j] = new FileStatus(0, true, 1, 0, attr.modificationTime, fn);
      } else {
        // Files are reported with a fixed 64 MB (1 << 26) block size.
        fstatus[j] = new FileStatus(attr.filesize, false,
                                    attr.replication,
                                    1L << 26,
                                    attr.modificationTime,
                                    fn);
      }
      j++;
    }
    return fstatus;
  }

  /**
   * Tells whether a directory entry name is one of the "." / ".."
   * pseudo-entries that must not be exposed as children.
   */
  private static boolean isSelfOrParentEntry(String name) {
    return ".".equals(name) || "..".equals(name);
  }

  @Override
  public int mkdirs(String path) throws IOException {
    return kfsAccess.kfs_mkdirs(path);
  }

  @Override
  public int rename(String source, String dest) throws IOException {
    return kfsAccess.kfs_rename(source, dest);
  }

  @Override
  public int rmdir(String path) throws IOException {
    return kfsAccess.kfs_rmdir(path);
  }

  @Override
  public int remove(String path) throws IOException {
    return kfsAccess.kfs_remove(path);
  }

  @Override
  public long filesize(String path) throws IOException {
    return kfsAccess.kfs_filesize(path);
  }

  @Override
  public short getReplication(String path) throws IOException {
    return kfsAccess.kfs_getReplication(path);
  }

  @Override
  public short setReplication(String path, short replication) throws IOException {
    return kfsAccess.kfs_setReplication(path, replication);
  }

  @Override
  public String[][] getDataLocation(String path, long start, long len) throws IOException {
    return kfsAccess.kfs_getDataLocation(path, start, len);
  }

  @Override
  public long getModificationTime(String path) throws IOException {
    return kfsAccess.kfs_getModificationTime(path);
  }

  /** Opens {@code path} for reading; {@code bufferSize} is not used. */
  @Override
  public FSDataInputStream open(String path, int bufferSize) throws IOException {
    return new FSDataInputStream(new KFSInputStream(kfsAccess, path,
                                                    statistics));
  }

  /** Creates {@code path} for writing; {@code bufferSize} is not used. */
  @Override
  public FSDataOutputStream create(String path, short replication, int bufferSize, Progressable progress) throws IOException {
    return new FSDataOutputStream(new KFSOutputStream(kfsAccess, path, replication, false, progress),
                                  statistics);
  }

  /** Opens {@code path} for appending; {@code bufferSize} is not used. */
  @Override
  public FSDataOutputStream append(String path, int bufferSize, Progressable progress) throws IOException {
    // when opening for append, # of replicas is ignored
    return new FSDataOutputStream(new KFSOutputStream(kfsAccess, path, (short) 1, true, progress),
                                  statistics);
  }
}

View File

@ -1,143 +0,0 @@
/**
*
* Licensed under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*
* Implements the Hadoop FSInputStream interfaces to allow applications to read
* files in Kosmos File System (KFS).
*/
package org.apache.hadoop.fs.kfs;
import java.io.*;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSInputStream;
import org.kosmix.kosmosfs.access.KfsAccess;
import org.kosmix.kosmosfs.access.KfsInputChannel;
/**
 * Input stream over a KFS file, reading through a {@link KfsInputChannel}.
 * Once {@link #close()} has run, every operation that needs the channel
 * throws {@code IOException("File closed")}.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
class KFSInputStream extends FSInputStream {

  private KfsInputChannel kfsChannel;
  private FileSystem.Statistics statistics;
  private long fsize;

  /**
   * Opens {@code path} without collecting statistics.
   *
   * @deprecated use
   *     {@link #KFSInputStream(KfsAccess, String, FileSystem.Statistics)}
   */
  @Deprecated
  public KFSInputStream(KfsAccess kfsAccess, String path) {
    this(kfsAccess, path, null);
  }

  /**
   * Opens {@code path} for reading.
   *
   * @param kfsAccess KFS client used to open the file
   * @param path file to open
   * @param stats receives the number of bytes read; may be {@code null}
   */
  public KFSInputStream(KfsAccess kfsAccess, String path,
                        FileSystem.Statistics stats) {
    this.statistics = stats;
    this.kfsChannel = kfsAccess.kfs_open(path);
    // A null channel leaves the stream in the same state as a closed one.
    if (this.kfsChannel != null) {
      this.fsize = kfsAccess.kfs_filesize(path);
    } else {
      this.fsize = 0;
    }
  }

  /** Fails with the standard message when the channel is gone. */
  private void checkOpen() throws IOException {
    if (kfsChannel == null) {
      throw new IOException("File closed");
    }
  }

  // Synchronized like every other accessor of kfsChannel, so that a
  // concurrent close() cannot null the channel between check and use.
  @Override
  public synchronized long getPos() throws IOException {
    checkOpen();
    return kfsChannel.tell();
  }

  /**
   * @return the number of bytes between the current position and the file
   *         size recorded at open time, clamped to
   *         {@code [0, Integer.MAX_VALUE]}
   */
  @Override
  public synchronized int available() throws IOException {
    checkOpen();
    long remaining = this.fsize - getPos();
    if (remaining <= 0) {
      return 0;
    }
    // A plain (int) cast would wrap around for files larger than 2 GB.
    return (int) Math.min(remaining, (long) Integer.MAX_VALUE);
  }

  @Override
  public synchronized void seek(long targetPos) throws IOException {
    checkOpen();
    kfsChannel.seek(targetPos);
  }

  @Override
  public synchronized boolean seekToNewSource(long targetPos) throws IOException {
    return false;
  }

  /**
   * Reads a single byte.
   *
   * @return the byte as a value in 0..255, or -1 at end of file
   */
  @Override
  public synchronized int read() throws IOException {
    checkOpen();
    byte[] b = new byte[1];
    // The array variant already records the byte in the statistics;
    // counting it again here would double the reported total.
    int res = read(b, 0, 1);
    if (res == 1) {
      return b[0] & 0xff;
    }
    return -1;
  }

  /**
   * Reads up to {@code len} bytes into {@code b} starting at {@code off}.
   *
   * @return the number of bytes read, 0 when {@code len} is 0, or -1 at
   *         end of file
   */
  @Override
  public synchronized int read(byte b[], int off, int len) throws IOException {
    checkOpen();
    // Wrapping first keeps the argument validation of ByteBuffer.wrap.
    ByteBuffer target = ByteBuffer.wrap(b, off, len);
    if (len == 0) {
      // An empty request is not end-of-file.
      return 0;
    }
    int res = kfsChannel.read(target);
    // Use -1 to signify EOF. The channel reports 0 at end of file; a
    // negative result is treated the same way so that it can never be
    // added to the statistics.
    if (res <= 0) {
      return -1;
    }
    if (statistics != null) {
      statistics.incrementBytesRead(res);
    }
    return res;
  }

  /** Closes the channel; calling it again is a no-op. */
  @Override
  public synchronized void close() throws IOException {
    if (kfsChannel == null) {
      return;
    }
    kfsChannel.close();
    kfsChannel = null;
  }

  @Override
  public boolean markSupported() {
    return false;
  }

  @Override
  public void mark(int readLimit) {
    // Do nothing
  }

  @Override
  public void reset() throws IOException {
    throw new IOException("Mark not supported");
  }
}

View File

@ -1,99 +0,0 @@
/**
*
* Licensed under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*
* Implements the Hadoop FSOutputStream interfaces to allow applications to write to
* files in Kosmos File System (KFS).
*/
package org.apache.hadoop.fs.kfs;
import java.io.*;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.Progressable;
import org.kosmix.kosmosfs.access.KfsAccess;
import org.kosmix.kosmosfs.access.KfsOutputChannel;
/**
 * Output stream over a KFS file, writing through a
 * {@link KfsOutputChannel}. Once {@link #close()} has run, write, flush
 * and getPos throw {@code IOException("File closed")}.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
class KFSOutputStream extends OutputStream {

  private String path;
  private KfsOutputChannel kfsChannel;
  private Progressable progressReporter;

  /**
   * Opens {@code path} for writing.
   *
   * @param kfsAccess KFS client used to open the file
   * @param path file to write
   * @param replication replication factor used when the file is created
   * @param append when true and {@code path} is an existing file, the file
   *               is opened for append; otherwise it is created
   * @param prog progress callback invoked before blocking KFS calls; may
   *             be {@code null}
   */
  public KFSOutputStream(KfsAccess kfsAccess, String path, short replication,
                         boolean append, Progressable prog) {
    this.path = path;

    if ((append) && (kfsAccess.kfs_isFile(path))) {
      this.kfsChannel = kfsAccess.kfs_append(path);
    } else {
      this.kfsChannel = kfsAccess.kfs_create(path, replication);
    }
    this.progressReporter = prog;
  }

  /** Fails with the standard message when the channel is gone. */
  private void checkOpen() throws IOException {
    if (kfsChannel == null) {
      throw new IOException("File closed");
    }
  }

  /** Reports progress if a reporter was supplied. */
  private void touchProgress() {
    // Callers are allowed to pass a null Progressable.
    if (progressReporter != null) {
      progressReporter.progress();
    }
  }

  public long getPos() throws IOException {
    checkOpen();
    return kfsChannel.tell();
  }

  @Override
  public void write(int v) throws IOException {
    checkOpen();
    byte[] b = new byte[1];

    b[0] = (byte) v;
    write(b, 0, 1);
  }

  @Override
  public void write(byte b[], int off, int len) throws IOException {
    checkOpen();

    // touch the progress before going into KFS since the call can block
    touchProgress();
    kfsChannel.write(ByteBuffer.wrap(b, off, len));
  }

  @Override
  public void flush() throws IOException {
    checkOpen();
    // touch the progress before going into KFS since the call can block
    touchProgress();
    kfsChannel.sync();
  }

  /** Flushes and closes the channel; calling it again is a no-op. */
  @Override
  public synchronized void close() throws IOException {
    if (kfsChannel == null) {
      return;
    }
    try {
      flush();
    } finally {
      // Release the channel even when the final flush fails, and mark the
      // stream closed so that the failure cannot leak the channel.
      try {
        kfsChannel.close();
      } finally {
        kfsChannel = null;
      }
    }
  }
}

View File

@ -1,352 +0,0 @@
/**
*
* Licensed under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*
* Implements the Hadoop FS interfaces to allow applications to store
*files in Kosmos File System (KFS).
*/
package org.apache.hadoop.fs.kfs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
/**
 * A FileSystem backed by KFS.
 *
 * All filesystem operations are delegated to an {@link IFSImpl}; relative
 * paths are resolved against the current working directory first.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class KosmosFileSystem extends FileSystem {

  private FileSystem localFs;
  private IFSImpl kfsImpl = null;
  private URI uri;
  private Path workingDir = new Path("/");

  public KosmosFileSystem() {

  }

  /** Uses the supplied implementation instead of connecting to KFS. */
  KosmosFileSystem(IFSImpl fsimpl) {
    this.kfsImpl = fsimpl;
  }

  /**
   * Return the protocol scheme for the FileSystem.
   * <p/>
   *
   * @return <code>kfs</code>
   */
  @Override
  public String getScheme() {
    return "kfs";
  }

  @Override
  public URI getUri() {
    return uri;
  }

  /**
   * Initializes the filesystem. Unless an implementation was injected, a
   * {@link KFSImpl} is created from the URI's host and port, or from
   * {@code fs.kfs.metaServerHost} / {@code fs.kfs.metaServerPort} when the
   * URI carries no host.
   *
   * @throws IOException if initialization fails; the failure is reported
   *     to the caller rather than terminating the JVM
   */
  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    try {
      if (kfsImpl == null) {
        if (uri.getHost() == null) {
          kfsImpl = new KFSImpl(conf.get("fs.kfs.metaServerHost", ""),
                                conf.getInt("fs.kfs.metaServerPort", -1),
                                statistics);
        } else {
          kfsImpl = new KFSImpl(uri.getHost(), uri.getPort(), statistics);
        }
      }

      this.localFs = FileSystem.getLocal(conf);
      this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
      this.workingDir = new Path("/user", System.getProperty("user.name")
                                 ).makeQualified(this);
      setConf(conf);

    } catch (IOException e) {
      throw e;
    } catch (Exception e) {
      // A library must not call System.exit(); surface the failure with
      // its cause so that the caller decides how to react.
      throw new IOException("Unable to initialize KFS", e);
    }
  }

  @Override
  public Path getWorkingDirectory() {
    return workingDir;
  }

  @Override
  public void setWorkingDirectory(Path dir) {
    workingDir = makeAbsolute(dir);
  }

  /** Resolves a relative path against the working directory. */
  private Path makeAbsolute(Path path) {
    if (path.isAbsolute()) {
      return path;
    }
    return new Path(workingDir, path);
  }

  /** @return true when the implementation reports status 0 */
  @Override
  public boolean mkdirs(Path path, FsPermission permission
      ) throws IOException {
    Path absolute = makeAbsolute(path);
    String srep = absolute.toUri().getPath();

    int res = kfsImpl.mkdirs(srep);

    return res == 0;
  }

  @Override
  public boolean isDirectory(Path path) throws IOException {
    Path absolute = makeAbsolute(path);
    String srep = absolute.toUri().getPath();

    return kfsImpl.isDirectory(srep);
  }

  @Override
  public boolean isFile(Path path) throws IOException {
    Path absolute = makeAbsolute(path);
    String srep = absolute.toUri().getPath();
    return kfsImpl.isFile(srep);
  }

  /**
   * Lists {@code path}: a single-element array for a file, the directory
   * entries otherwise.
   *
   * @throws FileNotFoundException if {@code path} does not exist
   */
  @Override
  public FileStatus[] listStatus(Path path) throws IOException {
    Path absolute = makeAbsolute(path);
    String srep = absolute.toUri().getPath();

    if (!kfsImpl.exists(srep)) {
      throw new FileNotFoundException("File " + path + " does not exist.");
    }

    if (kfsImpl.isFile(srep)) {
      return new FileStatus[] { getFileStatus(path) };
    }

    return kfsImpl.readdirplus(absolute);
  }

  /**
   * @throws FileNotFoundException if {@code path} does not exist
   */
  @Override
  public FileStatus getFileStatus(Path path) throws IOException {
    Path absolute = makeAbsolute(path);
    String srep = absolute.toUri().getPath();
    if (!kfsImpl.exists(srep)) {
      throw new FileNotFoundException("File " + path + " does not exist.");
    }
    if (kfsImpl.isDirectory(srep)) {
      return new FileStatus(0, true, 1, 0, kfsImpl.getModificationTime(srep),
                            path.makeQualified(this));
    } else {
      return new FileStatus(kfsImpl.filesize(srep), false,
                            kfsImpl.getReplication(srep),
                            getDefaultBlockSize(),
                            kfsImpl.getModificationTime(srep),
                            path.makeQualified(this));
    }
  }

  /** Opens {@code f} for append, creating its parent directories first. */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    Path parent = f.getParent();
    if (parent != null && !mkdirs(parent)) {
      throw new IOException("Mkdirs failed to create " + parent);
    }

    Path absolute = makeAbsolute(f);
    String srep = absolute.toUri().getPath();

    return kfsImpl.append(srep, bufferSize, progress);
  }

  /**
   * Creates {@code file}, creating its parent directories first. An
   * existing file is deleted when {@code overwrite} is set and is an error
   * otherwise. {@code permission} and {@code blockSize} are not used.
   */
  @Override
  public FSDataOutputStream create(Path file, FsPermission permission,
                                   boolean overwrite, int bufferSize,
                                   short replication, long blockSize, Progressable progress)
      throws IOException {

    if (exists(file)) {
      if (overwrite) {
        delete(file, true);
      } else {
        throw new IOException("File already exists: " + file);
      }
    }

    Path parent = file.getParent();
    if (parent != null && !mkdirs(parent)) {
      throw new IOException("Mkdirs failed to create " + parent);
    }

    Path absolute = makeAbsolute(file);
    String srep = absolute.toUri().getPath();

    return kfsImpl.create(srep, replication, bufferSize, progress);
  }

  /**
   * @throws FileNotFoundException if {@code path} does not exist
   */
  @Override
  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
    if (!exists(path)) {
      // FileNotFoundException is an IOException, so existing handlers
      // still apply; it matches listStatus() and getFileStatus().
      throw new FileNotFoundException("File does not exist: " + path);
    }

    Path absolute = makeAbsolute(path);
    String srep = absolute.toUri().getPath();

    return kfsImpl.open(srep, bufferSize);
  }

  /** @return true when the implementation reports status 0 */
  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    Path absoluteS = makeAbsolute(src);
    String srepS = absoluteS.toUri().getPath();
    Path absoluteD = makeAbsolute(dst);
    String srepD = absoluteD.toUri().getPath();

    return kfsImpl.rename(srepS, srepD) == 0;
  }

  /**
   * Deletes a file, or a directory together with its contents.
   *
   * @throws IOException if {@code path} is a non-empty directory and
   *     {@code recursive} is false
   */
  @Override
  public boolean delete(Path path, boolean recursive) throws IOException {
    Path absolute = makeAbsolute(path);
    String srep = absolute.toUri().getPath();
    if (kfsImpl.isFile(srep)) {
      return kfsImpl.remove(srep) == 0;
    }

    FileStatus[] dirEntries = listStatus(absolute);
    if (!recursive && (dirEntries.length != 0)) {
      throw new IOException("Directory " + path.toString() +
          " is not empty.");
    }

    for (int i = 0; i < dirEntries.length; i++) {
      delete(new Path(absolute, dirEntries[i].getPath()), recursive);
    }
    return kfsImpl.rmdir(srep) == 0;
  }

  @Override
  public short getDefaultReplication() {
    return 3;
  }

  /** @return true when the implementation reports a non-negative result */
  @Override
  public boolean setReplication(Path path, short replication)
      throws IOException {

    Path absolute = makeAbsolute(path);
    String srep = absolute.toUri().getPath();

    int res = kfsImpl.setReplication(srep, replication);
    return res >= 0;
  }

  // 64MB is the KFS block size
  @Override
  public long getDefaultBlockSize() {
    return 1 << 26;
  }

  @Deprecated
  public void lock(Path path, boolean shared) throws IOException {

  }

  @Deprecated
  public void release(Path path) throws IOException {

  }

  /**
   * Return null if the file doesn't exist; otherwise, get the
   * locations of the various chunks of the file from KFS.
   */
  @Override
  public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
      long len) throws IOException {

    if (file == null) {
      return null;
    }
    String srep = makeAbsolute(file.getPath()).toUri().getPath();
    String[][] hints = kfsImpl.getDataLocation(srep, start, len);
    if (hints == null) {
      return null;
    }
    BlockLocation[] result = new BlockLocation[hints.length];
    long blockSize = getDefaultBlockSize();
    long length = len;
    long blockStart = start;
    for (int i = 0; i < result.length; ++i) {
      // Every location covers one block, except that the remaining length
      // is used once it drops below the block size.
      result[i] = new BlockLocation(null, hints[i], blockStart,
                                    length < blockSize ? length : blockSize);
      blockStart += blockSize;
      length -= blockSize;
    }
    return result;
  }

  @Override
  public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
    FileUtil.copy(localFs, src, this, dst, delSrc, getConf());
  }

  @Override
  public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
    FileUtil.copy(this, src, localFs, dst, delSrc, getConf());
  }

  @Override
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
      throws IOException {
    return tmpLocalFile;
  }

  @Override
  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
      throws IOException {
    moveFromLocalFile(tmpLocalFile, fsOutputFile);
  }
}

View File

@ -1,98 +0,0 @@
<html>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head></head>
<body>
<h1>A client for the Kosmos filesystem (KFS)</h1>
<h3>Introduction</h3>
This page describes how to use the Kosmos Filesystem
(<a href="http://kosmosfs.sourceforge.net"> KFS </a>) as a backing
store with Hadoop. This page assumes that you have downloaded the
KFS software and installed necessary binaries as outlined in the KFS
documentation.
<h3>Steps</h3>
<ul>
<li>In the Hadoop conf directory edit core-site.xml,
add the following:
<pre>
&lt;property&gt;
&lt;name&gt;fs.kfs.impl&lt;/name&gt;
&lt;value&gt;org.apache.hadoop.fs.kfs.KosmosFileSystem&lt;/value&gt;
&lt;description&gt;The FileSystem for kfs: uris.&lt;/description&gt;
&lt;/property&gt;
</pre>
<li>In the Hadoop conf directory edit core-site.xml,
adding the following (with appropriate values for
&lt;server&gt; and &lt;port&gt;):
<pre>
&lt;property&gt;
&lt;name&gt;fs.default.name&lt;/name&gt;
&lt;value&gt;kfs://&lt;server:port&gt;&lt;/value&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;fs.kfs.metaServerHost&lt;/name&gt;
&lt;value&gt;&lt;server&gt;&lt;/value&gt;
&lt;description&gt;The location of the KFS meta server.&lt;/description&gt;
&lt;/property&gt;
&lt;property&gt;
&lt;name&gt;fs.kfs.metaServerPort&lt;/name&gt;
&lt;value&gt;&lt;port&gt;&lt;/value&gt;
&lt;description&gt;The port of the KFS meta server.&lt;/description&gt;
&lt;/property&gt;
</pre>
</li>
<li>Copy KFS's <i> kfs-0.1.jar </i> to Hadoop's lib directory. This step
enables Hadoop to load the KFS-specific modules. Note
that kfs-0.1.jar was built when you compiled the KFS source
code. This jar file contains code that calls KFS's client
library code via JNI; the native code is in KFS's <i>
libkfsClient.so </i> library.
</li>
<li> When the Hadoop map/reduce trackers start up, those
processes (on local as well as remote nodes) will now need to load
KFS's <i> libkfsClient.so </i> library. To simplify this process, it is advisable to
store libkfsClient.so in an NFS accessible directory (similar to where
Hadoop binaries/scripts are stored); then, modify Hadoop's
conf/hadoop-env.sh adding the following line and providing suitable
value for &lt;path&gt;:
<pre>
export LD_LIBRARY_PATH=&lt;path&gt;
</pre>
<li>Start only the map/reduce trackers
<br />
example: execute Hadoop's bin/start-mapred.sh</li>
</ul>
<br/>
Once the map/reduce trackers start up, all file I/O goes to KFS.
</body>
</html>

View File

@ -17,6 +17,5 @@ org.apache.hadoop.fs.LocalFileSystem
org.apache.hadoop.fs.viewfs.ViewFileSystem
org.apache.hadoop.fs.s3.S3FileSystem
org.apache.hadoop.fs.s3native.NativeS3FileSystem
org.apache.hadoop.fs.kfs.KosmosFileSystem
org.apache.hadoop.fs.ftp.FTPFileSystem
org.apache.hadoop.fs.HarFileSystem

View File

@ -774,42 +774,6 @@
<description>Replication factor</description>
</property>
<!-- Kosmos File System -->
<property>
<name>kfs.stream-buffer-size</name>
<value>4096</value>
<description>The size of buffer to stream files.
The size of this buffer should probably be a multiple of hardware
page size (4096 on Intel x86), and it determines how much data is
buffered during read and write operations.</description>
</property>
<property>
<name>kfs.bytes-per-checksum</name>
<value>512</value>
<description>The number of bytes per checksum. Must not be larger than
kfs.stream-buffer-size</description>
</property>
<property>
<name>kfs.client-write-packet-size</name>
<value>65536</value>
<description>Packet size for clients to write</description>
</property>
<property>
<name>kfs.blocksize</name>
<value>67108864</value>
<description>Block size</description>
</property>
<property>
<name>kfs.replication</name>
<value>3</value>
<description>Replication factor</description>
</property>
<!-- FTP file system -->
<property>
<name>ftp.stream-buffer-size</name>

View File

@ -1,168 +0,0 @@
/**
*
* Licensed under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*
* We need to provide the ability to test the code in fs/kfs without really
* having a KFS deployment. For this purpose, use the LocalFileSystem
* as a way to "emulate" KFS.
*/
package org.apache.hadoop.fs.kfs;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Progressable;
public class KFSEmulationImpl implements IFSImpl {
FileSystem localFS;
/**
 * Builds the emulation on top of the local filesystem obtained from
 * {@code conf}.
 */
public KFSEmulationImpl(Configuration conf) throws IOException {
  this.localFS = FileSystem.getLocal(conf);
}
/** Checks for {@code path} on the local filesystem. */
@Override
public boolean exists(String path) throws IOException {
  final Path target = new Path(path);
  return localFS.exists(target);
}
/** Asks the local filesystem whether {@code path} is a directory. */
@Override
public boolean isDirectory(String path) throws IOException {
  final Path target = new Path(path);
  return localFS.isDirectory(target);
}
/** Asks the local filesystem whether {@code path} is a file. */
@Override
public boolean isFile(String path) throws IOException {
  final Path target = new Path(path);
  return localFS.isFile(target);
}
/**
 * Lists the entries of a local directory.
 *
 * @param path directory to list
 * @return the string form of each entry's path, or {@code null} when the
 *         directory cannot be listed because it does not exist
 */
@Override
public String[] readdir(String path) throws IOException {
  FileStatus[] p;
  try {
    // List exactly once, inside the try: a listing performed before the
    // try would let FileNotFoundException escape the handler below.
    p = localFS.listStatus(new Path(path));
  } catch ( FileNotFoundException fnfe ) {
    return null;
  }
  if (p == null) {
    // Treat a missing listing like a missing directory instead of
    // failing on p.length.
    return null;
  }
  String[] entries = new String[p.length];
  for (int i = 0; i < p.length; i++) {
    entries[i] = p[i].getPath().toString();
  }
  return entries;
}
/** Returns the local filesystem's listing of {@code path} unchanged. */
@Override
public FileStatus[] readdirplus(Path path) throws IOException {
  final FileStatus[] listing = localFS.listStatus(path);
  return listing;
}
/**
 * Creates {@code path} on the local filesystem.
 *
 * @return 0 when the local mkdirs call succeeds, -1 otherwise
 */
@Override
public int mkdirs(String path) throws IOException {
  final boolean created = localFS.mkdirs(new Path(path));
  return created ? 0 : -1;
}
/**
 * Renames {@code source} to {@code dest} on the local filesystem.
 *
 * @return 0 when the local rename call succeeds, -1 otherwise
 */
@Override
public int rename(String source, String dest) throws IOException {
  final Path from = new Path(source);
  final Path to = new Path(dest);
  return localFS.rename(from, to) ? 0 : -1;
}
@Override
public int rmdir(String path) throws IOException {
if (isDirectory(path)) {
// the directory better be empty
String[] dirEntries = readdir(path);
if ((dirEntries.length <= 2) && (localFS.delete(new Path(path), true)))
return 0;
}
return -1;
}
@Override
public int remove(String path) throws IOException {
if (isFile(path) && (localFS.delete(new Path(path), true)))
return 0;
return -1;
}
@Override
public long filesize(String path) throws IOException {
return localFS.getFileStatus(new Path(path)).getLen();
}
@Override
public short getReplication(String path) throws IOException {
return 1;
}
@Override
public short setReplication(String path, short replication) throws IOException {
return 1;
}
@Override
public String[][] getDataLocation(String path, long start, long len) throws IOException {
BlockLocation[] blkLocations =
localFS.getFileBlockLocations(localFS.getFileStatus(new Path(path)),
start, len);
if ((blkLocations == null) || (blkLocations.length == 0)) {
return new String[0][];
}
int blkCount = blkLocations.length;
String[][]hints = new String[blkCount][];
for (int i=0; i < blkCount ; i++) {
String[] hosts = blkLocations[i].getHosts();
hints[i] = new String[hosts.length];
hints[i] = hosts;
}
return hints;
}
@Override
public long getModificationTime(String path) throws IOException {
FileStatus s = localFS.getFileStatus(new Path(path));
if (s == null)
return 0;
return s.getModificationTime();
}
@Override
public FSDataOutputStream append(String path, int bufferSize, Progressable progress) throws IOException {
// besides path/overwrite, the other args don't matter for
// testing purposes.
return localFS.append(new Path(path));
}
@Override
public FSDataOutputStream create(String path, short replication, int bufferSize, Progressable progress) throws IOException {
// besides path/overwrite, the other args don't matter for
// testing purposes.
return localFS.create(new Path(path));
}
@Override
public FSDataInputStream open(String path, int bufferSize) throws IOException {
return localFS.open(new Path(path));
}
};

View File

@ -1,199 +0,0 @@
/**
*
* Licensed under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*
* Unit tests for testing the KosmosFileSystem API implementation.
*/
package org.apache.hadoop.fs.kfs;
import java.io.IOException;
import java.net.URI;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
/**
 * Exercises the KosmosFileSystem directory, file and read/write APIs against
 * a {@link KFSEmulationImpl}, so no KFS deployment is needed.
 */
public class TestKosmosFileSystem extends TestCase {

    KosmosFileSystem kosmosFileSystem;
    KFSEmulationImpl kfsEmul;
    Path baseDir;

    @Override
    protected void setUp() throws IOException {
        Configuration conf = new Configuration();

        kfsEmul = new KFSEmulationImpl(conf);
        kosmosFileSystem = new KosmosFileSystem(kfsEmul);
        // a dummy URI; we are not connecting to any setup here
        kosmosFileSystem.initialize(URI.create("kfs:///"), conf);
        baseDir = new Path(System.getProperty("test.build.data", "/tmp") +
                           "/kfs-test");
    }

    /**
     * Removes the test directory if a test left it behind (for example after
     * a failed assertion), so leftovers cannot skew the entry counts asserted
     * by the other tests.
     */
    @Override
    protected void tearDown() throws Exception {
        if (kosmosFileSystem != null && baseDir != null
                && kosmosFileSystem.exists(baseDir)) {
            kosmosFileSystem.delete(baseDir, true);
        }
    }

    /** Check all the directory API's in KFS. */
    public void testDirs() throws Exception {
        Path subDir1 = new Path("dir.1");

        // make the dir
        kosmosFileSystem.mkdirs(baseDir);
        assertTrue(kosmosFileSystem.isDirectory(baseDir));
        kosmosFileSystem.setWorkingDirectory(baseDir);

        kosmosFileSystem.mkdirs(subDir1);
        assertTrue(kosmosFileSystem.isDirectory(subDir1));

        assertFalse(kosmosFileSystem.exists(new Path("test1")));
        assertFalse(kosmosFileSystem.isDirectory(new Path("test/dir.2")));

        FileStatus[] p = kosmosFileSystem.listStatus(baseDir);
        assertEquals(1, p.length);

        kosmosFileSystem.delete(baseDir, true);
        assertFalse(kosmosFileSystem.exists(baseDir));
    }

    /** Check the file API's. */
    public void testFiles() throws Exception {
        Path subDir1 = new Path("dir.1");
        Path file1 = new Path("dir.1/foo.1");
        Path file2 = new Path("dir.1/foo.2");

        kosmosFileSystem.mkdirs(baseDir);
        assertTrue(kosmosFileSystem.isDirectory(baseDir));
        kosmosFileSystem.setWorkingDirectory(baseDir);

        kosmosFileSystem.mkdirs(subDir1);

        FSDataOutputStream s1 = kosmosFileSystem.create(file1, true, 4096, (short) 1, (long) 4096, null);
        try {
            FSDataOutputStream s2 = kosmosFileSystem.create(file2, true, 4096, (short) 1, (long) 4096, null);
            s2.close();
        } finally {
            s1.close();
        }

        FileStatus[] p = kosmosFileSystem.listStatus(subDir1);
        assertEquals(2, p.length);

        kosmosFileSystem.delete(file1, true);
        p = kosmosFileSystem.listStatus(subDir1);
        assertEquals(1, p.length);

        kosmosFileSystem.delete(file2, true);
        p = kosmosFileSystem.listStatus(subDir1);
        assertEquals(0, p.length);

        kosmosFileSystem.delete(baseDir, true);
        assertFalse(kosmosFileSystem.exists(baseDir));
    }

    /** Check file read/write. */
    public void testFileIO() throws Exception {
        Path subDir1 = new Path("dir.1");
        Path file1 = new Path("dir.1/foo.1");

        kosmosFileSystem.mkdirs(baseDir);
        assertTrue(kosmosFileSystem.isDirectory(baseDir));
        kosmosFileSystem.setWorkingDirectory(baseDir);

        kosmosFileSystem.mkdirs(subDir1);

        int bufsz = 4096;
        byte[] data = new byte[bufsz];
        for (int i = 0; i < data.length; i++) {
            data[i] = (byte) (i % 16);
        }

        FSDataOutputStream s1 = kosmosFileSystem.create(file1, true, 4096, (short) 1, (long) 4096, null);
        try {
            // write 4 bytes and read them back; read API should return a byte per call
            s1.write(32);
            s1.write(32);
            s1.write(32);
            s1.write(32);
            // write some data
            s1.write(data, 0, data.length);
        } finally {
            // flush out the changes
            s1.close();
        }

        // Read the stuff back and verify it is correct
        byte[] buf = new byte[bufsz];
        long nread;
        FSDataInputStream s2 = kosmosFileSystem.open(file1, 4096);
        try {
            for (int i = 0; i < 4; i++) {
                assertEquals(32, s2.read());
            }

            assertEquals(data.length, s2.available());

            // readFully rather than read: a single read may legally return
            // fewer bytes than requested, leaving part of buf unfilled.
            s2.readFully(buf, 0, buf.length);
            nread = s2.getPos();

            for (int i = 0; i < data.length; i++) {
                assertEquals(data[i], buf[i]);
            }

            assertEquals(0, s2.available());
        } finally {
            s2.close();
        }

        // append some data to the file
        try {
            s1 = kosmosFileSystem.append(file1);
            try {
                for (int i = 0; i < data.length; i++) {
                    data[i] = (byte) (i % 17);
                }
                // write the data
                s1.write(data, 0, data.length);
            } finally {
                // flush out the changes
                s1.close();
            }

            // read it back and validate
            s2 = kosmosFileSystem.open(file1, 4096);
            try {
                s2.seek(nread);
                s2.readFully(buf, 0, buf.length);
                for (int i = 0; i < data.length; i++) {
                    assertEquals(data[i], buf[i]);
                }
            } finally {
                s2.close();
            }
        } catch (Exception e) {
            // Deliberately best-effort: the backing file system may not
            // implement append, in which case this part of the test is skipped.
            System.out.println("append isn't supported by the underlying fs");
        }

        kosmosFileSystem.delete(file1, true);
        assertFalse(kosmosFileSystem.exists(file1));
        kosmosFileSystem.delete(subDir1, true);
        assertFalse(kosmosFileSystem.exists(subDir1));
        kosmosFileSystem.delete(baseDir, true);
        assertFalse(kosmosFileSystem.exists(baseDir));
    }
}