don't attach audit user to remote requests and fix rest origin address

Currently, we attach the internal audit user to all requests. This is incorrect for requests that
need to be sent to a remote cluster. For these cases, we should require a user to be defined
to access the remote cluster if it is protected by Shield.

Additionally, the origin_address field for rest request fields is formatted differently than other
address fields. This changes the field to only be the remote address.

Closes elastic/elasticsearch#278
Closes elastic/elasticsearch#279

Original commit: elastic/x-pack-elasticsearch@a5f86b1974
This commit is contained in:
jaymode 2015-07-22 13:19:37 -04:00
parent 78068bd66f
commit 5de2b799e0
3 changed files with 73 additions and 33 deletions

View File

@ -37,7 +37,6 @@ import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.shield.ShieldPlugin;
import org.elasticsearch.shield.User;
@ -55,6 +54,7 @@ import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
@ -484,7 +484,12 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail {
}
msg.builder.field(Field.REQUEST_BODY, restRequestContent(request));
msg.builder.field(Field.ORIGIN_TYPE, "rest");
msg.builder.field(Field.ORIGIN_ADDRESS, request.getRemoteAddress());
SocketAddress address = request.getRemoteAddress();
if (address instanceof InetSocketAddress) {
msg.builder.field(Field.ORIGIN_ADDRESS, ((InetSocketAddress)request.getRemoteAddress()).getAddress().getHostAddress());
} else {
msg.builder.field(Field.ORIGIN_ADDRESS, request.getRemoteAddress());
}
msg.builder.field(Field.URI, request.uri());
return msg.end();
@ -636,8 +641,11 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail {
request.settings(updatedSettings);
}
authenticationService.attachUserHeaderIfMissing(request, auditUser.user());
assert !Thread.currentThread().isInterrupted() : "current thread has been interrupted before putting index template!!!";
if (!indexToRemoteCluster) {
authenticationService.attachUserHeaderIfMissing(request, auditUser.user());
}
PutIndexTemplateResponse response = client.admin().indices().putTemplate(request).actionGet();
if (!response.isAcknowledged()) {
throw new IllegalStateException("failed to put index template for audit logging");
@ -660,7 +668,9 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
try {
authenticationService.attachUserHeaderIfMissing(request, auditUser.user());
if (!indexToRemoteCluster) {
authenticationService.attachUserHeaderIfMissing(request, auditUser.user());
}
} catch (IOException e) {
throw new ElasticsearchException("failed to attach user header", e);
}
@ -700,7 +710,9 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail {
IndexRequest indexRequest = client.prepareIndex()
.setIndex(resolve(INDEX_NAME_PREFIX, message.timestamp, rollover))
.setType(DOC_TYPE).setSource(message.builder).request();
authenticationService.attachUserHeaderIfMissing(indexRequest, auditUser.user());
if (!indexToRemoteCluster) {
authenticationService.attachUserHeaderIfMissing(indexRequest, auditUser.user());
}
bulkProcessor.add(indexRequest);
} catch (InterruptedException e) {
logger.debug("index audit queue consumer interrupted", e);

View File

@ -8,7 +8,6 @@ package org.elasticsearch.shield.audit.index;
import com.google.common.base.Predicate;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.exists.ExistsResponse;
import org.elasticsearch.action.search.SearchResponse;
@ -20,7 +19,6 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.node.Node;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.shield.ShieldPlugin;
@ -30,7 +28,9 @@ import org.elasticsearch.shield.authc.AuthenticationToken;
import org.elasticsearch.shield.transport.filter.IPFilter;
import org.elasticsearch.shield.transport.filter.ShieldIpFilterRule;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.ShieldIntegrationTest;
import org.elasticsearch.test.ShieldSettingsSource;
import org.elasticsearch.transport.TransportInfo;
import org.elasticsearch.transport.TransportMessage;
import org.elasticsearch.transport.TransportRequest;
@ -45,13 +45,17 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
import static org.elasticsearch.shield.audit.index.IndexNameResolver.Rollover.*;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE;
import static org.hamcrest.Matchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.elasticsearch.test.InternalTestCluster.clusterName;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.*;
/**
*
@ -59,14 +63,14 @@ import static org.mockito.Mockito.when;
@ElasticsearchIntegrationTest.ClusterScope(scope = SUITE, numDataNodes = 1)
public class IndexAuditTrailTests extends ShieldIntegrationTest {
public static final String REMOTE_TEST_CLUSTER = "single-node-remote-test-cluster";
public static final String SECOND_CLUSTER_NODE_PREFIX = "remote_" + SUITE_CLUSTER_NODE_PREFIX;
private static final IndexAuditUserHolder user = new IndexAuditUserHolder(IndexAuditTrail.INDEX_NAME_PREFIX);
private IndexNameResolver.Rollover rollover;
private IndexAuditTrail auditor;
private boolean remoteIndexing = false;
private Node remoteNode;
private InternalTestCluster cluster2;
private Client remoteClient;
private int numShards;
private int numReplicas;
@ -121,28 +125,53 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
numReplicas = numberOfReplicas();
numShards = numberOfShards();
Settings settings = settings(rollover, includes, excludes);
AuthenticationService authService = mock(AuthenticationService.class);
remoteIndexing = randomBoolean();
if (remoteIndexing) {
// start a small single-node cluster to test remote indexing against
logger.info("--> remote indexing enabled");
Settings s = Settings.builder().put(ShieldPlugin.ENABLED_SETTING_NAME, false).put("path.home", createTempDir()).build();
remoteNode = nodeBuilder().clusterName(REMOTE_TEST_CLUSTER).data(true).settings(s).node();
remoteClient = remoteNode.client();
// create another cluster
String cluster2Name = clusterName(Scope.SUITE.name(), randomLong());
// Setup a second test cluster with randomization for number of nodes, shield enabled, and SSL
final int numNodes = randomIntBetween(1, 2);
final boolean useShield = randomBoolean();
final boolean useSSL = useShield && randomBoolean();
logger.info("--> remote indexing enabled. shield enabled: [{}], SSL enabled: [{}]", useShield, useSSL);
ShieldSettingsSource cluster2SettingsSource = new ShieldSettingsSource(numNodes, useSSL, systemKey(), createTempDir(), Scope.SUITE) {
@Override
public Settings node(int nodeOrdinal) {
return Settings.builder()
.put(super.node(nodeOrdinal))
.put(ShieldPlugin.ENABLED_SETTING_NAME, useShield)
.build();
}
};
cluster2 = new InternalTestCluster(randomLong(), createTempDir(), numNodes, numNodes, cluster2Name, cluster2SettingsSource, 0, false, SECOND_CLUSTER_NODE_PREFIX);
cluster2.beforeTest(getRandom(), 0.5);
remoteClient = cluster2.client();
NodesInfoResponse response = remoteClient.admin().cluster().prepareNodesInfo().execute().actionGet();
TransportInfo info = response.getNodes()[0].getTransport();
InetSocketTransportAddress inet = (InetSocketTransportAddress) info.address().publishAddress();
settings = Settings.builder()
Settings.Builder builder = Settings.builder()
.put(settings)
.put(remoteSettings(inet.address().getAddress().getHostAddress(), inet.address().getPort(), REMOTE_TEST_CLUSTER))
.build();
.put(ShieldPlugin.ENABLED_SETTING_NAME, useShield)
.put(remoteSettings(inet.address().getAddress().getHostAddress(), inet.address().getPort(), cluster2Name))
.put("shield.audit.index.client.shield.user", ShieldSettingsSource.DEFAULT_USER_NAME + ":" + ShieldSettingsSource.DEFAULT_PASSWORD);
if (useSSL) {
for (Map.Entry<String, String> entry : cluster2SettingsSource.getClientSSLSettings().getAsMap().entrySet()) {
builder.put("shield.audit.index.client." + entry.getKey(), entry.getValue());
}
}
settings = builder.build();
doThrow(new IllegalStateException("indexing user should not be attached when sending remotely")).when(authService).attachUserHeaderIfMissing(any(TransportMessage.class), eq(user.user()));
}
settings = Settings.builder().put(settings).put("path.home", createTempDir()).build();
logger.info("--> settings: [{}]", settings.getAsMap().toString());
AuthenticationService authService = mock(AuthenticationService.class);
when(authService.authenticate(mock(RestRequest.class))).thenThrow(new UnsupportedOperationException(""));
when(authService.authenticate("_action", new LocalHostMockMessage(), user.user())).thenThrow(new UnsupportedOperationException(""));
@ -158,11 +187,10 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
}
cluster().wipe();
if (remoteIndexing && remoteNode != null) {
DeleteIndexResponse response = remoteClient.admin().indices().prepareDelete("*").execute().actionGet();
assertTrue(response.isAcknowledged());
if (remoteIndexing && cluster2 != null) {
cluster2.wipe();
remoteClient.close();
remoteNode.close();
cluster2.close();
}
}
@ -210,7 +238,7 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
SearchHit hit = getIndexedAuditMessage();
assertAuditMessage(hit, "rest", "anonymous_access_denied");
assertThat("_hostname:9200", equalTo(hit.field("origin_address").getValue()));
assertThat("127.0.0.1", equalTo(hit.field("origin_address").getValue()));
assertThat("_uri", equalTo(hit.field("uri").getValue()));
assertThat((String) hit.field("origin_type").getValue(), is("rest"));
assertThat(hit.field("request_body").getValue(), notNullValue());
@ -301,7 +329,7 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
assertAuditMessage(hit, "rest", "authentication_failed");
assertThat(hit.field("principal").getValue(), is((Object) "_principal"));
assertThat("_hostname:9200", equalTo(hit.field("origin_address").getValue()));
assertThat("127.0.0.1", equalTo(hit.field("origin_address").getValue()));
assertThat("_uri", equalTo(hit.field("uri").getValue()));
assertThat((String) hit.field("origin_type").getValue(), is("rest"));
assertThat(hit.field("request_body").getValue(), notNullValue());
@ -318,7 +346,7 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
assertAuditMessage(hit, "rest", "authentication_failed");
assertThat(hit.field("principal"), nullValue());
assertThat("_hostname:9200", equalTo(hit.field("origin_address").getValue()));
assertThat("127.0.0.1", equalTo(hit.field("origin_address").getValue()));
assertThat("_uri", equalTo(hit.field("uri").getValue()));
assertThat((String) hit.field("origin_type").getValue(), is("rest"));
assertThat(hit.field("request_body").getValue(), notNullValue());
@ -387,7 +415,7 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
SearchHit hit = getIndexedAuditMessage();
assertAuditMessage(hit, "rest", "authentication_failed");
assertThat("_hostname:9200", equalTo(hit.field("origin_address").getValue()));
assertThat("127.0.0.1", equalTo(hit.field("origin_address").getValue()));
assertThat("_uri", equalTo(hit.field("uri").getValue()));
assertEquals("_realm", hit.field("realm").getValue());
assertThat((String) hit.field("origin_type").getValue(), is("rest"));
@ -618,7 +646,7 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
private RestRequest mockRestRequest() {
RestRequest request = mock(RestRequest.class);
when(request.getRemoteAddress()).thenReturn(new InetSocketAddress("_hostname", 9200));
when(request.getRemoteAddress()).thenReturn(new InetSocketAddress("127.0.0.1", 9200));
when(request.uri()).thenReturn("_uri");
return request;
}

View File

@ -193,11 +193,11 @@ public class ShieldSettingsSource extends ClusterDiscoveryConfiguration.UnicastZ
}
}
private Settings getNodeSSLSettings() {
public Settings getNodeSSLSettings() {
return getSSLSettingsForStore("/org/elasticsearch/shield/transport/ssl/certs/simple/testnode.jks", "testnode", sslTransportEnabled, hostnameVerificationEnabled, hostnameVerificationResolveNameEnabled);
}
private Settings getClientSSLSettings() {
public Settings getClientSSLSettings() {
return getSSLSettingsForStore("/org/elasticsearch/shield/transport/ssl/certs/simple/testclient.jks", "testclient", sslTransportEnabled, hostnameVerificationEnabled, hostnameVerificationResolveNameEnabled);
}