HADOOP-13186. Multipart Uploader API. Contributed by Ewan Higgs

This commit is contained in:
Chris Douglas 2018-06-17 11:54:26 -07:00
parent 3905fdb793
commit 980031bb04
23 changed files with 1290 additions and 2 deletions

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
* Byte array backed part handle.
*/
public final class BBPartHandle implements PartHandle {
private static final long serialVersionUID = 0x23ce3eb1;
private final byte[] bytes;
private BBPartHandle(ByteBuffer byteBuffer){
this.bytes = byteBuffer.array();
}
public static PartHandle from(ByteBuffer byteBuffer) {
return new BBPartHandle(byteBuffer);
}
@Override
public ByteBuffer bytes() {
return ByteBuffer.wrap(bytes);
}
@Override
public int hashCode() {
return Arrays.hashCode(bytes);
}
@Override
public boolean equals(Object other) {
if (!(other instanceof PartHandle)) {
return false;
}
PartHandle o = (PartHandle) other;
return bytes().equals(o.bytes());
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
* Byte array backed upload handle.
*/
public final class BBUploadHandle implements UploadHandle {
private static final long serialVersionUID = 0x69d5509b;
private final byte[] bytes;
private BBUploadHandle(ByteBuffer byteBuffer){
this.bytes = byteBuffer.array();
}
public static UploadHandle from(ByteBuffer byteBuffer) {
return new BBUploadHandle(byteBuffer);
}
@Override
public int hashCode() {
return Arrays.hashCode(bytes);
}
@Override
public ByteBuffer bytes() {
return ByteBuffer.wrap(bytes);
}
@Override
public boolean equals(Object other) {
if (!(other instanceof UploadHandle)) {
return false;
}
UploadHandle o = (UploadHandle) other;
return bytes().equals(o.bytes());
}
}

View File

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import com.google.common.base.Charsets;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
/**
* A MultipartUploader that uses the basic FileSystem commands.
* This is done in three stages:
* Init - create a temp _multipart directory.
* PutPart - copying the individual parts of the file to the temp directory.
* Complete - use {@link FileSystem#concat} to merge the files; and then delete
* the temp directory.
*/
public class FileSystemMultipartUploader extends MultipartUploader {
private final FileSystem fs;
public FileSystemMultipartUploader(FileSystem fs) {
this.fs = fs;
}
@Override
public UploadHandle initialize(Path filePath) throws IOException {
Path collectorPath = createCollectorPath(filePath);
fs.mkdirs(collectorPath, FsPermission.getDirDefault());
ByteBuffer byteBuffer = ByteBuffer.wrap(
collectorPath.toString().getBytes(Charsets.UTF_8));
return BBUploadHandle.from(byteBuffer);
}
@Override
public PartHandle putPart(Path filePath, InputStream inputStream,
int partNumber, UploadHandle uploadId, long lengthInBytes)
throws IOException {
byte[] uploadIdByteArray = uploadId.toByteArray();
Path collectorPath = new Path(new String(uploadIdByteArray, 0,
uploadIdByteArray.length, Charsets.UTF_8));
Path partPath =
Path.mergePaths(collectorPath, Path.mergePaths(new Path(Path.SEPARATOR),
new Path(Integer.toString(partNumber) + ".part")));
FSDataOutputStreamBuilder outputStream = fs.createFile(partPath);
FSDataOutputStream fsDataOutputStream = outputStream.build();
IOUtils.copy(inputStream, fsDataOutputStream, 4096);
fsDataOutputStream.close();
return BBPartHandle.from(ByteBuffer.wrap(
partPath.toString().getBytes(Charsets.UTF_8)));
}
private Path createCollectorPath(Path filePath) {
return Path.mergePaths(filePath.getParent(),
Path.mergePaths(new Path(filePath.getName().split("\\.")[0]),
Path.mergePaths(new Path("_multipart"),
new Path(Path.SEPARATOR))));
}
@Override
@SuppressWarnings("deprecation") // rename w/ OVERWRITE
public PathHandle complete(Path filePath,
List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId)
throws IOException {
handles.sort(Comparator.comparing(Pair::getKey));
List<Path> partHandles = handles
.stream()
.map(pair -> {
byte[] byteArray = pair.getValue().toByteArray();
return new Path(new String(byteArray, 0, byteArray.length,
Charsets.UTF_8));
})
.collect(Collectors.toList());
Path collectorPath = createCollectorPath(filePath);
Path filePathInsideCollector = Path.mergePaths(collectorPath,
new Path(Path.SEPARATOR + filePath.getName()));
fs.create(filePathInsideCollector).close();
fs.concat(filePathInsideCollector,
partHandles.toArray(new Path[handles.size()]));
fs.rename(filePathInsideCollector, filePath, Options.Rename.OVERWRITE);
fs.delete(collectorPath, true);
FileStatus status = fs.getFileStatus(filePath);
return fs.getPathHandle(status);
}
@Override
public void abort(Path filePath, UploadHandle uploadId) throws IOException {
byte[] uploadIdByteArray = uploadId.toByteArray();
Path collectorPath = new Path(new String(uploadIdByteArray, 0,
uploadIdByteArray.length, Charsets.UTF_8));
fs.delete(collectorPath, true);
}
/**
* Factory for creating MultipartUploaderFactory objects for file://
* filesystems.
*/
public static class Factory extends MultipartUploaderFactory {
protected MultipartUploader createMultipartUploader(FileSystem fs,
Configuration conf) {
if (fs.getScheme().equals("file")) {
return new FileSystemMultipartUploader(fs);
}
return null;
}
}
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import com.google.protobuf.ByteString;
import org.apache.hadoop.fs.FSProtos.LocalFileSystemPathHandleProto;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Optional;
/**
* Opaque handle to an entity in a FileSystem.
*/
public class LocalFileSystemPathHandle implements PathHandle {
private final String path;
private final Long mtime;
public LocalFileSystemPathHandle(String path, Optional<Long> mtime) {
this.path = path;
this.mtime = mtime.orElse(null);
}
public LocalFileSystemPathHandle(ByteBuffer bytes) throws IOException {
if (null == bytes) {
throw new IOException("Missing PathHandle");
}
LocalFileSystemPathHandleProto p =
LocalFileSystemPathHandleProto.parseFrom(ByteString.copyFrom(bytes));
path = p.hasPath() ? p.getPath() : null;
mtime = p.hasMtime() ? p.getMtime() : null;
}
public String getPath() {
return path;
}
public void verify(FileStatus stat) throws InvalidPathHandleException {
if (null == stat) {
throw new InvalidPathHandleException("Could not resolve handle");
}
if (mtime != null && mtime != stat.getModificationTime()) {
throw new InvalidPathHandleException("Content changed");
}
}
@Override
public ByteBuffer bytes() {
LocalFileSystemPathHandleProto.Builder b =
LocalFileSystemPathHandleProto.newBuilder();
b.setPath(path);
if (mtime != null) {
b.setMtime(mtime);
}
return b.build().toByteString().asReadOnlyByteBuffer();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LocalFileSystemPathHandle that = (LocalFileSystemPathHandle) o;
return Objects.equals(path, that.path) &&
Objects.equals(mtime, that.mtime);
}
@Override
public int hashCode() {
return Objects.hash(path, mtime);
}
@Override
public String toString() {
return "LocalFileSystemPathHandle{" +
"path='" + path + '\'' +
", mtime=" + mtime +
'}';
}
}

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* MultipartUploader is an interface for copying files multipart and across
* multiple nodes. Users should:
* 1. Initialize an upload
* 2. Upload parts in any order
* 3. Complete the upload in order to have it materialize in the destination FS.
*
* Implementers should make sure that the complete function should make sure
* that 'complete' will reorder parts if the destination FS doesn't already
* do it for them.
*/
public abstract class MultipartUploader {
public static final Logger LOG =
LoggerFactory.getLogger(MultipartUploader.class);
/**
* Initialize a multipart upload.
* @param filePath Target path for upload.
* @return unique identifier associating part uploads.
* @throws IOException
*/
public abstract UploadHandle initialize(Path filePath) throws IOException;
/**
* Put part as part of a multipart upload. It should be possible to have
* parts uploaded in any order (or in parallel).
* @param filePath Target path for upload (same as {@link #initialize(Path)}).
* @param inputStream Data for this part.
* @param partNumber Index of the part relative to others.
* @param uploadId Identifier from {@link #initialize(Path)}.
* @param lengthInBytes Target length to read from the stream.
* @return unique PartHandle identifier for the uploaded part.
* @throws IOException
*/
public abstract PartHandle putPart(Path filePath, InputStream inputStream,
int partNumber, UploadHandle uploadId, long lengthInBytes)
throws IOException;
/**
* Complete a multipart upload.
* @param filePath Target path for upload (same as {@link #initialize(Path)}.
* @param handles Identifiers with associated part numbers from
* {@link #putPart(Path, InputStream, int, UploadHandle, long)}.
* Depending on the backend, the list order may be significant.
* @param multipartUploadId Identifier from {@link #initialize(Path)}.
* @return unique PathHandle identifier for the uploaded file.
* @throws IOException
*/
public abstract PathHandle complete(Path filePath,
List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId)
throws IOException;
/**
* Aborts a multipart upload.
* @param filePath Target path for upload (same as {@link #initialize(Path)}.
* @param multipartuploadId Identifier from {@link #initialize(Path)}.
* @throws IOException
*/
public abstract void abort(Path filePath, UploadHandle multipartuploadId)
throws IOException;
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
import java.util.ServiceLoader;
/**
* {@link ServiceLoader}-driven uploader API for storage services supporting
* multipart uploads.
*/
public abstract class MultipartUploaderFactory {
public static final Logger LOG =
LoggerFactory.getLogger(MultipartUploaderFactory.class);
/**
* Multipart Uploaders listed as services.
*/
private static ServiceLoader<MultipartUploaderFactory> serviceLoader =
ServiceLoader.load(MultipartUploaderFactory.class,
MultipartUploaderFactory.class.getClassLoader());
// Iterate through the serviceLoader to avoid lazy loading.
// Lazy loading would require synchronization in concurrent use cases.
static {
Iterator<MultipartUploaderFactory> iterServices = serviceLoader.iterator();
while (iterServices.hasNext()) {
iterServices.next();
}
}
public static MultipartUploader get(FileSystem fs, Configuration conf)
throws IOException {
MultipartUploader mpu = null;
for (MultipartUploaderFactory factory : serviceLoader) {
mpu = factory.createMultipartUploader(fs, conf);
if (mpu != null) {
break;
}
}
return mpu;
}
protected abstract MultipartUploader createMultipartUploader(FileSystem fs,
Configuration conf) throws IOException;
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.io.Serializable;
import java.nio.ByteBuffer;
/**
* Opaque, serializable reference to an part id for multipart uploads.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface PartHandle extends Serializable {
/**
* @return Serialized from in bytes.
*/
default byte[] toByteArray() {
ByteBuffer bb = bytes();
byte[] ret = new byte[bb.remaining()];
bb.get(ret);
return ret;
}
ByteBuffer bytes();
@Override
boolean equals(Object other);
}

View File

@ -40,6 +40,7 @@
import java.nio.file.attribute.FileTime; import java.nio.file.attribute.FileTime;
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Optional;
import java.util.StringTokenizer; import java.util.StringTokenizer;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -213,6 +214,18 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
new LocalFSFileInputStream(f), bufferSize)); new LocalFSFileInputStream(f), bufferSize));
} }
@Override
public FSDataInputStream open(PathHandle fd, int bufferSize)
throws IOException {
if (!(fd instanceof LocalFileSystemPathHandle)) {
fd = new LocalFileSystemPathHandle(fd.bytes());
}
LocalFileSystemPathHandle id = (LocalFileSystemPathHandle) fd;
id.verify(getFileStatus(new Path(id.getPath())));
return new FSDataInputStream(new BufferedFSInputStream(
new LocalFSFileInputStream(new Path(id.getPath())), bufferSize));
}
/********************************************************* /*********************************************************
* For create()'s FSOutputStream. * For create()'s FSOutputStream.
*********************************************************/ *********************************************************/
@ -350,6 +363,18 @@ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
return out; return out;
} }
@Override
public void concat(final Path trg, final Path [] psrcs) throws IOException {
final int bufferSize = 4096;
try(FSDataOutputStream out = create(trg)) {
for (Path src : psrcs) {
try(FSDataInputStream in = open(src)) {
IOUtils.copyBytes(in, out, bufferSize, false);
}
}
}
}
@Override @Override
public boolean rename(Path src, Path dst) throws IOException { public boolean rename(Path src, Path dst) throws IOException {
// Attempt rename using Java API. // Attempt rename using Java API.
@ -863,6 +888,38 @@ public void setTimes(Path p, long mtime, long atime) throws IOException {
} }
} }
/**
* Hook to implement support for {@link PathHandle} operations.
* @param stat Referent in the target FileSystem
* @param opts Constraints that determine the validity of the
* {@link PathHandle} reference.
*/
protected PathHandle createPathHandle(FileStatus stat,
Options.HandleOpt... opts) {
if (stat.isDirectory() || stat.isSymlink()) {
throw new IllegalArgumentException("PathHandle only available for files");
}
String authority = stat.getPath().toUri().getAuthority();
if (authority != null && !authority.equals("file://")) {
throw new IllegalArgumentException("Wrong FileSystem: " + stat.getPath());
}
Options.HandleOpt.Data data =
Options.HandleOpt.getOpt(Options.HandleOpt.Data.class, opts)
.orElse(Options.HandleOpt.changed(false));
Options.HandleOpt.Location loc =
Options.HandleOpt.getOpt(Options.HandleOpt.Location.class, opts)
.orElse(Options.HandleOpt.moved(false));
if (loc.allowChange()) {
throw new UnsupportedOperationException("Tracking file movement in " +
"basic FileSystem is not supported");
}
final Path p = stat.getPath();
final Optional<Long> mtime = !data.allowChange()
? Optional.of(stat.getModificationTime())
: Optional.empty();
return new LocalFileSystemPathHandle(p.toString(), mtime);
}
@Override @Override
public boolean supportsSymlinks() { public boolean supportsSymlinks() {
return true; return true;

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* MultipartUploader for a given file system name/scheme is not supported.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class UnsupportedMultipartUploaderException extends IOException {
private static final long serialVersionUID = 1L;
/**
* Constructs exception with the specified detail message.
*
* @param message exception message.
*/
public UnsupportedMultipartUploaderException(final String message) {
super(message);
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.io.Serializable;
import java.nio.ByteBuffer;
/**
* Opaque, serializable reference to an uploadId for multipart uploads.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface UploadHandle extends Serializable {
/**
* @return Serialized from in bytes.
*/
default byte[] toByteArray() {
ByteBuffer bb = bytes();
byte[] ret = new byte[bb.remaining()];
bb.get(ret);
return ret;
}
ByteBuffer bytes();
@Override
boolean equals(Object other);
}

View File

@ -68,3 +68,11 @@ message FileStatusProto {
optional bytes ec_data = 17; optional bytes ec_data = 17;
optional uint32 flags = 18 [default = 0]; optional uint32 flags = 18 [default = 0];
} }
/**
* Placeholder type for consistent basic FileSystem operations.
*/
message LocalFileSystemPathHandleProto {
optional uint64 mtime = 1;
optional string path = 2;
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.hadoop.fs.FileSystemMultipartUploader$Factory

View File

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public abstract class AbstractSystemMultipartUploaderTest {
abstract FileSystem getFS() throws IOException;
abstract Path getBaseTestPath();
@Test
public void testMultipartUpload() throws Exception {
FileSystem fs = getFS();
Path file = new Path(getBaseTestPath(), "some-file");
MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
UploadHandle uploadHandle = mpu.initialize(file);
List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
StringBuilder sb = new StringBuilder();
for (int i = 1; i <= 100; ++i) {
String contents = "ThisIsPart" + i + "\n";
sb.append(contents);
int len = contents.getBytes().length;
InputStream is = IOUtils.toInputStream(contents, "UTF-8");
PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
partHandles.add(Pair.of(i, partHandle));
}
PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
byte[] fdData = IOUtils.toByteArray(fs.open(fd));
byte[] fileData = IOUtils.toByteArray(fs.open(file));
String readString = new String(fdData);
assertEquals(sb.toString(), readString);
assertArrayEquals(fdData, fileData);
}
@Test
public void testMultipartUploadReverseOrder() throws Exception {
FileSystem fs = getFS();
Path file = new Path(getBaseTestPath(), "some-file");
MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
UploadHandle uploadHandle = mpu.initialize(file);
List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
StringBuilder sb = new StringBuilder();
for (int i = 1; i <= 100; ++i) {
String contents = "ThisIsPart" + i + "\n";
sb.append(contents);
}
for (int i = 100; i > 0; --i) {
String contents = "ThisIsPart" + i + "\n";
int len = contents.getBytes().length;
InputStream is = IOUtils.toInputStream(contents, "UTF-8");
PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
partHandles.add(Pair.of(i, partHandle));
}
PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
byte[] fdData = IOUtils.toByteArray(fs.open(fd));
byte[] fileData = IOUtils.toByteArray(fs.open(file));
String readString = new String(fdData);
assertEquals(sb.toString(), readString);
assertArrayEquals(fdData, fileData);
}
@Test
public void testMultipartUploadReverseOrderNoNContiguousPartNumbers()
throws Exception {
FileSystem fs = getFS();
Path file = new Path(getBaseTestPath(), "some-file");
MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
UploadHandle uploadHandle = mpu.initialize(file);
List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
StringBuilder sb = new StringBuilder();
for (int i = 2; i <= 200; i += 2) {
String contents = "ThisIsPart" + i + "\n";
sb.append(contents);
}
for (int i = 200; i > 0; i -= 2) {
String contents = "ThisIsPart" + i + "\n";
int len = contents.getBytes().length;
InputStream is = IOUtils.toInputStream(contents, "UTF-8");
PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
partHandles.add(Pair.of(i, partHandle));
}
PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
byte[] fdData = IOUtils.toByteArray(fs.open(fd));
byte[] fileData = IOUtils.toByteArray(fs.open(file));
String readString = new String(fdData);
assertEquals(sb.toString(), readString);
assertArrayEquals(fdData, fileData);
}
@Test
public void testMultipartUploadAbort() throws Exception {
FileSystem fs = getFS();
Path file = new Path(getBaseTestPath(), "some-file");
MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
UploadHandle uploadHandle = mpu.initialize(file);
for (int i = 100; i >= 50; --i) {
String contents = "ThisIsPart" + i + "\n";
int len = contents.getBytes().length;
InputStream is = IOUtils.toInputStream(contents, "UTF-8");
PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
}
mpu.abort(file, uploadHandle);
String contents = "ThisIsPart49\n";
int len = contents.getBytes().length;
InputStream is = IOUtils.toInputStream(contents, "UTF-8");
try {
mpu.putPart(file, is, 49, uploadHandle, len);
fail("putPart should have thrown an exception");
} catch (IOException ok) {
// ignore
}
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.test.GenericTestUtils.getRandomizedTestDir;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.File;
import java.io.IOException;
/**
* Test the FileSystemMultipartUploader on local file system.
*/
public class TestLocalFileSystemMultipartUploader
extends AbstractSystemMultipartUploaderTest {
private static FileSystem fs;
private File tmp;
@BeforeClass
public static void init() throws IOException {
fs = LocalFileSystem.getLocal(new Configuration());
}
@Before
public void setup() throws IOException {
tmp = getRandomizedTestDir();
tmp.mkdirs();
}
@After
public void tearDown() throws IOException {
tmp.delete();
}
@Override
public FileSystem getFS() {
return fs;
}
@Override
public Path getBaseTestPath() {
return new Path(tmp.getAbsolutePath());
}
}

View File

@ -123,6 +123,12 @@ public void testChanged() throws IOException {
HandleOpt.Data data = HandleOpt.getOpt(HandleOpt.Data.class, opts) HandleOpt.Data data = HandleOpt.getOpt(HandleOpt.Data.class, opts)
.orElseThrow(IllegalArgumentException::new); .orElseThrow(IllegalArgumentException::new);
FileStatus stat = testFile(B1); FileStatus stat = testFile(B1);
try {
// Temporary workaround while RawLocalFS supports only second precision
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new IOException(e);
}
// modify the file by appending data // modify the file by appending data
appendFile(getFileSystem(), stat.getPath(), B2); appendFile(getFileSystem(), stat.getPath(), B2);
byte[] b12 = Arrays.copyOf(B1, B1.length + B2.length); byte[] b12 = Arrays.copyOf(B1, B1.length + B2.length);

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.rawlocal;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.contract.AbstractContractPathHandleTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.localfs.LocalFSContract;
import org.apache.hadoop.fs.contract.rawlocal.RawlocalFSContract;
public class TestRawlocalContractPathHandle
extends AbstractContractPathHandleTest {
public TestRawlocalContractPathHandle(String testname,
Options.HandleOpt[] opts, boolean serialized) {
super(testname, opts, serialized);
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new RawlocalFSContract(conf);
}
}

View File

@ -122,4 +122,9 @@
<value>true</value> <value>true</value>
</property> </property>
<property>
<name>fs.contract.supports-content-check</name>
<value>true</value>
</property>
</configuration> </configuration>

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemMultipartUploader;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.MultipartUploaderFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
/**
* Support for HDFS multipart uploads, built on
* {@link FileSystem#concat(Path, Path[])}.
*/
public class DFSMultipartUploaderFactory extends MultipartUploaderFactory {
protected MultipartUploader createMultipartUploader(FileSystem fs,
Configuration conf) {
if (fs.getScheme().equals(HdfsConstants.HDFS_URI_SCHEME)) {
return new FileSystemMultipartUploader(fs);
}
return null;
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.hadoop.hdfs.DFSMultipartUploaderFactory

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestName;
import java.io.IOException;
public class TestHDFSMultipartUploader
extends AbstractSystemMultipartUploaderTest {
private static MiniDFSCluster cluster;
private Path tmp;
@Rule
public TestName name = new TestName();
@BeforeClass
public static void init() throws IOException {
HdfsConfiguration conf = new HdfsConfiguration();
cluster = new MiniDFSCluster.Builder(conf,
GenericTestUtils.getRandomizedTestDir())
.numDataNodes(1)
.build();
cluster.waitClusterUp();
}
@AfterClass
public static void cleanup() throws IOException {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
@Before
public void setup() throws IOException {
tmp = new Path(cluster.getFileSystem().getWorkingDirectory(),
name.getMethodName());
cluster.getFileSystem().mkdirs(tmp);
}
@Override
public FileSystem getFS() throws IOException {
return cluster.getFileSystem();
}
@Override
public Path getBaseTestPath() {
return tmp;
}
}

View File

@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.google.common.base.Charsets;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BBPartHandle;
import org.apache.hadoop.fs.BBUploadHandle;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.MultipartUploaderFactory;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;
import org.apache.hadoop.hdfs.DFSUtilClient;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Collectors;
/**
* MultipartUploader for S3AFileSystem. This uses the S3 multipart
* upload mechanism.
*/
public class S3AMultipartUploader extends MultipartUploader {
private final S3AFileSystem s3a;
public S3AMultipartUploader(FileSystem fs, Configuration conf) {
if (!(fs instanceof S3AFileSystem)) {
throw new IllegalArgumentException(
"S3A MultipartUploads must use S3AFileSystem");
}
s3a = (S3AFileSystem) fs;
}
@Override
public UploadHandle initialize(Path filePath) throws IOException {
String key = s3a.pathToKey(filePath);
InitiateMultipartUploadRequest request =
new InitiateMultipartUploadRequest(s3a.getBucket(), key);
LOG.debug("initialize request: {}", request);
InitiateMultipartUploadResult result = s3a.initiateMultipartUpload(request);
String uploadId = result.getUploadId();
return BBUploadHandle.from(ByteBuffer.wrap(
uploadId.getBytes(Charsets.UTF_8)));
}
@Override
public PartHandle putPart(Path filePath, InputStream inputStream,
int partNumber, UploadHandle uploadId, long lengthInBytes) {
String key = s3a.pathToKey(filePath);
UploadPartRequest request = new UploadPartRequest();
byte[] uploadIdBytes = uploadId.toByteArray();
request.setUploadId(new String(uploadIdBytes, 0, uploadIdBytes.length,
Charsets.UTF_8));
request.setInputStream(inputStream);
request.setPartSize(lengthInBytes);
request.setPartNumber(partNumber);
request.setBucketName(s3a.getBucket());
request.setKey(key);
LOG.debug("putPart request: {}", request);
UploadPartResult result = s3a.uploadPart(request);
String eTag = result.getETag();
return BBPartHandle.from(ByteBuffer.wrap(eTag.getBytes(Charsets.UTF_8)));
}
@Override
public PathHandle complete(Path filePath,
List<Pair<Integer, PartHandle>> handles, UploadHandle uploadId) {
String key = s3a.pathToKey(filePath);
CompleteMultipartUploadRequest request =
new CompleteMultipartUploadRequest();
request.setBucketName(s3a.getBucket());
request.setKey(key);
byte[] uploadIdBytes = uploadId.toByteArray();
request.setUploadId(new String(uploadIdBytes, 0, uploadIdBytes.length,
Charsets.UTF_8));
List<PartETag> eTags = handles
.stream()
.map(handle -> {
byte[] partEtagBytes = handle.getRight().toByteArray();
return new PartETag(handle.getLeft(),
new String(partEtagBytes, 0, partEtagBytes.length,
Charsets.UTF_8));
})
.collect(Collectors.toList());
request.setPartETags(eTags);
LOG.debug("Complete request: {}", request);
CompleteMultipartUploadResult completeMultipartUploadResult =
s3a.getAmazonS3Client().completeMultipartUpload(request);
byte[] eTag = DFSUtilClient.string2Bytes(
completeMultipartUploadResult.getETag());
return (PathHandle) () -> ByteBuffer.wrap(eTag);
}
@Override
public void abort(Path filePath, UploadHandle uploadId) {
String key = s3a.pathToKey(filePath);
byte[] uploadIdBytes = uploadId.toByteArray();
String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length,
Charsets.UTF_8);
AbortMultipartUploadRequest request = new AbortMultipartUploadRequest(s3a
.getBucket(), key, uploadIdString);
LOG.debug("Abort request: {}", request);
s3a.getAmazonS3Client().abortMultipartUpload(request);
}
/**
* Factory for creating MultipartUploader objects for s3a:// FileSystems.
*/
public static class Factory extends MultipartUploaderFactory {
@Override
protected MultipartUploader createMultipartUploader(FileSystem fs,
Configuration conf) {
if (fs.getScheme().equals("s3a")) {
return new S3AMultipartUploader(fs, conf);
}
return null;
}
}
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.hadoop.fs.s3a.S3AMultipartUploader$Factory

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.hadoop.fs.s3a.S3AMultipartUploader