SOLR-13741: Harden AuditLoggerIntegrationTest

This commit is contained in:
Chris Hostetter 2019-10-16 16:56:34 -07:00
parent ebc720c5b0
commit 63e9bcf5d1
3 changed files with 287 additions and 166 deletions

View File

@ -3,7 +3,7 @@
"class": "solr.CallbackAuditLoggerPlugin",
"callbackPort": "_PORT_",
"async": _ASYNC_,
"delay": "_DELAY_",
"semaphore": _SEMAPHORE_,
"muteRules": _MUTERULES_
},_AUTH_
}
}

View File

@ -26,17 +26,20 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
@ -48,6 +51,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.security.AuditEvent.EventType;
import org.apache.solr.security.AuditEvent.RequestType;
import org.apache.solr.security.AuditLoggerPlugin.JSONAuditEventFormatter;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.After;
import org.junit.Before;
@ -55,13 +59,16 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
import static org.apache.solr.client.solrj.request.CollectionAdminRequest.getClusterStatus;
import static org.apache.solr.client.solrj.request.CollectionAdminRequest.getOverseerStatus;
import static org.apache.solr.security.AuditEvent.EventType.COMPLETED;
import static org.apache.solr.security.AuditEvent.EventType.ERROR;
import static org.apache.solr.security.AuditEvent.EventType.REJECTED;
import static org.apache.solr.security.AuditEvent.EventType.UNAUTHORIZED;
import static org.apache.solr.security.AuditEvent.RequestType.ADMIN;
import static org.apache.solr.security.AuditEvent.RequestType.SEARCH;
import static org.apache.solr.security.Sha256AuthenticationProvider.getSaltedHashedValue;
/**
* Validate that audit logging works in a live cluster
@ -70,6 +77,8 @@ import static org.apache.solr.security.AuditEvent.RequestType.SEARCH;
public class AuditLoggerIntegrationTest extends SolrCloudAuthTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected static final JSONAuditEventFormatter formatter = new JSONAuditEventFormatter();
protected static final int NUM_SERVERS = 1;
protected static final int NUM_SHARDS = 1;
protected static final int REPLICATION_FACTOR = 1;
@ -90,144 +99,227 @@ public class AuditLoggerIntegrationTest extends SolrCloudAuthTestCase {
testHarness.get().close();
}
super.tearDown();
CallbackAuditLoggerPlugin.BLOCKING_SEMAPHORES.clear();
}
@Test
public void testSynchronous() throws Exception {
setupCluster(false, 0, false, null);
runAdminCommands();
waitForAuditEventCallbacks(3);
setupCluster(false, null, false);
runThreeTestAdminCommands();
assertThreeTestAdminEvents();
assertAuditMetricsMinimums(testHarness.get().cluster, CallbackAuditLoggerPlugin.class.getSimpleName(), 3, 0);
assertThreeAdminEvents();
}
@Test
public void testAsync() throws Exception {
setupCluster(true, 0, false, null);
runAdminCommands();
waitForAuditEventCallbacks(3);
setupCluster(true, null, false);
runThreeTestAdminCommands();
assertThreeTestAdminEvents();
assertAuditMetricsMinimums(testHarness.get().cluster, CallbackAuditLoggerPlugin.class.getSimpleName(), 3, 0);
assertThreeAdminEvents();
}
@Test
public void testQueuedTimeMetric() throws Exception {
setupCluster(true, 100, false, null);
runAdminCommands();
waitForAuditEventCallbacks(3);
final Semaphore gate = new Semaphore(0);
CallbackAuditLoggerPlugin.BLOCKING_SEMAPHORES.put("testQueuedTimeMetric_semaphore", gate);
setupCluster(true, "testQueuedTimeMetric_semaphore", false);
// NOTE: gate is empty, we don't allow any of the events to be logged yet
runThreeTestAdminCommands();
// Don't assume anything about the system clock,
// Thread.sleep is not a garunteed minimum for a predictible elapsed time...
final long start = System.nanoTime();
Thread.sleep(100);
final long end = System.nanoTime();
gate.release(3);
assertThreeTestAdminEvents();
assertAuditMetricsMinimums(testHarness.get().cluster, CallbackAuditLoggerPlugin.class.getSimpleName(), 3, 0);
ArrayList<MetricRegistry> registries = getMetricsReigstries(testHarness.get().cluster);
Timer timer = ((Timer) registries.get(0).getMetrics().get("SECURITY./auditlogging.CallbackAuditLoggerPlugin.queuedTime"));
double meanTimeOnQueue = timer.getSnapshot().getMean() / 1000000; // Convert to ms
assertTrue("Expecting mean time on queue >10ms, got " + meanTimeOnQueue, meanTimeOnQueue > 10);
double meanTimeOnQueue = timer.getSnapshot().getMean();
double meanTimeExpected = (start - end) / 3.0D;
assertTrue("Expecting mean time on queue > "+meanTimeExpected+", got " + meanTimeOnQueue,
meanTimeOnQueue > meanTimeExpected);
}
@Test
public void testAsyncQueueDrain() throws Exception {
setupCluster(true, 150, false, null);
runAdminCommands();
assertTrue("Expecting <3 callbacks in buffer, was " + testHarness.get().receiver.getBuffer().size(),
testHarness.get().receiver.getBuffer().size() < 3); // Events still on queue
// We shutdown cluster while events are still in queue
testHarness.get().shutdownCluster();
assertThreeAdminEvents();
final AuditTestHarness harness = testHarness.get();
final Semaphore gate = new Semaphore(0);
CallbackAuditLoggerPlugin.BLOCKING_SEMAPHORES.put("testAsyncQueueDrain_semaphore", gate);
setupCluster(true, "testAsyncQueueDrain_semaphore", false);
final int preShutdownEventsAllowed = TestUtil.nextInt(random(), 0, 2);
final int postShutdownEventsAllowed = 3 - preShutdownEventsAllowed;
// Starting by only allowing 2/3 of the (expected) events to be logged right away...
log.info("Test will allow {} events to happen prior to shutdown", preShutdownEventsAllowed);
gate.release(preShutdownEventsAllowed);
runThreeTestAdminCommands();
final List<AuditEvent> events = new ArrayList
(harness.receiver.waitForAuditEvents(preShutdownEventsAllowed));
assertEquals(preShutdownEventsAllowed, events.size());
// Now shutdown cluster while 1 event still in process
// Do this in a background thread because it blocks...
final Thread shutdownThread = new DefaultSolrThreadFactory("shutdown")
.newThread(() -> { try {
log.info("START Shutting down Cluster.");
harness.shutdownCluster();
log.info("END Shutting down Cluster.");
} catch (Exception e) {
throw new RuntimeException(e);
}
});
try {
shutdownThread.start();
// release the ticket so the event can be processed
log.info("releasing final {} semaphore tickets...", postShutdownEventsAllowed);
gate.release(postShutdownEventsAllowed);
events.addAll(harness.receiver.waitForAuditEvents(postShutdownEventsAllowed));
assertThreeTestAdminEvents(events);
} finally {
shutdownThread.join();
}
}
@Test
public void testMuteAdminListCollections() throws Exception {
setupCluster(false, 0, false, "[ \"type:UNKNOWN\", [ \"path:/admin\", \"param:action=LIST\" ] ]");
runAdminCommands();
setupCluster(false, null, false, "\"type:UNKNOWN\"", "[ \"path:/admin\", \"param:action=LIST\" ]");
runThreeTestAdminCommands();
testHarness.get().shutdownCluster();
waitForAuditEventCallbacks(2);
assertEquals(2, testHarness.get().receiver.getBuffer().size());
final List<AuditEvent> events = testHarness.get().receiver.waitForAuditEvents(2);
assertEquals(2, events.size()); // sanity check
assertAuditEvent(events.get(0), COMPLETED, "/admin/collections", ADMIN, null, 200,
"action", "CLUSTERSTATUS");
assertAuditEvent(events.get(1), COMPLETED, "/admin/collections", ADMIN, null, 200,
"action", "OVERSEERSTATUS");
}
@Test
public void searchWithException() throws Exception {
setupCluster(false, 0, false, null);
try {
testHarness.get().cluster.getSolrClient().request(CollectionAdminRequest.createCollection("test", 1, 1));
setupCluster(false, null, false);
testHarness.get().cluster.getSolrClient().request(CollectionAdminRequest.createCollection("test", 1, 1));
expectThrows(SolrException.class, () -> {
testHarness.get().cluster.getSolrClient().query("test", new MapSolrParams(Collections.singletonMap("q", "a(bc")));
fail("Query should fail");
} catch (SolrException ex) {
waitForAuditEventCallbacks(3);
CallbackReceiver receiver = testHarness.get().receiver;
assertAuditEvent(receiver.popEvent(), COMPLETED, "/admin/cores");
assertAuditEvent(receiver.popEvent(), COMPLETED, "/admin/collections");
assertAuditEvent(receiver.popEvent(), ERROR,"/select", SEARCH, null, 400);
});
final List<AuditEvent> events = testHarness.get().receiver.waitForAuditEvents(3);
assertAuditEvent(events.get(0), COMPLETED, "/admin/cores");
assertAuditEvent(events.get(1), COMPLETED, "/admin/collections");
assertAuditEvent(events.get(2), ERROR,"/select", SEARCH, null, 400);
}
@Test
public void authValid() throws Exception {
setupCluster(false, null, true);
final CloudSolrClient client = testHarness.get().cluster.getSolrClient();
final CallbackReceiver receiver = testHarness.get().receiver;
{ // valid READ requests: #1 with, and #2 without, (valid) Authentication
final CollectionAdminRequest.List req = new CollectionAdminRequest.List();
// we don't block unknown users for READ, so this should succeed
client.request(req);
// Authenticated user (w/valid password) should also succeed
req.setBasicAuthCredentials("solr", SOLR_PASS);
client.request(req);
final List<AuditEvent> events = receiver.waitForAuditEvents(2);
assertAuditEvent(events.get(0), COMPLETED, "/admin/collections", ADMIN, null, 200, "action", "LIST");
assertAuditEvent(events.get(1), COMPLETED, "/admin/collections", ADMIN, "solr", 200, "action", "LIST");
}
{ // valid CREATE request: Authenticated admin user should be allowed to CREATE collection
final Create req = CollectionAdminRequest.createCollection("test_create", 1, 1);
req.setBasicAuthCredentials("solr", SOLR_PASS);
client.request(req);
// collection createion leads to AuditEvent's for the core as well...
final List<AuditEvent> events = receiver.waitForAuditEvents(2);
assertAuditEvent(events.get(0), COMPLETED, "/admin/cores", ADMIN, null, 200, "action", "CREATE");
assertAuditEvent(events.get(1), COMPLETED, "/admin/collections", ADMIN, null, 200, "action", "CREATE");
}
}
@Test
public void auth() throws Exception {
setupCluster(false, 0, true, null);
CloudSolrClient client = testHarness.get().cluster.getSolrClient();
try {
CollectionAdminRequest.List request = new CollectionAdminRequest.List();
client.request(request);
request.setBasicAuthCredentials("solr", "SolrRocks");
client.request(request);
CollectionAdminRequest.Create createRequest = CollectionAdminRequest.createCollection("test", 1, 1);
client.request(createRequest);
fail("Call should fail with 401");
} catch (SolrException ex) {
waitForAuditEventCallbacks(3);
CallbackReceiver receiver = testHarness.get().receiver;
assertAuditEvent(receiver.popEvent(), COMPLETED, "/admin/collections", ADMIN, null, 200, "action", "LIST");
AuditEvent e = receiver.popEvent();
System.out.println(new AuditLoggerPlugin.JSONAuditEventFormatter().formatEvent(e));
assertAuditEvent(e, COMPLETED, "/admin/collections", ADMIN, "solr", 200, "action", "LIST");
assertAuditEvent(receiver.popEvent(), REJECTED, "/admin/collections", ADMIN, null,401);
public void authFailures() throws Exception {
setupCluster(false, null, true);
final CloudSolrClient client = testHarness.get().cluster.getSolrClient();
final CallbackReceiver receiver = testHarness.get().receiver;
{ // invalid request: Authenticated user not allowed to CREATE w/o Authorization
final SolrException e = expectThrows(SolrException.class, () -> {
final Create createRequest = CollectionAdminRequest.createCollection("test_jimbo", 1, 1);
createRequest.setBasicAuthCredentials("jimbo", JIMBO_PASS);
client.request(createRequest);
});
assertEquals(403, e.code());
final List<AuditEvent> events = receiver.waitForAuditEvents(1);
assertAuditEvent(events.get(0), UNAUTHORIZED, "/admin/collections", ADMIN, "jimbo", 403, "name", "test_jimbo");
}
try {
CollectionAdminRequest.Create createRequest = CollectionAdminRequest.createCollection("test", 1, 1);
createRequest.setBasicAuthCredentials("solr", "wrongPW");
client.request(createRequest);
fail("Call should fail with 401");
} catch (SolrException ex) {
waitForAuditEventCallbacks(1);
CallbackReceiver receiver = testHarness.get().receiver;
assertAuditEvent(receiver.popEvent(), REJECTED, "/admin/collections", ADMIN, null, 401);
{ // invalid request: Anon user not allowed to CREATE w/o authentication + authorization
final SolrException e = expectThrows(SolrException.class, () -> {
Create createRequest = CollectionAdminRequest.createCollection("test_anon", 1, 1);
client.request(createRequest);
});
assertEquals(401, e.code());
final List<AuditEvent> events = receiver.waitForAuditEvents(1);
assertAuditEvent(events.get(0), REJECTED, "/admin/collections", ADMIN, null, 401, "name", "test_anon");
}
{ // invalid request: Admin user not Authenticated due to incorrect password
final SolrException e = expectThrows(SolrException.class, () -> {
Create createRequest = CollectionAdminRequest.createCollection("test_wrongpass", 1, 1);
createRequest.setBasicAuthCredentials("solr", "wrong_" + SOLR_PASS);
client.request(createRequest);
});
assertEquals(401, e.code());
final List<AuditEvent> events = receiver.waitForAuditEvents(1);
// Event generated from HttpServletRequest. Has no user since auth failed
assertAuditEvent(events.get(0), REJECTED, "/admin/collections", RequestType.ADMIN, null, 401);
}
}
private void assertAuditEvent(AuditEvent e, EventType type, String path, String... params) {
private static void assertAuditEvent(AuditEvent e, EventType type, String path, String... params) {
assertAuditEvent(e, type, path, null, null,null, params);
}
private void assertAuditEvent(AuditEvent e, EventType type, String path, RequestType requestType, String username, Integer status, String... params) {
assertEquals(type, e.getEventType());
assertEquals(path, e.getResource());
if (requestType != null) {
assertEquals(requestType, e.getRequestType());
}
if (username != null) {
assertEquals(username, e.getUsername());
}
if (status != null) {
assertEquals(status.intValue(), e.getStatus());
}
if (params != null && params.length > 0) {
List<String> p = new LinkedList<>(Arrays.asList(params));
while (p.size() >= 2) {
String val = e.getSolrParamAsString(p.get(0));
assertEquals(p.get(1), val);
p.remove(0);
p.remove(0);
private static void assertAuditEvent(AuditEvent e, EventType type, String path, RequestType requestType, String username, Integer status, String... params) {
try {
assertEquals(type, e.getEventType());
assertEquals(path, e.getResource());
if (requestType != null) {
assertEquals(requestType, e.getRequestType());
}
}
}
private void waitForAuditEventCallbacks(int number) throws InterruptedException {
waitForAuditEventCallbacks(number, 30);
}
private void waitForAuditEventCallbacks(int number, int timeoutSeconds) throws InterruptedException {
CallbackReceiver receiver = testHarness.get().receiver;
int count = 0;
while(receiver.buffer.size() < number) {
Thread.sleep(100);
if (++count >= timeoutSeconds*10) fail("Failed waiting for " + number + " callbacks after " + timeoutSeconds + " seconds");
if (username != null) {
assertEquals(username, e.getUsername());
}
if (status != null) {
assertEquals(status.intValue(), e.getStatus());
}
if (params != null && params.length > 0) {
List<String> p = new LinkedList<>(Arrays.asList(params));
while (p.size() >= 2) {
String val = e.getSolrParamAsString(p.get(0));
assertEquals(p.get(1), val);
p.remove(0);
p.remove(0);
}
}
} catch (AssertionError ae) {
throw new AssertionError(formatter.formatEvent(e) + " => " + ae.getMessage(), ae);
}
}
@ -240,56 +332,82 @@ public class AuditLoggerIntegrationTest extends SolrCloudAuthTestCase {
});
return registries;
}
private void runAdminCommands() throws IOException, SolrServerException {
/** @see #assertThreeTestAdminEvents */
private void runThreeTestAdminCommands() throws IOException, SolrServerException {
SolrClient client = testHarness.get().cluster.getSolrClient();
CollectionAdminRequest.listCollections(client);
client.request(getClusterStatus());
client.request(getOverseerStatus());
}
private void assertThreeAdminEvents() throws Exception {
CallbackReceiver receiver = testHarness.get().receiver;
waitForAuditEventCallbacks(3);
assertEquals(3, receiver.getTotalCount());
assertEquals(3, receiver.getCountForPath("/admin/collections"));
AuditEvent e = receiver.getBuffer().pop();
assertEquals(COMPLETED, e.getEventType());
assertEquals("GET", e.getHttpMethod());
assertEquals("action=LIST&wt=javabin&version=2", e.getHttpQueryString());
assertEquals("LIST", e.getSolrParamAsString("action"));
assertEquals("javabin", e.getSolrParamAsString("wt"));
e = receiver.getBuffer().pop();
assertEquals(COMPLETED, e.getEventType());
assertEquals("GET", e.getHttpMethod());
assertEquals("CLUSTERSTATUS", e.getSolrParamAsString("action"));
e = receiver.getBuffer().pop();
assertEquals(COMPLETED, e.getEventType());
assertEquals("GET", e.getHttpMethod());
assertEquals("OVERSEERSTATUS", e.getSolrParamAsString("action"));
/** @see #runThreeTestAdminCommands */
private void assertThreeTestAdminEvents() throws Exception {
final CallbackReceiver receiver = testHarness.get().receiver;
final List<AuditEvent> events = receiver.waitForAuditEvents(3);
assertThreeTestAdminEvents(events);
}
/** @see #runThreeTestAdminCommands */
private static void assertThreeTestAdminEvents(final List<AuditEvent> events) throws Exception {
assertEquals(3, events.size()); // sanity check
assertAuditEvent(events.get(0), COMPLETED, "/admin/collections", ADMIN, null, 200,
"action", "LIST", "wt", "javabin");
assertAuditEvent(events.get(1), COMPLETED, "/admin/collections", ADMIN, null, 200,
"action", "CLUSTERSTATUS");
assertAuditEvent(events.get(2), COMPLETED, "/admin/collections", ADMIN, null, 200,
"action", "OVERSEERSTATUS");
}
private static String SOLR_PASS = "SolrRocks";
private static String JIMBO_PASS = "JimIsCool";
private static String AUTH_SECTION = ",\n" +
" \"authentication\":{\n" +
" \"blockUnknown\":\"false\",\n" +
" \"class\":\"solr.BasicAuthPlugin\",\n" +
" \"credentials\":{\"solr\":\"orwp2Ghgj39lmnrZOTm7Qtre1VqHFDfwAEzr0ApbN3Y= Ju5osoAqOX8iafhWpPP01E5P+sg8tK8tHON7rCYZRRw=\"}},\n" +
" \"credentials\":{\"solr\":\"" + getSaltedHashedValue(SOLR_PASS) + "\"," +
" \"jimbo\":\"" + getSaltedHashedValue(JIMBO_PASS) + "\"}},\n" +
" \"authorization\":{\n" +
" \"class\":\"solr.RuleBasedAuthorizationPlugin\",\n" +
" \"user-role\":{\"solr\":\"admin\"},\n" +
" \"permissions\":[{\"name\":\"collection-admin-edit\",\"role\":\"admin\"}]\n" +
" }\n";
private void setupCluster(boolean async, int delay, boolean enableAuth, String muteRulesJson) throws Exception {
/**
* Starts the cluster with a security.json built from template, using CallbackAuditLoggerPlugin. The params
* to this method will fill the template.
* @param async enable async audit logging
* @param semaphoreName name of semaphore for controlling how to delay logging
* @param enableAuth should authentication be enabled in this cluster?
* @param muteRulesJson mute rules to trim down what events we care about in our tests
* @throws Exception if anything goes wrong
*/
private void setupCluster(boolean async, String semaphoreName, boolean enableAuth, String... muteRulesJson) throws Exception {
String securityJson = FileUtils.readFileToString(TEST_PATH().resolve("security").resolve("auditlog_plugin_security.json").toFile(), StandardCharsets.UTF_8);
securityJson = securityJson.replace("_PORT_", Integer.toString(testHarness.get().callbackPort));
securityJson = securityJson.replace("_ASYNC_", Boolean.toString(async));
securityJson = securityJson.replace("_DELAY_", Integer.toString(delay));
securityJson = securityJson.replace("_SEMAPHORE_",
null == semaphoreName ? "null" : "\""+semaphoreName+"\"");
securityJson = securityJson.replace("_AUTH_", enableAuth ? AUTH_SECTION : "");
securityJson = securityJson.replace("_MUTERULES_", muteRulesJson != null ? muteRulesJson : "[]");
// start with any test specific mute rules...
final List<String> muteRules = new ArrayList<>(Arrays.asList(muteRulesJson));
// for test purposes, ignore any intranode /metrics requests...
muteRules.add("\"path:/admin/metrics\"");
// With auth enabled we're also getting /admin/info/key requests
// So for test purposes, we're automatically MUTEing those when auth is enabled...
if (enableAuth) {
muteRules.add("\"path:/admin/info/key\"");
}
securityJson = securityJson.replace("_MUTERULES_", "[" + StringUtils.join(muteRules, ",") + "]");
MiniSolrCloudCluster myCluster = new Builder(NUM_SERVERS, createTempDir())
.withSecurityJson(securityJson)
.addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
@ -305,22 +423,12 @@ public class AuditLoggerIntegrationTest extends SolrCloudAuthTestCase {
*/
private class CallbackReceiver implements Runnable, AutoCloseable {
private final ServerSocket serverSocket;
private AtomicInteger count = new AtomicInteger();
private Map<String,AtomicInteger> resourceCounts = new HashMap<>();
private LinkedList<AuditEvent> buffer = new LinkedList<>();
private BlockingQueue<AuditEvent> queue = new LinkedBlockingDeque<>();
CallbackReceiver() throws IOException {
serverSocket = new ServerSocket(0);
}
int getTotalCount() {
return count.get();
}
int getCountForPath(String path) {
return resourceCounts.getOrDefault(path, new AtomicInteger()).get();
}
public int getPort() {
return serverSocket.getLocalPort();
}
@ -335,18 +443,10 @@ public class AuditLoggerIntegrationTest extends SolrCloudAuthTestCase {
if (!reader.ready()) continue;
ObjectMapper om = new ObjectMapper();
om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
AuditEvent event = om.readValue(reader.readLine(), AuditEvent.class);
buffer.add(event);
String r = event.getResource();
log.info("Received audit event for path " + r);
count.incrementAndGet();
AtomicInteger resourceCounter = resourceCounts.get(r);
if (resourceCounter == null) {
resourceCounter = new AtomicInteger(1);
resourceCounts.put(r, resourceCounter);
} else {
resourceCounter.incrementAndGet();
}
final String msg = reader.readLine();
final AuditEvent event = om.readValue(msg, AuditEvent.class);
log.info("Received {}: {}", event, msg);
queue.add(event);
}
} catch (IOException e) {
log.info("Socket closed", e);
@ -356,14 +456,22 @@ public class AuditLoggerIntegrationTest extends SolrCloudAuthTestCase {
@Override
public void close() throws Exception {
serverSocket.close();
assertEquals("Unexpected AuditEvents still in the queue",
Collections.emptyList(), new LinkedList<>(queue));
}
protected LinkedList<AuditEvent> getBuffer() {
return buffer;
}
protected AuditEvent popEvent() {
return buffer.pop();
public List<AuditEvent> waitForAuditEvents(final int expected) throws InterruptedException {
final LinkedList<AuditEvent> results = new LinkedList<>();
for (int i = 1; i <= expected; i++) { // NOTE: counting from 1 for error message readabiity...
final AuditEvent e = queue.poll(120, TimeUnit.SECONDS);
if (null == e) {
fail("did not recieved expected event #" + i + "/" + expected
+ " even after waiting an excessive amount of time");
}
log.info("Waited for and recieved event: {}", e);
results.add(e);
}
return results;
}
}

View File

@ -22,6 +22,8 @@ import java.io.PrintWriter;
import java.lang.invoke.MethodHandles;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Semaphore;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
@ -32,27 +34,32 @@ import org.slf4j.LoggerFactory;
*/
public class CallbackAuditLoggerPlugin extends AuditLoggerPlugin {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final Map<String,Semaphore> BLOCKING_SEMAPHORES = new HashMap<>();
private int callbackPort;
private Socket socket;
private PrintWriter out;
private int delay;
private Semaphore semaphore = null;
/**
* Opens a socket to send a callback, e.g. to a running test client
* @param event the audit event
*/
@Override
public void audit(AuditEvent event) {
if (delay > 0) {
log.info("Sleeping for {}ms before sending callback", delay);
if (null != semaphore) {
log.info("Waiting to acquire ticket from semaphore");
try {
Thread.sleep(delay);
semaphore.acquire();
} catch (InterruptedException e) {
log.warn("audit() interrupted while waiting to send callback, should not happen");
log.warn("audit() interrupted while waiting for ticket, probably due to shutdown, aborting");
return;
}
}
out.write(formatter.formatEvent(event) + "\n");
out.flush();
if (! out.checkError()) {
log.error("Output stream has an ERROR!");
}
log.info("Sent audit callback {} to localhost:{}", formatter.formatEvent(event), callbackPort);
}
@ -60,7 +67,13 @@ public class CallbackAuditLoggerPlugin extends AuditLoggerPlugin {
public void init(Map<String, Object> pluginConfig) {
super.init(pluginConfig);
callbackPort = Integer.parseInt((String) pluginConfig.get("callbackPort"));
delay = Integer.parseInt((String) pluginConfig.get("delay"));
final String semaphoreName = (String) pluginConfig.get("semaphore");
if (null != semaphoreName) {
semaphore = BLOCKING_SEMAPHORES.get(semaphoreName);
if (null == semaphore) {
throw new RuntimeException("Test did not setup semaphore of specified name: " + semaphoreName);
}
}
try {
socket = new Socket("localhost", callbackPort);
out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true);