From 4c5eec3968ec4f5dec3f309fdb145b27d6b0ff5a Mon Sep 17 00:00:00 2001 From: Andrew Kyle Purtell Date: Tue, 6 Apr 2010 07:07:44 +0000 Subject: [PATCH] HBASE-2412 [stargate] PerformanceEvaluation; and related fixes git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@931038 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + contrib/stargate/core/pom.xml | 22 + .../hadoop/hbase/stargate/ExistsResource.java | 79 ++ .../apache/hadoop/hbase/stargate/Main.java | 23 +- .../hadoop/hbase/stargate/RESTServlet.java | 27 +- .../hbase/stargate/RegionsResource.java | 44 +- .../hadoop/hbase/stargate/RootResource.java | 13 +- .../hadoop/hbase/stargate/RowResource.java | 43 +- .../hbase/stargate/ScannerResource.java | 36 +- .../hadoop/hbase/stargate/SchemaResource.java | 67 +- .../StorageClusterStatusResource.java | 17 +- .../hadoop/hbase/stargate/TableResource.java | 6 + .../apache/hadoop/hbase/stargate/User.java | 2 +- .../hadoop/hbase/stargate/client/Client.java | 76 +- .../hadoop/hbase/stargate/client/Cluster.java | 7 + .../hbase/stargate/client/RemoteAdmin.java | 188 +++ .../hbase/stargate/client/RemoteHTable.java | 234 +-- .../stargate/model/TableRegionModel.java | 18 +- .../stargate/model/TableSchemaModel.java | 44 + .../stargate/util/HTableTokenBucket.java | 2 +- .../hadoop/hbase/stargate/util/UserData.java | 30 +- .../hbase/stargate/PerformanceEvaluation.java | 1255 +++++++++++++++++ .../stargate/client/TestRemoteAdmin.java | 84 ++ 23 files changed, 2057 insertions(+), 261 deletions(-) create mode 100644 contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/ExistsResource.java create mode 100644 contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/RemoteAdmin.java create mode 100644 contrib/stargate/core/src/test/java/org/apache/hadoop/hbase/stargate/PerformanceEvaluation.java create mode 100644 contrib/stargate/core/src/test/java/org/apache/hadoop/hbase/stargate/client/TestRemoteAdmin.java diff --git a/CHANGES.txt b/CHANGES.txt index ac4f7843eb1..b515fad4895 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -484,6 +484,7 @@ Release 0.21.0 - Unreleased HBASE-2087 The wait on compaction because "Too many store files" holds up all flushing HBASE-2252 Mapping a very big table kills region servers + HBASE-2412 [stargate] PerformanceEvaluation NEW FEATURES HBASE-1961 HBase EC2 scripts diff --git a/contrib/stargate/core/pom.xml b/contrib/stargate/core/pom.xml index 3c9615e2076..b5d7b02eb17 100644 --- a/contrib/stargate/core/pom.xml +++ b/contrib/stargate/core/pom.xml @@ -22,6 +22,28 @@ 3.0.1 + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + org/apache/hadoop/hbase/stargate/PerformanceEvaluation + + + + + + diff --git a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/ExistsResource.java b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/ExistsResource.java new file mode 100644 index 00000000000..5fb70b10f8e --- /dev/null +++ b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/ExistsResource.java @@ -0,0 +1,79 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.stargate; + +import java.io.IOException; + +import javax.ws.rs.GET; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.CacheControl; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriInfo; +import javax.ws.rs.core.Response.ResponseBuilder; + +import org.apache.hadoop.hbase.client.HBaseAdmin; + +public class ExistsResource implements Constants { + + User user; + String tableName; + String actualTableName; + CacheControl cacheControl; + RESTServlet servlet; + + public ExistsResource(User user, String table) throws IOException { + if (user != null) { + this.user = user; + this.actualTableName = + !user.isAdmin() ? (user.getName() + "." + table) : table; + } else { + this.actualTableName = table; + } + this.tableName = table; + servlet = RESTServlet.getInstance(); + cacheControl = new CacheControl(); + cacheControl.setNoCache(true); + cacheControl.setNoTransform(false); + } + + @GET + @Produces({MIMETYPE_TEXT, MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF, + MIMETYPE_BINARY}) + public Response get(final @Context UriInfo uriInfo) throws IOException { + if (!servlet.userRequestLimit(user, 1)) { + return Response.status(509).build(); + } + try { + HBaseAdmin admin = new HBaseAdmin(servlet.getConfiguration()); + if (!admin.tableExists(actualTableName)) { + throw new WebApplicationException(Response.Status.NOT_FOUND); + } + } catch (IOException e) { + throw new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE); + } + ResponseBuilder response = Response.ok(); + response.cacheControl(cacheControl); + return response.build(); + } + +} diff --git a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/Main.java b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/Main.java index 8c6beaa004c..d1f51717c21 100644 --- a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/Main.java +++ b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/Main.java @@ -20,11 +20,12 @@ package org.apache.hadoop.hbase.stargate; +import java.net.InetAddress; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; -import org.mortbay.jetty.Connector; import org.mortbay.jetty.Server; import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.ServletHolder; @@ -64,6 +65,18 @@ public class Main implements Constants { sh.setInitParameter("com.sun.jersey.config.property.packages", "jetty"); + // configure the Stargate singleton + + RESTServlet servlet = RESTServlet.getInstance(); + port = servlet.getConfiguration().getInt("stargate.port", port); + if (!servlet.isMultiUser()) { + servlet.setMultiUser(cmd.hasOption("m")); + } + servlet.addConnectorAddress( + servlet.getConfiguration().get("stargate.hostname", + InetAddress.getLocalHost().getCanonicalHostName()), + port); + // set up Jetty and run the embedded server Server server = new Server(port); @@ -74,14 +87,6 @@ public 
class Main implements Constants { Context context = new Context(server, "/", Context.SESSIONS); context.addServlet(sh, "/*"); - // configure the Stargate singleton - - RESTServlet servlet = RESTServlet.getInstance(); - servlet.setMultiUser(cmd.hasOption("m")); - for (Connector conn: server.getConnectors()) { - servlet.addConnectorAddress(conn.getHost(), conn.getLocalPort()); - } - server.start(); server.join(); } diff --git a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RESTServlet.java b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RESTServlet.java index f6a3039a658..d1603f893eb 100644 --- a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RESTServlet.java +++ b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RESTServlet.java @@ -193,6 +193,10 @@ public class RESTServlet extends ServletAdaptor this.statusReporter = new StatusReporter( conf.getInt(STATUS_REPORT_PERIOD_KEY, 1000 * 60), stopping); this.multiuser = conf.getBoolean("stargate.multiuser", false); + if (this.multiuser) { + LOG.info("multiuser mode enabled"); + getAuthenticator(); + } } @Override @@ -321,6 +325,7 @@ public class RESTServlet extends ServletAdaptor if (authenticator == null) { authenticator = new HBCAuthenticator(conf); } + LOG.info("using authenticator " + authenticator); } return authenticator; } @@ -339,18 +344,20 @@ public class RESTServlet extends ServletAdaptor * @param want the number of tokens desired * @throws IOException */ - public boolean userRequestLimit(final User user, int want) + public boolean userRequestLimit(final User user, int want) throws IOException { - UserData ud = SoftUserData.get(user); - HTableTokenBucket tb = (HTableTokenBucket) ud.get(UserData.TOKENBUCKET); - if (tb == null) { - tb = new HTableTokenBucket(conf, Bytes.toBytes(user.getToken())); - ud.put(UserData.TOKENBUCKET, tb); + if (multiuser) { + UserData ud = SoftUserData.get(user); + HTableTokenBucket tb = (HTableTokenBucket) ud.get(UserData.TOKENBUCKET); + if (tb == null) { + tb = new HTableTokenBucket(conf, Bytes.toBytes(user.getToken())); + ud.put(UserData.TOKENBUCKET, tb); + } + if (tb.available() < want) { + return false; + } + tb.remove(want); } - if (tb.available() < want) { - return false; - } - tb.remove(want); return true; } diff --git a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RegionsResource.java b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RegionsResource.java index 969666ed5ed..f8211b24637 100644 --- a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RegionsResource.java +++ b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RegionsResource.java @@ -35,10 +35,12 @@ import javax.ws.rs.core.Response.ResponseBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.stargate.User; import org.apache.hadoop.hbase.stargate.model.TableInfoModel; @@ -48,18 +50,20 @@ public class RegionsResource implements Constants { private static final Log LOG = LogFactory.getLog(RegionsResource.class); User user; - String table; + String tableName; + String actualTableName; CacheControl cacheControl; 
RESTServlet servlet; public RegionsResource(User user, String table) throws IOException { if (user != null) { - if (!user.isAdmin()) { - throw new WebApplicationException(Response.Status.FORBIDDEN); - } this.user = user; + this.actualTableName = + !user.isAdmin() ? (user.getName() + "." + table) : table; + } else { + this.actualTableName = table; } - this.table = table; + this.tableName = table; cacheControl = new CacheControl(); cacheControl.setNoCache(true); cacheControl.setNoTransform(false); @@ -69,9 +73,9 @@ public class RegionsResource implements Constants { private Map getTableRegions() throws IOException { HTablePool pool = servlet.getTablePool(); - HTable table = (HTable) pool.getTable(this.table); + HTableInterface table = pool.getTable(actualTableName); try { - return table.getRegionsInfo(); + return ((HTable)table).getRegionsInfo(); } finally { pool.putTable(table); } @@ -79,22 +83,32 @@ public class RegionsResource implements Constants { @GET @Produces({MIMETYPE_TEXT, MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF}) - public Response get(final @Context UriInfo uriInfo) { + public Response get(final @Context UriInfo uriInfo) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("GET " + uriInfo.getAbsolutePath()); } + if (!servlet.userRequestLimit(user, 1)) { + return Response.status(509).build(); + } servlet.getMetrics().incrementRequests(1); try { - TableInfoModel model = new TableInfoModel(table); + String name = user.isAdmin() ? actualTableName : tableName; + TableInfoModel model = new TableInfoModel(name); Map regions = getTableRegions(); for (Map.Entry e: regions.entrySet()) { HRegionInfo hri = e.getKey(); - HServerAddress addr = e.getValue(); - InetSocketAddress sa = addr.getInetSocketAddress(); - model.add( - new TableRegionModel(table, hri.getRegionId(), hri.getStartKey(), - hri.getEndKey(), - sa.getHostName() + ":" + Integer.valueOf(sa.getPort()))); + if (user.isAdmin()) { + HServerAddress addr = e.getValue(); + InetSocketAddress sa = addr.getInetSocketAddress(); + model.add( + new TableRegionModel(name, hri.getRegionId(), hri.getStartKey(), + hri.getEndKey(), + sa.getHostName() + ":" + Integer.valueOf(sa.getPort()))); + } else { + model.add( + new TableRegionModel(name, hri.getRegionId(), hri.getStartKey(), + hri.getEndKey())); + } } ResponseBuilder response = Response.ok(model); response.cacheControl(cacheControl); diff --git a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RootResource.java b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RootResource.java index eea45aca75b..300f76f4a10 100644 --- a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RootResource.java +++ b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RootResource.java @@ -121,7 +121,7 @@ public class RootResource implements Constants { if (servlet.isMultiUser()) { throw new WebApplicationException(Response.Status.BAD_REQUEST); } - return new StorageClusterStatusResource(); + return new StorageClusterStatusResource(User.DEFAULT_USER); } @Path("version") @@ -135,7 +135,7 @@ public class RootResource implements Constants { if (servlet.isMultiUser()) { User user = auth(token); if (!servlet.userRequestLimit(user, 1)) { - throw new WebApplicationException(Response.status(509).build()); + return Response.status(509).build(); } try { ResponseBuilder response = Response.ok(getTableListForUser(user)); @@ -154,11 +154,8 @@ public class RootResource implements Constants { final @PathParam("token") String token) throws 
IOException { if (servlet.isMultiUser()) { User user = auth(token); - if (user.isAdmin()) { - if (!servlet.userRequestLimit(user, 1)) { - throw new WebApplicationException(Response.status(509).build()); - } - return new StorageClusterStatusResource(); + if (user != null && user.isAdmin()) { + return new StorageClusterStatusResource(user); } throw new WebApplicationException(Response.Status.FORBIDDEN); } @@ -185,7 +182,7 @@ public class RootResource implements Constants { if (servlet.isMultiUser()) { throw new WebApplicationException(Response.Status.BAD_REQUEST); } - return new TableResource(null, table); + return new TableResource(User.DEFAULT_USER, table); } } diff --git a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RowResource.java b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RowResource.java index 18738d23a79..0c2397da277 100644 --- a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RowResource.java +++ b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/RowResource.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.stargate.User; import org.apache.hadoop.hbase.stargate.model.CellModel; @@ -79,16 +80,19 @@ public class RowResource implements Constants { } this.servlet = RESTServlet.getInstance(); cacheControl = new CacheControl(); - cacheControl.setMaxAge(servlet.getMaxAge(table)); + cacheControl.setMaxAge(servlet.getMaxAge(actualTableName)); cacheControl.setNoTransform(false); } @GET @Produces({MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF}) - public Response get(final @Context UriInfo uriInfo) { + public Response get(final @Context UriInfo uriInfo) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("GET " + uriInfo.getAbsolutePath()); } + if (!servlet.userRequestLimit(user, 1)) { + return Response.status(509).build(); + } servlet.getMetrics().incrementRequests(1); try { ResultGenerator generator = @@ -127,10 +131,14 @@ public class RowResource implements Constants { @GET @Produces(MIMETYPE_BINARY) - public Response getBinary(final @Context UriInfo uriInfo) { + public Response getBinary(final @Context UriInfo uriInfo) + throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("GET " + uriInfo.getAbsolutePath() + " as "+ MIMETYPE_BINARY); } + if (!servlet.userRequestLimit(user, 1)) { + return Response.status(509).build(); + } servlet.getMetrics().incrementRequests(1); // doesn't make sense to use a non specific coordinate as this can only // return a single cell @@ -166,6 +174,7 @@ public class RowResource implements Constants { throw new WebApplicationException(Response.status(509).build()); } table = pool.getTable(actualTableName); + ((HTable)table).setAutoFlush(false); for (RowModel row: rows) { byte[] key = row.getKey(); Put put = new Put(key); @@ -182,6 +191,7 @@ public class RowResource implements Constants { LOG.debug("PUT " + put.toString()); } } + ((HTable)table).setAutoFlush(true); table.flushCommits(); ResponseBuilder response = Response.ok(); return response.build(); @@ -236,7 +246,6 @@ public class RowResource implements Constants { if (LOG.isDebugEnabled()) { LOG.debug("PUT " + put.toString()); } - table.flushCommits(); return Response.ok().build(); } catch (IOException e) { throw new 
WebApplicationException(e, @@ -251,10 +260,13 @@ public class RowResource implements Constants { @PUT @Consumes({MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF}) public Response put(final CellSetModel model, - final @Context UriInfo uriInfo) { + final @Context UriInfo uriInfo) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("PUT " + uriInfo.getAbsolutePath()); } + if (!servlet.userRequestLimit(user, 1)) { + return Response.status(509).build(); + } return update(model, true); } @@ -262,38 +274,52 @@ public class RowResource implements Constants { @Consumes(MIMETYPE_BINARY) public Response putBinary(final byte[] message, final @Context UriInfo uriInfo, final @Context HttpHeaders headers) + throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("PUT " + uriInfo.getAbsolutePath() + " as "+ MIMETYPE_BINARY); } + if (!servlet.userRequestLimit(user, 1)) { + return Response.status(509).build(); + } return updateBinary(message, headers, true); } @POST @Consumes({MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF}) public Response post(final CellSetModel model, - final @Context UriInfo uriInfo) { + final @Context UriInfo uriInfo) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("POST " + uriInfo.getAbsolutePath()); } + if (!servlet.userRequestLimit(user, 1)) { + return Response.status(509).build(); + } return update(model, false); } @POST @Consumes(MIMETYPE_BINARY) public Response postBinary(final byte[] message, - final @Context UriInfo uriInfo, final @Context HttpHeaders headers) { + final @Context UriInfo uriInfo, final @Context HttpHeaders headers) + throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("POST " + uriInfo.getAbsolutePath() + " as "+MIMETYPE_BINARY); } + if (!servlet.userRequestLimit(user, 1)) { + return Response.status(509).build(); + } return updateBinary(message, headers, false); } @DELETE - public Response delete(final @Context UriInfo uriInfo) { + public Response delete(final @Context UriInfo uriInfo) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("DELETE " + uriInfo.getAbsolutePath()); } + if (!servlet.userRequestLimit(user, 1)) { + return Response.status(509).build(); + } servlet.getMetrics().incrementRequests(1); Delete delete = null; if (rowspec.hasTimestamp()) @@ -325,7 +351,6 @@ public class RowResource implements Constants { if (LOG.isDebugEnabled()) { LOG.debug("DELETE " + delete.toString()); } - table.flushCommits(); } catch (IOException e) { throw new WebApplicationException(e, Response.Status.SERVICE_UNAVAILABLE); diff --git a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/ScannerResource.java b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/ScannerResource.java index a77e048440a..4d09545a498 100644 --- a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/ScannerResource.java +++ b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/ScannerResource.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.stargate; import java.io.IOException; import java.net.URI; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -48,8 +49,8 @@ public class ScannerResource implements Constants { private static final Log LOG = LogFactory.getLog(ScannerResource.class); - static final Map scanners = - new HashMap(); + static final Map scanners = + Collections.synchronizedMap(new HashMap()); User user; String tableName; @@ -69,16 +70,17 @@ public class ScannerResource implements Constants { } static void delete(final String id) { - 
synchronized (scanners) { - ScannerInstanceResource instance = scanners.remove(id); - if (instance != null) { - instance.generator.close(); - } + ScannerInstanceResource instance = scanners.remove(id); + if (instance != null) { + instance.generator.close(); } } Response update(final ScannerModel model, final boolean replace, - final UriInfo uriInfo) { + final UriInfo uriInfo) throws IOException { + if (!servlet.userRequestLimit(user, 1)) { + return Response.status(509).build(); + } servlet.getMetrics().incrementRequests(1); byte[] endRow = model.hasEndRow() ? model.getEndRow() : null; RowSpec spec = new RowSpec(model.getStartRow(), endRow, @@ -91,9 +93,7 @@ public class ScannerResource implements Constants { ScannerInstanceResource instance = new ScannerInstanceResource(user, actualTableName, id, gen, model.getBatch()); - synchronized (scanners) { - scanners.put(id, instance); - } + scanners.put(id, instance); if (LOG.isDebugEnabled()) { LOG.debug("new scanner: " + id); } @@ -111,7 +111,7 @@ public class ScannerResource implements Constants { @PUT @Consumes({MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF}) public Response put(final ScannerModel model, - final @Context UriInfo uriInfo) { + final @Context UriInfo uriInfo) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("PUT " + uriInfo.getAbsolutePath()); } @@ -121,7 +121,7 @@ public class ScannerResource implements Constants { @POST @Consumes({MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF}) public Response post(final ScannerModel model, - final @Context UriInfo uriInfo) { + final @Context UriInfo uriInfo) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("POST " + uriInfo.getAbsolutePath()); } @@ -131,13 +131,11 @@ public class ScannerResource implements Constants { @Path("{scanner: .+}") public ScannerInstanceResource getScannerInstanceResource( final @PathParam("scanner") String id) { - synchronized (scanners) { - ScannerInstanceResource instance = scanners.get(id); - if (instance == null) { - throw new WebApplicationException(Response.Status.NOT_FOUND); - } - return instance; + ScannerInstanceResource instance = scanners.get(id); + if (instance == null) { + throw new WebApplicationException(Response.Status.NOT_FOUND); } + return instance; } } diff --git a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/SchemaResource.java b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/SchemaResource.java index 2ffa5d453cf..b3a8402767b 100644 --- a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/SchemaResource.java +++ b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/SchemaResource.java @@ -35,10 +35,12 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriInfo; import javax.ws.rs.core.Response.ResponseBuilder; + import javax.xml.namespace.QName; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableExistsException; @@ -46,7 +48,6 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTablePool; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.stargate.User; import org.apache.hadoop.hbase.stargate.model.ColumnSchemaModel; import 
org.apache.hadoop.hbase.stargate.model.TableSchemaModel; @@ -89,31 +90,17 @@ public class SchemaResource implements Constants { @GET @Produces({MIMETYPE_TEXT, MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF}) - public Response get(final @Context UriInfo uriInfo) { + public Response get(final @Context UriInfo uriInfo) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("GET " + uriInfo.getAbsolutePath()); } + if (!servlet.userRequestLimit(user, 1)) { + return Response.status(509).build(); + } servlet.getMetrics().incrementRequests(1); try { - HTableDescriptor htd = getTableSchema(); - TableSchemaModel model = new TableSchemaModel(); - model.setName(tableName); - for (Map.Entry e: - htd.getValues().entrySet()) { - model.addAttribute(Bytes.toString(e.getKey().get()), - Bytes.toString(e.getValue().get())); - } - for (HColumnDescriptor hcd: htd.getFamilies()) { - ColumnSchemaModel columnModel = new ColumnSchemaModel(); - columnModel.setName(hcd.getNameAsString()); - for (Map.Entry e: - hcd.getValues().entrySet()) { - columnModel.addAttribute(Bytes.toString(e.getKey().get()), - Bytes.toString(e.getValue().get())); - } - model.addColumnFamily(columnModel); - } - ResponseBuilder response = Response.ok(model); + ResponseBuilder response = + Response.ok(new TableSchemaModel(getTableSchema())); response.cacheControl(cacheControl); return response.build(); } catch (TableNotFoundException e) { @@ -206,46 +193,52 @@ public class SchemaResource implements Constants { @PUT @Consumes({MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF}) public Response put(final TableSchemaModel model, - final @Context UriInfo uriInfo) { + final @Context UriInfo uriInfo) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("PUT " + uriInfo.getAbsolutePath()); } - servlet.getMetrics().incrementRequests(1); - // use the name given in the path, but warn if the name on the path and - // the name in the schema are different - if (model.getName() != tableName) { - LOG.warn("table name mismatch: path='" + tableName + "', schema='" + - model.getName() + "'"); + if (!servlet.userRequestLimit(user, 1)) { + return Response.status(509).build(); } + servlet.getMetrics().incrementRequests(1); return update(model, true, uriInfo); } @POST @Consumes({MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF}) public Response post(final TableSchemaModel model, - final @Context UriInfo uriInfo) { + final @Context UriInfo uriInfo) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("PUT " + uriInfo.getAbsolutePath()); } - servlet.getMetrics().incrementRequests(1); - // use the name given in the path, but warn if the name on the path and - // the name in the schema are different - if (model.getName() != tableName) { - LOG.warn("table name mismatch: path='" + tableName + "', schema='" + - model.getName() + "'"); + if (!servlet.userRequestLimit(user, 1)) { + return Response.status(509).build(); } + servlet.getMetrics().incrementRequests(1); return update(model, false, uriInfo); } @DELETE - public Response delete(final @Context UriInfo uriInfo) { + public Response delete(final @Context UriInfo uriInfo) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("DELETE " + uriInfo.getAbsolutePath()); } + if (!servlet.userRequestLimit(user, 1)) { + return Response.status(509).build(); + } servlet.getMetrics().incrementRequests(1); try { HBaseAdmin admin = new HBaseAdmin(servlet.getConfiguration()); - admin.disableTable(actualTableName); + boolean success = false; + for (int i = 0; i < 10; i++) try { + admin.disableTable(actualTableName); + 
success = true;
+        break;
+      } catch (IOException e) {
+        // table may not be disabled yet; ignore and retry
+      }
+      if (!success) {
+        throw new IOException("could not disable table");
+      }
       admin.deleteTable(actualTableName);
       return Response.ok().build();
     } catch (TableNotFoundException e) {
diff --git a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/StorageClusterStatusResource.java b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/StorageClusterStatusResource.java
index 494b44c2ff8..7c60b1c044d 100644
--- a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/StorageClusterStatusResource.java
+++ b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/StorageClusterStatusResource.java
@@ -44,22 +44,27 @@ public class StorageClusterStatusResource implements Constants {
   private static final Log LOG =
     LogFactory.getLog(StorageClusterStatusResource.class);

+  private User user;
   private CacheControl cacheControl;
   private RESTServlet servlet;

-  public StorageClusterStatusResource() throws IOException {
-    servlet = RESTServlet.getInstance();
-    cacheControl = new CacheControl();
-    cacheControl.setNoCache(true);
-    cacheControl.setNoTransform(false);
+  public StorageClusterStatusResource(User user) throws IOException {
+    this.user = user;
+    this.servlet = RESTServlet.getInstance();
+    this.cacheControl = new CacheControl();
+    this.cacheControl.setNoCache(true);
+    this.cacheControl.setNoTransform(false);
   }

   @GET
   @Produces({MIMETYPE_TEXT, MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF})
-  public Response get(final @Context UriInfo uriInfo) {
+  public Response get(final @Context UriInfo uriInfo) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("GET " + uriInfo.getAbsolutePath());
     }
+    if (!servlet.userRequestLimit(user, 1)) {
+      return Response.status(509).build();
+    }
     servlet.getMetrics().incrementRequests(1);
     try {
       HBaseAdmin admin = new HBaseAdmin(servlet.getConfiguration());
diff --git a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/TableResource.java b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/TableResource.java
index 188c7bc367d..8fade7ac957 100644
--- a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/TableResource.java
+++ b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/TableResource.java
@@ -40,6 +40,11 @@ public class TableResource implements Constants {
     this.table = table;
   }

+  @Path("exists")
+  public ExistsResource getExistsResource() throws IOException {
+    return new ExistsResource(user, table);
+  }
+
   @Path("regions")
   public RegionsResource getRegionsResource() throws IOException {
     return new RegionsResource(user, table);
@@ -66,4 +71,5 @@ public class TableResource implements Constants {
         Response.Status.INTERNAL_SERVER_ERROR);
     }
   }
+
 }
diff --git a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/User.java b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/User.java
index 81d1b9da10c..9b8120ff4dd 100644
--- a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/User.java
+++ b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/User.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 public class User implements Constants {

   public static final User DEFAULT_USER = new User("default",
-    "00000000000000000000000000000000", false, true);
+    "00000000000000000000000000000000", true, true);

   private String name;
   private String token;
diff --git
a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/Client.java b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/Client.java index 44668761799..e17e278234b 100644 --- a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/Client.java +++ b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/Client.java @@ -64,10 +64,13 @@ public class Client { */ public Client(Cluster cluster) { this.cluster = cluster; - httpClient = new HttpClient(new MultiThreadedHttpConnectionManager()); - HttpConnectionManagerParams managerParams = - httpClient.getHttpConnectionManager().getParams(); + MultiThreadedHttpConnectionManager manager = + new MultiThreadedHttpConnectionManager(); + HttpConnectionManagerParams managerParams = manager.getParams(); managerParams.setConnectionTimeout(2000); // 2 s + managerParams.setDefaultMaxConnectionsPerHost(10); + managerParams.setMaxTotalConnections(100); + this.httpClient = new HttpClient(manager); HttpClientParams clientParams = httpClient.getParams(); clientParams.setVersion(HttpVersion.HTTP_1_1); } @@ -200,10 +203,13 @@ public class Client { public Response head(Cluster cluster, String path, Header[] headers) throws IOException { HeadMethod method = new HeadMethod(); - int code = execute(cluster, method, null, path); - headers = method.getResponseHeaders(); - method.releaseConnection(); - return new Response(code, headers, null); + try { + int code = execute(cluster, method, null, path); + headers = method.getResponseHeaders(); + return new Response(code, headers, null); + } finally { + method.releaseConnection(); + } } /** @@ -276,11 +282,14 @@ public class Client { public Response get(Cluster c, String path, Header[] headers) throws IOException { GetMethod method = new GetMethod(); - int code = execute(c, method, headers, path); - headers = method.getResponseHeaders(); - byte[] body = method.getResponseBody(); - method.releaseConnection(); - return new Response(code, headers, body); + try { + int code = execute(c, method, headers, path); + headers = method.getResponseHeaders(); + byte[] body = method.getResponseBody(); + return new Response(code, headers, body); + } finally { + method.releaseConnection(); + } } /** @@ -339,12 +348,15 @@ public class Client { public Response put(Cluster cluster, String path, Header[] headers, byte[] content) throws IOException { PutMethod method = new PutMethod(); - method.setRequestEntity(new ByteArrayRequestEntity(content)); - int code = execute(cluster, method, headers, path); - headers = method.getResponseHeaders(); - content = method.getResponseBody(); - method.releaseConnection(); - return new Response(code, headers, content); + try { + method.setRequestEntity(new ByteArrayRequestEntity(content)); + int code = execute(cluster, method, headers, path); + headers = method.getResponseHeaders(); + content = method.getResponseBody(); + return new Response(code, headers, content); + } finally { + method.releaseConnection(); + } } /** @@ -403,12 +415,15 @@ public class Client { public Response post(Cluster cluster, String path, Header[] headers, byte[] content) throws IOException { PostMethod method = new PostMethod(); - method.setRequestEntity(new ByteArrayRequestEntity(content)); - int code = execute(cluster, method, headers, path); - headers = method.getResponseHeaders(); - content = method.getResponseBody(); - method.releaseConnection(); - return new Response(code, headers, content); + try { + method.setRequestEntity(new 
ByteArrayRequestEntity(content)); + int code = execute(cluster, method, headers, path); + headers = method.getResponseHeaders(); + content = method.getResponseBody(); + return new Response(code, headers, content); + } finally { + method.releaseConnection(); + } } /** @@ -430,9 +445,14 @@ public class Client { */ public Response delete(Cluster cluster, String path) throws IOException { DeleteMethod method = new DeleteMethod(); - int code = execute(cluster, method, null, path); - Header[] headers = method.getResponseHeaders(); - method.releaseConnection(); - return new Response(code, headers); + try { + int code = execute(cluster, method, null, path); + Header[] headers = method.getResponseHeaders(); + byte[] content = method.getResponseBody(); + return new Response(code, headers, content); + } finally { + method.releaseConnection(); + } } + } diff --git a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/Cluster.java b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/Cluster.java index 2264256421a..39a959bcedd 100644 --- a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/Cluster.java +++ b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/Cluster.java @@ -46,6 +46,13 @@ public class Cluster { nodes.addAll(nodes); } + /** + * @return true if no locations have been added, false otherwise + */ + public boolean isEmpty() { + return nodes.isEmpty(); + } + /** * Add a node to the cluster * @param node the service location in 'host:port' format diff --git a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/RemoteAdmin.java b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/RemoteAdmin.java new file mode 100644 index 00000000000..3a918961f33 --- /dev/null +++ b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/RemoteAdmin.java @@ -0,0 +1,188 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.stargate.client; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.stargate.Constants; +import org.apache.hadoop.hbase.stargate.model.TableSchemaModel; +import org.apache.hadoop.hbase.util.Bytes; + +public class RemoteAdmin { + + final Client client; + final Configuration conf; + final String accessToken; + final int maxRetries; + final long sleepTime; + + /** + * Constructor + * @param client + * @param conf + */ + public RemoteAdmin(Client client, Configuration conf) { + this(client, conf, null); + } + + /** + * Constructor + * @param client + * @param conf + * @param accessToken + */ + public RemoteAdmin(Client client, Configuration conf, String accessToken) { + this.client = client; + this.conf = conf; + this.accessToken = accessToken; + this.maxRetries = conf.getInt("stargate.client.max.retries", 10); + this.sleepTime = conf.getLong("stargate.client.sleep", 1000); + } + + /** + * @param tableName name of table to check + * @return true if all regions of the table are available + * @throws IOException if a remote or network exception occurs + */ + public boolean isTableAvailable(String tableName) throws IOException { + return isTableAvailable(Bytes.toBytes(tableName)); + } + + /** + * @param tableName name of table to check + * @return true if all regions of the table are available + * @throws IOException if a remote or network exception occurs + */ + public boolean isTableAvailable(byte[] tableName) throws IOException { + StringBuilder sb = new StringBuilder(); + sb.append('/'); + if (accessToken != null) { + sb.append(accessToken); + sb.append('/'); + } + sb.append(Bytes.toStringBinary(tableName)); + sb.append('/'); + sb.append("exists"); + int code = 0; + for (int i = 0; i < maxRetries; i++) { + Response response = client.get(sb.toString()); + code = response.getCode(); + switch (code) { + case 200: + return true; + case 404: + return false; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { } + break; + default: + throw new IOException("exists request returned " + code); + } + } + throw new IOException("exists request timed out"); + } + + /** + * Creates a new table. + * @param desc table descriptor for table + * @throws IOException if a remote or network exception occurs + */ + public void createTable(HTableDescriptor desc) + throws IOException { + TableSchemaModel model = new TableSchemaModel(desc); + StringBuilder sb = new StringBuilder(); + sb.append('/'); + if (accessToken != null) { + sb.append(accessToken); + sb.append('/'); + } + sb.append(Bytes.toStringBinary(desc.getName())); + sb.append('/'); + sb.append("schema"); + int code = 0; + for (int i = 0; i < maxRetries; i++) { + Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF, + model.createProtobufOutput()); + code = response.getCode(); + switch (code) { + case 201: + return; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { } + break; + default: + throw new IOException("create request returned " + code); + } + } + throw new IOException("create request timed out"); + } + + /** + * Deletes a table. + * @param tableName name of table to delete + * @throws IOException if a remote or network exception occurs + */ + public void deleteTable(final String tableName) throws IOException { + deleteTable(Bytes.toBytes(tableName)); + } + + /** + * Deletes a table. 
+ * @param tableName name of table to delete + * @throws IOException if a remote or network exception occurs + */ + public void deleteTable(final byte [] tableName) throws IOException { + StringBuilder sb = new StringBuilder(); + sb.append('/'); + if (accessToken != null) { + sb.append(accessToken); + sb.append('/'); + } + sb.append(Bytes.toStringBinary(tableName)); + sb.append('/'); + sb.append("schema"); + int code = 0; + for (int i = 0; i < maxRetries; i++) { + Response response = client.delete(sb.toString()); + code = response.getCode(); + switch (code) { + case 200: + return; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { } + break; + default: + throw new IOException("delete request returned " + code); + } + } + throw new IOException("delete request timed out"); + } + +} diff --git a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/RemoteHTable.java b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/RemoteHTable.java index 3ddfb859d49..042ac92248b 100644 --- a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/RemoteHTable.java +++ b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/client/RemoteHTable.java @@ -29,16 +29,13 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; -import javax.xml.namespace.QName; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; @@ -54,7 +51,6 @@ import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.stargate.Constants; import org.apache.hadoop.hbase.stargate.model.CellModel; import org.apache.hadoop.hbase.stargate.model.CellSetModel; -import org.apache.hadoop.hbase.stargate.model.ColumnSchemaModel; import org.apache.hadoop.hbase.stargate.model.RowModel; import org.apache.hadoop.hbase.stargate.model.ScannerModel; import org.apache.hadoop.hbase.stargate.model.TableSchemaModel; @@ -67,11 +63,13 @@ public class RemoteHTable implements HTableInterface { private static final Log LOG = LogFactory.getLog(RemoteHTable.class); - Client client; - Configuration conf; - byte[] name; - String accessToken; - + final Client client; + final Configuration conf; + final byte[] name; + final String accessToken; + final int maxRetries; + final long sleepTime; + @SuppressWarnings("unchecked") protected String buildRowSpec(final byte[] row, final Map familyMap, final long startTime, final long endTime, final int maxVersions) { @@ -210,19 +208,18 @@ public class RemoteHTable implements HTableInterface { this.conf = conf; this.name = name; this.accessToken = accessToken; + this.maxRetries = conf.getInt("stargate.client.max.retries", 10); + this.sleepTime = conf.getLong("stargate.client.sleep", 1000); } - @Override public byte[] getTableName() { return name.clone(); } - @Override public Configuration getConfiguration() { return conf; } - @Override public HTableDescriptor getTableDescriptor() throws IOException { StringBuilder sb = new StringBuilder(); sb.append('/'); @@ -233,32 +230,30 @@ public class RemoteHTable implements HTableInterface { sb.append(Bytes.toStringBinary(name)); sb.append('/'); 
sb.append("schema"); - Response response = client.get(sb.toString(), Constants.MIMETYPE_PROTOBUF); - if (response.getCode() != 200) { - throw new IOException("schema request returned " + response.getCode()); - } - TableSchemaModel schema = new TableSchemaModel(); - schema.getObjectFromMessage(response.getBody()); - HTableDescriptor htd = new HTableDescriptor(schema.getName()); - for (Map.Entry e: schema.getAny().entrySet()) { - htd.setValue(e.getKey().getLocalPart(), e.getValue().toString()); - } - for (ColumnSchemaModel column: schema.getColumns()) { - HColumnDescriptor hcd = new HColumnDescriptor(column.getName()); - for (Map.Entry e: column.getAny().entrySet()) { - hcd.setValue(e.getKey().getLocalPart(), e.getValue().toString()); + for (int i = 0; i < maxRetries; i++) { + Response response = client.get(sb.toString(), Constants.MIMETYPE_PROTOBUF); + int code = response.getCode(); + switch (code) { + case 200: + TableSchemaModel schema = new TableSchemaModel(); + schema.getObjectFromMessage(response.getBody()); + return schema.getTableDescriptor(); + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { } + break; + default: + throw new IOException("schema request returned " + code); } - htd.addFamily(hcd); } - return htd; + throw new IOException("schema request timed out"); } - @Override public void close() throws IOException { client.shutdown(); } - @Override public Result get(Get get) throws IOException { TimeRange range = get.getTimeRange(); String spec = buildRowSpec(get.getRow(), get.getFamilyMap(), @@ -266,34 +261,41 @@ public class RemoteHTable implements HTableInterface { if (get.getFilter() != null) { LOG.warn("filters not supported on gets"); } - Response response = client.get(spec, Constants.MIMETYPE_PROTOBUF); - int code = response.getCode(); - if (code == 404) { - return new Result(); - } - if (code != 200) { - throw new IOException("get request returned " + code); - } - CellSetModel model = new CellSetModel(); - model.getObjectFromMessage(response.getBody()); - Result[] results = buildResultFromModel(model); - if (results.length > 0) { - if (results.length > 1) { - LOG.warn("too many results for get (" + results.length + ")"); + for (int i = 0; i < maxRetries; i++) { + Response response = client.get(spec, Constants.MIMETYPE_PROTOBUF); + int code = response.getCode(); + switch (code) { + case 200: + CellSetModel model = new CellSetModel(); + model.getObjectFromMessage(response.getBody()); + Result[] results = buildResultFromModel(model); + if (results.length > 0) { + if (results.length > 1) { + LOG.warn("too many results for get (" + results.length + ")"); + } + return results[0]; + } + // fall through + case 404: + return new Result(); + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { } + break; + default: + throw new IOException("get request returned " + code); } - return results[0]; } - return new Result(); + throw new IOException("get request timed out"); } - @Override public boolean exists(Get get) throws IOException { LOG.warn("exists() is really get(), just use get()"); Result result = get(get); return (result != null && !(result.isEmpty())); } - @Override public void put(Put put) throws IOException { CellSetModel model = buildModelFromPut(put); StringBuilder sb = new StringBuilder(); @@ -305,14 +307,25 @@ public class RemoteHTable implements HTableInterface { sb.append(Bytes.toStringBinary(name)); sb.append('/'); sb.append(Bytes.toStringBinary(put.getRow())); - Response response = client.put(sb.toString(), 
Constants.MIMETYPE_PROTOBUF, - model.createProtobufOutput()); - if (response.getCode() != 200) { - throw new IOException("put failed with " + response.getCode()); + for (int i = 0; i < maxRetries; i++) { + Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF, + model.createProtobufOutput()); + int code = response.getCode(); + switch (code) { + case 200: + return; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { } + break; + default: + throw new IOException("put request failed with " + code); + } } + throw new IOException("put request timed out"); } - @Override public void put(List puts) throws IOException { // this is a trick: Stargate accepts multiple rows in a cell set and // ignores the row specification in the URI @@ -351,31 +364,52 @@ public class RemoteHTable implements HTableInterface { } sb.append(Bytes.toStringBinary(name)); sb.append("/$multiput"); // can be any nonexistent row - Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF, - model.createProtobufOutput()); - if (response.getCode() != 200) { - throw new IOException("multiput failed with " + response.getCode()); + for (int i = 0; i < maxRetries; i++) { + Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF, + model.createProtobufOutput()); + int code = response.getCode(); + switch (code) { + case 200: + return; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { } + break; + default: + throw new IOException("multiput request failed with " + code); + } } + throw new IOException("multiput request timed out"); } - @Override public void delete(Delete delete) throws IOException { String spec = buildRowSpec(delete.getRow(), delete.getFamilyMap(), delete.getTimeStamp(), delete.getTimeStamp(), 1); - Response response = client.delete(spec); - if (response.getCode() != 200) { - throw new IOException("delete() returned " + response.getCode()); + for (int i = 0; i < maxRetries; i++) { + Response response = client.delete(spec); + int code = response.getCode(); + switch (code) { + case 200: + return; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { } + break; + default: + throw new IOException("delete request failed with " + code); + } } + throw new IOException("delete request timed out"); } - @Override public void delete(List deletes) throws IOException { for (Delete delete: deletes) { delete(delete); } } - @Override public void flushCommits() throws IOException { // no-op } @@ -385,6 +419,12 @@ public class RemoteHTable implements HTableInterface { String uri; public Scanner(Scan scan) throws IOException { + ScannerModel model; + try { + model = ScannerModel.fromScan(scan); + } catch (Exception e) { + throw new IOException(e); + } StringBuffer sb = new StringBuffer(); sb.append('/'); if (accessToken != null) { @@ -394,18 +434,24 @@ public class RemoteHTable implements HTableInterface { sb.append(Bytes.toStringBinary(name)); sb.append('/'); sb.append("scanner"); - try { - ScannerModel model = ScannerModel.fromScan(scan); + for (int i = 0; i < maxRetries; i++) { Response response = client.post(sb.toString(), Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput()); - if (response.getCode() != 201) { - throw new IOException("scan request failed with " + - response.getCode()); + int code = response.getCode(); + switch (code) { + case 201: + uri = response.getLocation(); + return; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { } + break; + 
default: + throw new IOException("scan request failed with " + code); } - uri = response.getLocation(); - } catch (Exception e) { - throw new IOException(e); } + throw new IOException("scan request timed out"); } @Override @@ -413,18 +459,28 @@ public class RemoteHTable implements HTableInterface { StringBuilder sb = new StringBuilder(uri); sb.append("?n="); sb.append(nbRows); - Response response = client.get(sb.toString(), - Constants.MIMETYPE_PROTOBUF); - if (response.getCode() == 206) { - return null; + for (int i = 0; i < maxRetries; i++) { + Response response = client.get(sb.toString(), + Constants.MIMETYPE_PROTOBUF); + int code = response.getCode(); + switch (code) { + case 200: + CellSetModel model = new CellSetModel(); + model.getObjectFromMessage(response.getBody()); + return buildResultFromModel(model); + case 204: + case 206: + return null; + case 509: + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { } + break; + default: + throw new IOException("scanner.next request failed with " + code); + } } - if (response.getCode() != 200) { - LOG.error("scanner.next failed with " + response.getCode()); - return null; - } - CellSetModel model = new CellSetModel(); - model.getObjectFromMessage(response.getBody()); - return buildResultFromModel(model); + throw new IOException("scanner.next request timed out"); } @Override @@ -487,20 +543,17 @@ public class RemoteHTable implements HTableInterface { } } - - @Override + public ResultScanner getScanner(Scan scan) throws IOException { return new Scanner(scan); } - @Override public ResultScanner getScanner(byte[] family) throws IOException { Scan scan = new Scan(); scan.addFamily(family); return new Scanner(scan); } - @Override public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { Scan scan = new Scan(); @@ -508,39 +561,32 @@ public class RemoteHTable implements HTableInterface { return new Scanner(scan); } - @Override public boolean isAutoFlush() { return true; } - @Override public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { throw new IOException("getRowOrBefore not supported"); } - @Override public RowLock lockRow(byte[] row) throws IOException { throw new IOException("lockRow not implemented"); } - @Override public void unlockRow(RowLock rl) throws IOException { throw new IOException("unlockRow not implemented"); } - @Override public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException { throw new IOException("checkAndPut not supported"); } - @Override public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException { throw new IOException("incrementColumnValue not supported"); } - @Override public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL) throws IOException { throw new IOException("incrementColumnValue not supported"); diff --git a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/model/TableRegionModel.java b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/model/TableRegionModel.java index 9d721071d92..607b9b589c6 100644 --- a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/model/TableRegionModel.java +++ b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/model/TableRegionModel.java @@ -59,6 +59,18 @@ public class TableRegionModel implements Serializable { */ public TableRegionModel() {} + /** + * Constructor + * @param table 
the table name + * @param id the encoded id of the region + * @param startKey the start key of the region + * @param endKey the end key of the region + */ + public TableRegionModel(String table, long id, byte[] startKey, + byte[] endKey) { + this(table, id, startKey, endKey, null); + } + /** * Constructor * @param table the table name @@ -173,8 +185,10 @@ public class TableRegionModel implements Serializable { sb.append(Bytes.toString(startKey)); sb.append("'\n endKey='"); sb.append(Bytes.toString(endKey)); - sb.append("'\n location='"); - sb.append(location); + if (location != null) { + sb.append("'\n location='"); + sb.append(location); + } sb.append("'\n]\n"); return sb.toString(); } diff --git a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/model/TableSchemaModel.java b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/model/TableSchemaModel.java index c1f352e9b77..7d5784b574a 100644 --- a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/model/TableSchemaModel.java +++ b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/model/TableSchemaModel.java @@ -38,9 +38,11 @@ import javax.xml.namespace.QName; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.stargate.ProtobufMessageHandler; import org.apache.hadoop.hbase.stargate.protobuf.generated.ColumnSchemaMessage.ColumnSchema; import org.apache.hadoop.hbase.stargate.protobuf.generated.TableSchemaMessage.TableSchema; +import org.apache.hadoop.hbase.util.Bytes; /** * A representation of HBase table descriptors. @@ -77,6 +79,29 @@ public class TableSchemaModel implements Serializable, ProtobufMessageHandler { */ public TableSchemaModel() {} + /** + * Constructor + * @param htd the table descriptor + */ + public TableSchemaModel(HTableDescriptor htd) { + setName(htd.getNameAsString()); + for (Map.Entry e: + htd.getValues().entrySet()) { + addAttribute(Bytes.toString(e.getKey().get()), + Bytes.toString(e.getValue().get())); + } + for (HColumnDescriptor hcd: htd.getFamilies()) { + ColumnSchemaModel columnModel = new ColumnSchemaModel(); + columnModel.setName(hcd.getNameAsString()); + for (Map.Entry e: + hcd.getValues().entrySet()) { + columnModel.addAttribute(Bytes.toString(e.getKey().get()), + Bytes.toString(e.getValue().get())); + } + addColumnFamily(columnModel); + } + } + /** * Add an attribute to the table descriptor * @param name attribute name @@ -308,4 +333,23 @@ public class TableSchemaModel implements Serializable, ProtobufMessageHandler { } return this; } + + /** + * @return a table descriptor + */ + public HTableDescriptor getTableDescriptor() { + HTableDescriptor htd = new HTableDescriptor(getName()); + for (Map.Entry e: getAny().entrySet()) { + htd.setValue(e.getKey().getLocalPart(), e.getValue().toString()); + } + for (ColumnSchemaModel column: getColumns()) { + HColumnDescriptor hcd = new HColumnDescriptor(column.getName()); + for (Map.Entry e: column.getAny().entrySet()) { + hcd.setValue(e.getKey().getLocalPart(), e.getValue().toString()); + } + htd.addFamily(hcd); + } + return htd; + } + } diff --git a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/util/HTableTokenBucket.java b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/util/HTableTokenBucket.java index 007cb5d22e6..ed3b16dcd5e 100644 --- 
a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/util/HTableTokenBucket.java
+++ b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/util/HTableTokenBucket.java
@@ -70,7 +70,7 @@ public class HTableTokenBucket implements Constants {
   HTable table;
   byte[] row;
   int tokens;
-  double rate = 10.0; // default, 10 ops added per second
+  double rate = 20.0; // default, 20 ops added per second
   int size = 100; // burst
   long lastUpdated = System.currentTimeMillis();
   long configUpdateInterval;
diff --git a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/util/UserData.java b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/util/UserData.java
index 6cc3522ccb9..0fd2252b149 100644
--- a/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/util/UserData.java
+++ b/contrib/stargate/core/src/main/java/org/apache/hadoop/hbase/stargate/util/UserData.java
@@ -20,46 +20,32 @@
 package org.apache.hadoop.hbase.stargate.util;

-import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;

 /**
  * Generic storage for per user information.
  */
 public class UserData {

-  public static final int TOKENBUCKET = 0;
+  public static final int TOKENBUCKET = 1;

-  ArrayList data = new ArrayList();
+  Map data = new HashMap(1);

   public synchronized boolean has(final int sel) {
-    try {
-      return data.get(sel) != null;
-    } catch (IndexOutOfBoundsException e) {
-      return false;
-    }
+    return data.get(sel) != null;
   }

   public synchronized Object get(final int sel) {
-    try {
-      return data.get(sel);
-    } catch (IndexOutOfBoundsException e) {
-      return null;
-    }
+    return data.get(sel);
   }

   public synchronized Object put(final int sel, final Object o) {
-    Object old = null;
-    try {
-      old = data.get(sel);
-    } catch (IndexOutOfBoundsException e) {
-      // do nothing
-    }
-    data.set(sel, o);
-    return old;
+    return data.put(sel, o);
   }

   public synchronized Object remove(int sel) {
-    return put(sel, null);
+    return data.remove(sel);
   }

 }
diff --git a/contrib/stargate/core/src/test/java/org/apache/hadoop/hbase/stargate/PerformanceEvaluation.java b/contrib/stargate/core/src/test/java/org/apache/hadoop/hbase/stargate/PerformanceEvaluation.java
new file mode 100644
index 00000000000..ff42e54f30c
--- /dev/null
+++ b/contrib/stargate/core/src/test/java/org/apache/hadoop/hbase/stargate/PerformanceEvaluation.java
@@ -0,0 +1,1255 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hbase.stargate; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.PrintStream; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; +import java.util.Arrays; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.lang.reflect.Constructor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.WhileMatchFilter; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.stargate.client.Client; +import org.apache.hadoop.hbase.stargate.client.Cluster; +import org.apache.hadoop.hbase.stargate.client.RemoteAdmin; +import org.apache.hadoop.hbase.stargate.client.RemoteHTable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Hash; +import org.apache.hadoop.hbase.util.MurmurHash; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; +import org.apache.hadoop.util.LineReader; + +/** + * Script used evaluating Stargate performance and scalability. Runs a SG + * client that steps through one of a set of hardcoded tests or 'experiments' + * (e.g. a random reads test, a random writes test, etc.). Pass on the + * command-line which test to run and how many clients are participating in + * this experiment. Run java PerformanceEvaluation --help to + * obtain usage. + * + *
<p>
This class sets up and runs the evaluation programs described in + * Section 7, Performance Evaluation, of the Bigtable + * paper, pages 8-10. + * + *
<p>
If number of clients > 1, we start up a MapReduce job. Each map task
+ * runs an individual client. Each client does about 1GB of data.
+ */
+public class PerformanceEvaluation {
+  protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());
+
+  private static final int ROW_LENGTH = 1000;
+  private static final int ONE_GB = 1024 * 1024 * 1000;
+  private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH;
+
+  public static final byte [] TABLE_NAME = Bytes.toBytes("TestTable");
+  public static final byte [] FAMILY_NAME = Bytes.toBytes("info");
+  public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
+
+  protected static final HTableDescriptor TABLE_DESCRIPTOR;
+  static {
+    TABLE_DESCRIPTOR = new HTableDescriptor(TABLE_NAME);
+    TABLE_DESCRIPTOR.addFamily(new HColumnDescriptor(FAMILY_NAME));
+  }
+
+  protected Map commands = new TreeMap();
+  protected static Cluster cluster = new Cluster();
+  protected static String accessToken = null;
+
+  volatile Configuration conf;
+  private boolean nomapred = false;
+  private int N = 1;
+  private int R = ROWS_PER_GB;
+  private int B = 100;
+
+  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
+  /**
+   * Regex to parse lines in input file passed to mapreduce task.
+   */
+  public static final Pattern LINE_PATTERN =
+    Pattern.compile("startRow=(\\d+),\\s+" +
+      "perClientRunRows=(\\d+),\\s+" +
+      "totalRows=(\\d+),\\s+" +
+      "clients=(\\d+),\\s+" +
+      "rowsPerPut=(\\d+)");
+
+  /**
+   * Enum for map metrics. Keep it out here rather than inside the Map
+   * inner-class so we can find associated properties.
+   */
+  protected static enum Counter {
+    /** elapsed time */
+    ELAPSED_TIME,
+    /** number of rows */
+    ROWS}
+
+  /**
+   * Constructor
+   * @param c Configuration object
+   */
+  public PerformanceEvaluation(final Configuration c) {
+    this.conf = c;
+
+    addCommandDescriptor(RandomReadTest.class, "randomRead",
+      "Run random read test");
+    addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
+      "Run random seek and scan 100 test");
+    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
+      "Run random seek scan with both start and stop row (max 10 rows)");
+    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
+      "Run random seek scan with both start and stop row (max 100 rows)");
+    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
+      "Run random seek scan with both start and stop row (max 1000 rows)");
+    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
+      "Run random seek scan with both start and stop row (max 10000 rows)");
+    addCommandDescriptor(RandomWriteTest.class, "randomWrite",
+      "Run random write test");
+    addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
+      "Run sequential read test");
+    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
+      "Run sequential write test");
+    addCommandDescriptor(ScanTest.class, "scan",
+      "Run scan test (read every row)");
+    addCommandDescriptor(FilteredScanTest.class, "filterScan",
+      "Run scan test using a filter to find a specific row based on its value (make sure to use --rows=20)");
+  }
+
+  protected void addCommandDescriptor(Class cmdClass,
+      String name, String description) {
+    CmdDescriptor cmdDescriptor =
+      new CmdDescriptor(cmdClass, name, description);
+    commands.put(name, cmdDescriptor);
+  }
+
+  /**
+   * Implementations can have their status set.
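+   * For example, the single-client runner (runNIsOne below) passes a Status
+   * that simply logs each message:
+   * <pre>
+   * Status status = new Status() {
+   *   public void setStatus(String msg) throws IOException {
+   *     LOG.info(msg);
+   *   }
+   * };
+   * </pre>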
+   */
+  static interface Status {
+    /**
+     * Sets status
+     * @param msg status message
+     * @throws IOException
+     */
+    void setStatus(final String msg) throws IOException;
+  }
+
+  /**
+   * This class works as the InputSplit of Performance Evaluation
+   * MapReduce InputFormat, and the Record Value of RecordReader.
+   * Each map task will only read one record from a PeInputSplit;
+   * the record value is the PeInputSplit itself.
+   */
+  public static class PeInputSplit extends InputSplit implements Writable {
+    private int startRow = 0;
+    private int rows = 0;
+    private int totalRows = 0;
+    private int clients = 0;
+    private int rowsPerPut = 1;
+
+    public PeInputSplit() {
+      this.startRow = 0;
+      this.rows = 0;
+      this.totalRows = 0;
+      this.clients = 0;
+      this.rowsPerPut = 1;
+    }
+
+    public PeInputSplit(int startRow, int rows, int totalRows, int clients,
+        int rowsPerPut) {
+      this.startRow = startRow;
+      this.rows = rows;
+      this.totalRows = totalRows;
+      this.clients = clients;
+      this.rowsPerPut = rowsPerPut;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      this.startRow = in.readInt();
+      this.rows = in.readInt();
+      this.totalRows = in.readInt();
+      this.clients = in.readInt();
+      this.rowsPerPut = in.readInt();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(startRow);
+      out.writeInt(rows);
+      out.writeInt(totalRows);
+      out.writeInt(clients);
+      out.writeInt(rowsPerPut);
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      return 0;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      return new String[0];
+    }
+
+    public int getStartRow() {
+      return startRow;
+    }
+
+    public int getRows() {
+      return rows;
+    }
+
+    public int getTotalRows() {
+      return totalRows;
+    }
+
+    public int getClients() {
+      return clients;
+    }
+
+    public int getRowsPerPut() {
+      return rowsPerPut;
+    }
+  }
+
+  /**
+   * InputFormat of Performance Evaluation MapReduce job.
+   * It extends FileInputFormat so that it can use its methods such as setInputPaths().
+ */ + public static class PeInputFormat extends FileInputFormat { + + @Override + public List getSplits(JobContext job) throws IOException { + // generate splits + List splitList = new ArrayList(); + + for (FileStatus file: listStatus(job)) { + Path path = file.getPath(); + FileSystem fs = path.getFileSystem(job.getConfiguration()); + FSDataInputStream fileIn = fs.open(path); + LineReader in = new LineReader(fileIn, job.getConfiguration()); + int lineLen = 0; + while(true) { + Text lineText = new Text(); + lineLen = in.readLine(lineText); + if(lineLen <= 0) { + break; + } + Matcher m = LINE_PATTERN.matcher(lineText.toString()); + if((m != null) && m.matches()) { + int startRow = Integer.parseInt(m.group(1)); + int rows = Integer.parseInt(m.group(2)); + int totalRows = Integer.parseInt(m.group(3)); + int clients = Integer.parseInt(m.group(4)); + int rowsPerPut = Integer.parseInt(m.group(5)); + + LOG.debug("split["+ splitList.size() + "] " + + " startRow=" + startRow + + " rows=" + rows + + " totalRows=" + totalRows + + " clients=" + clients + + " rowsPerPut=" + rowsPerPut); + + PeInputSplit newSplit = + new PeInputSplit(startRow, rows, totalRows, clients, rowsPerPut); + splitList.add(newSplit); + } + } + in.close(); + } + + LOG.info("Total # of splits: " + splitList.size()); + return splitList; + } + + @Override + public RecordReader createRecordReader(InputSplit split, + TaskAttemptContext context) { + return new PeRecordReader(); + } + + public static class PeRecordReader extends RecordReader { + private boolean readOver = false; + private PeInputSplit split = null; + private NullWritable key = null; + private PeInputSplit value = null; + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + this.readOver = false; + this.split = (PeInputSplit)split; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if(readOver) { + return false; + } + + key = NullWritable.get(); + value = (PeInputSplit)split; + + readOver = true; + return true; + } + + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException { + return key; + } + + @Override + public PeInputSplit getCurrentValue() throws IOException, InterruptedException { + return value; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + if(readOver) { + return 1.0f; + } else { + return 0.0f; + } + } + + @Override + public void close() throws IOException { + // do nothing + } + } + } + + /** + * MapReduce job that runs a performance evaluation client in each map task. + */ + public static class EvaluationMapTask + extends Mapper { + + /** configuration parameter name that contains the command */ + public final static String CMD_KEY = "EvaluationMapTask.command"; + /** configuration parameter name that contains the PE impl */ + public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl"; + + private Class cmd; + private PerformanceEvaluation pe; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class); + + // this is required so that extensions of PE are instantiated within the + // map reduce task... 
+ Class peClass = + forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class); + try { + this.pe = peClass.getConstructor(Configuration.class) + .newInstance(context.getConfiguration()); + } catch (Exception e) { + throw new IllegalStateException("Could not instantiate PE instance", e); + } + } + + private Class forName(String className, Class type) { + Class clazz = null; + try { + clazz = Class.forName(className).asSubclass(type); + } catch (ClassNotFoundException e) { + throw new IllegalStateException("Could not find class for name: " + className, e); + } + return clazz; + } + + protected void map(NullWritable key, PeInputSplit value, final Context context) + throws IOException, InterruptedException { + + Status status = new Status() { + public void setStatus(String msg) { + context.setStatus(msg); + } + }; + + // Evaluation task + long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(), + value.getRows(), value.getTotalRows(), value.getRowsPerPut(), status); + // Collect how much time the thing took. Report as map output and + // to the ELAPSED_TIME counter. + context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime); + context.getCounter(Counter.ROWS).increment(value.rows); + context.write(new LongWritable(value.startRow), new LongWritable(elapsedTime)); + context.progress(); + } + } + + /* + * If table does not already exist, create. + * @param c Client to use checking. + * @return True if we created the table. + * @throws IOException + */ + private boolean checkTable() throws IOException { + HTableDescriptor tableDescriptor = getTableDescriptor(); + RemoteAdmin admin = + new RemoteAdmin(new Client(cluster), conf, accessToken); + if (!admin.isTableAvailable(tableDescriptor.getName())) { + admin.createTable(tableDescriptor); + return true; + } + return false; + } + + protected HTableDescriptor getTableDescriptor() { + return TABLE_DESCRIPTOR; + } + + /* + * We're to run multiple clients concurrently. Setup a mapreduce job. Run + * one map per client. Then run a single reduce to sum the elapsed times. + * @param cmd Command to run. + * @throws IOException + */ + private void runNIsMoreThanOne(final Class cmd) + throws IOException, InterruptedException, ClassNotFoundException { + checkTable(); + if (nomapred) { + doMultipleClients(cmd); + } else { + doMapReduce(cmd); + } + } + + /* + * Run all clients in this vm each to its own thread. + * @param cmd Command to run. + * @throws IOException + */ + private void doMultipleClients(final Class cmd) throws IOException { + final List threads = new ArrayList(N); + final int perClientRows = R/N; + for (int i = 0; i < N; i++) { + Thread t = new Thread (Integer.toString(i)) { + @Override + public void run() { + super.run(); + PerformanceEvaluation pe = new PerformanceEvaluation(conf); + int index = Integer.parseInt(getName()); + try { + long elapsedTime = pe.runOneClient(cmd, index * perClientRows, + perClientRows, R, B, new Status() { + public void setStatus(final String msg) throws IOException { + LOG.info("client-" + getName() + " " + msg); + } + }); + LOG.info("Finished " + getName() + " in " + elapsedTime + + "ms writing " + perClientRows + " rows"); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + threads.add(t); + } + for (Thread t: threads) { + t.start(); + } + for (Thread t: threads) { + while(t.isAlive()) { + try { + t.join(); + } catch (InterruptedException e) { + LOG.debug("Interrupted, continuing" + e.toString()); + } + } + } + } + + /* + * Run a mapreduce job. 
Run as many maps as asked-for clients. + * Before we start up the job, write out an input file with instruction + * per client regards which row they are to start on. + * @param cmd Command to run. + * @throws IOException + */ + private void doMapReduce(final Class cmd) throws IOException, + InterruptedException, ClassNotFoundException { + Path inputDir = writeInputFile(this.conf); + this.conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); + this.conf.set(EvaluationMapTask.PE_KEY, getClass().getName()); + Job job = new Job(this.conf); + job.setJarByClass(PerformanceEvaluation.class); + job.setJobName("HBase Performance Evaluation"); + + job.setInputFormatClass(PeInputFormat.class); + PeInputFormat.setInputPaths(job, inputDir); + + job.setOutputKeyClass(LongWritable.class); + job.setOutputValueClass(LongWritable.class); + + job.setMapperClass(EvaluationMapTask.class); + job.setReducerClass(LongSumReducer.class); + + job.setNumReduceTasks(1); + + job.setOutputFormatClass(TextOutputFormat.class); + TextOutputFormat.setOutputPath(job, new Path(inputDir,"outputs")); + + job.waitForCompletion(true); + } + + /* + * Write input file of offsets-per-client for the mapreduce job. + * @param c Configuration + * @return Directory that contains file written. + * @throws IOException + */ + private Path writeInputFile(final Configuration c) throws IOException { + FileSystem fs = FileSystem.get(c); + if (!fs.exists(PERF_EVAL_DIR)) { + fs.mkdirs(PERF_EVAL_DIR); + } + SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); + Path subdir = new Path(PERF_EVAL_DIR, formatter.format(new Date())); + fs.mkdirs(subdir); + Path inputFile = new Path(subdir, "input.txt"); + PrintStream out = new PrintStream(fs.create(inputFile)); + // Make input random. + Map m = new TreeMap(); + Hash h = MurmurHash.getInstance(); + int perClientRows = (R / N); + try { + for (int i = 0; i < 10; i++) { + for (int j = 0; j < N; j++) { + String s = "startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) + + ", perClientRunRows=" + (perClientRows / 10) + + ", totalRows=" + R + + ", clients=" + N + + ", rowsPerPut=" + B; + int hash = h.hash(Bytes.toBytes(s)); + m.put(hash, s); + } + } + for (Map.Entry e: m.entrySet()) { + out.println(e.getValue()); + } + } finally { + out.close(); + } + return subdir; + } + + /** + * Describes a command. + */ + static class CmdDescriptor { + private Class cmdClass; + private String name; + private String description; + + CmdDescriptor(Class cmdClass, String name, String description) { + this.cmdClass = cmdClass; + this.name = name; + this.description = description; + } + + public Class getCmdClass() { + return cmdClass; + } + + public String getName() { + return name; + } + + public String getDescription() { + return description; + } + } + + /** + * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation.Test + * tests}. This makes the reflection logic a little easier to understand... 
+   */
+  static class TestOptions {
+    private int startRow;
+    private int perClientRunRows;
+    private int totalRows;
+    private byte[] tableName;
+    private int rowsPerPut;
+
+    TestOptions() {
+    }
+
+    TestOptions(int startRow, int perClientRunRows, int totalRows, byte[] tableName, int rowsPerPut) {
+      this.startRow = startRow;
+      this.perClientRunRows = perClientRunRows;
+      this.totalRows = totalRows;
+      this.tableName = tableName;
+      this.rowsPerPut = rowsPerPut;
+    }
+
+    public int getStartRow() {
+      return startRow;
+    }
+
+    public int getPerClientRunRows() {
+      return perClientRunRows;
+    }
+
+    public int getTotalRows() {
+      return totalRows;
+    }
+
+    public byte[] getTableName() {
+      return tableName;
+    }
+
+    public int getRowsPerPut() {
+      return rowsPerPut;
+    }
+  }
+
+  /*
+   * A test.
+   * Subclass to particularize what happens per row.
+   */
+  static abstract class Test {
+    // Below we make it so that when Tests are all running in the one
+    // JVM, they each have a differently seeded Random.
+    private static final Random randomSeed =
+      new Random(System.currentTimeMillis());
+    private static long nextRandomSeed() {
+      return randomSeed.nextLong();
+    }
+    protected final Random rand = new Random(nextRandomSeed());
+
+    protected final int startRow;
+    protected final int perClientRunRows;
+    protected final int totalRows;
+    protected final Status status;
+    protected byte[] tableName;
+    protected RemoteHTable table;
+    protected volatile Configuration conf;
+
+    /**
+     * Note that all subclasses of this class must provide a public constructor
+     * that has the exact same list of arguments.
+     */
+    Test(final Configuration conf, final TestOptions options, final Status status) {
+      super();
+      this.startRow = options.getStartRow();
+      this.perClientRunRows = options.getPerClientRunRows();
+      this.totalRows = options.getTotalRows();
+      this.status = status;
+      this.tableName = options.getTableName();
+      this.table = null;
+      this.conf = conf;
+    }
+
+    protected String generateStatus(final int sr, final int i, final int lr) {
+      return sr + "/" + i + "/" + lr;
+    }
+
+    protected int getReportingPeriod() {
+      int period = this.perClientRunRows / 10;
+      return period == 0? this.perClientRunRows: period;
+    }
+
+    void testSetup() throws IOException {
+      this.table = new RemoteHTable(new Client(cluster), conf, tableName,
+        accessToken);
+    }
+
+    void testTakedown() throws IOException {
+      this.table.close();
+    }
+
+    /*
+     * Run test
+     * @return Elapsed time.
+     * @throws IOException
+     */
+    long test() throws IOException {
+      long elapsedTime;
+      testSetup();
+      long startTime = System.currentTimeMillis();
+      try {
+        testTimed();
+        elapsedTime = System.currentTimeMillis() - startTime;
+      } finally {
+        testTakedown();
+      }
+      return elapsedTime;
+    }
+
+    /**
+     * Provides an extension point for tests that don't want a per row invocation.
+     */
+    void testTimed() throws IOException {
+      int lastRow = this.startRow + this.perClientRunRows;
+      // Report on completion of 1/10th of total.
+      for (int i = this.startRow; i < lastRow; i++) {
+        testRow(i);
+        if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
+          status.setStatus(generateStatus(this.startRow, i, lastRow));
+        }
+      }
+    }
+
+    /*
+     * Test for individual row.
+     * @param i Row index.
+ */ + void testRow(final int i) throws IOException { + } + } + + @SuppressWarnings("unused") + static class RandomSeekScanTest extends Test { + RandomSeekScanTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + } + + @Override + void testRow(final int i) throws IOException { + Scan scan = new Scan(getRandomRow(this.rand, this.totalRows)); + scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); + scan.setFilter(new WhileMatchFilter(new PageFilter(120))); + ResultScanner s = this.table.getScanner(scan); + //int count = 0; + for (Result rr = null; (rr = s.next()) != null;) { + // LOG.info("" + count++ + " " + rr.toString()); + } + s.close(); + } + + @Override + protected int getReportingPeriod() { + int period = this.perClientRunRows / 100; + return period == 0? this.perClientRunRows: period; + } + + } + + @SuppressWarnings("unused") + static abstract class RandomScanWithRangeTest extends Test { + RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + } + + @Override + void testRow(final int i) throws IOException { + Pair startAndStopRow = getStartAndStopRow(); + Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond()); + scan.addColumn(FAMILY_NAME, QUALIFIER_NAME); + ResultScanner s = this.table.getScanner(scan); + int count = 0; + for (Result rr = null; (rr = s.next()) != null;) { + count++; + } + + if (i % 100 == 0) { + LOG.info(String.format("Scan for key range %s - %s returned %s rows", + Bytes.toString(startAndStopRow.getFirst()), + Bytes.toString(startAndStopRow.getSecond()), count)); + } + + s.close(); + } + + protected abstract Pair getStartAndStopRow(); + + protected Pair generateStartAndStopRows(int maxRange) { + int start = this.rand.nextInt(Integer.MAX_VALUE) % totalRows; + int stop = start + maxRange; + return new Pair(format(start), format(stop)); + } + + @Override + protected int getReportingPeriod() { + int period = this.perClientRunRows / 100; + return period == 0? 
this.perClientRunRows: period; + } + } + + static class RandomScanWithRange10Test extends RandomScanWithRangeTest { + RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + } + + @Override + protected Pair getStartAndStopRow() { + return generateStartAndStopRows(10); + } + } + + static class RandomScanWithRange100Test extends RandomScanWithRangeTest { + RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + } + + @Override + protected Pair getStartAndStopRow() { + return generateStartAndStopRows(100); + } + } + + static class RandomScanWithRange1000Test extends RandomScanWithRangeTest { + RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + } + + @Override + protected Pair getStartAndStopRow() { + return generateStartAndStopRows(1000); + } + } + + static class RandomScanWithRange10000Test extends RandomScanWithRangeTest { + RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + } + + @Override + protected Pair getStartAndStopRow() { + return generateStartAndStopRows(10000); + } + } + + static class RandomReadTest extends Test { + RandomReadTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + } + + @Override + void testRow(final int i) throws IOException { + Get get = new Get(getRandomRow(this.rand, this.totalRows)); + get.addColumn(FAMILY_NAME, QUALIFIER_NAME); + this.table.get(get); + } + + @Override + protected int getReportingPeriod() { + int period = this.perClientRunRows / 100; + return period == 0? this.perClientRunRows: period; + } + + } + + static class RandomWriteTest extends Test { + int rowsPerPut; + + RandomWriteTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + rowsPerPut = options.getRowsPerPut(); + } + + @Override + void testTimed() throws IOException { + int lastRow = this.startRow + this.perClientRunRows; + // Report on completion of 1/10th of total. 
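+      // Buffer rowsPerPut Puts and send them below as one batched put
+      // per iteration.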
+      List puts = new ArrayList();
+      for (int i = this.startRow; i < lastRow; i += rowsPerPut) {
+        for (int j = 0; j < rowsPerPut; j++) {
+          byte [] row = getRandomRow(this.rand, this.totalRows);
+          Put put = new Put(row);
+          byte[] value = generateValue(this.rand);
+          put.add(FAMILY_NAME, QUALIFIER_NAME, value);
+          puts.add(put);
+          if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
+            status.setStatus(generateStatus(this.startRow, i, lastRow));
+          }
+        }
+        table.put(puts);
+        puts.clear();
+      }
+    }
+  }
+
+  static class ScanTest extends Test {
+    private ResultScanner testScanner;
+
+    ScanTest(Configuration conf, TestOptions options, Status status) {
+      super(conf, options, status);
+    }
+
+    @Override
+    void testSetup() throws IOException {
+      super.testSetup();
+    }
+
+    @Override
+    void testTakedown() throws IOException {
+      if (this.testScanner != null) {
+        this.testScanner.close();
+      }
+      super.testTakedown();
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      if (this.testScanner == null) {
+        Scan scan = new Scan(format(this.startRow));
+        scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+        this.testScanner = table.getScanner(scan);
+      }
+      testScanner.next();
+    }
+  }
+
+  static class SequentialReadTest extends Test {
+    SequentialReadTest(Configuration conf, TestOptions options, Status status) {
+      super(conf, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      Get get = new Get(format(i));
+      get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      table.get(get);
+    }
+  }
+
+  static class SequentialWriteTest extends Test {
+    int rowsPerPut;
+
+    SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
+      super(conf, options, status);
+      rowsPerPut = options.getRowsPerPut();
+    }
+
+    @Override
+    void testTimed() throws IOException {
+      int lastRow = this.startRow + this.perClientRunRows;
+      // Report on completion of 1/10th of total.
+      List puts = new ArrayList();
+      for (int i = this.startRow; i < lastRow; i += rowsPerPut) {
+        for (int j = 0; j < rowsPerPut; j++) {
+          Put put = new Put(format(i + j));
+          byte[] value = generateValue(this.rand);
+          put.add(FAMILY_NAME, QUALIFIER_NAME, value);
+          puts.add(put);
+          if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
+            status.setStatus(generateStatus(this.startRow, i, lastRow));
+          }
+        }
+        table.put(puts);
+        puts.clear();
+      }
+    }
+  }
+
+  static class FilteredScanTest extends Test {
+    protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
+
+    FilteredScanTest(Configuration conf, TestOptions options, Status status) {
+      super(conf, options, status);
+    }
+
+    @Override
+    void testRow(int i) throws IOException {
+      byte[] value = generateValue(this.rand);
+      Scan scan = constructScan(value);
+      ResultScanner scanner = null;
+      try {
+        scanner = this.table.getScanner(scan);
+        while (scanner.next() != null) {
+        }
+      } finally {
+        if (scanner != null) scanner.close();
+      }
+    }
+
+    protected Scan constructScan(byte[] valuePrefix) throws IOException {
+      Filter filter = new SingleColumnValueFilter(
+        FAMILY_NAME, QUALIFIER_NAME, CompareFilter.CompareOp.EQUAL,
+        new BinaryComparator(valuePrefix)
+      );
+      Scan scan = new Scan();
+      scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      scan.setFilter(filter);
+      return scan;
+    }
+  }
+
+  /*
+   * Format passed integer.
+   * @param number
+   * @return Returns zero-prefixed 10-byte wide decimal version of passed
+   * number (Does absolute in case number is negative).
+   */
+  public static byte [] format(final int number) {
+    byte [] b = new byte[10];
+    int d = Math.abs(number);
+    for (int i = b.length - 1; i >= 0; i--) {
+      b[i] = (byte)((d % 10) + '0');
+      d /= 10;
+    }
+    return b;
+  }
+
+  /*
+   * This method takes some time and is done inline while uploading data. For
+   * example, doing the mapfile test, generation of the key and value
+   * consumes about 30% of CPU time.
+   * @return Generated random value to insert into a table cell.
+   */
+  public static byte[] generateValue(final Random r) {
+    byte [] b = new byte [ROW_LENGTH];
+    r.nextBytes(b);
+    return b;
+  }
+
+  static byte [] getRandomRow(final Random random, final int totalRows) {
+    return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
+  }
+
+  long runOneClient(final Class cmd, final int startRow,
+      final int perClientRunRows, final int totalRows,
+      final int rowsPerPut, final Status status)
+  throws IOException {
+    status.setStatus("Start " + cmd + " at offset " + startRow + " for " +
+      perClientRunRows + " rows");
+    long totalElapsedTime = 0;
+
+    Test t = null;
+    TestOptions options = new TestOptions(startRow, perClientRunRows,
+      totalRows, getTableDescriptor().getName(), rowsPerPut);
+    try {
+      Constructor constructor = cmd.getDeclaredConstructor(
+        Configuration.class, TestOptions.class, Status.class);
+      t = constructor.newInstance(this.conf, options, status);
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException("Invalid command class: " +
+        cmd.getName() + ". It does not provide a constructor as described by " +
+        "the javadoc comment. Available constructors are: " +
+        Arrays.toString(cmd.getConstructors()));
+    } catch (Exception e) {
+      throw new IllegalStateException("Failed to construct command class", e);
+    }
+    totalElapsedTime = t.test();
+
+    status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
+      "ms at offset " + startRow + " for " + perClientRunRows + " rows");
+    return totalElapsedTime;
+  }
+
+  private void runNIsOne(final Class cmd) {
+    Status status = new Status() {
+      public void setStatus(String msg) throws IOException {
+        LOG.info(msg);
+      }
+    };
+
+    try {
+      checkTable();
+      runOneClient(cmd, 0, R, R, B, status);
+    } catch (Exception e) {
+      LOG.error("Failed", e);
+    }
+  }
+
+  private void runTest(final Class cmd) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    if (N == 1) {
+      // If there is only one client and one HRegionServer, we assume nothing
+      // has been set up at all.
+      runNIsOne(cmd);
+    } else {
+      // Else, run
+      runNIsMoreThanOne(cmd);
+    }
+  }
+
+  protected void printUsage() {
+    printUsage(null);
+  }
+
+  protected void printUsage(final String message) {
+    if (message != null && message.length() > 0) {
+      System.err.println(message);
+    }
+    System.err.println("Usage: java " + this.getClass().getName() + " \\");
+    System.err.println("  [--option] [--option=value] <command> <nclients>");
+    System.err.println();
+    System.err.println("Options:");
+    System.err.println(" host        String. Specify Stargate endpoint.");
+    System.err.println(" token       String. API access token.");
+    System.err.println(" rows        Integer. Rows each client runs. Default: One million");
+    System.err.println(" rowsPerPut  Integer. Rows each Stargate (multi)Put. Default: 100");
Default: 100"); + System.err.println(" nomapred (Flag) Run multiple clients using threads " + + "(rather than use mapreduce)"); + System.err.println(); + System.err.println("Command:"); + for (CmdDescriptor command : commands.values()) { + System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription())); + } + System.err.println(); + System.err.println("Args:"); + System.err.println(" nclients Integer. Required. Total number of " + + "clients (and HRegionServers)"); + System.err.println(" running: 1 <= value <= 500"); + System.err.println("Examples:"); + System.err.println(" To run a single evaluation client:"); + System.err.println(" $ bin/hbase " + this.getClass().getName() + + " sequentialWrite 1"); + } + + private void getArgs(final int start, final String[] args) { + if(start + 1 > args.length) { + throw new IllegalArgumentException("must supply the number of clients"); + } + N = Integer.parseInt(args[start]); + if (N < 1) { + throw new IllegalArgumentException("Number of clients must be > 1"); + } + // Set total number of rows to write. + R = R * N; + } + + public int doCommandLine(final String[] args) { + // Process command-line args. TODO: Better cmd-line processing + // (but hopefully something not as painful as cli options). + int errCode = -1; + if (args.length < 1) { + printUsage(); + return errCode; + } + + try { + for (int i = 0; i < args.length; i++) { + String cmd = args[i]; + if (cmd.equals("-h")) { + printUsage(); + errCode = 0; + break; + } + + final String nmr = "--nomapred"; + if (cmd.startsWith(nmr)) { + nomapred = true; + continue; + } + + final String rows = "--rows="; + if (cmd.startsWith(rows)) { + R = Integer.parseInt(cmd.substring(rows.length())); + continue; + } + + final String rowsPerPut = "--rowsPerPut="; + if (cmd.startsWith(rowsPerPut)) { + this.B = Integer.parseInt(cmd.substring(rowsPerPut.length())); + continue; + } + + final String host = "--host="; + if (cmd.startsWith(host)) { + cluster.add(cmd.substring(host.length())); + continue; + } + + final String token = "--token="; + if (cmd.startsWith(token)) { + accessToken = cmd.substring(token.length()); + continue; + } + + Class cmdClass = determineCommandClass(cmd); + if (cmdClass != null) { + getArgs(i + 1, args); + if (cluster.isEmpty()) { + String s = conf.get("stargate.hostname", "localhost"); + if (s.contains(":")) { + cluster.add(s); + } else { + cluster.add(s, conf.getInt("stargate.port", 8080)); + } + } + runTest(cmdClass); + errCode = 0; + break; + } + + printUsage(); + break; + } + } catch (Exception e) { + e.printStackTrace(); + } + + return errCode; + } + + private Class determineCommandClass(String cmd) { + CmdDescriptor descriptor = commands.get(cmd); + return descriptor != null ? descriptor.getCmdClass() : null; + } + + /** + * @param args + */ + public static void main(final String[] args) { + Configuration c = HBaseConfiguration.create(); + System.exit(new PerformanceEvaluation(c).doCommandLine(args)); + } +} diff --git a/contrib/stargate/core/src/test/java/org/apache/hadoop/hbase/stargate/client/TestRemoteAdmin.java b/contrib/stargate/core/src/test/java/org/apache/hadoop/hbase/stargate/client/TestRemoteAdmin.java new file mode 100644 index 00000000000..96a198fb999 --- /dev/null +++ b/contrib/stargate/core/src/test/java/org/apache/hadoop/hbase/stargate/client/TestRemoteAdmin.java @@ -0,0 +1,84 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.stargate.client; + +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.stargate.MiniClusterTestBase; +import org.apache.hadoop.hbase.stargate.client.Client; +import org.apache.hadoop.hbase.util.Bytes; + +public class TestRemoteAdmin extends MiniClusterTestBase { + + static final String TABLE_1 = "TestRemoteAdmin_Table_1"; + static final String TABLE_2 = "TestRemoteAdmin_Table_2"; + static final byte[] COLUMN_1 = Bytes.toBytes("a"); + + static final HTableDescriptor DESC_1; + static { + DESC_1 = new HTableDescriptor(TABLE_1); + DESC_1.addFamily(new HColumnDescriptor(COLUMN_1)); + } + static final HTableDescriptor DESC_2; + static { + DESC_2 = new HTableDescriptor(TABLE_2); + DESC_2.addFamily(new HColumnDescriptor(COLUMN_1)); + } + + Client client; + HBaseAdmin localAdmin; + RemoteAdmin remoteAdmin; + + @Override + protected void setUp() throws Exception { + super.setUp(); + localAdmin = new HBaseAdmin(conf); + remoteAdmin = new RemoteAdmin(new Client( + new Cluster().add("localhost", testServletPort)), + conf); + if (localAdmin.tableExists(TABLE_1)) { + localAdmin.disableTable(TABLE_1); + localAdmin.deleteTable(TABLE_1); + } + if (!localAdmin.tableExists(TABLE_2)) { + localAdmin.createTable(DESC_2); + } + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + } + + public void testCreateTable() throws Exception { + assertFalse(remoteAdmin.isTableAvailable(TABLE_1)); + remoteAdmin.createTable(DESC_1); + assertTrue(remoteAdmin.isTableAvailable(TABLE_1)); + } + + public void testDeleteTable() throws Exception { + assertTrue(remoteAdmin.isTableAvailable(TABLE_2)); + remoteAdmin.deleteTable(TABLE_2); + assertFalse(remoteAdmin.isTableAvailable(TABLE_2)); + } + +}
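
For reference, a minimal sketch of how the RemoteAdmin client added by this
patch is wired together, modeled on the calls in TestRemoteAdmin above and
PerformanceEvaluation.checkTable(); the endpoint host/port and the table and
family names are placeholders, not values from the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.stargate.client.Client;
    import org.apache.hadoop.hbase.stargate.client.Cluster;
    import org.apache.hadoop.hbase.stargate.client.RemoteAdmin;

    public class RemoteAdminSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Point the client at a running Stargate instance (placeholder endpoint).
        Client client = new Client(new Cluster().add("localhost", 8080));
        // A third constructor argument supplies an access token when needed.
        RemoteAdmin admin = new RemoteAdmin(client, conf);
        HTableDescriptor htd = new HTableDescriptor("ExampleTable");
        htd.addFamily(new HColumnDescriptor("f"));
        // Check-then-create through the gateway, the same sequence
        // checkTable() performs before a test run.
        if (!admin.isTableAvailable("ExampleTable")) {
          admin.createTable(htd);
        }
      }
    }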
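Similarly, a sketch of basic reads and writes through RemoteHTable, mirroring
what Test.testSetup() and the read/write tests in PerformanceEvaluation do;
passing null for the token assumes the gateway is not in multiuser mode:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.stargate.client.Client;
    import org.apache.hadoop.hbase.stargate.client.Cluster;
    import org.apache.hadoop.hbase.stargate.client.RemoteHTable;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RemoteHTableSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Client client = new Client(new Cluster().add("localhost", 8080));
        RemoteHTable table = new RemoteHTable(client, conf,
            Bytes.toBytes("ExampleTable"), null);
        // Write one cell, then read it back through the gateway.
        Put put = new Put(Bytes.toBytes("row1"));
        put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
        table.put(put);
        Result result = table.get(new Get(Bytes.toBytes("row1")));
        System.out.println(Bytes.toString(
            result.getValue(Bytes.toBytes("f"), Bytes.toBytes("q"))));
        table.close();
      }
    }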
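The new TableSchemaModel(HTableDescriptor) constructor and getTableDescriptor()
added above form a round trip between a table descriptor and its transport
representation; a small illustration with placeholder names:

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.stargate.model.TableSchemaModel;

    public class SchemaRoundTripSketch {
      public static void main(String[] args) {
        HTableDescriptor htd = new HTableDescriptor("ExampleTable");
        htd.addFamily(new HColumnDescriptor("f"));
        // Model the descriptor (e.g. for serialization), then rebuild it.
        TableSchemaModel model = new TableSchemaModel(htd);
        HTableDescriptor rebuilt = model.getTableDescriptor();
        System.out.println(rebuilt.getNameAsString());
      }
    }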
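On the command line, PerformanceEvaluation is driven as its usage text
describes: for example, a single local client, or a four-client threaded run
against a remote gateway (the hostname below is a placeholder):

    $ bin/hbase org.apache.hadoop.hbase.stargate.PerformanceEvaluation sequentialWrite 1
    $ bin/hbase org.apache.hadoop.hbase.stargate.PerformanceEvaluation \
        --host=gateway.example.com:8080 --rows=100000 --nomapred randomRead 4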
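When more than one client is requested and MapReduce is used, writeInputFile()
emits one line per map task in the format matched by LINE_PATTERN. With two
clients and the default row count (getArgs doubles R to 2097152), the
generated input file contains lines such as the following (values worked out
here for illustration):

    startRow=0, perClientRunRows=104857, totalRows=2097152, clients=2, rowsPerPut=100
    startRow=1048576, perClientRunRows=104857, totalRows=2097152, clients=2, rowsPerPut=100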
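Finally, on throttling: the patch raises HTableTokenBucket's default refill
rate from 10 to 20 ops per second against a burst size of 100. The class's
update logic is not part of this diff, so the following is only a generic
token-bucket sketch using those defaults, not the actual implementation:

    public class TokenBucketSketch {
      final double rate = 20.0; // tokens (ops) added per second
      final int size = 100;     // maximum burst
      double tokens = 0.0;
      long lastUpdated = System.currentTimeMillis();

      // Refill proportionally to elapsed time, cap at the burst size,
      // then spend one token if available.
      synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        tokens = Math.min(size, tokens + (now - lastUpdated) / 1000.0 * rate);
        lastUpdated = now;
        if (tokens >= 1.0) {
          tokens -= 1.0;
          return true;
        }
        return false;
      }
    }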