HADOOP-12709 Cut s3:// from trunk. Contributed by Mingliang Liu.

This commit is contained in:
Steve Loughran 2016-06-29 14:06:04 +01:00
parent 8d202f1258
commit 96fa0f848b
48 changed files with 198 additions and 3433 deletions

View File

@ -709,32 +709,16 @@
</property>
<property>
<name>fs.s3.awsAccessKeyId</name>
<description>AWS access key ID used by S3 block file system.</description>
</property>
<property>
<name>fs.s3.awsSecretAccessKey</name>
<description>AWS secret key used by S3 block file system.</description>
</property>
<property>
<name>fs.s3.block.size</name>
<value>67108864</value>
<description>Block size to use when writing files to S3.</description>
</property>
<property>
<name>fs.s3.buffer.dir</name>
<value>${hadoop.tmp.dir}/s3</value>
<description>Determines where on the local filesystem the s3:/s3n: filesystem
<name>fs.s3n.buffer.dir</name>
<value>${hadoop.tmp.dir}/s3n</value>
<description>Determines where on the local filesystem the s3n:// filesystem
should store files before sending them to S3
(or after retrieving them from S3).
</description>
</property>
<property>
<name>fs.s3.maxRetries</name>
<name>fs.s3n.maxRetries</name>
<value>4</value>
<description>The maximum number of retries for reading or writing files to S3,
before we signal failure to the application.
@ -742,7 +726,7 @@
</property>
<property>
<name>fs.s3.sleepTimeSeconds</name>
<name>fs.s3n.sleepTimeSeconds</name>
<value>10</value>
<description>The number of seconds to sleep between each S3 retry.
</description>
@ -1377,42 +1361,6 @@
<description>Replication factor</description>
</property>
<!-- s3 File System -->
<property>
<name>s3.stream-buffer-size</name>
<value>4096</value>
<description>The size of buffer to stream files.
The size of this buffer should probably be a multiple of hardware
page size (4096 on Intel x86), and it determines how much data is
buffered during read and write operations.</description>
</property>
<property>
<name>s3.bytes-per-checksum</name>
<value>512</value>
<description>The number of bytes per checksum. Must not be larger than
s3.stream-buffer-size</description>
</property>
<property>
<name>s3.client-write-packet-size</name>
<value>65536</value>
<description>Packet size for clients to write</description>
</property>
<property>
<name>s3.blocksize</name>
<value>67108864</value>
<description>Block size</description>
</property>
<property>
<name>s3.replication</name>
<value>3</value>
<description>Replication factor</description>
</property>
<!-- s3native File System -->
<property>

View File

