HDFS-12754. Lease renewal can hit a deadlock. Contributed by Kuhu Shukla.

(cherry picked from commit 45f59bde60)
This commit is contained in:
Kihwal Lee 2017-11-27 17:01:34 -06:00
parent b756beb679
commit 3516ef45f3
6 changed files with 122 additions and 58 deletions

View File

@ -484,12 +484,21 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
/** Get a lease and start automatic renewal */ /** Get a lease and start automatic renewal */
private void beginFileLease(final long inodeId, final DFSOutputStream out) private void beginFileLease(final long inodeId, final DFSOutputStream out)
throws IOException { throws IOException {
getLeaseRenewer().put(inodeId, out, this); synchronized (filesBeingWritten) {
putFileBeingWritten(inodeId, out);
getLeaseRenewer().put(this);
}
} }
/** Stop renewal of lease for the file. */ /** Stop renewal of lease for the file. */
void endFileLease(final long inodeId) { void endFileLease(final long inodeId) {
getLeaseRenewer().closeFile(inodeId, this); synchronized (filesBeingWritten) {
removeFileBeingWritten(inodeId);
// remove client from renewer if no files are open
if (filesBeingWritten.isEmpty()) {
getLeaseRenewer().closeClient(this);
}
}
} }
@ -615,9 +624,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
if(clientRunning) { if(clientRunning) {
// lease renewal stops when all files are closed
closeAllFilesBeingWritten(false); closeAllFilesBeingWritten(false);
clientRunning = false; clientRunning = false;
getLeaseRenewer().closeClient(this);
// close connections to the namenode // close connections to the namenode
closeConnectionToNamenode(); closeConnectionToNamenode();
} }

View File

@ -63,4 +63,6 @@ public class DFSClientFaultInjector {
} }
public void sleepBeforeHedgedGet() {} public void sleepBeforeHedgedGet() {}
public void delayWhenRenewLeaseTimeout() {}
} }

View File

@ -30,7 +30,7 @@ import java.util.Map;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.DFSClientFaultInjector;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
@ -76,7 +76,7 @@ import org.slf4j.LoggerFactory;
public class LeaseRenewer { public class LeaseRenewer {
static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class); static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);
static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L; private static long leaseRenewerGraceDefault = 60*1000L;
static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L; static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
/** Get a {@link LeaseRenewer} instance */ /** Get a {@link LeaseRenewer} instance */
@ -156,9 +156,7 @@ public class LeaseRenewer {
final LeaseRenewer stored = renewers.get(r.factorykey); final LeaseRenewer stored = renewers.get(r.factorykey);
//Since a renewer may expire, the stored renewer can be different. //Since a renewer may expire, the stored renewer can be different.
if (r == stored) { if (r == stored) {
if (!r.clientsRunning()) { renewers.remove(r.factorykey);
renewers.remove(r.factorykey);
}
} }
} }
} }
@ -201,7 +199,7 @@ public class LeaseRenewer {
private LeaseRenewer(Factory.Key factorykey) { private LeaseRenewer(Factory.Key factorykey) {
this.factorykey = factorykey; this.factorykey = factorykey;
unsyncSetGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT); unsyncSetGraceSleepPeriod(leaseRenewerGraceDefault);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
instantiationTrace = StringUtils.stringifyException( instantiationTrace = StringUtils.stringifyException(
@ -293,8 +291,7 @@ public class LeaseRenewer {
&& Time.monotonicNow() - emptyTime > gracePeriod; && Time.monotonicNow() - emptyTime > gracePeriod;
} }
public synchronized void put(final long inodeId, final DFSOutputStream out, public synchronized void put(final DFSClient dfsc) {
final DFSClient dfsc) {
if (dfsc.isClientRunning()) { if (dfsc.isClientRunning()) {
if (!isRunning() || isRenewerExpired()) { if (!isRunning() || isRenewerExpired()) {
//start a new deamon with a new id. //start a new deamon with a new id.
@ -328,7 +325,6 @@ public class LeaseRenewer {
}); });
daemon.start(); daemon.start();
} }
dfsc.putFileBeingWritten(inodeId, out);
emptyTime = Long.MAX_VALUE; emptyTime = Long.MAX_VALUE;
} }
} }
@ -338,28 +334,6 @@ public class LeaseRenewer {
emptyTime = time; emptyTime = time;
} }
/** Close a file. */
public void closeFile(final long inodeId, final DFSClient dfsc) {
dfsc.removeFileBeingWritten(inodeId);
synchronized(this) {
if (dfsc.isFilesBeingWrittenEmpty()) {
dfsclients.remove(dfsc);
}
//update emptyTime if necessary
if (emptyTime == Long.MAX_VALUE) {
for(DFSClient c : dfsclients) {
if (!c.isFilesBeingWrittenEmpty()) {
//found a non-empty file-being-written map
return;
}
}
//discover the first time that all file-being-written maps are empty.
emptyTime = Time.monotonicNow();
}
}
}
/** Close the given client. */ /** Close the given client. */
public synchronized void closeClient(final DFSClient dfsc) { public synchronized void closeClient(final DFSClient dfsc) {
dfsclients.remove(dfsc); dfsclients.remove(dfsc);
@ -447,14 +421,17 @@ public class LeaseRenewer {
} catch (SocketTimeoutException ie) { } catch (SocketTimeoutException ie) {
LOG.warn("Failed to renew lease for " + clientsString() + " for " LOG.warn("Failed to renew lease for " + clientsString() + " for "
+ (elapsed/1000) + " seconds. Aborting ...", ie); + (elapsed/1000) + " seconds. Aborting ...", ie);
List<DFSClient> dfsclientsCopy;
synchronized (this) { synchronized (this) {
while (!dfsclients.isEmpty()) { DFSClientFaultInjector.get().delayWhenRenewLeaseTimeout();
DFSClient dfsClient = dfsclients.get(0); dfsclientsCopy = new ArrayList<>(dfsclients);
dfsClient.closeAllFilesBeingWritten(true); dfsclients.clear();
closeClient(dfsClient);
}
//Expire the current LeaseRenewer thread. //Expire the current LeaseRenewer thread.
emptyTime = 0; emptyTime = 0;
Factory.INSTANCE.remove(LeaseRenewer.this);
}
for (DFSClient dfsClient : dfsclientsCopy) {
dfsClient.closeAllFilesBeingWritten(true);
} }
break; break;
} catch (IOException ie) { } catch (IOException ie) {
@ -511,4 +488,10 @@ public class LeaseRenewer {
return b.append("]").toString(); return b.append("]").toString();
} }
} }
@VisibleForTesting
public static void setLeaseRenewerGraceDefault(
long leaseRenewerGraceDefault) {
LeaseRenewer.leaseRenewerGraceDefault = leaseRenewerGraceDefault;
}
} }

