HADOOP-15919. AliyunOSS: Enable Yarn to use OSS. Contributed by wujinhu.
This commit is contained in:
parent
d027a24f03
commit
be0708c6eb
|
@ -142,5 +142,11 @@
|
||||||
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
|
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-mapreduce-examples</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
<type>jar</type>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -136,7 +136,7 @@ public class AliyunOSSFileSystem extends FileSystem {
|
||||||
key,
|
key,
|
||||||
uploadPartSize,
|
uploadPartSize,
|
||||||
new SemaphoredDelegatingExecutor(boundedThreadPool,
|
new SemaphoredDelegatingExecutor(boundedThreadPool,
|
||||||
blockOutputActiveBlocks, true)), (Statistics)(null));
|
blockOutputActiveBlocks, true)), statistics);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -297,6 +297,11 @@ public class AliyunOSSFileSystem extends FileSystem {
|
||||||
return uri;
|
return uri;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getDefaultPort() {
|
||||||
|
return Constants.OSS_DEFAULT_PORT;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Path getWorkingDirectory() {
|
public Path getWorkingDirectory() {
|
||||||
return workingDir;
|
return workingDir;
|
||||||
|
|
|
@ -416,7 +416,6 @@ public class AliyunOSSFileSystemStore {
|
||||||
PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta);
|
PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta);
|
||||||
LOG.debug(result.getETag());
|
LOG.debug(result.getETag());
|
||||||
statistics.incrementWriteOps(1);
|
statistics.incrementWriteOps(1);
|
||||||
statistics.incrementBytesWritten(file.length());
|
|
||||||
} finally {
|
} finally {
|
||||||
fis.close();
|
fis.close();
|
||||||
}
|
}
|
||||||
|
@ -617,7 +616,6 @@ public class AliyunOSSFileSystemStore {
|
||||||
uploadRequest.setPartNumber(idx);
|
uploadRequest.setPartNumber(idx);
|
||||||
UploadPartResult uploadResult = ossClient.uploadPart(uploadRequest);
|
UploadPartResult uploadResult = ossClient.uploadPart(uploadRequest);
|
||||||
statistics.incrementWriteOps(1);
|
statistics.incrementWriteOps(1);
|
||||||
statistics.incrementBytesWritten(file.length());
|
|
||||||
return uploadResult.getPartETag();
|
return uploadResult.getPartETag();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.debug("Failed to upload "+ file.getPath() +", " +
|
LOG.debug("Failed to upload "+ file.getPath() +", " +
|
||||||
|
|
|
@ -38,6 +38,8 @@ public final class Constants {
|
||||||
public static final String CREDENTIALS_PROVIDER_KEY =
|
public static final String CREDENTIALS_PROVIDER_KEY =
|
||||||
"fs.oss.credentials.provider";
|
"fs.oss.credentials.provider";
|
||||||
|
|
||||||
|
public static final int OSS_DEFAULT_PORT = -1;
|
||||||
|
|
||||||
// OSS access verification
|
// OSS access verification
|
||||||
public static final String ACCESS_KEY_ID = "fs.oss.accessKeyId";
|
public static final String ACCESS_KEY_ID = "fs.oss.accessKeyId";
|
||||||
public static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
|
public static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.aliyun.oss;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.DelegateToFileSystem;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* OSS implementation of AbstractFileSystem.
|
||||||
|
* This impl delegates to the AliyunOSSFileSystem.
|
||||||
|
*/
|
||||||
|
public class OSS extends DelegateToFileSystem {
|
||||||
|
|
||||||
|
public OSS(URI theUri, Configuration conf)
|
||||||
|
throws IOException, URISyntaxException {
|
||||||
|
super(theUri, new AliyunOSSFileSystem(), conf, "oss", false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getUriDefaultPort() {
|
||||||
|
return Constants.OSS_DEFAULT_PORT;
|
||||||
|
}
|
||||||
|
}
|
|
@ -110,6 +110,16 @@ please raise your issues with them.
|
||||||
|
|
||||||
### Other properties
|
### Other properties
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>fs.AbstractFileSystem.oss.impl</name>
|
||||||
|
<value>org.apache.hadoop.fs.aliyun.oss.OSS</value>
|
||||||
|
<description>The implementation class of the OSS AbstractFileSystem.
|
||||||
|
If you want to use OSS as YARN’s resource storage dir via the
|
||||||
|
fs.defaultFS configuration property in Hadoop’s core-site.xml,
|
||||||
|
you should add this configuration to Hadoop's core-site.xml
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>fs.oss.endpoint</name>
|
<name>fs.oss.endpoint</name>
|
||||||
<description>Aliyun OSS endpoint to connect to. An up-to-date list is
|
<description>Aliyun OSS endpoint to connect to. An up-to-date list is
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.fs.aliyun.oss;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileContext;
|
||||||
import org.junit.internal.AssumptionViolatedException;
|
import org.junit.internal.AssumptionViolatedException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -45,10 +46,21 @@ public final class AliyunOSSTestUtils {
|
||||||
*/
|
*/
|
||||||
public static AliyunOSSFileSystem createTestFileSystem(Configuration conf)
|
public static AliyunOSSFileSystem createTestFileSystem(Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
AliyunOSSFileSystem ossfs = new AliyunOSSFileSystem();
|
||||||
|
ossfs.initialize(getURI(conf), conf);
|
||||||
|
return ossfs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static FileContext createTestFileContext(Configuration conf) throws
|
||||||
|
IOException {
|
||||||
|
return FileContext.getFileContext(getURI(conf), conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static URI getURI(Configuration conf) {
|
||||||
String fsname = conf.getTrimmed(
|
String fsname = conf.getTrimmed(
|
||||||
TestAliyunOSSFileSystemContract.TEST_FS_OSS_NAME, "");
|
TestAliyunOSSFileSystemContract.TEST_FS_OSS_NAME, "");
|
||||||
|
|
||||||
boolean liveTest = StringUtils.isNotEmpty(fsname);
|
boolean liveTest = !StringUtils.isEmpty(fsname);
|
||||||
URI testURI = null;
|
URI testURI = null;
|
||||||
if (liveTest) {
|
if (liveTest) {
|
||||||
testURI = URI.create(fsname);
|
testURI = URI.create(fsname);
|
||||||
|
@ -59,11 +71,8 @@ public final class AliyunOSSTestUtils {
|
||||||
throw new AssumptionViolatedException("No test filesystem in "
|
throw new AssumptionViolatedException("No test filesystem in "
|
||||||
+ TestAliyunOSSFileSystemContract.TEST_FS_OSS_NAME);
|
+ TestAliyunOSSFileSystemContract.TEST_FS_OSS_NAME);
|
||||||
}
|
}
|
||||||
AliyunOSSFileSystem ossfs = new AliyunOSSFileSystem();
|
return testURI;
|
||||||
ossfs.initialize(testURI, conf);
|
|
||||||
return ossfs;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Generate unique test path for multiple user tests.
|
* Generate unique test path for multiple user tests.
|
||||||
*
|
*
|
||||||
|
|
|
@ -75,6 +75,7 @@ public class TestAliyunOSSBlockOutputStream {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRegularUpload() throws IOException {
|
public void testRegularUpload() throws IOException {
|
||||||
|
FileSystem.clearStatistics();
|
||||||
long size = 1024 * 1024;
|
long size = 1024 * 1024;
|
||||||
FileSystem.Statistics statistics =
|
FileSystem.Statistics statistics =
|
||||||
FileSystem.getStatistics("oss", AliyunOSSFileSystem.class);
|
FileSystem.getStatistics("oss", AliyunOSSFileSystem.class);
|
||||||
|
@ -111,6 +112,7 @@ public class TestAliyunOSSBlockOutputStream {
|
||||||
@Test
|
@Test
|
||||||
public void testMultiPartUpload() throws IOException {
|
public void testMultiPartUpload() throws IOException {
|
||||||
long size = 6 * 1024 * 1024;
|
long size = 6 * 1024 * 1024;
|
||||||
|
FileSystem.clearStatistics();
|
||||||
FileSystem.Statistics statistics =
|
FileSystem.Statistics statistics =
|
||||||
FileSystem.getStatistics("oss", AliyunOSSFileSystem.class);
|
FileSystem.getStatistics("oss", AliyunOSSFileSystem.class);
|
||||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size - 1);
|
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size - 1);
|
||||||
|
@ -134,6 +136,7 @@ public class TestAliyunOSSBlockOutputStream {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMultiPartUploadConcurrent() throws IOException {
|
public void testMultiPartUploadConcurrent() throws IOException {
|
||||||
|
FileSystem.clearStatistics();
|
||||||
long size = 50 * 1024 * 1024 - 1;
|
long size = 50 * 1024 * 1024 - 1;
|
||||||
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size);
|
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size);
|
||||||
FileSystem.Statistics statistics =
|
FileSystem.Statistics statistics =
|
||||||
|
|
|
@ -19,9 +19,13 @@
|
||||||
package org.apache.hadoop.fs.aliyun.oss.contract;
|
package org.apache.hadoop.fs.aliyun.oss.contract;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSTestUtils;
|
||||||
import org.apache.hadoop.fs.contract.AbstractBondedFSContract;
|
import org.apache.hadoop.fs.contract.AbstractBondedFSContract;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The contract of Aliyun OSS: only enabled if the test bucket is provided.
|
* The contract of Aliyun OSS: only enabled if the test bucket is provided.
|
||||||
*/
|
*/
|
||||||
|
@ -40,6 +44,11 @@ public class AliyunOSSContract extends AbstractBondedFSContract {
|
||||||
return "oss";
|
return "oss";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FileSystem getTestFileSystem() throws IOException {
|
||||||
|
return AliyunOSSTestUtils.createTestFileSystem(new Configuration());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Path getTestPath() {
|
public Path getTestPath() {
|
||||||
String testUniqueForkId = System.getProperty("test.unique.fork.id");
|
String testUniqueForkId = System.getProperty("test.unique.fork.id");
|
||||||
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.aliyun.oss.fileContext;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.TestFileContext;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementation of TestFileContext for OSS.
|
||||||
|
*/
|
||||||
|
public class TestOSSFileContext extends TestFileContext {
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,40 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.aliyun.oss.fileContext;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileContextCreateMkdirBaseTest;
|
||||||
|
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSTestUtils;
|
||||||
|
import org.junit.Before;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* OSS implementation of FileContextCreateMkdirBaseTest.
|
||||||
|
*/
|
||||||
|
public class TestOSSFileContextCreateMkdir
|
||||||
|
extends FileContextCreateMkdirBaseTest {
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException, Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
fc = AliyunOSSTestUtils.createTestFileContext(conf);
|
||||||
|
super.setUp();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,71 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.aliyun.oss.fileContext;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileContextMainOperationsBaseTest;
|
||||||
|
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSTestUtils;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Ignore;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* OSS implementation of FileContextMainOperationsBaseTest.
|
||||||
|
*/
|
||||||
|
public class TestOSSFileContextMainOperations
|
||||||
|
extends FileContextMainOperationsBaseTest {
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException, Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
fc = AliyunOSSTestUtils.createTestFileContext(conf);
|
||||||
|
super.setUp();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean listCorruptedBlocksSupported() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Ignore
|
||||||
|
public void testCreateFlagAppendExistingFile() throws IOException {
|
||||||
|
// append not supported, so test removed
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Ignore
|
||||||
|
public void testCreateFlagCreateAppendExistingFile() throws IOException {
|
||||||
|
// append not supported, so test removed
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Ignore
|
||||||
|
public void testSetVerifyChecksum() throws IOException {
|
||||||
|
// checksums ignored, so test ignored
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Ignore
|
||||||
|
public void testBuilderCreateAppendExistingFile() throws IOException {
|
||||||
|
// append not supported, so test removed
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,69 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.aliyun.oss.fileContext;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FCStatisticsBaseTest;
|
||||||
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSTestUtils;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* OSS implementation of FCStatisticsBaseTest.
|
||||||
|
*/
|
||||||
|
public class TestOSSFileContextStatistics extends FCStatisticsBaseTest {
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
fc = AliyunOSSTestUtils.createTestFileContext(conf);
|
||||||
|
fc.mkdir(fileContextTestHelper.getTestRootPath(fc, "test"),
|
||||||
|
FileContext.DEFAULT_PERM, true);
|
||||||
|
FileContext.clearStatistics();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
if (fc != null) {
|
||||||
|
fc.delete(fileContextTestHelper.getTestRootPath(fc, "test"), true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void verifyReadBytes(FileSystem.Statistics stats) {
|
||||||
|
// one blockSize for read, one for pread
|
||||||
|
Assert.assertEquals(2 * blockSize, stats.getBytesRead());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void verifyWrittenBytes(FileSystem.Statistics stats) {
|
||||||
|
// no extra bytes are written
|
||||||
|
Assert.assertEquals(blockSize, stats.getBytesWritten());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected URI getFsUri() {
|
||||||
|
return fc.getHomeDirectory().toUri();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,51 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.aliyun.oss.fileContext;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileContextURIBase;
|
||||||
|
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSTestUtils;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Ignore;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* OSS implementation of FileContextURIBase.
|
||||||
|
*/
|
||||||
|
public class TestOSSFileContextURI extends FileContextURIBase {
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException, Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
fc1 = AliyunOSSTestUtils.createTestFileContext(conf);
|
||||||
|
// different object, same FS
|
||||||
|
fc2 = AliyunOSSTestUtils.createTestFileContext(conf);
|
||||||
|
super.setUp();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Ignore
|
||||||
|
public void testFileStatus() throws IOException {
|
||||||
|
// test ignored
|
||||||
|
// (the statistics tested with this method are not relevant for an OSSFS)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,40 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.aliyun.oss.fileContext;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileContextUtilBase;
|
||||||
|
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSTestUtils;
|
||||||
|
import org.junit.Before;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* OSS implementation of FileContextUtilBase.
|
||||||
|
*/
|
||||||
|
public class TestOSSFileContextUtil extends FileContextUtilBase {
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException, Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
fc = AliyunOSSTestUtils.createTestFileContext(conf);
|
||||||
|
super.setUp();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,83 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.aliyun.oss.yarn;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
import org.apache.hadoop.fs.FsStatus;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSTestUtils;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.EnumSet;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* OSS tests through the {@link FileContext} API.
|
||||||
|
*/
|
||||||
|
public class TestOSS {
|
||||||
|
private FileContext fc;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
fc = AliyunOSSTestUtils.createTestFileContext(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
if (fc != null) {
|
||||||
|
fc.delete(getTestPath(), true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Path getTestPath() {
|
||||||
|
return new Path(AliyunOSSTestUtils.generateUniqueTestPath());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOSSStatus() throws Exception {
|
||||||
|
FsStatus fsStatus = fc.getFsStatus(null);
|
||||||
|
assertNotNull(fsStatus);
|
||||||
|
assertTrue("Used capacity should be positive: " + fsStatus.getUsed(),
|
||||||
|
fsStatus.getUsed() >= 0);
|
||||||
|
assertTrue("Remaining capacity should be positive: " + fsStatus
|
||||||
|
.getRemaining(),
|
||||||
|
fsStatus.getRemaining() >= 0);
|
||||||
|
assertTrue("Capacity should be positive: " + fsStatus.getCapacity(),
|
||||||
|
fsStatus.getCapacity() >= 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 90000L)
|
||||||
|
public void testOSSCreateFileInSubDir() throws Exception {
|
||||||
|
Path dirPath = getTestPath();
|
||||||
|
fc.mkdir(dirPath, FileContext.DIR_DEFAULT_PERM, true);
|
||||||
|
Path filePath = new Path(dirPath, "file");
|
||||||
|
try (FSDataOutputStream file = fc.create(filePath, EnumSet.of(CreateFlag
|
||||||
|
.CREATE))) {
|
||||||
|
file.write(666);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,160 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.aliyun.oss.yarn;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.examples.WordCount;
|
||||||
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSTestUtils;
|
||||||
|
import org.apache.hadoop.fs.aliyun.oss.contract.AliyunOSSContract;
|
||||||
|
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||||
|
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.io.IntWritable;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||||
|
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that OSS is usable through a YARN application.
|
||||||
|
*/
|
||||||
|
public class TestOSSMiniYarnCluster extends AbstractFSContractTestBase {
|
||||||
|
|
||||||
|
private Configuration conf;
|
||||||
|
private FileSystem fs;
|
||||||
|
private MiniYARNCluster yarnCluster;
|
||||||
|
private Path rootPath;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected AbstractFSContract createContract(Configuration configuration) {
|
||||||
|
this.conf = configuration;
|
||||||
|
return new AliyunOSSContract(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setup() throws Exception {
|
||||||
|
super.setup();
|
||||||
|
fs = getFileSystem();
|
||||||
|
rootPath = path("MiniClusterWordCount");
|
||||||
|
Path workingDir = path("working");
|
||||||
|
fs.setWorkingDirectory(workingDir);
|
||||||
|
fs.mkdirs(new Path(rootPath, "input/"));
|
||||||
|
|
||||||
|
yarnCluster = new MiniYARNCluster("MiniClusterWordCount", // testName
|
||||||
|
1, // number of node managers
|
||||||
|
1, // number of local log dirs per node manager
|
||||||
|
1); // number of hdfs dirs per node manager
|
||||||
|
yarnCluster.init(conf);
|
||||||
|
yarnCluster.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithMiniCluster() throws Exception {
|
||||||
|
Path input = new Path(rootPath, "input/in");
|
||||||
|
input = input.makeQualified(fs.getUri(), fs.getWorkingDirectory());
|
||||||
|
Path output = new Path(rootPath, "output/");
|
||||||
|
output = output.makeQualified(fs.getUri(), fs.getWorkingDirectory());
|
||||||
|
|
||||||
|
writeStringToFile(input, "first line\nsecond line\nthird line");
|
||||||
|
|
||||||
|
Job job = Job.getInstance(conf, "word count");
|
||||||
|
job.setJarByClass(WordCount.class);
|
||||||
|
job.setMapperClass(WordCount.TokenizerMapper.class);
|
||||||
|
job.setCombinerClass(WordCount.IntSumReducer.class);
|
||||||
|
job.setReducerClass(WordCount.IntSumReducer.class);
|
||||||
|
job.setOutputKeyClass(Text.class);
|
||||||
|
job.setOutputValueClass(IntWritable.class);
|
||||||
|
FileInputFormat.addInputPath(job, input);
|
||||||
|
FileOutputFormat.setOutputPath(job, output);
|
||||||
|
|
||||||
|
int exitCode = (job.waitForCompletion(true) ? 0 : 1);
|
||||||
|
assertEquals("Returned error code.", 0, exitCode);
|
||||||
|
|
||||||
|
assertTrue(fs.exists(new Path(output, "_SUCCESS")));
|
||||||
|
String outputAsStr = readStringFromFile(new Path(output, "part-r-00000"));
|
||||||
|
Map<String, Integer> resAsMap = getResultAsMap(outputAsStr);
|
||||||
|
|
||||||
|
assertEquals(4, resAsMap.size());
|
||||||
|
assertEquals(1, (int) resAsMap.get("first"));
|
||||||
|
assertEquals(1, (int) resAsMap.get("second"));
|
||||||
|
assertEquals(1, (int) resAsMap.get("third"));
|
||||||
|
assertEquals(3, (int) resAsMap.get("line"));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* helper method.
|
||||||
|
*/
|
||||||
|
private Map<String, Integer> getResultAsMap(String outputAsStr) {
|
||||||
|
Map<String, Integer> result = new HashMap<>();
|
||||||
|
for (String line : outputAsStr.split("\n")) {
|
||||||
|
String[] tokens = line.split("\t");
|
||||||
|
assertTrue("Not enough tokens in in string \" "
|
||||||
|
+ line + "\" from output \"" + outputAsStr + "\"",
|
||||||
|
tokens.length > 1);
|
||||||
|
result.put(tokens[0], Integer.parseInt(tokens[1]));
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* helper method.
|
||||||
|
*/
|
||||||
|
private void writeStringToFile(Path path, String string) throws IOException {
|
||||||
|
FileContext fc = AliyunOSSTestUtils.createTestFileContext(conf);
|
||||||
|
try (FSDataOutputStream file = fc.create(path,
|
||||||
|
EnumSet.of(CreateFlag.CREATE))) {
|
||||||
|
file.write(string.getBytes());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* helper method.
|
||||||
|
*/
|
||||||
|
private String readStringFromFile(Path path) throws IOException {
|
||||||
|
try (FSDataInputStream in = fs.open(path)) {
|
||||||
|
long bytesLen = fs.getFileStatus(path).getLen();
|
||||||
|
byte[] buffer = new byte[(int) bytesLen];
|
||||||
|
IOUtils.readFully(in, buffer, 0, buffer.length);
|
||||||
|
return new String(buffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void teardown() throws Exception {
|
||||||
|
if (yarnCluster != null) {
|
||||||
|
yarnCluster.stop();
|
||||||
|
}
|
||||||
|
super.teardown();
|
||||||
|
IOUtils.closeStream(getFileSystem());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue