Skip to content
This repository was archived by the owner on Feb 6, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions warehouse/datalake/schema-repository/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,17 @@ func (gl *GlueSchemaRepository) CreateSchema() (err error) {
}

func (gl *GlueSchemaRepository) getPartitionKeys() []*glue.Column {
timeWindowFormat, _ := gl.Warehouse.Destination.Config["timeWindowFormat"].(string)
if timeWindowFormat != "" {
// Assumes a well-formed partitioning format
columnName := strings.Split(timeWindowFormat, "=")[0]
return []*glue.Column{&glue.Column{Name: aws.String(columnName), Type: aws.String("date")}}
var partitionColumns []*glue.Column
for _, partitionPair := range gl.Warehouse.DestPartitionKeys {
// Assumes a well-formed partitioning pair
columnName := partitionPair["key"]
if columnName != "" {
partitionColumns = append(partitionColumns, &glue.Column{Name: aws.String(columnName), Type: aws.String("date")})
}
}

if len(partitionColumns) > 0 {
return partitionColumns
}
return nil
}
Expand Down
9 changes: 4 additions & 5 deletions warehouse/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,8 @@ func (job *UploadJobT) run() (err error) {
job.matchRowsInStagingAndLoadFiles()
job.recordLoadFileGenerationTimeStat(startLoadFileID, endLoadFileID)
if job.warehouse.Type == "S3_DATALAKE" {
timeWindowFormatI, timeWindowFormatAvailable := job.warehouse.Destination.Config["timeWindowFormat"]
if timeWindowFormatAvailable {
timeWindowFormat, _ := timeWindowFormatI.(string)
timeWindowFormat := warehouseutils.GetTimeWindowFormat(job.warehouse.DestPartitionKeys)
if timeWindowFormat != "" {
for tableName := range job.upload.UploadSchema {
loadFiles := job.GetLoadFilesMetadata(warehouseutils.GetLoadFilesOptionsT{
Table: tableName,
Expand All @@ -415,7 +414,7 @@ func (job *UploadJobT) run() (err error) {
})
// This is best done every 100 files, since it's a batch request for updates in Glue
partitionBatchSize := 99
for i := 0; i < len(loadFiles) && timeWindowFormat != ""; i += partitionBatchSize {
for i := 0; i < len(loadFiles); i += partitionBatchSize {
end := i + partitionBatchSize

if end > len(loadFiles) {
Expand Down Expand Up @@ -1672,7 +1671,7 @@ func (job *UploadJobT) createLoadFiles(generateAll bool) (startLoadFileID int64,
}

if job.warehouse.Type == "S3_DATALAKE" {
timeWindowFormat, _ := job.warehouse.Destination.Config["timeWindowFormat"].(string)
timeWindowFormat := warehouseutils.GetTimeWindowFormat(job.warehouse.DestPartitionKeys)
if timeWindowFormat != "" {
payload.LoadFilePrefix = stagingFile.TimeWindow.Format(timeWindowFormat)
}
Expand Down
37 changes: 32 additions & 5 deletions warehouse/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,12 @@ func loadConfig() {
}

// WarehouseT identifies one source→destination warehouse connection together
// with the metadata the loaders need for it.
type WarehouseT struct {
	Source      backendconfig.SourceT
	Destination backendconfig.DestinationT
	Namespace   string
	Type        string
	Identifier  string
	// DestPartitionKeys holds the destination config's "partitionKeys"
	// entries as key/value pairs (see GetPartitionKeysFromConfig).
	DestPartitionKeys []map[string]string
}

type DestinationT struct {
Expand Down Expand Up @@ -844,3 +845,29 @@ func GetSSLKeyDirPath(destinationID string) (whSSLRootDir string) {
sslDirPath := fmt.Sprintf("%s/dest-ssls/%s", directoryName, destinationID)
return sslDirPath
}

// GetPartitionKeysFromConfig extracts the "partitionKeys" entry from a
// destination config. The entry arrives as a generic JSON-decoded value, so
// it is round-tripped through JSON to obtain []map[string]string. A nil
// slice is returned when the key is absent or cannot be decoded; decode
// failures are logged rather than propagated.
func GetPartitionKeysFromConfig(config map[string]interface{}) (partitionKeys []map[string]string) {
	partitionKeysI, ok := config["partitionKeys"]
	if !ok {
		return nil
	}
	// Check the Marshal error explicitly: feeding nil bytes into Unmarshal
	// would otherwise surface a misleading "unexpected end of JSON input".
	partitionKeysJSON, err := json.Marshal(partitionKeysI)
	if err != nil {
		pkgLogger.Errorf("Error extracting partition keys from destination config %v", err)
		return nil
	}
	if err := json.Unmarshal(partitionKeysJSON, &partitionKeys); err != nil {
		pkgLogger.Errorf("Error extracting partition keys from destination config %v", err)
		return nil
	}
	return partitionKeys
}

// GetTimeWindowFormat derives the time-window format string ("<key>=<value>")
// from the first configured partition key pair. Only a single partition key
// is supported for now; an empty string is returned when no usable pair
// exists (no pairs, or the first pair lacks a key or value).
// TODO: extend to allow multiple partition keys on table
func GetTimeWindowFormat(partitionKeys []map[string]string) string {
	if len(partitionKeys) == 0 {
		return ""
	}
	first := partitionKeys[0]
	name, format := first["key"], first["value"]
	if name == "" || format == "" {
		return ""
	}
	return fmt.Sprintf("%v=%v", name, format)
}
22 changes: 12 additions & 10 deletions warehouse/warehouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,12 @@ func (wh *HandleT) backendConfigSubscriber() {
}
namespace := wh.getNamespace(destination.Config, source, destination, wh.destType)
warehouse := warehouseutils.WarehouseT{
Source: source,
Destination: destination,
Namespace: namespace,
Type: wh.destType,
Identifier: warehouseutils.GetWarehouseIdentifier(wh.destType, source.ID, destination.ID),
Source: source,
Destination: destination,
Namespace: namespace,
Type: wh.destType,
Identifier: warehouseutils.GetWarehouseIdentifier(wh.destType, source.ID, destination.ID),
DestPartitionKeys: warehouseutils.GetPartitionKeysFromConfig(destination.Config),
}
wh.warehouses = append(wh.warehouses, warehouse)

Expand Down Expand Up @@ -1105,11 +1106,12 @@ func minimalConfigSubscriber() {
connectionsMap[destination.ID] = map[string]warehouseutils.WarehouseT{}
}
connectionsMap[destination.ID][source.ID] = warehouseutils.WarehouseT{
Destination: destination,
Namespace: namespace,
Type: wh.destType,
Source: source,
Identifier: warehouseutils.GetWarehouseIdentifier(wh.destType, source.ID, destination.ID),
Destination: destination,
Namespace: namespace,
Type: wh.destType,
Source: source,
Identifier: warehouseutils.GetWarehouseIdentifier(wh.destType, source.ID, destination.ID),
DestPartitionKeys: warehouseutils.GetPartitionKeysFromConfig(destination.Config),
}
connectionsMapLock.Unlock()
}
Expand Down