View File

@ -109,7 +109,7 @@ public class TestLeaseRenewer {
// Set up a file so that we start renewing our lease. // Set up a file so that we start renewing our lease.
DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class); DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
long fileId = 123L; long fileId = 123L;
renewer.put(fileId, mockStream, MOCK_DFSCLIENT); renewer.put(MOCK_DFSCLIENT);
// Wait for lease to get renewed // Wait for lease to get renewed
long failTime = Time.monotonicNow() + 5000; long failTime = Time.monotonicNow() + 5000;
@ -121,7 +121,7 @@ public class TestLeaseRenewer {
Assert.fail("Did not renew lease at all!"); Assert.fail("Did not renew lease at all!");
} }
renewer.closeFile(fileId, MOCK_DFSCLIENT); renewer.closeClient(MOCK_DFSCLIENT);
} }
/** /**
@ -136,11 +136,8 @@ public class TestLeaseRenewer {
Mockito.doReturn(false).when(mockClient1).renewLease(); Mockito.doReturn(false).when(mockClient1).renewLease();
assertSame(renewer, LeaseRenewer.getInstance( assertSame(renewer, LeaseRenewer.getInstance(
FAKE_AUTHORITY, FAKE_UGI_A, mockClient1)); FAKE_AUTHORITY, FAKE_UGI_A, mockClient1));
// Set up a file so that we start renewing our lease.
DFSOutputStream mockStream1 = Mockito.mock(DFSOutputStream.class);
long fileId = 456L; long fileId = 456L;
renewer.put(fileId, mockStream1, mockClient1); renewer.put(mockClient1);
// Second DFSClient does renew lease // Second DFSClient does renew lease
final DFSClient mockClient2 = createMockClient(); final DFSClient mockClient2 = createMockClient();
@ -148,9 +145,7 @@ public class TestLeaseRenewer {
assertSame(renewer, LeaseRenewer.getInstance( assertSame(renewer, LeaseRenewer.getInstance(
FAKE_AUTHORITY, FAKE_UGI_A, mockClient2)); FAKE_AUTHORITY, FAKE_UGI_A, mockClient2));
// Set up a file so that we start renewing our lease. renewer.put(mockClient2);
DFSOutputStream mockStream2 = Mockito.mock(DFSOutputStream.class);
renewer.put(fileId, mockStream2, mockClient2);
// Wait for lease to get renewed // Wait for lease to get renewed
@ -171,19 +166,17 @@ public class TestLeaseRenewer {
} }
}, 100, 10000); }, 100, 10000);
renewer.closeFile(fileId, mockClient1); renewer.closeClient(mockClient1);
renewer.closeFile(fileId, mockClient2); renewer.closeClient(mockClient2);
} }
@Test @Test
public void testThreadName() throws Exception { public void testThreadName() throws Exception {
DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
long fileId = 789L;
Assert.assertFalse("Renewer not initially running", Assert.assertFalse("Renewer not initially running",
renewer.isRunning()); renewer.isRunning());
// Pretend to open a file // Pretend to open a file
renewer.put(fileId, mockStream, MOCK_DFSCLIENT); renewer.put(MOCK_DFSCLIENT);
Assert.assertTrue("Renewer should have started running", Assert.assertTrue("Renewer should have started running",
renewer.isRunning()); renewer.isRunning());
@ -193,7 +186,7 @@ public class TestLeaseRenewer {
Assert.assertEquals("LeaseRenewer:myuser@hdfs://nn1/", threadName); Assert.assertEquals("LeaseRenewer:myuser@hdfs://nn1/", threadName);
// Pretend to close the file // Pretend to close the file
renewer.closeFile(fileId, MOCK_DFSCLIENT); renewer.closeClient(MOCK_DFSCLIENT);
renewer.setEmptyTime(Time.monotonicNow()); renewer.setEmptyTime(Time.monotonicNow());
// Should stop the renewer running within a few seconds // Should stop the renewer running within a few seconds

View File

@ -47,6 +47,7 @@ import java.util.Arrays;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -1236,4 +1237,83 @@ public class TestDFSClientRetries {
cluster.shutdown(); cluster.shutdown();
} }
} }
@Test(timeout=120000)
public void testLeaseRenewAndDFSOutputStreamDeadLock() throws Exception {
final CountDownLatch testLatch = new CountDownLatch(1);
DFSClientFaultInjector.set(new DFSClientFaultInjector() {
public void delayWhenRenewLeaseTimeout() {
try {
testLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
String file1 = "/testFile1";
// Set short retry timeouts so this test runs faster
conf.setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, 1000);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
final NamenodeProtocols spyNN = spy(cluster.getNameNodeRpc());
doAnswer(new SleepFixedTimeAnswer(1500, testLatch)).when(spyNN).complete(
anyString(), anyString(), any(ExtendedBlock.class), anyLong());
DFSClient client = new DFSClient(null, spyNN, conf, null);
// Get hold of the lease renewer instance used by the client
LeaseRenewer leaseRenewer = client.getLeaseRenewer();
leaseRenewer.setRenewalTime(100);
final OutputStream out1 = client.create(file1, false);
out1.write(new byte[256]);
Thread closeThread = new Thread(new Runnable() {
@Override public void run() {
try {
//1. trigger get LeaseRenewer lock
Mockito.doThrow(new SocketTimeoutException()).when(spyNN)
.renewLease(Mockito.anyString());
} catch (IOException e) {
e.printStackTrace();
}
}
});
closeThread.start();
//2. trigger get DFSOutputStream lock
out1.close();
} finally {
cluster.shutdown();
}
}
private static class SleepFixedTimeAnswer implements Answer<Object> {
private final int sleepTime;
private final CountDownLatch testLatch;
SleepFixedTimeAnswer(int sleepTime, CountDownLatch latch) {
this.sleepTime = sleepTime;
this.testLatch = latch;
}
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
boolean interrupted = false;
try {
Thread.sleep(sleepTime);
} catch (InterruptedException ie) {
interrupted = true;
}
try {
return invocation.callRealMethod();
} finally {
testLatch.countDown();
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
}
} }

View File

@ -290,6 +290,7 @@ public class TestDistributedFileSystem {
Configuration conf = getTestConfiguration(); Configuration conf = getTestConfiguration();
final long grace = 1000L; final long grace = 1000L;
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
LeaseRenewer.setLeaseRenewerGraceDefault(grace);
try { try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
@ -302,10 +303,6 @@ public class TestDistributedFileSystem {
{ {
final DistributedFileSystem dfs = cluster.getFileSystem(); final DistributedFileSystem dfs = cluster.getFileSystem();
Method setMethod = dfs.dfs.getLeaseRenewer().getClass()
.getDeclaredMethod("setGraceSleepPeriod", long.class);
setMethod.setAccessible(true);
setMethod.invoke(dfs.dfs.getLeaseRenewer(), grace);
Method checkMethod = dfs.dfs.getLeaseRenewer().getClass() Method checkMethod = dfs.dfs.getLeaseRenewer().getClass()
.getDeclaredMethod("isRunning"); .getDeclaredMethod("isRunning");
checkMethod.setAccessible(true); checkMethod.setAccessible(true);