From 40c484ed3cfd232c60e39af58419c144ac6644ab Mon Sep 17 00:00:00 2001 From: Dan Pramann Date: Thu, 5 Dec 2024 10:00:04 -0600 Subject: [PATCH 1/5] Temporary changes so tests work with Consul version expected --- .../to-consul/consul_node_services_client_ent_test.go | 2 +- .../to-consul/consul_node_services_client_test.go | 2 +- control-plane/catalog/to-consul/syncer_ent_test.go | 6 +++--- control-plane/catalog/to-consul/syncer_test.go | 9 +++++---- control-plane/catalog/to-consul/testing.go | 6 ++++++ 5 files changed, 16 insertions(+), 9 deletions(-) diff --git a/control-plane/catalog/to-consul/consul_node_services_client_ent_test.go b/control-plane/catalog/to-consul/consul_node_services_client_ent_test.go index ac570948f5..7c1b6ae253 100644 --- a/control-plane/catalog/to-consul/consul_node_services_client_ent_test.go +++ b/control-plane/catalog/to-consul/consul_node_services_client_ent_test.go @@ -327,7 +327,7 @@ func TestNamespacesNodeServicesClient_NodeServices(t *testing.T) { } t.Run(name, func(tt *testing.T) { require := require.New(tt) - svr, err := testutil.NewTestServerConfigT(tt, nil) + svr, err := testutil.NewTestServerConfigT(tt, InvocaSpecificConsulConfig) require.NoError(err) defer svr.Stop() diff --git a/control-plane/catalog/to-consul/consul_node_services_client_test.go b/control-plane/catalog/to-consul/consul_node_services_client_test.go index 83354e640a..ebbab0da3b 100644 --- a/control-plane/catalog/to-consul/consul_node_services_client_test.go +++ b/control-plane/catalog/to-consul/consul_node_services_client_test.go @@ -157,7 +157,7 @@ func TestPreNamespacesNodeServicesClient_NodeServices(t *testing.T) { for name, c := range cases { t.Run(name, func(tt *testing.T) { require := require.New(tt) - svr, err := testutil.NewTestServerConfigT(tt, nil) + svr, err := testutil.NewTestServerConfigT(tt, InvocaSpecificConsulConfig) require.NoError(err) defer svr.Stop() diff --git a/control-plane/catalog/to-consul/syncer_ent_test.go 
b/control-plane/catalog/to-consul/syncer_ent_test.go index 2cc206f908..00ee12050c 100644 --- a/control-plane/catalog/to-consul/syncer_ent_test.go +++ b/control-plane/catalog/to-consul/syncer_ent_test.go @@ -15,7 +15,7 @@ import ( func TestConsulSyncer_ConsulNamespaces(t *testing.T) { t.Parallel() - a, err := testutil.NewTestServerConfigT(t, nil) + a, err := testutil.NewTestServerConfigT(t, InvocaSpecificConsulConfig) require.NoError(t, err) defer a.Stop() @@ -66,7 +66,7 @@ func TestConsulSyncer_ConsulNamespaces(t *testing.T) { func TestConsulSyncer_ReapConsulNamespace(t *testing.T) { t.Parallel() - a, err := testutil.NewTestServerConfigT(t, nil) + a, err := testutil.NewTestServerConfigT(t, InvocaSpecificConsulConfig) require.NoError(t, err) defer a.Stop() @@ -135,7 +135,7 @@ func TestConsulSyncer_ReapConsulNamespace(t *testing.T) { func TestConsulSyncer_reapServiceInstanceNamespacesEnabled(t *testing.T) { t.Parallel() - a, err := testutil.NewTestServerConfigT(t, nil) + a, err := testutil.NewTestServerConfigT(t, InvocaSpecificConsulConfig) require.NoError(t, err) defer a.Stop() diff --git a/control-plane/catalog/to-consul/syncer_test.go b/control-plane/catalog/to-consul/syncer_test.go index f42f6fee46..d7751e03a5 100644 --- a/control-plane/catalog/to-consul/syncer_test.go +++ b/control-plane/catalog/to-consul/syncer_test.go @@ -26,7 +26,8 @@ func TestConsulSyncer_register(t *testing.T) { require := require.New(t) // Set up server, client, syncer - a, err := testutil.NewTestServerConfigT(t, nil) + + a, err := testutil.NewTestServerConfigT(t, InvocaSpecificConsulConfig) require.NoError(err) defer a.Stop() @@ -72,7 +73,7 @@ func TestConsulSyncer_reapServiceInstance(t *testing.T) { require := require.New(t) // Set up server, client, syncer - a, err := testutil.NewTestServerConfigT(t, nil) + a, err := testutil.NewTestServerConfigT(t, InvocaSpecificConsulConfig) require.NoError(err) defer a.Stop() @@ -137,7 +138,7 @@ func TestConsulSyncer_reapService(t *testing.T) { for 
_, k8sNS := range sourceK8sNamespaceAnnotations { t.Run(k8sNS, func(tt *testing.T) { // Set up server, client, syncer - a, err := testutil.NewTestServerConfigT(tt, nil) + a, err := testutil.NewTestServerConfigT(tt, InvocaSpecificConsulConfig) require.NoError(tt, err) defer a.Stop() @@ -185,7 +186,7 @@ func TestConsulSyncer_reapService(t *testing.T) { func TestConsulSyncer_noReapingUntilInitialSync(t *testing.T) { t.Parallel() - a, err := testutil.NewTestServerConfigT(t, nil) + a, err := testutil.NewTestServerConfigT(t, InvocaSpecificConsulConfig) require.NoError(t, err) defer a.Stop() client, err := api.NewClient(&api.Config{ diff --git a/control-plane/catalog/to-consul/testing.go b/control-plane/catalog/to-consul/testing.go index e6541c6ba1..17a6e850c8 100644 --- a/control-plane/catalog/to-consul/testing.go +++ b/control-plane/catalog/to-consul/testing.go @@ -1,6 +1,7 @@ package catalog import ( + "github.com/hashicorp/consul/sdk/testutil" "sync" "github.com/hashicorp/consul/api" @@ -27,3 +28,8 @@ func (s *testSyncer) Sync(rs []*api.CatalogRegistration) { func newTestSyncer() *testSyncer { return &testSyncer{} } + +// callback to remove the peering key from the config generated, avoids "invalid config key peering" +func InvocaSpecificConsulConfig(cfg *testutil.TestServerConfig) { + cfg.Peering = nil +} From 3030a6f4c8e32e9790e33b520dfcfd0c28f63bf3 Mon Sep 17 00:00:00 2001 From: Dan Pramann Date: Thu, 5 Dec 2024 11:56:35 -0600 Subject: [PATCH 2/5] See if we can do an initial sync of all Services, and then unlock reaping --- control-plane/catalog/to-consul/resource.go | 21 ++++++ control-plane/catalog/to-consul/syncer.go | 9 +++ .../catalog/to-consul/syncer_test.go | 4 ++ .../subcommand/sync-catalog/command.go | 69 +++++++++++-------- 4 files changed, 73 insertions(+), 30 deletions(-) diff --git a/control-plane/catalog/to-consul/resource.go b/control-plane/catalog/to-consul/resource.go index 9ae150dc6d..3816fa9333 100644 --- 
a/control-plane/catalog/to-consul/resource.go +++ b/control-plane/catalog/to-consul/resource.go @@ -66,6 +66,10 @@ type ServiceResource struct { // Ctx is used to cancel processes kicked off by ServiceResource. Ctx context.Context + // waitForServiceSnapshotToBePopulatedCh is the chan used to signal that a snapshot of services has synced + // reaping of services in consul should wait for this to allow best effort population of "view of services" + WaitForInitialServicesToBePopulatedCh chan bool + // AllowK8sNamespacesSet is a set of k8s namespaces to explicitly allow for // syncing. It supports the special character `*` which indicates that // all k8s namespaces are eligible unless explicitly denied. This filter @@ -816,6 +820,23 @@ func (t *ServiceResource) addPrefixAndK8SNamespace(name, namespace string) strin return name } +func (t *ServiceResource) PopulateInitialServices() { + defer close(t.WaitForInitialServicesToBePopulatedCh) + + // Open question, is this okay when there are thousands of Services in the cluster? + allTheServices, err := t.Client.CoreV1().Services("").List(context.Background(), metav1.ListOptions{}) + if err != nil { + // TODO - Is it valid to start reaping if we don't get all the Services initial to populate our view? + t.Log.Warn("Failed to list all services to preform initial sync", "err", err) + return + } + + for _, service := range allTheServices.Items { + key := fmt.Sprintf("%s/%s", service.ObjectMeta.Namespace, service.ObjectMeta.Name) + t.Upsert(key, &service) + } +} + // consulHealthCheckID deterministically generates a health check ID based on service ID and Kubernetes namespace. 
func consulHealthCheckID(k8sNS string, serviceID string) string { return fmt.Sprintf("%s/%s", k8sNS, serviceID) diff --git a/control-plane/catalog/to-consul/syncer.go b/control-plane/catalog/to-consul/syncer.go index 2e2edad61a..72929d2e8f 100644 --- a/control-plane/catalog/to-consul/syncer.go +++ b/control-plane/catalog/to-consul/syncer.go @@ -76,6 +76,10 @@ type ConsulSyncer struct { lock sync.Mutex once sync.Once + // WaitForServiceSnapshotToBePopulatedCh is the chan used to wait for a snapshot of services to be + // populated at startup before to starting to reap services in consul that are no longer needed + WaitForServiceSnapshotToBePopulatedCh chan bool + // initialSync is used to ensure that we have received our initial list // of services before we start reaping services. When it is closed, // the initial sync is complete. @@ -168,6 +172,11 @@ func (s *ConsulSyncer) watchReapableServices(ctx context.Context) { // because we have no tracked services in our maps yet. <-s.initialSync + s.Log.Info("Wait for services to be populated before checking if there no longer used one in consul") + <-s.WaitForServiceSnapshotToBePopulatedCh + s.Log.Debug("Services have been populated at startup. We can start looking " + + "for no longer needed services in consul") + opts := &api.QueryOptions{ AllowStale: true, WaitIndex: 1, diff --git a/control-plane/catalog/to-consul/syncer_test.go b/control-plane/catalog/to-consul/syncer_test.go index d7751e03a5..ebfdf20dd8 100644 --- a/control-plane/catalog/to-consul/syncer_test.go +++ b/control-plane/catalog/to-consul/syncer_test.go @@ -290,6 +290,9 @@ func testConsulSyncer(client *api.Client) (*ConsulSyncer, func()) { // testConsulSyncerWithConfig starts a consul syncer that can be configured // prior to starting via the configurator method. 
func testConsulSyncerWithConfig(client *api.Client, configurator func(*ConsulSyncer)) (*ConsulSyncer, func()) { + waitForInitialServicesCh := make(chan bool) + close(waitForInitialServicesCh) + s := &ConsulSyncer{ Client: client, Log: hclog.Default(), @@ -300,6 +303,7 @@ func testConsulSyncerWithConfig(client *api.Client, configurator func(*ConsulSyn ConsulNodeServicesClient: &PreNamespacesNodeServicesClient{ Client: client, }, + WaitForServiceSnapshotToBePopulatedCh: waitForInitialServicesCh, } configurator(s) s.init() diff --git a/control-plane/subcommand/sync-catalog/command.go b/control-plane/subcommand/sync-catalog/command.go index 105ce6619c..0cee6ab38d 100644 --- a/control-plane/subcommand/sync-catalog/command.go +++ b/control-plane/subcommand/sync-catalog/command.go @@ -241,43 +241,52 @@ func (c *Command) Run(args []string) int { Client: c.consulClient, } } + + waitForInitalServicesCh := make(chan bool) + // Build the Consul sync and start it syncer := &catalogtoconsul.ConsulSyncer{ - Client: c.consulClient, - Log: c.logger.Named("to-consul/sink"), - EnableNamespaces: c.flagEnableNamespaces, - CrossNamespaceACLPolicy: c.flagCrossNamespaceACLPolicy, - SyncPeriod: c.flagConsulWritePeriod, - ServicePollPeriod: c.flagConsulWritePeriod * 2, - ConsulK8STag: c.flagConsulK8STag, - ConsulNodeName: c.flagConsulNodeName, - ConsulNodeServicesClient: svcsClient, + Client: c.consulClient, + Log: c.logger.Named("to-consul/sink"), + EnableNamespaces: c.flagEnableNamespaces, + CrossNamespaceACLPolicy: c.flagCrossNamespaceACLPolicy, + SyncPeriod: c.flagConsulWritePeriod, + ServicePollPeriod: c.flagConsulWritePeriod * 2, + ConsulK8STag: c.flagConsulK8STag, + ConsulNodeName: c.flagConsulNodeName, + ConsulNodeServicesClient: svcsClient, + WaitForServiceSnapshotToBePopulatedCh: waitForInitalServicesCh, } go syncer.Run(ctx) + resource := catalogtoconsul.ServiceResource{ + Log: c.logger.Named("to-consul/source"), + Client: c.clientset, + Syncer: syncer, + Ctx: ctx, + 
AllowK8sNamespacesSet: allowSet, + DenyK8sNamespacesSet: denySet, + ExplicitEnable: !c.flagK8SDefault, + ClusterIPSync: c.flagSyncClusterIPServices, + LoadBalancerEndpointsSync: c.flagSyncLBEndpoints, + NodePortSync: catalogtoconsul.NodePortSyncType(c.flagNodePortSyncType), + ConsulK8STag: c.flagConsulK8STag, + ConsulServicePrefix: c.flagConsulServicePrefix, + AddK8SNamespaceSuffix: c.flagAddK8SNamespaceSuffix, + EnableNamespaces: c.flagEnableNamespaces, + ConsulDestinationNamespace: c.flagConsulDestinationNamespace, + EnableK8SNSMirroring: c.flagEnableK8SNSMirroring, + K8SNSMirroringPrefix: c.flagK8SNSMirroringPrefix, + ConsulNodeName: c.flagConsulNodeName, + WaitForServiceSnapshotToBePopulatedCh: waitForInitalServicesCh, + } + + resource.PopulateInitialServices() + // Build the controller and start it ctl := &controller.Controller{ - Log: c.logger.Named("to-consul/controller"), - Resource: &catalogtoconsul.ServiceResource{ - Log: c.logger.Named("to-consul/source"), - Client: c.clientset, - Syncer: syncer, - Ctx: ctx, - AllowK8sNamespacesSet: allowSet, - DenyK8sNamespacesSet: denySet, - ExplicitEnable: !c.flagK8SDefault, - ClusterIPSync: c.flagSyncClusterIPServices, - LoadBalancerEndpointsSync: c.flagSyncLBEndpoints, - NodePortSync: catalogtoconsul.NodePortSyncType(c.flagNodePortSyncType), - ConsulK8STag: c.flagConsulK8STag, - ConsulServicePrefix: c.flagConsulServicePrefix, - AddK8SNamespaceSuffix: c.flagAddK8SNamespaceSuffix, - EnableNamespaces: c.flagEnableNamespaces, - ConsulDestinationNamespace: c.flagConsulDestinationNamespace, - EnableK8SNSMirroring: c.flagEnableK8SNSMirroring, - K8SNSMirroringPrefix: c.flagK8SNSMirroringPrefix, - ConsulNodeName: c.flagConsulNodeName, - }, + Log: c.logger.Named("to-consul/controller"), + Resource: &resource, } toConsulCh = make(chan struct{}) From 73eccec5877aed2f649496e6d6232951af15bbcc Mon Sep 17 00:00:00 2001 From: Dan Pramann Date: Thu, 5 Dec 2024 11:57:49 -0600 Subject: [PATCH 3/5] Needed 
WaitForInitialServicesToBePopulatedCh on the struct --- control-plane/subcommand/sync-catalog/command.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/control-plane/subcommand/sync-catalog/command.go b/control-plane/subcommand/sync-catalog/command.go index 0cee6ab38d..e262535423 100644 --- a/control-plane/subcommand/sync-catalog/command.go +++ b/control-plane/subcommand/sync-catalog/command.go @@ -278,7 +278,7 @@ func (c *Command) Run(args []string) int { EnableK8SNSMirroring: c.flagEnableK8SNSMirroring, K8SNSMirroringPrefix: c.flagK8SNSMirroringPrefix, ConsulNodeName: c.flagConsulNodeName, - WaitForServiceSnapshotToBePopulatedCh: waitForInitalServicesCh, + WaitForInitialServicesToBePopulatedCh: waitForInitalServicesCh, } resource.PopulateInitialServices() From f1347cb830ca1a5bcaf89e36c0fd5436c9f5a878 Mon Sep 17 00:00:00 2001 From: Dan Pramann Date: Thu, 5 Dec 2024 12:23:39 -0600 Subject: [PATCH 4/5] Additional logging around the blocking of reaping, and when reaping will start --- control-plane/catalog/to-consul/syncer.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/control-plane/catalog/to-consul/syncer.go b/control-plane/catalog/to-consul/syncer.go index 72929d2e8f..3a7299519e 100644 --- a/control-plane/catalog/to-consul/syncer.go +++ b/control-plane/catalog/to-consul/syncer.go @@ -134,7 +134,10 @@ func (s *ConsulSyncer) Sync(rs []*api.CatalogRegistration) { // Signal that the initial sync is complete and our maps have been populated. // We can now safely reap untracked services. - s.initialSyncOnce.Do(func() { close(s.initialSync) }) + s.initialSyncOnce.Do(func() { + s.Log.Debug("[Sync] initial sync happened, reaping of services enabled") + close(s.initialSync) + }) } // Run is the long-running runloop for reconciling the local set of @@ -171,11 +174,12 @@ func (s *ConsulSyncer) watchReapableServices(ctx context.Context) { // populated. 
If we don't wait, we will reap all services tagged with k8s // because we have no tracked services in our maps yet. <-s.initialSync + s.Log.Debug("[watchReapableServices] initial sync has happened") - s.Log.Info("Wait for services to be populated before checking if there no longer used one in consul") + s.Log.Info("[watchReapableServices] waiting for services to be populated before enabling reaping of services") <-s.WaitForServiceSnapshotToBePopulatedCh - s.Log.Debug("Services have been populated at startup. We can start looking " + - "for no longer needed services in consul") + s.Log.Debug("[watchReapableServices] services have been populated at startup. We can start looking " + + "for no longer needed services in Consul") opts := &api.QueryOptions{ AllowStale: true, WaitIndex: 1, From 810e1524397168aea0150430d78de97511d297a2 Mon Sep 17 00:00:00 2001 From: Dan Pramann Date: Fri, 6 Dec 2024 11:11:32 -0600 Subject: [PATCH 5/5] Block watchService on the same WaitForServiceSnapshotToBePopulated channel for watchService() --- control-plane/catalog/to-consul/syncer.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/control-plane/catalog/to-consul/syncer.go b/control-plane/catalog/to-consul/syncer.go index 3a7299519e..8f6534b3eb 100644 --- a/control-plane/catalog/to-consul/syncer.go +++ b/control-plane/catalog/to-consul/syncer.go @@ -263,6 +263,10 @@ func (s *ConsulSyncer) watchService(ctx context.Context, name, namespace string) s.Log.Info("starting service watcher", "service-name", name, "service-consul-namespace", namespace) defer s.Log.Info("stopping service watcher", "service-name", name, "service-consul-namespace", namespace) + s.Log.Debug("[watchService] waiting for services to be populated before enabling watching a service") + <-s.WaitForServiceSnapshotToBePopulatedCh + s.Log.Debug("[watchService] services have been populated at startup, watching") + for { select { // Quit if our context is over