HDFS-11846. Ozone: Fix Http connection leaks in ozone clients. Contributed by Weiwei Yang.

This commit is contained in:
Xiaoyu Yao 2017-05-24 12:34:37 -07:00
parent 41d5a45e59
commit 67da8be745
6 changed files with 299 additions and 122 deletions

View File

@ -20,11 +20,16 @@ package org.apache.hadoop.ozone;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.net.HostAndPort; import com.google.common.net.HostAndPort;
import org.apache.avro.reflect.Nullable;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -556,4 +561,52 @@ public final class OzoneClientUtils {
.DFS_CONTAINER_IPC_PORT_DEFAULT); .DFS_CONTAINER_IPC_PORT_DEFAULT);
} }
/**
* Releases a http connection if the request is not null.
* @param request
*/
public static void releaseConnection(HttpRequestBase request) {
if (request != null) {
request.releaseConnection();
}
}
/**
* @return a default instance of {@link CloseableHttpClient}.
*/
public static CloseableHttpClient newHttpClient() {
return OzoneClientUtils.newHttpClient(null);
}
/**
* Returns a {@link CloseableHttpClient} configured by given configuration.
* If conf is null, returns a default instance.
*
* @param conf configuration
* @return a {@link CloseableHttpClient} instance.
*/
public static CloseableHttpClient newHttpClient(
@Nullable Configuration conf) {
int socketTimeout = OzoneConfigKeys
.OZONE_CLIENT_SOCKET_TIMEOUT_MS_DEFAULT;
int connectionTimeout = OzoneConfigKeys
.OZONE_CLIENT_CONNECTION_TIMEOUT_MS_DEFAULT;
if (conf != null) {
socketTimeout = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_MS,
OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_MS_DEFAULT);
connectionTimeout = conf.getInt(
OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_MS,
OzoneConfigKeys.OZONE_CLIENT_CONNECTION_TIMEOUT_MS_DEFAULT);
}
CloseableHttpClient client = HttpClients.custom()
.setDefaultRequestConfig(
RequestConfig.custom()
.setSocketTimeout(socketTimeout)
.setConnectTimeout(connectionTimeout)
.build())
.build();
return client;
}
} }

View File

