HDDS-1636. Tracing id is not propagated via async datanode grpc call (#895)

This commit is contained in:
Elek, Márton 2019-06-08 05:40:32 +02:00 committed by Xiaoyu Yao
parent 76b94c274f
commit 46b23c11b0
18 changed files with 90 additions and 154 deletions

View File

@ -315,8 +315,13 @@ public class XceiverClientGrpc extends XceiverClientSpi {
try (Scope scope = GlobalTracer.get()
.buildSpan("XceiverClientGrpc." + request.getCmdType().name())
.startActive(true)) {
ContainerCommandRequestProto finalPayload =
ContainerCommandRequestProto.newBuilder(request)
.setTraceID(TracingUtil.exportCurrentSpan())
.build();
XceiverClientReply asyncReply =
sendCommandAsync(request, pipeline.getFirstNode());
sendCommandAsync(finalPayload, pipeline.getFirstNode());
// TODO : for now make this API sync in nature as async requests are
// served out of order over XceiverClientGrpc. This needs to be fixed
// if this API is to be used for I/O path. Currently, this is not

View File

@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
/**
* This class provides the client-facing APIs of container operations.
@ -113,8 +112,7 @@ public class ContainerOperationClient implements ScmClient {
*/
public void createContainer(XceiverClientSpi client,
long containerId) throws IOException {
String traceID = UUID.randomUUID().toString();
ContainerProtocolCalls.createContainer(client, containerId, traceID, null);
ContainerProtocolCalls.createContainer(client, containerId, null);
// Let us log this info after we let SCM know that we have completed the
// creation state.
@ -257,9 +255,8 @@ public class ContainerOperationClient implements ScmClient {
XceiverClientSpi client = null;
try {
client = xceiverClientManager.acquireClient(pipeline);
String traceID = UUID.randomUUID().toString();
ContainerProtocolCalls
.deleteContainer(client, containerId, force, traceID, null);
.deleteContainer(client, containerId, force, null);
storageContainerLocationClient
.deleteContainer(containerId);
if (LOG.isDebugEnabled()) {
@ -307,10 +304,8 @@ public class ContainerOperationClient implements ScmClient {
XceiverClientSpi client = null;
try {
client = xceiverClientManager.acquireClient(pipeline);
String traceID = UUID.randomUUID().toString();
ReadContainerResponseProto response =
ContainerProtocolCalls.readContainer(client, containerID, traceID,
null);
ContainerProtocolCalls.readContainer(client, containerID, null);
if (LOG.isDebugEnabled()) {
LOG.debug("Read container {}, machines: {} ", containerID,
pipeline.getNodes());
@ -393,7 +388,6 @@ public class ContainerOperationClient implements ScmClient {
*/
// Actually close the container on Datanode
client = xceiverClientManager.acquireClient(pipeline);
String traceID = UUID.randomUUID().toString();
storageContainerLocationClient.notifyObjectStageChange(
ObjectStageChangeRequestProto.Type.container,
@ -401,7 +395,7 @@ public class ContainerOperationClient implements ScmClient {
ObjectStageChangeRequestProto.Op.close,
ObjectStageChangeRequestProto.Stage.begin);
ContainerProtocolCalls.closeContainer(client, containerId, traceID,
ContainerProtocolCalls.closeContainer(client, containerId,
null);
// Notify SCM to close the container
storageContainerLocationClient.notifyObjectStageChange(

View File

@ -60,7 +60,6 @@ public class BlockInputStream extends InputStream implements Seekable {
private Pipeline pipeline;
private final Token<OzoneBlockTokenIdentifier> token;
private final boolean verifyChecksum;
private final String traceID;
private XceiverClientManager xceiverClientManager;
private XceiverClientSpi xceiverClient;
private boolean initialized = false;
@ -96,13 +95,12 @@ public class BlockInputStream extends InputStream implements Seekable {
public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
String traceId, XceiverClientManager xceiverClientManager) {
XceiverClientManager xceiverClientManager) {
this.blockID = blockId;
this.length = blockLen;
this.pipeline = pipeline;
this.token = token;
this.verifyChecksum = verifyChecksum;
this.traceID = traceId;
this.xceiverClientManager = xceiverClientManager;
}
@ -166,7 +164,7 @@ public class BlockInputStream extends InputStream implements Seekable {
DatanodeBlockID datanodeBlockID = blockID
.getDatanodeBlockIDProtobuf();
GetBlockResponseProto response = ContainerProtocolCalls
.getBlock(xceiverClient, datanodeBlockID, traceID);
.getBlock(xceiverClient, datanodeBlockID);
chunks = response.getBlockData().getChunksList();
success = true;
@ -185,7 +183,7 @@ public class BlockInputStream extends InputStream implements Seekable {
* Datanode only when a read operation is performed on for that chunk.
*/
protected synchronized void addStream(ChunkInfo chunkInfo) {
chunkStreams.add(new ChunkInputStream(chunkInfo, blockID, traceID,
chunkStreams.add(new ChunkInputStream(chunkInfo, blockID,
xceiverClient, verifyChecksum));
}

View File

@ -82,7 +82,7 @@ public class BlockOutputStream extends OutputStream {
private volatile BlockID blockID;
private final String key;
private final String traceID;
private final BlockData.Builder containerBlockData;
private XceiverClientManager xceiverClientManager;
private XceiverClientSpi xceiverClient;
@ -128,7 +128,6 @@ public class BlockOutputStream extends OutputStream {
* @param key chunk key
* @param xceiverClientManager client manager that controls client
* @param pipeline pipeline where block will be written
* @param traceID container protocol call args
* @param chunkSize chunk size
* @param bufferPool pool of buffers
* @param streamBufferFlushSize flush size
@ -140,13 +139,12 @@ public class BlockOutputStream extends OutputStream {
@SuppressWarnings("parameternumber")
public BlockOutputStream(BlockID blockID, String key,
XceiverClientManager xceiverClientManager, Pipeline pipeline,
String traceID, int chunkSize, long streamBufferFlushSize,
int chunkSize, long streamBufferFlushSize,
long streamBufferMaxSize, long watchTimeout, BufferPool bufferPool,
ChecksumType checksumType, int bytesPerChecksum)
throws IOException {
this.blockID = blockID;
this.key = key;
this.traceID = traceID;
this.chunkSize = chunkSize;
KeyValue keyValue =
KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
@ -379,13 +377,12 @@ public class BlockOutputStream extends OutputStream {
List<ByteBuffer> byteBufferList = bufferList;
bufferList = null;
Preconditions.checkNotNull(byteBufferList);
String requestId =
traceID + ContainerProtos.Type.PutBlock + chunkIndex + blockID;
CompletableFuture<ContainerProtos.
ContainerCommandResponseProto> flushFuture;
try {
XceiverClientReply asyncReply =
putBlockAsync(xceiverClient, containerBlockData.build(), requestId);
putBlockAsync(xceiverClient, containerBlockData.build());
CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
asyncReply.getResponse();
flushFuture = future.thenApplyAsync(e -> {
@ -606,13 +603,10 @@ public class BlockOutputStream extends OutputStream {
.setLen(effectiveChunkSize)
.setChecksumData(checksumData.getProtoBufMessage())
.build();
// generate a unique requestId
String requestId =
traceID + ContainerProtos.Type.WriteChunk + chunkIndex + chunkInfo
.getChunkName();
try {
XceiverClientReply asyncReply =
writeChunkAsync(xceiverClient, chunkInfo, blockID, data, requestId);
writeChunkAsync(xceiverClient, chunkInfo, blockID, data);
CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
asyncReply.getResponse();
future.thenApplyAsync(e -> {

View File

@ -49,7 +49,6 @@ public class ChunkInputStream extends InputStream implements Seekable {
private ChunkInfo chunkInfo;
private final long length;
private final BlockID blockID;
private final String traceID;
private XceiverClientSpi xceiverClient;
private boolean verifyChecksum;
private boolean allocated = false;
@ -77,11 +76,10 @@ public class ChunkInputStream extends InputStream implements Seekable {
private static final int EOF = -1;
ChunkInputStream(ChunkInfo chunkInfo, BlockID blockId,
String traceId, XceiverClientSpi xceiverClient, boolean verifyChecksum) {
XceiverClientSpi xceiverClient, boolean verifyChecksum) {
this.chunkInfo = chunkInfo;
this.length = chunkInfo.getLen();
this.blockID = blockId;
this.traceID = traceId;
this.xceiverClient = xceiverClient;
this.verifyChecksum = verifyChecksum;
}
@ -335,7 +333,7 @@ public class ChunkInputStream extends InputStream implements Seekable {
validators.add(validator);
readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
readChunkInfo, blockID, traceID, validators);
readChunkInfo, blockID, validators);
} catch (IOException e) {
if (e instanceof StorageContainerException) {

View File

@ -63,7 +63,7 @@ public class TestBlockInputStream {
createChunkList(5);
blockStream = new DummyBlockInputStream(blockID, blockSize, null, null,
false, null, null);
false, null);
}
/**
@ -113,10 +113,9 @@ public class TestBlockInputStream {
Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token,
boolean verifyChecksum,
String traceId,
XceiverClientManager xceiverClientManager) {
super(blockId, blockLen, pipeline, token, verifyChecksum,
traceId, xceiverClientManager);
xceiverClientManager);
}
@Override
@ -128,7 +127,7 @@ public class TestBlockInputStream {
protected void addStream(ChunkInfo chunkInfo) {
TestChunkInputStream testChunkInputStream = new TestChunkInputStream();
getChunkStreams().add(testChunkInputStream.new DummyChunkInputStream(
chunkInfo, null, null, null, false,
chunkInfo, null, null, false,
chunkDataMap.get(chunkInfo.getChunkName()).clone()));
}

View File

@ -66,7 +66,7 @@ public class TestChunkInputStream {
chunkData, 0, CHUNK_SIZE).getProtoBufMessage())
.build();
chunkStream = new DummyChunkInputStream(chunkInfo, null, null, null, true);
chunkStream = new DummyChunkInputStream(chunkInfo, null, null, true);
}
static byte[] generateRandomData(int length) {
@ -85,19 +85,17 @@ public class TestChunkInputStream {
DummyChunkInputStream(ChunkInfo chunkInfo,
BlockID blockId,
String traceId,
XceiverClientSpi xceiverClient,
boolean verifyChecksum) {
super(chunkInfo, blockId, traceId, xceiverClient, verifyChecksum);
super(chunkInfo, blockId, xceiverClient, verifyChecksum);
}
public DummyChunkInputStream(ChunkInfo chunkInfo,
BlockID blockId,
String traceId,
XceiverClientSpi xceiverClient,
boolean verifyChecksum,
byte[] data) {
super(chunkInfo, blockId, traceId, xceiverClient, verifyChecksum);
super(chunkInfo, blockId, xceiverClient, verifyChecksum);
chunkData = data;
}

View File

@ -92,12 +92,11 @@ public final class ContainerProtocolCalls {
*
* @param xceiverClient client to perform call
* @param datanodeBlockID blockID to identify container
* @param traceID container protocol call args
* @return container protocol get block response
* @throws IOException if there is an I/O error while performing the call
*/
public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
DatanodeBlockID datanodeBlockID, String traceID) throws IOException {
DatanodeBlockID datanodeBlockID) throws IOException {
GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto
.newBuilder()
.setBlockID(datanodeBlockID);
@ -107,7 +106,6 @@ public final class ContainerProtocolCalls {
.newBuilder()
.setCmdType(Type.GetBlock)
.setContainerID(datanodeBlockID.getContainerID())
.setTraceID(traceID)
.setDatanodeUuid(id)
.setGetBlock(readBlockRequest);
String encodedToken = getEncodedBlockToken(getService(datanodeBlockID));
@ -126,13 +124,12 @@ public final class ContainerProtocolCalls {
*
* @param xceiverClient client to perform call
* @param blockID blockId for the Block
* @param traceID container protocol call args
* @return container protocol getLastCommittedBlockLength response
* @throws IOException if there is an I/O error while performing the call
*/
public static ContainerProtos.GetCommittedBlockLengthResponseProto
getCommittedBlockLength(
XceiverClientSpi xceiverClient, BlockID blockID, String traceID)
XceiverClientSpi xceiverClient, BlockID blockID)
throws IOException {
ContainerProtos.GetCommittedBlockLengthRequestProto.Builder
getBlockLengthRequestBuilder =
@ -143,7 +140,6 @@ public final class ContainerProtocolCalls {
ContainerCommandRequestProto.newBuilder()
.setCmdType(Type.GetCommittedBlockLength)
.setContainerID(blockID.getContainerID())
.setTraceID(traceID)
.setDatanodeUuid(id)
.setGetCommittedBlockLength(getBlockLengthRequestBuilder);
String encodedToken = getEncodedBlockToken(new Text(blockID.
@ -162,20 +158,19 @@ public final class ContainerProtocolCalls {
*
* @param xceiverClient client to perform call
* @param containerBlockData block data to identify container
* @param traceID container protocol call args
* @return putBlockResponse
* @throws IOException if there is an I/O error while performing the call
*/
public static ContainerProtos.PutBlockResponseProto putBlock(
XceiverClientSpi xceiverClient, BlockData containerBlockData,
String traceID) throws IOException {
XceiverClientSpi xceiverClient, BlockData containerBlockData)
throws IOException {
PutBlockRequestProto.Builder createBlockRequest =
PutBlockRequestProto.newBuilder().setBlockData(containerBlockData);
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutBlock)
.setContainerID(containerBlockData.getBlockID().getContainerID())
.setTraceID(traceID).setDatanodeUuid(id)
.setDatanodeUuid(id)
.setPutBlock(createBlockRequest);
String encodedToken =
getEncodedBlockToken(getService(containerBlockData.getBlockID()));
@ -193,15 +188,13 @@ public final class ContainerProtocolCalls {
*
* @param xceiverClient client to perform call
* @param containerBlockData block data to identify container
* @param traceID container protocol call args
* @return putBlockResponse
* @throws IOException if there is an error while performing the call
* @throws InterruptedException
* @throws ExecutionException
*/
public static XceiverClientReply putBlockAsync(
XceiverClientSpi xceiverClient, BlockData containerBlockData,
String traceID)
XceiverClientSpi xceiverClient, BlockData containerBlockData)
throws IOException, InterruptedException, ExecutionException {
PutBlockRequestProto.Builder createBlockRequest =
PutBlockRequestProto.newBuilder().setBlockData(containerBlockData);
@ -209,7 +202,7 @@ public final class ContainerProtocolCalls {
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutBlock)
.setContainerID(containerBlockData.getBlockID().getContainerID())
.setTraceID(traceID).setDatanodeUuid(id)
.setDatanodeUuid(id)
.setPutBlock(createBlockRequest);
String encodedToken =
getEncodedBlockToken(getService(containerBlockData.getBlockID()));
@ -226,14 +219,13 @@ public final class ContainerProtocolCalls {
* @param xceiverClient client to perform call
* @param chunk information about chunk to read
* @param blockID ID of the block
* @param traceID container protocol call args
* @param validators functions to validate the response
* @return container protocol read chunk response
* @throws IOException if there is an I/O error while performing the call
*/
public static ContainerProtos.ReadChunkResponseProto readChunk(
XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
String traceID, List<CheckedBiFunction> validators) throws IOException {
List<CheckedBiFunction> validators) throws IOException {
ReadChunkRequestProto.Builder readChunkRequest =
ReadChunkRequestProto.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
@ -241,7 +233,7 @@ public final class ContainerProtocolCalls {
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk)
.setContainerID(blockID.getContainerID()).setTraceID(traceID)
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(id).setReadChunk(readChunkRequest);
String encodedToken = getEncodedBlockToken(new Text(blockID.
getContainerBlockID().toString()));
@ -261,11 +253,10 @@ public final class ContainerProtocolCalls {
* @param chunk information about chunk to write
* @param blockID ID of the block
* @param data the data of the chunk to write
* @param traceID container protocol call args
* @throws IOException if there is an error while performing the call
*/
public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk,
BlockID blockID, ByteString data, String traceID)
BlockID blockID, ByteString data)
throws IOException {
WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
.newBuilder()
@ -277,7 +268,6 @@ public final class ContainerProtocolCalls {
.newBuilder()
.setCmdType(Type.WriteChunk)
.setContainerID(blockID.getContainerID())
.setTraceID(traceID)
.setDatanodeUuid(id)
.setWriteChunk(writeChunkRequest);
String encodedToken = getEncodedBlockToken(new Text(blockID.
@ -296,12 +286,11 @@ public final class ContainerProtocolCalls {
* @param chunk information about chunk to write
* @param blockID ID of the block
* @param data the data of the chunk to write
* @param traceID container protocol call args
* @throws IOException if there is an I/O error while performing the call
*/
public static XceiverClientReply writeChunkAsync(
XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
ByteString data, String traceID)
ByteString data)
throws IOException, ExecutionException, InterruptedException {
WriteChunkRequestProto.Builder writeChunkRequest =
WriteChunkRequestProto.newBuilder()
@ -310,7 +299,7 @@ public final class ContainerProtocolCalls {
String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder builder =
ContainerCommandRequestProto.newBuilder().setCmdType(Type.WriteChunk)
.setContainerID(blockID.getContainerID()).setTraceID(traceID)
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(id).setWriteChunk(writeChunkRequest);
String encodedToken = getEncodedBlockToken(new Text(blockID.
getContainerBlockID().toString()));
@ -330,13 +319,12 @@ public final class ContainerProtocolCalls {
* @param client - client that communicates with the container.
* @param blockID - ID of the block
* @param data - Data to be written into the container.
* @param traceID - Trace ID for logging purpose.
* @return container protocol writeSmallFile response
* @throws IOException
*/
public static PutSmallFileResponseProto writeSmallFile(
XceiverClientSpi client, BlockID blockID, byte[] data,
String traceID) throws IOException {
XceiverClientSpi client, BlockID blockID, byte[] data)
throws IOException {
BlockData containerBlockData =
BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
@ -369,7 +357,6 @@ public final class ContainerProtocolCalls {
ContainerCommandRequestProto.newBuilder()
.setCmdType(Type.PutSmallFile)
.setContainerID(blockID.getContainerID())
.setTraceID(traceID)
.setDatanodeUuid(id)
.setPutSmallFile(putSmallFileRequest);
String encodedToken = getEncodedBlockToken(new Text(blockID.
@ -387,12 +374,11 @@ public final class ContainerProtocolCalls {
* createContainer call that creates a container on the datanode.
* @param client - client
* @param containerID - ID of container
* @param traceID - traceID
* @param encodedToken - encodedToken if security is enabled
* @throws IOException
*/
public static void createContainer(XceiverClientSpi client, long containerID,
String traceID, String encodedToken) throws IOException {
String encodedToken) throws IOException {
ContainerProtos.CreateContainerRequestProto.Builder createRequest =
ContainerProtos.CreateContainerRequestProto
.newBuilder();
@ -409,7 +395,6 @@ public final class ContainerProtocolCalls {
request.setContainerID(containerID);
request.setCreateContainer(createRequest.build());
request.setDatanodeUuid(id);
request.setTraceID(traceID);
client.sendCommand(request.build(), getValidatorList());
}
@ -418,12 +403,11 @@ public final class ContainerProtocolCalls {
*
* @param client
* @param force whether or not to forcibly delete the container.
* @param traceID
* @param encodedToken - encodedToken if security is enabled
* @throws IOException
*/
public static void deleteContainer(XceiverClientSpi client, long containerID,
boolean force, String traceID, String encodedToken) throws IOException {
boolean force, String encodedToken) throws IOException {
ContainerProtos.DeleteContainerRequestProto.Builder deleteRequest =
ContainerProtos.DeleteContainerRequestProto.newBuilder();
deleteRequest.setForceDelete(force);
@ -434,7 +418,6 @@ public final class ContainerProtocolCalls {
request.setCmdType(ContainerProtos.Type.DeleteContainer);
request.setContainerID(containerID);
request.setDeleteContainer(deleteRequest);
request.setTraceID(traceID);
request.setDatanodeUuid(id);
if (encodedToken != null) {
request.setEncodedToken(encodedToken);
@ -447,12 +430,11 @@ public final class ContainerProtocolCalls {
*
* @param client
* @param containerID
* @param traceID
* @param encodedToken - encodedToken if security is enabled
* @throws IOException
*/
public static void closeContainer(XceiverClientSpi client,
long containerID, String traceID, String encodedToken)
long containerID, String encodedToken)
throws IOException {
String id = client.getPipeline().getFirstNode().getUuidString();
@ -461,7 +443,6 @@ public final class ContainerProtocolCalls {
request.setCmdType(Type.CloseContainer);
request.setContainerID(containerID);
request.setCloseContainer(CloseContainerRequestProto.getDefaultInstance());
request.setTraceID(traceID);
request.setDatanodeUuid(id);
if(encodedToken != null) {
request.setEncodedToken(encodedToken);
@ -473,13 +454,12 @@ public final class ContainerProtocolCalls {
* readContainer call that gets meta data from an existing container.
*
* @param client - client
* @param traceID - trace ID
* @param encodedToken - encodedToken if security is enabled
* @throws IOException
*/
public static ReadContainerResponseProto readContainer(
XceiverClientSpi client, long containerID,
String traceID, String encodedToken) throws IOException {
XceiverClientSpi client, long containerID, String encodedToken)
throws IOException {
String id = client.getPipeline().getFirstNode().getUuidString();
ContainerCommandRequestProto.Builder request =
@ -488,7 +468,6 @@ public final class ContainerProtocolCalls {
request.setContainerID(containerID);
request.setReadContainer(ReadContainerRequestProto.getDefaultInstance());
request.setDatanodeUuid(id);
request.setTraceID(traceID);
if(encodedToken != null) {
request.setEncodedToken(encodedToken);
}
@ -503,12 +482,11 @@ public final class ContainerProtocolCalls {
*
* @param client
* @param blockID - ID of the block
* @param traceID - trace ID
* @return GetSmallFileResponseProto
* @throws IOException
*/
public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client,
BlockID blockID, String traceID) throws IOException {
BlockID blockID) throws IOException {
GetBlockRequestProto.Builder getBlock = GetBlockRequestProto
.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf());
@ -522,7 +500,6 @@ public final class ContainerProtocolCalls {
.newBuilder()
.setCmdType(Type.GetSmallFile)
.setContainerID(blockID.getContainerID())
.setTraceID(traceID)
.setDatanodeUuid(id)
.setGetSmallFile(getSmallFileRequest);
String encodedToken = getEncodedBlockToken(new Text(blockID.

View File

@ -45,7 +45,7 @@ public class StringCodec implements Codec<StringBuilder> {
if (value != null && !value.equals("")) {
String[] parts = value.split(":");
if (parts.length != 4) {
LOG.trace("MalformedTracerStateString: {}", value);
LOG.debug("MalformedTracerStateString: {}", value);
throw new MalformedTracerStateStringException(value);
} else {
String traceId = parts[0];

View File

@ -47,7 +47,6 @@ public final class BlockOutputStreamEntry extends OutputStream {
private final Pipeline pipeline;
private final ChecksumType checksumType;
private final int bytesPerChecksum;
private final String requestId;
private final int chunkSize;
// total number of bytes that should be written to this stream
private final long length;
@ -73,7 +72,6 @@ public final class BlockOutputStreamEntry extends OutputStream {
this.key = key;
this.xceiverClientManager = xceiverClientManager;
this.pipeline = pipeline;
this.requestId = requestId;
this.chunkSize = chunkSize;
this.token = token;
this.length = length;
@ -111,7 +109,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
}
this.outputStream =
new BlockOutputStream(blockID, key, xceiverClientManager,
pipeline, requestId, chunkSize, streamBufferFlushSize,
pipeline, chunkSize, streamBufferFlushSize,
streamBufferMaxSize, watchTimeout, bufferPool, checksumType,
bytesPerChecksum);
}
@ -324,10 +322,6 @@ public final class BlockOutputStreamEntry extends OutputStream {
return pipeline;
}
public String getRequestId() {
return requestId;
}
public int getChunkSize() {
return chunkSize;
}

View File

@ -76,21 +76,21 @@ public class KeyInputStream extends InputStream implements Seekable {
* For each block in keyInfo, add a BlockInputStream to blockStreams.
*/
public static LengthInputStream getFromOmKeyInfo(OmKeyInfo keyInfo,
XceiverClientManager xceiverClientManager, String requestId,
XceiverClientManager xceiverClientManager,
boolean verifyChecksum) {
List<OmKeyLocationInfo> keyLocationInfos = keyInfo
.getLatestVersionLocations().getBlocksLatestVersionOnly();
KeyInputStream keyInputStream = new KeyInputStream();
keyInputStream.initialize(keyInfo.getKeyName(), keyLocationInfos,
xceiverClientManager, requestId, verifyChecksum);
xceiverClientManager, verifyChecksum);
return new LengthInputStream(keyInputStream, keyInputStream.length);
}
private synchronized void initialize(String keyName,
List<OmKeyLocationInfo> blockInfos,
XceiverClientManager xceiverClientManager, String requestId,
XceiverClientManager xceiverClientManager,
boolean verifyChecksum) {
this.key = keyName;
this.blockOffsets = new long[blockInfos.size()];
@ -100,7 +100,7 @@ public class KeyInputStream extends InputStream implements Seekable {
LOG.debug("Adding stream for accessing {}. The stream will be " +
"initialized later.", omKeyLocationInfo);
addStream(omKeyLocationInfo, xceiverClientManager, requestId,
addStream(omKeyLocationInfo, xceiverClientManager,
verifyChecksum);
this.blockOffsets[i] = keyLength;
@ -116,11 +116,11 @@ public class KeyInputStream extends InputStream implements Seekable {
* the block for the first time.
*/
private synchronized void addStream(OmKeyLocationInfo blockInfo,
XceiverClientManager xceiverClientMngr, String clientRequestId,
XceiverClientManager xceiverClientMngr,
boolean verifyChecksum) {
blockStreams.add(new BlockInputStream(blockInfo.getBlockID(),
blockInfo.getLength(), blockInfo.getPipeline(), blockInfo.getToken(),
verifyChecksum, clientRequestId, xceiverClientMngr));
verifyChecksum, xceiverClientMngr));
}
@VisibleForTesting

View File

@ -654,7 +654,6 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
throws IOException {
HddsClientUtils.verifyResourceName(volumeName, bucketName);
Preconditions.checkNotNull(keyName);
String requestId = UUID.randomUUID().toString();
OmKeyArgs keyArgs = new OmKeyArgs.Builder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
@ -662,7 +661,7 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
.setRefreshPipeline(true)
.build();
OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
return createInputStream(keyInfo, requestId);
return createInputStream(keyInfo);
}
@Override
@ -984,7 +983,7 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
.setKeyName(keyName)
.build();
OmKeyInfo keyInfo = ozoneManagerClient.lookupFile(keyArgs);
return createInputStream(keyInfo, UUID.randomUUID().toString());
return createInputStream(keyInfo);
}
@Override
@ -1069,10 +1068,10 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
return ozoneManagerClient.getAcl(obj);
}
private OzoneInputStream createInputStream(OmKeyInfo keyInfo,
String requestId) throws IOException {
private OzoneInputStream createInputStream(OmKeyInfo keyInfo)
throws IOException {
LengthInputStream lengthInputStream = KeyInputStream
.getFromOmKeyInfo(keyInfo, xceiverClientManager, requestId,
.getFromOmKeyInfo(keyInfo, xceiverClientManager,
verifyChecksum);
FileEncryptionInfo feInfo = keyInfo.getFileEncryptionInfo();
if (feInfo != null) {

View File

@ -44,7 +44,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.UUID;
/**
* Tests the idempotent operations in ContainerStateMachine.
@ -80,7 +79,6 @@ public class TestContainerStateMachineIdempotency {
@Test
public void testContainerStateMachineIdempotency() throws Exception {
String traceID = UUID.randomUUID().toString();
ContainerWithPipeline container = storageContainerLocationClient
.allocateContainer(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE, containerOwner);
@ -89,8 +87,7 @@ public class TestContainerStateMachineIdempotency {
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
try {
//create the container
ContainerProtocolCalls.createContainer(client, containerID, traceID,
null);
ContainerProtocolCalls.createContainer(client, containerID, null);
// call create Container again
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
byte[] data =
@ -112,10 +109,8 @@ public class TestContainerStateMachineIdempotency {
client.sendCommand(putKeyRequest);
// close container call
ContainerProtocolCalls.closeContainer(client, containerID, traceID,
null);
ContainerProtocolCalls.closeContainer(client, containerID, traceID,
null);
ContainerProtocolCalls.closeContainer(client, containerID, null);
ContainerProtocolCalls.closeContainer(client, containerID, null);
} catch (IOException ioe) {
Assert.fail("Container operation failed" + ioe);
}

View File

@ -41,8 +41,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.UUID;
/**
* Test Container calls.
*/
@ -80,7 +78,6 @@ public class TestContainerSmallFile {
@Test
public void testAllocateWrite() throws Exception {
String traceID = UUID.randomUUID().toString();
ContainerWithPipeline container =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
@ -88,14 +85,14 @@ public class TestContainerSmallFile {
XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline());
ContainerProtocolCalls.createContainer(client,
container.getContainerInfo().getContainerID(), traceID, null);
container.getContainerInfo().getContainerID(), null);
BlockID blockID = ContainerTestHelper.getTestBlockID(
container.getContainerInfo().getContainerID());
ContainerProtocolCalls.writeSmallFile(client, blockID,
"data123".getBytes(), traceID);
"data123".getBytes());
ContainerProtos.GetSmallFileResponseProto response =
ContainerProtocolCalls.readSmallFile(client, blockID, traceID);
ContainerProtocolCalls.readSmallFile(client, blockID);
String readData = response.getData().getData().toStringUtf8();
Assert.assertEquals("data123", readData);
xceiverClientManager.releaseClient(client, false);
@ -103,7 +100,6 @@ public class TestContainerSmallFile {
@Test
public void testInvalidBlockRead() throws Exception {
String traceID = UUID.randomUUID().toString();
ContainerWithPipeline container =
storageContainerLocationClient.allocateContainer(
xceiverClientManager.getType(),
@ -111,7 +107,7 @@ public class TestContainerSmallFile {
XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline());
ContainerProtocolCalls.createContainer(client,
container.getContainerInfo().getContainerID(), traceID, null);
container.getContainerInfo().getContainerID(), null);
thrown.expect(StorageContainerException.class);
thrown.expectMessage("Unable to find the block");
@ -120,13 +116,12 @@ public class TestContainerSmallFile {
container.getContainerInfo().getContainerID());
// Try to read a Key Container Name
ContainerProtos.GetSmallFileResponseProto response =
ContainerProtocolCalls.readSmallFile(client, blockID, traceID);
ContainerProtocolCalls.readSmallFile(client, blockID);
xceiverClientManager.releaseClient(client, false);
}
@Test
public void testInvalidContainerRead() throws Exception {
String traceID = UUID.randomUUID().toString();
long nonExistContainerID = 8888L;
ContainerWithPipeline container =
storageContainerLocationClient.allocateContainer(
@ -135,11 +130,11 @@ public class TestContainerSmallFile {
XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline());
ContainerProtocolCalls.createContainer(client,
container.getContainerInfo().getContainerID(), traceID, null);
container.getContainerInfo().getContainerID(), null);
BlockID blockID = ContainerTestHelper.getTestBlockID(
container.getContainerInfo().getContainerID());
ContainerProtocolCalls.writeSmallFile(client, blockID,
"data123".getBytes(), traceID);
"data123".getBytes());
thrown.expect(StorageContainerException.class);
thrown.expectMessage("ContainerID 8888 does not exist");
@ -148,13 +143,12 @@ public class TestContainerSmallFile {
ContainerProtos.GetSmallFileResponseProto response =
ContainerProtocolCalls.readSmallFile(client,
ContainerTestHelper.getTestBlockID(
nonExistContainerID), traceID);
nonExistContainerID));
xceiverClientManager.releaseClient(client, false);
}
@Test
public void testReadWriteWithBCSId() throws Exception {
String traceID = UUID.randomUUID().toString();
ContainerWithPipeline container =
storageContainerLocationClient.allocateContainer(
HddsProtos.ReplicationType.RATIS,
@ -162,20 +156,20 @@ public class TestContainerSmallFile {
XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline());
ContainerProtocolCalls.createContainer(client,
container.getContainerInfo().getContainerID(), traceID, null);
container.getContainerInfo().getContainerID(), null);
BlockID blockID1 = ContainerTestHelper.getTestBlockID(
container.getContainerInfo().getContainerID());
ContainerProtos.PutSmallFileResponseProto responseProto =
ContainerProtocolCalls
.writeSmallFile(client, blockID1, "data123".getBytes(), traceID);
.writeSmallFile(client, blockID1, "data123".getBytes());
long bcsId = responseProto.getCommittedBlockLength().getBlockID()
.getBlockCommitSequenceId();
try {
blockID1.setBlockCommitSequenceId(bcsId + 1);
//read a file with higher bcsId than the container bcsId
ContainerProtocolCalls
.readSmallFile(client, blockID1, traceID);
.readSmallFile(client, blockID1);
Assert.fail("Expected exception not thrown");
} catch (StorageContainerException sce) {
Assert
@ -186,12 +180,12 @@ public class TestContainerSmallFile {
BlockID blockID2 = ContainerTestHelper
.getTestBlockID(container.getContainerInfo().getContainerID());
ContainerProtocolCalls
.writeSmallFile(client, blockID2, "data123".getBytes(), traceID);
.writeSmallFile(client, blockID2, "data123".getBytes());
try {
blockID1.setBlockCommitSequenceId(bcsId + 1);
//read a file with higher bcsId than the committed bcsId for the block
ContainerProtocolCalls.readSmallFile(client, blockID1, traceID);
ContainerProtocolCalls.readSmallFile(client, blockID1);
Assert.fail("Expected exception not thrown");
} catch (StorageContainerException sce) {
Assert
@ -199,7 +193,7 @@ public class TestContainerSmallFile {
}
blockID1.setBlockCommitSequenceId(bcsId);
ContainerProtos.GetSmallFileResponseProto response =
ContainerProtocolCalls.readSmallFile(client, blockID1, traceID);
ContainerProtocolCalls.readSmallFile(client, blockID1);
String readData = response.getData().getData().toStringUtf8();
Assert.assertEquals("data123", readData);
xceiverClientManager.releaseClient(client, false);

View File

@ -46,9 +46,6 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.UUID;
/**
* Test Container calls.
*/
@ -85,7 +82,6 @@ public class TestGetCommittedBlockLengthAndPutKey {
@Test
public void tesGetCommittedBlockLength() throws Exception {
ContainerProtos.GetCommittedBlockLengthResponseProto response;
String traceID = UUID.randomUUID().toString();
ContainerWithPipeline container = storageContainerLocationClient
.allocateContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
@ -93,7 +89,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
Pipeline pipeline = container.getPipeline();
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
//create the container
ContainerProtocolCalls.createContainer(client, containerID, traceID, null);
ContainerProtocolCalls.createContainer(client, containerID, null);
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
byte[] data =
@ -109,7 +105,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
.getPutBlockRequest(pipeline, writeChunkRequest.getWriteChunk());
client.sendCommand(putKeyRequest);
response = ContainerProtocolCalls
.getCommittedBlockLength(client, blockID, traceID);
.getCommittedBlockLength(client, blockID);
// make sure the block ids in the request and response are same.
Assert.assertTrue(
BlockID.getFromProtobuf(response.getBlockID()).equals(blockID));
@ -119,22 +115,21 @@ public class TestGetCommittedBlockLengthAndPutKey {
@Test
public void testGetCommittedBlockLengthForInvalidBlock() throws Exception {
String traceID = UUID.randomUUID().toString();
ContainerWithPipeline container = storageContainerLocationClient
.allocateContainer(xceiverClientManager.getType(),
HddsProtos.ReplicationFactor.ONE, containerOwner);
long containerID = container.getContainerInfo().getContainerID();
XceiverClientSpi client = xceiverClientManager
.acquireClient(container.getPipeline());
ContainerProtocolCalls.createContainer(client, containerID, traceID, null);
ContainerProtocolCalls.createContainer(client, containerID, null);
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
// move the container to closed state
ContainerProtocolCalls.closeContainer(client, containerID, traceID, null);
ContainerProtocolCalls.closeContainer(client, containerID, null);
try {
// There is no block written inside the container. The request should
// fail.
ContainerProtocolCalls.getCommittedBlockLength(client, blockID, traceID);
ContainerProtocolCalls.getCommittedBlockLength(client, blockID);
Assert.fail("Expected exception not thrown");
} catch (StorageContainerException sce) {
Assert.assertTrue(sce.getMessage().contains("Unable to find the block"));
@ -145,7 +140,6 @@ public class TestGetCommittedBlockLengthAndPutKey {
@Test
public void tesPutKeyResposne() throws Exception {
ContainerProtos.PutBlockResponseProto response;
String traceID = UUID.randomUUID().toString();
ContainerWithPipeline container = storageContainerLocationClient
.allocateContainer(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE, containerOwner);
@ -153,7 +147,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
Pipeline pipeline = container.getPipeline();
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
//create the container
ContainerProtocolCalls.createContainer(client, containerID, traceID, null);
ContainerProtocolCalls.createContainer(client, containerID, null);
BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
byte[] data =

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.scm;
import com.google.common.cache.Cache;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@ -144,9 +143,8 @@ public class TestXceiverClientManager {
+ container1.getContainerInfo().getReplicationType());
Assert.assertEquals(null, nonExistent1);
// However container call should succeed because of refcount on the client.
String traceID1 = "trace" + RandomStringUtils.randomNumeric(4);
ContainerProtocolCalls.createContainer(client1,
container1.getContainerInfo().getContainerID(), traceID1, null);
container1.getContainerInfo().getContainerID(), null);
// After releasing the client, this connection should be closed
// and any container operations should fail
@ -155,7 +153,7 @@ public class TestXceiverClientManager {
String expectedMessage = "This channel is not connected.";
try {
ContainerProtocolCalls.createContainer(client1,
container1.getContainerInfo().getContainerID(), traceID1, null);
container1.getContainerInfo().getContainerID(), null);
Assert.fail("Create container should throw exception on closed"
+ "client");
} catch (Exception e) {
@ -202,11 +200,10 @@ public class TestXceiverClientManager {
Assert.assertEquals(null, nonExistent);
// Any container operation should now fail
String traceID2 = "trace" + RandomStringUtils.randomNumeric(4);
String expectedMessage = "This channel is not connected.";
try {
ContainerProtocolCalls.createContainer(client1,
container1.getContainerInfo().getContainerID(), traceID2, null);
container1.getContainerInfo().getContainerID(), null);
Assert.fail("Create container should throw exception on closed"
+ "client");
} catch (Exception e) {

View File

@ -503,7 +503,7 @@ public final class DistributedStorageHandler implements StorageHandler {
.build();
OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
return KeyInputStream.getFromOmKeyInfo(
keyInfo, xceiverClientManager, args.getRequestID(), verifyChecksum);
keyInfo, xceiverClientManager, verifyChecksum);
}
@Override

View File

@ -47,7 +47,7 @@ public class TestChunkStreams {
for (int i = 0; i < 5; i++) {
int tempOffset = offset;
BlockInputStream in =
new BlockInputStream(null, 100, null, null, true, null, null) {
new BlockInputStream(null, 100, null, null, true, null) {
private long pos = 0;
private ByteArrayInputStream in =
new ByteArrayInputStream(buf, tempOffset, 100);
@ -103,7 +103,7 @@ public class TestChunkStreams {
for (int i = 0; i < 5; i++) {
int tempOffset = offset;
BlockInputStream in =
new BlockInputStream(null, 100, null, null, true, null, null) {
new BlockInputStream(null, 100, null, null, true, null) {
private long pos = 0;
private ByteArrayInputStream in =
new ByteArrayInputStream(buf, tempOffset, 100);