HADOOP-14383. Implement FileSystem that reads from HTTP / HTTPS endpoints.
This commit is contained in:
parent
6ca0c134e4
commit
b8870d8159
@ -306,6 +306,11 @@
|
|||||||
<artifactId>aalto-xml</artifactId>
|
<artifactId>aalto-xml</artifactId>
|
||||||
<scope>compile</scope>
|
<scope>compile</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.squareup.okhttp3</groupId>
|
||||||
|
<artifactId>mockwebserver</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
@ -0,0 +1,153 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.http;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.PositionedReadable;
|
||||||
|
import org.apache.hadoop.fs.Seekable;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
|
||||||
|
import java.io.FilterInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URLConnection;
|
||||||
|
|
||||||
|
abstract class AbstractHttpFileSystem extends FileSystem {
|
||||||
|
private static final long DEFAULT_BLOCK_SIZE = 4096;
|
||||||
|
private static final Path WORKING_DIR = new Path("/");
|
||||||
|
|
||||||
|
private URI uri;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void initialize(URI name, Configuration conf) throws IOException {
|
||||||
|
super.initialize(name, conf);
|
||||||
|
this.uri = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract String getScheme();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public URI getUri() {
|
||||||
|
return uri;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
|
||||||
|
URLConnection conn = path.toUri().toURL().openConnection();
|
||||||
|
InputStream in = conn.getInputStream();
|
||||||
|
return new FSDataInputStream(new HttpDataInputStream(in));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream create(Path path, FsPermission fsPermission,
|
||||||
|
boolean b, int i, short i1, long l,
|
||||||
|
Progressable progressable)
|
||||||
|
throws IOException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream append(Path path, int i, Progressable progressable)
|
||||||
|
throws IOException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean rename(Path path, Path path1) throws IOException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean delete(Path path, boolean b) throws IOException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FileStatus[] listStatus(Path path) throws IOException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setWorkingDirectory(Path path) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Path getWorkingDirectory() {
|
||||||
|
return WORKING_DIR;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean mkdirs(Path path, FsPermission fsPermission)
|
||||||
|
throws IOException {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FileStatus getFileStatus(Path path) throws IOException {
|
||||||
|
return new FileStatus(-1, false, 1, DEFAULT_BLOCK_SIZE, 0, path);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class HttpDataInputStream extends FilterInputStream
|
||||||
|
implements Seekable, PositionedReadable {
|
||||||
|
|
||||||
|
HttpDataInputStream(InputStream in) {
|
||||||
|
super(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read(long position, byte[] buffer, int offset, int length)
|
||||||
|
throws IOException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void readFully(long position, byte[] buffer, int offset, int length)
|
||||||
|
throws IOException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void readFully(long position, byte[] buffer) throws IOException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void seek(long pos) throws IOException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getPos() throws IOException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean seekToNewSource(long targetPos) throws IOException {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,28 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.fs.http;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Filesystem that reads from HTTP endpoint.
|
||||||
|
*/
|
||||||
|
public class HttpFileSystem extends AbstractHttpFileSystem {
|
||||||
|
@Override
|
||||||
|
public String getScheme() {
|
||||||
|
return "http";
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,28 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.fs.http;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Filesystem that reads from HTTPS endpoint.
|
||||||
|
*/
|
||||||
|
public class HttpsFileSystem extends AbstractHttpFileSystem {
|
||||||
|
@Override
|
||||||
|
public String getScheme() {
|
||||||
|
return "https";
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,23 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Filesystem implementations that allow Hadoop to read directly from
|
||||||
|
* HTTP / HTTPS endpoints.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.fs.http;
|
@ -17,3 +17,5 @@ org.apache.hadoop.fs.LocalFileSystem
|
|||||||
org.apache.hadoop.fs.viewfs.ViewFileSystem
|
org.apache.hadoop.fs.viewfs.ViewFileSystem
|
||||||
org.apache.hadoop.fs.ftp.FTPFileSystem
|
org.apache.hadoop.fs.ftp.FTPFileSystem
|
||||||
org.apache.hadoop.fs.HarFileSystem
|
org.apache.hadoop.fs.HarFileSystem
|
||||||
|
org.apache.hadoop.fs.http.HttpFileSystem
|
||||||
|
org.apache.hadoop.fs.http.HttpsFileSystem
|
||||||
|
@ -0,0 +1,67 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.http;
|
||||||
|
|
||||||
|
import okhttp3.mockwebserver.MockResponse;
|
||||||
|
import okhttp3.mockwebserver.MockWebServer;
|
||||||
|
import okhttp3.mockwebserver.RecordedRequest;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.net.URL;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Testing HttpFileSystem.
|
||||||
|
*/
|
||||||
|
public class TestHttpFileSystem {
|
||||||
|
@Test
|
||||||
|
public void testHttpFileSystem() throws IOException, URISyntaxException,
|
||||||
|
InterruptedException {
|
||||||
|
Configuration conf = new Configuration(false);
|
||||||
|
conf.set("fs.http.impl", HttpFileSystem.class.getCanonicalName());
|
||||||
|
final String data = "foo";
|
||||||
|
|
||||||
|
try (MockWebServer server = new MockWebServer()) {
|
||||||
|
server.enqueue(new MockResponse().setBody(data));
|
||||||
|
server.start();
|
||||||
|
URI uri = URI.create(String.format("http://%s:%d", server.getHostName(),
|
||||||
|
server.getPort()));
|
||||||
|
FileSystem fs = FileSystem.get(uri, conf);
|
||||||
|
try (InputStream is = fs.open(
|
||||||
|
new Path(new URL(uri.toURL(), "/foo").toURI()),
|
||||||
|
4096)) {
|
||||||
|
byte[] buf = new byte[data.length()];
|
||||||
|
IOUtils.readFully(is, buf, 0, buf.length);
|
||||||
|
assertEquals(data, new String(buf, StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
RecordedRequest req = server.takeRequest();
|
||||||
|
assertEquals("/foo", req.getPath());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -131,6 +131,12 @@
|
|||||||
<artifactId>okhttp</artifactId>
|
<artifactId>okhttp</artifactId>
|
||||||
<version>2.4.0</version>
|
<version>2.4.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.squareup.okhttp3</groupId>
|
||||||
|
<artifactId>mockwebserver</artifactId>
|
||||||
|
<version>3.7.0</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>jdiff</groupId>
|
<groupId>jdiff</groupId>
|
||||||
<artifactId>jdiff</artifactId>
|
<artifactId>jdiff</artifactId>
|
||||||
|
Loading…
x
Reference in New Issue
Block a user