@ -86,20 +86,15 @@ public class TestCommonConfigurationFields extends TestConfigurationFieldsBase {
// Lots of properties not in the above classes
xmlPropsToSkipCompare.add("fs.ftp.password.localhost");
xmlPropsToSkipCompare.add("fs.ftp.user.localhost");
xmlPropsToSkipCompare.add("fs.s3.block.size");
xmlPropsToSkipCompare.add("hadoop.tmp.dir");
xmlPropsToSkipCompare.add("nfs3.mountd.port");
xmlPropsToSkipCompare.add("nfs3.server.port");
xmlPropsToSkipCompare.add("test.fs.s3.name");
xmlPropsToSkipCompare.add("test.fs.s3n.name");
// S3/S3A properties are in a different subtree.
// - org.apache.hadoop.fs.s3.S3FileSystemConfigKeys
// S3N/S3A properties are in a different subtree.
// - org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys
xmlPrefixToSkipCompare.add("fs.s3.");
xmlPrefixToSkipCompare.add("fs.s3a.");
xmlPrefixToSkipCompare.add("fs.s3n.");
xmlPrefixToSkipCompare.add("s3.");
xmlPrefixToSkipCompare.add("s3native.");
// ADL properties are in a different subtree

View File

@ -172,7 +172,7 @@ public abstract class FileSystemContractBaseTest extends TestCase {
}
public void testMkdirsWithUmask() throws Exception {
if (fs.getScheme().equals("s3") || fs.getScheme().equals("s3n")) {
if (fs.getScheme().equals("s3n")) {
// skip permission tests for S3FileSystem until HDFS-1333 is fixed.
return;
}

View File

@ -1,38 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
import org.apache.hadoop.conf.Configuration;
import org.junit.Before;
public class TestLocal_S3FileContextURI extends FileContextURIBase {
@Override
@Before
public void setUp() throws Exception {
Configuration S3Conf = new Configuration();
Configuration localConf = new Configuration();
S3Conf.set(FS_DEFAULT_NAME_DEFAULT, S3Conf.get("test.fs.s3.name"));
fc1 = FileContext.getFileContext(S3Conf);
fc2 = FileContext.getFileContext(localConf);
}
}

View File

@ -1,38 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
import org.apache.hadoop.conf.Configuration;
import org.junit.Before;
public class TestS3_LocalFileContextURI extends FileContextURIBase {
@Override
@Before
public void setUp() throws Exception {
Configuration localConf = new Configuration();
fc2 = FileContext.getFileContext(localConf);
Configuration s3conf = new Configuration();
s3conf.set(FS_DEFAULT_NAME_DEFAULT, s3conf.get("test.fs.s3.name"));
fc1 = FileContext.getFileContext(s3conf);
}
}

View File

@ -31,18 +31,6 @@
<final>true</final>
</property>
<property>
<name>test.fs.s3.name</name>
<value>s3:///</value>
<description>The name of the s3 file system for testing.</description>
</property>
<property>
<name>fs.s3.block.size</name>
<value>128</value>
<description>Size of a block in bytes.</description>
</property>
<property>
<name>fs.ftp.user.localhost</name>
<value>user</value>

View File

@ -17,7 +17,6 @@
<property><!--Loaded from job.xml--><name>hadoop.http.authentication.kerberos.keytab</name><value>${user.home}/hadoop.keytab</value></property>
<property><!--Loaded from job.xml--><name>yarn.nodemanager.keytab</name><value>/etc/krb5.keytab</value></property>
<property><!--Loaded from job.xml--><name>io.seqfile.sorter.recordlimit</name><value>1000000</value></property>
<property><!--Loaded from job.xml--><name>s3.blocksize</name><value>67108864</value></property>
<property><!--Loaded from job.xml--><name>mapreduce.task.io.sort.factor</name><value>10</value></property>
<property><!--Loaded from job.xml--><name>yarn.nodemanager.disk-health-checker.interval-ms</name><value>120000</value></property>
<property><!--Loaded from job.xml--><name>mapreduce.job.working.dir</name><value>hdfs://localhost:8021/user/user</value></property>
@ -27,12 +26,10 @@
<property><!--Loaded from job.xml--><name>dfs.namenode.delegation.token.renew-interval</name><value>86400000</value></property>
<property><!--Loaded from job.xml--><name>yarn.nodemanager.resource.memory-mb</name><value>8192</value></property>
<property><!--Loaded from job.xml--><name>io.map.index.interval</name><value>128</value></property>
<property><!--Loaded from job.xml--><name>s3.client-write-packet-size</name><value>65536</value></property>
<property><!--Loaded from job.xml--><name>dfs.namenode.http-address</name><value>0.0.0.0:9870</value></property>
<property><!--Loaded from job.xml--><name>mapreduce.task.files.preserve.failedtasks</name><value>false</value></property>
<property><!--Loaded from job.xml--><name>mapreduce.job.reduce.class</name><value>org.apache.hadoop.mapreduce.SleepJob$SleepReducer</value></property>
<property><!--Loaded from job.xml--><name>hadoop.hdfs.configuration.version</name><value>1</value></property>
<property><!--Loaded from job.xml--><name>s3.replication</name><value>3</value></property>
<property><!--Loaded from job.xml--><name>dfs.datanode.balance.bandwidthPerSec</name><value>1048576</value></property>
<property><!--Loaded from job.xml--><name>mapreduce.reduce.shuffle.connect.timeout</name><value>180000</value></property>
<property><!--Loaded from job.xml--><name>yarn.nodemanager.aux-services</name><value>mapreduce.shuffle</value></property>
@ -105,7 +102,7 @@
<property><!--Loaded from job.xml--><name>mapreduce.job.maxtaskfailures.per.tracker</name><value>4</value></property>
<property><!--Loaded from job.xml--><name>ipc.client.connection.maxidletime</name><value>10000</value></property>
<property><!--Loaded from job.xml--><name>dfs.blockreport.intervalMsec</name><value>21600000</value></property>
<property><!--Loaded from job.xml--><name>fs.s3.sleepTimeSeconds</name><value>10</value></property>
<property><!--Loaded from job.xml--><name>fs.s3n.sleepTimeSeconds</name><value>10</value></property>
<property><!--Loaded from job.xml--><name>dfs.namenode.replication.considerLoad</name><value>true</value></property>
<property><!--Loaded from job.xml--><name>dfs.client.block.write.retries</name><value>3</value></property>
<property><!--Loaded from job.xml--><name>hadoop.proxyuser.user.groups</name><value>users</value></property>
@ -117,7 +114,6 @@
<property><!--Loaded from job.xml--><name>ipc.client.tcpnodelay</name><value>false</value></property>
<property><!--Loaded from job.xml--><name>mapreduce.map.output.value.class</name><value>org.apache.hadoop.io.NullWritable</value></property>
<property><!--Loaded from job.xml--><name>dfs.namenode.accesstime.precision</name><value>3600000</value></property>
<property><!--Loaded from job.xml--><name>s3.stream-buffer-size</name><value>4096</value></property>
<property><!--Loaded from job.xml--><name>mapreduce.jobtracker.tasktracker.maxblacklists</name><value>4</value></property>
<property><!--Loaded from Unknown--><name>rpc.engine.com.google.protobuf.BlockingService</name><value>org.apache.hadoop.yarn.ipc.ProtoOverHadoopRpcEngine</value></property>
<property><!--Loaded from job.xml--><name>mapreduce.job.jvm.numtasks</name><value>1</value></property>
@ -136,7 +132,7 @@
<property><!--Loaded from job.xml--><name>kfs.stream-buffer-size</name><value>4096</value></property>
<property><!--Loaded from job.xml--><name>fs.har.impl</name><value>org.apache.hadoop.fs.HarFileSystem</value></property>
<property><!--Loaded from job.xml--><name>hadoop.security.authentication</name><value>simple</value></property>
<property><!--Loaded from job.xml--><name>fs.s3.buffer.dir</name><value>${hadoop.tmp.dir}/s3</value></property>
<property><!--Loaded from job.xml--><name>fs.s3n.buffer.dir</name><value>${hadoop.tmp.dir}/s3n</value></property>
<property><!--Loaded from job.xml--><name>mapreduce.jobtracker.taskscheduler</name><value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value></property>
<property><!--Loaded from job.xml--><name>yarn.app.mapreduce.am.job.task.listener.thread-count</name><value>30</value></property>
<property><!--Loaded from job.xml--><name>mapreduce.job.reduces</name><value>1</value></property>
@ -205,7 +201,7 @@
<property><!--Loaded from job.xml--><name>mapreduce.job.dir</name><value>/tmp/hadoop-yarn/staging/user/.staging/job_1329348432655_0001</value></property>
<property><!--Loaded from job.xml--><name>io.map.index.skip</name><value>0</value></property>
<property><!--Loaded from job.xml--><name>net.topology.node.switch.mapping.impl</name><value>org.apache.hadoop.net.ScriptBasedMapping</value></property>
<property><!--Loaded from job.xml--><name>fs.s3.maxRetries</name><value>4</value></property>
<property><!--Loaded from job.xml--><name>fs.s3n.maxRetries</name><value>4</value></property>
<property><!--Loaded from job.xml--><name>s3native.client-write-packet-size</name><value>65536</value></property>
<property><!--Loaded from job.xml--><name>yarn.resourcemanager.amliveliness-monitor.interval-ms</name><value>1000</value></property>
<property><!--Loaded from job.xml--><name>mapreduce.reduce.speculative</name><value>false</value></property>
@ -263,11 +259,9 @@
<property><!--Loaded from job.xml--><name>ipc.client.idlethreshold</name><value>4000</value></property>
<property><!--Loaded from job.xml--><name>ipc.server.tcpnodelay</name><value>false</value></property>
<property><!--Loaded from job.xml--><name>ftp.bytes-per-checksum</name><value>512</value></property>
<property><!--Loaded from job.xml--><name>s3.bytes-per-checksum</name><value>512</value></property>
<property><!--Loaded from job.xml--><name>mapreduce.job.speculative.slowtaskthreshold</name><value>1.0</value></property>
<property><!--Loaded from job.xml--><name>yarn.nodemanager.localizer.cache.target-size-mb</name><value>1</value></property>
<property><!--Loaded from job.xml--><name>yarn.nodemanager.remote-app-log-dir</name><value>/tmp/logs</value></property>
<property><!--Loaded from job.xml--><name>fs.s3.block.size</name><value>67108864</value></property>
<property><!--Loaded from job.xml--><name>mapreduce.job.queuename</name><value>default</value></property>
<property><!--Loaded from job.xml--><name>mapreduce.sleepjob.reduce.sleep.time</name><value>1</value></property>
<property><!--Loaded from job.xml--><name>hadoop.rpc.protection</name><value>authentication</value></property>
@ -321,7 +315,6 @@
<property><!--Loaded from job.xml--><name>dfs.datanode.address</name><value>0.0.0.0:9866</value></property>
<property><!--Loaded from job.xml--><name>mapreduce.map.skip.maxrecords</name><value>0</value></property>
<property><!--Loaded from job.xml--><name>dfs.datanode.https.address</name><value>0.0.0.0:9865</value></property>
<property><!--Loaded from job.xml--><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3.S3FileSystem</value></property>
<property><!--Loaded from job.xml--><name>file.replication</name><value>1</value></property>
<property><!--Loaded from job.xml--><name>yarn.resourcemanager.resource-tracker.address</name><value>0.0.0.0:8025</value></property>
<property><!--Loaded from job.xml--><name>mapreduce.jobtracker.restart.recover</name><value>false</value></property>

View File

@ -1,52 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Holds metadata about a block of data being stored in a {@link FileSystemStore}.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class Block {
private long id;
private long length;
public Block(long id, long length) {
this.id = id;
this.length = length;
}
public long getId() {
return id;
}
public long getLength() {
return length;
}
@Override
public String toString() {
return "Block[" + id + ", " + length + "]";
}
}

View File

@ -1,67 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
/**
* A facility for storing and retrieving {@link INode}s and {@link Block}s.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface FileSystemStore {
void initialize(URI uri, Configuration conf) throws IOException;
String getVersion() throws IOException;
void storeINode(Path path, INode inode) throws IOException;
void storeBlock(Block block, File file) throws IOException;
boolean inodeExists(Path path) throws IOException;
boolean blockExists(long blockId) throws IOException;
INode retrieveINode(Path path) throws IOException;
File retrieveBlock(Block block, long byteRangeStart) throws IOException;
void deleteINode(Path path) throws IOException;
void deleteBlock(Block block) throws IOException;
Set<Path> listSubPaths(Path path) throws IOException;
Set<Path> listDeepSubPaths(Path path) throws IOException;
/**
* Delete everything. Used for testing.
* @throws IOException on any problem
*/
void purge() throws IOException;
/**
* Diagnostic method to dump all INodes to the console.
* @throws IOException on any problem
*/
void dump() throws IOException;
}

View File

@ -1,128 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.IOUtils;
/**
* Holds file metadata including type (regular file, or directory),
* and the list of blocks that are pointers to the data.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class INode {
enum FileType {
DIRECTORY, FILE
}
public static final FileType[] FILE_TYPES = {
FileType.DIRECTORY,
FileType.FILE
};
public static final INode DIRECTORY_INODE = new INode(FileType.DIRECTORY, null);
private FileType fileType;
private Block[] blocks;
public INode(FileType fileType, Block[] blocks) {
this.fileType = fileType;
if (isDirectory() && blocks != null) {
throw new IllegalArgumentException("A directory cannot contain blocks.");
}
this.blocks = blocks;
}
public Block[] getBlocks() {
return blocks;
}
public FileType getFileType() {
return fileType;
}
public boolean isDirectory() {
return fileType == FileType.DIRECTORY;
}
public boolean isFile() {
return fileType == FileType.FILE;
}
public long getSerializedLength() {
return 1L + (blocks == null ? 0 : 4 + blocks.length * 16);
}
public InputStream serialize() throws IOException {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(bytes);
try {
out.writeByte(fileType.ordinal());
if (isFile()) {
out.writeInt(blocks.length);
for (int i = 0; i < blocks.length; i++) {
out.writeLong(blocks[i].getId());
out.writeLong(blocks[i].getLength());
}
}
out.close();
out = null;
} finally {
IOUtils.closeStream(out);
}
return new ByteArrayInputStream(bytes.toByteArray());
}
public static INode deserialize(InputStream in) throws IOException {
if (in == null) {
return null;
}
DataInputStream dataIn = new DataInputStream(in);
FileType fileType = INode.FILE_TYPES[dataIn.readByte()];
switch (fileType) {
case DIRECTORY:
in.close();
return INode.DIRECTORY_INODE;
case FILE:
int numBlocks = dataIn.readInt();
Block[] blocks = new Block[numBlocks];
for (int i = 0; i < numBlocks; i++) {
long id = dataIn.readLong();
long length = dataIn.readLong();
blocks[i] = new Block(id, length);
}
in.close();
return new INode(fileType, blocks);
default:
throw new IllegalArgumentException("Cannot deserialize inode.");
}
}
}

View File

@ -1,449 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3.INode.FileType;
import org.jets3t.service.S3Service;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.security.AWSCredentials;
@InterfaceAudience.Private
@InterfaceStability.Unstable
class Jets3tFileSystemStore implements FileSystemStore {
private static final String FILE_SYSTEM_NAME = "fs";
private static final String FILE_SYSTEM_VALUE = "Hadoop";
private static final String FILE_SYSTEM_TYPE_NAME = "fs-type";
private static final String FILE_SYSTEM_TYPE_VALUE = "block";
private static final String FILE_SYSTEM_VERSION_NAME = "fs-version";
private static final String FILE_SYSTEM_VERSION_VALUE = "1";
private static final Map<String, Object> METADATA =
new HashMap<String, Object>();
static {
METADATA.put(FILE_SYSTEM_NAME, FILE_SYSTEM_VALUE);
METADATA.put(FILE_SYSTEM_TYPE_NAME, FILE_SYSTEM_TYPE_VALUE);
METADATA.put(FILE_SYSTEM_VERSION_NAME, FILE_SYSTEM_VERSION_VALUE);
}
private static final String PATH_DELIMITER = Path.SEPARATOR;
private static final String BLOCK_PREFIX = "block_";
private Configuration conf;
private S3Service s3Service;
private S3Bucket bucket;
private int bufferSize;
private static final Log LOG =
LogFactory.getLog(Jets3tFileSystemStore.class.getName());
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
this.conf = conf;
S3Credentials s3Credentials = new S3Credentials();
s3Credentials.initialize(uri, conf);
try {
AWSCredentials awsCredentials =
new AWSCredentials(s3Credentials.getAccessKey(),
s3Credentials.getSecretAccessKey());
this.s3Service = new RestS3Service(awsCredentials);
} catch (S3ServiceException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new S3Exception(e);
}
bucket = new S3Bucket(uri.getHost());
this.bufferSize = conf.getInt(
S3FileSystemConfigKeys.S3_STREAM_BUFFER_SIZE_KEY,
S3FileSystemConfigKeys.S3_STREAM_BUFFER_SIZE_DEFAULT
);
}
@Override
public String getVersion() throws IOException {
return FILE_SYSTEM_VERSION_VALUE;
}
private void delete(String key) throws IOException {
try {
s3Service.deleteObject(bucket, key);
} catch (S3ServiceException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new S3Exception(e);
}
}
@Override
public void deleteINode(Path path) throws IOException {
delete(pathToKey(path));
}
@Override
public void deleteBlock(Block block) throws IOException {
delete(blockToKey(block));
}
@Override
public boolean inodeExists(Path path) throws IOException {
String key = pathToKey(path);
InputStream in = get(key, true);
if (in == null) {
if (isRoot(key)) {
storeINode(path, INode.DIRECTORY_INODE);
return true;
} else {
return false;
}
}
in.close();
return true;
}
@Override
public boolean blockExists(long blockId) throws IOException {
InputStream in = get(blockToKey(blockId), false);
if (in == null) {
return false;
}
in.close();
return true;
}
private InputStream get(String key, boolean checkMetadata)
throws IOException {
try {
S3Object object = s3Service.getObject(bucket.getName(), key);
if (checkMetadata) {
checkMetadata(object);
}
return object.getDataInputStream();
} catch (S3ServiceException e) {
if ("NoSuchKey".equals(e.getS3ErrorCode())) {
return null;
}
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new S3Exception(e);
} catch (ServiceException e) {
handleServiceException(e);
return null;
}
}
private InputStream get(String key, long byteRangeStart) throws IOException {
try {
S3Object object = s3Service.getObject(bucket, key, null, null, null,
null, byteRangeStart, null);
return object.getDataInputStream();
} catch (S3ServiceException e) {
if ("NoSuchKey".equals(e.getS3ErrorCode())) {
return null;
}
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new S3Exception(e);
} catch (ServiceException e) {
handleServiceException(e);
return null;
}
}
private void checkMetadata(S3Object object) throws S3FileSystemException,
S3ServiceException {
String name = (String) object.getMetadata(FILE_SYSTEM_NAME);
if (!FILE_SYSTEM_VALUE.equals(name)) {
throw new S3FileSystemException("Not a Hadoop S3 file.");
}
String type = (String) object.getMetadata(FILE_SYSTEM_TYPE_NAME);
if (!FILE_SYSTEM_TYPE_VALUE.equals(type)) {
throw new S3FileSystemException("Not a block file.");
}
String dataVersion = (String) object.getMetadata(FILE_SYSTEM_VERSION_NAME);
if (!FILE_SYSTEM_VERSION_VALUE.equals(dataVersion)) {
throw new VersionMismatchException(FILE_SYSTEM_VERSION_VALUE,
dataVersion);
}
}
@Override
public INode retrieveINode(Path path) throws IOException {
String key = pathToKey(path);
InputStream in = get(key, true);
if (in == null && isRoot(key)) {
storeINode(path, INode.DIRECTORY_INODE);
return INode.DIRECTORY_INODE;
}
return INode.deserialize(in);
}
@Override
public File retrieveBlock(Block block, long byteRangeStart)
throws IOException {
File fileBlock = null;
InputStream in = null;
OutputStream out = null;
try {
fileBlock = newBackupFile();
String blockId = blockToKey(block);
in = get(blockId, byteRangeStart);
if (in == null) {
throw new IOException("Block missing from S3 store: " + blockId);
}
out = new BufferedOutputStream(new FileOutputStream(fileBlock));
byte[] buf = new byte[bufferSize];
int numRead;
while ((numRead = in.read(buf)) >= 0) {
out.write(buf, 0, numRead);
}
return fileBlock;
} catch (IOException e) {
// close output stream to file then delete file
closeQuietly(out);
out = null; // to prevent a second close
if (fileBlock != null) {
boolean b = fileBlock.delete();
if (!b) {
LOG.warn("Ignoring failed delete");
}
}
throw e;
} finally {
closeQuietly(out);
closeQuietly(in);
}
}
private File newBackupFile() throws IOException {
File dir = new File(conf.get("fs.s3.buffer.dir"));
if (!dir.exists() && !dir.mkdirs()) {
throw new IOException("Cannot create S3 buffer directory: " + dir);
}
File result = File.createTempFile("input-", ".tmp", dir);
result.deleteOnExit();
return result;
}
@Override
public Set<Path> listSubPaths(Path path) throws IOException {
try {
String prefix = pathToKey(path);
if (!prefix.endsWith(PATH_DELIMITER)) {
prefix += PATH_DELIMITER;
}
S3Object[] objects = s3Service.listObjects(bucket.getName(), prefix, PATH_DELIMITER);
Set<Path> prefixes = new TreeSet<Path>();
for (int i = 0; i < objects.length; i++) {
prefixes.add(keyToPath(objects[i].getKey()));
}
prefixes.remove(path);
return prefixes;
} catch (S3ServiceException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new S3Exception(e);
}
}
@Override
public Set<Path> listDeepSubPaths(Path path) throws IOException {
try {
String prefix = pathToKey(path);
if (!prefix.endsWith(PATH_DELIMITER)) {
prefix += PATH_DELIMITER;
}
S3Object[] objects = s3Service.listObjects(bucket.getName(), prefix, null);
Set<Path> prefixes = new TreeSet<Path>();
for (int i = 0; i < objects.length; i++) {
prefixes.add(keyToPath(objects[i].getKey()));
}
prefixes.remove(path);
return prefixes;
} catch (S3ServiceException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new S3Exception(e);
}
}
private void put(String key, InputStream in, long length, boolean storeMetadata)
throws IOException {
try {
S3Object object = new S3Object(key);
object.setDataInputStream(in);
object.setContentType("binary/octet-stream");
object.setContentLength(length);
if (storeMetadata) {
object.addAllMetadata(METADATA);
}
s3Service.putObject(bucket, object);
} catch (S3ServiceException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new S3Exception(e);
}
}
@Override
public void storeINode(Path path, INode inode) throws IOException {
put(pathToKey(path), inode.serialize(), inode.getSerializedLength(), true);
}
@Override
public void storeBlock(Block block, File file) throws IOException {
BufferedInputStream in = null;
try {
in = new BufferedInputStream(new FileInputStream(file));
put(blockToKey(block), in, block.getLength(), false);
} finally {
closeQuietly(in);
}
}
private void closeQuietly(Closeable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch (IOException e) {
// ignore
}
}
}
private String pathToKey(Path path) {
if (!path.isAbsolute()) {
throw new IllegalArgumentException("Path must be absolute: " + path);
}
return path.toUri().getPath();
}
private Path keyToPath(String key) {
return new Path(key);
}
private String blockToKey(long blockId) {
return BLOCK_PREFIX + blockId;
}
private String blockToKey(Block block) {
return blockToKey(block.getId());
}
private boolean isRoot(String key) {
return key.isEmpty() || key.equals("/");
}
@Override
public void purge() throws IOException {
try {
S3Object[] objects = s3Service.listObjects(bucket.getName());
for (int i = 0; i < objects.length; i++) {
s3Service.deleteObject(bucket, objects[i].getKey());
}
} catch (S3ServiceException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new S3Exception(e);
}
}
@Override
public void dump() throws IOException {
StringBuilder sb = new StringBuilder("S3 Filesystem, ");
sb.append(bucket.getName()).append("\n");
try {
S3Object[] objects = s3Service.listObjects(bucket.getName(), PATH_DELIMITER, null);
for (int i = 0; i < objects.length; i++) {
Path path = keyToPath(objects[i].getKey());
sb.append(path).append("\n");
INode m = retrieveINode(path);
sb.append("\t").append(m.getFileType()).append("\n");
if (m.getFileType() == FileType.DIRECTORY) {
continue;
}
for (int j = 0; j < m.getBlocks().length; j++) {
sb.append("\t").append(m.getBlocks()[j]).append("\n");
}
}
} catch (S3ServiceException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new S3Exception(e);
}
System.out.println(sb);
}
private void handleServiceException(ServiceException e) throws IOException {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
else {
if(LOG.isDebugEnabled()) {
LOG.debug("Got ServiceException with Error code: " + e.getErrorCode() + ";and Error message: " + e.getErrorMessage());
}
}
}
}

View File

@ -1,291 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.jets3t.service.S3Service;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.security.AWSCredentials;
/**
* <p>
* This class is a tool for migrating data from an older to a newer version
* of an S3 filesystem.
* </p>
* <p>
* All files in the filesystem are migrated by re-writing the block metadata
* - no datafiles are touched.
* </p>
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class MigrationTool extends Configured implements Tool {
private S3Service s3Service;
private S3Bucket bucket;
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new MigrationTool(), args);
System.exit(res);
}
@Override
public int run(String[] args) throws Exception {
if (args.length == 0) {
System.err.println("Usage: MigrationTool <S3 file system URI>");
System.err.println("\t<S3 file system URI>\tfilesystem to migrate");
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}
URI uri = URI.create(args[0]);
initialize(uri);
FileSystemStore newStore = new Jets3tFileSystemStore();
newStore.initialize(uri, getConf());
if (get("%2F") != null) {
System.err.println("Current version number is [unversioned].");
System.err.println("Target version number is " +
newStore.getVersion() + ".");
Store oldStore = new UnversionedStore();
migrate(oldStore, newStore);
return 0;
} else {
S3Object root = get("/");
if (root != null) {
String version = (String) root.getMetadata("fs-version");
if (version == null) {
System.err.println("Can't detect version - exiting.");
} else {
String newVersion = newStore.getVersion();
System.err.println("Current version number is " + version + ".");
System.err.println("Target version number is " + newVersion + ".");
if (version.equals(newStore.getVersion())) {
System.err.println("No migration required.");
return 0;
}
// use version number to create Store
//Store oldStore = ...
//migrate(oldStore, newStore);
System.err.println("Not currently implemented.");
return 0;
}
}
System.err.println("Can't detect version - exiting.");
return 0;
}
}
public void initialize(URI uri) throws IOException {
try {
String accessKey = null;
String secretAccessKey = null;
String userInfo = uri.getUserInfo();
if (userInfo != null) {
int index = userInfo.indexOf(':');
if (index != -1) {
accessKey = userInfo.substring(0, index);
secretAccessKey = userInfo.substring(index + 1);
} else {
accessKey = userInfo;
}
}
if (accessKey == null) {
accessKey = getConf().get("fs.s3.awsAccessKeyId");
}
if (secretAccessKey == null) {
secretAccessKey = getConf().get("fs.s3.awsSecretAccessKey");
}
if (accessKey == null && secretAccessKey == null) {
throw new IllegalArgumentException("AWS " +
"Access Key ID and Secret Access Key " +
"must be specified as the username " +
"or password (respectively) of a s3 URL, " +
"or by setting the " +
"fs.s3.awsAccessKeyId or " +
"fs.s3.awsSecretAccessKey properties (respectively).");
} else if (accessKey == null) {
throw new IllegalArgumentException("AWS " +
"Access Key ID must be specified " +
"as the username of a s3 URL, or by setting the " +
"fs.s3.awsAccessKeyId property.");
} else if (secretAccessKey == null) {
throw new IllegalArgumentException("AWS " +
"Secret Access Key must be specified " +
"as the password of a s3 URL, or by setting the " +
"fs.s3.awsSecretAccessKey property.");
}
AWSCredentials awsCredentials =
new AWSCredentials(accessKey, secretAccessKey);
this.s3Service = new RestS3Service(awsCredentials);
} catch (S3ServiceException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new S3Exception(e);
}
bucket = new S3Bucket(uri.getHost());
}
private void migrate(Store oldStore, FileSystemStore newStore)
throws IOException {
for (Path path : oldStore.listAllPaths()) {
INode inode = oldStore.retrieveINode(path);
oldStore.deleteINode(path);
newStore.storeINode(path, inode);
}
}
private S3Object get(String key) {
try {
return s3Service.getObject(bucket.getName(), key);
} catch (S3ServiceException e) {
if ("NoSuchKey".equals(e.getS3ErrorCode())) {
return null;
}
}
return null;
}
interface Store {
Set<Path> listAllPaths() throws IOException;
INode retrieveINode(Path path) throws IOException;
void deleteINode(Path path) throws IOException;
}
class UnversionedStore implements Store {
@Override
public Set<Path> listAllPaths() throws IOException {
try {
String prefix = urlEncode(Path.SEPARATOR);
S3Object[] objects = s3Service.listObjects(bucket.getName(), prefix, null);
Set<Path> prefixes = new TreeSet<Path>();
for (int i = 0; i < objects.length; i++) {
prefixes.add(keyToPath(objects[i].getKey()));
}
return prefixes;
} catch (S3ServiceException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new S3Exception(e);
}
}
@Override
public void deleteINode(Path path) throws IOException {
delete(pathToKey(path));
}
private void delete(String key) throws IOException {
try {
s3Service.deleteObject(bucket, key);
} catch (S3ServiceException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new S3Exception(e);
}
}
@Override
public INode retrieveINode(Path path) throws IOException {
return INode.deserialize(get(pathToKey(path)));
}
private InputStream get(String key) throws IOException {
try {
S3Object object = s3Service.getObject(bucket.getName(), key);
return object.getDataInputStream();
} catch (S3ServiceException e) {
if ("NoSuchKey".equals(e.getS3ErrorCode())) {
return null;
}
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new S3Exception(e);
} catch (ServiceException e) {
return null;
}
}
private String pathToKey(Path path) {
if (!path.isAbsolute()) {
throw new IllegalArgumentException("Path must be absolute: " + path);
}
return urlEncode(path.toUri().getPath());
}
private Path keyToPath(String key) {
return new Path(urlDecode(key));
}
private String urlEncode(String s) {
try {
return URLEncoder.encode(s, "UTF-8");
} catch (UnsupportedEncodingException e) {
// Should never happen since every implementation of the Java Platform
// is required to support UTF-8.
// See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
throw new IllegalStateException(e);
}
}
private String urlDecode(String s) {
try {
return URLDecoder.decode(s, "UTF-8");
} catch (UnsupportedEncodingException e) {
// Should never happen since every implementation of the Java Platform
// is required to support UTF-8.
// See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
throw new IllegalStateException(e);
}
}
}
}

View File

@ -1,102 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
/**
* <p>
* Extracts AWS credentials from the filesystem URI or configuration.
* </p>
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class S3Credentials {
private String accessKey;
private String secretAccessKey;
/**
* @param uri bucket URI optionally containing username and password.
* @param conf configuration
* @throws IllegalArgumentException if credentials for S3 cannot be
* determined.
* @throws IOException if credential providers are misconfigured and we have
* to talk to them.
*/
public void initialize(URI uri, Configuration conf) throws IOException {
if (uri.getHost() == null) {
throw new IllegalArgumentException("Invalid hostname in URI " + uri);
}
S3xLoginHelper.Login login =
S3xLoginHelper.extractLoginDetailsWithWarnings(uri);
if (login.hasLogin()) {
accessKey = login.getUser();
secretAccessKey = login.getPassword();
}
String scheme = uri.getScheme();
String accessKeyProperty = String.format("fs.%s.awsAccessKeyId", scheme);
String secretAccessKeyProperty =
String.format("fs.%s.awsSecretAccessKey", scheme);
if (accessKey == null) {
accessKey = conf.getTrimmed(accessKeyProperty);
}
if (secretAccessKey == null) {
final char[] pass = conf.getPassword(secretAccessKeyProperty);
if (pass != null) {
secretAccessKey = (new String(pass)).trim();
}
}
if (accessKey == null && secretAccessKey == null) {
throw new IllegalArgumentException("AWS " +
"Access Key ID and Secret Access " +
"Key must be specified " +
"by setting the " +
accessKeyProperty + " and " +
secretAccessKeyProperty +
" properties (respectively).");
} else if (accessKey == null) {
throw new IllegalArgumentException("AWS " +
"Access Key ID must be specified " +
"by setting the " +
accessKeyProperty + " property.");
} else if (secretAccessKey == null) {
throw new IllegalArgumentException("AWS " +
"Secret Access Key must be " +
"specified by setting the " +
secretAccessKeyProperty +
" property.");
}
}
public String getAccessKey() {
return accessKey;
}
public String getSecretAccessKey() {
return secretAccessKey;
}
}

View File

@ -1,516 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.Progressable;
/**
* A block-based {@link FileSystem} backed by
* <a href="http://aws.amazon.com/s3">Amazon S3</a>.
*
* @see NativeS3FileSystem
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class S3FileSystem extends FileSystem {
private URI uri;
private FileSystemStore store;
private Path workingDir;
public S3FileSystem() {
// set store in initialize()
}
public S3FileSystem(FileSystemStore store) {
this.store = store;
}
/**
* Return the protocol scheme for the FileSystem.
*
* @return <code>s3</code>
*/
@Override
public String getScheme() {
return "s3";
}
@Override
public URI getUri() {
return uri;
}
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
if (store == null) {
store = createDefaultStore(conf);
}
store.initialize(uri, conf);
setConf(conf);
this.uri = S3xLoginHelper.buildFSURI(uri);
this.workingDir =
new Path("/user", System.getProperty("user.name")).makeQualified(this);
}
private static FileSystemStore createDefaultStore(Configuration conf) {
FileSystemStore store = new Jets3tFileSystemStore();
RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
conf.getInt("fs.s3.maxRetries", 4),
conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(IOException.class, basePolicy);
exceptionToPolicyMap.put(S3Exception.class, basePolicy);
RetryPolicy methodPolicy = RetryPolicies.retryByException(
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
methodNameToPolicyMap.put("storeBlock", methodPolicy);
methodNameToPolicyMap.put("retrieveBlock", methodPolicy);
return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
store, methodNameToPolicyMap);
}
@Override
public Path getWorkingDirectory() {
return workingDir;
}
@Override
public void setWorkingDirectory(Path dir) {
workingDir = makeAbsolute(dir);
}
private Path makeAbsolute(Path path) {
if (path.isAbsolute()) {
return path;
}
return new Path(workingDir, path);
}
/**
* Check that a Path belongs to this FileSystem.
* Unlike the superclass, this version does not look at authority,
* only hostnames.
* @param path to check
* @throws IllegalArgumentException if there is an FS mismatch
*/
@Override
protected void checkPath(Path path) {
S3xLoginHelper.checkPath(getConf(), getUri(), path, getDefaultPort());
}
@Override
protected URI canonicalizeUri(URI rawUri) {
return S3xLoginHelper.canonicalizeUri(rawUri, getDefaultPort());
}
/**
* @param permission Currently ignored.
*/
@Override
public boolean mkdirs(Path path, FsPermission permission) throws IOException {
Path absolutePath = makeAbsolute(path);
List<Path> paths = new ArrayList<Path>();
do {
paths.add(0, absolutePath);
absolutePath = absolutePath.getParent();
} while (absolutePath != null);
boolean result = true;
for (int i = 0; i < paths.size(); i++) {
Path p = paths.get(i);
try {
result &= mkdir(p);
} catch(FileAlreadyExistsException e) {
if (i + 1 < paths.size()) {
throw new ParentNotDirectoryException(e.getMessage());
}
throw e;
}
}
return result;
}
private boolean mkdir(Path path) throws IOException {
Path absolutePath = makeAbsolute(path);
INode inode = store.retrieveINode(absolutePath);
if (inode == null) {
store.storeINode(absolutePath, INode.DIRECTORY_INODE);
} else if (inode.isFile()) {
throw new FileAlreadyExistsException(String.format(
"Can't make directory for path %s since it is a file.",
absolutePath));
}
return true;
}
@Override
public boolean isFile(Path path) throws IOException {
INode inode = store.retrieveINode(makeAbsolute(path));
if (inode == null) {
return false;
}
return inode.isFile();
}
private INode checkFile(Path path) throws IOException {
INode inode = store.retrieveINode(makeAbsolute(path));
String message = String.format("No such file: '%s'", path.toString());
if (inode == null) {
throw new FileNotFoundException(message + " does not exist");
}
if (inode.isDirectory()) {
throw new FileNotFoundException(message + " is a directory");
}
return inode;
}
@Override
public FileStatus[] listStatus(Path f) throws IOException {
Path absolutePath = makeAbsolute(f);
INode inode = store.retrieveINode(absolutePath);
if (inode == null) {
throw new FileNotFoundException("File " + f + " does not exist.");
}
if (inode.isFile()) {
return new FileStatus[] {
new S3FileStatus(f.makeQualified(this), inode)
};
}
ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
for (Path p : store.listSubPaths(absolutePath)) {
ret.add(getFileStatus(p.makeQualified(this)));
}
return ret.toArray(new FileStatus[0]);
}
/** This optional operation is not yet supported. */
@Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
throw new IOException("Not supported");
}
/**
* @param permission Currently ignored.
*/
@Override
public FSDataOutputStream create(Path file, FsPermission permission,
boolean overwrite, int bufferSize,
short replication, long blockSize, Progressable progress)
throws IOException {
INode inode = store.retrieveINode(makeAbsolute(file));
if (inode != null) {
if (overwrite && !inode.isDirectory()) {
delete(file, true);
} else {
String message = String.format("File already exists: '%s'", file);
if (inode.isDirectory()) {
message = message + " is a directory";
}
throw new FileAlreadyExistsException(message);
}
} else {
Path parent = file.getParent();
if (parent != null) {
if (!mkdirs(parent)) {
throw new IOException("Mkdirs failed to create " + parent.toString());
}
}
}
return new FSDataOutputStream
(new S3OutputStream(getConf(), store, makeAbsolute(file),
blockSize, progress, bufferSize),
statistics);
}
@Override
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
INode inode = checkFile(path);
return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
statistics));
}
@Override
public boolean rename(Path src, Path dst) throws IOException {
Path absoluteSrc = makeAbsolute(src);
final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
INode srcINode = store.retrieveINode(absoluteSrc);
boolean debugEnabled = LOG.isDebugEnabled();
if (srcINode == null) {
// src path doesn't exist
if (debugEnabled) {
LOG.debug(debugPreamble + "returning false as src does not exist");
}
return false;
}
Path absoluteDst = makeAbsolute(dst);
//validate the parent dir of the destination
Path dstParent = absoluteDst.getParent();
if (dstParent != null) {
//if the dst parent is not root, make sure it exists
INode dstParentINode = store.retrieveINode(dstParent);
if (dstParentINode == null) {
// dst parent doesn't exist
if (debugEnabled) {
LOG.debug(debugPreamble +
"returning false as dst parent does not exist");
}
return false;
}
if (dstParentINode.isFile()) {
// dst parent exists but is a file
if (debugEnabled) {
LOG.debug(debugPreamble +
"returning false as dst parent exists and is a file");
}
return false;
}
}
//get status of source
boolean srcIsFile = srcINode.isFile();
INode dstINode = store.retrieveINode(absoluteDst);
boolean destExists = dstINode != null;
boolean destIsDir = destExists && !dstINode.isFile();
if (srcIsFile) {
//source is a simple file
if (destExists) {
if (destIsDir) {
//outcome #1 dest exists and is dir -filename to subdir of dest
if (debugEnabled) {
LOG.debug(debugPreamble +
"copying src file under dest dir to " + absoluteDst);
}
absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
} else {
//outcome #2 dest it's a file: fail iff different from src
boolean renamingOnToSelf = absoluteSrc.equals(absoluteDst);
if (debugEnabled) {
LOG.debug(debugPreamble +
"copying file onto file, outcome is " + renamingOnToSelf);
}
return renamingOnToSelf;
}
} else {
// #3 dest does not exist: use dest as path for rename
if (debugEnabled) {
LOG.debug(debugPreamble +
"copying file onto file");
}
}
} else {
//here the source exists and is a directory
// outcomes (given we know the parent dir exists if we get this far)
// #1 destination is a file: fail
// #2 destination is a directory: create a new dir under that one
// #3 destination doesn't exist: create a new dir with that name
// #3 and #4 are only allowed if the dest path is not == or under src
if (destExists) {
if (!destIsDir) {
// #1 destination is a file: fail
if (debugEnabled) {
LOG.debug(debugPreamble +
"returning false as src is a directory, but not dest");
}
return false;
} else {
// the destination dir exists
// destination for rename becomes a subdir of the target name
absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
if (debugEnabled) {
LOG.debug(debugPreamble +
"copying src dir under dest dir to " + absoluteDst);
}
}
}
//the final destination directory is now know, so validate it for
//illegal moves
if (absoluteSrc.equals(absoluteDst)) {
//you can't rename a directory onto itself
if (debugEnabled) {
LOG.debug(debugPreamble +
"Dest==source && isDir -failing");
}
return false;
}
if (absoluteDst.toString().startsWith(absoluteSrc.toString() + "/")) {
//you can't move a directory under itself
if (debugEnabled) {
LOG.debug(debugPreamble +
"dst is equal to or under src dir -failing");
}
return false;
}
}
//here the dest path is set up -so rename
return renameRecursive(absoluteSrc, absoluteDst);
}
private boolean renameRecursive(Path src, Path dst) throws IOException {
INode srcINode = store.retrieveINode(src);
store.storeINode(dst, srcINode);
store.deleteINode(src);
if (srcINode.isDirectory()) {
for (Path oldSrc : store.listDeepSubPaths(src)) {
INode inode = store.retrieveINode(oldSrc);
if (inode == null) {
return false;
}
String oldSrcPath = oldSrc.toUri().getPath();
String srcPath = src.toUri().getPath();
String dstPath = dst.toUri().getPath();
Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
store.storeINode(newDst, inode);
store.deleteINode(oldSrc);
}
}
return true;
}
@Override
public boolean delete(Path path, boolean recursive) throws IOException {
Path absolutePath = makeAbsolute(path);
INode inode = store.retrieveINode(absolutePath);
if (inode == null) {
return false;
}
if (inode.isFile()) {
store.deleteINode(absolutePath);
for (Block block: inode.getBlocks()) {
store.deleteBlock(block);
}
} else {
FileStatus[] contents = null;
try {
contents = listStatus(absolutePath);
} catch(FileNotFoundException fnfe) {
return false;
}
if ((contents.length !=0) && (!recursive)) {
throw new IOException("Directory " + path.toString()
+ " is not empty.");
}
for (FileStatus p:contents) {
if (!delete(p.getPath(), recursive)) {
return false;
}
}
store.deleteINode(absolutePath);
}
return true;
}
/**
* FileStatus for S3 file systems.
*/
@Override
public FileStatus getFileStatus(Path f) throws IOException {
INode inode = store.retrieveINode(makeAbsolute(f));
if (inode == null) {
throw new FileNotFoundException(f + ": No such file or directory.");
}
return new S3FileStatus(f.makeQualified(this), inode);
}
@Override
public long getDefaultBlockSize() {
return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
}
@Override
public String getCanonicalServiceName() {
// Does not support Token
return null;
}
// diagnostic methods
void dump() throws IOException {
store.dump();
}
void purge() throws IOException {
store.purge();
}
private static class S3FileStatus extends FileStatus {
S3FileStatus(Path f, INode inode) throws IOException {
super(findLength(inode), inode.isDirectory(), 1,
findBlocksize(inode), 0, f);
}
private static long findLength(INode inode) {
if (!inode.isDirectory()) {
long length = 0L;
for (Block block : inode.getBlocks()) {
length += block.getLength();
}
return length;
}
return 0;
}
private static long findBlocksize(INode inode) {
final Block[] ret = inode.getBlocks();
return ret == null ? 0L : ret[0].getLength();
}
}
}

View File

@ -1,47 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CommonConfigurationKeys;
/**
* This class contains constants for configuration keys used
* in the s3 file system.
*
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class S3FileSystemConfigKeys extends CommonConfigurationKeys {
public static final String S3_BLOCK_SIZE_KEY = "s3.blocksize";
public static final long S3_BLOCK_SIZE_DEFAULT = 64*1024*1024;
public static final String S3_REPLICATION_KEY = "s3.replication";
public static final short S3_REPLICATION_DEFAULT = 1;
public static final String S3_STREAM_BUFFER_SIZE_KEY =
"s3.stream-buffer-size";
public static final int S3_STREAM_BUFFER_SIZE_DEFAULT = 4096;
public static final String S3_BYTES_PER_CHECKSUM_KEY =
"s3.bytes-per-checksum";
public static final int S3_BYTES_PER_CHECKSUM_DEFAULT = 512;
public static final String S3_CLIENT_WRITE_PACKET_SIZE_KEY =
"s3.client-write-packet-size";
public static final int S3_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
}

View File

@ -1,36 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Thrown when there is a fatal exception while using {@link S3FileSystem}.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class S3FileSystemException extends IOException {
private static final long serialVersionUID = 1L;
public S3FileSystemException(String message) {
super(message);
}
}

View File

@ -1,220 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.EOFException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
@InterfaceAudience.Private
@InterfaceStability.Unstable
class S3InputStream extends FSInputStream {
private FileSystemStore store;
private Block[] blocks;
private boolean closed;
private long fileLength;
private long pos = 0;
private File blockFile;
private DataInputStream blockStream;
private long blockEnd = -1;
private FileSystem.Statistics stats;
private static final Log LOG =
LogFactory.getLog(S3InputStream.class.getName());
@Deprecated
public S3InputStream(Configuration conf, FileSystemStore store,
INode inode) {
this(conf, store, inode, null);
}
public S3InputStream(Configuration conf, FileSystemStore store,
INode inode, FileSystem.Statistics stats) {
this.store = store;
this.stats = stats;
this.blocks = inode.getBlocks();
for (Block block : blocks) {
this.fileLength += block.getLength();
}
}
@Override
public synchronized long getPos() throws IOException {
return pos;
}
@Override
public synchronized int available() throws IOException {
return (int) (fileLength - pos);
}
@Override
public synchronized void seek(long targetPos) throws IOException {
String message = String.format("Cannot seek to %d", targetPos);
if (targetPos > fileLength) {
throw new EOFException(message + ": after EOF");
}
if (targetPos < 0) {
throw new EOFException(message + ": negative");
}
pos = targetPos;
blockEnd = -1;
}
@Override
public synchronized boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
@Override
public synchronized int read() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
int result = -1;
if (pos < fileLength) {
if (pos > blockEnd) {
blockSeekTo(pos);
}
result = blockStream.read();
if (result >= 0) {
pos++;
}
}
if (stats != null && result >= 0) {
stats.incrementBytesRead(1);
}
return result;
}
@Override
public synchronized int read(byte buf[], int off, int len) throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
if (pos < fileLength) {
if (pos > blockEnd) {
blockSeekTo(pos);
}
int realLen = (int) Math.min((long) len, (blockEnd - pos + 1L));
int result = blockStream.read(buf, off, realLen);
if (result >= 0) {
pos += result;
}
if (stats != null && result > 0) {
stats.incrementBytesRead(result);
}
return result;
}
return -1;
}
private synchronized void blockSeekTo(long target) throws IOException {
//
// Compute desired block
//
int targetBlock = -1;
long targetBlockStart = 0;
long targetBlockEnd = 0;
for (int i = 0; i < blocks.length; i++) {
long blockLength = blocks[i].getLength();
targetBlockEnd = targetBlockStart + blockLength - 1;
if (target >= targetBlockStart && target <= targetBlockEnd) {
targetBlock = i;
break;
} else {
targetBlockStart = targetBlockEnd + 1;
}
}
if (targetBlock < 0) {
throw new IOException(
"Impossible situation: could not find target position " + target);
}
long offsetIntoBlock = target - targetBlockStart;
// read block blocks[targetBlock] from position offsetIntoBlock
this.blockFile = store.retrieveBlock(blocks[targetBlock], offsetIntoBlock);
this.pos = target;
this.blockEnd = targetBlockEnd;
this.blockStream = new DataInputStream(new FileInputStream(blockFile));
}
@Override
public void close() throws IOException {
if (closed) {
return;
}
if (blockStream != null) {
blockStream.close();
blockStream = null;
}
if (blockFile != null) {
boolean b = blockFile.delete();
if (!b) {
LOG.warn("Ignoring failed delete");
}
}
super.close();
closed = true;
}
/**
* We don't support marks.
*/
@Override
public boolean markSupported() {
return false;
}
@Override
public void mark(int readLimit) {
// Do nothing
}
@Override
public void reset() throws IOException {
throw new IOException("Mark not supported");
}
}

View File

@ -1,235 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3.INode.FileType;
import org.apache.hadoop.util.Progressable;
@InterfaceAudience.Private
@InterfaceStability.Unstable
class S3OutputStream extends OutputStream {
private Configuration conf;
private int bufferSize;
private FileSystemStore store;
private Path path;
private long blockSize;
private File backupFile;
private OutputStream backupStream;
private Random r = new Random();
private boolean closed;
private int pos = 0;
private long filePos = 0;
private int bytesWrittenToBlock = 0;
private byte[] outBuf;
private List<Block> blocks = new ArrayList<Block>();
private Block nextBlock;
private static final Log LOG =
LogFactory.getLog(S3OutputStream.class.getName());
public S3OutputStream(Configuration conf, FileSystemStore store,
Path path, long blockSize, Progressable progress,
int buffersize) throws IOException {
this.conf = conf;
this.store = store;
this.path = path;
this.blockSize = blockSize;
this.backupFile = newBackupFile();
this.backupStream = new FileOutputStream(backupFile);
this.bufferSize = buffersize;
this.outBuf = new byte[bufferSize];
}
private File newBackupFile() throws IOException {
File dir = new File(conf.get("fs.s3.buffer.dir"));
if (!dir.exists() && !dir.mkdirs()) {
throw new IOException("Cannot create S3 buffer directory: " + dir);
}
File result = File.createTempFile("output-", ".tmp", dir);
result.deleteOnExit();
return result;
}
public long getPos() throws IOException {
return filePos;
}
@Override
public synchronized void write(int b) throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
if ((bytesWrittenToBlock + pos == blockSize) || (pos >= bufferSize)) {
flush();
}
outBuf[pos++] = (byte) b;
filePos++;
}
@Override
public synchronized void write(byte b[], int off, int len) throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
while (len > 0) {
int remaining = bufferSize - pos;
int toWrite = Math.min(remaining, len);
System.arraycopy(b, off, outBuf, pos, toWrite);
pos += toWrite;
off += toWrite;
len -= toWrite;
filePos += toWrite;
if ((bytesWrittenToBlock + pos >= blockSize) || (pos == bufferSize)) {
flush();
}
}
}
@Override
public synchronized void flush() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
if (bytesWrittenToBlock + pos >= blockSize) {
flushData((int) blockSize - bytesWrittenToBlock);
}
if (bytesWrittenToBlock == blockSize) {
endBlock();
}
flushData(pos);
}
private synchronized void flushData(int maxPos) throws IOException {
int workingPos = Math.min(pos, maxPos);
if (workingPos > 0) {
//
// To the local block backup, write just the bytes
//
backupStream.write(outBuf, 0, workingPos);
//
// Track position
//
bytesWrittenToBlock += workingPos;
System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
pos -= workingPos;
}
}
private synchronized void endBlock() throws IOException {
//
// Done with local copy
//
backupStream.close();
//
// Send it to S3
//
// TODO: Use passed in Progressable to report progress.
nextBlockOutputStream();
store.storeBlock(nextBlock, backupFile);
internalClose();
//
// Delete local backup, start new one
//
boolean b = backupFile.delete();
if (!b) {
LOG.warn("Ignoring failed delete");
}
backupFile = newBackupFile();
backupStream = new FileOutputStream(backupFile);
bytesWrittenToBlock = 0;
}
private synchronized void nextBlockOutputStream() throws IOException {
long blockId = r.nextLong();
while (store.blockExists(blockId)) {
blockId = r.nextLong();
}
nextBlock = new Block(blockId, bytesWrittenToBlock);
blocks.add(nextBlock);
bytesWrittenToBlock = 0;
}
private synchronized void internalClose() throws IOException {
INode inode = new INode(FileType.FILE, blocks.toArray(new Block[blocks
.size()]));
store.storeINode(path, inode);
}
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
flush();
if (filePos == 0 || bytesWrittenToBlock != 0) {
endBlock();
}
backupStream.close();
boolean b = backupFile.delete();
if (!b) {
LOG.warn("Ignoring failed delete");
}
super.close();
closed = true;
}
}

