-
Notifications
You must be signed in to change notification settings - Fork 0
CDAP-15266 Revive mongodb plugins #14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Conversation
| */ | ||
| public Schema getSchema() { | ||
| if (schema == null) { | ||
| throw new IllegalArgumentException("Schema cannot be null."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this error type should not be thrown during validation phase. Should be InvalidConfigPropertyException. Also IAE usually thrown on illegal method param, in cases like this it looks more like IllegalStateException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to:
/**
* @return the schema of the dataset
*/
@Nullable
public Schema getSchema() {
try {
return null == schema ? null : Schema.parseJson(schema);
} catch (IOException e) {
throw new InvalidConfigPropertyException("Invalid schema", e, MongoDBConstants.SCHEMA);
}
}
| @Override | ||
| public void configurePipeline(PipelineConfigurer pipelineConfigurer) { | ||
| super.configurePipeline(pipelineConfigurer); | ||
| Schema schema = config.getSchema(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Schema may be null on this stage
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to validate the config first:
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
super.configurePipeline(pipelineConfigurer);
config.validate();
Schema schema = config.getSchema();
...
| return builder.build(); | ||
| } | ||
|
|
||
| public static void validateSchema(Schema schema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this class should not be responsible for validation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to MongoDBSourceConfig#validate.
| case BOOLEAN: | ||
| case STRING: | ||
| return object; | ||
| case NULL: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this case is possible
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed, thanks.
| @Description("User to use to connect to the specified database. Required for databases that " + | ||
| "need authentication. Optional for databases that do not require authentication.") | ||
| @Nullable | ||
| public String user; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why no fields support macro?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed.
| @Override | ||
| public void transform(StructuredRecord input, Emitter<KeyValue<NullWritable, BSONWritable>> emitter) | ||
| throws Exception { | ||
| BasicDBObjectBuilder bsonBuilder = BasicDBObjectBuilder.start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this transformation should also be excluded to separate class, usually with Transformer suffix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RecordToBSONWritableTransformer class created.
| lineageRecorder.createExternalDataset(context.getInputSchema()); | ||
| List<Schema.Field> fields = context.getInputSchema().getFields(); | ||
| if (fields != null && !fields.isEmpty()) { | ||
| lineageRecorder.recordWrite("Write", "Wrote to MongoDB collection.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could add collection name here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Collection name added.
| throw new InvalidConfigPropertyException("Host must be specified", MongoDBConstants.HOST); | ||
| } | ||
| if (!containsMacro(MongoDBConstants.PORT)) { | ||
| if (null == port) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
null should be on the right side
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed.
| @Override | ||
| public void configurePipeline(PipelineConfigurer pipelineConfigurer) { | ||
| super.configurePipeline(pipelineConfigurer); | ||
| config.validate(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we have input schema validation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MongoDBBatchSink#validateInputSchema method created to check for supported types:
private void validateInputSchema(Schema inputSchema) {
List<Schema.Field> fields = inputSchema.getFields();
if (fields == null || fields.isEmpty()) {
throw new InvalidStageException("Input schema should contain fields");
}
for (Schema.Field field : fields) {
Schema.Type fieldType = field.getSchema().isNullable() ?
field.getSchema().getNonNullable().getType() : field.getSchema().getType();
if (!SUPPORTED_FIELD_TYPES.contains(fieldType)) {
String supportedTypes = SUPPORTED_FIELD_TYPES.stream()
.map(Enum::name)
.map(String::toLowerCase)
.collect(Collectors.joining(", "));
String errorMessage = String.format("Field '%s' is of unsupported type '%s'. Supported types are: %s.",
field.getName(), fieldType, supportedTypes);
throw new InvalidStageException(errorMessage);
}
}
}
| super.validate(); | ||
| if (!containsMacro(MongoDBConstants.SCHEMA)) { | ||
| Schema parsedSchema = getSchema(); | ||
| if (null == parsedSchema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
null should be on the right side
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed.
| private final Schema schema; | ||
|
|
||
| public BSONConverter(Schema schema) throws IOException { | ||
| public BSONObjectToRecordTransformer(Schema schema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add unit tests for both transformers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| @@ -18,9 +18,12 @@ | |||
|
|
|||
| import com.google.common.base.Strings; | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add unit tests for both configs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JIRA: https://issues.cask.co/browse/CDAP-15266
WIKI: https://wiki.cask.co/display/CE/MongoDB+database+plugin
In the scope of this PR:
Plugins tested with current MongoDB release 4.0.10.