HDFS-7314. When the DFSClient lease cannot be renewed, abort open-for-write files rather than the entire DFSClient. (mingma)
(cherry picked from commitfbd88f1062
) (cherry picked from commit516bbf1c20
) Conflicts: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java (cherry picked from commit fb1bf424bdad20fff7ab390ce75c4bec558e7e6d)
This commit is contained in:
parent
4239513ec0
commit
dda8e0e328
|
@ -904,22 +904,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|||
RPC.stopProxy(namenode);
|
||||
}
|
||||
|
||||
/** Abort and release resources held. Ignore all errors. */
|
||||
void abort() {
|
||||
clientRunning = false;
|
||||
closeAllFilesBeingWritten(true);
|
||||
try {
|
||||
// remove reference to this client and stop the renewer,
|
||||
// if there is no more clients under the renewer.
|
||||
getLeaseRenewer().closeClient(this);
|
||||
} catch (IOException ioe) {
|
||||
LOG.info("Exception occurred while aborting the client " + ioe);
|
||||
}
|
||||
closeConnectionToNamenode();
|
||||
}
|
||||
|
||||
/** Close/abort all files being written. */
|
||||
private void closeAllFilesBeingWritten(final boolean abort) {
|
||||
public void closeAllFilesBeingWritten(final boolean abort) {
|
||||
for(;;) {
|
||||
final long inodeId;
|
||||
final DFSOutputStream out;
|
||||
|
|
|
@ -211,6 +211,12 @@ class LeaseRenewer {
|
|||
return renewal;
|
||||
}
|
||||
|
||||
/** Used for testing only. */
|
||||
@VisibleForTesting
|
||||
public synchronized void setRenewalTime(final long renewal) {
|
||||
this.renewal = renewal;
|
||||
}
|
||||
|
||||
/** Add a client. */
|
||||
private synchronized void addClient(final DFSClient dfsc) {
|
||||
for(DFSClient c : dfsclients) {
|
||||
|
@ -450,8 +456,12 @@ class LeaseRenewer {
|
|||
+ (elapsed/1000) + " seconds. Aborting ...", ie);
|
||||
synchronized (this) {
|
||||
while (!dfsclients.isEmpty()) {
|
||||
dfsclients.get(0).abort();
|
||||
DFSClient dfsClient = dfsclients.get(0);
|
||||
dfsClient.closeAllFilesBeingWritten(true);
|
||||
closeClient(dfsClient);
|
||||
}
|
||||
//Expire the current LeaseRenewer thread.
|
||||
emptyTime = 0;
|
||||
}
|
||||
break;
|
||||
} catch (IOException ie) {
|
||||
|
|
|
@ -32,6 +32,7 @@ import static org.mockito.Matchers.anyString;
|
|||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.timeout;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
|
@ -63,6 +64,7 @@ import org.apache.hadoop.fs.FileStatus;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.LeaseRenewer;
|
||||
import org.apache.hadoop.hdfs.client.HdfsUtils;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||
|
@ -356,6 +358,58 @@ public class TestDFSClientRetries {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test DFSClient can continue to function after renewLease RPC
|
||||
* receives SocketTimeoutException.
|
||||
*/
|
||||
@Test
|
||||
public void testLeaseRenewSocketTimeout() throws Exception
|
||||
{
|
||||
String file1 = "/testFile1";
|
||||
String file2 = "/testFile2";
|
||||
// Set short retry timeouts so this test runs faster
|
||||
conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
|
||||
conf.setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, 2 * 1000);
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
NamenodeProtocols spyNN = spy(cluster.getNameNodeRpc());
|
||||
Mockito.doThrow(new SocketTimeoutException()).when(spyNN).renewLease(
|
||||
Mockito.anyString());
|
||||
DFSClient client = new DFSClient(null, spyNN, conf, null);
|
||||
// Get hold of the lease renewer instance used by the client
|
||||
LeaseRenewer leaseRenewer = client.getLeaseRenewer();
|
||||
leaseRenewer.setRenewalTime(100);
|
||||
OutputStream out1 = client.create(file1, false);
|
||||
|
||||
Mockito.verify(spyNN, timeout(10000).times(1)).renewLease(
|
||||
Mockito.anyString());
|
||||
verifyEmptyLease(leaseRenewer);
|
||||
try {
|
||||
out1.write(new byte[256]);
|
||||
fail("existing output stream should be aborted");
|
||||
} catch (IOException e) {
|
||||
}
|
||||
|
||||
// Verify DFSClient can do read operation after renewLease aborted.
|
||||
client.exists(file2);
|
||||
// Verify DFSClient can do write operation after renewLease no longer
|
||||
// throws SocketTimeoutException.
|
||||
Mockito.doNothing().when(spyNN).renewLease(
|
||||
Mockito.anyString());
|
||||
leaseRenewer = client.getLeaseRenewer();
|
||||
leaseRenewer.setRenewalTime(100);
|
||||
OutputStream out2 = client.create(file2, false);
|
||||
Mockito.verify(spyNN, timeout(10000).times(2)).renewLease(
|
||||
Mockito.anyString());
|
||||
out2.write(new byte[256]);
|
||||
out2.close();
|
||||
verifyEmptyLease(leaseRenewer);
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that getAdditionalBlock() and close() are idempotent. This allows
|
||||
* a client to safely retry a call and still produce a correct
|
||||
|
@ -671,6 +725,14 @@ public class TestDFSClientRetries {
|
|||
return ret;
|
||||
}
|
||||
|
||||
private void verifyEmptyLease(LeaseRenewer leaseRenewer) throws Exception {
|
||||
int sleepCount = 0;
|
||||
while (!leaseRenewer.isEmpty() && sleepCount++ < 20) {
|
||||
Thread.sleep(500);
|
||||
}
|
||||
assertTrue("Lease should be empty.", leaseRenewer.isEmpty());
|
||||
}
|
||||
|
||||
class DFSClientReader implements Runnable {
|
||||
|
||||
DFSClient client;
|
||||
|
|
Loading…
Reference in New Issue