HDFS-4098. Add FileWithLink, INodeFileUnderConstructionWithLink and INodeFileUnderConstructionSnapshot in order to support append to snapshotted files.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1432788 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-01-14 02:30:15 +00:00
parent bc0aff27a4
commit 25116c26fd
12 changed files with 380 additions and 165 deletions

View File

@ -100,3 +100,7 @@ Branch-2802 Snapshot (Unreleased)
HDFS-4245. Include snapshot related operations in TestOfflineEditsViewer. HDFS-4245. Include snapshot related operations in TestOfflineEditsViewer.
(Jing Zhao via szetszwo) (Jing Zhao via szetszwo)
HDFS-4098. Add FileWithLink, INodeFileUnderConstructionWithLink and
INodeFileUnderConstructionSnapshot in order to support append to snapshotted
files. (szetszwo)

View File

@ -323,7 +323,7 @@ private void applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
if (oldFile.isUnderConstruction()) { if (oldFile.isUnderConstruction()) {
INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile; INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile;
fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path); fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path);
INodeFile newFile = ucFile.convertToInodeFile(ucFile.getModificationTime()); INodeFile newFile = ucFile.toINodeFile(ucFile.getModificationTime());
fsDir.unprotectedReplaceINodeFile(addCloseOp.path, ucFile, newFile, fsDir.unprotectedReplaceINodeFile(addCloseOp.path, ucFile, newFile,
iip.getLatestSnapshot()); iip.getLatestSnapshot());
} }

View File

