From c70775aff6113a3bbaa237923fad3c21a73a7793 Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Thu, 13 Oct 2016 16:34:29 -0700 Subject: [PATCH] HDFS-11004. Ozone : move Chunk IO and container protocol calls to hdfs-client. Contributed by Chen Liang. --- .../org/apache/hadoop/scm/ScmConfigKeys.java | 3 + .../hadoop/scm}/storage/ChunkInputStream.java | 22 +++-- .../scm}/storage/ChunkOutputStream.java | 59 +++++++------- .../scm}/storage/ContainerProtocolCalls.java | 80 +++++++++---------- .../hadoop/scm/storage/package-info.java | 23 ++++++ .../storage/DistributedStorageHandler.java | 22 ++--- 6 files changed, 115 insertions(+), 94 deletions(-) rename hadoop-hdfs-project/{hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web => hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm}/storage/ChunkInputStream.java (91%) rename hadoop-hdfs-project/{hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web => hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm}/storage/ChunkOutputStream.java (79%) rename hadoop-hdfs-project/{hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web => hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm}/storage/ContainerProtocolCalls.java (72%) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/package-info.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java index a1b239328cf..44414ea33e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java @@ -29,4 +29,7 @@ public final class ScmConfigKeys { public static final String DFS_CONTAINER_IPC_PORT = "dfs.container.ipc"; public static final int DFS_CONTAINER_IPC_PORT_DEFAULT = 50011; + + // TODO : this is copied from OzoneConsts, may need to move to a better place + public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java similarity index 91% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java index f639b4a94c0..1206ecd2eb7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java @@ -16,9 +16,9 @@ * limitations under the License. */ -package org.apache.hadoop.ozone.web.storage; +package org.apache.hadoop.scm.storage; -import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*; +import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.*; import java.io.IOException; import java.io.InputStream; @@ -31,23 +31,21 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResp import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.scm.XceiverClient; import org.apache.hadoop.scm.XceiverClientManager; -import org.apache.hadoop.ozone.web.exceptions.OzoneException; -import org.apache.hadoop.ozone.web.handlers.UserArgs; /** * An {@link InputStream} used by the REST service in combination with the - * {@link DistributedStorageHandler} to read the value of a key from a sequence + * SCMClient to read the value of a key from a sequence * of container chunks. All bytes of the key value are stored in container * chunks. Each chunk may contain multiple underlying {@link ByteBuffer} * instances. This class encapsulates all state management for iterating * through the sequence of chunks and the sequence of buffers within each chunk. */ -class ChunkInputStream extends InputStream { +public class ChunkInputStream extends InputStream { private static final int EOF = -1; private final String key; - private final UserArgs args; + private final String traceID; private XceiverClientManager xceiverClientManager; private XceiverClient xceiverClient; private List chunks; @@ -62,12 +60,12 @@ class ChunkInputStream extends InputStream { * @param xceiverClientManager client manager that controls client * @param xceiverClient client to perform container calls * @param chunks list of chunks to read - * @param args container protocol call args + * @param traceID container protocol call traceID */ public ChunkInputStream(String key, XceiverClientManager xceiverClientManager, - XceiverClient xceiverClient, List chunks, UserArgs args) { + XceiverClient xceiverClient, List chunks, String traceID) { this.key = key; - this.args = args; + this.traceID = traceID; this.xceiverClientManager = xceiverClientManager; this.xceiverClient = xceiverClient; this.chunks = chunks; @@ -182,8 +180,8 @@ class ChunkInputStream extends InputStream { final ReadChunkResponseProto readChunkResponse; try { readChunkResponse = readChunk(xceiverClient, chunks.get(readChunkOffset), - key, args); - } catch (OzoneException e) { + key, traceID); + } catch (IOException e) { throw new IOException("Unexpected OzoneException", e); } chunkOffset = readChunkOffset; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java similarity index 79% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java index 1796a694316..0126e582fa4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java @@ -16,11 +16,10 @@ * limitations under the License. */ -package org.apache.hadoop.ozone.web.storage; +package org.apache.hadoop.scm.storage; -import static org.apache.hadoop.ozone.OzoneConsts.CHUNK_SIZE; -import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*; -import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*; +import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey; +import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk; import java.io.IOException; import java.io.OutputStream; @@ -31,15 +30,14 @@ import com.google.protobuf.ByteString; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyValue; +import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.hadoop.scm.XceiverClient; import org.apache.hadoop.scm.XceiverClientManager; -import org.apache.hadoop.ozone.web.exceptions.OzoneException; -import org.apache.hadoop.ozone.web.handlers.UserArgs; -import org.apache.hadoop.ozone.web.response.KeyInfo; /** * An {@link OutputStream} used by the REST service in combination with the - * {@link DistributedStorageHandler} to write the value of a key to a sequence + * SCMClient to write the value of a key to a sequence * of container chunks. Writes are buffered locally and periodically written to * the container as a new chunk. In order to preserve the semantics that * replacement of a pre-existing key is atomic, each instance of the stream has @@ -53,11 +51,11 @@ import org.apache.hadoop.ozone.web.response.KeyInfo; * This class encapsulates all state management for buffering and writing * through to the container. */ -class ChunkOutputStream extends OutputStream { +public class ChunkOutputStream extends OutputStream { private final String containerKey; - private final KeyInfo key; - private final UserArgs args; + private final String key; + private final String traceID; private final KeyData.Builder containerKeyData; private XceiverClientManager xceiverClientManager; private XceiverClient xceiverClient; @@ -72,19 +70,23 @@ class ChunkOutputStream extends OutputStream { * @param key chunk key * @param xceiverClientManager client manager that controls client * @param xceiverClient client to perform container calls - * @param args container protocol call args + * @param traceID container protocol call args */ - public ChunkOutputStream(String containerKey, KeyInfo key, + public ChunkOutputStream(String containerKey, String key, XceiverClientManager xceiverClientManager, XceiverClient xceiverClient, - UserArgs args) { + String traceID) { this.containerKey = containerKey; this.key = key; - this.args = args; - this.containerKeyData = fromKeyToContainerKeyDataBuilder( - xceiverClient.getPipeline().getContainerName(), containerKey, key); + this.traceID = traceID; + KeyValue keyValue = KeyValue.newBuilder() + .setKey("TYPE").setValue("KEY").build(); + this.containerKeyData = KeyData.newBuilder() + .setContainerName(xceiverClient.getPipeline().getContainerName()) + .setName(containerKey) + .addMetadata(keyValue); this.xceiverClientManager = xceiverClientManager; this.xceiverClient = xceiverClient; - this.buffer = ByteBuffer.allocate(CHUNK_SIZE); + this.buffer = ByteBuffer.allocate(ScmConfigKeys.CHUNK_SIZE); this.streamId = UUID.randomUUID().toString(); this.chunkIndex = 0; } @@ -95,7 +97,7 @@ class ChunkOutputStream extends OutputStream { int rollbackPosition = buffer.position(); int rollbackLimit = buffer.limit(); buffer.put((byte)b); - if (buffer.position() == CHUNK_SIZE) { + if (buffer.position() == ScmConfigKeys.CHUNK_SIZE) { flushBufferToChunk(rollbackPosition, rollbackLimit); } } @@ -114,11 +116,12 @@ class ChunkOutputStream extends OutputStream { } checkOpen(); while (len > 0) { - int writeLen = Math.min(CHUNK_SIZE - buffer.position(), len); + int writeLen = Math.min( + ScmConfigKeys.CHUNK_SIZE - buffer.position(), len); int rollbackPosition = buffer.position(); int rollbackLimit = buffer.limit(); buffer.put(b, off, writeLen); - if (buffer.position() == CHUNK_SIZE) { + if (buffer.position() == ScmConfigKeys.CHUNK_SIZE) { flushBufferToChunk(rollbackPosition, rollbackLimit); } off += writeLen; @@ -144,9 +147,9 @@ class ChunkOutputStream extends OutputStream { if (buffer.position() > 0) { writeChunkToContainer(); } - putKey(xceiverClient, containerKeyData.build(), args); - } catch (OzoneException e) { - throw new IOException("Unexpected OzoneException", e); + putKey(xceiverClient, containerKeyData.build(), traceID); + } catch (IOException e) { + throw new IOException("Unexpected Storage Container Exception", e); } finally { xceiverClientManager.releaseClient(xceiverClient); xceiverClientManager = null; @@ -205,14 +208,14 @@ class ChunkOutputStream extends OutputStream { ChunkInfo chunk = ChunkInfo .newBuilder() .setChunkName( - key.getKeyName() + "_stream_" + streamId + "_chunk_" + ++chunkIndex) + key + "_stream_" + streamId + "_chunk_" + ++chunkIndex) .setOffset(0) .setLen(data.size()) .build(); try { - writeChunk(xceiverClient, chunk, key.getKeyName(), data, args); - } catch (OzoneException e) { - throw new IOException("Unexpected OzoneException", e); + writeChunk(xceiverClient, chunk, key, data, traceID); + } catch (IOException e) { + throw new IOException("Unexpected Storage Container Exception", e); } containerKeyData.addChunks(chunk); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java similarity index 72% rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java rename to hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java index c683a74aa38..166b7412d93 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.ozone.web.storage; +package org.apache.hadoop.scm.storage; import static java.net.HttpURLConnection.HTTP_BAD_REQUEST; import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; @@ -37,29 +37,24 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResp import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkRequestProto; import org.apache.hadoop.scm.XceiverClient; -import org.apache.hadoop.ozone.web.exceptions.ErrorTable; -import org.apache.hadoop.ozone.web.exceptions.OzoneException; -import org.apache.hadoop.ozone.web.handlers.UserArgs; /** * Implementation of all container protocol calls performed by - * {@link DistributedStorageHandler}. + * . */ -final class ContainerProtocolCalls { +public final class ContainerProtocolCalls { /** * Calls the container protocol to get a container key. * * @param xceiverClient client to perform call * @param containerKeyData key data to identify container - * @param args container protocol call args - * @returns container protocol get key response + * @param traceID container protocol call args + * @return container protocol get key response * @throws IOException if there is an I/O error while performing the call - * @throws OzoneException if the container protocol call failed */ public static GetKeyResponseProto getKey(XceiverClient xceiverClient, - KeyData containerKeyData, UserArgs args) throws IOException, - OzoneException { + KeyData containerKeyData, String traceID) throws IOException { GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto .newBuilder() .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) @@ -67,11 +62,11 @@ final class ContainerProtocolCalls { ContainerCommandRequestProto request = ContainerCommandRequestProto .newBuilder() .setCmdType(Type.GetKey) - .setTraceID(args.getRequestID()) + .setTraceID(traceID) .setGetKey(readKeyRequest) .build(); ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response, args); + validateContainerResponse(response, traceID); return response.getGetKey(); } @@ -80,13 +75,11 @@ final class ContainerProtocolCalls { * * @param xceiverClient client to perform call * @param containerKeyData key data to identify container - * @param args container protocol call args + * @param traceID container protocol call args * @throws IOException if there is an I/O error while performing the call - * @throws OzoneException if the container protocol call failed */ public static void putKey(XceiverClient xceiverClient, - KeyData containerKeyData, UserArgs args) throws IOException, - OzoneException { + KeyData containerKeyData, String traceID) throws IOException { PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto .newBuilder() .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) @@ -94,11 +87,11 @@ final class ContainerProtocolCalls { ContainerCommandRequestProto request = ContainerCommandRequestProto .newBuilder() .setCmdType(Type.PutKey) - .setTraceID(args.getRequestID()) + .setTraceID(traceID) .setPutKey(createKeyRequest) .build(); ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response, args); + validateContainerResponse(response, traceID); } /** @@ -107,14 +100,13 @@ final class ContainerProtocolCalls { * @param xceiverClient client to perform call * @param chunk information about chunk to read * @param key the key name - * @param args container protocol call args - * @returns container protocol read chunk response + * @param traceID container protocol call args + * @return container protocol read chunk response * @throws IOException if there is an I/O error while performing the call - * @throws OzoneException if the container protocol call failed */ public static ReadChunkResponseProto readChunk(XceiverClient xceiverClient, - ChunkInfo chunk, String key, UserArgs args) - throws IOException, OzoneException { + ChunkInfo chunk, String key, String traceID) + throws IOException { ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto .newBuilder() .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) @@ -123,11 +115,11 @@ final class ContainerProtocolCalls { ContainerCommandRequestProto request = ContainerCommandRequestProto .newBuilder() .setCmdType(Type.ReadChunk) - .setTraceID(args.getRequestID()) + .setTraceID(traceID) .setReadChunk(readChunkRequest) .build(); ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response, args); + validateContainerResponse(response, traceID); return response.getReadChunk(); } @@ -138,13 +130,12 @@ final class ContainerProtocolCalls { * @param chunk information about chunk to write * @param key the key name * @param data the data of the chunk to write - * @param args container protocol call args + * @param traceID container protocol call args * @throws IOException if there is an I/O error while performing the call - * @throws OzoneException if the container protocol call failed */ public static void writeChunk(XceiverClient xceiverClient, ChunkInfo chunk, - String key, ByteString data, UserArgs args) - throws IOException, OzoneException { + String key, ByteString data, String traceID) + throws IOException { WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto .newBuilder() .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) @@ -154,11 +145,11 @@ final class ContainerProtocolCalls { ContainerCommandRequestProto request = ContainerCommandRequestProto .newBuilder() .setCmdType(Type.WriteChunk) - .setTraceID(args.getRequestID()) + .setTraceID(traceID) .setWriteChunk(writeChunkRequest) .build(); ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response, args); + validateContainerResponse(response, traceID); } /** @@ -166,27 +157,28 @@ final class ContainerProtocolCalls { * return code is mapped to a corresponding exception and thrown. * * @param response container protocol call response - * @param args container protocol call args - * @throws OzoneException if the container protocol call failed + * @param traceID container protocol call args + * @throws IOException if the container protocol call failed */ private static void validateContainerResponse( - ContainerCommandResponseProto response, UserArgs args) - throws OzoneException { + ContainerCommandResponseProto response, String traceID + ) throws IOException { + // TODO : throw the right type of exception switch (response.getResult()) { case SUCCESS: break; case MALFORMED_REQUEST: - throw ErrorTable.newError(new OzoneException(HTTP_BAD_REQUEST, - "badRequest", "Bad container request."), args); + throw new IOException(HTTP_BAD_REQUEST + + ":Bad container request: " + traceID); case UNSUPPORTED_REQUEST: - throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR, - "internalServerError", "Unsupported container request."), args); + throw new IOException(HTTP_INTERNAL_ERROR + + "Unsupported container request: " + traceID); case CONTAINER_INTERNAL_ERROR: - throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR, - "internalServerError", "Container internal error."), args); + throw new IOException(HTTP_INTERNAL_ERROR + + "Container internal error:" + traceID); default: - throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR, - "internalServerError", "Unrecognized container response."), args); + throw new IOException(HTTP_INTERNAL_ERROR + + "Unrecognized container response:" + traceID); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/package-info.java new file mode 100644 index 00000000000..aa89af0695f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.scm.storage; + +/** + * This package contains StorageContainerManager classes. + */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index 143d0589e1b..e8e58309f11 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.web.storage; -import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*; +import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.*; import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*; import java.io.IOException; @@ -57,6 +57,8 @@ import org.apache.hadoop.ozone.web.response.ListKeys; import org.apache.hadoop.ozone.web.response.ListVolumes; import org.apache.hadoop.ozone.web.response.VolumeInfo; import org.apache.hadoop.ozone.web.response.VolumeOwner; +import org.apache.hadoop.scm.storage.ChunkInputStream; +import org.apache.hadoop.scm.storage.ChunkOutputStream; import org.apache.hadoop.util.StringUtils; /** @@ -95,7 +97,7 @@ public final class DistributedStorageHandler implements StorageHandler { volume.setCreatedBy(args.getAdminName()); KeyData containerKeyData = fromVolumeToContainerKeyData( xceiverClient.getPipeline().getContainerName(), containerKey, volume); - putKey(xceiverClient, containerKeyData, args); + putKey(xceiverClient, containerKeyData, args.getRequestID()); } finally { xceiverClientManager.releaseClient(xceiverClient); } @@ -140,7 +142,7 @@ public final class DistributedStorageHandler implements StorageHandler { KeyData containerKeyData = containerKeyDataForRead( xceiverClient.getPipeline().getContainerName(), containerKey); GetKeyResponseProto response = getKey(xceiverClient, containerKeyData, - args); + args.getRequestID()); return fromContainerKeyValueListToVolume( response.getKeyData().getMetadataList()); } finally { @@ -163,7 +165,7 @@ public final class DistributedStorageHandler implements StorageHandler { bucket.setStorageType(args.getStorageType()); KeyData containerKeyData = fromBucketToContainerKeyData( xceiverClient.getPipeline().getContainerName(), containerKey, bucket); - putKey(xceiverClient, containerKeyData, args); + putKey(xceiverClient, containerKeyData, args.getRequestID()); } finally { xceiverClientManager.releaseClient(xceiverClient); } @@ -218,7 +220,7 @@ public final class DistributedStorageHandler implements StorageHandler { KeyData containerKeyData = containerKeyDataForRead( xceiverClient.getPipeline().getContainerName(), containerKey); GetKeyResponseProto response = getKey(xceiverClient, containerKeyData, - args); + args.getRequestID()); return fromContainerKeyValueListToBucket( response.getKeyData().getMetadataList()); } finally { @@ -235,8 +237,8 @@ public final class DistributedStorageHandler implements StorageHandler { key.setKeyName(args.getKeyName()); key.setCreatedOn(dateToString(new Date())); XceiverClient xceiverClient = acquireXceiverClient(containerKey); - return new ChunkOutputStream(containerKey, key, xceiverClientManager, - xceiverClient, args); + return new ChunkOutputStream(containerKey, key.getKeyName(), + xceiverClientManager, xceiverClient, args.getRequestID()); } @Override @@ -256,7 +258,7 @@ public final class DistributedStorageHandler implements StorageHandler { KeyData containerKeyData = containerKeyDataForRead( xceiverClient.getPipeline().getContainerName(), containerKey); GetKeyResponseProto response = getKey(xceiverClient, containerKeyData, - args); + args.getRequestID()); long length = 0; List chunks = response.getKeyData().getChunksList(); for (ChunkInfo chunk : chunks) { @@ -264,8 +266,8 @@ public final class DistributedStorageHandler implements StorageHandler { } success = true; return new LengthInputStream(new ChunkInputStream( - containerKey, xceiverClientManager, xceiverClient, chunks, args), - length); + containerKey, xceiverClientManager, xceiverClient, + chunks, args.getRequestID()), length); } finally { if (!success) { xceiverClientManager.releaseClient(xceiverClient);