HDFS-4456. Add concat to HttpFS and WebHDFS REST API docs. (plamenj2003 via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1441606 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2013-02-01 19:46:14 +00:00
parent a4abf4f573
commit 1d068dee85
8 changed files with 189 additions and 7 deletions

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.fs.http.client; package org.apache.hadoop.fs.http.client;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.ContentSummary;
@ -86,6 +88,7 @@ public class HttpFSFileSystem extends FileSystem
public static final String PERMISSION_PARAM = "permission"; public static final String PERMISSION_PARAM = "permission";
public static final String DESTINATION_PARAM = "destination"; public static final String DESTINATION_PARAM = "destination";
public static final String RECURSIVE_PARAM = "recursive"; public static final String RECURSIVE_PARAM = "recursive";
public static final String SOURCES_PARAM = "sources";
public static final String OWNER_PARAM = "owner"; public static final String OWNER_PARAM = "owner";
public static final String GROUP_PARAM = "group"; public static final String GROUP_PARAM = "group";
public static final String MODIFICATION_TIME_PARAM = "modificationtime"; public static final String MODIFICATION_TIME_PARAM = "modificationtime";
@ -167,7 +170,7 @@ public class HttpFSFileSystem extends FileSystem
GETHOMEDIRECTORY(HTTP_GET), GETCONTENTSUMMARY(HTTP_GET), GETHOMEDIRECTORY(HTTP_GET), GETCONTENTSUMMARY(HTTP_GET),
GETFILECHECKSUM(HTTP_GET), GETFILEBLOCKLOCATIONS(HTTP_GET), GETFILECHECKSUM(HTTP_GET), GETFILEBLOCKLOCATIONS(HTTP_GET),
INSTRUMENTATION(HTTP_GET), INSTRUMENTATION(HTTP_GET),
APPEND(HTTP_POST), APPEND(HTTP_POST), CONCAT(HTTP_POST),
CREATE(HTTP_PUT), MKDIRS(HTTP_PUT), RENAME(HTTP_PUT), SETOWNER(HTTP_PUT), CREATE(HTTP_PUT), MKDIRS(HTTP_PUT), RENAME(HTTP_PUT), SETOWNER(HTTP_PUT),
SETPERMISSION(HTTP_PUT), SETREPLICATION(HTTP_PUT), SETTIMES(HTTP_PUT), SETPERMISSION(HTTP_PUT), SETREPLICATION(HTTP_PUT), SETTIMES(HTTP_PUT),
DELETE(HTTP_DELETE); DELETE(HTTP_DELETE);
@ -528,6 +531,29 @@ public class HttpFSFileSystem extends FileSystem
HttpURLConnection.HTTP_OK); HttpURLConnection.HTTP_OK);
} }
/**
* Concat existing files together.
* @param f the path to the target destination.
* @param psrcs the paths to the sources to use for the concatenation.
*
* @throws IOException
*/
@Override
public void concat(Path f, Path[] psrcs) throws IOException {
List<String> strPaths = new ArrayList<String>(psrcs.length);
for(Path psrc : psrcs) {
strPaths.add(psrc.toUri().getPath());
}
String srcs = StringUtils.join(",", strPaths);
Map<String, String> params = new HashMap<String, String>();
params.put(OP_PARAM, Operation.CONCAT.toString());
params.put(SOURCES_PARAM, srcs);
HttpURLConnection conn = getConnection(Operation.CONCAT.getMethod(),
params, f, true);
HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
}
/** /**
* Renames Path src to Path dst. Can take place on local fs * Renames Path src to Path dst. Can take place on local fs
* or remote DFS. * or remote DFS.

View File

@ -198,6 +198,47 @@ public class FSOperations {
} }
/**
* Executor that performs an append FileSystemAccess files system operation.
*/
@InterfaceAudience.Private
public static class FSConcat implements FileSystemAccess.FileSystemExecutor<Void> {
private Path path;
private Path[] sources;
/**
* Creates a Concat executor.
*
* @param path target path to concat to.
* @param sources comma seperated absolute paths to use as sources.
*/
public FSConcat(String path, String[] sources) {
this.sources = new Path[sources.length];
for(int i = 0; i < sources.length; i++) {
this.sources[i] = new Path(sources[i]);
}
this.path = new Path(path);
}
/**
* Executes the filesystem operation.
*
* @param fs filesystem instance to use.
*
* @return void.
*
* @throws IOException thrown if an IO error occured.
*/
@Override
public Void execute(FileSystem fs) throws IOException {
fs.concat(path, sources);
return null;
}
}
/** /**
* Executor that performs a content-summary FileSystemAccess files system operation. * Executor that performs a content-summary FileSystemAccess files system operation.
*/ */

View File

