svn merge -c 1415799 from trunk for HDFS-4213. Add an API to hsync for updating the last block length at the namenode.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1415801 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-11-30 19:30:36 +00:00
parent 74122abb1f
commit 5c0153d833
11 changed files with 231 additions and 46 deletions

View File

@ -19,6 +19,9 @@ Release 2.0.3-alpha - Unreleased
HDFS-4155. libhdfs implementation of hsync API (Liang Xie via todd) HDFS-4155. libhdfs implementation of hsync API (Liang Xie via todd)
HDFS-4213. Add an API to hsync for updating the last block length at the
namenode. (Jing Zhao via szetszwo)
IMPROVEMENTS IMPROVEMENTS
HDFS-3925. Prettify PipelineAck#toString() for printing to a log HDFS-3925. Prettify PipelineAck#toString() for printing to a log

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -1496,7 +1497,12 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
*/ */
@Override @Override
public void hflush() throws IOException { public void hflush() throws IOException {
flushOrSync(false); flushOrSync(false, EnumSet.noneOf(SyncFlag.class));
}
@Override
public void hsync() throws IOException {
hsync(EnumSet.noneOf(SyncFlag.class));
} }
/** /**
@ -1507,17 +1513,35 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
* Note that only the current block is flushed to the disk device. * Note that only the current block is flushed to the disk device.
* To guarantee durable sync across block boundaries the stream should * To guarantee durable sync across block boundaries the stream should
* be created with {@link CreateFlag#SYNC_BLOCK}. * be created with {@link CreateFlag#SYNC_BLOCK}.
*
* @param syncFlags
* Indicate the semantic of the sync. Currently used to specify
* whether or not to update the block length in NameNode.
*/ */
@Override public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
public void hsync() throws IOException { flushOrSync(true, syncFlags);
flushOrSync(true);
} }
private void flushOrSync(boolean isSync) throws IOException { /**
* Flush/Sync buffered data to DataNodes.
*
* @param isSync
* Whether or not to require all replicas to flush data to the disk
* device
* @param syncFlags
* Indicate extra detailed semantic of the flush/sync. Currently
* mainly used to specify whether or not to update the file length in
* the NameNode
* @throws IOException
*/
private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
throws IOException {
dfsClient.checkOpen(); dfsClient.checkOpen();
isClosed(); isClosed();
try { try {
long toWaitFor; long toWaitFor;
long lastBlockLength = -1L;
boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH);
synchronized (this) { synchronized (this) {
/* Record current blockOffset. This might be changed inside /* Record current blockOffset. This might be changed inside
* flushBuffer() where a partial checksum chunk might be flushed. * flushBuffer() where a partial checksum chunk might be flushed.
@ -1582,12 +1606,19 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
waitForAckedSeqno(toWaitFor); waitForAckedSeqno(toWaitFor);
// If any new blocks were allocated since the last flush, if (updateLength) {
// then persist block locations on namenode. synchronized (this) {
// if (streamer != null && streamer.block != null) {
if (persistBlocks.getAndSet(false)) { lastBlockLength = streamer.block.getNumBytes();
}
}
}
// If 1) any new blocks were allocated since the last flush, or 2) to
// update length in NN is requried, then persist block locations on
// namenode.
if (persistBlocks.getAndSet(false) || updateLength) {
try { try {
dfsClient.namenode.fsync(src, dfsClient.clientName); dfsClient.namenode.fsync(src, dfsClient.clientName, lastBlockLength);
} catch (IOException ioe) { } catch (IOException ioe) {
DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe); DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
// If we got an error here, it might be because some other thread called // If we got an error here, it might be because some other thread called

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.client; package org.apache.hadoop.hdfs.client;
import java.io.IOException; import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
@ -56,4 +57,24 @@ public class HdfsDataOutputStream extends FSDataOutputStream {
public synchronized int getCurrentBlockReplication() throws IOException { public synchronized int getCurrentBlockReplication() throws IOException {
return ((DFSOutputStream)getWrappedStream()).getCurrentBlockReplication(); return ((DFSOutputStream)getWrappedStream()).getCurrentBlockReplication();
} }
/**
* Sync buffered data to DataNodes (flush to disk devices).
*
* @param syncFlags
* Indicate the detailed semantic and actions of the hsync.
* @throws IOException
* @see FSDataOutputStream#hsync()
*/
public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
((DFSOutputStream) getWrappedStream()).hsync(syncFlags);
}
public static enum SyncFlag {
/**
* When doing sync to DataNodes, also update the metadata (block
* length) in the NameNode
*/
UPDATE_LENGTH;
}
} }

View File

@ -815,14 +815,15 @@ public interface ClientProtocol {
* The file must be currently open for writing. * The file must be currently open for writing.
* @param src The string representation of the path * @param src The string representation of the path
* @param client The string representation of the client * @param client The string representation of the client
* * @param lastBlockLength The length of the last block (under construction)
* to be reported to NameNode
* @throws AccessControlException permission denied * @throws AccessControlException permission denied
* @throws FileNotFoundException file <code>src</code> is not found * @throws FileNotFoundException file <code>src</code> is not found
* @throws UnresolvedLinkException if <code>src</code> contains a symlink. * @throws UnresolvedLinkException if <code>src</code> contains a symlink.
* @throws IOException If an I/O error occurred * @throws IOException If an I/O error occurred
*/ */
@Idempotent @Idempotent
public void fsync(String src, String client) public void fsync(String src, String client, long lastBlockLength)
throws AccessControlException, FileNotFoundException, throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException; UnresolvedLinkException, IOException;

View File

@ -688,7 +688,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
public FsyncResponseProto fsync(RpcController controller, public FsyncResponseProto fsync(RpcController controller,
FsyncRequestProto req) throws ServiceException { FsyncRequestProto req) throws ServiceException {
try { try {
server.fsync(req.getSrc(), req.getClient()); server.fsync(req.getSrc(), req.getClient(), req.getLastBlockLength());
return VOID_FSYNC_RESPONSE; return VOID_FSYNC_RESPONSE;
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);

View File

@ -659,12 +659,11 @@ public class ClientNamenodeProtocolTranslatorPB implements
} }
@Override @Override
public void fsync(String src, String client) throws AccessControlException, public void fsync(String src, String client, long lastBlockLength)
FileNotFoundException, UnresolvedLinkException, IOException { throws AccessControlException, FileNotFoundException,
FsyncRequestProto req = FsyncRequestProto.newBuilder() UnresolvedLinkException, IOException {
.setSrc(src) FsyncRequestProto req = FsyncRequestProto.newBuilder().setSrc(src)
.setClient(client) .setClient(client).setLastBlockLength(lastBlockLength).build();
.build();
try { try {
rpcProxy.fsync(null, req); rpcProxy.fsync(null, req);
} catch (ServiceException e) { } catch (ServiceException e) {

View File

@ -2955,9 +2955,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
/** Persist all metadata about this file. /** Persist all metadata about this file.
* @param src The string representation of the path * @param src The string representation of the path
* @param clientName The string representation of the client * @param clientName The string representation of the client
* @param lastBlockLength The length of the last block
* under construction reported from client.
* @throws IOException if path does not exist * @throws IOException if path does not exist
*/ */
void fsync(String src, String clientName) void fsync(String src, String clientName, long lastBlockLength)
throws IOException, UnresolvedLinkException { throws IOException, UnresolvedLinkException {
NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName); NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
writeLock(); writeLock();
@ -2967,6 +2969,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new SafeModeException("Cannot fsync file " + src, safeMode); throw new SafeModeException("Cannot fsync file " + src, safeMode);
} }
INodeFileUnderConstruction pendingFile = checkLease(src, clientName); INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
if (lastBlockLength > 0) {
pendingFile.updateLengthOfLastBlock(lastBlockLength);
}
dir.persistBlocks(src, pendingFile); dir.persistBlocks(src, pendingFile);
} finally { } finally {
writeUnlock(); writeUnlock();

View File

@ -170,4 +170,22 @@ class INodeFileUnderConstruction extends INodeFile implements MutableBlockCollec
setBlock(numBlocks()-1, ucBlock); setBlock(numBlocks()-1, ucBlock);
return ucBlock; return ucBlock;
} }
/**
* Update the length for the last block
*
* @param lastBlockLength
* The length of the last block reported from client
* @throws IOException
*/
void updateLengthOfLastBlock(long lastBlockLength) throws IOException {
BlockInfo lastBlock = this.getLastBlock();
assert (lastBlock != null) : "The last block for path "
+ this.getFullPathName() + " is null when updating its length";
assert (lastBlock instanceof BlockInfoUnderConstruction) : "The last block for path "
+ this.getFullPathName()
+ " is not a BlockInfoUnderConstruction when updating its length";
lastBlock.setNumBytes(lastBlockLength);
}
} }

View File

@ -813,8 +813,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
} }
@Override // ClientProtocol @Override // ClientProtocol
public void fsync(String src, String clientName) throws IOException { public void fsync(String src, String clientName, long lastBlockLength)
namesystem.fsync(src, clientName); throws IOException {
namesystem.fsync(src, clientName, lastBlockLength);
} }
@Override // ClientProtocol @Override // ClientProtocol

View File

@ -357,6 +357,7 @@ message SetQuotaResponseProto { // void response
message FsyncRequestProto { message FsyncRequestProto {
required string src = 1; required string src = 1;
required string client = 2; required string client = 2;
optional sint64 lastBlockLength = 3 [default = -1];
} }
message FsyncResponseProto { // void response message FsyncResponseProto { // void response

View File

@ -23,12 +23,14 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.EnumSet;
import org.apache.commons.logging.impl.Log4JLogger; import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.Test; import org.junit.Test;
@ -43,15 +45,20 @@ public class TestHFlush {
private final String fName = "hflushtest.dat"; private final String fName = "hflushtest.dat";
/** The test uses {@link #doTheJob(Configuration, String, long, short) /**
* The test uses
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
* to write a file with a standard block size * to write a file with a standard block size
*/ */
@Test @Test
public void hFlush_01() throws IOException { public void hFlush_01() throws IOException {
doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE, (short)2); doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
(short) 2, false, EnumSet.noneOf(SyncFlag.class));
} }
/** The test uses {@link #doTheJob(Configuration, String, long, short) /**
* The test uses
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
* to write a file with a custom block size so the writes will be * to write a file with a custom block size so the writes will be
* happening across block' boundaries * happening across block' boundaries
*/ */
@ -64,10 +71,13 @@ public class TestHFlush {
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize); conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
doTheJob(conf, fName, customBlockSize, (short)2); doTheJob(conf, fName, customBlockSize, (short) 2, false,
EnumSet.noneOf(SyncFlag.class));
} }
/** The test uses {@link #doTheJob(Configuration, String, long, short) /**
* The test uses
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
* to write a file with a custom block size so the writes will be * to write a file with a custom block size so the writes will be
* happening across block's and checksum' boundaries * happening across block's and checksum' boundaries
*/ */
@ -80,22 +90,106 @@ public class TestHFlush {
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize); conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
doTheJob(conf, fName, customBlockSize, (short)2); doTheJob(conf, fName, customBlockSize, (short) 2, false,
EnumSet.noneOf(SyncFlag.class));
} }
/** /**
The method starts new cluster with defined Configuration; * Test hsync (with updating block length in NameNode) while no data is
creates a file with specified block_size and writes 10 equal sections in it; * actually written yet
it also calls hflush() after each write and throws an IOException in case of */
an error. @Test
@param conf cluster configuration public void hSyncUpdateLength_00() throws IOException {
@param fileName of the file to be created and processed as required Configuration conf = new HdfsConfiguration();
@param block_size value to be used for the file's creation MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
@param replicas is the number of replicas 2).build();
@throws IOException in case of any errors DistributedFileSystem fileSystem =
(DistributedFileSystem)cluster.getFileSystem();
try {
Path path = new Path(fName);
FSDataOutputStream stm = fileSystem.create(path, true, 4096, (short) 2,
AppendTestUtil.BLOCK_SIZE);
System.out.println("Created file " + path.toString());
((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
.of(SyncFlag.UPDATE_LENGTH));
long currentFileLength = fileSystem.getFileStatus(path).getLen();
assertEquals(0L, currentFileLength);
stm.close();
} finally {
fileSystem.close();
cluster.shutdown();
}
}
/**
* The test calls
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
* while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}.
*/
@Test
public void hSyncUpdateLength_01() throws IOException {
doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
(short) 2, true, EnumSet.of(SyncFlag.UPDATE_LENGTH));
}
/**
* The test calls
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
* while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}.
* Similar with {@link #hFlush_02()} , it writes a file with a custom block
* size so the writes will be happening across block' boundaries
*/
@Test
public void hSyncUpdateLength_02() throws IOException {
Configuration conf = new HdfsConfiguration();
int customPerChecksumSize = 512;
int customBlockSize = customPerChecksumSize * 3;
// Modify defaul filesystem settings
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
doTheJob(conf, fName, customBlockSize, (short) 2, true,
EnumSet.of(SyncFlag.UPDATE_LENGTH));
}
/**
* The test calls
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
* while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}.
* Similar with {@link #hFlush_03()} , it writes a file with a custom block
* size so the writes will be happening across block's and checksum'
* boundaries.
*/
@Test
public void hSyncUpdateLength_03() throws IOException {
Configuration conf = new HdfsConfiguration();
int customPerChecksumSize = 400;
int customBlockSize = customPerChecksumSize * 3;
// Modify defaul filesystem settings
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
doTheJob(conf, fName, customBlockSize, (short) 2, true,
EnumSet.of(SyncFlag.UPDATE_LENGTH));
}
/**
* The method starts new cluster with defined Configuration; creates a file
* with specified block_size and writes 10 equal sections in it; it also calls
* hflush/hsync after each write and throws an IOException in case of an error.
*
* @param conf cluster configuration
* @param fileName of the file to be created and processed as required
* @param block_size value to be used for the file's creation
* @param replicas is the number of replicas
* @param isSync hsync or hflush
* @param syncFlags specify the semantic of the sync/flush
* @throws IOException in case of any errors
*/ */
public static void doTheJob(Configuration conf, final String fileName, public static void doTheJob(Configuration conf, final String fileName,
long block_size, short replicas) throws IOException { long block_size, short replicas, boolean isSync,
EnumSet<SyncFlag> syncFlags) throws IOException {
byte[] fileContent; byte[] fileContent;
final int SECTIONS = 10; final int SECTIONS = 10;
@ -119,8 +213,21 @@ public class TestHFlush {
System.out.println("Writing " + (tenth * i) + " to " + (tenth * (i+1)) + " section to file " + fileName); System.out.println("Writing " + (tenth * i) + " to " + (tenth * (i+1)) + " section to file " + fileName);
// write to the file // write to the file
stm.write(fileContent, tenth * i, tenth); stm.write(fileContent, tenth * i, tenth);
// Wait while hflush() pushes all packets through built pipeline
// Wait while hflush/hsync pushes all packets through built pipeline
if (isSync) {
((DFSOutputStream)stm.getWrappedStream()).hsync(syncFlags);
} else {
((DFSOutputStream)stm.getWrappedStream()).hflush(); ((DFSOutputStream)stm.getWrappedStream()).hflush();
}
// Check file length if updatelength is required
if (isSync && syncFlags.contains(SyncFlag.UPDATE_LENGTH)) {
long currentFileLength = fileSystem.getFileStatus(path).getLen();
assertEquals(
"File size doesn't match for hsync/hflush with updating the length",
tenth * (i + 1), currentFileLength);
}
byte [] toRead = new byte[tenth]; byte [] toRead = new byte[tenth];
byte [] expected = new byte[tenth]; byte [] expected = new byte[tenth];
System.arraycopy(fileContent, tenth * i, expected, 0, tenth); System.arraycopy(fileContent, tenth * i, expected, 0, tenth);
@ -139,8 +246,6 @@ public class TestHFlush {
assertEquals("File size doesn't match ", AppendTestUtil.FILE_SIZE, fileSystem.getFileStatus(path).getLen()); assertEquals("File size doesn't match ", AppendTestUtil.FILE_SIZE, fileSystem.getFileStatus(path).getLen());
AppendTestUtil.checkFullFile(fileSystem, path, fileContent.length, fileContent, "hflush()"); AppendTestUtil.checkFullFile(fileSystem, path, fileContent.length, fileContent, "hflush()");
} catch (Exception e) {
e.printStackTrace();
} finally { } finally {
fileSystem.close(); fileSystem.close();
cluster.shutdown(); cluster.shutdown();