Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docker/scripts/mod_world_0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3441,6 +3441,10 @@ COMMIT;

SET FOREIGN_KEY_CHECKS = 1;

DROP DATABASE IF EXISTS meta;
CREATE DATABASE IF NOT EXISTS meta;
USE meta;

CREATE TABLE `undo_log` (
`id` bigint NOT NULL AUTO_INCREMENT,
`branch_id` bigint NOT NULL,
Expand Down
4 changes: 4 additions & 0 deletions docker/scripts/mod_world_1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3432,6 +3432,10 @@ COMMIT;

SET FOREIGN_KEY_CHECKS = 1;

DROP DATABASE IF EXISTS meta;
CREATE DATABASE IF NOT EXISTS meta;
USE meta;

CREATE TABLE `undo_log` (
`id` bigint NOT NULL AUTO_INCREMENT,
`branch_id` bigint NOT NULL,
Expand Down
4 changes: 4 additions & 0 deletions docker/scripts/range_world_0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3901,6 +3901,10 @@ COMMIT;

SET FOREIGN_KEY_CHECKS = 1;

DROP DATABASE IF EXISTS meta;
CREATE DATABASE IF NOT EXISTS meta;
USE meta;

CREATE TABLE `undo_log` (
`id` bigint NOT NULL AUTO_INCREMENT,
`branch_id` bigint NOT NULL,
Expand Down
4 changes: 4 additions & 0 deletions docker/scripts/range_world_1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2966,6 +2966,10 @@ COMMIT;

SET FOREIGN_KEY_CHECKS = 1;

DROP DATABASE IF EXISTS meta;
CREATE DATABASE IF NOT EXISTS meta;
USE meta;

CREATE TABLE `undo_log` (
`id` bigint NOT NULL AUTO_INCREMENT,
`branch_id` bigint NOT NULL,
Expand Down
15 changes: 12 additions & 3 deletions pkg/executor/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cectc/dbpack/pkg/topo"
"github.com/cectc/dbpack/pkg/tracing"
"github.com/cectc/dbpack/third_party/parser/ast"
"github.com/cectc/dbpack/third_party/parser/format"
)

type ShardingExecutor struct {
Expand Down Expand Up @@ -207,15 +208,23 @@ func (executor *ShardingExecutor) ExecutorComQuery(ctx context.Context, sql stri
}
}()

var plan proto.Plan
var (
plan proto.Plan
sb strings.Builder
)

log.Debugf("query: %s", sql)
connectionID := proto.ConnectionID(spanCtx)
queryStmt := proto.QueryStmt(spanCtx)
if queryStmt == nil {
return nil, 0, errors.New("query stmt should not be nil")
}
if err := queryStmt.Restore(format.NewRestoreCtx(constant.DBPackRestoreFormat, &sb)); err != nil {
return nil, 0, err
}
newSql := sb.String()
spanCtx = proto.WithSqlText(spanCtx, newSql)

log.Debugf("connectionID: %d, query: %s", connectionID, newSql)
switch stmt := queryStmt.(type) {
case *ast.SetStmt:
if shouldStartTransaction(stmt) {
Expand Down Expand Up @@ -278,7 +287,7 @@ func (executor *ShardingExecutor) ExecutorComQuery(ctx context.Context, sql stri
case *ast.SelectStmt:
if stmt.Fields != nil && len(stmt.Fields.Fields) > 0 {
if _, ok := stmt.Fields.Fields[0].Expr.(*ast.VariableExpr); ok {
return executor.executors[0].Query(spanCtx, sql)
return executor.executors[0].Query(spanCtx, newSql)
}
}
txi, ok := executor.localTransactionMap.Load(connectionID)
Expand Down
11 changes: 8 additions & 3 deletions pkg/listener/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,6 @@ func (l *MysqlListener) ExecuteCommand(ctx context.Context, c *mysql.Conn, data
connectionID := proto.ConnectionID(ctx)
l.executor.ConnectionClose(proto.WithConnectionID(ctx, connectionID))
log.Debugf("connection closed, id: %d", connectionID)
return errors.New("ComQuit")
case constant.ComInitDB:
db := string(data[1:])
c.RecycleReadPacket()
Expand Down Expand Up @@ -549,8 +548,14 @@ func (l *MysqlListener) ExecuteCommand(ctx context.Context, c *mysql.Conn, data
return nil
}

if showStmt, ok := stmt.(*ast.ShowStmt); ok && showStmt.Tp == ast.ShowTables {
showStmt.DBName = c.Database()
if showStmt, ok := stmt.(*ast.ShowStmt); ok {
switch showStmt.Tp {
case ast.ShowTables, ast.ShowTableStatus, ast.ShowColumns, ast.ShowIndex, ast.ShowTriggers:
if misc.IsBlank(showStmt.DBName) {
showStmt.DBName = c.Database()
}
default:
}
}

if !misc.IsBlank(c.Database()) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/plan/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ func (p *DeletePlan) Execute(ctx context.Context, hints ...*ast.TableOptimizerHi
return nil, 0, errors.WithStack(err)
}
}
schema := proto.Schema(ctx)
for _, table := range p.Tables {
sb.Reset()
if err = p.generate(&sb, table, hints...); err != nil {
if err = p.generate(&sb, schema, table, hints...); err != nil {
return nil, 0, errors.Wrap(err, "failed to generate sql for delete")
}
sql := sb.String()
Expand Down Expand Up @@ -114,7 +115,7 @@ func (p *DeletePlan) Execute(ctx context.Context, hints ...*ast.TableOptimizerHi
return mysqlResult, warnings, nil
}

func (p *DeletePlan) generate(sb *strings.Builder, table string, hints ...*ast.TableOptimizerHint) error {
func (p *DeletePlan) generate(sb *strings.Builder, schema, table string, hints ...*ast.TableOptimizerHint) error {
ctx := format.NewRestoreCtx(constant.DBPackRestoreFormat, sb)
ctx.WriteKeyWord("DELETE ")

Expand All @@ -133,7 +134,7 @@ func (p *DeletePlan) generate(sb *strings.Builder, table string, hints ...*ast.T
}

ctx.WriteKeyWord("FROM ")
ctx.WritePlain(table)
ctx.WritePlainf("%s.%s", schema, table)
if p.Stmt.Where != nil {
ctx.WriteKeyWord(" WHERE ")
if err := p.Stmt.Where.Restore(ctx); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/plan/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ func TestDeleteOnSingleDBPlan(t *testing.T) {
deleteSql: "delete from student where id in (?,?)",
tables: []string{"student_1", "student_5"},
expectedGenerateSqls: []string{
"DELETE FROM student_1 WHERE `id` IN (?,?)",
"DELETE FROM student_5 WHERE `id` IN (?,?)",
"DELETE FROM school.student_1 WHERE `id` IN (?,?)",
"DELETE FROM school.student_5 WHERE `id` IN (?,?)",
},
},
{
deleteSql: "delete from student where id = 9",
tables: []string{"student_9"},
expectedGenerateSqls: []string{
"DELETE FROM student_9 WHERE `id`=9",
"DELETE FROM school.student_9 WHERE `id`=9",
},
},
}
Expand All @@ -68,7 +68,7 @@ func TestDeleteOnSingleDBPlan(t *testing.T) {
}
for i, table := range plan.Tables {
var sb strings.Builder
err := plan.generate(&sb, table)
err := plan.generate(&sb, "school", table)
assert.Nil(t, err)
assert.Equal(t, c.expectedGenerateSqls[i], sb.String())
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/plan/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ func (p *InsertPlan) Execute(ctx context.Context, _ ...*ast.TableOptimizerHint)
tx proto.Tx
err error
)
if err = p.generate(&sb); err != nil {
schema := proto.Schema(ctx)
if err = p.generate(&sb, schema); err != nil {
return nil, 0, errors.WithStack(err)
}
sql := sb.String()
Expand Down Expand Up @@ -77,13 +78,13 @@ func (p *InsertPlan) Execute(ctx context.Context, _ ...*ast.TableOptimizerHint)
}
}

func (p *InsertPlan) generate(sb *strings.Builder) (err error) {
func (p *InsertPlan) generate(sb *strings.Builder, schema string) (err error) {
ctx := format.NewRestoreCtx(constant.DBPackRestoreFormat, sb)

ctx.WriteKeyWord("INSERT ")
ctx.WriteKeyWord("INTO ")

ctx.WritePlain(p.Table)
ctx.WritePlainf("%s.%s", schema, p.Table)

ctx.WritePlain("(")
columnLen := len(p.Columns)
Expand Down
4 changes: 2 additions & 2 deletions pkg/plan/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestInsertPlan(t *testing.T) {
{
insertSql: "insert into student(id, name, gender, age) values(?,?,?,?)",
table: "student_5",
expectedGenerateSql: "INSERT INTO student_5(id,name,gender,age) VALUES (?,?,?,?)",
expectedGenerateSql: "INSERT INTO school.student_5(id,name,gender,age) VALUES (?,?,?,?)",
},
}

Expand All @@ -59,7 +59,7 @@ func TestInsertPlan(t *testing.T) {
Executor: nil,
}
var sb strings.Builder
err = plan.generate(&sb)
err = plan.generate(&sb, "school")
assert.Nil(t, err)
assert.Equal(t, c.expectedGenerateSql, sb.String())
})
Expand Down
15 changes: 10 additions & 5 deletions pkg/plan/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,21 +101,22 @@ func (p *QueryOnSingleDBPlan) Execute(ctx context.Context, hints ...*ast.TableOp
}

func (p *QueryOnSingleDBPlan) generate(ctx context.Context, sb *strings.Builder, args *[]interface{}) (err error) {
schema := proto.Schema(ctx)
stmtVal := deepcopy.Copy(p.Stmt)
stmt := stmtVal.(*ast.SelectStmt)
switch len(p.Tables) {
case 0:
err = p.generateSelect("", stmt, sb, p.Limit)
err = p.generateSelect(schema, "", stmt, sb, p.Limit)
p.appendArgs(args)
case 1:
// single shard table
err = p.generateSelect(p.Tables[0], stmt, sb, p.Limit)
err = p.generateSelect(schema, p.Tables[0], stmt, sb, p.Limit)
p.appendArgs(args)
default:
sb.WriteString("SELECT * FROM (")

sb.WriteByte('(')
if err = p.generateSelect(p.Tables[0], stmt, sb, p.Limit); err != nil {
if err = p.generateSelect(schema, p.Tables[0], stmt, sb, p.Limit); err != nil {
return
}
sb.WriteByte(')')
Expand All @@ -127,7 +128,7 @@ func (p *QueryOnSingleDBPlan) generate(ctx context.Context, sb *strings.Builder,

sb.WriteString(" UNION ALL ")
sb.WriteByte('(')
if err = p.generateSelect(p.Tables[i], stmt, sb, p.Limit); err != nil {
if err = p.generateSelect(schema, p.Tables[i], stmt, sb, p.Limit); err != nil {
return
}
sb.WriteByte(')')
Expand Down Expand Up @@ -242,11 +243,12 @@ func (p *QueryOnMultiDBPlan) Execute(ctx context.Context, _ ...*ast.TableOptimiz
return result, warn, nil
}

func (p *QueryOnSingleDBPlan) generateSelect(table string, stmt *ast.SelectStmt, sb *strings.Builder, limit *Limit) error {
func (p *QueryOnSingleDBPlan) generateSelect(schema, table string, stmt *ast.SelectStmt, sb *strings.Builder, limit *Limit) error {
vi := &JoinVisitor{
fieldList: stmt.Fields,
where: stmt.Where,
orderBy: stmt.OrderBy,
schema: schema,
table: table,
algorithms: p.Algorithms,
globalTables: p.GlobalTables,
Expand Down Expand Up @@ -339,6 +341,7 @@ type JoinVisitor struct {
where ast.ExprNode
orderBy *ast.OrderByClause

schema string
table string
algorithms map[string]cond.ShardingAlgorithm
globalTables map[string]bool
Expand Down Expand Up @@ -410,11 +413,13 @@ func (s *JoinVisitor) Leave(n ast.Node) (node ast.Node, ok bool) {
s.orderBy.Accept(visitor2)
}
}
secondTable.Schema = model.NewCIStr(s.schema)
secondTable.Name = model.NewCIStr(joinTable)
}
}
}
}
firstTable.Schema = model.NewCIStr(s.schema)
firstTable.Name = model.NewCIStr(s.table)
return n, true
}
Expand Down
36 changes: 19 additions & 17 deletions pkg/plan/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/cectc/dbpack/pkg/cond"
"github.com/cectc/dbpack/pkg/proto"
"github.com/cectc/dbpack/pkg/topo"
"github.com/cectc/dbpack/pkg/visitor"
"github.com/cectc/dbpack/third_party/parser"
Expand All @@ -43,35 +44,35 @@ func TestQueryOnSingleDBPlan(t *testing.T) {
tables: []string{"student_1", "student_5"},
pk: "id",
args: []interface{}{1, 5},
expectedGenerateSql: "SELECT * FROM ((SELECT * FROM `student_1` WHERE `id` IN (?,?)) UNION ALL (SELECT * " +
"FROM `student_5` WHERE `id` IN (?,?))) t ORDER BY `t`.`id` ASC",
expectedGenerateSql: "SELECT * FROM ((SELECT * FROM `school`.`student_1` WHERE `id` IN (?,?)) UNION ALL " +
"(SELECT * FROM `school`.`student_5` WHERE `id` IN (?,?))) t ORDER BY `t`.`id` ASC",
},
{
selectSql: "select * from student where id in (?,?) order by id desc",
tables: []string{"student_1", "student_5"},
pk: "id",
args: []interface{}{1, 5},
expectedGenerateSql: "SELECT * FROM ((SELECT * FROM `student_1` WHERE `id` IN (?,?) ORDER BY `id` DESC) " +
"UNION ALL (SELECT * FROM `student_5` WHERE `id` IN (?,?) ORDER BY `id` DESC)) t ORDER BY `t`.`id` DESC",
expectedGenerateSql: "SELECT * FROM ((SELECT * FROM `school`.`student_1` WHERE `id` IN (?,?) ORDER BY `id` DESC) " +
"UNION ALL (SELECT * FROM `school`.`student_5` WHERE `id` IN (?,?) ORDER BY `id` DESC)) t ORDER BY `t`.`id` DESC",
},
{
selectSql: "select * from student where id in (?,?) order by id desc limit ?, ?",
tables: []string{"student_1", "student_5"},
pk: "id",
args: []interface{}{1, 5, 1000, 20},
expectedGenerateSql: "SELECT * FROM ((SELECT * FROM `student_1` WHERE `id` IN (?,?) ORDER BY `id` DESC " +
"LIMIT 1020) UNION ALL (SELECT * FROM `student_5` WHERE `id` IN (?,?) ORDER BY `id` DESC LIMIT 1020)) t ORDER BY `t`.`id` DESC",
expectedGenerateSql: "SELECT * FROM ((SELECT * FROM `school`.`student_1` WHERE `id` IN (?,?) ORDER BY `id` DESC " +
"LIMIT 1020) UNION ALL (SELECT * FROM `school`.`student_5` WHERE `id` IN (?,?) ORDER BY `id` DESC LIMIT 1020)) t ORDER BY `t`.`id` DESC",
},
{
selectSql: "select student.id, student.name, city.province from student left join city on city.name = " +
"student.native_place where student.id in (?,?) order by student.id desc limit ?, ?",
tables: []string{"student_1", "student_5"},
pk: "id",
args: []interface{}{1, 5, 1000, 20},
expectedGenerateSql: "SELECT * FROM ((SELECT `student_1`.`id`,`student_1`.`name`,`city`.`province` FROM `student_1` " +
expectedGenerateSql: "SELECT * FROM ((SELECT `student_1`.`id`,`student_1`.`name`,`city`.`province` FROM `school`.`student_1` " +
"LEFT JOIN `city` ON `city`.`name`=`student_1`.`native_place` WHERE `student_1`.`id` IN (?,?) " +
"ORDER BY `student_1`.`id` DESC LIMIT 1020) UNION ALL (SELECT `student_5`.`id`,`student_5`.`name`,`city`.`province` " +
"FROM `student_5` LEFT JOIN `city` ON `city`.`name`=`student_5`.`native_place` WHERE `student_5`.`id` IN (?,?) " +
"FROM `school`.`student_5` LEFT JOIN `city` ON `city`.`name`=`student_5`.`native_place` WHERE `student_5`.`id` IN (?,?) " +
"ORDER BY `student_5`.`id` DESC LIMIT 1020)) t ORDER BY `t`.`id` DESC",
},
{
Expand All @@ -80,9 +81,9 @@ func TestQueryOnSingleDBPlan(t *testing.T) {
tables: []string{"student_1", "student_5"},
pk: "id",
args: []interface{}{1, 5, 1000, 20},
expectedGenerateSql: "SELECT * FROM ((SELECT `s`.`id`,`s`.`name`,`city`.`province` FROM `student_1` AS `s` " +
expectedGenerateSql: "SELECT * FROM ((SELECT `s`.`id`,`s`.`name`,`city`.`province` FROM `school`.`student_1` AS `s` " +
"LEFT JOIN `city` ON `city`.`name`=`s`.`native_place` WHERE `s`.`id` IN (?,?) ORDER BY `s`.`id` DESC LIMIT 1020) " +
"UNION ALL (SELECT `s`.`id`,`s`.`name`,`city`.`province` FROM `student_5` AS `s` LEFT JOIN `city` " +
"UNION ALL (SELECT `s`.`id`,`s`.`name`,`city`.`province` FROM `school`.`student_5` AS `s` LEFT JOIN `city` " +
"ON `city`.`name`=`s`.`native_place` WHERE `s`.`id` IN (?,?) ORDER BY `s`.`id` DESC LIMIT 1020)) t ORDER BY `t`.`id` DESC",
},
{
Expand All @@ -91,10 +92,10 @@ func TestQueryOnSingleDBPlan(t *testing.T) {
tables: []string{"student_1", "student_5"},
pk: "id",
args: []interface{}{1, 5, 1000, 20},
expectedGenerateSql: "SELECT * FROM ((SELECT `student_1`.`id`,`student_1`.`name`,`exam_1`.`grade` FROM `student_1` " +
"LEFT JOIN `exam_1` ON `exam_1`.`student_id`=`student_1`.`id` WHERE `student_1`.`id` IN (?,?) " +
expectedGenerateSql: "SELECT * FROM ((SELECT `student_1`.`id`,`student_1`.`name`,`exam_1`.`grade` FROM `school`.`student_1` " +
"LEFT JOIN `school`.`exam_1` ON `exam_1`.`student_id`=`student_1`.`id` WHERE `student_1`.`id` IN (?,?) " +
"ORDER BY `student_1`.`id` DESC LIMIT 1020) UNION ALL (SELECT `student_5`.`id`,`student_5`.`name`,`exam_5`.`grade` " +
"FROM `student_5` LEFT JOIN `exam_5` ON `exam_5`.`student_id`=`student_5`.`id` WHERE `student_5`.`id` IN (?,?) " +
"FROM `school`.`student_5` LEFT JOIN `school`.`exam_5` ON `exam_5`.`student_id`=`student_5`.`id` WHERE `student_5`.`id` IN (?,?) " +
"ORDER BY `student_5`.`id` DESC LIMIT 1020)) t ORDER BY `t`.`id` DESC",
},
{
Expand All @@ -103,9 +104,9 @@ func TestQueryOnSingleDBPlan(t *testing.T) {
tables: []string{"student_1", "student_5"},
pk: "id",
args: []interface{}{1, 5, 1000, 20},
expectedGenerateSql: "SELECT * FROM ((SELECT `s`.`id`,`s`.`name`,`e`.`grade` FROM `student_1` AS `s` " +
"LEFT JOIN `exam_1` AS `e` ON `e`.`student_id`=`s`.`id` WHERE `s`.`id` IN (?,?) ORDER BY `s`.`id` DESC LIMIT 1020) " +
"UNION ALL (SELECT `s`.`id`,`s`.`name`,`e`.`grade` FROM `student_5` AS `s` LEFT JOIN `exam_5` AS `e` " +
expectedGenerateSql: "SELECT * FROM ((SELECT `s`.`id`,`s`.`name`,`e`.`grade` FROM `school`.`student_1` AS `s` " +
"LEFT JOIN `school`.`exam_1` AS `e` ON `e`.`student_id`=`s`.`id` WHERE `s`.`id` IN (?,?) ORDER BY `s`.`id` DESC LIMIT 1020) " +
"UNION ALL (SELECT `s`.`id`,`s`.`name`,`e`.`grade` FROM `school`.`student_5` AS `s` LEFT JOIN `school`.`exam_5` AS `e` " +
"ON `e`.`student_id`=`s`.`id` WHERE `s`.`id` IN (?,?) ORDER BY `s`.`id` DESC LIMIT 1020)) t ORDER BY `t`.`id` DESC",
},
}
Expand Down Expand Up @@ -137,7 +138,8 @@ func TestQueryOnSingleDBPlan(t *testing.T) {
args []interface{}
)
plan.castLimit()
err = plan.generate(context.Background(), &sb, &args)
ctx := proto.WithSchema(context.Background(), "school")
err = plan.generate(ctx, &sb, &args)
assert.Nil(t, err)
assert.Equal(t, c.expectedGenerateSql, sb.String())
})
Expand Down
Loading
Loading