diff --git a/go.mod b/go.mod index 68d5f67..e9f8ebf 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( k8s.io/api v0.31.1 k8s.io/apimachinery v0.31.1 k8s.io/client-go v0.31.1 + k8s.io/utils v0.0.0-20240902221715-702e33fdd3c3 ) require ( @@ -170,7 +171,6 @@ require ( k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect k8s.io/kubectl v0.31.0 // indirect - k8s.io/utils v0.0.0-20240902221715-702e33fdd3c3 // indirect oras.land/oras-go v1.2.5 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/kustomize/api v0.17.2 // indirect diff --git a/internal/cli/app/app.go b/internal/cli/app/app.go index acd84fe..4928d56 100644 --- a/internal/cli/app/app.go +++ b/internal/cli/app/app.go @@ -10,6 +10,7 @@ import ( "github.com/weka/gohomecli/internal/cli/api" "github.com/weka/gohomecli/internal/cli/config" + "github.com/weka/gohomecli/internal/cli/local/remote" "github.com/weka/gohomecli/internal/env" ) @@ -28,7 +29,8 @@ var appCmd = &cobra.Command{ env.InitEnv() - if cmdHasGroup(cmd, api.APIGroup.ID, config.ConfigGroup.ID) { + if cmdHasGroup(cmd, api.APIGroup.ID, config.ConfigGroup.ID, remote.RemoteAccessGroup.ID) { + env.SkipAPIKeyValidation = cmdHasGroup(cmd, remote.RemoteAccessGroup.ID) env.InitConfig(env.SiteName) } diff --git a/internal/cli/cli.go b/internal/cli/cli.go index 78231f6..949b7c1 100644 --- a/internal/cli/cli.go +++ b/internal/cli/cli.go @@ -7,6 +7,7 @@ import ( "github.com/weka/gohomecli/internal/cli/app" "github.com/weka/gohomecli/internal/cli/config" "github.com/weka/gohomecli/internal/cli/local" + "github.com/weka/gohomecli/internal/cli/local/remote" "github.com/weka/gohomecli/internal/utils" ) @@ -14,6 +15,7 @@ func init() { api.Cli.InitCobra(app.Cmd()) config.Cli.InitCobra(app.Cmd()) local.Cli.InitCobra(app.Cmd()) + remote.Cli.InitCobra(app.Cmd()) } // Execute adds all child commands to the root command and sets flags appropriately. 
diff --git a/internal/cli/local/local.go b/internal/cli/local/local.go index f8872de..a0f1c64 100644 --- a/internal/cli/local/local.go +++ b/internal/cli/local/local.go @@ -41,6 +41,7 @@ func init() { setup.Cli.InitCobra(localCmd) upgrade.Cli.InitCobra(localCmd) cleanup.Cli.InitCobra(localCmd) + statusCli := status.CliHook() statusCli.InitCobra(localCmd) }) diff --git a/internal/cli/local/remote/copy_recording.go b/internal/cli/local/remote/copy_recording.go new file mode 100644 index 0000000..f9d91b6 --- /dev/null +++ b/internal/cli/local/remote/copy_recording.go @@ -0,0 +1,203 @@ +package remote + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + "regexp" + + "github.com/google/uuid" + "github.com/spf13/cobra" + + "github.com/weka/gohomecli/internal/local/chart" + "github.com/weka/gohomecli/internal/utils" +) + +const outputDirPerms = 0o750 + +var ( + // safeFilenamePattern allows safe characters for recording filenames + safeFilenamePattern = regexp.MustCompile(`^[a-zA-Z0-9_\-:.]+\.cast$`) + + // ErrMissingCopyFilter is returned when no filter is specified for copy-recording command + ErrMissingCopyFilter = errors.New("must specify one of: --recording, --cluster-id, or --all") + + // ErrInvalidFilename is returned when a recording filename contains unsafe characters + ErrInvalidFilename = errors.New("invalid recording filename: must be a .cast file with safe characters") + + // ErrRecordingNotFound is returned when a specific recording cannot be found + ErrRecordingNotFound = errors.New("recording not found") +) + +type copyRecordingOptions struct { + recording string + clusterID string + output string + all bool +} + +func newCopyRecordingCmd() *cobra.Command { + opts := &copyRecordingOptions{} + + cmd := &cobra.Command{ + Use: "copy-recording", + Short: "Copy recording files from PVC to local filesystem", + Long: `Copy session recording files from the recordings PVC to a local directory. 
+ +Recordings can be copied individually or in bulk by cluster ID or all at once. + +Examples: + homecli remote-access copy-recording --recording "2024-01-15T10:30:00-abc123.cast" --output /tmp/ + homecli remote-access copy-recording --cluster-id "550e8400-e29b-41d4-a716-446655440000" --output /tmp/ + homecli remote-access copy-recording --all --output /tmp/ +`, + RunE: func(cmd *cobra.Command, _ []string) error { + return copyRecordingRun(cmd, opts) + }, + PreRunE: func(_ *cobra.Command, _ []string) error { + if opts.recording == "" && opts.clusterID == "" && !opts.all { + return ErrMissingCopyFilter + } + + // Validate recording filename to prevent path traversal + if opts.recording != "" && !safeFilenamePattern.MatchString(opts.recording) { + return ErrInvalidFilename + } + + // Validate cluster ID is a valid UUID + if opts.clusterID != "" { + if _, err := uuid.Parse(opts.clusterID); err != nil { + return ErrInvalidClusterID + } + } + + return nil + }, + } + + cmd.Flags().StringVar(&opts.recording, "recording", "", "Specific recording filename to copy") + cmd.Flags().StringVar(&opts.clusterID, "cluster-id", "", "Copy all recordings for a cluster") + cmd.Flags().BoolVar(&opts.all, "all", false, "Copy all recordings") + cmd.Flags().StringVarP(&opts.output, "output", "o", "", "Local destination directory (required)") + _ = cmd.MarkFlagRequired("output") //nolint:errcheck // flag exists + + return cmd +} + +func copyRecordingRun(cmd *cobra.Command, opts *copyRecordingOptions) error { + ctx := cmd.Context() + + // Ensure output directory exists + if err := os.MkdirAll(opts.output, outputDirPerms); err != nil { + return fmt.Errorf("failed to create output directory: %w", err) + } + + // Create exec client + execClient, err := chart.NewK8sExecClient(ctx, recordingsSidecarLabel, recordingsSidecarContainer) + if err != nil { + return fmt.Errorf("failed to create Kubernetes client: %w", err) + } + + // Determine which files to copy + filesToCopy, err := 
resolveFilesToCopy(ctx, execClient, opts) + if err != nil { + return err + } + + if len(filesToCopy) == 0 { + utils.UserNote("No matching recordings found") + + return nil + } + + utils.UserOutput("Copying %d recording(s) to %s\n", len(filesToCopy), opts.output) + + // Copy each file + copiedCount := 0 + for _, recording := range filesToCopy { + // Construct remote path from ClusterID + Filename + // Filename may contain subdirectories (e.g., "subdir/file.cast") + remotePath := filepath.Join(recordingsPath, recording.Filename) + if recording.ClusterID != "" { + remotePath = filepath.Join(recordingsPath, recording.ClusterID, recording.Filename) + } + + // Use basename for local destination (flatten directory structure) + localFilename := filepath.Base(recording.Filename) + dstPath := filepath.Join(opts.output, localFilename) + + if err := execClient.CopyFromPod(ctx, remotePath, dstPath); err != nil { + utils.UserWarning("Failed to copy %s: %v", localFilename, err) + + continue + } + + utils.UserOutput(" Copied: %s\n", localFilename) + copiedCount++ + } + + utils.UserNote("Successfully copied %d/%d recording(s)", copiedCount, len(filesToCopy)) + + return nil +} + +// resolveFilesToCopy determines which recording files to copy based on options. +func resolveFilesToCopy( + ctx context.Context, + execClient *chart.K8sExecClient, + opts *copyRecordingOptions, +) ([]RecordingInfo, error) { + if opts.recording == "" { + // List by cluster ID or all (empty clusterID = all) + // listRecordings validates clusterID internally + return listRecordings(ctx, execClient, opts.clusterID) + } + + return resolveSpecificRecording(ctx, execClient, opts) +} + +// resolveSpecificRecording finds a specific recording by name. 
+func resolveSpecificRecording( + ctx context.Context, + execClient *chart.K8sExecClient, + opts *copyRecordingOptions, +) ([]RecordingInfo, error) { + // Note: filename and clusterID validation is done in PreRunE + + if opts.clusterID != "" { + return resolveWithClusterID(opts), nil + } + + return searchForRecording(ctx, execClient, opts.recording) +} + +// resolveWithClusterID returns recording info when cluster ID is explicitly provided. +// Note: clusterID validation is done in PreRunE. +func resolveWithClusterID(opts *copyRecordingOptions) []RecordingInfo { + return []RecordingInfo{{ + Filename: opts.recording, + ClusterID: opts.clusterID, + }} +} + +// searchForRecording searches all recordings to find the one matching the filename. +func searchForRecording( + ctx context.Context, + execClient *chart.K8sExecClient, + recordingName string, +) ([]RecordingInfo, error) { + allRecordings, listErr := listRecordings(ctx, execClient, "") + if listErr != nil { + return nil, fmt.Errorf("failed to list recordings: %w", listErr) + } + + for _, r := range allRecordings { + if r.Filename == recordingName { + return []RecordingInfo{r}, nil + } + } + + return nil, fmt.Errorf("%w: %s", ErrRecordingNotFound, recordingName) +} diff --git a/internal/cli/local/remote/list.go b/internal/cli/local/remote/list.go new file mode 100644 index 0000000..fac7337 --- /dev/null +++ b/internal/cli/local/remote/list.go @@ -0,0 +1,175 @@ +package remote + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/spf13/cobra" + "gopkg.in/yaml.v3" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/weka/gohomecli/internal/local/chart" + "github.com/weka/gohomecli/internal/utils" +) + +const ( + minutesPerHour = 60 + hoursPerDay = 24 + uuidStringLength = 36 +) + +type ( + listOptions struct { + outputFormat string + } + + // SessionInfo represents information about an active remote session + SessionInfo struct { + SessionID string `json:"sessionId" yaml:"sessionId"` + ClusterID 
string `json:"clusterId" yaml:"clusterId"` + ClusterName string `json:"clusterName" yaml:"clusterName"` + Duration string `json:"duration" yaml:"duration"` + } +) + +func newListCmd() *cobra.Command { + opts := &listOptions{} + + cmd := &cobra.Command{ + Use: "list", + Short: "List active remote sessions", + Long: `List active remote sessions. + +Examples: + homecli remote-access list # List all active sessions in table format + homecli remote-access list --output json # List sessions in JSON format + homecli remote-access list --output yaml # List sessions in YAML format + `, + PreRunE: func(_ *cobra.Command, _ []string) error { + return validateOutputFormat(opts.outputFormat) + }, + RunE: func(cmd *cobra.Command, _ []string) error { + return listRun(cmd, opts) + }, + } + + cmd.Flags().StringVarP(&opts.outputFormat, "output", "o", "table", "Output format: table, json, yaml") + + return cmd +} + +func listRun(cmd *cobra.Command, opts *listOptions) error { + ctx := cmd.Context() + + k8s, err := chart.NewKubernetesClient() + if err != nil { + return fmt.Errorf("failed to create Kubernetes client: %w", err) + } + + // List pods with app=remote-session label + selector := fmt.Sprintf("%s=%s", remoteAccessLabel, remoteAccessValue) + pods, err := k8s.Clientset.CoreV1().Pods(chart.ReleaseNamespace).List(ctx, metav1.ListOptions{ + LabelSelector: selector, + }) + if err != nil { + return fmt.Errorf("failed to list pods: %w", err) + } + + if len(pods.Items) == 0 { + utils.UserNote("No active remote sessions") + + return nil + } + + // Build session info list + sessions := make([]SessionInfo, 0, len(pods.Items)) + for i := range pods.Items { + pod := &pods.Items[i] + duration := formatDuration(pod.CreationTimestamp.Time) + sessions = append(sessions, SessionInfo{ + SessionID: pod.Labels["session-id"], + ClusterID: pod.Labels["cluster-id"], + ClusterName: pod.Labels["cluster-name"], + Duration: duration, + }) + } + + // Output based on format + switch opts.outputFormat { + 
case "json": + return outputSessionsAsJSON(sessions) + case "yaml": + return outputSessionsAsYAML(sessions) + default: + return outputSessionsAsTable(sessions) + } +} + +func formatDuration(creationTime time.Time) string { + duration := time.Since(creationTime) + + if duration < time.Minute { + return fmt.Sprintf("%ds", int(duration.Seconds())) + } + if duration < time.Hour { + return fmt.Sprintf("%dm", int(duration.Minutes())) + } + if duration < hoursPerDay*time.Hour { + hours := int(duration.Hours()) + minutes := int(duration.Minutes()) % minutesPerHour + + return fmt.Sprintf("%dh%dm", hours, minutes) + } + days := int(duration.Hours()) / hoursPerDay + hours := int(duration.Hours()) % hoursPerDay + + return fmt.Sprintf("%dd%dh", days, hours) +} + +func outputSessionsAsJSON(sessions []SessionInfo) error { + output, err := json.MarshalIndent(sessions, "", " ") + if err != nil { + return err + } + utils.UserOutputJSON(output) + + return nil +} + +func outputSessionsAsYAML(sessions []SessionInfo) error { + output, err := yaml.Marshal(sessions) + if err != nil { + return err + } + fmt.Println(string(output)) + + return nil +} + +func outputSessionsAsTable(sessions []SessionInfo) error { + headers := []string{"SESSION", "CLUSTER ID", "CLUSTER NAME", "DURATION"} + index := 0 + utils.RenderTableRows(headers, func() []string { + if index < len(sessions) { + s := sessions[index] + index++ + // Truncate cluster ID to 36 chars if longer, for table consistency with invalid pod labels. 
+ clusterIDDisplay := s.ClusterID + if len(clusterIDDisplay) > uuidStringLength { + clusterIDDisplay = clusterIDDisplay[:uuidStringLength] + } + + return []string{ + s.SessionID, + clusterIDDisplay, + s.ClusterName, + s.Duration, + } + } + + return nil + }) + + return nil +} diff --git a/internal/cli/local/remote/list_recordings.go b/internal/cli/local/remote/list_recordings.go new file mode 100644 index 0000000..4bc1a9e --- /dev/null +++ b/internal/cli/local/remote/list_recordings.go @@ -0,0 +1,303 @@ +package remote + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "sort" + "strconv" + "strings" + "time" + + "github.com/google/uuid" + "github.com/spf13/cobra" + "gopkg.in/yaml.v3" + + "github.com/weka/gohomecli/internal/local/chart" + "github.com/weka/gohomecli/internal/utils" +) + +const ( + recordingsSidecarLabel = "app=remote-access-recordings-sidecar" + recordingsSidecarContainer = "sidecar" + recordingsPath = "/recordings" + statOutputParts = 3 // number of parts in stat output: path|size|mtime +) + +// ErrInvalidClusterID is returned when a cluster ID is not a valid UUID +var ErrInvalidClusterID = errors.New("invalid cluster ID: must be a valid UUID") + +type ( + listRecordingsOptions struct { + clusterID string + outputFormat string + } + + // RecordingInfo represents information about a recording file + RecordingInfo struct { + Filename string `json:"filename" yaml:"filename"` + ClusterID string `json:"clusterId,omitempty" yaml:"clusterId,omitempty"` + Size int64 `json:"size" yaml:"size"` + ModTime int64 `json:"modTime" yaml:"modTime"` + } +) + +func newListRecordingsCmd() *cobra.Command { + opts := &listRecordingsOptions{} + + cmd := &cobra.Command{ + Use: "list-recordings", + Short: "List available session recordings", + Long: `List available session recordings stored in the recordings PVC. + +Recordings are stored as asciinema .cast files and can be filtered by cluster ID. 
+ +Examples: + homecli remote-access list-recordings + homecli remote-access list-recordings --cluster-id 550e8400-e29b-41d4-a716-446655440000 + homecli remote-access list-recordings --output json +`, + PreRunE: func(_ *cobra.Command, _ []string) error { + if err := validateOutputFormat(opts.outputFormat); err != nil { + return err + } + + // Validate cluster ID is a valid UUID + if opts.clusterID != "" { + if _, err := uuid.Parse(opts.clusterID); err != nil { + return ErrInvalidClusterID + } + } + + return nil + }, + RunE: func(cmd *cobra.Command, _ []string) error { + return listRecordingsRun(cmd, opts) + }, + } + + cmd.Flags().StringVar(&opts.clusterID, "cluster-id", "", "Filter by cluster ID") + cmd.Flags().StringVarP(&opts.outputFormat, "output", "o", "table", "Output format: table, json, yaml") + + return cmd +} + +func listRecordingsRun(cmd *cobra.Command, opts *listRecordingsOptions) error { + ctx := cmd.Context() + + // Create exec client + execClient, err := chart.NewK8sExecClient(ctx, recordingsSidecarLabel, recordingsSidecarContainer) + if err != nil { + return fmt.Errorf("failed to create Kubernetes client: %w", err) + } + + recordings, err := listRecordings(ctx, execClient, opts.clusterID) + if err != nil { + return err + } + + if len(recordings) == 0 { + utils.UserNote("No recordings found") + + return nil + } + + // Output based on format + switch opts.outputFormat { + case "json": + return outputRecordingsAsJSON(recordings) + case "yaml": + return outputRecordingsAsYAML(recordings) + default: + return outputRecordingsAsTable(recordings) + } +} + +// listRecordings lists recordings, optionally filtered by cluster ID. 
+func listRecordings(ctx context.Context, client *chart.K8sExecClient, clusterID string) ([]RecordingInfo, error) { + searchPath := recordingsPath + if clusterID != "" { + searchPath = fmt.Sprintf("%s/%s", recordingsPath, clusterID) + } + + // Check if directory exists first + _, err := client.Exec(ctx, "test", "-d", searchPath) + if err != nil { + // Directory doesn't exist - return empty results (not an error) + return nil, nil //nolint:nilerr // test -d exits non-zero if dir doesn't exist; that's not an error for us + } + + // Find all .cast files recursively with stat info + // Output format: /recordings/[clusterID/]filename.cast|size|mtime + output, err := client.Exec(ctx, + "find", searchPath, + "-name", "*.cast", + "-type", "f", + "-exec", "stat", "-c", "%n|%s|%Y", "{}", ";", + ) + if err != nil { + // Directory exists but find failed - this is a real error + return nil, fmt.Errorf("failed to list recordings: %w", err) + } + + recordings := parseStatOutput(output) + + // Sort by modification time descending (latest first) + sort.Slice(recordings, func(i, j int) bool { + return recordings[i].ModTime > recordings[j].ModTime + }) + + return recordings, nil +} + +// parseStatOutput parses the output of stat command into RecordingInfo structs +// Path format: /recordings/filename.cast or /recordings/clusterID/filename.cast +func parseStatOutput(output string) []RecordingInfo { + lines := strings.Split(strings.TrimSpace(output), "\n") + recordings := make([]RecordingInfo, 0, len(lines)) + skipped := 0 + + for _, line := range lines { + line = strings.TrimSpace(line) + if line == "" { + continue + } + + info, ok := parseStatLine(line) + if !ok { + skipped++ + + continue + } + + recordings = append(recordings, info) + } + + if skipped > 0 { + logger.Warn(). + Int("skipped", skipped). + Int("parsed", len(recordings)). 
+ Msg("Some stat output entries could not be parsed") + } + + return recordings +} + +// parseStatLine parses a single stat output line into a RecordingInfo. +// Returns false if the line is malformed. +func parseStatLine(line string) (RecordingInfo, bool) { + parts := strings.Split(line, "|") + if len(parts) != statOutputParts { + logger.Debug(). + Str("line", line). + Int("parts", len(parts)). + Int("expected", statOutputParts). + Msg("Skipping stat output entry with wrong number of parts") + + return RecordingInfo{}, false + } + + // Extract filename and cluster ID from path + // /recordings/file.cast -> clusterID="", filename="file.cast" + // /recordings/abc-123/file.cast -> clusterID="abc-123", filename="file.cast" + // /recordings/abc-123/subdir/file.cast -> clusterID="abc-123", filename="subdir/file.cast" + fullPath := parts[0] + relPath := strings.TrimPrefix(fullPath, recordingsPath+"/") + pathParts := strings.Split(relPath, "/") + + var filename, clusterID string + if len(pathParts) == 1 { + filename = pathParts[0] + } else { + clusterID = pathParts[0] + filename = strings.Join(pathParts[1:], "/") // handles nested dirs + } + + size, sizeErr := strconv.ParseInt(parts[1], 10, 64) + mtime, mtimeErr := strconv.ParseInt(parts[2], 10, 64) + if sizeErr != nil || mtimeErr != nil { + logger.Debug(). + Err(sizeErr). + AnErr("mtimeErr", mtimeErr). + Str("line", line). 
+ Msg("Skipping stat output entry with invalid size or mtime") + + return RecordingInfo{}, false + } + + return RecordingInfo{ + Filename: filename, + Size: size, + ModTime: mtime, + ClusterID: clusterID, + }, true +} + +func outputRecordingsAsJSON(recordings []RecordingInfo) error { + output, err := json.MarshalIndent(recordings, "", " ") + if err != nil { + return err + } + utils.UserOutputJSON(output) + + return nil +} + +func outputRecordingsAsYAML(recordings []RecordingInfo) error { + output, err := yaml.Marshal(recordings) + if err != nil { + return err + } + fmt.Println(string(output)) + + return nil +} + +func outputRecordingsAsTable(recordings []RecordingInfo) error { + headers := []string{"FILENAME", "SIZE", "MODIFIED", "CLUSTER ID"} + index := 0 + utils.RenderTableRows(headers, func() []string { + if index < len(recordings) { + r := recordings[index] + index++ + clusterID := r.ClusterID + if clusterID == "" { + clusterID = "-" + } + + return []string{ + r.Filename, + formatSize(r.Size), + formatTime(r.ModTime), + clusterID, + } + } + + return nil + }) + + return nil +} + +// formatSize formats bytes into human-readable format +func formatSize(bytes int64) string { + const unit = 1024 + if bytes < unit { + return fmt.Sprintf("%d B", bytes) + } + div, exp := int64(unit), 0 + for n := bytes / unit; n >= unit; n /= unit { + div *= unit + exp++ + } + + return fmt.Sprintf("%.1f %cB", float64(bytes)/float64(div), "KMGTPE"[exp]) +} + +// formatTime formats unix timestamp into human-readable format +func formatTime(timestamp int64) string { + t := time.Unix(timestamp, 0) + + return t.Format("2006-01-02 15:04") +} diff --git a/internal/cli/local/remote/remote.go b/internal/cli/local/remote/remote.go new file mode 100644 index 0000000..69bdade --- /dev/null +++ b/internal/cli/local/remote/remote.go @@ -0,0 +1,64 @@ +// Package remote provides CLI commands for managing remote tmate sessions +package remote + +import ( + "errors" + "fmt" + + "github.com/spf13/cobra" 
+ + "github.com/weka/gohomecli/internal/cli/app/hooks" + "github.com/weka/gohomecli/internal/utils" +) + +var ( + logger = utils.GetLogger("Remote") //nolint:gochecknoglobals // standard pattern + + // RemoteAccessGroup is the command group for remote access commands + RemoteAccessGroup = cobra.Group{ //nolint:gochecknoglobals // cobra pattern + ID: "remote-access", + Title: "Remote Access Commands", + } + + // Cli is the hooks.Cli instance for remote access commands + Cli hooks.Cli //nolint:gochecknoglobals // cobra pattern + + // validOutputFormats defines the allowed output format values + validOutputFormats = []string{"table", "json", "yaml"} //nolint:gochecknoglobals // used by multiple commands + + // ErrInvalidOutputFormat is returned when an invalid output format is specified + ErrInvalidOutputFormat = errors.New("invalid output format") +) + +//nolint:gochecknoinits // cobra CLI pattern +func init() { + Cli.AddHook(func(appCmd *cobra.Command) { + appCmd.AddGroup(&RemoteAccessGroup) + + remoteCmd := &cobra.Command{ + Use: "remote-access", + Short: "Manage remote tmate sessions", + Long: "Start, stop, and manage remote tmate sessions and recordings for cluster debugging", + GroupID: RemoteAccessGroup.ID, + } + + remoteCmd.AddCommand(newStartCmd()) + remoteCmd.AddCommand(newStopCmd()) + remoteCmd.AddCommand(newListCmd()) + remoteCmd.AddCommand(newListRecordingsCmd()) + remoteCmd.AddCommand(newCopyRecordingCmd()) + + appCmd.AddCommand(remoteCmd) + }) +} + +// validateOutputFormat checks if the output format is valid +func validateOutputFormat(format string) error { + for _, valid := range validOutputFormats { + if format == valid { + return nil + } + } + + return fmt.Errorf("%w: %q (valid: table, json, yaml)", ErrInvalidOutputFormat, format) +} diff --git a/internal/cli/local/remote/start.go b/internal/cli/local/remote/start.go new file mode 100644 index 0000000..8cef690 --- /dev/null +++ b/internal/cli/local/remote/start.go @@ -0,0 +1,418 @@ +package remote + 
+import ( + "crypto/rand" + "encoding/hex" + "errors" + "fmt" + "os" + "regexp" + "strconv" + "strings" + + "github.com/google/uuid" + "github.com/spf13/cobra" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + + "github.com/weka/gohomecli/internal/env" + "github.com/weka/gohomecli/internal/local/chart" + "github.com/weka/gohomecli/internal/utils" +) + +// Kubernetes label value constraints. +const maxLabelValueLength = 63 + +// labelValuePattern matches valid Kubernetes label values. +// Must start and end with alphanumeric, can contain alphanumerics, dashes, underscores, and dots. +var labelValuePattern = regexp.MustCompile(`^[a-zA-Z0-9]([a-zA-Z0-9._-]*[a-zA-Z0-9])?$`) + +var ( + // ErrSSHKeysPathNotDirectory is returned when the ssh-keys-path exists but is not a directory. + ErrSSHKeysPathNotDirectory = errors.New("ssh-keys-path must be a directory") + + // ErrClusterNameTooLong is returned when the cluster name exceeds the Kubernetes label value limit. + ErrClusterNameTooLong = errors.New("cluster-name exceeds maximum length of 63 characters") + + // ErrClusterNameInvalid is returned when the cluster name contains invalid characters for a Kubernetes label. + ErrClusterNameInvalid = errors.New( + "cluster-name must start and end with alphanumeric characters, " + + "and contain only alphanumerics, dashes, underscores, or dots", + ) + + // ErrClusterIDInvalid is returned when the cluster ID is not a valid UUID. + ErrClusterIDInvalid = errors.New("cluster-id must be a valid UUID") + + // ErrTmateServerFlagsIncomplete is returned when some but not all tmate server flags are provided. 
+ ErrTmateServerFlagsIncomplete = errors.New( + "when using custom tmate server, all flags must be provided: " + + "--tmate-server-host, --tmate-server-port, --tmate-server-rsa-fingerprint, " + + "--tmate-server-ed25519-fingerprint, --tmate-server-ecdsa-fingerprint", + ) +) + +const ( + remoteAccessLabel = "app" + remoteAccessValue = "remote-access" + remoteAccessConfigLabel = "app=remote-access-config" + sessionRecordingsPVCLabel = "app=remote-access-recordings" + sessionIDBytes = 3 // 3 bytes = 6 hex chars + sharedSocketPath = "/shared/tmate.socket" +) + +type startOptions struct { + // Tmate server flags (optional overrides - tmate.py has built-in config for known cloud URLs) + tmateServerHost string + tmateServerPort string + tmateServerRSA string + tmateServerEd25519 string + tmateServerECDSA string + clusterID string + clusterName string + sshKeysPath string + cloudURL string + hostName string + terminalCols int + terminalLines int + debug bool +} + +func newStartCmd() *cobra.Command { + opts := &startOptions{} + + cmd := &cobra.Command{ + Use: "start", + Short: "Start a new remote-access tmate session", + Long: `Start a remote-access tmate session for remote cluster connection. +Tmate server config is resolved from cloud URL. Use --tmate-server-* flags for custom servers. + +Examples: + homecli remote-access start --cluster-id "550e8400-..." --cluster-name "prod" --ssh-keys-path "/root/.ssh" # Start a session with cloud URL from config + homecli remote-access start --cluster-id "550e8400-..." 
--cluster-name "prod" --ssh-keys-path "/root/.ssh" --tmate-server-host "tmate.example.com" --tmate-server-port "22" --tmate-server-rsa-fingerprint "1234567890" --tmate-server-ed25519-fingerprint "1234567890" --tmate-server-ecdsa-fingerprint "1234567890" # Start a session with custom tmate server + `, + PreRunE: func(_ *cobra.Command, _ []string) error { + // Validate SSH keys path exists and is a directory + if err := validateSSHKeysPath(opts.sshKeysPath); err != nil { + return err + } + + // Validate cluster name for Kubernetes label compatibility + if err := validateClusterName(opts.clusterName); err != nil { + return err + } + + // Validate cluster ID is a valid UUID to prevent label injection attacks + if _, err := uuid.Parse(opts.clusterID); err != nil { + return fmt.Errorf("%w: %s", ErrClusterIDInvalid, opts.clusterID) + } + + // Validate tmate server flags: if any are provided, all must be provided + if err := validateTmateServerFlags(opts); err != nil { + return err + } + + return nil + }, + RunE: func(cmd *cobra.Command, _ []string) error { + return startRun(cmd, opts) + }, + } + + // Required flags + cmd.Flags().StringVar(&opts.clusterID, "cluster-id", "", "Cluster GUID (required)") + cmd.Flags(). + StringVar(&opts.clusterName, "cluster-name", "", "Human-readable cluster name, max 63 chars, alphanumeric with dashes/underscores/dots (required)") + cmd.Flags(). 
+ StringVar(&opts.sshKeysPath, "ssh-keys-path", "", "Host path to existing SSH keys directory, mounted as HostPath volume (required)") + _ = cmd.MarkFlagRequired("cluster-id") //nolint:errcheck // flag exists + _ = cmd.MarkFlagRequired("cluster-name") //nolint:errcheck // flag exists + _ = cmd.MarkFlagRequired("ssh-keys-path") //nolint:errcheck // flag exists + + // Optional flags + cmd.Flags().StringVar(&opts.cloudURL, "cloud-url", "", "Cloud Weka Home URL (default: from config)") + cmd.Flags().StringVar(&opts.hostName, "host-name", "", "Override hostname (default: system hostname)") + cmd.Flags().IntVar(&opts.terminalCols, "terminal-cols", 0, "Terminal width (default: 158)") + cmd.Flags().IntVar(&opts.terminalLines, "terminal-lines", 0, "Terminal height (default: 35)") + cmd.Flags().BoolVar(&opts.debug, "debug", false, "Enable debug logging in tmate container (default: false)") + + // Tmate server override flags (optional - tmate.py has built-in config for known cloud URLs) + cmd.Flags().StringVar(&opts.tmateServerHost, "tmate-server-host", "", "Override tmate SSH server hostname") + cmd.Flags().StringVar(&opts.tmateServerPort, "tmate-server-port", "", "Override tmate SSH server port") + cmd.Flags().StringVar(&opts.tmateServerRSA, "tmate-server-rsa-fingerprint", "", "Override RSA fingerprint") + cmd.Flags().StringVar( + &opts.tmateServerEd25519, "tmate-server-ed25519-fingerprint", "", "Override Ed25519 fingerprint", + ) + cmd.Flags().StringVar(&opts.tmateServerECDSA, "tmate-server-ecdsa-fingerprint", "", "Override ECDSA fingerprint") + + return cmd +} + +func startRun(cmd *cobra.Command, opts *startOptions) error { + ctx := cmd.Context() + + // Resolve cloud URL from flag, config, or default + cloudURL := opts.cloudURL + if cloudURL == "" && env.CurrentSiteConfig != nil { + cloudURL = env.CurrentSiteConfig.CloudURL + } + if cloudURL == "" { + cloudURL = env.DefaultCloudURL + utils.UserNote("Cloud URL not configured, using default: %s", cloudURL) + } + + // Get 
local LWH address from ingress + lwhAddress, err := chart.GetIngressAddress(ctx) + if err != nil { + return fmt.Errorf("failed to get LWH address: %w", err) + } + localLWHURL := "http://" + lwhAddress + + // Build webhook base URLs (tmate.py builds full paths using WEBHOOK_URL_TEMPLATE) + webhookURLs := buildWebhookBaseURLs(cloudURL, localLWHURL) + + // Generate session ID + sessionID := generateShortID() + + // Create Kubernetes client + k8s, err := chart.NewKubernetesClient() + if err != nil { + return fmt.Errorf("failed to create Kubernetes client: %w", err) + } + + // Get recordings PVC name + recordingsPVCName, err := chart.GetPVCName(ctx, sessionRecordingsPVCLabel) + if err != nil { + return fmt.Errorf("failed to find recordings PVC: %w", err) + } + + // Get remote-access image from ConfigMap + remoteAccessImage, err := chart.GetConfigMapData(ctx, remoteAccessConfigLabel, "image") + if err != nil { + return fmt.Errorf("failed to get remote-access image from config: %w", err) + } + + // Create the pod + pod := buildSessionPod(sessionID, webhookURLs, cloudURL, recordingsPVCName, remoteAccessImage, opts) + + logger.Info(). + Str("sessionID", sessionID). + Str("clusterID", opts.clusterID). + Str("clusterName", opts.clusterName). 
+ Msg("Creating remote session pod...") + + createdPod, err := k8s.Clientset.CoreV1().Pods(chart.ReleaseNamespace).Create(ctx, pod, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create session pod: %w", err) + } + + utils.UserOutput("Remote session started successfully\n") + utils.UserOutput(" Session ID: %s\n", sessionID) + + logger.Info().Str("pod", createdPod.Name).Msg("Remote session pod created") + + return nil +} + +func buildWebhookBaseURLs(cloudURL, localAPIURL string) string { + var urls []string + + // Add cloud base URL if configured + if cloudURL != "" { + urls = append(urls, strings.TrimSuffix(cloudURL, "/")) + } + + // Always add local LWH base URL + urls = append(urls, localAPIURL) + + return strings.Join(urls, ",") +} + +// validateSSHKeysPath validates that the SSH keys path exists and is a directory. +func validateSSHKeysPath(path string) error { + info, err := os.Stat(path) + if err != nil { + if os.IsNotExist(err) { + return fmt.Errorf("ssh-keys-path does not exist: %s", path) + } + + return fmt.Errorf("failed to access ssh-keys-path: %w", err) + } + + if !info.IsDir() { + return fmt.Errorf("%w: %s", ErrSSHKeysPathNotDirectory, path) + } + + return nil +} + +// validateClusterName validates that the cluster name is valid for use as a Kubernetes label value. 
+func validateClusterName(name string) error { + if len(name) > maxLabelValueLength { + return fmt.Errorf("%w (got %d characters)", ErrClusterNameTooLong, len(name)) + } + + // Single character names are valid if alphanumeric + if len(name) == 1 { + if !((name[0] >= 'a' && name[0] <= 'z') || + (name[0] >= 'A' && name[0] <= 'Z') || + (name[0] >= '0' && name[0] <= '9')) { + return fmt.Errorf("%w: %q", ErrClusterNameInvalid, name) + } + + return nil + } + + if !labelValuePattern.MatchString(name) { + return fmt.Errorf("%w: %q", ErrClusterNameInvalid, name) + } + + return nil +} + +// validateTmateServerFlags validates that if ONE of these flags is provided, ALL must be provided. +func validateTmateServerFlags(opts *startOptions) error { + flags := map[string]string{ + "--tmate-server-host": opts.tmateServerHost, + "--tmate-server-port": opts.tmateServerPort, + "--tmate-server-rsa-fingerprint": opts.tmateServerRSA, + "--tmate-server-ed25519-fingerprint": opts.tmateServerEd25519, + "--tmate-server-ecdsa-fingerprint": opts.tmateServerECDSA, + } + + var provided, missing []string + for name, value := range flags { + if value != "" { + provided = append(provided, name) + } else { + missing = append(missing, name) + } + } + + // If ONE is provided but not ALL, return error + if len(provided) > 0 && len(missing) > 0 { + return ErrTmateServerFlagsIncomplete + } + + return nil +} + +func generateShortID() string { + b := make([]byte, sessionIDBytes) + _, _ = rand.Read(b) //nolint:errcheck // crypto/rand.Read error indicates serious system issue + + return hex.EncodeToString(b) +} + +func buildTmateEnv(webhookURLs, cloudURL string, opts *startOptions) []corev1.EnvVar { + envVars := []corev1.EnvVar{ + {Name: "CLUSTER_ID", Value: opts.clusterID}, + {Name: "CLUSTER_NAME", Value: opts.clusterName}, + {Name: "SHARED_SOCKET", Value: sharedSocketPath}, + {Name: "CLOUD_URL", Value: cloudURL}, + {Name: "WEBHOOK_URLS", Value: webhookURLs}, + } + + if opts.hostName != "" { + envVars = 
append(envVars, corev1.EnvVar{Name: "HOST_NAME", Value: opts.hostName}) + } + if opts.terminalCols > 0 { + envVars = append(envVars, corev1.EnvVar{Name: "TERMINAL_COLS", Value: strconv.Itoa(opts.terminalCols)}) + } + if opts.terminalLines > 0 { + envVars = append(envVars, corev1.EnvVar{Name: "TERMINAL_LINES", Value: strconv.Itoa(opts.terminalLines)}) + } + if opts.tmateServerHost != "" { + envVars = append(envVars, corev1.EnvVar{Name: "TMATE_SERVER_HOST", Value: opts.tmateServerHost}) + } + if opts.tmateServerPort != "" { + envVars = append(envVars, corev1.EnvVar{Name: "TMATE_SERVER_PORT", Value: opts.tmateServerPort}) + } + if opts.tmateServerRSA != "" { + envVars = append(envVars, corev1.EnvVar{Name: "TMATE_SERVER_RSA_FINGERPRINT", Value: opts.tmateServerRSA}) + } + if opts.tmateServerEd25519 != "" { + envVars = append( + envVars, corev1.EnvVar{Name: "TMATE_SERVER_ED25519_FINGERPRINT", Value: opts.tmateServerEd25519}, + ) + } + if opts.tmateServerECDSA != "" { + envVars = append(envVars, corev1.EnvVar{Name: "TMATE_SERVER_ECDSA_FINGERPRINT", Value: opts.tmateServerECDSA}) + } + if opts.debug { + envVars = append(envVars, corev1.EnvVar{Name: "DEBUG", Value: "1"}) + } + + return envVars +} + +func buildPodVolumes(recordingsPVCName, sshKeysPath string) []corev1.Volume { + return []corev1.Volume{ + {Name: "shared-socket", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}}, + {Name: "recordings", VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ClaimName: recordingsPVCName}, + }}, + {Name: "ssh-keys", VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{Path: sshKeysPath}, + }}, + {Name: "dev-pts", VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{Path: "/dev/pts"}, + }}, + } +} + +func buildSessionPod( + sessionID, webhookURLs, cloudURL, recordingsPVCName, image string, + opts *startOptions, +) *corev1.Pod { + tmateEnv := 
buildTmateEnv(webhookURLs, cloudURL, opts) + recorderEnv := []corev1.EnvVar{ + {Name: "SHARED_SOCKET", Value: sharedSocketPath}, + {Name: "CLUSTER_ID", Value: opts.clusterID}, + } + + tmateMounts := []corev1.VolumeMount{ + {Name: "shared-socket", MountPath: "/shared"}, + {Name: "ssh-keys", MountPath: "/root/.ssh", ReadOnly: true}, + {Name: "dev-pts", MountPath: "/dev/pts"}, + } + recorderMounts := []corev1.VolumeMount{ + {Name: "shared-socket", MountPath: "/shared"}, + {Name: "recordings", MountPath: "/recordings"}, + {Name: "dev-pts", MountPath: "/dev/pts"}, + } + + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "remote-session-" + sessionID, + Namespace: chart.ReleaseNamespace, + Labels: map[string]string{ + remoteAccessLabel: remoteAccessValue, + "session-id": sessionID, + "cluster-id": opts.clusterID, + "cluster-name": opts.clusterName, + }, + }, + Spec: corev1.PodSpec{ + ShareProcessNamespace: ptr.To(true), + RestartPolicy: corev1.RestartPolicyAlways, + Containers: []corev1.Container{ + { + Name: "tmate", + Image: image, + Command: []string{"python3", "/tmate/tmate.py"}, + Env: tmateEnv, + VolumeMounts: tmateMounts, + }, + { + Name: "recorder", + Image: image, + Command: []string{"python3", "/recorder/recorder.py"}, + Env: recorderEnv, + VolumeMounts: recorderMounts, + }, + }, + Volumes: buildPodVolumes(recordingsPVCName, opts.sshKeysPath), + }, + } +} diff --git a/internal/cli/local/remote/stop.go b/internal/cli/local/remote/stop.go new file mode 100644 index 0000000..285a7e5 --- /dev/null +++ b/internal/cli/local/remote/stop.go @@ -0,0 +1,124 @@ +package remote + +import ( + "errors" + "fmt" + "regexp" + + "github.com/google/uuid" + "github.com/spf13/cobra" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/weka/gohomecli/internal/local/chart" + "github.com/weka/gohomecli/internal/utils" +) + +// sessionIDPattern matches 6 hex characters (format from generateShortID) +var sessionIDPattern = regexp.MustCompile(`^[0-9a-fA-F]{6}$`) + 
+// ErrMissingStopFilter is returned when no filter is specified for stop command +var ErrMissingStopFilter = errors.New("must specify one of: --session-id, --cluster-id, or --all") + +type stopOptions struct { + sessionID string + clusterID string + all bool +} + +func newStopCmd() *cobra.Command { + opts := &stopOptions{} + + cmd := &cobra.Command{ + Use: "stop", + Short: "Stop running remote session(s)", + Long: `Stop running remote session(s). + + +Examples: + homecli remote-access stop --session-id a1b2c3 # Stop a specific session by ID + homecli remote-access stop --cluster-id 550e8400-e29b-41d4-a716-446655440000 # Stop all sessions for a cluster + homecli remote-access stop --all # Stop all active sessions + `, + RunE: func(cmd *cobra.Command, _ []string) error { + return stopRun(cmd, opts) + }, + PreRunE: func(_ *cobra.Command, _ []string) error { + if opts.sessionID == "" && opts.clusterID == "" && !opts.all { + return ErrMissingStopFilter + } + + // Validate session-id format to prevent label selector injection + if opts.sessionID != "" && !sessionIDPattern.MatchString(opts.sessionID) { + return errors.New("invalid session-id format: must be 6 hex characters (e.g., a1b2c3)") + } + + // Validate cluster-id format to prevent label selector injection + if opts.clusterID != "" { + if _, err := uuid.Parse(opts.clusterID); err != nil { + return errors.New("invalid cluster-id format: must be a valid UUID") + } + } + + return nil + }, + } + + cmd.Flags().StringVar(&opts.sessionID, "session-id", "", "Stop specific session by ID") + cmd.Flags().StringVar(&opts.clusterID, "cluster-id", "", "Stop all sessions for a cluster") + cmd.Flags().BoolVar(&opts.all, "all", false, "Stop all active sessions") + + return cmd +} + +func stopRun(cmd *cobra.Command, opts *stopOptions) error { + ctx := cmd.Context() + + k8s, err := chart.NewKubernetesClient() + if err != nil { + return fmt.Errorf("failed to create Kubernetes client: %w", err) + } + + // Build label selector + 
selector := fmt.Sprintf("%s=%s", remoteAccessLabel, remoteAccessValue) + if opts.sessionID != "" { + selector += ",session-id=" + opts.sessionID + } else if opts.clusterID != "" { + selector += ",cluster-id=" + opts.clusterID + } + // --all: no additional filter + + logger.Debug().Str("selector", selector).Msg("Listing pods to stop") + + // List matching pods + pods, err := k8s.Clientset.CoreV1().Pods(chart.ReleaseNamespace).List(ctx, metav1.ListOptions{ + LabelSelector: selector, + }) + if err != nil { + return fmt.Errorf("failed to list pods: %w", err) + } + + if len(pods.Items) == 0 { + utils.UserNote("No matching sessions found") + + return nil + } + + // Delete each pod + stoppedCount := 0 + for i := range pods.Items { + pod := &pods.Items[i] + sessionID := pod.Labels["session-id"] + logger.Info().Str("pod", pod.Name).Str("sessionID", sessionID).Msg("Stopping session pod") + + if err := k8s.Clientset.CoreV1().Pods(chart.ReleaseNamespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil { + utils.UserWarning("Failed to stop session %s: %v", sessionID, err) + } else { + utils.UserOutput("Stopped session: %s (pod: %s)\n", sessionID, pod.Name) + stoppedCount++ + } + } + + utils.UserNote("Stopped %d session(s)", stoppedCount) + + return nil +} diff --git a/internal/env/config.go b/internal/env/config.go index 57e8ecb..c8e91aa 100644 --- a/internal/env/config.go +++ b/internal/env/config.go @@ -54,6 +54,9 @@ type Config struct { DefaultSite string `toml:"default_site"` } +// SkipAPIKeyValidation can be set to true before calling InitConfig to skip API key validation. 
+var SkipAPIKeyValidation bool + func InitConfig(siteNameFromCommandLine string) { if initialized { return @@ -174,7 +177,7 @@ func getSiteConfig(config *Config, siteNameFromCommandLine string) (*SiteConfig, } func validateSiteConfig(siteConfig *SiteConfig, siteName string) { - if siteConfig.APIKey == "" { + if !SkipAPIKeyValidation && siteConfig.APIKey == "" { utils.UserWarning("config error: \"api_key\" is unset for site %s", siteName) } if siteConfig.CloudURL == "" { diff --git a/internal/local/chart/kube.go b/internal/local/chart/kube.go index ca45a5a..6fd2498 100644 --- a/internal/local/chart/kube.go +++ b/internal/local/chart/kube.go @@ -1,22 +1,259 @@ package chart import ( + "archive/tar" + "bytes" "context" + "errors" "fmt" + "io" "net" "os" "path/filepath" + "strings" + "time" helmclient "github.com/mittwald/go-helm-client" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/remotecommand" "github.com/weka/gohomecli/internal/utils" ) -const KubeConfigPath = "/etc/rancher/k3s/k3s.yaml" +const ( + KubeConfigPath = "/etc/rancher/k3s/k3s.yaml" + copyFromPodTimeout = 10 * time.Minute + + tarTypeRegV7 = '\x00' // V7 Unix tar regular file typeflag (null byte) +) + +var errFileNotFoundInTar = errors.New("file not found in tar archive") + +// K8sClient holds a Kubernetes clientset and REST config +type K8sClient struct { + Clientset *kubernetes.Clientset + Config *rest.Config +} + +// NewKubernetesClient creates a new Kubernetes client from the default kubeconfig +func NewKubernetesClient() (*K8sClient, error) { + kubeconfig, err := ReadKubeConfig(KubeConfigPath) + if err != nil { + return nil, err + } + + config, err := clientcmd.RESTConfigFromKubeConfig(kubeconfig) + if err != nil { + return nil, err + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, 
err + } + + return &K8sClient{ + Clientset: clientset, + Config: config, + }, nil +} + +// K8sExecClient holds Kubernetes client info for executing commands in a specific pod +type K8sExecClient struct { + K8s *K8sClient + PodName string + Container string +} + +// NewK8sExecClient creates a client for executing commands in a pod found by label selector +func NewK8sExecClient(ctx context.Context, labelSelector, container string) (*K8sExecClient, error) { + k8s, err := NewKubernetesClient() + if err != nil { + return nil, fmt.Errorf("failed to create Kubernetes client: %w", err) + } + + pods, err := k8s.Clientset.CoreV1().Pods(ReleaseNamespace).List(ctx, v1.ListOptions{ + LabelSelector: labelSelector, + }) + if err != nil { + return nil, fmt.Errorf("failed to list pods: %w", err) + } + + if len(pods.Items) == 0 { + return nil, fmt.Errorf("no pod found with label selector %q", labelSelector) + } + + return &K8sExecClient{ + K8s: k8s, + PodName: pods.Items[0].Name, + Container: container, + }, nil +} + +// newExecutor creates a remotecommand executor for the given command +func (c *K8sExecClient) newExecutor(command []string) (remotecommand.Executor, error) { + req := c.K8s.Clientset.CoreV1().RESTClient().Post(). + Resource("pods"). + Name(c.PodName). + Namespace(ReleaseNamespace). + SubResource("exec"). 
+ VersionedParams(&corev1.PodExecOptions{ + Container: c.Container, + Command: command, + Stdout: true, + Stderr: true, + }, scheme.ParameterCodec) + + return remotecommand.NewSPDYExecutor(c.K8s.Config, "POST", req.URL()) +} + +// Exec runs a command in the pod and returns the output +func (c *K8sExecClient) Exec(ctx context.Context, command ...string) (string, error) { + executor, err := c.newExecutor(command) + if err != nil { + return "", fmt.Errorf("failed to create executor: %w", err) + } + + var stdout, stderr bytes.Buffer + err = executor.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdout: &stdout, + Stderr: &stderr, + }) + if err != nil { + return "", fmt.Errorf("exec failed: %s - %w", stderr.String(), err) + } + + return stdout.String(), nil +} + +// CopyFromPod copies a file from the pod to the local filesystem using streaming +func (c *K8sExecClient) CopyFromPod(ctx context.Context, remotePath, localPath string) error { + // Run tar directly without using a shell to avoid command injection risks + executor, err := c.newExecutor([]string{"tar", "cf", "-", remotePath}) + if err != nil { + return fmt.Errorf("failed to create executor: %w", err) + } + + // Add timeout to prevent indefinite hangs on large files or slow connections + copyCtx, cancel := context.WithTimeout(ctx, copyFromPodTimeout) + defer cancel() + + // Use pipe for streaming (memory efficient for large files) + reader, writer := io.Pipe() + defer reader.Close() // Ensure cleanup even on panic + var stderr bytes.Buffer + execErrCh := make(chan error, 1) + + // Stream tar from pod in background + go func() { + defer writer.Close() + execErrCh <- executor.StreamWithContext(copyCtx, remotecommand.StreamOptions{ + Stdout: writer, + Stderr: &stderr, + }) + }() + + // Extract file from tar stream + extractErr := extractTarFile(reader, localPath) + + // Close reader to unblock the goroutine if extraction fails/completes + reader.Close() + + // Wait for goroutine to complete and get its 
error + execErr := <-execErrCh + + // Log stderr if there's any output + if stderr.Len() > 0 { + logger.Debug().Str("stderr", stderr.String()).Msg("CopyFromPod tar stderr") + } + + if extractErr != nil { + logger.Debug().Err(extractErr).Str("stderr", stderr.String()).Msg("CopyFromPod extract failed") + + return extractErr + } + if execErr != nil { + // Ignore "closed pipe" error when extraction succeeded - this happens when we close + // the reader after extracting the file but before tar finishes streaming + if isClosedPipeError(execErr) { + return nil + } + // Check for context timeout + if errors.Is(execErr, context.DeadlineExceeded) { + return fmt.Errorf("copy timed out after %v: %w", copyFromPodTimeout, execErr) + } + + return fmt.Errorf("tar exec failed: %s - %w", stderr.String(), execErr) + } + + return nil +} + +// isClosedPipeError checks if the error is a "closed pipe" error which is expected +// when we close the reader early after successfully extracting the file +func isClosedPipeError(err error) bool { + if err == nil { + return false + } + + return strings.Contains(err.Error(), "closed pipe") +} + +// extractTarFile extracts the first regular file from a tar stream and drains remaining data. +// Draining prevents "closed pipe" errors from the streaming goroutine. 
+func extractTarFile(reader io.Reader, destPath string) error { //nolint:gocognit // simple loop + tr := tar.NewReader(reader) + extracted := false + + for { + header, err := tr.Next() + if errors.Is(err, io.EOF) { + if !extracted { + return errFileNotFoundInTar + } + + return nil + } + + if err != nil { + if extracted { + return nil // ignore stream errors after successful extraction + } + + return fmt.Errorf("failed to read tar: %w", err) + } + + // Extract first regular file, then continue draining + if !extracted && (header.Typeflag == tar.TypeReg || header.Typeflag == tarTypeRegV7) { + if err := writeFile(destPath, tr); err != nil { + return err + } + + extracted = true + } + } +} + +func writeFile(path string, r io.Reader) error { + f, err := os.Create(path) + if err != nil { + return fmt.Errorf("failed to create file: %w", err) + } + defer f.Close() //nolint:errcheck // best effort + + if _, err := io.Copy(f, r); err != nil { + os.Remove(path) //nolint:errcheck // best effort cleanup of partial file + + return fmt.Errorf("failed to write file: %w", err) + } + + return nil +} // ReadKubeConfig reads the kubeconfig from the given path with fallback to ~/.kube/config func ReadKubeConfig(kubeConfigPath string) ([]byte, error) { @@ -131,22 +368,12 @@ type PodInfo struct { // GetNonRuninngPods returns a list of non-running pods in the ReleaseNamespace namespace func GetNonRuninngPods(ctx context.Context) ([]PodInfo, error) { - kubeconfig, err := ReadKubeConfig(KubeConfigPath) - if err != nil { - return nil, err - } - - config, err := clientcmd.RESTConfigFromKubeConfig(kubeconfig) + k8s, err := NewKubernetesClient() if err != nil { return nil, err } - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - - pods, err := clientset.CoreV1().Pods(ReleaseNamespace).List(ctx, v1.ListOptions{}) + pods, err := k8s.Clientset.CoreV1().Pods(ReleaseNamespace).List(ctx, v1.ListOptions{}) if err != nil { return nil, err } @@ -193,24 +420,65 
@@ func getContainerStatusReason(containerStatus corev1.ContainerStatus, podReason return "Unknown" } -// GetIngressAddress returns the address of the ingress in ReleaseNamespace namespace -func GetIngressAddress(ctx context.Context) (string, error) { - kubeconfig, err := ReadKubeConfig(KubeConfigPath) +// GetPVCName returns the name of a PVC found by label selector +func GetPVCName(ctx context.Context, labelSelector string) (string, error) { + k8s, err := NewKubernetesClient() if err != nil { return "", err } - config, err := clientcmd.RESTConfigFromKubeConfig(kubeconfig) + pvcs, err := k8s.Clientset.CoreV1().PersistentVolumeClaims(ReleaseNamespace).List(ctx, v1.ListOptions{ + LabelSelector: labelSelector, + }) if err != nil { return "", err } - clientset, err := kubernetes.NewForConfig(config) + if len(pvcs.Items) == 0 { + return "", fmt.Errorf("no PVC found with label selector %q in namespace %s", labelSelector, ReleaseNamespace) + } + + return pvcs.Items[0].Name, nil +} + +// GetConfigMapData returns a specific key's value from a ConfigMap found by label selector +func GetConfigMapData(ctx context.Context, labelSelector, key string) (string, error) { + k8s, err := NewKubernetesClient() + if err != nil { + return "", err + } + + configMaps, err := k8s.Clientset.CoreV1().ConfigMaps(ReleaseNamespace).List(ctx, v1.ListOptions{ + LabelSelector: labelSelector, + }) + if err != nil { + return "", err + } + + if len(configMaps.Items) == 0 { + return "", fmt.Errorf( + "no ConfigMap found with label selector %q in namespace %s", + labelSelector, + ReleaseNamespace, + ) + } + + value, ok := configMaps.Items[0].Data[key] + if !ok { + return "", fmt.Errorf("key %q not found in ConfigMap %s", key, configMaps.Items[0].Name) + } + + return value, nil +} + +// GetIngressAddress returns the address of the ingress in ReleaseNamespace namespace +func GetIngressAddress(ctx context.Context) (string, error) { + k8s, err := NewKubernetesClient() if err != nil { return "", err } - 
ingress, err := clientset.NetworkingV1().
+	ingress, err := k8s.Clientset.NetworkingV1().
 		Ingresses(ReleaseNamespace).
 		Get(ctx, "wekahome", v1.GetOptions{})
 	if err != nil {
diff --git a/internal/local/chart/v3.go b/internal/local/chart/v3.go
index 5c96179..992699a 100644
--- a/internal/local/chart/v3.go
+++ b/internal/local/chart/v3.go
@@ -259,6 +259,11 @@ func configureLWH(*config_v1.Configuration) (yamlMap, error) {
 		writeMapEntry(cfg, "victoria-metrics-k8s-stack.vmalert.enabled", false),
 		// grafana
 		writeMapEntry(cfg, "grafana.initChownData.enabled", false),
+		// disable large dashboards to stay under Helm 1MB secret limit
+		writeMapEntry(cfg, "grafana.enabledDashboards.observe-dashboard", false),
+		writeMapEntry(cfg, "grafana.enabledDashboards.events-insights", false),
+		// enable remote session client for LWH
+		writeMapEntry(cfg, "remoteSessionClient.enabled", true),
 		// redis - disable at all.
 		writeMapEntry(cfg, "storage.redis.useInternal", false),
 		writeMapEntry(cfg, "storage.redis.useExternal", false),