HDFS-8527. OzoneHandler: Integration of REST interface and container data pipeline back-end. Contributed by Chris Nauroth

This commit is contained in:
Anu Engineer 2015-11-23 10:32:53 -08:00 committed by Owen O'Malley
parent a301827c6a
commit ae109d1de7
16 changed files with 1196 additions and 1 deletions

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY;
import java.io.IOException;
import com.sun.jersey.api.container.ContainerFactory;
import com.sun.jersey.api.core.ApplicationAdapter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.ObjectStoreApplication;
import org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainer;
import org.apache.hadoop.ozone.web.storage.DistributedStorageHandler;
import org.apache.hadoop.ozone.web.localstorage.LocalStorageHandler;
/**
* Implements object store handling within the DataNode process. This class is
* responsible for initializing and maintaining the RPC clients and servers and
* the web application required for the object store implementation.
*/
public final class ObjectStoreHandler {
private final ObjectStoreJerseyContainer objectStoreJerseyContainer;
/**
* Creates a new ObjectStoreHandler.
*
* @param conf configuration
* @throws IOException if there is an I/O error
*/
public ObjectStoreHandler(Configuration conf) throws IOException {
String shType = conf.getTrimmed(DFS_STORAGE_HANDLER_TYPE_KEY,
DFS_STORAGE_HANDLER_TYPE_DEFAULT);
final StorageHandler storageHandler;
if ("distributed".equalsIgnoreCase(shType)) {
storageHandler = new DistributedStorageHandler();
} else {
if ("local".equalsIgnoreCase(shType)) {
storageHandler = new LocalStorageHandler();
} else {
throw new IllegalArgumentException(
String.format("Unrecognized value for %s: %s",
DFS_STORAGE_HANDLER_TYPE_KEY, shType));
}
}
this.objectStoreJerseyContainer = ContainerFactory.createContainer(
ObjectStoreJerseyContainer.class, new ApplicationAdapter(
new ObjectStoreApplication()));
this.objectStoreJerseyContainer.setStorageHandler(storageHandler);
}
/**
* Returns the initialized web application container.
*
* @return initialized web application container
*/
public ObjectStoreJerseyContainer getObjectStoreJerseyContainer() {
return this.objectStoreJerseyContainer;
}
}

View File