@ -58,6 +58,7 @@ public class HttpFSParametersProvider extends ParametersProvider {
PARAMS_DEF.put(Operation.INSTRUMENTATION, new Class[]{DoAsParam.class}); PARAMS_DEF.put(Operation.INSTRUMENTATION, new Class[]{DoAsParam.class});
PARAMS_DEF.put(Operation.APPEND, PARAMS_DEF.put(Operation.APPEND,
new Class[]{DoAsParam.class, DataParam.class}); new Class[]{DoAsParam.class, DataParam.class});
PARAMS_DEF.put(Operation.CONCAT, new Class[]{SourcesParam.class});
PARAMS_DEF.put(Operation.CREATE, PARAMS_DEF.put(Operation.CREATE,
new Class[]{DoAsParam.class, PermissionParam.class, OverwriteParam.class, new Class[]{DoAsParam.class, PermissionParam.class, OverwriteParam.class,
ReplicationParam.class, BlockSizeParam.class, DataParam.class}); ReplicationParam.class, BlockSizeParam.class, DataParam.class});
@ -388,6 +389,25 @@ public class HttpFSParametersProvider extends ParametersProvider {
} }
} }
/**
* Class for concat sources parameter.
*/
@InterfaceAudience.Private
public static class SourcesParam extends StringParam {
/**
* Parameter name.
*/
public static final String NAME = HttpFSFileSystem.SOURCES_PARAM;
/**
* Constructor.
*/
public SourcesParam() {
super(NAME, null);
}
}
/** /**
* Class for to-path parameter. * Class for to-path parameter.
*/ */

View File

