HDFS-12178. Ozone: OzoneClient: Handling SCM container creationFlag at client side. Contributed by Nandakumar.

This commit is contained in:
Anu Engineer 2017-08-04 10:17:45 -07:00 committed by Owen O'Malley
parent 37a0ab5ca3
commit ccc1607424
1 changed files with 28 additions and 7 deletions

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.web.storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.scm.XceiverClientManager;
@ -34,6 +35,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
/**
* Maintaining a list of ChunkInputStream. Write based on offset.
@ -55,6 +58,10 @@ public class ChunkGroupOutputStream extends OutputStream {
private long totalSize;
private long byteOffset;
//This has to be removed once HDFS-11888 is resolved.
//local cache which will have list of created container names.
private static Set<String> containersCreated = new HashSet<>();
public ChunkGroupOutputStream() {
this.streamEntries = new ArrayList<>();
this.currentStreamIndex = 0;
@ -285,13 +292,27 @@ public class ChunkGroupOutputStream extends OutputStream {
xceiverClientManager.acquireClient(pipeline);
// create container if needed
// TODO : should be subKeyInfo.getShouldCreateContainer(), but for now
// always true.
boolean shouldCreate = true;
if (shouldCreate) {
try {
ContainerProtocolCalls.createContainer(xceiverClient, requestId);
} catch (StorageContainerException sce) {
LOG.warn("Create container failed with {}", containerName, sce);
//The following change has to reverted once HDFS-11888 is fixed.
if(!containersCreated.contains(containerName)) {
synchronized (containerName.intern()) {
//checking again, there is a chance that some other thread has
// created it.
if (!containersCreated.contains(containerName)) {
LOG.debug("Need to create container {}.", containerName);
try {
ContainerProtocolCalls.createContainer(xceiverClient, requestId);
} catch (StorageContainerException ex) {
if (ex.getResult().equals(Result.CONTAINER_EXISTS)) {
//container already exist.
LOG.debug("Container {} already exists.", containerName);
} else {
LOG.error("Container creation failed for {}.",
containerName, ex);
throw ex;
}
}
containersCreated.add(containerName);
}
}
}