Add doPrivilege blocks for socket connect operations in plugins (#22534)

This is related to #22116. Certain plugins (discovery-azure-classic, 
discovery-ec2, discovery-gce, repository-azure, repository-gcs, and 
repository-s3) open socket connections. As SocketPermissions are 
transitioned out of core, these plugins will require connect 
permission. This pull request wraps operations that require these 
permissions in doPrivileged blocks.
Author: Tim Brooks, 2017-01-18 10:12:18 -06:00 (committed by GitHub)
parent 1d1bdd476c
commit 2766b08ff4
18 changed files with 630 additions and 303 deletions
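All of the call-site changes below follow one pattern: check SpecialPermission so arbitrary code cannot ride the elevation, then run the network call inside AccessController.doPrivileged so the permission check stops at the plugin's own protection domain instead of walking the whole call stack. A minimal self-contained sketch of that pattern (class and method names are illustrative, not taken from this commit):

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLConnection;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import org.elasticsearch.SpecialPermission;

final class PrivilegedConnectExample {
    // Runs url.openConnection() with the permission check cut off at this
    // frame, so the plugin's own SocketPermission "connect" grant applies
    // even when less-privileged core code is on the call stack.
    static URLConnection openPrivileged(URL url) throws IOException {
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            // gate: only code granted SpecialPermission may elevate
            sm.checkPermission(new SpecialPermission());
        }
        try {
            return AccessController.doPrivileged(
                (PrivilegedExceptionAction<URLConnection>) url::openConnection);
        } catch (PrivilegedActionException e) {
            // the action can only throw IOException, so this cast is safe
            throw (IOException) e.getCause();
        }
    }
}
```

Unwrapping PrivilegedActionException this way keeps each helper's checked-exception signature identical to the SDK call it wraps.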

File: org/elasticsearch/cloud/azure/classic/management/AzureComputeServiceImpl.java

@ -20,22 +20,32 @@
package org.elasticsearch.cloud.azure.classic.management;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ServiceLoader;
import com.microsoft.windowsazure.Configuration;
import com.microsoft.windowsazure.core.Builder;
import com.microsoft.windowsazure.core.DefaultBuilder;
import com.microsoft.windowsazure.core.utils.KeyStoreType;
import com.microsoft.windowsazure.exception.ServiceException;
import com.microsoft.windowsazure.management.compute.ComputeManagementClient;
import com.microsoft.windowsazure.management.compute.ComputeManagementService;
import com.microsoft.windowsazure.management.compute.models.HostedServiceGetDetailedResponse;
import com.microsoft.windowsazure.management.configuration.ManagementConfiguration;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cloud.azure.classic.AzureServiceRemoteException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.xml.sax.SAXException;
import javax.xml.parsers.ParserConfigurationException;
public class AzureComputeServiceImpl extends AbstractLifecycleComponent
implements AzureComputeService {
@ -89,10 +99,15 @@ public class AzureComputeServiceImpl extends AbstractLifecycleComponent
@Override
public HostedServiceGetDetailedResponse getServiceDetails() {
+    SecurityManager sm = System.getSecurityManager();
+    if (sm != null) {
+        sm.checkPermission(new SpecialPermission());
+    }
    try {
-        return client.getHostedServicesOperations().getDetailed(serviceName);
-    } catch (Exception e) {
-        throw new AzureServiceRemoteException("can not get list of azure nodes", e);
+        return AccessController.doPrivileged((PrivilegedExceptionAction<HostedServiceGetDetailedResponse>)
+            () -> client.getHostedServicesOperations().getDetailed(serviceName));
+    } catch (PrivilegedActionException e) {
+        throw new AzureServiceRemoteException("can not get list of azure nodes", e.getCause());
    }
}

File: org/elasticsearch/cloud/aws/network/Ec2NameResolver.java

@ -21,6 +21,7 @@ package org.elasticsearch.cloud.aws.network;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.cloud.aws.AwsEc2ServiceImpl;
import org.elasticsearch.cloud.aws.util.SocketAccess;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.network.NetworkService.CustomNameResolver;
import org.elasticsearch.common.settings.Settings;
@ -97,9 +98,9 @@ public class Ec2NameResolver extends AbstractComponent implements CustomNameReso
try {
    URL url = new URL(metadataUrl);
    logger.debug("obtaining ec2 hostname from ec2 meta-data url {}", url);
-    URLConnection urlConnection = url.openConnection();
+    URLConnection urlConnection = SocketAccess.doPrivilegedIOException(url::openConnection);
    urlConnection.setConnectTimeout(2000);
-    in = urlConnection.getInputStream();
+    in = SocketAccess.doPrivilegedIOException(urlConnection::getInputStream);
    BufferedReader urlReader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    String metadataResult = urlReader.readLine();

File (new): org/elasticsearch/cloud/aws/util/SocketAccess.java (discovery-ec2)

@ -0,0 +1,62 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws.util;
import org.elasticsearch.SpecialPermission;
import java.io.IOException;
import java.net.SocketPermission;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
/**
* This plugin uses aws libraries to connect to aws services. For these remote calls the plugin needs
* {@link SocketPermission} 'connect' to establish connections. This class wraps the operations requiring access in
* {@link AccessController#doPrivileged(PrivilegedAction)} blocks.
*/
public final class SocketAccess {
private static final SpecialPermission SPECIAL_PERMISSION = new SpecialPermission();
private SocketAccess() {}
public static <T> T doPrivileged(PrivilegedAction<T> operation) {
checkSpecialPermission();
return AccessController.doPrivileged(operation);
}
public static <T> T doPrivilegedIOException(PrivilegedExceptionAction<T> operation) throws IOException {
checkSpecialPermission();
try {
return AccessController.doPrivileged(operation);
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
private static void checkSpecialPermission() {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPermission(SPECIAL_PERMISSION);
}
}
}
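Call sites then reduce to a one-line wrap. A hypothetical caller, mirroring the Ec2NameResolver change above:

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLConnection;
import org.elasticsearch.cloud.aws.util.SocketAccess;

final class SocketAccessUsage {
    // The checked IOException thrown inside the lambda is unwrapped from
    // PrivilegedActionException and rethrown as-is by the helper.
    static URLConnection open(URL url) throws IOException {
        return SocketAccess.doPrivilegedIOException(url::openConnection);
    }
}
```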

File: org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java

@ -19,12 +19,6 @@
package org.elasticsearch.discovery.ec2;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
@ -38,6 +32,7 @@ import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.AwsEc2Service.DISCOVERY_EC2;
import org.elasticsearch.cloud.aws.util.SocketAccess;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
@ -114,7 +109,7 @@ public class AwsEc2UnicastHostsProvider extends AbstractComponent implements Uni
// NOTE: we don't filter by security group during the describe instances request for two reasons:
// 1. differences in VPCs require different parameters during query (ID vs Name)
// 2. We want to use two different strategies: (all security groups vs. any security groups)
-descInstances = client.describeInstances(buildDescribeInstancesRequest());
+descInstances = SocketAccess.doPrivileged(() -> client.describeInstances(buildDescribeInstancesRequest()));
} catch (AmazonClientException e) {
logger.info("Exception while retrieving instance list from AWS API: {}", e.getMessage());
logger.debug("Full exception:", e);

File: Ec2DiscoveryPlugin.java

@ -44,6 +44,7 @@ import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cloud.aws.AwsEc2Service;
import org.elasticsearch.cloud.aws.AwsEc2ServiceImpl;
import org.elasticsearch.cloud.aws.network.Ec2NameResolver;
import org.elasticsearch.cloud.aws.util.SocketAccess;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.logging.DeprecationLogger;
@ -56,7 +57,6 @@ import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.ec2.AwsEc2UnicastHostsProvider;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.ZenPing;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.DiscoveryPlugin;
import org.elasticsearch.plugins.Plugin;
@ -75,20 +75,19 @@ public class Ec2DiscoveryPlugin extends Plugin implements DiscoveryPlugin, Close
if (sm != null) {
sm.checkPermission(new SpecialPermission());
}
-AccessController.doPrivileged(new PrivilegedAction<Void>() {
-    @Override
-    public Void run() {
-        try {
-            // kick jackson to do some static caching of declared members info
-            Jackson.jsonNodeOf("{}");
-            // ClientConfiguration clinit has some classloader problems
-            // TODO: fix that
-            Class.forName("com.amazonaws.ClientConfiguration");
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException(e);
-        }
-        return null;
+// Initializing Jackson requires RuntimePermission accessDeclaredMembers
+// The ClientConfiguration class requires RuntimePermission getClassLoader
+AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
+    try {
+        // kick jackson to do some static caching of declared members info
+        Jackson.jsonNodeOf("{}");
+        // ClientConfiguration clinit has some classloader problems
+        // TODO: fix that
+        Class.forName("com.amazonaws.ClientConfiguration");
+    } catch (ClassNotFoundException e) {
+        throw new RuntimeException(e);
+    }
+    return null;
    });
}
@ -194,14 +193,14 @@ public class Ec2DiscoveryPlugin extends Plugin implements DiscoveryPlugin, Close
try {
url = new URL(azMetadataUrl);
logger.debug("obtaining ec2 [placement/availability-zone] from ec2 meta-data url {}", url);
-urlConnection = url.openConnection();
+urlConnection = SocketAccess.doPrivilegedIOException(url::openConnection);
urlConnection.setConnectTimeout(2000);
} catch (IOException e) {
// should not happen, we know the url is not malformed, and openConnection does not actually hit network
throw new UncheckedIOException(e);
}
-try (InputStream in = urlConnection.getInputStream();
+try (InputStream in = SocketAccess.doPrivilegedIOException(urlConnection::getInputStream);
BufferedReader urlReader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
String metadataResult = urlReader.readLine();

File: org/elasticsearch/cloud/gce/GceInstancesServiceImpl.java

@ -21,10 +21,7 @@ package org.elasticsearch.cloud.gce;
import java.io.Closeable;
import java.io.IOException;
import java.security.AccessController;
import java.security.GeneralSecurityException;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -43,6 +40,7 @@ import com.google.api.services.compute.model.InstanceList;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cloud.gce.util.Access;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@ -68,21 +66,14 @@ public class GceInstancesServiceImpl extends AbstractComponent implements GceIns
try {
    // hack around code messiness in GCE code
    // TODO: get this fixed
-    SecurityManager sm = System.getSecurityManager();
-    if (sm != null) {
-        sm.checkPermission(new SpecialPermission());
-    }
-    InstanceList instanceList = AccessController.doPrivileged(new PrivilegedExceptionAction<InstanceList>() {
-        @Override
-        public InstanceList run() throws Exception {
-            Compute.Instances.List list = client().instances().list(project, zoneId);
-            return list.execute();
-        }
+    InstanceList instanceList = Access.doPrivilegedIOException(() -> {
+        Compute.Instances.List list = client().instances().list(project, zoneId);
+        return list.execute();
    });
    // assist type inference
    return instanceList.isEmpty() || instanceList.getItems() == null ?
        Collections.<Instance>emptyList() : instanceList.getItems();
-} catch (PrivilegedActionException e) {
+} catch (IOException e) {
    logger.warn((Supplier<?>) () -> new ParameterizedMessage("Problem fetching instance list for zone {}", zoneId), e);
    logger.debug("Full exception:", e);
    // assist type inference
@ -134,7 +125,7 @@ public class GceInstancesServiceImpl extends AbstractComponent implements GceIns
public synchronized Compute client() {
if (refreshInterval != null && refreshInterval.millis() != 0) {
if (client != null &&
        (refreshInterval.millis() < 0 || (System.currentTimeMillis() - lastRefresh) < refreshInterval.millis())) {
if (logger.isTraceEnabled()) logger.trace("using cache to retrieve client");
return client;
}
@ -151,22 +142,12 @@ public class GceInstancesServiceImpl extends AbstractComponent implements GceIns
String tokenServerEncodedUrl = GceMetadataService.GCE_HOST.get(settings) +
"/computeMetadata/v1/instance/service-accounts/default/token";
ComputeCredential credential = new ComputeCredential.Builder(getGceHttpTransport(), gceJsonFactory)
    .setTokenServerEncodedUrl(tokenServerEncodedUrl)
    .build();
// hack around code messiness in GCE code
// TODO: get this fixed
-SecurityManager sm = System.getSecurityManager();
-if (sm != null) {
-    sm.checkPermission(new SpecialPermission());
-}
-AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
-    @Override
-    public Void run() throws IOException {
-        credential.refreshToken();
-        return null;
-    }
-});
+Access.doPrivilegedIOException(credential::refreshToken);
logger.debug("token [{}] will expire in [{}] s", credential.getAccessToken(), credential.getExpiresInSeconds());
if (credential.getExpiresInSeconds() != null) {

File: GceMetadataService.java

@ -34,6 +34,7 @@ import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpTransport;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cloud.gce.util.Access;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@ -56,8 +57,8 @@ public class GceMetadataService extends AbstractLifecycleComponent {
protected synchronized HttpTransport getGceHttpTransport() throws GeneralSecurityException, IOException {
if (gceHttpTransport == null) {
    gceHttpTransport = GoogleNetHttpTransport.newTrustedTransport();
}
return gceHttpTransport;
}
@ -71,30 +72,16 @@ public class GceMetadataService extends AbstractLifecycleComponent {
try {
    // hack around code messiness in GCE code
    // TODO: get this fixed
-    SecurityManager sm = System.getSecurityManager();
-    if (sm != null) {
-        sm.checkPermission(new SpecialPermission());
-    }
-    headers = AccessController.doPrivileged(new PrivilegedExceptionAction<HttpHeaders>() {
-        @Override
-        public HttpHeaders run() throws IOException {
-            return new HttpHeaders();
-        }
-    });
-    GenericUrl genericUrl = AccessController.doPrivileged(new PrivilegedAction<GenericUrl>() {
-        @Override
-        public GenericUrl run() {
-            return new GenericUrl(urlMetadataNetwork);
-        }
-    });
+    headers = Access.doPrivileged(HttpHeaders::new);
+    GenericUrl genericUrl = Access.doPrivileged(() -> new GenericUrl(urlMetadataNetwork));
    // This is needed to query meta data: https://cloud.google.com/compute/docs/metadata
    headers.put("Metadata-Flavor", "Google");
-    HttpResponse response;
-    response = getGceHttpTransport().createRequestFactory()
-        .buildGetRequest(genericUrl)
-        .setHeaders(headers)
-        .execute();
+    HttpResponse response = Access.doPrivilegedIOException(() ->
+        getGceHttpTransport().createRequestFactory()
+            .buildGetRequest(genericUrl)
+            .setHeaders(headers)
+            .execute());
String metadata = response.parseAsString();
logger.debug("metadata found [{}]", metadata);
return metadata;

File: org/elasticsearch/cloud/gce/network/GceNameResolver.java

@ -20,6 +20,7 @@
package org.elasticsearch.cloud.gce.network;
import org.elasticsearch.cloud.gce.GceMetadataService;
import org.elasticsearch.cloud.gce.util.Access;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.network.NetworkService.CustomNameResolver;
@ -28,6 +29,9 @@ import org.elasticsearch.common.settings.Settings;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URISyntaxException;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
/**
* <p>Resolves certain GCE related 'meta' hostnames into an actual hostname
@ -106,13 +110,13 @@ public class GceNameResolver extends AbstractComponent implements CustomNameReso
}
try {
-    String metadataResult = gceMetadataService.metadata(gceMetadataPath);
+    String metadataResult = Access.doPrivilegedIOException(() -> gceMetadataService.metadata(gceMetadataPath));
    if (metadataResult == null || metadataResult.length() == 0) {
        throw new IOException("no gce metadata returned from [" + gceMetadataPath + "] for [" + value + "]");
    }
    // only one address: because we explicitly ask for only one via the GceHostnameType
    return new InetAddress[] { InetAddress.getByName(metadataResult) };
-} catch (IOException | URISyntaxException e) {
+} catch (IOException e) {
    throw new IOException("IOException caught when fetching InetAddress from [" + gceMetadataPath + "]", e);
}
}

File (new): org/elasticsearch/cloud/gce/util/Access.java

@ -0,0 +1,75 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.gce.util;
import org.elasticsearch.SpecialPermission;
import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
/**
* GCE's http client changes access levels. Specifically it needs {@link RuntimePermission} accessDeclaredMembers and
* setFactory and {@link java.lang.reflect.ReflectPermission} suppressAccessChecks. For remote calls the plugin needs
* SocketPermissions for 'connect'. This class wraps the operations requiring access in
* {@link AccessController#doPrivileged(PrivilegedAction)} blocks.
*/
public final class Access {
private static final SpecialPermission SPECIAL_PERMISSION = new SpecialPermission();
private Access() {}
public static <T> T doPrivileged(PrivilegedAction<T> operation) {
checkSpecialPermission();
return AccessController.doPrivileged(operation);
}
public static void doPrivilegedVoid(DiscoveryRunnable action) {
checkSpecialPermission();
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
action.execute();
return null;
});
}
public static <T> T doPrivilegedIOException(PrivilegedExceptionAction<T> operation) throws IOException {
checkSpecialPermission();
try {
return AccessController.doPrivileged(operation);
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
private static void checkSpecialPermission() {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPermission(SPECIAL_PERMISSION);
}
}
@FunctionalInterface
public interface DiscoveryRunnable {
void execute();
}
}
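The doPrivilegedVoid variant adapts a void, non-throwing DiscoveryRunnable to PrivilegedAction<Void>, which suits one-off static initialization; the GceDiscoveryPlugin hunk further down uses exactly this shape. A hypothetical caller (wrapper class illustrative; the ClassInfo call itself appears in the diff below):

```java
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.util.ClassInfo;
import org.elasticsearch.cloud.gce.util.Access;

final class AccessUsage {
    // Void, non-throwing action: DiscoveryRunnable declares no checked
    // exceptions and returns nothing, so no unwrapping logic is needed.
    static void forceClassInfoInit() {
        Access.doPrivilegedVoid(() -> ClassInfo.of(HttpHeaders.class, true));
    }
}
```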

File: RetryHttpInitializerWrapper.java

@ -31,6 +31,7 @@ import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cloud.gce.util.Access;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.unit.TimeValue;
@ -73,11 +74,7 @@ public class RetryHttpInitializerWrapper implements HttpRequestInitializer {
// Use only for testing
static MockGoogleCredential.Builder newMockCredentialBuilder() {
// TODO: figure out why GCE is so bad like this
-SecurityManager sm = System.getSecurityManager();
-if (sm != null) {
-    sm.checkPermission(new SpecialPermission());
-}
-return AccessController.doPrivileged((PrivilegedAction<MockGoogleCredential.Builder>) () -> new MockGoogleCredential.Builder());
+return Access.doPrivileged(MockGoogleCredential.Builder::new);
}
@Override

File: GceDiscoveryPlugin.java

@ -30,6 +30,7 @@ import org.elasticsearch.cloud.gce.GceInstancesServiceImpl;
import org.elasticsearch.cloud.gce.GceMetadataService;
import org.elasticsearch.cloud.gce.GceModule;
import org.elasticsearch.cloud.gce.network.GceNameResolver;
import org.elasticsearch.cloud.gce.util.Access;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
@ -80,17 +81,7 @@ public class GceDiscoveryPlugin extends Plugin implements DiscoveryPlugin, Close
* our plugin permissions don't allow core to "reach through" plugins to
* change the permission. Because that'd be silly.
*/
-SecurityManager sm = System.getSecurityManager();
-if (sm != null) {
-    sm.checkPermission(new SpecialPermission());
-}
-AccessController.doPrivileged(new PrivilegedAction<Void>() {
-    @Override
-    public Void run() {
-        ClassInfo.of(HttpHeaders.class, true);
-        return null;
-    }
-});
+Access.doPrivilegedVoid(() -> ClassInfo.of(HttpHeaders.class, true));
}
public GceDiscoveryPlugin(Settings settings) {

File (new): org/elasticsearch/cloud/azure/blobstore/util/SocketAccess.java

@ -0,0 +1,81 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.azure.blobstore.util;
import com.microsoft.azure.storage.StorageException;
import org.elasticsearch.SpecialPermission;
import java.net.SocketPermission;
import java.net.URISyntaxException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
/**
* This plugin uses azure libraries to connect to azure storage services. For these remote calls the plugin needs
* {@link SocketPermission} 'connect' to establish connections. This class wraps the operations requiring access in
* {@link AccessController#doPrivileged(PrivilegedAction)} blocks.
*/
public final class SocketAccess {
private static final SpecialPermission SPECIAL_PERMISSION = new SpecialPermission();
private SocketAccess() {}
public static <T> T doPrivilegedException(PrivilegedExceptionAction<T> operation) throws StorageException, URISyntaxException {
checkSpecialPermission();
try {
return AccessController.doPrivileged(operation);
} catch (PrivilegedActionException e) {
throw (StorageException) e.getCause();
}
}
public static void doPrivilegedVoidException(StorageRunnable action) throws StorageException, URISyntaxException {
checkSpecialPermission();
try {
AccessController.doPrivileged((PrivilegedExceptionAction<Void>) () -> {
action.executeCouldThrow();
return null;
});
} catch (PrivilegedActionException e) {
Throwable cause = e.getCause();
if (cause instanceof StorageException) {
throw (StorageException) cause;
} else {
throw (URISyntaxException) cause;
}
}
}
private static void checkSpecialPermission() {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPermission(SPECIAL_PERMISSION);
}
}
@FunctionalInterface
public interface StorageRunnable {
void executeCouldThrow() throws StorageException, URISyntaxException;
}
}
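Unlike the IOException-only helpers, this wrapper can surface two checked exception types, so the unwrap branches on the actual cause before rethrowing. A hypothetical caller (the azureBlob::delete shape appears in the next file):

```java
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import java.net.URISyntaxException;
import org.elasticsearch.cloud.azure.blobstore.util.SocketAccess;

final class AzureSocketAccessUsage {
    // Both checked exceptions declared by StorageRunnable propagate through
    // the helper: the PrivilegedActionException cause is rethrown as
    // whichever of the two declared types it actually is.
    static void deleteBlob(CloudBlockBlob blob) throws StorageException, URISyntaxException {
        SocketAccess.doPrivilegedVoidException(blob::delete);
    }
}
```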

File: AzureStorageServiceImpl.java

@ -29,6 +29,7 @@ import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.cloud.azure.blobstore.util.SocketAccess;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
@ -42,6 +43,9 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
@ -79,12 +83,12 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
void createClient(AzureStorageSettings azureStorageSettings) {
try {
logger.trace("creating new Azure storage client using account [{}], key [{}]",
azureStorageSettings.getAccount(), azureStorageSettings.getKey());
azureStorageSettings.getAccount(), azureStorageSettings.getKey());
String storageConnectionString =
"DefaultEndpointsProtocol=https;"
+ "AccountName="+ azureStorageSettings.getAccount() +";"
+ "AccountKey=" + azureStorageSettings.getKey();
"DefaultEndpointsProtocol=https;"
+ "AccountName=" + azureStorageSettings.getAccount() + ";"
+ "AccountKey=" + azureStorageSettings.getKey();
// Retrieve storage account from connection-string.
CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
@ -151,7 +155,7 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
try {
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blobContainer = client.getContainerReference(container);
-return blobContainer.exists();
+return SocketAccess.doPrivilegedException(blobContainer::exists);
} catch (Exception e) {
logger.error("can not access container [{}]", container);
}
@ -163,7 +167,7 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blobContainer = client.getContainerReference(container);
logger.trace("removing container [{}]", container);
-blobContainer.deleteIfExists();
+SocketAccess.doPrivilegedException(blobContainer::deleteIfExists);
}
@Override
@ -172,7 +176,7 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blobContainer = client.getContainerReference(container);
logger.trace("creating container [{}]", container);
-blobContainer.createIfNotExists();
+SocketAccess.doPrivilegedException(blobContainer::createIfNotExists);
} catch (IllegalArgumentException e) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("fails creating container [{}]", container), e);
throw new RepositoryException(container, e.getMessage(), e);
@ -186,14 +190,16 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
// Container name must be lower case.
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blobContainer = client.getContainerReference(container);
-if (blobContainer.exists()) {
-    // We list the blobs using a flat blob listing mode
-    for (ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
-        String blobName = blobNameFromUri(blobItem.getUri());
-        logger.trace("removing blob [{}] full URI was [{}]", blobName, blobItem.getUri());
-        deleteBlob(account, mode, container, blobName);
-    }
-}
+SocketAccess.doPrivilegedVoidException(() -> {
+    if (blobContainer.exists()) {
+        // We list the blobs using a flat blob listing mode
+        for (ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
+            String blobName = blobNameFromUri(blobItem.getUri());
+            logger.trace("removing blob [{}] full URI was [{}]", blobName, blobItem.getUri());
+            deleteBlob(account, mode, container, blobName);
+        }
+    }
+});
}
/**
@ -223,7 +229,7 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
CloudBlobContainer blobContainer = client.getContainerReference(container);
if (blobContainer.exists()) {
CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
-return azureBlob.exists();
+return SocketAccess.doPrivilegedException(azureBlob::exists);
}
return false;
@ -239,7 +245,7 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
if (blobContainer.exists()) {
logger.trace("container [{}]: blob [{}] found. removing.", container, blob);
CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
-azureBlob.delete();
+SocketAccess.doPrivilegedVoidException(azureBlob::delete);
}
}
@ -247,14 +253,16 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
public InputStream getInputStream(String account, LocationMode mode, String container, String blob) throws URISyntaxException, StorageException {
logger.trace("reading container [{}], blob [{}]", container, blob);
CloudBlobClient client = this.getSelectedClient(account, mode);
-return client.getContainerReference(container).getBlockBlobReference(blob).openInputStream();
+CloudBlockBlob blockBlobReference = client.getContainerReference(container).getBlockBlobReference(blob);
+return SocketAccess.doPrivilegedException(blockBlobReference::openInputStream);
}
@Override
public OutputStream getOutputStream(String account, LocationMode mode, String container, String blob) throws URISyntaxException, StorageException {
logger.trace("writing container [{}], blob [{}]", container, blob);
CloudBlobClient client = this.getSelectedClient(account, mode);
-return client.getContainerReference(container).getBlockBlobReference(blob).openOutputStream();
+CloudBlockBlob blockBlobReference = client.getContainerReference(container).getBlockBlobReference(blob);
+return SocketAccess.doPrivilegedException(blockBlobReference::openOutputStream);
}
@Override
@ -265,30 +273,32 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
logger.debug("listing container [{}], keyPath [{}], prefix [{}]", container, keyPath, prefix);
MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder();
CloudBlobClient client = this.getSelectedClient(account, mode);
CloudBlobContainer blobContainer = client.getContainerReference(container);
-if (blobContainer.exists()) {
-    for (ListBlobItem blobItem : blobContainer.listBlobs(keyPath + (prefix == null ? "" : prefix))) {
-        URI uri = blobItem.getUri();
-        logger.trace("blob url [{}]", uri);
-        // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
-        // this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
-        String blobPath = uri.getPath().substring(1 + container.length() + 1);
-        CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobPath);
-        // fetch the blob attributes from Azure (getBlockBlobReference does not do this)
-        // this is needed to retrieve the blob length (among other metadata) from Azure Storage
-        blob.downloadAttributes();
-        BlobProperties properties = blob.getProperties();
-        String name = blobPath.substring(keyPath.length());
-        logger.trace("blob url [{}], name [{}], size [{}]", uri, name, properties.getLength());
-        blobsBuilder.put(name, new PlainBlobMetaData(name, properties.getLength()));
-    }
-}
+SocketAccess.doPrivilegedVoidException(() -> {
+    if (blobContainer.exists()) {
+        for (ListBlobItem blobItem : blobContainer.listBlobs(keyPath + (prefix == null ? "" : prefix))) {
+            URI uri = blobItem.getUri();
+            logger.trace("blob url [{}]", uri);
+            // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
+            // this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
+            String blobPath = uri.getPath().substring(1 + container.length() + 1);
+            CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobPath);
+            // fetch the blob attributes from Azure (getBlockBlobReference does not do this)
+            // this is needed to retrieve the blob length (among other metadata) from Azure Storage
+            blob.downloadAttributes();
+            BlobProperties properties = blob.getProperties();
+            String name = blobPath.substring(keyPath.length());
+            logger.trace("blob url [{}], name [{}], size [{}]", uri, name, properties.getLength());
+            blobsBuilder.put(name, new PlainBlobMetaData(name, properties.getLength()));
+        }
+    }
+});
return blobsBuilder.immutableMap();
}
@ -302,8 +312,10 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
CloudBlockBlob blobSource = blobContainer.getBlockBlobReference(sourceBlob);
if (blobSource.exists()) {
CloudBlockBlob blobTarget = blobContainer.getBlockBlobReference(targetBlob);
-blobTarget.startCopy(blobSource);
-blobSource.delete();
+SocketAccess.doPrivilegedVoidException(() -> {
+    blobTarget.startCopy(blobSource);
+    blobSource.delete();
+});
logger.debug("moveBlob container [{}], sourceBlob [{}], targetBlob [{}] -> done", container, sourceBlob, targetBlob);
}
}

File: GoogleCloudStorageBlobStore.java

@ -36,6 +36,7 @@ import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.gcs.util.SocketAccess;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
@ -103,7 +104,7 @@ public class GoogleCloudStorageBlobStore extends AbstractComponent implements Bl
*/
boolean doesBucketExist(String bucketName) {
try {
-return doPrivileged(() -> {
+return SocketAccess.doPrivilegedIOException(() -> {
try {
Bucket bucket = client.buckets().get(bucketName).execute();
if (bucket != null) {
@ -130,7 +131,7 @@ public class GoogleCloudStorageBlobStore extends AbstractComponent implements Bl
* @return a map of blob names and their metadata
*/
Map<String, BlobMetaData> listBlobs(String path) throws IOException {
-return doPrivileged(() -> listBlobsByPath(bucket, path, path));
+return SocketAccess.doPrivilegedIOException(() -> listBlobsByPath(bucket, path, path));
}
/**
@ -141,7 +142,7 @@ public class GoogleCloudStorageBlobStore extends AbstractComponent implements Bl
* @return a map of blob names and their metadata
*/
Map<String, BlobMetaData> listBlobsByPrefix(String path, String prefix) throws IOException {
-return doPrivileged(() -> listBlobsByPath(bucket, buildKey(path, prefix), path));
+return SocketAccess.doPrivilegedIOException(() -> listBlobsByPath(bucket, buildKey(path, prefix), path));
}
/**
@ -165,21 +166,19 @@ public class GoogleCloudStorageBlobStore extends AbstractComponent implements Bl
* @return true if the blob exists, false otherwise
*/
boolean blobExists(String blobName) throws IOException {
-    return doPrivileged(() -> {
-        try {
-            StorageObject blob = client.objects().get(bucket, blobName).execute();
-            if (blob != null) {
-                return Strings.hasText(blob.getId());
-            }
-        } catch (GoogleJsonResponseException e) {
-            GoogleJsonError error = e.getDetails();
-            if ((e.getStatusCode() == HTTP_NOT_FOUND) || ((error != null) && (error.getCode() == HTTP_NOT_FOUND))) {
-                return false;
-            }
-            throw e;
-        }
-        return false;
-    });
+    try {
+        StorageObject blob = SocketAccess.doPrivilegedIOException(() -> client.objects().get(bucket, blobName).execute());
+        if (blob != null) {
+            return Strings.hasText(blob.getId());
+        }
+    } catch (GoogleJsonResponseException e) {
+        GoogleJsonError error = e.getDetails();
+        if ((e.getStatusCode() == HTTP_NOT_FOUND) || ((error != null) && (error.getCode() == HTTP_NOT_FOUND))) {
+            return false;
+        }
+        throw e;
+    }
+    return false;
}
/**
@ -189,18 +188,18 @@ public class GoogleCloudStorageBlobStore extends AbstractComponent implements Bl
* @return an InputStream
*/
InputStream readBlob(String blobName) throws IOException {
-    return doPrivileged(() -> {
-        try {
-            Storage.Objects.Get object = client.objects().get(bucket, blobName);
-            return object.executeMediaAsInputStream();
-        } catch (GoogleJsonResponseException e) {
-            GoogleJsonError error = e.getDetails();
-            if ((e.getStatusCode() == HTTP_NOT_FOUND) || ((error != null) && (error.getCode() == HTTP_NOT_FOUND))) {
-                throw new NoSuchFileException(e.getMessage());
-            }
-            throw e;
-        }
-    });
+    try {
+        return SocketAccess.doPrivilegedIOException(() -> {
+            Storage.Objects.Get object = client.objects().get(bucket, blobName);
+            return object.executeMediaAsInputStream();
+        });
+    } catch (GoogleJsonResponseException e) {
+        GoogleJsonError error = e.getDetails();
+        if ((e.getStatusCode() == HTTP_NOT_FOUND) || ((error != null) && (error.getCode() == HTTP_NOT_FOUND))) {
+            throw new NoSuchFileException(e.getMessage());
+        }
+        throw e;
+    }
}
/**
@ -210,14 +209,13 @@ public class GoogleCloudStorageBlobStore extends AbstractComponent implements Bl
* @param blobSize expected size of the blob to be written
*/
void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
-doPrivileged(() -> {
+SocketAccess.doPrivilegedVoidIOException(() -> {
    InputStreamContent stream = new InputStreamContent(null, inputStream);
    stream.setLength(blobSize);
    Storage.Objects.Insert insert = client.objects().insert(bucket, null, stream);
    insert.setName(blobName);
    insert.execute();
-    return null;
});
}
@ -230,7 +228,7 @@ public class GoogleCloudStorageBlobStore extends AbstractComponent implements Bl
if (!blobExists(blobName)) {
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}
-doPrivileged(() -> client.objects().delete(bucket, blobName).execute());
+SocketAccess.doPrivilegedIOException(() -> client.objects().delete(bucket, blobName).execute());
}
/**
@ -239,10 +237,7 @@ public class GoogleCloudStorageBlobStore extends AbstractComponent implements Bl
* @param prefix prefix of the buckets to delete
*/
void deleteBlobsByPrefix(String prefix) throws IOException {
-doPrivileged(() -> {
-    deleteBlobs(listBlobsByPath(bucket, prefix, null).keySet());
-    return null;
-});
+deleteBlobs(listBlobsByPath(bucket, prefix, null).keySet());
}
/**
@ -259,11 +254,10 @@ public class GoogleCloudStorageBlobStore extends AbstractComponent implements Bl
deleteBlob(blobNames.iterator().next());
return;
}
-doPrivileged(() -> {
-    final List<Storage.Objects.Delete> deletions = new ArrayList<>();
-    final Iterator<String> blobs = blobNames.iterator();
+final List<Storage.Objects.Delete> deletions = new ArrayList<>();
+final Iterator<String> blobs = blobNames.iterator();
+SocketAccess.doPrivilegedVoidIOException(() -> {
while (blobs.hasNext()) {
// Create a delete request for each blob to delete
deletions.add(client.objects().delete(bucket, blobs.next()));
@ -282,7 +276,7 @@ public class GoogleCloudStorageBlobStore extends AbstractComponent implements Bl
@Override
public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
logger.error("failed to delete blob [{}] in bucket [{}]: {}", delete.getObject(), delete.getBucket(), e
.getMessage());
.getMessage());
}
@Override
@ -302,7 +296,6 @@ public class GoogleCloudStorageBlobStore extends AbstractComponent implements Bl
}
}
}
-return null;
});
}
@ -313,7 +306,7 @@ public class GoogleCloudStorageBlobStore extends AbstractComponent implements Bl
* @param targetBlob new name of the blob in the target bucket
*/
void moveBlob(String sourceBlob, String targetBlob) throws IOException {
-doPrivileged(() -> {
+SocketAccess.doPrivilegedIOException(() -> {
// There's no atomic "move" in GCS so we need to copy and delete
client.objects().copy(bucket, sourceBlob, bucket, targetBlob, null).execute();
client.objects().delete(bucket, sourceBlob).execute();
@ -321,21 +314,6 @@ public class GoogleCloudStorageBlobStore extends AbstractComponent implements Bl
});
}
-/**
- * Executes a {@link PrivilegedExceptionAction} with privileges enabled.
- */
-<T> T doPrivileged(PrivilegedExceptionAction<T> operation) throws IOException {
-    SecurityManager sm = System.getSecurityManager();
-    if (sm != null) {
-        sm.checkPermission(new SpecialPermission());
-    }
-    try {
-        return AccessController.doPrivileged((PrivilegedExceptionAction<T>) operation::run);
-    } catch (PrivilegedActionException e) {
-        throw (IOException) e.getException();
-    }
-}
private String buildKey(String keyPath, String s) {
assert s != null;
return keyPath + s;
@ -370,7 +348,7 @@ public class GoogleCloudStorageBlobStore extends AbstractComponent implements Bl
private final Storage.Objects.List list;
StorageObjectsSpliterator(Storage client, String bucketName, String prefix, long pageSize) throws IOException {
-list = client.objects().list(bucketName);
+list = SocketAccess.doPrivilegedIOException(() -> client.objects().list(bucketName));
list.setMaxResults(pageSize);
if (prefix != null) {
list.setPrefix(prefix);
@ -381,7 +359,7 @@ public class GoogleCloudStorageBlobStore extends AbstractComponent implements Bl
public boolean tryAdvance(Consumer<? super StorageObject> action) {
try {
// Retrieves the next page of items
-Objects objects = list.execute();
+Objects objects = SocketAccess.doPrivilegedIOException(list::execute);
if ((objects == null) || (objects.getItems() == null) || (objects.getItems().isEmpty())) {
return false;

File (new): org/elasticsearch/common/blobstore/gcs/util/SocketAccess.java

@ -0,0 +1,74 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.blobstore.gcs.util;
import org.elasticsearch.SpecialPermission;
import java.io.IOException;
import java.net.SocketPermission;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
/**
* This plugin uses google api/client libraries to connect to google cloud services. For these remote calls the plugin
* needs {@link SocketPermission} 'connect' to establish connections. This class wraps the operations requiring access
* in {@link AccessController#doPrivileged(PrivilegedAction)} blocks.
*/
public final class SocketAccess {
private static final SpecialPermission SPECIAL_PERMISSION = new SpecialPermission();
private SocketAccess() {}
public static <T> T doPrivilegedIOException(PrivilegedExceptionAction<T> operation) throws IOException {
checkSpecialPermission();
try {
return AccessController.doPrivileged(operation);
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
public static void doPrivilegedVoidIOException(StorageRunnable action) throws IOException {
checkSpecialPermission();
try {
AccessController.doPrivileged((PrivilegedExceptionAction<Void>) () -> {
action.executeCouldThrow();
return null;
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
private static void checkSpecialPermission() {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPermission(SPECIAL_PERMISSION);
}
}
@FunctionalInterface
public interface StorageRunnable {
void executeCouldThrow() throws IOException;
}
}
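This copy of the helper adds doPrivilegedVoidIOException for void operations, which removes the `return null;` boilerplate visible in the old writeBlob code above. A hypothetical caller:

```java
import java.io.IOException;
import java.io.OutputStream;
import org.elasticsearch.common.blobstore.gcs.util.SocketAccess;

final class GcsSocketAccessUsage {
    // Void variant: the lambda may throw IOException and returns nothing,
    // so callers no longer need a PrivilegedExceptionAction<Void> stub.
    static void flushPrivileged(OutputStream out) throws IOException {
        SocketAccess.doPrivilegedVoidIOException(out::flush);
    }
}
```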

File: org/elasticsearch/cloud/aws/blobstore/S3BlobContainer.java

@ -20,13 +20,14 @@
package org.elasticsearch.cloud.aws.blobstore;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cloud.aws.util.SocketAccess;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
@ -42,8 +43,7 @@ import java.io.OutputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.security.AccessController;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
+import java.security.PrivilegedAction;
import java.util.Map;
public class S3BlobContainer extends AbstractBlobContainer {
@ -61,13 +61,9 @@ public class S3BlobContainer extends AbstractBlobContainer {
@Override
public boolean blobExists(String blobName) {
try {
-    return doPrivileged(() -> {
-        try {
-            blobStore.client().getObjectMetadata(blobStore.bucket(), buildKey(blobName));
-            return true;
-        } catch (AmazonS3Exception e) {
-            return false;
-        }
+    return SocketAccess.doPrivileged(() -> {
+        blobStore.client().getObjectMetadata(blobStore.bucket(), buildKey(blobName));
+        return true;
    });
} catch (AmazonS3Exception e) {
return false;
@ -81,7 +77,7 @@ public class S3BlobContainer extends AbstractBlobContainer {
int retry = 0;
while (retry <= blobStore.numberOfRetries()) {
try {
-S3Object s3Object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName));
+S3Object s3Object = SocketAccess.doPrivileged(() -> blobStore.client().getObject(blobStore.bucket(), buildKey(blobName)));
return s3Object.getObjectContent();
} catch (AmazonClientException e) {
if (blobStore.shouldRetry(e) && (retry < blobStore.numberOfRetries())) {
@ -96,7 +92,7 @@ public class S3BlobContainer extends AbstractBlobContainer {
}
}
}
throw new BlobStoreException("retries exhausted while attempting to access blob object [name:" + blobName + ", bucket:" + blobStore.bucket() +"]");
throw new BlobStoreException("retries exhausted while attempting to access blob object [name:" + blobName + ", bucket:" + blobStore.bucket() + "]");
}
@Override
@ -105,7 +101,7 @@ public class S3BlobContainer extends AbstractBlobContainer {
throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite");
}
try (OutputStream stream = createOutput(blobName)) {
-Streams.copy(inputStream, stream);
+SocketAccess.doPrivilegedIOException(() -> Streams.copy(inputStream, stream));
}
}
@ -116,7 +112,7 @@ public class S3BlobContainer extends AbstractBlobContainer {
}
try {
-blobStore.client().deleteObject(blobStore.bucket(), buildKey(blobName));
+SocketAccess.doPrivilegedVoid(() -> blobStore.client().deleteObject(blobStore.bucket(), buildKey(blobName)));
} catch (AmazonClientException e) {
throw new IOException("Exception when deleting blob [" + blobName + "]", e);
}
@ -125,51 +121,60 @@ public class S3BlobContainer extends AbstractBlobContainer {
private OutputStream createOutput(final String blobName) throws IOException {
// UploadS3OutputStream does buffering & retry logic internally
return new DefaultS3OutputStream(blobStore, blobStore.bucket(), buildKey(blobName),
    blobStore.bufferSizeInBytes(), blobStore.numberOfRetries(), blobStore.serverSideEncryption());
}
@Override
public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException {
-    MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder();
-    ObjectListing prevListing = null;
-    while (true) {
-        ObjectListing list;
-        if (prevListing != null) {
-            list = blobStore.client().listNextBatchOfObjects(prevListing);
-        } else {
-            if (blobNamePrefix != null) {
-                list = blobStore.client().listObjects(blobStore.bucket(), buildKey(blobNamePrefix));
-            } else {
-                list = blobStore.client().listObjects(blobStore.bucket(), keyPath);
-            }
-        }
-        for (S3ObjectSummary summary : list.getObjectSummaries()) {
-            String name = summary.getKey().substring(keyPath.length());
-            blobsBuilder.put(name, new PlainBlobMetaData(name, summary.getSize()));
-        }
-        if (list.isTruncated()) {
-            prevListing = list;
-        } else {
-            break;
-        }
-    }
-    return blobsBuilder.immutableMap();
+    return AccessController.doPrivileged((PrivilegedAction<Map<String, BlobMetaData>>) () -> {
+        MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder();
+        AmazonS3 client = blobStore.client();
+        SocketAccess.doPrivilegedVoid(() -> {
+            ObjectListing prevListing = null;
+            while (true) {
+                ObjectListing list;
+                if (prevListing != null) {
+                    list = client.listNextBatchOfObjects(prevListing);
+                } else {
+                    if (blobNamePrefix != null) {
+                        list = client.listObjects(blobStore.bucket(), buildKey(blobNamePrefix));
+                    } else {
+                        list = client.listObjects(blobStore.bucket(), keyPath);
+                    }
+                }
+                for (S3ObjectSummary summary : list.getObjectSummaries()) {
+                    String name = summary.getKey().substring(keyPath.length());
+                    blobsBuilder.put(name, new PlainBlobMetaData(name, summary.getSize()));
+                }
+                if (list.isTruncated()) {
+                    prevListing = list;
+                } else {
+                    break;
+                }
+            }
+        });
+        return blobsBuilder.immutableMap();
+    });
}
@Override
public void move(String sourceBlobName, String targetBlobName) throws IOException {
try {
CopyObjectRequest request = new CopyObjectRequest(blobStore.bucket(), buildKey(sourceBlobName),
    blobStore.bucket(), buildKey(targetBlobName));
if (blobStore.serverSideEncryption()) {
    ObjectMetadata objectMetadata = new ObjectMetadata();
    objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
    request.setNewObjectMetadata(objectMetadata);
}
-blobStore.client().copyObject(request);
-blobStore.client().deleteObject(blobStore.bucket(), buildKey(sourceBlobName));
+SocketAccess.doPrivilegedVoid(() -> {
+    blobStore.client().copyObject(request);
+    blobStore.client().deleteObject(blobStore.bucket(), buildKey(sourceBlobName));
+});
} catch (AmazonS3Exception e) {
throw new IOException(e);
}
}
@ -182,20 +187,4 @@ public class S3BlobContainer extends AbstractBlobContainer {
protected String buildKey(String blobName) {
return keyPath + blobName;
}
-/**
- * Executes a {@link PrivilegedExceptionAction} with privileges enabled.
- */
-<T> T doPrivileged(PrivilegedExceptionAction<T> operation) throws IOException {
-    SecurityManager sm = System.getSecurityManager();
-    if (sm != null) {
-        sm.checkPermission(new SpecialPermission());
-    }
-    try {
-        return AccessController.doPrivileged(operation);
-    } catch (PrivilegedActionException e) {
-        throw (IOException) e.getException();
-    }
-}
}

File: S3BlobStore.java

@ -29,6 +29,7 @@ import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.StorageClass;
import org.elasticsearch.cloud.aws.util.SocketAccess;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
@ -38,6 +39,8 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Locale;
@ -76,29 +79,31 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
// Also, if invalid security credentials are used to execute this method, the
// client is not able to distinguish between bucket permission errors and
// invalid credential errors, and this method could return an incorrect result.
-int retry = 0;
-while (retry <= maxRetries) {
-    try {
-        if (!client.doesBucketExist(bucket)) {
-            CreateBucketRequest request = null;
-            if (region != null) {
-                request = new CreateBucketRequest(bucket, region);
-            } else {
-                request = new CreateBucketRequest(bucket);
-            }
-            request.setCannedAcl(this.cannedACL);
-            client.createBucket(request);
-        }
-        break;
-    } catch (AmazonClientException e) {
-        if (shouldRetry(e) && retry < maxRetries) {
-            retry++;
-        } else {
-            logger.debug("S3 client create bucket failed");
-            throw e;
-        }
-    }
-}
+SocketAccess.doPrivilegedVoid(() -> {
+    int retry = 0;
+    while (retry <= maxRetries) {
+        try {
+            if (!client.doesBucketExist(bucket)) {
+                CreateBucketRequest request;
+                if (region != null) {
+                    request = new CreateBucketRequest(bucket, region);
+                } else {
+                    request = new CreateBucketRequest(bucket);
+                }
+                request.setCannedAcl(this.cannedACL);
+                client.createBucket(request);
+            }
+            break;
+        } catch (AmazonClientException e) {
+            if (shouldRetry(e) && retry < maxRetries) {
+                retry++;
+            } else {
+                logger.debug("S3 client create bucket failed");
+                throw e;
+            }
+        }
+    }
+});
}
@Override
@ -114,7 +119,9 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
return bucket;
}
-public boolean serverSideEncryption() { return serverSideEncryption; }
+public boolean serverSideEncryption() {
+    return serverSideEncryption;
+}
public int bufferSizeInBytes() {
return bufferSize.bytesAsInt();
@ -131,45 +138,48 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
@Override
public void delete(BlobPath path) {
-    ObjectListing prevListing = null;
-    //From http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html
-    //we can do at most 1K objects per delete
-    //We don't know the bucket name until first object listing
-    DeleteObjectsRequest multiObjectDeleteRequest = null;
-    ArrayList<KeyVersion> keys = new ArrayList<KeyVersion>();
-    while (true) {
-        ObjectListing list;
-        if (prevListing != null) {
-            list = client.listNextBatchOfObjects(prevListing);
-        } else {
-            list = client.listObjects(bucket, path.buildAsString());
-            multiObjectDeleteRequest = new DeleteObjectsRequest(list.getBucketName());
-        }
-        for (S3ObjectSummary summary : list.getObjectSummaries()) {
-            keys.add(new KeyVersion(summary.getKey()));
-            //Every 500 objects batch the delete request
-            if (keys.size() > 500) {
-                multiObjectDeleteRequest.setKeys(keys);
-                client.deleteObjects(multiObjectDeleteRequest);
-                multiObjectDeleteRequest = new DeleteObjectsRequest(list.getBucketName());
-                keys.clear();
-            }
-        }
-        if (list.isTruncated()) {
-            prevListing = list;
-        } else {
-            break;
-        }
-    }
-    if (!keys.isEmpty()) {
-        multiObjectDeleteRequest.setKeys(keys);
-        client.deleteObjects(multiObjectDeleteRequest);
-    }
+    AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
+        ObjectListing prevListing = null;
+        //From http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html
+        //we can do at most 1K objects per delete
+        //We don't know the bucket name until first object listing
+        DeleteObjectsRequest multiObjectDeleteRequest = null;
+        ArrayList<KeyVersion> keys = new ArrayList<KeyVersion>();
+        while (true) {
+            ObjectListing list;
+            if (prevListing != null) {
+                list = client.listNextBatchOfObjects(prevListing);
+            } else {
+                list = client.listObjects(bucket, path.buildAsString());
+                multiObjectDeleteRequest = new DeleteObjectsRequest(list.getBucketName());
+            }
+            for (S3ObjectSummary summary : list.getObjectSummaries()) {
+                keys.add(new KeyVersion(summary.getKey()));
+                //Every 500 objects batch the delete request
+                if (keys.size() > 500) {
+                    multiObjectDeleteRequest.setKeys(keys);
+                    client.deleteObjects(multiObjectDeleteRequest);
+                    multiObjectDeleteRequest = new DeleteObjectsRequest(list.getBucketName());
+                    keys.clear();
+                }
+            }
+            if (list.isTruncated()) {
+                prevListing = list;
+            } else {
+                break;
+            }
+        }
+        if (!keys.isEmpty()) {
+            multiObjectDeleteRequest.setKeys(keys);
+            client.deleteObjects(multiObjectDeleteRequest);
+        }
+        return null;
+    });
}
protected boolean shouldRetry(AmazonClientException e) {
if (e instanceof AmazonS3Exception) {
AmazonS3Exception s3e = (AmazonS3Exception) e;
if (s3e.getStatusCode() == 400 && "RequestTimeout".equals(s3e.getErrorCode())) {
return true;
}
@ -194,7 +204,7 @@ public class S3BlobStore extends AbstractComponent implements BlobStore {
try {
StorageClass _storageClass = StorageClass.fromValue(storageClass.toUpperCase(Locale.ENGLISH));
if (_storageClass.equals(StorageClass.Glacier)) {
throw new BlobStoreException("Glacier storage class is not supported");
}

File (new): org/elasticsearch/cloud/aws/util/SocketAccess.java (repository-s3)

@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cloud.aws.util;
import org.elasticsearch.SpecialPermission;
import java.io.IOException;
import java.net.SocketPermission;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
/**
* This plugin uses aws libraries to connect to S3 repositories. For these remote calls the plugin needs
* {@link SocketPermission} 'connect' to establish connections. This class wraps the operations requiring access in
* {@link AccessController#doPrivileged(PrivilegedAction)} blocks.
*/
public final class SocketAccess {
private static final SpecialPermission SPECIAL_PERMISSION = new SpecialPermission();
private SocketAccess() {}
public static <T> T doPrivileged(PrivilegedAction<T> operation) {
checkSpecialPermission();
return AccessController.doPrivileged(operation);
}
public static <T> T doPrivilegedIOException(PrivilegedExceptionAction<T> operation) throws IOException {
checkSpecialPermission();
try {
return AccessController.doPrivileged(operation);
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
public static void doPrivilegedVoid(StorageRunnable action) {
checkSpecialPermission();
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
action.execute();
return null;
});
}
private static void checkSpecialPermission() {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPermission(SPECIAL_PERMISSION);
}
}
@FunctionalInterface
public interface StorageRunnable {
void execute();
}
}
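A closing note: these wrappers only elevate to permissions the plugin itself has been granted; the grant is typically declared in the plugin's plugin-security.policy file. A sketch of the kind of entry that pairs with the SocketAccess/Access helpers (scope illustrative, not copied from this commit):

```
grant {
  // allow outbound connections from plugin code running inside doPrivileged
  permission java.net.SocketPermission "*", "connect";
};
```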