Removes compile warnings (elastic/elasticsearch#463)

These warnings include (both kinds of fix are sketched after this list):
* Resource not used
* Resource leak due to not being closed
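
A minimal sketch of both kinds of fix; the names below are invented for this note and do not come from the commit:

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;

    class WarningFixSketch {

        // "Resource not used": the compiler flags a value that is assigned but
        // never read. The fix is to drop the dead variable, e.g.
        //     IndexResponse response = client.index(request).actionGet();
        // becomes
        //     client.index(request).actionGet();

        // "Resource leak": a Closeable is opened but not closed on every path.
        // try-with-resources guarantees close() runs even if the body throws.
        static String firstLine(String path) throws IOException {
            try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
                return reader.readLine();
            }
        }
    }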

Original commit: elastic/x-pack-elasticsearch@e0fb068a0c
Author: Colin Goodheart-Smithe, 2016-12-05 11:24:56 +00:00 (committed by GitHub)
Parent: 98eb5534ee
Commit: a8d2cf16b9
28 changed files with 184 additions and 230 deletions

File: (name not captured)

@@ -19,7 +19,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;

File: GetBucketsAction.java

@@ -43,8 +43,6 @@ import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
 import java.io.IOException;
 import java.util.Objects;

 import static org.elasticsearch.action.ValidateActions.addValidationError;

 public class GetBucketsAction extends Action<GetBucketsAction.Request, GetBucketsAction.Response, GetBucketsAction.RequestBuilder> {
     public static final GetBucketsAction INSTANCE = new GetBucketsAction();

File: (name not captured)

@@ -37,7 +37,6 @@ import org.elasticsearch.xpack.prelert.job.manager.JobManager;
 import org.elasticsearch.xpack.prelert.job.messages.Messages;
 import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider;
 import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
-import org.elasticsearch.xpack.prelert.utils.SingleDocument;

 import java.io.IOException;
 import java.util.List;

File: AutodetectProcessManager.java

@@ -73,9 +73,9 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
     private final ConcurrentMap<String, AutodetectCommunicator> autoDetectCommunicatorByJob;

     public AutodetectProcessManager(Settings settings, Client client, ThreadPool threadPool, JobManager jobManager,
-            JobProvider jobProvider, JobResultsPersister jobResultsPersister,
-            JobDataCountsPersister jobDataCountsPersister, AutodetectResultsParser parser,
-            AutodetectProcessFactory autodetectProcessFactory) {
+                                    JobProvider jobProvider, JobResultsPersister jobResultsPersister,
+                                    JobDataCountsPersister jobDataCountsPersister, AutodetectResultsParser parser,
+                                    AutodetectProcessFactory autodetectProcessFactory) {
         super(settings);
         this.client = client;
         this.threadPool = threadPool;
@@ -133,22 +133,23 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
         ExecutorService executorService = threadPool.executor(PrelertPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME);
         UsageReporter usageReporter = new UsageReporter(settings, job.getId(), usagePersister);
-        StatusReporter statusReporter = new StatusReporter(threadPool, settings, job.getId(),
-                jobProvider.dataCounts(jobId), usageReporter, jobDataCountsPersister);
-        AutoDetectResultProcessor processor = new AutoDetectResultProcessor(new NoOpRenormaliser(), jobResultsPersister, parser);
-        AutodetectProcess process = null;
-        try {
-            process = autodetectProcessFactory.createAutodetectProcess(job, ignoreDowntime, executorService);
-            // TODO Port the normalizer from the old project
-            return new AutodetectCommunicator(executorService, job, process, statusReporter, processor, stateProcessor);
-        } catch (Exception e) {
-            try {
-                IOUtils.close(process);
-            } catch (IOException ioe) {
-                logger.error("Can't close autodetect", ioe);
-            }
-            throw e;
-        }
+        try (StatusReporter statusReporter = new StatusReporter(threadPool, settings, job.getId(), jobProvider.dataCounts(jobId),
+                usageReporter, jobDataCountsPersister)) {
+            AutoDetectResultProcessor processor = new AutoDetectResultProcessor(new NoOpRenormaliser(), jobResultsPersister, parser);
+            AutodetectProcess process = null;
+            try {
+                process = autodetectProcessFactory.createAutodetectProcess(job, ignoreDowntime, executorService);
+                // TODO Port the normalizer from the old project
+                return new AutodetectCommunicator(executorService, job, process, statusReporter, processor, stateProcessor);
+            } catch (Exception e) {
+                try {
+                    IOUtils.close(process);
+                } catch (IOException ioe) {
+                    logger.error("Can't close autodetect", ioe);
+                }
+                throw e;
+            }
+        }
     }
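
Two idioms are at work in the hunk above: the StatusReporter moves into a try-with-resources block, and a process that may be only half-constructed when an exception escapes is closed before the exception is rethrown. A self-contained sketch of that second pattern follows; Factory and Wrapper are hypothetical stand-ins, and IOUtils is the same Lucene helper the hunk uses (here via closeWhileHandlingException rather than the hunk's explicit try/catch around IOUtils.close):

    import java.io.Closeable;
    import java.io.IOException;

    import org.apache.lucene.util.IOUtils;

    class SafeOpenSketch {
        interface Factory {
            Closeable create() throws IOException;
        }

        static final class Wrapper implements Closeable {
            private final Closeable inner;

            Wrapper(Closeable inner) {
                this.inner = inner;
            }

            @Override
            public void close() throws IOException {
                inner.close();
            }
        }

        // Open a resource and hand it to a wrapper that owns it from then on.
        // If anything fails before the hand-off completes, close whatever was
        // opened (closeWhileHandlingException is null-safe) and rethrow.
        static Wrapper open(Factory factory) throws IOException {
            Closeable resource = null;
            try {
                resource = factory.create();
                return new Wrapper(resource); // ownership transfers on success
            } catch (Exception e) {
                IOUtils.closeWhileHandlingException(resource);
                throw e;
            }
        }
    }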

File: Allocation.java

@@ -158,29 +158,30 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
         this.jobId = jobId;
     }

+    @SuppressWarnings("incomplete-switch")
     public void setStatus(JobStatus newStatus) {
         switch (newStatus) {
-        case CLOSING:
-            if (this.status == JobStatus.CLOSED) {
-                throw new IllegalArgumentException("[" + jobId + "][" + status +"] job already closed");
-            }
-            if (this.status == JobStatus.CLOSING) {
-                throw new IllegalArgumentException("[" + jobId + "][" + status +"] job already closing");
-            }
-            break;
-        case PAUSING:
-            if (this.status == JobStatus.CLOSED) {
-                throw new IllegalArgumentException("[" + jobId + "][" + status +"] can't pause a job that is closed");
-            }
-            if (this.status == JobStatus.CLOSING) {
-                throw new IllegalArgumentException("[" + jobId + "][" + status +"] can't pause a job that is closing");
-            }
-            if (this.status == JobStatus.PAUSING) {
-                throw new IllegalArgumentException("[" + jobId + "][" + status +"] can't pause a job that is pausing");
-            }
-            if (this.status == JobStatus.PAUSED) {
-                throw new IllegalArgumentException("[" + jobId + "][" + status +"] can't pause a job that is already paused");
-            }
+            case CLOSING:
+                if (this.status == JobStatus.CLOSED) {
+                    throw new IllegalArgumentException("[" + jobId + "][" + status +"] job already closed");
+                }
+                if (this.status == JobStatus.CLOSING) {
+                    throw new IllegalArgumentException("[" + jobId + "][" + status +"] job already closing");
+                }
+                break;
+            case PAUSING:
+                if (this.status == JobStatus.CLOSED) {
+                    throw new IllegalArgumentException("[" + jobId + "][" + status +"] can't pause a job that is closed");
+                }
+                if (this.status == JobStatus.CLOSING) {
+                    throw new IllegalArgumentException("[" + jobId + "][" + status +"] can't pause a job that is closing");
+                }
+                if (this.status == JobStatus.PAUSING) {
+                    throw new IllegalArgumentException("[" + jobId + "][" + status +"] can't pause a job that is pausing");
+                }
+                if (this.status == JobStatus.PAUSED) {
+                    throw new IllegalArgumentException("[" + jobId + "][" + status +"] can't pause a job that is already paused");
+                }
         }
         this.status = newStatus;
@@ -211,7 +212,7 @@ public class Allocation extends AbstractDiffable<Allocation> implements ToXConte
                 break;
             case STOPPED:
                 if ((currentSchedulerStatus != JobSchedulerStatus.STOPPED ||
-                        currentSchedulerStatus != JobSchedulerStatus.STOPPING) == false) {
+                    currentSchedulerStatus != JobSchedulerStatus.STOPPING) == false) {
                     String msg = Messages.getMessage(Messages.JOB_SCHEDULER_CANNOT_STOP_IN_CURRENT_STATE, jobId, newSchedulerStatus);
                     throw ExceptionsHelper.conflictStatusException(msg);
                 }
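
The @SuppressWarnings("incomplete-switch") added above addresses the warning the Eclipse compiler raises when a switch over an enum does not list every constant; setStatus only validates a few transitions and deliberately ignores the rest. The idiom in miniature, with a hypothetical enum:

    class IncompleteSwitchSketch {
        enum JobStatus { RUNNING, CLOSING, CLOSED, PAUSING, PAUSED }

        // Only the transitions that need validation appear as cases; all other
        // statuses intentionally fall through. "incomplete-switch" is an ECJ
        // warning token; javac simply ignores tokens it does not recognise.
        @SuppressWarnings("incomplete-switch")
        static void validateTransition(JobStatus current, JobStatus next) {
            switch (next) {
                case CLOSING:
                    if (current == JobStatus.CLOSED) {
                        throw new IllegalArgumentException("job already closed");
                    }
                    break;
                case PAUSING:
                    if (current == JobStatus.PAUSED) {
                        throw new IllegalArgumentException("job already paused");
                    }
                    break;
            }
        }
    }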

File: ElasticsearchBatchedModelDebugOutputIterator.java

@@ -12,12 +12,10 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
-
 import java.io.IOException;

 import org.elasticsearch.xpack.prelert.job.Job;
 import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;

 class ElasticsearchBatchedModelDebugOutputIterator extends ElasticsearchBatchedDocumentsIterator<ModelDebugOutput>
 {
     public ElasticsearchBatchedModelDebugOutputIterator(Client client, String jobId, ParseFieldMatcher parserFieldMatcher)

File: JobDataCountsPersister.java

@@ -5,9 +5,6 @@
  */
 package org.elasticsearch.xpack.prelert.job.persistence;

-import java.io.IOException;
-import java.util.Locale;
-
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.logging.log4j.util.Supplier;
 import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListener;
@@ -17,9 +14,10 @@ import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.xpack.prelert.job.DataCounts;

+import java.io.IOException;

 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister.getJobIndexName;
@@ -52,7 +50,7 @@ public class JobDataCountsPersister extends AbstractComponent {
         try {
             XContentBuilder content = serialiseCounts(counts);
             client.prepareIndex(getJobIndexName(jobId), DataCounts.TYPE.getPreferredName(), jobId + DataCounts.DOCUMENT_SUFFIX)
-                .setSource(content).execute(new ActionListener<IndexResponse>() {
+                    .setSource(content).execute(new ActionListener<IndexResponse>() {
                 @Override
                 public void onResponse(IndexResponse indexResponse) {
                     listener.onResponse(true);

File: JobResultsPersister.java

@@ -10,7 +10,6 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.component.AbstractComponent;
@@ -74,14 +73,14 @@ public class JobResultsPersister extends AbstractComponent {
             String indexName = getJobIndexName(jobId);
             logger.trace("[{}] ES API CALL: index result type {} to index {} at epoch {}", jobId, Bucket.RESULT_TYPE_VALUE, indexName,
                     bucket.getEpoch());
-            IndexResponse response = client.prepareIndex(indexName, Result.TYPE.getPreferredName())
-                    .setSource(content)
-                    .execute().actionGet();
+            client.prepareIndex(indexName, Result.TYPE.getPreferredName()).setSource(content).execute()
+                    .actionGet();
             persistBucketInfluencersStandalone(jobId, bucket.getId(), bucket.getBucketInfluencers(), bucket.getTimestamp(),
                     bucket.isInterim());
         } catch (IOException e) {
             logger.error(new ParameterizedMessage("[{}] Error persisting bucket", new Object[] {jobId}, e));
         }
     }

     /**
@@ -158,8 +157,8 @@ public class JobResultsPersister extends AbstractComponent {
             logger.trace("[{}] ES API CALL: index result type {} to index {} at timestamp {}",
                     jobId, PerPartitionMaxProbabilities.RESULT_TYPE_VALUE, indexName, partitionProbabilities.getTimestamp());
             client.prepareIndex(indexName, Result.TYPE.getPreferredName())
-                .setSource(builder)
-                .execute().actionGet();
+                    .setSource(builder)
+                    .execute().actionGet();
         } catch (IOException e) {
             logger.error(new ParameterizedMessage("[{}] error updating bucket per partition max normalized scores",
                     new Object[]{jobId}, e));
@@ -251,8 +250,8 @@ public class JobResultsPersister extends AbstractComponent {
             byte[] bytes = bytesRef.toBytesRef().bytes;
             logger.trace("[{}] ES API CALL: bulk index", jobId);
             client.prepareBulk()
-                .add(bytes, 0, bytes.length)
-                .execute().actionGet();
+                    .add(bytes, 0, bytes.length)
+                    .execute().actionGet();
         } catch (Exception e) {
             logger.error((org.apache.logging.log4j.util.Supplier<?>)
                     () -> new ParameterizedMessage("[{}] Error persisting bulk state", jobId), e);
@@ -307,7 +306,7 @@ public class JobResultsPersister extends AbstractComponent {
     }

     void persistBucketInfluencersStandalone(String jobId, String bucketId, List<BucketInfluencer> bucketInfluencers,
-            Date bucketTime, boolean isInterim) throws IOException {
+                                            Date bucketTime, boolean isInterim) throws IOException {
         if (bucketInfluencers != null && bucketInfluencers.isEmpty() == false) {
             BulkRequestBuilder addBucketInfluencersRequest = client.prepareBulk();
             for (BucketInfluencer bucketInfluencer : bucketInfluencers) {
@@ -330,7 +329,7 @@ public class JobResultsPersister extends AbstractComponent {
     }

     private XContentBuilder serialiseBucketInfluencerStandalone(BucketInfluencer bucketInfluencer,
-            Date bucketTime, boolean isInterim) throws IOException {
+                                                                Date bucketTime, boolean isInterim) throws IOException {
         BucketInfluencer influencer = new BucketInfluencer(bucketInfluencer);
         influencer.setIsInterim(isInterim);
         influencer.setTimestamp(bucketTime);
@@ -354,7 +353,7 @@ public class JobResultsPersister extends AbstractComponent {
         private final Serialiser serialiser;

         Persistable(String jobId, Object object, Supplier<String> typeSupplier, Supplier<String> idSupplier,
-                Serialiser serialiser) {
+                    Serialiser serialiser) {
             this.jobId = jobId;
             this.object = object;
             this.typeSupplier = typeSupplier;
@@ -376,8 +375,8 @@ public class JobResultsPersister extends AbstractComponent {
             try {
                 String indexName = getJobIndexName(jobId);
                 client.prepareIndex(indexName, type, idSupplier.get())
-                    .setSource(serialiser.serialise())
-                    .execute().actionGet();
+                        .setSource(serialiser.serialise())
+                        .execute().actionGet();
                 return true;
             } catch (IOException e) {
                 logger.error(new ParameterizedMessage("[{}] Error writing {}", new Object[]{jobId, typeSupplier.get()}, e));

File: (name not captured)

@@ -11,7 +11,6 @@ import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.env.Environment;
-import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.prelert.job.Job;
 import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
 import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
@@ -33,7 +32,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;

File: Bucket.java

@@ -298,32 +298,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
         partitionScores = scores;
     }

-    /**
-     * Box class for the stream collector function below
-     */
-    private final class DoubleMaxBox {
-        private double value = 0.0;
-
-        public DoubleMaxBox() {
-        }
-
-        public void accept(double d) {
-            if (d > value) {
-                value = d;
-            }
-        }
-
-        public DoubleMaxBox combine(DoubleMaxBox other) {
-            return (this.value > other.value) ? this : other;
-        }
-
-        public Double value() {
-            return this.value;
-        }
-    }
-
     public Map<String, Double> getPerPartitionMaxProbability() {
         return perPartitionMaxProbability;
     }
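
The deleted DoubleMaxBox existed only as a mutable accumulator for computing a per-partition maximum. The same result can be had without a custom box, for example with Map.merge and Math::max; the Score type below is a simplified stand-in, not the class's actual result type:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class MaxPerPartitionSketch {
        static final class Score {
            final String partition;
            final double probability;

            Score(String partition, double probability) {
                this.partition = partition;
                this.probability = probability;
            }
        }

        // Keep the largest probability seen for each partition.
        static Map<String, Double> maxPerPartition(List<Score> scores) {
            Map<String, Double> max = new HashMap<>();
            for (Score score : scores) {
                max.merge(score.partition, score.probability, Math::max);
            }
            return max;
        }
    }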

File: PerPartitionMaxProbabilities.java

@@ -85,7 +85,6 @@ public class PerPartitionMaxProbabilities extends ToXContentToBytes implements W
         this.perPartitionMaxProbabilities = calcMaxNormalizedProbabilityPerPartition(records);
     }

-    @SuppressWarnings("unchecked")
     public PerPartitionMaxProbabilities(StreamInput in) throws IOException {
         jobId = in.readString();
         timestamp = new Date(in.readLong());

File: (name not captured)

@@ -6,7 +6,6 @@
 package org.elasticsearch.xpack.prelert.rest.data;

 import org.elasticsearch.client.node.NodeClient;
-import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.rest.BaseRestHandler;

File: (name not captured)

@@ -6,7 +6,6 @@
 package org.elasticsearch.xpack.prelert.rest.modelsnapshots;

 import org.elasticsearch.client.node.NodeClient;
-import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.rest.BaseRestHandler;

File: (name not captured)

@@ -6,7 +6,6 @@
 package org.elasticsearch.xpack.prelert.rest.modelsnapshots;

 import org.elasticsearch.client.node.NodeClient;
-import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;

File: (name not captured)

@@ -6,7 +6,6 @@
 package org.elasticsearch.xpack.prelert.rest.modelsnapshots;

 import org.elasticsearch.client.node.NodeClient;
-import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
@@ -18,9 +17,7 @@ import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.action.RestActions;
 import org.elasticsearch.rest.action.RestStatusToXContentListener;
 import org.elasticsearch.xpack.prelert.PrelertPlugin;
-import org.elasticsearch.xpack.prelert.action.DeleteModelSnapshotAction;
-import org.elasticsearch.xpack.prelert.action.PutModelSnapshotDescriptionAction;
 import org.elasticsearch.xpack.prelert.action.RevertModelSnapshotAction;
 import org.elasticsearch.xpack.prelert.job.Job;
 import org.elasticsearch.xpack.prelert.job.ModelSnapshot;

File: (name not captured)

@@ -6,7 +6,6 @@
 package org.elasticsearch.xpack.prelert.rest.schedulers;

 import org.elasticsearch.client.node.NodeClient;
-import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.rest.BaseRestHandler;

File: (name not captured)

@@ -9,7 +9,6 @@ import org.elasticsearch.xpack.prelert.action.GetListAction.Response;
 import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
 import org.elasticsearch.xpack.prelert.lists.ListDocument;
 import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;
-import org.elasticsearch.xpack.prelert.utils.SingleDocument;

 import java.util.Collections;

File: (name not captured)

@@ -5,8 +5,6 @@
  */
 package org.elasticsearch.xpack.prelert.action;

-import org.apache.logging.log4j.Logger;
-import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.get.GetResponse;
@@ -15,7 +13,6 @@ import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.common.ParseFieldMatcher;
-import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.network.NetworkAddress;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParser;
@@ -39,7 +36,6 @@ import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;

 import static org.hamcrest.Matchers.equalTo;

File: (name not captured)

@@ -20,7 +20,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;

 import static org.elasticsearch.xpack.prelert.integration.ScheduledJobIT.clearPrelertMetadata;

File: TooManyJobsIT.java

@@ -8,7 +8,6 @@ package org.elasticsearch.xpack.prelert.integration;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESIntegTestCase;
@@ -26,7 +25,6 @@ import org.junit.After;

 import java.util.Collection;
 import java.util.Collections;
-import java.util.concurrent.ExecutionException;

 @ESIntegTestCase.ClusterScope(numDataNodes = 1)
 public class TooManyJobsIT extends ESIntegTestCase {

File: JobManagerTests.java

@@ -21,7 +21,6 @@ import org.elasticsearch.xpack.prelert.job.JobStatus;
 import org.elasticsearch.xpack.prelert.job.audit.Auditor;
 import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
 import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
-import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
 import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
 import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
 import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
@@ -44,14 +43,12 @@ public class JobManagerTests extends ESTestCase {
     private ClusterService clusterService;
     private JobProvider jobProvider;
-    private JobDataCountsPersister jobDataCountsPersister;
     private Auditor auditor;

     @Before
     public void setupMocks() {
         clusterService = mock(ClusterService.class);
         jobProvider = mock(JobProvider.class);
-        jobDataCountsPersister = mock(JobDataCountsPersister.class);
         auditor = mock(Auditor.class);
         when(jobProvider.audit(anyString())).thenReturn(auditor);
     }

File: ElasticsearchJobProviderTests.java

@@ -335,7 +335,7 @@ public class ElasticsearchJobProviderTests extends ESTestCase {

         BucketQueryBuilder bq = new BucketQueryBuilder(Long.toString(timestamp));

-        ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class,
+        expectThrows(ResourceNotFoundException.class,
                 () -> provider.bucket(jobId, bq.build()));
     }
@@ -392,7 +392,7 @@ public class ElasticsearchJobProviderTests extends ESTestCase {

         BucketQueryBuilder bq = new BucketQueryBuilder(Long.toString(now.getTime()));

-        ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class,
+        expectThrows(ResourceNotFoundException.class,
                 () -> provider.bucket(jobId, bq.build()));
     }

File: MockClientBuilder.java

@@ -398,7 +398,6 @@ public class MockClientBuilder {
     @SuppressWarnings("unchecked")
     public MockClientBuilder prepareBulkExecuteListener(BulkResponse response) {
         ListenableActionFuture<BulkResponse> actionFuture = mock(ListenableActionFuture.class);
         BulkRequestBuilder builder = mock(BulkRequestBuilder.class);
         when(client.prepareBulk()).thenReturn(builder);

File: UsagePersisterTests.java

@@ -17,7 +17,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;

-import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.settings.Settings;
@@ -34,7 +33,6 @@ public class UsagePersisterTests extends ESTestCase {
     @SuppressWarnings("rawtypes")
     public void testPersistUsageCounts() throws ParseException {
         Client client = mock(Client.class);
-        Logger logger = mock(Logger.class);
         final UpdateRequestBuilder updateRequestBuilder = createSelfReturningUpdateRequester();

         when(client.prepareUpdate(anyString(), anyString(), anyString())).thenReturn(

File: NativeAutodetectProcessTests.java

@@ -35,17 +35,18 @@ public class NativeAutodetectProcessTests extends ESTestCase {
     public void testProcessStartTime() throws Exception {
         InputStream logStream = Mockito.mock(InputStream.class);
         when(logStream.read(new byte[1024])).thenReturn(-1);
-        NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
-                Mockito.mock(OutputStream.class), Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
-                NUMBER_ANALYSIS_FIELDS, null, EsExecutors.newDirectExecutorService());
-
-        ZonedDateTime startTime = process.getProcessStartTime();
-        Thread.sleep(500);
-        ZonedDateTime now = ZonedDateTime.now();
-        assertTrue(now.isAfter(startTime));
-
-        ZonedDateTime startPlus3 = startTime.plus(3, ChronoUnit.SECONDS);
-        assertTrue(now.isBefore(startPlus3));
+        try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
+                Mockito.mock(OutputStream.class), Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
+                NUMBER_ANALYSIS_FIELDS, null, EsExecutors.newDirectExecutorService())) {
+
+            ZonedDateTime startTime = process.getProcessStartTime();
+            Thread.sleep(500);
+            ZonedDateTime now = ZonedDateTime.now();
+            assertTrue(now.isAfter(startTime));
+
+            ZonedDateTime startPlus3 = startTime.plus(3, ChronoUnit.SECONDS);
+            assertTrue(now.isBefore(startPlus3));
+        }
     }

     public void testWriteRecord() throws IOException {
@@ -53,29 +54,30 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         when(logStream.read(new byte[1024])).thenReturn(-1);
         String[] record = {"r1", "r2", "r3", "r4", "r5"};
         ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
-        NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
-                bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
-                NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService());
-
-        process.writeRecord(record);
-        process.flushStream();
-
-        ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray());
-
-        // read header
-        int numFields = bb.getInt();
-        Assert.assertEquals(record.length, numFields);
-        for (int i = 0; i < numFields; i++) {
-            int recordSize = bb.getInt();
-            assertEquals(2, recordSize);
-
-            byte[] charBuff = new byte[recordSize];
-            for (int j = 0; j < recordSize; j++) {
-                charBuff[j] = bb.get();
-            }
-
-            String value = new String(charBuff, StandardCharsets.UTF_8);
-            Assert.assertEquals(record[i], value);
-        }
+        try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
+                bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
+                NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService())) {
+
+            process.writeRecord(record);
+            process.flushStream();
+
+            ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray());
+
+            // read header
+            int numFields = bb.getInt();
+            Assert.assertEquals(record.length, numFields);
+            for (int i = 0; i < numFields; i++) {
+                int recordSize = bb.getInt();
+                assertEquals(2, recordSize);
+
+                byte[] charBuff = new byte[recordSize];
+                for (int j = 0; j < recordSize; j++) {
+                    charBuff[j] = bb.get();
+                }
+
+                String value = new String(charBuff, StandardCharsets.UTF_8);
+                Assert.assertEquals(record[i], value);
+            }
+        }
     }
@@ -83,45 +85,48 @@ public class NativeAutodetectProcessTests extends ESTestCase {
         InputStream logStream = Mockito.mock(InputStream.class);
         when(logStream.read(new byte[1024])).thenReturn(-1);
         ByteArrayOutputStream bos = new ByteArrayOutputStream(ControlMsgToProcessWriter.FLUSH_SPACES_LENGTH + 1024);
-        NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
-                bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
-                NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService());
-
-        InterimResultsParams params = InterimResultsParams.builder().build();
-        process.flushJob(params);
-
-        ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray());
-        assertThat(bb.remaining(), is(greaterThan(ControlMsgToProcessWriter.FLUSH_SPACES_LENGTH)));
+        try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
+                bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
+                NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService())) {
+
+            InterimResultsParams params = InterimResultsParams.builder().build();
+            process.flushJob(params);
+
+            ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray());
+            assertThat(bb.remaining(), is(greaterThan(ControlMsgToProcessWriter.FLUSH_SPACES_LENGTH)));
+        }
     }

     public void testWriteResetBucketsControlMessage() throws IOException {
         InputStream logStream = Mockito.mock(InputStream.class);
         when(logStream.read(new byte[1024])).thenReturn(-1);
         ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
-        NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
-                bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
-                NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService());
-
-        DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), true);
-        process.writeResetBucketsControlMessage(params);
-        process.flushStream();
-
-        String message = new String(bos.toByteArray(), StandardCharsets.UTF_8);
-        assertTrue(message.contains(ControlMsgToProcessWriter.RESET_BUCKETS_MESSAGE_CODE));
+        try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
+                bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
+                NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService())) {
+
+            DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), true);
+            process.writeResetBucketsControlMessage(params);
+            process.flushStream();
+
+            String message = new String(bos.toByteArray(), StandardCharsets.UTF_8);
+            assertTrue(message.contains(ControlMsgToProcessWriter.RESET_BUCKETS_MESSAGE_CODE));
+        }
     }

     public void testWriteUpdateConfigMessage() throws IOException {
         InputStream logStream = Mockito.mock(InputStream.class);
         when(logStream.read(new byte[1024])).thenReturn(-1);
         ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
-        NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
-                bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
-                NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService());
-
-        process.writeUpdateConfigMessage("");
-        process.flushStream();
-
-        String message = new String(bos.toByteArray(), StandardCharsets.UTF_8);
-        assertTrue(message.contains(ControlMsgToProcessWriter.UPDATE_MESSAGE_CODE));
+        try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
+                bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
+                NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService())) {
+
+            process.writeUpdateConfigMessage("");
+            process.flushStream();
+
+            String message = new String(bos.toByteArray(), StandardCharsets.UTF_8);
+            assertTrue(message.contains(ControlMsgToProcessWriter.UPDATE_MESSAGE_CODE));
+        }
     }
 }

File: (name not captured)

@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;

File: DummyStatusReporter.java

@@ -30,7 +30,7 @@ class DummyStatusReporter extends StatusReporter {
      * and Mockito.spy() doesn't work due to the lambdas used in {@link StatusReporter}.
      * Override the method here an count the calls
      */
-        @Override
+    @Override
     protected void logStatus(long totalRecords) {
         super.logStatus(totalRecords);
         ++logStatusCallCount;
@@ -43,4 +43,9 @@ class DummyStatusReporter extends StatusReporter {
     public int getLogStatusCallCount() {
         return logStatusCallCount;
     }
+
+    @Override
+    public void close() {
+        // Do nothing
+    }
 }
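
The no-op close() added above implies that StatusReporter now implements AutoCloseable (elsewhere in this commit it is constructed in try-with-resources); the test double overrides close() so that closing it cannot trigger the parent's cleanup. The same idiom in miniature, with a hypothetical parent class:

    class CloseableReporter implements AutoCloseable {
        @Override
        public void close() {
            // real implementation: flush and persist final counts
        }
    }

    class DummyCloseableReporter extends CloseableReporter {
        @Override
        public void close() {
            // Do nothing: tests must not trigger the parent's side effects.
        }
    }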

File: StatusReporterTests.java

@@ -33,7 +33,6 @@ public class StatusReporterTests extends ESTestCase {
     private UsageReporter usageReporter;
     private JobDataCountsPersister jobDataCountsPersister;
-    private StatusReporter statusReporter;
     private ThreadPool threadPool;
     private Settings settings;
@@ -59,10 +58,11 @@ public class StatusReporterTests extends ESTestCase {
     }

     public void testSettingAcceptablePercentages() throws IOException {
-        StatusReporter statusReporter =
-                new StatusReporter(threadPool, settings, JOB_ID, new DataCounts(JOB_ID), usageReporter, jobDataCountsPersister);
-        assertEquals(statusReporter.getAcceptablePercentDateParseErrors(), MAX_PERCENT_DATE_PARSE_ERRORS);
-        assertEquals(statusReporter.getAcceptablePercentOutOfOrderErrors(), MAX_PERCENT_OUT_OF_ORDER_ERRORS);
+        try (StatusReporter statusReporter = new StatusReporter(threadPool, settings, JOB_ID, new DataCounts(JOB_ID), usageReporter,
+                jobDataCountsPersister)) {
+            assertEquals(statusReporter.getAcceptablePercentDateParseErrors(), MAX_PERCENT_DATE_PARSE_ERRORS);
+            assertEquals(statusReporter.getAcceptablePercentOutOfOrderErrors(), MAX_PERCENT_OUT_OF_ORDER_ERRORS);
+        }
     }

     public void testSimpleConstructor() throws Exception {
@@ -75,12 +75,10 @@ public class StatusReporterTests extends ESTestCase {
     }

     public void testComplexConstructor() throws Exception {
-        Environment env = new Environment(
-                Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build());
         DataCounts counts = new DataCounts("foo", 1L, 1L, 2L, 0L, 3L, 4L, 5L, new Date(), new Date());

         try (StatusReporter statusReporter =
-                new StatusReporter(threadPool, settings, JOB_ID, counts, usageReporter, jobDataCountsPersister)) {
+                     new StatusReporter(threadPool, settings, JOB_ID, counts, usageReporter, jobDataCountsPersister)) {
             DataCounts stats = statusReporter.incrementalStats();
             assertNotNull(stats);
             assertAllCountFieldsEqualZero(stats);
@@ -156,91 +154,95 @@ public class StatusReporterTests extends ESTestCase {
     }

     public void testReportRecordsWritten_Given100Records() {
-        DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter);
-        statusReporter.setAnalysedFieldsPerRecord(3);
-
-        for (int i = 1; i <= 101; i++) {
-            statusReporter.reportRecordWritten(5, i);
-        }
-
-        assertEquals(101, statusReporter.incrementalStats().getInputRecordCount());
-        assertEquals(505, statusReporter.incrementalStats().getInputFieldCount());
-        assertEquals(101, statusReporter.incrementalStats().getProcessedRecordCount());
-        assertEquals(303, statusReporter.incrementalStats().getProcessedFieldCount());
-        assertEquals(101, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
-
-        assertEquals(1, statusReporter.getLogStatusCallCount());
+        try (DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter)) {
+            statusReporter.setAnalysedFieldsPerRecord(3);
+
+            for (int i = 1; i <= 101; i++) {
+                statusReporter.reportRecordWritten(5, i);
+            }
+
+            assertEquals(101, statusReporter.incrementalStats().getInputRecordCount());
+            assertEquals(505, statusReporter.incrementalStats().getInputFieldCount());
+            assertEquals(101, statusReporter.incrementalStats().getProcessedRecordCount());
+            assertEquals(303, statusReporter.incrementalStats().getProcessedFieldCount());
+            assertEquals(101, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
+
+            assertEquals(1, statusReporter.getLogStatusCallCount());
+        }
     }

     public void testReportRecordsWritten_Given1000Records() {
-        DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter);
-        statusReporter.setAnalysedFieldsPerRecord(3);
-
-        for (int i = 1; i <= 1001; i++) {
-            statusReporter.reportRecordWritten(5, i);
-        }
-
-        assertEquals(1001, statusReporter.incrementalStats().getInputRecordCount());
-        assertEquals(5005, statusReporter.incrementalStats().getInputFieldCount());
-        assertEquals(1001, statusReporter.incrementalStats().getProcessedRecordCount());
-        assertEquals(3003, statusReporter.incrementalStats().getProcessedFieldCount());
-        assertEquals(1001, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
-
-        assertEquals(10, statusReporter.getLogStatusCallCount());
+        try (DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter)) {
+            statusReporter.setAnalysedFieldsPerRecord(3);
+
+            for (int i = 1; i <= 1001; i++) {
+                statusReporter.reportRecordWritten(5, i);
+            }
+
+            assertEquals(1001, statusReporter.incrementalStats().getInputRecordCount());
+            assertEquals(5005, statusReporter.incrementalStats().getInputFieldCount());
+            assertEquals(1001, statusReporter.incrementalStats().getProcessedRecordCount());
+            assertEquals(3003, statusReporter.incrementalStats().getProcessedFieldCount());
+            assertEquals(1001, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
+
+            assertEquals(10, statusReporter.getLogStatusCallCount());
+        }
     }

     public void testReportRecordsWritten_Given2000Records() {
-        DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter);
-        statusReporter.setAnalysedFieldsPerRecord(3);
-
-        for (int i = 1; i <= 2001; i++) {
-            statusReporter.reportRecordWritten(5, i);
-        }
-
-        assertEquals(2001, statusReporter.incrementalStats().getInputRecordCount());
-        assertEquals(10005, statusReporter.incrementalStats().getInputFieldCount());
-        assertEquals(2001, statusReporter.incrementalStats().getProcessedRecordCount());
-        assertEquals(6003, statusReporter.incrementalStats().getProcessedFieldCount());
-        assertEquals(2001, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
-
-        assertEquals(11, statusReporter.getLogStatusCallCount());
+        try (DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter)) {
+            statusReporter.setAnalysedFieldsPerRecord(3);
+
+            for (int i = 1; i <= 2001; i++) {
+                statusReporter.reportRecordWritten(5, i);
+            }
+
+            assertEquals(2001, statusReporter.incrementalStats().getInputRecordCount());
+            assertEquals(10005, statusReporter.incrementalStats().getInputFieldCount());
+            assertEquals(2001, statusReporter.incrementalStats().getProcessedRecordCount());
+            assertEquals(6003, statusReporter.incrementalStats().getProcessedFieldCount());
+            assertEquals(2001, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
+
+            assertEquals(11, statusReporter.getLogStatusCallCount());
+        }
     }

     public void testReportRecordsWritten_Given20000Records() {
-        DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter);
-        statusReporter.setAnalysedFieldsPerRecord(3);
-
-        for (int i = 1; i <= 20001; i++) {
-            statusReporter.reportRecordWritten(5, i);
-        }
-
-        assertEquals(20001, statusReporter.incrementalStats().getInputRecordCount());
-        assertEquals(100005, statusReporter.incrementalStats().getInputFieldCount());
-        assertEquals(20001, statusReporter.incrementalStats().getProcessedRecordCount());
-        assertEquals(60003, statusReporter.incrementalStats().getProcessedFieldCount());
-        assertEquals(20001, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
-
-        assertEquals(29, statusReporter.getLogStatusCallCount());
+        try (DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter)) {
+            statusReporter.setAnalysedFieldsPerRecord(3);
+
+            for (int i = 1; i <= 20001; i++) {
+                statusReporter.reportRecordWritten(5, i);
+            }
+
+            assertEquals(20001, statusReporter.incrementalStats().getInputRecordCount());
+            assertEquals(100005, statusReporter.incrementalStats().getInputFieldCount());
+            assertEquals(20001, statusReporter.incrementalStats().getProcessedRecordCount());
+            assertEquals(60003, statusReporter.incrementalStats().getProcessedFieldCount());
+            assertEquals(20001, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
+
+            assertEquals(29, statusReporter.getLogStatusCallCount());
+        }
     }

     public void testReportRecordsWritten_Given30000Records() {
-        DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter);
-        statusReporter.setAnalysedFieldsPerRecord(3);
-
-        for (int i = 1; i <= 30001; i++) {
-            statusReporter.reportRecordWritten(5, i);
-        }
-
-        assertEquals(30001, statusReporter.incrementalStats().getInputRecordCount());
-        assertEquals(150005, statusReporter.incrementalStats().getInputFieldCount());
-        assertEquals(30001, statusReporter.incrementalStats().getProcessedRecordCount());
-        assertEquals(90003, statusReporter.incrementalStats().getProcessedFieldCount());
-        assertEquals(30001, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
-
-        assertEquals(30, statusReporter.getLogStatusCallCount());
+        try (DummyStatusReporter statusReporter = new DummyStatusReporter(usageReporter)) {
+            statusReporter.setAnalysedFieldsPerRecord(3);
+
+            for (int i = 1; i <= 30001; i++) {
+                statusReporter.reportRecordWritten(5, i);
+            }
+
+            assertEquals(30001, statusReporter.incrementalStats().getInputRecordCount());
+            assertEquals(150005, statusReporter.incrementalStats().getInputFieldCount());
+            assertEquals(30001, statusReporter.incrementalStats().getProcessedRecordCount());
+            assertEquals(90003, statusReporter.incrementalStats().getProcessedFieldCount());
+            assertEquals(30001, statusReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
+
+            assertEquals(30, statusReporter.getLogStatusCallCount());
+        }
     }

     public void testFinishReporting() {