From e9bf62e77214a1d603ec586f837afbf721ca0506 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Tue, 13 Jan 2026 21:08:58 -0500 Subject: [PATCH 1/2] remove deprecated relay logic --- endpoint.go | 12 +- model/protos.go | 6 +- proto/client.proto | 7 - proto/pbclient/client.pb.go | 172 ++++--------- proto/pbrelay/relay.pb.go | 486 ++---------------------------------- proto/relay.proto | 40 --- relay.go | 51 +--- server/control/clients.go | 70 +----- server/control/relays.go | 451 +-------------------------------- server/control/secrets.go | 58 ----- server/control/server.go | 2 +- server/control/store.go | 134 ---------- server/relay/clients.go | 71 +----- server/relay/control.go | 481 +---------------------------------- server/relay/server.go | 4 +- server/relay/store.go | 8 +- 16 files changed, 120 insertions(+), 1933 deletions(-) diff --git a/endpoint.go b/endpoint.go index a37358d..0ab2bd7 100644 --- a/endpoint.go +++ b/endpoint.go @@ -283,17 +283,7 @@ func (ep *endpoint) runRelay(ctx context.Context, conn *quic.Conn) error { return fmt.Errorf("relay unexpected response") } - var relays []*pbclient.DirectRelay - relays = append(relays, iterc.MapSlice(resp.Relay.Relays, func(relay *pbclient.Relay) *pbclient.DirectRelay { - return &pbclient.DirectRelay{ - Id: relay.Id, - Addresses: relay.Addresses, - ServerCertificate: relay.ServerCertificate, - } - })...) - relays = append(relays, resp.Relay.Directs...) - - ep.peer.setRelays(relays) + ep.peer.setRelays(resp.Relay.Directs) } }) diff --git a/model/protos.go b/model/protos.go index 7c9d716..cf856df 100644 --- a/model/protos.go +++ b/model/protos.go @@ -48,7 +48,7 @@ func (v ConnectRelayNextProto) String() string { func GetConnectRelayNextProto(conn *quic.Conn) ConnectRelayNextProto { proto := conn.ConnectionState().TLS.NegotiatedProtocol - for _, v := range []ConnectRelayNextProto{ConnectRelayV02, ConnectRelayV01} { + for _, v := range []ConnectRelayNextProto{ConnectRelayV02} { if v.string == proto { return v } @@ -58,7 +58,6 @@ func GetConnectRelayNextProto(conn *quic.Conn) ConnectRelayNextProto { var ( ConnectRelayUnknown = ConnectRelayNextProto{} - ConnectRelayV01 = ConnectRelayNextProto{"connet-peer-relay/0.1"} // 0.7.0 ConnectRelayV02 = ConnectRelayNextProto{"connet-peer-relay/0.2"} // 0.13.0 ) @@ -71,7 +70,7 @@ func (v RelayControlNextProto) String() string { func GetRelayControlNextProto(conn *quic.Conn) RelayControlNextProto { proto := conn.ConnectionState().TLS.NegotiatedProtocol - for _, v := range []RelayControlNextProto{RelayControlV03, RelayControlV02} { + for _, v := range []RelayControlNextProto{RelayControlV03} { if v.string == proto { return v } @@ -81,7 +80,6 @@ func GetRelayControlNextProto(conn *quic.Conn) RelayControlNextProto { var ( RelayControlUnknown = RelayControlNextProto{} - RelayControlV02 = RelayControlNextProto{"connet-relay/0.2"} // 0.8.0 RelayControlV03 = RelayControlNextProto{"connet-relay/0.3"} // 0.13.0 // Update GetRelayControlNextProto when adding a new one ) diff --git a/proto/client.proto b/proto/client.proto index 600a870..41d12da 100644 --- a/proto/client.proto +++ b/proto/client.proto @@ -48,7 +48,6 @@ message Response { repeated RemotePeer peers = 1; } message Relays { - repeated Relay relays = 1; repeated DirectRelay directs = 2; } } @@ -66,12 +65,6 @@ message RemotePeer { Peer peer = 8; } -message Relay { - string id = 3; - repeated model.HostPort addresses = 4; - bytes server_certificate = 2; // endpoint specific certificate to be used by the client -} - message DirectRelay { string id = 1; // relay id as assigned by the control server repeated model.HostPort addresses = 2; diff --git a/proto/pbclient/client.pb.go b/proto/pbclient/client.pb.go index 64f8dc0..4381e9c 100644 --- a/proto/pbclient/client.pb.go +++ b/proto/pbclient/client.pb.go @@ -393,66 +393,6 @@ func (x *RemotePeer) GetPeer() *Peer { return nil } -type Relay struct { - state protoimpl.MessageState `protogen:"open.v1"` - Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"` - Addresses []*pbmodel.HostPort `protobuf:"bytes,4,rep,name=addresses,proto3" json:"addresses,omitempty"` - ServerCertificate []byte `protobuf:"bytes,2,opt,name=server_certificate,json=serverCertificate,proto3" json:"server_certificate,omitempty"` // endpoint specific certificate to be used by the client - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *Relay) Reset() { - *x = Relay{} - mi := &file_client_proto_msgTypes[6] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *Relay) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Relay) ProtoMessage() {} - -func (x *Relay) ProtoReflect() protoreflect.Message { - mi := &file_client_proto_msgTypes[6] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Relay.ProtoReflect.Descriptor instead. -func (*Relay) Descriptor() ([]byte, []int) { - return file_client_proto_rawDescGZIP(), []int{6} -} - -func (x *Relay) GetId() string { - if x != nil { - return x.Id - } - return "" -} - -func (x *Relay) GetAddresses() []*pbmodel.HostPort { - if x != nil { - return x.Addresses - } - return nil -} - -func (x *Relay) GetServerCertificate() []byte { - if x != nil { - return x.ServerCertificate - } - return nil -} - type DirectRelay struct { state protoimpl.MessageState `protogen:"open.v1"` Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // relay id as assigned by the control server @@ -466,7 +406,7 @@ type DirectRelay struct { func (x *DirectRelay) Reset() { *x = DirectRelay{} - mi := &file_client_proto_msgTypes[7] + mi := &file_client_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -478,7 +418,7 @@ func (x *DirectRelay) String() string { func (*DirectRelay) ProtoMessage() {} func (x *DirectRelay) ProtoReflect() protoreflect.Message { - mi := &file_client_proto_msgTypes[7] + mi := &file_client_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -491,7 +431,7 @@ func (x *DirectRelay) ProtoReflect() protoreflect.Message { // Deprecated: Use DirectRelay.ProtoReflect.Descriptor instead. func (*DirectRelay) Descriptor() ([]byte, []int) { - return file_client_proto_rawDescGZIP(), []int{7} + return file_client_proto_rawDescGZIP(), []int{6} } func (x *DirectRelay) GetId() string { @@ -540,7 +480,7 @@ type Request_Announce struct { func (x *Request_Announce) Reset() { *x = Request_Announce{} - mi := &file_client_proto_msgTypes[8] + mi := &file_client_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -552,7 +492,7 @@ func (x *Request_Announce) String() string { func (*Request_Announce) ProtoMessage() {} func (x *Request_Announce) ProtoReflect() protoreflect.Message { - mi := &file_client_proto_msgTypes[8] + mi := &file_client_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -600,7 +540,7 @@ type Request_Relay struct { func (x *Request_Relay) Reset() { *x = Request_Relay{} - mi := &file_client_proto_msgTypes[9] + mi := &file_client_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -612,7 +552,7 @@ func (x *Request_Relay) String() string { func (*Request_Relay) ProtoMessage() {} func (x *Request_Relay) ProtoReflect() protoreflect.Message { - mi := &file_client_proto_msgTypes[9] + mi := &file_client_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -658,7 +598,7 @@ type Response_Announce struct { func (x *Response_Announce) Reset() { *x = Response_Announce{} - mi := &file_client_proto_msgTypes[10] + mi := &file_client_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -670,7 +610,7 @@ func (x *Response_Announce) String() string { func (*Response_Announce) ProtoMessage() {} func (x *Response_Announce) ProtoReflect() protoreflect.Message { - mi := &file_client_proto_msgTypes[10] + mi := &file_client_proto_msgTypes[9] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -695,7 +635,6 @@ func (x *Response_Announce) GetPeers() []*RemotePeer { type Response_Relays struct { state protoimpl.MessageState `protogen:"open.v1"` - Relays []*Relay `protobuf:"bytes,1,rep,name=relays,proto3" json:"relays,omitempty"` Directs []*DirectRelay `protobuf:"bytes,2,rep,name=directs,proto3" json:"directs,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache @@ -703,7 +642,7 @@ type Response_Relays struct { func (x *Response_Relays) Reset() { *x = Response_Relays{} - mi := &file_client_proto_msgTypes[11] + mi := &file_client_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -715,7 +654,7 @@ func (x *Response_Relays) String() string { func (*Response_Relays) ProtoMessage() {} func (x *Response_Relays) ProtoReflect() protoreflect.Message { - mi := &file_client_proto_msgTypes[11] + mi := &file_client_proto_msgTypes[10] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -731,13 +670,6 @@ func (*Response_Relays) Descriptor() ([]byte, []int) { return file_client_proto_rawDescGZIP(), []int{3, 1} } -func (x *Response_Relays) GetRelays() []*Relay { - if x != nil { - return x.Relays - } - return nil -} - func (x *Response_Relays) GetDirects() []*DirectRelay { if x != nil { return x.Directs @@ -769,15 +701,14 @@ const file_client_proto_rawDesc = "" + "\x05Relay\x12+\n" + "\bendpoint\x18\x01 \x01(\v2\x0f.model.EndpointR\bendpoint\x12\x1f\n" + "\x04role\x18\x02 \x01(\x0e2\v.model.RoleR\x04role\x12-\n" + - "\x12client_certificate\x18\x03 \x01(\fR\x11clientCertificate\"\xaa\x02\n" + + "\x12client_certificate\x18\x03 \x01(\fR\x11clientCertificate\"\x83\x02\n" + "\bResponse\x12\"\n" + "\x05error\x18\x01 \x01(\v2\f.error.ErrorR\x05error\x125\n" + "\bannounce\x18\x02 \x01(\v2\x19.client.Response.AnnounceR\bannounce\x12-\n" + "\x05relay\x18\x03 \x01(\v2\x17.client.Response.RelaysR\x05relay\x1a4\n" + "\bAnnounce\x12(\n" + - "\x05peers\x18\x01 \x03(\v2\x12.client.RemotePeerR\x05peers\x1a^\n" + - "\x06Relays\x12%\n" + - "\x06relays\x18\x01 \x03(\v2\r.client.RelayR\x06relays\x12-\n" + + "\x05peers\x18\x01 \x03(\v2\x12.client.RemotePeerR\x05peers\x1a7\n" + + "\x06Relays\x12-\n" + "\adirects\x18\x02 \x03(\v2\x13.client.DirectRelayR\adirects\"\xac\x01\n" + "\x04Peer\x12)\n" + "\adirects\x18\x03 \x03(\v2\x0f.model.AddrPortR\adirects\x12\x1b\n" + @@ -788,11 +719,7 @@ const file_client_proto_rawDesc = "" + "RemotePeer\x12\x0e\n" + "\x02id\x18\x01 \x01(\tR\x02id\x12\x1a\n" + "\bmetadata\x18\t \x01(\tR\bmetadata\x12 \n" + - "\x04peer\x18\b \x01(\v2\f.client.PeerR\x04peer\"u\n" + - "\x05Relay\x12\x0e\n" + - "\x02id\x18\x03 \x01(\tR\x02id\x12-\n" + - "\taddresses\x18\x04 \x03(\v2\x0f.model.HostPortR\taddresses\x12-\n" + - "\x12server_certificate\x18\x02 \x01(\fR\x11serverCertificate\"\xbf\x01\n" + + "\x04peer\x18\b \x01(\v2\f.client.PeerR\x04peer\"\xbf\x01\n" + "\vDirectRelay\x12\x0e\n" + "\x02id\x18\x01 \x01(\tR\x02id\x12-\n" + "\taddresses\x18\x02 \x03(\v2\x0f.model.HostPortR\taddresses\x12-\n" + @@ -812,7 +739,7 @@ func file_client_proto_rawDescGZIP() []byte { return file_client_proto_rawDescData } -var file_client_proto_msgTypes = make([]protoimpl.MessageInfo, 12) +var file_client_proto_msgTypes = make([]protoimpl.MessageInfo, 11) var file_client_proto_goTypes = []any{ (*AuthenticateReq)(nil), // 0: client.AuthenticateReq (*AuthenticateResp)(nil), // 1: client.AuthenticateResp @@ -820,43 +747,40 @@ var file_client_proto_goTypes = []any{ (*Response)(nil), // 3: client.Response (*Peer)(nil), // 4: client.Peer (*RemotePeer)(nil), // 5: client.RemotePeer - (*Relay)(nil), // 6: client.Relay - (*DirectRelay)(nil), // 7: client.DirectRelay - (*Request_Announce)(nil), // 8: client.Request.Announce - (*Request_Relay)(nil), // 9: client.Request.Relay - (*Response_Announce)(nil), // 10: client.Response.Announce - (*Response_Relays)(nil), // 11: client.Response.Relays - (*pberror.Error)(nil), // 12: error.Error - (*pbmodel.AddrPort)(nil), // 13: model.AddrPort - (*pbmodel.HostPort)(nil), // 14: model.HostPort - (*pbmodel.Endpoint)(nil), // 15: model.Endpoint - (pbmodel.Role)(0), // 16: model.Role + (*DirectRelay)(nil), // 6: client.DirectRelay + (*Request_Announce)(nil), // 7: client.Request.Announce + (*Request_Relay)(nil), // 8: client.Request.Relay + (*Response_Announce)(nil), // 9: client.Response.Announce + (*Response_Relays)(nil), // 10: client.Response.Relays + (*pberror.Error)(nil), // 11: error.Error + (*pbmodel.AddrPort)(nil), // 12: model.AddrPort + (*pbmodel.HostPort)(nil), // 13: model.HostPort + (*pbmodel.Endpoint)(nil), // 14: model.Endpoint + (pbmodel.Role)(0), // 15: model.Role } var file_client_proto_depIdxs = []int32{ - 12, // 0: client.AuthenticateResp.error:type_name -> error.Error - 13, // 1: client.AuthenticateResp.public:type_name -> model.AddrPort - 8, // 2: client.Request.announce:type_name -> client.Request.Announce - 9, // 3: client.Request.relay:type_name -> client.Request.Relay - 12, // 4: client.Response.error:type_name -> error.Error - 10, // 5: client.Response.announce:type_name -> client.Response.Announce - 11, // 6: client.Response.relay:type_name -> client.Response.Relays - 13, // 7: client.Peer.directs:type_name -> model.AddrPort + 11, // 0: client.AuthenticateResp.error:type_name -> error.Error + 12, // 1: client.AuthenticateResp.public:type_name -> model.AddrPort + 7, // 2: client.Request.announce:type_name -> client.Request.Announce + 8, // 3: client.Request.relay:type_name -> client.Request.Relay + 11, // 4: client.Response.error:type_name -> error.Error + 9, // 5: client.Response.announce:type_name -> client.Response.Announce + 10, // 6: client.Response.relay:type_name -> client.Response.Relays + 12, // 7: client.Peer.directs:type_name -> model.AddrPort 4, // 8: client.RemotePeer.peer:type_name -> client.Peer - 14, // 9: client.Relay.addresses:type_name -> model.HostPort - 14, // 10: client.DirectRelay.addresses:type_name -> model.HostPort - 15, // 11: client.Request.Announce.endpoint:type_name -> model.Endpoint - 16, // 12: client.Request.Announce.role:type_name -> model.Role - 4, // 13: client.Request.Announce.peer:type_name -> client.Peer - 15, // 14: client.Request.Relay.endpoint:type_name -> model.Endpoint - 16, // 15: client.Request.Relay.role:type_name -> model.Role - 5, // 16: client.Response.Announce.peers:type_name -> client.RemotePeer - 6, // 17: client.Response.Relays.relays:type_name -> client.Relay - 7, // 18: client.Response.Relays.directs:type_name -> client.DirectRelay - 19, // [19:19] is the sub-list for method output_type - 19, // [19:19] is the sub-list for method input_type - 19, // [19:19] is the sub-list for extension type_name - 19, // [19:19] is the sub-list for extension extendee - 0, // [0:19] is the sub-list for field type_name + 13, // 9: client.DirectRelay.addresses:type_name -> model.HostPort + 14, // 10: client.Request.Announce.endpoint:type_name -> model.Endpoint + 15, // 11: client.Request.Announce.role:type_name -> model.Role + 4, // 12: client.Request.Announce.peer:type_name -> client.Peer + 14, // 13: client.Request.Relay.endpoint:type_name -> model.Endpoint + 15, // 14: client.Request.Relay.role:type_name -> model.Role + 5, // 15: client.Response.Announce.peers:type_name -> client.RemotePeer + 6, // 16: client.Response.Relays.directs:type_name -> client.DirectRelay + 17, // [17:17] is the sub-list for method output_type + 17, // [17:17] is the sub-list for method input_type + 17, // [17:17] is the sub-list for extension type_name + 17, // [17:17] is the sub-list for extension extendee + 0, // [0:17] is the sub-list for field type_name } func init() { file_client_proto_init() } @@ -870,7 +794,7 @@ func file_client_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_client_proto_rawDesc), len(file_client_proto_rawDesc)), NumEnums: 0, - NumMessages: 12, + NumMessages: 11, NumExtensions: 0, NumServices: 0, }, diff --git a/proto/pbrelay/relay.pb.go b/proto/pbrelay/relay.pb.go index ba9f6fe..f3fdc24 100644 --- a/proto/pbrelay/relay.pb.go +++ b/proto/pbrelay/relay.pb.go @@ -23,55 +23,6 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -type ChangeType int32 - -const ( - ChangeType_ChangeUnknown ChangeType = 0 - ChangeType_ChangePut ChangeType = 1 - ChangeType_ChangeDel ChangeType = 2 -) - -// Enum value maps for ChangeType. -var ( - ChangeType_name = map[int32]string{ - 0: "ChangeUnknown", - 1: "ChangePut", - 2: "ChangeDel", - } - ChangeType_value = map[string]int32{ - "ChangeUnknown": 0, - "ChangePut": 1, - "ChangeDel": 2, - } -) - -func (x ChangeType) Enum() *ChangeType { - p := new(ChangeType) - *p = x - return p -} - -func (x ChangeType) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (ChangeType) Descriptor() protoreflect.EnumDescriptor { - return file_relay_proto_enumTypes[0].Descriptor() -} - -func (ChangeType) Type() protoreflect.EnumType { - return &file_relay_proto_enumTypes[0] -} - -func (x ChangeType) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use ChangeType.Descriptor instead. -func (ChangeType) EnumDescriptor() ([]byte, []int) { - return file_relay_proto_rawDescGZIP(), []int{0} -} - type AuthenticateReq struct { state protoimpl.MessageState `protogen:"open.v1"` Token string `protobuf:"bytes,1,opt,name=token,proto3" json:"token,omitempty"` @@ -232,214 +183,6 @@ func (x *AuthenticateResp) GetControlAuthenticationKey() []byte { return nil } -type ClientsReq struct { - state protoimpl.MessageState `protogen:"open.v1"` - Offset int64 `protobuf:"varint,1,opt,name=offset,proto3" json:"offset,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *ClientsReq) Reset() { - *x = ClientsReq{} - mi := &file_relay_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *ClientsReq) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ClientsReq) ProtoMessage() {} - -func (x *ClientsReq) ProtoReflect() protoreflect.Message { - mi := &file_relay_proto_msgTypes[2] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ClientsReq.ProtoReflect.Descriptor instead. -func (*ClientsReq) Descriptor() ([]byte, []int) { - return file_relay_proto_rawDescGZIP(), []int{2} -} - -func (x *ClientsReq) GetOffset() int64 { - if x != nil { - return x.Offset - } - return 0 -} - -type ClientsResp struct { - state protoimpl.MessageState `protogen:"open.v1"` - Changes []*ClientsResp_Change `protobuf:"bytes,1,rep,name=changes,proto3" json:"changes,omitempty"` - Offset int64 `protobuf:"varint,2,opt,name=offset,proto3" json:"offset,omitempty"` - Restart bool `protobuf:"varint,3,opt,name=restart,proto3" json:"restart,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *ClientsResp) Reset() { - *x = ClientsResp{} - mi := &file_relay_proto_msgTypes[3] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *ClientsResp) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ClientsResp) ProtoMessage() {} - -func (x *ClientsResp) ProtoReflect() protoreflect.Message { - mi := &file_relay_proto_msgTypes[3] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ClientsResp.ProtoReflect.Descriptor instead. -func (*ClientsResp) Descriptor() ([]byte, []int) { - return file_relay_proto_rawDescGZIP(), []int{3} -} - -func (x *ClientsResp) GetChanges() []*ClientsResp_Change { - if x != nil { - return x.Changes - } - return nil -} - -func (x *ClientsResp) GetOffset() int64 { - if x != nil { - return x.Offset - } - return 0 -} - -func (x *ClientsResp) GetRestart() bool { - if x != nil { - return x.Restart - } - return false -} - -type ServersReq struct { - state protoimpl.MessageState `protogen:"open.v1"` - Offset int64 `protobuf:"varint,1,opt,name=offset,proto3" json:"offset,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *ServersReq) Reset() { - *x = ServersReq{} - mi := &file_relay_proto_msgTypes[4] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *ServersReq) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ServersReq) ProtoMessage() {} - -func (x *ServersReq) ProtoReflect() protoreflect.Message { - mi := &file_relay_proto_msgTypes[4] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ServersReq.ProtoReflect.Descriptor instead. -func (*ServersReq) Descriptor() ([]byte, []int) { - return file_relay_proto_rawDescGZIP(), []int{4} -} - -func (x *ServersReq) GetOffset() int64 { - if x != nil { - return x.Offset - } - return 0 -} - -type ServersResp struct { - state protoimpl.MessageState `protogen:"open.v1"` - Changes []*ServersResp_Change `protobuf:"bytes,1,rep,name=changes,proto3" json:"changes,omitempty"` - Offset int64 `protobuf:"varint,2,opt,name=offset,proto3" json:"offset,omitempty"` - Restart bool `protobuf:"varint,3,opt,name=restart,proto3" json:"restart,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *ServersResp) Reset() { - *x = ServersResp{} - mi := &file_relay_proto_msgTypes[5] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *ServersResp) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ServersResp) ProtoMessage() {} - -func (x *ServersResp) ProtoReflect() protoreflect.Message { - mi := &file_relay_proto_msgTypes[5] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ServersResp.ProtoReflect.Descriptor instead. -func (*ServersResp) Descriptor() ([]byte, []int) { - return file_relay_proto_rawDescGZIP(), []int{5} -} - -func (x *ServersResp) GetChanges() []*ServersResp_Change { - if x != nil { - return x.Changes - } - return nil -} - -func (x *ServersResp) GetOffset() int64 { - if x != nil { - return x.Offset - } - return 0 -} - -func (x *ServersResp) GetRestart() bool { - if x != nil { - return x.Restart - } - return false -} - type ClientAuthentication struct { state protoimpl.MessageState `protogen:"open.v1"` Endpoint *pbmodel.Endpoint `protobuf:"bytes,1,opt,name=endpoint,proto3" json:"endpoint,omitempty"` @@ -451,7 +194,7 @@ type ClientAuthentication struct { func (x *ClientAuthentication) Reset() { *x = ClientAuthentication{} - mi := &file_relay_proto_msgTypes[6] + mi := &file_relay_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -463,7 +206,7 @@ func (x *ClientAuthentication) String() string { func (*ClientAuthentication) ProtoMessage() {} func (x *ClientAuthentication) ProtoReflect() protoreflect.Message { - mi := &file_relay_proto_msgTypes[6] + mi := &file_relay_proto_msgTypes[2] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -476,7 +219,7 @@ func (x *ClientAuthentication) ProtoReflect() protoreflect.Message { // Deprecated: Use ClientAuthentication.ProtoReflect.Descriptor instead. func (*ClientAuthentication) Descriptor() ([]byte, []int) { - return file_relay_proto_rawDescGZIP(), []int{6} + return file_relay_proto_rawDescGZIP(), []int{2} } func (x *ClientAuthentication) GetEndpoint() *pbmodel.Endpoint { @@ -500,142 +243,6 @@ func (x *ClientAuthentication) GetCertificateKey() string { return "" } -type ClientsResp_Change struct { - state protoimpl.MessageState `protogen:"open.v1"` - Change ChangeType `protobuf:"varint,1,opt,name=change,proto3,enum=relay.ChangeType" json:"change,omitempty"` - Endpoint *pbmodel.Endpoint `protobuf:"bytes,2,opt,name=endpoint,proto3" json:"endpoint,omitempty"` - Role pbmodel.Role `protobuf:"varint,3,opt,name=role,proto3,enum=model.Role" json:"role,omitempty"` - CertificateKey string `protobuf:"bytes,4,opt,name=certificate_key,json=certificateKey,proto3" json:"certificate_key,omitempty"` - Certificate []byte `protobuf:"bytes,5,opt,name=certificate,proto3" json:"certificate,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *ClientsResp_Change) Reset() { - *x = ClientsResp_Change{} - mi := &file_relay_proto_msgTypes[7] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *ClientsResp_Change) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ClientsResp_Change) ProtoMessage() {} - -func (x *ClientsResp_Change) ProtoReflect() protoreflect.Message { - mi := &file_relay_proto_msgTypes[7] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ClientsResp_Change.ProtoReflect.Descriptor instead. -func (*ClientsResp_Change) Descriptor() ([]byte, []int) { - return file_relay_proto_rawDescGZIP(), []int{3, 0} -} - -func (x *ClientsResp_Change) GetChange() ChangeType { - if x != nil { - return x.Change - } - return ChangeType_ChangeUnknown -} - -func (x *ClientsResp_Change) GetEndpoint() *pbmodel.Endpoint { - if x != nil { - return x.Endpoint - } - return nil -} - -func (x *ClientsResp_Change) GetRole() pbmodel.Role { - if x != nil { - return x.Role - } - return pbmodel.Role(0) -} - -func (x *ClientsResp_Change) GetCertificateKey() string { - if x != nil { - return x.CertificateKey - } - return "" -} - -func (x *ClientsResp_Change) GetCertificate() []byte { - if x != nil { - return x.Certificate - } - return nil -} - -type ServersResp_Change struct { - state protoimpl.MessageState `protogen:"open.v1"` - Change ChangeType `protobuf:"varint,1,opt,name=change,proto3,enum=relay.ChangeType" json:"change,omitempty"` - Endpoint *pbmodel.Endpoint `protobuf:"bytes,2,opt,name=endpoint,proto3" json:"endpoint,omitempty"` - ServerCertificate []byte `protobuf:"bytes,3,opt,name=server_certificate,json=serverCertificate,proto3" json:"server_certificate,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache -} - -func (x *ServersResp_Change) Reset() { - *x = ServersResp_Change{} - mi := &file_relay_proto_msgTypes[8] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) -} - -func (x *ServersResp_Change) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*ServersResp_Change) ProtoMessage() {} - -func (x *ServersResp_Change) ProtoReflect() protoreflect.Message { - mi := &file_relay_proto_msgTypes[8] - if x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use ServersResp_Change.ProtoReflect.Descriptor instead. -func (*ServersResp_Change) Descriptor() ([]byte, []int) { - return file_relay_proto_rawDescGZIP(), []int{5, 0} -} - -func (x *ServersResp_Change) GetChange() ChangeType { - if x != nil { - return x.Change - } - return ChangeType_ChangeUnknown -} - -func (x *ServersResp_Change) GetEndpoint() *pbmodel.Endpoint { - if x != nil { - return x.Endpoint - } - return nil -} - -func (x *ServersResp_Change) GetServerCertificate() []byte { - if x != nil { - return x.ServerCertificate - } - return nil -} - var File_relay_proto protoreflect.FileDescriptor const file_relay_proto_rawDesc = "" + @@ -654,40 +261,11 @@ const file_relay_proto_rawDesc = "" + "\n" + "control_id\x18\x02 \x01(\tR\tcontrolId\x12'\n" + "\x0freconnect_token\x18\x03 \x01(\fR\x0ereconnectToken\x12<\n" + - "\x1acontrol_authentication_key\x18\x04 \x01(\fR\x18controlAuthenticationKey\"$\n" + - "\n" + - "ClientsReq\x12\x16\n" + - "\x06offset\x18\x01 \x01(\x03R\x06offset\"\xc3\x02\n" + - "\vClientsResp\x123\n" + - "\achanges\x18\x01 \x03(\v2\x19.relay.ClientsResp.ChangeR\achanges\x12\x16\n" + - "\x06offset\x18\x02 \x01(\x03R\x06offset\x12\x18\n" + - "\arestart\x18\x03 \x01(\bR\arestart\x1a\xcc\x01\n" + - "\x06Change\x12)\n" + - "\x06change\x18\x01 \x01(\x0e2\x11.relay.ChangeTypeR\x06change\x12+\n" + - "\bendpoint\x18\x02 \x01(\v2\x0f.model.EndpointR\bendpoint\x12\x1f\n" + - "\x04role\x18\x03 \x01(\x0e2\v.model.RoleR\x04role\x12'\n" + - "\x0fcertificate_key\x18\x04 \x01(\tR\x0ecertificateKey\x12 \n" + - "\vcertificate\x18\x05 \x01(\fR\vcertificate\"$\n" + - "\n" + - "ServersReq\x12\x16\n" + - "\x06offset\x18\x01 \x01(\x03R\x06offset\"\x86\x02\n" + - "\vServersResp\x123\n" + - "\achanges\x18\x01 \x03(\v2\x19.relay.ServersResp.ChangeR\achanges\x12\x16\n" + - "\x06offset\x18\x02 \x01(\x03R\x06offset\x12\x18\n" + - "\arestart\x18\x03 \x01(\bR\arestart\x1a\x8f\x01\n" + - "\x06Change\x12)\n" + - "\x06change\x18\x01 \x01(\x0e2\x11.relay.ChangeTypeR\x06change\x12+\n" + - "\bendpoint\x18\x02 \x01(\v2\x0f.model.EndpointR\bendpoint\x12-\n" + - "\x12server_certificate\x18\x03 \x01(\fR\x11serverCertificate\"\x8d\x01\n" + + "\x1acontrol_authentication_key\x18\x04 \x01(\fR\x18controlAuthenticationKey\"\x8d\x01\n" + "\x14ClientAuthentication\x12+\n" + "\bendpoint\x18\x01 \x01(\v2\x0f.model.EndpointR\bendpoint\x12\x1f\n" + "\x04role\x18\x02 \x01(\x0e2\v.model.RoleR\x04role\x12'\n" + - "\x0fcertificate_key\x18\x03 \x01(\tR\x0ecertificateKey*=\n" + - "\n" + - "ChangeType\x12\x11\n" + - "\rChangeUnknown\x10\x00\x12\r\n" + - "\tChangePut\x10\x01\x12\r\n" + - "\tChangeDel\x10\x02B,Z*github.com/connet-dev/connet/proto/pbrelayb\x06proto3" + "\x0fcertificate_key\x18\x03 \x01(\tR\x0ecertificateKeyB,Z*github.com/connet-dev/connet/proto/pbrelayb\x06proto3" var ( file_relay_proto_rawDescOnce sync.Once @@ -701,41 +279,26 @@ func file_relay_proto_rawDescGZIP() []byte { return file_relay_proto_rawDescData } -var file_relay_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_relay_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_relay_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_relay_proto_goTypes = []any{ - (ChangeType)(0), // 0: relay.ChangeType - (*AuthenticateReq)(nil), // 1: relay.AuthenticateReq - (*AuthenticateResp)(nil), // 2: relay.AuthenticateResp - (*ClientsReq)(nil), // 3: relay.ClientsReq - (*ClientsResp)(nil), // 4: relay.ClientsResp - (*ServersReq)(nil), // 5: relay.ServersReq - (*ServersResp)(nil), // 6: relay.ServersResp - (*ClientAuthentication)(nil), // 7: relay.ClientAuthentication - (*ClientsResp_Change)(nil), // 8: relay.ClientsResp.Change - (*ServersResp_Change)(nil), // 9: relay.ServersResp.Change - (*pbmodel.HostPort)(nil), // 10: model.HostPort - (*pberror.Error)(nil), // 11: error.Error - (*pbmodel.Endpoint)(nil), // 12: model.Endpoint - (pbmodel.Role)(0), // 13: model.Role + (*AuthenticateReq)(nil), // 0: relay.AuthenticateReq + (*AuthenticateResp)(nil), // 1: relay.AuthenticateResp + (*ClientAuthentication)(nil), // 2: relay.ClientAuthentication + (*pbmodel.HostPort)(nil), // 3: model.HostPort + (*pberror.Error)(nil), // 4: error.Error + (*pbmodel.Endpoint)(nil), // 5: model.Endpoint + (pbmodel.Role)(0), // 6: model.Role } var file_relay_proto_depIdxs = []int32{ - 10, // 0: relay.AuthenticateReq.addresses:type_name -> model.HostPort - 11, // 1: relay.AuthenticateResp.error:type_name -> error.Error - 8, // 2: relay.ClientsResp.changes:type_name -> relay.ClientsResp.Change - 9, // 3: relay.ServersResp.changes:type_name -> relay.ServersResp.Change - 12, // 4: relay.ClientAuthentication.endpoint:type_name -> model.Endpoint - 13, // 5: relay.ClientAuthentication.role:type_name -> model.Role - 0, // 6: relay.ClientsResp.Change.change:type_name -> relay.ChangeType - 12, // 7: relay.ClientsResp.Change.endpoint:type_name -> model.Endpoint - 13, // 8: relay.ClientsResp.Change.role:type_name -> model.Role - 0, // 9: relay.ServersResp.Change.change:type_name -> relay.ChangeType - 12, // 10: relay.ServersResp.Change.endpoint:type_name -> model.Endpoint - 11, // [11:11] is the sub-list for method output_type - 11, // [11:11] is the sub-list for method input_type - 11, // [11:11] is the sub-list for extension type_name - 11, // [11:11] is the sub-list for extension extendee - 0, // [0:11] is the sub-list for field type_name + 3, // 0: relay.AuthenticateReq.addresses:type_name -> model.HostPort + 4, // 1: relay.AuthenticateResp.error:type_name -> error.Error + 5, // 2: relay.ClientAuthentication.endpoint:type_name -> model.Endpoint + 6, // 3: relay.ClientAuthentication.role:type_name -> model.Role + 4, // [4:4] is the sub-list for method output_type + 4, // [4:4] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name } func init() { file_relay_proto_init() } @@ -748,14 +311,13 @@ func file_relay_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_relay_proto_rawDesc), len(file_relay_proto_rawDesc)), - NumEnums: 1, - NumMessages: 9, + NumEnums: 0, + NumMessages: 3, NumExtensions: 0, NumServices: 0, }, GoTypes: file_relay_proto_goTypes, DependencyIndexes: file_relay_proto_depIdxs, - EnumInfos: file_relay_proto_enumTypes, MessageInfos: file_relay_proto_msgTypes, }.Build() File_relay_proto = out.File diff --git a/proto/relay.proto b/proto/relay.proto index 836aa9e..a303f9a 100644 --- a/proto/relay.proto +++ b/proto/relay.proto @@ -23,46 +23,6 @@ message AuthenticateResp { bytes control_authentication_key = 4; } -enum ChangeType { - ChangeUnknown = 0; - ChangePut = 1; - ChangeDel = 2; -} - -message ClientsReq { - int64 offset = 1; -} - -message ClientsResp { - repeated Change changes = 1; - int64 offset = 2; - bool restart = 3; - - message Change { - ChangeType change = 1; - model.Endpoint endpoint = 2; - model.Role role = 3; - string certificate_key = 4; - bytes certificate = 5; - } -} - -message ServersReq { - int64 offset = 1; -} - -message ServersResp { - repeated Change changes = 1; - int64 offset = 2; - bool restart = 3; - - message Change { - ChangeType change = 1; - model.Endpoint endpoint = 2; - bytes server_certificate = 3; - } -} - message ClientAuthentication { model.Endpoint endpoint = 1; model.Role role = 2; diff --git a/relay.go b/relay.go index cac3327..958f8f0 100644 --- a/relay.go +++ b/relay.go @@ -17,7 +17,6 @@ import ( "github.com/connet-dev/connet/pkg/slogc" "github.com/connet-dev/connet/proto" "github.com/connet-dev/connet/proto/pbclientrelay" - "github.com/connet-dev/connet/proto/pbconnect" "github.com/connet-dev/connet/proto/pberror" "github.com/quic-go/quic-go" ) @@ -109,62 +108,28 @@ func (r *relay) connect(ctx context.Context, hp model.HostPort) (*quic.Conn, err cfg := r.serverConf.Load() - nextProtos := iterc.MapVarStrings(model.ConnectRelayV02, model.ConnectRelayV01) - if cfg.auth == nil { - nextProtos = iterc.MapVarStrings(model.ConnectRelayV01) - } - - r.logger.Debug("dialing relay", "addr", addr, "server", cfg.tls.name, "cert", cfg.tls.key, "protos", nextProtos) + r.logger.Debug("dialing relay", "addr", addr, "server", cfg.tls.name, "cert", cfg.tls.key) conn, err := r.local.direct.transport.Dial(ctx, addr, &tls.Config{ Certificates: []tls.Certificate{r.local.clientCert}, RootCAs: cfg.tls.cas, ServerName: cfg.tls.name, - NextProtos: nextProtos, + NextProtos: iterc.MapVarStrings(model.ConnectRelayV02), }, quicc.ClientConfig(r.local.direct.handshakeIdleTimeout)) if err != nil { return nil, err } - protocol := model.GetConnectRelayNextProto(conn) - if protocol == model.ConnectRelayV01 { - if err := r.check(ctx, conn); err != nil { - cerr := conn.CloseWithError(quic.ApplicationErrorCode(pberror.Code_ConnectionCheckFailed), "connection check failed") - return nil, errors.Join(err, cerr) - } - } else { - if err := r.authenticate(ctx, conn, cfg.auth); err != nil { - if perr := pberror.GetError(err); perr != nil { - cerr := conn.CloseWithError(quic.ApplicationErrorCode(perr.Code), perr.Message) - return nil, errors.Join(perr, cerr) - } - cerr := conn.CloseWithError(quic.ApplicationErrorCode(pberror.Code_ConnectionCheckFailed), "connection check failed") - return nil, errors.Join(err, cerr) + if err := r.authenticate(ctx, conn, cfg.auth); err != nil { + if perr := pberror.GetError(err); perr != nil { + cerr := conn.CloseWithError(quic.ApplicationErrorCode(perr.Code), perr.Message) + return nil, errors.Join(perr, cerr) } + cerr := conn.CloseWithError(quic.ApplicationErrorCode(pberror.Code_ConnectionCheckFailed), "connection check failed") + return nil, errors.Join(err, cerr) } return conn, nil } -func (r *relay) check(ctx context.Context, conn *quic.Conn) error { - stream, err := conn.OpenStreamSync(ctx) - if err != nil { - return err - } - defer func() { - if err := stream.Close(); err != nil { - slogc.Fine(r.logger, "error closing check stream", "err", err) - } - }() - - if err := proto.Write(stream, &pbconnect.Request{}); err != nil { - return err - } - if _, err := pbconnect.ReadResponse(stream); err != nil { - return err - } - - return nil -} - func (r *relay) authenticate(ctx context.Context, conn *quic.Conn, auth []byte) error { stream, err := conn.OpenStreamSync(ctx) if err != nil { diff --git a/server/control/clients.go b/server/control/clients.go index e23b9fe..f1eb310 100644 --- a/server/control/clients.go +++ b/server/control/clients.go @@ -17,7 +17,6 @@ import ( "github.com/connet-dev/connet/model" "github.com/connet-dev/connet/pkg/iterc" "github.com/connet-dev/connet/pkg/logc" - "github.com/connet-dev/connet/pkg/notify" "github.com/connet-dev/connet/pkg/quicc" "github.com/connet-dev/connet/pkg/reliable" "github.com/connet-dev/connet/pkg/slogc" @@ -43,8 +42,6 @@ type ClientAuthenticator interface { type ClientAuthentication []byte type ClientRelays interface { - Client(ctx context.Context, endpoint model.Endpoint, role model.Role, cert *x509.Certificate, auth ClientAuthentication, - notify func(map[RelayID]relayCacheValue) error) error Directs(ctx context.Context, endpoint model.Endpoint, role model.Role, cert *x509.Certificate, auth ClientAuthentication, notify func(map[RelayID]*pbclient.DirectRelay) error) error } @@ -712,72 +709,17 @@ func (s *clientStream) relay(ctx context.Context, req *pbclient.Request_Relay) e g := reliable.NewGroup(ctx) g.Go(quicc.CancelStream(s.stream)) - relaysResp := notify.NewEmpty[*pbclient.Response_Relays]() g.Go(func(ctx context.Context) error { - defer s.conn.logger.Debug("completed relay write") - return relaysResp.Listen(ctx, func(resp *pbclient.Response_Relays) error { - directIds := map[string]struct{}{} - for _, d := range resp.Directs { - directIds[d.Id] = struct{}{} - } - - resp = &pbclient.Response_Relays{ - Relays: iterc.FilterSlice(resp.Relays, func(r *pbclient.Relay) bool { - _, ok := directIds[r.Id] - return !ok - }), - Directs: resp.Directs, - } - - s.conn.logger.Debug("updated all relay list", "relays", len(resp.Relays), "directs", len(resp.Directs)) - if err := proto.Write(s.stream, &pbclient.Response{ - Relay: resp, - }); err != nil { - return fmt.Errorf("client relay response: %w", err) - } - return nil - }) - }) - - g.Go(func(ctx context.Context) error { - defer s.conn.logger.Debug("completed relay notify") - return s.conn.server.relays.Client(ctx, endpoint, role, clientCert, s.conn.auth, func(relays map[RelayID]relayCacheValue) error { - s.conn.logger.Debug("updated relay list", "relays", len(relays)) - - var addrs []*pbclient.Relay - for id, value := range relays { - addrs = append(addrs, &pbclient.Relay{ - Id: id.string, - Addresses: model.PBsFromHostPorts(value.Hostports), - ServerCertificate: value.Cert.Raw, - }) - } - - relaysResp.Update(func(resp *pbclient.Response_Relays) *pbclient.Response_Relays { - return &pbclient.Response_Relays{ - Relays: addrs, - Directs: resp.GetDirects(), - } - }) - return nil - }) - }) - - g.Go(func(ctx context.Context) error { - if s.conn.protocol == model.ClientControlV02 { - // old clients don't support direct relays - return nil - } - defer s.conn.logger.Debug("completed direct relay notify") return s.conn.server.relays.Directs(ctx, endpoint, role, clientCert, s.conn.auth, func(relays map[RelayID]*pbclient.DirectRelay) error { s.conn.logger.Debug("updated direct relay list", "relays", len(relays)) - relaysResp.Update(func(resp *pbclient.Response_Relays) *pbclient.Response_Relays { - return &pbclient.Response_Relays{ - Relays: resp.GetRelays(), + if err := proto.Write(s.stream, &pbclient.Response{ + Relay: &pbclient.Response_Relays{ Directs: slices.Collect(maps.Values(relays)), - } - }) + }, + }); err != nil { + return fmt.Errorf("client relay response: %w", err) + } return nil }) }) diff --git a/server/control/relays.go b/server/control/relays.go index a481ed6..84c58f1 100644 --- a/server/control/relays.go +++ b/server/control/relays.go @@ -49,41 +49,6 @@ func newRelayServer( stores Stores, logger *slog.Logger, ) (*relayServer, error) { - conns, err := stores.RelayConns() - if err != nil { - return nil, fmt.Errorf("relay conns store open: %w", err) - } - - clients, err := stores.RelayClients() - if err != nil { - return nil, fmt.Errorf("relay clients store open: %w", err) - } - - servers, err := stores.RelayServers() - if err != nil { - return nil, fmt.Errorf("relay servers store open: %w", err) - } - - serverOffsets, err := stores.RelayServerOffsets() - if err != nil { - return nil, fmt.Errorf("relay server offsets store open: %w", err) - } - - endpointsMsgs, endpointsOffset, err := servers.Snapshot() - if err != nil { - return nil, fmt.Errorf("relay servers snapshot: %w", err) - } - - endpointsCache := map[model.Endpoint]map[RelayID]relayCacheValue{} - for _, msg := range endpointsMsgs { - srv := endpointsCache[msg.Key.Endpoint] - if srv == nil { - srv = map[RelayID]relayCacheValue{} - endpointsCache[msg.Key.Endpoint] = srv - } - srv[msg.Key.RelayID] = relayCacheValue{Hostports: msg.Value.Hostports, Cert: msg.Value.Cert} - } - directs, err := stores.RelayDirects() if err != nil { return nil, fmt.Errorf("relay directs store open: %w", err) @@ -147,15 +112,6 @@ func newRelayServer( reconnect: &reconnectToken{[32]byte(serverSecret.Bytes)}, - stores: stores, - conns: conns, - clients: clients, - servers: servers, - serverOffsets: serverOffsets, - - endpointsCache: endpointsCache, - endpointsOffset: endpointsOffset, - directs: directs, directsCache: directsCache, directsOffset: directsOffset, @@ -172,16 +128,6 @@ type relayServer struct { reconnect *reconnectToken - stores Stores - conns logc.KV[RelayConnKey, RelayConnValue] - clients logc.KV[RelayClientKey, RelayClientValue] - servers logc.KV[RelayServerKey, RelayServerValue] - serverOffsets logc.KV[RelayConnKey, int64] - - endpointsCache map[model.Endpoint]map[RelayID]relayCacheValue - endpointsOffset int64 - endpointsMu sync.RWMutex - directs logc.KV[RelayConnKey, RelayDirectValue] directsCache map[RelayID]directRelay directsOffset int64 @@ -194,71 +140,6 @@ type directRelay struct { template *pbclient.DirectRelay } -func (s *relayServer) getEndpoint(endpoint model.Endpoint) (map[RelayID]relayCacheValue, int64) { - s.endpointsMu.RLock() - defer s.endpointsMu.RUnlock() - - return maps.Clone(s.endpointsCache[endpoint]), s.endpointsOffset -} - -func (s *relayServer) Client(ctx context.Context, endpoint model.Endpoint, role model.Role, cert *x509.Certificate, auth ClientAuthentication, - notifyFn func(map[RelayID]relayCacheValue) error) error { - - key := RelayClientKey{Endpoint: endpoint, Role: role, Key: model.NewKey(cert)} - val := RelayClientValue{Cert: cert, Authentication: auth} - if err := s.clients.Put(key, val); err != nil { - return err - } - defer func() { - if err := s.clients.Del(key); err != nil { - s.logger.Warn("failed to delete client", "key", key, "err", err) - } - }() - - return s.listen(ctx, endpoint, notifyFn) -} - -func (s *relayServer) listen(ctx context.Context, endpoint model.Endpoint, - notifyFn func(map[RelayID]relayCacheValue) error) error { - - servers, offset := s.getEndpoint(endpoint) - if err := notifyFn(servers); err != nil { - return err - } - - for { - msgs, nextOffset, err := s.servers.Consume(ctx, offset) - if err != nil { - return err - } - - var changed bool - for _, msg := range msgs { - if msg.Key.Endpoint != endpoint { - continue - } - - if msg.Delete { - delete(servers, msg.Key.RelayID) - } else { - if servers == nil { - servers = map[RelayID]relayCacheValue{} - } - servers[msg.Key.RelayID] = relayCacheValue{Hostports: msg.Value.Hostports, Cert: msg.Value.Cert} - } - changed = true - } - - offset = nextOffset - - if changed { - if err := notifyFn(servers); err != nil { - return err - } - } - } -} - func (s *relayServer) cachedDirects() (map[RelayID]directRelay, int64) { s.directsMu.RLock() defer s.directsMu.RUnlock() @@ -343,13 +224,9 @@ func (s *relayServer) run(ctx context.Context) error { for _, ingress := range s.ingresses { g.Go(reliable.Bind(ingress, s.runListener)) } - g.Go(s.runEndpointsCache) g.Go(s.runDirectsCache) - g.Go(logc.ScheduleCompact(s.conns)) - g.Go(logc.ScheduleCompact(s.clients)) - g.Go(logc.ScheduleCompact(s.servers)) - g.Go(logc.ScheduleCompact(s.serverOffsets)) + g.Go(logc.ScheduleCompact(s.directs)) return g.Wait() } @@ -376,7 +253,7 @@ func (s *relayServer) runListener(ctx context.Context, ingress Ingress) error { tlsConf := ingress.TLS.Clone() if len(tlsConf.NextProtos) == 0 { - tlsConf.NextProtos = iterc.MapVarStrings(model.RelayControlV03, model.RelayControlV02) + tlsConf.NextProtos = iterc.MapVarStrings(model.RelayControlV03) } quicConf := quicc.ServerConfig() @@ -417,48 +294,6 @@ func (s *relayServer) runListener(ctx context.Context, ingress Ingress) error { } } -func (s *relayServer) runEndpointsCache(ctx context.Context) error { - update := func(msg logc.Message[RelayServerKey, RelayServerValue]) { - s.endpointsMu.Lock() - defer s.endpointsMu.Unlock() - - srv := s.endpointsCache[msg.Key.Endpoint] - if msg.Delete { - delete(srv, msg.Key.RelayID) - if len(srv) == 0 { - delete(s.endpointsCache, msg.Key.Endpoint) - } - } else { - if srv == nil { - srv = map[RelayID]relayCacheValue{} - s.endpointsCache[msg.Key.Endpoint] = srv - } - srv[msg.Key.RelayID] = relayCacheValue{Hostports: msg.Value.Hostports, Cert: msg.Value.Cert} - } - - s.endpointsOffset = msg.Offset + 1 - } - - for { - s.endpointsMu.RLock() - offset := s.endpointsOffset - s.endpointsMu.RUnlock() - - msgs, nextOffset, err := s.servers.Consume(ctx, offset) - if err != nil { - return fmt.Errorf("relay servers consume: %w", err) - } - - for _, msg := range msgs { - update(msg) - } - - s.endpointsMu.Lock() - s.endpointsOffset = nextOffset - s.endpointsMu.Unlock() - } -} - func (s *relayServer) runDirectsCache(ctx context.Context) error { update := func(msg logc.Message[RelayConnKey, RelayDirectValue]) { s.directsMu.Lock() @@ -502,28 +337,11 @@ func (s *relayServer) runDirectsCache(ctx context.Context) error { } } -func (s *relayServer) getRelayServerOffset(id RelayID) (int64, error) { - offset, err := s.serverOffsets.Get(RelayConnKey{id}) - switch { - case errors.Is(err, logc.ErrNotFound): - return logc.OffsetOldest, nil - case err != nil: - return logc.OffsetInvalid, err - default: - return offset, nil - } -} - -func (s *relayServer) setRelayServerOffset(id RelayID, offset int64) error { - return s.serverOffsets.Put(RelayConnKey{id}, offset) -} - type relayConn struct { server *relayServer conn *quic.Conn logger *slog.Logger - endpoints logc.KV[RelayEndpointKey, RelayEndpointValue] relayConnAuth } @@ -568,48 +386,18 @@ func (c *relayConn) runErr(ctx context.Context) error { c.logger.Info("relay connected", "addr", c.conn.RemoteAddr(), "metadata", c.metadata) defer c.logger.Info("relay disconnected", "addr", c.conn.RemoteAddr(), "metadata", c.metadata) - endpoints, err := c.server.stores.RelayEndpoints(c.id) - if err != nil { - return err - } - defer func() { - if err := endpoints.Close(); err != nil { - c.logger.Warn("failed to close endpoints store", "id", c.id, "err", err) - } - }() - c.endpoints = endpoints - key := RelayConnKey{ID: c.id} - value := RelayConnValue{c.auth, c.hostports, c.metadata} - if err := c.server.conns.Put(key, value); err != nil { + value := RelayDirectValue{c.auth, c.hostports, c.metadata, c.certificate, c.authSignKey} + if err := c.server.directs.Put(key, value); err != nil { return err } defer func() { - if err := c.server.conns.Del(key); err != nil { + if err := c.server.directs.Del(key); err != nil { c.logger.Warn("failed to delete conn", "key", key, "err", err) } }() - if c.protocol == model.RelayControlV03 { - key := RelayConnKey{ID: c.id} - value := RelayDirectValue{c.auth, c.hostports, c.metadata, c.certificate, c.authSignKey} - if err := c.server.directs.Put(key, value); err != nil { - return err - } - defer func() { - if err := c.server.directs.Del(key); err != nil { - c.logger.Warn("failed to delete conn", "key", key, "err", err) - } - }() - } - - return reliable.RunGroup(ctx, - c.runRelayClients, - c.runRelayEndpoints, - c.runRelayServers, - logc.ScheduleCompactAcc(c.endpoints), - func(ctx context.Context) error { return quicc.WaitLogRTTStats(ctx, c.conn, c.logger) }, - ) + return quicc.WaitLogRTTStats(ctx, c.conn, c.logger) // v0.14.0 rotate secrets? } func (c *relayConn) authenticate(ctx context.Context) (*relayConnAuth, error) { @@ -629,55 +417,7 @@ func (c *relayConn) authenticate(ctx context.Context) (*relayConnAuth, error) { return nil, fmt.Errorf("auth read request: %w", err) } - switch model.GetRelayControlNextProto(c.conn) { - case model.RelayControlV02: - return c.authenticateV2(authStream, req) - default: - return c.authenticateV3(authStream, req) - } -} - -func (c *relayConn) authenticateV2(authStream *quic.Stream, req *pbrelay.AuthenticateReq) (*relayConnAuth, error) { - auth, err := c.server.auth.Authenticate(RelayAuthenticateRequest{ - Proto: model.RelayControlV02, - Token: req.Token, - Addr: c.conn.RemoteAddr(), - BuildVersion: req.BuildVersion, - }) - if err != nil { - perr := pberror.GetError(err) - if perr == nil { - perr = pberror.NewError(pberror.Code_AuthenticationFailed, "authentication failed: %v", err) - } - if err := proto.Write(authStream, &pbrelay.AuthenticateResp{Error: perr}); err != nil { - return nil, fmt.Errorf("relay auth err write: %w", err) - } - return nil, fmt.Errorf("auth failed: %w", perr) - } - - var id RelayID - if sid, err := c.server.reconnect.openRelayID(req.ReconnectToken); err != nil { - c.logger.Debug("decode failed", "err", err) - id = NewRelayID() - } else { - id = sid - } - - retoken, err := c.server.reconnect.sealRelayID(id) - if err != nil { - c.logger.Debug("encrypting failed", "err", err) - retoken = nil - } - if err := proto.Write(authStream, &pbrelay.AuthenticateResp{ - ControlId: c.server.id, - ReconnectToken: retoken, - }); err != nil { - return nil, fmt.Errorf("auth write response: %w", err) - } - - c.logger.Debug("authentication completed", "local", c.conn.LocalAddr(), "remote", c.conn.RemoteAddr(), "proto", model.RelayControlV02, "build", req.BuildVersion) - hostports := model.HostPortFromPBs(req.Addresses) - return &relayConnAuth{id, auth, hostports, req.Metadata, model.RelayControlV02, nil, nil}, nil + return c.authenticateV3(authStream, req) } func (c *relayConn) authenticateV3(authStream *quic.Stream, req *pbrelay.AuthenticateReq) (*relayConnAuth, error) { @@ -756,180 +496,3 @@ func (c *relayConn) authenticateV3(authStream *quic.Stream, req *pbrelay.Authent hostports := model.HostPortFromPBs(req.Addresses) return &relayConnAuth{id, auth, hostports, req.Metadata, protocol, cert, sharedKey}, nil } - -func (c *relayConn) runRelayClients(ctx context.Context) error { - stream, err := c.conn.AcceptStream(ctx) - if err != nil { - return err - } - defer func() { - if err := stream.Close(); err != nil { - slogc.Fine(c.logger, "error closing relay clients", "err", err) - } - }() - - for { - req := &pbrelay.ClientsReq{} - if err := proto.Read(stream, req); err != nil { - return err - } - - var msgs []logc.Message[RelayClientKey, RelayClientValue] - var nextOffset int64 - if req.Offset == logc.OffsetOldest { - msgs, nextOffset, err = c.server.clients.Snapshot() - c.logger.Debug("sending initial relay changes", "offset", nextOffset, "changes", len(msgs)) - } else { - msgs, nextOffset, err = c.server.clients.Consume(ctx, req.Offset) - c.logger.Debug("sending delta relay changes", "offset", nextOffset, "changes", len(msgs)) - } - if err != nil { - return err - } - - // if len(msgs) == 0 && offset >= 0 && offset < nextOffset { - // TODO we are too far off and potentially have missed messages - // } - - resp := &pbrelay.ClientsResp{Offset: nextOffset} - - for _, msg := range msgs { - ok, err := c.server.auth.Allow(c.auth, msg.Value.Authentication, msg.Key.Endpoint) - switch { - case err != nil: - return err - case !ok: - continue - } - - change := &pbrelay.ClientsResp_Change{ - Endpoint: msg.Key.Endpoint.PB(), - Role: msg.Key.Role.PB(), - CertificateKey: msg.Key.Key.String(), - } - - if msg.Delete { - change.Change = pbrelay.ChangeType_ChangeDel - } else { - change.Change = pbrelay.ChangeType_ChangePut - change.Certificate = msg.Value.Cert.Raw - } - - resp.Changes = append(resp.Changes, change) - } - - if err := proto.Write(stream, resp); err != nil { - return err - } - } -} - -func (c *relayConn) runRelayServers(ctx context.Context) error { - stream, err := c.conn.OpenStreamSync(ctx) - if err != nil { - return err - } - defer func() { - if err := stream.Close(); err != nil { - slogc.Fine(c.logger, "error closing relay servers", "err", err) - } - }() - - for { - offset, err := c.server.getRelayServerOffset(c.id) - if err != nil { - return err - } - - req := &pbrelay.ServersReq{ - Offset: offset, - } - if err := proto.Write(stream, req); err != nil { - return err - } - - resp := &pbrelay.ServersResp{} - if err := proto.Read(stream, resp); err != nil { - return err - } - - for _, change := range resp.Changes { - key := RelayEndpointKey{Endpoint: model.EndpointFromPB(change.Endpoint)} - - switch change.Change { - case pbrelay.ChangeType_ChangePut: - cert, err := x509.ParseCertificate(change.ServerCertificate) - if err != nil { - return err - } - value := RelayEndpointValue{Cert: cert} - if err := c.endpoints.Put(key, value); err != nil { - return err - } - case pbrelay.ChangeType_ChangeDel: - if err := c.endpoints.Del(key); err != nil { - return err - } - default: - return fmt.Errorf("unknown change: %v", change.Change) - } - } - - if err := c.server.setRelayServerOffset(c.id, resp.Offset); err != nil { - return err - } - } -} - -func (c *relayConn) runRelayEndpoints(ctx context.Context) error { - initialMsgs, offset, err := c.endpoints.Snapshot() - if err != nil { - return err - } - - for _, msg := range initialMsgs { - key := RelayServerKey{Endpoint: msg.Key.Endpoint, RelayID: c.id} - value := RelayServerValue{Hostports: c.hostports, Cert: msg.Value.Cert} - if err := c.server.servers.Put(key, value); err != nil { - return err - } - } - - defer func() { - msgs, _, err := c.endpoints.Snapshot() - if err != nil { - c.logger.Warn("cannot snapshot endpoint", "err", err) - return - } - - for _, msg := range msgs { - key := RelayServerKey{Endpoint: msg.Key.Endpoint, RelayID: c.id} - if err := c.server.servers.Del(key); err != nil { - c.logger.Warn("cannot delete endpoint", "key", key, "err", err) - } - } - }() - - for { - msgs, nextOffset, err := c.endpoints.Consume(ctx, offset) - if err != nil { - return err - } - - for _, msg := range msgs { - key := RelayServerKey{Endpoint: msg.Key.Endpoint, RelayID: c.id} - if msg.Delete { - if err := c.server.servers.Del(key); err != nil { - return err - } - } else { - value := RelayServerValue{Hostports: c.hostports, Cert: msg.Value.Cert} - if err := c.server.servers.Put(key, value); err != nil { - return err - } - } - } - - offset = nextOffset - } -} diff --git a/server/control/secrets.go b/server/control/secrets.go index 317b89c..d17a51e 100644 --- a/server/control/secrets.go +++ b/server/control/secrets.go @@ -2,7 +2,6 @@ package control import ( "crypto/rand" - "encoding/binary" "errors" "fmt" "io" @@ -49,9 +48,6 @@ func (s *reconnectToken) openClientID(encryptedID []byte) (ClientID, error) { if err != nil { return ClientIDNil, err } - if len(data) == 20 { - return ClientID{formatBase62(data)}, nil - } return ClientID{string(data)}, nil } @@ -64,59 +60,5 @@ func (s *reconnectToken) openRelayID(encryptedID []byte) (RelayID, error) { if err != nil { return RelayIDNil, err } - if len(data) == 20 { - return RelayID{formatBase62(data)}, nil - } return RelayID{string(data)}, nil } - -// TODO copied from ksuid, remove in v0.14.0 -const base62Characters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" -const zeroString = "000000000000000000000000000" - -func formatBase62(src []byte) string { - var dst = make([]byte, 27) - const srcBase = 4294967296 - const dstBase = 62 - - // Split src into 5 4-byte words, this is where most of the efficiency comes - // from because this is a O(N^2) algorithm, and we make N = N / 4 by working - // on 32 bits at a time. - parts := [5]uint32{ - binary.BigEndian.Uint32(src[0:4]), - binary.BigEndian.Uint32(src[4:8]), - binary.BigEndian.Uint32(src[8:12]), - binary.BigEndian.Uint32(src[12:16]), - binary.BigEndian.Uint32(src[16:20]), - } - - n := len(dst) - bp := parts[:] - bq := [5]uint32{} - - for len(bp) != 0 { - quotient := bq[:0] - remainder := uint64(0) - - for _, c := range bp { - value := uint64(c) + uint64(remainder)*srcBase - digit := value / dstBase - remainder = value % dstBase - - if len(quotient) != 0 || digit != 0 { - quotient = append(quotient, uint32(digit)) - } - } - - // Writes at the end of the destination buffer because we computed the - // lowest bits first. - n-- - dst[n] = base62Characters[remainder] - bp = quotient - } - - // Add padding at the head of the destination buffer for all bytes that were - // not set. - copy(dst[:n], zeroString) - return string(dst) -} diff --git a/server/control/server.go b/server/control/server.go index ab2abff..131d116 100644 --- a/server/control/server.go +++ b/server/control/server.go @@ -137,7 +137,7 @@ func (s *Server) getEndpoints() (map[string]StatusEndpoint, error) { } func (s *Server) getRelays() (map[string]StatusRelay, error) { - msgs, _, err := s.relays.conns.Snapshot() + msgs, _, err := s.relays.directs.Snapshot() if err != nil { return nil, err } diff --git a/server/control/store.go b/server/control/store.go index b8c96f6..4454e88 100644 --- a/server/control/store.go +++ b/server/control/store.go @@ -6,7 +6,6 @@ import ( "path/filepath" "github.com/connet-dev/connet/model" - "github.com/connet-dev/connet/pkg/certc" "github.com/connet-dev/connet/pkg/logc" "github.com/connet-dev/connet/proto/pbclient" ) @@ -17,11 +16,6 @@ type Stores interface { ClientConns() (logc.KV[ClientConnKey, ClientConnValue], error) ClientPeers() (logc.KV[ClientPeerKey, ClientPeerValue], error) - RelayConns() (logc.KV[RelayConnKey, RelayConnValue], error) - RelayClients() (logc.KV[RelayClientKey, RelayClientValue], error) - RelayEndpoints(id RelayID) (logc.KV[RelayEndpointKey, RelayEndpointValue], error) - RelayServers() (logc.KV[RelayServerKey, RelayServerValue], error) - RelayServerOffsets() (logc.KV[RelayConnKey, int64], error) RelayDirects() (logc.KV[RelayConnKey, RelayDirectValue], error) } @@ -45,26 +39,6 @@ func (f *fileStores) ClientPeers() (logc.KV[ClientPeerKey, ClientPeerValue], err return logc.NewKV[ClientPeerKey, ClientPeerValue](filepath.Join(f.dir, "client-peers")) } -func (f *fileStores) RelayConns() (logc.KV[RelayConnKey, RelayConnValue], error) { - return logc.NewKV[RelayConnKey, RelayConnValue](filepath.Join(f.dir, "relay-conns")) -} - -func (f *fileStores) RelayClients() (logc.KV[RelayClientKey, RelayClientValue], error) { - return logc.NewKV[RelayClientKey, RelayClientValue](filepath.Join(f.dir, "relay-clients")) -} - -func (f *fileStores) RelayEndpoints(id RelayID) (logc.KV[RelayEndpointKey, RelayEndpointValue], error) { - return logc.NewKV[RelayEndpointKey, RelayEndpointValue](filepath.Join(f.dir, "relay-endpoints", id.string)) -} - -func (f *fileStores) RelayServers() (logc.KV[RelayServerKey, RelayServerValue], error) { - return logc.NewKV[RelayServerKey, RelayServerValue](filepath.Join(f.dir, "relay-servers")) -} - -func (f *fileStores) RelayServerOffsets() (logc.KV[RelayConnKey, int64], error) { - return logc.NewKV[RelayConnKey, int64](filepath.Join(f.dir, "relay-server-offsets")) -} - func (f *fileStores) RelayDirects() (logc.KV[RelayConnKey, RelayDirectValue], error) { return logc.NewKV[RelayConnKey, RelayDirectValue](filepath.Join(f.dir, "relay-directs")) } @@ -115,12 +89,6 @@ type RelayConnKey struct { ID RelayID `json:"id"` } -type RelayConnValue struct { - Authentication RelayAuthentication `json:"authentication"` - Hostports []model.HostPort `json:"hostports"` - Metadata string `json:"metadata"` -} - type RelayDirectValue struct { Authentication RelayAuthentication `json:"authentication"` Hostports []model.HostPort `json:"hostports"` @@ -163,105 +131,3 @@ func (v *RelayDirectValue) UnmarshalJSON(b []byte) error { *v = RelayDirectValue{s.Authentication, s.Hostports, s.Metadata, cert, &authKey} return nil } - -type RelayClientKey struct { - Endpoint model.Endpoint `json:"endpoint"` - Role model.Role `json:"role"` - Key model.Key `json:"key"` -} - -type RelayClientValue struct { - Cert *x509.Certificate `json:"cert"` - Authentication ClientAuthentication `json:"authentication"` -} - -type jsonRelayClientValue struct { - Cert []byte `json:"cert"` - Authentication []byte `json:"authentication"` -} - -func (v RelayClientValue) MarshalJSON() ([]byte, error) { - return json.Marshal(jsonRelayClientValue{ - Cert: v.Cert.Raw, - Authentication: v.Authentication, - }) -} - -func (v *RelayClientValue) UnmarshalJSON(b []byte) error { - s := jsonRelayClientValue{} - if err := json.Unmarshal(b, &s); err != nil { - return err - } - - cert, err := x509.ParseCertificate(s.Cert) - if err != nil { - return err - } - - *v = RelayClientValue{cert, s.Authentication} - return nil -} - -type RelayEndpointKey struct { - Endpoint model.Endpoint `json:"endpoint"` -} - -type RelayEndpointValue struct { - Cert *x509.Certificate `json:"cert"` -} - -func (v RelayEndpointValue) MarshalJSON() ([]byte, error) { - return certc.MarshalJSONCert(v.Cert) -} - -func (v *RelayEndpointValue) UnmarshalJSON(b []byte) error { - cert, err := certc.UnmarshalJSONCert(b) - if err != nil { - return err - } - - *v = RelayEndpointValue{cert} - return nil -} - -type RelayServerKey struct { - Endpoint model.Endpoint `json:"endpoint"` - RelayID RelayID `json:"relay_id"` -} - -type RelayServerValue struct { - Hostports []model.HostPort `json:"hostports"` - Cert *x509.Certificate `json:"cert"` -} - -type jsonRelayServerValue struct { - Hostports []model.HostPort `json:"hostports"` - Cert []byte `json:"cert"` -} - -func (v RelayServerValue) MarshalJSON() ([]byte, error) { - return json.Marshal(jsonRelayServerValue{ - Hostports: v.Hostports, - Cert: v.Cert.Raw, - }) -} - -func (v *RelayServerValue) UnmarshalJSON(b []byte) error { - s := jsonRelayServerValue{} - if err := json.Unmarshal(b, &s); err != nil { - return err - } - - cert, err := x509.ParseCertificate(s.Cert) - if err != nil { - return err - } - - *v = RelayServerValue{Hostports: s.Hostports, Cert: cert} - return nil -} - -type relayCacheValue struct { - Hostports []model.HostPort - Cert *x509.Certificate -} diff --git a/server/relay/clients.go b/server/relay/clients.go index d5ba82b..36db9a3 100644 --- a/server/relay/clients.go +++ b/server/relay/clients.go @@ -35,11 +35,9 @@ type clientAuth struct { metadata string } -type tlsAuthenticator func(chi *tls.ClientHelloInfo) (*tls.Config, error) -type clientAuthenticator func(serverName string, certs []*x509.Certificate) *clientAuth type directAuthenticator func(req *pbclientrelay.AuthenticateReq, cert *x509.Certificate) (*clientAuth, error) -func newClientsServer(cfg Config, tlsAuth tlsAuthenticator, clAuth clientAuthenticator, directAuth directAuthenticator, directCert *certc.Cert) (*clientsServer, error) { +func newClientsServer(cfg Config, directAuth directAuthenticator, directCert *certc.Cert) (*clientsServer, error) { directTLS, err := directCert.TLSCert() if err != nil { return nil, fmt.Errorf("direct TLS cert: %w", err) @@ -52,16 +50,8 @@ func newClientsServer(cfg Config, tlsAuth tlsAuthenticator, clAuth clientAuthent } return &clientsServer{ - tlsConf: &tls.Config{ - GetConfigForClient: func(chi *tls.ClientHelloInfo) (*tls.Config, error) { - if chi.ServerName == directTLSConf.ServerName { - return directTLSConf, nil - } - return tlsAuth(chi) - }, - }, - auth: clAuth, - direct: directAuth, + tlsConf: directTLSConf, + direct: directAuth, endpoints: map[model.Endpoint]*endpointClients{}, @@ -71,7 +61,6 @@ func newClientsServer(cfg Config, tlsAuth tlsAuthenticator, clAuth clientAuthent type clientsServer struct { tlsConf *tls.Config - auth clientAuthenticator direct directAuthenticator endpoints map[model.Endpoint]*endpointClients @@ -284,34 +273,18 @@ func (c *clientConn) run(ctx context.Context) { var errNotRecognizedClient = errors.New("client not recognized as a destination or a source") func (c *clientConn) runErr(ctx context.Context) error { - protocol := model.GetConnectRelayNextProto(c.conn) - if protocol == model.ConnectRelayV02 { - if auth, err := c.authenticate(ctx); err != nil { - if perr := pberror.GetError(err); perr != nil { - cerr := c.conn.CloseWithError(quic.ApplicationErrorCode(perr.Code), perr.Message) - err = errors.Join(perr, cerr) - } else { - cerr := c.conn.CloseWithError(quic.ApplicationErrorCode(pberror.Code_AuthenticationFailed), "Error while authenticating") - err = errors.Join(err, cerr) - } - return err + if auth, err := c.authenticate(ctx); err != nil { + if perr := pberror.GetError(err); perr != nil { + cerr := c.conn.CloseWithError(quic.ApplicationErrorCode(perr.Code), perr.Message) + err = errors.Join(perr, cerr) } else { - c.auth = auth - c.logger = c.logger.With("endpoint", auth.endpoint, "role", auth.role, "key", auth.key) + cerr := c.conn.CloseWithError(quic.ApplicationErrorCode(pberror.Code_AuthenticationFailed), "Error while authenticating") + err = errors.Join(err, cerr) } + return err } else { - serverName := c.conn.ConnectionState().TLS.ServerName - certs := c.conn.ConnectionState().TLS.PeerCertificates - if auth := c.server.auth(serverName, certs); auth == nil { - return c.conn.CloseWithError(quic.ApplicationErrorCode(pberror.Code_AuthenticationFailed), "authentication missing") - } else { - c.auth = auth - c.logger = c.logger.With("endpoint", auth.endpoint, "role", auth.role, "key", auth.key) - } - - if err := c.check(ctx); err != nil { - return err - } + c.auth = auth + c.logger = c.logger.With("endpoint", auth.endpoint, "role", auth.role, "key", auth.key) } switch c.auth.role { @@ -361,26 +334,6 @@ func (c *clientConn) authenticate(ctx context.Context) (*clientAuth, error) { return auth, nil } -func (c *clientConn) check(ctx context.Context) error { - stream, err := c.conn.AcceptStream(ctx) - if err != nil { - return fmt.Errorf("accept client stream: %w", err) - } - defer func() { - if err := stream.Close(); err != nil { - slogc.Fine(c.logger, "error closing check client stream", "err", err) - } - }() - - if _, err := pbconnect.ReadRequest(stream); err != nil { - return fmt.Errorf("read client stream: %w", err) - } else if err := proto.Write(stream, &pbconnect.Response{}); err != nil { - return fmt.Errorf("write client stream: %w", err) - } - - return nil -} - func (c *clientConn) runDestination(ctx context.Context) error { fcs := c.server.addDestination(c) defer c.server.removeDestination(fcs, c) diff --git a/server/relay/control.go b/server/relay/control.go index 6a6c4a5..40c5107 100644 --- a/server/relay/control.go +++ b/server/relay/control.go @@ -9,7 +9,6 @@ import ( "fmt" "log/slog" "net" - "sync" "sync/atomic" "time" @@ -17,7 +16,6 @@ import ( "github.com/connet-dev/connet/pkg/certc" "github.com/connet-dev/connet/pkg/iterc" "github.com/connet-dev/connet/pkg/logc" - "github.com/connet-dev/connet/pkg/netc" "github.com/connet-dev/connet/pkg/quicc" "github.com/connet-dev/connet/pkg/reliable" "github.com/connet-dev/connet/pkg/slogc" @@ -26,7 +24,6 @@ import ( "github.com/connet-dev/connet/proto/pbclientrelay" "github.com/connet-dev/connet/proto/pberror" "github.com/connet-dev/connet/proto/pbrelay" - "github.com/klev-dev/klevdb" "github.com/quic-go/quic-go" "golang.org/x/crypto/nacl/box" protobuf "google.golang.org/protobuf/proto" @@ -34,7 +31,6 @@ import ( type controlClient struct { hostports []model.HostPort - root *certc.Cert direct *certc.Cert metadata string @@ -43,16 +39,7 @@ type controlClient struct { controlTLSConf *tls.Config handshakeIdleTimeout time.Duration - config logc.KV[ConfigKey, ConfigValue] - clients logc.KV[ClientKey, ClientValue] - servers logc.KV[ServerKey, ServerValue] - - serverByNameOffset int64 - serverByName map[string]*relayServer - serverByNameMu sync.RWMutex - - clientsStreamOffset int64 - clientsLogOffset int64 + config logc.KV[ConfigKey, ConfigValue] authUnsealKey atomic.Pointer[[32]byte] @@ -60,47 +47,13 @@ type controlClient struct { logger *slog.Logger } -func newControlClient(cfg Config, root *certc.Cert, direct *certc.Cert, configStore logc.KV[ConfigKey, ConfigValue]) (*controlClient, error) { - clients, err := cfg.Stores.Clients() - if err != nil { - return nil, err - } - servers, err := cfg.Stores.Servers() - if err != nil { - return nil, err - } - - msgs, serversOffset, err := servers.Snapshot() - if err != nil { - return nil, err - } - - serverByName := map[string]*relayServer{} - for _, msg := range msgs { - srv, err := newRelayServer(msg) - if err != nil { - return nil, err - } - serverByName[srv.name] = srv - } - - clientsStreamOffset, err := configStore.GetOrDefault(configClientsStreamOffset, ConfigValue{Int64: logc.OffsetOldest}) - if err != nil { - return nil, err - } - - clientsLogOffset, err := configStore.GetOrDefault(configClientsLogOffset, ConfigValue{Int64: logc.OffsetOldest}) - if err != nil { - return nil, err - } - +func newControlClient(cfg Config, direct *certc.Cert, configStore logc.KV[ConfigKey, ConfigValue]) (*controlClient, error) { hostports := iterc.FlattenSlice(iterc.MapSlice(cfg.Ingress, func(in Ingress) []model.HostPort { return in.Hostports })) c := &controlClient{ hostports: hostports, - root: root, direct: direct, metadata: cfg.Metadata, @@ -109,19 +62,11 @@ func newControlClient(cfg Config, root *certc.Cert, direct *certc.Cert, configSt controlTLSConf: &tls.Config{ ServerName: cfg.ControlHost, RootCAs: cfg.ControlCAs, - NextProtos: iterc.MapVarStrings(model.RelayControlV03, model.RelayControlV02), + NextProtos: iterc.MapVarStrings(model.RelayControlV03), }, handshakeIdleTimeout: cfg.HandshakeIdleTimeout, - config: configStore, - clients: clients, - servers: servers, - - serverByNameOffset: serversOffset, - serverByName: serverByName, - - clientsStreamOffset: clientsStreamOffset.Int64, - clientsLogOffset: clientsLogOffset.Int64, + config: configStore, logger: cfg.Logger.With("client", "relay-control"), } @@ -129,56 +74,6 @@ func newControlClient(cfg Config, root *certc.Cert, direct *certc.Cert, configSt return c, nil } -func (s *controlClient) getClientsStreamOffset() int64 { - return s.clientsStreamOffset -} - -func (s *controlClient) setClientsStreamOffset(v int64) error { - if err := s.config.Put(configClientsStreamOffset, ConfigValue{Int64: v}); err != nil { - return err - } - s.clientsStreamOffset = v - return nil -} - -func (s *controlClient) getClientsLogOffset() int64 { - return s.clientsLogOffset -} - -func (s *controlClient) setClientsLogOffset(v int64) error { - if err := s.config.Put(configClientsLogOffset, ConfigValue{Int64: v}); err != nil { - return err - } - s.clientsLogOffset = v - return nil -} - -func (s *controlClient) getServer(name string) *relayServer { - s.serverByNameMu.RLock() - defer s.serverByNameMu.RUnlock() - - return s.serverByName[name] -} - -func (s *controlClient) tlsAuthenticate(chi *tls.ClientHelloInfo) (*tls.Config, error) { - if srv := s.getServer(chi.ServerName); srv != nil { - return &tls.Config{ - ClientAuth: tls.RequireAndVerifyClientCert, - Certificates: srv.tls, - ClientCAs: srv.cas.Load(), - NextProtos: iterc.MapVarStrings(model.ConnectRelayV01), - }, nil - } - return nil, nil -} - -func (s *controlClient) v1Auth(serverName string, certs []*x509.Certificate) *clientAuth { - if srv := s.getServer(serverName); srv != nil { - return srv.authenticate(certs) - } - return nil -} - func (s *controlClient) v2Auth(authReq *pbclientrelay.AuthenticateReq, cert *x509.Certificate) (*clientAuth, error) { authUnsealKey := s.authUnsealKey.Load() if authUnsealKey == nil { @@ -208,8 +103,6 @@ func (s *controlClient) run(ctx context.Context, tfn TransportsFn) error { return reliable.RunGroup(ctx, reliable.Bind(tfn, s.runControl), logc.ScheduleCompact(s.config), - logc.ScheduleCompact(s.clients), - logc.ScheduleCompact(s.servers), ) } @@ -280,13 +173,7 @@ func (s *controlClient) connectSingle(ctx context.Context, transport *quic.Trans } }() - switch model.GetRelayControlNextProto(conn) { - case model.RelayControlV02: - err = s.authenticateV2(authStream, reconnConfig) - default: - err = s.authenticate(authStream, reconnConfig) - } - if err != nil { + if err := s.authenticate(authStream, reconnConfig); err != nil { perr := pberror.GetError(err) if perr == nil { perr = pberror.NewError(pberror.Code_AuthenticationFailed, "authentication failed") @@ -298,45 +185,6 @@ func (s *controlClient) connectSingle(ctx context.Context, transport *quic.Trans return conn, nil } -func (s *controlClient) authenticateV2(authStream *quic.Stream, reconnConfig ConfigValue) error { - if err := proto.Write(authStream, &pbrelay.AuthenticateReq{ - Token: s.controlToken, - Addresses: model.PBsFromHostPorts(s.hostports), - ReconnectToken: reconnConfig.Bytes, - BuildVersion: model.BuildVersion(), - Metadata: s.metadata, - }); err != nil { - return fmt.Errorf("auth write error: %w", err) - } - - resp := &pbrelay.AuthenticateResp{} - if err := proto.Read(authStream, resp); err != nil { - return fmt.Errorf("auth read error: %w", err) - } - if resp.Error != nil { - return fmt.Errorf("remote error: %w", resp.Error) - } - - controlIDConfig, err := s.config.GetOrDefault(configControlID, ConfigValue{}) - if err != nil { - return fmt.Errorf("server control id get: %w", err) - } - if controlIDConfig.String != "" && controlIDConfig.String != resp.ControlId { - return fmt.Errorf("unexpected server id, has: %s, resp: %s", controlIDConfig.String, resp.ControlId) - } - controlIDConfig.String = resp.ControlId - if err := s.config.Put(configControlID, controlIDConfig); err != nil { - return fmt.Errorf("server control id set: %w", err) - } - - reconnConfig.Bytes = resp.ReconnectToken - if err := s.config.Put(configControlReconnect, reconnConfig); err != nil { - return fmt.Errorf("server reconnect set: %w", err) - } - - return nil -} - func (s *controlClient) authenticate(authStream *quic.Stream, reconnConfig ConfigValue) error { relayPk, relaySk, err := box.GenerateKey(rand.Reader) if err != nil { @@ -425,322 +273,5 @@ func (s *controlClient) runConnection(ctx context.Context, conn *quic.Conn) erro s.connStatus.Store(statusc.Connected) defer s.connStatus.Store(statusc.Reconnecting) - return reliable.RunGroup(ctx, - reliable.Bind(conn, s.runClientsStream), - s.runClientsLog, - s.runServersLog, - reliable.Bind(conn, s.runServersStream), - func(ctx context.Context) error { return quicc.WaitLogRTTStats(ctx, conn, s.logger) }, // TODO v0.14.0 exchange auth - ) -} - -func (s *controlClient) runClientsStream(ctx context.Context, conn *quic.Conn) error { - stream, err := conn.OpenStreamSync(ctx) - if err != nil { - return err - } - defer func() { - if err := stream.Close(); err != nil { - slogc.Fine(s.logger, "error closing clients stream", "err", err) - } - }() - - g := reliable.NewGroup(ctx) - g.Go(quicc.CancelStream(stream)) - - g.Go(func(ctx context.Context) error { - for { - req := &pbrelay.ClientsReq{ - Offset: s.getClientsStreamOffset(), - } - if err := proto.Write(stream, req); err != nil { - return err - } - - resp := &pbrelay.ClientsResp{} - if err := proto.Read(stream, resp); err != nil { - return err - } - - for _, change := range resp.Changes { - key := ClientKey{ - Endpoint: model.EndpointFromPB(change.Endpoint), - Role: model.RoleFromPB(change.Role), - Key: model.NewKeyString(change.CertificateKey), - } - - switch change.Change { - case pbrelay.ChangeType_ChangePut: - cert, err := x509.ParseCertificate(change.Certificate) - if err != nil { - return err - } - if err := s.clients.Put(key, ClientValue{cert}); err != nil { - return err - } - case pbrelay.ChangeType_ChangeDel: - if err := s.clients.Del(key); err != nil { - return err - } - default: - return fmt.Errorf("unknown change: %v", change.Change) - } - } - - if err := s.setClientsStreamOffset(resp.Offset); err != nil { - return err - } - } - }) - - return g.Wait() -} - -func (s *controlClient) runClientsLog(ctx context.Context) error { - for { - msgs, nextOffset, err := s.clients.Consume(ctx, s.getClientsLogOffset()) - if err != nil { - return err - } - - for _, msg := range msgs { - srvKey := ServerKey{msg.Key.Endpoint} - clKey := serverClientKey{msg.Key.Role, msg.Key.Key} - sv, err := s.servers.Get(srvKey) - - switch { - case errors.Is(err, klevdb.ErrNotFound): - serverName := netc.GenDomainName("connet.control.relay") - serverRoot, err := s.root.NewServer(certc.CertOpts{ - Domains: []string{serverName}, - }) - if err != nil { - return err - } - sv = ServerValue{Name: serverName, Cert: serverRoot} - case err != nil: - return err - } - - if msg.Delete { - delete(sv.Clients, clKey) - } else { - if sv.Clients == nil { - sv.Clients = map[serverClientKey]ClientValue{} - } - sv.Clients[clKey] = msg.Value - } - - if len(sv.Clients) == 0 { - if err := s.servers.Del(srvKey); err != nil { - return err - } - } else { - if err := s.servers.Put(srvKey, sv); err != nil { - return err - } - } - } - - if err := s.setClientsLogOffset(nextOffset); err != nil { - return err - } - } -} - -func (s *controlClient) runServersStream(ctx context.Context, conn *quic.Conn) error { - stream, err := conn.AcceptStream(ctx) - if err != nil { - return err - } - defer func() { - if err := stream.Close(); err != nil { - slogc.Fine(s.logger, "error closing servers stream", "err", err) - } - }() - - g := reliable.NewGroup(ctx) - g.Go(quicc.CancelStream(stream)) - - g.Go(func(ctx context.Context) error { - for { - req := &pbrelay.ServersReq{} - if err := proto.Read(stream, req); err != nil { - return err - } - - var msgs []logc.Message[ServerKey, ServerValue] - var nextOffset int64 - if req.Offset == logc.OffsetOldest { - msgs, nextOffset, err = s.servers.Snapshot() - s.logger.Debug("sending initial control changes", "offset", nextOffset, "changes", len(msgs)) - } else { - msgs, nextOffset, err = s.servers.Consume(ctx, req.Offset) - s.logger.Debug("sending delta control changes", "offset", nextOffset, "changes", len(msgs)) - } - if err != nil { - return err - } - - resp := &pbrelay.ServersResp{Offset: nextOffset} - - for _, msg := range msgs { - var change = &pbrelay.ServersResp_Change{ - Endpoint: msg.Key.Endpoint.PB(), - } - if msg.Delete { - change.Change = pbrelay.ChangeType_ChangeDel - } else { - change.ServerCertificate = msg.Value.Cert.Raw() - change.Change = pbrelay.ChangeType_ChangePut - } - resp.Changes = append(resp.Changes, change) - } - - if err := proto.Write(stream, resp); err != nil { - return err - } - } - }) - - return g.Wait() -} - -func (s *controlClient) runServersLog(ctx context.Context) error { - upsert := func(msg logc.Message[ServerKey, ServerValue]) error { - serverName := msg.Value.Name - - s.serverByNameMu.RLock() - srv := s.serverByName[serverName] - s.serverByNameMu.RUnlock() - - if srv != nil { - return srv.update(msg) - } - - s.serverByNameMu.Lock() - defer s.serverByNameMu.Unlock() - - srv = s.serverByName[serverName] - if srv != nil { - return srv.update(msg) - } - - srv, err := newRelayServer(msg) - if err != nil { - return err - } - s.serverByName[serverName] = srv - return nil - } - - drop := func(msg logc.Message[ServerKey, ServerValue]) error { - s.serverByNameMu.Lock() - defer s.serverByNameMu.Unlock() - - delete(s.serverByName, msg.Value.Name) - - return nil - } - - for { - msgs, nextOffset, err := s.servers.Consume(ctx, s.serverByNameOffset) - if err != nil { - return err - } - - for _, msg := range msgs { - if msg.Delete { - if err := drop(msg); err != nil { - return err - } - } else { - if err := upsert(msg); err != nil { - return err - } - } - } - - s.serverByNameOffset = nextOffset - } -} - -type relayServer struct { - endpoint model.Endpoint - name string - - tls []tls.Certificate - cas atomic.Pointer[x509.CertPool] - - clients map[serverClientKey]*x509.Certificate - mu sync.RWMutex -} - -func newRelayServer(msg logc.Message[ServerKey, ServerValue]) (*relayServer, error) { - srvCert, err := msg.Value.Cert.TLSCert() - if err != nil { - return nil, err - } - - srv := &relayServer{ - endpoint: msg.Key.Endpoint, - name: srvCert.Leaf.DNSNames[0], - - tls: []tls.Certificate{srvCert}, - - clients: map[serverClientKey]*x509.Certificate{}, - } - - cas := x509.NewCertPool() - for k, v := range msg.Value.Clients { - srv.clients[k] = v.Cert - cas.AddCert(v.Cert) - } - srv.cas.Store(cas) - - return srv, nil -} - -func (s *relayServer) update(msg logc.Message[ServerKey, ServerValue]) error { - s.mu.Lock() - defer s.mu.Unlock() - - seenSet := map[serverClientKey]struct{}{} - cas := x509.NewCertPool() - for k, v := range msg.Value.Clients { - if clientCert, ok := s.clients[k]; ok { - cas.AddCert(clientCert) - } else { - s.clients[k] = v.Cert - cas.AddCert(v.Cert) - } - - seenSet[k] = struct{}{} - } - s.cas.Store(cas) - - for k := range s.clients { - if _, seen := seenSet[k]; !seen { - delete(s.clients, k) - } - } - - return nil -} - -func (s *relayServer) authenticate(certs []*x509.Certificate) *clientAuth { - cert := certs[0] - key := model.NewKey(cert) - - s.mu.RLock() - defer s.mu.RUnlock() - - if dst, ok := s.clients[serverClientKey{model.Destination, key}]; ok && dst.Equal(cert) { - return &clientAuth{s.endpoint, model.Destination, key, model.ConnectRelayV01, ""} - } - if src, ok := s.clients[serverClientKey{model.Source, key}]; ok && src.Equal(cert) { - return &clientAuth{s.endpoint, model.Source, key, model.ConnectRelayV01, ""} - } - - return nil + return quicc.WaitLogRTTStats(ctx, conn, s.logger) // TODO v0.14.0 exchange auth } diff --git a/server/relay/server.go b/server/relay/server.go index add3d21..1098534 100644 --- a/server/relay/server.go +++ b/server/relay/server.go @@ -72,12 +72,12 @@ func NewServer(cfg Config) (*Server, error) { return nil, fmt.Errorf("generate direct relay cert: %w", err) } - control, err := newControlClient(cfg, rootCert, directCert, configStore) + control, err := newControlClient(cfg, directCert, configStore) if err != nil { return nil, fmt.Errorf("relay control client: %w", err) } - clients, err := newClientsServer(cfg, control.tlsAuthenticate, control.v1Auth, control.v2Auth, directCert) + clients, err := newClientsServer(cfg, control.v2Auth, directCert) if err != nil { return nil, fmt.Errorf("relay clients server: %w", err) } diff --git a/server/relay/store.go b/server/relay/store.go index 47284f3..e0ed1bc 100644 --- a/server/relay/store.go +++ b/server/relay/store.go @@ -39,11 +39,9 @@ func (f *fileStores) Servers() (logc.KV[ServerKey, ServerValue], error) { type ConfigKey string var ( - configStatelessReset ConfigKey = "stateless-reset" - configControlID ConfigKey = "control-id" - configControlReconnect ConfigKey = "control-reconnect" - configClientsStreamOffset ConfigKey = "clients-stream-offset" - configClientsLogOffset ConfigKey = "clients-log-offset" + configStatelessReset ConfigKey = "stateless-reset" + configControlID ConfigKey = "control-id" + configControlReconnect ConfigKey = "control-reconnect" ) type ConfigValue struct { From 1098837655889ef4b9129733c32bf0b2894526a9 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Tue, 13 Jan 2026 22:43:55 -0500 Subject: [PATCH 2/2] cleanup --- server/control/relays.go | 4 ---- server/relay/clients.go | 32 +++++++++++++++++--------------- server/relay/control.go | 2 +- server/relay/server.go | 2 +- 4 files changed, 19 insertions(+), 21 deletions(-) diff --git a/server/control/relays.go b/server/control/relays.go index 84c58f1..0e6fb1e 100644 --- a/server/control/relays.go +++ b/server/control/relays.go @@ -417,10 +417,6 @@ func (c *relayConn) authenticate(ctx context.Context) (*relayConnAuth, error) { return nil, fmt.Errorf("auth read request: %w", err) } - return c.authenticateV3(authStream, req) -} - -func (c *relayConn) authenticateV3(authStream *quic.Stream, req *pbrelay.AuthenticateReq) (*relayConnAuth, error) { protocol := model.RelayControlV03 cert, err := x509.ParseCertificate(req.ServerCertificate) if err != nil { diff --git a/server/relay/clients.go b/server/relay/clients.go index 36db9a3..edb31e2 100644 --- a/server/relay/clients.go +++ b/server/relay/clients.go @@ -35,10 +35,22 @@ type clientAuth struct { metadata string } -type directAuthenticator func(req *pbclientrelay.AuthenticateReq, cert *x509.Certificate) (*clientAuth, error) +type ClientAuthenticator interface { + Authenticate(req *pbclientrelay.AuthenticateReq, cert *x509.Certificate) (*clientAuth, error) +} + +type clientsServer struct { + tlsConf *tls.Config + auth ClientAuthenticator + + endpoints map[model.Endpoint]*endpointClients + endpointsMu sync.RWMutex + + logger *slog.Logger +} -func newClientsServer(cfg Config, directAuth directAuthenticator, directCert *certc.Cert) (*clientsServer, error) { - directTLS, err := directCert.TLSCert() +func newClientsServer(cfg Config, cert *certc.Cert, auth ClientAuthenticator) (*clientsServer, error) { + directTLS, err := cert.TLSCert() if err != nil { return nil, fmt.Errorf("direct TLS cert: %w", err) } @@ -51,7 +63,7 @@ func newClientsServer(cfg Config, directAuth directAuthenticator, directCert *ce return &clientsServer{ tlsConf: directTLSConf, - direct: directAuth, + auth: auth, endpoints: map[model.Endpoint]*endpointClients{}, @@ -59,16 +71,6 @@ func newClientsServer(cfg Config, directAuth directAuthenticator, directCert *ce }, nil } -type clientsServer struct { - tlsConf *tls.Config - direct directAuthenticator - - endpoints map[model.Endpoint]*endpointClients - endpointsMu sync.RWMutex - - logger *slog.Logger -} - type endpointClients struct { endpoint model.Endpoint destinations map[model.Key]*clientConn @@ -314,7 +316,7 @@ func (c *clientConn) authenticate(ctx context.Context) (*clientAuth, error) { return nil, fmt.Errorf("client auth read: %w", err) } - auth, err := c.server.direct(req, c.conn.ConnectionState().TLS.PeerCertificates[0]) + auth, err := c.server.auth.Authenticate(req, c.conn.ConnectionState().TLS.PeerCertificates[0]) if err != nil { perr := pberror.GetError(err) if perr == nil { diff --git a/server/relay/control.go b/server/relay/control.go index 40c5107..b9ba007 100644 --- a/server/relay/control.go +++ b/server/relay/control.go @@ -74,7 +74,7 @@ func newControlClient(cfg Config, direct *certc.Cert, configStore logc.KV[Config return c, nil } -func (s *controlClient) v2Auth(authReq *pbclientrelay.AuthenticateReq, cert *x509.Certificate) (*clientAuth, error) { +func (s *controlClient) Authenticate(authReq *pbclientrelay.AuthenticateReq, cert *x509.Certificate) (*clientAuth, error) { authUnsealKey := s.authUnsealKey.Load() if authUnsealKey == nil { return nil, fmt.Errorf("no control verification key") diff --git a/server/relay/server.go b/server/relay/server.go index 1098534..ab63713 100644 --- a/server/relay/server.go +++ b/server/relay/server.go @@ -77,7 +77,7 @@ func NewServer(cfg Config) (*Server, error) { return nil, fmt.Errorf("relay control client: %w", err) } - clients, err := newClientsServer(cfg, control.v2Auth, directCert) + clients, err := newClientsServer(cfg, directCert, control) if err != nil { return nil, fmt.Errorf("relay clients server: %w", err) }