HADOOP-11490. Expose truncate API via FileSystem and shell command. Contributed by Milan Desai.
This commit is contained in:
parent
0742591335
commit
a0521bc83a
|
@ -19,6 +19,9 @@ Trunk (Unreleased)
|
|||
|
||||
HADOOP-11353. Add support for .hadooprc (aw)
|
||||
|
||||
HADOOP-11490. Expose truncate API via FileSystem and shell command.
|
||||
(Milan Desai via shv)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HADOOP-8017. Configure hadoop-main pom to get rid of M2E plugin execution
|
||||
|
|
|
@ -352,6 +352,11 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
|
|||
throw new IOException("Not supported");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean truncate(Path f, long newLength) throws IOException {
|
||||
throw new IOException("Not supported");
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculated the length of the checksum file in bytes.
|
||||
* @param size the length of the data file in bytes
|
||||
|
|
|
@ -1318,6 +1318,29 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Truncate the file in the indicated path to the indicated size.
|
||||
* <ul>
|
||||
* <li>Fails if path is a directory.
|
||||
* <li>Fails if path does not exist.
|
||||
* <li>Fails if path is not closed.
|
||||
* <li>Fails if new size is greater than current size.
|
||||
* </ul>
|
||||
* @param f The path to the file to be truncated
|
||||
* @param newLength The size the file is to be truncated to
|
||||
*
|
||||
* @return <code>true</code> if the file has been truncated to the desired
|
||||
* <code>newLength</code> and is immediately available to be reused for
|
||||
* write operations such as <code>append</code>, or
|
||||
* <code>false</code> if a background process of adjusting the length of
|
||||
* the last block has been started, and clients should wait for it to
|
||||
* complete before proceeding with further file updates.
|
||||
*/
|
||||
public boolean truncate(Path f, long newLength) throws IOException {
|
||||
throw new UnsupportedOperationException("Not implemented by the " +
|
||||
getClass().getSimpleName() + " FileSystem implementation");
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a file
|
||||
* @deprecated Use {@link #delete(Path, boolean)} instead.
|
||||
|
|
|
@ -226,6 +226,11 @@ public class FilterFileSystem extends FileSystem {
|
|||
return fs.rename(src, dst);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean truncate(Path f, final long newLength) throws IOException {
|
||||
return fs.truncate(f, newLength);
|
||||
}
|
||||
|
||||
/** Delete a file */
|
||||
@Override
|
||||
public boolean delete(Path f, boolean recursive) throws IOException {
|
||||
|
|
|
@ -757,6 +757,14 @@ public class HarFileSystem extends FileSystem {
|
|||
throw new IOException("Har: append not allowed");
|
||||
}
|
||||
|
||||
/**
|
||||
* Not implemented.
|
||||
*/
|
||||
@Override
|
||||
public boolean truncate(Path f, long newLength) throws IOException {
|
||||
throw new IOException("Har: truncate not allowed");
|
||||
}
|
||||
|
||||
/**
|
||||
* Not implemented.
|
||||
*/
|
||||
|
|
|
@ -372,6 +372,31 @@ public class RawLocalFileSystem extends FileSystem {
|
|||
return FileUtil.copy(this, src, this, dst, true, getConf());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean truncate(Path f, final long newLength) throws IOException {
|
||||
FileStatus status = getFileStatus(f);
|
||||
if(status == null) {
|
||||
throw new FileNotFoundException("File " + f + " not found");
|
||||
}
|
||||
if(status.isDirectory()) {
|
||||
throw new IOException("Cannot truncate a directory (=" + f + ")");
|
||||
}
|
||||
long oldLength = status.getLen();
|
||||
if(newLength > oldLength) {
|
||||
throw new IllegalArgumentException(
|
||||
"Cannot truncate to a larger file size. Current size: " + oldLength +
|
||||
", truncate size: " + newLength + ".");
|
||||
}
|
||||
try (FileOutputStream out = new FileOutputStream(pathToFile(f), true)) {
|
||||
try {
|
||||
out.getChannel().truncate(newLength);
|
||||
} catch(IOException e) {
|
||||
throw new FSError(e);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete the given path to a file or directory.
|
||||
* @param p the path to delete
|
||||
|
|
|
@ -60,6 +60,7 @@ abstract public class FsCommand extends Command {
|
|||
factory.registerCommands(Tail.class);
|
||||
factory.registerCommands(Test.class);
|
||||
factory.registerCommands(Touch.class);
|
||||
factory.registerCommands(Truncate.class);
|
||||
factory.registerCommands(SnapshotCommands.class);
|
||||
factory.registerCommands(XAttrCommands.class);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,117 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.shell;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.PathIsDirectoryException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Truncates a file to a new size
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class Truncate extends FsCommand {
|
||||
public static void registerCommands(CommandFactory factory) {
|
||||
factory.addClass(Truncate.class, "-truncate");
|
||||
}
|
||||
|
||||
public static final String NAME = "truncate";
|
||||
public static final String USAGE = "[-w] <length> <path> ...";
|
||||
public static final String DESCRIPTION =
|
||||
"Truncate all files that match the specified file pattern to the " +
|
||||
"specified length.\n" +
|
||||
"-w: Requests that the command wait for block recovery to complete, " +
|
||||
"if necessary.";
|
||||
|
||||
protected long newLength = -1;
|
||||
protected List<PathData> waitList = new LinkedList<>();
|
||||
protected boolean waitOpt = false;
|
||||
|
||||
@Override
|
||||
protected void processOptions(LinkedList<String> args) throws IOException {
|
||||
CommandFormat cf = new CommandFormat(2, Integer.MAX_VALUE, "w");
|
||||
cf.parse(args);
|
||||
waitOpt = cf.getOpt("w");
|
||||
|
||||
try {
|
||||
newLength = Long.parseLong(args.removeFirst());
|
||||
} catch(NumberFormatException nfe) {
|
||||
displayWarning("Illegal length, a non-negative integer expected");
|
||||
throw nfe;
|
||||
}
|
||||
if(newLength < 0) {
|
||||
throw new IllegalArgumentException("length must be >= 0");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processArguments(LinkedList<PathData> args)
|
||||
throws IOException {
|
||||
super.processArguments(args);
|
||||
if (waitOpt) waitForRecovery();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processPath(PathData item) throws IOException {
|
||||
if(item.stat.isDirectory()) {
|
||||
throw new PathIsDirectoryException(item.toString());
|
||||
}
|
||||
long oldLength = item.stat.getLen();
|
||||
if(newLength > oldLength) {
|
||||
throw new IllegalArgumentException(
|
||||
"Cannot truncate to a larger file size. Current size: " + oldLength +
|
||||
", truncate size: " + newLength + ".");
|
||||
}
|
||||
if(item.fs.truncate(item.path, newLength)) {
|
||||
out.println("Truncated " + item + " to length: " + newLength);
|
||||
}
|
||||
else if(waitOpt) {
|
||||
waitList.add(item);
|
||||
}
|
||||
else {
|
||||
out.println("Truncating " + item + " to length: " + newLength + ". " +
|
||||
"Wait for block recovery to complete before further updating this " +
|
||||
"file.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for all files in waitList to have length equal to newLength.
|
||||
*/
|
||||
private void waitForRecovery() throws IOException {
|
||||
for(PathData item : waitList) {
|
||||
out.println("Waiting for " + item + " ...");
|
||||
out.flush();
|
||||
|
||||
for(;;) {
|
||||
item.refreshStatus();
|
||||
if(item.stat.getLen() == newLength) break;
|
||||
try {Thread.sleep(1000);} catch(InterruptedException ignored) {}
|
||||
}
|
||||
|
||||
out.println("Truncated " + item + " to length: " + newLength);
|
||||
out.flush();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -447,6 +447,14 @@ public class ViewFileSystem extends FileSystem {
|
|||
resDst.remainingPath);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean truncate(final Path f, final long newLength)
|
||||
throws IOException {
|
||||
InodeTree.ResolveResult<FileSystem> res =
|
||||
fsState.resolve(getUriPath(f), true);
|
||||
return res.targetFileSystem.truncate(f, newLength);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setOwner(final Path f, final String username,
|
||||
final String groupname) throws AccessControlException,
|
||||
|
@ -833,6 +841,11 @@ public class ViewFileSystem extends FileSystem {
|
|||
throw readOnlyMountTable("rename", src);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean truncate(Path f, long newLength) throws IOException {
|
||||
throw readOnlyMountTable("truncate", f);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setOwner(Path f, String username, String groupname)
|
||||
throws AccessControlException, IOException {
|
||||
|
|
|
@ -627,14 +627,7 @@ public class DistributedFileSystem extends FileSystem {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Truncate the file in the indicated path to the indicated size.
|
||||
* @param f The path to the file to be truncated
|
||||
* @param newLength The size the file is to be truncated to
|
||||
*
|
||||
* @return true if and client does not need to wait for block recovery,
|
||||
* false if client needs to wait for block recovery.
|
||||
*/
|
||||
@Override
|
||||
public boolean truncate(Path f, final long newLength) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
return dfs.truncate(getPathName(f), newLength);
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FsShell;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.AppendTestUtil;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
|
@ -55,6 +56,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -676,6 +678,73 @@ public class TestFileTruncate {
|
|||
fs.delete(parent, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTruncateShellCommand() throws Exception {
|
||||
final Path parent = new Path("/test");
|
||||
final Path src = new Path("/test/testTruncateShellCommand");
|
||||
final int oldLength = 2*BLOCK_SIZE + 1;
|
||||
final int newLength = BLOCK_SIZE + 1;
|
||||
|
||||
String[] argv =
|
||||
new String[]{"-truncate", String.valueOf(newLength), src.toString()};
|
||||
runTruncateShellCommand(src, oldLength, argv);
|
||||
|
||||
// wait for block recovery
|
||||
checkBlockRecovery(src);
|
||||
assertThat(fs.getFileStatus(src).getLen(), is((long) newLength));
|
||||
fs.delete(parent, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTruncateShellCommandOnBlockBoundary() throws Exception {
|
||||
final Path parent = new Path("/test");
|
||||
final Path src = new Path("/test/testTruncateShellCommandOnBoundary");
|
||||
final int oldLength = 2 * BLOCK_SIZE;
|
||||
final int newLength = BLOCK_SIZE;
|
||||
|
||||
String[] argv =
|
||||
new String[]{"-truncate", String.valueOf(newLength), src.toString()};
|
||||
runTruncateShellCommand(src, oldLength, argv);
|
||||
|
||||
// shouldn't need to wait for block recovery
|
||||
assertThat(fs.getFileStatus(src).getLen(), is((long) newLength));
|
||||
fs.delete(parent, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTruncateShellCommandWithWaitOption() throws Exception {
|
||||
final Path parent = new Path("/test");
|
||||
final Path src = new Path("/test/testTruncateShellCommandWithWaitOption");
|
||||
final int oldLength = 2 * BLOCK_SIZE + 1;
|
||||
final int newLength = BLOCK_SIZE + 1;
|
||||
|
||||
String[] argv = new String[]{"-truncate", "-w", String.valueOf(newLength),
|
||||
src.toString()};
|
||||
runTruncateShellCommand(src, oldLength, argv);
|
||||
|
||||
// shouldn't need to wait for block recovery
|
||||
assertThat(fs.getFileStatus(src).getLen(), is((long) newLength));
|
||||
fs.delete(parent, true);
|
||||
}
|
||||
|
||||
private void runTruncateShellCommand(Path src, int oldLength,
|
||||
String[] shellOpts) throws Exception {
|
||||
// create file and write data
|
||||
writeContents(AppendTestUtil.initBuffer(oldLength), oldLength, src);
|
||||
assertThat(fs.getFileStatus(src).getLen(), is((long)oldLength));
|
||||
|
||||
// truncate file using shell
|
||||
FsShell shell = null;
|
||||
try {
|
||||
shell = new FsShell(conf);
|
||||
assertThat(ToolRunner.run(shell, shellOpts), is(0));
|
||||
} finally {
|
||||
if(shell != null) {
|
||||
shell.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void writeContents(byte[] contents, int fileLength, Path p)
|
||||
throws IOException {
|
||||
FSDataOutputStream out = fs.create(p, true, BLOCK_SIZE, REPLICATION,
|
||||
|
|
Loading…
Reference in New Issue