From d9c3d9a5ef516fc6c6e4ea04ebe52c189b3210a9 Mon Sep 17 00:00:00 2001
From: Xiaoyu Yao
Date: Mon, 18 Sep 2017 12:07:43 -0700
Subject: [PATCH] HDFS-11873. Ozone: Object store handler supports reusing http
 client for multiple requests. Contributed by Xiaoyu Yao and Weiwei Yang.

---
 .../web/netty/ObjectStoreJerseyContainer.java |   1 +
 ...questContentObjectStoreChannelHandler.java |  21 +-
 ...uestDispatchObjectStoreChannelHandler.java |   2 +-
 .../ozone/web/client/TestOzoneClient.java     | 305 ++++++++++++++++++
 4 files changed, 327 insertions(+), 2 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestOzoneClient.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainer.java
index efdc2882770..ce63602c460 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/ObjectStoreJerseyContainer.java
@@ -231,6 +231,7 @@ public final class ObjectStoreJerseyContainer {
             ObjectStoreJerseyContainer.this.webapp, this.nettyReq, this.reqIn);
         ObjectStoreJerseyContainer.this.webapp.handleRequest(jerseyReq, this);
       } catch (Exception e) {
+        LOG.error("Error running Jersey Request Runner", e);
         this.exception = e;
         this.latch.countDown();
       } finally {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestContentObjectStoreChannelHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestContentObjectStoreChannelHandler.java
index df7d50b20bc..7d94289b115 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestContentObjectStoreChannelHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestContentObjectStoreChannelHandler.java
@@ -46,6 +46,7 @@ public final class RequestContentObjectStoreChannelHandler
   private final Future<HttpResponse> nettyResp;
   private final OutputStream reqOut;
   private final InputStream respIn;
+  private ObjectStoreJerseyContainer jerseyContainer;
 
   /**
    * Creates a new RequestContentObjectStoreChannelHandler.
@@ -54,13 +55,16 @@ public final class RequestContentObjectStoreChannelHandler
    * @param nettyResp asynchronous HTTP response
    * @param reqOut output stream for writing request body
    * @param respIn input stream for reading response body
+   * @param jerseyContainer the Jersey container that handles the request
    */
   public RequestContentObjectStoreChannelHandler(HttpRequest nettyReq,
-      Future<HttpResponse> nettyResp, OutputStream reqOut, InputStream respIn) {
+      Future<HttpResponse> nettyResp, OutputStream reqOut, InputStream respIn,
+      ObjectStoreJerseyContainer jerseyContainer) {
     this.nettyReq = nettyReq;
     this.nettyResp = nettyResp;
     this.reqOut = reqOut;
     this.respIn = respIn;
+    this.jerseyContainer = jerseyContainer;
   }
 
   @Override
@@ -83,6 +87,21 @@ public final class RequestContentObjectStoreChannelHandler
       respFuture.addListener(new CloseableCleanupListener(this.respIn));
       if (!HttpHeaderUtil.isKeepAlive(this.nettyReq)) {
         respFuture.addListener(ChannelFutureListener.CLOSE);
+      } else {
+        respFuture.addListener(new ChannelFutureListener() {
+          @Override
+          public void operationComplete(ChannelFuture future) throws Exception {
+            // Notify client this is the last content for current request.
+            ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+            // Reset the pipeline handler for next request to reuse the
+            // same connection.
+            RequestDispatchObjectStoreChannelHandler h =
+                new RequestDispatchObjectStoreChannelHandler(jerseyContainer);
+            ctx.pipeline().replace(ctx.pipeline().last(),
+                RequestDispatchObjectStoreChannelHandler.class.getSimpleName(),
+                h);
+          }
+        });
       }
     }
     LOG.trace(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestDispatchObjectStoreChannelHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestDispatchObjectStoreChannelHandler.java
index df7edf71ce5..b416b8fa256 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestDispatchObjectStoreChannelHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/netty/RequestDispatchObjectStoreChannelHandler.java
@@ -88,7 +88,7 @@ public final class RequestDispatchObjectStoreChannelHandler
     ctx.pipeline().replace(this,
         RequestContentObjectStoreChannelHandler.class.getSimpleName(),
         new RequestContentObjectStoreChannelHandler(nettyReq, nettyResp,
-            reqOut, respIn));
+            reqOut, respIn, jerseyContainer));
 
     LOG.trace("end RequestDispatchObjectStoreChannelHandler channelRead0, " +
         "ctx = {}, nettyReq = {}", ctx, nettyReq);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestOzoneClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestOzoneClient.java
