HADOOP-12875. [Azure Data Lake] Support for contract test and unit test cases. Contributed by Vishwajeet Dusane.

Chris Nauroth 2016-06-16 09:46:05 -07:00
parent e14ee0d3b5
commit c9e71382a5
31 changed files with 2905 additions and 4 deletions


@@ -922,7 +922,7 @@ public class PrivateAzureDataLakeFileSystem extends SWebHdfsFileSystem {
size = maxBufferSize;
}
- int equalBufferSplit = Math.max(Math.round(size / SIZE4MB), 1);
+ int equalBufferSplit = Math.max(size / SIZE4MB, 1);
int splitSize = Math.min(equalBufferSplit, maxConcurrentConnection);
return splitSize;
}
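// Why the change above is safe, assuming (as the surrounding code suggests)
// that size and SIZE4MB are both ints: integer division already truncates,
// so the removed Math.round call merely widened the int quotient to float
// and rounded a value with no fractional part. It was redundant, and lossy
// for quotients above 2^24. A minimal illustration:
//   int size = 20 * 1024 * 1024;                 // 20 MB of buffered data
//   int splits = Math.max(size / SIZE4MB, 1);    // = 5 with 4 MB split units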


@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl;
import org.apache.hadoop.fs.FileStatus;
import java.util.Random;
/**
* This class provides generic helper methods for mock tests to generate
* stub responses for network requests.
*/
public final class TestADLResponseData {
private TestADLResponseData() {}
public static String getGetFileStatusJSONResponse(FileStatus status) {
String str = "{\"FileStatus\":{\"length\":" + status.getLen() + "," +
"\"pathSuffix\":\"\",\"type\":\"" + (status.isDirectory() ?
"DIRECTORY" :
"FILE") + "\"" +
",\"blockSize\":" + status.getBlockSize() + ",\"accessTime\":" +
status.getAccessTime() + ",\"modificationTime\":" + status
.getModificationTime() +
",\"replication\":" + status.getReplication() + ",\"permission\":\""
+ status.getPermission() + "\",\"owner\":\"" + status.getOwner()
+ "\",\"group\":\"" + status.getGroup() + "\"}}";
return str;
}
public static String getGetFileStatusJSONResponse() {
return getGetFileStatusJSONResponse(4194304);
}
public static String getGetFileStatusJSONResponse(long length) {
String str = "{\"FileStatus\":{\"length\":" + length + "," +
"\"pathSuffix\":\"\",\"type\":\"FILE\",\"blockSize\":268435456," +
"\"accessTime\":1452103827023,\"modificationTime\":1452103827023," +
"\"replication\":0,\"permission\":\"777\"," +
"\"owner\":\"NotSupportYet\",\"group\":\"NotSupportYet\"}}";
return str;
}
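// For reference, the exact response string built above for the default
// length of 4194304, wrapped across lines here for readability:
//   {"FileStatus":{"length":4194304,"pathSuffix":"","type":"FILE",
//    "blockSize":268435456,"accessTime":1452103827023,
//    "modificationTime":1452103827023,"replication":0,"permission":"777",
//    "owner":"NotSupportYet","group":"NotSupportYet"}}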
public static String getListFileStatusJSONResponse(int dirSize) {
String list = "";
for (int i = 0; i < dirSize; ++i) {
list += "{\"length\":1024,\"pathSuffix\":\"" + java.util.UUID.randomUUID()
+ "\",\"type\":\"FILE\",\"blockSize\":268435456," +
"\"accessTime\":1452103878833," +
"\"modificationTime\":1452103879190,\"replication\":0," +
"\"permission\":\"777\",\"owner\":\"NotSupportYet\"," +
"\"group\":\"NotSupportYet\"},";
}
if (!list.isEmpty()) { // guard against dirSize == 0
list = list.substring(0, list.length() - 1);
}
String str = "{\"FileStatuses\":{\"FileStatus\":[" + list + "]}}";
return str;
}
public static String getJSONResponse(boolean status) {
String str = "{\"boolean\":" + status + "}";
return str;
}
public static String getErrorIllegalArgumentExceptionJSONResponse() {
String str = "{\n" +
" \"RemoteException\":\n" +
" {\n" +
" \"exception\" : \"IllegalArgumentException\",\n" +
" \"javaClassName\": \"java.lang.IllegalArgumentException\",\n" +
" \"message\" : \"Bad Offset 0x83090015\"" +
" }\n" +
"}";
return str;
}
public static String getErrorInternalServerExceptionJSONResponse() {
String str = "{\n" +
" \"RemoteException\":\n" +
" {\n" +
" \"exception\" : \"RuntimeException\",\n" +
" \"javaClassName\": \"java.lang.RuntimeException\",\n" +
" \"message\" : \"Internal Server Error\"" +
" }\n" +
"}";
return str;
}
public static byte[] getRandomByteArrayData() {
return getRandomByteArrayData(4 * 1024 * 1024);
}
public static byte[] getRandomByteArrayData(int size) {
byte[] b = new byte[size];
Random rand = new Random();
rand.nextBytes(b);
return b;
}
}


@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl;
import com.squareup.okhttp.mockwebserver.MockResponse;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.common.AdlMockWebServer;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.net.URISyntaxException;
/**
* This class tests the local getFileStatus implementation to cover correct
* parsing of successful and error JSON responses from the server.
* The Adls GetFileStatus operation is covered in detail in the
* org.apache.hadoop.fs.adl.live testing package.
*/
public class TestGetFileStatus extends AdlMockWebServer {
@Test
public void getFileStatusReturnsAsExpected()
throws URISyntaxException, IOException {
getMockServer().enqueue(new MockResponse().setResponseCode(200)
.setBody(TestADLResponseData.getGetFileStatusJSONResponse()));
long startTime = Time.monotonicNow();
FileStatus fileStatus = getMockAdlFileSystem().getFileStatus(
new Path("/test1/test2"));
long endTime = Time.monotonicNow();
System.out.println("Time : " + (endTime - startTime));
Assert.assertTrue(fileStatus.isFile());
Assert.assertEquals(fileStatus.getPath().toString(),
"adl://" + getMockServer().getHostName() + ":"
+ getMockServer().getPort()
+ "/test1/test2");
Assert.assertEquals(fileStatus.getLen(), 4194304);
Assert.assertEquals(fileStatus.getBlockSize(), 268435456);
Assert.assertEquals(fileStatus.getReplication(), 0);
Assert.assertEquals(fileStatus.getPermission(), new FsPermission("777"));
Assert.assertEquals(fileStatus.getOwner(), "NotSupportYet");
Assert.assertEquals(fileStatus.getGroup(), "NotSupportYet");
}
}


@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl;
import com.squareup.okhttp.mockwebserver.MockResponse;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.common.AdlMockWebServer;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
/**
* This class tests the local listStatus implementation to cover correct
* parsing of successful and error JSON responses from the server.
* The Adls ListStatus functionality is covered in detail in the
* org.apache.hadoop.fs.adl.live testing package.
*/
public class TestListStatus extends AdlMockWebServer {
@Test
public void listStatusReturnsAsExpected() throws IOException {
getMockServer().enqueue(new MockResponse().setResponseCode(200)
.setBody(TestADLResponseData.getListFileStatusJSONResponse(10)));
long startTime = Time.monotonicNow();
FileStatus[] ls = getMockAdlFileSystem().listStatus(
new Path("/test1/test2"));
long endTime = Time.monotonicNow();
System.out.println("Time : " + (endTime - startTime));
Assert.assertEquals(ls.length, 10);
getMockServer().enqueue(new MockResponse().setResponseCode(200)
.setBody(TestADLResponseData.getListFileStatusJSONResponse(200)));
startTime = Time.monotonicNow();
ls = getMockAdlFileSystem().listStatus(new Path("/test1/test2"));
endTime = Time.monotonicNow();
System.out.println("Time : " + (endTime - startTime));
Assert.assertEquals(ls.length, 200);
getMockServer().enqueue(new MockResponse().setResponseCode(200)
.setBody(TestADLResponseData.getListFileStatusJSONResponse(2048)));
startTime = Time.monotonicNow();
ls = getMockAdlFileSystem().listStatus(new Path("/test1/test2"));
endTime = Time.monotonicNow();
System.out.println("Time : " + (endTime - startTime));
Assert.assertEquals(ls.length, 2048);
}
@Test
public void listStatusOnFailure() throws IOException {
getMockServer().enqueue(new MockResponse().setResponseCode(403).setBody(
TestADLResponseData.getErrorIllegalArgumentExceptionJSONResponse()));
FileStatus[] ls = null;
long startTime = Time.monotonicNow();
try {
ls = getMockAdlFileSystem().listStatus(new Path("/test1/test2"));
Assert.fail("Expected IOException for the 403 response");
} catch (IOException e) {
Assert.assertTrue(e.getMessage().contains("Bad Offset 0x83090015"));
}
long endTime = Time.monotonicNow();
System.out.println("Time : " + (endTime - startTime));
getMockServer().enqueue(new MockResponse().setResponseCode(500)
.setBody(
TestADLResponseData.getErrorInternalServerExceptionJSONResponse()));
startTime = Time.monotonicNow();
try {
ls = getMockAdlFileSystem().listStatus(new Path("/test1/test2"));
Assert.fail("Expected IOException for the 500 response");
} catch (IOException e) {
Assert.assertTrue(e.getMessage().contains("Internal Server Error"));
}
endTime = Time.monotonicNow();
System.out.println("Time : " + (endTime - startTime));
}
}


@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl;
/**
* This class overrides AdlFileSystem to change the transport scheme to http
* instead of https so that tests can run against the mock server.
*/
public class TestableAdlFileSystem extends AdlFileSystem {
@Override
protected String getTransportScheme() {
return "http";
}
}


@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl.live;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.adl.AdlFileSystem;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
* Utility class to configure the real Adls storage that the live test suite
* runs against.
*/
public final class AdlStorageConfiguration {
private AdlStorageConfiguration() {}
private static final String CONTRACT_ENABLE_KEY =
"dfs.adl.test.contract.enable";
private static final String TEST_CONFIGURATION_FILE_NAME =
"contract-test-options.xml";
private static final String TEST_SUPPORTED_TEST_CONFIGURATION_FILE_NAME =
"adls.xml";
private static boolean isContractTestEnabled = false;
private static Configuration conf = null;
public static Configuration getConfiguration() {
Configuration localConf = new Configuration();
localConf.addResource(TEST_CONFIGURATION_FILE_NAME);
localConf.addResource(TEST_SUPPORTED_TEST_CONFIGURATION_FILE_NAME);
return localConf;
}
public static boolean isContractTestEnabled() {
if (conf == null) {
conf = getConfiguration();
}
isContractTestEnabled = conf.getBoolean(CONTRACT_ENABLE_KEY, false);
return isContractTestEnabled;
}
public static FileSystem createAdlStorageConnector()
throws URISyntaxException, IOException {
if (conf == null) {
conf = getConfiguration();
}
if (!isContractTestEnabled()) {
return null;
}
AdlFileSystem fileSystem = new AdlFileSystem();
fileSystem.initialize(new URI(conf.get("fs.defaultFS")), conf);
return fileSystem;
}
}
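Based on the keys this class reads, a minimal sketch of what contract-test-options.xml might contain to enable the live suite (the account URI is a placeholder, not part of this commit):

<configuration>
  <property>
    <name>dfs.adl.test.contract.enable</name>
    <value>true</value>
  </property>
  <property>
    <name>fs.defaultFS</name>
    <value>adl://YOUR_ACCOUNT.azuredatalakestore.net</value>
  </property>
</configuration>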


@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl.live;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import java.io.IOException;
import java.net.URISyntaxException;
/**
* Extension of AbstractFSContract representing a filesystem contract that
* an Adls filesystem implementation is expected to implement.
*/
public class AdlStorageContract extends AbstractFSContract {
private FileSystem fs;
protected AdlStorageContract(Configuration conf) {
super(conf);
try {
fs = AdlStorageConfiguration.createAdlStorageConnector();
} catch (URISyntaxException e) {
throw new IllegalStateException("Can not initialize ADL FileSystem. "
+ "Please check fs.defaultFS property.", e);
} catch (IOException e) {
throw new IllegalStateException("Can not initialize ADL FileSystem.", e);
}
this.setConf(AdlStorageConfiguration.getConfiguration());
}
@Override
public String getScheme() {
return "adl";
}
@Override
public FileSystem getTestFileSystem() throws IOException {
return this.fs;
}
@Override
public Path getTestPath() {
Path path = new Path("/test");
return path;
}
}


@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl.live;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractAppendTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Before;
import org.junit.Test;
/**
* Verify Adls APPEND semantics compliance with Hadoop.
*/
public class TestAdlContractAppendLive extends AbstractContractAppendTest {
@Override
protected AbstractFSContract createContract(Configuration configuration) {
return new AdlStorageContract(configuration);
}
@Override
@Test
public void testRenameFileBeingAppended() throws Throwable {
ContractTestUtils.unsupported("Skipping since renaming file in append "
+ "mode not supported in Adl");
}
@Before
@Override
public void setup() throws Exception {
org.junit.Assume
.assumeTrue(AdlStorageConfiguration.isContractTestEnabled());
super.setup();
}
}


@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl.live;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractConcatTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Before;
import org.junit.Test;
/**
* Verify Adls CONCAT semantics compliance with Hadoop.
*/
public class TestAdlContractConcatLive extends AbstractContractConcatTest {
@Override
protected AbstractFSContract createContract(Configuration configuration) {
return new AdlStorageContract(configuration);
}
@Before
@Override
public void setup() throws Exception {
org.junit.Assume
.assumeTrue(AdlStorageConfiguration.isContractTestEnabled());
super.setup();
}
@Test
public void testConcatMissingTarget() throws Throwable {
ContractTestUtils.unsupported("BUG : Adl to support expectation from "
+ "concat on missing targets.");
}
}


@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl.live;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Before;
import org.junit.Test;
/**
* Verify Adls CREATE semantics compliance with Hadoop.
*/
public class TestAdlContractCreateLive extends AbstractContractCreateTest {
@Override
protected AbstractFSContract createContract(Configuration configuration) {
return new AdlStorageContract(configuration);
}
@Before
@Override
public void setup() throws Exception {
org.junit.Assume
.assumeTrue(AdlStorageConfiguration.isContractTestEnabled());
super.setup();
}
@Test
public void testOverwriteEmptyDirectory() throws Throwable {
ContractTestUtils
.unsupported("BUG : Adl to support override empty " + "directory.");
}
}


@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl.live;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractDeleteTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.junit.Before;
/**
* Verify Adls DELETE semantics compliance with Hadoop.
*/
public class TestAdlContractDeleteLive extends AbstractContractDeleteTest {
@Override
protected AbstractFSContract createContract(Configuration configuration) {
return new AdlStorageContract(configuration);
}
@Before
@Override
public void setup() throws Exception {
org.junit.Assume
.assumeTrue(AdlStorageConfiguration.isContractTestEnabled());
super.setup();
}
}


@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl.live;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractMkdirTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Before;
import org.junit.Test;
/**
* Verify Adls MKDIR semantics compliance with Hadoop.
*/
public class TestAdlContractMkdirLive extends AbstractContractMkdirTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new AdlStorageContract(conf);
}
@Before
@Override
public void setup() throws Exception {
org.junit.Assume
.assumeTrue(AdlStorageConfiguration.isContractTestEnabled());
super.setup();
}
@Test
public void testMkdirOverParentFile() throws Throwable {
ContractTestUtils.unsupported("Not supported by Adl");
}
@Test
public void testNoMkdirOverFile() throws Throwable {
ContractTestUtils.unsupported("Not supported by Adl");
}
}


@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl.live;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractOpenTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.junit.Before;
/**
* Verify Adls OPEN/READ semantics compliance with Hadoop.
*/
public class TestAdlContractOpenLive extends AbstractContractOpenTest {
@Override
protected AbstractFSContract createContract(Configuration configuration) {
return new AdlStorageContract(configuration);
}
@Before
@Override
public void setup() throws Exception {
org.junit.Assume
.assumeTrue(AdlStorageConfiguration.isContractTestEnabled());
super.setup();
}
}


@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl.live;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Before;
import org.junit.Test;
/**
* Verify Adls RENAME semantics compliance with Hadoop.
*/
public class TestAdlContractRenameLive extends AbstractContractRenameTest {
@Override
protected AbstractFSContract createContract(Configuration configuration) {
return new AdlStorageContract(configuration);
}
@Before
@Override
public void setup() throws Exception {
org.junit.Assume
.assumeTrue(AdlStorageConfiguration.isContractTestEnabled());
super.setup();
}
@Test
public void testRenameFileOverExistingFile() throws Throwable {
ContractTestUtils
.unsupported("BUG : Adl to support full complete POSIX" + "behaviour");
}
@Test
public void testRenameFileNonexistentDir() throws Throwable {
ContractTestUtils
.unsupported("BUG : Adl to support create dir is not " + "exist");
}
@Test
public void testRenameWithNonEmptySubDir() throws Throwable {
ContractTestUtils.unsupported("BUG : Adl to support non empty dir move.");
}
}


@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl.live;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Before;
import org.junit.Test;
/**
* Verify Adls root level operation support.
*/
public class TestAdlContractRootDirLive
extends AbstractContractRootDirectoryTest {
@Override
protected AbstractFSContract createContract(Configuration configuration) {
return new AdlStorageContract(configuration);
}
@Before
@Override
public void setup() throws Exception {
org.junit.Assume
.assumeTrue(AdlStorageConfiguration.isContractTestEnabled());
super.setup();
}
@Test
public void testRmNonEmptyRootDirNonRecursive() throws Throwable {
ContractTestUtils.unsupported(
"BUG: Adl should throw an exception instead of returning false.");
}
}


@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl.live;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.junit.Before;
/**
* Verify Adls OPEN/READ seek operation support.
*/
public class TestAdlContractSeekLive extends AbstractContractSeekTest {
@Override
protected AbstractFSContract createContract(Configuration configuration) {
return new AdlStorageContract(configuration);
}
@Before
@Override
public void setup() throws Exception {
org.junit.Assume
.assumeTrue(AdlStorageConfiguration.isContractTestEnabled());
super.setup();
}
}


@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl.live;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Random;
/**
* Verify that writes of different data segment sizes preserve the
* integrity and order of the data.
*/
public class TestAdlDifferentSizeWritesLive {
public static byte[] getRandomByteArrayData(int size) {
byte[] b = new byte[size];
Random rand = new Random();
rand.nextBytes(b);
return b;
}
@Before
public void setup() throws Exception {
org.junit.Assume
.assumeTrue(AdlStorageConfiguration.isContractTestEnabled());
}
@Test
public void testSmallDataWrites() throws IOException {
testDataIntegrity(4 * 1024 * 1024, 1 * 1024);
testDataIntegrity(4 * 1024 * 1024, 7 * 1024);
testDataIntegrity(4 * 1024 * 1024, 10);
testDataIntegrity(2 * 1024 * 1024, 10);
testDataIntegrity(1 * 1024 * 1024, 10);
testDataIntegrity(100, 1);
}
@Test
public void testMediumDataWrites() throws IOException {
testDataIntegrity(4 * 1024 * 1024, 1 * 1024 * 1024);
testDataIntegrity(7 * 1024 * 1024, 2 * 1024 * 1024);
testDataIntegrity(9 * 1024 * 1024, 2 * 1024 * 1024);
testDataIntegrity(10 * 1024 * 1024, 3 * 1024 * 1024);
}
private void testDataIntegrity(int totalSize, int chunkSize)
throws IOException {
Path path = new Path("/test/dataIntegrityCheck");
FileSystem fs = null;
try {
fs = AdlStorageConfiguration.createAdlStorageConnector();
} catch (URISyntaxException e) {
throw new IllegalStateException("Can not initialize ADL FileSystem. "
+ "Please check fs.defaultFS property.", e);
}
byte[] expectedData = getRandomByteArrayData(totalSize);
FSDataOutputStream out = fs.create(path, true);
int iteration = totalSize / chunkSize;
int remainder = totalSize % chunkSize;
int offset = 0;
for (int i = 0; i < iteration; ++i) {
out.write(expectedData, offset, chunkSize);
offset += chunkSize;
}
out.write(expectedData, offset, remainder);
out.close();
byte[] actualData = new byte[totalSize];
FSDataInputStream in = fs.open(path);
in.readFully(0, actualData);
in.close();
Assert.assertArrayEquals(expectedData, actualData);
Assert.assertTrue(fs.delete(path, true));
}
}
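A quick worked example of the chunking arithmetic in testDataIntegrity, using values from testSmallDataWrites:

// totalSize = 4 * 1024 * 1024 (4194304), chunkSize = 7 * 1024 (7168)
//   iteration = 4194304 / 7168 = 585 full chunks written in the loop
//   remainder = 4194304 % 7168 = 1024 bytes written by the final write()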


@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl.live;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemContractBaseTest;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import java.io.IOException;
/**
* Verify that Adls adheres to Hadoop file system semantics.
*/
public class TestAdlFileSystemContractLive extends FileSystemContractBaseTest {
private FileSystem adlStore;
@Override
protected void setUp() throws Exception {
adlStore = AdlStorageConfiguration.createAdlStorageConnector();
if (AdlStorageConfiguration.isContractTestEnabled()) {
fs = adlStore;
}
}
@Override
protected void tearDown() throws Exception {
if (AdlStorageConfiguration.isContractTestEnabled()) {
cleanup();
adlStore = null;
fs = null;
}
}
private void cleanup() throws IOException {
adlStore.delete(new Path("/test"), true);
}
@Override
protected void runTest() throws Throwable {
if (AdlStorageConfiguration.isContractTestEnabled()) {
super.runTest();
}
}
public void testGetFileStatus() throws IOException {
if (!AdlStorageConfiguration.isContractTestEnabled()) {
return;
}
Path testPath = new Path("/test/adltest");
if (adlStore.exists(testPath)) {
adlStore.delete(testPath, false);
}
adlStore.create(testPath).close();
assertTrue(adlStore.delete(testPath, false));
}
/**
* The following tests are failing on Azure Data Lake and the Azure Data Lake
* file system code needs to be modified to make them pass.
* A separate work item has been opened for this.
*/
@Test
@Override
public void testMkdirsFailsForSubdirectoryOfExistingFile() throws Exception {
// BUG: Adl should throw an exception instead of returning false.
}
@Test
@Override
public void testMkdirsWithUmask() throws Exception {
// Support is under implementation in Adl.
}
@Test
@Override
public void testMoveFileUnderParent() throws Exception {
// BUG: Adl server should return the expected status code.
}
@Test
@Override
public void testRenameFileToSelf() throws Exception {
// BUG: Adl server should return the expected status code.
}
@Test
@Override
public void testRenameToDirWithSamePrefixAllowed() throws Exception {
// BUG: Adl server should return the expected status code.
}
}


@@ -0,0 +1,342 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl.live;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Random;
import java.util.UUID;
/**
* Verify that reads of different data segment sizes from the file preserve
* the integrity and order of the data over the BufferManager and
* BatchByteArrayInputStream implementations.
*/
public class TestAdlReadLive {
private String expectedData = "1234567890abcdefghijklmnopqrstuvwxyz";
@Before
public void setup() throws Exception {
org.junit.Assume
.assumeTrue(AdlStorageConfiguration.isContractTestEnabled());
}
private FileSystem getFileSystem() throws IOException, URISyntaxException {
return AdlStorageConfiguration.createAdlStorageConnector();
}
private void setupFile(Path path) throws IOException, URISyntaxException {
setupFile(path, expectedData);
}
private void setupFile(Path path, String data)
throws IOException, URISyntaxException {
expectedData = data;
FileSystem fs = getFileSystem();
fs.delete(path, true);
FSDataOutputStream fdis = fs.create(path);
fdis.writeBytes(expectedData);
fdis.close();
fs.listStatus(path.getParent());
long actualLen = fs.getFileStatus(path).getLen();
long expectedLen = expectedData.length();
System.out.println(
" Length of file : " + fs.getFileStatus(path).getLen() + " " + fs
.getUri());
Assert.assertEquals(expectedLen, actualLen);
}
@Test
public void
testOpenReadMoreThanAvailableBufferCrashFixIndexOutOfBoundsException()
throws Throwable {
Path path = new Path("/test1");
FileSystem fs = getFileSystem();
setupFile(path);
if (fs.exists(path)) {
Assert.assertTrue(fs.delete(path, true));
}
FSDataOutputStream outputStream = fs.create(path);
final byte[] data = new byte[24 * 1024 * 1024];
Random ran = new Random();
ran.nextBytes(data);
outputStream.write(data);
outputStream.close(); // flush the data before reading it back
FSDataInputStream bb = fs.open(path);
byte[] expected = new byte[4 * 1024 * 1024];
bb.read();
bb.readFully(16711581, expected, 33,
65640); // BugFix : Was causing crash IndexOutOfBoundsException
bb.seek(16711581);
bb.readFully(16711576, expected, 33, 65640);
bb.readFully(16711578, expected, 33, 65640);
bb.readFully(16711580, expected, 33, 65640);
bb.readFully(16711576, expected, 0, expected.length);
bb.seek(0);
expected = new byte[134144];
while (bb.read() != -1) {
continue;
}
bb.readFully(0, data, 0, data.length);
}
@Test
public void readNullData() throws IOException, URISyntaxException {
String data = "SPL \u0001Lorg.apache.hadoop.examples.terasort"
+ ".TeraGen$RangeInputFormat$RangeInputSplit \u008DLK@Lorg.apache"
+ ".hadoop.examples.terasort"
+ ".TeraGen$RangeInputFormat$RangeInputSplit\u008DLK@\u008DLK@";
Path path = new Path("/test4");
FileSystem fs = this.getFileSystem();
setupFile(path, data);
FSDataInputStream bb = fs.open(path);
String actualData;
System.out.println("Data Length :" + expectedData.length());
byte[] arr = new byte[data.length()];
bb.readFully(0, arr);
actualData = new String(arr);
System.out.println(" Data : " + actualData);
Assert.assertEquals(actualData.length(), expectedData.length());
arr = new byte[data.length() - 7];
bb.readFully(7, arr);
actualData = new String(arr);
Assert.assertEquals(actualData.length(), expectedData.length() - 7);
bb.close();
}
@Test
public void readTest() throws IOException, URISyntaxException {
Path path = new Path("/test4");
FileSystem fs = this.getFileSystem();
setupFile(path);
FSDataInputStream bb = fs.open(path);
String actualData = "";
while (true) {
int c = bb.read();
if (c < 0) {
break;
}
actualData += (char) c;
}
byte[] b = new byte[100];
System.out.println(bb.read(b, 9, 91));
System.out.println(bb.read());
System.out.println(bb.read());
System.out.println(bb.read());
System.out.println(bb.read());
System.out.println(bb.read());
System.out.println(bb.read());
bb.close();
Assert.assertEquals(actualData, expectedData);
for (int j = 0; j < 100; ++j) {
fs = this.getFileSystem();
fs.exists(new Path("/test" + j));
}
}
@Test
public void readByteTest() throws IOException, URISyntaxException {
Path path = new Path("/test3");
FileSystem fs = this.getFileSystem();
setupFile(path);
FSDataInputStream bb = fs.open(path);
byte[] data = new byte[expectedData.length()];
int readByte = bb.read(data);
bb.close();
Assert.assertEquals(readByte, expectedData.length());
Assert.assertEquals(new String(data), expectedData);
}
@Test
public void readByteFullyTest() throws IOException, URISyntaxException {
Path path = new Path("/test2");
FileSystem fs = this.getFileSystem();
setupFile(path);
FSDataInputStream bb = fs.open(path);
byte[] data = new byte[expectedData.length()];
bb.readFully(data);
bb.close();
Assert.assertEquals(new String(data), expectedData);
bb = fs.open(path);
bb.readFully(data, 0, data.length);
bb.close();
Assert.assertEquals(new String(data), expectedData);
}
@Test
public void readCombinationTest() throws IOException, URISyntaxException {
Path path = new Path("/test1");
FileSystem fs = this.getFileSystem();
setupFile(path);
FSDataInputStream bb = fs.open(path);
byte[] data = new byte[5];
int readByte = bb.read(data);
Assert.assertEquals(new String(data), expectedData.substring(0, 5));
bb.readFully(data, 0, data.length);
Assert.assertEquals(new String(data), expectedData.substring(5, 10));
bb.close();
bb = fs.open(path);
bb.readFully(5, data, 0, data.length);
Assert.assertEquals(new String(data), expectedData.substring(5, 10));
bb.read(data);
Assert.assertEquals(new String(data), expectedData.substring(0, 5));
bb.close();
bb = fs.open(path);
bb.read(new byte[100]);
bb.close();
}
@Test
public void readMultiSeekTest() throws IOException, URISyntaxException {
final Path path = new Path(
"/delete14/" + UUID.randomUUID().toString().replaceAll("-", ""));
FileSystem fs = this.getFileSystem();
final byte[] actualData = new byte[3267397];
Random ran = new Random();
ran.nextBytes(actualData);
byte[] testData = null;
fs.delete(path, true);
FSDataOutputStream os = fs.create(path);
os.write(actualData);
os.close();
FSDataInputStream bb = fs.open(path);
byte[] data = new byte[16384];
bb.readFully(3251013, data, 0, 16384);
testData = new byte[16384];
System.arraycopy(actualData, 3251013, testData, 0, 16384);
Assert.assertArrayEquals(testData, data);
data = new byte[1921];
bb.readFully(3265476, data, 0, 1921);
testData = new byte[1921];
System.arraycopy(actualData, 3265476, testData, 0, 1921);
Assert.assertArrayEquals(testData, data);
data = new byte[3267394];
bb.readFully(3, data, 0, 3267394);
testData = new byte[3267394];
System.arraycopy(actualData, 3, testData, 0, 3267394);
Assert.assertArrayEquals(testData, data);
data = new byte[3266943];
bb.readFully(454, data, 0, 3266943);
testData = new byte[3266943];
System.arraycopy(actualData, 454, testData, 0, 3266943);
Assert.assertArrayEquals(testData, data);
data = new byte[3265320];
bb.readFully(2077, data, 0, 3265320);
testData = new byte[3265320];
System.arraycopy(actualData, 2077, testData, 0, 3265320);
Assert.assertArrayEquals(testData, data);
bb.close();
bb = fs.open(path);
data = new byte[3263262];
bb.readFully(4135, data, 0, 3263262);
testData = new byte[3263262];
System.arraycopy(actualData, 4135, testData, 0, 3263262);
Assert.assertArrayEquals(testData, data);
data = new byte[2992591];
bb.readFully(274806, data, 0, 2992591);
testData = new byte[2992591];
System.arraycopy(actualData, 274806, testData, 0, 2992591);
Assert.assertArrayEquals(testData, data);
data = new byte[1985665];
bb.readFully(1281732, data, 0, 1985665);
testData = new byte[1985665];
System.arraycopy(actualData, 1281732, testData, 0, 1985665);
Assert.assertArrayEquals(testData, data);
data = new byte[3267394];
try {
bb.readFully(2420207, data, 0, 3267394);
Assert.fail("EOF expected");
} catch (IOException e) {
// expected: readFully past the end of the file must throw
}
bb.close();
}
@Test
public void allASCIICharTest() throws IOException, URISyntaxException {
final Path path = new Path(
"/delete14/" + UUID.randomUUID().toString().replaceAll("-", ""));
FileSystem fs = this.getFileSystem();
final byte[] actualData = new byte[127];
for (byte i = 0; i < 127; ++i) {
actualData[i] = i;
}
fs.delete(path, true);
FSDataOutputStream os = fs.create(path);
os.write(actualData);
os.close();
FSDataInputStream bb = fs.open(path);
byte[] data = new byte[127];
bb.readFully(0, data, 0, data.length);
bb.close();
Assert.assertArrayEquals(data, actualData);
bb = fs.open(path);
int byteRead = 0;
while (bb.read() != -1) {
byteRead++;
}
Assert.assertEquals(actualData.length, byteRead);
bb.seek(0);
byteRead = 0;
while (bb.read() != -1) {
byteRead++;
}
Assert.assertEquals(actualData.length, byteRead);
bb.close();
}
}


@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl.live;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DelegateToFileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileContextCreateMkdirBaseTest;
import org.apache.hadoop.fs.FileContextTestHelper;
import org.apache.hadoop.fs.FileSystem;
import org.junit.Assume;
import org.junit.BeforeClass;
import java.io.File;
import java.net.URI;
/**
* Verify that the Adls file system adheres to the Hadoop file system
* contract using the tests available in FileContextCreateMkdirBaseTest.
*/
public class TestAdlWebHdfsFileContextCreateMkdirLive
extends FileContextCreateMkdirBaseTest {
private static final String KEY_FILE_SYSTEM = "fs.defaultFS";
@Override
public void setUp() throws Exception {
Configuration conf = AdlStorageConfiguration.getConfiguration();
String fileSystem = conf.get(KEY_FILE_SYSTEM);
if (fileSystem == null || fileSystem.trim().length() == 0) {
throw new Exception("Default file system not configured.");
}
URI uri = new URI(fileSystem);
FileSystem fs = AdlStorageConfiguration.createAdlStorageConnector();
fc = FileContext.getFileContext(
new DelegateToFileSystem(uri, fs, conf, fs.getScheme(), false) {
}, conf);
super.setUp();
}
/**
* Required to override since getRandmizedTestDir on Windows generates an
* absolute local file path that contains the ":" character.
* An example generated file system path is "adl://<FileSystem Path>/d:/a/b/c".
*
* Adls does not support the ":" character in paths, hence the override to
* remove the unsupported character from the path.
*
* @return FileContextTestHelper
*/
@Override
protected FileContextTestHelper createFileContextHelper() {
return new FileContextTestHelper(new File(
RandomStringUtils.randomAlphanumeric(10))
.getAbsolutePath().replaceAll(":", ""));
}
@BeforeClass
public static void skipTestCheck() {
Assume.assumeTrue(AdlStorageConfiguration.isContractTestEnabled());
}
}
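To make the override above concrete, a small sketch of the path cleanup (the Windows directory shown is an illustrative assumption):

// On Windows, new File("aB3xY9kQ2z").getAbsolutePath() can yield something
// like "D:\work\aB3xY9kQ2z"; replaceAll(":", "") turns it into
// "D\work\aB3xY9kQ2z", which is safe to embed in an adl:// test path.
String testDir = new File(RandomStringUtils.randomAlphanumeric(10))
    .getAbsolutePath().replaceAll(":", "");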


@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl.live;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DelegateToFileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileContextMainOperationsBaseTest;
import org.apache.hadoop.fs.FileContextTestHelper;
import org.apache.hadoop.fs.FileSystem;
import org.junit.Assume;
import org.junit.BeforeClass;
import java.io.File;
import java.io.IOException;
import java.net.URI;
/**
* Verify that the Adls file system adheres to the Hadoop file system
* contract using the tests available in FileContextMainOperationsBaseTest.
*/
public class TestAdlWebHdfsFileContextMainOperationsLive
extends FileContextMainOperationsBaseTest {
private static final String KEY_FILE_SYSTEM = "fs.defaultFS";
@BeforeClass
public static void skipTestCheck() {
Assume.assumeTrue(AdlStorageConfiguration.isContractTestEnabled());
}
@Override
public void setUp() throws Exception {
Configuration conf = AdlStorageConfiguration.getConfiguration();
String fileSystem = conf.get(KEY_FILE_SYSTEM);
if (fileSystem == null || fileSystem.trim().length() == 0) {
throw new Exception("Default file system not configured.");
}
URI uri = new URI(fileSystem);
FileSystem fs = AdlStorageConfiguration.createAdlStorageConnector();
fc = FileContext.getFileContext(
new DelegateToFileSystem(uri, fs, conf, fs.getScheme(), false) {
}, conf);
super.setUp();
}
/**
* Required to override since getRandmizedTestDir on Windows generates an
* absolute local file path that contains the ":" character.
* An example generated file system path is "adl://<FileSystem Path>/d:/a/b/c".
*
* Adls does not support the ":" character in paths, hence the override to
* remove the unsupported character from the path.
*
* @return FileContextTestHelper
*/
@Override
protected FileContextTestHelper createFileContextHelper() {
return new FileContextTestHelper(
new File(RandomStringUtils.randomAlphanumeric(10)).getAbsolutePath()
.replaceAll(":", ""));
}
@Override
protected boolean listCorruptedBlocksSupported() {
return false;
}
@Override
public void testUnsupportedSymlink() throws IOException {
Assume.assumeTrue("Symbolic link are not supported by Adls", false);
}
/**
* If this test fails with
* java.lang.RuntimeException: java.io.FileNotFoundException: Hadoop bin
* directory does not exist: <path>\hadoop-common-project
* \hadoop-common\target\bin, see
* https://wiki.apache.org/hadoop/WindowsProblems and build the Hadoop
* dependencies; otherwise mark this test as skipped.
*/
@Override
public void testWorkingDirectory() throws Exception {
super.testWorkingDirectory();
}
}


@@ -20,17 +20,19 @@ package org.apache.hadoop.fs.adl.oauth2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.web.oauth2.AccessTokenProvider;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
import static org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider.OAUTH_REFRESH_TOKEN_KEY;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Verify cache behavior of ConfRefreshTokenBasedAccessTokenProvider instances.


@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.common;
import com.eclipsesource.json.JsonObject;
import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.MockWebServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.adl.TestableAdlFileSystem;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.web.oauth2.ConfCredentialBasedAccessTokenProvider;
import org.apache.hadoop.hdfs.web.oauth2.CredentialBasedAccessTokenProvider;
import org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
/**
* Mock server to simulate Adls backend calls. This infrastructure can be
* extended to override the expected server response based on the derived
* test's functionality. Common functionality to generate token information
* before a request is sent to the Adls backend is also managed within the
* AdlMockWebServer implementation.
*/
public class AdlMockWebServer {
// Create a MockWebServer. These are lean enough that you can create a new
// instance for every unit test.
private MockWebServer server = null;
private TestableAdlFileSystem fs = null;
private int port = 0;
private Configuration conf = new Configuration();
public MockWebServer getMockServer() {
return server;
}
public TestableAdlFileSystem getMockAdlFileSystem() {
return fs;
}
public int getPort() {
return port;
}
public Configuration getConf() {
return conf;
}
public static MockResponse getTokenResponse() {
JsonObject jsonObject = new JsonObject()
.set(OAuth2Constants.EXPIRES_IN, "0987654321")
.set("token_type", "bearer").set(OAuth2Constants.ACCESS_TOKEN, "123");
MockResponse oauth2Response = new MockResponse();
oauth2Response.addHeader("Content-Type", "application/json");
oauth2Response.setResponseCode(200);
oauth2Response.setBody(jsonObject.toString());
return oauth2Response;
}
@Before
public void preTestSetup() throws IOException, URISyntaxException {
server = new MockWebServer();
server.enqueue(getTokenResponse());
// Start the server.
server.start();
// Ask the server for its URL. You'll need this to make HTTP requests.
URL baseUrl = server.getUrl("");
port = baseUrl.getPort();
// Exercise your application code, which should make those HTTP requests.
// Responses are returned in the same order that they are enqueued.
fs = new TestableAdlFileSystem();
conf.set(HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY, "MY_CLIENTID");
conf.set(HdfsClientConfigKeys.ACCESS_TOKEN_PROVIDER_KEY,
ConfCredentialBasedAccessTokenProvider.class.getName());
conf.set(HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_KEY, "true");
conf.set(HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY, "http://localhost:" +
port + "/refresh");
conf.set(CredentialBasedAccessTokenProvider.OAUTH_CREDENTIAL_KEY,
"credential");
URI uri = new URI("adl://localhost:" + port);
fs.initialize(uri, conf);
}
@After
public void postTestSetup() throws IOException {
fs.close();
server.shutdown();
}
}
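As a point of reference, here is a minimal sketch of how a derived test might use this harness. The test class name and the enqueued response are illustrative assumptions, not part of this patch:

// Hypothetical derived test; relies only on members of AdlMockWebServer above.
public class TestGetFileStatusMock extends AdlMockWebServer {
  @org.junit.Test
  public void testGetFileStatusLength() throws java.io.IOException {
    // Queue the backend reply that the next FileSystem call should consume.
    getMockServer().enqueue(new com.squareup.okhttp.mockwebserver.MockResponse()
        .setResponseCode(200)
        .setBody(org.apache.hadoop.fs.adl.TestADLResponseData
            .getGetFileStatusJSONResponse(1024)));
    org.apache.hadoop.fs.FileStatus status = getMockAdlFileSystem()
        .getFileStatus(new org.apache.hadoop.fs.Path("/test"));
    org.junit.Assert.assertEquals(1024, status.getLen());
  }
}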

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.common;
import com.squareup.okhttp.mockwebserver.MockResponse;
import java.util.ArrayList;
/**
* Supporting class to hold an expected MockResponse object along with the
* parameters used for validation in test methods.
*/
public class ExpectedResponse {
private MockResponse response;
private ArrayList<String> expectedQueryParameters = new ArrayList<String>();
private int expectedBodySize;
private String httpRequestType;
public int getExpectedBodySize() {
return expectedBodySize;
}
public String getHttpRequestType() {
return httpRequestType;
}
public ArrayList<String> getExpectedQueryParameters() {
return expectedQueryParameters;
}
public MockResponse getResponse() {
return response;
}
ExpectedResponse set(MockResponse mockResponse) {
this.response = mockResponse;
return this;
}
ExpectedResponse addExpectedQueryParam(String param) {
expectedQueryParameters.add(param);
return this;
}
ExpectedResponse addExpectedBodySize(int bodySize) {
this.expectedBodySize = bodySize;
return this;
}
ExpectedResponse addExpectedHttpRequestType(String expectedHttpRequestType) {
this.httpRequestType = expectedHttpRequestType;
return this;
}
}
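A short usage sketch of the fluent setters above; the response body, query parameter, and request type are illustrative values (the setters are package-private, so the caller would live in org.apache.hadoop.fs.common):

ExpectedResponse expected = new ExpectedResponse()
    .set(new MockResponse().setResponseCode(200).setBody("{\"boolean\":true}"))
    .addExpectedQueryParam("op=MKDIRS")
    .addExpectedHttpRequestType("PUT")
    .addExpectedBodySize(0);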

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.common;
import com.squareup.okhttp.mockwebserver.Dispatcher;
import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.RecordedRequest;
import okio.Buffer;
import org.apache.hadoop.fs.adl.TestADLResponseData;
import java.util.ArrayList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Supporting class for mock tests that validate ADLS read operations through
* the BufferManager and BatchByteArrayInputStream implementations.
*/
public class TestDataForRead {
private byte[] actualData;
private ArrayList<ExpectedResponse> responses;
private Dispatcher dispatcher;
private int intensityOfTest;
private boolean checkOfNoOfCalls;
private int expectedNoNetworkCall;
public TestDataForRead(final byte[] actualData, int expectedNoNetworkCall,
int intensityOfTest, boolean checkOfNoOfCalls) {
this.checkOfNoOfCalls = checkOfNoOfCalls;
this.actualData = actualData;
responses = new ArrayList<ExpectedResponse>();
this.expectedNoNetworkCall = expectedNoNetworkCall;
this.intensityOfTest = intensityOfTest;
dispatcher = new Dispatcher() {
@Override
public MockResponse dispatch(RecordedRequest recordedRequest)
throws InterruptedException {
if (recordedRequest.getPath().equals("/refresh")) {
return AdlMockWebServer.getTokenResponse();
}
if (recordedRequest.getRequestLine().contains("op=GETFILESTATUS")) {
return new MockResponse().setResponseCode(200).setBody(
TestADLResponseData
.getGetFileStatusJSONResponse(actualData.length));
}
if (recordedRequest.getRequestLine().contains("op=OPEN")) {
String request = recordedRequest.getRequestLine();
int offset = 0;
int byteCount = 0;
Pattern pattern = Pattern.compile("offset=([0-9]+)");
Matcher matcher = pattern.matcher(request);
if (matcher.find()) {
System.out.println(matcher.group(1));
offset = Integer.parseInt(matcher.group(1));
}
pattern = Pattern.compile("length=([0-9]+)");
matcher = pattern.matcher(request);
if (matcher.find()) {
System.out.println(matcher.group(1));
byteCount = Integer.parseInt(matcher.group(1));
}
Buffer buf = new Buffer();
buf.write(actualData, offset, byteCount);
return new MockResponse().setResponseCode(200)
.setChunkedBody(buf, 4 * 1024 * 1024);
}
return new MockResponse().setBody("NOT SUPPORTED").setResponseCode(501);
}
};
}
public boolean isCheckOfNoOfCalls() {
return checkOfNoOfCalls;
}
public int getExpectedNoNetworkCall() {
return expectedNoNetworkCall;
}
public int getIntensityOfTest() {
return intensityOfTest;
}
public byte[] getActualData() {
return actualData;
}
public ArrayList<ExpectedResponse> getResponses() {
return responses;
}
public Dispatcher getDispatcher() {
return dispatcher;
}
}
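The dispatcher above parses the offset and length query parameters out of each op=OPEN request line and replies with the matching slice of actualData, which is what lets tests assert on exact network-call counts. A hedged construction sketch (the data size, expected call count, and intensity are illustrative, mirroring the parameter table in TestAdlRead below):

// One token request plus one GETFILESTATUS plus one OPEN is the minimum,
// hence an expected call count of 3 for data that fits a single buffer fill.
TestDataForRead testData = new TestDataForRead(
    TestADLResponseData.getRandomByteArrayData(4 * 1024), 3, 10, true);
getMockServer().setDispatcher(testData.getDispatcher());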

View File

@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdfs.web;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.adl.TestADLResponseData;
import org.apache.hadoop.fs.common.AdlMockWebServer;
import org.apache.hadoop.fs.common.TestDataForRead;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
/**
* This class stress-tests positional reads against the number of network
* calls required to fetch the data. The tests verify that data integrity and
* ordering are maintained. They are meant to exercise the BufferManager.java
* and BatchByteArrayInputStream implementations.
*/
@RunWith(Parameterized.class)
public class TestAdlRead extends AdlMockWebServer {
// Keep a timeout of 1 hour to ensure the test completes and does not
// terminate prematurely due to high backend latency.
@Rule
public Timeout globalTimeout = new Timeout(60 * 60000);
private TestDataForRead testData;
public TestAdlRead(TestDataForRead testData) {
this.testData = testData;
getConf().set("adl.feature.override.readahead.max.buffersize", "8192");
getConf().set("adl.feature.override.readahead.max.concurrent.connection",
"1");
}
@Parameterized.Parameters(name = "{index}")
public static Collection testDataForReadOperation() {
return Arrays.asList(new Object[][] {
//--------------------------
// Test Data
//--------------------------
{new TestDataForRead("Hello World".getBytes(), 3, 1000, true)},
{new TestDataForRead(
("the problem you appear to be wrestling with is that this doesn't "
+ "display very well. ").getBytes(), 3, 1000, true)},
{new TestDataForRead(
("Chinese Indonesians (Indonesian: Orang Tionghoa-Indonesia; "
+ "Chinese: "
+ "trad ???????, simp ???????, pin Y<>nd<6E>n<EFBFBD>x?y<> Hu<48>r<EFBFBD>n), are "
+ "Indonesians descended from various Chinese ethnic groups, "
+ "particularly Han.").getBytes(), 3, 1000, true)},
{new TestDataForRead(
TestADLResponseData.getRandomByteArrayData(5 * 1024), 3, 1000,
true)}, {new TestDataForRead(
TestADLResponseData.getRandomByteArrayData(1 * 1024), 3, 50, true)},
{new TestDataForRead(
TestADLResponseData.getRandomByteArrayData(8 * 1024), 3, 10, true)},
{new TestDataForRead(
TestADLResponseData.getRandomByteArrayData(32 * 1024), 6, 10,
false)}, {new TestDataForRead(
TestADLResponseData.getRandomByteArrayData(48 * 1024), 8, 10, false)}});
}
@After
@Before
public void cleanReadBuffer() {
BufferManager.getInstance().clear();
}
@Test
public void testEntireBytes() throws IOException, InterruptedException {
getMockServer().setDispatcher(testData.getDispatcher());
FSDataInputStream in = getMockAdlFileSystem().open(new Path("/test"));
byte[] expectedData = new byte[testData.getActualData().length];
Assert.assertEquals(expectedData.length, in.read(expectedData));
Assert.assertArrayEquals(expectedData, testData.getActualData());
in.close();
if (testData.isCheckOfNoOfCalls()) {
Assert.assertEquals(testData.getExpectedNoNetworkCall(),
getMockServer().getRequestCount());
}
}
@Test
public void testSeekOperation() throws IOException, InterruptedException {
getMockServer().setDispatcher(testData.getDispatcher());
FSDataInputStream in = getMockAdlFileSystem().open(new Path("/test"));
Random random = new Random();
for (int i = 0; i < 1000; ++i) {
int position = random.nextInt(testData.getActualData().length);
in.seek(position);
Assert.assertEquals(position, in.getPos());
Assert.assertEquals(testData.getActualData()[position] & 0xFF, in.read());
}
in.close();
if (testData.isCheckOfNoOfCalls()) {
Assert.assertEquals(testData.getExpectedNoNetworkCall(),
getMockServer().getRequestCount());
}
}
@Test
public void testReadServerCalls() throws IOException, InterruptedException {
getMockServer().setDispatcher(testData.getDispatcher());
FSDataInputStream in = getMockAdlFileSystem().open(new Path("/test"));
byte[] expectedData = new byte[testData.getActualData().length];
in.readFully(expectedData);
Assert.assertArrayEquals(expectedData, testData.getActualData());
Assert.assertEquals(testData.getExpectedNoNetworkCall(),
getMockServer().getRequestCount());
in.close();
}
@Test
public void testReadFully() throws IOException, InterruptedException {
getMockServer().setDispatcher(testData.getDispatcher());
FSDataInputStream in = getMockAdlFileSystem().open(new Path("/test"));
byte[] expectedData = new byte[testData.getActualData().length];
in.readFully(expectedData);
Assert.assertArrayEquals(expectedData, testData.getActualData());
in.readFully(0, expectedData);
Assert.assertArrayEquals(expectedData, testData.getActualData());
in.seek(0);
in.readFully(expectedData, 0, expectedData.length);
Assert.assertArrayEquals(expectedData, testData.getActualData());
in.close();
if (testData.isCheckOfNoOfCalls()) {
Assert.assertEquals(testData.getExpectedNoNetworkCall(),
getMockServer().getRequestCount());
}
}
@Test
public void testRandomPositionalReadUsingReadFully()
throws IOException, InterruptedException {
getMockServer().setDispatcher(testData.getDispatcher());
FSDataInputStream in = getMockAdlFileSystem().open(new Path("/test"));
ByteArrayInputStream actualData = new ByteArrayInputStream(
testData.getActualData());
Random random = new Random();
for (int i = 0; i < testData.getIntensityOfTest(); ++i) {
int offset = random.nextInt(testData.getActualData().length);
int length = testData.getActualData().length - offset;
byte[] expectedData = new byte[length];
byte[] actualDataSubset = new byte[length];
actualData.reset();
actualData.skip(offset);
actualData.read(actualDataSubset, 0, length);
in.readFully(offset, expectedData, 0, length);
Assert.assertArrayEquals(expectedData, actualDataSubset);
}
for (int i = 0; i < testData.getIntensityOfTest(); ++i) {
int offset = random.nextInt(testData.getActualData().length);
int length = random.nextInt(testData.getActualData().length - offset);
byte[] expectedData = new byte[length];
byte[] actualDataSubset = new byte[length];
actualData.reset();
actualData.skip(offset);
actualData.read(actualDataSubset, 0, length);
in.readFully(offset, expectedData, 0, length);
Assert.assertArrayEquals(expectedData, actualDataSubset);
}
in.close();
if (testData.isCheckOfNoOfCalls()) {
Assert.assertEquals(testData.getExpectedNoNetworkCall(),
getMockServer().getRequestCount());
}
}
}
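The expected network-call counts in the parameter table above appear to follow a simple model, given the 8192-byte read-ahead buffer configured in the constructor: one token request, one GETFILESTATUS, plus one OPEN per buffer fill. A hypothetical helper expressing that assumption:

// Illustrative only: 2 fixed calls (token + GETFILESTATUS) plus one OPEN per
// buffer fill, i.e. ceil(size / buffer). Matches e.g. 32 KB -> 2 + 4 = 6 and
// 48 KB -> 2 + 6 = 8 in the parameter table above.
static int expectedNetworkCalls(int dataSize, int bufferSize) {
  int opens = Math.max(1, (dataSize + bufferSize - 1) / bufferSize);
  return 2 + opens;
}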

View File

@ -0,0 +1,306 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdfs.web;
import com.squareup.okhttp.mockwebserver.Dispatcher;
import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.RecordedRequest;
import okio.Buffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.adl.TestADLResponseData;
import org.apache.hadoop.fs.common.AdlMockWebServer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* This class tests multiple threads accessing the same file or different
* files from given offsets. These tests are meant to exercise the
* BufferManager.java and BatchByteArrayInputStream implementations.
*/
@RunWith(Parameterized.class)
public class TestConcurrentDataReadOperations extends AdlMockWebServer {
private static FSDataInputStream commonHandle = null;
private static final Object lock = new Object();
private int concurrencyLevel;
public TestConcurrentDataReadOperations(int concurrencyLevel) {
this.concurrencyLevel = concurrencyLevel;
getConf().set("adl.feature.override.readahead.max.buffersize", "102400");
getConf().set("adl.feature.override.readahead.max.concurrent.connection",
"1");
}
@Parameterized.Parameters(name = "{index}")
public static Collection testDataNumberOfConcurrentRun() {
return Arrays.asList(new Object[][] {{1}, {2}, {3}, {4}, {5}});
}
public static byte[] getRandomByteArrayData(int size) {
byte[] b = new byte[size];
Random rand = new Random();
rand.nextBytes(b);
return b;
}
private void setDispatcher(final ArrayList<CreateTestData> testData) {
getMockServer().setDispatcher(new Dispatcher() {
@Override
public MockResponse dispatch(RecordedRequest recordedRequest)
throws InterruptedException {
if (recordedRequest.getPath().equals("/refresh")) {
return AdlMockWebServer.getTokenResponse();
}
CreateTestData currentRequest = null;
for (CreateTestData local : testData) {
if (recordedRequest.getPath().contains(local.path.toString())) {
currentRequest = local;
break;
}
}
if (currentRequest == null) {
return new MockResponse().setBody("Request data not found")
.setResponseCode(501);
}
if (recordedRequest.getRequestLine().contains("op=GETFILESTATUS")) {
return new MockResponse().setResponseCode(200).setBody(
TestADLResponseData
.getGetFileStatusJSONResponse(currentRequest.data.length));
}
if (recordedRequest.getRequestLine().contains("op=OPEN")) {
String request = recordedRequest.getRequestLine();
int offset = 0;
int byteCount = 0;
Pattern pattern = Pattern.compile("offset=([0-9]+)");
Matcher matcher = pattern.matcher(request);
if (matcher.find()) {
System.out.println(matcher.group(1));
offset = Integer.parseInt(matcher.group(1));
}
pattern = Pattern.compile("length=([0-9]+)");
matcher = pattern.matcher(request);
if (matcher.find()) {
System.out.println(matcher.group(1));
byteCount = Integer.parseInt(matcher.group(1));
}
Buffer buf = new Buffer();
buf.write(currentRequest.data, offset, byteCount);
return new MockResponse().setResponseCode(200)
.setChunkedBody(buf, 4 * 1024 * 1024);
}
return new MockResponse().setBody("NOT SUPPORTED").setResponseCode(501);
}
});
}
@Before
public void resetHandle() {
commonHandle = null;
}
@Test
public void testParallelReadOnDifferentStreams()
throws IOException, InterruptedException, ExecutionException {
ArrayList<CreateTestData> createTestData = new ArrayList<CreateTestData>();
Random random = new Random();
for (int i = 0; i < concurrencyLevel; i++) {
CreateTestData testData = new CreateTestData();
testData
.set(new Path("/test/concurrentRead/" + UUID.randomUUID().toString()),
getRandomByteArrayData(random.nextInt(1 * 1024 * 1024)));
createTestData.add(testData);
}
setDispatcher(createTestData);
ArrayList<ReadTestData> readTestData = new ArrayList<ReadTestData>();
for (CreateTestData local : createTestData) {
ReadTestData localReadData = new ReadTestData();
localReadData.set(local.path, local.data, 0);
readTestData.add(localReadData);
}
runReadTest(readTestData, false);
}
@Test
public void testParallelReadOnSameStreams()
throws IOException, InterruptedException, ExecutionException {
ArrayList<CreateTestData> createTestData = new ArrayList<CreateTestData>();
Random random = new Random();
for (int i = 0; i < 1; i++) {
CreateTestData testData = new CreateTestData();
testData
.set(new Path("/test/concurrentRead/" + UUID.randomUUID().toString()),
getRandomByteArrayData(1024 * 1024));
createTestData.add(testData);
}
setDispatcher(createTestData);
ArrayList<ReadTestData> readTestData = new ArrayList<ReadTestData>();
ByteArrayInputStream buffered = new ByteArrayInputStream(
createTestData.get(0).data);
ReadTestData readInitially = new ReadTestData();
byte[] initialData = new byte[1024 * 1024];
buffered.read(initialData);
readInitially.set(createTestData.get(0).path, initialData, 0);
readTestData.add(readInitially);
runReadTest(readTestData, false);
readTestData.clear();
for (int i = 0; i < concurrencyLevel * 5; i++) {
ReadTestData localReadData = new ReadTestData();
int offset = random.nextInt((1024 * 1024) - 1);
int length = 1024 * 1024 - offset;
byte[] expectedData = new byte[length];
buffered.reset();
buffered.skip(offset);
buffered.read(expectedData);
localReadData.set(createTestData.get(0).path, expectedData, offset);
readTestData.add(localReadData);
}
runReadTest(readTestData, true);
}
void runReadTest(ArrayList<ReadTestData> testData, boolean useSameStream)
throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(testData.size());
Future[] subtasks = new Future[testData.size()];
for (int i = 0; i < testData.size(); i++) {
subtasks[i] = executor.submit(
new ReadConcurrentRunnable(testData.get(i).data, testData.get(i).path,
testData.get(i).offset, useSameStream));
}
executor.shutdown();
// wait until all tasks are finished
executor.awaitTermination(120, TimeUnit.SECONDS);
for (int i = 0; i < testData.size(); ++i) {
Assert.assertTrue((Boolean) subtasks[i].get());
}
}
class ReadTestData {
private Path path;
private byte[] data;
private int offset;
public void set(Path filePath, byte[] dataToBeRead, int fromOffset) {
this.path = filePath;
this.data = dataToBeRead;
this.offset = fromOffset;
}
}
class CreateTestData {
private Path path;
private byte[] data;
public void set(Path filePath, byte[] dataToBeWritten) {
this.path = filePath;
this.data = dataToBeWritten;
}
}
class ReadConcurrentRunnable implements Callable<Boolean> {
private Path path;
private int offset;
private byte[] expectedData;
private boolean useSameStream;
public ReadConcurrentRunnable(byte[] expectedData, Path path, int offset,
boolean useSameStream) {
this.path = path;
this.offset = offset;
this.expectedData = expectedData;
this.useSameStream = useSameStream;
}
public Boolean call() throws IOException {
try {
FSDataInputStream in;
if (useSameStream) {
synchronized (lock) {
if (commonHandle == null) {
commonHandle = getMockAdlFileSystem().open(path);
}
in = commonHandle;
}
} else {
in = getMockAdlFileSystem().open(path);
}
byte[] actualData = new byte[expectedData.length];
in.readFully(offset, actualData);
Assert.assertArrayEquals("Path :" + path.toString() + " did not match.",
expectedData, actualData);
if (!useSameStream) {
in.close();
}
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
}
}

View File

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdfs.web;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.adl.TestableAdlFileSystem;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.web.oauth2.ConfCredentialBasedAccessTokenProvider;
import org.apache.hadoop.hdfs.web.oauth2.CredentialBasedAccessTokenProvider;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
* This class tests the ADL file system's required configuration and
* feature-set keys.
*/
public class TestConfigurationSetting {
@Test
public void testAllConfiguration() throws URISyntaxException, IOException {
TestableAdlFileSystem fs = new TestableAdlFileSystem();
Configuration conf = new Configuration();
conf.set(HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY,
"http://localhost:1111/refresh");
conf.set(CredentialBasedAccessTokenProvider.OAUTH_CREDENTIAL_KEY,
"credential");
conf.set(HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY, "MY_CLIENTID");
conf.set(HdfsClientConfigKeys.ACCESS_TOKEN_PROVIDER_KEY,
ConfCredentialBasedAccessTokenProvider.class.getName());
conf.set(HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_KEY, "true");
URI uri = new URI("adl://localhost:1234");
fs.initialize(uri, conf);
// Default setting check
Assert.assertEquals(true, fs.isFeatureRedirectOff());
Assert.assertEquals(true, fs.isFeatureGetBlockLocationLocallyBundled());
Assert.assertEquals(true, fs.isFeatureConcurrentReadWithReadAhead());
Assert.assertEquals(false, fs.isOverrideOwnerFeatureOn());
Assert.assertEquals(8 * 1024 * 1024, fs.getMaxBufferSize());
Assert.assertEquals(2, fs.getMaxConcurrentConnection());
fs.close();
// Configuration toggle check
conf.set("adl.feature.override.redirection.off", "false");
fs.initialize(uri, conf);
Assert.assertEquals(false, fs.isFeatureRedirectOff());
fs.close();
conf.set("adl.feature.override.redirection.off", "true");
fs.initialize(uri, conf);
Assert.assertEquals(true, fs.isFeatureRedirectOff());
fs.close();
conf.set("adl.feature.override.getblocklocation.locally.bundled", "false");
fs.initialize(uri, conf);
Assert.assertEquals(false, fs.isFeatureGetBlockLocationLocallyBundled());
fs.close();
conf.set("adl.feature.override.getblocklocation.locally.bundled", "true");
fs.initialize(uri, conf);
Assert.assertEquals(true, fs.isFeatureGetBlockLocationLocallyBundled());
fs.close();
conf.set("adl.feature.override.readahead", "false");
fs.initialize(uri, conf);
Assert.assertEquals(false, fs.isFeatureConcurrentReadWithReadAhead());
fs.close();
conf.set("adl.feature.override.readahead", "true");
fs.initialize(uri, conf);
Assert.assertEquals(true, fs.isFeatureConcurrentReadWithReadAhead());
fs.close();
conf.set("adl.feature.override.readahead.max.buffersize", "101");
fs.initialize(uri, conf);
Assert.assertEquals(101, fs.getMaxBufferSize());
fs.close();
conf.set("adl.feature.override.readahead.max.buffersize", "12134565");
fs.initialize(uri, conf);
Assert.assertEquals(12134565, fs.getMaxBufferSize());
fs.close();
conf.set("adl.debug.override.localuserasfileowner", "true");
fs.initialize(uri, conf);
Assert.assertEquals(true, fs.isOverrideOwnerFeatureOn());
fs.close();
conf.set("adl.debug.override.localuserasfileowner", "false");
fs.initialize(uri, conf);
Assert.assertEquals(false, fs.isOverrideOwnerFeatureOn());
fs.close();
}
}

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.hdfs.web;
import com.squareup.okhttp.mockwebserver.MockResponse;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.adl.TestADLResponseData;
import org.apache.hadoop.fs.common.AdlMockWebServer;
import org.apache.hadoop.hdfs.web.PrivateAzureDataLakeFileSystem.BatchByteArrayInputStream;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
/**
* This class tests the split-size calculation performed during read-ahead
* buffer initialization, based on the data size and the configured buffer
* and connection limits.
*/
public class TestSplitSizeCalculation extends AdlMockWebServer {
@Test
public void testSplitSizeCalculations()
throws URISyntaxException, IOException {
getMockServer().enqueue(new MockResponse().setResponseCode(200).setBody(
TestADLResponseData.getGetFileStatusJSONResponse(128 * 1024 * 1024)));
getMockServer().enqueue(new MockResponse().setResponseCode(200).setBody(
TestADLResponseData.getGetFileStatusJSONResponse(128 * 1024 * 1024)));
getMockServer().enqueue(new MockResponse().setResponseCode(200).setBody(
TestADLResponseData.getGetFileStatusJSONResponse(128 * 1024 * 1024)));
getMockServer().enqueue(new MockResponse().setResponseCode(200).setBody(
TestADLResponseData.getGetFileStatusJSONResponse(128 * 1024 * 1024)));
URL url = getMockServer().getUrl("");
BatchByteArrayInputStream stream = getMockAdlFileSystem()
.new BatchByteArrayInputStream(url,
new Path("/test1/test2"), 16 * 1024 * 1024, 4);
Assert.assertEquals(1, stream.getSplitSize(1 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(2 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(3 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(4 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(5 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(6 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(7 * 1024 * 1024));
Assert.assertEquals(2, stream.getSplitSize(8 * 1024 * 1024));
Assert.assertEquals(4, stream.getSplitSize(16 * 1024 * 1024));
Assert.assertEquals(3, stream.getSplitSize(12 * 1024 * 1024));
Assert.assertEquals(4, stream.getSplitSize(102 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(102));
stream.close();
stream = getMockAdlFileSystem().new BatchByteArrayInputStream(url,
new Path("/test1/test2"), 4 * 1024 * 1024, 4);
Assert.assertEquals(1, stream.getSplitSize(1 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(2 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(3 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(4 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(5 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(8 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(5 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(6 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(7 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(16 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(12 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(102 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(102));
stream.close();
stream = getMockAdlFileSystem().new BatchByteArrayInputStream(url,
new Path("/test1/test2"), 16 * 1024 * 1024, 2);
Assert.assertEquals(1, stream.getSplitSize(1 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(2 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(3 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(4 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(5 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(5 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(6 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(7 * 1024 * 1024));
Assert.assertEquals(2, stream.getSplitSize(8 * 1024 * 1024));
Assert.assertEquals(2, stream.getSplitSize(16 * 1024 * 1024));
Assert.assertEquals(2, stream.getSplitSize(12 * 1024 * 1024));
Assert.assertEquals(2, stream.getSplitSize(102 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(102));
stream.close();
stream = getMockAdlFileSystem().new BatchByteArrayInputStream(url,
new Path("/test1/test2"), 8 * 1024 * 1024, 2);
Assert.assertEquals(1, stream.getSplitSize(1 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(2 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(3 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(4 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(5 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(6 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(7 * 1024 * 1024));
Assert.assertEquals(2, stream.getSplitSize(8 * 1024 * 1024));
Assert.assertEquals(2, stream.getSplitSize(16 * 1024 * 1024));
Assert.assertEquals(2, stream.getSplitSize(12 * 1024 * 1024));
Assert.assertEquals(2, stream.getSplitSize(102 * 1024 * 1024));
Assert.assertEquals(1, stream.getSplitSize(102));
stream.close();
}
}
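The assertions above are consistent with the split size being computed as the requested size, capped at the stream's maximum buffer size, divided into 4 MB chunks (integer division, at least one), and finally limited by the maximum concurrent connection count. A reference computation reflecting that reading:

// Hypothetical reference model derived from the assertions above, not the
// production code itself.
static int referenceSplitSize(int size, int maxBufferSize,
    int maxConcurrentConnection) {
  final int SIZE_4MB = 4 * 1024 * 1024;
  int capped = Math.min(size, maxBufferSize);
  int equalBufferSplit = Math.max(capped / SIZE_4MB, 1);
  return Math.min(equalBufferSplit, maxConcurrentConnection);
}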

View File

@ -0,0 +1,139 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<configuration>
<property>
<name>fs.contract.test.root-tests-enabled</name>
<value>true</value>
</property>
<property>
<name>fs.contract.test.supports-concat</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rename-returns-false-if-source-missing</name>
<value>true</value>
</property>
<property>
<name>fs.contract.test.random-seek-count</name>
<value>10</value>
</property>
<property>
<name>fs.contract.is-case-sensitive</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rename-returns-true-if-dest-exists</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rename-returns-true-if-source-missing</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rename-creates-dest-dirs</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rename-remove-dest-if-empty-dir</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-settimes</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-append</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-atomic-directory-delete</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-atomic-rename</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-block-locality</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-concat</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-seek</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-seek-on-closed-file</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rejects-seek-past-eof</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-available-on-closed-file</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-strict-exceptions</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-unix-permissions</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rename-overwrites-dest</name>
<value>true</value>
</property>
<property>
<name>fs.azure.enable.append.support</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-getfilestatus</name>
<value>true</value>
</property>
</configuration>
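These contract options feed Hadoop's FileSystem contract test framework. A minimal contract binding might look like the sketch below; the class name, package, and resource name are assumptions for illustration, not part of this patch:

package org.apache.hadoop.fs.adl.live;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractBondedFSContract;

// Hypothetical binding between the "adl" scheme and the options file above.
public class AdlStorageContract extends AbstractBondedFSContract {
  public AdlStorageContract(Configuration conf) {
    super(conf);
    addConfResource("adls.xml"); // assumed resource name for the options above
  }

  @Override
  public String getScheme() {
    return "adl";
  }
}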

View File

@ -0,0 +1,57 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<configuration>
<property>
<name>dfs.webhdfs.oauth2.refresh.token.expires.ms.since.epoch</name>
<value>0</value>
</property>
<property>
<name>dfs.webhdfs.oauth2.credential</name>
<value>bearer.and.refresh.token</value>
</property>
<property>
<name>dfs.webhdfs.oauth2.refresh.url</name>
<value>https://login.windows.net/common/oauth2/token/</value>
</property>
<property>
<name>dfs.webhdfs.oauth2.access.token.provider</name>
<value>
org.apache.hadoop.fs.adl.oauth2.CachedRefreshTokenBasedAccessTokenProvider
</value>
</property>
<property>
<name>dfs.webhdfs.oauth2.enabled</name>
<value>true</value>
</property>
<!--USER INPUT REQUIRED-->
<property>
<name>dfs.webhdfs.oauth2.client.id</name>
<value>ADD CLIENT ID</value>
</property>
<!--USER INPUT REQUIRED-->
<property>
<name>dfs.webhdfs.oauth2.refresh.token</name>
<value>ADD REFRESH TOKEN</value>
</property>
<!--USER INPUT REQUIRED-->
<property>
<name>fs.defaultFS</name>
<value>adl://urAdlAccountGoesHere.azuredatalakestore.net:443/</value>
</property>
<!--USER INPUT REQUIRED-->
<property>
<name>dfs.adl.test.contract.enable</name>
<value>false</value>
</property>
</configuration>
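Because dfs.adl.test.contract.enable defaults to false above, live contract tests would be expected to skip themselves unless a real account is configured. A hedged sketch of such a guard:

// Hypothetical JUnit assumption gate keyed on the property above; tests are
// skipped (not failed) when live testing is disabled.
org.junit.Assume.assumeTrue(new org.apache.hadoop.conf.Configuration()
    .getBoolean("dfs.adl.test.contract.enable", false));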