Skip to content

Conversation

@vladhlinskiy
Copy link
Owner

@vladhlinskiy vladhlinskiy commented Jul 23, 2019

JIRA: https://issues.cask.co/browse/CDAP-15266
WIKI: https://wiki.cask.co/display/CE/MongoDB+database+plugin

In the scope of this PR:

  • MongoDB Plugin moved from 'hydrator-plugins'
  • Config properties changed to match current plugin standards
  • Field-level lineage support added

Plugins tested with current MongoDB release 4.0.10.

*/
public Schema getSchema() {
if (schema == null) {
throw new IllegalArgumentException("Schema cannot be null.");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this error type should not be thrown during validation phase. Should be InvalidConfigPropertyException. Also IAE usually thrown on illegal method param, in cases like this it looks more like IllegalStateException

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to:

    /**
     * @return the schema of the dataset
     */
    @Nullable
    public Schema getSchema() {
      try {
        return null == schema ? null : Schema.parseJson(schema);
      } catch (IOException e) {
        throw new InvalidConfigPropertyException("Invalid schema", e, MongoDBConstants.SCHEMA);
      }
    }

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
super.configurePipeline(pipelineConfigurer);
Schema schema = config.getSchema();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Schema may be null on this stage

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to validate the config first:

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    super.configurePipeline(pipelineConfigurer);
    config.validate();
    Schema schema = config.getSchema();
    ...

return builder.build();
}

public static void validateSchema(Schema schema) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this class should not be responsible for validation

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

case BOOLEAN:
case STRING:
return object;
case NULL:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this case is possible

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed, thanks.

@Description("User to use to connect to the specified database. Required for databases that " +
"need authentication. Optional for databases that do not require authentication.")
@Nullable
public String user;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why no fields support macro?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

@Override
public void transform(StructuredRecord input, Emitter<KeyValue<NullWritable, BSONWritable>> emitter)
throws Exception {
BasicDBObjectBuilder bsonBuilder = BasicDBObjectBuilder.start();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this transformation should also be excluded to separate class, usually with Transformer suffix

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lineageRecorder.createExternalDataset(context.getInputSchema());
List<Schema.Field> fields = context.getInputSchema().getFields();
if (fields != null && !fields.isEmpty()) {
lineageRecorder.recordWrite("Write", "Wrote to MongoDB collection.",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could add collection name here

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collection name added.

throw new InvalidConfigPropertyException("Host must be specified", MongoDBConstants.HOST);
}
if (!containsMacro(MongoDBConstants.PORT)) {
if (null == port) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null should be on the right side

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
super.configurePipeline(pipelineConfigurer);
config.validate();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we have input schema validation?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MongoDBBatchSink#validateInputSchema method created to check for supported types:

  private void validateInputSchema(Schema inputSchema) {
    List<Schema.Field> fields = inputSchema.getFields();
    if (fields == null || fields.isEmpty()) {
      throw new InvalidStageException("Input schema should contain fields");
    }
    for (Schema.Field field : fields) {
      Schema.Type fieldType = field.getSchema().isNullable() ?
        field.getSchema().getNonNullable().getType() : field.getSchema().getType();
      if (!SUPPORTED_FIELD_TYPES.contains(fieldType)) {
        String supportedTypes = SUPPORTED_FIELD_TYPES.stream()
          .map(Enum::name)
          .map(String::toLowerCase)
          .collect(Collectors.joining(", "));
        String errorMessage = String.format("Field '%s' is of unsupported type '%s'. Supported types are: %s.",
                                            field.getName(), fieldType, supportedTypes);
        throw new InvalidStageException(errorMessage);
      }
    }
  }

super.validate();
if (!containsMacro(MongoDBConstants.SCHEMA)) {
Schema parsedSchema = getSchema();
if (null == parsedSchema) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null should be on the right side

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

private final Schema schema;

public BSONConverter(Schema schema) throws IOException {
public BSONObjectToRecordTransformer(Schema schema) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add unit tests for both transformers

Copy link
Owner Author

@vladhlinskiy vladhlinskiy Jul 24, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -18,9 +18,12 @@

import com.google.common.base.Strings;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add unit tests for both configs

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants