Merge pull request #1947 from noddi/feature/count-parameter-history-endpoints

Add count parameter to history endpoints
This commit is contained in:
Fangjin Yang 2015-11-12 10:23:44 -08:00
commit 4f46d457f1
10 changed files with 417 additions and 6 deletions

View File

@ -62,4 +62,21 @@ public interface AuditManager
*/
public List<AuditEntry> fetchAuditHistory(String type, Interval interval);
/**
* Provides last N entries of audit history for given key, type
* @param key
* @param type
* @param limit
* @return list of AuditEntries satisfying the passed parameters
*/
public List<AuditEntry> fetchAuditHistory(String key, String type, int limit);
/**
* Provides last N entries of audit history for given type
* @param type type of auditEntry
* @param limit
* @return list of AuditEntries satisfying the passed parameters
*/
public List<AuditEntry> fetchAuditHistory(String type, int limit);
}

View File

@ -90,3 +90,9 @@ http://<COORDINATOR_IP>:<PORT>/druid/coordinator/v1/config/history?interval=<int
```
default value of interval can be specified by setting `druid.audit.manager.auditHistoryMillis` (1 week if not configured) in coordinator runtime.properties
To view last <n> entries of the audit history of coordinator dynamic config issue a GET request to the URL -
```
http://<COORDINATOR_IP>:<PORT>/druid/coordinator/v1/config/history?count=<n>
```

View File

@ -169,6 +169,12 @@ http://<OVERLORD_IP>:<port>/druid/indexer/v1/worker/history?interval=<interval>
default value of interval can be specified by setting `druid.audit.manager.auditHistoryMillis` (1 week if not configured) in overlord runtime.properties.
To view last <n> entries of the audit history of worker config issue a GET request to the URL -
```
http://<OVERLORD_IP>:<port>/druid/indexer/v1/worker/history?count=<n>
```
#### Worker Select Strategy
##### Fill Capacity

View File

@ -193,6 +193,10 @@ Returns all rules for a specified datasource and includes default datasource.
Returns audit history of rules for a specified datasource. default value of interval can be specified by setting `druid.audit.manager.auditHistoryMillis` (1 week if not configured) in coordinator runtime.properties
* `/druid/coordinator/v1/rules/{dataSourceName}/history?count=<n>`
Returns last <n> entries of audit history of rules for a specified datasource.
#### Intervals
* `/druid/coordinator/v1/intervals`

View File

@ -29,6 +29,7 @@ import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.audit.AuditInfo;
import io.druid.audit.AuditManager;
import io.druid.common.config.JacksonConfigManager;
@ -46,6 +47,7 @@ import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.metadata.EntryExistsException;
import io.druid.tasklogs.TaskLogStreamer;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -62,6 +64,8 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
@ -219,10 +223,28 @@ public class OverlordResource
@Path("/worker/history")
@Produces(MediaType.APPLICATION_JSON)
public Response getWorkerConfigHistory(
@QueryParam("interval") final String interval
@QueryParam("interval") final String interval,
@QueryParam("count") final Integer count
)
{
Interval theInterval = interval == null ? null : new Interval(interval);
if (theInterval == null && count != null) {
try {
return Response.ok(
auditManager.fetchAuditHistory(
WorkerBehaviorConfig.CONFIG_KEY,
WorkerBehaviorConfig.CONFIG_KEY,
count
)
)
.build();
}
catch (IllegalArgumentException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.<String, Object>of("error", e.getMessage()))
.build();
}
}
return Response.ok(
auditManager.fetchAuditHistory(
WorkerBehaviorConfig.CONFIG_KEY,

View File

@ -22,16 +22,19 @@ import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.audit.AuditEntry;
import io.druid.audit.AuditManager;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Json;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.SQLMetadataConnector;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
@ -40,6 +43,7 @@ import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
@ManageLifecycle
public class SQLAuditManager implements AuditManager
@ -166,6 +170,14 @@ public class SQLAuditManager implements AuditManager
return theInterval;
}
private int getLimit(int limit) throws IllegalArgumentException
{
if (limit < 1) {
throw new IllegalArgumentException("Limit must be greater than zero!");
}
return limit;
}
@Override
public List<AuditEntry> fetchAuditHistory(final String type, Interval interval)
{
@ -207,4 +219,63 @@ public class SQLAuditManager implements AuditManager
);
}
@Override
public List<AuditEntry> fetchAuditHistory(final String key, final String type, int limit)
throws IllegalArgumentException
{
return fetchAuditHistoryLastEntries(key, type, limit);
}
@Override
public List<AuditEntry> fetchAuditHistory(final String type, int limit)
throws IllegalArgumentException
{
return fetchAuditHistoryLastEntries(null, type, limit);
}
private List<AuditEntry> fetchAuditHistoryLastEntries(final String key, final String type, int limit)
throws IllegalArgumentException
{
final int theLimit = getLimit(limit);
String queryString = String.format("SELECT payload FROM %s WHERE type = :type", getAuditTable());
if (key != null) {
queryString += " and audit_key = :audit_key";
}
queryString += " ORDER BY created_date DESC";
final String theQueryString = queryString;
return dbi.withHandle(
new HandleCallback<List<AuditEntry>>()
{
@Override
public List<AuditEntry> withHandle(Handle handle) throws Exception
{
Query<Map<String, Object>> query = handle.createQuery(theQueryString);
if (key != null) {
query.bind("audit_key", key);
}
return query.bind("type", type)
.setMaxRows(theLimit)
.map(
new ResultSetMapper<AuditEntry>()
{
@Override
public AuditEntry map(int index, ResultSet r, StatementContext ctx)
throws SQLException
{
try {
return jsonMapper.readValue(r.getBytes("payload"), AuditEntry.class);
}
catch (IOException e) {
throw new SQLException(e);
}
}
}
)
.list();
}
}
);
}
}

View File

@ -21,8 +21,11 @@ import io.druid.audit.AuditInfo;
import io.druid.audit.AuditManager;
import io.druid.common.config.JacksonConfigManager;
import io.druid.server.coordinator.CoordinatorDynamicConfig;
import org.joda.time.Interval;
import com.google.common.collect.ImmutableMap;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
@ -90,10 +93,28 @@ public class CoordinatorDynamicConfigsResource
@Path("/history")
@Produces(MediaType.APPLICATION_JSON)
public Response getDatasourceRuleHistory(
@QueryParam("interval") final String interval
@QueryParam("interval") final String interval,
@QueryParam("count") final Integer count
)
{
Interval theInterval = interval == null ? null : new Interval(interval);
if (theInterval == null && count != null) {
try {
return Response.ok(
auditManager.fetchAuditHistory(
CoordinatorDynamicConfig.CONFIG_KEY,
CoordinatorDynamicConfig.CONFIG_KEY,
count
)
)
.build();
}
catch (IllegalArgumentException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.<String, Object>of("error", e.getMessage()))
.build();
}
}
return Response.ok(
auditManager.fetchAuditHistory(
CoordinatorDynamicConfig.CONFIG_KEY,

View File

@ -19,10 +19,12 @@ package io.druid.server.http;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.druid.audit.AuditInfo;
import io.druid.audit.AuditManager;
import io.druid.metadata.MetadataRuleManager;
import io.druid.server.coordinator.rules.Rule;
import org.joda.time.Interval;
import javax.servlet.http.HttpServletRequest;
@ -38,6 +40,8 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.util.List;
/**
@ -108,10 +112,22 @@ public class RulesResource
@Produces(MediaType.APPLICATION_JSON)
public Response getDatasourceRuleHistory(
@PathParam("dataSourceName") final String dataSourceName,
@QueryParam("interval") final String interval
@QueryParam("interval") final String interval,
@QueryParam("count") final Integer count
)
{
Interval theInterval = interval == null ? null : new Interval(interval);
if (theInterval == null && count != null) {
try {
return Response.ok(auditManager.fetchAuditHistory(dataSourceName, "rules", count))
.build();
}
catch (IllegalArgumentException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.<String, Object>of("error", e.getMessage()))
.build();
}
}
return Response.ok(auditManager.fetchAuditHistory(dataSourceName, "rules", theInterval))
.build();
}

View File

@ -47,7 +47,6 @@ public class SQLAuditManagerTest
private final ObjectMapper mapper = new DefaultObjectMapper();
@Before
public void setUp() throws Exception
{
@ -135,6 +134,102 @@ public class SQLAuditManagerTest
Assert.assertEquals(entry, auditEntries.get(1));
}
@Test
public void testFetchAuditHistoryByKeyAndTypeWithLimit() throws IOException
{
AuditEntry entry1 = new AuditEntry(
"testKey1",
"testType",
new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
),
"testPayload",
new DateTime("2013-01-01T00:00:00Z")
);
AuditEntry entry2 = new AuditEntry(
"testKey2",
"testType",
new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
),
"testPayload",
new DateTime("2013-01-02T00:00:00Z")
);
auditManager.doAudit(entry1);
auditManager.doAudit(entry2);
List<AuditEntry> auditEntries = auditManager.fetchAuditHistory(
"testKey1",
"testType",
1
);
Assert.assertEquals(1, auditEntries.size());
Assert.assertEquals(entry1, auditEntries.get(0));
}
@Test
public void testFetchAuditHistoryByTypeWithLimit() throws IOException
{
AuditEntry entry1 = new AuditEntry(
"testKey",
"testType",
new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
),
"testPayload",
new DateTime("2013-01-01T00:00:00Z")
);
AuditEntry entry2 = new AuditEntry(
"testKey",
"testType",
new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
),
"testPayload",
new DateTime("2013-01-02T00:00:00Z")
);
AuditEntry entry3 = new AuditEntry(
"testKey",
"testType",
new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
),
"testPayload",
new DateTime("2013-01-03T00:00:00Z")
);
auditManager.doAudit(entry1);
auditManager.doAudit(entry2);
auditManager.doAudit(entry3);
List<AuditEntry> auditEntries = auditManager.fetchAuditHistory(
"testType",
2
);
Assert.assertEquals(2, auditEntries.size());
Assert.assertEquals(entry3, auditEntries.get(0));
Assert.assertEquals(entry2, auditEntries.get(1));
}
@Test(expected=IllegalArgumentException.class)
public void testFetchAuditHistoryLimitBelowZero() throws IOException
{
auditManager.fetchAuditHistory("testType", -1);
}
@Test(expected=IllegalArgumentException.class)
public void testFetchAuditHistoryLimitZero() throws IOException
{
auditManager.fetchAuditHistory("testType", 0);
}
@After
public void cleanup()
{
@ -156,6 +251,4 @@ public class SQLAuditManagerTest
}
);
}
}

View File

@ -0,0 +1,155 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.http;
import com.google.common.collect.ImmutableList;
import io.druid.audit.AuditEntry;
import io.druid.audit.AuditInfo;
import io.druid.audit.AuditManager;
import io.druid.metadata.MetadataRuleManager;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.ws.rs.core.Response;
import java.util.List;
import java.util.Map;
public class RulesResourceTest
{
private MetadataRuleManager databaseRuleManager;
private AuditManager auditManager;
@Before
public void setUp()
{
databaseRuleManager = EasyMock.createStrictMock(MetadataRuleManager.class);
auditManager = EasyMock.createStrictMock(AuditManager.class);
}
@Test
public void testGetDatasourceRuleHistoryWithCount()
{
AuditEntry entry1 = new AuditEntry(
"testKey",
"testType",
new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
),
"testPayload",
new DateTime("2013-01-02T00:00:00Z")
);
AuditEntry entry2 = new AuditEntry(
"testKey",
"testType",
new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
),
"testPayload",
new DateTime("2013-01-01T00:00:00Z")
);
EasyMock.expect(auditManager.fetchAuditHistory(EasyMock.eq("datasource1"), EasyMock.eq("rules"), EasyMock.eq(2)))
.andReturn(ImmutableList.of(entry1, entry2))
.once();
EasyMock.replay(auditManager);
RulesResource rulesResource = new RulesResource(databaseRuleManager, auditManager);
Response response = rulesResource.getDatasourceRuleHistory("datasource1", null, 2);
List<AuditEntry> rulesHistory = (List) response.getEntity();
Assert.assertEquals(2, rulesHistory.size());
Assert.assertEquals(entry1, rulesHistory.get(0));
Assert.assertEquals(entry2, rulesHistory.get(1));
EasyMock.verify(auditManager);
}
@Test
public void testGetDatasourceRuleHistoryWithInterval()
{
String interval = "P2D/2013-01-02T00:00:00Z";
Interval theInterval = new Interval(interval);
AuditEntry entry1 = new AuditEntry(
"testKey",
"testType",
new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
),
"testPayload",
new DateTime("2013-01-02T00:00:00Z")
);
AuditEntry entry2 = new AuditEntry(
"testKey",
"testType",
new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
),
"testPayload",
new DateTime("2013-01-01T00:00:00Z")
);
EasyMock.expect(auditManager.fetchAuditHistory(EasyMock.eq("datasource1"), EasyMock.eq("rules"), EasyMock.eq(theInterval)))
.andReturn(ImmutableList.of(entry1, entry2))
.once();
EasyMock.replay(auditManager);
RulesResource rulesResource = new RulesResource(databaseRuleManager, auditManager);
Response response = rulesResource.getDatasourceRuleHistory("datasource1", interval, null);
List<AuditEntry> rulesHistory = (List) response.getEntity();
Assert.assertEquals(2, rulesHistory.size());
Assert.assertEquals(entry1, rulesHistory.get(0));
Assert.assertEquals(entry2, rulesHistory.get(1));
EasyMock.verify(auditManager);
}
@Test
public void testGetDatasourceRuleHistoryWithWrongCount()
{
EasyMock.expect(auditManager.fetchAuditHistory(EasyMock.eq("datasource1"), EasyMock.eq("rules"), EasyMock.eq(-1)))
.andThrow(new IllegalArgumentException("Limit must be greater than zero!"))
.once();
EasyMock.replay(auditManager);
RulesResource rulesResource = new RulesResource(databaseRuleManager, auditManager);
Response response = rulesResource.getDatasourceRuleHistory("datasource1", null, -1);
Map<String, Object> rulesHistory = (Map) response.getEntity();
Assert.assertEquals(400, response.getStatus());
Assert.assertTrue(rulesHistory.containsKey("error"));
Assert.assertEquals("Limit must be greater than zero!", rulesHistory.get("error"));
EasyMock.verify(auditManager);
}
}