HADOOP-15919. AliyunOSS: Enable Yarn to use OSS. Contributed by wujinhu.

(cherry picked from commit be0708c6eb)
This commit is contained in:
Weiwei Yang 2018-11-19 14:00:54 +08:00
parent 17a41f5d86
commit fdb95ef150
17 changed files with 635 additions and 8 deletions

View File

@ -142,5 +142,11 @@
<artifactId>hadoop-mapreduce-client-jobclient</artifactId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-examples</artifactId>
<scope>test</scope>
<type>jar</type>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -136,7 +136,7 @@ public class AliyunOSSFileSystem extends FileSystem {
key, key,
uploadPartSize, uploadPartSize,
new SemaphoredDelegatingExecutor(boundedThreadPool, new SemaphoredDelegatingExecutor(boundedThreadPool,
blockOutputActiveBlocks, true)), (Statistics)(null)); blockOutputActiveBlocks, true)), statistics);
} }
/** /**
@ -297,6 +297,11 @@ public class AliyunOSSFileSystem extends FileSystem {
return uri; return uri;
} }
@Override
public int getDefaultPort() {
return Constants.OSS_DEFAULT_PORT;
}
@Override @Override
public Path getWorkingDirectory() { public Path getWorkingDirectory() {
return workingDir; return workingDir;

View File

@ -416,7 +416,6 @@ public class AliyunOSSFileSystemStore {
PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta); PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta);
LOG.debug(result.getETag()); LOG.debug(result.getETag());
statistics.incrementWriteOps(1); statistics.incrementWriteOps(1);
statistics.incrementBytesWritten(file.length());
} finally { } finally {
fis.close(); fis.close();
} }
@ -617,7 +616,6 @@ public class AliyunOSSFileSystemStore {
uploadRequest.setPartNumber(idx); uploadRequest.setPartNumber(idx);
UploadPartResult uploadResult = ossClient.uploadPart(uploadRequest); UploadPartResult uploadResult = ossClient.uploadPart(uploadRequest);
statistics.incrementWriteOps(1); statistics.incrementWriteOps(1);
statistics.incrementBytesWritten(file.length());
return uploadResult.getPartETag(); return uploadResult.getPartETag();
} catch (Exception e) { } catch (Exception e) {
LOG.debug("Failed to upload "+ file.getPath() +", " + LOG.debug("Failed to upload "+ file.getPath() +", " +

View File

@ -38,6 +38,8 @@ public final class Constants {
public static final String CREDENTIALS_PROVIDER_KEY = public static final String CREDENTIALS_PROVIDER_KEY =
"fs.oss.credentials.provider"; "fs.oss.credentials.provider";
public static final int OSS_DEFAULT_PORT = -1;
// OSS access verification // OSS access verification
public static final String ACCESS_KEY_ID = "fs.oss.accessKeyId"; public static final String ACCESS_KEY_ID = "fs.oss.accessKeyId";
public static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret"; public static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DelegateToFileSystem;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
* OSS implementation of AbstractFileSystem.
* This impl delegates to the AliyunOSSFileSystem.
*/
public class OSS extends DelegateToFileSystem {
public OSS(URI theUri, Configuration conf)
throws IOException, URISyntaxException {
super(theUri, new AliyunOSSFileSystem(), conf, "oss", false);
}
@Override
public int getUriDefaultPort() {
return Constants.OSS_DEFAULT_PORT;
}
}

View File

