Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions database/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"strings"

"github.com/pressly/goose/v3/internal/dialect/dialectquery"
)
Expand All @@ -26,6 +27,10 @@ const (
DialectStarrocks Dialect = "starrocks"
)

var (
ClickhouseStore = &dialectquery.Clickhouse{}
)

// NewStore returns a new [Store] implementation for the given dialect.
func NewStore(dialect Dialect, tablename string) (Store, error) {
if tablename == "" {
Expand All @@ -35,7 +40,7 @@ func NewStore(dialect Dialect, tablename string) (Store, error) {
return nil, errors.New("dialect must not be empty")
}
lookup := map[Dialect]dialectquery.Querier{
DialectClickHouse: &dialectquery.Clickhouse{},
DialectClickHouse: ClickhouseStore,
DialectMSSQL: &dialectquery.Sqlserver{},
DialectMySQL: &dialectquery.Mysql{},
DialectPostgres: &dialectquery.Postgres{},
Expand Down Expand Up @@ -69,9 +74,15 @@ func (s *store) Tablename() string {
}

func (s *store) CreateVersionTable(ctx context.Context, db DBTxConn) error {
q := s.querier.CreateTable(s.tablename)
if _, err := db.ExecContext(ctx, q); err != nil {
return fmt.Errorf("failed to create version table %q: %w", s.tablename, err)
queries := strings.Split(s.querier.CreateTable(s.tablename), ";")
for _, q := range queries {
q = strings.TrimSpace(q)
if q == "" {
continue
}
if _, err := db.ExecContext(ctx, q); err != nil {
return err
}
}
return nil
}
Expand Down
44 changes: 38 additions & 6 deletions internal/dialect/dialectquery/clickhouse.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,53 @@
package dialectquery

import "fmt"
import (
"fmt"
"strings"
)

type Clickhouse struct{}
type Clickhouse struct {
ClusterName string
}

var _ Querier = (*Clickhouse)(nil)

func (c *Clickhouse) CreateTable(tableName string) string {
q := `CREATE TABLE IF NOT EXISTS %s (
if c.ClusterName != "" {
var dbName string
split := strings.SplitN(tableName, ".", 2)
if len(split) != 2 {
dbName = "default"
} else {
dbName = split[0]
tableName = split[1]
}

fullTableName := fmt.Sprintf("%s.%s", dbName, tableName)
const localPostfix = "_local_v1"

return `CREATE TABLE IF NOT EXISTS ` + fullTableName + localPostfix + ` ON CLUSTER '` + c.ClusterName + `' (
version_id Int64,
is_applied UInt8,
date Date default now(),
tstamp DateTime default now()
)
)
ENGINE = ReplicatedMergeTree('
/clickhouse/{installation}/{cluster}/tables/{shard}/` + dbName + `/` + tableName + localPostfix + `', '{replica}')
ORDER BY (date);

CREATE TABLE IF NOT EXISTS ` + fullTableName + ` ON CLUSTER '` + c.ClusterName + `' AS ` + fullTableName + localPostfix + `
ENGINE = Distributed('` + c.ClusterName + `', ` + dbName + `, '` + tableName + localPostfix + `', rand());
ORDER BY (date); `
}

return fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
version_id Int64,
is_applied UInt8,
date Date default now(),
tstamp DateTime default now()
)
ENGINE = MergeTree()
ORDER BY (date)`
return fmt.Sprintf(q, tableName)
ORDER BY (date)`, tableName)
}

func (c *Clickhouse) InsertVersion(tableName string) string {
Expand Down
15 changes: 12 additions & 3 deletions internal/dialect/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"strings"
"time"

"github.com/pressly/goose/v3/internal/dialect/dialectquery"
Expand Down Expand Up @@ -94,9 +95,17 @@ type store struct {
var _ Store = (*store)(nil)

func (s *store) CreateVersionTable(ctx context.Context, tx *sql.Tx, tableName string) error {
q := s.querier.CreateTable(tableName)
_, err := tx.ExecContext(ctx, q)
return err
queries := strings.Split(s.querier.CreateTable(tableName), ";")
for _, q := range queries {
q = strings.TrimSpace(q)
if q == "" {
continue
}
if _, err := tx.ExecContext(ctx, q); err != nil {
return err
}
}
return nil
}

func (s *store) InsertVersion(ctx context.Context, tx *sql.Tx, tableName string, version int64) error {
Expand Down
Loading