svn merge -c 1363170 FIXES: HDFS-3646. LeaseRenewer can hold reference to inactive DFSClient instances forever.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1363172 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Daryn Sharp 2012-07-18 23:27:21 +00:00
parent 52fa40b8ba
commit 22022a8760
11 changed files with 109 additions and 65 deletions

View File

@ -1251,6 +1251,9 @@ Release 0.23.3 - UNRELEASED
StreamingOutput, otherwise, it will fail to transfer large files.
(szetszwo)
HDFS-3646. LeaseRenewer can hold reference to inactive DFSClient
instances forever. (Kihwal Lee via daryn)
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -177,7 +177,7 @@ public class DFSClient implements java.io.Closeable {
final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
final FileSystem.Statistics stats;
final int hdfsTimeout; // timeout value for a DFS operation.
final LeaseRenewer leaserenewer;
private final String authority;
final SocketCache socketCache;
final Conf dfsClientConf;
private Random r = new Random();
@ -347,9 +347,9 @@ public DFSClient(URI nameNodeUri, Configuration conf,
this.hdfsTimeout = Client.getTimeout(conf);
this.ugi = UserGroupInformation.getCurrentUser();
final String authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
this.leaserenewer = LeaseRenewer.getInstance(authority, ugi, this);
this.clientName = leaserenewer.getClientName(dfsClientConf.taskId);
this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" +
DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();
this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
@ -477,7 +477,30 @@ void checkOpen() throws IOException {
}
}
/** Put a file. */
/** Return the lease renewer instance. The renewer thread won't start
* until the first output stream is created. The same instance will
* be returned until all output streams are closed.
*/
public synchronized LeaseRenewer getLeaseRenewer() throws IOException {
return LeaseRenewer.getInstance(authority, ugi, this);
}
/** Get a lease and start automatic renewal */
private void beginFileLease(final String src, final DFSOutputStream out)
throws IOException {
getLeaseRenewer().put(src, out, this);
}
/** Stop renewal of lease for the file. */
void endFileLease(final String src) throws IOException {
getLeaseRenewer().closeFile(src, this);
}
/** Put a file. Only called from LeaseRenewer, where proper locking is
* enforced to consistently update its local dfsclients array and
* client's filesBeingWritten map.
*/
void putFileBeingWritten(final String src, final DFSOutputStream out) {
synchronized(filesBeingWritten) {
filesBeingWritten.put(src, out);
@ -490,7 +513,7 @@ void putFileBeingWritten(final String src, final DFSOutputStream out) {
}
}
/** Remove a file. */
/** Remove a file. Only called from LeaseRenewer. */
void removeFileBeingWritten(final String src) {
synchronized(filesBeingWritten) {
filesBeingWritten.remove(src);
@ -566,6 +589,14 @@ void abort() {
clientRunning = false;
closeAllFilesBeingWritten(true);
socketCache.clear();
try {
// remove reference to this client and stop the renewer,
// if there is no more clients under the renewer.
getLeaseRenewer().closeClient(this);
} catch (IOException ioe) {
LOG.info("Exception occurred while aborting the client. " + ioe);
}
closeConnectionToNamenode();
}
@ -606,7 +637,7 @@ public synchronized void close() throws IOException {
closeAllFilesBeingWritten(false);
socketCache.clear();
clientRunning = false;
leaserenewer.closeClient(this);
getLeaseRenewer().closeClient(this);
// close connections to the namenode
closeConnectionToNamenode();
}
@ -1103,7 +1134,7 @@ public DFSOutputStream create(String src,
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
src, masked, flag, createParent, replication, blockSize, progress,
buffersize, dfsClientConf.createChecksum());
leaserenewer.put(src, result, this);
beginFileLease(src, result);
return result;
}
@ -1153,7 +1184,7 @@ public DFSOutputStream primitiveCreate(String src,
flag, createParent, replication, blockSize, progress, buffersize,
checksum);
}
leaserenewer.put(src, result, this);
beginFileLease(src, result);
return result;
}
@ -1239,7 +1270,7 @@ private DFSOutputStream append(String src, int buffersize, Progressable progress
+ src + " on client " + clientName);
}
final DFSOutputStream result = callAppend(stat, src, buffersize, progress);
leaserenewer.put(src, result, this);
beginFileLease(src, result);
return result;
}