@ -110,6 +110,16 @@ please raise your issues with them.
### Other properties ### Other properties
<property>
<name>fs.AbstractFileSystem.oss.impl</name>
<value>org.apache.hadoop.fs.aliyun.oss.OSS</value>
<description>The implementation class of the OSS AbstractFileSystem.
If you want to use OSS as YARNs resource storage dir via the
fs.defaultFS configuration property in Hadoops core-site.xml,
you should add this configuration to Hadoop's core-site.xml
</description>
</property>
<property> <property>
<name>fs.oss.endpoint</name> <name>fs.oss.endpoint</name>
<description>Aliyun OSS endpoint to connect to. An up-to-date list is <description>Aliyun OSS endpoint to connect to. An up-to-date list is

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.fs.aliyun.oss;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.junit.internal.AssumptionViolatedException; import org.junit.internal.AssumptionViolatedException;
import java.io.IOException; import java.io.IOException;
@ -45,10 +46,21 @@ public final class AliyunOSSTestUtils {
*/ */
public static AliyunOSSFileSystem createTestFileSystem(Configuration conf) public static AliyunOSSFileSystem createTestFileSystem(Configuration conf)
throws IOException { throws IOException {
AliyunOSSFileSystem ossfs = new AliyunOSSFileSystem();
ossfs.initialize(getURI(conf), conf);
return ossfs;
}
public static FileContext createTestFileContext(Configuration conf) throws
IOException {
return FileContext.getFileContext(getURI(conf), conf);
}
private static URI getURI(Configuration conf) {
String fsname = conf.getTrimmed( String fsname = conf.getTrimmed(
TestAliyunOSSFileSystemContract.TEST_FS_OSS_NAME, ""); TestAliyunOSSFileSystemContract.TEST_FS_OSS_NAME, "");
boolean liveTest = StringUtils.isNotEmpty(fsname); boolean liveTest = !StringUtils.isEmpty(fsname);
URI testURI = null; URI testURI = null;
if (liveTest) { if (liveTest) {
testURI = URI.create(fsname); testURI = URI.create(fsname);
@ -59,11 +71,8 @@ public final class AliyunOSSTestUtils {
throw new AssumptionViolatedException("No test filesystem in " throw new AssumptionViolatedException("No test filesystem in "
+ TestAliyunOSSFileSystemContract.TEST_FS_OSS_NAME); + TestAliyunOSSFileSystemContract.TEST_FS_OSS_NAME);
} }
AliyunOSSFileSystem ossfs = new AliyunOSSFileSystem(); return testURI;
ossfs.initialize(testURI, conf);
return ossfs;
} }
/** /**
* Generate unique test path for multiple user tests. * Generate unique test path for multiple user tests.
* *

View File

@ -75,6 +75,7 @@ public class TestAliyunOSSBlockOutputStream {
@Test @Test
public void testRegularUpload() throws IOException { public void testRegularUpload() throws IOException {
FileSystem.clearStatistics();
long size = 1024 * 1024; long size = 1024 * 1024;
FileSystem.Statistics statistics = FileSystem.Statistics statistics =
FileSystem.getStatistics("oss", AliyunOSSFileSystem.class); FileSystem.getStatistics("oss", AliyunOSSFileSystem.class);
@ -111,6 +112,7 @@ public class TestAliyunOSSBlockOutputStream {
@Test @Test
public void testMultiPartUpload() throws IOException { public void testMultiPartUpload() throws IOException {
long size = 6 * 1024 * 1024; long size = 6 * 1024 * 1024;
FileSystem.clearStatistics();
FileSystem.Statistics statistics = FileSystem.Statistics statistics =
FileSystem.getStatistics("oss", AliyunOSSFileSystem.class); FileSystem.getStatistics("oss", AliyunOSSFileSystem.class);
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size - 1); ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size - 1);
@ -134,6 +136,7 @@ public class TestAliyunOSSBlockOutputStream {
@Test @Test
public void testMultiPartUploadConcurrent() throws IOException { public void testMultiPartUploadConcurrent() throws IOException {
FileSystem.clearStatistics();
long size = 50 * 1024 * 1024 - 1; long size = 50 * 1024 * 1024 - 1;
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size); ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size);
FileSystem.Statistics statistics = FileSystem.Statistics statistics =

View File

@ -19,9 +19,13 @@
package org.apache.hadoop.fs.aliyun.oss.contract; package org.apache.hadoop.fs.aliyun.oss.contract;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSTestUtils;
import org.apache.hadoop.fs.contract.AbstractBondedFSContract; import org.apache.hadoop.fs.contract.AbstractBondedFSContract;
import java.io.IOException;
/** /**
* The contract of Aliyun OSS: only enabled if the test bucket is provided. * The contract of Aliyun OSS: only enabled if the test bucket is provided.
*/ */
@ -40,6 +44,11 @@ public class AliyunOSSContract extends AbstractBondedFSContract {
return "oss"; return "oss";
} }
@Override
public FileSystem getTestFileSystem() throws IOException {
return AliyunOSSTestUtils.createTestFileSystem(new Configuration());
}
@Override @Override
public Path getTestPath() { public Path getTestPath() {
String testUniqueForkId = System.getProperty("test.unique.fork.id"); String testUniqueForkId = System.getProperty("test.unique.fork.id");

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.fileContext;
import org.apache.hadoop.fs.TestFileContext;
/**
* Implementation of TestFileContext for OSS.
*/
public class TestOSSFileContext extends TestFileContext {
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.fileContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContextCreateMkdirBaseTest;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSTestUtils;
import org.junit.Before;
import java.io.IOException;
/**
* OSS implementation of FileContextCreateMkdirBaseTest.
*/
public class TestOSSFileContextCreateMkdir
extends FileContextCreateMkdirBaseTest {
@Before
public void setUp() throws IOException, Exception {
Configuration conf = new Configuration();
fc = AliyunOSSTestUtils.createTestFileContext(conf);
super.setUp();
}
}

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.fileContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContextMainOperationsBaseTest;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSTestUtils;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
/**
* OSS implementation of FileContextMainOperationsBaseTest.
*/
public class TestOSSFileContextMainOperations
extends FileContextMainOperationsBaseTest {
@Before
public void setUp() throws IOException, Exception {
Configuration conf = new Configuration();
fc = AliyunOSSTestUtils.createTestFileContext(conf);
super.setUp();
}
@Override
protected boolean listCorruptedBlocksSupported() {
return false;
}
@Test
@Ignore
public void testCreateFlagAppendExistingFile() throws IOException {
// append not supported, so test removed
}
@Test
@Ignore
public void testCreateFlagCreateAppendExistingFile() throws IOException {
// append not supported, so test removed
}
@Test
@Ignore
public void testSetVerifyChecksum() throws IOException {
// checksums ignored, so test ignored
}
@Test
@Ignore
public void testBuilderCreateAppendExistingFile() throws IOException {
// append not supported, so test removed
}
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.fileContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FCStatisticsBaseTest;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import java.net.URI;
/**
* OSS implementation of FCStatisticsBaseTest.
*/
public class TestOSSFileContextStatistics extends FCStatisticsBaseTest {
@Before
public void setUp() throws Exception {
Configuration conf = new Configuration();
fc = AliyunOSSTestUtils.createTestFileContext(conf);
fc.mkdir(fileContextTestHelper.getTestRootPath(fc, "test"),
FileContext.DEFAULT_PERM, true);
FileContext.clearStatistics();
}
@After
public void tearDown() throws Exception {
if (fc != null) {
fc.delete(fileContextTestHelper.getTestRootPath(fc, "test"), true);
}
}
@Override
protected void verifyReadBytes(FileSystem.Statistics stats) {
// one blockSize for read, one for pread
Assert.assertEquals(2 * blockSize, stats.getBytesRead());
}
@Override
protected void verifyWrittenBytes(FileSystem.Statistics stats) {
// no extra bytes are written
Assert.assertEquals(blockSize, stats.getBytesWritten());
}
@Override
protected URI getFsUri() {
return fc.getHomeDirectory().toUri();
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.fileContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContextURIBase;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSTestUtils;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
/**
* OSS implementation of FileContextURIBase.
*/
public class TestOSSFileContextURI extends FileContextURIBase {
@Before
public void setUp() throws IOException, Exception {
Configuration conf = new Configuration();
fc1 = AliyunOSSTestUtils.createTestFileContext(conf);
// different object, same FS
fc2 = AliyunOSSTestUtils.createTestFileContext(conf);
super.setUp();
}
@Test
@Ignore
public void testFileStatus() throws IOException {
// test ignored
// (the statistics tested with this method are not relevant for an OSSFS)
}
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.fileContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContextUtilBase;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSTestUtils;
import org.junit.Before;
import java.io.IOException;
/**
* OSS implementation of FileContextUtilBase.
*/
public class TestOSSFileContextUtil extends FileContextUtilBase {
@Before
public void setUp() throws IOException, Exception {
Configuration conf = new Configuration();
fc = AliyunOSSTestUtils.createTestFileContext(conf);
super.setUp();
}
}

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.yarn;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.EnumSet;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* OSS tests through the {@link FileContext} API.
*/
public class TestOSS {
private FileContext fc;
@Before
public void setUp() throws Exception {
Configuration conf = new Configuration();
fc = AliyunOSSTestUtils.createTestFileContext(conf);
}
@After
public void tearDown() throws Exception {
if (fc != null) {
fc.delete(getTestPath(), true);
}
}
protected Path getTestPath() {
return new Path(AliyunOSSTestUtils.generateUniqueTestPath());
}
@Test
public void testOSSStatus() throws Exception {
FsStatus fsStatus = fc.getFsStatus(null);
assertNotNull(fsStatus);
assertTrue("Used capacity should be positive: " + fsStatus.getUsed(),
fsStatus.getUsed() >= 0);
assertTrue("Remaining capacity should be positive: " + fsStatus
.getRemaining(),
fsStatus.getRemaining() >= 0);
assertTrue("Capacity should be positive: " + fsStatus.getCapacity(),
fsStatus.getCapacity() >= 0);
}
@Test(timeout = 90000L)
public void testOSSCreateFileInSubDir() throws Exception {
Path dirPath = getTestPath();
fc.mkdir(dirPath, FileContext.DIR_DEFAULT_PERM, true);
Path filePath = new Path(dirPath, "file");
try (FSDataOutputStream file = fc.create(filePath, EnumSet.of(CreateFlag
.CREATE))) {
file.write(666);
}
}
}

View File

@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.aliyun.oss.yarn;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordCount;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSTestUtils;
import org.apache.hadoop.fs.aliyun.oss.contract.AliyunOSSContract;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.junit.Test;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
/**
* Tests that OSS is usable through a YARN application.
*/
public class TestOSSMiniYarnCluster extends AbstractFSContractTestBase {
private Configuration conf;
private FileSystem fs;
private MiniYARNCluster yarnCluster;
private Path rootPath;
@Override
protected AbstractFSContract createContract(Configuration configuration) {
this.conf = configuration;
return new AliyunOSSContract(conf);
}
@Override
public void setup() throws Exception {
super.setup();
fs = getFileSystem();
rootPath = path("MiniClusterWordCount");
Path workingDir = path("working");
fs.setWorkingDirectory(workingDir);
fs.mkdirs(new Path(rootPath, "input/"));
yarnCluster = new MiniYARNCluster("MiniClusterWordCount", // testName
1, // number of node managers
1, // number of local log dirs per node manager
1); // number of hdfs dirs per node manager
yarnCluster.init(conf);
yarnCluster.start();
}
@Test
public void testWithMiniCluster() throws Exception {
Path input = new Path(rootPath, "input/in");
input = input.makeQualified(fs.getUri(), fs.getWorkingDirectory());
Path output = new Path(rootPath, "output/");
output = output.makeQualified(fs.getUri(), fs.getWorkingDirectory());
writeStringToFile(input, "first line\nsecond line\nthird line");
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(WordCount.TokenizerMapper.class);
job.setCombinerClass(WordCount.IntSumReducer.class);
job.setReducerClass(WordCount.IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, output);
int exitCode = (job.waitForCompletion(true) ? 0 : 1);
assertEquals("Returned error code.", 0, exitCode);
assertTrue(fs.exists(new Path(output, "_SUCCESS")));
String outputAsStr = readStringFromFile(new Path(output, "part-r-00000"));
Map<String, Integer> resAsMap = getResultAsMap(outputAsStr);
assertEquals(4, resAsMap.size());
assertEquals(1, (int) resAsMap.get("first"));
assertEquals(1, (int) resAsMap.get("second"));
assertEquals(1, (int) resAsMap.get("third"));
assertEquals(3, (int) resAsMap.get("line"));
}
/**
* helper method.
*/
private Map<String, Integer> getResultAsMap(String outputAsStr) {
Map<String, Integer> result = new HashMap<>();
for (String line : outputAsStr.split("\n")) {
String[] tokens = line.split("\t");
assertTrue("Not enough tokens in in string \" "
+ line + "\" from output \"" + outputAsStr + "\"",
tokens.length > 1);
result.put(tokens[0], Integer.parseInt(tokens[1]));
}
return result;
}
/**
* helper method.
*/
private void writeStringToFile(Path path, String string) throws IOException {
FileContext fc = AliyunOSSTestUtils.createTestFileContext(conf);
try (FSDataOutputStream file = fc.create(path,
EnumSet.of(CreateFlag.CREATE))) {
file.write(string.getBytes());
}
}
/**
* helper method.
*/
private String readStringFromFile(Path path) throws IOException {
try (FSDataInputStream in = fs.open(path)) {
long bytesLen = fs.getFileStatus(path).getLen();
byte[] buffer = new byte[(int) bytesLen];
IOUtils.readFully(in, buffer, 0, buffer.length);
return new String(buffer);
}
}
@Override
public void teardown() throws Exception {
if (yarnCluster != null) {
yarnCluster.stop();
}
super.teardown();
IOUtils.closeStream(getFileSystem());
}
}