Revert "HDFS-12794. Ozone: Parallelize ChunkOutputStream Writes to container. Contributed by Shashikant Banerjee."

This reverts commit 6ce5ec6761.
Authored by Mukul Kumar Singh on 2018-01-17 01:09:48 +05:30; committed by Owen O'Malley
parent 98eeabbdb3
commit 98d62e55c4
8 changed files with 85 additions and 273 deletions

File: OzoneConfigKeys.java

@@ -107,9 +107,6 @@ public final class OzoneConfigKeys {
       "ozone.scm.block.size.in.mb";
   public static final long OZONE_SCM_BLOCK_SIZE_DEFAULT = 256;
-  public static final String OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB =
-      "ozone.output.stream.buffer.size.in.mb";
-  public static final long OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT = 256;
   /**
    * Ozone administrator users delimited by comma.
    * If not set, only the user who launches an ozone service will be the

File: ChunkGroupOutputStream.java

@@ -19,10 +19,7 @@ package org.apache.hadoop.ozone.client.io;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.primitives.Ints;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
-import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
@@ -49,9 +46,6 @@ import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT;
 /**
  * Maintaining a list of ChunkInputStream. Write based on offset.
  *
@@ -78,7 +72,6 @@ public class ChunkGroupOutputStream extends OutputStream {
   private final XceiverClientManager xceiverClientManager;
   private final int chunkSize;
   private final String requestID;
-  private final long streamBufferSize;
   /**
    * A constructor for testing purpose only.
@@ -93,7 +86,6 @@ public class ChunkGroupOutputStream extends OutputStream {
     xceiverClientManager = null;
     chunkSize = 0;
     requestID = null;
-    streamBufferSize = OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT * OzoneConsts.MB;
   }
   /**
@@ -113,26 +105,12 @@ public class ChunkGroupOutputStream extends OutputStream {
     return streamEntries;
   }
-  /**
-   * Chunkoutput stream, making this package visible since this can be
-   * created only via builder.
-   * @param handler - Open Key state.
-   * @param xceiverClientManager - Communication Manager.
-   * @param scmClient - SCM protocol Client.
-   * @param ksmClient - KSM Protocol client
-   * @param chunkSize - Chunk Size - I/O
-   * @param requestId - Seed for trace ID generation.
-   * @param factor - Replication factor
-   * @param type - Replication Type - RATIS/Standalone etc.
-   * @param maxBufferSize - Maximum stream buffer Size.
-   * @throws IOException - Throws this exception if there is an error.
-   */
-  ChunkGroupOutputStream(
+  public ChunkGroupOutputStream(
       OpenKeySession handler, XceiverClientManager xceiverClientManager,
       StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
       KeySpaceManagerProtocolClientSideTranslatorPB ksmClient,
       int chunkSize, String requestId, ReplicationFactor factor,
-      ReplicationType type, long maxBufferSize) throws IOException {
+      ReplicationType type) throws IOException {
     this.streamEntries = new ArrayList<>();
     this.currentStreamIndex = 0;
     this.byteOffset = 0;
@@ -152,7 +130,6 @@ public class ChunkGroupOutputStream extends OutputStream {
     this.requestID = requestId;
     LOG.debug("Expecting open key with one block, but got" +
         info.getKeyLocationVersions().size());
-    this.streamBufferSize = maxBufferSize;
   }
   /**
@@ -207,7 +184,7 @@ public class ChunkGroupOutputStream extends OutputStream {
     }
     streamEntries.add(new ChunkOutputStreamEntry(containerKey,
         keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
-        chunkSize, subKeyInfo.getLength(), this.streamBufferSize));
+        chunkSize, subKeyInfo.getLength()));
   }
@@ -347,7 +324,6 @@ public class ChunkGroupOutputStream extends OutputStream {
     private String requestID;
     private ReplicationType type;
     private ReplicationFactor factor;
-    private long streamBufferSize;
     public Builder setHandler(OpenKeySession handler) {
       this.openHandler = handler;
@@ -391,23 +367,9 @@ public class ChunkGroupOutputStream extends OutputStream {
       return this;
     }
-    public Builder setStreamBufferSize(long blockSize) {
-      this.streamBufferSize = blockSize;
-      return this;
-    }
     public ChunkGroupOutputStream build() throws IOException {
-      Preconditions.checkNotNull(openHandler);
-      Preconditions.checkNotNull(xceiverManager);
-      Preconditions.checkNotNull(scmClient);
-      Preconditions.checkNotNull(ksmClient);
-      Preconditions.checkState(chunkSize > 0);
-      Preconditions.checkState(StringUtils.isNotEmpty(requestID));
-      Preconditions
-          .checkState(streamBufferSize > 0 && streamBufferSize > chunkSize);
       return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
-          ksmClient, chunkSize, requestID, factor, type, streamBufferSize);
+          ksmClient, chunkSize, requestID, factor, type);
     }
   }
@@ -423,12 +385,11 @@ public class ChunkGroupOutputStream extends OutputStream {
     private final long length;
     // the current position of this stream 0 <= currentPosition < length
     private long currentPosition;
-    private long streamBufferSize; // Max block size.
     ChunkOutputStreamEntry(String containerKey, String key,
         XceiverClientManager xceiverClientManager,
         XceiverClientSpi xceiverClient, String requestId, int chunkSize,
-        long length, long streamBufferSize) {
+        long length) {
       this.outputStream = null;
       this.containerKey = containerKey;
       this.key = key;
@@ -439,7 +400,6 @@ public class ChunkGroupOutputStream extends OutputStream {
       this.length = length;
       this.currentPosition = 0;
-      this.streamBufferSize = streamBufferSize;
     }
     /**
@@ -458,8 +418,6 @@ public class ChunkGroupOutputStream extends OutputStream {
       this.length = length;
       this.currentPosition = 0;
-      this.streamBufferSize =
-          OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT * OzoneConsts.MB;
     }
     long getLength() {
@@ -474,7 +432,7 @@ public class ChunkGroupOutputStream extends OutputStream {
       if (this.outputStream == null) {
        this.outputStream = new ChunkOutputStream(containerKey,
            key, xceiverClientManager, xceiverClient,
-           requestId, chunkSize, Ints.checkedCast(streamBufferSize));
+           requestId, chunkSize);
      }
    }
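The hunks above drop the stream-buffer plumbing but leave the basic shape of ChunkGroupOutputStream alone: it still keeps a list of per-block entries and dispatches each write to the current entry by offset, with each entry now handing only the chunk size down to its ChunkOutputStream. The following is a self-contained, illustrative sketch of that dispatch idea only; GroupedOutputSketch and Entry are made-up names, not Ozone classes.

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: "list of entries, write based on offset". */
class GroupedOutputSketch extends OutputStream {

  /** Stand-in for a per-block entry: a sink with a fixed capacity. */
  static class Entry {
    final long length;         // bytes this block may hold
    long written;              // bytes written so far
    final OutputStream target; // stand-in for the per-block chunk stream

    Entry(long length, OutputStream target) {
      this.length = length;
      this.target = target;
    }

    long remaining() {
      return length - written;
    }
  }

  private final List<Entry> entries = new ArrayList<>();
  private int currentIndex = 0;

  void addEntry(long length, OutputStream target) {
    entries.add(new Entry(length, target));
  }

  @Override
  public void write(int b) throws IOException {
    write(new byte[] {(byte) b}, 0, 1);
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    while (len > 0) {
      if (currentIndex >= entries.size()) {
        throw new IOException("No preallocated block left for " + len + " bytes");
      }
      Entry entry = entries.get(currentIndex);
      // Write as much as the current block can still take, then advance.
      int writeLen = (int) Math.min(len, entry.remaining());
      entry.target.write(b, off, writeLen);
      entry.written += writeLen;
      off += writeLen;
      len -= writeLen;
      if (entry.remaining() == 0) {
        currentIndex++;
      }
    }
  }
}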

File: RpcClient.java

@@ -74,11 +74,6 @@ import java.util.List;
 import java.util.UUID;
 import java.util.stream.Collectors;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT;
 /**
  * Ozone RPC Client Implementation, it connects to KSM, SCM and DataNode
  * to execute client calls. This uses RPC protocol for communication
@@ -99,7 +94,6 @@ public class RpcClient implements ClientProtocol {
   private final UserGroupInformation ugi;
   private final OzoneAcl.OzoneACLRights userRights;
   private final OzoneAcl.OzoneACLRights groupRights;
-  private final long streamBufferSize;
   /**
    * Creates RpcClient instance with the given configuration.
@@ -154,9 +148,6 @@
     } else {
       chunkSize = configuredChunkSize;
     }
-    // streamBufferSize by default is set equal to default scm block size.
-    streamBufferSize = conf.getLong(OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB,
-        OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT) * OzoneConsts.MB;
   }
   @Override
@@ -472,7 +463,6 @@
         .setRequestID(requestId)
         .setType(OzoneProtos.ReplicationType.valueOf(type.toString()))
         .setFactor(OzoneProtos.ReplicationFactor.valueOf(factor.getValue()))
-        .setStreamBufferSize(streamBufferSize)
         .build();
     groupOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),

File: ChunkOutputStream.java

@@ -18,32 +18,22 @@
 package org.apache.hadoop.scm.storage;
-import com.google.common.base.Preconditions;
+import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey;
+import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.UUID;
 import com.google.protobuf.ByteString;
 import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
 import org.apache.hadoop.scm.XceiverClientManager;
 import org.apache.hadoop.scm.XceiverClientSpi;
-import org.apache.hadoop.util.Time;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
-    .Result.SUCCESS;
-import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey;
-import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk;
 /**
  * An {@link OutputStream} used by the REST service in combination with the
@@ -67,12 +57,12 @@ public class ChunkOutputStream extends OutputStream {
   private final String key;
   private final String traceID;
   private final KeyData.Builder containerKeyData;
-  private final String streamId;
   private XceiverClientManager xceiverClientManager;
   private XceiverClientSpi xceiverClient;
   private ByteBuffer buffer;
+  private final String streamId;
+  private int chunkIndex;
   private int chunkSize;
-  private int streamBufferSize;
   /**
    * Creates a new ChunkOutputStream.
@@ -83,18 +73,14 @@
    * @param xceiverClient client to perform container calls
    * @param traceID container protocol call args
    * @param chunkSize chunk size
-   * @param maxBufferSize -- Controls the maximum amount of memory that we need
-   * to allocate data buffering.
    */
   public ChunkOutputStream(String containerKey, String key,
       XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
-      String traceID, int chunkSize, int maxBufferSize) {
+      String traceID, int chunkSize) {
     this.containerKey = containerKey;
     this.key = key;
     this.traceID = traceID;
     this.chunkSize = chunkSize;
-    this.streamBufferSize = maxBufferSize;
     KeyValue keyValue = KeyValue.newBuilder()
         .setKey("TYPE").setValue("KEY").build();
     this.containerKeyData = KeyData.newBuilder()
@@ -103,24 +89,22 @@
         .addMetadata(keyValue);
     this.xceiverClientManager = xceiverClientManager;
     this.xceiverClient = xceiverClient;
-    this.buffer = ByteBuffer.allocate(maxBufferSize);
+    this.buffer = ByteBuffer.allocate(chunkSize);
     this.streamId = UUID.randomUUID().toString();
+    this.chunkIndex = 0;
   }
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public synchronized void write(int b) throws IOException {
     checkOpen();
-    byte[] c = new byte[1];
-    c[0] = (byte) b;
-    write(c, 0, 1);
+    int rollbackPosition = buffer.position();
+    int rollbackLimit = buffer.limit();
+    buffer.put((byte)b);
+    if (buffer.position() == chunkSize) {
+      flushBufferToChunk(rollbackPosition, rollbackLimit);
+    }
   }
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public void write(byte[] b, int off, int len) throws IOException {
     if (b == null) {
@@ -134,111 +118,26 @@
       return;
     }
     checkOpen();
+    while (len > 0) {
+      int writeLen = Math.min(chunkSize - buffer.position(), len);
       int rollbackPosition = buffer.position();
       int rollbackLimit = buffer.limit();
-    try {
-      List<ImmutablePair<CompletableFuture<ContainerProtos
-          .ContainerCommandResponseProto>, ChunkInfo>>
-          writeFutures = writeInParallel(b, off, len);
-      // This is a rendezvous point for this function call, all chunk I/O
-      // for this block must complete before we can declare this call as
-      // complete.
-      // Wait until all the futures complete or throws an exception if any of
-      // the calls ended with an exception this call will throw.
-      // if futures is null, it means that we wrote the data to the buffer and
-      // returned.
-      if (writeFutures != null) {
-        CompletableFuture.allOf(writeFutures.toArray(new
-            CompletableFuture[writeFutures.size()])).join();
-        // Wrote this data, we will clear this buffer now.
-        buffer.clear();
+      buffer.put(b, off, writeLen);
+      if (buffer.position() == chunkSize) {
+        flushBufferToChunk(rollbackPosition, rollbackLimit);
       }
-    } catch (InterruptedException | ExecutionException e) {
-      buffer.position(rollbackPosition);
-      buffer.limit(rollbackLimit);
-      throw new IOException("Unexpected error in write. ", e);
+      off += writeLen;
+      len -= writeLen;
     }
   }
-  /**
-   * Write a given block into many small chunks in parallel.
-   *
-   * @param b
-   * @param off
-   * @param len
-   * @throws IOException
-   * @throws ExecutionException
-   * @throws InterruptedException
-   */
-  public List<ImmutablePair<CompletableFuture<ContainerProtos
-      .ContainerCommandResponseProto>, ChunkInfo>>
-      writeInParallel(byte[] b, int off, int len)
-      throws IOException, ExecutionException, InterruptedException {
-    Preconditions.checkArgument(len <= streamBufferSize,
-        "A chunk write cannot be " + "larger than max buffer size limit.");
-    long newBlockCount = len / chunkSize;
-    buffer.put(b, off, len);
-    List<ImmutablePair<CompletableFuture<ContainerProtos
-        .ContainerCommandResponseProto>, ChunkInfo>>
-        writeFutures = new LinkedList<>();
-    // We if must have at least a chunkSize of data ready to write, if so we
-    // will go ahead and start writing that data.
-    if (buffer.position() >= chunkSize) {
-      // Allocate new byte slices which will point to each chunk of data
-      // that we want to write. Divide the byte buffer into individual chunks
-      // each of length equals to chunkSize max where each chunk will be
-      // assigned a chunkId where, for each chunk the async write requests will
-      // be made and wait for all of them to return before the write call
-      // returns.
-      for (int chunkId = 0; chunkId < newBlockCount; chunkId++) {
-        // Please note : We are not flipping the slice when we write since
-        // the slices are pointing the buffer start and end as needed for
-        // the chunk write. Also please note, Duplicate does not create a
-        // copy of data, it only creates metadata that points to the data
-        // stream.
-        ByteBuffer chunk = buffer.duplicate();
-        Preconditions.checkState((chunkId * chunkSize) < buffer.limit(),
-            "Chunk offset cannot be beyond the limits of the buffer.");
-        chunk.position(chunkId * chunkSize);
-        // Min handles the case where the last block might be lesser than
-        // chunk Size.
-        chunk.limit(chunk.position() +
-            Math.min(chunkSize, chunk.remaining() - (chunkId * chunkSize)));
-        // Schedule all the writes, this is a non-block call which returns
-        // futures. We collect these futures and wait for all of them to
-        // complete in the next line.
-        writeFutures.add(writeChunkToContainer(chunk, 0, chunkSize));
-      }
-      return writeFutures;
-    }
-    // Nothing to do , return null.
-    return null;
-  }
   @Override
   public synchronized void flush() throws IOException {
     checkOpen();
     if (buffer.position() > 0) {
       int rollbackPosition = buffer.position();
       int rollbackLimit = buffer.limit();
-      ByteBuffer chunk = buffer.duplicate();
-      try {
-        ImmutablePair<CompletableFuture<ContainerProtos
-            .ContainerCommandResponseProto>, ChunkInfo>
-            result = writeChunkToContainer(chunk, 0, chunkSize);
-        updateChunkInfo(result);
-        buffer.clear();
-      } catch (ExecutionException | InterruptedException e) {
-        buffer.position(rollbackPosition);
-        buffer.limit(rollbackLimit);
-        throw new IOException("Failure in flush", e);
-      }
+      flushBufferToChunk(rollbackPosition, rollbackLimit);
     }
   }
@@ -248,20 +147,10 @@
         buffer != null) {
       try {
         if (buffer.position() > 0) {
-          // This flip is needed since this is the real buffer to which we
-          // are writing and position will have moved each time we did a put.
-          buffer.flip();
-          // Call get immediately to make this call Synchronous.
-          ImmutablePair<CompletableFuture<ContainerProtos
-              .ContainerCommandResponseProto>, ChunkInfo>
-              result = writeChunkToContainer(buffer, 0, buffer.limit());
-          updateChunkInfo(result);
-          buffer.clear();
+          writeChunkToContainer();
         }
         putKey(xceiverClient, containerKeyData.build(), traceID);
-      } catch (IOException | InterruptedException | ExecutionException e) {
+      } catch (IOException e) {
         throw new IOException(
             "Unexpected Storage Container Exception: " + e.toString(), e);
       } finally {
@@ -274,24 +163,6 @@
   }
-  private void updateChunkInfo(
-      ImmutablePair<
-          CompletableFuture<ContainerProtos.ContainerCommandResponseProto>,
-          ChunkInfo
-          > result) throws InterruptedException, ExecutionException {
-    // Wait for this call to complete.
-    ContainerProtos.ContainerCommandResponseProto response =
-        result.getLeft().get();
-    // If the write call to the chunk is successful, we need to add that
-    // chunk information to the containerKeyData.
-    // TODO: Clean up the garbage in case of failure.
-    if(response.getResult() == SUCCESS) {
-      ChunkInfo chunk = result.getRight();
-      containerKeyData.addChunks(chunk);
-    }
-  }
   /**
    * Checks if the stream is open. If not, throws an exception.
    *
@@ -303,36 +174,54 @@
     }
   }
+  /**
+   * Attempts to flush buffered writes by writing a new chunk to the container.
+   * If successful, then clears the buffer to prepare to receive writes for a
+   * new chunk.
+   *
+   * @param rollbackPosition position to restore in buffer if write fails
+   * @param rollbackLimit limit to restore in buffer if write fails
+   * @throws IOException if there is an I/O error while performing the call
+   */
+  private synchronized void flushBufferToChunk(int rollbackPosition,
+      int rollbackLimit) throws IOException {
+    boolean success = false;
+    try {
+      writeChunkToContainer();
+      success = true;
+    } finally {
+      if (success) {
+        buffer.clear();
+      } else {
+        buffer.position(rollbackPosition);
+        buffer.limit(rollbackLimit);
+      }
+    }
+  }
   /**
    * Writes buffered data as a new chunk to the container and saves chunk
    * information to be used later in putKey call.
    *
-   * @param data -- Data to write.
-   * @param offset - offset to the data buffer
-   * @param len - Length in bytes
-   * @return Returns a Immutable pair -- A future object that will contian
-   *         the result of the operation, and the chunkInfo that we wrote.
-   *
-   * @throws IOException
-   * @throws ExecutionException
-   * @throws InterruptedException
+   * @throws IOException if there is an I/O error while performing the call
    */
-  private ImmutablePair<
-      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>,
-      ChunkInfo>
-      writeChunkToContainer(ByteBuffer data, int offset, int len)
-      throws IOException, ExecutionException, InterruptedException {
-    ByteString dataString = ByteString.copyFrom(data);
-    ChunkInfo chunk = ChunkInfo.newBuilder().setChunkName(
+  private synchronized void writeChunkToContainer() throws IOException {
+    buffer.flip();
+    ByteString data = ByteString.copyFrom(buffer);
+    ChunkInfo chunk = ChunkInfo
+        .newBuilder()
+        .setChunkName(
         DigestUtils.md5Hex(key) + "_stream_"
-        + streamId + "_chunk_" + Time.monotonicNowNanos())
+            + streamId + "_chunk_" + ++chunkIndex)
         .setOffset(0)
-        .setLen(len)
+        .setLen(data.size())
         .build();
-    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> response =
-        writeChunk(xceiverClient, chunk, key, dataString, traceID);
-    return new ImmutablePair(response, chunk);
+    try {
+      writeChunk(xceiverClient, chunk, key, data, traceID);
+    } catch (IOException e) {
+      throw new IOException(
+          "Unexpected Storage Container Exception: " + e.toString(), e);
+    }
+    containerKeyData.addChunks(chunk);
   }
 }
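Taken together, the restored ChunkOutputStream is a plain buffered writer: bytes accumulate in a ByteBuffer sized to one chunk, the buffer is flushed synchronously to the container whenever it fills, and a failed flush rolls the position and limit back so buffered data is not lost. The following is a self-contained sketch of that pattern only, with a hypothetical ChunkSink callback standing in for the real container call; it is not Ozone code.

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

/** Illustrative only: the buffer-then-flush pattern restored by this revert. */
public class BufferedChunkWriter extends OutputStream {

  /** Stand-in for the synchronous container write; not an Ozone API. */
  public interface ChunkSink {
    void writeChunk(ByteBuffer chunk) throws IOException;
  }

  private final ByteBuffer buffer;
  private final int chunkSize;
  private final ChunkSink sink;

  public BufferedChunkWriter(int chunkSize, ChunkSink sink) {
    this.chunkSize = chunkSize;
    this.sink = sink;
    this.buffer = ByteBuffer.allocate(chunkSize);
  }

  @Override
  public synchronized void write(int b) throws IOException {
    int rollbackPosition = buffer.position();
    int rollbackLimit = buffer.limit();
    buffer.put((byte) b);
    if (buffer.position() == chunkSize) {
      flushBuffer(rollbackPosition, rollbackLimit);
    }
  }

  @Override
  public synchronized void write(byte[] b, int off, int len) throws IOException {
    while (len > 0) {
      // Copy at most one chunk's worth of bytes, then flush if the buffer filled.
      int writeLen = Math.min(chunkSize - buffer.position(), len);
      int rollbackPosition = buffer.position();
      int rollbackLimit = buffer.limit();
      buffer.put(b, off, writeLen);
      if (buffer.position() == chunkSize) {
        flushBuffer(rollbackPosition, rollbackLimit);
      }
      off += writeLen;
      len -= writeLen;
    }
  }

  @Override
  public synchronized void flush() throws IOException {
    if (buffer.position() > 0) {
      flushBuffer(buffer.position(), buffer.limit());
    }
  }

  private void flushBuffer(int rollbackPosition, int rollbackLimit)
      throws IOException {
    boolean success = false;
    try {
      buffer.flip();                       // expose [0, position) for reading
      sink.writeChunk(buffer.duplicate()); // one synchronous chunk write
      success = true;
    } finally {
      if (success) {
        buffer.clear();                    // ready for the next chunk
      } else {
        buffer.position(rollbackPosition); // undo the partial put on failure
        buffer.limit(rollbackLimit);
      }
    }
  }
}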

File: ContainerProtocolCalls.java

@@ -53,9 +53,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
 import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.scm.XceiverClientSpi;
 /**
@@ -165,10 +162,9 @@
    * @param traceID container protocol call args
    * @throws IOException if there is an I/O error while performing the call
    */
-  public static CompletableFuture<ContainerCommandResponseProto> writeChunk(
-      XceiverClientSpi xceiverClient, ChunkInfo chunk, String key,
-      ByteString data, String traceID)
-      throws IOException, ExecutionException, InterruptedException {
+  public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk,
+      String key, ByteString data, String traceID)
+      throws IOException {
     WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
         .newBuilder()
         .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
@@ -183,7 +179,8 @@
         .setDatanodeID(id)
         .setWriteChunk(writeChunkRequest)
         .build();
-    return xceiverClient.sendCommandAsync(request);
+    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
+    validateContainerResponse(response);
   }
   /**

File: DistributedStorageHandler.java

@@ -67,11 +67,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT;
 /**
  * A {@link StorageHandler} implementation that distributes object storage
  * across the nodes of an HDFS cluster.
@@ -91,7 +86,6 @@ public final class DistributedStorageHandler implements StorageHandler {
   private final boolean useRatis;
   private final OzoneProtos.ReplicationType type;
   private final OzoneProtos.ReplicationFactor factor;
-  private final long streamBufferSize;
   /**
    * Creates a new DistributedStorageHandler.
@@ -133,9 +127,6 @@
           chunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE);
       chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
     }
-    // streamBufferSize by default is set to default scm block size.
-    streamBufferSize = conf.getLong(OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB,
-        OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT) * OzoneConsts.MB;
   }
   @Override
@@ -427,7 +418,6 @@
         .setRequestID(args.getRequestID())
         .setType(xceiverClientManager.getType())
         .setFactor(xceiverClientManager.getFactor())
-        .setStreamBufferSize(streamBufferSize)
         .build();
     groupOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),

File: ozone-default.xml

@@ -690,15 +690,6 @@
       Ozone block size.
     </description>
   </property>
-  <property>
-    <name>ozone.output.stream.buffer.size.in.mb</name>
-    <value>256</value>
-    <tag>OZONE</tag>
-    <description>
-      The maximum size of the buffer allocated for the ozone output stream for
-      write. Default size is equals to scm block size.
-    </description>
-  </property>
   <property>
     <name>ozone.scm.chunk.size</name>
     <value>16777216</value>

File: TestKeySpaceManager.java

@@ -1114,7 +1114,7 @@ public class TestKeySpaceManager {
         .getMetadataManager().getExpiredOpenKeys();
     Assert.assertEquals(0, openKeys.size());
-    //Thread.sleep(2000);
+    Thread.sleep(2000);
     openKeys = cluster.getKeySpaceManager().getMetadataManager()
         .getExpiredOpenKeys();