Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
defaultRetryJitterFraction = 0.5
importBulkRoute = "/authzed.api.v1.PermissionsService/ImportBulkRelationships"
exportBulkRoute = "/authzed.api.v1.PermissionsService/ExportBulkRelationships"
watchRoute = "/authzed.api.v1.WatchService/Watch"
)

// NewClient defines an (overridable) means of creating a new client.
Expand Down Expand Up @@ -235,7 +236,7 @@ func DialOptsFromFlags(cmd *cobra.Command, token storage.Token) ([]grpc.DialOpti
// retrying the bulk import in backup/restore logic is handled manually.
// retrying bulk export is also handled manually, because the default behavior is
// to start at the beginning of the stream, which produces duplicate values.
selector.StreamClientInterceptor(retry.StreamClientInterceptor(retryOpts...), selector.MatchFunc(isNoneOf(importBulkRoute, exportBulkRoute))),
selector.StreamClientInterceptor(retry.StreamClientInterceptor(retryOpts...), selector.MatchFunc(isNoneOf(importBulkRoute, exportBulkRoute, watchRoute))),
}

if !cobrautil.MustGetBool(cmd, "skip-version-check") {
Expand Down
60 changes: 36 additions & 24 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,30 +133,36 @@ func TestGetCurrentTokenWithCLIOverrideWithoutSecretFile(t *testing.T) {
require.Equal(&bTrue, token.Insecure)
}

type fakeSchemaServer struct {
type fakeServer struct {
v1.UnimplementedSchemaServiceServer
v1.UnimplementedExperimentalServiceServer
v1.UnimplementedWatchServiceServer
v1.UnimplementedPermissionsServiceServer
testFunc func()
}

func (fss *fakeSchemaServer) ReadSchema(_ context.Context, _ *v1.ReadSchemaRequest) (*v1.ReadSchemaResponse, error) {
func (fss *fakeServer) ReadSchema(_ context.Context, _ *v1.ReadSchemaRequest) (*v1.ReadSchemaResponse, error) {
fss.testFunc()
return nil, status.Error(codes.Unavailable, "")
}

func (fss *fakeSchemaServer) ImportBulkRelationships(grpc.ClientStreamingServer[v1.ImportBulkRelationshipsRequest, v1.ImportBulkRelationshipsResponse]) error {
func (fss *fakeServer) ImportBulkRelationships(grpc.ClientStreamingServer[v1.ImportBulkRelationshipsRequest, v1.ImportBulkRelationshipsResponse]) error {
fss.testFunc()
return status.Errorf(codes.Aborted, "")
}

func (fss *fakeServer) Watch(*v1.WatchRequest, grpc.ServerStreamingServer[v1.WatchResponse]) error {
fss.testFunc()
return status.Errorf(codes.Unavailable, "")
}

func TestRetries(t *testing.T) {
ctx := t.Context()
var callCount uint
lis := bufconn.Listen(1024 * 1024)
s := grpc.NewServer()

fakeServer := &fakeSchemaServer{testFunc: func() {
fakeServer := &fakeServer{testFunc: func() {
callCount++
}}
v1.RegisterSchemaServiceServer(s, fakeServer)
Expand Down Expand Up @@ -185,22 +191,25 @@ func TestRetries(t *testing.T) {
c, err := authzed.NewClient("passthrough://bufnet", dialOpts...)
require.NoError(t, err)

_, err = c.ReadSchema(ctx, &v1.ReadSchemaRequest{})
grpcutil.RequireStatus(t, codes.Unavailable, err)
require.Equal(t, retries, callCount)
t.Run("read_schema", func(t *testing.T) {
_, err = c.ReadSchema(ctx, &v1.ReadSchemaRequest{})
grpcutil.RequireStatus(t, codes.Unavailable, err)
require.Equal(t, retries, callCount)
})
}

func TestDoesNotRetryBackupRestore(t *testing.T) {
func TestDoesNotRetry(t *testing.T) {
ctx := t.Context()
var callCount uint
lis := bufconn.Listen(1024 * 1024)
s := grpc.NewServer()

fakeServer := &fakeSchemaServer{testFunc: func() {
fakeServer := &fakeServer{testFunc: func() {
callCount++
}}
v1.RegisterPermissionsServiceServer(s, fakeServer)
v1.RegisterExperimentalServiceServer(s, fakeServer)
v1.RegisterWatchServiceServer(s, fakeServer)

go func() {
_ = s.Serve(lis)
Expand All @@ -226,20 +235,23 @@ func TestDoesNotRetryBackupRestore(t *testing.T) {
c, err := authzed.NewClientWithExperimentalAPIs("passthrough://bufnet", dialOpts...)
require.NoError(t, err)

ibc, err := c.ImportBulkRelationships(ctx)
require.NoError(t, err)
err = ibc.SendMsg(&v1.ImportBulkRelationshipsRequest{})
require.NoError(t, err)
_, err = ibc.CloseAndRecv()
grpcutil.RequireStatus(t, codes.Aborted, err)
require.Equal(t, uint(1), callCount)
t.Run("import_bulk", func(t *testing.T) {
ibc, err := c.ImportBulkRelationships(ctx)
require.NoError(t, err)
err = ibc.SendMsg(&v1.ImportBulkRelationshipsRequest{})
require.NoError(t, err)
_, err = ibc.CloseAndRecv()
grpcutil.RequireStatus(t, codes.Aborted, err)
require.Equal(t, uint(1), callCount)
})

callCount = 0
bic, err := c.ImportBulkRelationships(ctx)
require.NoError(t, err)
err = bic.SendMsg(&v1.ImportBulkRelationshipsRequest{})
require.NoError(t, err)
_, err = bic.CloseAndRecv()
grpcutil.RequireStatus(t, codes.Aborted, err)
require.Equal(t, uint(1), callCount)
t.Run("watch", func(t *testing.T) {
callCount = 0
watchReq, err := c.Watch(ctx, &v1.WatchRequest{})
require.NoError(t, err)
resp, err := watchReq.Recv()
require.Nil(t, resp)
grpcutil.RequireStatus(t, codes.Unavailable, err)
require.Equal(t, uint(1), callCount)
})
}
121 changes: 83 additions & 38 deletions internal/commands/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
"syscall"
"time"

"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"

Expand Down Expand Up @@ -44,26 +47,29 @@ func RegisterWatchRelationshipCmd(parentCmd *cobra.Command) *cobra.Command {

var watchCmd = &cobra.Command{
Use: "watch [object_types, ...] [start_cursor]",
Short: "Watches the stream of relationship updates from the server",
Short: "Watches the stream of relationship updates and schema updates from the server",
Args: ValidationWrapper(cobra.RangeArgs(0, 2)),
RunE: watchCmdFunc,
Deprecated: "please use `zed relationships watch` instead",
}

var watchRelationshipsCmd = &cobra.Command{
Use: "watch [object_types, ...] [start_cursor]",
Short: "Watches the stream of relationship updates from the server",
Short: "Watches the stream of relationship updates and schema updates from the server",
Args: ValidationWrapper(cobra.RangeArgs(0, 2)),
RunE: watchCmdFunc,
}

func watchCmdFunc(cmd *cobra.Command, _ []string) error {
console.Printf("starting watch stream over types %v and revision %v\n", watchObjectTypes, watchRevision)

cli, err := client.NewClient(cmd)
client, err := client.NewClient(cmd)
if err != nil {
return err
}
return watchCmdFuncImpl(cmd, client, processResponse)
}

func watchCmdFuncImpl(cmd *cobra.Command, watchClient v1.WatchServiceClient, processResponse func(resp *v1.WatchResponse)) error {
console.Printf("starting watch stream over types %v and revision %v\n", watchObjectTypes, watchRevision)

relFilters := make([]*v1.RelationshipFilter, 0, len(watchRelationshipFilters))
for _, filter := range watchRelationshipFilters {
Expand All @@ -74,21 +80,26 @@ func watchCmdFunc(cmd *cobra.Command, _ []string) error {
relFilters = append(relFilters, relFilter)
}

ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()

signalctx, interruptCancel := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
defer interruptCancel()

req := &v1.WatchRequest{
OptionalObjectTypes: watchObjectTypes,
OptionalRelationshipFilters: relFilters,
OptionalUpdateKinds: []v1.WatchKind{
v1.WatchKind_WATCH_KIND_INCLUDE_CHECKPOINTS, // keeps connection open during quiet periods
v1.WatchKind_WATCH_KIND_INCLUDE_SCHEMA_UPDATES,
},
}

if watchRevision != "" {
req.OptionalStartCursor = &v1.ZedToken{Token: watchRevision}
}

ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()

signalctx, interruptCancel := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
defer interruptCancel()

watchStream, err := cli.Watch(ctx, req)
watchStream, err := watchClient.Watch(ctx, req)
if err != nil {
return err
}
Expand All @@ -104,40 +115,74 @@ func watchCmdFunc(cmd *cobra.Command, _ []string) error {
default:
resp, err := watchStream.Recv()
if err != nil {
return err
}
ok, err := isRetryable(err)
if !ok {
return err
}

for _, update := range resp.Updates {
if watchTimestamps {
console.Printf("%v: ", time.Now())
log.Trace().Err(err).Msg("will retry from the last known revision " + watchRevision)
req.OptionalStartCursor = &v1.ZedToken{Token: watchRevision}
watchStream, err = watchClient.Watch(ctx, req)
if err != nil {
return err
}
continue
}

switch update.Operation {
case v1.RelationshipUpdate_OPERATION_CREATE:
console.Printf("CREATED ")
processResponse(resp)
}
}
}

case v1.RelationshipUpdate_OPERATION_DELETE:
console.Printf("DELETED ")
func isRetryable(err error) (bool, error) {
statusErr, ok := status.FromError(err)
if !ok || (statusErr.Code() != codes.Unavailable) {
return false, err
}
return true, nil
}

case v1.RelationshipUpdate_OPERATION_TOUCH:
console.Printf("TOUCHED ")
}
func processResponse(resp *v1.WatchResponse) {
if resp.ChangesThrough != nil {
watchRevision = resp.ChangesThrough.Token
}

subjectRelation := ""
if update.Relationship.Subject.OptionalRelation != "" {
subjectRelation = " " + update.Relationship.Subject.OptionalRelation
}
if resp.SchemaUpdated {
if watchTimestamps {
console.Printf("%v: ", time.Now())
}
console.Println("SCHEMA UPDATED")
}

console.Printf("%s:%s %s %s:%s%s\n",
update.Relationship.Resource.ObjectType,
update.Relationship.Resource.ObjectId,
update.Relationship.Relation,
update.Relationship.Subject.Object.ObjectType,
update.Relationship.Subject.Object.ObjectId,
subjectRelation,
)
}
for _, update := range resp.Updates {
if watchTimestamps {
console.Printf("%v: ", time.Now())
}

switch update.Operation {
case v1.RelationshipUpdate_OPERATION_CREATE:
console.Printf("CREATED ")

case v1.RelationshipUpdate_OPERATION_DELETE:
console.Printf("DELETED ")

case v1.RelationshipUpdate_OPERATION_TOUCH:
console.Printf("TOUCHED ")
}

subjectRelation := ""
if update.Relationship.Subject.OptionalRelation != "" {
subjectRelation = " " + update.Relationship.Subject.OptionalRelation
}

console.Printf("%s:%s %s %s:%s%s\n",
update.Relationship.Resource.ObjectType,
update.Relationship.Resource.ObjectId,
update.Relationship.Relation,
update.Relationship.Subject.Object.ObjectType,
update.Relationship.Subject.Object.ObjectId,
subjectRelation,
)
}
}

Expand Down
Loading
Loading