HBASE-2412 [stargate] PerformanceEvaluation; and related fixes

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@931038 13f79535-47bb-0310-9956-ffa450edef68
Andrew Kyle Purtell 2010-04-06 07:07:44 +00:00
parent 6baf69a757
commit 4c5eec3968
23 changed files with 2057 additions and 261 deletions

View File

@@ -484,6 +484,7 @@ Release 0.21.0 - Unreleased
    HBASE-2087  The wait on compaction because "Too many store files"
                holds up all flushing
    HBASE-2252  Mapping a very big table kills region servers
+   HBASE-2412  [stargate] PerformanceEvaluation

   NEW FEATURES
    HBASE-1961  HBase EC2 scripts

View File

@@ -22,6 +22,28 @@
     <commons-httpclient.version>3.0.1</commons-httpclient.version>
   </properties>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <archive>
+            <manifest>
+              <mainClass>org.apache.hadoop.hbase.stargate.PerformanceEvaluation</mainClass>
+            </manifest>
+          </archive>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
   <dependencies>
     <dependency>

View File

@@ -0,0 +1,79 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.stargate;
import java.io.IOException;
import javax.ws.rs.GET;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.CacheControl;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import javax.ws.rs.core.Response.ResponseBuilder;
import org.apache.hadoop.hbase.client.HBaseAdmin;
public class ExistsResource implements Constants {
User user;
String tableName;
String actualTableName;
CacheControl cacheControl;
RESTServlet servlet;
public ExistsResource(User user, String table) throws IOException {
if (user != null) {
this.user = user;
this.actualTableName =
!user.isAdmin() ? (user.getName() + "." + table) : table;
} else {
this.actualTableName = table;
}
this.tableName = table;
servlet = RESTServlet.getInstance();
cacheControl = new CacheControl();
cacheControl.setNoCache(true);
cacheControl.setNoTransform(false);
}
@GET
@Produces({MIMETYPE_TEXT, MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF,
MIMETYPE_BINARY})
public Response get(final @Context UriInfo uriInfo) throws IOException {
if (!servlet.userRequestLimit(user, 1)) {
return Response.status(509).build();
}
try {
HBaseAdmin admin = new HBaseAdmin(servlet.getConfiguration());
if (!admin.tableExists(actualTableName)) {
throw new WebApplicationException(Response.Status.NOT_FOUND);
}
} catch (IOException e) {
throw new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE);
}
ResponseBuilder response = Response.ok();
response.cacheControl(cacheControl);
return response.build();
}
}
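
Taken together with the client-side classes added later in this commit, the new exists endpoint can be probed without hand-rolling HTTP. A minimal sketch, not part of the commit; the host:port and table name are placeholders, and it assumes a running Stargate instance:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.stargate.client.Client;
    import org.apache.hadoop.hbase.stargate.client.Cluster;
    import org.apache.hadoop.hbase.stargate.client.RemoteAdmin;

    public class ExistsCheck {
      public static void main(String[] args) throws Exception {
        Cluster cluster = new Cluster();
        cluster.add("localhost:8080");   // Stargate connector address
        Client client = new Client(cluster);
        RemoteAdmin admin = new RemoteAdmin(client, new HBaseConfiguration());
        // GET /<table>/exists: 200 maps to true, 404 to false, and
        // 509 (rate limited) is retried with a sleep in between
        System.out.println(admin.isTableAvailable("mytable"));
        client.shutdown();
      }
    }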

View File

@@ -20,11 +20,12 @@
 package org.apache.hadoop.hbase.stargate;

+import java.net.InetAddress;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;

-import org.mortbay.jetty.Connector;
 import org.mortbay.jetty.Server;
 import org.mortbay.jetty.servlet.Context;
 import org.mortbay.jetty.servlet.ServletHolder;
@@ -64,6 +65,18 @@ public class Main implements Constants {
     sh.setInitParameter("com.sun.jersey.config.property.packages",
       "jetty");

+    // configure the Stargate singleton
+    RESTServlet servlet = RESTServlet.getInstance();
+    port = servlet.getConfiguration().getInt("stargate.port", port);
+    if (!servlet.isMultiUser()) {
+      servlet.setMultiUser(cmd.hasOption("m"));
+    }
+    servlet.addConnectorAddress(
+      servlet.getConfiguration().get("stargate.hostname",
+        InetAddress.getLocalHost().getCanonicalHostName()),
+      port);
+
     // set up Jetty and run the embedded server
     Server server = new Server(port);
@@ -74,14 +87,6 @@ public class Main implements Constants {
     Context context = new Context(server, "/", Context.SESSIONS);
     context.addServlet(sh, "/*");

-    // configure the Stargate singleton
-    RESTServlet servlet = RESTServlet.getInstance();
-    servlet.setMultiUser(cmd.hasOption("m"));
-    for (Connector conn: server.getConnectors()) {
-      servlet.addConnectorAddress(conn.getHost(), conn.getLocalPort());
-    }
     server.start();
     server.join();
   }
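
Moving the singleton setup ahead of the Jetty construction matters: a Jetty Connector typically reports -1 from getLocalPort() until the server has started, so the old code could register an unusable address. After this change the address comes from configuration instead. Condensed, the startup order now reads (a paraphrase of the hunks above, not new behavior):

    RESTServlet servlet = RESTServlet.getInstance();
    int port = servlet.getConfiguration().getInt("stargate.port", 8080);
    servlet.addConnectorAddress(
      servlet.getConfiguration().get("stargate.hostname",
        InetAddress.getLocalHost().getCanonicalHostName()),
      port);
    Server server = new Server(port);  // Jetty is built after registration
    server.start();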

View File

@@ -193,6 +193,10 @@ public class RESTServlet extends ServletAdaptor
     this.statusReporter = new StatusReporter(
       conf.getInt(STATUS_REPORT_PERIOD_KEY, 1000 * 60), stopping);
     this.multiuser = conf.getBoolean("stargate.multiuser", false);
+    if (this.multiuser) {
+      LOG.info("multiuser mode enabled");
+      getAuthenticator();
+    }
   }

   @Override
@@ -321,6 +325,7 @@ public class RESTServlet extends ServletAdaptor
       if (authenticator == null) {
         authenticator = new HBCAuthenticator(conf);
       }
+      LOG.info("using authenticator " + authenticator);
     }
     return authenticator;
   }
@@ -341,6 +346,7 @@ public class RESTServlet extends ServletAdaptor
    */
   public boolean userRequestLimit(final User user, int want)
       throws IOException {
+    if (multiuser) {
       UserData ud = SoftUserData.get(user);
       HTableTokenBucket tb = (HTableTokenBucket) ud.get(UserData.TOKENBUCKET);
       if (tb == null) {
@@ -351,6 +357,7 @@ public class RESTServlet extends ServletAdaptor
         return false;
       }
       tb.remove(want);
+    }
     return true;
   }
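
The diff only shows the call sites; HTableTokenBucket itself is not part of this changeset. For orientation, a generic token bucket behaves like the following sketch (illustrative names and rates, not Stargate's implementation):

    // Generic token-bucket sketch: tokens accrue at a fixed rate up to a
    // burst capacity; a request consumes tokens or is rejected.
    public class TokenBucket {
      private final int capacity;   // maximum burst size
      private final double rate;    // tokens added per millisecond
      private double tokens;
      private long last = System.currentTimeMillis();

      public TokenBucket(int capacity, double ratePerMs) {
        this.capacity = capacity;
        this.rate = ratePerMs;
        this.tokens = capacity;
      }

      public synchronized int available() {
        long now = System.currentTimeMillis();
        tokens = Math.min(capacity, tokens + (now - last) * rate);
        last = now;
        return (int) tokens;
      }

      public synchronized void remove(int n) {
        tokens = Math.max(0, tokens - n);
      }
    }

userRequestLimit() asks for `want` tokens; when too few are available it returns false and the resource answers HTTP 509, signaling clients to back off and retry.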

View File

@@ -35,10 +35,12 @@ import javax.ws.rs.core.Response.ResponseBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.stargate.User;
 import org.apache.hadoop.hbase.stargate.model.TableInfoModel;
@@ -48,18 +50,20 @@ public class RegionsResource implements Constants {
   private static final Log LOG = LogFactory.getLog(RegionsResource.class);

   User user;
-  String table;
+  String tableName;
+  String actualTableName;
   CacheControl cacheControl;
   RESTServlet servlet;

   public RegionsResource(User user, String table) throws IOException {
     if (user != null) {
-      if (!user.isAdmin()) {
-        throw new WebApplicationException(Response.Status.FORBIDDEN);
-      }
       this.user = user;
+      this.actualTableName =
+        !user.isAdmin() ? (user.getName() + "." + table) : table;
+    } else {
+      this.actualTableName = table;
     }
-    this.table = table;
+    this.tableName = table;
     cacheControl = new CacheControl();
     cacheControl.setNoCache(true);
     cacheControl.setNoTransform(false);
@@ -69,9 +73,9 @@ public class RegionsResource implements Constants {
   private Map<HRegionInfo,HServerAddress> getTableRegions()
       throws IOException {
     HTablePool pool = servlet.getTablePool();
-    HTable table = (HTable) pool.getTable(this.table);
+    HTableInterface table = pool.getTable(actualTableName);
     try {
-      return table.getRegionsInfo();
+      return ((HTable)table).getRegionsInfo();
     } finally {
       pool.putTable(table);
     }
@@ -79,22 +83,32 @@ public class RegionsResource implements Constants {
   @GET
   @Produces({MIMETYPE_TEXT, MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF})
-  public Response get(final @Context UriInfo uriInfo) {
+  public Response get(final @Context UriInfo uriInfo) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("GET " + uriInfo.getAbsolutePath());
     }
+    if (!servlet.userRequestLimit(user, 1)) {
+      return Response.status(509).build();
+    }
     servlet.getMetrics().incrementRequests(1);
     try {
-      TableInfoModel model = new TableInfoModel(table);
+      String name = user.isAdmin() ? actualTableName : tableName;
+      TableInfoModel model = new TableInfoModel(name);
       Map<HRegionInfo,HServerAddress> regions = getTableRegions();
       for (Map.Entry<HRegionInfo,HServerAddress> e: regions.entrySet()) {
         HRegionInfo hri = e.getKey();
+        if (user.isAdmin()) {
           HServerAddress addr = e.getValue();
           InetSocketAddress sa = addr.getInetSocketAddress();
           model.add(
-            new TableRegionModel(table, hri.getRegionId(), hri.getStartKey(),
+            new TableRegionModel(name, hri.getRegionId(), hri.getStartKey(),
               hri.getEndKey(),
               sa.getHostName() + ":" + Integer.valueOf(sa.getPort())));
+        } else {
+          model.add(
+            new TableRegionModel(name, hri.getRegionId(), hri.getStartKey(),
+              hri.getEndKey()));
+        }
       }
       ResponseBuilder response = Response.ok(model);
       response.cacheControl(cacheControl);
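
The user-to-table mapping used here (and in ExistsResource above) is the same across the Stargate resources in this commit: in multiuser mode a non-admin's tables are namespaced by prefixing the owner name, so the name in the request path and the actual HBase table name differ. Extracted as a helper for clarity (a sketch, not code from the commit):

    // Non-admin users address "<table>", which is backed by the HBase
    // table "<user>.<table>"; admins address tables by their real names.
    static String actualTableName(User user, String table) {
      return (user != null && !user.isAdmin())
          ? user.getName() + "." + table
          : table;
    }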

View File

@@ -121,7 +121,7 @@ public class RootResource implements Constants {
     if (servlet.isMultiUser()) {
       throw new WebApplicationException(Response.Status.BAD_REQUEST);
     }
-    return new StorageClusterStatusResource();
+    return new StorageClusterStatusResource(User.DEFAULT_USER);
   }

   @Path("version")
@@ -135,7 +135,7 @@ public class RootResource implements Constants {
     if (servlet.isMultiUser()) {
       User user = auth(token);
       if (!servlet.userRequestLimit(user, 1)) {
-        throw new WebApplicationException(Response.status(509).build());
+        return Response.status(509).build();
       }
       try {
         ResponseBuilder response = Response.ok(getTableListForUser(user));
@@ -154,11 +154,8 @@ public class RootResource implements Constants {
       final @PathParam("token") String token) throws IOException {
     if (servlet.isMultiUser()) {
       User user = auth(token);
-      if (user.isAdmin()) {
-        if (!servlet.userRequestLimit(user, 1)) {
-          throw new WebApplicationException(Response.status(509).build());
-        }
-        return new StorageClusterStatusResource();
+      if (user != null && user.isAdmin()) {
+        return new StorageClusterStatusResource(user);
       }
       throw new WebApplicationException(Response.Status.FORBIDDEN);
     }
@@ -185,7 +182,7 @@ public class RootResource implements Constants {
     if (servlet.isMultiUser()) {
       throw new WebApplicationException(Response.Status.BAD_REQUEST);
     }
-    return new TableResource(null, table);
+    return new TableResource(User.DEFAULT_USER, table);
   }
 }

View File

@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.stargate.User;
 import org.apache.hadoop.hbase.stargate.model.CellModel;
@@ -79,16 +80,19 @@
     }
     this.servlet = RESTServlet.getInstance();
     cacheControl = new CacheControl();
-    cacheControl.setMaxAge(servlet.getMaxAge(table));
+    cacheControl.setMaxAge(servlet.getMaxAge(actualTableName));
     cacheControl.setNoTransform(false);
   }

   @GET
   @Produces({MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF})
-  public Response get(final @Context UriInfo uriInfo) {
+  public Response get(final @Context UriInfo uriInfo) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("GET " + uriInfo.getAbsolutePath());
     }
+    if (!servlet.userRequestLimit(user, 1)) {
+      return Response.status(509).build();
+    }
     servlet.getMetrics().incrementRequests(1);
     try {
       ResultGenerator generator =
@@ -127,10 +131,14 @@
   @GET
   @Produces(MIMETYPE_BINARY)
-  public Response getBinary(final @Context UriInfo uriInfo) {
+  public Response getBinary(final @Context UriInfo uriInfo)
+      throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("GET " + uriInfo.getAbsolutePath() + " as "+ MIMETYPE_BINARY);
     }
+    if (!servlet.userRequestLimit(user, 1)) {
+      return Response.status(509).build();
+    }
     servlet.getMetrics().incrementRequests(1);
     // doesn't make sense to use a non specific coordinate as this can only
     // return a single cell
@@ -166,6 +174,7 @@
       throw new WebApplicationException(Response.status(509).build());
     }
     table = pool.getTable(actualTableName);
+    ((HTable)table).setAutoFlush(false);
     for (RowModel row: rows) {
       byte[] key = row.getKey();
       Put put = new Put(key);
@@ -182,6 +191,7 @@
         LOG.debug("PUT " + put.toString());
       }
     }
+    ((HTable)table).setAutoFlush(true);
     table.flushCommits();
     ResponseBuilder response = Response.ok();
     return response.build();
@@ -236,7 +246,6 @@
       if (LOG.isDebugEnabled()) {
         LOG.debug("PUT " + put.toString());
       }
-      table.flushCommits();
       return Response.ok().build();
     } catch (IOException e) {
       throw new WebApplicationException(e,
@@ -251,10 +260,13 @@
   @PUT
   @Consumes({MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF})
   public Response put(final CellSetModel model,
-      final @Context UriInfo uriInfo) {
+      final @Context UriInfo uriInfo) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("PUT " + uriInfo.getAbsolutePath());
     }
+    if (!servlet.userRequestLimit(user, 1)) {
+      return Response.status(509).build();
+    }
     return update(model, true);
   }
@@ -262,38 +274,52 @@
   @Consumes(MIMETYPE_BINARY)
   public Response putBinary(final byte[] message,
       final @Context UriInfo uriInfo, final @Context HttpHeaders headers)
+    throws IOException
   {
     if (LOG.isDebugEnabled()) {
       LOG.debug("PUT " + uriInfo.getAbsolutePath() + " as "+ MIMETYPE_BINARY);
     }
+    if (!servlet.userRequestLimit(user, 1)) {
+      return Response.status(509).build();
+    }
     return updateBinary(message, headers, true);
   }

   @POST
   @Consumes({MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF})
   public Response post(final CellSetModel model,
-      final @Context UriInfo uriInfo) {
+      final @Context UriInfo uriInfo) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("POST " + uriInfo.getAbsolutePath());
     }
+    if (!servlet.userRequestLimit(user, 1)) {
+      return Response.status(509).build();
+    }
     return update(model, false);
   }

   @POST
   @Consumes(MIMETYPE_BINARY)
   public Response postBinary(final byte[] message,
-      final @Context UriInfo uriInfo, final @Context HttpHeaders headers) {
+      final @Context UriInfo uriInfo, final @Context HttpHeaders headers)
+      throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("POST " + uriInfo.getAbsolutePath() + " as "+MIMETYPE_BINARY);
     }
+    if (!servlet.userRequestLimit(user, 1)) {
+      return Response.status(509).build();
+    }
     return updateBinary(message, headers, false);
   }

   @DELETE
-  public Response delete(final @Context UriInfo uriInfo) {
+  public Response delete(final @Context UriInfo uriInfo) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("DELETE " + uriInfo.getAbsolutePath());
     }
+    if (!servlet.userRequestLimit(user, 1)) {
+      return Response.status(509).build();
+    }
     servlet.getMetrics().incrementRequests(1);
     Delete delete = null;
     if (rowspec.hasTimestamp())
@@ -325,7 +351,6 @@
       if (LOG.isDebugEnabled()) {
         LOG.debug("DELETE " + delete.toString());
       }
-      table.flushCommits();
     } catch (IOException e) {
       throw new WebApplicationException(e,
         Response.Status.SERVICE_UNAVAILABLE);
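
The setAutoFlush(false)/setAutoFlush(true) pair around the multi-row loop batches the puts in HTable's client-side write buffer, so they go out in one round trip at flushCommits() rather than one RPC per Put. The same pattern in isolation (a sketch; the table name is a placeholder):

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;

    public class BatchedPut {
      static void putAll(List<Put> puts) throws IOException {
        HTable table = new HTable(new HBaseConfiguration(), "mytable");
        table.setAutoFlush(false);   // buffer puts client-side
        for (Put put : puts) {
          table.put(put);
        }
        table.setAutoFlush(true);
        table.flushCommits();        // one batched round trip
      }
    }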

View File

@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.stargate;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -49,7 +50,7 @@
   private static final Log LOG = LogFactory.getLog(ScannerResource.class);

   static final Map<String,ScannerInstanceResource> scanners =
-    new HashMap<String,ScannerInstanceResource>();
+   Collections.synchronizedMap(new HashMap<String,ScannerInstanceResource>());

   User user;
   String tableName;
@@ -69,16 +70,17 @@
   }

   static void delete(final String id) {
-    synchronized (scanners) {
     ScannerInstanceResource instance = scanners.remove(id);
     if (instance != null) {
       instance.generator.close();
     }
-    }
   }

   Response update(final ScannerModel model, final boolean replace,
-      final UriInfo uriInfo) {
+      final UriInfo uriInfo) throws IOException {
+    if (!servlet.userRequestLimit(user, 1)) {
+      return Response.status(509).build();
+    }
     servlet.getMetrics().incrementRequests(1);
     byte[] endRow = model.hasEndRow() ? model.getEndRow() : null;
     RowSpec spec = new RowSpec(model.getStartRow(), endRow,
@@ -91,9 +93,7 @@
     ScannerInstanceResource instance =
       new ScannerInstanceResource(user, actualTableName, id, gen,
         model.getBatch());
-    synchronized (scanners) {
     scanners.put(id, instance);
-    }
     if (LOG.isDebugEnabled()) {
       LOG.debug("new scanner: " + id);
     }
@@ -111,7 +111,7 @@
   @PUT
   @Consumes({MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF})
   public Response put(final ScannerModel model,
-      final @Context UriInfo uriInfo) {
+      final @Context UriInfo uriInfo) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("PUT " + uriInfo.getAbsolutePath());
     }
@@ -121,7 +121,7 @@
   @POST
   @Consumes({MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF})
   public Response post(final ScannerModel model,
-      final @Context UriInfo uriInfo) {
+      final @Context UriInfo uriInfo) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("POST " + uriInfo.getAbsolutePath());
     }
@@ -131,13 +131,11 @@
   @Path("{scanner: .+}")
   public ScannerInstanceResource getScannerInstanceResource(
       final @PathParam("scanner") String id) {
-    synchronized (scanners) {
     ScannerInstanceResource instance = scanners.get(id);
     if (instance == null) {
       throw new WebApplicationException(Response.Status.NOT_FOUND);
     }
     return instance;
-    }
   }
 }
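
Collections.synchronizedMap makes each individual get/put/remove atomic, which is why the explicit synchronized blocks around single operations could be dropped above. Compound check-then-act sequences would still need the map's monitor; a generic illustration (not code from the commit):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class SyncMapDemo {
      static final Map<String, String> m =
        Collections.synchronizedMap(new HashMap<String, String>());

      public static void main(String[] args) {
        m.put("k", "v");            // single ops are safe without a lock
        String v = m.remove("k");   // atomic, like scanners.remove(id) above
        synchronized (m) {          // compound ops still need the map's lock
          if (!m.containsKey("k")) {
            m.put("k", "v2");
          }
        }
      }
    }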

View File

@@ -35,10 +35,12 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriInfo;
 import javax.ws.rs.core.Response.ResponseBuilder;
 import javax.xml.namespace.QName;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableExistsException;
@@ -46,7 +48,6 @@ import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.HTablePool;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.stargate.User;
 import org.apache.hadoop.hbase.stargate.model.ColumnSchemaModel;
 import org.apache.hadoop.hbase.stargate.model.TableSchemaModel;
@@ -89,31 +90,17 @@ public class SchemaResource implements Constants {
   @GET
   @Produces({MIMETYPE_TEXT, MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF})
-  public Response get(final @Context UriInfo uriInfo) {
+  public Response get(final @Context UriInfo uriInfo) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("GET " + uriInfo.getAbsolutePath());
     }
+    if (!servlet.userRequestLimit(user, 1)) {
+      return Response.status(509).build();
+    }
     servlet.getMetrics().incrementRequests(1);
     try {
-      HTableDescriptor htd = getTableSchema();
-      TableSchemaModel model = new TableSchemaModel();
-      model.setName(tableName);
-      for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e:
-          htd.getValues().entrySet()) {
-        model.addAttribute(Bytes.toString(e.getKey().get()),
-          Bytes.toString(e.getValue().get()));
-      }
-      for (HColumnDescriptor hcd: htd.getFamilies()) {
-        ColumnSchemaModel columnModel = new ColumnSchemaModel();
-        columnModel.setName(hcd.getNameAsString());
-        for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e:
-            hcd.getValues().entrySet()) {
-          columnModel.addAttribute(Bytes.toString(e.getKey().get()),
-            Bytes.toString(e.getValue().get()));
-        }
-        model.addColumnFamily(columnModel);
-      }
-      ResponseBuilder response = Response.ok(model);
+      ResponseBuilder response =
+        Response.ok(new TableSchemaModel(getTableSchema()));
       response.cacheControl(cacheControl);
       return response.build();
     } catch (TableNotFoundException e) {
@@ -206,46 +193,52 @@
   @PUT
   @Consumes({MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF})
   public Response put(final TableSchemaModel model,
-      final @Context UriInfo uriInfo) {
+      final @Context UriInfo uriInfo) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("PUT " + uriInfo.getAbsolutePath());
     }
-    servlet.getMetrics().incrementRequests(1);
-    // use the name given in the path, but warn if the name on the path and
-    // the name in the schema are different
-    if (model.getName() != tableName) {
-      LOG.warn("table name mismatch: path='" + tableName + "', schema='" +
-        model.getName() + "'");
+    if (!servlet.userRequestLimit(user, 1)) {
+      return Response.status(509).build();
     }
+    servlet.getMetrics().incrementRequests(1);
     return update(model, true, uriInfo);
   }

   @POST
   @Consumes({MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF})
   public Response post(final TableSchemaModel model,
-      final @Context UriInfo uriInfo) {
+      final @Context UriInfo uriInfo) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("PUT " + uriInfo.getAbsolutePath());
     }
-    servlet.getMetrics().incrementRequests(1);
-    // use the name given in the path, but warn if the name on the path and
-    // the name in the schema are different
-    if (model.getName() != tableName) {
-      LOG.warn("table name mismatch: path='" + tableName + "', schema='" +
-        model.getName() + "'");
+    if (!servlet.userRequestLimit(user, 1)) {
+      return Response.status(509).build();
     }
+    servlet.getMetrics().incrementRequests(1);
    return update(model, false, uriInfo);
   }

   @DELETE
-  public Response delete(final @Context UriInfo uriInfo) {
+  public Response delete(final @Context UriInfo uriInfo) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("DELETE " + uriInfo.getAbsolutePath());
     }
+    if (!servlet.userRequestLimit(user, 1)) {
+      return Response.status(509).build();
+    }
     servlet.getMetrics().incrementRequests(1);
     try {
       HBaseAdmin admin = new HBaseAdmin(servlet.getConfiguration());
+      boolean success = false;
+      for (int i = 0; i < 10; i++) try {
         admin.disableTable(actualTableName);
+        success = true;
+        break;
+      } catch (IOException e) {
+      }
+      if (!success) {
+        throw new IOException("could not disable table");
+      }
       admin.deleteTable(actualTableName);
       return Response.ok().build();
     } catch (TableNotFoundException e) {

View File

@@ -44,22 +44,27 @@
   private static final Log LOG =
     LogFactory.getLog(StorageClusterStatusResource.class);

+  private User user;
   private CacheControl cacheControl;
   private RESTServlet servlet;

-  public StorageClusterStatusResource() throws IOException {
-    servlet = RESTServlet.getInstance();
-    cacheControl = new CacheControl();
-    cacheControl.setNoCache(true);
-    cacheControl.setNoTransform(false);
+  public StorageClusterStatusResource(User user) throws IOException {
+    this.user = user;
+    this.servlet = RESTServlet.getInstance();
+    this.cacheControl = new CacheControl();
+    this.cacheControl.setNoCache(true);
+    this.cacheControl.setNoTransform(false);
   }

   @GET
   @Produces({MIMETYPE_TEXT, MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF})
-  public Response get(final @Context UriInfo uriInfo) {
+  public Response get(final @Context UriInfo uriInfo) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("GET " + uriInfo.getAbsolutePath());
     }
+    if (!servlet.userRequestLimit(user, 1)) {
+      return Response.status(509).build();
+    }
     servlet.getMetrics().incrementRequests(1);
     try {
       HBaseAdmin admin = new HBaseAdmin(servlet.getConfiguration());

View File

@@ -40,6 +40,11 @@ public class TableResource implements Constants {
     this.table = table;
   }

+  @Path("exists")
+  public ExistsResource getExistsResource() throws IOException {
+    return new ExistsResource(user, table);
+  }
+
   @Path("regions")
   public RegionsResource getRegionsResource() throws IOException {
     return new RegionsResource(user, table);
@@ -66,4 +71,5 @@
         Response.Status.INTERNAL_SERVER_ERROR);
     }
   }
 }

View File

@@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 public class User implements Constants {

   public static final User DEFAULT_USER = new User("default",
-    "00000000000000000000000000000000", false, true);
+    "00000000000000000000000000000000", true, true);

   private String name;
   private String token;

View File

@@ -64,10 +64,13 @@
    */
   public Client(Cluster cluster) {
     this.cluster = cluster;
-    httpClient = new HttpClient(new MultiThreadedHttpConnectionManager());
-    HttpConnectionManagerParams managerParams =
-      httpClient.getHttpConnectionManager().getParams();
+    MultiThreadedHttpConnectionManager manager =
+      new MultiThreadedHttpConnectionManager();
+    HttpConnectionManagerParams managerParams = manager.getParams();
     managerParams.setConnectionTimeout(2000); // 2 s
+    managerParams.setDefaultMaxConnectionsPerHost(10);
+    managerParams.setMaxTotalConnections(100);
+    this.httpClient = new HttpClient(manager);
     HttpClientParams clientParams = httpClient.getParams();
     clientParams.setVersion(HttpVersion.HTTP_1_1);
   }
@@ -200,10 +203,13 @@
   public Response head(Cluster cluster, String path, Header[] headers)
       throws IOException {
     HeadMethod method = new HeadMethod();
+    try {
       int code = execute(cluster, method, null, path);
       headers = method.getResponseHeaders();
-      method.releaseConnection();
       return new Response(code, headers, null);
+    } finally {
+      method.releaseConnection();
+    }
   }
@@ -276,11 +282,14 @@
   public Response get(Cluster c, String path, Header[] headers)
       throws IOException {
     GetMethod method = new GetMethod();
+    try {
       int code = execute(c, method, headers, path);
       headers = method.getResponseHeaders();
       byte[] body = method.getResponseBody();
-      method.releaseConnection();
       return new Response(code, headers, body);
+    } finally {
+      method.releaseConnection();
+    }
   }
@@ -339,12 +348,15 @@
   public Response put(Cluster cluster, String path, Header[] headers,
       byte[] content) throws IOException {
     PutMethod method = new PutMethod();
+    try {
       method.setRequestEntity(new ByteArrayRequestEntity(content));
       int code = execute(cluster, method, headers, path);
       headers = method.getResponseHeaders();
       content = method.getResponseBody();
-      method.releaseConnection();
       return new Response(code, headers, content);
+    } finally {
+      method.releaseConnection();
+    }
   }
@@ -403,12 +415,15 @@
   public Response post(Cluster cluster, String path, Header[] headers,
       byte[] content) throws IOException {
     PostMethod method = new PostMethod();
+    try {
       method.setRequestEntity(new ByteArrayRequestEntity(content));
       int code = execute(cluster, method, headers, path);
       headers = method.getResponseHeaders();
       content = method.getResponseBody();
-      method.releaseConnection();
       return new Response(code, headers, content);
+    } finally {
+      method.releaseConnection();
+    }
   }
@@ -430,9 +445,14 @@
    */
   public Response delete(Cluster cluster, String path) throws IOException {
     DeleteMethod method = new DeleteMethod();
+    try {
       int code = execute(cluster, method, null, path);
       Header[] headers = method.getResponseHeaders();
+      byte[] content = method.getResponseBody();
+      return new Response(code, headers, content);
+    } finally {
       method.releaseConnection();
-      return new Response(code, headers);
     }
   }
 }
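
With the pooled MultiThreadedHttpConnectionManager now capped at 10 connections per host and 100 total, any connection that is never released is lost to the pool, so every request path releases in a finally block. The general commons-httpclient 3.x pattern, in isolation (the URL is a placeholder):

    import org.apache.commons.httpclient.HttpClient;
    import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
    import org.apache.commons.httpclient.methods.GetMethod;

    public class ReleaseDemo {
      public static void main(String[] args) throws Exception {
        HttpClient client =
          new HttpClient(new MultiThreadedHttpConnectionManager());
        GetMethod get = new GetMethod("http://localhost:8080/version");
        try {
          int code = client.executeMethod(get);
          byte[] body = get.getResponseBody();
          System.out.println(code + ": " + body.length + " bytes");
        } finally {
          get.releaseConnection();  // always return the connection to the pool
        }
      }
    }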

View File

@@ -46,6 +46,13 @@
     nodes.addAll(nodes);
   }

+  /**
+   * @return true if no locations have been added, false otherwise
+   */
+  public boolean isEmpty() {
+    return nodes.isEmpty();
+  }
+
   /**
    * Add a node to the cluster
    * @param node the service location in 'host:port' format

View File

@@ -0,0 +1,188 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.stargate.client;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.stargate.Constants;
import org.apache.hadoop.hbase.stargate.model.TableSchemaModel;
import org.apache.hadoop.hbase.util.Bytes;
public class RemoteAdmin {
final Client client;
final Configuration conf;
final String accessToken;
final int maxRetries;
final long sleepTime;
/**
* Constructor
* @param client
* @param conf
*/
public RemoteAdmin(Client client, Configuration conf) {
this(client, conf, null);
}
/**
* Constructor
* @param client
* @param conf
* @param accessToken
*/
public RemoteAdmin(Client client, Configuration conf, String accessToken) {
this.client = client;
this.conf = conf;
this.accessToken = accessToken;
this.maxRetries = conf.getInt("stargate.client.max.retries", 10);
this.sleepTime = conf.getLong("stargate.client.sleep", 1000);
}
/**
* @param tableName name of table to check
* @return true if all regions of the table are available
* @throws IOException if a remote or network exception occurs
*/
public boolean isTableAvailable(String tableName) throws IOException {
return isTableAvailable(Bytes.toBytes(tableName));
}
/**
* @param tableName name of table to check
* @return true if all regions of the table are available
* @throws IOException if a remote or network exception occurs
*/
public boolean isTableAvailable(byte[] tableName) throws IOException {
StringBuilder sb = new StringBuilder();
sb.append('/');
if (accessToken != null) {
sb.append(accessToken);
sb.append('/');
}
sb.append(Bytes.toStringBinary(tableName));
sb.append('/');
sb.append("exists");
int code = 0;
for (int i = 0; i < maxRetries; i++) {
Response response = client.get(sb.toString());
code = response.getCode();
switch (code) {
case 200:
return true;
case 404:
return false;
case 509:
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) { }
break;
default:
throw new IOException("exists request returned " + code);
}
}
throw new IOException("exists request timed out");
}
/**
* Creates a new table.
* @param desc table descriptor for table
* @throws IOException if a remote or network exception occurs
*/
public void createTable(HTableDescriptor desc)
throws IOException {
TableSchemaModel model = new TableSchemaModel(desc);
StringBuilder sb = new StringBuilder();
sb.append('/');
if (accessToken != null) {
sb.append(accessToken);
sb.append('/');
}
sb.append(Bytes.toStringBinary(desc.getName()));
sb.append('/');
sb.append("schema");
int code = 0;
for (int i = 0; i < maxRetries; i++) {
Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF,
model.createProtobufOutput());
code = response.getCode();
switch (code) {
case 201:
return;
case 509:
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) { }
break;
default:
throw new IOException("create request returned " + code);
}
}
throw new IOException("create request timed out");
}
/**
* Deletes a table.
* @param tableName name of table to delete
* @throws IOException if a remote or network exception occurs
*/
public void deleteTable(final String tableName) throws IOException {
deleteTable(Bytes.toBytes(tableName));
}
/**
* Deletes a table.
* @param tableName name of table to delete
* @throws IOException if a remote or network exception occurs
*/
public void deleteTable(final byte [] tableName) throws IOException {
StringBuilder sb = new StringBuilder();
sb.append('/');
if (accessToken != null) {
sb.append(accessToken);
sb.append('/');
}
sb.append(Bytes.toStringBinary(tableName));
sb.append('/');
sb.append("schema");
int code = 0;
for (int i = 0; i < maxRetries; i++) {
Response response = client.delete(sb.toString());
code = response.getCode();
switch (code) {
case 200:
return;
case 509:
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) { }
break;
default:
throw new IOException("delete request returned " + code);
}
}
throw new IOException("delete request timed out");
}
}
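
Usage is symmetric with HBaseAdmin. A short sketch, not part of the commit, reusing the Client/Cluster setup shown after ExistsResource above (names are placeholders):

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.stargate.client.Client;
    import org.apache.hadoop.hbase.stargate.client.Cluster;
    import org.apache.hadoop.hbase.stargate.client.RemoteAdmin;

    public class RemoteAdminDemo {
      public static void main(String[] args) throws Exception {
        Cluster cluster = new Cluster();
        cluster.add("localhost:8080");
        RemoteAdmin admin =
          new RemoteAdmin(new Client(cluster), new HBaseConfiguration());
        HTableDescriptor desc = new HTableDescriptor("mytable");
        desc.addFamily(new HColumnDescriptor("info"));
        admin.createTable(desc);          // PUT /<table>/schema
        if (admin.isTableAvailable("mytable")) {
          admin.deleteTable("mytable");   // DELETE /<table>/schema
        }
      }
    }

Each call retries up to stargate.client.max.retries times, sleeping stargate.client.sleep ms after every 509, so a caller sees either success, a clean failure code, or a timeout IOException.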

View File

@ -29,16 +29,13 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import javax.xml.namespace.QName;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
@ -54,7 +51,6 @@ import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.stargate.Constants; import org.apache.hadoop.hbase.stargate.Constants;
import org.apache.hadoop.hbase.stargate.model.CellModel; import org.apache.hadoop.hbase.stargate.model.CellModel;
import org.apache.hadoop.hbase.stargate.model.CellSetModel; import org.apache.hadoop.hbase.stargate.model.CellSetModel;
import org.apache.hadoop.hbase.stargate.model.ColumnSchemaModel;
import org.apache.hadoop.hbase.stargate.model.RowModel; import org.apache.hadoop.hbase.stargate.model.RowModel;
import org.apache.hadoop.hbase.stargate.model.ScannerModel; import org.apache.hadoop.hbase.stargate.model.ScannerModel;
import org.apache.hadoop.hbase.stargate.model.TableSchemaModel; import org.apache.hadoop.hbase.stargate.model.TableSchemaModel;
@ -67,10 +63,12 @@ public class RemoteHTable implements HTableInterface {
private static final Log LOG = LogFactory.getLog(RemoteHTable.class); private static final Log LOG = LogFactory.getLog(RemoteHTable.class);
Client client; final Client client;
Configuration conf; final Configuration conf;
byte[] name; final byte[] name;
String accessToken; final String accessToken;
final int maxRetries;
final long sleepTime;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected String buildRowSpec(final byte[] row, final Map familyMap, protected String buildRowSpec(final byte[] row, final Map familyMap,
@ -210,19 +208,18 @@ public class RemoteHTable implements HTableInterface {
this.conf = conf; this.conf = conf;
this.name = name; this.name = name;
this.accessToken = accessToken; this.accessToken = accessToken;
this.maxRetries = conf.getInt("stargate.client.max.retries", 10);
this.sleepTime = conf.getLong("stargate.client.sleep", 1000);
} }
@Override
public byte[] getTableName() { public byte[] getTableName() {
return name.clone(); return name.clone();
} }
@Override
public Configuration getConfiguration() { public Configuration getConfiguration() {
return conf; return conf;
} }
@Override
public HTableDescriptor getTableDescriptor() throws IOException { public HTableDescriptor getTableDescriptor() throws IOException {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append('/'); sb.append('/');
@ -233,32 +230,30 @@ public class RemoteHTable implements HTableInterface {
sb.append(Bytes.toStringBinary(name)); sb.append(Bytes.toStringBinary(name));
sb.append('/'); sb.append('/');
sb.append("schema"); sb.append("schema");
for (int i = 0; i < maxRetries; i++) {
Response response = client.get(sb.toString(), Constants.MIMETYPE_PROTOBUF); Response response = client.get(sb.toString(), Constants.MIMETYPE_PROTOBUF);
if (response.getCode() != 200) { int code = response.getCode();
throw new IOException("schema request returned " + response.getCode()); switch (code) {
} case 200:
TableSchemaModel schema = new TableSchemaModel(); TableSchemaModel schema = new TableSchemaModel();
schema.getObjectFromMessage(response.getBody()); schema.getObjectFromMessage(response.getBody());
HTableDescriptor htd = new HTableDescriptor(schema.getName()); return schema.getTableDescriptor();
for (Map.Entry<QName, Object> e: schema.getAny().entrySet()) { case 509:
htd.setValue(e.getKey().getLocalPart(), e.getValue().toString()); try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) { }
break;
default:
throw new IOException("schema request returned " + code);
} }
for (ColumnSchemaModel column: schema.getColumns()) {
HColumnDescriptor hcd = new HColumnDescriptor(column.getName());
for (Map.Entry<QName, Object> e: column.getAny().entrySet()) {
hcd.setValue(e.getKey().getLocalPart(), e.getValue().toString());
} }
htd.addFamily(hcd); throw new IOException("schema request timed out");
}
return htd;
} }
@Override
public void close() throws IOException { public void close() throws IOException {
client.shutdown(); client.shutdown();
} }
@Override
public Result get(Get get) throws IOException { public Result get(Get get) throws IOException {
TimeRange range = get.getTimeRange(); TimeRange range = get.getTimeRange();
String spec = buildRowSpec(get.getRow(), get.getFamilyMap(), String spec = buildRowSpec(get.getRow(), get.getFamilyMap(),
@ -266,14 +261,11 @@ public class RemoteHTable implements HTableInterface {
if (get.getFilter() != null) { if (get.getFilter() != null) {
LOG.warn("filters not supported on gets"); LOG.warn("filters not supported on gets");
} }
for (int i = 0; i < maxRetries; i++) {
Response response = client.get(spec, Constants.MIMETYPE_PROTOBUF); Response response = client.get(spec, Constants.MIMETYPE_PROTOBUF);
int code = response.getCode(); int code = response.getCode();
if (code == 404) { switch (code) {
return new Result(); case 200:
}
if (code != 200) {
throw new IOException("get request returned " + code);
}
CellSetModel model = new CellSetModel(); CellSetModel model = new CellSetModel();
model.getObjectFromMessage(response.getBody()); model.getObjectFromMessage(response.getBody());
Result[] results = buildResultFromModel(model); Result[] results = buildResultFromModel(model);
@ -283,17 +275,27 @@ public class RemoteHTable implements HTableInterface {
} }
return results[0]; return results[0];
} }
// fall through
case 404:
return new Result(); return new Result();
case 509:
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) { }
break;
default:
throw new IOException("get request returned " + code);
}
}
throw new IOException("get request timed out");
} }
@Override
public boolean exists(Get get) throws IOException { public boolean exists(Get get) throws IOException {
LOG.warn("exists() is really get(), just use get()"); LOG.warn("exists() is really get(), just use get()");
Result result = get(get); Result result = get(get);
return (result != null && !(result.isEmpty())); return (result != null && !(result.isEmpty()));
} }
@Override
public void put(Put put) throws IOException { public void put(Put put) throws IOException {
CellSetModel model = buildModelFromPut(put); CellSetModel model = buildModelFromPut(put);
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
@ -305,14 +307,25 @@ public class RemoteHTable implements HTableInterface {
sb.append(Bytes.toStringBinary(name)); sb.append(Bytes.toStringBinary(name));
sb.append('/'); sb.append('/');
sb.append(Bytes.toStringBinary(put.getRow())); sb.append(Bytes.toStringBinary(put.getRow()));
for (int i = 0; i < maxRetries; i++) {
Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF, Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF,
model.createProtobufOutput()); model.createProtobufOutput());
if (response.getCode() != 200) { int code = response.getCode();
throw new IOException("put failed with " + response.getCode()); switch (code) {
case 200:
return;
case 509:
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) { }
break;
default:
throw new IOException("put request failed with " + code);
} }
} }
throw new IOException("put request timed out");
}
@Override
public void put(List<Put> puts) throws IOException { public void put(List<Put> puts) throws IOException {
// this is a trick: Stargate accepts multiple rows in a cell set and // this is a trick: Stargate accepts multiple rows in a cell set and
// ignores the row specification in the URI // ignores the row specification in the URI
@ -351,31 +364,52 @@ public class RemoteHTable implements HTableInterface {
} }
sb.append(Bytes.toStringBinary(name)); sb.append(Bytes.toStringBinary(name));
sb.append("/$multiput"); // can be any nonexistent row sb.append("/$multiput"); // can be any nonexistent row
for (int i = 0; i < maxRetries; i++) {
Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF, Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF,
model.createProtobufOutput()); model.createProtobufOutput());
if (response.getCode() != 200) { int code = response.getCode();
throw new IOException("multiput failed with " + response.getCode()); switch (code) {
case 200:
return;
case 509:
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) { }
break;
default:
throw new IOException("multiput request failed with " + code);
} }
} }
throw new IOException("multiput request timed out");
}
@Override
public void delete(Delete delete) throws IOException { public void delete(Delete delete) throws IOException {
String spec = buildRowSpec(delete.getRow(), delete.getFamilyMap(), String spec = buildRowSpec(delete.getRow(), delete.getFamilyMap(),
delete.getTimeStamp(), delete.getTimeStamp(), 1); delete.getTimeStamp(), delete.getTimeStamp(), 1);
for (int i = 0; i < maxRetries; i++) {
Response response = client.delete(spec); Response response = client.delete(spec);
if (response.getCode() != 200) { int code = response.getCode();
throw new IOException("delete() returned " + response.getCode()); switch (code) {
case 200:
return;
case 509:
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) { }
break;
default:
throw new IOException("delete request failed with " + code);
} }
} }
throw new IOException("delete request timed out");
}
@Override
public void delete(List<Delete> deletes) throws IOException { public void delete(List<Delete> deletes) throws IOException {
for (Delete delete: deletes) { for (Delete delete: deletes) {
delete(delete); delete(delete);
} }
} }
@Override
public void flushCommits() throws IOException { public void flushCommits() throws IOException {
// no-op // no-op
} }
@ -385,6 +419,12 @@ public class RemoteHTable implements HTableInterface {
String uri; String uri;
public Scanner(Scan scan) throws IOException { public Scanner(Scan scan) throws IOException {
ScannerModel model;
try {
model = ScannerModel.fromScan(scan);
} catch (Exception e) {
throw new IOException(e);
}
StringBuffer sb = new StringBuffer(); StringBuffer sb = new StringBuffer();
sb.append('/'); sb.append('/');
if (accessToken != null) { if (accessToken != null) {
@ -394,37 +434,53 @@ public class RemoteHTable implements HTableInterface {
sb.append(Bytes.toStringBinary(name)); sb.append(Bytes.toStringBinary(name));
sb.append('/'); sb.append('/');
sb.append("scanner"); sb.append("scanner");
try { for (int i = 0; i < maxRetries; i++) {
ScannerModel model = ScannerModel.fromScan(scan);
Response response = client.post(sb.toString(), Response response = client.post(sb.toString(),
Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput()); Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
if (response.getCode() != 201) { int code = response.getCode();
throw new IOException("scan request failed with " + switch (code) {
response.getCode()); case 201:
}
uri = response.getLocation(); uri = response.getLocation();
} catch (Exception e) { return;
throw new IOException(e); case 509:
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) { }
break;
default:
throw new IOException("scan request failed with " + code);
} }
} }
throw new IOException("scan request timed out");
}
    @Override
    public Result[] next(int nbRows) throws IOException {
      StringBuilder sb = new StringBuilder(uri);
      sb.append("?n=");
      sb.append(nbRows);
      for (int i = 0; i < maxRetries; i++) {
        Response response = client.get(sb.toString(),
          Constants.MIMETYPE_PROTOBUF);
        int code = response.getCode();
        switch (code) {
        case 200:
          CellSetModel model = new CellSetModel();
          model.getObjectFromMessage(response.getBody());
          return buildResultFromModel(model);
        case 204:
        case 206:
          return null;
        case 509:
          try {
            Thread.sleep(sleepTime);
          } catch (InterruptedException e) { }
          break;
        default:
          throw new IOException("scanner.next request failed with " + code);
        }
      }
      throw new IOException("scanner.next request timed out");
    }
    @Override
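With scanner creation (201 Created plus a Location header naming the scanner resource) and next() both retrying on 509, the inner Scanner class behaves like any other ResultScanner. A usage sketch; the endpoint, table name, and the two-argument RemoteHTable constructor shown here are assumptions for illustration:

    RemoteHTable table = new RemoteHTable(new Client(
      new Cluster().add("localhost", 8080)), "mytable"); // placeholder endpoint
    ResultScanner scanner = table.getScanner(Bytes.toBytes("f"));
    try {
      Result result;
      while ((result = scanner.next()) != null) {
        System.out.println(Bytes.toStringBinary(result.getRow()));
      }
    } finally {
      scanner.close();
    }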
@@ -488,19 +544,16 @@ public class RemoteHTable implements HTableInterface {
  }

  public ResultScanner getScanner(Scan scan) throws IOException {
    return new Scanner(scan);
  }

  public ResultScanner getScanner(byte[] family) throws IOException {
    Scan scan = new Scan();
    scan.addFamily(family);
    return new Scanner(scan);
  }

  public ResultScanner getScanner(byte[] family, byte[] qualifier)
      throws IOException {
    Scan scan = new Scan();

@@ -508,39 +561,32 @@ public class RemoteHTable implements HTableInterface {

    return new Scanner(scan);
  }

  public boolean isAutoFlush() {
    return true;
  }

  public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
    throw new IOException("getRowOrBefore not supported");
  }

  public RowLock lockRow(byte[] row) throws IOException {
    throw new IOException("lockRow not implemented");
  }

  public void unlockRow(RowLock rl) throws IOException {
    throw new IOException("unlockRow not implemented");
  }

  public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
      byte[] value, Put put) throws IOException {
    throw new IOException("checkAndPut not supported");
  }

  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
      long amount) throws IOException {
    throw new IOException("incrementColumnValue not supported");
  }

  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
      long amount, boolean writeToWAL) throws IOException {
    throw new IOException("incrementColumnValue not supported");

View File

@@ -59,6 +59,18 @@ public class TableRegionModel implements Serializable {

   */
  public TableRegionModel() {}
  /**
   * Constructor
   * @param table the table name
   * @param id the encoded id of the region
   * @param startKey the start key of the region
   * @param endKey the end key of the region
   */
  public TableRegionModel(String table, long id, byte[] startKey,
      byte[] endKey) {
    this(table, id, startKey, endKey, null);
  }
  /**
   * Constructor
   * @param table the table name

@@ -173,8 +185,10 @@ public class TableRegionModel implements Serializable {

    sb.append(Bytes.toString(startKey));
    sb.append("'\n endKey='");
    sb.append(Bytes.toString(endKey));
    if (location != null) {
      sb.append("'\n location='");
      sb.append(location);
    }
    sb.append("'\n]\n");
    return sb.toString();
  }
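The null guard matters because the new four-argument constructor deliberately leaves location unset until a region has an assigned server. A quick sketch of the resulting behavior, with illustrative values:

    TableRegionModel region = new TableRegionModel("content", 1261018856683L,
      Bytes.toBytes(""), Bytes.toBytes("mid")); // delegates with a null location
    System.out.print(region.toString()); // prints name, id, and keys; skips location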

View File

@@ -38,9 +38,11 @@ import javax.xml.namespace.QName;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.stargate.ProtobufMessageHandler;
import org.apache.hadoop.hbase.stargate.protobuf.generated.ColumnSchemaMessage.ColumnSchema;
import org.apache.hadoop.hbase.stargate.protobuf.generated.TableSchemaMessage.TableSchema;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * A representation of HBase table descriptors.
@@ -77,6 +79,29 @@ public class TableSchemaModel implements Serializable, ProtobufMessageHandler {

   */
  public TableSchemaModel() {}
  /**
   * Constructor
   * @param htd the table descriptor
   */
  public TableSchemaModel(HTableDescriptor htd) {
    setName(htd.getNameAsString());
    for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e:
        htd.getValues().entrySet()) {
      addAttribute(Bytes.toString(e.getKey().get()),
        Bytes.toString(e.getValue().get()));
    }
    for (HColumnDescriptor hcd: htd.getFamilies()) {
      ColumnSchemaModel columnModel = new ColumnSchemaModel();
      columnModel.setName(hcd.getNameAsString());
      for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e:
          hcd.getValues().entrySet()) {
        columnModel.addAttribute(Bytes.toString(e.getKey().get()),
          Bytes.toString(e.getValue().get()));
      }
      addColumnFamily(columnModel);
    }
  }
  /**
   * Add an attribute to the table descriptor
   * @param name attribute name

@@ -308,4 +333,23 @@ public class TableSchemaModel implements Serializable, ProtobufMessageHandler {

    }
    return this;
  }
  /**
   * @return a table descriptor
   */
  public HTableDescriptor getTableDescriptor() {
    HTableDescriptor htd = new HTableDescriptor(getName());
    for (Map.Entry<QName, Object> e: getAny().entrySet()) {
      htd.setValue(e.getKey().getLocalPart(), e.getValue().toString());
    }
    for (ColumnSchemaModel column: getColumns()) {
      HColumnDescriptor hcd = new HColumnDescriptor(column.getName());
      for (Map.Entry<QName, Object> e: column.getAny().entrySet()) {
        hcd.setValue(e.getKey().getLocalPart(), e.getValue().toString());
      }
      htd.addFamily(hcd);
    }
    return htd;
  }
}
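With the new constructor and getTableDescriptor(), the model now converts both ways, which is what a remote admin client needs to ship schemas over the wire. A round-trip sketch using only calls that appear in this patch; table and family names are illustrative:

    HTableDescriptor htd = new HTableDescriptor("example");
    htd.addFamily(new HColumnDescriptor("f"));
    TableSchemaModel model = new TableSchemaModel(htd);  // descriptor -> model
    byte[] wire = model.createProtobufOutput();          // wire form, e.g. for a schema PUT
    HTableDescriptor copy = model.getTableDescriptor();  // model -> descriptor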

View File

@@ -70,7 +70,7 @@ public class HTableTokenBucket implements Constants {

  HTable table;
  byte[] row;
  int tokens;
  double rate = 20.0; // default, 20 ops added per second
  int size = 100; // burst
  long lastUpdated = System.currentTimeMillis();
  long configUpdateInterval;
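The default refill rate doubles from 10 to 20 ops added per second, halving how long a rate-limited client waits for fresh tokens. For reference, token bucket accounting is simple elapsed-time arithmetic; a self-contained sketch of the refill rule, as a hypothetical helper rather than the class's actual update method:

    // Tokens grow at `rate` per second since the last update, capped at the
    // burst size. Field names mirror the ones declared above.
    static int refill(int tokens, int size, double rate,
        long lastUpdatedMillis, long nowMillis) {
      double added = rate * ((nowMillis - lastUpdatedMillis) / 1000.0);
      return (int) Math.min((double) size, tokens + added);
    }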

View File

@@ -20,46 +20,32 @@

package org.apache.hadoop.hbase.stargate.util;

import java.util.HashMap;
import java.util.Map;

/**
 * Generic storage for per user information.
 */
public class UserData {

  public static final int TOKENBUCKET = 1;

  Map<Integer,Object> data = new HashMap<Integer,Object>(1);

  public synchronized boolean has(final int sel) {
    return data.get(sel) != null;
  }

  public synchronized Object get(final int sel) {
    return data.get(sel);
  }

  public synchronized Object put(final int sel, final Object o) {
    return data.put(sel, o);
  }

  public synchronized Object remove(int sel) {
    // must delegate to the map; "return remove(sel)" would recurse forever
    return data.remove(sel);
  }

}
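Keying slots by integer selector in a HashMap removes the out-of-bounds handling the ArrayList version needed, and a map sized for one entry suffices while TOKENBUCKET is the only slot. A usage sketch; createTokenBucket is a hypothetical factory standing in for construction code not shown in this patch:

    UserData ud = new UserData();
    HTableTokenBucket tb = (HTableTokenBucket) ud.get(UserData.TOKENBUCKET);
    if (tb == null) {
      tb = createTokenBucket(); // hypothetical: built elsewhere by the servlet
      ud.put(UserData.TOKENBUCKET, tb);
    }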

View File

@@ -0,0 +1,84 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.stargate.client;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.stargate.MiniClusterTestBase;
import org.apache.hadoop.hbase.util.Bytes;
public class TestRemoteAdmin extends MiniClusterTestBase {

  static final String TABLE_1 = "TestRemoteAdmin_Table_1";
  static final String TABLE_2 = "TestRemoteAdmin_Table_2";
  static final byte[] COLUMN_1 = Bytes.toBytes("a");

  static final HTableDescriptor DESC_1;
  static {
    DESC_1 = new HTableDescriptor(TABLE_1);
    DESC_1.addFamily(new HColumnDescriptor(COLUMN_1));
  }

  static final HTableDescriptor DESC_2;
  static {
    DESC_2 = new HTableDescriptor(TABLE_2);
    DESC_2.addFamily(new HColumnDescriptor(COLUMN_1));
  }

  Client client;
  HBaseAdmin localAdmin;
  RemoteAdmin remoteAdmin;

  @Override
  protected void setUp() throws Exception {
    super.setUp();
    localAdmin = new HBaseAdmin(conf);
    remoteAdmin = new RemoteAdmin(new Client(
      new Cluster().add("localhost", testServletPort)),
      conf);
    if (localAdmin.tableExists(TABLE_1)) {
      localAdmin.disableTable(TABLE_1);
      localAdmin.deleteTable(TABLE_1);
    }
    if (!localAdmin.tableExists(TABLE_2)) {
      localAdmin.createTable(DESC_2);
    }
  }

  @Override
  protected void tearDown() throws Exception {
    super.tearDown();
  }

  public void testCreateTable() throws Exception {
    assertFalse(remoteAdmin.isTableAvailable(TABLE_1));
    remoteAdmin.createTable(DESC_1);
    assertTrue(remoteAdmin.isTableAvailable(TABLE_1));
  }

  public void testDeleteTable() throws Exception {
    assertTrue(remoteAdmin.isTableAvailable(TABLE_2));
    remoteAdmin.deleteTable(TABLE_2);
    assertFalse(remoteAdmin.isTableAvailable(TABLE_2));
  }

}
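The same RemoteAdmin calls work against any live Stargate endpoint, not just the mini cluster. A sketch, with host, port, and table name as placeholders, and conf an HBase Configuration obtained as in the test setup above:

    RemoteAdmin admin = new RemoteAdmin(
      new Client(new Cluster().add("stargate.example.com", 8080)), conf);
    if (!admin.isTableAvailable("demo")) {
      HTableDescriptor htd = new HTableDescriptor("demo");
      htd.addFamily(new HColumnDescriptor("f"));
      admin.createTable(htd);
    }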