@ -68,6 +68,13 @@ public final class OzoneConfigKeys {
"ozone.container.task.wait.seconds"; "ozone.container.task.wait.seconds";
public static final long OZONE_CONTAINER_TASK_WAIT_DEFAULT = 5; public static final long OZONE_CONTAINER_TASK_WAIT_DEFAULT = 5;
public static final String OZONE_CLIENT_SOCKET_TIMEOUT_MS =
"ozone.client.socket.timeout.ms";
public static final int OZONE_CLIENT_SOCKET_TIMEOUT_MS_DEFAULT = 5000;
public static final String OZONE_CLIENT_CONNECTION_TIMEOUT_MS =
"ozone.client.connection.timeout.ms";
public static final int OZONE_CLIENT_CONNECTION_TIMEOUT_MS_DEFAULT = 5000;
public static final String DFS_CONTAINER_RATIS_ENABLED_KEY public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
= ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY; = ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.web.client;
import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.web.exceptions.OzoneException; import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.headers.Header; import org.apache.hadoop.ozone.web.headers.Header;
@ -37,7 +39,6 @@ import org.apache.http.entity.ContentType;
import org.apache.http.entity.FileEntity; import org.apache.http.entity.FileEntity;
import org.apache.http.entity.InputStreamEntity; import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils; import org.apache.http.util.EntityUtils;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
@ -156,19 +157,16 @@ public class OzoneBucket {
throw new OzoneClientException("Invalid data."); throw new OzoneClientException("Invalid data.");
} }
try { HttpPut putRequest = null;
OzoneClient client = getVolume().getClient(); InputStream is = null;
try (CloseableHttpClient httpClient = OzoneClientUtils.newHttpClient()) {
CloseableHttpClient httpClient = HttpClients.createDefault();
URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI()); URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI());
builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName() builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()
+ "/" + keyName).build(); + "/" + keyName).build();
HttpPut putRequest = putRequest = getVolume().getClient().getHttpPut(builder.toString());
getVolume().getClient().getHttpPut(builder.toString());
InputStream is = new ByteArrayInputStream(data.getBytes(ENCODING)); is = new ByteArrayInputStream(data.getBytes(ENCODING));
putRequest.setEntity(new InputStreamEntity(is, data.length())); putRequest.setEntity(new InputStreamEntity(is, data.length()));
is.mark(data.length()); is.mark(data.length());
try { try {
@ -177,9 +175,11 @@ public class OzoneBucket {
is.reset(); is.reset();
} }
executePutKey(putRequest, httpClient); executePutKey(putRequest, httpClient);
} catch (IOException | URISyntaxException ex) { } catch (IOException | URISyntaxException ex) {
throw new OzoneClientException(ex.getMessage()); throw new OzoneClientException(ex.getMessage());
} finally {
IOUtils.closeStream(is);
OzoneClientUtils.releaseConnection(putRequest);
} }
} }
@ -216,28 +216,28 @@ public class OzoneBucket {
throw new OzoneClientException("Invalid data stream"); throw new OzoneClientException("Invalid data stream");
} }
try { HttpPut putRequest = null;
OzoneClient client = getVolume().getClient(); FileInputStream fis = null;
try (CloseableHttpClient httpClient = OzoneClientUtils.newHttpClient()) {
CloseableHttpClient httpClient = HttpClients.createDefault();
URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI()); URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI());
builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName() builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()
+ "/" + keyName).build(); + "/" + keyName).build();
HttpPut putRequest = putRequest = getVolume().getClient().getHttpPut(builder.toString());
getVolume().getClient().getHttpPut(builder.toString());
FileEntity fileEntity = new FileEntity(file, ContentType FileEntity fileEntity = new FileEntity(file, ContentType
.APPLICATION_OCTET_STREAM); .APPLICATION_OCTET_STREAM);
putRequest.setEntity(fileEntity); putRequest.setEntity(fileEntity);
FileInputStream fis = new FileInputStream(file); fis = new FileInputStream(file);
putRequest.setHeader(Header.CONTENT_MD5, DigestUtils.md5Hex(fis)); putRequest.setHeader(Header.CONTENT_MD5, DigestUtils.md5Hex(fis));
fis.close();
executePutKey(putRequest, httpClient); executePutKey(putRequest, httpClient);
} catch (IOException | URISyntaxException ex) { } catch (IOException | URISyntaxException ex) {
throw new OzoneClientException(ex.getMessage()); throw new OzoneClientException(ex.getMessage());
} finally {
IOUtils.closeStream(fis);
OzoneClientUtils.releaseConnection(putRequest);
} }
} }
@ -253,7 +253,6 @@ public class OzoneBucket {
throws OzoneException, IOException { throws OzoneException, IOException {
HttpEntity entity = null; HttpEntity entity = null;
try { try {
HttpResponse response = httpClient.execute(putRequest); HttpResponse response = httpClient.execute(putRequest);
int errorCode = response.getStatusLine().getStatusCode(); int errorCode = response.getStatusLine().getStatusCode();
entity = response.getEntity(); entity = response.getEntity();
@ -291,23 +290,23 @@ public class OzoneBucket {
throw new OzoneClientException("Invalid download path"); throw new OzoneClientException("Invalid download path");
} }
try { FileOutputStream outPutFile = null;
OzoneClient client = getVolume().getClient(); HttpGet getRequest = null;
try (CloseableHttpClient httpClient = OzoneClientUtils.newHttpClient()) {
CloseableHttpClient httpClient = HttpClients.createDefault(); outPutFile = new FileOutputStream(downloadTo.toFile());
FileOutputStream outPutFile = new FileOutputStream(downloadTo.toFile());
URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI()); URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI());
builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName() builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()
+ "/" + keyName).build(); + "/" + keyName).build();
HttpGet getRequest = getRequest = getVolume().getClient().getHttpGet(builder.toString());
getVolume().getClient().getHttpGet(builder.toString());
executeGetKey(getRequest, httpClient, outPutFile); executeGetKey(getRequest, httpClient, outPutFile);
outPutFile.flush(); outPutFile.flush();
outPutFile.close();
} catch (IOException | URISyntaxException ex) { } catch (IOException | URISyntaxException ex) {
throw new OzoneClientException(ex.getMessage()); throw new OzoneClientException(ex.getMessage());
} finally {
IOUtils.closeStream(outPutFile);
OzoneClientUtils.releaseConnection(getRequest);
} }
} }
@ -324,22 +323,24 @@ public class OzoneBucket {
throw new OzoneClientException("Invalid key Name"); throw new OzoneClientException("Invalid key Name");
} }
try { HttpGet getRequest = null;
OzoneClient client = getVolume().getClient(); ByteArrayOutputStream outPutStream = null;
ByteArrayOutputStream outPutStream = new ByteArrayOutputStream(); try (CloseableHttpClient httpClient = OzoneClientUtils.newHttpClient()) {
outPutStream = new ByteArrayOutputStream();
CloseableHttpClient httpClient = HttpClients.createDefault();
URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI()); URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI());
builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName() builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()
+ "/" + keyName).build(); + "/" + keyName).build();
HttpGet getRequest = getRequest = getVolume().getClient().getHttpGet(builder.toString());
getVolume().getClient().getHttpGet(builder.toString());
executeGetKey(getRequest, httpClient, outPutStream); executeGetKey(getRequest, httpClient, outPutStream);
return outPutStream.toString(ENCODING_NAME); return outPutStream.toString(ENCODING_NAME);
} catch (IOException | URISyntaxException ex) { } catch (IOException | URISyntaxException ex) {
throw new OzoneClientException(ex.getMessage()); throw new OzoneClientException(ex.getMessage());
} finally {
IOUtils.closeStream(outPutStream);
OzoneClientUtils.releaseConnection(getRequest);
} }
} }
@ -393,19 +394,19 @@ public class OzoneBucket {
throw new OzoneClientException("Invalid key Name"); throw new OzoneClientException("Invalid key Name");
} }
try { HttpDelete deleteRequest = null;
OzoneClient client = getVolume().getClient(); try (CloseableHttpClient httpClient = OzoneClientUtils.newHttpClient()) {
CloseableHttpClient httpClient = HttpClients.createDefault();
URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI()); URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI());
builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName() builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()
+ "/" + keyName).build(); + "/" + keyName).build();
HttpDelete deleteRequest = deleteRequest = getVolume()
getVolume().getClient().getHttpDelete(builder.toString()); .getClient().getHttpDelete(builder.toString());
executeDeleteKey(deleteRequest, httpClient); executeDeleteKey(deleteRequest, httpClient);
} catch (IOException | URISyntaxException ex) { } catch (IOException | URISyntaxException ex) {
throw new OzoneClientException(ex.getMessage()); throw new OzoneClientException(ex.getMessage());
} finally {
OzoneClientUtils.releaseConnection(deleteRequest);
} }
} }
@ -450,18 +451,20 @@ public class OzoneBucket {
* @return List of OzoneKeys * @return List of OzoneKeys
*/ */
public List<OzoneKey> listKeys() throws OzoneException { public List<OzoneKey> listKeys() throws OzoneException {
try { HttpGet getRequest = null;
try (CloseableHttpClient httpClient = OzoneClientUtils.newHttpClient()) {
OzoneClient client = getVolume().getClient(); OzoneClient client = getVolume().getClient();
CloseableHttpClient httpClient = HttpClients.createDefault();
URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI()); URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI());
builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()) builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName())
.build(); .build();
HttpGet getRequest = client.getHttpGet(builder.toString()); getRequest = client.getHttpGet(builder.toString());
return executeListKeys(getRequest, httpClient); return executeListKeys(getRequest, httpClient);
} catch (IOException | URISyntaxException e) { } catch (IOException | URISyntaxException e) {
throw new OzoneClientException(e.getMessage()); throw new OzoneClientException(e.getMessage());
} finally {
OzoneClientUtils.releaseConnection(getRequest);
} }
} }

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.ozone.web.client; package org.apache.hadoop.ozone.web.client;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.web.exceptions.OzoneException; import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.headers.Header; import org.apache.hadoop.ozone.web.headers.Header;
import org.apache.hadoop.ozone.web.response.ListVolumes; import org.apache.hadoop.ozone.web.response.ListVolumes;
@ -32,7 +34,6 @@ import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase; import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.utils.URIBuilder; import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils; import org.apache.http.util.EntityUtils;
import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.HttpHeaders;
@ -87,7 +88,6 @@ public class OzoneClient implements Closeable {
*/ */
public URI getEndPointURI() { public URI getEndPointURI() {
return endPointURI; return endPointURI;
} }
/** /**
@ -142,8 +142,8 @@ public class OzoneClient implements Closeable {
*/ */
public OzoneVolume createVolume(String volumeName, String onBehalfOf, public OzoneVolume createVolume(String volumeName, String onBehalfOf,
String quota) throws OzoneException { String quota) throws OzoneException {
try { HttpPost httpPost = null;
CloseableHttpClient httpClient = HttpClients.createDefault(); try (CloseableHttpClient httpClient = newHttpClient()) {
OzoneUtils.verifyBucketName(volumeName); OzoneUtils.verifyBucketName(volumeName);
URIBuilder builder = new URIBuilder(endPointURI); URIBuilder builder = new URIBuilder(endPointURI);
@ -152,11 +152,13 @@ public class OzoneClient implements Closeable {
builder.setParameter(Header.OZONE_QUOTA_QUERY_TAG, quota); builder.setParameter(Header.OZONE_QUOTA_QUERY_TAG, quota);
} }
HttpPost httppost = getHttpPost(onBehalfOf, builder.build().toString()); httpPost = getHttpPost(onBehalfOf, builder.build().toString());
executeCreateVolume(httppost, httpClient); executeCreateVolume(httpPost, httpClient);
return getVolume(volumeName); return getVolume(volumeName);
} catch (IOException | URISyntaxException | IllegalArgumentException ex) { } catch (IOException | URISyntaxException | IllegalArgumentException ex) {
throw new OzoneClientException(ex.getMessage()); throw new OzoneClientException(ex.getMessage());
} finally {
OzoneClientUtils.releaseConnection(httpPost);
} }
} }
@ -169,9 +171,8 @@ public class OzoneClient implements Closeable {
* @throws OzoneException * @throws OzoneException
*/ */
public OzoneVolume getVolume(String volumeName) throws OzoneException { public OzoneVolume getVolume(String volumeName) throws OzoneException {
try { HttpGet httpGet = null;
CloseableHttpClient httpClient = HttpClients.createDefault(); try (CloseableHttpClient httpClient = newHttpClient()) {
OzoneUtils.verifyBucketName(volumeName); OzoneUtils.verifyBucketName(volumeName);
URIBuilder builder = new URIBuilder(endPointURI); URIBuilder builder = new URIBuilder(endPointURI);
builder.setPath("/" + volumeName) builder.setPath("/" + volumeName)
@ -179,11 +180,12 @@ public class OzoneClient implements Closeable {
Header.OZONE_LIST_QUERY_VOLUME) Header.OZONE_LIST_QUERY_VOLUME)
.build(); .build();
HttpGet httpget = getHttpGet(builder.toString()); httpGet = getHttpGet(builder.toString());
return executeInfoVolume(httpget, httpClient); return executeInfoVolume(httpGet, httpClient);
} catch (IOException | URISyntaxException | IllegalArgumentException ex) { } catch (IOException | URISyntaxException | IllegalArgumentException ex) {
throw new OzoneClientException(ex.getMessage()); throw new OzoneClientException(ex.getMessage());
} finally {
OzoneClientUtils.releaseConnection(httpGet);
} }
} }
@ -205,9 +207,8 @@ public class OzoneClient implements Closeable {
*/ */
public List<OzoneVolume> listVolumes(String onBehalfOf, String prefix, int public List<OzoneVolume> listVolumes(String onBehalfOf, String prefix, int
maxKeys, OzoneVolume prevKey) throws OzoneException { maxKeys, OzoneVolume prevKey) throws OzoneException {
try { HttpGet httpGet = null;
CloseableHttpClient httpClient = HttpClients.createDefault(); try (CloseableHttpClient httpClient = newHttpClient()) {
URIBuilder builder = new URIBuilder(endPointURI); URIBuilder builder = new URIBuilder(endPointURI);
if (prefix != null) { if (prefix != null) {
builder.addParameter(Header.OZONE_LIST_QUERY_PREFIX, prefix); builder.addParameter(Header.OZONE_LIST_QUERY_PREFIX, prefix);
@ -225,14 +226,15 @@ public class OzoneClient implements Closeable {
builder.setPath("/").build(); builder.setPath("/").build();
HttpGet httpget = getHttpGet(builder.toString()); httpGet = getHttpGet(builder.toString());
if (onBehalfOf != null) { if (onBehalfOf != null) {
httpget.addHeader(Header.OZONE_USER, onBehalfOf); httpGet.addHeader(Header.OZONE_USER, onBehalfOf);
} }
return executeListVolume(httpget, httpClient); return executeListVolume(httpGet, httpClient);
} catch (IOException | URISyntaxException ex) { } catch (IOException | URISyntaxException ex) {
throw new OzoneClientException(ex.getMessage()); throw new OzoneClientException(ex.getMessage());
} finally {
OzoneClientUtils.releaseConnection(httpGet);
} }
} }
@ -261,9 +263,8 @@ public class OzoneClient implements Closeable {
*/ */
public List<OzoneVolume> listAllVolumes(String prefix, int maxKeys, public List<OzoneVolume> listAllVolumes(String prefix, int maxKeys,
OzoneVolume prevKey) throws OzoneException { OzoneVolume prevKey) throws OzoneException {
try { HttpGet httpGet = null;
CloseableHttpClient httpClient = HttpClients.createDefault(); try (CloseableHttpClient httpClient = newHttpClient()) {
URIBuilder builder = new URIBuilder(endPointURI); URIBuilder builder = new URIBuilder(endPointURI);
if (prefix != null) { if (prefix != null) {
builder.addParameter(Header.OZONE_LIST_QUERY_PREFIX, prefix); builder.addParameter(Header.OZONE_LIST_QUERY_PREFIX, prefix);
@ -281,11 +282,13 @@ public class OzoneClient implements Closeable {
builder.addParameter(Header.OZONE_LIST_QUERY_ROOTSCAN, "true"); builder.addParameter(Header.OZONE_LIST_QUERY_ROOTSCAN, "true");
builder.setPath("/").build(); builder.setPath("/").build();
HttpGet httpget = getHttpGet(builder.toString()); httpGet = getHttpGet(builder.toString());
return executeListVolume(httpget, httpClient); return executeListVolume(httpGet, httpClient);
} catch (IOException | URISyntaxException ex) { } catch (IOException | URISyntaxException ex) {
throw new OzoneClientException(ex.getMessage()); throw new OzoneClientException(ex.getMessage());
} finally {
OzoneClientUtils.releaseConnection(httpGet);
} }
} }
@ -296,17 +299,18 @@ public class OzoneClient implements Closeable {
* @throws OzoneException - Ozone Exception * @throws OzoneException - Ozone Exception
*/ */
public void deleteVolume(String volumeName) throws OzoneException { public void deleteVolume(String volumeName) throws OzoneException {
try { HttpDelete httpDelete = null;
CloseableHttpClient httpClient = HttpClients.createDefault(); try (CloseableHttpClient httpClient = newHttpClient()) {
OzoneUtils.verifyBucketName(volumeName); OzoneUtils.verifyBucketName(volumeName);
URIBuilder builder = new URIBuilder(endPointURI); URIBuilder builder = new URIBuilder(endPointURI);
builder.setPath("/" + volumeName).build(); builder.setPath("/" + volumeName).build();
HttpDelete httpdelete = getHttpDelete(builder.toString()); httpDelete = getHttpDelete(builder.toString());
executeDeleteVolume(httpdelete, httpClient); executeDeleteVolume(httpDelete, httpClient);
} catch (IOException | URISyntaxException | IllegalArgumentException ex) { } catch (IOException | URISyntaxException | IllegalArgumentException ex) {
throw new OzoneClientException(ex.getMessage()); throw new OzoneClientException(ex.getMessage());
} finally {
OzoneClientUtils.releaseConnection(httpDelete);
} }
} }
@ -319,23 +323,23 @@ public class OzoneClient implements Closeable {
*/ */
public void setVolumeOwner(String volumeName, String newOwner) public void setVolumeOwner(String volumeName, String newOwner)
throws OzoneException { throws OzoneException {
HttpPut putRequest = null;
if (newOwner == null || newOwner.isEmpty()) { if (newOwner == null || newOwner.isEmpty()) {
throw new OzoneClientException("Invalid new owner name"); throw new OzoneClientException("Invalid new owner name");
} }
try { try (CloseableHttpClient httpClient = newHttpClient()) {
CloseableHttpClient httpClient = HttpClients.createDefault();
OzoneUtils.verifyBucketName(volumeName); OzoneUtils.verifyBucketName(volumeName);
URIBuilder builder = new URIBuilder(endPointURI); URIBuilder builder = new URIBuilder(endPointURI);
builder.setPath("/" + volumeName).build(); builder.setPath("/" + volumeName).build();
HttpPut putRequest = getHttpPut(builder.toString()); putRequest = getHttpPut(builder.toString());
putRequest.addHeader(Header.OZONE_USER, newOwner); putRequest.addHeader(Header.OZONE_USER, newOwner);
executePutVolume(putRequest, httpClient); executePutVolume(putRequest, httpClient);
} catch (URISyntaxException | IllegalArgumentException | IOException ex) { } catch (URISyntaxException | IllegalArgumentException | IOException ex) {
throw new OzoneClientException(ex.getMessage()); throw new OzoneClientException(ex.getMessage());
} finally {
OzoneClientUtils.releaseConnection(putRequest);
} }
} }
@ -354,20 +358,21 @@ public class OzoneClient implements Closeable {
if (quota == null || quota.isEmpty()) { if (quota == null || quota.isEmpty()) {
throw new OzoneClientException("Invalid quota"); throw new OzoneClientException("Invalid quota");
} }
try { HttpPut putRequest = null;
CloseableHttpClient httpClient = HttpClients.createDefault(); try (CloseableHttpClient httpClient = newHttpClient()) {
OzoneUtils.verifyBucketName(volumeName); OzoneUtils.verifyBucketName(volumeName);
URIBuilder builder = new URIBuilder(endPointURI); URIBuilder builder = new URIBuilder(endPointURI);
builder.setPath("/" + volumeName) builder.setPath("/" + volumeName)
.setParameter(Header.OZONE_QUOTA_QUERY_TAG, quota) .setParameter(Header.OZONE_QUOTA_QUERY_TAG, quota)
.build(); .build();
HttpPut putRequest = getHttpPut(builder.toString()); putRequest = getHttpPut(builder.toString());
executePutVolume(putRequest, httpClient); executePutVolume(putRequest, httpClient);
} catch (URISyntaxException | IllegalArgumentException | IOException ex) { } catch (URISyntaxException | IllegalArgumentException | IOException ex) {
throw new OzoneClientException(ex.getMessage()); throw new OzoneClientException(ex.getMessage());
} finally {
OzoneClientUtils.releaseConnection(putRequest);
} }
} }
@ -380,7 +385,7 @@ public class OzoneClient implements Closeable {
* @throws OzoneException * @throws OzoneException
*/ */
private void executeCreateVolume(HttpPost httppost, private void executeCreateVolume(HttpPost httppost,
CloseableHttpClient httpClient) final CloseableHttpClient httpClient)
throws IOException, OzoneException { throws IOException, OzoneException {
HttpEntity entity = null; HttpEntity entity = null;
try { try {
@ -413,7 +418,7 @@ public class OzoneClient implements Closeable {
* @throws OzoneException * @throws OzoneException
*/ */
private OzoneVolume executeInfoVolume(HttpGet httpGet, private OzoneVolume executeInfoVolume(HttpGet httpGet,
CloseableHttpClient httpClient) final CloseableHttpClient httpClient)
throws IOException, OzoneException { throws IOException, OzoneException {
HttpEntity entity = null; HttpEntity entity = null;
try { try {
@ -447,7 +452,7 @@ public class OzoneClient implements Closeable {
* @throws OzoneException * @throws OzoneException
*/ */
private void executePutVolume(HttpPut putRequest, private void executePutVolume(HttpPut putRequest,
CloseableHttpClient httpClient) final CloseableHttpClient httpClient)
throws IOException, OzoneException { throws IOException, OzoneException {
HttpEntity entity = null; HttpEntity entity = null;
try { try {
@ -473,7 +478,7 @@ public class OzoneClient implements Closeable {
* @throws OzoneException * @throws OzoneException
*/ */
private List<OzoneVolume> executeListVolume(HttpGet httpGet, private List<OzoneVolume> executeListVolume(HttpGet httpGet,
CloseableHttpClient httpClient) final CloseableHttpClient httpClient)
throws IOException, OzoneException { throws IOException, OzoneException {
HttpEntity entity = null; HttpEntity entity = null;
List<OzoneVolume> volList = new LinkedList<>(); List<OzoneVolume> volList = new LinkedList<>();
@ -514,7 +519,7 @@ public class OzoneClient implements Closeable {
* @throws OzoneException * @throws OzoneException
*/ */
private void executeDeleteVolume(HttpDelete httpDelete, private void executeDeleteVolume(HttpDelete httpDelete,
CloseableHttpClient httpClient) final CloseableHttpClient httpClient)
throws IOException, OzoneException { throws IOException, OzoneException {
HttpEntity entity = null; HttpEntity entity = null;
try { try {
@ -540,12 +545,12 @@ public class OzoneClient implements Closeable {
* @return HttpPost * @return HttpPost
*/ */
public HttpPost getHttpPost(String onBehalfOf, String uriString) { public HttpPost getHttpPost(String onBehalfOf, String uriString) {
HttpPost httppost = new HttpPost(uriString); HttpPost httpPost = new HttpPost(uriString);
addOzoneHeaders(httppost); addOzoneHeaders(httpPost);
if (onBehalfOf != null) { if (onBehalfOf != null) {
httppost.addHeader(Header.OZONE_USER, onBehalfOf); httpPost.addHeader(Header.OZONE_USER, onBehalfOf);
} }
return httppost; return httpPost;
} }
/** /**
@ -555,9 +560,9 @@ public class OzoneClient implements Closeable {
* @return HttpGet * @return HttpGet
*/ */
public HttpGet getHttpGet(String uriString) { public HttpGet getHttpGet(String uriString) {
HttpGet httpget = new HttpGet(uriString); HttpGet httpGet = new HttpGet(uriString);
addOzoneHeaders(httpget); addOzoneHeaders(httpGet);
return httpget; return httpGet;
} }
/** /**
@ -615,4 +620,9 @@ public class OzoneClient implements Closeable {
// TODO : Currently we create a new HTTP client. We should switch // TODO : Currently we create a new HTTP client. We should switch
// This to a Pool and cleanup the pool here. // This to a Pool and cleanup the pool here.
} }
@VisibleForTesting
public CloseableHttpClient newHttpClient() {
return OzoneClientUtils.newHttpClient();
}
} }

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.ozone.web.client; package org.apache.hadoop.ozone.web.client;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.web.exceptions.OzoneException; import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.headers.Header; import org.apache.hadoop.ozone.web.headers.Header;
import org.apache.hadoop.ozone.web.request.OzoneQuota; import org.apache.hadoop.ozone.web.request.OzoneQuota;
@ -35,7 +37,6 @@ import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut; import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.utils.URIBuilder; import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils; import org.apache.http.util.EntityUtils;
import java.io.IOException; import java.io.IOException;
@ -164,26 +165,28 @@ public class OzoneVolume {
OzoneConsts.Versioning versioning) OzoneConsts.Versioning versioning)
throws OzoneException { throws OzoneException {
try { HttpPost httpPost = null;
try (CloseableHttpClient httpClient = newHttpClient()) {
OzoneUtils.verifyBucketName(bucketName); OzoneUtils.verifyBucketName(bucketName);
CloseableHttpClient httpClient = HttpClients.createDefault();
URIBuilder builder = new URIBuilder(getClient().getEndPointURI()); URIBuilder builder = new URIBuilder(getClient().getEndPointURI());
builder.setPath("/" + getVolumeName() + "/" + bucketName).build(); builder.setPath("/" + getVolumeName() + "/" + bucketName).build();
HttpPost httppost = client.getHttpPost(null, builder.toString()); httpPost = client.getHttpPost(null, builder.toString());
if (acls != null) { if (acls != null) {
for (String acl : acls) { for (String acl : acls) {
httppost httpPost
.addHeader(Header.OZONE_ACLS, Header.OZONE_ACL_ADD + " " + acl); .addHeader(Header.OZONE_ACLS, Header.OZONE_ACL_ADD + " " + acl);
} }
} }
httppost.addHeader(Header.OZONE_STORAGE_TYPE, storageType.toString()); httpPost.addHeader(Header.OZONE_STORAGE_TYPE, storageType.toString());
httppost.addHeader(Header.OZONE_BUCKET_VERSIONING, versioning.toString()); httpPost.addHeader(Header.OZONE_BUCKET_VERSIONING, versioning.toString());
executeCreateBucket(httppost, httpClient); executeCreateBucket(httpPost, httpClient);
return getBucket(bucketName); return getBucket(bucketName);
} catch (IOException | URISyntaxException | IllegalArgumentException ex) { } catch (IOException | URISyntaxException | IllegalArgumentException ex) {
throw new OzoneClientException(ex.getMessage()); throw new OzoneClientException(ex.getMessage());
} finally {
OzoneClientUtils.releaseConnection(httpPost);
} }
} }
@ -226,7 +229,6 @@ public class OzoneVolume {
* @throws OzoneException * @throws OzoneException
*/ */
public OzoneBucket createBucket(String bucketName) throws OzoneException { public OzoneBucket createBucket(String bucketName) throws OzoneException {
return createBucket(bucketName, null, StorageType.DEFAULT, return createBucket(bucketName, null, StorageType.DEFAULT,
OzoneConsts.Versioning.DISABLED); OzoneConsts.Versioning.DISABLED);
} }
@ -273,13 +275,12 @@ public class OzoneVolume {
* @throws OzoneException * @throws OzoneException
*/ */
public void addAcls(String bucketName, String[] acls) throws OzoneException { public void addAcls(String bucketName, String[] acls) throws OzoneException {
HttpPut putRequest = null;
try { try (CloseableHttpClient httpClient = newHttpClient()) {
OzoneUtils.verifyBucketName(bucketName); OzoneUtils.verifyBucketName(bucketName);
CloseableHttpClient httpClient = HttpClients.createDefault();
URIBuilder builder = new URIBuilder(getClient().getEndPointURI()); URIBuilder builder = new URIBuilder(getClient().getEndPointURI());
builder.setPath("/" + getVolumeName() + "/" + bucketName).build(); builder.setPath("/" + getVolumeName() + "/" + bucketName).build();
HttpPut putRequest = client.getHttpPut(builder.toString()); putRequest = client.getHttpPut(builder.toString());
for (String acl : acls) { for (String acl : acls) {
putRequest putRequest
@ -288,6 +289,8 @@ public class OzoneVolume {
executePutBucket(putRequest, httpClient); executePutBucket(putRequest, httpClient);
} catch (URISyntaxException | IOException ex) { } catch (URISyntaxException | IOException ex) {
throw new OzoneClientException(ex.getMessage()); throw new OzoneClientException(ex.getMessage());
} finally {
OzoneClientUtils.releaseConnection(putRequest);
} }
} }
@ -301,12 +304,12 @@ public class OzoneVolume {
*/ */
public void removeAcls(String bucketName, String[] acls) public void removeAcls(String bucketName, String[] acls)
throws OzoneException { throws OzoneException {
try { HttpPut putRequest = null;
try (CloseableHttpClient httpClient = newHttpClient()) {
OzoneUtils.verifyBucketName(bucketName); OzoneUtils.verifyBucketName(bucketName);
CloseableHttpClient httpClient = HttpClients.createDefault();
URIBuilder builder = new URIBuilder(getClient().getEndPointURI()); URIBuilder builder = new URIBuilder(getClient().getEndPointURI());
builder.setPath("/" + getVolumeName() + "/" + bucketName).build(); builder.setPath("/" + getVolumeName() + "/" + bucketName).build();
HttpPut putRequest = client.getHttpPut(builder.toString()); putRequest = client.getHttpPut(builder.toString());
for (String acl : acls) { for (String acl : acls) {
putRequest putRequest
@ -315,6 +318,8 @@ public class OzoneVolume {
executePutBucket(putRequest, httpClient); executePutBucket(putRequest, httpClient);
} catch (URISyntaxException | IOException ex) { } catch (URISyntaxException | IOException ex) {
throw new OzoneClientException(ex.getMessage()); throw new OzoneClientException(ex.getMessage());
} finally {
OzoneClientUtils.releaseConnection(putRequest);
} }
} }
@ -326,18 +331,20 @@ public class OzoneVolume {
* @return OZoneBucket * @return OZoneBucket
*/ */
public OzoneBucket getBucket(String bucketName) throws OzoneException { public OzoneBucket getBucket(String bucketName) throws OzoneException {
try { HttpGet getRequest = null;
try (CloseableHttpClient httpClient = newHttpClient()) {
OzoneUtils.verifyBucketName(bucketName); OzoneUtils.verifyBucketName(bucketName);
CloseableHttpClient httpClient = HttpClients.createDefault();
URIBuilder builder = new URIBuilder(getClient().getEndPointURI()); URIBuilder builder = new URIBuilder(getClient().getEndPointURI());
builder.setPath("/" + getVolumeName() + "/" + bucketName) builder.setPath("/" + getVolumeName() + "/" + bucketName)
.setParameter(Header.OZONE_LIST_QUERY_TAG, .setParameter(Header.OZONE_LIST_QUERY_TAG,
Header.OZONE_LIST_QUERY_BUCKET).build(); Header.OZONE_LIST_QUERY_BUCKET).build();
HttpGet getRequest = client.getHttpGet(builder.toString()); getRequest = client.getHttpGet(builder.toString());
return executeInfoBucket(getRequest, httpClient); return executeInfoBucket(getRequest, httpClient);
} catch (IOException | URISyntaxException | IllegalArgumentException ex) { } catch (IOException | URISyntaxException | IllegalArgumentException ex) {
throw new OzoneClientException(ex.getMessage()); throw new OzoneClientException(ex.getMessage());
} finally {
OzoneClientUtils.releaseConnection(getRequest);
} }
} }
@ -422,17 +429,18 @@ public class OzoneVolume {
* @throws OzoneException * @throws OzoneException
*/ */
public List<OzoneBucket> listBuckets() throws OzoneException { public List<OzoneBucket> listBuckets() throws OzoneException {
try { HttpGet getRequest = null;
CloseableHttpClient httpClient = HttpClients.createDefault(); try (CloseableHttpClient httpClient = newHttpClient()) {
URIBuilder builder = new URIBuilder(getClient().getEndPointURI()); URIBuilder builder = new URIBuilder(getClient().getEndPointURI());
builder.setPath("/" + getVolumeName()).build(); builder.setPath("/" + getVolumeName()).build();
HttpGet getRequest = client.getHttpGet(builder.toString()); getRequest = client.getHttpGet(builder.toString());
return executeListBuckets(getRequest, httpClient); return executeListBuckets(getRequest, httpClient);
} catch (IOException | URISyntaxException e) { } catch (IOException | URISyntaxException e) {
throw new OzoneClientException(e.getMessage()); throw new OzoneClientException(e.getMessage());
} finally {
OzoneClientUtils.releaseConnection(getRequest);
} }
} }
@ -488,17 +496,19 @@ public class OzoneVolume {
* @throws OzoneException * @throws OzoneException
*/ */
public void deleteBucket(String bucketName) throws OzoneException { public void deleteBucket(String bucketName) throws OzoneException {
try { HttpDelete delRequest = null;
try (CloseableHttpClient httpClient = newHttpClient()) {
OzoneUtils.verifyBucketName(bucketName); OzoneUtils.verifyBucketName(bucketName);
CloseableHttpClient httpClient = HttpClients.createDefault();
URIBuilder builder = new URIBuilder(getClient().getEndPointURI()); URIBuilder builder = new URIBuilder(getClient().getEndPointURI());
builder.setPath("/" + getVolumeName() + "/" + bucketName).build(); builder.setPath("/" + getVolumeName() + "/" + bucketName).build();
HttpDelete delRequest = client.getHttpDelete(builder.toString()); delRequest = client.getHttpDelete(builder.toString());
executeDeleteBucket(delRequest, httpClient); executeDeleteBucket(delRequest, httpClient);
} catch (IOException | URISyntaxException | IllegalArgumentException ex) { } catch (IOException | URISyntaxException | IllegalArgumentException ex) {
throw new OzoneClientException(ex.getMessage()); throw new OzoneClientException(ex.getMessage());
} finally {
OzoneClientUtils.releaseConnection(delRequest);
} }
} }
@ -536,4 +546,9 @@ public class OzoneVolume {
} }
} }
} }
@VisibleForTesting
public CloseableHttpClient newHttpClient() {
return OzoneClientUtils.newHttpClient();
}
} }

View File

@ -22,28 +22,36 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.web.exceptions.OzoneException; import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.request.OzoneQuota; import org.apache.hadoop.ozone.web.request.OzoneQuota;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.web.utils.OzoneUtils; import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class TestVolume { public class TestVolume {
private static MiniOzoneCluster cluster = null; private static MiniOzoneCluster cluster = null;
@ -90,10 +98,15 @@ public class TestVolume {
} }
@Test @Test
public void testCreateVolume() throws OzoneException { public void testCreateVolume() throws OzoneException, IOException {
String volumeName = OzoneUtils.getRequestID().toLowerCase(); String volumeName = OzoneUtils.getRequestID().toLowerCase();
client.setUserAuth(OzoneConsts.OZONE_SIMPLE_HDFS_USER); client.setUserAuth(OzoneConsts.OZONE_SIMPLE_HDFS_USER);
OzoneVolume vol = client.createVolume(volumeName, "bilbo", "100TB");
OzoneClient mockClient = Mockito.spy(client);
List<CloseableHttpClient> mockedClients = mockHttpClients(mockClient);
OzoneVolume vol = mockClient.createVolume(volumeName, "bilbo", "100TB");
// Verify http clients are properly closed.
verifyHttpConnectionClosed(mockedClients);
assertEquals(vol.getVolumeName(), volumeName); assertEquals(vol.getVolumeName(), volumeName);
assertEquals(vol.getCreatedby(), "hdfs"); assertEquals(vol.getCreatedby(), "hdfs");
@ -210,4 +223,80 @@ public class TestVolume {
// be volumes created by other tests too. So we should get more page counts. // be volumes created by other tests too. So we should get more page counts.
Assert.assertEquals(volCount / step , pagecount); Assert.assertEquals(volCount / step , pagecount);
} }
/**
* Returns a list of mocked {@link CloseableHttpClient} used for testing.
* The mocked client replaces the actual calls in
* {@link OzoneClient#newHttpClient()}, it is used to verify
* if the invocation of this client is expected. <b>Note</b>, the output
* of this method is always used as the input of
* {@link TestVolume#verifyHttpConnectionClosed(List)}.
*
* @param ozoneClient mocked ozone client.
* @return a list of mocked {@link CloseableHttpClient}.
* @throws IOException
*/
private List<CloseableHttpClient> mockHttpClients(OzoneClient ozoneClient)
throws IOException {
List<CloseableHttpClient> spyHttpClients = new ArrayList<>();
for (int i = 0; i < 5; i++) {
CloseableHttpClient spyHttpClient = Mockito
.spy(OzoneClientUtils.newHttpClient());
spyHttpClients.add(spyHttpClient);
}
List<CloseableHttpClient> nextReturns =
new ArrayList<>(spyHttpClients.subList(1, spyHttpClients.size()));
Mockito.when(ozoneClient.newHttpClient()).thenReturn(
spyHttpClients.get(0),
nextReturns.toArray(new CloseableHttpClient[nextReturns.size()]));
return spyHttpClients;
}
/**
* This method is used together with
* {@link TestVolume#mockHttpClients(OzoneClient)} to verify
* if the http client is properly closed. It verifies that as long as
* a client calls {@link CloseableHttpClient#execute(HttpUriRequest)} to
* send request, then it must calls {@link CloseableHttpClient#close()}
* close the http connection.
*
* @param mockedHttpClients
*/
private void verifyHttpConnectionClosed(
List<CloseableHttpClient> mockedHttpClients) {
final AtomicInteger totalCalled = new AtomicInteger();
Assert.assertTrue(mockedHttpClients.stream().allMatch(
closeableHttpClient -> {
boolean clientUsed = false;
try {
verify(closeableHttpClient, times(1))
.execute(Mockito.any());
totalCalled.incrementAndGet();
clientUsed = true;
} catch (Throwable e) {
// There might be some redundant instances in mockedHttpClients,
// it is allowed that a client is not used.
return true;
}
if (clientUsed) {
try {
// If a client is used, ensure the close function is called.
verify(closeableHttpClient,
times(1)).close();
return true;
} catch (IOException e) {
return false;
}
} else {
return true;
}
}));
System.out.println("Successful connections "
+ totalCalled.get());
Assert.assertTrue(
"The mocked http client should be called at least once.",
totalCalled.get() > 0);
}
} }