MAPREDUCE-7329: HadoopPipes task may fail when linux kernel version change from 3.x to 4.x (#2775)
(cherry picked from commit 663ca14a76
)
This commit is contained in:
parent
4994b73eeb
commit
068f114066
|
@ -30,12 +30,14 @@ import java.util.Random;
|
||||||
|
|
||||||
import javax.crypto.SecretKey;
|
import javax.crypto.SecretKey;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.io.FloatWritable;
|
import org.apache.hadoop.io.FloatWritable;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.NullWritable;
|
import org.apache.hadoop.io.NullWritable;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.io.WritableComparable;
|
import org.apache.hadoop.io.WritableComparable;
|
||||||
|
@ -52,6 +54,7 @@ import org.apache.hadoop.mapreduce.security.TokenCache;
|
||||||
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
||||||
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -66,6 +69,7 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(Application.class.getName());
|
LoggerFactory.getLogger(Application.class.getName());
|
||||||
private ServerSocket serverSocket;
|
private ServerSocket serverSocket;
|
||||||
|
private PingSocketCleaner socketCleaner;
|
||||||
private Process process;
|
private Process process;
|
||||||
private Socket clientSocket;
|
private Socket clientSocket;
|
||||||
private OutputHandler<K2, V2> handler;
|
private OutputHandler<K2, V2> handler;
|
||||||
|
@ -133,6 +137,13 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
|
||||||
|
|
||||||
process = runClient(cmd, env);
|
process = runClient(cmd, env);
|
||||||
clientSocket = serverSocket.accept();
|
clientSocket = serverSocket.accept();
|
||||||
|
// start ping socket cleaner
|
||||||
|
int soTimeout = conf.getInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,
|
||||||
|
CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT);
|
||||||
|
socketCleaner = new PingSocketCleaner("ping-socket-cleaner", serverSocket,
|
||||||
|
soTimeout);
|
||||||
|
socketCleaner.setDaemon(true);
|
||||||
|
socketCleaner.start();
|
||||||
|
|
||||||
String challenge = getSecurityChallenge();
|
String challenge = getSecurityChallenge();
|
||||||
String digestToSend = createDigest(password, challenge);
|
String digestToSend = createDigest(password, challenge);
|
||||||
|
@ -237,6 +248,7 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
|
||||||
serverSocket.close();
|
serverSocket.close();
|
||||||
try {
|
try {
|
||||||
downlink.close();
|
downlink.close();
|
||||||
|
socketCleaner.interrupt();
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
|
@ -266,4 +278,44 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
|
||||||
return SecureShuffleUtils.hashFromString(data, key);
|
return SecureShuffleUtils.hashFromString(data, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public static class PingSocketCleaner extends Thread {
|
||||||
|
private final ServerSocket serverSocket;
|
||||||
|
private final int soTimeout;
|
||||||
|
|
||||||
|
PingSocketCleaner(String name, ServerSocket serverSocket, int soTimeout) {
|
||||||
|
super(name);
|
||||||
|
this.serverSocket = serverSocket;
|
||||||
|
this.soTimeout = soTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
LOG.info("PingSocketCleaner started...");
|
||||||
|
while (!Thread.currentThread().isInterrupted()) {
|
||||||
|
Socket clientSocket = null;
|
||||||
|
try {
|
||||||
|
clientSocket = serverSocket.accept();
|
||||||
|
clientSocket.setSoTimeout(soTimeout);
|
||||||
|
LOG.debug("Connection received from {}",
|
||||||
|
clientSocket.getInetAddress());
|
||||||
|
int readData = 0;
|
||||||
|
while (readData != -1) {
|
||||||
|
readData = clientSocket.getInputStream().read();
|
||||||
|
}
|
||||||
|
LOG.debug("close socket cause client has closed.");
|
||||||
|
closeSocketInternal(clientSocket);
|
||||||
|
} catch (IOException exception) {
|
||||||
|
LOG.error("PingSocketCleaner exception", exception);
|
||||||
|
} finally {
|
||||||
|
closeSocketInternal(clientSocket);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected void closeSocketInternal(Socket clientSocket) {
|
||||||
|
IOUtils.closeSocket(clientSocket);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,12 +28,15 @@ import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
|
import java.net.ServerSocket;
|
||||||
|
import java.net.Socket;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.fs.FsConstants;
|
import org.apache.hadoop.fs.FsConstants;
|
||||||
|
@ -59,7 +62,9 @@ import org.apache.hadoop.mapred.RecordReader;
|
||||||
import org.apache.hadoop.mapred.Reporter;
|
import org.apache.hadoop.mapred.Reporter;
|
||||||
import org.apache.hadoop.mapred.TaskAttemptID;
|
import org.apache.hadoop.mapred.TaskAttemptID;
|
||||||
import org.apache.hadoop.mapred.TaskLog;
|
import org.apache.hadoop.mapred.TaskLog;
|
||||||
|
import org.apache.hadoop.mapred.pipes.Application.PingSocketCleaner;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.ExitUtil;
|
import org.apache.hadoop.util.ExitUtil;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
|
@ -455,6 +460,84 @@ public class TestPipeApplication {
|
||||||
assertEquals(3, partitioner.getPartition(iw, new Text("test"), 2));
|
assertEquals(3, partitioner.getPartition(iw, new Text("test"), 2));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSocketCleaner() throws Exception {
|
||||||
|
ServerSocket serverSocket = setupServerSocket();
|
||||||
|
SocketCleaner cleaner = setupCleaner(serverSocket);
|
||||||
|
// mock ping thread, connect to server socket per second.
|
||||||
|
int expectedClosedCount = 5;
|
||||||
|
for (int i = 0; i < expectedClosedCount; i++) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
Socket clientSocket = new Socket(serverSocket.getInetAddress(),
|
||||||
|
serverSocket.getLocalPort());
|
||||||
|
clientSocket.close();
|
||||||
|
} catch (Exception exception) {
|
||||||
|
// ignored...
|
||||||
|
exception.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
GenericTestUtils.waitFor(
|
||||||
|
() -> expectedClosedCount == cleaner.getCloseSocketCount(), 100, 5000);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSocketTimeout() throws Exception {
|
||||||
|
ServerSocket serverSocket = setupServerSocket();
|
||||||
|
SocketCleaner cleaner = setupCleaner(serverSocket, 100);
|
||||||
|
try {
|
||||||
|
new Socket(serverSocket.getInetAddress(), serverSocket.getLocalPort());
|
||||||
|
Thread.sleep(1000);
|
||||||
|
} catch (Exception exception) {
|
||||||
|
// ignored...
|
||||||
|
}
|
||||||
|
GenericTestUtils.waitFor(() -> 1 == cleaner.getCloseSocketCount(), 100,
|
||||||
|
5000);
|
||||||
|
}
|
||||||
|
|
||||||
|
private SocketCleaner setupCleaner(ServerSocket serverSocket) {
|
||||||
|
return setupCleaner(serverSocket,
|
||||||
|
CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
private SocketCleaner setupCleaner(ServerSocket serverSocket, int soTimeout) {
|
||||||
|
// start socket cleaner.
|
||||||
|
SocketCleaner cleaner = new SocketCleaner("test-ping-socket-cleaner",
|
||||||
|
serverSocket, soTimeout);
|
||||||
|
cleaner.setDaemon(true);
|
||||||
|
cleaner.start();
|
||||||
|
|
||||||
|
return cleaner;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class SocketCleaner extends PingSocketCleaner {
|
||||||
|
private int closeSocketCount = 0;
|
||||||
|
|
||||||
|
SocketCleaner(String name, ServerSocket serverSocket, int soTimeout) {
|
||||||
|
super(name, serverSocket, soTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
super.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void closeSocketInternal(Socket clientSocket) {
|
||||||
|
if (!clientSocket.isClosed()) {
|
||||||
|
closeSocketCount++;
|
||||||
|
}
|
||||||
|
super.closeSocketInternal(clientSocket);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getCloseSocketCount() {
|
||||||
|
return closeSocketCount;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private ServerSocket setupServerSocket() throws Exception {
|
||||||
|
return new ServerSocket(0, 1);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* clean previous std error and outs
|
* clean previous std error and outs
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue