HADOOP-11490. Expose truncate API via FileSystem and shell command. Contributed by Milan Desai.
This commit is contained in:
parent
6b22170bee
commit
de66227a57
|
@ -25,6 +25,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
|
|
||||||
HADOOP-8989. hadoop fs -find feature (Jonathan Allen via aw)
|
HADOOP-8989. hadoop fs -find feature (Jonathan Allen via aw)
|
||||||
|
|
||||||
|
HADOOP-11490. Expose truncate API via FileSystem and shell command.
|
||||||
|
(Milan Desai via shv)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
HADOOP-11483. HardLink.java should use the jdk7 createLink method (aajisaka)
|
HADOOP-11483. HardLink.java should use the jdk7 createLink method (aajisaka)
|
||||||
|
|
|
@ -352,6 +352,11 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
|
||||||
throw new IOException("Not supported");
|
throw new IOException("Not supported");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean truncate(Path f, long newLength) throws IOException {
|
||||||
|
throw new IOException("Not supported");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Calculated the length of the checksum file in bytes.
|
* Calculated the length of the checksum file in bytes.
|
||||||
* @param size the length of the data file in bytes
|
* @param size the length of the data file in bytes
|
||||||
|
|
|
@ -1318,6 +1318,29 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Truncate the file in the indicated path to the indicated size.
|
||||||
|
* <ul>
|
||||||
|
* <li>Fails if path is a directory.
|
||||||
|
* <li>Fails if path does not exist.
|
||||||
|
* <li>Fails if path is not closed.
|
||||||
|
* <li>Fails if new size is greater than current size.
|
||||||
|
* </ul>
|
||||||
|
* @param f The path to the file to be truncated
|
||||||
|
* @param newLength The size the file is to be truncated to
|
||||||
|
*
|
||||||
|
* @return <code>true</code> if the file has been truncated to the desired
|
||||||
|
* <code>newLength</code> and is immediately available to be reused for
|
||||||
|
* write operations such as <code>append</code>, or
|
||||||
|
* <code>false</code> if a background process of adjusting the length of
|
||||||
|
* the last block has been started, and clients should wait for it to
|
||||||
|
* complete before proceeding with further file updates.
|
||||||
|
*/
|
||||||
|
public boolean truncate(Path f, long newLength) throws IOException {
|
||||||
|
throw new UnsupportedOperationException("Not implemented by the " +
|
||||||
|
getClass().getSimpleName() + " FileSystem implementation");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete a file
|
* Delete a file
|
||||||
* @deprecated Use {@link #delete(Path, boolean)} instead.
|
* @deprecated Use {@link #delete(Path, boolean)} instead.
|
||||||
|
|
|
@ -226,6 +226,11 @@ public class FilterFileSystem extends FileSystem {
|
||||||
return fs.rename(src, dst);
|
return fs.rename(src, dst);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean truncate(Path f, final long newLength) throws IOException {
|
||||||
|
return fs.truncate(f, newLength);
|
||||||
|
}
|
||||||
|
|
||||||
/** Delete a file */
|
/** Delete a file */
|
||||||
@Override
|
@Override
|
||||||
public boolean delete(Path f, boolean recursive) throws IOException {
|
public boolean delete(Path f, boolean recursive) throws IOException {
|
||||||
|
|
|
@ -757,6 +757,14 @@ public class HarFileSystem extends FileSystem {
|
||||||
throw new IOException("Har: append not allowed");
|
throw new IOException("Har: append not allowed");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Not implemented.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean truncate(Path f, long newLength) throws IOException {
|
||||||
|
throw new IOException("Har: truncate not allowed");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Not implemented.
|
* Not implemented.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -372,6 +372,31 @@ public class RawLocalFileSystem extends FileSystem {
|
||||||
return FileUtil.copy(this, src, this, dst, true, getConf());
|
return FileUtil.copy(this, src, this, dst, true, getConf());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean truncate(Path f, final long newLength) throws IOException {
|
||||||
|
FileStatus status = getFileStatus(f);
|
||||||
|
if(status == null) {
|
||||||
|
throw new FileNotFoundException("File " + f + " not found");
|
||||||
|
}
|
||||||
|
if(status.isDirectory()) {
|
||||||
|
throw new IOException("Cannot truncate a directory (=" + f + ")");
|
||||||
|
}
|
||||||
|
long oldLength = status.getLen();
|
||||||
|
if(newLength > oldLength) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Cannot truncate to a larger file size. Current size: " + oldLength +
|
||||||
|
", truncate size: " + newLength + ".");
|
||||||
|
}
|
||||||
|
try (FileOutputStream out = new FileOutputStream(pathToFile(f), true)) {
|
||||||
|
try {
|
||||||
|
out.getChannel().truncate(newLength);
|
||||||
|
} catch(IOException e) {
|
||||||
|
throw new FSError(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete the given path to a file or directory.
|
* Delete the given path to a file or directory.
|
||||||
* @param p the path to delete
|
* @param p the path to delete
|
||||||
|
|
|
@ -60,6 +60,7 @@ abstract public class FsCommand extends Command {
|
||||||
factory.registerCommands(Tail.class);
|
factory.registerCommands(Tail.class);
|
||||||
factory.registerCommands(Test.class);
|
factory.registerCommands(Test.class);
|
||||||
factory.registerCommands(Touch.class);
|
factory.registerCommands(Touch.class);
|
||||||
|
factory.registerCommands(Truncate.class);
|
||||||
factory.registerCommands(SnapshotCommands.class);
|
factory.registerCommands(SnapshotCommands.class);
|
||||||
factory.registerCommands(XAttrCommands.class);
|
factory.registerCommands(XAttrCommands.class);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,117 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.shell;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.fs.PathIsDirectoryException;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Truncates a file to a new size
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class Truncate extends FsCommand {
|
||||||
|
public static void registerCommands(CommandFactory factory) {
|
||||||
|
factory.addClass(Truncate.class, "-truncate");
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final String NAME = "truncate";
|
||||||
|
public static final String USAGE = "[-w] <length> <path> ...";
|
||||||
|
public static final String DESCRIPTION =
|
||||||
|
"Truncate all files that match the specified file pattern to the " +
|
||||||
|
"specified length.\n" +
|
||||||
|
"-w: Requests that the command wait for block recovery to complete, " +
|
||||||
|
"if necessary.";
|
||||||
|
|
||||||
|
protected long newLength = -1;
|
||||||
|
protected List<PathData> waitList = new LinkedList<>();
|
||||||
|
protected boolean waitOpt = false;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void processOptions(LinkedList<String> args) throws IOException {
|
||||||
|
CommandFormat cf = new CommandFormat(2, Integer.MAX_VALUE, "w");
|
||||||
|
cf.parse(args);
|
||||||
|
waitOpt = cf.getOpt("w");
|
||||||
|
|
||||||
|
try {
|
||||||
|
newLength = Long.parseLong(args.removeFirst());
|
||||||
|
} catch(NumberFormatException nfe) {
|
||||||
|
displayWarning("Illegal length, a non-negative integer expected");
|
||||||
|
throw nfe;
|
||||||
|
}
|
||||||
|
if(newLength < 0) {
|
||||||
|
throw new IllegalArgumentException("length must be >= 0");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void processArguments(LinkedList<PathData> args)
|
||||||
|
throws IOException {
|
||||||
|
super.processArguments(args);
|
||||||
|
if (waitOpt) waitForRecovery();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void processPath(PathData item) throws IOException {
|
||||||
|
if(item.stat.isDirectory()) {
|
||||||
|
throw new PathIsDirectoryException(item.toString());
|
||||||
|
}
|
||||||
|
long oldLength = item.stat.getLen();
|
||||||
|
if(newLength > oldLength) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Cannot truncate to a larger file size. Current size: " + oldLength +
|
||||||
|
", truncate size: " + newLength + ".");
|
||||||
|
}
|
||||||
|
if(item.fs.truncate(item.path, newLength)) {
|
||||||
|
out.println("Truncated " + item + " to length: " + newLength);
|
||||||
|
}
|
||||||
|
else if(waitOpt) {
|
||||||
|
waitList.add(item);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
out.println("Truncating " + item + " to length: " + newLength + ". " +
|
||||||
|
"Wait for block recovery to complete before further updating this " +
|
||||||
|
"file.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for all files in waitList to have length equal to newLength.
|
||||||
|
*/
|
||||||
|
private void waitForRecovery() throws IOException {
|
||||||
|
for(PathData item : waitList) {
|
||||||
|
out.println("Waiting for " + item + " ...");
|
||||||
|
out.flush();
|
||||||
|
|
||||||
|
for(;;) {
|
||||||
|
item.refreshStatus();
|
||||||
|
if(item.stat.getLen() == newLength) break;
|
||||||
|
try {Thread.sleep(1000);} catch(InterruptedException ignored) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
out.println("Truncated " + item + " to length: " + newLength);
|
||||||
|
out.flush();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -447,6 +447,14 @@ public class ViewFileSystem extends FileSystem {
|
||||||
resDst.remainingPath);
|
resDst.remainingPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean truncate(final Path f, final long newLength)
|
||||||
|
throws IOException {
|
||||||
|
InodeTree.ResolveResult<FileSystem> res =
|
||||||
|
fsState.resolve(getUriPath(f), true);
|
||||||
|
return res.targetFileSystem.truncate(f, newLength);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setOwner(final Path f, final String username,
|
public void setOwner(final Path f, final String username,
|
||||||
final String groupname) throws AccessControlException,
|
final String groupname) throws AccessControlException,
|
||||||
|
@ -833,6 +841,11 @@ public class ViewFileSystem extends FileSystem {
|
||||||
throw readOnlyMountTable("rename", src);
|
throw readOnlyMountTable("rename", src);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean truncate(Path f, long newLength) throws IOException {
|
||||||
|
throw readOnlyMountTable("truncate", f);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setOwner(Path f, String username, String groupname)
|
public void setOwner(Path f, String username, String groupname)
|
||||||
throws AccessControlException, IOException {
|
throws AccessControlException, IOException {
|
||||||
|
|
|
@ -627,14 +627,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Override
|
||||||
* Truncate the file in the indicated path to the indicated size.
|
|
||||||
* @param f The path to the file to be truncated
|
|
||||||
* @param newLength The size the file is to be truncated to
|
|
||||||
*
|
|
||||||
* @return true if and client does not need to wait for block recovery,
|
|
||||||
* false if client needs to wait for block recovery.
|
|
||||||
*/
|
|
||||||
public boolean truncate(Path f, final long newLength) throws IOException {
|
public boolean truncate(Path f, final long newLength) throws IOException {
|
||||||
statistics.incrementWriteOps(1);
|
statistics.incrementWriteOps(1);
|
||||||
return dfs.truncate(getPathName(f), newLength);
|
return dfs.truncate(getPathName(f), newLength);
|
||||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.ContentSummary;
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FsShell;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.AppendTestUtil;
|
import org.apache.hadoop.hdfs.AppendTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
@ -55,6 +56,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
@ -676,6 +678,73 @@ public class TestFileTruncate {
|
||||||
fs.delete(parent, true);
|
fs.delete(parent, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTruncateShellCommand() throws Exception {
|
||||||
|
final Path parent = new Path("/test");
|
||||||
|
final Path src = new Path("/test/testTruncateShellCommand");
|
||||||
|
final int oldLength = 2*BLOCK_SIZE + 1;
|
||||||
|
final int newLength = BLOCK_SIZE + 1;
|
||||||
|
|
||||||
|
String[] argv =
|
||||||
|
new String[]{"-truncate", String.valueOf(newLength), src.toString()};
|
||||||
|
runTruncateShellCommand(src, oldLength, argv);
|
||||||
|
|
||||||
|
// wait for block recovery
|
||||||
|
checkBlockRecovery(src);
|
||||||
|
assertThat(fs.getFileStatus(src).getLen(), is((long) newLength));
|
||||||
|
fs.delete(parent, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTruncateShellCommandOnBlockBoundary() throws Exception {
|
||||||
|
final Path parent = new Path("/test");
|
||||||
|
final Path src = new Path("/test/testTruncateShellCommandOnBoundary");
|
||||||
|
final int oldLength = 2 * BLOCK_SIZE;
|
||||||
|
final int newLength = BLOCK_SIZE;
|
||||||
|
|
||||||
|
String[] argv =
|
||||||
|
new String[]{"-truncate", String.valueOf(newLength), src.toString()};
|
||||||
|
runTruncateShellCommand(src, oldLength, argv);
|
||||||
|
|
||||||
|
// shouldn't need to wait for block recovery
|
||||||
|
assertThat(fs.getFileStatus(src).getLen(), is((long) newLength));
|
||||||
|
fs.delete(parent, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTruncateShellCommandWithWaitOption() throws Exception {
|
||||||
|
final Path parent = new Path("/test");
|
||||||
|
final Path src = new Path("/test/testTruncateShellCommandWithWaitOption");
|
||||||
|
final int oldLength = 2 * BLOCK_SIZE + 1;
|
||||||
|
final int newLength = BLOCK_SIZE + 1;
|
||||||
|
|
||||||
|
String[] argv = new String[]{"-truncate", "-w", String.valueOf(newLength),
|
||||||
|
src.toString()};
|
||||||
|
runTruncateShellCommand(src, oldLength, argv);
|
||||||
|
|
||||||
|
// shouldn't need to wait for block recovery
|
||||||
|
assertThat(fs.getFileStatus(src).getLen(), is((long) newLength));
|
||||||
|
fs.delete(parent, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void runTruncateShellCommand(Path src, int oldLength,
|
||||||
|
String[] shellOpts) throws Exception {
|
||||||
|
// create file and write data
|
||||||
|
writeContents(AppendTestUtil.initBuffer(oldLength), oldLength, src);
|
||||||
|
assertThat(fs.getFileStatus(src).getLen(), is((long)oldLength));
|
||||||
|
|
||||||
|
// truncate file using shell
|
||||||
|
FsShell shell = null;
|
||||||
|
try {
|
||||||
|
shell = new FsShell(conf);
|
||||||
|
assertThat(ToolRunner.run(shell, shellOpts), is(0));
|
||||||
|
} finally {
|
||||||
|
if(shell != null) {
|
||||||
|
shell.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void writeContents(byte[] contents, int fileLength, Path p)
|
static void writeContents(byte[] contents, int fileLength, Path p)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
FSDataOutputStream out = fs.create(p, true, BLOCK_SIZE, REPLICATION,
|
FSDataOutputStream out = fs.create(p, true, BLOCK_SIZE, REPLICATION,
|
||||||
|
|
Loading…
Reference in New Issue