diff --git a/src/main/java/io/cdap/plugin/mixpanel/source/batch/MixPanelSchemaHelper.java b/src/main/java/io/cdap/plugin/mixpanel/source/batch/MixPanelSchemaHelper.java index 63b2550..dbf6800 100644 --- a/src/main/java/io/cdap/plugin/mixpanel/source/batch/MixPanelSchemaHelper.java +++ b/src/main/java/io/cdap/plugin/mixpanel/source/batch/MixPanelSchemaHelper.java @@ -40,7 +40,7 @@ public static class MixPanelEvent { } private static final Gson GSON = new GsonBuilder().create(); - private static final String EVENT_NAME_FIELD = "event_name"; + private static final String EVENT_NAME_FIELD = "event"; private static final String EVENT_NAME_FIELD_DESC = "$event_name"; private static final Schema MIX_PANEL_RECORD_SCHEMA = Schema.recordOf( "mixPanelRecord", Schema.Field.of("raw_event", Schema.of(Schema.Type.STRING))); @@ -71,7 +71,7 @@ public static Schema getSchemaFromConfig(MixPanelBatchSourceConfig config) { .collect(Collectors.toSet()); // make sure default fields available - fieldNames.add("event_name"); + fieldNames.add(EVENT_NAME_FIELD); fieldNames.add("distinct_id"); fieldNames.add("time"); diff --git a/src/test/java/io/cdap/plugin/mixpanel/source/batch/MixPanelSchemaHelperTest.java b/src/test/java/io/cdap/plugin/mixpanel/source/batch/MixPanelSchemaHelperTest.java index 6547206..4860caf 100644 --- a/src/test/java/io/cdap/plugin/mixpanel/source/batch/MixPanelSchemaHelperTest.java +++ b/src/test/java/io/cdap/plugin/mixpanel/source/batch/MixPanelSchemaHelperTest.java @@ -3,11 +3,14 @@ import com.github.tomakehurst.wiremock.client.WireMock; import com.github.tomakehurst.wiremock.core.WireMockConfiguration; import com.github.tomakehurst.wiremock.junit.WireMockRule; +import io.cdap.cdap.api.data.schema.Schema; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import java.io.IOException; +import java.util.Set; +import java.util.stream.Collectors; public class MixPanelSchemaHelperTest { @Rule @@ -22,6 +25,33 @@ public void testEscapeFieldName() { Assert.assertEquals("mega_field", MixPanelSchemaHelper.escapeFieldName("$$$$$mega_field$$$$")); } + @Test + public void testGetSchema() throws IOException { + WireMock.stubFor( + WireMock.post(WireMock.urlMatching("/api/2.0/events/properties/top/")) + .withBasicAuth("secret", "") + .withRequestBody(WireMock.containing("event1")) + .willReturn(WireMock.aResponse().withBody(TestHelper.getResource("describe conflicting 1.json")) + ) + ); + + MixPanelBatchSourceConfig config = MixPanelBatchSourceConfig.builder() + .setMixPanelRestApiUrl(String.format("http://localhost:%d/", wireMockRule.port())) + .setApiSecret("secret") + .setSchemaByEvents("on") + .setEvents("event1") + .build(); + + Schema schema = MixPanelSchemaHelper.getSchemaFromConfig(config); + Set schemaFields = schema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toSet()); + Assert.assertEquals(5, schemaFields.size()); + Assert.assertTrue(schemaFields.contains("regular_field")); + Assert.assertTrue(schemaFields.contains("distinct_id")); + Assert.assertTrue(schemaFields.contains("time")); + Assert.assertTrue(schemaFields.contains("event")); + Assert.assertTrue(schemaFields.contains("conflict")); + } + @Test public void testGetSchemaFromConfigConflictingFields() throws IOException { WireMock.stubFor( diff --git a/src/test/java/io/cdap/plugin/mixpanel/source/batch/etl/MixPanelBatchSourceTest.java b/src/test/java/io/cdap/plugin/mixpanel/source/batch/etl/MixPanelBatchSourceTest.java index ec37d26..8a83f5e 100644 --- a/src/test/java/io/cdap/plugin/mixpanel/source/batch/etl/MixPanelBatchSourceTest.java +++ b/src/test/java/io/cdap/plugin/mixpanel/source/batch/etl/MixPanelBatchSourceTest.java @@ -146,12 +146,12 @@ public void testMixPanelSource() throws Exception { // ensure records in expected order List outputRecords = MockSink.readOutput(outputManager).stream() - .sorted(Comparator.comparing(structuredRecord -> structuredRecord.get("event_name"))) + .sorted(Comparator.comparing(structuredRecord -> structuredRecord.get("event"))) .collect(Collectors.toList()); Assert.assertEquals(2, outputRecords.size()); - Assert.assertEquals("Custom Event", outputRecords.get(0).get("event_name")); - Assert.assertEquals("Plan Upgraded", outputRecords.get(1).get("event_name")); + Assert.assertEquals("Custom Event", outputRecords.get(0).get("event")); + Assert.assertEquals("Plan Upgraded", outputRecords.get(1).get("event")); Assert.assertEquals("data 1 value", outputRecords.get(0).get("data_1")); Assert.assertNull(outputRecords.get(1).get("data_1")); Assert.assertNull(outputRecords.get(0).get("New_Plan"));