Merging r1544304 through r1544665 from trunk to branch HDFS-2832

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1544672 13f79535-47bb-0310-9956-ffa450edef68
Arpit Agarwal 2013-11-22 20:51:06 +00:00
commit f48ea1b516
40 changed files with 1026 additions and 1022 deletions

View File: org/apache/hadoop/portmap/Portmap.java

@ -17,42 +17,111 @@
*/
package org.apache.hadoop.portmap;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.oncrpc.RpcProgram;
import org.apache.hadoop.oncrpc.SimpleTcpServer;
import org.apache.hadoop.oncrpc.SimpleUdpServer;
import org.apache.hadoop.oncrpc.RpcUtil;
import org.apache.hadoop.util.StringUtils;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.timeout.IdleStateHandler;
import org.jboss.netty.util.HashedWheelTimer;
import com.google.common.annotations.VisibleForTesting;
/**
* Portmap service for binding RPC protocols. See RFC 1833 for details.
*/
public class Portmap {
public static final Log LOG = LogFactory.getLog(Portmap.class);
final class Portmap {
private static final Log LOG = LogFactory.getLog(Portmap.class);
private static final int DEFAULT_IDLE_TIME_MILLISECONDS = 5000;
private static void startUDPServer(RpcProgramPortmap rpcProgram) {
rpcProgram.register(PortmapMapping.TRANSPORT_UDP, RpcProgram.RPCB_PORT);
SimpleUdpServer udpServer = new SimpleUdpServer(RpcProgram.RPCB_PORT,
rpcProgram, 1);
udpServer.run();
}
private static void startTCPServer(final RpcProgramPortmap rpcProgram) {
rpcProgram.register(PortmapMapping.TRANSPORT_TCP, RpcProgram.RPCB_PORT);
SimpleTcpServer tcpServer = new SimpleTcpServer(RpcProgram.RPCB_PORT,
rpcProgram, 1);
tcpServer.run();
}
private ConnectionlessBootstrap udpServer;
private ServerBootstrap tcpServer;
private ChannelGroup allChannels = new DefaultChannelGroup();
private Channel udpChannel;
private Channel tcpChannel;
private final RpcProgramPortmap handler = new RpcProgramPortmap(allChannels);
public static void main(String[] args) {
StringUtils.startupShutdownMessage(Portmap.class, args, LOG);
RpcProgramPortmap program = new RpcProgramPortmap();
final int port = RpcProgram.RPCB_PORT;
Portmap pm = new Portmap();
try {
startUDPServer(program);
startTCPServer(program);
pm.start(DEFAULT_IDLE_TIME_MILLISECONDS,
new InetSocketAddress(port), new InetSocketAddress(port));
} catch (Throwable e) {
LOG.fatal("Start server failure");
LOG.fatal("Failed to start the server. Cause:" + e.getMessage());
pm.shutdown();
System.exit(-1);
}
}
void shutdown() {
allChannels.close().awaitUninterruptibly();
tcpServer.releaseExternalResources();
udpServer.releaseExternalResources();
}
@VisibleForTesting
SocketAddress getTcpServerLocalAddress() {
return tcpChannel.getLocalAddress();
}
@VisibleForTesting
SocketAddress getUdpServerLoAddress() {
return udpChannel.getLocalAddress();
}
@VisibleForTesting
RpcProgramPortmap getHandler() {
return handler;
}
void start(final int idleTimeMilliSeconds, final SocketAddress tcpAddress,
final SocketAddress udpAddress) {
tcpServer = new ServerBootstrap(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
tcpServer.setPipelineFactory(new ChannelPipelineFactory() {
private final HashedWheelTimer timer = new HashedWheelTimer();
private final IdleStateHandler idleStateHandler = new IdleStateHandler(
timer, 0, 0, idleTimeMilliSeconds, TimeUnit.MILLISECONDS);
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(RpcUtil.constructRpcFrameDecoder(),
RpcUtil.STAGE_RPC_MESSAGE_PARSER, idleStateHandler, handler,
RpcUtil.STAGE_RPC_TCP_RESPONSE);
}
});
udpServer = new ConnectionlessBootstrap(new NioDatagramChannelFactory(
Executors.newCachedThreadPool()));
udpServer.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER,
handler, RpcUtil.STAGE_RPC_UDP_RESPONSE));
tcpChannel = tcpServer.bind(tcpAddress);
udpChannel = udpServer.bind(udpAddress);
allChannels.add(tcpChannel);
allChannels.add(udpChannel);
LOG.info("Portmap server started at tcp://" + tcpChannel.getLocalAddress()
+ ", udp://" + udpChannel.getLocalAddress());
}
}
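
The rewritten Portmap replaces the old static SimpleTcpServer/SimpleUdpServer startup with two Netty bootstraps that share a single RpcProgramPortmap handler, track every channel in a ChannelGroup so that shutdown() can close them all, and install an IdleStateHandler so idle TCP connections are dropped (HDFS-5288). A minimal usage sketch of the new API, using only methods shown above; the addresses and timeout are illustrative, and getTcpServerLocalAddress() is the @VisibleForTesting accessor:

// Sketch: start the portmap service on ephemeral ports, then shut it down.
Portmap pm = new Portmap();
try {
  pm.start(5000 /* idle time, ms */,
      new InetSocketAddress("localhost", 0),   // TCP
      new InetSocketAddress("localhost", 0));  // UDP
  SocketAddress tcp = pm.getTcpServerLocalAddress(); // actual bound address
  // ... serve rpcbind requests ...
} finally {
  pm.shutdown(); // closes every channel in the group, releases Netty resources
}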

View File: org/apache/hadoop/portmap/PortmapResponse.java

@ -17,9 +17,6 @@
*/
package org.apache.hadoop.portmap;
import java.util.Arrays;
import java.util.Collection;
import org.apache.hadoop.oncrpc.RpcAcceptedReply;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.VerifierNone;
@ -45,18 +42,13 @@ public class PortmapResponse {
return xdr;
}
public static XDR pmapList(XDR xdr, int xid, Collection<PortmapMapping> list) {
public static XDR pmapList(XDR xdr, int xid, PortmapMapping[] list) {
RpcAcceptedReply.getAcceptInstance(xid, new VerifierNone()).write(xdr);
for (PortmapMapping mapping : list) {
System.out.println(mapping);
xdr.writeBoolean(true); // Value follows
mapping.serialize(xdr);
}
xdr.writeBoolean(false); // No value follows
return xdr;
}
public static XDR pmapList(XDR xdr, int xid, PortmapMapping[] list) {
return pmapList(xdr, xid, Arrays.asList(list));
}
}
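
pmapList() now takes a PortmapMapping[] directly; the Collection overload, its Arrays.asList bridge, and the stray System.out.println are removed. The reply layout is the rpcbind DUMP format from RFC 1833: an accepted-reply header, then each mapping prefixed by a "value follows" boolean, terminated by a false boolean. A sketch of building such a reply with the new signature; the xid and mapping values are illustrative:

// Illustrative: build a DUMP reply for the portmapper's own two mappings.
int xid = 1; // illustrative transaction id
PortmapMapping[] mappings = {
    new PortmapMapping(100000, 2, PortmapMapping.TRANSPORT_TCP, 111),
    new PortmapMapping(100000, 2, PortmapMapping.TRANSPORT_UDP, 111),
};
XDR reply = PortmapResponse.pmapList(new XDR(), xid, mappings);
// reply now holds: header, then (true, mapping) per entry, then false.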

View File: org/apache/hadoop/portmap/RpcProgramPortmap.java

@ -18,8 +18,6 @@
package org.apache.hadoop.portmap;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -34,31 +32,34 @@ import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.handler.timeout.IdleState;
import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
import org.jboss.netty.handler.timeout.IdleStateEvent;
/**
* An rpcbind request handler.
*/
public class RpcProgramPortmap extends RpcProgram implements PortmapInterface {
public static final int PROGRAM = 100000;
public static final int VERSION = 2;
final class RpcProgramPortmap extends IdleStateAwareChannelUpstreamHandler implements PortmapInterface {
static final int PROGRAM = 100000;
static final int VERSION = 2;
private static final Log LOG = LogFactory.getLog(RpcProgramPortmap.class);
/** Map synchronized using the monitor lock of this instance */
private final HashMap<String, PortmapMapping> map;
public RpcProgramPortmap() {
super("portmap", "localhost", RPCB_PORT, PROGRAM, VERSION, VERSION);
map = new HashMap<String, PortmapMapping>(256);
}
/** ChannelGroup that remembers all active channels for graceful shutdown. */
private final ChannelGroup allChannels;
/** Dump all the registered RPC services */
private synchronized void dumpRpcServices() {
Set<Entry<String, PortmapMapping>> entrySet = map.entrySet();
for (Entry<String, PortmapMapping> entry : entrySet) {
LOG.info("Service: " + entry.getKey() + " portmapping: "
+ entry.getValue());
}
RpcProgramPortmap(ChannelGroup allChannels) {
this.allChannels = allChannels;
map = new HashMap<String, PortmapMapping>(256);
PortmapMapping m = new PortmapMapping(PROGRAM, VERSION,
PortmapMapping.TRANSPORT_TCP, RpcProgram.RPCB_PORT);
PortmapMapping m1 = new PortmapMapping(PROGRAM, VERSION,
PortmapMapping.TRANSPORT_UDP, RpcProgram.RPCB_PORT);
map.put(PortmapMapping.key(m), m);
map.put(PortmapMapping.key(m1), m1);
}
@Override
@ -77,7 +78,6 @@ public class RpcProgramPortmap extends RpcProgram implements PortmapInterface {
PortmapMapping value = null;
synchronized(this) {
map.put(key, mapping);
dumpRpcServices();
value = map.get(key);
}
return PortmapResponse.intReply(out, xid, value.getPort());
@ -126,21 +126,15 @@ public class RpcProgramPortmap extends RpcProgram implements PortmapInterface {
}
@Override
public void register(PortmapMapping mapping) {
String key = PortmapMapping.key(mapping);
synchronized(this) {
map.put(key, mapping);
}
}
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
@Override
public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
RpcInfo info = (RpcInfo) e.getMessage();
RpcCall rpcCall = (RpcCall) info.header();
final Procedure portmapProc = Procedure.fromValue(rpcCall.getProcedure());
int xid = rpcCall.getXid();
byte[] data = new byte[info.data().readableBytes()];
info.data().readBytes(data);
XDR in = new XDR(data);
XDR in = new XDR(info.data().toByteBuffer().asReadOnlyBuffer(),
XDR.State.READING);
XDR out = new XDR();
if (portmapProc == Procedure.PMAPPROC_NULL) {
@ -162,13 +156,29 @@ public class RpcProgramPortmap extends RpcProgram implements PortmapInterface {
reply.write(out);
}
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap()
.buffer());
RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
RpcUtil.sendRpcResponse(ctx, rsp);
}
@Override
protected boolean isIdempotent(RpcCall call) {
return false;
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
allChannels.add(e.getChannel());
}
@Override
public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
throws Exception {
if (e.getState() == IdleState.ALL_IDLE) {
e.getChannel().close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
LOG.warn("Encountered ", e.getCause());
e.getChannel().close();
}
}
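
RpcProgramPortmap is now itself a Netty upstream handler: the constructor pre-registers the portmapper's own TCP and UDP mappings, channelOpen() adds each accepted channel to the shared ChannelGroup, and channelIdle() closes any connection the pipeline's IdleStateHandler reports as ALL_IDLE. A self-contained sketch of that Netty 3 idle-close wiring, assuming the same handler arrangement as Portmap.start() above (the 5000 ms window is illustrative):

// Sketch: close connections that have been idle in both directions.
HashedWheelTimer timer = new HashedWheelTimer();
ChannelPipeline pipeline = Channels.pipeline(
    new IdleStateHandler(timer, 0, 0, 5000, TimeUnit.MILLISECONDS),
    new IdleStateAwareChannelUpstreamHandler() {
      @Override
      public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
        if (e.getState() == IdleState.ALL_IDLE) {
          e.getChannel().close(); // no reads or writes within the window
        }
      }
    });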

View File: org/apache/hadoop/portmap/TestPortmap.java (new file)

@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.portmap;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.HashMap;
import junit.framework.Assert;
import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.CredentialsNone;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;
public class TestPortmap {
private static Portmap pm = new Portmap();
private static final int SHORT_TIMEOUT_MILLISECONDS = 10;
private static final int RETRY_TIMES = 5;
private int xid;
@BeforeClass
public static void setup() {
pm.start(SHORT_TIMEOUT_MILLISECONDS, new InetSocketAddress("localhost", 0),
new InetSocketAddress("localhost", 0));
}
@AfterClass
public static void tearDown() {
pm.shutdown();
}
@Test(timeout = 1000)
public void testIdle() throws InterruptedException, IOException {
Socket s = new Socket();
try {
s.connect(pm.getTcpServerLocalAddress());
int i = 0;
while (!s.isConnected() && i < RETRY_TIMES) {
++i;
Thread.sleep(SHORT_TIMEOUT_MILLISECONDS);
}
Assert.assertTrue("Failed to connect to the server", s.isConnected()
&& i < RETRY_TIMES);
int b = s.getInputStream().read();
Assert.assertTrue("The server failed to disconnect", b == -1);
} finally {
s.close();
}
}
@Test(timeout = 1000)
public void testRegistration() throws IOException, InterruptedException {
XDR req = new XDR();
RpcCall.getInstance(++xid, RpcProgramPortmap.PROGRAM,
RpcProgramPortmap.VERSION,
PortmapInterface.Procedure.PMAPPROC_SET.getValue(),
new CredentialsNone(), new VerifierNone()).write(req);
PortmapMapping sent = new PortmapMapping(90000, 1,
PortmapMapping.TRANSPORT_TCP, 1234);
sent.serialize(req);
byte[] reqBuf = req.getBytes();
DatagramSocket s = new DatagramSocket();
DatagramPacket p = new DatagramPacket(reqBuf, reqBuf.length,
pm.getUdpServerLoAddress());
try {
s.send(p);
} finally {
s.close();
}
// Give the server a chance to process the request
Thread.sleep(100);
boolean found = false;
@SuppressWarnings("unchecked")
HashMap<String, PortmapMapping> map = (HashMap<String, PortmapMapping>) Whitebox
.getInternalState(pm.getHandler(), "map");
for (PortmapMapping m : map.values()) {
if (m.getPort() == sent.getPort()
&& PortmapMapping.key(m).equals(PortmapMapping.key(sent))) {
found = true;
break;
}
}
Assert.assertTrue("Registration failed", found);
}
}

View File: DFSClientCache.java

@ -99,7 +99,7 @@ class DFSClientCache {
this.config = config;
this.clientCache = CacheBuilder.newBuilder()
.maximumSize(clientCache)
.removalListener(clientRemovealListener())
.removalListener(clientRemovalListener())
.build(clientLoader());
this.inputstreamCache = CacheBuilder.newBuilder()
@ -127,7 +127,7 @@ class DFSClientCache {
};
}
private RemovalListener<String, DFSClient> clientRemovealListener() {
private RemovalListener<String, DFSClient> clientRemovalListener() {
return new RemovalListener<String, DFSClient>() {
@Override
public void onRemoval(RemovalNotification<String, DFSClient> notification) {
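
This hunk is purely the HDFS-5407 typo fix: clientRemovealListener becomes clientRemovalListener; the Guava cache wiring itself is unchanged. For reference, a self-contained sketch of the same CacheBuilder pattern with illustrative types (not HDFS code):

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

LoadingCache<String, Integer> cache = CacheBuilder.newBuilder()
    .maximumSize(16)
    .removalListener(new RemovalListener<String, Integer>() {
      @Override
      public void onRemoval(RemovalNotification<String, Integer> n) {
        // release resources held by the evicted value here
      }
    })
    .build(new CacheLoader<String, Integer>() {
      @Override
      public Integer load(String key) {
        return key.length(); // illustrative loader
      }
    });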

View File: org/apache/hadoop/hdfs/nfs/TestPortmapRegister.java (deleted)

@ -1,141 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.nfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.nfs.mount.RpcProgramMountd;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.oncrpc.RegistrationClient;
import org.apache.hadoop.oncrpc.RpcCall;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.CredentialsNone;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.portmap.PortmapMapping;
import org.apache.hadoop.portmap.PortmapRequest;
public class TestPortmapRegister {
public static final Log LOG = LogFactory.getLog(TestPortmapRegister.class);
static void testRequest(XDR request, XDR request2) {
RegistrationClient registrationClient = new RegistrationClient(
"localhost", Nfs3Constant.SUN_RPCBIND, request);
registrationClient.run();
}
public static void main(String[] args) throws InterruptedException {
PortmapMapping mapEntry = new PortmapMapping(RpcProgramMountd.PROGRAM,
RpcProgramMountd.VERSION_1, PortmapMapping.TRANSPORT_UDP,
RpcProgramMountd.PORT);
XDR mappingRequest = PortmapRequest.create(mapEntry);
RegistrationClient registrationClient = new RegistrationClient(
"localhost", Nfs3Constant.SUN_RPCBIND, mappingRequest);
registrationClient.run();
Thread t1 = new Runtest1();
//Thread t2 = testa.new Runtest2();
t1.start();
//t2.start();
t1.join();
//t2.join();
//testDump();
}
static class Runtest1 extends Thread {
@Override
public void run() {
//testGetportMount();
PortmapMapping mapEntry = new PortmapMapping(RpcProgramMountd.PROGRAM,
RpcProgramMountd.VERSION_1, PortmapMapping.TRANSPORT_UDP,
RpcProgramMountd.PORT);
XDR req = PortmapRequest.create(mapEntry);
testRequest(req, req);
}
}
static class Runtest2 extends Thread {
@Override
public void run() {
testDump();
}
}
static void createPortmapXDRheader(XDR xdr_out, int procedure) {
// TODO: Move this to RpcRequest
RpcCall.getInstance(0, 100000, 2, procedure, new CredentialsNone(),
new VerifierNone()).write(xdr_out);
/*
xdr_out.putInt(1); //unix auth
xdr_out.putVariableOpaque(new byte[20]);
xdr_out.putInt(0);
xdr_out.putInt(0);
*/
}
static void testGetportMount() {
XDR xdr_out = new XDR();
createPortmapXDRheader(xdr_out, 3);
xdr_out.writeInt(100005);
xdr_out.writeInt(1);
xdr_out.writeInt(6);
xdr_out.writeInt(0);
XDR request2 = new XDR();
createPortmapXDRheader(xdr_out, 3);
request2.writeInt(100005);
request2.writeInt(1);
request2.writeInt(6);
request2.writeInt(0);
testRequest(xdr_out, request2);
}
static void testGetport() {
XDR xdr_out = new XDR();
createPortmapXDRheader(xdr_out, 3);
xdr_out.writeInt(100003);
xdr_out.writeInt(3);
xdr_out.writeInt(6);
xdr_out.writeInt(0);
XDR request2 = new XDR();
createPortmapXDRheader(xdr_out, 3);
request2.writeInt(100003);
request2.writeInt(3);
request2.writeInt(6);
request2.writeInt(0);
testRequest(xdr_out, request2);
}
static void testDump() {
XDR xdr_out = new XDR();
createPortmapXDRheader(xdr_out, 4);
testRequest(xdr_out, xdr_out);
}
}

View File: CHANGES.txt (hadoop-hdfs)

@ -13,9 +13,6 @@ Trunk (Unreleased)
HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)
HDFS-5444. Choose default web UI based on browser capabilities. (Haohui Mai
via jing9)
IMPROVEMENTS
HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.
@ -202,8 +199,6 @@ Trunk (Unreleased)
HDFS-5511. improve CacheManipulator interface to allow better unit testing
(cmccabe)
HDFS-5525. Inline dust templates for new Web UI. (Haohui Mai via jing9)
HDFS-5451. Add byte and file statistics to PathBasedCacheEntry.
(Colin Patrick McCabe via Andrew Wang)
@ -213,6 +208,10 @@ Trunk (Unreleased)
HDFS-5473. Consistent naming of user-visible caching classes and methods
(cmccabe)
HDFS-5285. Flatten INodeFile hierarchy: Replace INodeFileUnderConstruction
and INodeFileUnderConstructionWithSnapshot with FileUnderConstructionFeature.
(jing9 via szetszwo)
OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
@ -397,6 +396,9 @@ Trunk (Unreleased)
HDFS-5513. CacheAdmin commands fail when using . as the path. (wang)
HDFS-5543. Fix narrow race condition in TestPathBasedCacheRequests
(cmccabe)
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -422,6 +424,9 @@ Release 2.3.0 - UNRELEASED
HDFS-3987. Support webhdfs over HTTPS. (Haohui Mai via jing9)
HDFS-5444. Choose default web UI based on browser capabilities. (Haohui Mai
via jing9)
IMPROVEMENTS
HDFS-5267. Remove volatile from LightWeightHashSet. (Junping Du via llu)
@ -528,6 +533,8 @@ Release 2.3.0 - UNRELEASED
HDFS-5532. Enable the webhdfs by default to support new HDFS web UI. (Vinay
via jing9)
HDFS-5525. Inline dust templates for new Web UI. (Haohui Mai via jing9)
OPTIMIZATIONS
@ -602,6 +609,9 @@ Release 2.3.0 - UNRELEASED
HDFS-5428. Under construction files deletion after snapshot+checkpoint+nn restart
leads to NN safemode. (jing9)
HDFS-5552. Fix wrong information of "Cluster summay" in dfshealth.html.
(Haohui Mai via jing9)
Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES
@ -622,6 +632,8 @@ Release 2.2.1 - UNRELEASED
HDFS-5344. Make LsSnapshottableDir as Tool interface implementation. (Sathish via umamahesh)
HDFS-5544. Adding Test case For Checking dfs.checksum type as NULL value. (Sathish via umamahesh)
OPTIMIZATIONS
BUG FIXES
@ -711,6 +723,10 @@ Release 2.2.1 - UNRELEASED
HDFS-5014. Process register commands without holding BPOfferService lock.
(Vinaykumar B via umamahesh)
HDFS-5288. Close idle connections in portmap (Haohui Mai via brandonli)
HDFS-5407. Fix typos in DFSClientCache (Haohui Mai via brandonli)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES

View File: BlockCollection.java

@ -64,4 +64,21 @@ public interface BlockCollection {
* Get the name of the collection.
*/
public String getName();
/**
* Set the block at the given index.
*/
public void setBlock(int index, BlockInfo blk);
/**
* Convert the last block of the collection to an under-construction block
* and set the locations.
*/
public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
DatanodeStorageInfo[] targets) throws IOException;
/**
* @return whether the block collection is under construction.
*/
public boolean isUnderConstruction();
}
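
Adding isUnderConstruction() to the interface lets callers ask the collection for its state instead of type-testing against the MutableBlockCollection marker; the BlockManager hunks below replace every instanceof check accordingly. Schematically, via a hypothetical helper:

// Sketch of the replacement idiom used throughout BlockManager below.
static boolean skipBlock(BlockCollection bc) {
  // before: bc == null || bc instanceof MutableBlockCollection
  return bc == null || bc.isUnderConstruction();
}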

View File: BlockManager.java

@ -563,7 +563,7 @@ public class BlockManager {
* @throws IOException if the block does not have at least a minimal number
* of replicas reported from data-nodes.
*/
public boolean commitOrCompleteLastBlock(MutableBlockCollection bc,
public boolean commitOrCompleteLastBlock(BlockCollection bc,
Block commitBlock) throws IOException {
if(commitBlock == null)
return false; // not committing, this is a block allocation retry
@ -586,7 +586,7 @@ public class BlockManager {
* @throws IOException if the block does not have at least a minimal number
* of replicas reported from data-nodes.
*/
private BlockInfo completeBlock(final MutableBlockCollection bc,
private BlockInfo completeBlock(final BlockCollection bc,
final int blkIndex, boolean force) throws IOException {
if(blkIndex < 0)
return null;
@ -619,7 +619,7 @@ public class BlockManager {
return blocksMap.replaceBlock(completeBlock);
}
private BlockInfo completeBlock(final MutableBlockCollection bc,
private BlockInfo completeBlock(final BlockCollection bc,
final BlockInfo block, boolean force) throws IOException {
BlockInfo[] fileBlocks = bc.getBlocks();
for(int idx = 0; idx < fileBlocks.length; idx++)
@ -634,7 +634,7 @@ public class BlockManager {
* regardless of whether enough replicas are present. This is necessary
* when tailing edit logs as a Standby.
*/
public BlockInfo forceCompleteBlock(final MutableBlockCollection bc,
public BlockInfo forceCompleteBlock(final BlockCollection bc,
final BlockInfoUnderConstruction block) throws IOException {
block.commitBlock(block);
return completeBlock(bc, block, true);
@ -655,7 +655,7 @@ public class BlockManager {
* @return the last block locations if the block is partial or null otherwise
*/
public LocatedBlock convertLastBlockToUnderConstruction(
MutableBlockCollection bc) throws IOException {
BlockCollection bc) throws IOException {
BlockInfo oldBlock = bc.getLastBlock();
if(oldBlock == null ||
bc.getPreferredBlockSize() == oldBlock.getNumBytes())
@ -1214,7 +1214,7 @@ public class BlockManager {
// block should belong to a file
bc = blocksMap.getBlockCollection(block);
// abandoned block or block reopened for append
if(bc == null || bc instanceof MutableBlockCollection) {
if(bc == null || bc.isUnderConstruction()) {
neededReplications.remove(block, priority); // remove from neededReplications
continue;
}
@ -1295,7 +1295,7 @@ public class BlockManager {
// block should belong to a file
bc = blocksMap.getBlockCollection(block);
// abandoned block or block reopened for append
if(bc == null || bc instanceof MutableBlockCollection) {
if(bc == null || bc.isUnderConstruction()) {
neededReplications.remove(block, priority); // remove from neededReplications
rw.targets = null;
continue;
@ -2161,7 +2161,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
int numCurrentReplica = countLiveNodes(storedBlock);
if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
&& numCurrentReplica >= minReplication) {
completeBlock((MutableBlockCollection)storedBlock.getBlockCollection(), storedBlock, false);
completeBlock(storedBlock.getBlockCollection(), storedBlock, false);
} else if (storedBlock.isComplete()) {
// check whether safe replication is reached for the block
// only complete blocks are counted towards that.
@ -2232,7 +2232,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
numLiveReplicas >= minReplication) {
storedBlock = completeBlock((MutableBlockCollection)bc, storedBlock, false);
storedBlock = completeBlock(bc, storedBlock, false);
} else if (storedBlock.isComplete()) {
// check whether safe replication is reached for the block
// only complete blocks are counted towards that
@ -2243,7 +2243,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
}
// if file is under construction, then done for now
if (bc instanceof MutableBlockCollection) {
if (bc.isUnderConstruction()) {
return storedBlock;
}
@ -2856,7 +2856,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
+ ", corrupt replicas: " + num.corruptReplicas()
+ ", decommissioned replicas: " + num.decommissionedReplicas()
+ ", excess replicas: " + num.excessReplicas()
+ ", Is Open File: " + (bc instanceof MutableBlockCollection)
+ ", Is Open File: " + bc.isUnderConstruction()
+ ", Datanodes having this block: " + nodeList + ", Current Datanode: "
+ srcNode + ", Is current datanode decommissioning: "
+ srcNode.isDecommissionInProgress());
@ -2915,7 +2915,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {
decommissionOnlyReplicas++;
}
if (bc instanceof MutableBlockCollection) {
if (bc.isUnderConstruction()) {
underReplicatedInOpenFiles++;
}
}

View File: FSDirectory.java

@ -277,13 +277,9 @@ public class FSDirectory implements Closeable {
* @throws UnresolvedLinkException
* @throws SnapshotAccessControlException
*/
INodeFileUnderConstruction addFile(String path,
PermissionStatus permissions,
short replication,
long preferredBlockSize,
String clientName,
String clientMachine,
DatanodeDescriptor clientNode)
INodeFile addFile(String path, PermissionStatus permissions,
short replication, long preferredBlockSize, String clientName,
String clientMachine, DatanodeDescriptor clientNode)
throws FileAlreadyExistsException, QuotaExceededException,
UnresolvedLinkException, SnapshotAccessControlException {
waitForReady();
@ -301,11 +297,11 @@ public class FSDirectory implements Closeable {
if (!mkdirs(parent.toString(), permissions, true, modTime)) {
return null;
}
INodeFileUnderConstruction newNode = new INodeFileUnderConstruction(
namesystem.allocateNewInodeId(),
permissions,replication,
preferredBlockSize, modTime, clientName,
clientMachine, clientNode);
INodeFile newNode = new INodeFile(namesystem.allocateNewInodeId(), null,
permissions, modTime, modTime, BlockInfo.EMPTY_ARRAY, replication,
preferredBlockSize);
newNode.toUnderConstruction(clientName, clientMachine, clientNode);
boolean added = false;
writeLock();
try {
@ -337,8 +333,11 @@ public class FSDirectory implements Closeable {
final INodeFile newNode;
assert hasWriteLock();
if (underConstruction) {
newNode = new INodeFileUnderConstruction(id, permissions, replication,
preferredBlockSize, modificationTime, clientName, clientMachine, null);
newNode = new INodeFile(id, null, permissions, modificationTime,
modificationTime, BlockInfo.EMPTY_ARRAY, replication,
preferredBlockSize);
newNode.toUnderConstruction(clientName, clientMachine, null);
} else {
newNode = new INodeFile(id, null, permissions, modificationTime, atime,
BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize);
@ -367,8 +366,8 @@ public class FSDirectory implements Closeable {
writeLock();
try {
final INodeFileUnderConstruction fileINode =
INodeFileUnderConstruction.valueOf(inodesInPath.getLastINode(), path);
final INodeFile fileINode = inodesInPath.getLastINode().asFile();
Preconditions.checkState(fileINode.isUnderConstruction());
// check quota limits and updated space consumed
updateCount(inodesInPath, 0, fileINode.getBlockDiskspace(), true);
@ -398,8 +397,8 @@ public class FSDirectory implements Closeable {
/**
* Persist the block list for the inode.
*/
void persistBlocks(String path, INodeFileUnderConstruction file,
boolean logRetryCache) {
void persistBlocks(String path, INodeFile file, boolean logRetryCache) {
Preconditions.checkArgument(file.isUnderConstruction());
waitForReady();
writeLock();
@ -438,8 +437,9 @@ public class FSDirectory implements Closeable {
* Remove a block from the file.
* @return Whether the block exists in the corresponding file
*/
boolean removeBlock(String path, INodeFileUnderConstruction fileNode,
Block block) throws IOException {
boolean removeBlock(String path, INodeFile fileNode, Block block)
throws IOException {
Preconditions.checkArgument(fileNode.isUnderConstruction());
waitForReady();
writeLock();
@ -451,7 +451,8 @@ public class FSDirectory implements Closeable {
}
boolean unprotectedRemoveBlock(String path,
INodeFileUnderConstruction fileNode, Block block) throws IOException {
INodeFile fileNode, Block block) throws IOException {
Preconditions.checkArgument(fileNode.isUnderConstruction());
// modify file-> block and blocksMap
boolean removed = fileNode.removeLastBlock(block);
if (!removed) {
@ -1478,38 +1479,6 @@ public class FSDirectory implements Closeable {
}
}
/**
* Replaces the specified INodeFile with the specified one.
*/
void replaceINodeFile(String path, INodeFile oldnode,
INodeFile newnode) throws IOException {
writeLock();
try {
unprotectedReplaceINodeFile(path, oldnode, newnode);
} finally {
writeUnlock();
}
}
/** Replace an INodeFile and record modification for the latest snapshot. */
void unprotectedReplaceINodeFile(final String path, final INodeFile oldnode,
final INodeFile newnode) {
Preconditions.checkState(hasWriteLock());
oldnode.getParent().replaceChild(oldnode, newnode, inodeMap);
oldnode.clear();
/* Currently oldnode and newnode are assumed to contain the same
* blocks. Otherwise, blocks need to be removed from the blocksMap.
*/
int index = 0;
for (BlockInfo b : newnode.getBlocks()) {
BlockInfo info = getBlockManager().addBlockCollection(b, newnode);
newnode.setBlock(index, info); // inode refers to the block in BlocksMap
index++;
}
}
/**
* Get a partial listing of the indicated directory
*
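
These hunks show the core of HDFS-5285: INodeFileUnderConstruction disappears as a type, and "under construction" becomes a feature toggled on a plain INodeFile. Opening a file adds the feature and closing removes it, so the inode is never swapped out — which is why replaceINodeFile()/unprotectedReplaceINodeFile(), whose whole job was rewiring the blocksMap onto a replacement inode, can be deleted. A lifecycle sketch using only methods that appear in these hunks; the free variables (id, permissions, clientName, ...) stand for the usual namesystem state:

// Sketch of the feature-based under-construction lifecycle.
INodeFile file = new INodeFile(id, null, permissions, mtime, mtime,
    BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize);
file.toUnderConstruction(clientName, clientMachine, clientNode); // adds feature
assert file.isUnderConstruction();
String holder = file.getFileUnderConstructionFeature().getClientName();
// ... blocks are allocated and written ...
file.toCompleteFile(closeTime); // removes the feature; same inode throughout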

View File: FSEditLog.java

@ -680,8 +680,8 @@ public class FSEditLog implements LogsPurgeable {
* Add open lease record to edit log.
* Records the block locations of the last block.
*/
public void logOpenFile(String path, INodeFileUnderConstruction newNode,
boolean toLogRpcIds) {
public void logOpenFile(String path, INodeFile newNode, boolean toLogRpcIds) {
Preconditions.checkArgument(newNode.isUnderConstruction());
AddOp op = AddOp.getInstance(cache.get())
.setInodeId(newNode.getId())
.setPath(path)
@ -691,8 +691,8 @@ public class FSEditLog implements LogsPurgeable {
.setBlockSize(newNode.getPreferredBlockSize())
.setBlocks(newNode.getBlocks())
.setPermissionStatus(newNode.getPermissionStatus())
.setClientName(newNode.getClientName())
.setClientMachine(newNode.getClientMachine());
.setClientName(newNode.getFileUnderConstructionFeature().getClientName())
.setClientMachine(newNode.getFileUnderConstructionFeature().getClientMachine());
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
@ -713,8 +713,8 @@ public class FSEditLog implements LogsPurgeable {
logEdit(op);
}
public void logUpdateBlocks(String path, INodeFileUnderConstruction file,
boolean toLogRpcIds) {
public void logUpdateBlocks(String path, INodeFile file, boolean toLogRpcIds) {
Preconditions.checkArgument(file.isUnderConstruction());
UpdateBlocksOp op = UpdateBlocksOp.getInstance(cache.get())
.setPath(path)
.setBlocks(file.getBlocks());

View File: FSEditLogLoader.java

@ -85,6 +85,7 @@ import org.apache.hadoop.hdfs.util.ChunkedArrayList;
import org.apache.hadoop.hdfs.util.Holder;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@InterfaceAudience.Private
@InterfaceStability.Evolving
@ -369,15 +370,15 @@ public class FSEditLogLoader {
}
final INodesInPath iip = fsDir.getLastINodeInPath(addCloseOp.path);
final INodeFile oldFile = INodeFile.valueOf(iip.getINode(0), addCloseOp.path);
final INodeFile file = INodeFile.valueOf(iip.getINode(0), addCloseOp.path);
// Update the salient file attributes.
oldFile.setAccessTime(addCloseOp.atime, null, fsDir.getINodeMap());
oldFile.setModificationTime(addCloseOp.mtime, null, fsDir.getINodeMap());
updateBlocks(fsDir, addCloseOp, oldFile);
file.setAccessTime(addCloseOp.atime, null, fsDir.getINodeMap());
file.setModificationTime(addCloseOp.mtime, null, fsDir.getINodeMap());
updateBlocks(fsDir, addCloseOp, file);
// Now close the file
if (!oldFile.isUnderConstruction() &&
if (!file.isUnderConstruction() &&
logVersion <= LayoutVersion.BUGFIX_HDFS_2991_VERSION) {
// There was a bug (HDFS-2991) in hadoop < 0.23.1 where OP_CLOSE
// could show up twice in a row. But after that version, this
@ -387,11 +388,9 @@ public class FSEditLogLoader {
}
// One might expect that you could use removeLease(holder, path) here,
// but OP_CLOSE doesn't serialize the holder. So, remove by path.
if (oldFile.isUnderConstruction()) {
INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile;
if (file.isUnderConstruction()) {
fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path);
INodeFile newFile = ucFile.toINodeFile(ucFile.getModificationTime());
fsDir.unprotectedReplaceINodeFile(addCloseOp.path, ucFile, newFile);
file.toCompleteFile(file.getModificationTime());
}
break;
}
@ -564,9 +563,8 @@ public class FSEditLogLoader {
Lease lease = fsNamesys.leaseManager.getLease(
reassignLeaseOp.leaseHolder);
INodeFileUnderConstruction pendingFile =
INodeFileUnderConstruction.valueOf(
fsDir.getINode(reassignLeaseOp.path), reassignLeaseOp.path);
INodeFile pendingFile = fsDir.getINode(reassignLeaseOp.path).asFile();
Preconditions.checkState(pendingFile.isUnderConstruction());
fsNamesys.reassignLeaseInternal(lease,
reassignLeaseOp.path, reassignLeaseOp.newHolder, pendingFile);
break;
@ -751,9 +749,8 @@ public class FSEditLogLoader {
if (oldBlock instanceof BlockInfoUnderConstruction &&
(!isLastBlock || op.shouldCompleteLastBlock())) {
changeMade = true;
fsNamesys.getBlockManager().forceCompleteBlock(
(INodeFileUnderConstruction)file,
(BlockInfoUnderConstruction)oldBlock);
fsNamesys.getBlockManager().forceCompleteBlock(file,
(BlockInfoUnderConstruction) oldBlock);
}
if (changeMade) {
// The state or gen-stamp of the block has changed. So, we may be
@ -774,8 +771,7 @@ public class FSEditLogLoader {
+ path);
}
Block oldBlock = oldBlocks[oldBlocks.length - 1];
boolean removed = fsDir.unprotectedRemoveBlock(path,
(INodeFileUnderConstruction) file, oldBlock);
boolean removed = fsDir.unprotectedRemoveBlock(path, file, oldBlock);
if (!removed && !(op instanceof UpdateBlocksOp)) {
throw new IOException("Trying to delete non-existant block " + oldBlock);
}

View File: FSImageFormat.java

@ -55,7 +55,6 @@ import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiffList;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
@ -659,13 +658,10 @@ public class FSImageFormat {
// file
// read blocks
BlockInfo[] blocks = null;
if (numBlocks >= 0) {
blocks = new BlockInfo[numBlocks];
for (int j = 0; j < numBlocks; j++) {
blocks[j] = new BlockInfo(replication);
blocks[j].readFields(in);
}
BlockInfo[] blocks = new BlockInfo[numBlocks];
for (int j = 0; j < numBlocks; j++) {
blocks[j] = new BlockInfo(replication);
blocks[j].readFields(in);
}
String clientName = "";
@ -700,10 +696,9 @@ public class FSImageFormat {
final INodeFile file = new INodeFile(inodeId, localName, permissions,
modificationTime, atime, blocks, replication, blockSize);
if (underConstruction) {
INodeFileUnderConstruction fileUC = new INodeFileUnderConstruction(
file, clientName, clientMachine, null);
return fileDiffs == null ? fileUC :
new INodeFileUnderConstructionWithSnapshot(fileUC, fileDiffs);
file.toUnderConstruction(clientName, clientMachine, null);
return fileDiffs == null ? file : new INodeFileWithSnapshot(file,
fileDiffs);
} else {
return fileDiffs == null ? file :
new INodeFileWithSnapshot(file, fileDiffs);
@ -829,8 +824,8 @@ public class FSImageFormat {
LOG.info("Number of files under construction = " + size);
for (int i = 0; i < size; i++) {
INodeFileUnderConstruction cons = FSImageSerialization
.readINodeUnderConstruction(in, namesystem, getLayoutVersion());
INodeFile cons = FSImageSerialization.readINodeUnderConstruction(in,
namesystem, getLayoutVersion());
counter.increment();
// verify that file exists in namespace
@ -848,33 +843,21 @@ public class FSImageFormat {
final INodesInPath iip = fsDir.getLastINodeInPath(path);
oldnode = INodeFile.valueOf(iip.getINode(0), path);
}
cons.setLocalName(oldnode.getLocalNameBytes());
INodeReference parentRef = oldnode.getParentReference();
if (parentRef != null) {
cons.setParentReference(parentRef);
} else {
cons.setParent(oldnode.getParent());
}
if (oldnode instanceof INodeFileWithSnapshot) {
cons = new INodeFileUnderConstructionWithSnapshot(cons,
((INodeFileWithSnapshot) oldnode).getDiffs());
FileUnderConstructionFeature uc = cons.getFileUnderConstructionFeature();
oldnode.toUnderConstruction(uc.getClientName(), uc.getClientMachine(),
uc.getClientNode());
if (oldnode.numBlocks() > 0) {
BlockInfo ucBlock = cons.getLastBlock();
// we do not replace the inode, just replace the last block of oldnode
BlockInfo info = namesystem.getBlockManager().addBlockCollection(
ucBlock, oldnode);
oldnode.setBlock(oldnode.numBlocks() - 1, info);
}
if (!inSnapshot) {
fsDir.replaceINodeFile(path, oldnode, cons);
namesystem.leaseManager.addLease(cons.getClientName(), path);
} else {
if (parentRef != null) {
// replace oldnode with cons
parentRef.setReferredINode(cons);
} else {
// replace old node in its parent's children list and deleted list
oldnode.getParent().replaceChildFileInSnapshot(oldnode, cons);
namesystem.dir.addToInodeMap(cons);
updateBlocksMap(cons);
}
namesystem.leaseManager.addLease(cons
.getFileUnderConstructionFeature().getClientName(), path);
}
}
}
@ -955,8 +938,8 @@ public class FSImageFormat {
private MD5Hash savedDigest;
private final ReferenceMap referenceMap = new ReferenceMap();
private final Map<Long, INodeFileUnderConstruction> snapshotUCMap =
new HashMap<Long, INodeFileUnderConstruction>();
private final Map<Long, INodeFile> snapshotUCMap =
new HashMap<Long, INodeFile>();
/** @throws IllegalStateException if the instance has not yet saved an image */
private void checkSaved() {
@ -1096,8 +1079,7 @@ public class FSImageFormat {
dirNum++;
} else if (inSnapshot && child.isFile()
&& child.asFile().isUnderConstruction()) {
this.snapshotUCMap.put(child.getId(),
(INodeFileUnderConstruction) child.asFile());
this.snapshotUCMap.put(child.getId(), child.asFile());
}
if (i++ % 50 == 0) {
context.checkCancelled();

View File: FSImageSerialization.java

@ -108,7 +108,7 @@ public class FSImageSerialization {
// Helper function that reads in an INodeUnderConstruction
// from the input stream
//
static INodeFileUnderConstruction readINodeUnderConstruction(
static INodeFile readINodeUnderConstruction(
DataInput in, FSNamesystem fsNamesys, int imgVersion)
throws IOException {
byte[] name = readBytes(in);
@ -141,25 +141,17 @@ public class FSImageSerialization {
int numLocs = in.readInt();
assert numLocs == 0 : "Unexpected block locations";
return new INodeFileUnderConstruction(inodeId,
name,
blockReplication,
modificationTime,
preferredBlockSize,
blocks,
perm,
clientName,
clientMachine,
null);
INodeFile file = new INodeFile(inodeId, name, perm, modificationTime,
modificationTime, blocks, blockReplication, preferredBlockSize);
file.toUnderConstruction(clientName, clientMachine, null);
return file;
}
// Helper function that writes an INodeUnderConstruction
// into the input stream
//
static void writeINodeUnderConstruction(DataOutputStream out,
INodeFileUnderConstruction cons,
String path)
throws IOException {
static void writeINodeUnderConstruction(DataOutputStream out, INodeFile cons,
String path) throws IOException {
writeString(path, out);
out.writeLong(cons.getId());
out.writeShort(cons.getFileReplication());
@ -169,8 +161,9 @@ public class FSImageSerialization {
writeBlocks(cons.getBlocks(), out);
cons.getPermissionStatus().write(out);
writeString(cons.getClientName(), out);
writeString(cons.getClientMachine(), out);
FileUnderConstructionFeature uc = cons.getFileUnderConstructionFeature();
writeString(uc.getClientName(), out);
writeString(uc.getClientMachine(), out);
out.writeInt(0); // do not store locations of last block
}
@ -194,9 +187,9 @@ public class FSImageSerialization {
SnapshotFSImageFormat.saveFileDiffList(file, out);
if (writeUnderConstruction) {
if (file instanceof INodeFileUnderConstruction) {
if (file.isUnderConstruction()) {
out.writeBoolean(true);
final INodeFileUnderConstruction uc = (INodeFileUnderConstruction)file;
final FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
writeString(uc.getClientName(), out);
writeString(uc.getClientMachine(), out);
} else {

View File: FSNamesystem.java

@ -2203,13 +2203,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
final DatanodeDescriptor clientNode =
blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
replication, blockSize, holder, clientMachine, clientNode);
INodeFile newNode = dir.addFile(src, permissions, replication, blockSize,
holder, clientMachine, clientNode);
if (newNode == null) {
throw new IOException("DIR* NameSystem.startFile: " +
"Unable to add file to namespace.");
}
leaseManager.addLease(newNode.getClientName(), src);
leaseManager.addLease(newNode.getFileUnderConstructionFeature()
.getClientName(), src);
// record file record in log, record new generation stamp
getEditLog().logOpenFile(src, newNode, logRetryEntry);
@ -2301,11 +2302,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
boolean writeToEditLog, Snapshot latestSnapshot, boolean logRetryCache)
throws IOException {
file = file.recordModification(latestSnapshot, dir.getINodeMap());
final INodeFileUnderConstruction cons = file.toUnderConstruction(
leaseHolder, clientMachine, clientNode);
final INodeFile cons = file.toUnderConstruction(leaseHolder, clientMachine,
clientNode);
dir.replaceINodeFile(src, file, cons);
leaseManager.addLease(cons.getClientName(), src);
leaseManager.addLease(cons.getFileUnderConstructionFeature()
.getClientName(), src);
LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons);
if (writeToEditLog) {
@ -2368,7 +2369,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws IOException {
assert hasWriteLock();
if (fileInode != null && fileInode.isUnderConstruction()) {
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode;
//
// If the file is under construction, then it must be in our
// leases. Find the appropriate lease record.
@ -2391,7 +2391,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
//
// Find the original holder.
//
lease = leaseManager.getLease(pendingFile.getClientName());
FileUnderConstructionFeature uc = fileInode.getFileUnderConstructionFeature();
String clientName = uc.getClientName();
lease = leaseManager.getLease(clientName);
if (lease == null) {
throw new AlreadyBeingCreatedException(
"failed to create file " + src + " for " + holder +
@ -2402,26 +2404,26 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// close now: no need to wait for soft lease expiration and
// close only the file src
LOG.info("recoverLease: " + lease + ", src=" + src +
" from client " + pendingFile.getClientName());
" from client " + clientName);
internalReleaseLease(lease, src, holder);
} else {
assert lease.getHolder().equals(pendingFile.getClientName()) :
assert lease.getHolder().equals(clientName) :
"Current lease holder " + lease.getHolder() +
" does not match file creator " + pendingFile.getClientName();
" does not match file creator " + clientName;
//
// If the original holder has not renewed in the last SOFTLIMIT
// period, then start lease recovery.
//
if (lease.expiredSoftLimit()) {
LOG.info("startFile: recover " + lease + ", src=" + src + " client "
+ pendingFile.getClientName());
+ clientName);
boolean isClosed = internalReleaseLease(lease, src, null);
if(!isClosed)
throw new RecoveryInProgressException(
"Failed to close file " + src +
". Lease recovery is in progress. Try again later.");
} else {
final BlockInfo lastBlock = pendingFile.getLastBlock();
final BlockInfo lastBlock = fileInode.getLastBlock();
if (lastBlock != null
&& lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) {
throw new RecoveryInProgressException("Recovery in progress, file ["
@ -2430,8 +2432,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new AlreadyBeingCreatedException("Failed to create file ["
+ src + "] for [" + holder + "] on client [" + clientMachine
+ "], because this file is already being created by ["
+ pendingFile.getClientName() + "] on ["
+ pendingFile.getClientMachine() + "]");
+ clientName + "] on ["
+ uc.getClientMachine() + "]");
}
}
}
@ -2561,8 +2563,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
final INode[] inodes = analyzeFileState(
src, fileId, clientName, previous, onRetryBlock).getINodes();
final INodeFileUnderConstruction pendingFile =
(INodeFileUnderConstruction) inodes[inodes.length - 1].asFile();
final INodeFile pendingFile = inodes[inodes.length - 1].asFile();
if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
// This is a retry. Just return the last block if having locations.
@ -2575,7 +2576,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
+ maxBlocksPerFile);
}
blockSize = pendingFile.getPreferredBlockSize();
clientNode = pendingFile.getClientNode();
clientNode = pendingFile.getFileUnderConstructionFeature().getClientNode();
replication = pendingFile.getFileReplication();
} finally {
readUnlock();
@ -2599,8 +2600,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
INodesInPath inodesInPath =
analyzeFileState(src, fileId, clientName, previous, onRetryBlock);
INode[] inodes = inodesInPath.getINodes();
final INodeFileUnderConstruction pendingFile =
(INodeFileUnderConstruction) inodes[inodes.length - 1].asFile();
final INodeFile pendingFile = inodes[inodes.length - 1].asFile();
if (onRetryBlock[0] != null) {
if (onRetryBlock[0].getLocations().length > 0) {
@ -2655,7 +2655,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
Block previousBlock = ExtendedBlock.getLocalBlock(previous);
final INodesInPath iip = dir.getINodesInPath4Write(src);
final INodeFileUnderConstruction pendingFile
final INodeFile pendingFile
= checkLease(src, fileId, clientName, iip.getLastINode());
BlockInfo lastBlockInFile = pendingFile.getLastBlock();
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
@ -2761,8 +2761,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
src = FSDirectory.resolvePath(src, pathComponents, dir);
//check lease
final INodeFileUnderConstruction file = checkLease(src, clientName);
clientnode = file.getClientNode();
final INodeFile file = checkLease(src, clientName);
clientnode = file.getFileUnderConstructionFeature().getClientNode();
preferredblocksize = file.getPreferredBlockSize();
//find datanode storages
@ -2803,7 +2803,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
//
// Remove the block from the pending creates list
//
INodeFileUnderConstruction file = checkLease(src, holder);
INodeFile file = checkLease(src, holder);
boolean removed = dir.removeBlock(src, file,
ExtendedBlock.getLocalBlock(b));
if (!removed) {
@ -2825,16 +2825,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
/** make sure that we still have the lease on this file. */
private INodeFileUnderConstruction checkLease(String src, String holder)
private INodeFile checkLease(String src, String holder)
throws LeaseExpiredException, UnresolvedLinkException,
FileNotFoundException {
return checkLease(src, INodeId.GRANDFATHER_INODE_ID, holder,
dir.getINode(src));
}
private INodeFileUnderConstruction checkLease(String src, long fileId,
String holder, INode inode) throws LeaseExpiredException,
FileNotFoundException {
private INodeFile checkLease(String src, long fileId, String holder,
INode inode) throws LeaseExpiredException, FileNotFoundException {
assert hasReadLock();
if (inode == null || !inode.isFile()) {
Lease lease = leaseManager.getLease(holder);
@ -2851,13 +2850,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
+ (lease != null ? lease.toString()
: "Holder " + holder + " does not have any open files."));
}
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
if (holder != null && !pendingFile.getClientName().equals(holder)) {
String clientName = file.getFileUnderConstructionFeature().getClientName();
if (holder != null && !clientName.equals(holder)) {
throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
+ pendingFile.getClientName() + " but is accessed by " + holder);
+ clientName + " but is accessed by " + holder);
}
INodeId.checkId(fileId, pendingFile);
return pendingFile;
INodeId.checkId(fileId, file);
return file;
}
/**
@ -2898,7 +2897,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
UnresolvedLinkException, IOException {
assert hasWriteLock();
final INodesInPath iip = dir.getLastINodeInPath(src);
final INodeFileUnderConstruction pendingFile;
final INodeFile pendingFile;
try {
pendingFile = checkLease(src, fileId, holder, iip.getINode(0));
} catch (LeaseExpiredException lee) {
@ -3588,9 +3587,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot fsync file " + src);
src = FSDirectory.resolvePath(src, pathComponents, dir);
INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
INodeFile pendingFile = checkLease(src, clientName);
if (lastBlockLength > 0) {
pendingFile.updateLengthOfLastBlock(lastBlockLength);
pendingFile.getFileUnderConstructionFeature().updateLengthOfLastBlock(
pendingFile, lastBlockLength);
}
dir.persistBlocks(src, pendingFile, false);
} finally {
@ -3621,8 +3621,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
assert hasWriteLock();
final INodesInPath iip = dir.getLastINodeInPath(src);
final INodeFileUnderConstruction pendingFile
= INodeFileUnderConstruction.valueOf(iip.getINode(0), src);
final INodeFile pendingFile = iip.getINode(0).asFile();
int nrBlocks = pendingFile.numBlocks();
BlockInfo[] blocks = pendingFile.getBlocks();
@ -3744,7 +3743,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
private Lease reassignLease(Lease lease, String src, String newHolder,
INodeFileUnderConstruction pendingFile) {
INodeFile pendingFile) {
assert hasWriteLock();
if(newHolder == null)
return lease;
@ -3754,15 +3753,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
Lease reassignLeaseInternal(Lease lease, String src, String newHolder,
INodeFileUnderConstruction pendingFile) {
INodeFile pendingFile) {
assert hasWriteLock();
pendingFile.setClientName(newHolder);
pendingFile.getFileUnderConstructionFeature().setClientName(newHolder);
return leaseManager.reassignLease(lease, src, newHolder);
}
private void commitOrCompleteLastBlock(final INodeFileUnderConstruction fileINode,
private void commitOrCompleteLastBlock(final INodeFile fileINode,
final Block commitBlock) throws IOException {
assert hasWriteLock();
Preconditions.checkArgument(fileINode.isUnderConstruction());
if (!blockManager.commitOrCompleteLastBlock(fileINode, commitBlock)) {
return;
}
@ -3779,19 +3779,21 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
private void finalizeINodeFileUnderConstruction(String src,
INodeFileUnderConstruction pendingFile, Snapshot latestSnapshot)
throws IOException, UnresolvedLinkException {
private void finalizeINodeFileUnderConstruction(String src,
INodeFile pendingFile, Snapshot latestSnapshot) throws IOException,
UnresolvedLinkException {
assert hasWriteLock();
leaseManager.removeLease(pendingFile.getClientName(), src);
FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
Preconditions.checkArgument(uc != null);
leaseManager.removeLease(uc.getClientName(), src);
pendingFile = pendingFile.recordModification(latestSnapshot,
dir.getINodeMap());
// The file is no longer pending.
// Create permanent INode, update blocks
final INodeFile newFile = pendingFile.toINodeFile(now());
dir.replaceINodeFile(src, pendingFile, newFile);
// Create permanent INode, update blocks. No need to replace the inode here
// since we just remove the uc feature from pendingFile
final INodeFile newFile = pendingFile.toCompleteFile(now());
// close file and persist block allocations for this file
dir.closeFile(src, newFile);
@ -3808,12 +3810,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
public boolean isInSnapshot(BlockInfoUnderConstruction blockUC) {
assert hasReadLock();
final BlockCollection bc = blockUC.getBlockCollection();
if (bc == null || !(bc instanceof INodeFileUnderConstruction)) {
if (bc == null || !(bc instanceof INodeFile)
|| !((INodeFile) bc).isUnderConstruction()) {
return false;
}
INodeFileUnderConstruction inodeUC = (INodeFileUnderConstruction) blockUC
.getBlockCollection();
INodeFile inodeUC = (INodeFile) bc;
String fullName = inodeUC.getName();
try {
if (fullName != null && fullName.startsWith(Path.SEPARATOR)
@ -3891,11 +3893,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
+ recoveryId + " for block " + lastblock);
}
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
if (deleteblock) {
Block blockToDel = ExtendedBlock.getLocalBlock(lastblock);
boolean remove = pendingFile.removeLastBlock(blockToDel);
boolean remove = iFile.removeLastBlock(blockToDel);
if (remove) {
blockManager.removeBlockFromMap(storedBlock);
}
@ -3940,14 +3940,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
blockManager.getDatanodeManager().getDatanodeStorageInfos(
trimmedTargets.toArray(new DatanodeID[trimmedTargets.size()]),
trimmedStorages.toArray(new String[trimmedStorages.size()]));
pendingFile.setLastBlock(storedBlock, trimmedStorageInfos);
iFile.setLastBlock(storedBlock, trimmedStorageInfos);
}
if (closeFile) {
src = closeFileCommitBlocks(pendingFile, storedBlock);
src = closeFileCommitBlocks(iFile, storedBlock);
} else {
// If this commit does not want to close the file, persist blocks
src = persistBlocks(pendingFile, false);
src = persistBlocks(iFile, false);
}
} finally {
writeUnlock();
@ -3972,10 +3972,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @throws IOException
*/
@VisibleForTesting
String closeFileCommitBlocks(INodeFileUnderConstruction pendingFile,
BlockInfo storedBlock)
String closeFileCommitBlocks(INodeFile pendingFile, BlockInfo storedBlock)
throws IOException {
String src = leaseManager.findPath(pendingFile);
// commit the last block and complete it if it has minimum replicas
@ -3983,7 +3981,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
//remove lease, close file
finalizeINodeFileUnderConstruction(src, pendingFile,
Snapshot.findLatestSnapshot(pendingFile, null));
Snapshot.findLatestSnapshot(pendingFile, null));
return src;
}
@ -3996,8 +3994,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @throws IOException
*/
@VisibleForTesting
String persistBlocks(INodeFileUnderConstruction pendingFile,
boolean logRetryCache) throws IOException {
String persistBlocks(INodeFile pendingFile, boolean logRetryCache)
throws IOException {
String src = leaseManager.findPath(pendingFile);
dir.persistBlocks(src, pendingFile, logRetryCache);
return src;
@ -5182,13 +5180,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
for (Lease lease : leaseManager.getSortedLeases()) {
for (String path : lease.getPaths()) {
final INodeFileUnderConstruction cons;
final INodeFile cons;
try {
cons = INodeFileUnderConstruction.valueOf(dir.getINode(path), path);
cons = dir.getINode(path).asFile();
Preconditions.checkState(cons.isUnderConstruction());
} catch (UnresolvedLinkException e) {
throw new AssertionError("Lease files should reside on this FS");
} catch (IOException e) {
throw new RuntimeException(e);
}
BlockInfo[] blocks = cons.getBlocks();
if(blocks == null)
@ -5764,7 +5761,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return blockId;
}
private INodeFileUnderConstruction checkUCBlock(ExtendedBlock block,
private INodeFile checkUCBlock(ExtendedBlock block,
String clientName) throws IOException {
assert hasWriteLock();
checkNameNodeSafeMode("Cannot get a new generation stamp and an "
@ -5780,19 +5777,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// check file inode
final INodeFile file = ((INode)storedBlock.getBlockCollection()).asFile();
if (file==null || !file.isUnderConstruction()) {
if (file == null || !file.isUnderConstruction()) {
throw new IOException("The file " + storedBlock +
" belonged to does not exist or it is not under construction.");
}
// check lease
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
if (clientName == null || !clientName.equals(pendingFile.getClientName())) {
if (clientName == null
|| !clientName.equals(file.getFileUnderConstructionFeature()
.getClientName())) {
throw new LeaseExpiredException("Lease mismatch: " + block +
" is accessed by a non lease holder " + clientName);
}
return pendingFile;
return file;
}
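
The rewritten check folds the old downcast into two null-safe guards: first the file must exist and still be under construction, then the caller's name must match the lease holder recorded in the under-construction feature. A self-contained sketch of that control flow, using illustrative stand-in types rather than the real HDFS classes:

import java.io.IOException;

// Stand-in types; only the control flow mirrors checkUCBlock above.
final class LeaseCheckSketch {
  interface UcFeature { String getClientName(); }
  interface File {
    boolean isUnderConstruction();
    UcFeature getFileUnderConstructionFeature();
  }

  static void checkLease(File file, String clientName) throws IOException {
    // 1. The file must exist and still be open for writing.
    if (file == null || !file.isUnderConstruction()) {
      throw new IOException(
          "File does not exist or is not under construction.");
    }
    // 2. The caller must be the lease holder stored in the UC feature.
    String holder = file.getFileUnderConstructionFeature().getClientName();
    if (clientName == null || !clientName.equals(holder)) {
      throw new IOException("Lease mismatch: accessed by a non lease holder.");
    }
  }
}
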
/**
@ -5903,8 +5901,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws IOException {
assert hasWriteLock();
// check the validity of the block and lease holder name
final INodeFileUnderConstruction pendingFile
= checkUCBlock(oldBlock, clientName);
final INodeFile pendingFile = checkUCBlock(oldBlock, clientName);
final BlockInfoUnderConstruction blockinfo
= (BlockInfoUnderConstruction)pendingFile.getLastBlock();
@ -5942,15 +5939,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* Serializes leases.
*/
void saveFilesUnderConstruction(DataOutputStream out,
Map<Long, INodeFileUnderConstruction> snapshotUCMap) throws IOException {
Map<Long, INodeFile> snapshotUCMap) throws IOException {
// This is run by an inferior thread of saveNamespace, which holds a read
// lock on our behalf. If we took the read lock here, we could block
// for fairness if a writer is waiting on the lock.
synchronized (leaseManager) {
Map<String, INodeFileUnderConstruction> nodes =
leaseManager.getINodesUnderConstruction();
for (Map.Entry<String, INodeFileUnderConstruction> entry
: nodes.entrySet()) {
Map<String, INodeFile> nodes = leaseManager.getINodesUnderConstruction();
for (Map.Entry<String, INodeFile> entry : nodes.entrySet()) {
// TODO: for HDFS-5428, because of rename operations, some
// under-construction files that are
// in the current fs directory can also be captured in the
@ -5959,13 +5954,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
out.writeInt(nodes.size() + snapshotUCMap.size()); // write the size
for (Map.Entry<String, INodeFileUnderConstruction> entry
: nodes.entrySet()) {
for (Map.Entry<String, INodeFile> entry : nodes.entrySet()) {
FSImageSerialization.writeINodeUnderConstruction(
out, entry.getValue(), entry.getKey());
}
for (Map.Entry<Long, INodeFileUnderConstruction> entry
: snapshotUCMap.entrySet()) {
for (Map.Entry<Long, INodeFile> entry : snapshotUCMap.entrySet()) {
// for those snapshot INodeFileUC, we use "/.reserved/.inodes/<inodeid>"
// as their paths
StringBuilder b = new StringBuilder();


@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
/**
* I-node for file being written.
*/
@InterfaceAudience.Private
public class FileUnderConstructionFeature extends INodeFile.Feature {
private String clientName; // lease holder
private final String clientMachine;
// if client is a cluster node too.
private final DatanodeDescriptor clientNode;
public FileUnderConstructionFeature(final String clientName,
final String clientMachine,
final DatanodeDescriptor clientNode) {
this.clientName = clientName;
this.clientMachine = clientMachine;
this.clientNode = clientNode;
}
public String getClientName() {
return clientName;
}
void setClientName(String clientName) {
this.clientName = clientName;
}
public String getClientMachine() {
return clientMachine;
}
public DatanodeDescriptor getClientNode() {
return clientNode;
}
/**
* Update the length for the last block
*
* @param lastBlockLength
* The length of the last block reported from client
* @throws IOException
*/
void updateLengthOfLastBlock(INodeFile f, long lastBlockLength)
throws IOException {
BlockInfo lastBlock = f.getLastBlock();
assert (lastBlock != null) : "The last block for path "
+ f.getFullPathName() + " is null when updating its length";
assert (lastBlock instanceof BlockInfoUnderConstruction)
: "The last block for path " + f.getFullPathName()
+ " is not a BlockInfoUnderConstruction when updating its length";
lastBlock.setNumBytes(lastBlockLength);
}
/**
* When a file being deleted from the current fs directory is also contained
* in a snapshot, delete its last block if that block is under construction
* and its size is 0.
*/
void cleanZeroSizeBlock(final INodeFile f,
final BlocksMapUpdateInfo collectedBlocks) {
final BlockInfo[] blocks = f.getBlocks();
if (blocks != null && blocks.length > 0
&& blocks[blocks.length - 1] instanceof BlockInfoUnderConstruction) {
BlockInfoUnderConstruction lastUC =
(BlockInfoUnderConstruction) blocks[blocks.length - 1];
if (lastUC.getNumBytes() == 0) {
// this is a 0-sized block; no need to check its UC state here
collectedBlocks.addDeleteBlock(lastUC);
f.removeLastBlock(lastUC);
}
}
}
}
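
Together with the INodeFile changes below, the lifecycle becomes: toUnderConstruction attaches one of these feature objects to the inode in place, and toCompleteFile detaches it, so neither transition replaces the inode. A minimal self-contained model of that attach/detach pattern (all names here are illustrative, not the HDFS classes):

// Toy model of the in-place under-construction lifecycle.
final class FileModelSketch {
  static final class UcFeature {
    final String clientName; // lease holder
    UcFeature(String clientName) { this.clientName = clientName; }
  }

  private UcFeature uc; // null once the file is complete

  boolean isUnderConstruction() { return uc != null; }

  FileModelSketch toUnderConstruction(String clientName) {
    if (isUnderConstruction()) {
      throw new IllegalStateException("file is already under construction");
    }
    uc = new UcFeature(clientName); // attach; the same object survives
    return this;
  }

  FileModelSketch toCompleteFile() {
    uc = null; // detach; no replacement object is created
    return this;
  }

  public static void main(String[] args) {
    FileModelSketch f = new FileModelSketch().toUnderConstruction("client-1");
    System.out.println(f.isUnderConstruction());                  // true
    System.out.println(f.toCompleteFile().isUnderConstruction()); // false
  }
}
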


@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
@ -205,23 +204,6 @@ public class INodeDirectory extends INodeWithAdditionalFields
return newDir;
}
/**
* Used when load fileUC from fsimage. The file to be replaced is actually
* only in snapshot, thus may not be contained in the children list.
* See HDFS-5428 for details.
*/
public void replaceChildFileInSnapshot(INodeFile oldChild,
final INodeFile newChild) {
if (children != null) {
final int i = searchChildren(newChild.getLocalNameBytes());
if (i >= 0 && children.get(i).getId() == oldChild.getId()) {
// no need to consider reference node here, since we already do the
// replacement in FSImageFormat.Loader#loadFilesUnderConstruction
children.set(i, newChild);
}
}
}
/** Replace the given child with a new child. */
public void replaceChild(INode oldChild, final INode newChild,
final INodeMap inodeMap) {
@ -291,17 +273,6 @@ public class INodeDirectory extends INodeWithAdditionalFields
return newChild;
}
/** Replace a child {@link INodeFile} with an {@link INodeFileUnderConstructionWithSnapshot}. */
INodeFileUnderConstructionWithSnapshot replaceChild4INodeFileUcWithSnapshot(
final INodeFileUnderConstruction child, final INodeMap inodeMap) {
Preconditions.checkArgument(!(child instanceof INodeFileUnderConstructionWithSnapshot),
"Child file is already an INodeFileUnderConstructionWithSnapshot, child=" + child);
final INodeFileUnderConstructionWithSnapshot newChild
= new INodeFileUnderConstructionWithSnapshot(child, null);
replaceChildFile(child, newChild, inodeMap);
return newChild;
}
@Override
public INodeDirectory recordModification(Snapshot latest,
final INodeMap inodeMap) throws QuotaExceededException {


@ -20,15 +20,15 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.*;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiff;
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiffList;
@ -43,6 +43,22 @@ import com.google.common.base.Preconditions;
@InterfaceAudience.Private
public class INodeFile extends INodeWithAdditionalFields
implements INodeFileAttributes, BlockCollection {
/**
* A feature contains specific information for a type of INodeFile. E.g.,
* we can have separate features for Under-Construction and Snapshot.
*/
public static abstract class Feature {
private Feature nextFeature;
public Feature getNextFeature() {
return nextFeature;
}
public void setNextFeature(Feature next) {
this.nextFeature = next;
}
}
/** The same as valueOf(inode, path, false). */
public static INodeFile valueOf(INode inode, String path
) throws FileNotFoundException {
@ -104,8 +120,11 @@ public class INodeFile extends INodeWithAdditionalFields
private BlockInfo[] blocks;
INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime, long atime,
BlockInfo[] blklist, short replication, long preferredBlockSize) {
private Feature headFeature;
INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime,
long atime, BlockInfo[] blklist, short replication,
long preferredBlockSize) {
super(id, name, permissions, mtime, atime);
header = HeaderFormat.combineReplication(header, replication);
header = HeaderFormat.combinePreferredBlockSize(header, preferredBlockSize);
@ -116,6 +135,48 @@ public class INodeFile extends INodeWithAdditionalFields
super(that);
this.header = that.header;
this.blocks = that.blocks;
this.headFeature = that.headFeature;
}
/**
* If the inode contains a {@link FileUnderConstructionFeature}, return it;
* otherwise, return null.
*/
public final FileUnderConstructionFeature getFileUnderConstructionFeature() {
for (Feature f = this.headFeature; f != null; f = f.nextFeature) {
if (f instanceof FileUnderConstructionFeature) {
return (FileUnderConstructionFeature) f;
}
}
return null;
}
/** Is this file under construction? */
@Override // BlockCollection
public boolean isUnderConstruction() {
return getFileUnderConstructionFeature() != null;
}
void addFeature(Feature f) {
f.nextFeature = headFeature;
headFeature = f;
}
void removeFeature(Feature f) {
if (f == headFeature) {
headFeature = headFeature.nextFeature;
return;
} else if (headFeature != null) {
Feature prev = headFeature;
Feature curr = headFeature.nextFeature;
for (; curr != null && curr != f; prev = curr, curr = curr.nextFeature)
;
if (curr != null) {
prev.nextFeature = curr.nextFeature;
return;
}
}
throw new IllegalStateException("Feature " + f + " not found.");
}
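
addFeature pushes onto the head of a null-terminated singly-linked list, so lookups such as getFileUnderConstructionFeature are linear scans and removeFeature unlinks by walking prev/curr pairs. The same mechanics in a self-contained sketch (names illustrative):

// Stand-alone version of the head-insert / unlink logic above.
final class FeatureListSketch {
  static class Feature { Feature next; }

  private Feature head;

  void add(Feature f) { // O(1): new features go to the front
    f.next = head;
    head = f;
  }

  void remove(Feature f) { // O(n): unlink the first matching node
    if (head == f) {
      head = head.next;
      return;
    }
    for (Feature prev = head; prev != null; prev = prev.next) {
      if (prev.next == f) {
        prev.next = f.next;
        return;
      }
    }
    throw new IllegalStateException("Feature " + f + " not found.");
  }
}
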
/** @return true unconditionally. */
@ -130,22 +191,88 @@ public class INodeFile extends INodeWithAdditionalFields
return this;
}
/** Is this file under construction? */
public boolean isUnderConstruction() {
return false;
}
/* Start of Under-Construction Feature */
/** Convert this file to an {@link INodeFileUnderConstruction}. */
public INodeFileUnderConstruction toUnderConstruction(
String clientName,
String clientMachine,
public INodeFile toUnderConstruction(String clientName, String clientMachine,
DatanodeDescriptor clientNode) {
Preconditions.checkState(!isUnderConstruction(),
"file is already an INodeFileUnderConstruction");
return new INodeFileUnderConstruction(this,
clientName, clientMachine, clientNode);
FileUnderConstructionFeature uc = new FileUnderConstructionFeature(
clientName, clientMachine, clientNode);
addFeature(uc);
return this;
}
/**
* Convert the file to a complete file, i.e., to remove the Under-Construction
* feature.
*/
public INodeFile toCompleteFile(long mtime) {
FileUnderConstructionFeature uc = getFileUnderConstructionFeature();
if (uc != null) {
assertAllBlocksComplete();
removeFeature(uc);
this.setModificationTime(mtime);
}
return this;
}
/** Assert all blocks are complete. */
private void assertAllBlocksComplete() {
if (blocks == null) {
return;
}
for (int i = 0; i < blocks.length; i++) {
Preconditions.checkState(blocks[i].isComplete(), "Failed to finalize"
+ " %s %s since blocks[%s] is non-complete, where blocks=%s.",
getClass().getSimpleName(), this, i, Arrays.asList(blocks));
}
}
@Override //BlockCollection
public void setBlock(int index, BlockInfo blk) {
this.blocks[index] = blk;
}
@Override // BlockCollection
public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
DatanodeStorageInfo[] locations) throws IOException {
Preconditions.checkState(isUnderConstruction());
if (numBlocks() == 0) {
throw new IOException("Failed to set last block: File is empty.");
}
BlockInfoUnderConstruction ucBlock =
lastBlock.convertToBlockUnderConstruction(
BlockUCState.UNDER_CONSTRUCTION, locations);
ucBlock.setBlockCollection(this);
setBlock(numBlocks() - 1, ucBlock);
return ucBlock;
}
/**
* Remove a block from the block list. This block should be
* the last one on the list.
*/
boolean removeLastBlock(Block oldblock) {
if (blocks == null || blocks.length == 0) {
return false;
}
int size_1 = blocks.length - 1;
if (!blocks[size_1].equals(oldblock)) {
return false;
}
//copy to a new list
BlockInfo[] newlist = new BlockInfo[size_1];
System.arraycopy(blocks, 0, newlist, 0, size_1);
setBlocks(newlist);
return true;
}
/* End of Under-Construction Feature */
@Override
public INodeFileAttributes getSnapshotINode(final Snapshot snapshot) {
return this;
@ -266,11 +393,6 @@ public class INodeFile extends INodeWithAdditionalFields
}
}
/** Set the block of the file at the given index. */
public void setBlock(int idx, BlockInfo blk) {
this.blocks[idx] = blk;
}
/** Set the blocks. */
public void setBlocks(BlockInfo[] blocks) {
this.blocks = blocks;
@ -286,6 +408,11 @@ public class INodeFile extends INodeWithAdditionalFields
// this only happens when deleting the current file
computeQuotaUsage(counts, false);
destroyAndCollectBlocks(collectedBlocks, removedINodes);
} else if (snapshot == null && prior != null) {
FileUnderConstructionFeature uc = getFileUnderConstructionFeature();
if (uc != null) {
uc.cleanZeroSizeBlock(this, collectedBlocks);
}
}
return counts;
}


@ -1,248 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.MutableBlockCollection;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.Quota.Counts;
import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithSnapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import com.google.common.base.Preconditions;
/**
* I-node for file being written.
*/
@InterfaceAudience.Private
public class INodeFileUnderConstruction extends INodeFile implements MutableBlockCollection {
/** Cast INode to INodeFileUnderConstruction. */
public static INodeFileUnderConstruction valueOf(INode inode, String path
) throws FileNotFoundException {
final INodeFile file = INodeFile.valueOf(inode, path);
if (!file.isUnderConstruction()) {
throw new FileNotFoundException("File is not under construction: " + path);
}
return (INodeFileUnderConstruction)file;
}
private String clientName; // lease holder
private final String clientMachine;
private final DatanodeDescriptor clientNode; // if client is a cluster node too.
INodeFileUnderConstruction(long id,
PermissionStatus permissions,
short replication,
long preferredBlockSize,
long modTime,
String clientName,
String clientMachine,
DatanodeDescriptor clientNode) {
this(id, null, replication, modTime, preferredBlockSize, BlockInfo.EMPTY_ARRAY,
permissions, clientName, clientMachine, clientNode);
}
INodeFileUnderConstruction(long id,
byte[] name,
short blockReplication,
long modificationTime,
long preferredBlockSize,
BlockInfo[] blocks,
PermissionStatus perm,
String clientName,
String clientMachine,
DatanodeDescriptor clientNode) {
super(id, name, perm, modificationTime, modificationTime,
blocks, blockReplication, preferredBlockSize);
this.clientName = clientName;
this.clientMachine = clientMachine;
this.clientNode = clientNode;
}
public INodeFileUnderConstruction(final INodeFile that,
final String clientName,
final String clientMachine,
final DatanodeDescriptor clientNode) {
super(that);
this.clientName = clientName;
this.clientMachine = clientMachine;
this.clientNode = clientNode;
}
public String getClientName() {
return clientName;
}
void setClientName(String clientName) {
this.clientName = clientName;
}
public String getClientMachine() {
return clientMachine;
}
public DatanodeDescriptor getClientNode() {
return clientNode;
}
/** @return true unconditionally. */
@Override
public final boolean isUnderConstruction() {
return true;
}
/**
* Converts an INodeFileUnderConstruction to an INodeFile.
* The original modification time is used as the access time.
* The new modification is the specified mtime.
*/
protected INodeFile toINodeFile(long mtime) {
assertAllBlocksComplete();
final INodeFile f = new INodeFile(getId(), getLocalNameBytes(),
getPermissionStatus(), mtime, getModificationTime(),
getBlocks(), getFileReplication(), getPreferredBlockSize());
f.setParent(getParent());
return f;
}
@Override
public Quota.Counts cleanSubtree(final Snapshot snapshot, Snapshot prior,
final BlocksMapUpdateInfo collectedBlocks,
final List<INode> removedINodes, final boolean countDiffChange)
throws QuotaExceededException {
if (snapshot == null && prior != null) {
cleanZeroSizeBlock(collectedBlocks);
return Counts.newInstance();
} else {
return super.cleanSubtree(snapshot, prior, collectedBlocks,
removedINodes, countDiffChange);
}
}
/**
* When deleting a file in the current fs directory, and the file is contained
* in a snapshot, we should delete the last block if it's under construction
* and its size is 0.
*/
private void cleanZeroSizeBlock(final BlocksMapUpdateInfo collectedBlocks) {
final BlockInfo[] blocks = getBlocks();
if (blocks != null && blocks.length > 0
&& blocks[blocks.length - 1] instanceof BlockInfoUnderConstruction) {
BlockInfoUnderConstruction lastUC =
(BlockInfoUnderConstruction) blocks[blocks.length - 1];
if (lastUC.getNumBytes() == 0) {
// this is a 0-sized block. do not need check its UC state here
collectedBlocks.addDeleteBlock(lastUC);
removeLastBlock(lastUC);
}
}
}
@Override
public INodeFileUnderConstruction recordModification(final Snapshot latest,
final INodeMap inodeMap) throws QuotaExceededException {
if (isInLatestSnapshot(latest)) {
INodeFileUnderConstructionWithSnapshot newFile = getParent()
.replaceChild4INodeFileUcWithSnapshot(this, inodeMap)
.recordModification(latest, inodeMap);
return newFile;
} else {
return this;
}
}
/** Assert all blocks are complete. */
protected void assertAllBlocksComplete() {
final BlockInfo[] blocks = getBlocks();
for (int i = 0; i < blocks.length; i++) {
Preconditions.checkState(blocks[i].isComplete(), "Failed to finalize"
+ " %s %s since blocks[%s] is non-complete, where blocks=%s.",
getClass().getSimpleName(), this, i, Arrays.asList(getBlocks()));
}
}
/**
* Remove a block from the block list. This block should be
* the last one on the list.
*/
boolean removeLastBlock(Block oldblock) {
final BlockInfo[] blocks = getBlocks();
if (blocks == null || blocks.length == 0) {
return false;
}
int size_1 = blocks.length - 1;
if (!blocks[size_1].equals(oldblock)) {
return false;
}
//copy to a new list
BlockInfo[] newlist = new BlockInfo[size_1];
System.arraycopy(blocks, 0, newlist, 0, size_1);
setBlocks(newlist);
return true;
}
/**
* Convert the last block of the file to an under-construction block.
* Set its locations.
*/
@Override
public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
DatanodeStorageInfo[] targets) throws IOException {
if (numBlocks() == 0) {
throw new IOException("Failed to set last block: File is empty.");
}
BlockInfoUnderConstruction ucBlock =
lastBlock.convertToBlockUnderConstruction(
BlockUCState.UNDER_CONSTRUCTION, targets);
ucBlock.setBlockCollection(this);
setBlock(numBlocks()-1, ucBlock);
return ucBlock;
}
/**
* Update the length for the last block
*
* @param lastBlockLength
* The length of the last block reported from client
* @throws IOException
*/
void updateLengthOfLastBlock(long lastBlockLength) throws IOException {
BlockInfo lastBlock = this.getLastBlock();
assert (lastBlock != null) : "The last block for path "
+ this.getFullPathName() + " is null when updating its length";
assert (lastBlock instanceof BlockInfoUnderConstruction) : "The last block for path "
+ this.getFullPathName()
+ " is not a BlockInfoUnderConstruction when updating its length";
lastBlock.setNumBytes(lastBlockLength);
}
}


@ -182,9 +182,11 @@ public class LeaseManager {
/**
* Finds the pathname for the specified pendingFile
*/
public synchronized String findPath(INodeFileUnderConstruction pendingFile)
public synchronized String findPath(INodeFile pendingFile)
throws IOException {
Lease lease = getLease(pendingFile.getClientName());
FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
Preconditions.checkArgument(uc != null);
Lease lease = getLease(uc.getClientName());
if (lease != null) {
String src = lease.findPath(pendingFile);
if (src != null) {
@ -253,7 +255,7 @@ public class LeaseManager {
/**
* @return the path associated with the pendingFile and null if not found.
*/
private String findPath(INodeFileUnderConstruction pendingFile) {
private String findPath(INodeFile pendingFile) {
try {
for (String src : paths) {
INode node = fsnamesystem.dir.getINode(src);
@ -433,14 +435,14 @@ public class LeaseManager {
* @return list of inodes
* @throws UnresolvedLinkException
*/
Map<String, INodeFileUnderConstruction> getINodesUnderConstruction() {
Map<String, INodeFileUnderConstruction> inodes =
new TreeMap<String, INodeFileUnderConstruction>();
Map<String, INodeFile> getINodesUnderConstruction() {
Map<String, INodeFile> inodes = new TreeMap<String, INodeFile>();
for (String p : sortedLeasesByPath.keySet()) {
// verify that path exists in namespace
try {
INode node = fsnamesystem.dir.getINode(p);
inodes.put(p, INodeFileUnderConstruction.valueOf(node, p));
INodeFile node = INodeFile.valueOf(fsnamesystem.dir.getINode(p), p);
Preconditions.checkState(node.isUnderConstruction());
inodes.put(p, node);
} catch (IOException ioe) {
LOG.error(ioe);
}


@ -38,7 +38,6 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryAttributes;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryWithQuota;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeMap;
import org.apache.hadoop.hdfs.server.namenode.INodeReference;
import org.apache.hadoop.hdfs.server.namenode.Quota;
@ -593,14 +592,6 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
return removed;
}
@Override
public void replaceChildFileInSnapshot(final INodeFile oldChild,
final INodeFile newChild) {
super.replaceChildFileInSnapshot(oldChild, newChild);
diffs.replaceChild(ListType.DELETED, oldChild, newChild);
diffs.replaceChild(ListType.CREATED, oldChild, newChild);
}
@Override
public void replaceChild(final INode oldChild, final INode newChild,
final INodeMap inodeMap) {


@ -1,130 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.snapshot;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.INodeMap;
import org.apache.hadoop.hdfs.server.namenode.Quota;
/**
* Represent an {@link INodeFileUnderConstruction} that is snapshotted.
*/
@InterfaceAudience.Private
public class INodeFileUnderConstructionWithSnapshot
extends INodeFileUnderConstruction implements FileWithSnapshot {
private final FileDiffList diffs;
private boolean isCurrentFileDeleted = false;
INodeFileUnderConstructionWithSnapshot(final INodeFile f,
final String clientName,
final String clientMachine,
final DatanodeDescriptor clientNode,
final FileDiffList diffs) {
super(f, clientName, clientMachine, clientNode);
this.diffs = diffs != null? diffs: new FileDiffList();
}
/**
* Construct an {@link INodeFileUnderConstructionWithSnapshot} based on an
* {@link INodeFileUnderConstruction}.
*
* @param f The given {@link INodeFileUnderConstruction} instance
*/
public INodeFileUnderConstructionWithSnapshot(INodeFileUnderConstruction f,
final FileDiffList diffs) {
this(f, f.getClientName(), f.getClientMachine(), f.getClientNode(), diffs);
}
@Override
protected INodeFileWithSnapshot toINodeFile(final long mtime) {
assertAllBlocksComplete();
final long atime = getModificationTime();
final INodeFileWithSnapshot f = new INodeFileWithSnapshot(this, getDiffs());
f.setModificationTime(mtime);
f.setAccessTime(atime);
return f;
}
@Override
public boolean isCurrentFileDeleted() {
return isCurrentFileDeleted;
}
@Override
public void deleteCurrentFile() {
isCurrentFileDeleted = true;
}
@Override
public INodeFileAttributes getSnapshotINode(Snapshot snapshot) {
return diffs.getSnapshotINode(snapshot, this);
}
@Override
public INodeFileUnderConstructionWithSnapshot recordModification(
final Snapshot latest, final INodeMap inodeMap)
throws QuotaExceededException {
if (isInLatestSnapshot(latest) && !shouldRecordInSrcSnapshot(latest)) {
diffs.saveSelf2Snapshot(latest, this, null);
}
return this;
}
@Override
public INodeFile asINodeFile() {
return this;
}
@Override
public FileDiffList getDiffs() {
return diffs;
}
@Override
public Quota.Counts cleanSubtree(final Snapshot snapshot, Snapshot prior,
final BlocksMapUpdateInfo collectedBlocks,
final List<INode> removedINodes, final boolean countDiffChange)
throws QuotaExceededException {
if (snapshot == null) { // delete the current file
if (!isCurrentFileDeleted()) {
recordModification(prior, null);
deleteCurrentFile();
}
Util.collectBlocksAndClear(this, collectedBlocks, removedINodes);
return Quota.Counts.newInstance();
} else { // delete a snapshot
prior = getDiffs().updatePrior(snapshot, prior);
return diffs.deleteSnapshotDiff(snapshot, prior, this, collectedBlocks,
removedINodes, countDiffChange);
}
}
@Override
public String toDetailString() {
return super.toDetailString()
+ (isCurrentFileDeleted()? " (DELETED), ": ", ") + diffs;
}
}


@ -21,7 +21,6 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
@ -47,15 +46,6 @@ public class INodeFileWithSnapshot extends INodeFile
this.diffs = diffs != null? diffs: new FileDiffList();
}
@Override
public INodeFileUnderConstructionWithSnapshot toUnderConstruction(
final String clientName,
final String clientMachine,
final DatanodeDescriptor clientNode) {
return new INodeFileUnderConstructionWithSnapshot(this,
clientName, clientMachine, clientNode, getDiffs());
}
@Override
public boolean isCurrentFileDeleted() {
return isCurrentFileDeleted;


@ -101,7 +101,7 @@
<p>
{#fs}
{TotalLoad} files and directories, {BlocksTotal} blocks = {FilesTotal} total filesystem object(s).
{FilesTotal} files and directories, {BlocksTotal} blocks = {@math key="{FilesTotal}" method="add" operand="{BlocksTotal}"/} total filesystem object(s).
{#helper_fs_max_objects/}
{/fs}
</p>


@ -1,8 +1,9 @@
(function(k){function n(b){b=b.f();return"object"===typeof b&&!0===b.h}function p(b,c){return"function"===typeof c?c.toString():c}function l(b,c,d,a,e){a=a||{};var m=d.a,g,f,l=a.d||"";if("undefined"!==typeof a.key)g=k.b.c(a.key,b,c);else if(n(c))g=c.f().i,c.f().g&&(e=function(){return!1});else return h.log("No key specified for filter in:"+l+" helper "),b;f=k.b.c(a.value,b,c);if(e(q(f,a.type,c),q(g,a.type,c))){n(c)&&(c.f().g=!0);if(m)return b.e(m,c);h.log("Missing body block in the "+l+" helper ")}else if(d["else"])return b.e(d["else"],
c);return b}function q(b,c,d){if(b)switch(c||typeof b){case "number":return+b;case "string":return String(b);case "boolean":return Boolean("false"===b?!1:b);case "date":return new Date(b);case "context":return d.get(b)}return b}var h="undefined"!==typeof console?console:{log:function(){}};k.b={tap:function(b,c,d){var a=b;"function"===typeof b&&(!0===b.l?a=b():(a="",c.c(function(b){a+=b;return""}).e(b,d).p(),""===a&&(a=!1)));return a},sep:function(b,c,d){return c.stack.index===c.stack.m-1?b:d.a?d.a(b,
c):b},idx:function(b,c,d){return d.a?d.a(b,c.push(c.stack.index)):b},contextDump:function(b,c,d,a){a=a||{};d=a.o||"output";a=a.key||"current";d=k.b.c(d,b,c);a=k.b.c(a,b,c);c="full"===a?JSON.stringify(c.stack,p,2):JSON.stringify(c.stack.head,p,2);return"console"===d?(h.log(c),b):b.write(c)},"if":function(b,c,d,a){var e=d.a,m=d["else"];if(a&&a.j){a=a.j;a=k.b.c(a,b,c);if(eval(a)){if(e)return b.e(d.a,c);h.log("Missing body block in the if helper!");return b}if(m)return b.e(d["else"],c)}else h.log("No condition given in the if helper!");
return b},math:function(b,c,d,a){if(a&&"undefined"!==typeof a.key&&a.method){var e=a.key,m=a.method,g=a.n;a=a.round;var f=null,e=k.b.c(e,b,c),g=k.b.c(g,b,c);switch(m){case "mod":0!==g&&-0!==g||h.log("operand for divide operation is 0/-0: expect Nan!");f=parseFloat(e)%parseFloat(g);break;case "add":f=parseFloat(e)+parseFloat(g);break;case "subtract":f=parseFloat(e)-parseFloat(g);break;case "multiply":f=parseFloat(e)*parseFloat(g);break;case "divide":0!==g&&-0!==g||h.log("operand for divide operation is 0/-0: expect Nan/Infinity!");
f=parseFloat(e)/parseFloat(g);break;case "ceil":f=Math.ceil(parseFloat(e));break;case "floor":f=Math.floor(parseFloat(e));break;case "round":f=Math.round(parseFloat(e));break;case "abs":f=Math.abs(parseFloat(e));break;default:h.log("method passed is not supported")}if(null!==f)return a&&(f=Math.round(f)),d&&d.a?b.e(d.a,c.push({h:!0,g:!1,i:f})):b.write(f)}else h.log("Key is a required parameter for math helper along with method/operand!");return b},select:function(b,c,d,a){var e=d.a;if(a&&"undefined"!==
typeof a.key){a=k.b.c(a.key,b,c);if(e)return b.e(d.a,c.push({h:!0,g:!1,i:a}));h.log("Missing body block in the select helper ")}else h.log("No key given in the select helper!");return b},eq:function(b,c,d,a){a&&(a.d="eq");return l(b,c,d,a,function(a,b){return b===a})},ne:function(b,c,d,a){return a?(a.d="ne",l(b,c,d,a,function(a,b){return b!==a})):b},lt:function(b,c,d,a){if(a)return a.d="lt",l(b,c,d,a,function(a,b){return b<a})},lte:function(b,c,d,a){return a?(a.d="lte",l(b,c,d,a,function(a,b){return b<=
a})):b},gt:function(b,c,d,a){return a?(a.d="gt",l(b,c,d,a,function(a,b){return b>a})):b},gte:function(b,c,d,a){return a?(a.d="gte",l(b,c,d,a,function(a,b){return b>=a})):b},"default":function(b,c,d,a){a&&(a.d="default");return l(b,c,d,a,function(){return!0})},size:function(b,c,d,a){c=0;var e;a=a||{};if((a=a.key)&&!0!==a)if(k.isArray(a))c=a.length;else if(!isNaN(parseFloat(a))&&isFinite(a))c=a;else if("object"===typeof a)for(e in c=0,a)Object.hasOwnProperty.call(a,e)&&c++;else c=(a+"").length;else c=
0;return b.write(c)}}})("undefined"!==typeof exports?module.k=require("dustjs-linkedin"):dust);
(function(k){function n(b){b=b.current();return"object"===typeof b&&!0===b.isSelect}function p(b,c){return"function"===typeof c?c.toString():c}function l(b,c,d,a,e){a=a||{};var m=d.block,g,f,l=a.filterOpType||"";if("undefined"!==typeof a.key)g=k.helpers.tap(a.key,b,c);else if(n(c))g=c.current().selectKey,c.current().isResolved&&(e=function(){return!1});else return h.log("No key specified for filter in:"+l+" helper "),b;f=k.helpers.tap(a.value,b,c);if(e(q(f,a.type,c),q(g,a.type,c))){n(c)&&(c.current().isResolved=
!0);if(m)return b.render(m,c);h.log("Missing body block in the "+l+" helper ")}else if(d["else"])return b.render(d["else"],c);return b}function q(b,c,d){if(b)switch(c||typeof b){case "number":return+b;case "string":return String(b);case "boolean":return Boolean("false"===b?!1:b);case "date":return new Date(b);case "context":return d.get(b)}return b}var h="undefined"!==typeof console?console:{log:function(){}};k.helpers={tap:function(b,c,d){var a=b;"function"===typeof b&&(!0===b.isFunction?a=b():(a=
"",c.tap(function(b){a+=b;return""}).render(b,d).untap(),""===a&&(a=!1)));return a},sep:function(b,c,d){return c.stack.index===c.stack.of-1?b:d.block?d.block(b,c):b},idx:function(b,c,d){return d.block?d.block(b,c.push(c.stack.index)):b},contextDump:function(b,c,d,a){a=a||{};d=a.to||"output";a=a.key||"current";d=k.helpers.tap(d,b,c);a=k.helpers.tap(a,b,c);c="full"===a?JSON.stringify(c.stack,p,2):JSON.stringify(c.stack.head,p,2);return"console"===d?(h.log(c),b):b.write(c)},"if":function(b,c,d,a){var e=
d.block,m=d["else"];if(a&&a.cond){a=a.cond;a=k.helpers.tap(a,b,c);if(eval(a)){if(e)return b.render(d.block,c);h.log("Missing body block in the if helper!");return b}if(m)return b.render(d["else"],c)}else h.log("No condition given in the if helper!");return b},math:function(b,c,d,a){if(a&&"undefined"!==typeof a.key&&a.method){var e=a.key,m=a.method,g=a.operand;a=a.round;var f=null,e=k.helpers.tap(e,b,c),g=k.helpers.tap(g,b,c);switch(m){case "mod":0!==g&&-0!==g||h.log("operand for divide operation is 0/-0: expect Nan!");
f=parseFloat(e)%parseFloat(g);break;case "add":f=parseFloat(e)+parseFloat(g);break;case "subtract":f=parseFloat(e)-parseFloat(g);break;case "multiply":f=parseFloat(e)*parseFloat(g);break;case "divide":0!==g&&-0!==g||h.log("operand for divide operation is 0/-0: expect Nan/Infinity!");f=parseFloat(e)/parseFloat(g);break;case "ceil":f=Math.ceil(parseFloat(e));break;case "floor":f=Math.floor(parseFloat(e));break;case "round":f=Math.round(parseFloat(e));break;case "abs":f=Math.abs(parseFloat(e));break;
default:h.log("method passed is not supported")}if(null!==f)return a&&(f=Math.round(f)),d&&d.block?b.render(d.block,c.push({isSelect:!0,isResolved:!1,selectKey:f})):b.write(f)}else h.log("Key is a required parameter for math helper along with method/operand!");return b},select:function(b,c,d,a){var e=d.block;if(a&&"undefined"!==typeof a.key){a=k.helpers.tap(a.key,b,c);if(e)return b.render(d.block,c.push({isSelect:!0,isResolved:!1,selectKey:a}));h.log("Missing body block in the select helper ")}else h.log("No key given in the select helper!");
return b},eq:function(b,c,d,a){a&&(a.filterOpType="eq");return l(b,c,d,a,function(a,b){return b===a})},ne:function(b,c,d,a){return a?(a.filterOpType="ne",l(b,c,d,a,function(a,b){return b!==a})):b},lt:function(b,c,d,a){if(a)return a.filterOpType="lt",l(b,c,d,a,function(a,b){return b<a})},lte:function(b,c,d,a){return a?(a.filterOpType="lte",l(b,c,d,a,function(a,b){return b<=a})):b},gt:function(b,c,d,a){return a?(a.filterOpType="gt",l(b,c,d,a,function(a,b){return b>a})):b},gte:function(b,c,d,a){return a?
(a.filterOpType="gte",l(b,c,d,a,function(a,b){return b>=a})):b},"default":function(b,c,d,a){a&&(a.filterOpType="default");return l(b,c,d,a,function(a,b){return!0})},size:function(b,c,d,a){c=0;var e;a=a||{};if((a=a.key)&&!0!==a)if(k.isArray(a))c=a.length;else if(!isNaN(parseFloat(a))&&isFinite(a))c=a;else if("object"===typeof a)for(e in c=0,a)Object.hasOwnProperty.call(a,e)&&c++;else c=(a+"").length;else c=0;return b.write(c)}}})("undefined"!==typeof exports?module.exports=require("dustjs-linkedin"):
dust);


@ -135,4 +135,25 @@ public class TestFSOutputSummer {
cluster.shutdown();
}
}
@Test
public void testDFSCheckSumType() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, BYTES_PER_CHECKSUM);
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL");
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(NUM_OF_DATANODES)
.build();
fileSys = cluster.getFileSystem();
try {
Path file = new Path("try.dat");
Random rand = new Random(seed);
rand.nextBytes(expected);
writeFile1(file);
} finally {
fileSys.close();
cluster.shutdown();
}
}
}


@ -1157,10 +1157,11 @@ public class TestReplicationPolicy {
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
final BlockInfo info = new BlockInfo(block1, 1);
final MutableBlockCollection mbc = mock(MutableBlockCollection.class);
final BlockCollection mbc = mock(BlockCollection.class);
when(mbc.getLastBlock()).thenReturn(info);
when(mbc.getPreferredBlockSize()).thenReturn(block1.getNumBytes() + 1);
when(mbc.getBlockReplication()).thenReturn((short)1);
when(mbc.isUnderConstruction()).thenReturn(true);
ContentSummary cs = mock(ContentSummary.class);
when(cs.getLength()).thenReturn((long)1);
when(mbc.computeContentSummary()).thenReturn(cs);


@ -82,9 +82,10 @@ public class CreateEditsLog {
blocks[iB].setBlockId(currentBlockId++);
}
INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
inodeId.nextValue(), null, replication, 0, blockSize, blocks, p, "",
"", null);
final INodeFile inode = new INodeFile(inodeId.nextValue(), null,
p, 0L, 0L, blocks, replication, blockSize);
inode.toUnderConstruction("", "", null);
// Append path to filename with information about blockIDs
String path = "_" + iF + "_B" + blocks[0].getBlockId() +
"_to_B" + blocks[blocksPerFile-1].getBlockId() + "_";
@ -96,9 +97,10 @@ public class CreateEditsLog {
dirInode = new INodeDirectory(inodeId.nextValue(), null, p, 0L);
editLog.logMkDir(currentDir, dirInode);
}
editLog.logOpenFile(filePath,
new INodeFileUnderConstruction(inodeId.nextValue(), p, replication,
0, blockSize, "", "", null), false);
INodeFile fileUc = new INodeFile(inodeId.nextValue(), null,
p, 0L, 0L, BlockInfo.EMPTY_ARRAY, replication, blockSize);
fileUc.toUnderConstruction("", "", null);
editLog.logOpenFile(filePath, fileUc, false);
editLog.logCloseFile(filePath, inode);
if (currentBlockId - bidAtSync >= 2000) { // sync every 2K blocks


@ -614,6 +614,47 @@ public class TestCacheDirectives {
}, 500, 60000);
}
private static void waitForCachedStats(final DistributedFileSystem dfs,
final long targetFilesAffected, final long targetBytesNeeded,
final long targetBytesCached,
final CacheDirectiveInfo filter, final String infoString)
throws Exception {
LOG.info("Polling listDirectives{" +
((filter == null) ? "ALL" : filter.toString()) +
" for " + targetFilesAffected + " targetFilesAffected, " +
targetBytesNeeded + " targetBytesNeeded, " +
targetBytesCached + " targetBytesCached");
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
RemoteIterator<CacheDirectiveEntry> iter = null;
CacheDirectiveEntry entry = null;
try {
iter = dfs.listCacheDirectives(filter);
entry = iter.next();
} catch (IOException e) {
fail("got IOException while calling " +
"listCacheDirectives: " + e.getMessage());
}
Assert.assertNotNull(entry);
CacheDirectiveStats stats = entry.getStats();
if ((targetFilesAffected == stats.getFilesAffected()) &&
(targetBytesNeeded == stats.getBytesNeeded()) &&
(targetBytesCached == stats.getBytesCached())) {
return true;
} else {
LOG.info(infoString + ": filesAffected: " +
stats.getFilesAffected() + "/" + targetFilesAffected +
", bytesNeeded: " +
stats.getBytesNeeded() + "/" + targetBytesNeeded +
", bytesCached: " +
stats.getBytesCached() + "/" + targetBytesCached);
return false;
}
}
}, 500, 60000);
}
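
waitForCachedStats replaces the one-shot assertions further down with the waitFor(check, 500, 60000) polling idiom, which tolerates the asynchronous cache rescan instead of racing it. Reduced to a self-contained form (this stand-in only mirrors the behavior of the GenericTestUtils.waitFor call used above):

import java.util.function.Supplier;

// Re-evaluate a condition every intervalMs until it holds or timeoutMs elapses.
final class PollSketch {
  static void waitFor(Supplier<Boolean> check, long intervalMs, long timeoutMs)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!Boolean.TRUE.equals(check.get())) {
      if (System.currentTimeMillis() > deadline) {
        throw new IllegalStateException("Timed out waiting for condition");
      }
      Thread.sleep(intervalMs);
    }
  }
}
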
private static void checkNumCachedReplicas(final DistributedFileSystem dfs,
final List<Path> paths, final int expectedBlocks,
final int expectedReplicas)
@ -804,21 +845,12 @@ public class TestCacheDirectives {
waitForCachedBlocks(namenode, 4, 8,
"testWaitForCachedReplicasInDirectory:1");
// Verify that listDirectives gives the stats we want.
RemoteIterator<CacheDirectiveEntry> iter =
dfs.listCacheDirectives(new CacheDirectiveInfo.Builder().
setPath(new Path("/foo")).
build());
CacheDirectiveEntry entry = iter.next();
CacheDirectiveStats stats = entry.getStats();
Assert.assertEquals(Long.valueOf(2),
stats.getFilesAffected());
Assert.assertEquals(Long.valueOf(
2 * numBlocksPerFile * BLOCK_SIZE * 2),
stats.getBytesNeeded());
Assert.assertEquals(Long.valueOf(
2 * numBlocksPerFile * BLOCK_SIZE * 2),
stats.getBytesCached());
waitForCachedStats(dfs, 2,
8 * BLOCK_SIZE, 8 * BLOCK_SIZE,
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo")).
build(),
"testWaitForCachedReplicasInDirectory:2");
long id2 = dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo/bar")).
@ -827,44 +859,28 @@ public class TestCacheDirectives {
build());
// wait for an additional 2 cached replicas to come up
waitForCachedBlocks(namenode, 4, 10,
"testWaitForCachedReplicasInDirectory:2");
"testWaitForCachedReplicasInDirectory:3");
// the directory directive's stats are unchanged
iter = dfs.listCacheDirectives(
waitForCachedStats(dfs, 2,
8 * BLOCK_SIZE, 8 * BLOCK_SIZE,
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo")).
build());
entry = iter.next();
stats = entry.getStats();
Assert.assertEquals(Long.valueOf(2),
stats.getFilesAffected());
Assert.assertEquals(Long.valueOf(
2 * numBlocksPerFile * BLOCK_SIZE * 2),
stats.getBytesNeeded());
Assert.assertEquals(Long.valueOf(
2 * numBlocksPerFile * BLOCK_SIZE * 2),
stats.getBytesCached());
setPath(new Path("/foo")).
build(),
"testWaitForCachedReplicasInDirectory:4");
// verify /foo/bar's stats
iter = dfs.listCacheDirectives(
waitForCachedStats(dfs, 1,
4 * numBlocksPerFile * BLOCK_SIZE,
// only 3 because the file only has 3 replicas, not 4 as requested.
3 * numBlocksPerFile * BLOCK_SIZE,
new CacheDirectiveInfo.Builder().
setPath(new Path("/foo/bar")).
build());
entry = iter.next();
stats = entry.getStats();
Assert.assertEquals(Long.valueOf(1),
stats.getFilesAffected());
Assert.assertEquals(Long.valueOf(
4 * numBlocksPerFile * BLOCK_SIZE),
stats.getBytesNeeded());
// only 3 because the file only has 3 replicas, not 4 as requested.
Assert.assertEquals(Long.valueOf(
3 * numBlocksPerFile * BLOCK_SIZE),
stats.getBytesCached());
setPath(new Path("/foo/bar")).
build(),
"testWaitForCachedReplicasInDirectory:5");
// remove and watch numCached go to 0
dfs.removeCacheDirective(id);
dfs.removeCacheDirective(id2);
waitForCachedBlocks(namenode, 0, 0,
"testWaitForCachedReplicasInDirectory:3");
"testWaitForCachedReplicasInDirectory:6");
} finally {
cluster.shutdown();
}


@ -43,8 +43,7 @@ public class TestCommitBlockSynchronization {
private static final long length = 200;
private static final long genStamp = 300;
private FSNamesystem makeNameSystemSpy(Block block,
INodeFileUnderConstruction file)
private FSNamesystem makeNameSystemSpy(Block block, INodeFile file)
throws IOException {
Configuration conf = new Configuration();
FSImage image = new FSImage(conf);
@ -58,21 +57,26 @@ public class TestCommitBlockSynchronization {
blockInfo.setGenerationStamp(genStamp);
blockInfo.initializeBlockRecovery(genStamp);
doReturn(true).when(file).removeLastBlock(any(Block.class));
doReturn(true).when(file).isUnderConstruction();
doReturn(blockInfo).when(namesystemSpy).getStoredBlock(any(Block.class));
doReturn("").when(namesystemSpy).closeFileCommitBlocks(
any(INodeFileUnderConstruction.class),
any(BlockInfo.class));
any(INodeFile.class), any(BlockInfo.class));
doReturn("").when(namesystemSpy).persistBlocks(
any(INodeFileUnderConstruction.class), anyBoolean());
any(INodeFile.class), anyBoolean());
doReturn(mock(FSEditLog.class)).when(namesystemSpy).getEditLog();
return namesystemSpy;
}
private INodeFile mockFileUnderConstruction() {
INodeFile file = mock(INodeFile.class);
return file;
}
@Test
public void testCommitBlockSynchronization() throws IOException {
INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
INodeFile file = mockFileUnderConstruction();
Block block = new Block(blockId, length, genStamp);
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeID[] newTargets = new DatanodeID[0];
@ -100,7 +104,7 @@ public class TestCommitBlockSynchronization {
@Test
public void testCommitBlockSynchronization2() throws IOException {
INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
INodeFile file = mockFileUnderConstruction();
Block block = new Block(blockId, length, genStamp);
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeID[] newTargets = new DatanodeID[0];
@ -124,7 +128,7 @@ public class TestCommitBlockSynchronization {
@Test
public void testCommitBlockSynchronizationWithDelete() throws IOException {
INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
INodeFile file = mockFileUnderConstruction();
Block block = new Block(blockId, length, genStamp);
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeID[] newTargets = new DatanodeID[0];
@ -144,7 +148,7 @@ public class TestCommitBlockSynchronization {
@Test
public void testCommitBlockSynchronizationWithClose() throws IOException {
INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
INodeFile file = mockFileUnderConstruction();
Block block = new Block(blockId, length, genStamp);
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeID[] newTargets = new DatanodeID[0];
@ -171,7 +175,7 @@ public class TestCommitBlockSynchronization {
@Test
public void testCommitBlockSynchronizationWithCloseAndNonExistantTarget()
throws IOException {
INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
INodeFile file = mockFileUnderConstruction();
Block block = new Block(blockId, length, genStamp);
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeID[] newTargets = new DatanodeID[]{


@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
@ -152,9 +153,10 @@ public class TestEditLog {
FSEditLog editLog = namesystem.getEditLog();
for (int i = 0; i < numTransactions; i++) {
INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
namesystem.allocateNewInodeId(), p, replication, blockSize, 0, "",
"", null);
INodeFile inode = new INodeFile(namesystem.allocateNewInodeId(), null,
p, 0L, 0L, BlockInfo.EMPTY_ARRAY, replication, blockSize);
inode.toUnderConstruction("", "", null);
editLog.logOpenFile("/filename" + (startIndex + i), inode, false);
editLog.logCloseFile("/filename" + (startIndex + i), inode);
editLog.logSync();


@ -29,6 +29,8 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -59,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.util.Time;
import org.junit.Test;
import org.mockito.Mockito;
@ -282,14 +285,6 @@ public class TestINodeFile {
assertTrue(fnfe.getMessage().contains("File does not exist"));
}
//cast to INodeFileUnderConstruction, should fail
try {
INodeFileUnderConstruction.valueOf(from, path);
fail();
} catch(FileNotFoundException fnfe) {
assertTrue(fnfe.getMessage().contains("File does not exist"));
}
//cast to INodeDirectory, should fail
try {
INodeDirectory.valueOf(from, path);
@ -306,14 +301,6 @@ public class TestINodeFile {
final INodeFile f = INodeFile.valueOf(from, path);
assertTrue(f == from);
//cast to INodeFileUnderConstruction, should fail
try {
INodeFileUnderConstruction.valueOf(from, path);
fail();
} catch(IOException ioe) {
assertTrue(ioe.getMessage().contains("File is not under construction"));
}
//cast to INodeDirectory, should fail
try {
INodeDirectory.valueOf(from, path);
@ -324,19 +311,14 @@ public class TestINodeFile {
}
{//cast from INodeFileUnderConstruction
final INode from = new INodeFileUnderConstruction(
INodeId.GRANDFATHER_INODE_ID, perm, replication, 0L, 0L, "client",
"machine", null);
final INode from = new INodeFile(
INodeId.GRANDFATHER_INODE_ID, null, perm, 0L, 0L, null, replication, 1024L);
from.asFile().toUnderConstruction("client", "machine", null);
// cast to INodeFile, should succeed
final INodeFile f = INodeFile.valueOf(from, path);
assertTrue(f == from);
//cast to INodeFileUnderConstruction, should success
final INodeFileUnderConstruction u = INodeFileUnderConstruction.valueOf(
from, path);
assertTrue(u == from);
//cast to INodeDirectory, should fail
try {
INodeDirectory.valueOf(from, path);
@ -358,14 +340,6 @@ public class TestINodeFile {
assertTrue(fnfe.getMessage().contains("Path is not a file"));
}
//cast to INodeFileUnderConstruction, should fail
try {
INodeFileUnderConstruction.valueOf(from, path);
fail();
} catch(FileNotFoundException fnfe) {
assertTrue(fnfe.getMessage().contains("Path is not a file"));
}
// cast to INodeDirectory, should succeed
final INodeDirectory d = INodeDirectory.valueOf(from, path);
assertTrue(d == from);
@ -1015,4 +989,24 @@ public class TestINodeFile {
}
}
}
@Test
public void testFileUnderConstruction() {
replication = 3;
final INodeFile file = new INodeFile(INodeId.GRANDFATHER_INODE_ID, null,
perm, 0L, 0L, null, replication, 1024L);
assertFalse(file.isUnderConstruction());
final String clientName = "client";
final String clientMachine = "machine";
file.toUnderConstruction(clientName, clientMachine, null);
assertTrue(file.isUnderConstruction());
FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
assertEquals(clientName, uc.getClientName());
assertEquals(clientMachine, uc.getClientMachine());
Assert.assertNull(uc.getClientNode());
file.toCompleteFile(Time.now());
assertFalse(file.isUnderConstruction());
}
}


@ -66,7 +66,6 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
@ -720,8 +719,8 @@ public class TestRetryCacheWithHA {
@Override
boolean checkNamenodeBeforeReturn() throws Exception {
INodeFileUnderConstruction fileNode = (INodeFileUnderConstruction) cluster
.getNamesystem(0).getFSDirectory().getINode4Write(file).asFile();
INodeFile fileNode = cluster.getNamesystem(0).getFSDirectory()
.getINode4Write(file).asFile();
BlockInfoUnderConstruction blkUC =
(BlockInfoUnderConstruction) (fileNode.getBlocks())[1];
int datanodeNum = blkUC.getExpectedStorageLocations().length;


@ -1227,8 +1227,9 @@ public class TestRenameWithSnapshots {
out.write(content);
fooRef = fsdir.getINode4Write(foo2.toString());
assertTrue(fooRef instanceof INodeReference.DstReference);
INode fooNode = fooRef.asFile();
assertTrue(fooNode instanceof INodeFileUnderConstructionWithSnapshot);
INodeFile fooNode = fooRef.asFile();
assertTrue(fooNode instanceof INodeFileWithSnapshot);
assertTrue(fooNode.isUnderConstruction());
} finally {
if (out != null) {
out.close();
@ -1237,8 +1238,9 @@ public class TestRenameWithSnapshots {
fooRef = fsdir.getINode4Write(foo2.toString());
assertTrue(fooRef instanceof INodeReference.DstReference);
INode fooNode = fooRef.asFile();
INodeFile fooNode = fooRef.asFile();
assertTrue(fooNode instanceof INodeFileWithSnapshot);
assertFalse(fooNode.isUnderConstruction());
restartClusterAndCheckImage(true);
}


@ -314,7 +314,9 @@ public class TestSnapshotBlocksMap {
assertEquals(BLOCKSIZE, blks[0].getNumBytes());
}
/** Make sure we delete 0-sized block when deleting an INodeFileUC */
/**
* Make sure we delete the 0-sized last block when deleting an under-construction file
*/
@Test
public void testDeletionWithZeroSizeBlock2() throws Exception {
final Path foo = new Path("/foo");

View File

@ -170,6 +170,9 @@ Release 2.3.0 - UNRELEASED
YARN-1053. Diagnostic message from ContainerExitEvent is ignored in
ContainerImpl (Omkar Vinit Joshi via bikas)
YARN-1320. Fixed Distributed Shell application to respect custom log4j
properties file. (Xuan Gong via vinodkv)
Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -220,6 +220,9 @@ public class ApplicationMaster {
// Hardcoded path to shell script in launch container's local env
private final String ExecShellStringPath = "ExecShellScript.sh";
// Hardcoded path to custom log_properties
private final String log4jPath = "log4j.properties";
private final String shellCommandPath = "shellCommands";
private volatile boolean done;
@ -327,6 +330,16 @@ public class ApplicationMaster {
"No args specified for application master to initialize");
}
// Check whether a custom log4j.properties file exists
File customLog4jFile = new File(log4jPath);
if (customLog4jFile.exists()) {
try {
Log4jPropertyHelper.updateLog4jConfiguration(
    ApplicationMaster.class, log4jPath);
} catch (Exception e) {
LOG.warn("Cannot set up custom log4j properties.", e);
}
}
if (cliParser.hasOption("help")) {
printUsage(opts);
return false;

View File

@ -166,6 +166,9 @@ public class Client {
private Options opts;
private final String shellCommandPath = "shellCommands";
// Hardcoded path to custom log_properties
private final String log4jPath = "log4j.properties";
/**
* @param args Command line arguments
*/
@ -257,7 +260,16 @@ public class Client {
if (args.length == 0) {
throw new IllegalArgumentException("No args specified for client to initialize");
}
}
if (cliParser.hasOption("log_properties")) {
String customLog4jPath = cliParser.getOptionValue("log_properties");
try {
Log4jPropertyHelper.updateLog4jConfiguration(Client.class, customLog4jPath);
} catch (Exception e) {
LOG.warn("Cannot set up custom log4j properties.", e);
}
}
if (cliParser.hasOption("help")) {
printUsage();
@ -455,16 +467,16 @@ public class Client {
// Set the log4j properties if needed
if (!log4jPropFile.isEmpty()) {
Path log4jSrc = new Path(log4jPropFile);
Path log4jDst = new Path(fs.getHomeDirectory(), "log4j.props");
String log4jPathSuffix = appName + "/" + appId.getId() + "/" + log4jPath;
Path log4jDst = new Path(fs.getHomeDirectory(), log4jPathSuffix);
fs.copyFromLocalFile(false, true, log4jSrc, log4jDst);
FileStatus log4jFileStatus = fs.getFileStatus(log4jDst);
LocalResource log4jRsrc = Records.newRecord(LocalResource.class);
log4jRsrc.setType(LocalResourceType.FILE);
log4jRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
log4jRsrc.setResource(ConverterUtils.getYarnUrlFromURI(log4jDst.toUri()));
log4jRsrc.setTimestamp(log4jFileStatus.getModificationTime());
log4jRsrc.setSize(log4jFileStatus.getLen());
localResources.put("log4j.properties", log4jRsrc);
LocalResource log4jRsrc =
LocalResource.newInstance(
ConverterUtils.getYarnUrlFromURI(log4jDst.toUri()),
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
log4jFileStatus.getLen(), log4jFileStatus.getModificationTime());
localResources.put(log4jPath, log4jRsrc);
}
// The shell script has to be made available on the final container(s)

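The rewritten block switches from populating a LocalResource field by field to the LocalResource.newInstance factory, and stages the file under an application-specific suffix so concurrent application runs do not overwrite each other's copy. A minimal sketch of the same staging pattern for an arbitrary file (the file name is illustrative; fs, appName, appId, and localResources come from the surrounding Client code):

// Stage a local file into HDFS and expose it to containers as a LocalResource.
Path src = new Path("my-app.conf");                     // hypothetical file
String suffix = appName + "/" + appId.getId() + "/my-app.conf";
Path dst = new Path(fs.getHomeDirectory(), suffix);     // per-application copy
fs.copyFromLocalFile(false, true, src, dst);
FileStatus status = fs.getFileStatus(dst);
LocalResource rsrc = LocalResource.newInstance(
    ConverterUtils.getYarnUrlFromURI(dst.toUri()),
    LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
    status.getLen(), status.getModificationTime());
localResources.put("my-app.conf", rsrc);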
View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.applications.distributedshell;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Map.Entry;
import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.PropertyConfigurator;
public class Log4jPropertyHelper {
public static void updateLog4jConfiguration(Class<?> targetClass,
String log4jPath) throws Exception {
Properties customProperties = new Properties();
FileInputStream fs = null;
InputStream is = null;
try {
fs = new FileInputStream(log4jPath);
is = targetClass.getResourceAsStream("/log4j.properties");
customProperties.load(fs);
Properties originalProperties = new Properties();
originalProperties.load(is);
for (Entry<Object, Object> entry : customProperties.entrySet()) {
originalProperties.setProperty(entry.getKey().toString(), entry
.getValue().toString());
}
LogManager.resetConfiguration();
PropertyConfigurator.configure(originalProperties);
} finally {
IOUtils.closeQuietly(is);
IOUtils.closeQuietly(fs);
}
}
}
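The helper overlays the supplied properties on top of the default /log4j.properties bundled with the target class, then reconfigures log4j in place. A usage sketch matching the call sites added to Client and ApplicationMaster in this patch (the path below is a hypothetical example):

// Apply a user-supplied log4j configuration on top of the defaults.
try {
  Log4jPropertyHelper.updateLog4jConfiguration(Client.class,
      "/tmp/custom_log4j.properties"); // illustrative path
} catch (Exception e) {
  LOG.warn("Cannot set up custom log4j properties.", e);
}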

View File

@ -25,6 +25,7 @@ import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
@ -174,6 +175,68 @@ public class TestDistributedShell {
}
@Test(timeout=90000)
public void testDSShellWithCustomLogPropertyFile() throws Exception {
final File basedir =
new File("target", TestDistributedShell.class.getName());
final File tmpDir = new File(basedir, "tmpDir");
tmpDir.mkdirs();
final File customLogProperty = new File(tmpDir, "custom_log4j.properties");
if (customLogProperty.exists()) {
customLogProperty.delete();
}
if (!customLogProperty.createNewFile()) {
Assert.fail("Cannot create custom log4j property file.");
}
PrintWriter fileWriter = new PrintWriter(customLogProperty);
// set the output to DEBUG level
fileWriter.write("log4j.rootLogger=debug,stdout");
fileWriter.close();
String[] args = {
"--jar",
APPMASTER_JAR,
"--num_containers",
"3",
"--shell_command",
"echo",
"--shell_args",
"HADOOP",
"--log_properties",
customLogProperty.getAbsolutePath(),
"--master_memory",
"512",
"--master_vcores",
"2",
"--container_memory",
"128",
"--container_vcores",
"1"
};
// Before running the DS client, the default log level is INFO
final Log LOG_Client =
LogFactory.getLog(Client.class);
Assert.assertTrue(LOG_Client.isInfoEnabled());
Assert.assertFalse(LOG_Client.isDebugEnabled());
final Log LOG_AM = LogFactory.getLog(ApplicationMaster.class);
Assert.assertTrue(LOG_AM.isInfoEnabled());
Assert.assertFalse(LOG_AM.isDebugEnabled());
LOG.info("Initializing DS Client");
final Client client =
new Client(new Configuration(yarnCluster.getConfig()));
boolean initSuccess = client.init(args);
Assert.assertTrue(initSuccess);
LOG.info("Running DS Client");
boolean result = client.run();
LOG.info("Client run completed. Result=" + result);
Assert.assertTrue(verifyContainerLog(3, null, true, "DEBUG") > 10);
// After the DS run finishes, the log level should be DEBUG
Assert.assertTrue(LOG_Client.isInfoEnabled());
Assert.assertTrue(LOG_Client.isDebugEnabled());
Assert.assertTrue(LOG_AM.isInfoEnabled());
Assert.assertTrue(LOG_AM.isDebugEnabled());
}
public void testDSShellWithCommands() throws Exception {
String[] args = {