NIFI-7714: QueryCassandra loses precision when converting timestamps to JSON

Updated the patch based on @tpalfy's review
Updated the patch based on @mattyb149's review
Rename DATE_FORMAT_PATTERN to JSON_TIMESTAMP_FORMAT_PATTERN
Changed convertToJsonStream method's visibility to package private.
Removed json prefix from timestamp-format-pattern property to make it more generic

This closes #4463.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Denes Arvay 2020-08-07 14:27:40 +02:00 committed by Peter Turcsanyi
parent bdfe1f2370
commit ca43615702
3 changed files with 129 additions and 9 deletions

View File

@ -25,6 +25,7 @@ import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.QueryExecutionException;
import com.datastax.driver.core.exceptions.QueryValidationException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
@ -39,6 +40,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -68,6 +70,7 @@ import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ExecutionException;
@ -130,6 +133,24 @@ public class QueryCassandra extends AbstractCassandraProcessor {
.defaultValue(AVRO_FORMAT)
.build();
public static final PropertyDescriptor TIMESTAMP_FORMAT_PATTERN = new PropertyDescriptor.Builder()
.name("timestamp-format-pattern")
.displayName("Timestamp Format Pattern for JSON output")
.description("Pattern to use when converting timestamp fields to JSON. Note: the formatted timestamp will be in UTC timezone.")
.required(true)
.defaultValue("yyyy-MM-dd HH:mm:ssZ")
.addValidator((subject, input, context) -> {
final ValidationResult.Builder vrb = new ValidationResult.Builder().subject(subject).input(input);
try {
new SimpleDateFormat(input).format(new Date());
vrb.valid(true).explanation("Valid date format pattern");
} catch (Exception ex) {
vrb.valid(false).explanation("the pattern is invalid: " + ex.getMessage());
}
return vrb.build();
})
.build();
private final static List<PropertyDescriptor> propertyDescriptors;
private final static Set<Relationship> relationships;
@ -145,6 +166,7 @@ public class QueryCassandra extends AbstractCassandraProcessor {
_propertyDescriptors.add(QUERY_TIMEOUT);
_propertyDescriptors.add(FETCH_SIZE);
_propertyDescriptors.add(OUTPUT_FORMAT);
_propertyDescriptors.add(TIMESTAMP_FORMAT_PATTERN);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
@ -220,14 +242,14 @@ public class QueryCassandra extends AbstractCassandraProcessor {
if (AVRO_FORMAT.equals(outputFormat)) {
nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
} else if (JSON_FORMAT.equals(outputFormat)) {
nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
}
} else {
resultSet = queryFuture.getUninterruptibly();
if (AVRO_FORMAT.equals(outputFormat)) {
nrOfRows.set(convertToAvroStream(resultSet, out, 0, null));
} else if (JSON_FORMAT.equals(outputFormat)) {
nrOfRows.set(convertToJsonStream(resultSet, out, charset, 0, null));
nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset, 0, null));
}
}
@ -381,6 +403,13 @@ public class QueryCassandra extends AbstractCassandraProcessor {
*/
public static long convertToJsonStream(final ResultSet rs, final OutputStream outStream,
Charset charset, long timeout, TimeUnit timeUnit)
throws IOException, InterruptedException, TimeoutException, ExecutionException {
return convertToJsonStream(Optional.empty(), rs, outStream, charset, timeout, timeUnit);
}
@VisibleForTesting
static long convertToJsonStream(final Optional<ProcessContext> context, final ResultSet rs, final OutputStream outStream,
Charset charset, long timeout, TimeUnit timeUnit)
throws IOException, InterruptedException, TimeoutException, ExecutionException {
try {
@ -425,7 +454,7 @@ public class QueryCassandra extends AbstractCassandraProcessor {
if (!first) {
sb.append(",");
}
sb.append(getJsonElement(element));
sb.append(getJsonElement(context, element));
first = false;
}
sb.append("]");
@ -441,15 +470,15 @@ public class QueryCassandra extends AbstractCassandraProcessor {
if (!first) {
sb.append(",");
}
sb.append(getJsonElement(mapKey));
sb.append(getJsonElement(context, mapKey));
sb.append(":");
sb.append(getJsonElement(mapValue));
sb.append(getJsonElement(context, mapValue));
first = false;
}
sb.append("}");
valueString = sb.toString();
} else {
valueString = getJsonElement(value);
valueString = getJsonElement(context, value);
}
outStream.write(("\"" + colName + "\":"
+ valueString + "").getBytes(charset));
@ -467,12 +496,14 @@ public class QueryCassandra extends AbstractCassandraProcessor {
}
protected static String getJsonElement(Object value) {
return getJsonElement(Optional.empty(), value);
}
protected static String getJsonElement(final Optional<ProcessContext> context, Object value) {
if (value instanceof Number) {
return value.toString();
} else if (value instanceof Date) {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ssZ");
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
return "\"" + dateFormat.format((Date) value) + "\"";
return "\"" + getFormattedDate(context, (Date) value) + "\"";
} else if (value instanceof String) {
return "\"" + StringEscapeUtils.escapeJson((String) value) + "\"";
} else {
@ -480,6 +511,15 @@ public class QueryCassandra extends AbstractCassandraProcessor {
}
}
private static String getFormattedDate(final Optional<ProcessContext> context, Date value) {
final String dateFormatPattern = context
.map(_context -> _context.getProperty(TIMESTAMP_FORMAT_PATTERN).getValue())
.orElse(TIMESTAMP_FORMAT_PATTERN.getDefaultValue());
SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatPattern);
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
return dateFormat.format(value);
}
/**
* Creates an Avro schema from the given result set. The metadata (column definitions, data types, etc.) is used
* to determine a schema for Avro.

View File

@ -27,8 +27,10 @@ import org.mockito.stubbing.Answer;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -45,6 +47,15 @@ import static org.mockito.Mockito.when;
* Utility methods for Cassandra processors' unit tests
*/
public class CassandraQueryTestUtil {
static final Date TEST_DATE;
static {
Calendar c = GregorianCalendar.getInstance(TimeZone.getTimeZone("PST"));
c.set(2020, Calendar.JANUARY, 1, 10, 10, 10);
c.set(Calendar.MILLISECOND, 10);
TEST_DATE = c.getTime();
}
public static ResultSet createMockResultSet() throws Exception {
ResultSet resultSet = mock(ResultSet.class);
ColumnDefinitions columnDefinitions = mock(ColumnDefinitions.class);
@ -140,6 +151,27 @@ public class CassandraQueryTestUtil {
return resultSet;
}
public static ResultSet createMockDateResultSet() throws Exception {
ResultSet resultSet = mock(ResultSet.class);
ColumnDefinitions columnDefinitions = mock(ColumnDefinitions.class);
when(columnDefinitions.size()).thenReturn(1);
when(columnDefinitions.getName(anyInt())).thenReturn("date");
when(columnDefinitions.getTable(0)).thenReturn("users");
when(columnDefinitions.getType(anyInt())).thenReturn(DataType.timestamp());
Row row = mock(Row.class);
when(row.getTimestamp(0)).thenReturn(TEST_DATE);
List<Row> rows = Collections.singletonList(row);
when(resultSet.iterator()).thenReturn(rows.iterator());
when(resultSet.all()).thenReturn(rows);
when(resultSet.getAvailableWithoutFetching()).thenReturn(rows.size());
when(resultSet.isFullyFetched()).thenReturn(false).thenReturn(true);
when(resultSet.getColumnDefinitions()).thenReturn(columnDefinitions);
return resultSet;
}
public static Row createRow(String user_id, String first_name, String last_name, Set<String> emails,
List<String> top_places, Map<Date, String> todo, boolean registered,
float scale, double metric) {

View File

@ -39,16 +39,23 @@ import com.datastax.driver.core.exceptions.ReadTimeoutException;
import java.io.ByteArrayOutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLContext;
import org.apache.avro.Schema;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.Before;
import org.junit.Test;
@ -75,6 +82,11 @@ public class QueryCassandraTest {
testRunner.assertNotValid();
testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "username");
testRunner.assertValid();
testRunner.setProperty(QueryCassandra.TIMESTAMP_FORMAT_PATTERN, "invalid format");
testRunner.assertNotValid();
testRunner.setProperty(QueryCassandra.TIMESTAMP_FORMAT_PATTERN, "yyyy-MM-dd HH:mm:ss.SSSZ");
testRunner.assertValid();
}
@Test
@ -368,6 +380,42 @@ public class QueryCassandraTest {
assertEquals(2, numberOfRows);
}
@Test
public void testDefaultDateFormatInConvertToJSONStream() throws Exception {
ResultSet rs = CassandraQueryTestUtil.createMockDateResultSet();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DateFormat df = new SimpleDateFormat(QueryCassandra.TIMESTAMP_FORMAT_PATTERN.getDefaultValue());
df.setTimeZone(TimeZone.getTimeZone("UTC"));
long numberOfRows = QueryCassandra.convertToJsonStream(Optional.of(testRunner.getProcessContext()), rs, baos,
StandardCharsets.UTF_8, 0, null);
assertEquals(1, numberOfRows);
Map<String, List<Map<String, String>>> map = new ObjectMapper().readValue(baos.toByteArray(), HashMap.class);
String date = map.get("results").get(0).get("date");
assertEquals(df.format(CassandraQueryTestUtil.TEST_DATE), date);
}
@Test
public void testCustomDateFormatInConvertToJSONStream() throws Exception {
MockProcessContext context = (MockProcessContext) testRunner.getProcessContext();
ResultSet rs = CassandraQueryTestUtil.createMockDateResultSet();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
final String customDateFormat = "yyyy-MM-dd HH:mm:ss.SSSZ";
context.setProperty(QueryCassandra.TIMESTAMP_FORMAT_PATTERN, customDateFormat);
DateFormat df = new SimpleDateFormat(customDateFormat);
df.setTimeZone(TimeZone.getTimeZone("UTC"));
long numberOfRows = QueryCassandra.convertToJsonStream(Optional.of(context), rs, baos, StandardCharsets.UTF_8, 0, null);
assertEquals(1, numberOfRows);
Map<String, List<Map<String, String>>> map = new ObjectMapper().readValue(baos.toByteArray(), HashMap.class);
String date = map.get("results").get(0).get("date");
assertEquals(df.format(CassandraQueryTestUtil.TEST_DATE), date);
}
private void setUpStandardProcessorConfig() {
testRunner.setProperty(AbstractCassandraProcessor.CONSISTENCY_LEVEL, "ONE");
testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "localhost:9042");