HDFS-11513. Ozone: Separate XceiverServer and XceiverClient into interfaces and implementations.

Author: Tsz-Wo Nicholas Sze
Date:   2017-03-09 11:03:20 -08:00
Parent: 5e95bdeef2
Commit: fbc8099b63
12 changed files with 140 additions and 88 deletions

XceiverClient.java

@ -33,14 +33,13 @@ import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* A Client for the storageContainer protocol.
*/
public class XceiverClient implements Closeable {
public class XceiverClient implements XceiverClientSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
private final Pipeline pipeline;
private final Configuration config;
@ -61,9 +60,7 @@ public class XceiverClient implements Closeable {
this.config = config;
}
/**
* Connects to the leader in the pipeline.
*/
@Override
public void connect() throws Exception {
if (channelFuture != null
&& channelFuture.channel() != null
@ -90,9 +87,6 @@ public class XceiverClient implements Closeable {
channelFuture = b.connect(leader.getHostName(), port).sync();
}
/**
* Close the client.
*/
@Override
public void close() {
if(group != null) {
@ -104,22 +98,12 @@ public class XceiverClient implements Closeable {
}
}
/**
* Returns the pipeline of machines that host the container used by this
* client.
*
* @return pipeline of machines that host the container
*/
@Override
public Pipeline getPipeline() {
return pipeline;
}
/**
* Sends a given command to server and gets the reply back.
* @param request Request
* @return Response to the command
* @throws IOException
*/
@Override
public ContainerProtos.ContainerCommandResponseProto sendCommand(
ContainerProtos.ContainerCommandRequestProto request)
throws IOException {

XceiverClientManager.java

@ -96,7 +96,7 @@ public class XceiverClientManager {
* @return XceiverClient connected to a container
* @throws IOException if an XceiverClient cannot be acquired
*/
public XceiverClient acquireClient(Pipeline pipeline) throws IOException {
public XceiverClientSpi acquireClient(Pipeline pipeline) throws IOException {
Preconditions.checkNotNull(pipeline);
Preconditions.checkArgument(pipeline.getMachines() != null);
Preconditions.checkArgument(!pipeline.getMachines().isEmpty());
@ -109,7 +109,7 @@ public class XceiverClientManager {
return info.getXceiverClient();
} else {
// connection not found, create new, add reference and return
XceiverClient xceiverClient = new XceiverClient(pipeline, conf);
XceiverClientSpi xceiverClient = new XceiverClient(pipeline, conf);
try {
xceiverClient.connect();
} catch (Exception e) {
@ -129,7 +129,7 @@ public class XceiverClientManager {
*
* @param xceiverClient client to release
*/
public void releaseClient(XceiverClient xceiverClient) {
public void releaseClient(XceiverClientSpi xceiverClient) {
Preconditions.checkNotNull(xceiverClient);
String containerName = xceiverClient.getPipeline().getContainerName();
XceiverClientWithAccessInfo info;
@ -147,10 +147,10 @@ public class XceiverClientManager {
* - a reference count, +1 when acquire, -1 when release
*/
private static class XceiverClientWithAccessInfo {
final private XceiverClient xceiverClient;
final private XceiverClientSpi xceiverClient;
final private AtomicInteger referenceCount;
XceiverClientWithAccessInfo(XceiverClient xceiverClient) {
XceiverClientWithAccessInfo(XceiverClientSpi xceiverClient) {
this.xceiverClient = xceiverClient;
this.referenceCount = new AtomicInteger(0);
}
@ -167,7 +167,7 @@ public class XceiverClientManager {
return this.referenceCount.get() != 0;
}
XceiverClient getXceiverClient() {
XceiverClientSpi getXceiverClient() {
return xceiverClient;
}
}
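
With these changes the manager hands out and takes back XceiverClientSpi references, so callers never name the concrete Netty client. A minimal usage sketch of the acquire/use/release pattern follows; the XceiverClientManager constructor and the origin of the Pipeline and request are assumptions, not part of this diff:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;

public class XceiverClientUsageSketch {
  // Hypothetical helper: send one command through a pooled client, then release it.
  public static ContainerProtos.ContainerCommandResponseProto sendOnce(
      Configuration conf, Pipeline pipeline,
      ContainerProtos.ContainerCommandRequestProto request) throws IOException {
    XceiverClientManager manager = new XceiverClientManager(conf); // constructor assumed
    XceiverClientSpi client = manager.acquireClient(pipeline);     // connects if not already cached
    try {
      return client.sendCommand(request);
    } finally {
      manager.releaseClient(client);                               // drops the reference count
    }
  }
}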

XceiverClientSpi.java (new file)

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.scm;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.Closeable;
import java.io.IOException;
/**
* A Client for the storageContainer protocol.
*/
public interface XceiverClientSpi extends Closeable {
/**
* Connects to the leader in the pipeline.
*/
void connect() throws Exception;
@Override
void close();
/**
* Returns the pipeline of machines that host the container used by this
* client.
*
* @return pipeline of machines that host the container
*/
Pipeline getPipeline();
/**
* Sends a given command to server and gets the reply back.
* @param request Request
* @return Response to the command
* @throws IOException
*/
ContainerCommandResponseProto sendCommand(
ContainerCommandRequestProto request) throws IOException;
}
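
Since callers now depend only on this interface, a non-Netty implementation can be substituted, for example in unit tests. A hypothetical in-memory stub (not part of this commit) might look like:

import java.io.IOException;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;

/** Hypothetical stub: answers every command with a canned response, no network. */
public class InMemoryXceiverClient implements XceiverClientSpi {
  private final Pipeline pipeline;

  public InMemoryXceiverClient(Pipeline pipeline) {
    this.pipeline = pipeline;
  }

  @Override
  public void connect() {
    // nothing to connect to; the real client opens a Netty channel here
  }

  @Override
  public void close() {
    // no resources to release
  }

  @Override
  public Pipeline getPipeline() {
    return pipeline;
  }

  @Override
  public ContainerCommandResponseProto sendCommand(
      ContainerCommandRequestProto request) throws IOException {
    // a real test double would dispatch on the request type
    return ContainerCommandResponseProto.getDefaultInstance();
  }
}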

ContainerOperationClient.java

@ -17,8 +17,8 @@
*/
package org.apache.hadoop.scm.client;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
@ -75,7 +75,7 @@ public class ContainerOperationClient implements ScmClient {
@Override
public Pipeline createContainer(String containerId)
throws IOException {
XceiverClient client = null;
XceiverClientSpi client = null;
try {
Pipeline pipeline =
storageContainerLocationClient.allocateContainer(containerId);

ChunkInputStream.java

@ -29,7 +29,7 @@ import com.google.protobuf.ByteString;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.XceiverClientManager;
/**
@ -47,7 +47,7 @@ public class ChunkInputStream extends InputStream {
private final String key;
private final String traceID;
private XceiverClientManager xceiverClientManager;
private XceiverClient xceiverClient;
private XceiverClientSpi xceiverClient;
private List<ChunkInfo> chunks;
private int chunkOffset;
private List<ByteBuffer> buffers;
@ -63,7 +63,7 @@ public class ChunkInputStream extends InputStream {
* @param traceID container protocol call traceID
*/
public ChunkInputStream(String key, XceiverClientManager xceiverClientManager,
XceiverClient xceiverClient, List<ChunkInfo> chunks, String traceID) {
XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID) {
this.key = key;
this.traceID = traceID;
this.xceiverClientManager = xceiverClientManager;

ChunkOutputStream.java

@ -32,8 +32,8 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
/**
* An {@link OutputStream} used by the REST service in combination with the
@ -58,7 +58,7 @@ public class ChunkOutputStream extends OutputStream {
private final String traceID;
private final KeyData.Builder containerKeyData;
private XceiverClientManager xceiverClientManager;
private XceiverClient xceiverClient;
private XceiverClientSpi xceiverClient;
private ByteBuffer buffer;
private final String streamId;
private int chunkIndex;
@ -73,7 +73,7 @@ public class ChunkOutputStream extends OutputStream {
* @param traceID container protocol call args
*/
public ChunkOutputStream(String containerKey, String key,
XceiverClientManager xceiverClientManager, XceiverClient xceiverClient,
XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
String traceID) {
this.containerKey = containerKey;
this.key = key;

ContainerProtocolCalls.java

@ -45,10 +45,10 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.WriteChunkRequestProto;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
import java.io.IOException;
import org.apache.hadoop.scm.XceiverClientSpi;
/**
* Implementation of all container protocol calls performed by Container
@ -71,7 +71,7 @@ public final class ContainerProtocolCalls {
* @return container protocol get key response
* @throws IOException if there is an I/O error while performing the call
*/
public static GetKeyResponseProto getKey(XceiverClient xceiverClient,
public static GetKeyResponseProto getKey(XceiverClientSpi xceiverClient,
KeyData containerKeyData, String traceID) throws IOException {
GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto
.newBuilder()
@ -96,7 +96,7 @@ public final class ContainerProtocolCalls {
* @param traceID container protocol call args
* @throws IOException if there is an I/O error while performing the call
*/
public static void putKey(XceiverClient xceiverClient,
public static void putKey(XceiverClientSpi xceiverClient,
KeyData containerKeyData, String traceID) throws IOException {
PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto
.newBuilder()
@ -122,7 +122,7 @@ public final class ContainerProtocolCalls {
* @return container protocol read chunk response
* @throws IOException if there is an I/O error while performing the call
*/
public static ReadChunkResponseProto readChunk(XceiverClient xceiverClient,
public static ReadChunkResponseProto readChunk(XceiverClientSpi xceiverClient,
ChunkInfo chunk, String key, String traceID)
throws IOException {
ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto
@ -151,7 +151,7 @@ public final class ContainerProtocolCalls {
* @param traceID container protocol call args
* @throws IOException if there is an I/O error while performing the call
*/
public static void writeChunk(XceiverClient xceiverClient, ChunkInfo chunk,
public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk,
String key, ByteString data, String traceID)
throws IOException {
WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
@ -183,7 +183,7 @@ public final class ContainerProtocolCalls {
* @param traceID - Trace ID for logging purpose.
* @throws IOException
*/
public static void writeSmallFile(XceiverClient client, String containerName,
public static void writeSmallFile(XceiverClientSpi client, String containerName,
String key, byte[] data, String traceID) throws IOException {
KeyData containerKeyData = KeyData
@ -224,7 +224,7 @@ public final class ContainerProtocolCalls {
* @param traceID - traceID
* @throws IOException
*/
public static void createContainer(XceiverClient client, String traceID)
public static void createContainer(XceiverClientSpi client, String traceID)
throws IOException {
ContainerProtos.CreateContainerRequestProto.Builder createRequest =
ContainerProtos.CreateContainerRequestProto
@ -255,7 +255,7 @@ public final class ContainerProtocolCalls {
* @return GetSmallFileResponseProto
* @throws IOException
*/
public static GetSmallFileResponseProto readSmallFile(XceiverClient client,
public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client,
String containerName, String key, String traceID) throws IOException {
KeyData containerKeyData = KeyData
.newBuilder()
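
All of the static helpers in ContainerProtocolCalls now accept the SPI type, so they work against any client implementation. A hedged sketch of a key round trip through an already-acquired client; the KeyData builder methods setContainerName/setName are assumptions based on how KeyData is used elsewhere, and error handling is omitted:

import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.getKey;
import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey;

import java.io.IOException;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
import org.apache.hadoop.scm.XceiverClientSpi;

public class KeyRoundTripSketch {
  public static GetKeyResponseProto putThenGet(XceiverClientSpi client,
      String key, String traceID) throws IOException {
    // Build the key metadata; setContainerName/setName are assumed builder methods.
    KeyData keyData = KeyData.newBuilder()
        .setContainerName(client.getPipeline().getContainerName())
        .setName(key)
        .build();
    putKey(client, keyData, traceID);        // write the key metadata
    return getKey(client, keyData, traceID); // read it back
  }
}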

XceiverServer.java

@ -36,7 +36,7 @@ import java.io.IOException;
* Creates a netty server endpoint that acts as the communication layer for
* Ozone containers.
*/
public final class XceiverServer {
public final class XceiverServer implements XceiverServerSpi {
private final int port;
private final ContainerDispatcher storageContainer;
@ -57,11 +57,7 @@ public final class XceiverServer {
this.storageContainer = dispatcher;
}
/**
* Starts running the server.
*
* @throws IOException
*/
@Override
public void start() throws IOException {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
@ -75,11 +71,7 @@ public final class XceiverServer {
.channel();
}
/**
* Stops a running server.
*
* @throws Exception
*/
@Override
public void stop() {
if (bossGroup != null) {
bossGroup.shutdownGracefully();

XceiverServerSpi.java (new file)

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.transport.server;
import java.io.IOException;
/** A server endpoint that acts as the communication layer for Ozone containers. */
public interface XceiverServerSpi {
/** Starts the server. */
void start() throws IOException;
/** Stops a running server. */
void stop();
}
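
OzoneContainer (further down in this commit) stores the server as XceiverServerSpi, so the Netty transport is only named at construction time. A minimal lifecycle sketch; class locations and the XceiverServer constructor arguments are assumptions, not shown in full in this diff:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;

public class XceiverServerLifecycleSketch {
  public static void runBriefly(Configuration conf, ContainerDispatcher dispatcher)
      throws IOException {
    // Construct the concrete Netty server but hold it as the interface,
    // so another transport could be substituted later.
    XceiverServerSpi server = new XceiverServer(conf, dispatcher); // constructor assumed
    server.start();    // binds the Netty endpoint
    try {
      // ... serve container traffic ...
    } finally {
      server.stop();   // shuts down the event loop groups
    }
  }
}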

OzoneContainer.java

@ -31,6 +31,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -51,7 +52,7 @@ public class OzoneContainer {
private final Configuration ozoneConfig;
private final ContainerDispatcher dispatcher;
private final ContainerManager manager;
private final XceiverServer server;
private final XceiverServerSpi server;
private final ChunkManager chunkManager;
private final KeyManager keyManager;

DistributedStorageHandler.java

@ -18,20 +18,6 @@
package org.apache.hadoop.ozone.web.storage;
import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.*;
import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
import java.io.IOException;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.TimeZone;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
@ -40,7 +26,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.protocol.LocatedContainer;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
@ -50,17 +35,21 @@ import org.apache.hadoop.ozone.web.handlers.KeyArgs;
import org.apache.hadoop.ozone.web.handlers.ListArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.response.BucketInfo;
import org.apache.hadoop.ozone.web.response.KeyInfo;
import org.apache.hadoop.ozone.web.response.ListBuckets;
import org.apache.hadoop.ozone.web.response.ListKeys;
import org.apache.hadoop.ozone.web.response.ListVolumes;
import org.apache.hadoop.ozone.web.response.VolumeInfo;
import org.apache.hadoop.ozone.web.response.VolumeOwner;
import org.apache.hadoop.ozone.web.response.*;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.storage.ChunkInputStream;
import org.apache.hadoop.scm.storage.ChunkOutputStream;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.*;
import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.getKey;
import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey;
/**
* A {@link StorageHandler} implementation that distributes object storage
* across the nodes of an HDFS cluster.
@ -87,7 +76,7 @@ public final class DistributedStorageHandler implements StorageHandler {
@Override
public void createVolume(VolumeArgs args) throws IOException, OzoneException {
String containerKey = buildContainerKey(args.getVolumeName());
XceiverClient xceiverClient = acquireXceiverClient(containerKey);
XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
try {
VolumeInfo volume = new VolumeInfo();
volume.setVolumeName(args.getVolumeName());
@ -137,7 +126,7 @@ public final class DistributedStorageHandler implements StorageHandler {
public VolumeInfo getVolumeInfo(VolumeArgs args)
throws IOException, OzoneException {
String containerKey = buildContainerKey(args.getVolumeName());
XceiverClient xceiverClient = acquireXceiverClient(containerKey);
XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
try {
KeyData containerKeyData = containerKeyDataForRead(
xceiverClient.getPipeline().getContainerName(), containerKey);
@ -155,7 +144,7 @@ public final class DistributedStorageHandler implements StorageHandler {
throws IOException, OzoneException {
String containerKey = buildContainerKey(args.getVolumeName(),
args.getBucketName());
XceiverClient xceiverClient = acquireXceiverClient(containerKey);
XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
try {
BucketInfo bucket = new BucketInfo();
bucket.setVolumeName(args.getVolumeName());
@ -215,7 +204,7 @@ public final class DistributedStorageHandler implements StorageHandler {
throws IOException, OzoneException {
String containerKey = buildContainerKey(args.getVolumeName(),
args.getBucketName());
XceiverClient xceiverClient = acquireXceiverClient(containerKey);
XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
try {
KeyData containerKeyData = containerKeyDataForRead(
xceiverClient.getPipeline().getContainerName(), containerKey);
@ -236,7 +225,7 @@ public final class DistributedStorageHandler implements StorageHandler {
KeyInfo key = new KeyInfo();
key.setKeyName(args.getKeyName());
key.setCreatedOn(dateToString(new Date()));
XceiverClient xceiverClient = acquireXceiverClient(containerKey);
XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
return new ChunkOutputStream(containerKey, key.getKeyName(),
xceiverClientManager, xceiverClient, args.getRequestID());
}
@ -252,7 +241,7 @@ public final class DistributedStorageHandler implements StorageHandler {
OzoneException {
String containerKey = buildContainerKey(args.getVolumeName(),
args.getBucketName(), args.getKeyName());
XceiverClient xceiverClient = acquireXceiverClient(containerKey);
XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
boolean success = false;
try {
KeyData containerKeyData = containerKeyDataForRead(
@ -286,7 +275,7 @@ public final class DistributedStorageHandler implements StorageHandler {
}
/**
* Acquires an {@link XceiverClient} connected to a {@link Pipeline} of nodes
* Acquires an {@link XceiverClientSpi} connected to a {@link Pipeline} of nodes
* capable of serving container protocol operations. The container is
* selected based on the specified container key.
*
@ -294,7 +283,7 @@ public final class DistributedStorageHandler implements StorageHandler {
* @return XceiverClient connected to a container
* @throws IOException if an XceiverClient cannot be acquired
*/
private XceiverClient acquireXceiverClient(String containerKey)
private XceiverClientSpi acquireXceiverClient(String containerKey)
throws IOException {
Set<LocatedContainer> locatedContainers =
storageContainerLocation.getStorageContainerLocations(

TestContainerSmallFile.java

@ -23,8 +23,8 @@ import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
@ -75,7 +75,7 @@ public class TestContainerSmallFile {
String containerName = "container0";
Pipeline pipeline =
storageContainerLocationClient.allocateContainer(containerName);
XceiverClient client = xceiverClientManager.acquireClient(pipeline);
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
ContainerProtocolCalls.createContainer(client, traceID);
ContainerProtocolCalls.writeSmallFile(client, containerName,
@ -93,7 +93,7 @@ public class TestContainerSmallFile {
String containerName = "container1";
Pipeline pipeline =
storageContainerLocationClient.allocateContainer(containerName);
XceiverClient client = xceiverClientManager.acquireClient(pipeline);
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
ContainerProtocolCalls.createContainer(client, traceID);
thrown.expect(StorageContainerException.class);
@ -112,7 +112,7 @@ public class TestContainerSmallFile {
String containerName = "container2";
Pipeline pipeline =
storageContainerLocationClient.allocateContainer(containerName);
XceiverClient client = xceiverClientManager.acquireClient(pipeline);
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
ContainerProtocolCalls.createContainer(client, traceID);
ContainerProtocolCalls.writeSmallFile(client, containerName,
"key", "data123".getBytes(), traceID);