View File

@ -1668,6 +1668,7 @@ synchronized void abort() throws IOException {
streamer.setLastException(new IOException("Lease timeout of " +
(dfsClient.hdfsTimeout/1000) + " seconds expired."));
closeThreads(true);
dfsClient.endFileLease(src);
}
// shutdown datastreamer and responseprocessor threads.
@ -1722,7 +1723,7 @@ public synchronized void close() throws IOException {
ExtendedBlock lastBlock = streamer.getBlock();
closeThreads(false);
completeFile(lastBlock);
dfsClient.leaserenewer.closeFile(src, dfsClient);
dfsClient.endFileLease(src);
} finally {
closed = true;
}

View File

@ -157,9 +157,6 @@ private synchronized void remove(final LeaseRenewer r) {
}
}
private final String clienNamePostfix = DFSUtil.getRandom().nextInt()
+ "_" + Thread.currentThread().getId();
/** The time in milliseconds that the map became empty. */
private long emptyTime = Long.MAX_VALUE;
/** A fixed lease renewal time period in milliseconds */
@ -213,11 +210,6 @@ private synchronized long getRenewalTime() {
return renewal;
}
/** @return the client name for the given id. */
String getClientName(final String id) {
return "DFSClient_" + id + "_" + clienNamePostfix;
}
/** Add a client. */
private synchronized void addClient(final DFSClient dfsc) {
for(DFSClient c : dfsclients) {
@ -271,6 +263,11 @@ private void unsyncSetGraceSleepPeriod(final long gracePeriod) {
synchronized boolean isRunning() {
return daemon != null && daemon.isAlive();
}
/** Does this renewer have nothing to renew? */
public boolean isEmpty() {
return dfsclients.isEmpty();
}
/** Used only by tests */
synchronized String getDaemonName() {
@ -331,6 +328,9 @@ void closeFile(final String src, final DFSClient dfsc) {
dfsc.removeFileBeingWritten(src);
synchronized(this) {
if (dfsc.isFilesBeingWrittenEmpty()) {
dfsclients.remove(dfsc);
}
//update emptyTime if necessary
if (emptyTime == Long.MAX_VALUE) {
for(DFSClient c : dfsclients) {
@ -428,8 +428,7 @@ public int compare(final DFSClient left, final DFSClient right) {
* when the lease period is half over.
*/
private void run(final int id) throws InterruptedException {
for(long lastRenewed = Time.now();
clientsRunning() && !Thread.interrupted();
for(long lastRenewed = Time.now(); !Thread.interrupted();
Thread.sleep(getSleepPeriod())) {
final long elapsed = Time.now() - lastRenewed;
if (elapsed >= getRenewalTime()) {
@ -469,6 +468,13 @@ private void run(final int id) throws InterruptedException {
//no longer the current daemon or expired
return;
}
// if no clients are in running state or there is no more clients
// registered with this renewer, stop the daemon after the grace
// period.
if (!clientsRunning() && emptyTime == Long.MAX_VALUE) {
emptyTime = Time.now();
}
}
}
}

View File

@ -33,7 +33,7 @@ public static void setDFSClient(DistributedFileSystem dfs, DFSClient client) {
public static void stopLeaseRenewer(DistributedFileSystem dfs) throws IOException {
try {
dfs.dfs.leaserenewer.interruptAndJoin();
dfs.dfs.getLeaseRenewer().interruptAndJoin();
} catch (InterruptedException e) {
throw new IOException(e);
}

View File

@ -174,78 +174,78 @@ public void testDFSClient() throws Exception {
{
DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
dfs.dfs.leaserenewer.setGraceSleepPeriod(grace);
assertFalse(dfs.dfs.leaserenewer.isRunning());
dfs.dfs.getLeaseRenewer().setGraceSleepPeriod(grace);
assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
{
//create a file
final FSDataOutputStream out = dfs.create(filepaths[0]);
assertTrue(dfs.dfs.leaserenewer.isRunning());
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
//write something
out.writeLong(millis);
assertTrue(dfs.dfs.leaserenewer.isRunning());
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
//close
out.close();
Thread.sleep(grace/4*3);
//within grace period
assertTrue(dfs.dfs.leaserenewer.isRunning());
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
for(int i = 0; i < 3; i++) {
if (dfs.dfs.leaserenewer.isRunning()) {
if (dfs.dfs.getLeaseRenewer().isRunning()) {
Thread.sleep(grace/2);
}
}
//passed grace period
assertFalse(dfs.dfs.leaserenewer.isRunning());
assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
}
{
//create file1
final FSDataOutputStream out1 = dfs.create(filepaths[1]);
assertTrue(dfs.dfs.leaserenewer.isRunning());
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
//create file2
final FSDataOutputStream out2 = dfs.create(filepaths[2]);
assertTrue(dfs.dfs.leaserenewer.isRunning());
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
//write something to file1
out1.writeLong(millis);
assertTrue(dfs.dfs.leaserenewer.isRunning());
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
//close file1
out1.close();
assertTrue(dfs.dfs.leaserenewer.isRunning());
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
//write something to file2
out2.writeLong(millis);
assertTrue(dfs.dfs.leaserenewer.isRunning());
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
//close file2
out2.close();
Thread.sleep(grace/4*3);
//within grace period
assertTrue(dfs.dfs.leaserenewer.isRunning());
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
}
{
//create file3
final FSDataOutputStream out3 = dfs.create(filepaths[3]);
assertTrue(dfs.dfs.leaserenewer.isRunning());
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
Thread.sleep(grace/4*3);
//passed previous grace period, should still running
assertTrue(dfs.dfs.leaserenewer.isRunning());
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
//write something to file3
out3.writeLong(millis);
assertTrue(dfs.dfs.leaserenewer.isRunning());
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
//close file3
out3.close();
assertTrue(dfs.dfs.leaserenewer.isRunning());
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
Thread.sleep(grace/4*3);
//within grace period
assertTrue(dfs.dfs.leaserenewer.isRunning());
assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
for(int i = 0; i < 3; i++) {
if (dfs.dfs.leaserenewer.isRunning()) {
if (dfs.dfs.getLeaseRenewer().isRunning()) {
Thread.sleep(grace/2);
}
}
//passed grace period
assertFalse(dfs.dfs.leaserenewer.isRunning());
assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
}
dfs.close();
@ -274,15 +274,15 @@ public void testDFSClient() throws Exception {
{
DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
assertFalse(dfs.dfs.leaserenewer.isRunning());
assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
//open and check the file
FSDataInputStream in = dfs.open(filepaths[0]);
assertFalse(dfs.dfs.leaserenewer.isRunning());
assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
assertEquals(millis, in.readLong());
assertFalse(dfs.dfs.leaserenewer.isRunning());
assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
in.close();
assertFalse(dfs.dfs.leaserenewer.isRunning());
assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
dfs.close();
}

View File

@ -182,7 +182,7 @@ public void run() {
// has not been completed in the NN.
// Lose the leases
LOG.info("Killing lease checker");
client.leaserenewer.interruptAndJoin();
client.getLeaseRenewer().interruptAndJoin();
FileSystem fs1 = cluster.getFileSystem();
FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(
@ -255,7 +255,7 @@ public void run() {
// has not been completed in the NN.
// Lose the leases
LOG.info("Killing lease checker");
client.leaserenewer.interruptAndJoin();
client.getLeaseRenewer().interruptAndJoin();
FileSystem fs1 = cluster.getFileSystem();
FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(

View File

@ -83,6 +83,7 @@ public void testLeaseAbort() throws Exception {
// We don't need to wait the lease renewer thread to act.
// call renewLease() manually.
// make it look like lease has already expired.
LeaseRenewer originalRenewer = dfs.getLeaseRenewer();
dfs.lastLeaseRenewal = Time.now() - 300000;
dfs.renewLease();
@ -95,6 +96,10 @@ public void testLeaseAbort() throws Exception {
LOG.info("Write failed as expected. ", e);
}
// If aborted, the renewer should be empty. (no reference to clients)
Thread.sleep(1000);
Assert.assertTrue(originalRenewer.isEmpty());
// unstub
doNothing().when(spyNN).renewLease(anyString());
@ -168,15 +173,20 @@ public void testFactory() throws Exception {
final Configuration conf = new Configuration();
final DFSClient c1 = createDFSClientAs(ugi[0], conf);
FSDataOutputStream out1 = createFsOut(c1, "/out1");
final DFSClient c2 = createDFSClientAs(ugi[0], conf);
Assert.assertEquals(c1.leaserenewer, c2.leaserenewer);
FSDataOutputStream out2 = createFsOut(c2, "/out2");
Assert.assertEquals(c1.getLeaseRenewer(), c2.getLeaseRenewer());
final DFSClient c3 = createDFSClientAs(ugi[1], conf);
Assert.assertTrue(c1.leaserenewer != c3.leaserenewer);
FSDataOutputStream out3 = createFsOut(c3, "/out3");
Assert.assertTrue(c1.getLeaseRenewer() != c3.getLeaseRenewer());
final DFSClient c4 = createDFSClientAs(ugi[1], conf);
Assert.assertEquals(c3.leaserenewer, c4.leaserenewer);
FSDataOutputStream out4 = createFsOut(c4, "/out4");
Assert.assertEquals(c3.getLeaseRenewer(), c4.getLeaseRenewer());
final DFSClient c5 = createDFSClientAs(ugi[2], conf);
Assert.assertTrue(c1.leaserenewer != c5.leaserenewer);
Assert.assertTrue(c3.leaserenewer != c5.leaserenewer);
FSDataOutputStream out5 = createFsOut(c5, "/out5");
Assert.assertTrue(c1.getLeaseRenewer() != c5.getLeaseRenewer());
Assert.assertTrue(c3.getLeaseRenewer() != c5.getLeaseRenewer());
}
private FSDataOutputStream createFsOut(DFSClient dfs, String path)

View File

@ -171,7 +171,7 @@ private Path createFile(final String filestr, final int size,
if (triggerLeaseRenewerInterrupt) {
AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
dfs.dfs.leaserenewer.interruptAndJoin();
dfs.dfs.getLeaseRenewer().interruptAndJoin();
}
return filepath;
}
@ -276,7 +276,7 @@ public void testHardLeaseRecovery() throws Exception {
// kill the lease renewal thread
AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
dfs.dfs.leaserenewer.interruptAndJoin();
dfs.dfs.getLeaseRenewer().interruptAndJoin();
// set the hard limit to be 1 second
cluster.setLeasePeriod(LONG_LEASE_PERIOD, SHORT_LEASE_PERIOD);
@ -341,7 +341,7 @@ public void testSoftLeaseRecovery() throws Exception {
AppendTestUtil.LOG.info("hflush");
stm.hflush();
AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
dfs.dfs.leaserenewer.interruptAndJoin();
dfs.dfs.getLeaseRenewer().interruptAndJoin();
// set the soft limit to be 1 second so that the
// namenode triggers lease recovery on next attempt to write-for-open.
@ -463,7 +463,7 @@ public void hardLeaseRecoveryRestartHelper(boolean doRename, int size)
// kill the lease renewal thread
AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
dfs.dfs.leaserenewer.interruptAndJoin();
dfs.dfs.getLeaseRenewer().interruptAndJoin();
// Make sure the DNs don't send a heartbeat for a while, so the blocks
// won't actually get completed during lease recovery.

View File

@ -93,13 +93,6 @@ public void testInstanceSharing() throws IOException {
Assert.assertNotSame(lr3, lr4);
}
@Test
public void testClientName() throws IOException {
String clientName = renewer.getClientName("NONMAPREDUCE");
Assert.assertTrue("bad client name: " + clientName,
clientName.startsWith("DFSClient_NONMAPREDUCE_"));
}
@Test
public void testRenewal() throws Exception {
// Keep track of how many times the lease gets renewed

View File

@ -88,7 +88,7 @@ public void pipeline_02_03() throws Exception {
// of data can be read successfully.
checkFile(p, half, conf);
AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
((DistributedFileSystem)fs).dfs.leaserenewer.interruptAndJoin();
((DistributedFileSystem)fs).dfs.getLeaseRenewer().interruptAndJoin();
//c. On M1, append another half block of data. Close file on M1.
{