@ -1324,10 +1324,11 @@ private LocatedBlocks getBlockLocationsUpdateTimes(String src,
doAccessTime = false; doAccessTime = false;
} }
long now = now(); final INodesInPath iip = dir.getINodesInPath(src);
final INodesInPath iip = dir.getMutableINodesInPath(src);
final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src); final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
if (doAccessTime && isAccessTimeSupported()) { if (!iip.isSnapshot() //snapshots are readonly, so don't update atime.
&& doAccessTime && isAccessTimeSupported()) {
final long now = now();
if (now <= inode.getAccessTime() + getAccessTimePrecision()) { if (now <= inode.getAccessTime() + getAccessTimePrecision()) {
// if we have to set access time but we only have the readlock, then // if we have to set access time but we only have the readlock, then
// restart this entire operation with the writeLock. // restart this entire operation with the writeLock.
@ -1948,17 +1949,13 @@ private LocatedBlock startFileInternal(String src,
LocatedBlock prepareFileForWrite(String src, INodeFile file, LocatedBlock prepareFileForWrite(String src, INodeFile file,
String leaseHolder, String clientMachine, DatanodeDescriptor clientNode, String leaseHolder, String clientMachine, DatanodeDescriptor clientNode,
boolean writeToEditLog, Snapshot latestSnapshot) throws IOException { boolean writeToEditLog, Snapshot latestSnapshot) throws IOException {
//TODO SNAPSHOT: INodeFileUnderConstruction with link if (latestSnapshot != null) {
INodeFileUnderConstruction cons = new INodeFileUnderConstruction( file = (INodeFile)file.recordModification(latestSnapshot).left;
file.getLocalNameBytes(), }
file.getFileReplication(), final INodeFileUnderConstruction cons
file.getModificationTime(), = INodeFileUnderConstruction.toINodeFileUnderConstruction(
file.getPreferredBlockSize(), file, leaseHolder, clientMachine, clientNode);
file.getBlocks(),
file.getPermissionStatus(),
leaseHolder,
clientMachine,
clientNode);
dir.replaceINodeFile(src, file, cons, latestSnapshot); dir.replaceINodeFile(src, file, cons, latestSnapshot);
leaseManager.addLease(cons.getClientName(), src); leaseManager.addLease(cons.getClientName(), src);
@ -3271,7 +3268,7 @@ private void finalizeINodeFileUnderConstruction(String src,
// The file is no longer pending. // The file is no longer pending.
// Create permanent INode, update blocks // Create permanent INode, update blocks
INodeFile newFile = pendingFile.convertToInodeFile(now()); INodeFile newFile = pendingFile.toINodeFile(now());
dir.replaceINodeFile(src, pendingFile, newFile, latestSnapshot); dir.replaceINodeFile(src, pendingFile, newFile, latestSnapshot);
// close file and persist block allocations for this file // close file and persist block allocations for this file

View File

@ -497,7 +497,7 @@ public long getAccessTime() {
/** /**
* Set last access time of inode. * Set last access time of inode.
*/ */
INode setAccessTime(long atime, Snapshot latest) { public INode setAccessTime(long atime, Snapshot latest) {
Pair<? extends INode, ? extends INode> pair = recordModification(latest); Pair<? extends INode, ? extends INode> pair = recordModification(latest);
INode nodeToUpdate = pair != null ? pair.left : this; INode nodeToUpdate = pair != null ? pair.left : this;
nodeToUpdate.accessTime = atime; nodeToUpdate.accessTime = atime;

View File

@ -28,8 +28,6 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithLink;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
/** I-node for closed file. */ /** I-node for closed file. */
@ -96,7 +94,7 @@ static long combinePreferredBlockSize(long header, long blockSize) {
preferredBlockSize); preferredBlockSize);
} }
INodeFile(byte[] name, PermissionStatus permissions, long mtime, long atime, protected INodeFile(byte[] name, PermissionStatus permissions, long mtime, long atime,
BlockInfo[] blklist, short replication, long preferredBlockSize) { BlockInfo[] blklist, short replication, long preferredBlockSize) {
super(name, permissions, null, mtime, atime); super(name, permissions, null, mtime, atime);
header = HeaderFormat.combineReplication(header, replication); header = HeaderFormat.combineReplication(header, replication);
@ -111,7 +109,7 @@ protected INodeFile(INodeFile that) {
} }
@Override @Override
public Pair<INodeFileWithLink, INodeFileSnapshot> createSnapshotCopy() { public Pair<? extends INodeFile, ? extends INodeFile> createSnapshotCopy() {
return parent.replaceINodeFile(this).createSnapshotCopy(); return parent.replaceINodeFile(this).createSnapshotCopy();
} }
@ -141,7 +139,7 @@ public short getBlockReplication() {
return getFileReplication(); return getFileReplication();
} }
protected void setFileReplication(short replication, Snapshot latest) { public void setFileReplication(short replication, Snapshot latest) {
if (latest != null) { if (latest != null) {
final Pair<? extends INode, ? extends INode> p = recordModification(latest); final Pair<? extends INode, ? extends INode> p = recordModification(latest);
if (p != null) { if (p != null) {

View File

@ -29,6 +29,10 @@
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.MutableBlockCollection; import org.apache.hadoop.hdfs.server.blockmanagement.MutableBlockCollection;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithLink;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithLink;
import com.google.common.base.Preconditions;
/** /**
* I-node for file being written. * I-node for file being written.
@ -45,6 +49,28 @@ public static INodeFileUnderConstruction valueOf(INode inode, String path
return (INodeFileUnderConstruction)file; return (INodeFileUnderConstruction)file;
} }
/** Convert the given file to an {@link INodeFileUnderConstruction}. */
public static INodeFileUnderConstruction toINodeFileUnderConstruction(
INodeFile file,
String clientName,
String clientMachine,
DatanodeDescriptor clientNode) {
Preconditions.checkArgument(!(file instanceof INodeFileUnderConstruction),
"file is already an INodeFileUnderConstruction");
final INodeFileUnderConstruction uc = new INodeFileUnderConstruction(
file.getLocalNameBytes(),
file.getFileReplication(),
file.getModificationTime(),
file.getPreferredBlockSize(),
file.getBlocks(),
file.getPermissionStatus(),
clientName,
clientMachine,
clientNode);
return file instanceof INodeFileWithLink?
new INodeFileUnderConstructionWithLink(uc): uc;
}
private String clientName; // lease holder private String clientName; // lease holder
private final String clientMachine; private final String clientMachine;
private final DatanodeDescriptor clientNode; // if client is a cluster node too. private final DatanodeDescriptor clientNode; // if client is a cluster node too.
@ -56,11 +82,8 @@ public static INodeFileUnderConstruction valueOf(INode inode, String path
String clientName, String clientName,
String clientMachine, String clientMachine,
DatanodeDescriptor clientNode) { DatanodeDescriptor clientNode) {
super(permissions.applyUMask(UMASK), BlockInfo.EMPTY_ARRAY, replication, this(null, replication, modTime, preferredBlockSize, BlockInfo.EMPTY_ARRAY,
modTime, modTime, preferredBlockSize); permissions.applyUMask(UMASK), clientName, clientMachine, clientNode);
this.clientName = clientName;
this.clientMachine = clientMachine;
this.clientNode = clientNode;
} }
INodeFileUnderConstruction(byte[] name, INodeFileUnderConstruction(byte[] name,
@ -78,6 +101,13 @@ public static INodeFileUnderConstruction valueOf(INode inode, String path
this.clientMachine = clientMachine; this.clientMachine = clientMachine;
this.clientNode = clientNode; this.clientNode = clientNode;
} }
protected INodeFileUnderConstruction(INodeFileUnderConstruction that) {
super(that);
this.clientName = that.clientName;
this.clientMachine = that.clientMachine;
this.clientNode = that.clientNode;
}
String getClientName() { String getClientName() {
return clientName; return clientName;
@ -103,30 +133,26 @@ public boolean isUnderConstruction() {
return true; return true;
} }
// /**
// converts a INodeFileUnderConstruction into a INodeFile * Converts an INodeFileUnderConstruction to an INodeFile.
// use the modification time as the access time * The original modification time is used as the access time.
// * The new modification is the specified mtime.
INodeFile convertToInodeFile(long mtime) { */
assert allBlocksComplete() : "Can't finalize inode " + this protected INodeFile toINodeFile(long mtime) {
+ " since it contains non-complete blocks! Blocks are " assertAllBlocksComplete();
+ Arrays.asList(getBlocks());
//TODO SNAPSHOT: may convert to INodeFileWithLink
return new INodeFile(getLocalNameBytes(), getPermissionStatus(), return new INodeFile(getLocalNameBytes(), getPermissionStatus(),
mtime, getModificationTime(), mtime, getModificationTime(),
getBlocks(), getFileReplication(), getPreferredBlockSize()); getBlocks(), getFileReplication(), getPreferredBlockSize());
} }
/** /** Assert all blocks are complete. */
* @return true if all of the blocks in this file are marked as completed. protected void assertAllBlocksComplete() {
*/ final BlockInfo[] blocks = getBlocks();
private boolean allBlocksComplete() { for (int i = 0; i < blocks.length; i++) {
for (BlockInfo b : getBlocks()) { Preconditions.checkState(blocks[i].isComplete(), "Failed to finalize"
if (!b.isComplete()) { + " %s %s since blocks[%s] is non-complete, where blocks=%s.",
return false; getClass().getSimpleName(), this, i, Arrays.asList(getBlocks()));
}
} }
return true;
} }
/** /**

View File

@ -0,0 +1,154 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.snapshot;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapINodeUpdateEntry;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import com.google.common.base.Preconditions;
/**
* {@link INodeFile} with a link to the next element.
* The link of all the snapshot files and the original file form a circular
* linked list so that all elements are accessible by any of the elements.
*/
@InterfaceAudience.Private
public interface FileWithLink {
/** @return the next element. */
public <N extends INodeFile & FileWithLink> N getNext();
/** Set the next element. */
public <N extends INodeFile & FileWithLink> void setNext(N next);
/** Insert inode to the circular linked list. */
public <N extends INodeFile & FileWithLink> void insert(N inode);
/** Utility methods for the classes which implement the interface. */
static class Util {
/**
* @return the max file replication of the elements
* in the circular linked list.
*/
static <F extends INodeFile & FileWithLink,
N extends INodeFile & FileWithLink> short getBlockReplication(
final F file) {
short max = file.getFileReplication();
// i may be null since next will be set to null when the INode is deleted
for(N i = file.getNext(); i != file && i != null; i = i.getNext()) {
final short replication = i.getFileReplication();
if (replication > max) {
max = replication;
}
}
return max;
}
/**
* Remove the current inode from the circular linked list.
* If some blocks at the end of the block list no longer belongs to
* any other inode, collect them and update the block list.
*/
static <F extends INodeFile & FileWithLink,
N extends INodeFile & FileWithLink>
int collectSubtreeBlocksAndClear(F file, BlocksMapUpdateInfo info) {
final N next = file.getNext();
Preconditions.checkState(next != file, "this is the only remaining inode.");
// There are other inode(s) using the blocks.
// Compute max file size excluding this and find the last inode.
long max = next.computeFileSize(true);
short maxReplication = next.getFileReplication();
FileWithLink last = next;
for(N i = next.getNext(); i != file; i = i.getNext()) {
final long size = i.computeFileSize(true);
if (size > max) {
max = size;
}
final short rep = i.getFileReplication();
if (rep > maxReplication) {
maxReplication = rep;
}
last = i;
}
collectBlocksBeyondMaxAndClear(file, max, info);
// remove this from the circular linked list.
last.setNext(next);
// Set the replication of the current INode to the max of all the other
// linked INodes, so that in case the current INode is retrieved from the
// blocksMap before it is removed or updated, the correct replication
// number can be retrieved.
file.setFileReplication(maxReplication, null);
file.setNext(null);
// clear parent
file.setParent(null);
return 1;
}
static <F extends INodeFile & FileWithLink,
N extends INodeFile & FileWithLink>
void collectBlocksBeyondMaxAndClear(final F file,
final long max, final BlocksMapUpdateInfo info) {
final BlockInfo[] oldBlocks = file.getBlocks();
if (oldBlocks != null) {
//find the minimum n such that the size of the first n blocks > max
int n = 0;
for(long size = 0; n < oldBlocks.length && max > size; n++) {
size += oldBlocks[n].getNumBytes();
}
// Replace the INode for all the remaining blocks in blocksMap
final N next = file.getNext();
final BlocksMapINodeUpdateEntry entry = new BlocksMapINodeUpdateEntry(
file, next);
if (info != null) {
for (int i = 0; i < n; i++) {
info.addUpdateBlock(oldBlocks[i], entry);
}
}
// starting from block n, the data is beyond max.
if (n < oldBlocks.length) {
// resize the array.
final BlockInfo[] newBlocks;
if (n == 0) {
newBlocks = null;
} else {
newBlocks = new BlockInfo[n];
System.arraycopy(oldBlocks, 0, newBlocks, 0, n);
}
for(N i = next; i != file; i = i.getNext()) {
i.setBlocks(newBlocks);
}
// collect the blocks beyond max.
if (info != null) {
for(; n < oldBlocks.length; n++) {
info.addDeleteBlock(oldBlocks[n]);
}
}
}
file.setBlocks(null);
}
}
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.snapshot;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
/**
* INode representing a snapshot of an {@link INodeFileUnderConstruction}.
*/
@InterfaceAudience.Private
public class INodeFileUnderConstructionSnapshot
extends INodeFileUnderConstructionWithLink {
/** The file size at snapshot creation time. */
final long size;
INodeFileUnderConstructionSnapshot(INodeFileUnderConstructionWithLink f) {
super(f);
this.size = f.computeFileSize(true);
f.insert(this);
}
@Override
public long computeFileSize(boolean includesBlockInfoUnderConstruction) {
//ignore includesBlockInfoUnderConstruction
//since files in a snapshot are considered as closed.
return size;
}
}

View File

@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.snapshot;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
/**
* Represent an {@link INodeFileUnderConstruction} that is snapshotted.
* Note that snapshot files are represented by
* {@link INodeFileUnderConstructionSnapshot}.
*/
@InterfaceAudience.Private
public class INodeFileUnderConstructionWithLink
extends INodeFileUnderConstruction implements FileWithLink {
private FileWithLink next;
public INodeFileUnderConstructionWithLink(INodeFileUnderConstruction f) {
super(f);
setNext(this);
}
@Override
protected INodeFileWithLink toINodeFile(final long mtime) {
assertAllBlocksComplete();
final long atime = getModificationTime();
final INodeFileWithLink f = new INodeFileWithLink(this);
f.setModificationTime(mtime, null);
f.setAccessTime(atime, null);
return f;
}
@Override
public Pair<? extends INodeFileUnderConstruction,
INodeFileUnderConstructionSnapshot> createSnapshotCopy() {
return new Pair<INodeFileUnderConstructionWithLink,
INodeFileUnderConstructionSnapshot>(
this, new INodeFileUnderConstructionSnapshot(this));
}
@SuppressWarnings("unchecked")
@Override
public <N extends INodeFile & FileWithLink> N getNext() {
return (N)next;
}
@Override
public <N extends INodeFile & FileWithLink> void setNext(N next) {
this.next = next;
}
@Override
public <N extends INodeFile & FileWithLink> void insert(N inode) {
inode.setNext(this.getNext());
this.setNext(inode);
}
@Override
public short getBlockReplication() {
return Util.getBlockReplication(this);
}
@Override
protected int collectSubtreeBlocksAndClear(BlocksMapUpdateInfo info) {
if (next == null || next == this) {
// this is the only remaining inode.
return super.collectSubtreeBlocksAndClear(info);
} else {
return Util.collectSubtreeBlocksAndClear(this, info);
}
}
}

View File

@ -18,23 +18,19 @@
package org.apache.hadoop.hdfs.server.namenode.snapshot; package org.apache.hadoop.hdfs.server.namenode.snapshot;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.INodeFile;
/** /**
* INodeFile with a link to the next element. * Represent an {@link INodeFile} that is snapshotted.
* This class is used to represent the original file that is snapshotted. * Note that snapshot files are represented by {@link INodeFileSnapshot}.
* The snapshot files are represented by {@link INodeFileSnapshot}.
* The link of all the snapshot files and the original file form a circular
* linked list so that all elements are accessible by any of the elements.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class INodeFileWithLink extends INodeFile { public class INodeFileWithLink extends INodeFile implements FileWithLink {
private INodeFileWithLink next; private FileWithLink next;
public INodeFileWithLink(INodeFile f) { public INodeFileWithLink(INodeFile f) {
super(f); super(f);
next = this; setNext(this);
} }
@Override @Override
@ -43,124 +39,35 @@ public Pair<INodeFileWithLink, INodeFileSnapshot> createSnapshotCopy() {
new INodeFileSnapshot(this)); new INodeFileSnapshot(this));
} }
void setNext(INodeFileWithLink next) { @SuppressWarnings("unchecked")
@Override
public <N extends INodeFile & FileWithLink> N getNext() {
return (N)next;
}
@Override
public <N extends INodeFile & FileWithLink> void setNext(N next) {
this.next = next; this.next = next;
} }
INodeFileWithLink getNext() { @Override
return next; public <N extends INodeFile & FileWithLink> void insert(N inode) {
}
/** Insert inode to the circular linked list. */
void insert(INodeFileWithLink inode) {
inode.setNext(this.getNext()); inode.setNext(this.getNext());
this.setNext(inode); this.setNext(inode);
} }
/**
* @return the max file replication of the elements
* in the circular linked list.
*/
@Override @Override
public short getBlockReplication() { public short getBlockReplication() {
short max = getFileReplication(); return Util.getBlockReplication(this);
// i may be null since next will be set to null when the INode is deleted
for(INodeFileWithLink i = next; i != this && i != null; i = i.getNext()) {
final short replication = i.getFileReplication();
if (replication > max) {
max = replication;
}
}
return max;
} }
/**
* {@inheritDoc}
*
* Remove the current inode from the circular linked list.
* If some blocks at the end of the block list no longer belongs to
* any other inode, collect them and update the block list.
*/
@Override @Override
public int collectSubtreeBlocksAndClear(BlocksMapUpdateInfo info) { public int collectSubtreeBlocksAndClear(BlocksMapUpdateInfo info) {
if (next == this) { if (next == null || next == this) {
// this is the only remaining inode. // this is the only remaining inode.
super.collectSubtreeBlocksAndClear(info); return super.collectSubtreeBlocksAndClear(info);
} else { } else {
// There are other inode(s) using the blocks. return Util.collectSubtreeBlocksAndClear(this, info);
// Compute max file size excluding this and find the last inode.
long max = next.computeFileSize(true);
short maxReplication = next.getFileReplication();
INodeFileWithLink last = next;
for(INodeFileWithLink i = next.getNext(); i != this; i = i.getNext()) {
final long size = i.computeFileSize(true);
if (size > max) {
max = size;
}
final short rep = i.getFileReplication();
if (rep > maxReplication) {
maxReplication = rep;
}
last = i;
}
collectBlocksBeyondMaxAndClear(max, info);
// remove this from the circular linked list.
last.next = this.next;
// Set the replication of the current INode to the max of all the other
// linked INodes, so that in case the current INode is retrieved from the
// blocksMap before it is removed or updated, the correct replication
// number can be retrieved.
this.setFileReplication(maxReplication, null);
this.next = null;
// clear parent
setParent(null);
}
return 1;
}
private void collectBlocksBeyondMaxAndClear(final long max,
final BlocksMapUpdateInfo info) {
final BlockInfo[] oldBlocks = getBlocks();
if (oldBlocks != null) {
//find the minimum n such that the size of the first n blocks > max
int n = 0;
for(long size = 0; n < oldBlocks.length && max > size; n++) {
size += oldBlocks[n].getNumBytes();
}
// Replace the INode for all the remaining blocks in blocksMap
BlocksMapINodeUpdateEntry entry = new BlocksMapINodeUpdateEntry(this,
next);
if (info != null) {
for (int i = 0; i < n; i++) {
info.addUpdateBlock(oldBlocks[i], entry);
}
}
// starting from block n, the data is beyond max.
if (n < oldBlocks.length) {
// resize the array.
final BlockInfo[] newBlocks;
if (n == 0) {
newBlocks = null;
} else {
newBlocks = new BlockInfo[n];
System.arraycopy(oldBlocks, 0, newBlocks, 0, n);
}
for(INodeFileWithLink i = next; i != this; i = i.getNext()) {
i.setBlocks(newBlocks);
}
// collect the blocks beyond max.
if (info != null) {
for(; n < oldBlocks.length; n++) {
info.addDeleteBlock(oldBlocks[n]);
}
}
}
setBlocks(null);
} }
} }
} }

View File

@ -364,8 +364,7 @@ public void testSnapshotPathINodesWithAddedFile() throws Exception {
* Test {@link INodeDirectory#getExistingPathINodes(byte[][], int, boolean)} * Test {@link INodeDirectory#getExistingPathINodes(byte[][], int, boolean)}
* for snapshot file while modifying file after snapshot. * for snapshot file while modifying file after snapshot.
*/ */
// TODO: disable it temporarily since it uses append. @Test
// @Test
public void testSnapshotPathINodesAfterModification() throws Exception { public void testSnapshotPathINodesAfterModification() throws Exception {
//file1 was deleted, create it again. //file1 was deleted, create it again.
DFSTestUtil.createFile(hdfs, file1, 1024, REPLICATION, seed); DFSTestUtil.createFile(hdfs, file1, 1024, REPLICATION, seed);

View File

@ -270,10 +270,9 @@ private Modification[] prepareModifications(TestDirectoryTree.Node[] nodes)
Modification delete = new FileDeletion( Modification delete = new FileDeletion(
node.fileList.get((node.nullFileIndex + 1) % node.fileList.size()), node.fileList.get((node.nullFileIndex + 1) % node.fileList.size()),
hdfs); hdfs);
// TODO: fix append for snapshots Modification append = new FileAppend(
// Modification append = new FileAppend( node.fileList.get((node.nullFileIndex + 2) % node.fileList.size()),
// node.fileList.get((node.nullFileIndex + 2) % node.fileList.size()), hdfs, (int) BLOCKSIZE);
// hdfs, (int) BLOCKSIZE);
Modification chmod = new FileChangePermission( Modification chmod = new FileChangePermission(
node.fileList.get((node.nullFileIndex + 3) % node.fileList.size()), node.fileList.get((node.nullFileIndex + 3) % node.fileList.size()),
hdfs, genRandomPermission()); hdfs, genRandomPermission());
@ -290,8 +289,7 @@ private Modification[] prepareModifications(TestDirectoryTree.Node[] nodes)
mList.add(create); mList.add(create);
mList.add(delete); mList.add(delete);
// TODO: fix append for snapshots mList.add(append);
// mList.add(append);
mList.add(chmod); mList.add(chmod);
mList.add(chown); mList.add(chown);
mList.add(replication); mList.add(replication);