diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/data/DataStreamer.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/data/DataStreamer.java deleted file mode 100644 index c44a8f4a6cc..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/data/DataStreamer.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.prelert.job.data; - -import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.xpack.prelert.job.DataCounts; -import org.elasticsearch.xpack.prelert.job.messages.Messages; -import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Objects; -import java.util.zip.GZIPInputStream; -import java.util.zip.ZipException; - -public class DataStreamer { - private static final Logger LOGGER = Loggers.getLogger(DataStreamer.class); - - private final DataProcessor dataProccesor; - - public DataStreamer(DataProcessor dataProcessor) { - dataProccesor = Objects.requireNonNull(dataProcessor); - } - - /** - * Stream the data to the native process. - * - * @return Count of records, fields, bytes, etc written - */ - public DataCounts streamData(String contentEncoding, String jobId, InputStream input, DataLoadParams params) throws IOException { - LOGGER.trace("Handle Post data to job {} ", jobId); - - input = tryDecompressingInputStream(contentEncoding, jobId, input); - DataCounts stats = handleStream(jobId, input, params); - - LOGGER.debug("Data uploaded to job {}", jobId); - - return stats; - } - - private InputStream tryDecompressingInputStream(String contentEncoding, String jobId, InputStream input) throws IOException { - if ("gzip".equals(contentEncoding)) { - LOGGER.debug("Decompressing post data in job {}", jobId); - try { - return new GZIPInputStream(input); - } catch (ZipException ze) { - LOGGER.error("Failed to decompress data file", ze); - throw new IllegalArgumentException(Messages.getMessage(Messages.REST_GZIP_ERROR), ze); - } - } - return input; - } - - /** - * Pass the data stream to the native process. - * - * @return Count of records, fields, bytes, etc written - */ - private DataCounts handleStream(String jobId, InputStream input, DataLoadParams params) { - return dataProccesor.processData(jobId, input, params); - } -} diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/data/DataStreamerThread.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/data/DataStreamerThread.java deleted file mode 100644 index 3c8c4414f0e..00000000000 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/data/DataStreamerThread.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.prelert.job.data; - -import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.xpack.prelert.job.DataCounts; -import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Optional; - -// NORELEASE - Use ES ThreadPool -public final class DataStreamerThread extends Thread { - private static final Logger LOGGER = Loggers.getLogger(DataStreamerThread.class); - - private DataCounts stats; - private final String jobId; - private final String contentEncoding; - private final DataLoadParams params; - private final InputStream input; - private final DataStreamer dataStreamer; - private ElasticsearchException jobException; - private IOException iOException; - - public DataStreamerThread(DataStreamer dataStreamer, String jobId, String contentEncoding, - DataLoadParams params, InputStream input) { - super("DataStreamer-" + jobId); - - this.dataStreamer = dataStreamer; - this.jobId = jobId; - this.contentEncoding = contentEncoding; - this.params = params; - this.input = input; - } - - @Override - public void run() { - try { - stats = dataStreamer.streamData(contentEncoding, jobId, input, params); - } catch (ElasticsearchException e) { - jobException = e; - } catch (IOException e) { - iOException = e; - } finally { - try { - input.close(); - } catch (IOException e) { - LOGGER.warn("Exception closing the data input stream", e); - } - } - } - - /** - * This method should only be called after the thread - * has joined other wise the result could be null - * (best case) or undefined. - */ - public DataCounts getDataCounts() { - return stats; - } - - /** - * If a Job exception was thrown during the run of this thread it - * is accessed here. Only call this method after the thread has joined. - */ - public Optional getJobException() { - return Optional.ofNullable(jobException); - } - - /** - * If an IOException was thrown during the run of this thread it - * is accessed here. Only call this method after the thread has joined. - */ - public Optional getIOException() { - return Optional.ofNullable(iOException); - } - - public String getJobId() { - return jobId; - } -} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/data/DataStreamerTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/data/DataStreamerTests.java deleted file mode 100644 index f0277eff3f3..00000000000 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/data/DataStreamerTests.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.prelert.job.data; - -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.prelert.job.DataCounts; -import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; - -import java.io.IOException; -import java.io.InputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.nio.charset.StandardCharsets; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class DataStreamerTests extends ESTestCase { - - public void testConstructor_GivenNullDataProcessor() { - - ESTestCase.expectThrows(NullPointerException.class, () -> new DataStreamer(null)); - } - - public void testStreamData_GivenNoContentEncodingAndNoPersistBaseDir() throws IOException { - - DataProcessor dataProcessor = mock(DataProcessor.class); - DataStreamer dataStreamer = new DataStreamer(dataProcessor); - InputStream inputStream = mock(InputStream.class); - DataLoadParams params = mock(DataLoadParams.class); - - when(dataProcessor.processData("foo", inputStream, params)).thenReturn(new DataCounts("foo")); - - dataStreamer.streamData("", "foo", inputStream, params); - - verify(dataProcessor).processData("foo", inputStream, params); - Mockito.verifyNoMoreInteractions(dataProcessor); - } - - public void testStreamData_ExpectsGzipButNotCompressed() throws IOException { - DataProcessor dataProcessor = mock(DataProcessor.class); - DataStreamer dataStreamer = new DataStreamer(dataProcessor); - InputStream inputStream = mock(InputStream.class); - DataLoadParams params = mock(DataLoadParams.class); - - try { - dataStreamer.streamData("gzip", "foo", inputStream, params); - fail("content encoding : gzip with uncompressed data should throw"); - } catch (IllegalArgumentException e) { - assertEquals("Content-Encoding = gzip but the data is not in gzip format", e.getMessage()); - } - } - - public void testStreamData_ExpectsGzipUsesGZipStream() throws IOException { - PipedInputStream pipedIn = new PipedInputStream(); - PipedOutputStream pipedOut = new PipedOutputStream(pipedIn); - try (GZIPOutputStream gzip = new GZIPOutputStream(pipedOut)) { - gzip.write("Hello World compressed".getBytes(StandardCharsets.UTF_8)); - - DataProcessor dataProcessor = mock(DataProcessor.class); - DataStreamer dataStreamer = new DataStreamer(dataProcessor); - DataLoadParams params = mock(DataLoadParams.class); - - when(dataProcessor.processData(Mockito.anyString(), - Mockito.any(InputStream.class), - Mockito.any(DataLoadParams.class))) - .thenReturn(new DataCounts("foo")); - - dataStreamer.streamData("gzip", "foo", pipedIn, params); - - // submitDataLoadJob should be called with a GZIPInputStream - ArgumentCaptor streamArg = ArgumentCaptor.forClass(InputStream.class); - - verify(dataProcessor).processData(Mockito.anyString(), - streamArg.capture(), - Mockito.any(DataLoadParams.class)); - - assertTrue(streamArg.getValue() instanceof GZIPInputStream); - } - } -} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/data/DataStreamerThreadTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/data/DataStreamerThreadTests.java deleted file mode 100644 index f231f0e1d78..00000000000 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/data/DataStreamerThreadTests.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.prelert.job.data; - -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.prelert.job.DataCounts; -import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams; -import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper; -import org.junit.After; -import org.junit.Before; -import org.mockito.Mockito; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Date; - -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class DataStreamerThreadTests extends ESTestCase { - private static final String JOB_ID = "foo"; - private static final String CONTENT_ENCODING = "application/json"; - - private DataStreamer dataStreamer; - private DataLoadParams params; - private InputStream inputStream; - - private DataStreamerThread dataStreamerThread; - - @Before - public void setUpMocks() { - dataStreamer = Mockito.mock(DataStreamer.class); - params = Mockito.mock(DataLoadParams.class); - inputStream = Mockito.mock(InputStream.class); - dataStreamerThread = new DataStreamerThread(dataStreamer, JOB_ID, CONTENT_ENCODING, params, inputStream); - } - - @After - public void verifyInputStreamClosed() throws IOException { - verify(inputStream).close(); - } - - public void testRun() throws Exception { - DataCounts counts = new DataCounts("foo", 42L, 0L, 0L, 0L, 0L, 0L, 0L, new Date(), new Date()); - when(dataStreamer.streamData(CONTENT_ENCODING, JOB_ID, inputStream, params)).thenReturn(counts); - - dataStreamerThread.run(); - - assertEquals(JOB_ID, dataStreamerThread.getJobId()); - assertEquals(counts, dataStreamerThread.getDataCounts()); - assertFalse(dataStreamerThread.getIOException().isPresent()); - assertFalse(dataStreamerThread.getJobException().isPresent()); - } - - public void testRun_GivenIOException() throws Exception { - when(dataStreamer.streamData(CONTENT_ENCODING, JOB_ID, inputStream, params)).thenThrow(new IOException("prelert")); - - dataStreamerThread.run(); - - assertEquals(JOB_ID, dataStreamerThread.getJobId()); - assertNull(dataStreamerThread.getDataCounts()); - assertEquals("prelert", dataStreamerThread.getIOException().get().getMessage()); - assertFalse(dataStreamerThread.getJobException().isPresent()); - } - - public void testRun_GivenJobException() throws Exception { - when(dataStreamer.streamData(CONTENT_ENCODING, JOB_ID, inputStream, params)).thenThrow(ExceptionsHelper.serverError("job failed")); - - dataStreamerThread.run(); - - assertEquals(JOB_ID, dataStreamerThread.getJobId()); - assertNull(dataStreamerThread.getDataCounts()); - assertFalse(dataStreamerThread.getIOException().isPresent()); - assertEquals("job failed", dataStreamerThread.getJobException().get().getMessage()); - } -}