diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java new file mode 100644 index 00000000000..2d587bfe374 --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.example; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.AsyncTable; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * A simple example shows how to use asynchronous client. + */ +public class AsyncClientExample extends Configured implements Tool { + + private static final Log LOG = LogFactory.getLog(AsyncClientExample.class); + + /** + * The size for thread pool. + */ + private static final int THREAD_POOL_SIZE = 16; + + /** + * The default number of operations. + */ + private static final int DEFAULT_NUM_OPS = 100; + + /** + * The name of the column family. d for default. + */ + private static final byte[] FAMILY = Bytes.toBytes("d"); + + /** + * For the example we're just using one qualifier. + */ + private static final byte[] QUAL = Bytes.toBytes("test"); + + private final AtomicReference> future = + new AtomicReference<>(); + + private CompletableFuture getConn() { + CompletableFuture f = future.get(); + if (f != null) { + return f; + } + for (;;) { + if (future.compareAndSet(null, new CompletableFuture<>())) { + CompletableFuture toComplete = future.get(); + ConnectionFactory.createAsyncConnection(getConf()).whenComplete((conn, error) -> { + if (error != null) { + toComplete.completeExceptionally(error); + // we need to reset the future holder so we will get a chance to recreate an async + // connection at next try. + future.set(null); + return; + } + toComplete.complete(conn); + }); + return toComplete; + } else { + f = future.get(); + if (f != null) { + return f; + } + } + } + } + + private CompletableFuture closeConn() { + CompletableFuture f = future.get(); + if (f == null) { + return CompletableFuture.completedFuture(null); + } + CompletableFuture closeFuture = new CompletableFuture<>(); + f.whenComplete((conn, error) -> { + if (error == null) { + IOUtils.closeQuietly(conn); + } + closeFuture.complete(null); + }); + return closeFuture; + } + + private byte[] getKey(int i) { + return Bytes.toBytes(String.format("%08x", i)); + } + + @Override + public int run(String[] args) throws Exception { + if (args.length < 1 || args.length > 2) { + System.out.println("Usage: " + this.getClass().getName() + " tableName [num_operations]"); + return -1; + } + TableName tableName = TableName.valueOf(args[0]); + int numOps = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_NUM_OPS; + ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE, + Threads.newDaemonThreadFactory("AsyncClientExample")); + // We use AsyncTable here so we need to provide a separated thread pool. RawAsyncTable does not + // need a thread pool and may have a better performance if you use it correctly as it can save + // some context switches. But if you use RawAsyncTable incorrectly, you may have a very bad + // impact on performance so use it with caution. + CountDownLatch latch = new CountDownLatch(numOps); + IntStream.range(0, numOps).forEach(i -> { + CompletableFuture future = getConn(); + future.whenComplete((conn, error) -> { + if (error != null) { + LOG.warn("failed to get async connection for " + i, error); + latch.countDown(); + return; + } + AsyncTable table = conn.getTable(tableName, threadPool); + table.put(new Put(getKey(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(i))) + .whenComplete((putResp, putErr) -> { + if (putErr != null) { + LOG.warn("put failed for " + i, putErr); + latch.countDown(); + return; + } + LOG.info("put for " + i + " succeeded, try getting"); + table.get(new Get(getKey(i))).whenComplete((result, getErr) -> { + if (getErr != null) { + LOG.warn("get failed for " + i, putErr); + latch.countDown(); + return; + } + if (result.isEmpty()) { + LOG.warn("get failed for " + i + ", server returns empty result"); + } else if (!result.containsColumn(FAMILY, QUAL)) { + LOG.warn("get failed for " + i + ", the result does not contain " + + Bytes.toString(FAMILY) + ":" + Bytes.toString(QUAL)); + } else { + int v = Bytes.toInt(result.getValue(FAMILY, QUAL)); + if (v != i) { + LOG.warn("get failed for " + i + ", the value of " + Bytes.toString(FAMILY) + + ":" + Bytes.toString(QUAL) + " is " + v + ", exected " + i); + } else { + LOG.info("get for " + i + " succeeded"); + } + } + latch.countDown(); + }); + }); + }); + }); + latch.await(); + closeConn().get(); + return 0; + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new AsyncClientExample(), args); + } +} diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java new file mode 100644 index 00000000000..5b86ad3bea6 --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/HttpProxyExample.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.example; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.QueryStringDecoder; +import io.netty.util.concurrent.GlobalEventExecutor; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RawAsyncTable; +import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A simple example on how to use {@link RawAsyncTable} to write a fully asynchronous HTTP proxy + * server. The {@link AsyncConnection} will share the same event loop with the HTTP server. + *

+ * The request URL is: + * + *

+ * http://<host>:<port>/<table>/<rowgt;/<family>:<qualifier>
+ * 
+ * + * Use HTTP GET to fetch data, and use HTTP PUT to put data. Encode the value as the request content + * when doing PUT. + */ +public class HttpProxyExample { + + private final EventLoopGroup bossGroup = new NioEventLoopGroup(1); + + private final EventLoopGroup workerGroup = new NioEventLoopGroup(); + + private final Configuration conf; + + private final int port; + + private AsyncConnection conn; + + private Channel serverChannel; + + private ChannelGroup channelGroup; + + public HttpProxyExample(Configuration conf, int port) { + this.conf = conf; + this.port = port; + } + + private static final class Params { + public final String table; + + public final String row; + + public final String family; + + public final String qualifier; + + public Params(String table, String row, String family, String qualifier) { + this.table = table; + this.row = row; + this.family = family; + this.qualifier = qualifier; + } + } + + private static final class RequestHandler extends SimpleChannelInboundHandler { + + private final AsyncConnection conn; + + private final ChannelGroup channelGroup; + + public RequestHandler(AsyncConnection conn, ChannelGroup channelGroup) { + this.conn = conn; + this.channelGroup = channelGroup; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + channelGroup.add(ctx.channel()); + ctx.fireChannelActive(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + channelGroup.remove(ctx.channel()); + ctx.fireChannelInactive(); + } + + private void write(ChannelHandlerContext ctx, HttpResponseStatus status, + Optional content) { + DefaultFullHttpResponse resp; + if (content.isPresent()) { + ByteBuf buf = + ctx.alloc().buffer().writeBytes(content.get().getBytes(StandardCharsets.UTF_8)); + resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, buf); + resp.headers().set(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes()); + } else { + resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status); + } + resp.headers().set(HttpHeaderNames.CONTENT_TYPE, "text-plain; charset=UTF-8"); + ctx.writeAndFlush(resp); + } + + private Params parse(FullHttpRequest req) { + String[] components = new QueryStringDecoder(req.uri()).path().split("/"); + Preconditions.checkArgument(components.length == 4, "Unrecognized uri: %s", req.uri()); + // path is start with '/' so split will give an empty component + String[] cfAndCq = components[3].split(":"); + Preconditions.checkArgument(cfAndCq.length == 2, "Unrecognized uri: %s", req.uri()); + return new Params(components[1], components[2], cfAndCq[0], cfAndCq[1]); + } + + private void get(ChannelHandlerContext ctx, FullHttpRequest req) { + Params params = parse(req); + conn.getRawTable(TableName.valueOf(params.table)).get(new Get(Bytes.toBytes(params.row)) + .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier))) + .whenComplete((r, e) -> { + if (e != null) { + exceptionCaught(ctx, e); + } else { + byte[] value = + r.getValue(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier)); + if (value != null) { + write(ctx, HttpResponseStatus.OK, Optional.of(Bytes.toStringBinary(value))); + } else { + write(ctx, HttpResponseStatus.NOT_FOUND, Optional.empty()); + } + } + }); + } + + private void put(ChannelHandlerContext ctx, FullHttpRequest req) { + Params params = parse(req); + byte[] value = new byte[req.content().readableBytes()]; + req.content().readBytes(value); + conn.getRawTable(TableName.valueOf(params.table)).put(new Put(Bytes.toBytes(params.row)) + .addColumn(Bytes.toBytes(params.family), Bytes.toBytes(params.qualifier), value)) + .whenComplete((r, e) -> { + if (e != null) { + exceptionCaught(ctx, e); + } else { + write(ctx, HttpResponseStatus.OK, Optional.empty()); + } + }); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) { + switch (req.method().name()) { + case "GET": + get(ctx, req); + break; + case "PUT": + put(ctx, req); + break; + default: + write(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED, Optional.empty()); + break; + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (cause instanceof IllegalArgumentException) { + write(ctx, HttpResponseStatus.BAD_REQUEST, Optional.of(cause.getMessage())); + } else { + write(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR, + Optional.of(Throwables.getStackTraceAsString(cause))); + } + } + } + + public void start() throws InterruptedException, ExecutionException { + NettyRpcClientConfigHelper.setEventLoopConfig(conf, workerGroup, NioSocketChannel.class); + conn = ConnectionFactory.createAsyncConnection(conf).get(); + channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + serverChannel = new ServerBootstrap().group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class).childOption(ChannelOption.TCP_NODELAY, true) + .childHandler(new ChannelInitializer() { + + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addFirst(new HttpServerCodec(), new HttpObjectAggregator(4 * 1024 * 1024), + new RequestHandler(conn, channelGroup)); + } + }).bind(port).syncUninterruptibly().channel(); + } + + public void join() { + serverChannel.closeFuture().awaitUninterruptibly(); + } + + public int port() { + if (serverChannel == null) { + return port; + } else { + return ((InetSocketAddress) serverChannel.localAddress()).getPort(); + } + } + + public void stop() throws IOException { + serverChannel.close().syncUninterruptibly(); + serverChannel = null; + channelGroup.close().syncUninterruptibly(); + channelGroup = null; + conn.close(); + conn = null; + } + + public static void main(String[] args) throws InterruptedException, ExecutionException { + int port = Integer.parseInt(args[0]); + HttpProxyExample proxy = new HttpProxyExample(HBaseConfiguration.create(), port); + proxy.start(); + proxy.join(); + } +} diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/client/example/TestAsyncClientExample.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/client/example/TestAsyncClientExample.java new file mode 100644 index 00000000000..b6d4f5c0b68 --- /dev/null +++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/client/example/TestAsyncClientExample.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.example; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.ToolRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ClientTests.class, MediumTests.class }) +public class TestAsyncClientExample { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final TableName TABLE_NAME = TableName.valueOf("test"); + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniCluster(1); + UTIL.createTable(TABLE_NAME, Bytes.toBytes("d")); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws Exception { + AsyncClientExample tool = new AsyncClientExample(); + tool.setConf(UTIL.getConfiguration()); + assertEquals(0, ToolRunner.run(tool, new String[] { TABLE_NAME.getNameAsString() })); + } +} diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/client/example/TestHttpProxyExample.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/client/example/TestHttpProxyExample.java new file mode 100644 index 00000000000..7c44f55d700 --- /dev/null +++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/client/example/TestHttpProxyExample.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.example; + +import static org.junit.Assert.assertEquals; + +import com.google.common.io.ByteStreams; + +import java.nio.charset.StandardCharsets; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.http.HttpStatus; +import org.apache.http.client.entity.EntityBuilder; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ClientTests.class, MediumTests.class }) +public class TestHttpProxyExample { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final TableName TABLE_NAME = TableName.valueOf("test"); + + private static final String FAMILY = "cf"; + + private static final String QUALIFIER = "cq"; + + private static final String URL_TEMPLCATE = "http://localhost:%d/%s/%s/%s:%s"; + + private static final String ROW = "row"; + + private static final String VALUE = "value"; + + private static HttpProxyExample PROXY; + + private static int PORT; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniCluster(1); + UTIL.createTable(TABLE_NAME, Bytes.toBytes(FAMILY)); + PROXY = new HttpProxyExample(UTIL.getConfiguration(), 0); + PROXY.start(); + PORT = PROXY.port(); + } + + @AfterClass + public static void tearDown() throws Exception { + if (PROXY != null) { + PROXY.stop(); + } + UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws Exception { + try (CloseableHttpClient client = HttpClientBuilder.create().build()) { + HttpPut put = new HttpPut( + String.format(URL_TEMPLCATE, PORT, TABLE_NAME.getNameAsString(), ROW, FAMILY, QUALIFIER)); + put.setEntity(EntityBuilder.create().setText(VALUE) + .setContentType(ContentType.create("text-plain", StandardCharsets.UTF_8)).build()); + try (CloseableHttpResponse resp = client.execute(put)) { + assertEquals(HttpStatus.SC_OK, resp.getStatusLine().getStatusCode()); + } + HttpGet get = new HttpGet( + String.format(URL_TEMPLCATE, PORT, TABLE_NAME.getNameAsString(), ROW, FAMILY, QUALIFIER)); + try (CloseableHttpResponse resp = client.execute(get)) { + assertEquals(HttpStatus.SC_OK, resp.getStatusLine().getStatusCode()); + assertEquals("value", + Bytes.toString(ByteStreams.toByteArray(resp.getEntity().getContent()))); + } + get = new HttpGet(String.format(URL_TEMPLCATE, PORT, TABLE_NAME.getNameAsString(), "whatever", + FAMILY, QUALIFIER)); + try (CloseableHttpResponse resp = client.execute(get)) { + assertEquals(HttpStatus.SC_NOT_FOUND, resp.getStatusLine().getStatusCode()); + } + } + } +}