HBASE-8699 Parameter to DistributedFileSystem#isFileClosed should be of type Path (Ted Yu)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1504676 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f4872d7ef8
commit
4e9a79a0ba
@ -95,9 +95,13 @@ public class FSHDFSUtils extends FSUtils {
|
|||||||
// is dead. We set it to 61 seconds, 1 second than the default READ_TIMEOUT in HDFS, the
|
// is dead. We set it to 61 seconds, 1 second than the default READ_TIMEOUT in HDFS, the
|
||||||
// default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY.
|
// default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY.
|
||||||
long subsequentPause = conf.getInt("hbase.lease.recovery.dfs.timeout", 61 * 1000);
|
long subsequentPause = conf.getInt("hbase.lease.recovery.dfs.timeout", 61 * 1000);
|
||||||
|
|
||||||
|
Method isFileClosedMeth = null;
|
||||||
|
// whether we need to look for isFileClosed method
|
||||||
|
boolean findIsFileClosedMeth = true;
|
||||||
boolean recovered = false;
|
boolean recovered = false;
|
||||||
// We break the loop if we succeed the lease recovery, timeout, or we throw an exception.
|
// We break the loop if we succeed the lease recovery, timeout, or we throw an exception.
|
||||||
for (int nbAttempt = 0; true; nbAttempt++) {
|
for (int nbAttempt = 0; !recovered; nbAttempt++) {
|
||||||
recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
|
recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
|
||||||
if (recovered) break;
|
if (recovered) break;
|
||||||
checkIfCancelled(reporter);
|
checkIfCancelled(reporter);
|
||||||
@ -113,7 +117,20 @@ public class FSHDFSUtils extends FSUtils {
|
|||||||
while ((EnvironmentEdgeManager.currentTimeMillis() - localStartWaiting) <
|
while ((EnvironmentEdgeManager.currentTimeMillis() - localStartWaiting) <
|
||||||
subsequentPause) {
|
subsequentPause) {
|
||||||
Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000));
|
Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000));
|
||||||
if (isFileClosed(dfs, p)) break;
|
if (findIsFileClosedMeth) {
|
||||||
|
try {
|
||||||
|
isFileClosedMeth = dfs.getClass().getMethod("isFileClosed",
|
||||||
|
new Class[]{ Path.class });
|
||||||
|
} catch (NoSuchMethodException nsme) {
|
||||||
|
LOG.debug("isFileClosed not available");
|
||||||
|
} finally {
|
||||||
|
findIsFileClosedMeth = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
|
||||||
|
recovered = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
checkIfCancelled(reporter);
|
checkIfCancelled(reporter);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -181,21 +198,17 @@ public class FSHDFSUtils extends FSUtils {
|
|||||||
/**
|
/**
|
||||||
* Call HDFS-4525 isFileClosed if it is available.
|
* Call HDFS-4525 isFileClosed if it is available.
|
||||||
* @param dfs
|
* @param dfs
|
||||||
|
* @param m
|
||||||
* @param p
|
* @param p
|
||||||
* @return True if file is closed.
|
* @return True if file is closed.
|
||||||
*/
|
*/
|
||||||
boolean isFileClosed(final DistributedFileSystem dfs, final Path p) {
|
private boolean isFileClosed(final DistributedFileSystem dfs, final Method m, final Path p) {
|
||||||
try {
|
try {
|
||||||
Method m = dfs.getClass().getMethod("isFileClosed", new Class<?>[] {String.class});
|
return (Boolean) m.invoke(dfs, p);
|
||||||
return (Boolean) m.invoke(dfs, p.toString());
|
|
||||||
} catch (SecurityException e) {
|
} catch (SecurityException e) {
|
||||||
LOG.warn("No access", e);
|
LOG.warn("No access", e);
|
||||||
} catch (NoSuchMethodException e) {
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("noSuchMethod isFileClosed (HDFS-4525); making do without");
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.warn("Failed invocation", e);
|
LOG.warn("Failed invocation for " + p.toString(), e);
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -88,10 +88,10 @@ public class TestFSHDFSUtils {
|
|||||||
// therefore return true if we are to break the loop.
|
// therefore return true if we are to break the loop.
|
||||||
Mockito.when(dfs.recoverLease(FILE)).
|
Mockito.when(dfs.recoverLease(FILE)).
|
||||||
thenReturn(false).thenReturn(false).thenReturn(true);
|
thenReturn(false).thenReturn(false).thenReturn(true);
|
||||||
Mockito.when(dfs.isFileClosed(FILE.toString())).thenReturn(true);
|
Mockito.when(dfs.isFileClosed(FILE)).thenReturn(true);
|
||||||
assertTrue(this.fsHDFSUtils.recoverDFSFileLease(dfs, FILE, HTU.getConfiguration(), reporter));
|
assertTrue(this.fsHDFSUtils.recoverDFSFileLease(dfs, FILE, HTU.getConfiguration(), reporter));
|
||||||
Mockito.verify(dfs, Mockito.times(3)).recoverLease(FILE);
|
Mockito.verify(dfs, Mockito.times(2)).recoverLease(FILE);
|
||||||
Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE.toString());
|
Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -102,7 +102,7 @@ public class TestFSHDFSUtils {
|
|||||||
* Close status of a file. Copied over from HDFS-4525
|
* Close status of a file. Copied over from HDFS-4525
|
||||||
* @return true if file is already closed
|
* @return true if file is already closed
|
||||||
**/
|
**/
|
||||||
public boolean isFileClosed(String f) throws IOException{
|
public boolean isFileClosed(Path f) throws IOException{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user