HDFS-14497. Write lock held by metasave impact following RPC processing. Contributed by He Xiaoqiao.
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org> (cherry picked from commit33c62f8f4e
) Conflicts: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (cherry picked from commit80392e94b6
)
This commit is contained in:
parent
9fdb849e03
commit
d91c68729c
|
@ -726,7 +726,7 @@ public class BlockManager implements BlockStatsMXBean {
|
|||
|
||||
/** Dump meta data to out. */
|
||||
public void metaSave(PrintWriter out) {
|
||||
assert namesystem.hasWriteLock(); // TODO: block manager read lock and NS write lock
|
||||
assert namesystem.hasReadLock(); // TODO: block manager read lock and NS write lock
|
||||
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
|
||||
final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
|
||||
datanodeManager.fetchDatanodes(live, dead, false);
|
||||
|
|
|
@ -122,6 +122,7 @@ import java.lang.management.ManagementFactory;
|
|||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Files;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
@ -587,6 +588,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
private boolean resourceLowSafeMode = false;
|
||||
private String nameNodeHostName = null;
|
||||
|
||||
/**
|
||||
* HDFS-14497: Concurrency control when many metaSave request to write
|
||||
* meta to same out stream after switch to read lock.
|
||||
*/
|
||||
private Object metaSaveLock = new Object();
|
||||
|
||||
/**
|
||||
* Notify that loading of this FSDirectory is complete, and
|
||||
* it is imageLoaded for use
|
||||
|
@ -1757,23 +1764,26 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
String operationName = "metaSave";
|
||||
checkSuperuserPrivilege(operationName);
|
||||
checkOperation(OperationCategory.READ);
|
||||
writeLock();
|
||||
readLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.READ);
|
||||
File file = new File(System.getProperty("hadoop.log.dir"), filename);
|
||||
PrintWriter out = new PrintWriter(new BufferedWriter(
|
||||
new OutputStreamWriter(new FileOutputStream(file), Charsets.UTF_8)));
|
||||
metaSave(out);
|
||||
out.flush();
|
||||
out.close();
|
||||
synchronized(metaSaveLock) {
|
||||
File file = new File(System.getProperty("hadoop.log.dir"), filename);
|
||||
PrintWriter out = new PrintWriter(new BufferedWriter(
|
||||
new OutputStreamWriter(Files.newOutputStream(file.toPath()),
|
||||
Charsets.UTF_8)));
|
||||
metaSave(out);
|
||||
out.flush();
|
||||
out.close();
|
||||
}
|
||||
} finally {
|
||||
writeUnlock(operationName);
|
||||
readUnlock(operationName);
|
||||
}
|
||||
logAuditEvent(true, operationName, null);
|
||||
}
|
||||
|
||||
private void metaSave(PrintWriter out) {
|
||||
assert hasWriteLock();
|
||||
assert hasReadLock();
|
||||
long totalInodes = this.dir.totalInodes();
|
||||
long totalBlocks = this.getBlocksTotal();
|
||||
out.println(totalInodes + " files and directories, " + totalBlocks
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.io.File;
|
|||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
|
@ -215,6 +216,65 @@ public class TestMetaSave {
|
|||
}
|
||||
}
|
||||
|
||||
class MetaSaveThread extends Thread {
|
||||
NamenodeProtocols nnRpc;
|
||||
String filename;
|
||||
public MetaSaveThread(NamenodeProtocols nnRpc, String filename) {
|
||||
this.nnRpc = nnRpc;
|
||||
this.filename = filename;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
nnRpc.metaSave(filename);
|
||||
} catch (IOException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that metasave concurrent output file (not append).
|
||||
*/
|
||||
@Test
|
||||
public void testConcurrentMetaSave() throws Exception {
|
||||
ArrayList<MetaSaveThread> threads = new ArrayList<>();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
threads.add(new MetaSaveThread(nnRpc, "metaSaveConcurrent.out.txt"));
|
||||
}
|
||||
for (int i = 0; i < 10; i++) {
|
||||
threads.get(i).start();
|
||||
}
|
||||
for (int i = 0; i < 10; i++) {
|
||||
threads.get(i).join();
|
||||
}
|
||||
// Read output file.
|
||||
FileInputStream fis = null;
|
||||
InputStreamReader isr = null;
|
||||
BufferedReader rdr = null;
|
||||
try {
|
||||
fis = new FileInputStream(getLogFile("metaSaveConcurrent.out.txt"));
|
||||
isr = new InputStreamReader(fis);
|
||||
rdr = new BufferedReader(isr);
|
||||
|
||||
// Validate that file was overwritten (not appended) by checking for
|
||||
// presence of only one "Live Datanodes" line.
|
||||
boolean foundLiveDatanodesLine = false;
|
||||
String line = rdr.readLine();
|
||||
while (line != null) {
|
||||
if (line.startsWith("Live Datanodes")) {
|
||||
if (foundLiveDatanodesLine) {
|
||||
fail("multiple Live Datanodes lines, output file not overwritten");
|
||||
}
|
||||
foundLiveDatanodesLine = true;
|
||||
}
|
||||
line = rdr.readLine();
|
||||
}
|
||||
} finally {
|
||||
IOUtils.cleanup(null, rdr, isr, fis);
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws IOException {
|
||||
if (fileSys != null)
|
||||
|
|
Loading…
Reference in New Issue