diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index f722810f1d0..6d484e42ab3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -1128,6 +1128,17 @@ public abstract class FileSystem extends Configured implements Closeable { public abstract FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException; + /** + * Concat existing files together. + * @param trg the path to the target destination. + * @param psrcs the paths to the sources to use for the concatenation. + * @throws IOException + */ + public void concat(final Path trg, final Path [] psrcs) throws IOException { + throw new UnsupportedOperationException("Not implemented by the " + + getClass().getSimpleName() + " FileSystem implementation"); + } + /** * Get replication. * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java index 35aa4dc7f68..a2b34f056ba 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java @@ -159,6 +159,11 @@ public class FilterFileSystem extends FileSystem { return fs.append(f, bufferSize, progress); } + @Override + public void concat(Path f, Path[] psrcs) throws IOException { + fs.concat(f, psrcs); + } + @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6ddd129a69a..1e9149a9c96 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -210,6 +210,8 @@ Release 2.0.3-alpha - Unreleased HDFS-4259. Improve pipeline DN replacement failure message (harsh) + HDFS-3598. WebHDFS support for file concat. (Plamen Jeliazkov via shv) + OPTIMIZATIONS HDFS-3429. DataNode reads checksums even if client does not need them (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 0b6c0b482bd..e6978f2f36f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -310,10 +310,9 @@ public class DistributedFileSystem extends FileSystem { } /** - * THIS IS DFS only operations, it is not part of FileSystem - * move blocks from srcs to trg + * Move blocks from srcs to trg * and delete srcs afterwards - * all blocks should be the same size + * RESTRICTION: all blocks should be the same size * @param trg existing file to append to * @param psrcs list of files (same block size, same replication) * @throws IOException diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java index e93f26093d7..0cb4b127115 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java @@ -70,6 +70,7 @@ import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.hdfs.web.resources.AccessTimeParam; import org.apache.hadoop.hdfs.web.resources.BlockSizeParam; import org.apache.hadoop.hdfs.web.resources.BufferSizeParam; +import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam; import org.apache.hadoop.hdfs.web.resources.CreateParentParam; import org.apache.hadoop.hdfs.web.resources.DelegationParam; import org.apache.hadoop.hdfs.web.resources.DeleteOpParam; @@ -483,10 +484,12 @@ public class NamenodeWebHdfsMethods { final DoAsParam doAsUser, @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT) final PostOpParam op, + @QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT) + final ConcatSourcesParam concatSrcs, @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) final BufferSizeParam bufferSize ) throws IOException, InterruptedException { - return post(ugi, delegation, username, doAsUser, ROOT, op, bufferSize); + return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs, bufferSize); } /** Handle HTTP POST request. */ @@ -505,11 +508,13 @@ public class NamenodeWebHdfsMethods { @PathParam(UriFsPathParam.NAME) final UriFsPathParam path, @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT) final PostOpParam op, + @QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT) + final ConcatSourcesParam concatSrcs, @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) final BufferSizeParam bufferSize ) throws IOException, InterruptedException { - init(ugi, delegation, username, doAsUser, path, op, bufferSize); + init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize); return ugi.doAs(new PrivilegedExceptionAction() { @Override @@ -517,7 +522,7 @@ public class NamenodeWebHdfsMethods { REMOTE_ADDRESS.set(request.getRemoteAddr()); try { return post(ugi, delegation, username, doAsUser, - path.getAbsolutePath(), op, bufferSize); + path.getAbsolutePath(), op, concatSrcs, bufferSize); } finally { REMOTE_ADDRESS.set(null); } @@ -532,6 +537,7 @@ public class NamenodeWebHdfsMethods { final DoAsParam doAsUser, final String fullpath, final PostOpParam op, + final ConcatSourcesParam concatSrcs, final BufferSizeParam bufferSize ) throws IOException, URISyntaxException { final NameNode namenode = (NameNode)context.getAttribute("name.node"); @@ -543,6 +549,11 @@ public class NamenodeWebHdfsMethods { fullpath, op.getValue(), -1L, -1L, bufferSize); return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build(); } + case CONCAT: + { + namenode.getRpcServer().concat(fullpath, concatSrcs.getAbsolutePaths()); + return Response.ok().build(); + } default: throw new UnsupportedOperationException(op + " is not supported"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index 6c8c29cd0fa..02c147ab1a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -29,7 +29,9 @@ import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.StringTokenizer; @@ -65,6 +67,7 @@ import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.web.resources.AccessTimeParam; import org.apache.hadoop.hdfs.web.resources.BlockSizeParam; import org.apache.hadoop.hdfs.web.resources.BufferSizeParam; +import org.apache.hadoop.hdfs.web.resources.ConcatSourcesParam; import org.apache.hadoop.hdfs.web.resources.CreateParentParam; import org.apache.hadoop.hdfs.web.resources.DeleteOpParam; import org.apache.hadoop.hdfs.web.resources.DestinationParam; @@ -103,6 +106,7 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenRenewer; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.StringUtils; import org.mortbay.util.ajax.JSON; import com.google.common.base.Charsets; @@ -716,6 +720,22 @@ public class WebHdfsFileSystem extends FileSystem }; } + @Override + public void concat(final Path trg, final Path [] psrcs) throws IOException { + statistics.incrementWriteOps(1); + final HttpOpParam.Op op = PostOpParam.Op.CONCAT; + + List strPaths = new ArrayList(psrcs.length); + for(Path psrc : psrcs) { + strPaths.add(psrc.toUri().getPath()); + } + + String srcs = StringUtils.join(",", strPaths); + + ConcatSourcesParam param = new ConcatSourcesParam(srcs); + run(op, trg, param); + } + @Override public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ConcatSourcesParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ConcatSourcesParam.java new file mode 100644 index 00000000000..c29f2329c48 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ConcatSourcesParam.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.web.resources; + +/** The concat source paths parameter. */ +public class ConcatSourcesParam extends StringParam { + /** Parameter name. */ + public static final String NAME = "srcs"; + + public static final String DEFAULT = NULL; + + private static final Domain DOMAIN = new Domain(NAME, null); + + /** + * Constructor. + * @param str a string representation of the parameter value. + */ + public ConcatSourcesParam(String str) { + super(DOMAIN, str); + } + + @Override + public String getName() { + return NAME; + } + + /** @return the absolute path. */ + public final String[] getAbsolutePaths() { + final String[] paths = getValue().split(","); + return paths; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java index 83f2cd19155..4bb5673ab10 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java @@ -23,13 +23,17 @@ import java.net.HttpURLConnection; public class PostOpParam extends HttpOpParam { /** Post operations. */ public static enum Op implements HttpOpParam.Op { - APPEND(HttpURLConnection.HTTP_OK), + APPEND(true, HttpURLConnection.HTTP_OK), - NULL(HttpURLConnection.HTTP_NOT_IMPLEMENTED); + CONCAT(false, HttpURLConnection.HTTP_OK), + NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED); + + final boolean doOutputAndRedirect; final int expectedHttpResponseCode; - Op(final int expectedHttpResponseCode) { + Op(final boolean doOutputAndRedirect, final int expectedHttpResponseCode) { + this.doOutputAndRedirect = doOutputAndRedirect; this.expectedHttpResponseCode = expectedHttpResponseCode; } @@ -40,12 +44,12 @@ public class PostOpParam extends HttpOpParam { @Override public boolean getDoOutput() { - return true; + return doOutputAndRedirect; } @Override public boolean getRedirect() { - return true; + return doOutputAndRedirect; } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestFSMainOperationsWebHdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestFSMainOperationsWebHdfs.java index a2b9653e6ff..28808e049ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestFSMainOperationsWebHdfs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestFSMainOperationsWebHdfs.java @@ -27,10 +27,12 @@ import java.security.PrivilegedExceptionAction; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSMainOperationsBaseTest; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods; import org.apache.hadoop.hdfs.web.resources.ExceptionHandler; @@ -55,6 +57,7 @@ public class TestFSMainOperationsWebHdfs extends FSMainOperationsBaseTest { public static void setupCluster() { final Configuration conf = new Configuration(); conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024); try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); cluster.waitActive(); @@ -96,6 +99,30 @@ public class TestFSMainOperationsWebHdfs extends FSMainOperationsBaseTest { return defaultWorkingDirectory; } + @Test + public void testConcat() throws Exception { + Path[] paths = {new Path("/test/hadoop/file1"), + new Path("/test/hadoop/file2"), + new Path("/test/hadoop/file3")}; + + DFSTestUtil.createFile(fSys, paths[0], 1024, (short) 3, 0); + DFSTestUtil.createFile(fSys, paths[1], 1024, (short) 3, 0); + DFSTestUtil.createFile(fSys, paths[2], 1024, (short) 3, 0); + + Path catPath = new Path("/test/hadoop/catFile"); + DFSTestUtil.createFile(fSys, catPath, 1024, (short) 3, 0); + Assert.assertTrue(exists(fSys, catPath)); + + fSys.concat(catPath, paths); + + Assert.assertFalse(exists(fSys, paths[0])); + Assert.assertFalse(exists(fSys, paths[1])); + Assert.assertFalse(exists(fSys, paths[2])); + + FileStatus fileStatus = fSys.getFileStatus(catPath); + Assert.assertEquals(1024*4, fileStatus.getLen()); + } + @Override @Test public void testMkdirsFailsForSubdirectoryOfExistingFile() throws Exception {