HDFS-11846. Ozone: Fix Http connection leaks in ozone clients. Contributed by Weiwei Yang.
This commit is contained in:
parent
84294de9a2
commit
ff5dbeec07
|
@ -20,11 +20,16 @@ package org.apache.hadoop.ozone;
|
|||
import com.google.common.base.Optional;
|
||||
|
||||
import com.google.common.net.HostAndPort;
|
||||
import org.apache.avro.reflect.Nullable;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.scm.ScmConfigKeys;
|
||||
import org.apache.http.client.config.RequestConfig;
|
||||
import org.apache.http.client.methods.HttpRequestBase;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -556,4 +561,52 @@ public final class OzoneClientUtils {
|
|||
.DFS_CONTAINER_IPC_PORT_DEFAULT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases a http connection if the request is not null.
|
||||
* @param request
|
||||
*/
|
||||
public static void releaseConnection(HttpRequestBase request) {
|
||||
if (request != null) {
|
||||
request.releaseConnection();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a default instance of {@link CloseableHttpClient}.
|
||||
*/
|
||||
public static CloseableHttpClient newHttpClient() {
|
||||
return OzoneClientUtils.newHttpClient(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link CloseableHttpClient} configured by given configuration.
|
||||
* If conf is null, returns a default instance.
|
||||
*
|
||||
* @param conf configuration
|
||||
* @return a {@link CloseableHttpClient} instance.
|
||||
*/
|
||||
public static CloseableHttpClient newHttpClient(
|
||||
@Nullable Configuration conf) {
|
||||
int socketTimeout = OzoneConfigKeys
|
||||
.OZONE_CLIENT_SOCKET_TIMEOUT_MS_DEFAULT;
|
||||
int connectionTimeout = OzoneConfigKeys
|
||||
.OZONE_CLIENT_CONNECTION_TIMEOUT_MS_DEFAULT;
|
||||
if (conf != null) {
|
||||
socketTimeout = conf.getInt(
|
||||
OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_MS,
|
||||
OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_MS_DEFAULT);
|
||||
connectionTimeout = conf.getInt(
|
||||
OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_MS,
|
||||
OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_MS_DEFAULT);
|
||||
}
|
||||
|
||||
CloseableHttpClient client = HttpClients.custom()
|
||||
.setDefaultRequestConfig(
|
||||
RequestConfig.custom()
|
||||
.setSocketTimeout(socketTimeout)
|
||||
.setConnectTimeout(connectionTimeout)
|
||||
.build())
|
||||
.build();
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,6 +68,13 @@ public final class OzoneConfigKeys {
|
|||
"ozone.container.task.wait.seconds";
|
||||
public static final long OZONE_CONTAINER_TASK_WAIT_DEFAULT = 5;
|
||||
|
||||
public static final String OZONE_CLIENT_SOCKET_TIMEOUT_MS =
|
||||
"ozone.client.socket.timeout.ms";
|
||||
public static final int OZONE_CLIENT_SOCKET_TIMEOUT_MS_DEFAULT = 5000;
|
||||
public static final String OZONE_CLIENT_CONNECTION_TIMEOUT_MS =
|
||||
"ozone.client.connection.timeout.ms";
|
||||
public static final int OZONE_CLIENT_CONNECTION_TIMEOUT_MS_DEFAULT = 5000;
|
||||
|
||||
public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
|
||||
= ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
|
||||
public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT
|
||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.web.client;
|
|||
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ozone.OzoneClientUtils;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
|
||||
import org.apache.hadoop.ozone.web.headers.Header;
|
||||
|
@ -37,7 +39,6 @@ import org.apache.http.entity.ContentType;
|
|||
import org.apache.http.entity.FileEntity;
|
||||
import org.apache.http.entity.InputStreamEntity;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
|
@ -156,19 +157,16 @@ public class OzoneBucket {
|
|||
throw new OzoneClientException("Invalid data.");
|
||||
}
|
||||
|
||||
try {
|
||||
OzoneClient client = getVolume().getClient();
|
||||
|
||||
CloseableHttpClient httpClient = HttpClients.createDefault();
|
||||
|
||||
HttpPut putRequest = null;
|
||||
InputStream is = null;
|
||||
try (CloseableHttpClient httpClient = OzoneClientUtils.newHttpClient()) {
|
||||
URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI());
|
||||
builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()
|
||||
+ "/" + keyName).build();
|
||||
|
||||
HttpPut putRequest =
|
||||
getVolume().getClient().getHttpPut(builder.toString());
|
||||
putRequest = getVolume().getClient().getHttpPut(builder.toString());
|
||||
|
||||
InputStream is = new ByteArrayInputStream(data.getBytes(ENCODING));
|
||||
is = new ByteArrayInputStream(data.getBytes(ENCODING));
|
||||
putRequest.setEntity(new InputStreamEntity(is, data.length()));
|
||||
is.mark(data.length());
|
||||
try {
|
||||
|
@ -177,9 +175,11 @@ public class OzoneBucket {
|
|||
is.reset();
|
||||
}
|
||||
executePutKey(putRequest, httpClient);
|
||||
|
||||
} catch (IOException | URISyntaxException ex) {
|
||||
throw new OzoneClientException(ex.getMessage());
|
||||
} finally {
|
||||
IOUtils.closeStream(is);
|
||||
OzoneClientUtils.releaseConnection(putRequest);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -216,28 +216,28 @@ public class OzoneBucket {
|
|||
throw new OzoneClientException("Invalid data stream");
|
||||
}
|
||||
|
||||
try {
|
||||
OzoneClient client = getVolume().getClient();
|
||||
|
||||
CloseableHttpClient httpClient = HttpClients.createDefault();
|
||||
HttpPut putRequest = null;
|
||||
FileInputStream fis = null;
|
||||
try (CloseableHttpClient httpClient = OzoneClientUtils.newHttpClient()) {
|
||||
URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI());
|
||||
builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()
|
||||
+ "/" + keyName).build();
|
||||
|
||||
HttpPut putRequest =
|
||||
getVolume().getClient().getHttpPut(builder.toString());
|
||||
putRequest = getVolume().getClient().getHttpPut(builder.toString());
|
||||
|
||||
FileEntity fileEntity = new FileEntity(file, ContentType
|
||||
.APPLICATION_OCTET_STREAM);
|
||||
putRequest.setEntity(fileEntity);
|
||||
|
||||
FileInputStream fis = new FileInputStream(file);
|
||||
fis = new FileInputStream(file);
|
||||
putRequest.setHeader(Header.CONTENT_MD5, DigestUtils.md5Hex(fis));
|
||||
fis.close();
|
||||
executePutKey(putRequest, httpClient);
|
||||
|
||||
} catch (IOException | URISyntaxException ex) {
|
||||
throw new OzoneClientException(ex.getMessage());
|
||||
} finally {
|
||||
IOUtils.closeStream(fis);
|
||||
OzoneClientUtils.releaseConnection(putRequest);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -253,7 +253,6 @@ public class OzoneBucket {
|
|||
throws OzoneException, IOException {
|
||||
HttpEntity entity = null;
|
||||
try {
|
||||
|
||||
HttpResponse response = httpClient.execute(putRequest);
|
||||
int errorCode = response.getStatusLine().getStatusCode();
|
||||
entity = response.getEntity();
|
||||
|
@ -291,23 +290,23 @@ public class OzoneBucket {
|
|||
throw new OzoneClientException("Invalid download path");
|
||||
}
|
||||
|
||||
try {
|
||||
OzoneClient client = getVolume().getClient();
|
||||
|
||||
CloseableHttpClient httpClient = HttpClients.createDefault();
|
||||
FileOutputStream outPutFile = new FileOutputStream(downloadTo.toFile());
|
||||
FileOutputStream outPutFile = null;
|
||||
HttpGet getRequest = null;
|
||||
try (CloseableHttpClient httpClient = OzoneClientUtils.newHttpClient()) {
|
||||
outPutFile = new FileOutputStream(downloadTo.toFile());
|
||||
|
||||
URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI());
|
||||
builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()
|
||||
+ "/" + keyName).build();
|
||||
|
||||
HttpGet getRequest =
|
||||
getVolume().getClient().getHttpGet(builder.toString());
|
||||
getRequest = getVolume().getClient().getHttpGet(builder.toString());
|
||||
executeGetKey(getRequest, httpClient, outPutFile);
|
||||
outPutFile.flush();
|
||||
outPutFile.close();
|
||||
} catch (IOException | URISyntaxException ex) {
|
||||
throw new OzoneClientException(ex.getMessage());
|
||||
} finally {
|
||||
IOUtils.closeStream(outPutFile);
|
||||
OzoneClientUtils.releaseConnection(getRequest);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -324,22 +323,24 @@ public class OzoneBucket {
|
|||
throw new OzoneClientException("Invalid key Name");
|
||||
}
|
||||
|
||||
try {
|
||||
OzoneClient client = getVolume().getClient();
|
||||
ByteArrayOutputStream outPutStream = new ByteArrayOutputStream();
|
||||
HttpGet getRequest = null;
|
||||
ByteArrayOutputStream outPutStream = null;
|
||||
try (CloseableHttpClient httpClient = OzoneClientUtils.newHttpClient()) {
|
||||
outPutStream = new ByteArrayOutputStream();
|
||||
|
||||
CloseableHttpClient httpClient = HttpClients.createDefault();
|
||||
URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI());
|
||||
|
||||
builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()
|
||||
+ "/" + keyName).build();
|
||||
|
||||
HttpGet getRequest =
|
||||
getVolume().getClient().getHttpGet(builder.toString());
|
||||
getRequest = getVolume().getClient().getHttpGet(builder.toString());
|
||||
executeGetKey(getRequest, httpClient, outPutStream);
|
||||
return outPutStream.toString(ENCODING_NAME);
|
||||
} catch (IOException | URISyntaxException ex) {
|
||||
throw new OzoneClientException(ex.getMessage());
|
||||
} finally {
|
||||
IOUtils.closeStream(outPutStream);
|
||||
OzoneClientUtils.releaseConnection(getRequest);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -393,19 +394,19 @@ public class OzoneBucket {
|
|||
throw new OzoneClientException("Invalid key Name");
|
||||
}
|
||||
|
||||
try {
|
||||
OzoneClient client = getVolume().getClient();
|
||||
CloseableHttpClient httpClient = HttpClients.createDefault();
|
||||
|
||||
HttpDelete deleteRequest = null;
|
||||
try (CloseableHttpClient httpClient = OzoneClientUtils.newHttpClient()) {
|
||||
URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI());
|
||||
builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()
|
||||
+ "/" + keyName).build();
|
||||
|
||||
HttpDelete deleteRequest =
|
||||
getVolume().getClient().getHttpDelete(builder.toString());
|
||||
deleteRequest = getVolume()
|
||||
.getClient().getHttpDelete(builder.toString());
|
||||
executeDeleteKey(deleteRequest, httpClient);
|
||||
} catch (IOException | URISyntaxException ex) {
|
||||
throw new OzoneClientException(ex.getMessage());
|
||||
} finally {
|
||||
OzoneClientUtils.releaseConnection(deleteRequest);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -450,18 +451,20 @@ public class OzoneBucket {
|
|||
* @return List of OzoneKeys
|
||||
*/
|
||||
public List<OzoneKey> listKeys() throws OzoneException {
|
||||
try {
|
||||
HttpGet getRequest = null;
|
||||
try (CloseableHttpClient httpClient = OzoneClientUtils.newHttpClient()) {
|
||||
OzoneClient client = getVolume().getClient();
|
||||
CloseableHttpClient httpClient = HttpClients.createDefault();
|
||||
URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI());
|
||||
builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName())
|
||||
.build();
|
||||
|
||||
HttpGet getRequest = client.getHttpGet(builder.toString());
|
||||
getRequest = client.getHttpGet(builder.toString());
|
||||
return executeListKeys(getRequest, httpClient);
|
||||
|
||||
} catch (IOException | URISyntaxException e) {
|
||||
throw new OzoneClientException(e.getMessage());
|
||||
} finally {
|
||||
OzoneClientUtils.releaseConnection(getRequest);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.ozone.web.client;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.ozone.OzoneClientUtils;
|
||||
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
|
||||
import org.apache.hadoop.ozone.web.headers.Header;
|
||||
import org.apache.hadoop.ozone.web.response.ListVolumes;
|
||||
|
@ -32,7 +34,6 @@ import org.apache.http.client.methods.HttpPut;
|
|||
import org.apache.http.client.methods.HttpRequestBase;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
|
||||
import javax.ws.rs.core.HttpHeaders;
|
||||
|
@ -87,7 +88,6 @@ public class OzoneClient implements Closeable {
|
|||
*/
|
||||
public URI getEndPointURI() {
|
||||
return endPointURI;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -142,8 +142,8 @@ public class OzoneClient implements Closeable {
|
|||
*/
|
||||
public OzoneVolume createVolume(String volumeName, String onBehalfOf,
|
||||
String quota) throws OzoneException {
|
||||
try {
|
||||
CloseableHttpClient httpClient = HttpClients.createDefault();
|
||||
HttpPost httpPost = null;
|
||||
try (CloseableHttpClient httpClient = newHttpClient()) {
|
||||
OzoneUtils.verifyBucketName(volumeName);
|
||||
|
||||
URIBuilder builder = new URIBuilder(endPointURI);
|
||||
|
@ -152,11 +152,13 @@ public class OzoneClient implements Closeable {
|
|||
builder.setParameter(Header.OZONE_QUOTA_QUERY_TAG, quota);
|
||||
}
|
||||
|
||||
HttpPost httppost = getHttpPost(onBehalfOf, builder.build().toString());
|
||||
executeCreateVolume(httppost, httpClient);
|
||||
httpPost = getHttpPost(onBehalfOf, builder.build().toString());
|
||||
executeCreateVolume(httpPost, httpClient);
|
||||
return getVolume(volumeName);
|
||||
} catch (IOException | URISyntaxException | IllegalArgumentException ex) {
|
||||
throw new OzoneClientException(ex.getMessage());
|
||||
} finally {
|
||||
OzoneClientUtils.releaseConnection(httpPost);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -169,9 +171,8 @@ public class OzoneClient implements Closeable {
|
|||
* @throws OzoneException
|
||||
*/
|
||||
public OzoneVolume getVolume(String volumeName) throws OzoneException {
|
||||
try {
|
||||
CloseableHttpClient httpClient = HttpClients.createDefault();
|
||||
|
||||
HttpGet httpGet = null;
|
||||
try (CloseableHttpClient httpClient = newHttpClient()) {
|
||||
OzoneUtils.verifyBucketName(volumeName);
|
||||
URIBuilder builder = new URIBuilder(endPointURI);
|
||||
builder.setPath("/" + volumeName)
|
||||
|
@ -179,11 +180,12 @@ public class OzoneClient implements Closeable {
|
|||
Header.OZONE_LIST_QUERY_VOLUME)
|
||||
.build();
|
||||
|
||||
HttpGet httpget = getHttpGet(builder.toString());
|
||||
return executeInfoVolume(httpget, httpClient);
|
||||
|
||||
httpGet = getHttpGet(builder.toString());
|
||||
return executeInfoVolume(httpGet, httpClient);
|
||||
} catch (IOException | URISyntaxException | IllegalArgumentException ex) {
|
||||
throw new OzoneClientException(ex.getMessage());
|
||||
} finally {
|
||||
OzoneClientUtils.releaseConnection(httpGet);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -205,9 +207,8 @@ public class OzoneClient implements Closeable {
|
|||
*/
|
||||
public List<OzoneVolume> listVolumes(String onBehalfOf, String prefix, int
|
||||
maxKeys, OzoneVolume prevKey) throws OzoneException {
|
||||
try {
|
||||
CloseableHttpClient httpClient = HttpClients.createDefault();
|
||||
|
||||
HttpGet httpGet = null;
|
||||
try (CloseableHttpClient httpClient = newHttpClient()) {
|
||||
URIBuilder builder = new URIBuilder(endPointURI);
|
||||
if (prefix != null) {
|
||||
builder.addParameter(Header.OZONE_LIST_QUERY_PREFIX, prefix);
|
||||
|
@ -225,14 +226,15 @@ public class OzoneClient implements Closeable {
|
|||
|
||||
builder.setPath("/").build();
|
||||
|
||||
HttpGet httpget = getHttpGet(builder.toString());
|
||||
httpGet = getHttpGet(builder.toString());
|
||||
if (onBehalfOf != null) {
|
||||
httpget.addHeader(Header.OZONE_USER, onBehalfOf);
|
||||
httpGet.addHeader(Header.OZONE_USER, onBehalfOf);
|
||||
}
|
||||
return executeListVolume(httpget, httpClient);
|
||||
|
||||
return executeListVolume(httpGet, httpClient);
|
||||
} catch (IOException | URISyntaxException ex) {
|
||||
throw new OzoneClientException(ex.getMessage());
|
||||
} finally {
|
||||
OzoneClientUtils.releaseConnection(httpGet);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -261,9 +263,8 @@ public class OzoneClient implements Closeable {
|
|||
*/
|
||||
public List<OzoneVolume> listAllVolumes(String prefix, int maxKeys,
|
||||
OzoneVolume prevKey) throws OzoneException {
|
||||
try {
|
||||
CloseableHttpClient httpClient = HttpClients.createDefault();
|
||||
|
||||
HttpGet httpGet = null;
|
||||
try (CloseableHttpClient httpClient = newHttpClient()) {
|
||||
URIBuilder builder = new URIBuilder(endPointURI);
|
||||
if (prefix != null) {
|
||||
builder.addParameter(Header.OZONE_LIST_QUERY_PREFIX, prefix);
|
||||
|
@ -281,11 +282,13 @@ public class OzoneClient implements Closeable {
|
|||
|
||||
builder.addParameter(Header.OZONE_LIST_QUERY_ROOTSCAN, "true");
|
||||
builder.setPath("/").build();
|
||||
HttpGet httpget = getHttpGet(builder.toString());
|
||||
return executeListVolume(httpget, httpClient);
|
||||
httpGet = getHttpGet(builder.toString());
|
||||
return executeListVolume(httpGet, httpClient);
|
||||
|
||||
} catch (IOException | URISyntaxException ex) {
|
||||
throw new OzoneClientException(ex.getMessage());
|
||||
} finally {
|
||||
OzoneClientUtils.releaseConnection(httpGet);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -296,17 +299,18 @@ public class OzoneClient implements Closeable {
|
|||
* @throws OzoneException - Ozone Exception
|
||||
*/
|
||||
public void deleteVolume(String volumeName) throws OzoneException {
|
||||
try {
|
||||
CloseableHttpClient httpClient = HttpClients.createDefault();
|
||||
|
||||
HttpDelete httpDelete = null;
|
||||
try (CloseableHttpClient httpClient = newHttpClient()) {
|
||||
OzoneUtils.verifyBucketName(volumeName);
|
||||
URIBuilder builder = new URIBuilder(endPointURI);
|
||||
builder.setPath("/" + volumeName).build();
|
||||
|
||||
HttpDelete httpdelete = getHttpDelete(builder.toString());
|
||||
executeDeleteVolume(httpdelete, httpClient);
|
||||
httpDelete = getHttpDelete(builder.toString());
|
||||
executeDeleteVolume(httpDelete, httpClient);
|
||||
} catch (IOException | URISyntaxException | IllegalArgumentException ex) {
|
||||
throw new OzoneClientException(ex.getMessage());
|
||||
} finally {
|
||||
OzoneClientUtils.releaseConnection(httpDelete);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -319,23 +323,23 @@ public class OzoneClient implements Closeable {
|
|||
*/
|
||||
public void setVolumeOwner(String volumeName, String newOwner)
|
||||
throws OzoneException {
|
||||
|
||||
HttpPut putRequest = null;
|
||||
if (newOwner == null || newOwner.isEmpty()) {
|
||||
throw new OzoneClientException("Invalid new owner name");
|
||||
}
|
||||
try {
|
||||
CloseableHttpClient httpClient = HttpClients.createDefault();
|
||||
|
||||
try (CloseableHttpClient httpClient = newHttpClient()) {
|
||||
OzoneUtils.verifyBucketName(volumeName);
|
||||
URIBuilder builder = new URIBuilder(endPointURI);
|
||||
builder.setPath("/" + volumeName).build();
|
||||
|
||||
HttpPut putRequest = getHttpPut(builder.toString());
|
||||
putRequest = getHttpPut(builder.toString());
|
||||
putRequest.addHeader(Header.OZONE_USER, newOwner);
|
||||
executePutVolume(putRequest, httpClient);
|
||||
|
||||
} catch (URISyntaxException | IllegalArgumentException | IOException ex) {
|
||||
throw new OzoneClientException(ex.getMessage());
|
||||
} finally {
|
||||
OzoneClientUtils.releaseConnection(putRequest);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -354,20 +358,21 @@ public class OzoneClient implements Closeable {
|
|||
if (quota == null || quota.isEmpty()) {
|
||||
throw new OzoneClientException("Invalid quota");
|
||||
}
|
||||
try {
|
||||
CloseableHttpClient httpClient = HttpClients.createDefault();
|
||||
|
||||
HttpPut putRequest = null;
|
||||
try (CloseableHttpClient httpClient = newHttpClient()) {
|
||||
OzoneUtils.verifyBucketName(volumeName);
|
||||
URIBuilder builder = new URIBuilder(endPointURI);
|
||||
builder.setPath("/" + volumeName)
|
||||
.setParameter(Header.OZONE_QUOTA_QUERY_TAG, quota)
|
||||
.build();
|
||||
|
||||
HttpPut putRequest = getHttpPut(builder.toString());
|
||||
putRequest = getHttpPut(builder.toString());
|
||||
executePutVolume(putRequest, httpClient);
|
||||
|
||||
} catch (URISyntaxException | IllegalArgumentException | IOException ex) {
|
||||
throw new OzoneClientException(ex.getMessage());
|
||||
} finally {
|
||||
OzoneClientUtils.releaseConnection(putRequest);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -380,7 +385,7 @@ public class OzoneClient implements Closeable {
|
|||
* @throws OzoneException
|
||||
*/
|
||||
private void executeCreateVolume(HttpPost httppost,
|
||||
CloseableHttpClient httpClient)
|
||||
final CloseableHttpClient httpClient)
|
||||
throws IOException, OzoneException {
|
||||
HttpEntity entity = null;
|
||||
try {
|
||||
|
@ -413,7 +418,7 @@ public class OzoneClient implements Closeable {
|
|||
* @throws OzoneException
|
||||
*/
|
||||
private OzoneVolume executeInfoVolume(HttpGet httpGet,
|
||||
CloseableHttpClient httpClient)
|
||||
final CloseableHttpClient httpClient)
|
||||
throws IOException, OzoneException {
|
||||
HttpEntity entity = null;
|
||||
try {
|
||||
|
@ -447,7 +452,7 @@ public class OzoneClient implements Closeable {
|
|||
* @throws OzoneException
|
||||
*/
|
||||
private void executePutVolume(HttpPut putRequest,
|
||||
CloseableHttpClient httpClient)
|
||||
final CloseableHttpClient httpClient)
|
||||
throws IOException, OzoneException {
|
||||
HttpEntity entity = null;
|
||||
try {
|
||||
|
@ -473,7 +478,7 @@ public class OzoneClient implements Closeable {
|
|||
* @throws OzoneException
|
||||
*/
|
||||
private List<OzoneVolume> executeListVolume(HttpGet httpGet,
|
||||
CloseableHttpClient httpClient)
|
||||
final CloseableHttpClient httpClient)
|
||||
throws IOException, OzoneException {
|
||||
HttpEntity entity = null;
|
||||
List<OzoneVolume> volList = new LinkedList<>();
|
||||
|
@ -514,7 +519,7 @@ public class OzoneClient implements Closeable {
|
|||
* @throws OzoneException
|
||||
*/
|
||||
private void executeDeleteVolume(HttpDelete httpDelete,
|
||||
CloseableHttpClient httpClient)
|
||||
final CloseableHttpClient httpClient)
|
||||
throws IOException, OzoneException {
|
||||
HttpEntity entity = null;
|
||||
try {
|
||||
|
@ -540,12 +545,12 @@ public class OzoneClient implements Closeable {
|
|||
* @return HttpPost
|
||||
*/
|
||||
public HttpPost getHttpPost(String onBehalfOf, String uriString) {
|
||||
HttpPost httppost = new HttpPost(uriString);
|
||||
addOzoneHeaders(httppost);
|
||||
HttpPost httpPost = new HttpPost(uriString);
|
||||
addOzoneHeaders(httpPost);
|
||||
if (onBehalfOf != null) {
|
||||
httppost.addHeader(Header.OZONE_USER, onBehalfOf);
|
||||
httpPost.addHeader(Header.OZONE_USER, onBehalfOf);
|
||||
}
|
||||
return httppost;
|
||||
return httpPost;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -555,9 +560,9 @@ public class OzoneClient implements Closeable {
|
|||
* @return HttpGet
|
||||
*/
|
||||
public HttpGet getHttpGet(String uriString) {
|
||||
HttpGet httpget = new HttpGet(uriString);
|
||||
addOzoneHeaders(httpget);
|
||||
return httpget;
|
||||
HttpGet httpGet = new HttpGet(uriString);
|
||||
addOzoneHeaders(httpGet);
|
||||
return httpGet;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -615,4 +620,9 @@ public class OzoneClient implements Closeable {
|
|||
// TODO : Currently we create a new HTTP client. We should switch
|
||||
// This to a Pool and cleanup the pool here.
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public CloseableHttpClient newHttpClient() {
|
||||
return OzoneClientUtils.newHttpClient();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,7 +18,9 @@
|
|||
|
||||
package org.apache.hadoop.ozone.web.client;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.ozone.OzoneClientUtils;
|
||||
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
|
||||
import org.apache.hadoop.ozone.web.headers.Header;
|
||||
import org.apache.hadoop.ozone.web.request.OzoneQuota;
|
||||
|
@ -35,7 +37,6 @@ import org.apache.http.client.methods.HttpPost;
|
|||
import org.apache.http.client.methods.HttpPut;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -164,26 +165,28 @@ public class OzoneVolume {
|
|||
OzoneConsts.Versioning versioning)
|
||||
throws OzoneException {
|
||||
|
||||
try {
|
||||
HttpPost httpPost = null;
|
||||
try (CloseableHttpClient httpClient = newHttpClient()) {
|
||||
OzoneUtils.verifyBucketName(bucketName);
|
||||
CloseableHttpClient httpClient = HttpClients.createDefault();
|
||||
URIBuilder builder = new URIBuilder(getClient().getEndPointURI());
|
||||
builder.setPath("/" + getVolumeName() + "/" + bucketName).build();
|
||||
|
||||
HttpPost httppost = client.getHttpPost(null, builder.toString());
|
||||
httpPost = client.getHttpPost(null, builder.toString());
|
||||
if (acls != null) {
|
||||
for (String acl : acls) {
|
||||
httppost
|
||||
httpPost
|
||||
.addHeader(Header.OZONE_ACLS, Header.OZONE_ACL_ADD + " " + acl);
|
||||
}
|
||||
}
|
||||
|
||||
httppost.addHeader(Header.OZONE_STORAGE_TYPE, storageType.toString());
|
||||
httppost.addHeader(Header.OZONE_BUCKET_VERSIONING, versioning.toString());
|
||||
executeCreateBucket(httppost, httpClient);
|
||||
httpPost.addHeader(Header.OZONE_STORAGE_TYPE, storageType.toString());
|
||||
httpPost.addHeader(Header.OZONE_BUCKET_VERSIONING, versioning.toString());
|
||||
executeCreateBucket(httpPost, httpClient);
|
||||
return getBucket(bucketName);
|
||||
} catch (IOException | URISyntaxException | IllegalArgumentException ex) {
|
||||
throw new OzoneClientException(ex.getMessage());
|
||||
} finally {
|
||||
OzoneClientUtils.releaseConnection(httpPost);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -226,7 +229,6 @@ public class OzoneVolume {
|
|||
* @throws OzoneException
|
||||
*/
|
||||
public OzoneBucket createBucket(String bucketName) throws OzoneException {
|
||||
|
||||
return createBucket(bucketName, null, StorageType.DEFAULT,
|
||||
OzoneConsts.Versioning.DISABLED);
|
||||
}
|
||||
|
@ -273,13 +275,12 @@ public class OzoneVolume {
|
|||
* @throws OzoneException
|
||||
*/
|
||||
public void addAcls(String bucketName, String[] acls) throws OzoneException {
|
||||
|
||||
try {
|
||||
HttpPut putRequest = null;
|
||||
try (CloseableHttpClient httpClient = newHttpClient()) {
|
||||
OzoneUtils.verifyBucketName(bucketName);
|
||||
CloseableHttpClient httpClient = HttpClients.createDefault();
|
||||
URIBuilder builder = new URIBuilder(getClient().getEndPointURI());
|
||||
builder.setPath("/" + getVolumeName() + "/" + bucketName).build();
|
||||
HttpPut putRequest = client.getHttpPut(builder.toString());
|
||||
putRequest = client.getHttpPut(builder.toString());
|
||||
|
||||
for (String acl : acls) {
|
||||
putRequest
|
||||
|
@ -288,6 +289,8 @@ public class OzoneVolume {
|
|||
executePutBucket(putRequest, httpClient);
|
||||
} catch (URISyntaxException | IOException ex) {
|
||||
throw new OzoneClientException(ex.getMessage());
|
||||
} finally {
|
||||
OzoneClientUtils.releaseConnection(putRequest);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -301,12 +304,12 @@ public class OzoneVolume {
|
|||
*/
|
||||
public void removeAcls(String bucketName, String[] acls)
|
||||
throws OzoneException {
|
||||
try {
|
||||
HttpPut putRequest = null;
|
||||
try (CloseableHttpClient httpClient = newHttpClient()) {
|
||||
OzoneUtils.verifyBucketName(bucketName);
|
||||
CloseableHttpClient httpClient = HttpClients.createDefault();
|
||||
URIBuilder builder = new URIBuilder(getClient().getEndPointURI());
|
||||
builder.setPath("/" + getVolumeName() + "/" + bucketName).build();
|
||||
HttpPut putRequest = client.getHttpPut(builder.toString());
|
||||
putRequest = client.getHttpPut(builder.toString());
|
||||
|
||||
for (String acl : acls) {
|
||||
putRequest
|
||||
|
@ -315,6 +318,8 @@ public class OzoneVolume {
|
|||
executePutBucket(putRequest, httpClient);
|
||||
} catch (URISyntaxException | IOException ex) {
|
||||
throw new OzoneClientException(ex.getMessage());
|
||||
} finally {
|
||||
OzoneClientUtils.releaseConnection(putRequest);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -326,18 +331,20 @@ public class OzoneVolume {
|
|||
* @return OZoneBucket
|
||||
*/
|
||||
public OzoneBucket getBucket(String bucketName) throws OzoneException {
|
||||
try {
|
||||
HttpGet getRequest = null;
|
||||
try (CloseableHttpClient httpClient = newHttpClient()) {
|
||||
OzoneUtils.verifyBucketName(bucketName);
|
||||
CloseableHttpClient httpClient = HttpClients.createDefault();
|
||||
URIBuilder builder = new URIBuilder(getClient().getEndPointURI());
|
||||
builder.setPath("/" + getVolumeName() + "/" + bucketName)
|
||||
.setParameter(Header.OZONE_LIST_QUERY_TAG,
|
||||
Header.OZONE_LIST_QUERY_BUCKET).build();
|
||||
HttpGet getRequest = client.getHttpGet(builder.toString());
|
||||
getRequest = client.getHttpGet(builder.toString());
|
||||
return executeInfoBucket(getRequest, httpClient);
|
||||
|
||||
} catch (IOException | URISyntaxException | IllegalArgumentException ex) {
|
||||
throw new OzoneClientException(ex.getMessage());
|
||||
} finally {
|
||||
OzoneClientUtils.releaseConnection(getRequest);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -422,17 +429,18 @@ public class OzoneVolume {
|
|||
* @throws OzoneException
|
||||
*/
|
||||
public List<OzoneBucket> listBuckets() throws OzoneException {
|
||||
try {
|
||||
CloseableHttpClient httpClient = HttpClients.createDefault();
|
||||
|
||||
HttpGet getRequest = null;
|
||||
try (CloseableHttpClient httpClient = newHttpClient()) {
|
||||
URIBuilder builder = new URIBuilder(getClient().getEndPointURI());
|
||||
builder.setPath("/" + getVolumeName()).build();
|
||||
|
||||
HttpGet getRequest = client.getHttpGet(builder.toString());
|
||||
getRequest = client.getHttpGet(builder.toString());
|
||||
return executeListBuckets(getRequest, httpClient);
|
||||
|
||||
} catch (IOException | URISyntaxException e) {
|
||||
throw new OzoneClientException(e.getMessage());
|
||||
} finally {
|
||||
OzoneClientUtils.releaseConnection(getRequest);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -488,17 +496,19 @@ public class OzoneVolume {
|
|||
* @throws OzoneException
|
||||
*/
|
||||
public void deleteBucket(String bucketName) throws OzoneException {
|
||||
try {
|
||||
HttpDelete delRequest = null;
|
||||
try (CloseableHttpClient httpClient = newHttpClient()) {
|
||||
OzoneUtils.verifyBucketName(bucketName);
|
||||
CloseableHttpClient httpClient = HttpClients.createDefault();
|
||||
URIBuilder builder = new URIBuilder(getClient().getEndPointURI());
|
||||
builder.setPath("/" + getVolumeName() + "/" + bucketName).build();
|
||||
|
||||
HttpDelete delRequest = client.getHttpDelete(builder.toString());
|
||||
delRequest = client.getHttpDelete(builder.toString());
|
||||
executeDeleteBucket(delRequest, httpClient);
|
||||
|
||||
} catch (IOException | URISyntaxException | IllegalArgumentException ex) {
|
||||
throw new OzoneClientException(ex.getMessage());
|
||||
} finally {
|
||||
OzoneClientUtils.releaseConnection(delRequest);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -536,4 +546,9 @@ public class OzoneVolume {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public CloseableHttpClient newHttpClient() {
|
||||
return OzoneClientUtils.newHttpClient();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,28 +22,36 @@ import org.apache.commons.io.FileUtils;
|
|||
import org.apache.commons.lang.RandomStringUtils;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.OzoneClientUtils;
|
||||
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
|
||||
import org.apache.hadoop.ozone.web.request.OzoneQuota;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.http.client.methods.HttpUriRequest;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
public class TestVolume {
|
||||
private static MiniOzoneCluster cluster = null;
|
||||
|
@ -90,10 +98,15 @@ public class TestVolume {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testCreateVolume() throws OzoneException {
|
||||
public void testCreateVolume() throws OzoneException, IOException {
|
||||
String volumeName = OzoneUtils.getRequestID().toLowerCase();
|
||||
client.setUserAuth(OzoneConsts.OZONE_SIMPLE_HDFS_USER);
|
||||
OzoneVolume vol = client.createVolume(volumeName, "bilbo", "100TB");
|
||||
|
||||
OzoneClient mockClient = Mockito.spy(client);
|
||||
List<CloseableHttpClient> mockedClients = mockHttpClients(mockClient);
|
||||
OzoneVolume vol = mockClient.createVolume(volumeName, "bilbo", "100TB");
|
||||
// Verify http clients are properly closed.
|
||||
verifyHttpConnectionClosed(mockedClients);
|
||||
|
||||
assertEquals(vol.getVolumeName(), volumeName);
|
||||
assertEquals(vol.getCreatedby(), "hdfs");
|
||||
|
@ -210,4 +223,80 @@ public class TestVolume {
|
|||
// be volumes created by other tests too. So we should get more page counts.
|
||||
Assert.assertEquals(volCount / step , pagecount);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a list of mocked {@link CloseableHttpClient} used for testing.
|
||||
* The mocked client replaces the actual calls in
|
||||
* {@link OzoneClient#newHttpClient()}, it is used to verify
|
||||
* if the invocation of this client is expected. <b>Note</b>, the output
|
||||
* of this method is always used as the input of
|
||||
* {@link TestVolume#verifyHttpConnectionClosed(List)}.
|
||||
*
|
||||
* @param ozoneClient mocked ozone client.
|
||||
* @return a list of mocked {@link CloseableHttpClient}.
|
||||
* @throws IOException
|
||||
*/
|
||||
private List<CloseableHttpClient> mockHttpClients(OzoneClient ozoneClient)
|
||||
throws IOException {
|
||||
List<CloseableHttpClient> spyHttpClients = new ArrayList<>();
|
||||
for (int i = 0; i < 5; i++) {
|
||||
CloseableHttpClient spyHttpClient = Mockito
|
||||
.spy(OzoneClientUtils.newHttpClient());
|
||||
spyHttpClients.add(spyHttpClient);
|
||||
}
|
||||
|
||||
List<CloseableHttpClient> nextReturns =
|
||||
new ArrayList<>(spyHttpClients.subList(1, spyHttpClients.size()));
|
||||
Mockito.when(ozoneClient.newHttpClient()).thenReturn(
|
||||
spyHttpClients.get(0),
|
||||
nextReturns.toArray(new CloseableHttpClient[nextReturns.size()]));
|
||||
return spyHttpClients;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is used together with
|
||||
* {@link TestVolume#mockHttpClients(OzoneClient)} to verify
|
||||
* if the http client is properly closed. It verifies that as long as
|
||||
* a client calls {@link CloseableHttpClient#execute(HttpUriRequest)} to
|
||||
* send request, then it must calls {@link CloseableHttpClient#close()}
|
||||
* close the http connection.
|
||||
*
|
||||
* @param mockedHttpClients
|
||||
*/
|
||||
private void verifyHttpConnectionClosed(
|
||||
List<CloseableHttpClient> mockedHttpClients) {
|
||||
final AtomicInteger totalCalled = new AtomicInteger();
|
||||
Assert.assertTrue(mockedHttpClients.stream().allMatch(
|
||||
closeableHttpClient -> {
|
||||
boolean clientUsed = false;
|
||||
try {
|
||||
verify(closeableHttpClient, times(1))
|
||||
.execute(Mockito.any());
|
||||
totalCalled.incrementAndGet();
|
||||
clientUsed = true;
|
||||
} catch (Throwable e) {
|
||||
// There might be some redundant instances in mockedHttpClients,
|
||||
// it is allowed that a client is not used.
|
||||
return true;
|
||||
}
|
||||
|
||||
if (clientUsed) {
|
||||
try {
|
||||
// If a client is used, ensure the close function is called.
|
||||
verify(closeableHttpClient,
|
||||
times(1)).close();
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
}));
|
||||
System.out.println("Successful connections "
|
||||
+ totalCalled.get());
|
||||
Assert.assertTrue(
|
||||
"The mocked http client should be called at least once.",
|
||||
totalCalled.get() > 0);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue