HDFS-7655. Expose truncate API for Web HDFS. (yliu)

yliu 2015-02-05 23:45:06 +08:00
parent 4641196fe0
commit 03f7ed382b
7 changed files with 129 additions and 18 deletions

FileSystemTestHelper.java

@@ -22,7 +22,6 @@ import java.io.FileNotFoundException;
 import java.net.URI;
 import java.util.Random;

 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.Token;
@@ -127,28 +126,36 @@ public class FileSystemTestHelper {
    */
   public static long createFile(FileSystem fSys, Path path, int numBlocks,
       int blockSize, short numRepl, boolean createParent) throws IOException {
-    FSDataOutputStream out =
-      fSys.create(path, false, 4096, numRepl, blockSize );
-    byte[] data = getFileData(numBlocks, blockSize);
-    out.write(data, 0, data.length);
-    out.close();
+    return createFile(fSys, path, getFileData(numBlocks, blockSize),
+        blockSize, numRepl);
+  }
+
+  public static long createFile(FileSystem fSys, Path path, byte[] data,
+      int blockSize, short numRepl) throws IOException {
+    FSDataOutputStream out =
+        fSys.create(path, false, 4096, numRepl, blockSize);
+    try {
+      out.write(data, 0, data.length);
+    } finally {
+      out.close();
+    }
     return data.length;
   }

   public static long createFile(FileSystem fSys, Path path, int numBlocks,
       int blockSize, boolean createParent) throws IOException {
-    return createFile(fSys, path, numBlocks, blockSize, fSys.getDefaultReplication(path), true);
+    return createFile(fSys, path, numBlocks, blockSize,
+        fSys.getDefaultReplication(path), true);
   }

   public static long createFile(FileSystem fSys, Path path, int numBlocks,
       int blockSize) throws IOException {
     return createFile(fSys, path, numBlocks, blockSize, true);
   }

   public static long createFile(FileSystem fSys, Path path) throws IOException {
-    return createFile(fSys, path, DEFAULT_NUM_BLOCKS, DEFAULT_BLOCK_SIZE, DEFAULT_NUM_REPL, true);
+    return createFile(fSys, path, DEFAULT_NUM_BLOCKS, DEFAULT_BLOCK_SIZE,
+        DEFAULT_NUM_REPL, true);
   }

   public long createFile(FileSystem fSys, String name) throws IOException {

CHANGES.txt (hadoop-hdfs)

@@ -306,6 +306,8 @@ Release 2.7.0 - UNRELEASED
     HDFS-6673. Add delimited format support to PB OIV tool. (Eddy Xu via wang)

+    HDFS-7655. Expose truncate API for Web HDFS. (yliu)
+
   IMPROVEMENTS

     HDFS-7055. Add tracing to DFSInputStream (cmccabe)

NamenodeWebHdfsMethods.java

@@ -57,7 +57,7 @@ import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@@ -614,10 +614,12 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize,
       @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
-          final ExcludeDatanodesParam excludeDatanodes
+          final ExcludeDatanodesParam excludeDatanodes,
+      @QueryParam(NewLengthParam.NAME) @DefaultValue(NewLengthParam.DEFAULT)
+          final NewLengthParam newLength
       ) throws IOException, InterruptedException {
     return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs,
-        bufferSize, excludeDatanodes);
+        bufferSize, excludeDatanodes, newLength);
   }

   /** Handle HTTP POST request. */
@@ -641,11 +643,13 @@ public class NamenodeWebHdfsMethods {
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize,
       @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
-          final ExcludeDatanodesParam excludeDatanodes
+          final ExcludeDatanodesParam excludeDatanodes,
+      @QueryParam(NewLengthParam.NAME) @DefaultValue(NewLengthParam.DEFAULT)
+          final NewLengthParam newLength
       ) throws IOException, InterruptedException {

     init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize,
-        excludeDatanodes);
+        excludeDatanodes, newLength);

     return ugi.doAs(new PrivilegedExceptionAction<Response>() {
       @Override
@@ -653,7 +657,7 @@ public class NamenodeWebHdfsMethods {
         try {
           return post(ugi, delegation, username, doAsUser,
               path.getAbsolutePath(), op, concatSrcs, bufferSize,
-              excludeDatanodes);
+              excludeDatanodes, newLength);
         } finally {
           reset();
         }
@@ -670,9 +674,11 @@ public class NamenodeWebHdfsMethods {
       final PostOpParam op,
       final ConcatSourcesParam concatSrcs,
       final BufferSizeParam bufferSize,
-      final ExcludeDatanodesParam excludeDatanodes
+      final ExcludeDatanodesParam excludeDatanodes,
+      final NewLengthParam newLength
       ) throws IOException, URISyntaxException {
     final NameNode namenode = (NameNode)context.getAttribute("name.node");
+    final NamenodeProtocols np = getRPCServer(namenode);

     switch(op.getValue()) {
     case APPEND:
@@ -684,9 +690,17 @@ public class NamenodeWebHdfsMethods {
     }
     case CONCAT:
     {
-      getRPCServer(namenode).concat(fullpath, concatSrcs.getAbsolutePaths());
+      np.concat(fullpath, concatSrcs.getAbsolutePaths());
       return Response.ok().build();
     }
+    case TRUNCATE:
+    {
+      // We treat each rest request as a separate client.
+      final boolean b = np.truncate(fullpath, newLength.getValue(),
+          "DFSClient_" + DFSUtil.getSecureRandom().nextLong());
+      final String js = JsonUtil.toJsonString("boolean", b);
+      return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
+    }
     default:
       throw new UnsupportedOperationException(op + " is not supported");
     }
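
For illustration, the new TRUNCATE op can be driven directly over HTTP. The sketch below is not part of this change: the /webhdfs/v1 prefix, the POST method, and the op/newlength query parameters follow PostOpParam and NewLengthParam in this patch, while the host, port, file path, and user are placeholders (simple/pseudo authentication assumed).

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class WebHdfsTruncateHttpSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder NameNode HTTP address, file path, and user name.
    URL url = new URL("http://namenode:50070/webhdfs/v1/user/foo/data"
        + "?op=TRUNCATE&newlength=1024&user.name=foo");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");   // TRUNCATE is a POST op per PostOpParam
    try {
      BufferedReader in = new BufferedReader(
          new InputStreamReader(conn.getInputStream()));
      // Expected body: {"boolean":true} if the truncate completed immediately,
      // {"boolean":false} if block recovery is still in progress.
      System.out.println(conn.getResponseCode() + " " + in.readLine());
      in.close();
    } finally {
      conn.disconnect();
    }
  }
}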

WebHdfsFileSystem.java

@@ -1160,6 +1160,14 @@ public class WebHdfsFileSystem extends FileSystem
     ).run();
   }

+  @Override
+  public boolean truncate(Path f, long newLength) throws IOException {
+    statistics.incrementWriteOps(1);
+
+    final HttpOpParam.Op op = PostOpParam.Op.TRUNCATE;
+    return new FsPathBooleanRunner(op, f, new NewLengthParam(newLength)).run();
+  }
+
   @Override
   public boolean delete(Path f, boolean recursive) throws IOException {
     final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
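
A rough sketch of the same operation through the FileSystem API (URI, path, and length are illustrative). As with ClientProtocol.truncate, a true return means the file is already at the requested length; false means the last block is being recovered and the caller should wait before relying on the new length.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WebHdfsTruncateClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder WebHDFS URI; point it at a real NameNode.
    FileSystem fs = FileSystem.get(URI.create("webhdfs://namenode:50070"), conf);
    Path file = new Path("/user/foo/data");
    boolean done = fs.truncate(file, 1024L);   // sends op=TRUNCATE&newlength=1024
    if (!done) {
      // Truncating off a block boundary schedules recovery of the last block;
      // callers typically poll getFileStatus() until the length settles.
      System.out.println("truncate scheduled, block recovery in progress");
    }
    fs.close();
  }
}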

NewLengthParam.java (new file)

@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web.resources;
/** NewLength parameter. */
public class NewLengthParam extends LongParam {
/** Parameter name. */
public static final String NAME = "newlength";
/** Default parameter value. */
public static final String DEFAULT = NULL;
private static final Domain DOMAIN = new Domain(NAME);
/**
* Constructor.
* @param value the parameter value.
*/
public NewLengthParam(final Long value) {
super(DOMAIN, value, 0L, null);
}
/**
* Constructor.
* @param str a string representation of the parameter value.
*/
public NewLengthParam(final String str) {
this(DOMAIN.parse(str));
}
@Override
public String getName() {
return NAME;
}
}
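
NewLengthParam reuses LongParam with a lower bound of 0L and no upper bound, so a negative newlength is rejected before the NameNode is asked to truncate anything. A small sketch of the expected behaviour, assuming LongParam's range check throws IllegalArgumentException as the other numeric params in this package do; the values are illustrative.

import org.apache.hadoop.hdfs.web.resources.NewLengthParam;

public class NewLengthParamSketch {
  public static void main(String[] args) {
    // "newlength=1024" is what ends up in the WebHDFS query string.
    NewLengthParam ok = new NewLengthParam(1024L);
    System.out.println(ok.getName() + "=" + ok.getValue());

    try {
      new NewLengthParam(-1L);   // below the 0L lower bound
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}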

PostOpParam.java

@@ -27,6 +27,8 @@ public class PostOpParam extends HttpOpParam<PostOpParam.Op> {
     CONCAT(false, HttpURLConnection.HTTP_OK),

+    TRUNCATE(false, HttpURLConnection.HTTP_OK),
+
     NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);

     final boolean doOutputAndRedirect;

TestFSMainOperationsWebHdfs.java

@@ -29,11 +29,13 @@ import java.security.PrivilegedExceptionAction;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FSMainOperationsBaseTest;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.AppendTestUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -136,6 +138,33 @@ public class TestFSMainOperationsWebHdfs extends FSMainOperationsBaseTest {
     Assert.assertEquals(1024*4, fileStatus.getLen());
   }

+  @Test
+  public void testTruncate() throws Exception {
+    final short repl = 3;
+    final int blockSize = 1024;
+    final int numOfBlocks = 2;
+    Path dir = getTestRootPath(fSys, "test/hadoop");
+    Path file = getTestRootPath(fSys, "test/hadoop/file");
+
+    final byte[] data = getFileData(numOfBlocks, blockSize);
+    createFile(fSys, file, data, blockSize, repl);
+
+    final int newLength = blockSize;
+
+    boolean isReady = fSys.truncate(file, newLength);
+    Assert.assertTrue("Recovery is not expected.", isReady);
+
+    FileStatus fileStatus = fSys.getFileStatus(file);
+    Assert.assertEquals(fileStatus.getLen(), newLength);
+    AppendTestUtil.checkFullFile(fSys, file, newLength, data, file.toString());
+
+    ContentSummary cs = fSys.getContentSummary(dir);
+    Assert.assertEquals("Bad disk space usage", cs.getSpaceConsumed(),
+        newLength * repl);
+    Assert.assertTrue("Deleted", fSys.delete(dir, true));
+  }
+
   // Test that WebHdfsFileSystem.jsonParse() closes the connection's input
   // stream.
   // Closing the inputstream in jsonParse will allow WebHDFS to reuse