View File

@ -1,37 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Thrown when Hadoop cannot read the version of the data stored
* in {@link S3FileSystem}.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class VersionMismatchException extends S3FileSystemException {
private static final long serialVersionUID = 1L;
public VersionMismatchException(String clientVersion, String dataVersion) {
super("Version mismatch: client expects version " + clientVersion +
", but data has version " +
(dataVersion == null ? "[unversioned]" : dataVersion));
}
}

View File

@ -1,55 +0,0 @@
<html>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<body>
<p>A distributed, block-based implementation of {@link
org.apache.hadoop.fs.FileSystem} that uses <a href="http://aws.amazon.com/s3">Amazon S3</a>
as a backing store.</p>
<p>
Files are stored in S3 as blocks (represented by
{@link org.apache.hadoop.fs.s3.Block}), which have an ID and a length.
Block metadata is stored in S3 as a small record (represented by
{@link org.apache.hadoop.fs.s3.INode}) using the URL-encoded
path string as a key. Inodes record the file type (regular file or directory) and the list of blocks.
This design makes it easy to seek to any given position in a file by reading the inode data to compute
which block to access, then using S3's support for
<a href="http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35.2">HTTP Range</a> headers
to start streaming from the correct position.
Renames are also efficient since only the inode is moved (by a DELETE followed by a PUT since
S3 does not support renames).
</p>
<p>
For a single file <i>/dir1/file1</i> which takes two blocks of storage, the file structure in S3
would be something like this:
</p>
<pre>
/
/dir1
/dir1/file1
block-6415776850131549260
block-3026438247347758425
</pre>
<p>
Inodes start with a leading <code>/</code>, while blocks are prefixed with <code>block-</code>.
</p>
</body>
</html>

View File

@ -37,8 +37,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.s3.S3Credentials;
import org.apache.hadoop.fs.s3.S3Exception;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.AccessControlException;
import org.jets3t.service.S3Service;

View File

