svn merge -c 1297328 from trunk for HDFS-3032.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1297334 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3d48d8cf6e
commit
0b7ac49953
|
@ -217,6 +217,9 @@ Release 0.23.2 - UNRELEASED
|
||||||
HDFS-3012. Exception while renewing delegation token. (Bobby Evans via
|
HDFS-3012. Exception while renewing delegation token. (Bobby Evans via
|
||||||
jitendra)
|
jitendra)
|
||||||
|
|
||||||
|
HDFS-3032. Change DFSClient.renewLease() so that it only retries up to the
|
||||||
|
lease soft-limit. (Kihwal Lee via szetszwo)
|
||||||
|
|
||||||
Release 0.23.1 - 2012-02-17
|
Release 0.23.1 - 2012-02-17
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -18,6 +18,31 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
|
||||||
|
|
||||||
import java.io.BufferedOutputStream;
|
import java.io.BufferedOutputStream;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
|
@ -59,7 +84,6 @@ import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
|
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||||
|
@ -83,6 +107,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
|
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
|
||||||
|
@ -105,7 +130,6 @@ import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenRenewer;
|
import org.apache.hadoop.security.token.TokenRenewer;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
|
||||||
|
|
||||||
/********************************************************
|
/********************************************************
|
||||||
* DFSClient can connect to a Hadoop Filesystem and
|
* DFSClient can connect to a Hadoop Filesystem and
|
||||||
|
@ -127,6 +151,7 @@ public class DFSClient implements java.io.Closeable {
|
||||||
private final InetSocketAddress nnAddress;
|
private final InetSocketAddress nnAddress;
|
||||||
final UserGroupInformation ugi;
|
final UserGroupInformation ugi;
|
||||||
volatile boolean clientRunning = true;
|
volatile boolean clientRunning = true;
|
||||||
|
volatile long lastLeaseRenewal;
|
||||||
private volatile FsServerDefaults serverDefaults;
|
private volatile FsServerDefaults serverDefaults;
|
||||||
private volatile long serverDefaultsLastUpdate;
|
private volatile long serverDefaultsLastUpdate;
|
||||||
final String clientName;
|
final String clientName;
|
||||||
|
@ -351,6 +376,12 @@ public class DFSClient implements java.io.Closeable {
|
||||||
void putFileBeingWritten(final String src, final DFSOutputStream out) {
|
void putFileBeingWritten(final String src, final DFSOutputStream out) {
|
||||||
synchronized(filesBeingWritten) {
|
synchronized(filesBeingWritten) {
|
||||||
filesBeingWritten.put(src, out);
|
filesBeingWritten.put(src, out);
|
||||||
|
// update the last lease renewal time only when there was no
|
||||||
|
// writes. once there is one write stream open, the lease renewer
|
||||||
|
// thread keeps it updated well with in anyone's expiration time.
|
||||||
|
if (lastLeaseRenewal == 0) {
|
||||||
|
updateLastLeaseRenewal();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -358,6 +389,9 @@ public class DFSClient implements java.io.Closeable {
|
||||||
void removeFileBeingWritten(final String src) {
|
void removeFileBeingWritten(final String src) {
|
||||||
synchronized(filesBeingWritten) {
|
synchronized(filesBeingWritten) {
|
||||||
filesBeingWritten.remove(src);
|
filesBeingWritten.remove(src);
|
||||||
|
if (filesBeingWritten.isEmpty()) {
|
||||||
|
lastLeaseRenewal = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -373,6 +407,19 @@ public class DFSClient implements java.io.Closeable {
|
||||||
return clientRunning;
|
return clientRunning;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long getLastLeaseRenewal() {
|
||||||
|
return lastLeaseRenewal;
|
||||||
|
}
|
||||||
|
|
||||||
|
void updateLastLeaseRenewal() {
|
||||||
|
synchronized(filesBeingWritten) {
|
||||||
|
if (filesBeingWritten.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
lastLeaseRenewal = System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Renew leases.
|
* Renew leases.
|
||||||
* @return true if lease was renewed. May return false if this
|
* @return true if lease was renewed. May return false if this
|
||||||
|
@ -380,8 +427,24 @@ public class DFSClient implements java.io.Closeable {
|
||||||
**/
|
**/
|
||||||
boolean renewLease() throws IOException {
|
boolean renewLease() throws IOException {
|
||||||
if (clientRunning && !isFilesBeingWrittenEmpty()) {
|
if (clientRunning && !isFilesBeingWrittenEmpty()) {
|
||||||
|
try {
|
||||||
namenode.renewLease(clientName);
|
namenode.renewLease(clientName);
|
||||||
|
updateLastLeaseRenewal();
|
||||||
return true;
|
return true;
|
||||||
|
} catch (IOException e) {
|
||||||
|
// Abort if the lease has already expired.
|
||||||
|
final long elapsed = System.currentTimeMillis() - getLastLeaseRenewal();
|
||||||
|
if (elapsed > HdfsConstants.LEASE_SOFTLIMIT_PERIOD) {
|
||||||
|
LOG.warn("Failed to renew lease for " + clientName + " for "
|
||||||
|
+ (elapsed/1000) + " seconds (>= soft-limit ="
|
||||||
|
+ (HdfsConstants.LEASE_SOFTLIMIT_PERIOD/1000) + " seconds.) "
|
||||||
|
+ "Closing all files being written ...", e);
|
||||||
|
closeAllFilesBeingWritten(true);
|
||||||
|
} else {
|
||||||
|
// Let the lease renewer handle it and retry.
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -430,7 +430,8 @@ class LeaseRenewer {
|
||||||
for(long lastRenewed = System.currentTimeMillis();
|
for(long lastRenewed = System.currentTimeMillis();
|
||||||
clientsRunning() && !Thread.interrupted();
|
clientsRunning() && !Thread.interrupted();
|
||||||
Thread.sleep(getSleepPeriod())) {
|
Thread.sleep(getSleepPeriod())) {
|
||||||
if (System.currentTimeMillis() - lastRenewed >= getRenewalTime()) {
|
final long elapsed = System.currentTimeMillis() - lastRenewed;
|
||||||
|
if (elapsed >= getRenewalTime()) {
|
||||||
try {
|
try {
|
||||||
renew();
|
renew();
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -440,7 +441,7 @@ class LeaseRenewer {
|
||||||
lastRenewed = System.currentTimeMillis();
|
lastRenewed = System.currentTimeMillis();
|
||||||
} catch (SocketTimeoutException ie) {
|
} catch (SocketTimeoutException ie) {
|
||||||
LOG.warn("Failed to renew lease for " + clientsString() + " for "
|
LOG.warn("Failed to renew lease for " + clientsString() + " for "
|
||||||
+ (getRenewalTime()/1000) + " seconds. Aborting ...", ie);
|
+ (elapsed/1000) + " seconds. Aborting ...", ie);
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
for(DFSClient c : dfsclients) {
|
for(DFSClient c : dfsclients) {
|
||||||
c.abort();
|
c.abort();
|
||||||
|
@ -449,8 +450,7 @@ class LeaseRenewer {
|
||||||
break;
|
break;
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
LOG.warn("Failed to renew lease for " + clientsString() + " for "
|
LOG.warn("Failed to renew lease for " + clientsString() + " for "
|
||||||
+ (getRenewalTime()/1000) + " seconds. Will retry shortly ...",
|
+ (elapsed/1000) + " seconds. Will retry shortly ...", ie);
|
||||||
ie);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,17 +18,32 @@
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
import static org.mockito.Matchers.anyString;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
import static org.mockito.Mockito.doThrow;
|
||||||
|
import static org.mockito.Mockito.doNothing;
|
||||||
|
|
||||||
public class TestLease {
|
public class TestLease {
|
||||||
static boolean hasLease(MiniDFSCluster cluster, Path src) {
|
static boolean hasLease(MiniDFSCluster cluster, Path src) {
|
||||||
|
@ -36,11 +51,80 @@ public class TestLease {
|
||||||
).getLeaseByPath(src.toString()) != null;
|
).getLeaseByPath(src.toString()) != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
final Path dir = new Path("/test/lease/");
|
static final String dirString = "/test/lease";
|
||||||
|
final Path dir = new Path(dirString);
|
||||||
|
static final Log LOG = LogFactory.getLog(TestLease.class);
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLeaseAbort() throws Exception {
|
||||||
|
MiniDFSCluster cluster =
|
||||||
|
new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
NamenodeProtocols preSpyNN = cluster.getNameNodeRpc();
|
||||||
|
NamenodeProtocols spyNN = spy(preSpyNN);
|
||||||
|
|
||||||
|
DFSClient dfs = new DFSClient(null, spyNN, conf, null);
|
||||||
|
byte[] buf = new byte[1024];
|
||||||
|
|
||||||
|
FSDataOutputStream c_out = createFsOut(dfs, dirString + "c");
|
||||||
|
c_out.write(buf, 0, 1024);
|
||||||
|
c_out.close();
|
||||||
|
|
||||||
|
DFSInputStream c_in = dfs.open(dirString + "c");
|
||||||
|
FSDataOutputStream d_out = createFsOut(dfs, dirString + "d");
|
||||||
|
|
||||||
|
// stub the renew method.
|
||||||
|
doThrow(new RemoteException(InvalidToken.class.getName(),
|
||||||
|
"Your token is worthless")).when(spyNN).renewLease(anyString());
|
||||||
|
|
||||||
|
// We don't need to wait the lease renewer thread to act.
|
||||||
|
// call renewLease() manually.
|
||||||
|
// make it look like lease has already expired.
|
||||||
|
dfs.lastLeaseRenewal = System.currentTimeMillis() - 300000;
|
||||||
|
dfs.renewLease();
|
||||||
|
|
||||||
|
// this should not work.
|
||||||
|
try {
|
||||||
|
d_out.write(buf, 0, 1024);
|
||||||
|
d_out.close();
|
||||||
|
Assert.fail("Write did not fail even after the fatal lease renewal failure");
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.info("Write failed as expected. ", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// unstub
|
||||||
|
doNothing().when(spyNN).renewLease(anyString());
|
||||||
|
|
||||||
|
// existing input streams should work
|
||||||
|
try {
|
||||||
|
int num = c_in.read(buf, 0, 1);
|
||||||
|
if (num != 1) {
|
||||||
|
Assert.fail("Failed to read 1 byte");
|
||||||
|
}
|
||||||
|
c_in.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Read failed with ", e);
|
||||||
|
Assert.fail("Read after lease renewal failure failed");
|
||||||
|
}
|
||||||
|
|
||||||
|
// new file writes should work.
|
||||||
|
try {
|
||||||
|
c_out = createFsOut(dfs, dirString + "c");
|
||||||
|
c_out.write(buf, 0, 1024);
|
||||||
|
c_out.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Write failed with ", e);
|
||||||
|
Assert.fail("Write failed");
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testLease() throws Exception {
|
public void testLease() throws Exception {
|
||||||
Configuration conf = new HdfsConfiguration();
|
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||||
try {
|
try {
|
||||||
FileSystem fs = cluster.getFileSystem();
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
@ -94,6 +178,11 @@ public class TestLease {
|
||||||
Assert.assertTrue(c3.leaserenewer != c5.leaserenewer);
|
Assert.assertTrue(c3.leaserenewer != c5.leaserenewer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private FSDataOutputStream createFsOut(DFSClient dfs, String path)
|
||||||
|
throws IOException {
|
||||||
|
return new FSDataOutputStream(dfs.create(path, true), null);
|
||||||
|
}
|
||||||
|
|
||||||
static final ClientProtocol mcp = Mockito.mock(ClientProtocol.class);
|
static final ClientProtocol mcp = Mockito.mock(ClientProtocol.class);
|
||||||
static public DFSClient createDFSClientAs(UserGroupInformation ugi,
|
static public DFSClient createDFSClientAs(UserGroupInformation ugi,
|
||||||
final Configuration conf) throws Exception {
|
final Configuration conf) throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue