diff --git a/Makefile b/Makefile index e426b95..d3bd27b 100644 --- a/Makefile +++ b/Makefile @@ -53,3 +53,7 @@ gen-protoc: protoc --go_out=. --go_opt=paths=source_relative \ --go-grpc_out=. --go-grpc_opt=paths=source_relative \ gravity.proto + @cd dns/proto && \ + protoc --go_out=. --go_opt=paths=source_relative \ + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + dns.proto diff --git a/dns/aether.go b/dns/aether.go index e6c6a11..a51a8c0 100644 --- a/dns/aether.go +++ b/dns/aether.go @@ -8,7 +8,7 @@ import ( "strings" "time" - cstr "github.com/agentuity/go-common/string" + pb "github.com/agentuity/go-common/dns/proto" "github.com/google/uuid" "github.com/redis/go-redis/v9" ) @@ -19,6 +19,8 @@ type DNSBaseAction struct { Reply string `json:"reply,omitempty"` } +// DNSAddAction represents a DNS record addition request. +// Deprecated: This type will be replaced by the protobuf-generated types in dns/proto. type DNSAddAction struct { DNSBaseAction Name string `json:"name"` @@ -74,6 +76,8 @@ func (a *DNSAddAction) WithPort(port int) *DNSAddAction { return a } +// DNSDeleteAction represents a DNS record deletion request. +// Deprecated: This type will be replaced by the protobuf-generated types in dns/proto. type DNSDeleteAction struct { DNSBaseAction // Name is the name of the DNS record to delete. @@ -83,6 +87,8 @@ type DNSDeleteAction struct { IDs []string `json:"ids,omitempty"` } +// DNSCertAction represents a certificate request for a domain. +// Deprecated: This type will be replaced by the protobuf-generated types in dns/proto. type DNSCertAction struct { DNSBaseAction Name string `json:"name"` @@ -205,93 +211,59 @@ func (a DNSBaseAction) GetAction() string { return a.Action } -// Transport is an interface for a transport layer for the DNS server -type Transport interface { - Subscribe(ctx context.Context, channel string) Subscriber - Publish(ctx context.Context, channel string, payload []byte) error -} - -// Message is a message from the transport layer -type Message struct { - Payload []byte -} - -// Subscriber is an interface for a subscriber to the transport layer -type Subscriber interface { - // Close closes the subscriber - Close() error - // Channel returns a channel of messages - Channel() <-chan *Message +// transport is an interface for a transport layer for the DNS server +type transport interface { + // Publish sends a DNS action and waits for a response + Publish(ctx context.Context, action DNSAction) ([]byte, error) + // PublishAsync sends a DNS action without waiting for a response + PublishAsync(ctx context.Context, action DNSAction) error } type option struct { - transport Transport + transport transport timeout time.Duration reply bool } -type optionHandler func(*option) +type OptionHandler func(*option) // WithReply sets whether the DNS action should wait for a reply from the DNS server -func WithReply(reply bool) optionHandler { +func WithReply(reply bool) OptionHandler { return func(o *option) { o.reply = reply } } // WithTransport sets a custom transport for the DNS action -func WithTransport(transport Transport) optionHandler { +// +// for Testing +func withTransport(t transport) OptionHandler { return func(o *option) { - o.transport = transport + o.transport = t } } // WithTimeout sets a custom timeout for the DNS action -func WithTimeout(timeout time.Duration) optionHandler { +func WithTimeout(timeout time.Duration) OptionHandler { return func(o *option) { o.timeout = timeout } } // WithRedis uses a redis client as the transport for the DNS action -func WithRedis(redis *redis.Client) optionHandler { +func WithRedis(redis *redis.Client) OptionHandler { return func(o *option) { o.transport = &redisTransport{redis: redis} } } -type redisSubscriber struct { - sub *redis.PubSub -} - -var _ Subscriber = (*redisSubscriber)(nil) - -func (s *redisSubscriber) Close() error { - return s.sub.Close() -} - -func (s *redisSubscriber) Channel() <-chan *Message { - ch := make(chan *Message) - go func() { - for msg := range s.sub.Channel() { - ch <- &Message{Payload: []byte(msg.Payload)} +// WithGRPC uses a gRPC client as the transport for the DNS action +func WithGRPC(client pb.DNSServiceClient) OptionHandler { + return func(o *option) { + o.transport = &grpcTransport{ + client: client, } - }() - return ch -} - -type redisTransport struct { - redis *redis.Client -} - -var _ Transport = (*redisTransport)(nil) - -func (t *redisTransport) Subscribe(ctx context.Context, channel string) Subscriber { - return &redisSubscriber{sub: t.redis.Subscribe(ctx, channel)} -} - -func (t *redisTransport) Publish(ctx context.Context, channel string, payload []byte) error { - return t.redis.Publish(ctx, channel, payload).Err() + } } // ActionFromChannel returns the action from the channel string @@ -332,7 +304,7 @@ func NewDNSResponse[R any, T TypedDNSAction[R]](action T, data *R, err error) *D } // SendDNSAction sends a DNS action to the DNS server with a timeout. If the timeout is 0, the default timeout will be used. -func SendDNSAction[R any, T TypedDNSAction[R]](ctx context.Context, action T, opts ...optionHandler) (*R, error) { +func SendDNSAction[R any, T TypedDNSAction[R]](ctx context.Context, action T, opts ...OptionHandler) (*R, error) { var o option o.timeout = DefaultDNSTimeout o.reply = true @@ -345,43 +317,27 @@ func SendDNSAction[R any, T TypedDNSAction[R]](ctx context.Context, action T, op return nil, ErrTransportRequired } - id := action.GetID() - if id == "" { - return nil, errors.New("message ID not found") + if o.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, o.timeout) + defer cancel() } - var sub Subscriber - - if o.reply { - action.SetReply("aether:response:" + action.GetAction() + ":" + id) - sub = o.transport.Subscribe(ctx, action.GetReply()) - defer sub.Close() + if !o.reply { + return nil, o.transport.PublishAsync(ctx, action) } - if err := o.transport.Publish(ctx, "aether:request:"+action.GetAction()+":"+id, []byte(cstr.JSONStringify(action))); err != nil { + responseBytes, err := o.transport.Publish(ctx, action) + if err != nil { return nil, err } - if o.reply { - select { - case <-ctx.Done(): - return nil, ctx.Err() - case msg := <-sub.Channel(): - if msg == nil { - return nil, ErrClosed - } - var response DNSResponse[R] - if err := json.Unmarshal([]byte(msg.Payload), &response); err != nil { - return nil, fmt.Errorf("failed to unmarshal dns action response: %w", err) - } - if !response.Success { - return nil, errors.New(response.Error) - } - return response.Data, nil - case <-time.After(o.timeout): - return nil, ErrTimeout - } + var response DNSResponse[R] + if err := json.Unmarshal(responseBytes, &response); err != nil { + return nil, fmt.Errorf("failed to unmarshal dns action response: %w", err) } - - return nil, nil + if !response.Success { + return nil, errors.New(response.Error) + } + return response.Data, nil } diff --git a/dns/aether_test.go b/dns/aether_test.go index f68bda3..9094e3e 100644 --- a/dns/aether_test.go +++ b/dns/aether_test.go @@ -11,53 +11,42 @@ import ( ) type testTransport struct { - publish chan *Message - subscribe chan *Message + lastAction DNSAction } -var _ Transport = (*testTransport)(nil) - -func (t *testTransport) Subscribe(ctx context.Context, channel string) Subscriber { - t.subscribe = make(chan *Message, 1) - t.publish = make(chan *Message, 1) - return &testSubscriber{channel: channel, messages: t.subscribe} -} - -func (t *testTransport) Publish(ctx context.Context, channel string, payload []byte) error { - t.publish <- &Message{Payload: payload} - var cert DNSCert - cert.Certificate = []byte("cert") - cert.Expires = time.Now().Add(time.Hour * 24 * 365 * 2) - cert.PrivateKey = []byte("private") - var response DNSResponse[DNSCert] - response.Success = true - response.Data = &cert - response.MsgID = uuid.New().String() - response.Error = "" - response.Data = &cert - responseBytes, err := json.Marshal(response) - if err != nil { - return err +var _ transport = (*testTransport)(nil) + +func (t *testTransport) Publish(ctx context.Context, action DNSAction) ([]byte, error) { + t.lastAction = action + + // Return different responses based on action type + switch action.(type) { + case *DNSCertAction: + var cert DNSCert + cert.Certificate = []byte("cert") + cert.Expires = time.Now().Add(time.Hour * 24 * 365 * 2) + cert.PrivateKey = []byte("private") + var response DNSResponse[DNSCert] + response.Success = true + response.Data = &cert + return json.Marshal(response) + case *DNSAddAction: + var record DNSRecord + record.IDs = []string{uuid.New().String()} + var response DNSResponse[DNSRecord] + response.Success = true + response.Data = &record + return json.Marshal(response) + default: + return nil, nil } - t.subscribe <- &Message{Payload: responseBytes} - return nil } -type testSubscriber struct { - channel string - messages chan *Message -} - -var _ Subscriber = (*testSubscriber)(nil) - -func (s *testSubscriber) Close() error { +func (t *testTransport) PublishAsync(ctx context.Context, action DNSAction) error { + t.lastAction = action return nil } -func (s *testSubscriber) Channel() <-chan *Message { - return s.messages -} - func TestDNSAction(t *testing.T) { var transport testTransport @@ -69,7 +58,7 @@ func TestDNSAction(t *testing.T) { Name: "test", } - reply, err := SendDNSAction(context.Background(), action, WithTransport(&transport), WithTimeout(time.Second)) + reply, err := SendDNSAction(context.Background(), action, withTransport(&transport), WithTimeout(time.Second)) if err != nil { t.Fatalf("failed to send dns action: %v", err) } @@ -89,7 +78,7 @@ func TestDNSCertAction(t *testing.T) { Name: "test", } - reply, err := SendDNSAction(context.Background(), action, WithTransport(&transport), WithTimeout(time.Second)) + reply, err := SendDNSAction(context.Background(), action, withTransport(&transport), WithTimeout(time.Second)) if err != nil { t.Fatalf("failed to send dns cert action: %v", err) } diff --git a/dns/grpc_adapter.go b/dns/grpc_adapter.go new file mode 100644 index 0000000..68da1e2 --- /dev/null +++ b/dns/grpc_adapter.go @@ -0,0 +1,114 @@ +package dns + +import ( + "time" + + pb "github.com/agentuity/go-common/dns/proto" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func (a *DNSAddAction) ToProto() *pb.AddRequest { + return &pb.AddRequest{ + Name: a.Name, + Type: a.Type, + Value: a.Value, + TtlSeconds: int64(a.TTL.Seconds()), + ExpiresSeconds: int64(a.Expires.Seconds()), + Priority: int32(a.Priority), + Weight: int32(a.Weight), + Port: int32(a.Port), + } +} + +func (a *DNSDeleteAction) ToProto() *pb.DeleteRequest { + return &pb.DeleteRequest{ + Name: a.Name, + Ids: a.IDs, + } +} + +func (a *DNSCertAction) ToProto() *pb.CertRequest { + return &pb.CertRequest{ + Name: a.Name, + } +} + +func FromProtoAddResponse(resp *pb.AddResponse) (*DNSRecord, error) { + if !resp.Success { + return nil, &DNSError{Message: resp.Error} + } + return &DNSRecord{IDs: []string{resp.Id}}, nil +} + +func FromProtoDeleteResponse(resp *pb.DeleteResponse) error { + if !resp.Success { + return &DNSError{Message: resp.Error} + } + return nil +} + +func FromProtoCertResponse(resp *pb.CertResponse) (*DNSCert, error) { + if !resp.Success { + return nil, &DNSError{Message: resp.Error} + } + var expires time.Time + if resp.Expires != nil { + expires = resp.Expires.AsTime() + } + return &DNSCert{ + Certificate: resp.Certificate, + PrivateKey: resp.PrivateKey, + Expires: expires, + Domain: resp.Domain, + }, nil +} + +type DNSError struct { + Message string +} + +func (e *DNSError) Error() string { + return e.Message +} + +func ToProtoAddResponse(record *DNSRecord, err error) *pb.AddResponse { + resp := &pb.AddResponse{} + if err != nil { + resp.Success = false + resp.Error = err.Error() + } else { + resp.Success = true + if record != nil && len(record.IDs) > 0 { + resp.Id = record.IDs[0] + } + } + return resp +} + +func ToProtoDeleteResponse(err error) *pb.DeleteResponse { + resp := &pb.DeleteResponse{} + if err != nil { + resp.Success = false + resp.Error = err.Error() + } else { + resp.Success = true + } + return resp +} + +func ToProtoCertResponse(cert *DNSCert, err error) *pb.CertResponse { + resp := &pb.CertResponse{} + if err != nil { + resp.Success = false + resp.Error = err.Error() + } else { + resp.Success = true + if cert != nil { + resp.Certificate = cert.Certificate + resp.PrivateKey = cert.PrivateKey + resp.Expires = timestamppb.New(cert.Expires) + resp.Domain = cert.Domain + } + } + return resp +} diff --git a/dns/grpc_transport.go b/dns/grpc_transport.go new file mode 100644 index 0000000..502fd83 --- /dev/null +++ b/dns/grpc_transport.go @@ -0,0 +1,103 @@ +package dns + +import ( + "context" + "crypto/tls" + "encoding/json" + "fmt" + "net/url" + + pb "github.com/agentuity/go-common/dns/proto" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" +) + +type grpcTransport struct { + client pb.DNSServiceClient +} + +var _ transport = (*grpcTransport)(nil) + +func (t *grpcTransport) Publish(ctx context.Context, action DNSAction) ([]byte, error) { + switch a := action.(type) { + case *DNSAddAction: + resp, err := t.client.Add(ctx, a.ToProto()) + if err != nil { + return nil, err + } + record, err := FromProtoAddResponse(resp) + response := NewDNSResponse(a, record, err) + return json.Marshal(response) + + case *DNSDeleteAction: + resp, err := t.client.Delete(ctx, a.ToProto()) + if err != nil { + return nil, err + } + deleteErr := FromProtoDeleteResponse(resp) + var emptyString string + response := NewDNSResponse(a, &emptyString, deleteErr) + return json.Marshal(response) + + case *DNSCertAction: + resp, err := t.client.RequestCert(ctx, a.ToProto()) + if err != nil { + return nil, err + } + cert, err := FromProtoCertResponse(resp) + response := NewDNSResponse(a, cert, err) + return json.Marshal(response) + + default: + return nil, fmt.Errorf("unsupported action type: %T", action) + } +} + +func (t *grpcTransport) PublishAsync(ctx context.Context, action DNSAction) error { + switch a := action.(type) { + case *DNSAddAction: + _, err := t.client.Add(ctx, a.ToProto()) + return err + + case *DNSDeleteAction: + _, err := t.client.Delete(ctx, a.ToProto()) + return err + + case *DNSCertAction: + _, err := t.client.RequestCert(ctx, a.ToProto()) + return err + + default: + return fmt.Errorf("unsupported action type: %T", action) + } +} + +// NewDNSServiceClient creates a new gRPC DNS service client. +// The caller is responsible for closing the returned connection. +// If tlsCert is nil, insecure credentials will be used unless opts are provided. +func NewDNSServiceClient(ctx context.Context, address string, tlsCert *tls.Certificate, opts ...grpc.DialOption) (pb.DNSServiceClient, *grpc.ClientConn, error) { + if tlsCert != nil { + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{*tlsCert}, + MinVersion: tls.VersionTLS13, + } + opts = []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))} + } + if len(opts) == 0 { + opts = []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} + } + + u, err := url.Parse(address) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse address: %w", err) + } + target := u.Host + conn, err := grpc.NewClient(target, opts...) + if err != nil { + return nil, nil, fmt.Errorf("failed to create gRPC client: %w", err) + } + + client := pb.NewDNSServiceClient(conn) + return client, conn, nil +} diff --git a/dns/proto/README.md b/dns/proto/README.md new file mode 100644 index 0000000..bd2df3d --- /dev/null +++ b/dns/proto/README.md @@ -0,0 +1,38 @@ +# DNS Protocol Buffers + +This directory contains the Protocol Buffer definitions for the DNS service. + +## Module Path + +Generated Go code uses the import path: `github.com/agentuity/go-common/dns/proto` + +## Files + +- `dns.proto` - DNS service definitions +- `dns.pb.go` - Generated Go message types +- `dns_grpc.pb.go` - Generated gRPC service stubs + +## Service + +The `DNSService` provides three RPC methods: +- `Add` - Add a DNS record +- `Delete` - Delete DNS records +- `RequestCert` - Request a certificate for a domain + +## Regenerating Code + +From the repository root: + +```bash +make gen-protoc +``` + +Or manually from this directory: + +```bash +protoc --go_out=. --go_opt=paths=source_relative \ + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + dns.proto +``` + +The generated code should be committed to version control. diff --git a/dns/proto/dns.pb.go b/dns/proto/dns.pb.go new file mode 100644 index 0000000..9af78ac --- /dev/null +++ b/dns/proto/dns.pb.go @@ -0,0 +1,532 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.6 +// protoc v5.29.3 +// source: dns.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type AddRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Type string `protobuf:"bytes,2,opt,name=type,proto3" json:"type,omitempty"` + Value string `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"` + // TTL in seconds (converted from time.Duration) + TtlSeconds int64 `protobuf:"varint,4,opt,name=ttl_seconds,json=ttlSeconds,proto3" json:"ttl_seconds,omitempty"` + // Expires in seconds (converted from time.Duration) + ExpiresSeconds int64 `protobuf:"varint,5,opt,name=expires_seconds,json=expiresSeconds,proto3" json:"expires_seconds,omitempty"` + // Priority for MX and SRV records + Priority int32 `protobuf:"varint,6,opt,name=priority,proto3" json:"priority,omitempty"` + // Weight for SRV records + Weight int32 `protobuf:"varint,7,opt,name=weight,proto3" json:"weight,omitempty"` + // Port for SRV records + Port int32 `protobuf:"varint,8,opt,name=port,proto3" json:"port,omitempty"` + Comment string `protobuf:"bytes,9,opt,name=comment,proto3" json:"comment,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AddRequest) Reset() { + *x = AddRequest{} + mi := &file_dns_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AddRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AddRequest) ProtoMessage() {} + +func (x *AddRequest) ProtoReflect() protoreflect.Message { + mi := &file_dns_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AddRequest.ProtoReflect.Descriptor instead. +func (*AddRequest) Descriptor() ([]byte, []int) { + return file_dns_proto_rawDescGZIP(), []int{0} +} + +func (x *AddRequest) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *AddRequest) GetType() string { + if x != nil { + return x.Type + } + return "" +} + +func (x *AddRequest) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + +func (x *AddRequest) GetTtlSeconds() int64 { + if x != nil { + return x.TtlSeconds + } + return 0 +} + +func (x *AddRequest) GetExpiresSeconds() int64 { + if x != nil { + return x.ExpiresSeconds + } + return 0 +} + +func (x *AddRequest) GetPriority() int32 { + if x != nil { + return x.Priority + } + return 0 +} + +func (x *AddRequest) GetWeight() int32 { + if x != nil { + return x.Weight + } + return 0 +} + +func (x *AddRequest) GetPort() int32 { + if x != nil { + return x.Port + } + return 0 +} + +func (x *AddRequest) GetComment() string { + if x != nil { + return x.Comment + } + return "" +} + +type AddResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` + Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` + Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AddResponse) Reset() { + *x = AddResponse{} + mi := &file_dns_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AddResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AddResponse) ProtoMessage() {} + +func (x *AddResponse) ProtoReflect() protoreflect.Message { + mi := &file_dns_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AddResponse.ProtoReflect.Descriptor instead. +func (*AddResponse) Descriptor() ([]byte, []int) { + return file_dns_proto_rawDescGZIP(), []int{1} +} + +func (x *AddResponse) GetSuccess() bool { + if x != nil { + return x.Success + } + return false +} + +func (x *AddResponse) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +func (x *AddResponse) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +type DeleteRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Ids []string `protobuf:"bytes,2,rep,name=ids,proto3" json:"ids,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *DeleteRequest) Reset() { + *x = DeleteRequest{} + mi := &file_dns_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *DeleteRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DeleteRequest) ProtoMessage() {} + +func (x *DeleteRequest) ProtoReflect() protoreflect.Message { + mi := &file_dns_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DeleteRequest.ProtoReflect.Descriptor instead. +func (*DeleteRequest) Descriptor() ([]byte, []int) { + return file_dns_proto_rawDescGZIP(), []int{2} +} + +func (x *DeleteRequest) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *DeleteRequest) GetIds() []string { + if x != nil { + return x.Ids + } + return nil +} + +type DeleteResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` + Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *DeleteResponse) Reset() { + *x = DeleteResponse{} + mi := &file_dns_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *DeleteResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DeleteResponse) ProtoMessage() {} + +func (x *DeleteResponse) ProtoReflect() protoreflect.Message { + mi := &file_dns_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DeleteResponse.ProtoReflect.Descriptor instead. +func (*DeleteResponse) Descriptor() ([]byte, []int) { + return file_dns_proto_rawDescGZIP(), []int{3} +} + +func (x *DeleteResponse) GetSuccess() bool { + if x != nil { + return x.Success + } + return false +} + +func (x *DeleteResponse) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +type CertRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CertRequest) Reset() { + *x = CertRequest{} + mi := &file_dns_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CertRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CertRequest) ProtoMessage() {} + +func (x *CertRequest) ProtoReflect() protoreflect.Message { + mi := &file_dns_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CertRequest.ProtoReflect.Descriptor instead. +func (*CertRequest) Descriptor() ([]byte, []int) { + return file_dns_proto_rawDescGZIP(), []int{4} +} + +func (x *CertRequest) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +type CertResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` + Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` + Certificate []byte `protobuf:"bytes,3,opt,name=certificate,proto3" json:"certificate,omitempty"` + PrivateKey []byte `protobuf:"bytes,4,opt,name=private_key,json=privateKey,proto3" json:"private_key,omitempty"` + Expires *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=expires,proto3" json:"expires,omitempty"` + Domain string `protobuf:"bytes,6,opt,name=domain,proto3" json:"domain,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CertResponse) Reset() { + *x = CertResponse{} + mi := &file_dns_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CertResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CertResponse) ProtoMessage() {} + +func (x *CertResponse) ProtoReflect() protoreflect.Message { + mi := &file_dns_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CertResponse.ProtoReflect.Descriptor instead. +func (*CertResponse) Descriptor() ([]byte, []int) { + return file_dns_proto_rawDescGZIP(), []int{5} +} + +func (x *CertResponse) GetSuccess() bool { + if x != nil { + return x.Success + } + return false +} + +func (x *CertResponse) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +func (x *CertResponse) GetCertificate() []byte { + if x != nil { + return x.Certificate + } + return nil +} + +func (x *CertResponse) GetPrivateKey() []byte { + if x != nil { + return x.PrivateKey + } + return nil +} + +func (x *CertResponse) GetExpires() *timestamppb.Timestamp { + if x != nil { + return x.Expires + } + return nil +} + +func (x *CertResponse) GetDomain() string { + if x != nil { + return x.Domain + } + return "" +} + +var File_dns_proto protoreflect.FileDescriptor + +const file_dns_proto_rawDesc = "" + + "\n" + + "\tdns.proto\x12\x03dns\x1a\x1fgoogle/protobuf/timestamp.proto\"\xf6\x01\n" + + "\n" + + "AddRequest\x12\x12\n" + + "\x04name\x18\x01 \x01(\tR\x04name\x12\x12\n" + + "\x04type\x18\x02 \x01(\tR\x04type\x12\x14\n" + + "\x05value\x18\x03 \x01(\tR\x05value\x12\x1f\n" + + "\vttl_seconds\x18\x04 \x01(\x03R\n" + + "ttlSeconds\x12'\n" + + "\x0fexpires_seconds\x18\x05 \x01(\x03R\x0eexpiresSeconds\x12\x1a\n" + + "\bpriority\x18\x06 \x01(\x05R\bpriority\x12\x16\n" + + "\x06weight\x18\a \x01(\x05R\x06weight\x12\x12\n" + + "\x04port\x18\b \x01(\x05R\x04port\x12\x18\n" + + "\acomment\x18\t \x01(\tR\acomment\"M\n" + + "\vAddResponse\x12\x18\n" + + "\asuccess\x18\x01 \x01(\bR\asuccess\x12\x14\n" + + "\x05error\x18\x02 \x01(\tR\x05error\x12\x0e\n" + + "\x02id\x18\x03 \x01(\tR\x02id\"5\n" + + "\rDeleteRequest\x12\x12\n" + + "\x04name\x18\x01 \x01(\tR\x04name\x12\x10\n" + + "\x03ids\x18\x02 \x03(\tR\x03ids\"@\n" + + "\x0eDeleteResponse\x12\x18\n" + + "\asuccess\x18\x01 \x01(\bR\asuccess\x12\x14\n" + + "\x05error\x18\x02 \x01(\tR\x05error\"!\n" + + "\vCertRequest\x12\x12\n" + + "\x04name\x18\x01 \x01(\tR\x04name\"\xcf\x01\n" + + "\fCertResponse\x12\x18\n" + + "\asuccess\x18\x01 \x01(\bR\asuccess\x12\x14\n" + + "\x05error\x18\x02 \x01(\tR\x05error\x12 \n" + + "\vcertificate\x18\x03 \x01(\fR\vcertificate\x12\x1f\n" + + "\vprivate_key\x18\x04 \x01(\fR\n" + + "privateKey\x124\n" + + "\aexpires\x18\x05 \x01(\v2\x1a.google.protobuf.TimestampR\aexpires\x12\x16\n" + + "\x06domain\x18\x06 \x01(\tR\x06domain2\x9d\x01\n" + + "\n" + + "DNSService\x12(\n" + + "\x03Add\x12\x0f.dns.AddRequest\x1a\x10.dns.AddResponse\x121\n" + + "\x06Delete\x12\x12.dns.DeleteRequest\x1a\x13.dns.DeleteResponse\x122\n" + + "\vRequestCert\x12\x10.dns.CertRequest\x1a\x11.dns.CertResponseB*Z(github.com/agentuity/go-common/dns/protob\x06proto3" + +var ( + file_dns_proto_rawDescOnce sync.Once + file_dns_proto_rawDescData []byte +) + +func file_dns_proto_rawDescGZIP() []byte { + file_dns_proto_rawDescOnce.Do(func() { + file_dns_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_dns_proto_rawDesc), len(file_dns_proto_rawDesc))) + }) + return file_dns_proto_rawDescData +} + +var file_dns_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_dns_proto_goTypes = []any{ + (*AddRequest)(nil), // 0: dns.AddRequest + (*AddResponse)(nil), // 1: dns.AddResponse + (*DeleteRequest)(nil), // 2: dns.DeleteRequest + (*DeleteResponse)(nil), // 3: dns.DeleteResponse + (*CertRequest)(nil), // 4: dns.CertRequest + (*CertResponse)(nil), // 5: dns.CertResponse + (*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp +} +var file_dns_proto_depIdxs = []int32{ + 6, // 0: dns.CertResponse.expires:type_name -> google.protobuf.Timestamp + 0, // 1: dns.DNSService.Add:input_type -> dns.AddRequest + 2, // 2: dns.DNSService.Delete:input_type -> dns.DeleteRequest + 4, // 3: dns.DNSService.RequestCert:input_type -> dns.CertRequest + 1, // 4: dns.DNSService.Add:output_type -> dns.AddResponse + 3, // 5: dns.DNSService.Delete:output_type -> dns.DeleteResponse + 5, // 6: dns.DNSService.RequestCert:output_type -> dns.CertResponse + 4, // [4:7] is the sub-list for method output_type + 1, // [1:4] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_dns_proto_init() } +func file_dns_proto_init() { + if File_dns_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_dns_proto_rawDesc), len(file_dns_proto_rawDesc)), + NumEnums: 0, + NumMessages: 6, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_dns_proto_goTypes, + DependencyIndexes: file_dns_proto_depIdxs, + MessageInfos: file_dns_proto_msgTypes, + }.Build() + File_dns_proto = out.File + file_dns_proto_goTypes = nil + file_dns_proto_depIdxs = nil +} diff --git a/dns/proto/dns.proto b/dns/proto/dns.proto new file mode 100644 index 0000000..a82fbb2 --- /dev/null +++ b/dns/proto/dns.proto @@ -0,0 +1,64 @@ +syntax = "proto3"; + +package dns; + +option go_package = "github.com/agentuity/go-common/dns/proto"; + +import "google/protobuf/timestamp.proto"; + +// DNSService handles DNS record management and certificate requests +service DNSService { + // Add a DNS record + rpc Add(AddRequest) returns (AddResponse); + // Delete DNS records + rpc Delete(DeleteRequest) returns (DeleteResponse); + // Request a certificate for a domain + rpc RequestCert(CertRequest) returns (CertResponse); +} + +message AddRequest { + string name = 1; + string type = 2; + string value = 3; + // TTL in seconds (converted from time.Duration) + int64 ttl_seconds = 4; + // Expires in seconds (converted from time.Duration) + int64 expires_seconds = 5; + // Priority for MX and SRV records + int32 priority = 6; + // Weight for SRV records + int32 weight = 7; + // Port for SRV records + int32 port = 8; + + string comment = 9; +} + +message AddResponse { + bool success = 1; + string error = 2; + string id = 3; +} + +message DeleteRequest { + string name = 1; + repeated string ids = 2; +} + +message DeleteResponse { + bool success = 1; + string error = 2; +} + +message CertRequest { + string name = 1; +} + +message CertResponse { + bool success = 1; + string error = 2; + bytes certificate = 3; + bytes private_key = 4; + google.protobuf.Timestamp expires = 5; + string domain = 6; +} diff --git a/dns/proto/dns_grpc.pb.go b/dns/proto/dns_grpc.pb.go new file mode 100644 index 0000000..710134c --- /dev/null +++ b/dns/proto/dns_grpc.pb.go @@ -0,0 +1,207 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v5.29.3 +// source: dns.proto + +package proto + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + DNSService_Add_FullMethodName = "/dns.DNSService/Add" + DNSService_Delete_FullMethodName = "/dns.DNSService/Delete" + DNSService_RequestCert_FullMethodName = "/dns.DNSService/RequestCert" +) + +// DNSServiceClient is the client API for DNSService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +// +// DNSService handles DNS record management and certificate requests +type DNSServiceClient interface { + // Add a DNS record + Add(ctx context.Context, in *AddRequest, opts ...grpc.CallOption) (*AddResponse, error) + // Delete DNS records + Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error) + // Request a certificate for a domain + RequestCert(ctx context.Context, in *CertRequest, opts ...grpc.CallOption) (*CertResponse, error) +} + +type dNSServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewDNSServiceClient(cc grpc.ClientConnInterface) DNSServiceClient { + return &dNSServiceClient{cc} +} + +func (c *dNSServiceClient) Add(ctx context.Context, in *AddRequest, opts ...grpc.CallOption) (*AddResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(AddResponse) + err := c.cc.Invoke(ctx, DNSService_Add_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dNSServiceClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(DeleteResponse) + err := c.cc.Invoke(ctx, DNSService_Delete_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *dNSServiceClient) RequestCert(ctx context.Context, in *CertRequest, opts ...grpc.CallOption) (*CertResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(CertResponse) + err := c.cc.Invoke(ctx, DNSService_RequestCert_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// DNSServiceServer is the server API for DNSService service. +// All implementations must embed UnimplementedDNSServiceServer +// for forward compatibility. +// +// DNSService handles DNS record management and certificate requests +type DNSServiceServer interface { + // Add a DNS record + Add(context.Context, *AddRequest) (*AddResponse, error) + // Delete DNS records + Delete(context.Context, *DeleteRequest) (*DeleteResponse, error) + // Request a certificate for a domain + RequestCert(context.Context, *CertRequest) (*CertResponse, error) + mustEmbedUnimplementedDNSServiceServer() +} + +// UnimplementedDNSServiceServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedDNSServiceServer struct{} + +func (UnimplementedDNSServiceServer) Add(context.Context, *AddRequest) (*AddResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Add not implemented") +} +func (UnimplementedDNSServiceServer) Delete(context.Context, *DeleteRequest) (*DeleteResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Delete not implemented") +} +func (UnimplementedDNSServiceServer) RequestCert(context.Context, *CertRequest) (*CertResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method RequestCert not implemented") +} +func (UnimplementedDNSServiceServer) mustEmbedUnimplementedDNSServiceServer() {} +func (UnimplementedDNSServiceServer) testEmbeddedByValue() {} + +// UnsafeDNSServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to DNSServiceServer will +// result in compilation errors. +type UnsafeDNSServiceServer interface { + mustEmbedUnimplementedDNSServiceServer() +} + +func RegisterDNSServiceServer(s grpc.ServiceRegistrar, srv DNSServiceServer) { + // If the following call pancis, it indicates UnimplementedDNSServiceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&DNSService_ServiceDesc, srv) +} + +func _DNSService_Add_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(AddRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DNSServiceServer).Add(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: DNSService_Add_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DNSServiceServer).Add(ctx, req.(*AddRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _DNSService_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DeleteRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DNSServiceServer).Delete(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: DNSService_Delete_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DNSServiceServer).Delete(ctx, req.(*DeleteRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _DNSService_RequestCert_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CertRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DNSServiceServer).RequestCert(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: DNSService_RequestCert_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DNSServiceServer).RequestCert(ctx, req.(*CertRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// DNSService_ServiceDesc is the grpc.ServiceDesc for DNSService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var DNSService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "dns.DNSService", + HandlerType: (*DNSServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Add", + Handler: _DNSService_Add_Handler, + }, + { + MethodName: "Delete", + Handler: _DNSService_Delete_Handler, + }, + { + MethodName: "RequestCert", + Handler: _DNSService_RequestCert_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "dns.proto", +} diff --git a/dns/redis_transport.go b/dns/redis_transport.go new file mode 100644 index 0000000..9477107 --- /dev/null +++ b/dns/redis_transport.go @@ -0,0 +1,51 @@ +package dns + +import ( + "context" + "fmt" + + cstr "github.com/agentuity/go-common/string" + "github.com/redis/go-redis/v9" +) + +type redisTransport struct { + redis *redis.Client +} + +var _ transport = (*redisTransport)(nil) + +func (t *redisTransport) Publish(ctx context.Context, action DNSAction) ([]byte, error) { + id := action.GetID() + if id == "" { + return nil, fmt.Errorf("message ID not found") + } + + replyChannel := "aether:response:" + action.GetAction() + ":" + id + action.SetReply(replyChannel) + + sub := t.redis.Subscribe(ctx, replyChannel) + defer sub.Close() + + if err := t.redis.Publish(ctx, "aether:request:"+action.GetAction()+":"+id, cstr.JSONStringify(action)).Err(); err != nil { + return nil, err + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case msg := <-sub.Channel(): + if msg == nil { + return nil, ErrClosed + } + return []byte(msg.Payload), nil + } +} + +func (t *redisTransport) PublishAsync(ctx context.Context, action DNSAction) error { + id := action.GetID() + if id == "" { + return fmt.Errorf("message ID not found") + } + + return t.redis.Publish(ctx, "aether:request:"+action.GetAction()+":"+id, cstr.JSONStringify(action)).Err() +}