@ -53,7 +53,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3.S3Exception;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
@ -62,12 +61,19 @@ import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_BUFFER_DIR_DEFAULT;
import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_BUFFER_DIR_KEY;
import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_MAX_RETRIES_DEFAUL;
import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_MAX_RETRIES_KEY;
import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_SLEEP_TIME_DEFAULT;
import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_SLEEP_TIME_KEY;
import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.addDeprecatedConfigKeys;
/**
* A {@link FileSystem} for reading and writing files stored on
* <a href="http://aws.amazon.com/s3">Amazon S3</a>.
* Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
* stores files on S3 in their
* native form so they can be read by other S3 tools.
* This implementation stores files on S3 in their native form so they can be
* read by other S3 tools.
* <p>
* A note about directories. S3 of course has no "native" support for them.
* The idiom we choose then is: for any directory created by this class,
@ -85,8 +91,6 @@ import org.slf4j.LoggerFactory;
* is never returned.
* </li>
* </ul>
*
* @see org.apache.hadoop.fs.s3.S3FileSystem
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
@ -99,6 +103,11 @@ public class NativeS3FileSystem extends FileSystem {
static final String PATH_DELIMITER = Path.SEPARATOR;
private static final int S3_MAX_LISTING_LENGTH = 1000;
static {
// Add the deprecated config keys
addDeprecatedConfigKeys();
}
static class NativeS3FsInputStream extends FSInputStream {
private NativeFileSystemStore store;
@ -257,8 +266,10 @@ public class NativeS3FileSystem extends FileSystem {
}
private File newBackupFile() throws IOException {
if (lDirAlloc == null) {
lDirAlloc = new LocalDirAllocator("fs.s3.buffer.dir");
if (conf.get(S3_NATIVE_BUFFER_DIR_KEY, null) != null) {
lDirAlloc = new LocalDirAllocator(S3_NATIVE_BUFFER_DIR_KEY);
} else {
lDirAlloc = new LocalDirAllocator(S3_NATIVE_BUFFER_DIR_DEFAULT);
}
File result = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
result.deleteOnExit();
@ -342,8 +353,9 @@ public class NativeS3FileSystem extends FileSystem {
NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
conf.getInt("fs.s3.maxRetries", 4),
conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
conf.getInt(S3_NATIVE_MAX_RETRIES_KEY, S3_NATIVE_MAX_RETRIES_DEFAUL),
conf.getLong(S3_NATIVE_SLEEP_TIME_KEY, S3_NATIVE_SLEEP_TIME_DEFAULT),
TimeUnit.SECONDS);
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
new HashMap<Class<? extends Exception>, RetryPolicy>();
exceptionToPolicyMap.put(IOException.class, basePolicy);

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3native;
import java.io.IOException;
import java.net.URI;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_AWS_ACCESS_KEY_ID;
import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_AWS_SECRET_ACCESS_KEY;
/**
* <p>
* Extracts AWS credentials from the filesystem URI or configuration.
* </p>
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class S3Credentials {
private String accessKey;
private String secretAccessKey;
/**
* @param uri bucket URI optionally containing username and password.
* @param conf configuration
* @throws IllegalArgumentException if credentials for S3 cannot be
* determined.
* @throws IOException if credential providers are misconfigured and we have
* to talk to them.
*/
public void initialize(URI uri, Configuration conf) throws IOException {
Preconditions.checkArgument(uri.getHost() != null,
"Invalid hostname in URI " + uri);
String userInfo = uri.getUserInfo();
if (userInfo != null) {
int index = userInfo.indexOf(':');
if (index != -1) {
accessKey = userInfo.substring(0, index);
secretAccessKey = userInfo.substring(index + 1);
} else {
accessKey = userInfo;
}
}
if (accessKey == null) {
accessKey = conf.getTrimmed(S3_NATIVE_AWS_ACCESS_KEY_ID);
}
if (secretAccessKey == null) {
final char[] pass = conf.getPassword(S3_NATIVE_AWS_SECRET_ACCESS_KEY);
if (pass != null) {
secretAccessKey = (new String(pass)).trim();
}
}
final String scheme = uri.getScheme();
Preconditions.checkArgument(!(accessKey == null && secretAccessKey == null),
"AWS Access Key ID and Secret Access Key must be specified as the " +
"username or password (respectively) of a " + scheme + " URL, or " +
"by setting the " + S3_NATIVE_AWS_ACCESS_KEY_ID + " or " +
S3_NATIVE_AWS_SECRET_ACCESS_KEY + " properties (respectively).");
Preconditions.checkArgument(accessKey != null,
"AWS Access Key ID must be specified as the username of a " + scheme +
" URL, or by setting the " + S3_NATIVE_AWS_ACCESS_KEY_ID +
" property.");
Preconditions.checkArgument(secretAccessKey != null,
"AWS Secret Access Key must be specified as the password of a " + scheme
+ " URL, or by setting the " + S3_NATIVE_AWS_SECRET_ACCESS_KEY +
" property.");
}
public String getAccessKey() {
return accessKey;
}
public String getSecretAccessKey() {
return secretAccessKey;
}
}

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
package org.apache.hadoop.fs.s3native;
import java.io.IOException;

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.fs.s3native;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.DeprecationDelta;
import org.apache.hadoop.fs.CommonConfigurationKeys;
/**
@ -43,5 +45,22 @@ public class S3NativeFileSystemConfigKeys extends CommonConfigurationKeys {
public static final String S3_NATIVE_CLIENT_WRITE_PACKET_SIZE_KEY =
"s3native.client-write-packet-size";
public static final int S3_NATIVE_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
static final String S3_NATIVE_BUFFER_DIR_KEY = "fs.s3n.buffer.dir";
static final String S3_NATIVE_BUFFER_DIR_DEFAULT = "${hadoop.tmp.dir}/s3n";
static final String S3_NATIVE_MAX_RETRIES_KEY = "fs.s3n.maxRetries";
static final int S3_NATIVE_MAX_RETRIES_DEFAUL = 4;
static final String S3_NATIVE_SLEEP_TIME_KEY = "fs.s3n.sleepTimeSeconds";
static final int S3_NATIVE_SLEEP_TIME_DEFAULT = 10;
static final String S3_NATIVE_AWS_ACCESS_KEY_ID = "fs.s3n.awsAccessKeyId";
static final String S3_NATIVE_AWS_SECRET_ACCESS_KEY =
"fs.s3n.awsSecretAccessKey";
static void addDeprecatedConfigKeys() {
Configuration.addDeprecations(new DeprecationDelta[]{
new DeprecationDelta("fs.s3.buffer.dir", S3_NATIVE_BUFFER_DIR_KEY),
new DeprecationDelta("fs.s3.maxRetries", S3_NATIVE_MAX_RETRIES_KEY),
new DeprecationDelta("fs.s3.sleepTimeSeconds", S3_NATIVE_SLEEP_TIME_KEY)
});
}
}

View File

@ -23,9 +23,8 @@
A distributed implementation of {@link
org.apache.hadoop.fs.FileSystem} for reading and writing files on
<a href="http://aws.amazon.com/s3">Amazon S3</a>.
Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem}, which is block-based,
this implementation stores
files on S3 in their native form for interoperability with other S3 tools.
This implementation stores files on S3 in their native form for interoperability
with other S3 tools.
</p>
</body>

View File

@ -13,6 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.hadoop.fs.s3.S3FileSystem
org.apache.hadoop.fs.s3native.NativeS3FileSystem
org.apache.hadoop.fs.s3a.S3AFileSystem

View File

@ -28,8 +28,8 @@ HADOOP_OPTIONAL_TOOLS in hadoop-env.sh has 'hadoop-aws' in the list.
### Features
1. The "classic" `s3:` filesystem for storing objects in Amazon S3 Storage.
**NOTE: `s3:` is being phased out. Use `s3n:` or `s3a:` instead.**
1. The second-generation, `s3n:` filesystem, making it easy to share
data between hadoop and other applications via the S3 object store.
1. The third generation, `s3a:` filesystem. Designed to be a switch in
@ -972,7 +972,6 @@ each filesystem for its testing.
1. `test.fs.s3n.name` : the URL of the bucket for S3n tests
1. `test.fs.s3a.name` : the URL of the bucket for S3a tests
2. `test.fs.s3.name` : the URL of the bucket for "S3" tests
The contents of each bucket will be destroyed during the test process:
do not use the bucket for any purpose other than testing. Furthermore, for
@ -994,21 +993,6 @@ Example:
<value>s3a://test-aws-s3a/</value>
</property>
<property>
<name>test.fs.s3.name</name>
<value>s3://test-aws-s3/</value>
</property>
<property>
<name>fs.s3.awsAccessKeyId</name>
<value>DONOTPCOMMITTHISKEYTOSCM</value>
</property>
<property>
<name>fs.s3.awsSecretAccessKey</name>
<value>DONOTEVERSHARETHISSECRETKEY!</value>
</property>
<property>
<name>fs.s3n.awsAccessKeyId</name>
<value>DONOTPCOMMITTHISKEYTOSCM</value>
@ -1051,18 +1035,6 @@ The standard S3 authentication details must also be provided. This can be
through copy-and-paste of the `auth-keys.xml` credentials, or it can be
through direct XInclude inclusion.
### s3://
The filesystem name must be defined in the property `fs.contract.test.fs.s3`.
Example:
<property>
<name>fs.contract.test.fs.s3</name>
<value>s3://test-aws-s3/</value>
</property>
### s3n://
@ -1122,12 +1094,6 @@ Example:
<include xmlns="http://www.w3.org/2001/XInclude"
href="/home/testuser/.ssh/auth-keys.xml"/>
<property>
<name>fs.contract.test.fs.s3</name>
<value>s3://test-aws-s3/</value>
</property>
<property>
<name>fs.contract.test.fs.s3a</name>
<value>s3a://test-aws-s3a/</value>

View File

@ -1,50 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractBondedFSContract;
/**
* The contract of S3: only enabled if the test bucket is provided.
*/
public class S3Contract extends AbstractBondedFSContract {
public static final String CONTRACT_XML = "contract/s3.xml";
public S3Contract(Configuration conf) {
super(conf);
//insert the base features
addConfResource(CONTRACT_XML);
}
@Override
public String getScheme() {
return "s3";
}
@Override
public Path getTestPath() {
String testUniqueForkId = System.getProperty("test.unique.fork.id");
return testUniqueForkId == null ? super.getTestPath() :
new Path("/" + testUniqueForkId, "test");
}
}

View File

@ -1,32 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
public class TestS3ContractCreate extends AbstractContractCreateTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3Contract(conf);
}
}

View File

@ -1,31 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractDeleteTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
public class TestS3ContractDelete extends AbstractContractDeleteTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3Contract(conf);
}
}

View File

@ -1,32 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractMkdirTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
public class TestS3ContractMkdir extends AbstractContractMkdirTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3Contract(conf);
}
}

View File

@ -1,32 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
public class TestS3ContractOpen extends AbstractContractOpenTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3Contract(conf);
}
}

View File

@ -1,32 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
public class TestS3ContractRename extends AbstractContractRenameTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3Contract(conf);
}
}

View File

@ -1,49 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.junit.Ignore;
import org.junit.Test;
/**
* root dir operations against an S3 bucket
*/
public class TestS3ContractRootDir extends AbstractContractRootDirectoryTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3Contract(conf);
}
@Override
@Test
@Ignore
public void testRmEmptyRootDirNonRecursive() {
}
@Override
@Test
@Ignore
public void testRmRootRecursive() {
}
}

View File

@ -1,41 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Ignore;
import org.junit.Test;
public class TestS3ContractSeek extends AbstractContractSeekTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3Contract(conf);
}
@Override
@Test
@Ignore
public void testReadFullyZeroByteFile() {
}
}

View File

@ -1,200 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3.INode.FileType;
/**
* A stub implementation of {@link FileSystemStore} for testing
* {@link S3FileSystem} without actually connecting to S3.
*/
public class InMemoryFileSystemStore implements FileSystemStore {
private Configuration conf;
private SortedMap<Path, INode> inodes = new TreeMap<Path, INode>();
private Map<Long, byte[]> blocks = new HashMap<Long, byte[]>();
@Override
public void initialize(URI uri, Configuration conf) {
this.conf = conf;
inodes.put(new Path("/"), INode.DIRECTORY_INODE);
}
@Override
public String getVersion() throws IOException {
return "0";
}
@Override
public void deleteINode(Path path) throws IOException {
inodes.remove(normalize(path));
}
@Override
public void deleteBlock(Block block) throws IOException {
blocks.remove(block.getId());
}
@Override
public boolean inodeExists(Path path) throws IOException {
return inodes.containsKey(normalize(path));
}
@Override
public boolean blockExists(long blockId) throws IOException {
return blocks.containsKey(blockId);
}
@Override
public INode retrieveINode(Path path) throws IOException {
return inodes.get(normalize(path));
}
@Override
public File retrieveBlock(Block block, long byteRangeStart) throws IOException {
byte[] data = blocks.get(block.getId());
File file = createTempFile();
BufferedOutputStream out = null;
try {
out = new BufferedOutputStream(new FileOutputStream(file));
out.write(data, (int) byteRangeStart, data.length - (int) byteRangeStart);
} finally {
if (out != null) {
out.close();
}
}
return file;
}
private File createTempFile() throws IOException {
File dir = new File(conf.get("fs.s3.buffer.dir"));
if (!dir.exists() && !dir.mkdirs()) {
throw new IOException("Cannot create S3 buffer directory: " + dir);
}
File result = File.createTempFile("test-", ".tmp", dir);
result.deleteOnExit();
return result;
}
@Override
public Set<Path> listSubPaths(Path path) throws IOException {
Path normalizedPath = normalize(path);
// This is inefficient but more than adequate for testing purposes.
Set<Path> subPaths = new LinkedHashSet<Path>();
for (Path p : inodes.tailMap(normalizedPath).keySet()) {
if (normalizedPath.equals(p.getParent())) {
subPaths.add(p);
}
}
return subPaths;
}
@Override
public Set<Path> listDeepSubPaths(Path path) throws IOException {
Path normalizedPath = normalize(path);
String pathString = normalizedPath.toUri().getPath();
if (!pathString.endsWith("/")) {
pathString += "/";
}
// This is inefficient but more than adequate for testing purposes.
Set<Path> subPaths = new LinkedHashSet<Path>();
for (Path p : inodes.tailMap(normalizedPath).keySet()) {
if (p.toUri().getPath().startsWith(pathString)) {
subPaths.add(p);
}
}
return subPaths;
}
@Override
public void storeINode(Path path, INode inode) throws IOException {
inodes.put(normalize(path), inode);
}
@Override
public void storeBlock(Block block, File file) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] buf = new byte[8192];
int numRead;
BufferedInputStream in = null;
try {
in = new BufferedInputStream(new FileInputStream(file));
while ((numRead = in.read(buf)) >= 0) {
out.write(buf, 0, numRead);
}
} finally {
if (in != null) {
in.close();
}
}
blocks.put(block.getId(), out.toByteArray());
}
private Path normalize(Path path) {
if (!path.isAbsolute()) {
throw new IllegalArgumentException("Path must be absolute: " + path);
}
return new Path(path.toUri().getPath());
}
@Override
public void purge() throws IOException {
inodes.clear();
blocks.clear();
}
@Override
public void dump() throws IOException {
StringBuilder sb = new StringBuilder(getClass().getSimpleName());
sb.append(", \n");
for (Map.Entry<Path, INode> entry : inodes.entrySet()) {
sb.append(entry.getKey()).append("\n");
INode inode = entry.getValue();
sb.append("\t").append(inode.getFileType()).append("\n");
if (inode.getFileType() == FileType.DIRECTORY) {
continue;
}
for (int j = 0; j < inode.getBlocks().length; j++) {
sb.append("\t").append(inode.getBlocks()[j]).append("\n");
}
}
System.out.println(sb);
System.out.println(inodes.keySet());
System.out.println(blocks.keySet());
}
}

View File

