From f1043d20bc10d3fd44a33ad89dac556d30095a20 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 24 Jun 2024 23:20:01 -0700 Subject: [PATCH] Support csv input format in Kafka ingestion with header (#16630) * Support ListBasedInputRow in Kafka ingestion with header * Fix up buildBlendedEventMap * Add new test for KafkaInputFormat with csv value and headers * Do not use forbidden APIs * Move utility method to TestUtils --- .../input/kafkainput/KafkaInputReader.java | 58 ++- .../kafkainput/KafkaInputFormatTest.java | 350 ++++++++---------- .../druid/indexing/common/TestUtils.java | 18 + 3 files changed, 197 insertions(+), 229 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java index 31b7cf66be1..9d356d4a2e9 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java @@ -31,7 +31,6 @@ import org.apache.druid.indexing.seekablestream.SettableByteEntity; import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.parsers.CloseableIterator; -import org.apache.druid.java.util.common.parsers.ParseException; import org.joda.time.DateTime; import javax.annotation.Nullable; @@ -171,21 +170,8 @@ public class KafkaInputReader implements InputEntityReader { return valueParser.read().map( r -> { - final MapBasedInputRow valueRow; - try { - // Return type for the value parser should be of type MapBasedInputRow - // Parsers returning other types are not compatible currently. - valueRow = (MapBasedInputRow) r; - } - catch (ClassCastException e) { - throw new ParseException( - null, - "Unsupported input format in valueFormat. KafkaInputFormat only supports input format that return MapBasedInputRow rows" - ); - } - - final Map event = buildBlendedEventMap(valueRow.getEvent(), headerKeyList); - final HashSet newDimensions = new HashSet<>(valueRow.getDimensions()); + final HashSet newDimensions = new HashSet<>(r.getDimensions()); + final Map event = buildBlendedEventMap(r::getRaw, newDimensions, headerKeyList); newDimensions.addAll(headerKeyList.keySet()); // Remove the dummy timestamp added in KafkaInputFormat newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING); @@ -244,25 +230,18 @@ public class KafkaInputReader implements InputEntityReader } List newInputRows = Lists.newArrayListWithCapacity(rowAndValues.getInputRows().size()); List> newRawRows = Lists.newArrayListWithCapacity(rowAndValues.getRawValues().size()); - ParseException parseException = null; for (Map raw : rowAndValues.getRawValuesList()) { - newRawRows.add(buildBlendedEventMap(raw, headerKeyList)); + newRawRows.add(buildBlendedEventMap(raw::get, raw.keySet(), headerKeyList)); } for (InputRow r : rowAndValues.getInputRows()) { - MapBasedInputRow valueRow = null; - try { - valueRow = (MapBasedInputRow) r; - } - catch (ClassCastException e) { - parseException = new ParseException( - null, - "Unsupported input format in valueFormat. 
KafkaInputFormat only supports input format that return MapBasedInputRow rows" + if (r != null) { + final HashSet newDimensions = new HashSet<>(r.getDimensions()); + final Map event = buildBlendedEventMap( + r::getRaw, + newDimensions, + headerKeyList ); - } - if (valueRow != null) { - final Map event = buildBlendedEventMap(valueRow.getEvent(), headerKeyList); - final HashSet newDimensions = new HashSet<>(valueRow.getDimensions()); newDimensions.addAll(headerKeyList.keySet()); // Remove the dummy timestamp added in KafkaInputFormat newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING); @@ -279,7 +258,7 @@ public class KafkaInputReader implements InputEntityReader ); } } - return InputRowListPlusRawValues.ofList(newRawRows, newInputRows, parseException); + return InputRowListPlusRawValues.ofList(newRawRows, newInputRows, null); } ); } @@ -302,22 +281,31 @@ public class KafkaInputReader implements InputEntityReader /** * Builds a map that blends two {@link Map}, presenting the combined keyset of both maps, and preferring to read * from the first map and falling back to the second map if the value is not present. - * + *
<p>
* This strategy is used rather than just copying the values of the keyset into a new map so that any 'flattening' * machinery (such as {@link Map} created by {@link org.apache.druid.java.util.common.parsers.ObjectFlatteners}) is * still in place to be lazily evaluated instead of eagerly copying. */ - private static Map buildBlendedEventMap(Map map, Map fallback) + private static Map buildBlendedEventMap( + Function getRowValue, + Set rowDimensions, + Map fallback + ) { final Set keySet = new HashSet<>(fallback.keySet()); - keySet.addAll(map.keySet()); + keySet.addAll(rowDimensions); return new AbstractMap() { @Override public Object get(Object key) { - return map.getOrDefault((String) key, fallback.get(key)); + final String skey = (String) key; + final Object val = getRowValue.apply(skey); + if (val == null) { + return fallback.get(skey); + } + return val; } @Override diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java index 858cd79fbd7..adbb7c4b677 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java @@ -24,14 +24,17 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.ColumnsFilter; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.data.input.kafka.KafkaRecordEntity; +import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.seekablestream.SettableByteEntity; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; @@ -57,9 +60,29 @@ import java.util.Optional; public class KafkaInputFormatTest { - private KafkaRecordEntity inputEntity; - private final long timestamp = DateTimes.of("2021-06-24").getMillis(); + static { + NullHandling.initializeForTests(); + } + + private static final long TIMESTAMP_MILLIS = DateTimes.of("2021-06-24").getMillis(); private static final String TOPIC = "sample"; + private static final byte[] SIMPLE_JSON_KEY_BYTES = StringUtils.toUtf8( + TestUtils.singleQuoteToStandardJson( + "{'key': 'sampleKey'}" + ) + ); + private static final byte[] SIMPLE_JSON_VALUE_BYTES = StringUtils.toUtf8( + TestUtils.singleQuoteToStandardJson( + "{" + + " 'timestamp': '2021-06-25'," + + " 'bar': null," + + " 'foo': 'x'," + + " 'baz': 4," + + " 'o': {'mg': 1}" + + "}" + ) + ); + private static final Iterable
<Header>
SAMPLE_HEADERS = ImmutableList.of( new Header() { @@ -177,26 +200,9 @@ public class KafkaInputFormatTest @Test public void testWithHeaderKeyAndValue() throws IOException { - final byte[] key = StringUtils.toUtf8( - "{\n" - + " \"key\": \"sampleKey\"\n" - + "}" - ); - - final byte[] payload = StringUtils.toUtf8( - "{\n" - + " \"timestamp\": \"2021-06-25\",\n" - + " \"bar\": null,\n" - + " \"foo\": \"x\",\n" - + " \"baz\": 4,\n" - + " \"o\": {\n" - + " \"mg\": 1\n" - + " }\n" - + "}" - ); - Headers headers = new RecordHeaders(SAMPLE_HEADERS); - inputEntity = makeInputEntity(key, payload, headers); + KafkaRecordEntity inputEntity = + makeInputEntity(SIMPLE_JSON_KEY_BYTES, SIMPLE_JSON_VALUE_BYTES, headers); final InputEntityReader reader = format.createReader( new InputRowSchema( @@ -248,21 +254,7 @@ public class KafkaInputFormatTest Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); - // Header verification - Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding"))); - Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc"))); - Assert.assertEquals( - String.valueOf(DateTimes.of("2021-06-24").getMillis()), - Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp")) - ); - Assert.assertEquals( - TOPIC, - Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic")) - ); - Assert.assertEquals( - "2021-06-25", - Iterables.getOnlyElement(row.getDimension("timestamp")) - ); + verifyHeader(row); // Key verification Assert.assertEquals("sampleKey", Iterables.getOnlyElement(row.getDimension("kafka.newkey.key"))); @@ -282,20 +274,8 @@ public class KafkaInputFormatTest //Headers cannot be null, so testing only no key use case! public void testWithOutKey() throws IOException { - final byte[] payload = StringUtils.toUtf8( - "{\n" - + " \"timestamp\": \"2021-06-24\",\n" - + " \"bar\": null,\n" - + " \"foo\": \"x\",\n" - + " \"baz\": 4,\n" - + " \"o\": {\n" - + " \"mg\": 1\n" - + " }\n" - + "}" - ); - Headers headers = new RecordHeaders(SAMPLE_HEADERS); - inputEntity = makeInputEntity(null, payload, headers); + KafkaRecordEntity inputEntity = makeInputEntity(null, SIMPLE_JSON_VALUE_BYTES, headers); final InputEntityReader reader = format.createReader( new InputRowSchema( @@ -338,7 +318,7 @@ public class KafkaInputFormatTest @Test public void testTimestampFromHeader() throws IOException { - Iterable
<Header> sample_header_with_ts = Iterables.unmodifiableIterable(
+    final Iterable<Header>
sampleHeaderWithTs = Iterables.unmodifiableIterable( Iterables.concat( SAMPLE_HEADERS, ImmutableList.of( @@ -359,26 +339,9 @@ public class KafkaInputFormatTest ) ) ); - final byte[] key = StringUtils.toUtf8( - "{\n" - + " \"key\": \"sampleKey\"\n" - + "}" - ); - - final byte[] payload = StringUtils.toUtf8( - "{\n" - + " \"timestamp\": \"2021-06-24\",\n" - + " \"bar\": null,\n" - + " \"foo\": \"x\",\n" - + " \"baz\": 4,\n" - + " \"o\": {\n" - + " \"mg\": 1\n" - + " }\n" - + "}" - ); - - Headers headers = new RecordHeaders(sample_header_with_ts); - inputEntity = makeInputEntity(key, payload, headers); + Headers headers = new RecordHeaders(sampleHeaderWithTs); + KafkaRecordEntity inputEntity = + makeInputEntity(SIMPLE_JSON_KEY_BYTES, SIMPLE_JSON_VALUE_BYTES, headers); final InputEntityReader reader = format.createReader( new InputRowSchema( @@ -417,21 +380,7 @@ public class KafkaInputFormatTest Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); - // Header verification - Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding"))); - Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc"))); - Assert.assertEquals( - String.valueOf(DateTimes.of("2021-06-24").getMillis()), - Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp")) - ); - Assert.assertEquals( - "2021-06-24", - Iterables.getOnlyElement(row.getDimension("kafka.newheader.headerTs")) - ); - Assert.assertEquals( - "2021-06-24", - Iterables.getOnlyElement(row.getDimension("timestamp")) - ); + verifyHeader(row); // Key verification Assert.assertEquals("sampleKey", Iterables.getOnlyElement(row.getDimension("kafka.newkey.key"))); @@ -450,20 +399,9 @@ public class KafkaInputFormatTest @Test public void testWithOutKeyAndHeaderSpecs() throws IOException { - final byte[] payload = StringUtils.toUtf8( - "{\n" - + " \"timestamp\": \"2021-06-24\",\n" - + " \"bar\": null,\n" - + " \"foo\": \"x\",\n" - + " \"baz\": 4,\n" - + " \"o\": {\n" - + " \"mg\": 1\n" - + " }\n" - + "}" - ); - Headers headers = new RecordHeaders(SAMPLE_HEADERS); - inputEntity = makeInputEntity(null, payload, headers); + KafkaRecordEntity inputEntity = + makeInputEntity(null, SIMPLE_JSON_VALUE_BYTES, headers); KafkaInputFormat localFormat = new KafkaInputFormat( null, @@ -590,7 +528,7 @@ public class KafkaInputFormatTest for (int i = 0; i < keys.length; i++) { headers = headers.add(new RecordHeader("indexH", String.valueOf(i).getBytes(StandardCharsets.UTF_8))); - inputEntity = makeInputEntity(keys[i], values[i], headers); + KafkaRecordEntity inputEntity = makeInputEntity(keys[i], values[i], headers); settableByteEntity.setEntity(inputEntity); final int numExpectedIterations = 1; @@ -612,7 +550,6 @@ public class KafkaInputFormatTest Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); Assert.assertEquals(String.valueOf(i), Iterables.getOnlyElement(row.getDimension("index"))); - // Header verification Assert.assertEquals( "application/json", @@ -652,26 +589,9 @@ public class KafkaInputFormatTest @Test public void testMissingTimestampThrowsException() throws IOException { - final byte[] key = StringUtils.toUtf8( - "{\n" - + " \"key\": \"sampleKey\"\n" - + "}" - ); - - final byte[] payload = StringUtils.toUtf8( - "{\n" - + " \"timestamp\": \"2021-06-25\",\n" - + " \"bar\": null,\n" - + " \"foo\": \"x\",\n" - + " \"baz\": 4,\n" - + " \"o\": {\n" - + " \"mg\": 1\n" - + " 
}\n" - + "}" - ); - Headers headers = new RecordHeaders(SAMPLE_HEADERS); - inputEntity = makeInputEntity(key, payload, headers); + KafkaRecordEntity inputEntity = + makeInputEntity(SIMPLE_JSON_KEY_BYTES, SIMPLE_JSON_VALUE_BYTES, headers); final InputEntityReader reader = format.createReader( new InputRowSchema( @@ -696,11 +616,9 @@ public class KafkaInputFormatTest try (CloseableIterator iterator = reader.read()) { while (iterator.hasNext()) { - Throwable t = Assert.assertThrows(ParseException.class, () -> iterator.next()); - Assert.assertEquals( - "Timestamp[null] is unparseable! Event: {kafka.newtopic.topic=sample, foo=x, kafka.newts" - + ".timestamp=1624492800000, kafka.newkey.key=sampleKey...", - t.getMessage() + Throwable t = Assert.assertThrows(ParseException.class, iterator::next); + Assert.assertTrue( + t.getMessage().startsWith("Timestamp[null] is unparseable! Event: {") ); } } @@ -709,27 +627,9 @@ public class KafkaInputFormatTest @Test public void testWithSchemaDiscovery() throws IOException { - // testWithHeaderKeyAndValue + schemaless - final byte[] key = StringUtils.toUtf8( - "{\n" - + " \"key\": \"sampleKey\"\n" - + "}" - ); - - final byte[] payload = StringUtils.toUtf8( - "{\n" - + " \"timestamp\": \"2021-06-25\",\n" - + " \"bar\": null,\n" - + " \"foo\": \"x\",\n" - + " \"baz\": 4,\n" - + " \"o\": {\n" - + " \"mg\": 1\n" - + " }\n" - + "}" - ); - Headers headers = new RecordHeaders(SAMPLE_HEADERS); - inputEntity = makeInputEntity(key, payload, headers); + KafkaRecordEntity inputEntity = + makeInputEntity(SIMPLE_JSON_KEY_BYTES, SIMPLE_JSON_VALUE_BYTES, headers); final InputEntityReader reader = format.createReader( new InputRowSchema( @@ -777,21 +677,7 @@ public class KafkaInputFormatTest Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); - // Header verification - Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding"))); - Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc"))); - Assert.assertEquals( - String.valueOf(DateTimes.of("2021-06-24").getMillis()), - Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp")) - ); - Assert.assertEquals( - TOPIC, - Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic")) - ); - Assert.assertEquals( - "2021-06-25", - Iterables.getOnlyElement(row.getDimension("timestamp")) - ); + verifyHeader(row); // Key verification Assert.assertEquals("sampleKey", Iterables.getOnlyElement(row.getDimension("kafka.newkey.key"))); @@ -808,29 +694,102 @@ public class KafkaInputFormatTest } @Test - public void testWithPartialDeclarationSchemaDiscovery() throws IOException + public void testValueInCsvFormat() throws IOException { - // testWithHeaderKeyAndValue + partial-schema + schema discovery - final byte[] key = StringUtils.toUtf8( - "{\n" - + " \"key\": \"sampleKey\"\n" - + "}" - ); - - final byte[] payload = StringUtils.toUtf8( - "{\n" - + " \"timestamp\": \"2021-06-25\",\n" - + " \"bar\": null,\n" - + " \"foo\": \"x\",\n" - + " \"baz\": 4,\n" - + " \"o\": {\n" - + " \"mg\": 1\n" - + " }\n" - + "}" + format = new KafkaInputFormat( + new KafkaStringHeaderFormat(null), + // Key Format + new JsonInputFormat( + new JSONPathSpec(true, ImmutableList.of()), + null, + null, + false, + false + ), + // Value Format + new CsvInputFormat( + Arrays.asList("foo", "bar", "timestamp", "baz"), + null, + false, + false, + 0 + ), + "kafka.newheader.", + 
"kafka.newkey.key", + "kafka.newts.timestamp", + "kafka.newtopic.topic" ); Headers headers = new RecordHeaders(SAMPLE_HEADERS); - inputEntity = makeInputEntity(key, payload, headers); + KafkaRecordEntity inputEntity = + makeInputEntity(SIMPLE_JSON_KEY_BYTES, StringUtils.toUtf8("x,,2021-06-25,4"), headers); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas( + ImmutableList.of( + "bar", + "foo", + "kafka.newheader.encoding", + "kafka.newheader.kafkapkc", + "kafka.newts.timestamp", + "kafka.newtopic.topic" + ) + ) + ), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null + ); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + Assert.assertEquals( + Arrays.asList( + "bar", + "foo", + "kafka.newheader.encoding", + "kafka.newheader.kafkapkc", + "kafka.newts.timestamp", + "kafka.newtopic.topic" + ), + row.getDimensions() + ); + // Payload verifications + // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec + // but test reading them anyway since it isn't technically illegal + + Assert.assertEquals(DateTimes.of("2021-06-25"), row.getTimestamp()); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertTrue(row.getDimension("bar").isEmpty()); + + verifyHeader(row); + + // Key verification + Assert.assertEquals("sampleKey", Iterables.getOnlyElement(row.getDimension("kafka.newkey.key"))); + + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testWithPartialDeclarationSchemaDiscovery() throws IOException + { + // testWithHeaderKeyAndValue + partial-schema + schema discovery + Headers headers = new RecordHeaders(SAMPLE_HEADERS); + KafkaRecordEntity inputEntity = + makeInputEntity(SIMPLE_JSON_KEY_BYTES, SIMPLE_JSON_VALUE_BYTES, headers); final InputEntityReader reader = format.createReader( new InputRowSchema( @@ -881,21 +840,7 @@ public class KafkaInputFormatTest Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); - // Header verification - Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding"))); - Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc"))); - Assert.assertEquals( - String.valueOf(DateTimes.of("2021-06-24").getMillis()), - Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp")) - ); - Assert.assertEquals( - TOPIC, - Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic")) - ); - Assert.assertEquals( - "2021-06-25", - Iterables.getOnlyElement(row.getDimension("timestamp")) - ); + verifyHeader(row); // Key verification Assert.assertEquals("sampleKey", Iterables.getOnlyElement(row.getDimension("kafka.newkey.key"))); @@ -918,7 +863,7 @@ public class KafkaInputFormatTest TOPIC, 0, 0, - timestamp, + TIMESTAMP_MILLIS, null, 0, 0, @@ -930,6 +875,23 @@ public class KafkaInputFormatTest ); } + private void verifyHeader(InputRow row) + { + Assert.assertEquals("application/json", 
Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding"))); + Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc"))); + Assert.assertEquals( + String.valueOf(DateTimes.of("2021-06-24").getMillis()), + Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp")) + ); + Assert.assertEquals( + TOPIC, + Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic")) + ); + Assert.assertEquals( + "2021-06-25", + Iterables.getOnlyElement(row.getDimension("timestamp")) + ); + } private SettableByteEntity newSettableByteEntity(KafkaRecordEntity kafkaRecordEntity) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java index bac4261e0fa..7dda3b8ff62 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java @@ -36,6 +36,7 @@ import org.apache.druid.indexing.common.task.TestAppenderatorsManager; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProvider; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable; @@ -174,4 +175,21 @@ public class TestUtils } return true; } + + /** + * Converts the given JSON string which uses single quotes for field names and + * String values to a standard JSON by replacing all occurrences of a single + * quote with double quotes. + *
<p>
+ * Single-quoted JSON is typically easier to read as can be seen below: + *
<pre>
+   * final String singleQuotedJson = "{'f1':'value', 'f2':5}";
+   *
+   * final String doubleQuotedJson = "{\"f1\":\"value\", \"f2\":5}";
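+   *
+   * // singleQuoteToStandardJson(singleQuotedJson) yields doubleQuotedJson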
+   * </pre>
+ */ + public static String singleQuoteToStandardJson(String singleQuotedJson) + { + return StringUtils.replaceChar(singleQuotedJson, '\'', "\""); + } }
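
Reviewer note: a minimal, self-contained sketch (not part of this patch; class and method names here are illustrative) of the lookup rule that the reworked buildBlendedEventMap implements — row values are preferred, and header values fill in keys that the row lacks or maps to null:

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class BlendedLookupSketch
{
  // Mirrors the blended get(): try the row first, then fall back to the header map.
  static Object blendedGet(Function<String, Object> getRowValue, Map<String, Object> fallback, String key)
  {
    final Object val = getRowValue.apply(key);
    return val == null ? fallback.get(key) : val;
  }

  public static void main(String[] args)
  {
    final Map<String, Object> row = new HashMap<>();
    row.put("foo", "x");
    row.put("bar", null); // e.g. an empty CSV column parsed as null

    final Map<String, Object> headers = new HashMap<>();
    headers.put("bar", "from-header");
    headers.put("kafka.newtopic.topic", "sample");

    System.out.println(blendedGet(row::get, headers, "foo"));                  // x (row value wins)
    System.out.println(blendedGet(row::get, headers, "bar"));                  // from-header (null row value falls back)
    System.out.println(blendedGet(row::get, headers, "kafka.newtopic.topic")); // sample (header-only key)
  }
}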