diff --git a/CLAUDE.md b/CLAUDE.md index 1e348e8..8e33f05 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -158,7 +158,8 @@ Manages connections to multiple Kubernetes clusters: - Force fresh discovery: restart k8sql or use `--refresh-crds` flag **Retry Logic:** -- Connection and discovery failures retry 3 times with linear backoff (100ms, 200ms, 300ms) +- Connection and discovery failures retry up to 5 times with exponential backoff (100ms, 200ms, 400ms, 800ms, 1600ms base delays) +- ±25% jitter added to prevent thundering herd when multiple clients retry simultaneously - Handles intermittent network issues and proxy problems - All requested contexts must succeed - no partial failures (ensures predictability for scripting) @@ -171,6 +172,7 @@ Manages connections to multiple Kubernetes clusters: - **k8s-openapi**: Kubernetes API type definitions (v1.32) - **rustyline**: REPL with readline support - **clap**: CLI argument parsing +- **fastrand**: Fast random number generation (for retry jitter) ## Table Schema diff --git a/Cargo.lock b/Cargo.lock index 4aba9b7..344b631 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,7 +8,7 @@ version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" dependencies = [ - "getrandom 0.2.16", + "getrandom 0.2.17", "once_cell", "version_check", ] @@ -130,9 +130,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb372a7cbcac02a35d3fb7b3fc1f969ec078e871f9bb899bf00a2e1809bec8a3" +checksum = "2a2b10dcb159faf30d3f81f6d56c1211a5bea2ca424eabe477648a44b993320e" dependencies = [ "arrow-arith", "arrow-array", @@ -151,9 +151,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f377dcd19e440174596d83deb49cd724886d91060c07fec4f67014ef9d54049" +checksum = "288015089e7931843c80ed4032c5274f02b37bcb720c4a42096d50b390e70372" dependencies = [ "arrow-array", "arrow-buffer", @@ -165,9 +165,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a23eaff85a44e9fa914660fb0d0bb00b79c4a3d888b5334adb3ea4330c84f002" +checksum = "65ca404ea6191e06bf30956394173337fa9c35f445bd447fe6c21ab944e1a23c" dependencies = [ "ahash 0.8.12", "arrow-buffer", @@ -184,9 +184,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2819d893750cb3380ab31ebdc8c68874dd4429f90fd09180f3c93538bd21626" +checksum = "36356383099be0151dacc4245309895f16ba7917d79bdb71a7148659c9206c56" dependencies = [ "bytes", "half", @@ -196,9 +196,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3d131abb183f80c450d4591dc784f8d7750c50c6e2bc3fcaad148afc8361271" +checksum = "9c8e372ed52bd4ee88cc1e6c3859aa7ecea204158ac640b10e187936e7e87074" dependencies = [ "arrow-array", "arrow-buffer", @@ -218,9 +218,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2275877a0e5e7e7c76954669366c2aa1a829e340ab1f612e647507860906fb6b" +checksum = "8e4100b729fe656f2e4fb32bc5884f14acf9118d4ad532b7b33c1132e4dce896" dependencies = [ "arrow-array", "arrow-cast", @@ -233,9 +233,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05738f3d42cb922b9096f7786f606fcb8669260c2640df8490533bb2fa38c9d3" +checksum = "bf87f4ff5fc13290aa47e499a8b669a82c5977c6a1fedce22c7f542c1fd5a597" dependencies = [ "arrow-buffer", "arrow-schema", @@ -246,9 +246,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d09446e8076c4b3f235603d9ea7c5494e73d441b01cd61fb33d7254c11964b3" +checksum = "eb3ca63edd2073fcb42ba112f8ae165df1de935627ead6e203d07c99445f2081" dependencies = [ "arrow-array", "arrow-buffer", @@ -261,9 +261,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "371ffd66fa77f71d7628c63f209c9ca5341081051aa32f9c8020feb0def787c0" +checksum = "a36b2332559d3310ebe3e173f75b29989b4412df4029a26a30cc3f7da0869297" dependencies = [ "arrow-array", "arrow-buffer", @@ -285,9 +285,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbc94fc7adec5d1ba9e8cd1b1e8d6f72423b33fe978bf1f46d970fafab787521" +checksum = "13c4e0530272ca755d6814218dffd04425c5b7854b87fa741d5ff848bf50aa39" dependencies = [ "arrow-array", "arrow-buffer", @@ -313,9 +313,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "169676f317157dc079cc5def6354d16db63d8861d61046d2f3883268ced6f99f" +checksum = "b07f52788744cc71c4628567ad834cadbaeb9f09026ff1d7a4120f69edf7abd3" dependencies = [ "arrow-array", "arrow-buffer", @@ -326,9 +326,9 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d27609cd7dd45f006abae27995c2729ef6f4b9361cde1ddd019dc31a5aa017e0" +checksum = "6bb63203e8e0e54b288d0d8043ca8fa1013820822a27692ef1b78a977d879f2c" dependencies = [ "serde_core", "serde_json", @@ -336,9 +336,9 @@ dependencies = [ [[package]] name = "arrow-select" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae980d021879ea119dd6e2a13912d81e64abed372d53163e804dfe84639d8010" +checksum = "c96d8a1c180b44ecf2e66c9a2f2bbcb8b1b6f14e165ce46ac8bde211a363411b" dependencies = [ "ahash 0.8.12", "arrow-array", @@ -350,9 +350,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf35e8ef49dcf0c5f6d175edee6b8af7b45611805333129c541a8b89a0fc0534" +checksum = "a8ad6a81add9d3ea30bf8374ee8329992c7fd246ffd8b7e2f48a3cea5aa0cc9a" dependencies = [ "arrow-array", "arrow-buffer", @@ -396,7 +396,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -407,7 +407,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -472,9 +472,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "base64ct" -version = "1.8.1" +version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e050f626429857a27ddccb31e0aca21356bfa709c04041aefddac081a8f068a" +checksum = "7d809780667f4410e7c41b07f52439b94d2bdf8528eeedc287fa38d3b7f95d82" [[package]] name = "bcder" @@ -546,7 +546,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -591,9 +591,9 @@ checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" [[package]] name = "cc" -version = "1.2.51" +version = "1.2.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a0aeaff4ff1a90589618835a598e545176939b97874f7abc7851caa0618f203" +checksum = "cd4932aefd12402b36c60956a4fe0035421f544799057659ff86f923657aada3" dependencies = [ "find-msvc-tools", "jobserver", @@ -668,7 +668,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -755,7 +755,7 @@ version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" dependencies = [ - "getrandom 0.2.16", + "getrandom 0.2.17", "once_cell", "tiny-keccak", ] @@ -881,7 +881,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -892,7 +892,7 @@ checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" dependencies = [ "darling_core", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -1349,7 +1349,7 @@ checksum = "1063ad4c9e094b3f798acee16d9a47bd7372d9699be2de21b05c3bd3f34ab848" dependencies = [ "datafusion-doc", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -1587,7 +1587,7 @@ checksum = "2cdc8d50f426189eef89dac62fabfa0abb27d5cc008f25bf4156a0203325becc" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -1608,7 +1608,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -1651,7 +1651,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -1684,7 +1684,7 @@ dependencies = [ "enum-ordinalize", "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -1722,7 +1722,7 @@ checksum = "8ca9601fb2d62598ee17836250842873a413586e5d7ed88b356e38ddbb0ec631" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -1793,9 +1793,9 @@ dependencies = [ [[package]] name = "find-msvc-tools" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645cbb3a84e60b7531617d5ae4e57f7e27308f6445f5abf653209ea76dec8dff" +checksum = "f449e6c6c08c865631d4890cfacf252b3d396c9bcc83adb6623cdb02a8336c41" [[package]] name = "fixedbitset" @@ -1902,7 +1902,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -1953,9 +1953,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" +checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" dependencies = [ "cfg-if", "libc", @@ -1983,7 +1983,7 @@ dependencies = [ "proc-macro-error2", "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -2354,9 +2354,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.12.1" +version = "2.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2" +checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" dependencies = [ "equivalent", "hashbrown 0.16.1", @@ -2520,6 +2520,7 @@ dependencies = [ "datafusion-functions-json", "datafusion-postgres", "dirs", + "fastrand", "futures", "indicatif", "k8s-metrics", @@ -2618,7 +2619,7 @@ dependencies = [ "quote", "serde", "serde_json", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -2669,7 +2670,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -2737,9 +2738,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.178" +version = "0.2.180" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" +checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc" [[package]] name = "libm" @@ -3049,9 +3050,9 @@ checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" [[package]] name = "pest" -version = "2.8.4" +version = "2.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbcfd20a6d4eeba40179f05735784ad32bdaef05ce8e8af05f180d45bb3e7e22" +checksum = "2c9eb05c21a464ea704b53158d358a31e6425db2f63a1a7312268b05fe2b75f7" dependencies = [ "memchr", "ucd-trie", @@ -3059,9 +3060,9 @@ dependencies = [ [[package]] name = "pest_derive" -version = "2.8.4" +version = "2.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51f72981ade67b1ca6adc26ec221be9f463f2b5839c7508998daa17c23d94d7f" +checksum = "68f9dbced329c441fa79d80472764b1a2c7e57123553b8519b36663a2fb234ed" dependencies = [ "pest", "pest_generator", @@ -3069,22 +3070,22 @@ dependencies = [ [[package]] name = "pest_generator" -version = "2.8.4" +version = "2.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dee9efd8cdb50d719a80088b76f81aec7c41ed6d522ee750178f83883d271625" +checksum = "3bb96d5051a78f44f43c8f712d8e810adb0ebf923fc9ed2655a7f66f63ba8ee5" dependencies = [ "pest", "pest_meta", "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] name = "pest_meta" -version = "2.8.4" +version = "2.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf1d70880e76bdc13ba52eafa6239ce793d85c8e43896507e43dd8984ff05b82" +checksum = "602113b5b5e8621770cfd490cfd90b9f84ab29bd2b0e49ad83eb6d186cef2365" dependencies = [ "pest", "sha2", @@ -3168,7 +3169,7 @@ checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -3274,14 +3275,14 @@ dependencies = [ "proc-macro-error-attr2", "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] name = "proc-macro2" -version = "1.0.104" +version = "1.0.105" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9695f8df41bb4f3d222c95a67532365f569318332d03d5f3f67f37b20e6ebdf0" +checksum = "535d180e0ecab6268a3e718bb9fd44db66bbbc256257165fc699dadf70d16fe7" dependencies = [ "unicode-ident", ] @@ -3363,7 +3364,7 @@ dependencies = [ "proc-macro2", "pyo3-macros-backend", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -3376,14 +3377,14 @@ dependencies = [ "proc-macro2", "pyo3-build-config", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] name = "quote" -version = "1.0.42" +version = "1.0.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" +checksum = "dc74d9a594b72ae6656596548f56f667211f8a97b3d4c3d467150794690dc40a" dependencies = [ "proc-macro2", ] @@ -3457,7 +3458,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom 0.2.16", + "getrandom 0.2.17", ] [[package]] @@ -3486,7 +3487,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76009fbe0614077fc1a2ce255e3a1881a2e3a3527097d5dc6d8212c585e7e38b" dependencies = [ "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -3504,7 +3505,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4e608c6638b9c18977b00b475ac1f28d14e84b27d8d42f70e0bf1e3dec127ac" dependencies = [ - "getrandom 0.2.16", + "getrandom 0.2.17", "libredox", "thiserror 2.0.17", ] @@ -3526,7 +3527,7 @@ checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -3587,7 +3588,7 @@ checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" dependencies = [ "cc", "cfg-if", - "getrandom 0.2.16", + "getrandom 0.2.17", "libc", "untrusted", "windows-sys 0.52.0", @@ -3595,9 +3596,9 @@ dependencies = [ [[package]] name = "rkyv" -version = "0.7.45" +version = "0.7.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9008cd6385b9e161d8229e1f6549dd23c3d022f132a2ea37ac3a10ac4935779b" +checksum = "2297bf9c81a3f0dc96bc9521370b88f054168c29826a75e89c55ff196e7ed6a1" dependencies = [ "bitvec", "bytecheck", @@ -3613,9 +3614,9 @@ dependencies = [ [[package]] name = "rkyv_derive" -version = "0.7.45" +version = "0.7.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "503d1d27590a2b0a3a4ca4c94755aa2875657196ecbf401a42eff41d7de532c0" +checksum = "84d7b42d4b8d06048d3ac8db0eb31bcb942cbeb709f0b5f2b2ebde398d3038f5" dependencies = [ "proc-macro2", "quote", @@ -3647,7 +3648,7 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.112", + "syn 2.0.114", "unicode-ident", ] @@ -3785,7 +3786,7 @@ checksum = "5d66de233f908aebf9cc30ac75ef9103185b4b715c6f2fb7a626aa5e5ede53ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -3834,7 +3835,7 @@ dependencies = [ "proc-macro2", "quote", "serde_derive_internals", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -3924,7 +3925,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -3935,7 +3936,7 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -4072,7 +4073,7 @@ checksum = "da5fc6819faabb412da764b99d3b713bb55083c11e7e0c00144d386cd6a1939c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -4130,9 +4131,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.112" +version = "2.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21f182278bf2d2bcb3c88b1b08a37df029d71ce3d3ae26168e3c653b213b99d4" +checksum = "d4d107df263a3013ef9b1879b0df87d706ff80f65a86ea879bd9c31f9b307c2a" dependencies = [ "proc-macro2", "quote", @@ -4153,7 +4154,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -4207,7 +4208,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -4218,7 +4219,7 @@ checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -4320,7 +4321,7 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -4335,9 +4336,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.17" +version = "0.7.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" +checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" dependencies = [ "bytes", "futures-core", @@ -4456,7 +4457,7 @@ checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -4597,9 +4598,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.5.7" +version = "2.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08bc136a29a3d1758e07a9cca267be308aeebf5cfd5a10f3f67ab2097683ef5b" +checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed" dependencies = [ "form_urlencoded", "idna", @@ -4721,7 +4722,7 @@ dependencies = [ "bumpalo", "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", "wasm-bindgen-shared", ] @@ -4806,7 +4807,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -4817,7 +4818,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -5077,28 +5078,28 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", "synstructure", ] [[package]] name = "zerocopy" -version = "0.8.31" +version = "0.8.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd74ec98b9250adb3ca554bdde269adf631549f51d8a8f8f0a10b50f1cb298c3" +checksum = "668f5168d10b9ee831de31933dc111a459c97ec93225beb307aed970d1372dfd" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.31" +version = "0.8.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8a8d209fdf45cf5138cbb5a506f6b52522a25afccc534d1475dad8e31105c6a" +checksum = "2c7962b26b0a8685668b671ee4b54d007a67d4eaf05fda79ac0ecf41e32270f1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -5118,7 +5119,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", "synstructure", ] @@ -5139,7 +5140,7 @@ checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] @@ -5172,11 +5173,11 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.112", + "syn 2.0.114", ] [[package]] name = "zmij" -version = "1.0.8" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317f17ff091ac4515f17cc7a190d2769a8c9a96d227de5d64b500b01cda8f2cd" +checksum = "2fc5a66a20078bf1251bde995aa2fdcc4b800c70b5d92dd2c62abc5c60f679f8" diff --git a/Cargo.toml b/Cargo.toml index f5ba3c8..52f7369 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,9 @@ dirs = "6" # Async runtime tokio = { version = "1", features = ["full"] } +# Random number generation (for retry jitter) +fastrand = "2" + # CLI argument parsing clap = { version = "4", features = ["derive"] } diff --git a/src/cli/repl.rs b/src/cli/repl.rs index 907cd87..19190b1 100644 --- a/src/cli/repl.rs +++ b/src/cli/repl.rs @@ -25,6 +25,7 @@ use crate::output::{ truncate_value, }; use crate::progress::{ProgressUpdate, run_with_progress}; +use indicatif::ProgressBar; // SQL keywords from sqlparser (for completion) use datafusion::sql::sqlparser::keywords::ALL_KEYWORDS; @@ -32,6 +33,48 @@ use datafusion::sql::sqlparser::keywords::ALL_KEYWORDS; /// Marker for cancelled queries (Ctrl+C during execution) struct QueryCancelled; +/// Handle a progress update by updating the spinner message +/// +/// This extracts the progress handling logic from the main REPL loop +/// to reduce nesting and improve readability. +fn handle_progress_update( + progress: Result, + pool: &K8sClientPool, + spinner: &ProgressBar, +) { + match progress { + Ok(ProgressUpdate::StartingQuery { + table, + cluster_count, + }) => { + if cluster_count > 1 { + spinner.set_message(format!( + "Querying {} across {} clusters...", + table, cluster_count + )); + } else { + spinner.set_message(format!("Querying {}...", table)); + } + } + Ok(ProgressUpdate::ClusterComplete { + cluster, + rows, + elapsed_ms: _, + }) => { + let (done, total) = pool.progress().progress(); + spinner.set_message(format!("[{}/{}] {} ({} rows)", done, total, cluster, rows)); + } + // Connection/discovery events - shouldn't happen during query + Ok(ProgressUpdate::Connecting { .. }) + | Ok(ProgressUpdate::Connected { .. }) + | Ok(ProgressUpdate::Discovering { .. }) + | Ok(ProgressUpdate::DiscoveryComplete { .. }) + | Ok(ProgressUpdate::RegisteringTables { .. }) => {} + // Channel errors - just continue + Err(broadcast::error::RecvError::Closed) | Err(broadcast::error::RecvError::Lagged(_)) => {} + } +} + // For extracting function signatures use datafusion::logical_expr::{Signature, TypeSignature}; @@ -1349,37 +1392,7 @@ pub async fn run_repl(mut session: K8sSessionContext, pool: Arc) } // Check for progress updates progress = progress_rx.recv() => { - match progress { - Ok(ProgressUpdate::StartingQuery { table, cluster_count }) => { - if cluster_count > 1 { - spinner.set_message(format!( - "Querying {} across {} clusters...", - table, cluster_count - )); - } else { - spinner.set_message(format!("Querying {}...", table)); - } - } - Ok(ProgressUpdate::ClusterComplete { cluster, rows, elapsed_ms: _ }) => { - let (done, total) = pool.progress().progress(); - spinner.set_message(format!( - "[{}/{}] {} ({} rows)", - done, total, cluster, rows - )); - } - // Connection/discovery events - shouldn't happen during query - Ok(ProgressUpdate::Connecting { .. }) - | Ok(ProgressUpdate::Connected { .. }) - | Ok(ProgressUpdate::Discovering { .. }) - | Ok(ProgressUpdate::DiscoveryComplete { .. }) - | Ok(ProgressUpdate::RegisteringTables { .. }) => {} - Err(broadcast::error::RecvError::Closed) => { - // Channel closed, wait for query - } - Err(broadcast::error::RecvError::Lagged(_)) => { - // Missed some updates, continue - } - } + handle_progress_update(progress, &pool, &spinner); } // Query completed query_result = &mut query_handle => { diff --git a/src/datafusion_integration/execution.rs b/src/datafusion_integration/execution.rs index 5b96acc..42adac2 100644 --- a/src/datafusion_integration/execution.rs +++ b/src/datafusion_integration/execution.rs @@ -37,7 +37,9 @@ use datafusion::arrow::datatypes::SchemaRef; use datafusion::error::Result; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; -use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; +use datafusion::physical_plan::metrics::{ + Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time, +}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; use futures::Stream; @@ -69,6 +71,25 @@ pub struct QueryTarget { pub namespace: Option, } +/// Metrics for tracking streaming execution performance +/// +/// Groups all DataFusion metrics used during K8s resource fetching into a single struct. +/// This makes it easier to pass metrics through the execution pipeline. +pub struct ExecutionMetrics { + /// Number of K8s resources returned + pub rows_fetched: Count, + /// Number of K8s API calls (LIST operations) + pub pages_fetched: Count, + /// Total K8s API fetch time + pub fetch_time: Time, + /// Arrow RecordBatch memory size in bytes + pub output_bytes: Count, + /// JSON to Arrow conversion time + pub conversion_time: Time, + /// Time to first page (TTFB) + pub first_page_time: Time, +} + /// Custom ExecutionPlan that lazily fetches Kubernetes resources /// /// Each partition corresponds to one QueryTarget (cluster, namespace pair). @@ -317,6 +338,14 @@ impl ExecutionPlan for K8sExecutionPlan { ); // Create a streaming execution that yields RecordBatches as pages arrive + let metrics = ExecutionMetrics { + rows_fetched, + pages_fetched, + fetch_time, + output_bytes, + conversion_time, + first_page_time, + }; let stream = create_streaming_execution( pool, table_name, @@ -324,12 +353,7 @@ impl ExecutionPlan for K8sExecutionPlan { api_filters, fetch_limit, columns, - rows_fetched, - pages_fetched, - fetch_time, - output_bytes, - conversion_time, - first_page_time, + metrics, ); Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) @@ -337,7 +361,6 @@ impl ExecutionPlan for K8sExecutionPlan { } /// Create a stream that fetches K8s pages and yields RecordBatches -#[allow(clippy::too_many_arguments)] fn create_streaming_execution( pool: Arc, table_name: String, @@ -345,12 +368,7 @@ fn create_streaming_execution( api_filters: ApiFilters, fetch_limit: Option, columns: Arc>, - rows_fetched: datafusion::physical_plan::metrics::Count, - pages_fetched: datafusion::physical_plan::metrics::Count, - fetch_time: datafusion::physical_plan::metrics::Time, - output_bytes: datafusion::physical_plan::metrics::Count, - conversion_time: datafusion::physical_plan::metrics::Time, - first_page_time: datafusion::physical_plan::metrics::Time, + metrics: ExecutionMetrics, ) -> Pin> + Send>> { let stream = try_stream! { let start = Instant::now(); @@ -374,7 +392,7 @@ fn create_streaming_execution( Ok(items) => { // Track time to first page if !first_page_received { - first_page_time.add_duration(first_page_timer.elapsed()); + metrics.first_page_time.add_duration(first_page_timer.elapsed()); first_page_received = true; } @@ -382,8 +400,8 @@ fn create_streaming_execution( total_rows += page_row_count; // Update metrics - pages_fetched.add(1); - rows_fetched.add(page_row_count); + metrics.pages_fetched.add(1); + metrics.rows_fetched.add(page_row_count); debug!( cluster = %target.cluster, @@ -400,9 +418,9 @@ fn create_streaming_execution( let batch = json_to_record_batch(&target.cluster, &columns, items)?; // Track conversion time and output bytes - conversion_time.add_duration(convert_start.elapsed()); + metrics.conversion_time.add_duration(convert_start.elapsed()); let batch_bytes = batch.get_array_memory_size(); - output_bytes.add(batch_bytes); + metrics.output_bytes.add(batch_bytes); yield batch; } @@ -444,7 +462,7 @@ fn create_streaming_execution( } // Record total fetch time - fetch_time.add_duration(start.elapsed()); + metrics.fetch_time.add_duration(start.elapsed()); // Report progress pool.progress().cluster_complete( diff --git a/src/datafusion_integration/provider.rs b/src/datafusion_integration/provider.rs index 502efda..3039238 100644 --- a/src/datafusion_integration/provider.rs +++ b/src/datafusion_integration/provider.rs @@ -900,11 +900,11 @@ mod tests { K8sTableProvider::new(resource_info, pool) } - /// Helper to create equality filter (column = value) - fn eq_filter(column: &str, value: impl Into) -> Expr { + /// Helper to create a binary filter expression (column op value) + fn binary_filter(column: &str, op: Operator, value: impl Into) -> Expr { Expr::BinaryExpr(BinaryExpr { left: Box::new(Expr::Column(Column::new_unqualified(column))), - op: Operator::Eq, + op, right: Box::new(Expr::Literal( datafusion::common::ScalarValue::Utf8(Some(value.into())), None, @@ -912,40 +912,24 @@ mod tests { }) } + /// Helper to create equality filter (column = value) + fn eq_filter(column: &str, value: impl Into) -> Expr { + binary_filter(column, Operator::Eq, value) + } + /// Helper to create not-equals filter (column != value) fn ne_filter(column: &str, value: impl Into) -> Expr { - Expr::BinaryExpr(BinaryExpr { - left: Box::new(Expr::Column(Column::new_unqualified(column))), - op: Operator::NotEq, - right: Box::new(Expr::Literal( - datafusion::common::ScalarValue::Utf8(Some(value.into())), - None, - )), - }) + binary_filter(column, Operator::NotEq, value) } /// Helper to create greater-than filter (column > value) fn gt_filter(column: &str, value: impl Into) -> Expr { - Expr::BinaryExpr(BinaryExpr { - left: Box::new(Expr::Column(Column::new_unqualified(column))), - op: Operator::Gt, - right: Box::new(Expr::Literal( - datafusion::common::ScalarValue::Utf8(Some(value.into())), - None, - )), - }) + binary_filter(column, Operator::Gt, value) } /// Helper to create LIKE filter (column LIKE pattern) fn like_filter(column: &str, pattern: impl Into) -> Expr { - Expr::BinaryExpr(BinaryExpr { - left: Box::new(Expr::Column(Column::new_unqualified(column))), - op: Operator::LikeMatch, - right: Box::new(Expr::Literal( - datafusion::common::ScalarValue::Utf8(Some(pattern.into())), - None, - )), - }) + binary_filter(column, Operator::LikeMatch, pattern) } /// Helper to create IN filter (column IN (values)) @@ -965,15 +949,6 @@ mod tests { }) } - /// Helper to create AND filter (left AND right) - fn and_filter(left: Expr, right: Expr) -> Expr { - Expr::BinaryExpr(BinaryExpr { - left: Box::new(left), - op: Operator::And, - right: Box::new(right), - }) - } - /// Helper to assert Exact pushdown fn assert_exact(result: &[TableProviderFilterPushDown]) { assert_eq!(result.len(), 1); diff --git a/src/kubernetes/client.rs b/src/kubernetes/client.rs index 4cfb678..15debb4 100644 --- a/src/kubernetes/client.rs +++ b/src/kubernetes/client.rs @@ -30,6 +30,77 @@ const READ_TIMEOUT: Duration = Duration::from_secs(30); /// Smaller pages reduce memory pressure and allow faster initial response const PAGE_SIZE: u32 = 500; +/// Maximum retry attempts for transient failures +const MAX_RETRY_ATTEMPTS: u32 = 5; + +/// Base delay for exponential backoff (100ms base: 100ms, 200ms, 400ms, 800ms, 1600ms) +const RETRY_BASE_DELAY_MS: u64 = 100; + +/// Jitter factor (±25% of delay to prevent thundering herd) +const RETRY_JITTER_FACTOR: f64 = 0.25; + +/// Calculate delay with exponential backoff and jitter +/// +/// Returns a delay with ±25% random jitter to prevent synchronized retries +fn calculate_retry_delay(attempt: u32) -> Duration { + let base_delay_ms = RETRY_BASE_DELAY_MS * 2u64.pow(attempt); + // Add jitter: ±25% of base delay + let jitter_range = (base_delay_ms as f64 * RETRY_JITTER_FACTOR) as u64; + let jitter = if jitter_range > 0 { + fastrand::u64(0..jitter_range * 2) as i64 - jitter_range as i64 + } else { + 0 + }; + let delay_ms = (base_delay_ms as i64 + jitter).max(10) as u64; // minimum 10ms + Duration::from_millis(delay_ms) +} + +/// Execute an async operation with exponential backoff retry and jitter +/// +/// # Arguments +/// * `operation` - The async operation to retry +/// * `should_retry` - Predicate to determine if an error is retryable +/// * `context` - Description for logging +/// +/// # Returns +/// The result of the operation, or the last error if all retries failed +async fn retry_with_backoff( + mut operation: F, + should_retry: impl Fn(&E) -> bool, + context: &str, +) -> std::result::Result +where + F: FnMut() -> Fut, + Fut: std::future::Future>, + E: std::fmt::Display, +{ + let mut last_error = None; + + for attempt in 0..MAX_RETRY_ATTEMPTS { + match operation().await { + Ok(result) => return Ok(result), + Err(e) => { + if should_retry(&e) && attempt + 1 < MAX_RETRY_ATTEMPTS { + let delay = calculate_retry_delay(attempt); + warn!( + context = %context, + attempt = attempt + 1, + max_attempts = MAX_RETRY_ATTEMPTS, + delay_ms = delay.as_millis() as u64, + "Retrying after error: {}", e + ); + tokio::time::sleep(delay).await; + last_error = Some(e); + } else { + return Err(e); + } + } + } + } + + Err(last_error.expect("retry loop should set last_error after failed attempts")) +} + /// Check if a context spec contains patterns (*, ?) or multiple contexts (comma-separated) pub fn is_multi_or_pattern_spec(spec: Option<&str>) -> bool { spec.map(|s| s.contains(',') || s.contains('*') || s.contains('?')) @@ -500,19 +571,24 @@ impl K8sClientPool { Ok(count) } - /// Get the resource registry for the current context - /// Automatically refreshes if TTL expired - pub async fn get_registry(&self, context: Option<&str>) -> Result { - let ctx = match context { - Some(c) => c.to_string(), + /// Resolve context to a string, using first current context if None + async fn resolve_context(&self, context: Option<&str>) -> Result { + match context { + Some(c) => Ok(c.to_string()), None => self .current_contexts .read() .await .first() .cloned() - .ok_or_else(|| anyhow!("No current context set"))?, - }; + .ok_or_else(|| anyhow!("No current context set")), + } + } + + /// Get the resource registry for the current context + /// Automatically refreshes if TTL expired + pub async fn get_registry(&self, context: Option<&str>) -> Result { + let ctx = self.resolve_context(context).await?; // Ensure we have discovered resources for this context (respects TTL) self.discover_resources_for_context(&ctx, false).await?; @@ -589,16 +665,7 @@ impl K8sClientPool { /// Get client for a specific context, or current context if None pub async fn get_client(&self, context: Option<&str>) -> Result { - let ctx = match context { - Some(c) => c.to_string(), - None => self - .current_contexts - .read() - .await - .first() - .cloned() - .ok_or_else(|| anyhow!("No current context set"))?, - }; + let ctx = self.resolve_context(context).await?; self.get_or_create_client(&ctx).await } @@ -685,59 +752,24 @@ impl K8sClientPool { super::context_matcher::ContextMatcher::new(&all_contexts).resolve(context_spec)?; // Ensure we have clients and discovered resources for all contexts IN PARALLEL - // Retry up to 3 times for intermittent failures + // Retry up to 3 times with exponential backoff for intermittent failures let discovery_futures: Vec<_> = matched_contexts .iter() .map(|ctx| { let ctx = ctx.clone(); async move { - let mut last_error = None; - for attempt in 1..=3 { - match self.get_or_create_client(&ctx).await { - Ok(_) => { - // Client connected, now discover resources - match self - .discover_resources_for_context(&ctx, force_refresh) - .await - { - Ok(_) => return Ok::<_, anyhow::Error>(()), - Err(e) => { - warn!( - context = %ctx, - attempt = attempt, - error = %e, - "Discovery failed, retrying..." - ); - last_error = Some(e); - if attempt < 3 { - tokio::time::sleep(std::time::Duration::from_millis( - 100 * attempt as u64, - )) - .await; - } - } - } - } - Err(e) => { - warn!( - context = %ctx, - attempt = attempt, - error = %e, - "Connection failed, retrying..." - ); - last_error = Some(e); - if attempt < 3 { - tokio::time::sleep(std::time::Duration::from_millis( - 100 * attempt as u64, - )) - .await; - } - } - } - } - Err(last_error - .expect("retry loop should always set last_error after 3 failed attempts") - .context(format!("Failed after 3 attempts for cluster '{}'", ctx))) + retry_with_backoff( + || async { + // Connect and discover in a single retriable operation + self.get_or_create_client(&ctx).await?; + self.discover_resources_for_context(&ctx, force_refresh) + .await + }, + |_: &anyhow::Error| true, // Always retry on any error + &ctx, + ) + .await + .context(format!("Failed for cluster '{}'", ctx)) } }) .collect(); @@ -781,9 +813,6 @@ impl K8sClientPool { table: &str, ctx_name: &str, ) -> Result> { - const MAX_RETRIES: u32 = 3; - const RETRY_BASE_DELAY_MS: u64 = 100; - // Check if a kube error is retryable (transient failures) let is_retryable = |err: &kube::Error| -> bool { match err { @@ -795,43 +824,10 @@ impl K8sClientPool { } }; - let mut last_error = None; - - for attempt in 0..MAX_RETRIES { - match api.list(params).await { - Ok(list) => return Ok(list), - Err(e) => { - if is_retryable(&e) { - let delay = Duration::from_millis(RETRY_BASE_DELAY_MS * 2u64.pow(attempt)); - warn!( - table = %table, - context = %ctx_name, - attempt = attempt + 1, - max_attempts = MAX_RETRIES, - delay_ms = delay.as_millis(), - error = %e, - "Retryable error, backing off" - ); - tokio::time::sleep(delay).await; - last_error = Some(e); - } else { - debug!( - table = %table, - context = %ctx_name, - error = %e, - "Non-retryable error" - ); - return Err(anyhow!("K8s API error: {}", e)); - } - } - } - } - - Err(anyhow!( - "Failed after {} retries: {}", - MAX_RETRIES, - last_error.map(|e| e.to_string()).unwrap_or_default() - )) + let context = format!("list {} in {}", table, ctx_name); + retry_with_backoff(|| api.list(params), is_retryable, &context) + .await + .map_err(|e| anyhow!("K8s API error: {}", e)) } /// Build ListParams from API filters (label selectors, field selectors) @@ -1050,8 +1046,6 @@ impl K8sClientPool { #[cfg(test)] mod tests { - use super::*; - /// Helper function to test alias building logic in isolation /// This replicates the logic from process_discovered_crds fn build_aliases(