MAPREDUCE-7434. Fix ShuffleHandler tests. Contributed by Tamas Domok

This commit is contained in:
Szilard Nemeth 2023-03-01 16:10:05 +01:00
parent 28d2753d2f
commit 8f6be3678d
3 changed files with 47 additions and 28 deletions

View File

@ -225,7 +225,7 @@ public class TestShuffleChannelHandler extends TestShuffleHandlerBase {
final ShuffleTest t = createShuffleTest(); final ShuffleTest t = createShuffleTest();
final EmbeddedChannel shuffle = t.createShuffleHandlerChannelFileRegion(); final EmbeddedChannel shuffle = t.createShuffleHandlerChannelFileRegion();
String dataFile = getDataFile(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2); String dataFile = getDataFile(TEST_USER, tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2);
assertTrue("should delete", new File(dataFile).delete()); assertTrue("should delete", new File(dataFile).delete());
FullHttpRequest req = t.createRequest(getUri(TEST_JOB_ID, 0, FullHttpRequest req = t.createRequest(getUri(TEST_JOB_ID, 0,

View File

@ -29,6 +29,7 @@ import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertGauge; import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -41,6 +42,7 @@ import java.io.DataInputStream;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.MalformedURLException; import java.net.MalformedURLException;
@ -159,7 +161,7 @@ public class TestShuffleHandler extends TestShuffleHandlerBase {
shuffleHandler.init(conf); shuffleHandler.init(conf);
shuffleHandler.start(); shuffleHandler.start();
final String port = shuffleHandler.getConfig().get(SHUFFLE_PORT_CONFIG_KEY); final String port = shuffleHandler.getConfig().get(SHUFFLE_PORT_CONFIG_KEY);
final SecretKey secretKey = shuffleHandler.addTestApp(); final SecretKey secretKey = shuffleHandler.addTestApp(TEST_USER);
// setup connections // setup connections
HttpURLConnection[] conns = new HttpURLConnection[connAttempts]; HttpURLConnection[] conns = new HttpURLConnection[connAttempts];
@ -237,7 +239,7 @@ public class TestShuffleHandler extends TestShuffleHandlerBase {
shuffleHandler.init(conf); shuffleHandler.init(conf);
shuffleHandler.start(); shuffleHandler.start();
final String port = shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY); final String port = shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
final SecretKey secretKey = shuffleHandler.addTestApp(); final SecretKey secretKey = shuffleHandler.addTestApp(TEST_USER);
HttpURLConnection conn1 = createRequest( HttpURLConnection conn1 = createRequest(
geURL(port, TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_1), true), geURL(port, TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_1), true),
@ -278,18 +280,34 @@ public class TestShuffleHandler extends TestShuffleHandlerBase {
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(conf); UserGroupInformation.setConfiguration(conf);
final String randomUser = "randomUser";
final String attempt = "attempt_1111111111111_0004_m_000004_0";
generateMapOutput(randomUser, tempDir.toAbsolutePath().toString(), attempt,
Arrays.asList(TEST_DATA_C, TEST_DATA_B, TEST_DATA_A));
ShuffleHandlerMock shuffleHandler = new ShuffleHandlerMock(); ShuffleHandlerMock shuffleHandler = new ShuffleHandlerMock();
shuffleHandler.init(conf); shuffleHandler.init(conf);
try { try {
shuffleHandler.start(); shuffleHandler.start();
final String port = shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY); final String port = shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
final SecretKey secretKey = shuffleHandler.addTestApp(); final SecretKey secretKey = shuffleHandler.addTestApp(randomUser);
HttpURLConnection conn = createRequest( HttpURLConnection conn = createRequest(
geURL(port, TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_1), false), geURL(port, TEST_JOB_ID, 0, Collections.singletonList(attempt), false),
secretKey); secretKey);
conn.connect(); conn.connect();
BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
InputStream is = null;
try {
is = conn.getInputStream();
} catch (IOException ioe) {
if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
is = conn.getErrorStream();
}
}
assertNotNull(is);
BufferedReader in = new BufferedReader(new InputStreamReader(is));
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
String inputLine; String inputLine;
while ((inputLine = in.readLine()) != null) { while ((inputLine = in.readLine()) != null) {
@ -299,7 +317,7 @@ public class TestShuffleHandler extends TestShuffleHandlerBase {
String receivedString = builder.toString(); String receivedString = builder.toString();
//Retrieve file owner name //Retrieve file owner name
String indexFilePath = getIndexFile(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_1); String indexFilePath = getIndexFile(randomUser, tempDir.toAbsolutePath().toString(), attempt);
String owner; String owner;
try (FileInputStream fis = new FileInputStream(indexFilePath)) { try (FileInputStream fis = new FileInputStream(indexFilePath)) {
owner = NativeIO.POSIX.getFstat(fis.getFD()).getOwner(); owner = NativeIO.POSIX.getFstat(fis.getFD()).getOwner();
@ -307,11 +325,11 @@ public class TestShuffleHandler extends TestShuffleHandlerBase {
String message = String message =
"Owner '" + owner + "' for path " + indexFilePath "Owner '" + owner + "' for path " + indexFilePath
+ " did not match expected owner '" + TEST_USER + "'"; + " did not match expected owner '" + randomUser + "'";
assertTrue(String.format("Received string '%s' should contain " + assertTrue(String.format("Received string '%s' should contain " +
"message '%s'", receivedString, message), "message '%s'", receivedString, message),
receivedString.contains(message)); receivedString.contains(message));
assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode()); assertEquals(HttpURLConnection.HTTP_INTERNAL_ERROR, conn.getResponseCode());
LOG.info("received: " + receivedString); LOG.info("received: " + receivedString);
assertNotEquals("", receivedString); assertNotEquals("", receivedString);
} finally { } finally {
@ -334,7 +352,7 @@ public class TestShuffleHandler extends TestShuffleHandlerBase {
shuffle.init(conf); shuffle.init(conf);
shuffle.start(); shuffle.start();
final String port = shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY); final String port = shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
final SecretKey secretKey = shuffle.addTestApp(); final SecretKey secretKey = shuffle.addTestApp(TEST_USER);
// verify we are authorized to shuffle // verify we are authorized to shuffle
int rc = getShuffleResponseCode(port, secretKey); int rc = getShuffleResponseCode(port, secretKey);
@ -387,7 +405,7 @@ public class TestShuffleHandler extends TestShuffleHandlerBase {
shuffle.init(conf); shuffle.init(conf);
shuffle.start(); shuffle.start();
final String port = shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY); final String port = shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
final SecretKey secretKey = shuffle.addTestApp(); final SecretKey secretKey = shuffle.addTestApp(TEST_USER);
// verify we are authorized to shuffle // verify we are authorized to shuffle
int rc = getShuffleResponseCode(port, secretKey); int rc = getShuffleResponseCode(port, secretKey);
@ -489,14 +507,14 @@ public class TestShuffleHandler extends TestShuffleHandlerBase {
class ShuffleHandlerMock extends ShuffleHandler { class ShuffleHandlerMock extends ShuffleHandler {
public SecretKey addTestApp() throws IOException { public SecretKey addTestApp(String user) throws IOException {
DataOutputBuffer outputBuffer = new DataOutputBuffer(); DataOutputBuffer outputBuffer = new DataOutputBuffer();
outputBuffer.reset(); outputBuffer.reset();
Token<JobTokenIdentifier> jt = new Token<>( Token<JobTokenIdentifier> jt = new Token<>(
"identifier".getBytes(), "password".getBytes(), new Text(TEST_USER), "identifier".getBytes(), "password".getBytes(), new Text(user),
new Text("shuffleService")); new Text("shuffleService"));
jt.write(outputBuffer); jt.write(outputBuffer);
initializeApplication(new ApplicationInitializationContext(TEST_USER, TEST_APP_ID, initializeApplication(new ApplicationInitializationContext(user, TEST_APP_ID,
ByteBuffer.wrap(outputBuffer.getData(), 0, ByteBuffer.wrap(outputBuffer.getData(), 0,
outputBuffer.getLength()))); outputBuffer.getLength())));

View File

@ -55,7 +55,7 @@ public class TestShuffleHandlerBase {
public static final String TEST_ATTEMPT_2 = "attempt_1111111111111_0002_m_000002_0"; public static final String TEST_ATTEMPT_2 = "attempt_1111111111111_0002_m_000002_0";
public static final String TEST_ATTEMPT_3 = "attempt_1111111111111_0003_m_000003_0"; public static final String TEST_ATTEMPT_3 = "attempt_1111111111111_0003_m_000003_0";
public static final String TEST_JOB_ID = "job_1111111111111_0001"; public static final String TEST_JOB_ID = "job_1111111111111_0001";
public static final String TEST_USER = "testUser"; public static final String TEST_USER = System.getProperty("user.name");
public static final String TEST_DATA_A = "aaaaa"; public static final String TEST_DATA_A = "aaaaa";
public static final String TEST_DATA_B = "bbbbb"; public static final String TEST_DATA_B = "bbbbb";
public static final String TEST_DATA_C = "ccccc"; public static final String TEST_DATA_C = "ccccc";
@ -70,11 +70,11 @@ public class TestShuffleHandlerBase {
tempDir = Files.createTempDirectory("test-shuffle-channel-handler"); tempDir = Files.createTempDirectory("test-shuffle-channel-handler");
tempDir.toFile().deleteOnExit(); tempDir.toFile().deleteOnExit();
generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_1, generateMapOutput(TEST_USER, tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_1,
Arrays.asList(TEST_DATA_A, TEST_DATA_B, TEST_DATA_C)); Arrays.asList(TEST_DATA_A, TEST_DATA_B, TEST_DATA_C));
generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2, generateMapOutput(TEST_USER, tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2,
Arrays.asList(TEST_DATA_B, TEST_DATA_A, TEST_DATA_C)); Arrays.asList(TEST_DATA_B, TEST_DATA_A, TEST_DATA_C));
generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_3, generateMapOutput(TEST_USER, tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_3,
Arrays.asList(TEST_DATA_C, TEST_DATA_B, TEST_DATA_A)); Arrays.asList(TEST_DATA_C, TEST_DATA_B, TEST_DATA_A));
outputStreamCaptor.reset(); outputStreamCaptor.reset();
@ -101,12 +101,13 @@ public class TestShuffleHandlerBase {
return allMatches; return allMatches;
} }
public static void generateMapOutput(String tempDir, String attempt, List<String> maps) public static void generateMapOutput(String user, String tempDir,
String attempt, List<String> maps)
throws IOException { throws IOException {
SpillRecord record = new SpillRecord(maps.size()); SpillRecord record = new SpillRecord(maps.size());
assertTrue(new File(getBasePath(tempDir, attempt)).mkdirs()); assertTrue(new File(getBasePath(user, tempDir, attempt)).mkdirs());
try (PrintWriter writer = new PrintWriter(getDataFile(tempDir, attempt), "UTF-8")) { try (PrintWriter writer = new PrintWriter(getDataFile(user, tempDir, attempt), "UTF-8")) {
long startOffset = 0; long startOffset = 0;
int partition = 0; int partition = 0;
for (String map : maps) { for (String map : maps) {
@ -119,21 +120,21 @@ public class TestShuffleHandlerBase {
partition++; partition++;
writer.write(map); writer.write(map);
} }
record.writeToFile(new Path(getIndexFile(tempDir, attempt)), record.writeToFile(new Path(getIndexFile(user, tempDir, attempt)),
new JobConf(new Configuration())); new JobConf(new Configuration()));
} }
} }
public static String getIndexFile(String tempDir, String attempt) { public static String getIndexFile(String user, String tempDir, String attempt) {
return String.format("%s/%s", getBasePath(tempDir, attempt), INDEX_FILE_NAME); return String.format("%s/%s", getBasePath(user, tempDir, attempt), INDEX_FILE_NAME);
} }
public static String getDataFile(String tempDir, String attempt) { public static String getDataFile(String user, String tempDir, String attempt) {
return String.format("%s/%s", getBasePath(tempDir, attempt), DATA_FILE_NAME); return String.format("%s/%s", getBasePath(user, tempDir, attempt), DATA_FILE_NAME);
} }
private static String getBasePath(String tempDir, String attempt) { private static String getBasePath(String user, String tempDir, String attempt) {
return String.format("%s/%s/%s/%s", tempDir, TEST_JOB_ID, TEST_USER, attempt); return String.format("%s/%s/%s/%s", tempDir, TEST_JOB_ID, user, attempt);
} }
public static String getUri(String jobId, int reduce, List<String> maps, boolean keepAlive) { public static String getUri(String jobId, int reduce, List<String> maps, boolean keepAlive) {