diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleChannelHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleChannelHandler.java index 7fedc7bb2dc..66fa3de94f8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleChannelHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleChannelHandler.java @@ -225,7 +225,7 @@ public class TestShuffleChannelHandler extends TestShuffleHandlerBase { final ShuffleTest t = createShuffleTest(); final EmbeddedChannel shuffle = t.createShuffleHandlerChannelFileRegion(); - String dataFile = getDataFile(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2); + String dataFile = getDataFile(TEST_USER, tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2); assertTrue("should delete", new File(dataFile).delete()); FullHttpRequest req = t.createRequest(getUri(TEST_JOB_ID, 0, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java index 37a9210286c..cc46b49b113 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java @@ -29,6 +29,7 @@ import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.test.MetricsAsserts.assertGauge; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -41,6 +42,7 @@ import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.MalformedURLException; @@ -159,7 +161,7 @@ public class TestShuffleHandler extends TestShuffleHandlerBase { shuffleHandler.init(conf); shuffleHandler.start(); final String port = shuffleHandler.getConfig().get(SHUFFLE_PORT_CONFIG_KEY); - final SecretKey secretKey = shuffleHandler.addTestApp(); + final SecretKey secretKey = shuffleHandler.addTestApp(TEST_USER); // setup connections HttpURLConnection[] conns = new HttpURLConnection[connAttempts]; @@ -237,7 +239,7 @@ public class TestShuffleHandler extends TestShuffleHandlerBase { shuffleHandler.init(conf); shuffleHandler.start(); final String port = shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY); - final SecretKey secretKey = shuffleHandler.addTestApp(); + final SecretKey secretKey = shuffleHandler.addTestApp(TEST_USER); HttpURLConnection conn1 = createRequest( geURL(port, TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_1), true), @@ -278,18 +280,34 @@ public class TestShuffleHandler extends TestShuffleHandlerBase { conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); UserGroupInformation.setConfiguration(conf); + final String randomUser = "randomUser"; + final String attempt = "attempt_1111111111111_0004_m_000004_0"; + generateMapOutput(randomUser, tempDir.toAbsolutePath().toString(), attempt, + Arrays.asList(TEST_DATA_C, TEST_DATA_B, TEST_DATA_A)); + ShuffleHandlerMock shuffleHandler = new ShuffleHandlerMock(); shuffleHandler.init(conf); try { shuffleHandler.start(); final String port = shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY); - final SecretKey secretKey = shuffleHandler.addTestApp(); + final SecretKey secretKey = shuffleHandler.addTestApp(randomUser); HttpURLConnection conn = createRequest( - geURL(port, TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_1), false), + geURL(port, TEST_JOB_ID, 0, Collections.singletonList(attempt), false), secretKey); conn.connect(); - BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream())); + + InputStream is = null; + try { + is = conn.getInputStream(); + } catch (IOException ioe) { + if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { + is = conn.getErrorStream(); + } + } + + assertNotNull(is); + BufferedReader in = new BufferedReader(new InputStreamReader(is)); StringBuilder builder = new StringBuilder(); String inputLine; while ((inputLine = in.readLine()) != null) { @@ -299,7 +317,7 @@ public class TestShuffleHandler extends TestShuffleHandlerBase { String receivedString = builder.toString(); //Retrieve file owner name - String indexFilePath = getIndexFile(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_1); + String indexFilePath = getIndexFile(randomUser, tempDir.toAbsolutePath().toString(), attempt); String owner; try (FileInputStream fis = new FileInputStream(indexFilePath)) { owner = NativeIO.POSIX.getFstat(fis.getFD()).getOwner(); @@ -307,11 +325,11 @@ public class TestShuffleHandler extends TestShuffleHandlerBase { String message = "Owner '" + owner + "' for path " + indexFilePath - + " did not match expected owner '" + TEST_USER + "'"; + + " did not match expected owner '" + randomUser + "'"; assertTrue(String.format("Received string '%s' should contain " + "message '%s'", receivedString, message), receivedString.contains(message)); - assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode()); + assertEquals(HttpURLConnection.HTTP_INTERNAL_ERROR, conn.getResponseCode()); LOG.info("received: " + receivedString); assertNotEquals("", receivedString); } finally { @@ -334,7 +352,7 @@ public class TestShuffleHandler extends TestShuffleHandlerBase { shuffle.init(conf); shuffle.start(); final String port = shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY); - final SecretKey secretKey = shuffle.addTestApp(); + final SecretKey secretKey = shuffle.addTestApp(TEST_USER); // verify we are authorized to shuffle int rc = getShuffleResponseCode(port, secretKey); @@ -387,7 +405,7 @@ public class TestShuffleHandler extends TestShuffleHandlerBase { shuffle.init(conf); shuffle.start(); final String port = shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY); - final SecretKey secretKey = shuffle.addTestApp(); + final SecretKey secretKey = shuffle.addTestApp(TEST_USER); // verify we are authorized to shuffle int rc = getShuffleResponseCode(port, secretKey); @@ -489,14 +507,14 @@ public class TestShuffleHandler extends TestShuffleHandlerBase { class ShuffleHandlerMock extends ShuffleHandler { - public SecretKey addTestApp() throws IOException { + public SecretKey addTestApp(String user) throws IOException { DataOutputBuffer outputBuffer = new DataOutputBuffer(); outputBuffer.reset(); Token jt = new Token<>( - "identifier".getBytes(), "password".getBytes(), new Text(TEST_USER), + "identifier".getBytes(), "password".getBytes(), new Text(user), new Text("shuffleService")); jt.write(outputBuffer); - initializeApplication(new ApplicationInitializationContext(TEST_USER, TEST_APP_ID, + initializeApplication(new ApplicationInitializationContext(user, TEST_APP_ID, ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength()))); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandlerBase.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandlerBase.java index 1bce443381d..406f2866230 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandlerBase.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandlerBase.java @@ -55,7 +55,7 @@ public class TestShuffleHandlerBase { public static final String TEST_ATTEMPT_2 = "attempt_1111111111111_0002_m_000002_0"; public static final String TEST_ATTEMPT_3 = "attempt_1111111111111_0003_m_000003_0"; public static final String TEST_JOB_ID = "job_1111111111111_0001"; - public static final String TEST_USER = "testUser"; + public static final String TEST_USER = System.getProperty("user.name"); public static final String TEST_DATA_A = "aaaaa"; public static final String TEST_DATA_B = "bbbbb"; public static final String TEST_DATA_C = "ccccc"; @@ -70,11 +70,11 @@ public class TestShuffleHandlerBase { tempDir = Files.createTempDirectory("test-shuffle-channel-handler"); tempDir.toFile().deleteOnExit(); - generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_1, + generateMapOutput(TEST_USER, tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_1, Arrays.asList(TEST_DATA_A, TEST_DATA_B, TEST_DATA_C)); - generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2, + generateMapOutput(TEST_USER, tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2, Arrays.asList(TEST_DATA_B, TEST_DATA_A, TEST_DATA_C)); - generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_3, + generateMapOutput(TEST_USER, tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_3, Arrays.asList(TEST_DATA_C, TEST_DATA_B, TEST_DATA_A)); outputStreamCaptor.reset(); @@ -101,12 +101,13 @@ public class TestShuffleHandlerBase { return allMatches; } - public static void generateMapOutput(String tempDir, String attempt, List maps) + public static void generateMapOutput(String user, String tempDir, + String attempt, List maps) throws IOException { SpillRecord record = new SpillRecord(maps.size()); - assertTrue(new File(getBasePath(tempDir, attempt)).mkdirs()); - try (PrintWriter writer = new PrintWriter(getDataFile(tempDir, attempt), "UTF-8")) { + assertTrue(new File(getBasePath(user, tempDir, attempt)).mkdirs()); + try (PrintWriter writer = new PrintWriter(getDataFile(user, tempDir, attempt), "UTF-8")) { long startOffset = 0; int partition = 0; for (String map : maps) { @@ -119,21 +120,21 @@ public class TestShuffleHandlerBase { partition++; writer.write(map); } - record.writeToFile(new Path(getIndexFile(tempDir, attempt)), + record.writeToFile(new Path(getIndexFile(user, tempDir, attempt)), new JobConf(new Configuration())); } } - public static String getIndexFile(String tempDir, String attempt) { - return String.format("%s/%s", getBasePath(tempDir, attempt), INDEX_FILE_NAME); + public static String getIndexFile(String user, String tempDir, String attempt) { + return String.format("%s/%s", getBasePath(user, tempDir, attempt), INDEX_FILE_NAME); } - public static String getDataFile(String tempDir, String attempt) { - return String.format("%s/%s", getBasePath(tempDir, attempt), DATA_FILE_NAME); + public static String getDataFile(String user, String tempDir, String attempt) { + return String.format("%s/%s", getBasePath(user, tempDir, attempt), DATA_FILE_NAME); } - private static String getBasePath(String tempDir, String attempt) { - return String.format("%s/%s/%s/%s", tempDir, TEST_JOB_ID, TEST_USER, attempt); + private static String getBasePath(String user, String tempDir, String attempt) { + return String.format("%s/%s/%s/%s", tempDir, TEST_JOB_ID, user, attempt); } public static String getUri(String jobId, int reduce, List maps, boolean keepAlive) {