HDFS-8086. Move LeaseRenewer to the hdfs.client.impl package. Contributed by Takanobu Asanuma

Tsz-Wo Nicholas Sze 2015-05-01 15:11:09 -07:00
parent 8b6fc83ef4
commit 72751b957c
6 changed files with 91 additions and 71 deletions


@@ -167,6 +167,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8292. Move conditional in fmt_time from dfs-dust.js to status.html.
     (Charles Lamb via wang)
 
+    HDFS-8086. Move LeaseRenewer to the hdfs.client.impl package. (Takanobu
+    Asanuma via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than


@@ -102,6 +102,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.AclException;
@@ -483,7 +484,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
    * enforced to consistently update its local dfsclients array and
    * client's filesBeingWritten map.
    */
-  void putFileBeingWritten(final long inodeId, final DFSOutputStream out) {
+  public void putFileBeingWritten(final long inodeId, final DFSOutputStream out) {
     synchronized(filesBeingWritten) {
       filesBeingWritten.put(inodeId, out);
       // update the last lease renewal time only when there was no
@@ -496,7 +497,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   }
 
   /** Remove a file. Only called from LeaseRenewer. */
-  void removeFileBeingWritten(final long inodeId) {
+  public void removeFileBeingWritten(final long inodeId) {
     synchronized(filesBeingWritten) {
       filesBeingWritten.remove(inodeId);
       if (filesBeingWritten.isEmpty()) {
@@ -506,14 +507,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   }
 
   /** Is file-being-written map empty? */
-  boolean isFilesBeingWrittenEmpty() {
+  public boolean isFilesBeingWrittenEmpty() {
     synchronized(filesBeingWritten) {
       return filesBeingWritten.isEmpty();
     }
   }
 
   /** @return true if the client is running */
-  boolean isClientRunning() {
+  public boolean isClientRunning() {
     return clientRunning;
   }
 
@@ -535,7 +536,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
    * @return true if lease was renewed. May return false if this
    * client has been closed or has no files open.
    **/
-  boolean renewLease() throws IOException {
+  public boolean renewLease() throws IOException {
     if (clientRunning && !isFilesBeingWrittenEmpty()) {
       try {
         namenode.renewLease(clientName);
@@ -567,7 +568,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   }
 
   /** Abort and release resources held. Ignore all errors. */
-  void abort() {
+  public void abort() {
     clientRunning = false;
     closeAllFilesBeingWritten(true);
     try {
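These visibility changes are what keep the renewer working after the move: LeaseRenewer now lives in org.apache.hadoop.hdfs.client.impl, so it can no longer reach DFSClient's package-private methods from org.apache.hadoop.hdfs. The sketch below is illustrative only — a simplified view of the call pattern the newly public methods serve, not the actual LeaseRenewer.run() body; the "clients" list and the error-handling policy are placeholders.

// Illustrative sketch only; not part of this change and not the real renew loop.
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hdfs.DFSClient;

class RenewSketch {
  /** Renew the lease of every live client that still has files open. */
  static void renewAll(List<DFSClient> clients) {
    for (DFSClient c : clients) {
      if (!c.isClientRunning() || c.isFilesBeingWrittenEmpty()) {
        continue;  // nothing to renew for this client
      }
      try {
        c.renewLease();  // returns true only if the namenode accepted a renewal
      } catch (IOException e) {
        c.abort();       // placeholder policy: give up on this client on failure
      }
    }
  }
}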


@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs;
+package org.apache.hadoop.hdfs.client.impl;
 
 import java.io.IOException;
 import java.net.SocketTimeoutException;
@@ -31,6 +31,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Daemon;
@@ -40,7 +42,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 /**
  * <p>
- * Used by {@link DFSClient} for renewing file-being-written leases
+ * Used by {@link org.apache.hadoop.hdfs.DFSClient} for renewing file-being-written leases
  * on the namenode.
  * When a file is opened for write (create or append),
  * namenode stores a file lease for recording the identity of the writer.
@@ -55,10 +57,10 @@ import com.google.common.annotations.VisibleForTesting;
  * <li>
  * It maintains a map from (namenode, user) pairs to lease renewers.
  * The same {@link LeaseRenewer} instance is used for renewing lease
- * for all the {@link DFSClient} to the same namenode and the same user.
+ * for all the {@link org.apache.hadoop.hdfs.DFSClient} to the same namenode and the same user.
  * </li>
 * <li>
- * Each renewer maintains a list of {@link DFSClient}.
+ * Each renewer maintains a list of {@link org.apache.hadoop.hdfs.DFSClient}.
  * Periodically the leases for all the clients are renewed.
 * A client is removed from the list when the client is closed.
 * </li>
@@ -70,14 +72,14 @@ import com.google.common.annotations.VisibleForTesting;
 * </p>
 */
 @InterfaceAudience.Private
-class LeaseRenewer {
+public class LeaseRenewer {
   static final Log LOG = LogFactory.getLog(LeaseRenewer.class);
 
   static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
   static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
 
   /** Get a {@link LeaseRenewer} instance */
-  static LeaseRenewer getInstance(final String authority,
+  public static LeaseRenewer getInstance(final String authority,
       final UserGroupInformation ugi, final DFSClient dfsc) throws IOException {
     final LeaseRenewer r = Factory.INSTANCE.get(authority, ugi);
     r.addClient(dfsc);
@@ -284,7 +286,7 @@ class LeaseRenewer {
         && Time.monotonicNow() - emptyTime > gracePeriod;
   }
 
-  synchronized void put(final long inodeId, final DFSOutputStream out,
+  public synchronized void put(final long inodeId, final DFSOutputStream out,
       final DFSClient dfsc) {
     if (dfsc.isClientRunning()) {
       if (!isRunning() || isRenewerExpired()) {
@@ -333,7 +335,7 @@ class LeaseRenewer {
   }
 
   /** Close a file. */
-  void closeFile(final long inodeId, final DFSClient dfsc) {
+  public void closeFile(final long inodeId, final DFSClient dfsc) {
     dfsc.removeFileBeingWritten(inodeId);
 
     synchronized(this) {
@@ -355,7 +357,7 @@ class LeaseRenewer {
   }
 
   /** Close the given client. */
-  synchronized void closeClient(final DFSClient dfsc) {
+  public synchronized void closeClient(final DFSClient dfsc) {
     dfsclients.remove(dfsc);
     if (dfsclients.isEmpty()) {
       if (!isRunning() || isRenewerExpired()) {
@@ -381,7 +383,7 @@ class LeaseRenewer {
     }
   }
 
-  void interruptAndJoin() throws InterruptedException {
+  public void interruptAndJoin() throws InterruptedException {
     Daemon daemonCopy = null;
     synchronized (this) {
       if (isRunning()) {
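The class javadoc above describes the (namenode, user) to renewer mapping and the per-renewer client list; the sketch below shows how a caller would drive the newly public entry points after the package move. It is illustrative only: dfsClient, inodeId, and out are assumed to already exist, and only signatures visible in this diff are used.

// Illustrative usage sketch; not part of this change.
import java.io.IOException;

import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;  // new location after this change
import org.apache.hadoop.security.UserGroupInformation;

class LeaseRenewerUsageSketch {
  static void example(String authority, DFSClient dfsClient,
      long inodeId, DFSOutputStream out) throws IOException {
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

    // One renewer per (namenode authority, user); the client is registered
    // with it as a side effect of getInstance() (see r.addClient(dfsc) above).
    LeaseRenewer renewer = LeaseRenewer.getInstance(authority, ugi, dfsClient);

    // Track a file that is open for write so its lease keeps being renewed.
    renewer.put(inodeId, out, dfsClient);

    // When the writer finishes, stop renewing for that file...
    renewer.closeFile(inodeId, dfsClient);

    // ...and when the client shuts down, drop it from the renewer entirely.
    renewer.closeClient(dfsClient);
  }
}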


@@ -31,6 +31,7 @@ import static org.mockito.Mockito.mock;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.SocketTimeoutException;
@@ -65,6 +66,7 @@ import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
@@ -259,78 +261,84 @@ public class TestDistributedFileSystem {
     {
       final DistributedFileSystem dfs = cluster.getFileSystem();
-      dfs.dfs.getLeaseRenewer().setGraceSleepPeriod(grace);
-      assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+      Method setMethod = dfs.dfs.getLeaseRenewer().getClass()
+          .getDeclaredMethod("setGraceSleepPeriod", long.class);
+      setMethod.setAccessible(true);
+      setMethod.invoke(dfs.dfs.getLeaseRenewer(), grace);
+      Method checkMethod = dfs.dfs.getLeaseRenewer().getClass()
+          .getDeclaredMethod("isRunning");
+      checkMethod.setAccessible(true);
+      assertFalse((boolean) checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
 
       {
         //create a file
         final FSDataOutputStream out = dfs.create(filepaths[0]);
-        assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+        assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
         //write something
         out.writeLong(millis);
-        assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+        assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
         //close
         out.close();
         Thread.sleep(grace/4*3);
         //within grace period
-        assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+        assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
         for(int i = 0; i < 3; i++) {
-          if (dfs.dfs.getLeaseRenewer().isRunning()) {
+          if ((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())) {
             Thread.sleep(grace/2);
           }
         }
         //passed grace period
-        assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+        assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
       }
 
       {
         //create file1
         final FSDataOutputStream out1 = dfs.create(filepaths[1]);
-        assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+        assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
         //create file2
         final FSDataOutputStream out2 = dfs.create(filepaths[2]);
-        assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+        assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
         //write something to file1
         out1.writeLong(millis);
-        assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+        assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
         //close file1
         out1.close();
-        assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+        assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
         //write something to file2
         out2.writeLong(millis);
-        assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+        assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
        //close file2
        out2.close();
        Thread.sleep(grace/4*3);
        //within grace period
-        assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+        assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
      }
 
      {
        //create file3
        final FSDataOutputStream out3 = dfs.create(filepaths[3]);
-        assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+        assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
        Thread.sleep(grace/4*3);
        //passed previous grace period, should still running
-        assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+        assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
        //write something to file3
        out3.writeLong(millis);
-        assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+        assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
        //close file3
        out3.close();
-        assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+        assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
        Thread.sleep(grace/4*3);
        //within grace period
-        assertTrue(dfs.dfs.getLeaseRenewer().isRunning());
+        assertTrue((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
        for(int i = 0; i < 3; i++) {
-          if (dfs.dfs.getLeaseRenewer().isRunning()) {
+          if ((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer())) {
            Thread.sleep(grace/2);
          }
        }
        //passed grace period
-        assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+        assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
      }
 
      dfs.close();
@@ -359,15 +367,18 @@ public class TestDistributedFileSystem {
     {
       final DistributedFileSystem dfs = cluster.getFileSystem();
-      assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+      Method checkMethod = dfs.dfs.getLeaseRenewer().getClass()
+          .getDeclaredMethod("isRunning");
+      checkMethod.setAccessible(true);
+      assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
 
       //open and check the file
       FSDataInputStream in = dfs.open(filepaths[0]);
-      assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+      assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
       assertEquals(millis, in.readLong());
-      assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+      assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
       in.close();
-      assertFalse(dfs.dfs.getLeaseRenewer().isRunning());
+      assertFalse((boolean)checkMethod.invoke(dfs.dfs.getLeaseRenewer()));
 
       dfs.close();
     }
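The test stays in the org.apache.hadoop.hdfs package, so the package-private LeaseRenewer methods it used to call directly (setGraceSleepPeriod, isRunning) are no longer visible after the move to org.apache.hadoop.hdfs.client.impl; it therefore goes through java.lang.reflect.Method. The repeated invoke pattern could be factored into a helper along these lines — a sketch only, not part of this change, assuming (as the test does) that both methods remain declared on the concrete LeaseRenewer class:

// Hypothetical helper sketch; not part of this change.
import java.lang.reflect.Method;

final class LeaseRenewerTestUtil {
  private LeaseRenewerTestUtil() {}

  static boolean isRunning(Object renewer) throws Exception {
    Method m = renewer.getClass().getDeclaredMethod("isRunning");
    m.setAccessible(true);  // bypass package-private access from another package
    return (boolean) m.invoke(renewer);
  }

  static void setGraceSleepPeriod(Object renewer, long grace) throws Exception {
    Method m = renewer.getClass().getDeclaredMethod("setGraceSleepPeriod", long.class);
    m.setAccessible(true);
    m.invoke(renewer, grace);
  }
}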


@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;


@@ -15,13 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs;
+package org.apache.hadoop.hdfs.client.impl;
 
 import static org.junit.Assert.assertSame;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;