HDFS-12794. Ozone: Parallelize ChunkOutputSream Writes to container. Contributed by Shashikant Banerjee.
This commit is contained in:
parent
d3e0b77259
commit
6ce5ec6761
|
@ -107,6 +107,9 @@ public final class OzoneConfigKeys {
|
|||
"ozone.scm.block.size.in.mb";
|
||||
public static final long OZONE_SCM_BLOCK_SIZE_DEFAULT = 256;
|
||||
|
||||
public static final String OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB =
|
||||
"ozone.output.stream.buffer.size.in.mb";
|
||||
public static final long OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT = 256;
|
||||
/**
|
||||
* Ozone administrator users delimited by comma.
|
||||
* If not set, only the user who launches an ozone service will be the
|
||||
|
|
|
@ -19,7 +19,10 @@ package org.apache.hadoop.ozone.client.io;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.primitives.Ints;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
|
||||
|
@ -46,6 +49,9 @@ import java.io.OutputStream;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
||||
.OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT;
|
||||
|
||||
/**
|
||||
* Maintaining a list of ChunkInputStream. Write based on offset.
|
||||
*
|
||||
|
@ -72,6 +78,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|||
private final XceiverClientManager xceiverClientManager;
|
||||
private final int chunkSize;
|
||||
private final String requestID;
|
||||
private final long streamBufferSize;
|
||||
|
||||
/**
|
||||
* A constructor for testing purpose only.
|
||||
|
@ -86,6 +93,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|||
xceiverClientManager = null;
|
||||
chunkSize = 0;
|
||||
requestID = null;
|
||||
streamBufferSize = OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT * OzoneConsts.MB;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -105,12 +113,26 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|||
return streamEntries;
|
||||
}
|
||||
|
||||
public ChunkGroupOutputStream(
|
||||
/**
|
||||
* Chunkoutput stream, making this package visible since this can be
|
||||
* created only via builder.
|
||||
* @param handler - Open Key state.
|
||||
* @param xceiverClientManager - Communication Manager.
|
||||
* @param scmClient - SCM protocol Client.
|
||||
* @param ksmClient - KSM Protocol client
|
||||
* @param chunkSize - Chunk Size - I/O
|
||||
* @param requestId - Seed for trace ID generation.
|
||||
* @param factor - Replication factor
|
||||
* @param type - Replication Type - RATIS/Standalone etc.
|
||||
* @param maxBufferSize - Maximum stream buffer Size.
|
||||
* @throws IOException - Throws this exception if there is an error.
|
||||
*/
|
||||
ChunkGroupOutputStream(
|
||||
OpenKeySession handler, XceiverClientManager xceiverClientManager,
|
||||
StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
|
||||
KeySpaceManagerProtocolClientSideTranslatorPB ksmClient,
|
||||
int chunkSize, String requestId, ReplicationFactor factor,
|
||||
ReplicationType type) throws IOException {
|
||||
ReplicationType type, long maxBufferSize) throws IOException {
|
||||
this.streamEntries = new ArrayList<>();
|
||||
this.currentStreamIndex = 0;
|
||||
this.byteOffset = 0;
|
||||
|
@ -130,6 +152,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|||
this.requestID = requestId;
|
||||
LOG.debug("Expecting open key with one block, but got" +
|
||||
info.getKeyLocationVersions().size());
|
||||
this.streamBufferSize = maxBufferSize;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -184,7 +207,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|||
}
|
||||
streamEntries.add(new ChunkOutputStreamEntry(containerKey,
|
||||
keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
|
||||
chunkSize, subKeyInfo.getLength()));
|
||||
chunkSize, subKeyInfo.getLength(), this.streamBufferSize));
|
||||
}
|
||||
|
||||
|
||||
|
@ -324,6 +347,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|||
private String requestID;
|
||||
private ReplicationType type;
|
||||
private ReplicationFactor factor;
|
||||
private long streamBufferSize;
|
||||
|
||||
public Builder setHandler(OpenKeySession handler) {
|
||||
this.openHandler = handler;
|
||||
|
@ -367,9 +391,23 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setStreamBufferSize(long blockSize) {
|
||||
this.streamBufferSize = blockSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ChunkGroupOutputStream build() throws IOException {
|
||||
Preconditions.checkNotNull(openHandler);
|
||||
Preconditions.checkNotNull(xceiverManager);
|
||||
Preconditions.checkNotNull(scmClient);
|
||||
Preconditions.checkNotNull(ksmClient);
|
||||
Preconditions.checkState(chunkSize > 0);
|
||||
Preconditions.checkState(StringUtils.isNotEmpty(requestID));
|
||||
Preconditions
|
||||
.checkState(streamBufferSize > 0 && streamBufferSize > chunkSize);
|
||||
|
||||
return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
|
||||
ksmClient, chunkSize, requestID, factor, type);
|
||||
ksmClient, chunkSize, requestID, factor, type, streamBufferSize);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -385,11 +423,12 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|||
private final long length;
|
||||
// the current position of this stream 0 <= currentPosition < length
|
||||
private long currentPosition;
|
||||
private long streamBufferSize; // Max block size.
|
||||
|
||||
ChunkOutputStreamEntry(String containerKey, String key,
|
||||
XceiverClientManager xceiverClientManager,
|
||||
XceiverClientSpi xceiverClient, String requestId, int chunkSize,
|
||||
long length) {
|
||||
long length, long streamBufferSize) {
|
||||
this.outputStream = null;
|
||||
this.containerKey = containerKey;
|
||||
this.key = key;
|
||||
|
@ -400,6 +439,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|||
|
||||
this.length = length;
|
||||
this.currentPosition = 0;
|
||||
this.streamBufferSize = streamBufferSize;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -418,6 +458,8 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|||
|
||||
this.length = length;
|
||||
this.currentPosition = 0;
|
||||
this.streamBufferSize =
|
||||
OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT * OzoneConsts.MB;
|
||||
}
|
||||
|
||||
long getLength() {
|
||||
|
@ -432,7 +474,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|||
if (this.outputStream == null) {
|
||||
this.outputStream = new ChunkOutputStream(containerKey,
|
||||
key, xceiverClientManager, xceiverClient,
|
||||
requestId, chunkSize);
|
||||
requestId, chunkSize, Ints.checkedCast(streamBufferSize));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -74,6 +74,11 @@ import java.util.List;
|
|||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
||||
.OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
||||
.OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT;
|
||||
|
||||
/**
|
||||
* Ozone RPC Client Implementation, it connects to KSM, SCM and DataNode
|
||||
* to execute client calls. This uses RPC protocol for communication
|
||||
|
@ -94,6 +99,7 @@ public class RpcClient implements ClientProtocol {
|
|||
private final UserGroupInformation ugi;
|
||||
private final OzoneAcl.OzoneACLRights userRights;
|
||||
private final OzoneAcl.OzoneACLRights groupRights;
|
||||
private final long streamBufferSize;
|
||||
|
||||
/**
|
||||
* Creates RpcClient instance with the given configuration.
|
||||
|
@ -148,6 +154,9 @@ public class RpcClient implements ClientProtocol {
|
|||
} else {
|
||||
chunkSize = configuredChunkSize;
|
||||
}
|
||||
// streamBufferSize by default is set equal to default scm block size.
|
||||
streamBufferSize = conf.getLong(OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB,
|
||||
OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT) * OzoneConsts.MB;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -463,6 +472,7 @@ public class RpcClient implements ClientProtocol {
|
|||
.setRequestID(requestId)
|
||||
.setType(OzoneProtos.ReplicationType.valueOf(type.toString()))
|
||||
.setFactor(OzoneProtos.ReplicationFactor.valueOf(factor.getValue()))
|
||||
.setStreamBufferSize(streamBufferSize)
|
||||
.build();
|
||||
groupOutputStream.addPreallocateBlocks(
|
||||
openKey.getKeyInfo().getLatestVersionLocations(),
|
||||
|
|
|
@ -18,22 +18,32 @@
|
|||
|
||||
package org.apache.hadoop.scm.storage;
|
||||
|
||||
import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey;
|
||||
import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.UUID;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.protobuf.ByteString;
|
||||
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
|
||||
import org.apache.hadoop.scm.XceiverClientManager;
|
||||
import org.apache.hadoop.scm.XceiverClientSpi;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
||||
.Result.SUCCESS;
|
||||
import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey;
|
||||
import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk;
|
||||
|
||||
|
||||
/**
|
||||
* An {@link OutputStream} used by the REST service in combination with the
|
||||
|
@ -57,12 +67,12 @@ public class ChunkOutputStream extends OutputStream {
|
|||
private final String key;
|
||||
private final String traceID;
|
||||
private final KeyData.Builder containerKeyData;
|
||||
private final String streamId;
|
||||
private XceiverClientManager xceiverClientManager;
|
||||
private XceiverClientSpi xceiverClient;
|
||||
private ByteBuffer buffer;
|
||||
private final String streamId;
|
||||
private int chunkIndex;
|
||||
private int chunkSize;
|
||||
private int streamBufferSize;
|
||||
|
||||
/**
|
||||
* Creates a new ChunkOutputStream.
|
||||
|
@ -73,14 +83,18 @@ public class ChunkOutputStream extends OutputStream {
|
|||
* @param xceiverClient client to perform container calls
|
||||
* @param traceID container protocol call args
|
||||
* @param chunkSize chunk size
|
||||
* @param maxBufferSize -- Controls the maximum amount of memory that we need
|
||||
* to allocate data buffering.
|
||||
*/
|
||||
public ChunkOutputStream(String containerKey, String key,
|
||||
XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
|
||||
String traceID, int chunkSize) {
|
||||
String traceID, int chunkSize, int maxBufferSize) {
|
||||
this.containerKey = containerKey;
|
||||
this.key = key;
|
||||
this.traceID = traceID;
|
||||
this.chunkSize = chunkSize;
|
||||
this.streamBufferSize = maxBufferSize;
|
||||
|
||||
KeyValue keyValue = KeyValue.newBuilder()
|
||||
.setKey("TYPE").setValue("KEY").build();
|
||||
this.containerKeyData = KeyData.newBuilder()
|
||||
|
@ -89,22 +103,24 @@ public class ChunkOutputStream extends OutputStream {
|
|||
.addMetadata(keyValue);
|
||||
this.xceiverClientManager = xceiverClientManager;
|
||||
this.xceiverClient = xceiverClient;
|
||||
this.buffer = ByteBuffer.allocate(chunkSize);
|
||||
this.buffer = ByteBuffer.allocate(maxBufferSize);
|
||||
this.streamId = UUID.randomUUID().toString();
|
||||
this.chunkIndex = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public synchronized void write(int b) throws IOException {
|
||||
checkOpen();
|
||||
int rollbackPosition = buffer.position();
|
||||
int rollbackLimit = buffer.limit();
|
||||
buffer.put((byte)b);
|
||||
if (buffer.position() == chunkSize) {
|
||||
flushBufferToChunk(rollbackPosition, rollbackLimit);
|
||||
}
|
||||
byte[] c = new byte[1];
|
||||
c[0] = (byte) b;
|
||||
write(c, 0, 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void write(byte[] b, int off, int len) throws IOException {
|
||||
if (b == null) {
|
||||
|
@ -118,26 +134,111 @@ public class ChunkOutputStream extends OutputStream {
|
|||
return;
|
||||
}
|
||||
checkOpen();
|
||||
while (len > 0) {
|
||||
int writeLen = Math.min(chunkSize - buffer.position(), len);
|
||||
int rollbackPosition = buffer.position();
|
||||
int rollbackLimit = buffer.limit();
|
||||
buffer.put(b, off, writeLen);
|
||||
if (buffer.position() == chunkSize) {
|
||||
flushBufferToChunk(rollbackPosition, rollbackLimit);
|
||||
int rollbackPosition = buffer.position();
|
||||
int rollbackLimit = buffer.limit();
|
||||
try {
|
||||
List<ImmutablePair<CompletableFuture<ContainerProtos
|
||||
.ContainerCommandResponseProto>, ChunkInfo>>
|
||||
writeFutures = writeInParallel(b, off, len);
|
||||
// This is a rendezvous point for this function call, all chunk I/O
|
||||
// for this block must complete before we can declare this call as
|
||||
// complete.
|
||||
|
||||
// Wait until all the futures complete or throws an exception if any of
|
||||
// the calls ended with an exception this call will throw.
|
||||
// if futures is null, it means that we wrote the data to the buffer and
|
||||
// returned.
|
||||
if (writeFutures != null) {
|
||||
CompletableFuture.allOf(writeFutures.toArray(new
|
||||
CompletableFuture[writeFutures.size()])).join();
|
||||
|
||||
// Wrote this data, we will clear this buffer now.
|
||||
buffer.clear();
|
||||
}
|
||||
off += writeLen;
|
||||
len -= writeLen;
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
buffer.position(rollbackPosition);
|
||||
buffer.limit(rollbackLimit);
|
||||
throw new IOException("Unexpected error in write. ", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a given block into many small chunks in parallel.
|
||||
*
|
||||
* @param b
|
||||
* @param off
|
||||
* @param len
|
||||
* @throws IOException
|
||||
* @throws ExecutionException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public List<ImmutablePair<CompletableFuture<ContainerProtos
|
||||
.ContainerCommandResponseProto>, ChunkInfo>>
|
||||
writeInParallel(byte[] b, int off, int len)
|
||||
throws IOException, ExecutionException, InterruptedException {
|
||||
|
||||
Preconditions.checkArgument(len <= streamBufferSize,
|
||||
"A chunk write cannot be " + "larger than max buffer size limit.");
|
||||
long newBlockCount = len / chunkSize;
|
||||
buffer.put(b, off, len);
|
||||
List<ImmutablePair<CompletableFuture<ContainerProtos
|
||||
.ContainerCommandResponseProto>, ChunkInfo>>
|
||||
writeFutures = new LinkedList<>();
|
||||
|
||||
// We if must have at least a chunkSize of data ready to write, if so we
|
||||
// will go ahead and start writing that data.
|
||||
if (buffer.position() >= chunkSize) {
|
||||
// Allocate new byte slices which will point to each chunk of data
|
||||
// that we want to write. Divide the byte buffer into individual chunks
|
||||
// each of length equals to chunkSize max where each chunk will be
|
||||
// assigned a chunkId where, for each chunk the async write requests will
|
||||
// be made and wait for all of them to return before the write call
|
||||
// returns.
|
||||
for (int chunkId = 0; chunkId < newBlockCount; chunkId++) {
|
||||
// Please note : We are not flipping the slice when we write since
|
||||
// the slices are pointing the buffer start and end as needed for
|
||||
// the chunk write. Also please note, Duplicate does not create a
|
||||
// copy of data, it only creates metadata that points to the data
|
||||
// stream.
|
||||
ByteBuffer chunk = buffer.duplicate();
|
||||
Preconditions.checkState((chunkId * chunkSize) < buffer.limit(),
|
||||
"Chunk offset cannot be beyond the limits of the buffer.");
|
||||
chunk.position(chunkId * chunkSize);
|
||||
// Min handles the case where the last block might be lesser than
|
||||
// chunk Size.
|
||||
chunk.limit(chunk.position() +
|
||||
Math.min(chunkSize, chunk.remaining() - (chunkId * chunkSize)));
|
||||
|
||||
// Schedule all the writes, this is a non-block call which returns
|
||||
// futures. We collect these futures and wait for all of them to
|
||||
// complete in the next line.
|
||||
writeFutures.add(writeChunkToContainer(chunk, 0, chunkSize));
|
||||
}
|
||||
return writeFutures;
|
||||
}
|
||||
// Nothing to do , return null.
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void flush() throws IOException {
|
||||
checkOpen();
|
||||
if (buffer.position() > 0) {
|
||||
int rollbackPosition = buffer.position();
|
||||
int rollbackLimit = buffer.limit();
|
||||
flushBufferToChunk(rollbackPosition, rollbackLimit);
|
||||
ByteBuffer chunk = buffer.duplicate();
|
||||
try {
|
||||
|
||||
ImmutablePair<CompletableFuture<ContainerProtos
|
||||
.ContainerCommandResponseProto>, ChunkInfo>
|
||||
result = writeChunkToContainer(chunk, 0, chunkSize);
|
||||
updateChunkInfo(result);
|
||||
buffer.clear();
|
||||
} catch (ExecutionException | InterruptedException e) {
|
||||
buffer.position(rollbackPosition);
|
||||
buffer.limit(rollbackLimit);
|
||||
throw new IOException("Failure in flush", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -147,10 +248,20 @@ public class ChunkOutputStream extends OutputStream {
|
|||
buffer != null) {
|
||||
try {
|
||||
if (buffer.position() > 0) {
|
||||
writeChunkToContainer();
|
||||
// This flip is needed since this is the real buffer to which we
|
||||
// are writing and position will have moved each time we did a put.
|
||||
buffer.flip();
|
||||
|
||||
// Call get immediately to make this call Synchronous.
|
||||
|
||||
ImmutablePair<CompletableFuture<ContainerProtos
|
||||
.ContainerCommandResponseProto>, ChunkInfo>
|
||||
result = writeChunkToContainer(buffer, 0, buffer.limit());
|
||||
updateChunkInfo(result);
|
||||
buffer.clear();
|
||||
}
|
||||
putKey(xceiverClient, containerKeyData.build(), traceID);
|
||||
} catch (IOException e) {
|
||||
} catch (IOException | InterruptedException | ExecutionException e) {
|
||||
throw new IOException(
|
||||
"Unexpected Storage Container Exception: " + e.toString(), e);
|
||||
} finally {
|
||||
|
@ -163,6 +274,24 @@ public class ChunkOutputStream extends OutputStream {
|
|||
|
||||
}
|
||||
|
||||
private void updateChunkInfo(
|
||||
ImmutablePair<
|
||||
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>,
|
||||
ChunkInfo
|
||||
> result) throws InterruptedException, ExecutionException {
|
||||
// Wait for this call to complete.
|
||||
ContainerProtos.ContainerCommandResponseProto response =
|
||||
result.getLeft().get();
|
||||
|
||||
// If the write call to the chunk is successful, we need to add that
|
||||
// chunk information to the containerKeyData.
|
||||
// TODO: Clean up the garbage in case of failure.
|
||||
if(response.getResult() == SUCCESS) {
|
||||
ChunkInfo chunk = result.getRight();
|
||||
containerKeyData.addChunks(chunk);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the stream is open. If not, throws an exception.
|
||||
*
|
||||
|
@ -174,54 +303,36 @@ public class ChunkOutputStream extends OutputStream {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to flush buffered writes by writing a new chunk to the container.
|
||||
* If successful, then clears the buffer to prepare to receive writes for a
|
||||
* new chunk.
|
||||
*
|
||||
* @param rollbackPosition position to restore in buffer if write fails
|
||||
* @param rollbackLimit limit to restore in buffer if write fails
|
||||
* @throws IOException if there is an I/O error while performing the call
|
||||
*/
|
||||
private synchronized void flushBufferToChunk(int rollbackPosition,
|
||||
int rollbackLimit) throws IOException {
|
||||
boolean success = false;
|
||||
try {
|
||||
writeChunkToContainer();
|
||||
success = true;
|
||||
} finally {
|
||||
if (success) {
|
||||
buffer.clear();
|
||||
} else {
|
||||
buffer.position(rollbackPosition);
|
||||
buffer.limit(rollbackLimit);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes buffered data as a new chunk to the container and saves chunk
|
||||
* information to be used later in putKey call.
|
||||
*
|
||||
* @throws IOException if there is an I/O error while performing the call
|
||||
* @param data -- Data to write.
|
||||
* @param offset - offset to the data buffer
|
||||
* @param len - Length in bytes
|
||||
* @return Returns a Immutable pair -- A future object that will contian
|
||||
* the result of the operation, and the chunkInfo that we wrote.
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws ExecutionException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
private synchronized void writeChunkToContainer() throws IOException {
|
||||
buffer.flip();
|
||||
ByteString data = ByteString.copyFrom(buffer);
|
||||
ChunkInfo chunk = ChunkInfo
|
||||
.newBuilder()
|
||||
.setChunkName(
|
||||
private ImmutablePair<
|
||||
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>,
|
||||
ChunkInfo>
|
||||
writeChunkToContainer(ByteBuffer data, int offset, int len)
|
||||
throws IOException, ExecutionException, InterruptedException {
|
||||
|
||||
|
||||
ByteString dataString = ByteString.copyFrom(data);
|
||||
ChunkInfo chunk = ChunkInfo.newBuilder().setChunkName(
|
||||
DigestUtils.md5Hex(key) + "_stream_"
|
||||
+ streamId + "_chunk_" + ++chunkIndex)
|
||||
+ streamId + "_chunk_" + Time.monotonicNowNanos())
|
||||
.setOffset(0)
|
||||
.setLen(data.size())
|
||||
.setLen(len)
|
||||
.build();
|
||||
try {
|
||||
writeChunk(xceiverClient, chunk, key, data, traceID);
|
||||
} catch (IOException e) {
|
||||
throw new IOException(
|
||||
"Unexpected Storage Container Exception: " + e.toString(), e);
|
||||
}
|
||||
containerKeyData.addChunks(chunk);
|
||||
CompletableFuture<ContainerProtos.ContainerCommandResponseProto> response =
|
||||
writeChunk(xceiverClient, chunk, key, dataString, traceID);
|
||||
return new ImmutablePair(response, chunk);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,6 +53,9 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
|
|||
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import org.apache.hadoop.scm.XceiverClientSpi;
|
||||
|
||||
/**
|
||||
|
@ -162,9 +165,10 @@ public final class ContainerProtocolCalls {
|
|||
* @param traceID container protocol call args
|
||||
* @throws IOException if there is an I/O error while performing the call
|
||||
*/
|
||||
public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk,
|
||||
String key, ByteString data, String traceID)
|
||||
throws IOException {
|
||||
public static CompletableFuture<ContainerCommandResponseProto> writeChunk(
|
||||
XceiverClientSpi xceiverClient, ChunkInfo chunk, String key,
|
||||
ByteString data, String traceID)
|
||||
throws IOException, ExecutionException, InterruptedException {
|
||||
WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
|
||||
.newBuilder()
|
||||
.setPipeline(xceiverClient.getPipeline().getProtobufMessage())
|
||||
|
@ -179,8 +183,7 @@ public final class ContainerProtocolCalls {
|
|||
.setDatanodeID(id)
|
||||
.setWriteChunk(writeChunkRequest)
|
||||
.build();
|
||||
ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
|
||||
validateContainerResponse(response);
|
||||
return xceiverClient.sendCommandAsync(request);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -67,6 +67,11 @@ import java.io.IOException;
|
|||
import java.io.OutputStream;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
||||
.OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
||||
.OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT;
|
||||
|
||||
/**
|
||||
* A {@link StorageHandler} implementation that distributes object storage
|
||||
* across the nodes of an HDFS cluster.
|
||||
|
@ -86,6 +91,7 @@ public final class DistributedStorageHandler implements StorageHandler {
|
|||
private final boolean useRatis;
|
||||
private final OzoneProtos.ReplicationType type;
|
||||
private final OzoneProtos.ReplicationFactor factor;
|
||||
private final long streamBufferSize;
|
||||
|
||||
/**
|
||||
* Creates a new DistributedStorageHandler.
|
||||
|
@ -127,6 +133,9 @@ public final class DistributedStorageHandler implements StorageHandler {
|
|||
chunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE);
|
||||
chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
|
||||
}
|
||||
// streamBufferSize by default is set to default scm block size.
|
||||
streamBufferSize = conf.getLong(OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB,
|
||||
OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT) * OzoneConsts.MB;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -418,6 +427,7 @@ public final class DistributedStorageHandler implements StorageHandler {
|
|||
.setRequestID(args.getRequestID())
|
||||
.setType(xceiverClientManager.getType())
|
||||
.setFactor(xceiverClientManager.getFactor())
|
||||
.setStreamBufferSize(streamBufferSize)
|
||||
.build();
|
||||
groupOutputStream.addPreallocateBlocks(
|
||||
openKey.getKeyInfo().getLatestVersionLocations(),
|
||||
|
|
|
@ -690,6 +690,15 @@
|
|||
Ozone block size.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>ozone.output.stream.buffer.size.in.mb</name>
|
||||
<value>256</value>
|
||||
<tag>OZONE</tag>
|
||||
<description>
|
||||
The maximum size of the buffer allocated for the ozone output stream for
|
||||
write. Default size is equals to scm block size.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>ozone.scm.chunk.size</name>
|
||||
<value>16777216</value>
|
||||
|
|
|
@ -1114,7 +1114,7 @@ public class TestKeySpaceManager {
|
|||
.getMetadataManager().getExpiredOpenKeys();
|
||||
Assert.assertEquals(0, openKeys.size());
|
||||
|
||||
Thread.sleep(2000);
|
||||
//Thread.sleep(2000);
|
||||
|
||||
openKeys = cluster.getKeySpaceManager().getMetadataManager()
|
||||
.getExpiredOpenKeys();
|
||||
|
|
Loading…
Reference in New Issue