@ -1,31 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import java.io.IOException;
public class Jets3tS3FileSystemContractTest
extends S3FileSystemContractBaseTest {
@Override
FileSystemStore getFileSystemStore() throws IOException {
return new Jets3tFileSystemStore();
}
}

View File

@ -1,61 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import java.io.IOException;
import java.net.URI;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemContractBaseTest;
import org.junit.internal.AssumptionViolatedException;
public abstract class S3FileSystemContractBaseTest
extends FileSystemContractBaseTest {
public static final String KEY_TEST_FS = "test.fs.s3.name";
private FileSystemStore store;
abstract FileSystemStore getFileSystemStore() throws IOException;
@Override
protected void setUp() throws Exception {
Configuration conf = new Configuration();
store = getFileSystemStore();
fs = new S3FileSystem(store);
String fsname = conf.get(KEY_TEST_FS);
if (StringUtils.isEmpty(fsname)) {
throw new AssumptionViolatedException(
"No test FS defined in :" + KEY_TEST_FS);
}
fs.initialize(URI.create(fsname), conf);
}
@Override
protected void tearDown() throws Exception {
store.purge();
super.tearDown();
}
public void testCanonicalName() throws Exception {
assertNull("s3 doesn't support security token and shouldn't have canonical name",
fs.getCanonicalServiceName());
}
}

View File

@ -1,32 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import org.apache.hadoop.fs.s3.S3FileSystem;
import org.apache.hadoop.fs.s3.InMemoryFileSystemStore;
/**
* A helper implementation of {@link S3FileSystem}
* without actually connecting to S3 for unit testing.
*/
public class S3InMemoryFileSystem extends S3FileSystem {
public S3InMemoryFileSystem() {
super(new InMemoryFileSystemStore());
}
}

View File

@ -1,60 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import java.io.IOException;
import java.io.InputStream;
import junit.framework.TestCase;
import org.apache.hadoop.fs.s3.INode.FileType;
public class TestINode extends TestCase {
public void testSerializeFileWithSingleBlock() throws IOException {
Block[] blocks = { new Block(849282477840258181L, 128L) };
INode inode = new INode(FileType.FILE, blocks);
assertEquals("Length", 1L + 4 + 16, inode.getSerializedLength());
InputStream in = inode.serialize();
INode deserialized = INode.deserialize(in);
assertEquals("FileType", inode.getFileType(), deserialized.getFileType());
Block[] deserializedBlocks = deserialized.getBlocks();
assertEquals("Length", 1, deserializedBlocks.length);
assertEquals("Id", blocks[0].getId(), deserializedBlocks[0].getId());
assertEquals("Length", blocks[0].getLength(), deserializedBlocks[0]
.getLength());
}
public void testSerializeDirectory() throws IOException {
INode inode = INode.DIRECTORY_INODE;
assertEquals("Length", 1L, inode.getSerializedLength());
InputStream in = inode.serialize();
INode deserialized = INode.deserialize(in);
assertSame(INode.DIRECTORY_INODE, deserialized);
}
public void testDeserializeNull() throws IOException {
assertNull(INode.deserialize(null));
}
}

View File

@ -1,31 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import java.io.IOException;
public class TestInMemoryS3FileSystemContract
extends S3FileSystemContractBaseTest {
@Override
FileSystemStore getFileSystemStore() throws IOException {
return new InMemoryFileSystemStore();
}
}

View File

@ -1,52 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import java.io.IOException;
import java.net.URI;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
public class TestS3FileSystem extends TestCase {
public static final URI EXPECTED = URI.create("s3://c");
public void testInitialization() throws IOException {
initializationTest("s3://a:b@c");
initializationTest("s3://a:b@c/");
initializationTest("s3://a:b@c/path");
initializationTest("s3://a@c");
initializationTest("s3://a@c/");
initializationTest("s3://a@c/path");
initializationTest("s3://c");
initializationTest("s3://c/");
initializationTest("s3://c/path");
}
private void initializationTest(String initializationUri)
throws IOException {
S3FileSystem fs = new S3FileSystem(new InMemoryFileSystemStore());
fs.initialize(URI.create(initializationUri), new Configuration());
assertEquals(EXPECTED, fs.getUri());
}
}

View File