@ -22,22 +22,23 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.http.client.HttpFSFileSystem; import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OperationParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.AccessTimeParam; import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.AccessTimeParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.BlockSizeParam; import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.BlockSizeParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.DataParam; import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.DataParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.RecursiveParam; import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.DestinationParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.DoAsParam; import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.DoAsParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.FilterParam; import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.FilterParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.GroupParam; import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.GroupParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.LenParam; import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.LenParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.ModifiedTimeParam; import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.ModifiedTimeParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OffsetParam; import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OffsetParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OperationParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OverwriteParam; import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OverwriteParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OwnerParam; import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.OwnerParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.PermissionParam; import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.PermissionParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.RecursiveParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.ReplicationParam; import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.ReplicationParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.DestinationParam; import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.SourcesParam;
import org.apache.hadoop.lib.service.FileSystemAccess; import org.apache.hadoop.lib.service.FileSystemAccess;
import org.apache.hadoop.lib.service.FileSystemAccessException; import org.apache.hadoop.lib.service.FileSystemAccessException;
import org.apache.hadoop.lib.service.Groups; import org.apache.hadoop.lib.service.Groups;
@ -403,9 +404,9 @@ public class HttpFSServer {
Response response; Response response;
path = makeAbsolute(path); path = makeAbsolute(path);
MDC.put(HttpFSFileSystem.OP_PARAM, op.value().name()); MDC.put(HttpFSFileSystem.OP_PARAM, op.value().name());
String doAs = params.get(DoAsParam.NAME, DoAsParam.class);
switch (op.value()) { switch (op.value()) {
case APPEND: { case APPEND: {
String doAs = params.get(DoAsParam.NAME, DoAsParam.class);
Boolean hasData = params.get(DataParam.NAME, DataParam.class); Boolean hasData = params.get(DataParam.NAME, DataParam.class);
if (!hasData) { if (!hasData) {
response = Response.temporaryRedirect( response = Response.temporaryRedirect(
@ -420,6 +421,18 @@ public class HttpFSServer {
} }
break; break;
} }
case CONCAT: {
System.out.println("HTTPFS SERVER CONCAT");
String sources = params.get(SourcesParam.NAME, SourcesParam.class);
FSOperations.FSConcat command =
new FSOperations.FSConcat(path, sources.split(","));
fsExecute(user, null, command);
AUDIT_LOG.info("[{}]", path);
System.out.println("SENT RESPONSE");
response = Response.ok().build();
break;
}
default: { default: {
throw new IOException( throw new IOException(
MessageFormat.format("Invalid HTTP POST operation [{0}]", MessageFormat.format("Invalid HTTP POST operation [{0}]",

View File

@ -28,6 +28,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.http.server.HttpFSServerWebApp; import org.apache.hadoop.fs.http.server.HttpFSServerWebApp;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.HFSTestCase; import org.apache.hadoop.test.HFSTestCase;
import org.apache.hadoop.test.HadoopUsersConfTestHelper; import org.apache.hadoop.test.HadoopUsersConfTestHelper;
@ -206,6 +208,30 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
} }
} }
private void testConcat() throws Exception {
Configuration config = getProxiedFSConf();
config.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
if (!isLocalFS()) {
FileSystem fs = FileSystem.get(config);
fs.mkdirs(getProxiedFSTestDir());
Path path1 = new Path("/test/foo.txt");
Path path2 = new Path("/test/bar.txt");
Path path3 = new Path("/test/derp.txt");
DFSTestUtil.createFile(fs, path1, 1024, (short) 3, 0);
DFSTestUtil.createFile(fs, path2, 1024, (short) 3, 0);
DFSTestUtil.createFile(fs, path3, 1024, (short) 3, 0);
fs.close();
fs = getHttpFSFileSystem();
fs.concat(path1, new Path[]{path2, path3});
fs.close();
fs = FileSystem.get(config);
Assert.assertTrue(fs.exists(path1));
Assert.assertFalse(fs.exists(path2));
Assert.assertFalse(fs.exists(path3));
fs.close();
}
}
private void testRename() throws Exception { private void testRename() throws Exception {
FileSystem fs = FileSystem.get(getProxiedFSConf()); FileSystem fs = FileSystem.get(getProxiedFSConf());
Path path = new Path(getProxiedFSTestDir(), "foo"); Path path = new Path(getProxiedFSTestDir(), "foo");
@ -450,7 +476,7 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
} }
protected enum Operation { protected enum Operation {
GET, OPEN, CREATE, APPEND, RENAME, DELETE, LIST_STATUS, WORKING_DIRECTORY, MKDIRS, GET, OPEN, CREATE, APPEND, CONCAT, RENAME, DELETE, LIST_STATUS, WORKING_DIRECTORY, MKDIRS,
SET_TIMES, SET_PERMISSION, SET_OWNER, SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY SET_TIMES, SET_PERMISSION, SET_OWNER, SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY
} }
@ -468,6 +494,8 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
case APPEND: case APPEND:
testAppend(); testAppend();
break; break;
case CONCAT:
testConcat();
case RENAME: case RENAME:
testRename(); testRename();
break; break;

View File

@ -216,6 +216,8 @@ Release 2.0.3-alpha - Unreleased
HDFS-3598. WebHDFS support for file concat. (Plamen Jeliazkov via shv) HDFS-3598. WebHDFS support for file concat. (Plamen Jeliazkov via shv)
HDFS-4456. Add concat to HttpFS and WebHDFS REST API docs. (plamenj2003 via tucu)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-3429. DataNode reads checksums even if client does not need them (todd) HDFS-3429. DataNode reads checksums even if client does not need them (todd)

View File

@ -21,7 +21,7 @@ package org.apache.hadoop.hdfs.web.resources;
/** The concat source paths parameter. */ /** The concat source paths parameter. */
public class ConcatSourcesParam extends StringParam { public class ConcatSourcesParam extends StringParam {
/** Parameter name. */ /** Parameter name. */
public static final String NAME = "srcs"; public static final String NAME = "sources";
public static final String DEFAULT = NULL; public static final String DEFAULT = NULL;

View File

@ -109,6 +109,9 @@ WebHDFS REST API
* {{{Append to a File}<<<APPEND>>>}} * {{{Append to a File}<<<APPEND>>>}}
(see {{{../../api/org/apache/hadoop/fs/FileSystem.html}FileSystem}}.append) (see {{{../../api/org/apache/hadoop/fs/FileSystem.html}FileSystem}}.append)
* {{{Concat File(s)}<<<CONCAT>>>}}
(see {{{../../api/org/apache/hadoop/fs/FileSystem.html}FileSystem}}.concat)
* HTTP DELETE * HTTP DELETE
* {{{Delete a File/Directory}<<<DELETE>>>}} * {{{Delete a File/Directory}<<<DELETE>>>}}
@ -299,6 +302,32 @@ Content-Length: 0
{{{../../api/org/apache/hadoop/fs/FileSystem.html}FileSystem}}.append {{{../../api/org/apache/hadoop/fs/FileSystem.html}FileSystem}}.append
** {Concat File(s)}
* Submit a HTTP POST request.
+---------------------------------
curl -i -X POST "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CONCAT&sources=<SOURCES>"
+---------------------------------
The client receives a response with zero content length:
+---------------------------------
HTTP/1.1 200 OK
Content-Length: 0
+---------------------------------
[]
This REST API call is available as of Hadoop version 2.0.3.
Please note that <SOURCES> is a comma seperated list of absolute paths.
(Example: sources=/test/file1,/test/file2,/test/file3)
See also:
{{{Sources}<<<sources>>>}},
{{{../../api/org/apache/hadoop/fs/FileSystem.html}FileSystem}}.concat
** {Open and Read a File} ** {Open and Read a File}
* Submit a HTTP GET request with automatically following redirects. * Submit a HTTP GET request with automatically following redirects.
@ -1727,6 +1756,29 @@ var tokenProperties =
{{{Set Replication Factor}<<<SETREPLICATION>>>}} {{{Set Replication Factor}<<<SETREPLICATION>>>}}
** {Sources}
*----------------+-------------------------------------------------------------------+
|| Name | <<<sources>>> |
*----------------+-------------------------------------------------------------------+
|| Description | The comma seperated absolute paths used for concatenation. |
*----------------+-------------------------------------------------------------------+
|| Type | String |
*----------------+-------------------------------------------------------------------+
|| Default Value | \<empty\> |
*----------------+-------------------------------------------------------------------+
|| Valid Values | A list of comma seperated absolute FileSystem paths without scheme and authority. |
*----------------+-------------------------------------------------------------------+
|| Syntax | See the note in {{Delegation}}. |
*----------------+-------------------------------------------------------------------+
<<Note>> that sources are absolute FileSystem paths.
See also:
{{{Concat File(s)}<<<CONCAT>>>}}
** {Token} ** {Token}
*----------------+-------------------------------------------------------------------+ *----------------+-------------------------------------------------------------------+