MAPREDUCE-7329: HadoopPipes task may fail when linux kernel version change from 3.x to 4.x (#2775)

This commit is contained in:
lichaojacobs 2021-04-09 10:58:53 +08:00 committed by GitHub
parent 56bd968fb4
commit 663ca14a76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 135 additions and 0 deletions

View File

@ -30,12 +30,14 @@ import java.util.Random;
import javax.crypto.SecretKey; import javax.crypto.SecretKey;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparable;
@ -52,6 +54,7 @@ import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -66,6 +69,7 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(Application.class.getName()); LoggerFactory.getLogger(Application.class.getName());
private ServerSocket serverSocket; private ServerSocket serverSocket;
private PingSocketCleaner socketCleaner;
private Process process; private Process process;
private Socket clientSocket; private Socket clientSocket;
private OutputHandler<K2, V2> handler; private OutputHandler<K2, V2> handler;
@ -133,6 +137,13 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
process = runClient(cmd, env); process = runClient(cmd, env);
clientSocket = serverSocket.accept(); clientSocket = serverSocket.accept();
// start ping socket cleaner
int soTimeout = conf.getInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,
CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT);
socketCleaner = new PingSocketCleaner("ping-socket-cleaner", serverSocket,
soTimeout);
socketCleaner.setDaemon(true);
socketCleaner.start();
String challenge = getSecurityChallenge(); String challenge = getSecurityChallenge();
String digestToSend = createDigest(password, challenge); String digestToSend = createDigest(password, challenge);
@ -237,6 +248,7 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
serverSocket.close(); serverSocket.close();
try { try {
downlink.close(); downlink.close();
socketCleaner.interrupt();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
@ -266,4 +278,44 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
return SecureShuffleUtils.hashFromString(data, key); return SecureShuffleUtils.hashFromString(data, key);
} }
@VisibleForTesting
public static class PingSocketCleaner extends Thread {
private final ServerSocket serverSocket;
private final int soTimeout;
PingSocketCleaner(String name, ServerSocket serverSocket, int soTimeout) {
super(name);
this.serverSocket = serverSocket;
this.soTimeout = soTimeout;
}
@Override
public void run() {
LOG.info("PingSocketCleaner started...");
while (!Thread.currentThread().isInterrupted()) {
Socket clientSocket = null;
try {
clientSocket = serverSocket.accept();
clientSocket.setSoTimeout(soTimeout);
LOG.debug("Connection received from {}",
clientSocket.getInetAddress());
int readData = 0;
while (readData != -1) {
readData = clientSocket.getInputStream().read();
}
LOG.debug("close socket cause client has closed.");
closeSocketInternal(clientSocket);
} catch (IOException exception) {
LOG.error("PingSocketCleaner exception", exception);
} finally {
closeSocketInternal(clientSocket);
}
}
}
@VisibleForTesting
protected void closeSocketInternal(Socket clientSocket) {
IOUtils.closeSocket(clientSocket);
}
}
} }

View File

@ -28,12 +28,15 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.PrintStream; import java.io.PrintStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FsConstants; import org.apache.hadoop.fs.FsConstants;
@ -59,7 +62,9 @@ import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskLog; import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.pipes.Application.PingSocketCleaner;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@ -455,6 +460,84 @@ public class TestPipeApplication {
assertEquals(3, partitioner.getPartition(iw, new Text("test"), 2)); assertEquals(3, partitioner.getPartition(iw, new Text("test"), 2));
} }
@Test
public void testSocketCleaner() throws Exception {
ServerSocket serverSocket = setupServerSocket();
SocketCleaner cleaner = setupCleaner(serverSocket);
// mock ping thread, connect to server socket per second.
int expectedClosedCount = 5;
for (int i = 0; i < expectedClosedCount; i++) {
try {
Thread.sleep(1000);
Socket clientSocket = new Socket(serverSocket.getInetAddress(),
serverSocket.getLocalPort());
clientSocket.close();
} catch (Exception exception) {
// ignored...
exception.printStackTrace();
}
}
GenericTestUtils.waitFor(
() -> expectedClosedCount == cleaner.getCloseSocketCount(), 100, 5000);
}
@Test
public void testSocketTimeout() throws Exception {
ServerSocket serverSocket = setupServerSocket();
SocketCleaner cleaner = setupCleaner(serverSocket, 100);
try {
new Socket(serverSocket.getInetAddress(), serverSocket.getLocalPort());
Thread.sleep(1000);
} catch (Exception exception) {
// ignored...
}
GenericTestUtils.waitFor(() -> 1 == cleaner.getCloseSocketCount(), 100,
5000);
}
private SocketCleaner setupCleaner(ServerSocket serverSocket) {
return setupCleaner(serverSocket,
CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT);
}
private SocketCleaner setupCleaner(ServerSocket serverSocket, int soTimeout) {
// start socket cleaner.
SocketCleaner cleaner = new SocketCleaner("test-ping-socket-cleaner",
serverSocket, soTimeout);
cleaner.setDaemon(true);
cleaner.start();
return cleaner;
}
private static class SocketCleaner extends PingSocketCleaner {
private int closeSocketCount = 0;
SocketCleaner(String name, ServerSocket serverSocket, int soTimeout) {
super(name, serverSocket, soTimeout);
}
@Override
public void run() {
super.run();
}
protected void closeSocketInternal(Socket clientSocket) {
if (!clientSocket.isClosed()) {
closeSocketCount++;
}
super.closeSocketInternal(clientSocket);
}
public int getCloseSocketCount() {
return closeSocketCount;
}
}
private ServerSocket setupServerSocket() throws Exception {
return new ServerSocket(0, 1);
}
/** /**
* clean previous std error and outs * clean previous std error and outs
*/ */