@ -1,67 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
public class TestS3InMemoryFileSystem extends TestCase {
private static final String TEST_PATH = "s3://test/data.txt";
private static final String TEST_DATA = "Sample data for testing.";
private S3InMemoryFileSystem fs;
@Override
public void setUp() throws IOException {
fs = new S3InMemoryFileSystem();
fs.initialize(URI.create("s3://test/"), new Configuration());
}
public void testBasicReadWriteIO() throws IOException {
FSDataOutputStream writeStream = fs.create(new Path(TEST_PATH));
writeStream.write(TEST_DATA.getBytes());
writeStream.flush();
writeStream.close();
FSDataInputStream readStream = fs.open(new Path(TEST_PATH));
BufferedReader br = new BufferedReader(new InputStreamReader(readStream));
String line = "";
StringBuffer stringBuffer = new StringBuffer();
while ((line = br.readLine()) != null) {
stringBuffer.append(line);
}
br.close();
assert(TEST_DATA.equals(stringBuffer.toString()));
}
@Override
public void tearDown() throws IOException {
fs.close();
}
}

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.fs.s3native;
import static org.apache.hadoop.fs.s3native.NativeS3FileSystem.PATH_DELIMITER;
import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_BUFFER_DIR_KEY;
import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.addDeprecatedConfigKeys;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@ -49,6 +51,11 @@ import org.apache.hadoop.util.Time;
*/
public class InMemoryNativeFileSystemStore implements NativeFileSystemStore {
static {
// Add the deprecated config keys
addDeprecatedConfigKeys();
}
private Configuration conf;
private SortedMap<String, FileMetadata> metadataMap =
@ -114,7 +121,7 @@ public class InMemoryNativeFileSystemStore implements NativeFileSystemStore {
}
private File createTempFile() throws IOException {
File dir = new File(conf.get("fs.s3.buffer.dir"));
File dir = new File(conf.get(S3_NATIVE_BUFFER_DIR_KEY));
if (!dir.exists() && !dir.mkdirs()) {
throw new IOException("Cannot create S3 buffer directory: " + dir);
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3;
package org.apache.hadoop.fs.s3native;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -33,9 +33,16 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_AWS_ACCESS_KEY_ID;
import static org.apache.hadoop.fs.s3native.S3NativeFileSystemConfigKeys.S3_NATIVE_AWS_SECRET_ACCESS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
/**
* This is to test the {@link S3Credentials} class for extracting AWS
* credentials.
*/
public class TestS3Credentials {
public static final Log LOG = LogFactory.getLog(TestS3Credentials.class);
@ -55,10 +62,10 @@ public class TestS3Credentials {
public void testInvalidHostnameWithUnderscores() throws Exception {
S3Credentials s3Credentials = new S3Credentials();
try {
s3Credentials.initialize(new URI("s3://a:b@c_d"), new Configuration());
s3Credentials.initialize(new URI("s3n://a:b@c_d"), new Configuration());
fail("Should throw IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertEquals("Invalid hostname in URI s3://a:b@c_d", e.getMessage());
assertEquals("Invalid hostname in URI s3n://a:b@c_d", e.getMessage());
}
}
@ -66,9 +73,9 @@ public class TestS3Credentials {
public void testPlaintextConfigPassword() throws Exception {
S3Credentials s3Credentials = new S3Credentials();
Configuration conf = new Configuration();
conf.set("fs.s3.awsAccessKeyId", EXAMPLE_ID);
conf.set("fs.s3.awsSecretAccessKey", EXAMPLE_KEY);
s3Credentials.initialize(new URI("s3://foobar"), conf);
conf.set(S3_NATIVE_AWS_ACCESS_KEY_ID, EXAMPLE_ID);
conf.set(S3_NATIVE_AWS_SECRET_ACCESS_KEY, EXAMPLE_KEY);
s3Credentials.initialize(new URI("s3n://foobar"), conf);
assertEquals("Could not retrieve proper access key", EXAMPLE_ID,
s3Credentials.getAccessKey());
assertEquals("Could not retrieve proper secret", EXAMPLE_KEY,
@ -79,11 +86,11 @@ public class TestS3Credentials {
public void testPlaintextConfigPasswordWithWhitespace() throws Exception {
S3Credentials s3Credentials = new S3Credentials();
Configuration conf = new Configuration();
conf.set("fs.s3.awsAccessKeyId", "\r\n " + EXAMPLE_ID +
conf.set(S3_NATIVE_AWS_ACCESS_KEY_ID, "\r\n " + EXAMPLE_ID +
" \r\n");
conf.set("fs.s3.awsSecretAccessKey", "\r\n " + EXAMPLE_KEY +
conf.set(S3_NATIVE_AWS_SECRET_ACCESS_KEY, "\r\n " + EXAMPLE_KEY +
" \r\n");
s3Credentials.initialize(new URI("s3://foobar"), conf);
s3Credentials.initialize(new URI("s3n://foobar"), conf);
assertEquals("Could not retrieve proper access key", EXAMPLE_ID,
s3Credentials.getAccessKey());
assertEquals("Could not retrieve proper secret", EXAMPLE_KEY,
@ -106,14 +113,14 @@ public class TestS3Credentials {
// add our creds to the provider
final CredentialProvider provider =
CredentialProviderFactory.getProviders(conf).get(0);
provider.createCredentialEntry("fs.s3.awsSecretAccessKey",
provider.createCredentialEntry(S3_NATIVE_AWS_SECRET_ACCESS_KEY,
EXAMPLE_KEY.toCharArray());
provider.flush();
// make sure S3Creds can retrieve things.
S3Credentials s3Credentials = new S3Credentials();
conf.set("fs.s3.awsAccessKeyId", EXAMPLE_ID);
s3Credentials.initialize(new URI("s3://foobar"), conf);
conf.set(S3_NATIVE_AWS_ACCESS_KEY_ID, EXAMPLE_ID);
s3Credentials.initialize(new URI("s3n://foobar"), conf);
assertEquals("Could not retrieve proper access key", EXAMPLE_ID,
s3Credentials.getAccessKey());
assertEquals("Could not retrieve proper secret", EXAMPLE_KEY,
@ -125,8 +132,8 @@ public class TestS3Credentials {
public void noSecretShouldThrow() throws Exception {
S3Credentials s3Credentials = new S3Credentials();
Configuration conf = new Configuration();
conf.set("fs.s3.awsAccessKeyId", EXAMPLE_ID);
s3Credentials.initialize(new URI("s3://foobar"), conf);
conf.set(S3_NATIVE_AWS_ACCESS_KEY_ID, EXAMPLE_ID);
s3Credentials.initialize(new URI("s3n://foobar"), conf);
}
@Test(expected=IllegalArgumentException.class)
@ -134,7 +141,7 @@ public class TestS3Credentials {
public void noAccessIdShouldThrow() throws Exception {
S3Credentials s3Credentials = new S3Credentials();
Configuration conf = new Configuration();
conf.set("fs.s3.awsSecretAccessKey", EXAMPLE_KEY);
s3Credentials.initialize(new URI("s3://foobar"), conf);
conf.set(S3_NATIVE_AWS_SECRET_ACCESS_KEY, EXAMPLE_KEY);
s3Credentials.initialize(new URI("s3n://foobar"), conf);
}
}

View File

@ -1,104 +0,0 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<!--
S3 is backed by a blobstore.
-->
<property>
<name>fs.contract.test.root-tests-enabled</name>
<value>true</value>
</property>
<property>
<name>fs.contract.test.random-seek-count</name>
<value>10</value>
</property>
<property>
<name>fs.contract.is-blobstore</name>
<value>true</value>
</property>
<property>
<name>fs.contract.is-case-sensitive</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rename-returns-false-if-source-missing</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-append</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-atomic-directory-delete</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-atomic-rename</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-block-locality</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-concat</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-seek</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-seek-on-closed-file</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-available-on-closed-file</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rejects-seek-past-eof</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-strict-exceptions</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-unix-permissions</name>
<value>false</value>
</property>
</configuration>

View File

@ -4546,7 +4546,6 @@
"hadoop.http.authentication.kerberos.keytab" : "${user.home}/hadoop.keytab",
"yarn.nodemanager.keytab" : "/etc/krb5.keytab",
"io.seqfile.sorter.recordlimit" : "1000000",
"s3.blocksize" : "67108864",
"mapreduce.task.io.sort.factor" : "10",
"yarn.nodemanager.disk-health-checker.interval-ms" : "120000",
"mapreduce.job.working.dir" : "hdfs://a2115.smile.com:9820/user/jenkins",
@ -4556,12 +4555,10 @@
"dfs.namenode.delegation.token.renew-interval" : "86400000",
"yarn.nodemanager.resource.memory-mb" : "8192",
"io.map.index.interval" : "128",
"s3.client-write-packet-size" : "65536",
"mapreduce.task.files.preserve.failedtasks" : "false",
"dfs.namenode.http-address" : "a2115.smile.com:20101",
"ha.zookeeper.session-timeout.ms" : "5000",
"hadoop.hdfs.configuration.version" : "1",
"s3.replication" : "3",
"dfs.datanode.balance.bandwidthPerSec" : "1048576",
"mapreduce.reduce.shuffle.connect.timeout" : "180000",
"hadoop.ssl.enabled" : "false",
@ -4662,7 +4659,7 @@
"mapreduce.shuffle.ssl.enabled" : "false",
"dfs.namenode.invalidate.work.pct.per.iteration" : "0.32f",
"dfs.blockreport.intervalMsec" : "21600000",
"fs.s3.sleepTimeSeconds" : "10",
"fs.s3n.sleepTimeSeconds" : "10",
"dfs.namenode.replication.considerLoad" : "true",
"dfs.client.block.write.retries" : "3",
"hadoop.ssl.server.conf" : "ssl-server.xml",
@ -4676,7 +4673,6 @@
"dfs.replication" : "3",
"ipc.client.tcpnodelay" : "false",
"dfs.namenode.accesstime.precision" : "3600000",
"s3.stream-buffer-size" : "4096",
"mapreduce.jobtracker.tasktracker.maxblacklists" : "4",
"dfs.client.read.shortcircuit.skip.checksum" : "false",
"mapreduce.job.jvm.numtasks" : "1",
@ -4694,7 +4690,7 @@
"kfs.stream-buffer-size" : "4096",
"dfs.ha.tail-edits.period" : "60",
"hadoop.security.authentication" : "simple",
"fs.s3.buffer.dir" : "${hadoop.tmp.dir}/s3",
"fs.s3n.buffer.dir" : "${hadoop.tmp.dir}/s3n",
"rpc.engine.org.apache.hadoop.yarn.api.AMRMProtocolPB" : "org.apache.hadoop.ipc.ProtobufRpcEngine",
"mapreduce.jobtracker.taskscheduler" : "org.apache.hadoop.mapred.JobQueueTaskScheduler",
"yarn.app.mapreduce.am.job.task.listener.thread-count" : "30",
@ -4776,7 +4772,7 @@
"mapreduce.job.dir" : "/user/jenkins/.staging/job_1369942127770_1205",
"io.map.index.skip" : "0",
"net.topology.node.switch.mapping.impl" : "org.apache.hadoop.net.ScriptBasedMapping",
"fs.s3.maxRetries" : "4",
"fs.s3n.maxRetries" : "4",
"ha.failover-controller.new-active.rpc-timeout.ms" : "60000",
"s3native.client-write-packet-size" : "65536",
"yarn.resourcemanager.amliveliness-monitor.interval-ms" : "1000",
@ -4844,11 +4840,9 @@
"ftp.bytes-per-checksum" : "512",
"ipc.server.tcpnodelay" : "false",
"dfs.namenode.stale.datanode.interval" : "30000",
"s3.bytes-per-checksum" : "512",
"mapreduce.job.speculative.slowtaskthreshold" : "1.0",
"yarn.nodemanager.localizer.cache.target-size-mb" : "10240",
"yarn.nodemanager.remote-app-log-dir" : "/tmp/logs",
"fs.s3.block.size" : "67108864",
"mapreduce.job.queuename" : "sls_queue_1",
"dfs.client.failover.connection.retries" : "0",
"hadoop.rpc.protection" : "authentication",
@ -9649,7 +9643,6 @@
"hadoop.http.authentication.kerberos.keytab" : "${user.home}/hadoop.keytab",
"yarn.nodemanager.keytab" : "/etc/krb5.keytab",
"io.seqfile.sorter.recordlimit" : "1000000",
"s3.blocksize" : "67108864",
"mapreduce.task.io.sort.factor" : "10",
"yarn.nodemanager.disk-health-checker.interval-ms" : "120000",
"mapreduce.job.working.dir" : "hdfs://a2115.smile.com:9820/user/jenkins",
@ -9659,12 +9652,10 @@
"dfs.namenode.delegation.token.renew-interval" : "86400000",
"yarn.nodemanager.resource.memory-mb" : "8192",
"io.map.index.interval" : "128",
"s3.client-write-packet-size" : "65536",
"mapreduce.task.files.preserve.failedtasks" : "false",
"dfs.namenode.http-address" : "a2115.smile.com:20101",
"ha.zookeeper.session-timeout.ms" : "5000",
"hadoop.hdfs.configuration.version" : "1",
"s3.replication" : "3",
"dfs.datanode.balance.bandwidthPerSec" : "1048576",
"mapreduce.reduce.shuffle.connect.timeout" : "180000",
"hadoop.ssl.enabled" : "false",
@ -9765,7 +9756,7 @@
"mapreduce.shuffle.ssl.enabled" : "false",
"dfs.namenode.invalidate.work.pct.per.iteration" : "0.32f",
"dfs.blockreport.intervalMsec" : "21600000",
"fs.s3.sleepTimeSeconds" : "10",
"fs.s3n.sleepTimeSeconds" : "10",
"dfs.namenode.replication.considerLoad" : "true",
"dfs.client.block.write.retries" : "3",
"hadoop.ssl.server.conf" : "ssl-server.xml",
@ -9779,7 +9770,6 @@
"dfs.replication" : "3",
"ipc.client.tcpnodelay" : "false",
"dfs.namenode.accesstime.precision" : "3600000",
"s3.stream-buffer-size" : "4096",
"mapreduce.jobtracker.tasktracker.maxblacklists" : "4",
"dfs.client.read.shortcircuit.skip.checksum" : "false",
"mapreduce.job.jvm.numtasks" : "1",
@ -9797,7 +9787,7 @@
"kfs.stream-buffer-size" : "4096",
"dfs.ha.tail-edits.period" : "60",
"hadoop.security.authentication" : "simple",
"fs.s3.buffer.dir" : "${hadoop.tmp.dir}/s3",
"fs.s3n.buffer.dir" : "${hadoop.tmp.dir}/s3",
"rpc.engine.org.apache.hadoop.yarn.api.AMRMProtocolPB" : "org.apache.hadoop.ipc.ProtobufRpcEngine",
"mapreduce.jobtracker.taskscheduler" : "org.apache.hadoop.mapred.JobQueueTaskScheduler",
"yarn.app.mapreduce.am.job.task.listener.thread-count" : "30",
@ -9879,7 +9869,7 @@
"mapreduce.job.dir" : "/user/jenkins/.staging/job_1369942127770_1206",
"io.map.index.skip" : "0",
"net.topology.node.switch.mapping.impl" : "org.apache.hadoop.net.ScriptBasedMapping",
"fs.s3.maxRetries" : "4",
"fs.s3n.maxRetries" : "4",
"ha.failover-controller.new-active.rpc-timeout.ms" : "60000",
"s3native.client-write-packet-size" : "65536",
"yarn.resourcemanager.amliveliness-monitor.interval-ms" : "1000",
@ -9947,11 +9937,9 @@
"ftp.bytes-per-checksum" : "512",
"ipc.server.tcpnodelay" : "false",
"dfs.namenode.stale.datanode.interval" : "30000",
"s3.bytes-per-checksum" : "512",
"mapreduce.job.speculative.slowtaskthreshold" : "1.0",
"yarn.nodemanager.localizer.cache.target-size-mb" : "10240",
"yarn.nodemanager.remote-app-log-dir" : "/tmp/logs",
"fs.s3.block.size" : "67108864",
"mapreduce.job.queuename" : "sls_queue_1",
"dfs.client.failover.connection.retries" : "0",
"hadoop.rpc.protection" : "authentication",
@ -10252,7 +10240,6 @@
"hadoop.http.authentication.kerberos.keytab" : "${user.home}/hadoop.keytab",
"yarn.nodemanager.keytab" : "/etc/krb5.keytab",
"io.seqfile.sorter.recordlimit" : "1000000",
"s3.blocksize" : "67108864",
"mapreduce.task.io.sort.factor" : "10",
"yarn.nodemanager.disk-health-checker.interval-ms" : "120000",
"mapreduce.job.working.dir" : "hdfs://a2115.smile.com:9820/user/jenkins",
@ -10262,12 +10249,10 @@
"dfs.namenode.delegation.token.renew-interval" : "86400000",
"yarn.nodemanager.resource.memory-mb" : "8192",
"io.map.index.interval" : "128",
"s3.client-write-packet-size" : "65536",
"mapreduce.task.files.preserve.failedtasks" : "false",
"dfs.namenode.http-address" : "a2115.smile.com:20101",
"ha.zookeeper.session-timeout.ms" : "5000",
"hadoop.hdfs.configuration.version" : "1",
"s3.replication" : "3",
"dfs.datanode.balance.bandwidthPerSec" : "1048576",
"mapreduce.reduce.shuffle.connect.timeout" : "180000",
"hadoop.ssl.enabled" : "false",
@ -10369,7 +10354,7 @@
"mapreduce.shuffle.ssl.enabled" : "false",
"dfs.namenode.invalidate.work.pct.per.iteration" : "0.32f",
"dfs.blockreport.intervalMsec" : "21600000",
"fs.s3.sleepTimeSeconds" : "10",
"fs.s3n.sleepTimeSeconds" : "10",
"dfs.namenode.replication.considerLoad" : "true",
"dfs.client.block.write.retries" : "3",
"hadoop.ssl.server.conf" : "ssl-server.xml",
@ -10383,7 +10368,6 @@
"dfs.replication" : "3",
"ipc.client.tcpnodelay" : "false",
"dfs.namenode.accesstime.precision" : "3600000",
"s3.stream-buffer-size" : "4096",
"mapreduce.jobtracker.tasktracker.maxblacklists" : "4",
"dfs.client.read.shortcircuit.skip.checksum" : "false",
"mapreduce.job.jvm.numtasks" : "1",
@ -10401,7 +10385,7 @@
"kfs.stream-buffer-size" : "4096",
"dfs.ha.tail-edits.period" : "60",
"hadoop.security.authentication" : "simple",
"fs.s3.buffer.dir" : "${hadoop.tmp.dir}/s3",
"fs.s3n.buffer.dir" : "${hadoop.tmp.dir}/s3n",
"rpc.engine.org.apache.hadoop.yarn.api.AMRMProtocolPB" : "org.apache.hadoop.ipc.ProtobufRpcEngine",
"mapreduce.jobtracker.taskscheduler" : "org.apache.hadoop.mapred.JobQueueTaskScheduler",
"yarn.app.mapreduce.am.job.task.listener.thread-count" : "30",
@ -10483,7 +10467,7 @@
"mapreduce.job.dir" : "/user/jenkins/.staging/job_1369942127770_1207",
"io.map.index.skip" : "0",
"net.topology.node.switch.mapping.impl" : "org.apache.hadoop.net.ScriptBasedMapping",
"fs.s3.maxRetries" : "4",
"fs.s3n.maxRetries" : "4",
"ha.failover-controller.new-active.rpc-timeout.ms" : "60000",
"s3native.client-write-packet-size" : "65536",
"yarn.resourcemanager.amliveliness-monitor.interval-ms" : "1000",
@ -10551,11 +10535,9 @@
"ftp.bytes-per-checksum" : "512",
"ipc.server.tcpnodelay" : "false",
"dfs.namenode.stale.datanode.interval" : "30000",
"s3.bytes-per-checksum" : "512",
"mapreduce.job.speculative.slowtaskthreshold" : "1.0",
"yarn.nodemanager.localizer.cache.target-size-mb" : "10240",
"yarn.nodemanager.remote-app-log-dir" : "/tmp/logs",
"fs.s3.block.size" : "67108864",
"mapreduce.job.queuename" : "sls_queue_1",
"dfs.client.failover.connection.retries" : "0",
"hadoop.rpc.protection" : "authentication",