Revert "HDDS-705. OS3Exception resource name should be the actual resource name."

This reverts commit 977c6f6470.

There was a spurious edit in this commit.
Nanda kumar 2018-10-22 15:37:40 +05:30
parent e50334513c
commit c696419f3e
28 changed files with 158 additions and 364 deletions

View File

@@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
@@ -41,9 +40,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.UUID;
-import java.util.Map;
-import java.util.HashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -54,9 +50,9 @@ public class XceiverClientGrpc extends XceiverClientSpi {
   static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
   private final Pipeline pipeline;
   private final Configuration config;
-  private Map<UUID, XceiverClientProtocolServiceStub> asyncStubs;
+  private XceiverClientProtocolServiceStub asyncStub;
   private XceiverClientMetrics metrics;
-  private Map<UUID, ManagedChannel> channels;
+  private ManagedChannel channel;
   private final Semaphore semaphore;
   private boolean closed = false;
@@ -76,62 +72,46 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     this.semaphore =
         new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
     this.metrics = XceiverClientManager.getXceiverClientMetrics();
-    this.channels = new HashMap<>();
-    this.asyncStubs = new HashMap<>();
   }

   @Override
   public void connect() throws Exception {
-    // leader by default is the 1st datanode in the datanode list of pipleline
     DatanodeDetails leader = this.pipeline.getLeader();
-    // just make a connection to the 1st datanode at the beginning
-    connectToDatanode(leader);
-  }
-
-  private void connectToDatanode(DatanodeDetails dn) {
     // read port from the data node, on failure use default configured
     // port.
-    int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
+    int port = leader.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
     if (port == 0) {
       port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
          OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
     }
-    LOG.debug("Connecting to server Port : " + dn.getIpAddress());
-    ManagedChannel channel =
-        NettyChannelBuilder.forAddress(dn.getIpAddress(), port).usePlaintext()
+    LOG.debug("Connecting to server Port : " + leader.getIpAddress());
+    channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port)
+        .usePlaintext()
        .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
        .build();
-    XceiverClientProtocolServiceStub asyncStub =
-        XceiverClientProtocolServiceGrpc.newStub(channel);
-    asyncStubs.put(dn.getUuid(), asyncStub);
-    channels.put(dn.getUuid(), channel);
+    asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel);
   }

   /**
-   * Returns if the xceiver client connects to all servers in the pipeline.
+   * Returns if the xceiver client connects to a server.
    *
    * @return True if the connection is alive, false otherwise.
    */
   @VisibleForTesting
-  public boolean isConnected(DatanodeDetails details) {
-    return isConnected(channels.get(details.getUuid()));
-  }
-
-  private boolean isConnected(ManagedChannel channel) {
-    return channel != null && !channel.isTerminated() && !channel.isShutdown();
+  public boolean isConnected() {
+    return !channel.isTerminated() && !channel.isShutdown();
   }

   @Override
   public void close() {
     closed = true;
-    for (ManagedChannel channel : channels.values()) {
-      channel.shutdownNow();
-      try {
-        channel.awaitTermination(60, TimeUnit.MINUTES);
-      } catch (Exception e) {
-        LOG.error("Unexpected exception while waiting for channel termination",
-            e);
-      }
+    channel.shutdownNow();
+    try {
+      channel.awaitTermination(60, TimeUnit.MINUTES);
+    } catch (Exception e) {
+      LOG.error("Unexpected exception while waiting for channel termination",
+          e);
     }
   }
@@ -140,51 +120,6 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     return pipeline;
   }

-  @Override
-  public ContainerCommandResponseProto sendCommand(
-      ContainerCommandRequestProto request) throws IOException {
-    int size = pipeline.getMachines().size();
-    ContainerCommandResponseProto responseProto = null;
-    int dnIndex = 0;
-    // In case of an exception or an error, we will try to read from the
-    // datanodes in the pipeline in a round robin fashion.
-    // TODO: cache the correct leader info in here, so that any subsequent calls
-    // should first go to leader
-    for (DatanodeDetails dn : pipeline.getMachines()) {
-      try {
-        // In case the command gets retried on a 2nd datanode,
-        // sendCommandAsyncCall will create a new channel and async stub
-        // in case these don't exist for the specific datanode.
-        responseProto =
-            sendCommandAsync(request, dn).get();
-        dnIndex++;
-        if (responseProto.getResult() == ContainerProtos.Result.SUCCESS
-            || dnIndex == size) {
-          return responseProto;
-        }
-      } catch (ExecutionException | InterruptedException e) {
-        if (dnIndex < size) {
-          LOG.warn(
-              "Failed to execute command " + request + " on datanode " + dn
-                  .getUuidString() +". Retrying", e);
-        } else {
-          throw new IOException("Failed to execute command " + request, e);
-        }
-      }
-    }
-    return responseProto;
-  }
-
-  // TODO: for a true async API, once the waitable future while executing
-  // the command on one channel fails, it should be retried asynchronously
-  // on the future Task for all the remaining datanodes.
-  // Note: this Async api is not used currently used in any active I/O path.
-  // In case it gets used, the asynchronous retry logic needs to be plugged
-  // in here.
-
   /**
    * Sends a given command to server gets a waitable future back.
    *
@@ -193,25 +128,15 @@
    * @throws IOException
    */
   @Override
-  public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
-      ContainerCommandRequestProto request)
-      throws IOException, ExecutionException, InterruptedException {
-    return sendCommandAsync(request, pipeline.getLeader());
-  }
-
-  private CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
-      ContainerCommandRequestProto request, DatanodeDetails dn)
+  public CompletableFuture<ContainerCommandResponseProto>
+      sendCommandAsync(ContainerCommandRequestProto request)
       throws IOException, ExecutionException, InterruptedException {
-    if (closed) {
+    if(closed){
       throw new IOException("This channel is not connected.");
     }
-    UUID dnId = dn.getUuid();
-    ManagedChannel channel = channels.get(dnId);
-    // If the channel doesn't exist for this specific datanode or the channel
-    // is closed, just reconnect
-    if (!isConnected(channel)) {
-      reconnect(dn);
+    if(channel == null || !isConnected()) {
+      reconnect();
     }

     final CompletableFuture<ContainerCommandResponseProto> replyFuture =
@@ -220,54 +145,48 @@
     long requestTime = Time.monotonicNowNanos();
     metrics.incrPendingContainerOpsMetrics(request.getCmdType());
     // create a new grpc stream for each non-async call.
-    // TODO: for async calls, we should reuse StreamObserver resources.
     final StreamObserver<ContainerCommandRequestProto> requestObserver =
-        asyncStubs.get(dnId)
-            .send(new StreamObserver<ContainerCommandResponseProto>() {
-              @Override
-              public void onNext(ContainerCommandResponseProto value) {
-                replyFuture.complete(value);
-                metrics.decrPendingContainerOpsMetrics(request.getCmdType());
-                metrics.addContainerOpsLatency(request.getCmdType(),
-                    Time.monotonicNowNanos() - requestTime);
-                semaphore.release();
-              }
-
-              @Override
-              public void onError(Throwable t) {
-                replyFuture.completeExceptionally(t);
-                metrics.decrPendingContainerOpsMetrics(request.getCmdType());
-                metrics.addContainerOpsLatency(request.getCmdType(),
-                    Time.monotonicNowNanos() - requestTime);
-                semaphore.release();
-              }
-
-              @Override
-              public void onCompleted() {
-                if (!replyFuture.isDone()) {
-                  replyFuture.completeExceptionally(new IOException(
-                      "Stream completed but no reply for request " + request));
-                }
-              }
-            });
+        asyncStub.send(new StreamObserver<ContainerCommandResponseProto>() {
+          @Override
+          public void onNext(ContainerCommandResponseProto value) {
+            replyFuture.complete(value);
+            metrics.decrPendingContainerOpsMetrics(request.getCmdType());
+            metrics.addContainerOpsLatency(request.getCmdType(),
+                Time.monotonicNowNanos() - requestTime);
+            semaphore.release();
+          }
+
+          @Override
+          public void onError(Throwable t) {
+            replyFuture.completeExceptionally(t);
+            metrics.decrPendingContainerOpsMetrics(request.getCmdType());
+            metrics.addContainerOpsLatency(request.getCmdType(),
+                Time.monotonicNowNanos() - requestTime);
+            semaphore.release();
+          }
+
+          @Override
+          public void onCompleted() {
+            if (!replyFuture.isDone()) {
+              replyFuture.completeExceptionally(
+                  new IOException("Stream completed but no reply for request "
+                      + request));
+            }
+          }
+        });
     requestObserver.onNext(request);
     requestObserver.onCompleted();
     return replyFuture;
   }

-  private void reconnect(DatanodeDetails dn)
-      throws IOException {
-    ManagedChannel channel;
+  private void reconnect() throws IOException {
     try {
-      connectToDatanode(dn);
-      channel = channels.get(dn.getUuid());
+      connect();
     } catch (Exception e) {
       LOG.error("Error while connecting: ", e);
       throw new IOException(e);
     }

-    if (channel == null || !isConnected(channel)) {
+    if (channel == null || !isConnected()) {
       throw new IOException("This channel is not connected.");
     }
   }
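
For context, the sendCommand() loop removed above walks the pipeline's datanodes in round-robin order, returning the first successful reply and surfacing a failure only after every datanode has been tried. A minimal standalone sketch of that retry pattern (generic names, not the Ozone API):

    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.Callable;

    // Illustrative only: try each endpoint in turn and return the first
    // successful result, mirroring the removed round-robin sendCommand().
    final class RoundRobinRetry {
      private RoundRobinRetry() { }

      static <T> T callFirstSuccessful(List<Callable<T>> endpoints)
          throws IOException {
        IOException lastFailure = null;
        for (Callable<T> endpoint : endpoints) {
          try {
            return endpoint.call();
          } catch (Exception e) {
            lastFailure =
                new IOException("Call failed, trying next endpoint", e);
          }
        }
        throw lastFailure != null ? lastFailure
            : new IOException("No endpoints were supplied");
      }
    }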

View File

@@ -27,6 +27,7 @@ import com.google.common.cache.RemovalNotification;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;

 import java.io.Closeable;
 import java.io.IOException;
@@ -58,7 +59,7 @@ public class XceiverClientManager implements Closeable {
   //TODO : change this to SCM configuration class
   private final Configuration conf;
-  private final Cache<String, XceiverClientSpi> clientCache;
+  private final Cache<PipelineID, XceiverClientSpi> clientCache;
   private final boolean useRatis;

   private static XceiverClientMetrics metrics;
@@ -82,10 +83,10 @@ public class XceiverClientManager implements Closeable {
        .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
        .maximumSize(maxSize)
        .removalListener(
-            new RemovalListener<String, XceiverClientSpi>() {
+            new RemovalListener<PipelineID, XceiverClientSpi>() {
              @Override
              public void onRemoval(
-                  RemovalNotification<String, XceiverClientSpi>
+                  RemovalNotification<PipelineID, XceiverClientSpi>
                      removalNotification) {
                synchronized (clientCache) {
                  // Mark the entry as evicted
@@ -97,7 +98,7 @@ public class XceiverClientManager implements Closeable {
   }

   @VisibleForTesting
-  public Cache<String, XceiverClientSpi> getClientCache() {
+  public Cache<PipelineID, XceiverClientSpi> getClientCache() {
     return clientCache;
   }
@@ -139,14 +140,13 @@ public class XceiverClientManager implements Closeable {
   private XceiverClientSpi getClient(Pipeline pipeline)
      throws IOException {
-    HddsProtos.ReplicationType type = pipeline.getType();
     try {
-      return clientCache.get(pipeline.getId().getId().toString() + type,
+      return clientCache.get(pipeline.getId(),
          new Callable<XceiverClientSpi>() {
            @Override
            public XceiverClientSpi call() throws Exception {
              XceiverClientSpi client = null;
-              switch (type) {
+              switch (pipeline.getType()) {
              case RATIS:
                client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf);
                break;

View File

@@ -18,11 +18,9 @@
 package org.apache.hadoop.hdds.scm;

-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.io.MultipleIOException;
-import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.thirdparty.com.google.protobuf
     .InvalidProtocolBufferException;
@@ -54,7 +52,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;

 /**
@@ -212,11 +209,6 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     getClient().sendAsync(() -> byteString);
   }

-  @VisibleForTesting
-  public void watchForCommit(long index, long timeout) throws Exception {
-    getClient().sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED)
-        .get(timeout, TimeUnit.MILLISECONDS);
-  }
-
   /**
    * Sends a given command to server gets a waitable future back.
    *

View File

@@ -299,10 +299,6 @@ public class Pipeline {
     return b.toString();
   }

-  public void setType(HddsProtos.ReplicationType type) {
-    this.type = type;
-  }
-
   /**
    * Returns a JSON string of this object.
    *

View File

@@ -81,17 +81,14 @@ public final class ContainerProtocolCalls {
    * @param xceiverClient client to perform call
    * @param datanodeBlockID blockID to identify container
    * @param traceID container protocol call args
-   * @param blockCommitSequenceId latest commit Id of the block
    * @return container protocol get block response
    * @throws IOException if there is an I/O error while performing the call
    */
   public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
-      DatanodeBlockID datanodeBlockID, String traceID,
-      long blockCommitSequenceId) throws IOException {
+      DatanodeBlockID datanodeBlockID, String traceID) throws IOException {
     GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto
        .newBuilder()
-        .setBlockID(datanodeBlockID)
-        .setBlockCommitSequenceId(blockCommitSequenceId);
+        .setBlockID(datanodeBlockID);
     String id = xceiverClient.getPipeline().getLeader().getUuidString();

     ContainerCommandRequestProto request = ContainerCommandRequestProto
@@ -391,9 +388,7 @@ public final class ContainerProtocolCalls {
       BlockID blockID, String traceID) throws IOException {
     GetBlockRequestProto.Builder getBlock = GetBlockRequestProto
        .newBuilder()
-        .setBlockID(blockID.getDatanodeBlockIDProtobuf())
-        // by default, set the bcsId to be 0
-        .setBlockCommitSequenceId(0);
+        .setBlockID(blockID.getDatanodeBlockIDProtobuf());
     ContainerProtos.GetSmallFileRequestProto getSmallFileRequest =
        GetSmallFileRequestProto
            .newBuilder().setBlock(getBlock)

View File

@@ -140,8 +140,6 @@ enum Result {
   UNKNOWN_CONTAINER_TYPE = 34;
   BLOCK_NOT_COMMITTED = 35;
   CONTAINER_UNHEALTHY = 36;
-  UNKNOWN_BCSID = 37;
-  BCSID_MISMATCH = 38;
 }

 /**
@@ -317,7 +315,6 @@ message PutBlockResponseProto {

 message GetBlockRequestProto {
   required DatanodeBlockID blockID = 1;
-  required uint64 blockCommitSequenceId = 2;
 }

 message GetBlockResponseProto {

View File

@@ -483,8 +483,7 @@ public class KeyValueHandler extends Handler {
     try {
       BlockID blockID = BlockID.getFromProtobuf(
          request.getGetBlock().getBlockID());
-      responseData = blockManager.getBlock(kvContainer, blockID,
-          request.getGetBlock().getBlockCommitSequenceId());
+      responseData = blockManager.getBlock(kvContainer, blockID);
       long numBytes = responseData.getProtoBufMessage().toByteArray().length;
       metrics.incContainerBytesStats(Type.GetBlock, numBytes);
@@ -756,8 +755,7 @@ public class KeyValueHandler extends Handler {
     try {
       BlockID blockID = BlockID.getFromProtobuf(getSmallFileReq.getBlock()
          .getBlockID());
-      BlockData responseData = blockManager.getBlock(kvContainer, blockID,
-          getSmallFileReq.getBlock().getBlockCommitSequenceId());
+      BlockData responseData = blockManager.getBlock(kvContainer, blockID);

       ContainerProtos.ChunkInfo chunkInfo = null;
       ByteString dataBuf = ByteString.EMPTY;

View File

@@ -45,8 +45,7 @@ import java.util.List;
 import java.util.Map;

 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH;

 /**
  * This class is for performing block related operations on the KeyValue
  * Container.
@@ -69,12 +68,6 @@ public class BlockManagerImpl implements BlockManager {
     this.config = conf;
   }

-  private long getBlockCommitSequenceId(MetadataStore db)
-      throws IOException {
-    byte[] bscId = db.get(blockCommitSequenceIdKey);
-    return bscId == null ? 0 : Longs.fromByteArray(bscId);
-  }
-
   /**
    * Puts or overwrites a block.
    *
@@ -98,19 +91,21 @@
     Preconditions.checkNotNull(db, "DB cannot be null here");

     long blockCommitSequenceId = data.getBlockCommitSequenceId();
-    long blockCommitSequenceIdValue = getBlockCommitSequenceId(db);
+    byte[] blockCommitSequenceIdValue = db.get(blockCommitSequenceIdKey);

     // default blockCommitSequenceId for any block is 0. It the putBlock
     // request is not coming via Ratis(for test scenarios), it will be 0.
     // In such cases, we should overwrite the block as well
-    if (blockCommitSequenceId != 0) {
-      if (blockCommitSequenceId <= blockCommitSequenceIdValue) {
+    if (blockCommitSequenceIdValue != null && blockCommitSequenceId != 0) {
+      if (blockCommitSequenceId <= Longs
+          .fromByteArray(blockCommitSequenceIdValue)) {
         // Since the blockCommitSequenceId stored in the db is greater than
         // equal to blockCommitSequenceId to be updated, it means the putBlock
         // transaction is reapplied in the ContainerStateMachine on restart.
         // It also implies that the given block must already exist in the db.
         // just log and return
-        LOG.warn("blockCommitSequenceId " + blockCommitSequenceIdValue
+        LOG.warn("blockCommitSequenceId " + Longs
+            .fromByteArray(blockCommitSequenceIdValue)
            + " in the Container Db is greater than" + " the supplied value "
            + blockCommitSequenceId + " .Ignoring it");
        return data.getSize();
@@ -134,12 +129,10 @@
    *
    * @param container - Container from which block need to be fetched.
    * @param blockID - BlockID of the block.
-   * @param bcsId latest commit Id of the block
    * @return Key Data.
    * @throws IOException
    */
-  @Override
-  public BlockData getBlock(Container container, BlockID blockID, long bcsId)
+  public BlockData getBlock(Container container, BlockID blockID)
      throws IOException {
     Preconditions.checkNotNull(blockID,
        "BlockID cannot be null in GetBlock request");
@@ -152,14 +145,6 @@
     // This is a post condition that acts as a hint to the user.
     // Should never fail.
     Preconditions.checkNotNull(db, "DB cannot be null here");
-    long containerBCSId = getBlockCommitSequenceId(db);
-    if (containerBCSId < bcsId) {
-      throw new StorageContainerException(
-          "Unable to find the block with bcsID " + bcsId + " .Container "
-              + container.getContainerData().getContainerID() + " bcsId is "
-              + containerBCSId + ".", UNKNOWN_BCSID);
-    }
     byte[] kData = db.get(Longs.toByteArray(blockID.getLocalID()));
     if (kData == null) {
       throw new StorageContainerException("Unable to find the block.",
@@ -167,12 +152,6 @@
     }
     ContainerProtos.BlockData blockData =
        ContainerProtos.BlockData.parseFrom(kData);
-    long id = blockData.getBlockCommitSequenceId();
-    if (id != bcsId) {
-      throw new StorageContainerException(
-          "bcsId " + bcsId + " mismatches with existing block Id "
-              + id + " for block " + blockID + ".", BCSID_MISMATCH);
-    }
     return BlockData.getFromProtoBuf(blockData);
   }

View File

@@ -45,12 +45,10 @@
    *
    * @param container - Container from which block need to be get.
    * @param blockID - BlockID of the Block.
-   * @param bcsId latest commit id of the block
    * @return Block Data.
    * @throws IOException
    */
-  BlockData getBlock(Container container, BlockID blockID, long bcsId)
-      throws IOException;
+  BlockData getBlock(Container container, BlockID blockID) throws IOException;

   /**
    * Deletes an existing block.

View File

@@ -113,7 +113,7 @@ public class TestBlockManagerImpl {
     assertEquals(1, keyValueContainer.getContainerData().getKeyCount());
     //Get Block
     BlockData fromGetBlockData = blockManager.getBlock(keyValueContainer,
-        blockData.getBlockID(), 0);
+        blockData.getBlockID());

     assertEquals(blockData.getContainerID(), fromGetBlockData.getContainerID());
     assertEquals(blockData.getLocalID(), fromGetBlockData.getLocalID());
@@ -139,7 +139,7 @@
     assertEquals(0,
        keyValueContainer.getContainerData().getKeyCount());
     try {
-      blockManager.getBlock(keyValueContainer, blockID, 0);
+      blockManager.getBlock(keyValueContainer, blockID);
       fail("testDeleteBlock");
     } catch (StorageContainerException ex) {
       GenericTestUtils.assertExceptionContains(
@@ -197,7 +197,7 @@
        keyValueContainer.getContainerData().getKeyCount());
     try {
       //Since the block has been deleted, we should not be able to find it
-      blockManager.getBlock(keyValueContainer, blockID, 0);
+      blockManager.getBlock(keyValueContainer, blockID);
       fail("testGetNoSuchBlock failed");
     } catch (StorageContainerException ex) {
       GenericTestUtils.assertExceptionContains(

View File

@@ -22,9 +22,7 @@ import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
@@ -278,13 +276,8 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
       long containerID = blockID.getContainerID();
       ContainerWithPipeline containerWithPipeline =
          storageContainerLocationClient.getContainerWithPipeline(containerID);
-      Pipeline pipeline = containerWithPipeline.getPipeline();
-      // irrespective of the container state, we will always read via Standalone
-      // protocol.
-      pipeline.setType(HddsProtos.ReplicationType.STAND_ALONE);
       XceiverClientSpi xceiverClient = xceiverClientManager
-          .acquireClient(pipeline);
+          .acquireClient(containerWithPipeline.getPipeline());
       boolean success = false;
       containerKey = omKeyLocationInfo.getLocalID();
       try {
@@ -294,8 +287,7 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
         ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
            .getDatanodeBlockIDProtobuf();
         ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
-            .getBlock(xceiverClient, datanodeBlockID, requestId,
-                omKeyLocationInfo.getBlockCommitSequenceId());
+            .getBlock(xceiverClient, datanodeBlockID, requestId);
         List<ContainerProtos.ChunkInfo> chunks =
            response.getBlockData().getChunksList();
         for (ContainerProtos.ChunkInfo chunk : chunks) {

View File

@@ -116,10 +116,6 @@ public class ChunkGroupOutputStream extends OutputStream {
   public List<ChunkOutputStreamEntry> getStreamEntries() {
     return streamEntries;
   }
-  @VisibleForTesting
-  public XceiverClientManager getXceiverClientManager() {
-    return xceiverClientManager;
-  }

   public List<OmKeyLocationInfo> getLocationInfoList() throws IOException {
     List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();

View File

@@ -102,7 +102,7 @@ public class TestMiniOzoneCluster {
       // Verify client is able to connect to the container
       try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf)){
         client.connect();
-        assertTrue(client.isConnected(pipeline.getLeader()));
+        assertTrue(client.isConnected());
       }
     }
   }

View File

@@ -25,11 +25,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientRatis;
-import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.*;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -37,7 +32,6 @@ import org.apache.hadoop.ozone.client.*;
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
-import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
@@ -603,106 +597,6 @@ public class TestOzoneRpcClient {
     }
   }

-  @Test
-  public void testPutKeyAndGetKeyThreeNodes()
-      throws Exception {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
-
-    long currentTime = Time.now();
-
-    String value = "sample value";
-    store.createVolume(volumeName);
-    OzoneVolume volume = store.getVolume(volumeName);
-    volume.createBucket(bucketName);
-    OzoneBucket bucket = volume.getBucket(bucketName);
-
-    String keyName = UUID.randomUUID().toString();
-
-    OzoneOutputStream out = bucket
-        .createKey(keyName, value.getBytes().length, ReplicationType.RATIS,
-            ReplicationFactor.THREE);
-    ChunkGroupOutputStream groupOutputStream =
-        (ChunkGroupOutputStream) out.getOutputStream();
-    XceiverClientManager manager = groupOutputStream.getXceiverClientManager();
-    out.write(value.getBytes());
-    out.close();
-    // First, confirm the key info from the client matches the info in OM.
-    OmKeyArgs.Builder builder = new OmKeyArgs.Builder();
-    builder.setVolumeName(volumeName).setBucketName(bucketName)
-        .setKeyName(keyName);
-    OmKeyLocationInfo keyInfo = ozoneManager.lookupKey(builder.build()).
-        getKeyLocationVersions().get(0).getBlocksLatestVersionOnly().get(0);
-    long containerID = keyInfo.getContainerID();
-    long localID = keyInfo.getLocalID();
-    OzoneKeyDetails keyDetails = (OzoneKeyDetails) bucket.getKey(keyName);
-    Assert.assertEquals(keyName, keyDetails.getName());
-
-    List<OzoneKeyLocation> keyLocations = keyDetails.getOzoneKeyLocations();
-    Assert.assertEquals(1, keyLocations.size());
-    Assert.assertEquals(containerID, keyLocations.get(0).getContainerID());
-    Assert.assertEquals(localID, keyLocations.get(0).getLocalID());
-
-    // Make sure that the data size matched.
-    Assert
-        .assertEquals(value.getBytes().length, keyLocations.get(0).getLength());
-
-    ContainerWithPipeline container =
-        cluster.getStorageContainerManager().getContainerManager()
-            .getContainerWithPipeline(new ContainerID(containerID));
-    Pipeline pipeline = container.getPipeline();
-    List<DatanodeDetails> datanodes = pipeline.getMachines();
-
-    DatanodeDetails datanodeDetails = datanodes.get(0);
-    Assert.assertNotNull(datanodeDetails);
-
-    XceiverClientSpi clientSpi = manager.acquireClient(pipeline);
-    Assert.assertTrue(clientSpi instanceof XceiverClientRatis);
-    XceiverClientRatis ratisClient = (XceiverClientRatis)clientSpi;
-
-    ratisClient.watchForCommit(keyInfo.getBlockCommitSequenceId(), 5000);
-    // shutdown the datanode
-    cluster.shutdownHddsDatanode(datanodeDetails);
-
-    Assert.assertTrue(container.getContainerInfo().getState()
-        == HddsProtos.LifeCycleState.OPEN);
-    // try to read, this shouls be successful
-    readKey(bucket, keyName, value);
-
-    Assert.assertTrue(container.getContainerInfo().getState()
-        == HddsProtos.LifeCycleState.OPEN);
-    // shutdown the second datanode
-    datanodeDetails = datanodes.get(1);
-    cluster.shutdownHddsDatanode(datanodeDetails);
-    Assert.assertTrue(container.getContainerInfo().getState()
-        == HddsProtos.LifeCycleState.OPEN);
-
-    // the container is open and with loss of 2 nodes we still should be able
-    // to read via Standalone protocol
-    // try to read
-    readKey(bucket, keyName, value);
-
-    // shutdown the 3rd datanode
-    datanodeDetails = datanodes.get(2);
-    cluster.shutdownHddsDatanode(datanodeDetails);
-    try {
-      // try to read
-      readKey(bucket, keyName, value);
-      Assert.fail("Expected exception not thrown");
-    } catch (Exception e) {
-    }
-    manager.releaseClient(clientSpi);
-  }
-
-  private void readKey(OzoneBucket bucket, String keyName, String data)
-      throws IOException {
-    OzoneKey key = bucket.getKey(keyName);
-    Assert.assertEquals(keyName, key.getName());
-    OzoneInputStream is = bucket.readKey(keyName);
-    byte[] fileContent = new byte[data.getBytes().length];
-    is.read(fileContent);
-    is.close();
-  }
-
   @Test
   public void testGetKeyDetails() throws IOException, OzoneException {
     String volumeName = UUID.randomUUID().toString();

View File

@@ -468,7 +468,6 @@ public final class ContainerTestHelper {
     ContainerProtos.GetBlockRequestProto.Builder getRequest =
        ContainerProtos.GetBlockRequestProto.newBuilder();
     getRequest.setBlockID(blockID);
-    getRequest.setBlockCommitSequenceId(0);

     ContainerCommandRequestProto.Builder request =
        ContainerCommandRequestProto.newBuilder();

View File

@@ -151,7 +151,7 @@ public class TestContainerReplication {
        .getHandler(ContainerType.KeyValueContainer);

     BlockData key = handler.getBlockManager()
-        .getBlock(container, BlockID.getFromProtobuf(blockID), 0);
+        .getBlock(container, BlockID.getFromProtobuf(blockID));

     Assert.assertNotNull(key);
     Assert.assertEquals(1, key.getChunks().size());

View File

@@ -256,6 +256,6 @@ public class TestCloseContainerHandler {
        openContainerBlockMap.getBlockDataMap(testContainerID));
     // Make sure the key got committed
     Assert.assertNotNull(handler.getBlockManager()
-        .getBlock(container, blockID, 0));
+        .getBlock(container, blockID));
   }
 }

View File

@@ -556,7 +556,7 @@ public class TestContainerPersistence {
     blockData.setChunks(chunkList);
     blockManager.putBlock(container, blockData);
     BlockData readBlockData = blockManager.
-        getBlock(container, blockData.getBlockID(), 0);
+        getBlock(container, blockData.getBlockID());
     ChunkInfo readChunk =
        ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(0));
     Assert.assertEquals(info.getChecksum(), readChunk.getChecksum());
@@ -608,7 +608,7 @@
     blockData.setChunks(chunkProtoList);
     blockManager.putBlock(container, blockData);
     BlockData readBlockData = blockManager.
-        getBlock(container, blockData.getBlockID(), 0);
+        getBlock(container, blockData.getBlockID());
     ChunkInfo lastChunk = chunkList.get(chunkList.size() - 1);
     ChunkInfo readChunk =
        ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(readBlockData
@@ -636,7 +636,7 @@
     blockManager.deleteBlock(container, blockID);
     exception.expect(StorageContainerException.class);
     exception.expectMessage("Unable to find the block.");
-    blockManager.getBlock(container, blockData.getBlockID(), 0);
+    blockManager.getBlock(container, blockData.getBlockID());
   }

 /**

View File

@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.scm;
 import com.google.common.cache.Cache;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -106,7 +107,7 @@
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
-    Cache<String, XceiverClientSpi> cache =
+    Cache<PipelineID, XceiverClientSpi> cache =
        clientManager.getClientCache();

     ContainerWithPipeline container1 =
@@ -129,9 +130,8 @@
     Assert.assertNotEquals(client1, client2);

     // least recent container (i.e containerName1) is evicted
-    XceiverClientSpi nonExistent1 = cache.getIfPresent(
-        container1.getContainerInfo().getPipelineID().getId().toString()
-            + container1.getContainerInfo().getReplicationType());
+    XceiverClientSpi nonExistent1 = cache
+        .getIfPresent(container1.getContainerInfo().getPipelineID());
     Assert.assertEquals(null, nonExistent1);
     // However container call should succeed because of refcount on the client.
     String traceID1 = "trace" + RandomStringUtils.randomNumeric(4);
@@ -160,7 +160,7 @@
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
-    Cache<String, XceiverClientSpi> cache =
+    Cache<PipelineID, XceiverClientSpi> cache =
        clientManager.getClientCache();

     ContainerWithPipeline container1 =
@@ -183,9 +183,8 @@
     Assert.assertNotEquals(client1, client2);

     // now client 1 should be evicted
-    XceiverClientSpi nonExistent = cache.getIfPresent(
-        container1.getContainerInfo().getPipelineID().getId().toString()
-            + container1.getContainerInfo().getReplicationType());
+    XceiverClientSpi nonExistent = cache
+        .getIfPresent(container1.getContainerInfo().getPipelineID());
     Assert.assertEquals(null, nonExistent);

     // Any container operation should now fail

View File

@@ -699,8 +699,8 @@ public class TestKeys {
            .KeyValueContainer);
     KeyValueContainer container = (KeyValueContainer) cm.getContainerSet()
        .getContainer(location.getBlockID().getContainerID());
-    BlockData blockInfo = keyValueHandler.getBlockManager()
-        .getBlock(container, location.getBlockID(), 0);
+    BlockData blockInfo = keyValueHandler
+        .getBlockManager().getBlock(container, location.getBlockID());
     KeyValueContainerData containerData =
        (KeyValueContainerData) container.getContainerData();
     File dataDir = new File(containerData.getChunksPath());

View File

@@ -199,11 +199,11 @@ public class BucketEndpoint extends EndpointBase {
     } catch (IOException ex) {
       if (ex.getMessage().contains("BUCKET_NOT_EMPTY")) {
         OS3Exception os3Exception = S3ErrorTable.newError(S3ErrorTable
-            .BUCKET_NOT_EMPTY, bucketName);
+            .BUCKET_NOT_EMPTY, S3ErrorTable.Resource.BUCKET);
         throw os3Exception;
       } else if (ex.getMessage().contains("BUCKET_NOT_FOUND")) {
         OS3Exception os3Exception = S3ErrorTable.newError(S3ErrorTable
-            .NO_SUCH_BUCKET, bucketName);
+            .NO_SUCH_BUCKET, S3ErrorTable.Resource.BUCKET);
         throw os3Exception;
       } else {
         throw ex;

View File

@@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.s3.exception.OS3Exception;
 import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
+import org.apache.hadoop.ozone.s3.exception.S3ErrorTable.Resource;
 import org.apache.hadoop.ozone.s3.header.AuthorizationHeaderV2;
 import org.apache.hadoop.ozone.s3.header.AuthorizationHeaderV4;
@@ -60,7 +61,7 @@ public class EndpointBase {
       LOG.error("Error occurred is {}", ex);
       if (ex.getMessage().contains("NOT_FOUND")) {
         OS3Exception oex =
-            S3ErrorTable.newError(S3ErrorTable.NO_SUCH_BUCKET, bucketName);
+            S3ErrorTable.newError(S3ErrorTable.NO_SUCH_BUCKET, Resource.BUCKET);
         throw oex;
       } else {
         throw ex;
@@ -79,7 +80,7 @@
       LOG.error("Error occurred is {}", ex);
       if (ex.getMessage().contains("NOT_FOUND")) {
         OS3Exception oex =
-            S3ErrorTable.newError(S3ErrorTable.NO_SUCH_BUCKET, bucketName);
+            S3ErrorTable.newError(S3ErrorTable.NO_SUCH_BUCKET, Resource.BUCKET);
         throw oex;
       } else {
         throw ex;
@@ -186,7 +187,7 @@
     if (auth == null) {
       throw S3ErrorTable
-          .newError(S3ErrorTable.MALFORMED_HEADER, auth);
+          .newError(S3ErrorTable.MALFORMED_HEADER, Resource.HEADER);
     }
     String userName;

View File

@@ -150,7 +150,7 @@ public class ObjectEndpoint extends EndpointBase {
     } catch (IOException ex) {
       if (ex.getMessage().contains("NOT_FOUND")) {
         OS3Exception os3Exception = S3ErrorTable.newError(S3ErrorTable
-            .NO_SUCH_KEY, keyPath);
+            .NO_SUCH_OBJECT, S3ErrorTable.Resource.OBJECT);
         throw os3Exception;
       } else {
         throw ex;
@@ -176,8 +176,9 @@
     } catch (IOException ex) {
       LOG.error("Exception occurred in HeadObject", ex);
       if (ex.getMessage().contains("KEY_NOT_FOUND")) {
-        // Just return 404 with no content
-        return Response.status(Status.NOT_FOUND).build();
+        OS3Exception os3Exception = S3ErrorTable.newError(S3ErrorTable
+            .NO_SUCH_OBJECT, S3ErrorTable.Resource.OBJECT);
+        throw os3Exception;
       } else {
         throw ex;
       }
@@ -214,7 +215,7 @@
     } catch (IOException ex) {
       if (ex.getMessage().contains("BUCKET_NOT_FOUND")) {
         throw S3ErrorTable.newError(S3ErrorTable
-            .NO_SUCH_BUCKET, bucketName);
+            .NO_SUCH_BUCKET, S3ErrorTable.Resource.BUCKET);
       } else if (!ex.getMessage().contains("NOT_FOUND")) {
         throw ex;
       }

View File

@@ -45,23 +45,52 @@ public final class S3ErrorTable {
      "BucketNotEmpty", "The bucket you tried to delete is not empty.",
      HTTP_CONFLICT);

+  public static final OS3Exception NO_SUCH_OBJECT = new OS3Exception(
+      "NoSuchObject", "The specified object does not exist", HTTP_NOT_FOUND);
+
   public static final OS3Exception MALFORMED_HEADER = new OS3Exception(
      "AuthorizationHeaderMalformed", "The authorization header you provided " +
          "is invalid.", HTTP_NOT_FOUND);

-  public static final OS3Exception NO_SUCH_KEY = new OS3Exception(
-      "NoSuchObject", "The specified key does not exist", HTTP_NOT_FOUND);
-
   /**
    * Create a new instance of Error.
    * @param e Error Template
    * @param resource Resource associated with this exception
    * @return creates a new instance of error based on the template
    */
-  public static OS3Exception newError(OS3Exception e, String resource) {
+  public static OS3Exception newError(OS3Exception e, Resource resource) {
     OS3Exception err = new OS3Exception(e.getCode(), e.getErrorMessage(),
        e.getHttpCode());
-    err.setResource(resource);
+    err.setResource(resource.getResource());
     return err;
   }
+
+  /**
+   * Resources, which can be defined in OS3Exception.
+   */
+  public enum Resource {
+    BUCKET("Bucket"),
+    OBJECT("Object"),
+    HEADER("header"),
+    VOLUME("Volume");
+
+    private final String resource;
+
+    /**
+     * Constructs resource.
+     * @param value
+     */
+    Resource(String value) {
+      this.resource = value;
+    }
+
+    /**
+     * Get resource.
+     * @return string
+     */
+    public String getResource() {
+      return this.resource;
+    }
+  }
 }
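
The hunk above restores the enum-based error factory: callers pass a fixed Resource constant to newError() rather than the concrete bucket, key, or header value, as the surrounding endpoint and header-parser hunks show. A minimal usage sketch of the restored call shape (the wrapper class name here is hypothetical; S3ErrorTable and OS3Exception are the types in the diff):

    import org.apache.hadoop.ozone.s3.exception.OS3Exception;
    import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;

    // Sketch only: build an error from a template plus a Resource constant.
    final class S3ErrorUsageSketch {
      private S3ErrorUsageSketch() { }

      static OS3Exception bucketNotFound() {
        return S3ErrorTable.newError(
            S3ErrorTable.NO_SUCH_BUCKET, S3ErrorTable.Resource.BUCKET);
      }
    }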

View File

@@ -52,24 +52,28 @@ public class AuthorizationHeaderV2 {
   public void parseHeader() throws OS3Exception {
     String[] split = authHeader.split(" ");
     if (split.length != 2) {
-      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, authHeader);
+      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, S3ErrorTable
+          .Resource.HEADER);
     }

     identifier = split[0];
     if (!IDENTIFIER.equals(identifier)) {
-      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, authHeader);
+      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, S3ErrorTable
+          .Resource.HEADER);
     }

     String[] remainingSplit = split[1].split(":");

     if (remainingSplit.length != 2) {
-      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, authHeader);
+      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, S3ErrorTable
+          .Resource.HEADER);
     }

     accessKeyID = remainingSplit[0];
     signature = remainingSplit[1];
     if (isBlank(accessKeyID) || isBlank(signature)) {
-      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, authHeader);
+      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, S3ErrorTable
+          .Resource.HEADER);
     }
   }

View File

@@ -64,7 +64,8 @@ public class AuthorizationHeaderV4 {
     String[] split = authHeader.split(" ");
     if (split.length != 4) {
-      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, authHeader);
+      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, S3ErrorTable
+          .Resource.HEADER);
     }

     algorithm = split[0];
@@ -77,21 +78,24 @@
       credential = credential.substring(CREDENTIAL.length(), credential
          .length() - 1);
     } else {
-      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, authHeader);
+      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, S3ErrorTable
+          .Resource.HEADER);
     }

     if (signedHeaders.startsWith(SIGNEDHEADERS)) {
       signedHeaders = signedHeaders.substring(SIGNEDHEADERS.length(),
          signedHeaders.length() - 1);
     } else {
-      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, authHeader);
+      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, S3ErrorTable
+          .Resource.HEADER);
     }

     if (signature.startsWith(SIGNATURE)) {
       signature = signature.substring(SIGNATURE.length(), signature
          .length());
     } else {
-      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, authHeader);
+      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, S3ErrorTable
+          .Resource.HEADER);
     }

     // Parse credential. Other parts of header are not validated yet. When

View File

@@ -63,7 +63,8 @@ public class Credential {
       awsService = split[3];
       awsRequest = split[4];
     } else {
-      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, credential);
+      throw S3ErrorTable.newError(S3ErrorTable.MALFORMED_HEADER, S3ErrorTable
+          .Resource.HEADER);
     }
   }

View File

@@ -32,7 +32,7 @@ public class TestOS3Exception {
     OS3Exception ex = new OS3Exception("AccessDenied", "Access Denied",
        403);
     String requestId = OzoneUtils.getRequestID();
-    ex = S3ErrorTable.newError(ex, "bucket");
+    ex = S3ErrorTable.newError(ex, S3ErrorTable.Resource.BUCKET);
     ex.setRequestId(requestId);
     String val = ex.toXml();
     String formatString = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +