From 8bcb8dba51b74ac9132e1ecd32a5c3e0a601c3d4 Mon Sep 17 00:00:00 2001 From: Kihwal Lee Date: Wed, 23 Apr 2014 22:08:19 +0000 Subject: [PATCH] svn merge -c 1589528 merging from trunk to branch-2 to fix:HDFS-6217. Webhdfs PUT operations may not work via a http proxy. Contributed by Daryn Sharp. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1589529 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hadoop/hdfs/web/WebHdfsFileSystem.java | 19 +- .../hdfs/web/TestWebHdfsContentLength.java | 197 ++++++++++++++++++ 3 files changed, 218 insertions(+), 1 deletion(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsContentLength.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 2e6c2a13513..9e1515d615b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -127,6 +127,9 @@ Release 2.5.0 - UNRELEASED HDFS-6275. Fix warnings - type arguments can be inferred and redudant local variable. (suresh) + HDFS-6217. Webhdfs PUT operations may not work via a http proxy. + (Daryn Sharp via kihwal) + Release 2.4.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index ae0b721df71..413ee6922ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -490,8 +490,25 @@ public class WebHdfsFileSystem extends FileSystem private void connect(boolean doOutput) throws IOException { conn.setRequestMethod(op.getType().toString()); - conn.setDoOutput(doOutput); conn.setInstanceFollowRedirects(false); + switch (op.getType()) { + // if not sending a message body for a POST or PUT operation, need + // to ensure the server/proxy knows this + case POST: + case PUT: { + conn.setDoOutput(true); + if (!doOutput) { + // explicitly setting content-length to 0 won't do spnego!! + // opening and closing the stream will send "Content-Length: 0" + conn.getOutputStream().close(); + } + break; + } + default: { + conn.setDoOutput(doOutput); + break; + } + } conn.connect(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsContentLength.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsContentLength.java new file mode 100644 index 00000000000..ba99f7a091f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsContentLength.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.web; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.net.NetUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + + +public class TestWebHdfsContentLength { + private static ServerSocket listenSocket; + private static String bindAddr; + private static Path p; + private static FileSystem fs; + + private static final Pattern contentLengthPattern = Pattern.compile( + "^(Content-Length|Transfer-Encoding):\\s*(.*)", Pattern.MULTILINE); + + private static String errResponse = + "HTTP/1.1 500 Boom\r\n" + + "Content-Length: 0\r\n" + + "Connection: close\r\n\r\n"; + private static String redirectResponse; + + private static ExecutorService executor; + + @BeforeClass + public static void setup() throws IOException { + listenSocket = new ServerSocket(); + listenSocket.bind(null); + bindAddr = NetUtils.getHostPortString( + (InetSocketAddress)listenSocket.getLocalSocketAddress()); + redirectResponse = + "HTTP/1.1 307 Redirect\r\n" + + "Location: http://"+bindAddr+"/path\r\n" + + "Connection: close\r\n\r\n"; + + p = new Path("webhdfs://"+bindAddr+"/path"); + fs = p.getFileSystem(new Configuration()); + executor = Executors.newSingleThreadExecutor(); + } + + @AfterClass + public static void teardown() throws IOException { + if (listenSocket != null) { + listenSocket.close(); + } + if (executor != null) { + executor.shutdownNow(); + } + } + + @Test + public void testGetOp() throws Exception { + Future future = contentLengthFuture(errResponse); + try { + fs.getFileStatus(p); + Assert.fail(); + } catch (IOException ioe) {} // expected + Assert.assertEquals(null, getContentLength(future)); + } + + @Test + public void testGetOpWithRedirect() { + Future future1 = contentLengthFuture(redirectResponse); + Future future2 = contentLengthFuture(errResponse); + try { + fs.open(p).read(); + Assert.fail(); + } catch (IOException ioe) {} // expected + Assert.assertEquals(null, getContentLength(future1)); + Assert.assertEquals(null, getContentLength(future2)); + } + + @Test + public void testPutOp() { + Future future = contentLengthFuture(errResponse); + try { + fs.mkdirs(p); + Assert.fail(); + } catch (IOException ioe) {} // expected + Assert.assertEquals("0", getContentLength(future)); + } + + @Test + public void testPutOpWithRedirect() { + Future future1 = contentLengthFuture(redirectResponse); + Future future2 = contentLengthFuture(errResponse); + try { + FSDataOutputStream os = fs.create(p); + os.write(new byte[]{0}); + os.close(); + Assert.fail(); + } catch (IOException ioe) {} // expected + Assert.assertEquals("0", getContentLength(future1)); + Assert.assertEquals("chunked", getContentLength(future2)); + } + + @Test + public void testPostOp() { + Future future = contentLengthFuture(errResponse); + try { + fs.concat(p, new Path[]{p}); + Assert.fail(); + } catch (IOException ioe) {} // expected + Assert.assertEquals("0", getContentLength(future)); + } + + @Test + public void testPostOpWithRedirect() { + // POST operation with redirect + Future future1 = contentLengthFuture(redirectResponse); + Future future2 = contentLengthFuture(errResponse); + try { + FSDataOutputStream os = fs.append(p); + os.write(new byte[]{0}); + os.close(); + Assert.fail(); + } catch (IOException ioe) {} // expected + Assert.assertEquals("0", getContentLength(future1)); + Assert.assertEquals("chunked", getContentLength(future2)); + } + + @Test + public void testDelete() { + Future future = contentLengthFuture(errResponse); + try { + fs.delete(p, false); + Assert.fail(); + } catch (IOException ioe) {} // expected + Assert.assertEquals(null, getContentLength(future)); + } + + private String getContentLength(Future future) { + String request = null; + try { + request = future.get(2, TimeUnit.SECONDS); + } catch (Exception e) { + Assert.fail(e.toString()); + } + Matcher matcher = contentLengthPattern.matcher(request); + return matcher.find() ? matcher.group(2) : null; + } + + private Future contentLengthFuture(final String response) { + return executor.submit(new Callable() { + @Override + public String call() throws Exception { + Socket client = listenSocket.accept(); + client.setSoTimeout(2000); + try { + client.getOutputStream().write(response.getBytes()); + client.shutdownOutput(); + byte[] buf = new byte[4*1024]; // much bigger than request + int n = client.getInputStream().read(buf); + return new String(buf, 0, n); + } finally { + client.close(); + } + } + }); + } +}