@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -28,6 +28,12 @@ public final class OzoneConfigKeys {
public static final String DFS_STORAGE_LOCAL_ROOT =
"dfs.ozone.localstorage.root";
public static final String DFS_STORAGE_LOCAL_ROOT_DEFAULT = "/tmp/ozone";
public static final String DFS_OBJECTSTORE_ENABLED_KEY =
"dfs.objectstore.enabled";
public static final boolean DFS_OBJECTSTORE_ENABLED_DEFAULT = false;
public static final String DFS_STORAGE_HANDLER_TYPE_KEY =
"dfs.storage.handler.type";
public static final String DFS_STORAGE_HANDLER_TYPE_DEFAULT = "distributed";
/**
* There is no need to instantiate this class.

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.web;
import org.apache.hadoop.ozone.web.handlers.BucketHandler;
import org.apache.hadoop.ozone.web.handlers.VolumeHandler;
import javax.ws.rs.core.Application;
import java.util.HashSet;
import java.util.Set;
/**
* Ozone Application.
*/
public class ObjectStoreApplication extends Application {
public ObjectStoreApplication() {
super();
}
@Override
public Set<Class<?>> getClasses() {
HashSet<Class<?>> set = new HashSet<>();
set.add(BucketHandler.class);
set.add(VolumeHandler.class);
return set;
}
@Override
public Set<Object> getSingletons() {
HashSet<Object> set = new HashSet<>();
return set;
}
}

View File

@ -35,6 +35,7 @@ public final class Header {
public static final String OZONE_USER = "x-ozone-user";
public static final String OZONE_SIMPLE_AUTHENTICATION_SCHEME = "OZONE";
public static final String OZONE_VERSION_HEADER = "x-ozone-version";
public static final String OZONE_V1_VERSION_HEADER ="v1";
public static final String OZONE_LIST_QUERY_SERVICE = "service";
public static final String OZONE_LIST_QUERY_VOLUME = "volume";

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Ozone HTTP header definitions.
*/
@InterfaceAudience.Private
package org.apache.hadoop.ozone.web.headers;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.web.netty;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.hadoop.io.IOUtils;
import java.io.Closeable;
/**
* A {@link ChannelFutureListener} that closes {@link Closeable} resources.
*/
final class CloseableCleanupListener implements ChannelFutureListener {
private final Closeable[] closeables;
/**
* Creates a new CloseableCleanupListener.
*
* @param closeables any number of closeable resources
*/
public CloseableCleanupListener(Closeable... closeables) {
this.closeables = closeables;
}
@Override
public void operationComplete(ChannelFuture future) {
IOUtils.cleanup(null, closeables);
}
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.web.netty;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
import static io.netty.handler.codec.http.HttpHeaderValues.CLOSE;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
/**
* Abstract base class for the multiple Netty channel handlers used in the
* Object Store Netty channel pipeline.
*/
abstract class ObjectStoreChannelHandler<T>
extends SimpleChannelInboundHandler<T> {
/** Log usable in all subclasses. */
protected static final Logger LOG =
LoggerFactory.getLogger(ObjectStoreChannelHandler.class);
/**
* Handles uncaught exceptions in the channel pipeline by sending an internal
* server error response if the channel is still active.
*
* @param ctx ChannelHandlerContext to receive response
* @param cause Throwable that was unhandled in the channel pipeline
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOG.error("Unexpected exception in Netty pipeline.", cause);
if (ctx.channel().isActive()) {
sendErrorResponse(ctx, INTERNAL_SERVER_ERROR);
}
}
/**
* Sends an error response. This method is used when an unexpected error is
* encountered within the channel pipeline, outside of the actual Object Store
* application. It always closes the connection, because we can't in general
* know the state of the connection when these errors occur, so attempting to
* keep the connection alive could be unpredictable.
*
* @param ctx ChannelHandlerContext to receive response
* @param status HTTP response status
*/
protected static void sendErrorResponse(ChannelHandlerContext ctx,
HttpResponseStatus status) {
HttpResponse nettyResp = new DefaultFullHttpResponse(HTTP_1_1, status);
nettyResp.headers().set(CONTENT_LENGTH, 0);
nettyResp.headers().set(CONNECTION, CLOSE);
ctx.writeAndFlush(nettyResp).addListener(ChannelFutureListener.CLOSE);
}
}

View File

@ -0,0 +1,347 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.web.netty;
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
import static io.netty.handler.codec.http.HttpHeaderNames.TRANSFER_ENCODING;
import static io.netty.handler.codec.http.HttpHeaderValues.CLOSE;
import static io.netty.handler.codec.http.HttpHeaderValues.KEEP_ALIVE;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import com.sun.jersey.core.header.InBoundHeaders;
import com.sun.jersey.spi.container.ContainerRequest;
import com.sun.jersey.spi.container.ContainerResponse;
import com.sun.jersey.spi.container.ContainerResponseWriter;
import com.sun.jersey.spi.container.WebApplication;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaderUtil;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.handlers.StorageHandlerBuilder;
/**
* This is a custom Jersey container that hosts the Object Store web
* application. It supports dispatching an inbound Netty {@link HttpRequest}
* to the Object Store Jersey application. Request dispatching must run
* asynchronously, because the Jersey application must consume the inbound
* HTTP request from a piped stream and produce the outbound HTTP response
* for another piped stream.The Netty channel handlers consume the connected
* ends of these piped streams. Request dispatching cannot run directly on
* the Netty threads, or there would be a risk of deadlock (one thread
* producing/consuming its end of the pipe while no other thread is
* producing/consuming the opposite end).
*/
public final class ObjectStoreJerseyContainer {
private static final Logger LOG =
LoggerFactory.getLogger(ObjectStoreJerseyContainer.class);
private final WebApplication webapp;
private StorageHandler storageHandler;
/**
* Creates a new ObjectStoreJerseyContainer.
*
* @param webapp web application
*/
public ObjectStoreJerseyContainer(WebApplication webapp) {
this.webapp = webapp;
}
/**
* Sets the {@link StorageHandler}. This must be called before dispatching any
* requests.
*
* @param newStorageHandler {@link StorageHandler} implementation
*/
public void setStorageHandler(StorageHandler newStorageHandler) {
this.storageHandler = newStorageHandler;
}
/**
* Asynchronously executes an HTTP request.
*
* @param nettyReq HTTP request
* @param reqIn input stream for reading request body
* @param respOut output stream for writing response body
*/
public Future<HttpResponse> dispatch(HttpRequest nettyReq, InputStream reqIn,
OutputStream respOut) {
// The request executes on a separate background thread. As soon as enough
// processing has completed to bootstrap the outbound response, the thread
// counts down on a latch. This latch also unblocks callers trying to get
// the asynchronous response out of the returned future.
final CountDownLatch latch = new CountDownLatch(1);
final RequestRunner runner = new RequestRunner(nettyReq, reqIn, respOut,
latch);
final Thread thread = new Thread(runner);
thread.setDaemon(true);
thread.start();
return new Future<HttpResponse>() {
private volatile boolean isCancelled = false;
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (latch.getCount() == 0) {
return false;
}
if (!mayInterruptIfRunning) {
return false;
}
if (!thread.isAlive()) {
return false;
}
thread.interrupt();
try {
thread.join();
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to cancel dispatch thread.");
Thread.currentThread().interrupt();
return false;
}
isCancelled = true;
return true;
}
@Override
public HttpResponse get()
throws InterruptedException, ExecutionException {
checkCancelled();
latch.await();
return this.getOrThrow();
}
@Override
public HttpResponse get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
checkCancelled();
if (!latch.await(timeout, unit)) {
throw new TimeoutException(String.format(
"Timed out waiting for HttpResponse after %d %s.",
timeout, unit.toString().toLowerCase()));
}
return this.getOrThrow();
}
@Override
public boolean isCancelled() {
return isCancelled;
}
@Override
public boolean isDone() {
return !isCancelled && latch.getCount() == 0;
}
private void checkCancelled() {
if (isCancelled()) {
throw new CancellationException();
}
}
private HttpResponse getOrThrow() throws ExecutionException {
try {
return runner.getResponse();
} catch (Exception e) {
throw new ExecutionException(e);
}
}
};
}
/**
* Runs the actual handling of the HTTP request.
*/
private final class RequestRunner implements Runnable,
ContainerResponseWriter {
private final CountDownLatch latch;
private final HttpRequest nettyReq;
private final InputStream reqIn;
private final OutputStream respOut;
private Exception exception;
private HttpResponse nettyResp;
/**
* Creates a new RequestRunner.
*
* @param nettyReq HTTP request
* @param reqIn input stream for reading request body
* @param respOut output stream for writing response body
* @param latch for coordinating asynchronous return of HTTP response
*/
public RequestRunner(HttpRequest nettyReq, InputStream reqIn,
OutputStream respOut, CountDownLatch latch) {
this.latch = latch;
this.nettyReq = nettyReq;
this.reqIn = reqIn;
this.respOut = respOut;
}
@Override
public void run() {
LOG.trace("begin RequestRunner, nettyReq = {}", this.nettyReq);
StorageHandlerBuilder.setStorageHandler(
ObjectStoreJerseyContainer.this.storageHandler);
try {
ContainerRequest jerseyReq = nettyRequestToJerseyRequest(
ObjectStoreJerseyContainer.this.webapp, this.nettyReq, this.reqIn);
ObjectStoreJerseyContainer.this.webapp.handleRequest(jerseyReq, this);
} catch (Exception e) {
this.exception = e;
this.latch.countDown();
} finally {
IOUtils.cleanup(null, this.reqIn, this.respOut);
StorageHandlerBuilder.removeStorageHandler();
}
LOG.trace("end RequestRunner, nettyReq = {}", this.nettyReq);
}
/**
* This is a callback triggered by Jersey as soon as dispatch has completed
* to the point of knowing what kind of response to return. We save the
* response and trigger the latch to unblock callers waiting on the
* asynchronous return of the response. Our response always sets a
* Content-Length header. (We do not support Transfer-Encoding: chunked.)
* We also return the output stream for Jersey to use for writing the
* response body.
*
* @param contentLength length of response
* @param jerseyResp HTTP response returned by Jersey
* @return OutputStream for Jersey to use for writing the response body
*/
@Override
public OutputStream writeStatusAndHeaders(long contentLength,
ContainerResponse jerseyResp) {
LOG.trace(
"begin writeStatusAndHeaders, contentLength = {}, jerseyResp = {}.",
contentLength, jerseyResp);
this.nettyResp = jerseyResponseToNettyResponse(jerseyResp);
this.nettyResp.headers().set(CONTENT_LENGTH, Math.max(0, contentLength));
this.nettyResp.headers().set(CONNECTION,
HttpHeaderUtil.isKeepAlive(this.nettyReq) ? KEEP_ALIVE : CLOSE);
this.latch.countDown();
LOG.trace(
"end writeStatusAndHeaders, contentLength = {}, jerseyResp = {}.",
contentLength, jerseyResp);
return this.respOut;
}
/**
* This is a callback triggered by Jersey after it has completed writing the
* response body to the stream. We must close the stream here to unblock
* the Netty thread consuming the last chunk of the response from the input
* end of the piped stream.
*
* @throws IOException if there is an I/O error
*/
@Override
public void finish() throws IOException {
IOUtils.cleanup(null, this.respOut);
}
/**
* Gets the HTTP response calculated by the Jersey application, or throws an
* exception if an error occurred during processing. It only makes sense to
* call this method after waiting on the latch to trigger.
*
* @return HTTP response
* @throws Exception if there was an error executing the request
*/
public HttpResponse getResponse() throws Exception {
if (this.exception != null) {
throw this.exception;
}
return this.nettyResp;
}
}
/**
* Converts a Jersey HTTP response object to a Netty HTTP response object.
*
* @param jerseyResp Jersey HTTP response
* @return Netty HTTP response
*/
private static HttpResponse jerseyResponseToNettyResponse(
ContainerResponse jerseyResp) {
HttpResponse nettyResp = new DefaultHttpResponse(HTTP_1_1,
HttpResponseStatus.valueOf(jerseyResp.getStatus()));
for (Map.Entry<String, List<Object>> header :
jerseyResp.getHttpHeaders().entrySet()) {
if (!header.getKey().equalsIgnoreCase(CONTENT_LENGTH.toString()) &&
!header.getKey().equalsIgnoreCase(TRANSFER_ENCODING.toString())) {
nettyResp.headers().set(header.getKey(), header.getValue());
}
}
return nettyResp;
}
/**
* Converts a Netty HTTP request object to a Jersey HTTP request object.
*
* @param webapp web application
* @param nettyReq Netty HTTP request
* @param reqIn input stream for reading request body
* @return Jersey HTTP request
* @throws URISyntaxException if there is an error handling the request URI
*/
private static ContainerRequest nettyRequestToJerseyRequest(
WebApplication webapp, HttpRequest nettyReq, InputStream reqIn)
throws URISyntaxException {
HttpHeaders nettyHeaders = nettyReq.headers();
InBoundHeaders jerseyHeaders = new InBoundHeaders();
for (String name : nettyHeaders.names()) {
jerseyHeaders.put(name, nettyHeaders.getAll(name));
}
String host = nettyHeaders.get(HOST);
String scheme = host.startsWith("https") ? "https://" : "http://";
String baseUri = scheme + host + "/";
String reqUri = scheme + host + nettyReq.uri();
LOG.trace("baseUri = {}, reqUri = {}", baseUri, reqUri);
return new ContainerRequest(webapp, nettyReq.method().name(),
new URI(baseUri), new URI(reqUri), jerseyHeaders, reqIn);
}
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.web.netty;
import com.sun.jersey.api.container.ContainerException;
import com.sun.jersey.api.core.ResourceConfig;
import com.sun.jersey.spi.container.ContainerProvider;
import com.sun.jersey.spi.container.WebApplication;
/**
* This is a Jersey {@link ContainerProvider} capable of boostrapping the
* Object Store web application into a custom container. It must be registered
* using the Java service loader mechanism by listing it in
* META-INF/services/com.sun.jersey.spi.container.ContainerProvider .
*/
public final class ObjectStoreJerseyContainerProvider
implements ContainerProvider<ObjectStoreJerseyContainer> {
@Override
public ObjectStoreJerseyContainer createContainer(
Class<ObjectStoreJerseyContainer> type, ResourceConfig conf,
WebApplication webapp) throws ContainerException {
return new ObjectStoreJerseyContainer(webapp);
}
}

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.web.netty;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderUtil;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.stream.ChunkedStream;
import org.apache.hadoop.io.IOUtils;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.Future;
/**
* Object Store Netty channel pipeline handler that handles inbound
* {@link HttpContent} fragments for the request body by sending the bytes into
* the pipe so that the application dispatch thread can read it.
* After receiving the {@link LastHttpContent}, this handler also flushes the
* response.
*/
public final class RequestContentObjectStoreChannelHandler
extends ObjectStoreChannelHandler<HttpContent> {
private final HttpRequest nettyReq;
private final Future<HttpResponse> nettyResp;
private final OutputStream reqOut;
private final InputStream respIn;
/**
* Creates a new RequestContentObjectStoreChannelHandler.
*
* @param nettyReq HTTP request
* @param nettyResp asynchronous HTTP response
* @param reqOut output stream for writing request body
* @param respIn input stream for reading response body
*/
public RequestContentObjectStoreChannelHandler(HttpRequest nettyReq,
Future<HttpResponse> nettyResp, OutputStream reqOut, InputStream respIn) {
this.nettyReq = nettyReq;
this.nettyResp = nettyResp;
this.reqOut = reqOut;
this.respIn = respIn;
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void channelRead0(ChannelHandlerContext ctx, HttpContent content)
throws Exception {
LOG.trace(
"begin RequestContentObjectStoreChannelHandler channelRead0, " +
"ctx = {}, content = {}", ctx, content);
content.content().readBytes(this.reqOut, content.content().readableBytes());
if (content instanceof LastHttpContent) {
IOUtils.cleanup(null, this.reqOut);
ctx.write(this.nettyResp.get());
ChannelFuture respFuture = ctx.writeAndFlush(new ChunkedStream(
this.respIn));
respFuture.addListener(new CloseableCleanupListener(this.respIn));
if (!HttpHeaderUtil.isKeepAlive(this.nettyReq)) {
respFuture.addListener(ChannelFutureListener.CLOSE);
}
}
LOG.trace(
"end RequestContentObjectStoreChannelHandler channelRead0, " +
"ctx = {}, content = {}", ctx, content);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
super.exceptionCaught(ctx, cause);
IOUtils.cleanup(null, this.reqOut, this.respIn);
}
}

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.web.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderUtil;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import org.apache.hadoop.io.IOUtils;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.Future;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
/**
* Object Store Netty channel pipeline handler that handles an inbound
* {@link HttpRequest} by dispatching it to the Object Store Jersey container.
* The handler establishes 2 sets of connected piped streams: one for inbound
* request handling and another for outbound response handling. The relevant
* ends of these pipes are handed off to the Jersey application dispatch and the
* next channel handler, which is responsible for streaming in the inbound
* request body and flushing out the response body.
*/
public final class RequestDispatchObjectStoreChannelHandler
extends ObjectStoreChannelHandler<HttpRequest> {
private final ObjectStoreJerseyContainer jerseyContainer;
private PipedInputStream reqIn;
private PipedOutputStream reqOut;
private PipedInputStream respIn;
private PipedOutputStream respOut;
/**
* Creates a new RequestDispatchObjectStoreChannelHandler.
*
* @param jerseyContainer Object Store application Jersey container for
* request dispatch
*/
public RequestDispatchObjectStoreChannelHandler(
ObjectStoreJerseyContainer jerseyContainer) {
this.jerseyContainer = jerseyContainer;
}
@Override
public void channelRead0(ChannelHandlerContext ctx, HttpRequest nettyReq)
throws Exception {
LOG.trace("begin RequestDispatchObjectStoreChannelHandler channelRead0, " +
"ctx = {}, nettyReq = {}", ctx, nettyReq);
if (!nettyReq.decoderResult().isSuccess()) {
sendErrorResponse(ctx, BAD_REQUEST);
return;
}
this.reqIn = new PipedInputStream();
this.reqOut = new PipedOutputStream(reqIn);
this.respIn = new PipedInputStream();
this.respOut = new PipedOutputStream(respIn);
if (HttpHeaderUtil.is100ContinueExpected(nettyReq)) {
LOG.trace("Sending continue response.");
ctx.writeAndFlush(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
}
Future<HttpResponse> nettyResp = this.jerseyContainer.dispatch(nettyReq,
reqIn, respOut);
ctx.pipeline().replace(this,
RequestContentObjectStoreChannelHandler.class.getSimpleName(),
new RequestContentObjectStoreChannelHandler(nettyReq, nettyResp,
reqOut, respIn));
LOG.trace("end RequestDispatchObjectStoreChannelHandler channelRead0, " +
"ctx = {}, nettyReq = {}", ctx, nettyReq);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
super.exceptionCaught(ctx, cause);
IOUtils.cleanup(null, this.reqIn, this.reqOut, this.respIn, this.respOut);
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Netty-based HTTP server implementation for Ozone.
*/
@InterfaceAudience.Private
package org.apache.hadoop.ozone.web.netty;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.web.storage;
import java.io.IOException;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
import org.apache.hadoop.ozone.web.handlers.UserArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.response.BucketInfo;
import org.apache.hadoop.ozone.web.response.ListBuckets;
import org.apache.hadoop.ozone.web.response.ListVolumes;
import org.apache.hadoop.ozone.web.response.VolumeInfo;
/**
* A {@link StorageHandler} implementation that distributes object storage
* across the nodes of an HDFS cluster.
*/
public final class DistributedStorageHandler implements StorageHandler {
@Override
public void createVolume(VolumeArgs args) throws
IOException, OzoneException {
}
@Override
public void setVolumeOwner(VolumeArgs args) throws
IOException, OzoneException {
}
@Override
public void setVolumeQuota(VolumeArgs args, boolean remove)
throws IOException, OzoneException {
}
@Override
public boolean checkVolumeAccess(VolumeArgs args)
throws IOException, OzoneException {
return false;
}
@Override
public ListVolumes listVolumes(UserArgs args)
throws IOException, OzoneException {
return null;
}
@Override
public void deleteVolume(VolumeArgs args)
throws IOException, OzoneException {
}
@Override
public VolumeInfo getVolumeInfo(VolumeArgs args)
throws IOException, OzoneException {
return null;
}
@Override
public void createBucket(BucketArgs args)
throws IOException, OzoneException {
}
@Override
public void setBucketAcls(BucketArgs args)
throws IOException, OzoneException {
}
@Override
public void setBucketVersioning(BucketArgs args)
throws IOException, OzoneException {
}
@Override
public void setBucketStorageClass(BucketArgs args)
throws IOException, OzoneException {
}
@Override
public void deleteBucket(BucketArgs args)
throws IOException, OzoneException {
}
@Override
public void checkBucketAccess(BucketArgs args)
throws IOException, OzoneException {
}
@Override
public ListBuckets listBuckets(VolumeArgs args)
throws IOException, OzoneException {
return null;
}
@Override
public BucketInfo getBucketInfo(BucketArgs args)
throws IOException, OzoneException {
return null;
}
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Ozone storage handler implementation integrating REST interface front-end
* with container data pipeline back-end.
*/
@InterfaceAudience.Private
package org.apache.hadoop.ozone.web.storage;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainerProvider

View File

@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.web;
import static java.net.HttpURLConnection.HTTP_CREATED;
import static org.apache.hadoop.ozone.web.utils.OzoneUtils.*;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import javax.ws.rs.core.HttpHeaders;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.web.headers.Header;
import org.apache.hadoop.ozone.web.utils.OzoneConsts;
import org.apache.hadoop.util.Time;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.DefaultHttpClient;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
public class TestOzoneWebAccess {
private static MiniDFSCluster cluster;
private static int port;
@Rule
public Timeout timeout = new Timeout(30000);
/**
* Create a MiniDFSCluster for testing.
*
* Ozone is made active by setting DFS_OBJECTSTORE_ENABLED_KEY = true and
* DFS_STORAGE_HANDLER_TYPE_KEY = "local" , which uses a local directory to
* emulate Ozone backend.
*
* @throws IOException
*/
@BeforeClass
public static void init() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
String path = new Path(
System.getProperty("test.build.data", "target/test/data"),
TestOzoneWebAccess.class.getSimpleName()).toUri().getPath();
conf.set(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT, path);
conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_ENABLED_KEY, true);
conf.set(OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY, "local");
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
DataNode dataNode = cluster.getDataNodes().get(0);
port = dataNode.getInfoPort();
}
/**
* shutdown MiniDFSCluster
*/
@AfterClass
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
/**
* Send a vaild Ozone Request.
*
* @throws IOException
*/
@Test
public void testOzoneRequest() throws IOException {
SimpleDateFormat format =
new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
HttpClient client = new DefaultHttpClient();
String volumeName = getRequestID().toLowerCase(Locale.US);
try {
HttpPost httppost = new HttpPost(
String.format("http://localhost:%d/%s", port, volumeName));
httppost.addHeader(Header.OZONE_VERSION_HEADER,
Header.OZONE_V1_VERSION_HEADER);
httppost.addHeader(HttpHeaders.DATE,
format.format(new Date(Time.now())));
httppost.addHeader(HttpHeaders.AUTHORIZATION,
Header.OZONE_SIMPLE_AUTHENTICATION_SCHEME + " " +
OzoneConsts.OZONE_SIMPLE_HDFS_USER);
httppost.addHeader(Header.OZONE_USER, OzoneConsts.OZONE_SIMPLE_HDFS_USER);
HttpResponse response = client.execute(httppost);
assertEquals(response.toString(), HTTP_CREATED,
response.getStatusLine().getStatusCode());
} finally {
client.getConnectionManager().shutdown();
}
}
}