new file mode 100644
index 00000000000..e7444d3d44d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestOzoneClient.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.web.client;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpClientCodec;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpContentDecompressor;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpObject;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.rest.headers.Header;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.util.EntityUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.ws.rs.core.HttpHeaders;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Locale;
+import java.util.UUID;
+
+import static io.netty.util.CharsetUtil.UTF_8;
+
+/**
+ * Unit tests for Ozone client connection reuse with Apache HttpClient and Netty
+ * based HttpClient.
+ */
+public class TestOzoneClient {
+  private static Logger log = Logger.getLogger(TestOzoneClient.class);
+  private static int testVolumeCount = 5;
+  private static MiniOzoneCluster cluster = null;
+  private static String endpoint = null;
+
+  @BeforeClass
+  public static void init() throws Exception {
+    Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.ALL);
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
+        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
+    cluster = new MiniOzoneCluster.Builder(conf)
+        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+    DataNode dataNode = cluster.getDataNodes().get(0);
+    endpoint = String.format("http://localhost:%d", dataNode.getInfoPort());
+  }
+
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testNewConnectionPerRequest()
+      throws IOException, URISyntaxException {
+    for (int i = 0; i < testVolumeCount; i++) {
+      try (CloseableHttpClient httpClient =
+          HttpClients.createDefault()) {
+        createVolume(getRandomVolumeName(i), httpClient);
+      }
+    }
+  }
+
+  /**
+   * Object handler should be able to serve multiple requests from
+   * a single http client. This allows the client side to reuse
+   * http connections in a connection pool instead of creating a new
+   * connection per request, which consumes resources heavily.
+   *
+   */
+  @Test(timeout = 5000)
+  public void testReuseWithApacheHttpClient()
+      throws IOException, URISyntaxException {
+
+    PoolingHttpClientConnectionManager cm =
+        new PoolingHttpClientConnectionManager();
+    cm.setMaxTotal(200);
+    cm.setDefaultMaxPerRoute(20);
+
+    try (CloseableHttpClient httpClient =
+        HttpClients.custom().setConnectionManager(cm).build()) {
+      for (int i = 0; i < testVolumeCount; i++) {
+        createVolume(getRandomVolumeName(i), httpClient);
+      }
+    }
+  }
+
+  @Test(timeout = 10000)
+  public void testReuseWithNettyHttpClient()
+      throws IOException, InterruptedException, URISyntaxException {
+    URI uri = new URI(endpoint);
+    String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
+    int port = uri.getPort();
+
+    EventLoopGroup workerGroup = new NioEventLoopGroup();
+    try {
+      Bootstrap b = new Bootstrap();
+      b.group(workerGroup)
+          .channel(NioSocketChannel.class)
+          .option(ChannelOption.SO_KEEPALIVE, true)
+          .option(ChannelOption.SO_REUSEADDR, true)
+          .handler(new ChannelInitializer<SocketChannel>() {
+            /**
+             * This method will be called once the {@link Channel} was
+             * registered. After the method returns this instance
+             * will be removed from the {@link ChannelPipeline}
+             * of the {@link Channel}.
+             *
+             * @param ch the {@link Channel} which was registered.
+             * @throws Exception is thrown if an error occurs.
+             * In that case the {@link Channel} will be closed.
+             */
+            @Override
+            public void initChannel(SocketChannel ch) {
+              ChannelPipeline p = ch.pipeline();
+
+              // Comment the following line if you don't want client http trace
+              p.addLast("log", new LoggingHandler(LogLevel.INFO));
+              p.addLast(new HttpClientCodec());
+              p.addLast(new HttpContentDecompressor());
+              p.addLast(new NettyHttpClientHandler());
+            }
+          });
+
+      Channel ch = b.connect(host, port).sync().channel();
+      for (int i = 0; i < testVolumeCount; i++) {
+        String volumeName = getRandomVolumeName(i);
+        try {
+          sendNettyCreateVolumeRequest(ch, volumeName);
+          Thread.sleep(1000);
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+
+      Thread.sleep(1000);
+      ch.close();
+      // Wait for the server to close the connection.
+      ch.closeFuture().sync();
+    } catch (Exception ex) {
+      log.error("Error received in client setup", ex);
+    } finally {
+      workerGroup.shutdownGracefully();
+    }
+  }
+
+  class NettyHttpClientHandler extends
+      SimpleChannelInboundHandler<HttpObject> {
+
+    @Override
+    public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
+      if (msg instanceof HttpResponse) {
+        HttpResponse response = (HttpResponse) msg;
+        log.info("STATUS: " + response.getStatus());
+        log.info("VERSION: " + response.getProtocolVersion());
+        Assert.assertEquals(HttpResponseStatus.CREATED.code(),
+            response.getStatus().code());
+      }
+      if (msg instanceof HttpContent) {
+        HttpContent content = (HttpContent) msg;
+        log.info(content.content().toString(UTF_8));
+        if (content instanceof LastHttpContent) {
+          log.info("END OF CONTENT");
+        }
+      }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+      log.error("Exception upon channel read", cause);
+      ctx.close();
+    }
+  }
+
+  private String getRandomVolumeName(int index) {
+    UUID id = UUID.randomUUID();
+    return "test-volume-" + index + "-" + id;
+
+  }
+
+  // Prepare the HTTP request and send it over the netty channel.
+  private void sendNettyCreateVolumeRequest(Channel channel, String volumeName)
+      throws URISyntaxException, IOException {
+    URIBuilder builder = new URIBuilder(endpoint);
+    builder.setPath("/" + volumeName);
+    URI uri = builder.build();
+
+    String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
+    FullHttpRequest request = new DefaultFullHttpRequest(
+        HttpVersion.HTTP_1_1, HttpMethod.POST, uri.getRawPath());
+
+    SimpleDateFormat format =
+        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
+    request.headers().set(HttpHeaders.HOST, host);
+    request.headers().add(HttpHeaders.CONTENT_TYPE, "application/json");
+    request.headers().set(Header.OZONE_VERSION_HEADER,
+        Header.OZONE_V1_VERSION_HEADER);
+    request.headers().set(HttpHeaders.DATE,
+        format.format(new Date(Time.monotonicNow())));
+    request.headers().set(Header.OZONE_USER,
+        UserGroupInformation.getCurrentUser().getUserName());
+    request.headers().set(HttpHeaders.AUTHORIZATION,
+        Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
+            OzoneConsts.OZONE_SIMPLE_HDFS_USER);
+
+    // Send the HTTP request via netty channel.
+    channel.writeAndFlush(request);
+  }
+
+  // It is the caller's responsibility to close the client.
+  private void createVolume(String volumeName, CloseableHttpClient httpClient)
+      throws IOException, URISyntaxException {
+    HttpPost create1 =
+        getCreateVolumeRequest(volumeName);
+    HttpEntity entity = null;
+    try {
+      CloseableHttpResponse response1 =
+          httpClient.execute(create1);
+      Assert.assertEquals(HttpURLConnection.HTTP_CREATED,
+          response1.getStatusLine().getStatusCode());
+      entity = response1.getEntity();
+    } catch (IOException e) {
+      e.printStackTrace();
+    } finally {
+      EntityUtils.consumeQuietly(entity);
+    }
+  }
+
+  private HttpPost getCreateVolumeRequest(String volumeName)
+      throws URISyntaxException, IOException {
+    URIBuilder builder = new URIBuilder(endpoint);
+    builder.setPath("/" + volumeName);
+    HttpPost httpPost = new HttpPost(builder.build().toString());
+    SimpleDateFormat format =
+        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
+    httpPost.addHeader(Header.OZONE_VERSION_HEADER,
+        Header.OZONE_V1_VERSION_HEADER);
+    httpPost.addHeader(HttpHeaders.DATE,
+        format.format(new Date(Time.monotonicNow())));
+    httpPost.addHeader(Header.OZONE_USER,
+        UserGroupInformation.getCurrentUser().getUserName());
+    httpPost.addHeader(HttpHeaders.AUTHORIZATION,
+        Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
+            OzoneConsts.OZONE_SIMPLE_HDFS_USER);
+    return httpPost;
+  }
+
+}