From e3a45806a3782222dde530e68d32b6afe846b1c9 Mon Sep 17 00:00:00 2001 From: wanmail Date: Tue, 17 Dec 2024 14:43:51 +0800 Subject: [PATCH 01/13] add kafka stream --- go.mod | 4 +- go.sum | 47 +++++++++- internal/tailer/logstream/kafka.go | 112 ++++++++++++++++++++++++ internal/tailer/logstream/kafka_test.go | 75 ++++++++++++++++ internal/tailer/logstream/logstream.go | 2 + internal/tailer/tail.go | 2 +- 6 files changed, 238 insertions(+), 4 deletions(-) create mode 100644 internal/tailer/logstream/kafka.go create mode 100644 internal/tailer/logstream/kafka_test.go diff --git a/go.mod b/go.mod index 3ffe06257..08f643767 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.20.4 github.com/prometheus/common v0.60.0 + github.com/segmentio/kafka-go v0.4.47 go.opencensus.io v0.24.0 golang.org/x/sys v0.26.0 ) @@ -21,10 +22,11 @@ require ( github.com/klauspost/compress v1.17.9 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/uber/jaeger-client-go v2.25.0+incompatible // indirect - golang.org/x/sync v0.7.0 // indirect + golang.org/x/sync v0.8.0 // indirect google.golang.org/api v0.105.0 // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect google.golang.org/grpc v1.56.3 // indirect diff --git a/go.sum b/go.sum index ef3523c74..f549adbbc 100644 --- a/go.sum +++ b/go.sum @@ -100,6 +100,7 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -109,6 +110,9 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -123,6 +127,8 @@ github.com/prometheus/common v0.60.0/go.mod h1:h0LYf1R1deLSKtD4Vdg8gy4RuOvENW2J/ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0= +github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -134,7 +140,14 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/uber/jaeger-client-go v2.25.0+incompatible h1:IxcNZ7WRY1Y3G4poYlx24szfsn/3LvK9QHCq9oQw8+U= github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -147,6 +160,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -176,6 +191,8 @@ golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -194,6 +211,11 @@ golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -209,8 +231,10 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -231,13 +255,30 @@ golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200331124033-c3d80250170d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -273,6 +314,8 @@ golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapK golang.org/x/tools v0.0.0-20200212150539-ea181f53ac56/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200224181240-023911ca70b2/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200331025713-a30bf2db82d4/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/tailer/logstream/kafka.go b/internal/tailer/logstream/kafka.go new file mode 100644 index 000000000..b3889ca31 --- /dev/null +++ b/internal/tailer/logstream/kafka.go @@ -0,0 +1,112 @@ +package logstream + +import ( + "context" + "errors" + "net/url" + "strings" + "sync" + + "github.com/golang/glog" + "github.com/google/mtail/internal/logline" + "github.com/segmentio/kafka-go" +) + +const ( + KafkaScheme = "kafka" +) + +type kafkaStream struct { + streamBase + config kafka.ReaderConfig + + cancel context.CancelFunc +} + +func parseKafkaURL(u *url.URL) (kafka.ReaderConfig, error) { + config := kafka.ReaderConfig{ + Brokers: []string{u.Host}, + Topic: strings.TrimPrefix(u.Path, "/"), + } + + if u.User.Username() != "" { + config.GroupID = u.User.Username() + } + + // TODO: Add support for more kafka options from url query parameters. + + return config, config.Validate() +} + +func newKafkaStream(ctx context.Context, wg *sync.WaitGroup, u *url.URL, oneShot OneShotMode) (LogStream, error) { + glog.V(2).Infof("newKafkaStream(%s): config", u.String()) + config, err := parseKafkaURL(u) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(ctx) + ks := &kafkaStream{ + cancel: cancel, + config: config, + streamBase: streamBase{ + sourcename: u.String(), + lines: make(chan *logline.LogLine), + }, + } + + if err := ks.stream(ctx, wg, oneShot); err != nil { + return nil, err + } + + glog.V(2).Infof("newKafkaStream(%s): started stream", ks.sourcename) + + return ks, nil +} + +func (ks *kafkaStream) stream(ctx context.Context, wg *sync.WaitGroup, oneShot OneShotMode) error { + r := kafka.NewReader(ks.config) + + glog.V(2).Infof("stream(%s): opened new reader", ks.sourcename) + + var total int + wg.Add(1) + go func() { + defer wg.Done() + defer func() { + glog.V(2).Infof("stream(%s): read total %d bytes", ks.sourcename, total) + glog.V(2).Infof("stream(%s): closing kafka connection", ks.sourcename) + if err := r.Close(); err != nil { + logErrors.Add(ks.sourcename, 1) + glog.Infof("stream(%s): closing connection: %v", ks.sourcename, err) + } + logCloses.Add(ks.sourcename, 1) + }() + + for { + + m, err := r.ReadMessage(ctx) + + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + glog.V(2).Infof("stream(%s): context cancelled or deadline exceeded", ks.sourcename) + break + } + + if err != nil { + logErrors.Add(ks.sourcename, 1) + glog.V(2).Infof("stream(%s): read error: %v", ks.sourcename, err) + } + + logLines.Add(ks.sourcename, 1) + ks.lines <- logline.New(ctx, ks.sourcename, string(m.Value)) + + if oneShot == OneShotEnabled { + glog.Infof("stream(%s): read one in one shot mode, exiting. Sample data: %s = %s", ks.sourcename, string(m.Key), string(m.Value)) + break + } + } + close(ks.lines) + }() + + return nil +} diff --git a/internal/tailer/logstream/kafka_test.go b/internal/tailer/logstream/kafka_test.go new file mode 100644 index 000000000..b852cfa8a --- /dev/null +++ b/internal/tailer/logstream/kafka_test.go @@ -0,0 +1,75 @@ +package logstream + +import ( + "context" + "fmt" + "math/rand" + "os" + "sync" + "testing" + "time" + + "github.com/google/mtail/internal/logline" + "github.com/google/mtail/internal/testutil" + "github.com/google/mtail/internal/waker" + "github.com/segmentio/kafka-go" +) + +func TestKafkaStreamRead(t *testing.T) { + var wg sync.WaitGroup + + ctx, cancel := context.WithCancel(context.Background()) + waker, _ := waker.NewTest(ctx, 1, "stream") + + // start kafka test server with docker + // refer to https://hub.docker.com/r/apache/kafka + host := os.Getenv("MTAIL_KAFKA_TEST_HOST") + if host == "" { + t.Log("use default kafka host") + host = "localhost:49092" + } + + topic := fmt.Sprintf("test-%d", rand.Intn(100)) + + conn, err := kafka.DialLeader(ctx, "tcp", host, topic, 0) + if err != nil { + testutil.FatalIfErr(t, err) + } + defer conn.Close() + + // err = testutil.CreateTopic(conn, topic) + // if err != nil { + // testutil.FatalIfErr(t, err) + // } + + consumerGroup := fmt.Sprintf("mtail-test-%d", rand.Intn(100)) + + msg := "yo" + + sourcename := fmt.Sprintf("%s://%s@%s/%s", KafkaScheme, consumerGroup, host, topic) + + t.Log("sourcename", sourcename) + + ks, err := New(ctx, &wg, waker, sourcename, OneShotDisabled) + testutil.FatalIfErr(t, err) + + expected := []*logline.LogLine{ + {Context: context.Background(), Filename: sourcename, Line: msg}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ks.Lines()) + + // write to kafka + n, err := conn.WriteMessages(kafka.Message{Topic: topic, Value: []byte(msg)}) + testutil.FatalIfErr(t, err) + t.Log(n) + + time.Sleep(time.Second * 1) + cancel() + wg.Wait() + + checkLineDiff() + + if v := <-ks.Lines(); v != nil { + t.Errorf("expecting filestream to be complete because stopped") + } +} diff --git a/internal/tailer/logstream/logstream.go b/internal/tailer/logstream/logstream.go index 7dbd5314a..aadbcbf9b 100644 --- a/internal/tailer/logstream/logstream.go +++ b/internal/tailer/logstream/logstream.go @@ -83,6 +83,8 @@ func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname st return newSocketStream(ctx, wg, waker, u.Scheme, u.Host, oneShot) case "udp": return newDgramStream(ctx, wg, waker, u.Scheme, u.Host, oneShot) + case KafkaScheme: + return newKafkaStream(ctx, wg, u, oneShot) case "", "file": path = u.Path } diff --git a/internal/tailer/tail.go b/internal/tailer/tail.go index cba63ffa9..8a2dfad97 100644 --- a/internal/tailer/tail.go +++ b/internal/tailer/tail.go @@ -194,7 +194,7 @@ func (t *Tailer) AddPattern(pattern string) error { default: glog.V(2).Infof("AddPattern(%v): %v in path pattern %q, treating as path", pattern, ErrUnsupportedURLScheme, u.Scheme) // Leave path alone per log message - case "unix", "unixgram", "tcp", "udp": + case "unix", "unixgram", "tcp", "udp", logstream.KafkaScheme: // Keep the scheme. glog.V(2).Infof("AddPattern(%v): is a socket", path) return t.TailPath(path) From b323ce3dffbfd14310c1e382c2a57feeca96b205 Mon Sep 17 00:00:00 2001 From: wanmail Date: Tue, 17 Dec 2024 14:58:53 +0800 Subject: [PATCH 02/13] add kafka stream documentation --- docs/Deploying.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/Deploying.md b/docs/Deploying.md index ae8773e2e..1277e1f99 100644 --- a/docs/Deploying.md +++ b/docs/Deploying.md @@ -67,6 +67,16 @@ default. Example: `mtail --progs /etc/mtail --logs /var/log/syslog --poll_interval 250ms --poll_log_interval 250ms` +### Consume data in Apache Kafka + +Use `--logs` flag to read data from Apache Kafka. + +You need to convert kafka configuration into URL format. + +You can refer to [parseKafkaURL](../internal//tailer/logstream/kafka.go#L26) function to write URL. + +Example: `mtail --progs /etc/mtail --logs "kafka://test-group@localhost:9092/test-topic"` + ### Setting garbage collection intervals `mtail` accumulates metrics and log files during its operation. By default, From 9c239e54f2b6b99ca43d18c6110e639ee642d37a Mon Sep 17 00:00:00 2001 From: wanmail Date: Tue, 17 Dec 2024 17:53:11 +0800 Subject: [PATCH 03/13] add kafka reader config options parse --- cmd/config-gen/main.go | 110 ++++++++++ internal/tailer/logstream/kafka.go | 6 +- .../logstream/kafka_config_generated.go | 206 ++++++++++++++++++ 3 files changed, 321 insertions(+), 1 deletion(-) create mode 100644 cmd/config-gen/main.go create mode 100644 internal/tailer/logstream/kafka_config_generated.go diff --git a/cmd/config-gen/main.go b/cmd/config-gen/main.go new file mode 100644 index 000000000..f62d94572 --- /dev/null +++ b/cmd/config-gen/main.go @@ -0,0 +1,110 @@ +package main + +import ( + "bytes" + "flag" + "fmt" + "os" + "reflect" + + "github.com/segmentio/kafka-go" +) + +const ( + KafkaConfig = "KafkaConfig" +) + +var ( + configType string + targetFile string + targetModule string +) + +func init() { + flag.StringVar(&configType, "type", KafkaConfig, "The type of config to generate") + flag.StringVar(&targetFile, "file", "config_generated.go", "The target file to generate the config from") + flag.StringVar(&targetModule, "module", "main", "The target module name") +} + +func main() { + flag.Parse() + + var p interface{} + + b := new(bytes.Buffer) + + fmt.Fprintf(b, "// Code generated by github.com/wanmail/mtail/cmd/config-gen; DO NOT EDIT.\n") + fmt.Fprintf(b, "package %s\n\n", targetModule) + + fmt.Fprintf(b, "import (\n") + fmt.Fprintf(b, " \"strings\"\n") + fmt.Fprintf(b, " \"time\"\n") + fmt.Fprintf(b, " \"strconv\"\n") + fmt.Fprintf(b, " \"net/url\"\n") + fmt.Fprintf(b, " \"github.com/segmentio/kafka-go\"\n") + fmt.Fprintf(b, ")\n\n\n") + + switch configType { + case KafkaConfig: + p = kafka.ReaderConfig{} + fmt.Fprintf(b, "func parse%s(u *url.URL, config *kafka.ReaderConfig) error {\n\n", configType) + + } + + v := reflect.ValueOf(p) + t := v.Type() + + for i := 0; i < t.NumField(); i++ { + field := t.Field(i) + + switch field.Type.Kind() { + case reflect.String: + fmt.Fprintf(b, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name) + fmt.Fprintf(b, " config.%s = %s\n", field.Name, field.Name) + fmt.Fprintf(b, " }\n\n") + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + fmt.Fprintf(b, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name) + fmt.Fprintf(b, " i, err := strconv.Atoi(%s)\n", field.Name) + fmt.Fprintf(b, " if err != nil {\n") + fmt.Fprintf(b, " return err\n") + fmt.Fprintf(b, " }\n") + switch field.Type.String() { + case "int": + fmt.Fprintf(b, " config.%s = i\n", field.Name) + case "time.Duration": + fmt.Fprintf(b, " config.%s = time.Second * time.Duration(i)\n", field.Name) + default: + fmt.Fprintf(b, " config.%s = %s(i)\n", field.Name, field.Type.String()) + } + fmt.Fprintf(b, " }\n\n") + + case reflect.Bool: + fmt.Fprintf(b, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name) + fmt.Fprintf(b, " b, err := strconv.ParseBool(%s)\n", field.Name) + fmt.Fprintf(b, " if err != nil {\n") + fmt.Fprintf(b, " return err\n") + fmt.Fprintf(b, " }\n") + fmt.Fprintf(b, " config.%s = b\n", field.Name) + fmt.Fprintf(b, " }\n\n") + + case reflect.Slice: + if field.Type.Elem().Kind() == reflect.String { + fmt.Fprintf(b, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name) + fmt.Fprintf(b, " config.%s = strings.Split(%s, \",\")\n", field.Name, field.Name) + fmt.Fprintf(b, " }\n\n") + } + default: + fmt.Fprintf(b, " // %s is not supported\n\n", field.Name) + + } + } + + fmt.Fprintf(b, " return nil\n") + fmt.Fprintf(b, "}\n") + + err := os.WriteFile(targetFile, b.Bytes(), 0644) + if err != nil { + fmt.Println("Failed to write file:", err) + os.Exit(1) + } +} diff --git a/internal/tailer/logstream/kafka.go b/internal/tailer/logstream/kafka.go index b3889ca31..2e21294bf 100644 --- a/internal/tailer/logstream/kafka.go +++ b/internal/tailer/logstream/kafka.go @@ -23,6 +23,8 @@ type kafkaStream struct { cancel context.CancelFunc } +//go:generate go run ./../../../cmd/config-gen/main.go -type KafkaConfig -file kafka_config_generated.go -module logstream + func parseKafkaURL(u *url.URL) (kafka.ReaderConfig, error) { config := kafka.ReaderConfig{ Brokers: []string{u.Host}, @@ -33,7 +35,9 @@ func parseKafkaURL(u *url.URL) (kafka.ReaderConfig, error) { config.GroupID = u.User.Username() } - // TODO: Add support for more kafka options from url query parameters. + if err := parseKafkaConfig(u, &config); err != nil { + return config, err + } return config, config.Validate() } diff --git a/internal/tailer/logstream/kafka_config_generated.go b/internal/tailer/logstream/kafka_config_generated.go new file mode 100644 index 000000000..05fed1be3 --- /dev/null +++ b/internal/tailer/logstream/kafka_config_generated.go @@ -0,0 +1,206 @@ +// Code generated by github.com/wanqian/cspm-go/codegen; DO NOT EDIT. +package logstream + +import ( + "strings" + "time" + "strconv" + "net/url" + "github.com/segmentio/kafka-go" +) + + +func parseKafkaConfig(u *url.URL, config *kafka.ReaderConfig) error { + + if Brokers := u.Query().Get("Brokers"); Brokers != "" { + config.Brokers = strings.Split(Brokers, ",") + } + + if GroupID := u.Query().Get("GroupID"); GroupID != "" { + config.GroupID = GroupID + } + + if GroupTopics := u.Query().Get("GroupTopics"); GroupTopics != "" { + config.GroupTopics = strings.Split(GroupTopics, ",") + } + + if Topic := u.Query().Get("Topic"); Topic != "" { + config.Topic = Topic + } + + if Partition := u.Query().Get("Partition"); Partition != "" { + i, err := strconv.Atoi(Partition) + if err != nil { + return err + } + config.Partition = i + } + + // Dialer is not supported + + if QueueCapacity := u.Query().Get("QueueCapacity"); QueueCapacity != "" { + i, err := strconv.Atoi(QueueCapacity) + if err != nil { + return err + } + config.QueueCapacity = i + } + + if MinBytes := u.Query().Get("MinBytes"); MinBytes != "" { + i, err := strconv.Atoi(MinBytes) + if err != nil { + return err + } + config.MinBytes = i + } + + if MaxBytes := u.Query().Get("MaxBytes"); MaxBytes != "" { + i, err := strconv.Atoi(MaxBytes) + if err != nil { + return err + } + config.MaxBytes = i + } + + if MaxWait := u.Query().Get("MaxWait"); MaxWait != "" { + i, err := strconv.Atoi(MaxWait) + if err != nil { + return err + } + config.MaxWait = time.Second * time.Duration(i) + } + + if ReadBatchTimeout := u.Query().Get("ReadBatchTimeout"); ReadBatchTimeout != "" { + i, err := strconv.Atoi(ReadBatchTimeout) + if err != nil { + return err + } + config.ReadBatchTimeout = time.Second * time.Duration(i) + } + + if ReadLagInterval := u.Query().Get("ReadLagInterval"); ReadLagInterval != "" { + i, err := strconv.Atoi(ReadLagInterval) + if err != nil { + return err + } + config.ReadLagInterval = time.Second * time.Duration(i) + } + + if HeartbeatInterval := u.Query().Get("HeartbeatInterval"); HeartbeatInterval != "" { + i, err := strconv.Atoi(HeartbeatInterval) + if err != nil { + return err + } + config.HeartbeatInterval = time.Second * time.Duration(i) + } + + if CommitInterval := u.Query().Get("CommitInterval"); CommitInterval != "" { + i, err := strconv.Atoi(CommitInterval) + if err != nil { + return err + } + config.CommitInterval = time.Second * time.Duration(i) + } + + if PartitionWatchInterval := u.Query().Get("PartitionWatchInterval"); PartitionWatchInterval != "" { + i, err := strconv.Atoi(PartitionWatchInterval) + if err != nil { + return err + } + config.PartitionWatchInterval = time.Second * time.Duration(i) + } + + if WatchPartitionChanges := u.Query().Get("WatchPartitionChanges"); WatchPartitionChanges != "" { + b, err := strconv.ParseBool(WatchPartitionChanges) + if err != nil { + return err + } + config.WatchPartitionChanges = b + } + + if SessionTimeout := u.Query().Get("SessionTimeout"); SessionTimeout != "" { + i, err := strconv.Atoi(SessionTimeout) + if err != nil { + return err + } + config.SessionTimeout = time.Second * time.Duration(i) + } + + if RebalanceTimeout := u.Query().Get("RebalanceTimeout"); RebalanceTimeout != "" { + i, err := strconv.Atoi(RebalanceTimeout) + if err != nil { + return err + } + config.RebalanceTimeout = time.Second * time.Duration(i) + } + + if JoinGroupBackoff := u.Query().Get("JoinGroupBackoff"); JoinGroupBackoff != "" { + i, err := strconv.Atoi(JoinGroupBackoff) + if err != nil { + return err + } + config.JoinGroupBackoff = time.Second * time.Duration(i) + } + + if RetentionTime := u.Query().Get("RetentionTime"); RetentionTime != "" { + i, err := strconv.Atoi(RetentionTime) + if err != nil { + return err + } + config.RetentionTime = time.Second * time.Duration(i) + } + + if StartOffset := u.Query().Get("StartOffset"); StartOffset != "" { + i, err := strconv.Atoi(StartOffset) + if err != nil { + return err + } + config.StartOffset = int64(i) + } + + if ReadBackoffMin := u.Query().Get("ReadBackoffMin"); ReadBackoffMin != "" { + i, err := strconv.Atoi(ReadBackoffMin) + if err != nil { + return err + } + config.ReadBackoffMin = time.Second * time.Duration(i) + } + + if ReadBackoffMax := u.Query().Get("ReadBackoffMax"); ReadBackoffMax != "" { + i, err := strconv.Atoi(ReadBackoffMax) + if err != nil { + return err + } + config.ReadBackoffMax = time.Second * time.Duration(i) + } + + // Logger is not supported + + // ErrorLogger is not supported + + if IsolationLevel := u.Query().Get("IsolationLevel"); IsolationLevel != "" { + i, err := strconv.Atoi(IsolationLevel) + if err != nil { + return err + } + config.IsolationLevel = kafka.IsolationLevel(i) + } + + if MaxAttempts := u.Query().Get("MaxAttempts"); MaxAttempts != "" { + i, err := strconv.Atoi(MaxAttempts) + if err != nil { + return err + } + config.MaxAttempts = i + } + + if OffsetOutOfRangeError := u.Query().Get("OffsetOutOfRangeError"); OffsetOutOfRangeError != "" { + b, err := strconv.ParseBool(OffsetOutOfRangeError) + if err != nil { + return err + } + config.OffsetOutOfRangeError = b + } + + return nil +} From fd5e60c3d84963f031937be360a40902da9599d9 Mon Sep 17 00:00:00 2001 From: wanmail Date: Thu, 26 Dec 2024 15:07:35 +0800 Subject: [PATCH 04/13] handle kafka connection errors --- internal/tailer/logstream/kafka.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/tailer/logstream/kafka.go b/internal/tailer/logstream/kafka.go index 2e21294bf..5c4790c96 100644 --- a/internal/tailer/logstream/kafka.go +++ b/internal/tailer/logstream/kafka.go @@ -96,6 +96,11 @@ func (ks *kafkaStream) stream(ctx context.Context, wg *sync.WaitGroup, oneShot O break } + if IsExitableError(err) { + glog.V(2).Infof("stream(%s): exiting, conn has error %s", ks.sourcename, err) + break + } + if err != nil { logErrors.Add(ks.sourcename, 1) glog.V(2).Infof("stream(%s): read error: %v", ks.sourcename, err) From 8cb78d0931d7a4ad6ba9d51a6d78de3e7f6e8204 Mon Sep 17 00:00:00 2001 From: wanmail Date: Wed, 8 Jan 2025 17:35:54 +0800 Subject: [PATCH 05/13] change UT package --- internal/tailer/logstream/kafka_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/tailer/logstream/kafka_test.go b/internal/tailer/logstream/kafka_test.go index b852cfa8a..b5275d101 100644 --- a/internal/tailer/logstream/kafka_test.go +++ b/internal/tailer/logstream/kafka_test.go @@ -1,4 +1,4 @@ -package logstream +package logstream_test import ( "context" @@ -10,6 +10,7 @@ import ( "time" "github.com/google/mtail/internal/logline" + "github.com/google/mtail/internal/tailer/logstream" "github.com/google/mtail/internal/testutil" "github.com/google/mtail/internal/waker" "github.com/segmentio/kafka-go" @@ -46,11 +47,11 @@ func TestKafkaStreamRead(t *testing.T) { msg := "yo" - sourcename := fmt.Sprintf("%s://%s@%s/%s", KafkaScheme, consumerGroup, host, topic) + sourcename := fmt.Sprintf("%s://%s@%s/%s", logstream.KafkaScheme, consumerGroup, host, topic) t.Log("sourcename", sourcename) - ks, err := New(ctx, &wg, waker, sourcename, OneShotDisabled) + ks, err := logstream.New(ctx, &wg, waker, sourcename, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) expected := []*logline.LogLine{ From 97ea79226156133d4018fc7aeec9cffa9824716b Mon Sep 17 00:00:00 2001 From: wanmail Date: Wed, 8 Jan 2025 17:37:21 +0800 Subject: [PATCH 06/13] add support for S3 file stream --- cmd/config-gen/main.go | 101 ++++--- go.mod | 18 ++ go.sum | 36 +++ internal/tailer/logstream/awss3.go | 247 ++++++++++++++++++ internal/tailer/logstream/logstream.go | 2 + .../tailer/logstream/s3_config_generated.go | 77 ++++++ internal/tailer/tail.go | 2 +- 7 files changed, 444 insertions(+), 39 deletions(-) create mode 100644 internal/tailer/logstream/awss3.go create mode 100644 internal/tailer/logstream/s3_config_generated.go diff --git a/cmd/config-gen/main.go b/cmd/config-gen/main.go index f62d94572..bc204e7ea 100644 --- a/cmd/config-gen/main.go +++ b/cmd/config-gen/main.go @@ -7,11 +7,13 @@ import ( "os" "reflect" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/segmentio/kafka-go" ) const ( KafkaConfig = "KafkaConfig" + S3Config = "S3Config" ) var ( @@ -29,26 +31,30 @@ func init() { func main() { flag.Parse() + ims := make(map[string]bool) + + ims["net/url"] = true + var p interface{} - b := new(bytes.Buffer) + // file header buffer + hb := new(bytes.Buffer) - fmt.Fprintf(b, "// Code generated by github.com/wanmail/mtail/cmd/config-gen; DO NOT EDIT.\n") - fmt.Fprintf(b, "package %s\n\n", targetModule) + // function buffer + fb := new(bytes.Buffer) - fmt.Fprintf(b, "import (\n") - fmt.Fprintf(b, " \"strings\"\n") - fmt.Fprintf(b, " \"time\"\n") - fmt.Fprintf(b, " \"strconv\"\n") - fmt.Fprintf(b, " \"net/url\"\n") - fmt.Fprintf(b, " \"github.com/segmentio/kafka-go\"\n") - fmt.Fprintf(b, ")\n\n\n") + fmt.Fprintf(hb, "// Code generated by github.com/wanmail/mtail/cmd/config-gen; DO NOT EDIT.\n") + fmt.Fprintf(hb, "package %s\n\n", targetModule) switch configType { case KafkaConfig: + ims["github.com/segmentio/kafka-go"] = true p = kafka.ReaderConfig{} - fmt.Fprintf(b, "func parse%s(u *url.URL, config *kafka.ReaderConfig) error {\n\n", configType) - + fmt.Fprintf(fb, "func parse%s(u *url.URL, config *kafka.ReaderConfig) error {\n\n", configType) + case S3Config: + ims["github.com/aws/aws-sdk-go-v2/aws"] = true + p = aws.Config{} + fmt.Fprintf(fb, "func parse%s(u *url.URL, config *aws.Config) error {\n\n", configType) } v := reflect.ValueOf(p) @@ -59,50 +65,69 @@ func main() { switch field.Type.Kind() { case reflect.String: - fmt.Fprintf(b, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name) - fmt.Fprintf(b, " config.%s = %s\n", field.Name, field.Name) - fmt.Fprintf(b, " }\n\n") + switch field.Type.String() { + case "string": + fmt.Fprintf(fb, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name) + fmt.Fprintf(fb, " config.%s = %s\n", field.Name, field.Name) + fmt.Fprintf(fb, " }\n\n") + default: + fmt.Fprintf(fb, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name) + fmt.Fprintf(fb, " config.%s = %s(%s)\n", field.Name, field.Type.String(), field.Name) + fmt.Fprintf(fb, " }\n\n") + } + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: - fmt.Fprintf(b, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name) - fmt.Fprintf(b, " i, err := strconv.Atoi(%s)\n", field.Name) - fmt.Fprintf(b, " if err != nil {\n") - fmt.Fprintf(b, " return err\n") - fmt.Fprintf(b, " }\n") + ims["strconv"] = true + fmt.Fprintf(fb, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name) + fmt.Fprintf(fb, " i, err := strconv.Atoi(%s)\n", field.Name) + fmt.Fprintf(fb, " if err != nil {\n") + fmt.Fprintf(fb, " return err\n") + fmt.Fprintf(fb, " }\n") switch field.Type.String() { case "int": - fmt.Fprintf(b, " config.%s = i\n", field.Name) + fmt.Fprintf(fb, " config.%s = i\n", field.Name) case "time.Duration": - fmt.Fprintf(b, " config.%s = time.Second * time.Duration(i)\n", field.Name) + ims["time"] = true + fmt.Fprintf(fb, " config.%s = time.Second * time.Duration(i)\n", field.Name) default: - fmt.Fprintf(b, " config.%s = %s(i)\n", field.Name, field.Type.String()) + fmt.Fprintf(fb, " config.%s = %s(i)\n", field.Name, field.Type.String()) } - fmt.Fprintf(b, " }\n\n") + fmt.Fprintf(fb, " }\n\n") case reflect.Bool: - fmt.Fprintf(b, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name) - fmt.Fprintf(b, " b, err := strconv.ParseBool(%s)\n", field.Name) - fmt.Fprintf(b, " if err != nil {\n") - fmt.Fprintf(b, " return err\n") - fmt.Fprintf(b, " }\n") - fmt.Fprintf(b, " config.%s = b\n", field.Name) - fmt.Fprintf(b, " }\n\n") + fmt.Fprintf(fb, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name) + fmt.Fprintf(fb, " b, err := strconv.ParseBool(%s)\n", field.Name) + fmt.Fprintf(fb, " if err != nil {\n") + fmt.Fprintf(fb, " return err\n") + fmt.Fprintf(fb, " }\n") + fmt.Fprintf(fb, " config.%s = b\n", field.Name) + fmt.Fprintf(fb, " }\n\n") case reflect.Slice: if field.Type.Elem().Kind() == reflect.String { - fmt.Fprintf(b, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name) - fmt.Fprintf(b, " config.%s = strings.Split(%s, \",\")\n", field.Name, field.Name) - fmt.Fprintf(b, " }\n\n") + ims["strings"] = true + fmt.Fprintf(fb, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name) + fmt.Fprintf(fb, " config.%s = strings.Split(%s, \",\")\n", field.Name, field.Name) + fmt.Fprintf(fb, " }\n\n") } default: - fmt.Fprintf(b, " // %s is not supported\n\n", field.Name) + fmt.Fprintf(fb, " // %s is not supported\n\n", field.Name) } } - fmt.Fprintf(b, " return nil\n") - fmt.Fprintf(b, "}\n") + fmt.Fprintf(hb, "import (\n") + for im := range ims { + fmt.Fprintf(hb, " \"%s\"\n", im) + } + fmt.Fprintf(hb, ")\n\n") + + fmt.Fprintf(fb, " return nil\n") + fmt.Fprintf(fb, "}\n") + + hb.Write(fb.Bytes()) - err := os.WriteFile(targetFile, b.Bytes(), 0644) + err := os.WriteFile(targetFile, hb.Bytes(), 0644) if err != nil { fmt.Println("Failed to write file:", err) os.Exit(1) diff --git a/go.mod b/go.mod index 08f643767..636dddbfc 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,10 @@ go 1.21.1 require ( contrib.go.opencensus.io/exporter/jaeger v0.2.1 + github.com/aws/aws-sdk-go-v2 v1.32.7 + github.com/aws/aws-sdk-go-v2/config v1.28.7 + github.com/aws/aws-sdk-go-v2/service/s3 v1.71.1 + github.com/aws/smithy-go v1.22.1 github.com/golang/glog v1.2.2 github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da github.com/google/go-cmp v0.6.0 @@ -16,6 +20,20 @@ require ( ) require ( + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.48 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.22 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.26 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.26 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.26 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.7 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.7 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.7 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.24.8 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.7 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.33.3 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/golang/protobuf v1.5.3 // indirect diff --git a/go.sum b/go.sum index f549adbbc..02f653344 100644 --- a/go.sum +++ b/go.sum @@ -25,6 +25,42 @@ contrib.go.opencensus.io/exporter/jaeger v0.2.1/go.mod h1:Y8IsLgdxqh1QxYxPC5IgXV dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/aws/aws-sdk-go-v2 v1.32.7 h1:ky5o35oENWi0JYWUZkB7WYvVPP+bcRF5/Iq7JWSb5Rw= +github.com/aws/aws-sdk-go-v2 v1.32.7/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 h1:lL7IfaFzngfx0ZwUGOZdsFFnQ5uLvR0hWqqhyE7Q9M8= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7/go.mod h1:QraP0UcVlQJsmHfioCrveWOC1nbiWUl3ej08h4mXWoc= +github.com/aws/aws-sdk-go-v2/config v1.28.7 h1:GduUnoTXlhkgnxTD93g1nv4tVPILbdNQOzav+Wpg7AE= +github.com/aws/aws-sdk-go-v2/config v1.28.7/go.mod h1:vZGX6GVkIE8uECSUHB6MWAUsd4ZcG2Yq/dMa4refR3M= +github.com/aws/aws-sdk-go-v2/credentials v1.17.48 h1:IYdLD1qTJ0zanRavulofmqut4afs45mOWEI+MzZtTfQ= +github.com/aws/aws-sdk-go-v2/credentials v1.17.48/go.mod h1:tOscxHN3CGmuX9idQ3+qbkzrjVIx32lqDSU1/0d/qXs= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.22 h1:kqOrpojG71DxJm/KDPO+Z/y1phm1JlC8/iT+5XRmAn8= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.22/go.mod h1:NtSFajXVVL8TA2QNngagVZmUtXciyrHOt7xgz4faS/M= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.26 h1:I/5wmGMffY4happ8NOCuIUEWGUvvFp5NSeQcXl9RHcI= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.26/go.mod h1:FR8f4turZtNy6baO0KJ5FJUmXH/cSkI9fOngs0yl6mA= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.26 h1:zXFLuEuMMUOvEARXFUVJdfqZ4bvvSgdGRq/ATcrQxzM= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.26/go.mod h1:3o2Wpy0bogG1kyOPrgkXA8pgIfEEv0+m19O9D5+W8y8= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.26 h1:GeNJsIFHB+WW5ap2Tec4K6dzcVTsRbsT1Lra46Hv9ME= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.26/go.mod h1:zfgMpwHDXX2WGoG84xG2H+ZlPTkJUU4YUvx2svLQYWo= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 h1:iXtILhvDxB6kPvEXgsDhGaZCSC6LQET5ZHSdJozeI0Y= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1/go.mod h1:9nu0fVANtYiAePIBh2/pFUSwtJ402hLnp854CNoDOeE= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.7 h1:tB4tNw83KcajNAzaIMhkhVI2Nt8fAZd5A5ro113FEMY= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.7/go.mod h1:lvpyBGkZ3tZ9iSsUIcC2EWp+0ywa7aK3BLT+FwZi+mQ= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.7 h1:8eUsivBQzZHqe/3FE+cqwfH+0p5Jo8PFM/QYQSmeZ+M= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.7/go.mod h1:kLPQvGUmxn/fqiCrDeohwG33bq2pQpGeY62yRO6Nrh0= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.7 h1:Hi0KGbrnr57bEHWM0bJ1QcBzxLrL/k2DHvGYhb8+W1w= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.7/go.mod h1:wKNgWgExdjjrm4qvfbTorkvocEstaoDl4WCvGfeCy9c= +github.com/aws/aws-sdk-go-v2/service/s3 v1.71.1 h1:aOVVZJgWbaH+EJYPvEgkNhCEbXXvH7+oML36oaPK3zE= +github.com/aws/aws-sdk-go-v2/service/s3 v1.71.1/go.mod h1:r+xl5yzMk9083rMR+sJ5TYj9Tihvf/l1oxzZXDgGj2Q= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.8 h1:CvuUmnXI7ebaUAhbJcDy9YQx8wHR69eZ9I7q5hszt/g= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.8/go.mod h1:XDeGv1opzwm8ubxddF0cgqkZWsyOtw4lr6dxwmb6YQg= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.7 h1:F2rBfNAL5UyswqoeWv9zs74N/NanhK16ydHW1pahX6E= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.7/go.mod h1:JfyQ0g2JG8+Krq0EuZNnRwX0mU0HrwY/tG6JNfcqh4k= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.3 h1:Xgv/hyNgvLda/M9l9qxXc4UFSgppnRczLxlMs5Ae/QY= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.3/go.mod h1:5Gn+d+VaaRgsjewpMvGazt0WfcFO+Md4wLOuBfGR9Bc= +github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro= +github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/internal/tailer/logstream/awss3.go b/internal/tailer/logstream/awss3.go new file mode 100644 index 000000000..bbe9f529c --- /dev/null +++ b/internal/tailer/logstream/awss3.go @@ -0,0 +1,247 @@ +package logstream + +import ( + "context" + "errors" + "net/url" + "regexp" + "strings" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + cfg "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/smithy-go" + "github.com/golang/glog" + "github.com/google/mtail/internal/logline" + "github.com/google/mtail/internal/waker" +) + +const ( + AWS3Scheme = "s3" +) + +func IsAWSExitableError(err error) bool { + if err == nil { + return false + } + + var ae smithy.APIError + + if errors.As(err, &ae) { + switch ae.ErrorCode() { + case "AccessDenied", "UnauthorizedOperation": + return true + default: + return false + } + } + + return false +} + +type s3BucketConfig struct { + config aws.Config + + bucket string + + prefix string + + pattern string + re *regexp.Regexp + + // init time + lastModified time.Time + + lastKey string +} + +type s3Stream struct { + streamBase + s3BucketConfig + + cancel context.CancelFunc +} + +//go:generate go run ./../../../cmd/config-gen/main.go -type S3Config -file s3_config_generated.go -module logstream + +func parseS3URL(u *url.URL) (s3BucketConfig, error) { + config := s3BucketConfig{} + + if u.Host == "" { + return config, errors.New("S3 URL must contain a host as bucket") + } + config.bucket = u.Host + config.prefix = strings.TrimPrefix(u.Path, "/") + + if pattern := u.Query().Get("Pattern"); pattern != "" { + config.pattern = pattern + } else { + config.pattern = ".*" + } + + re, err := regexp.Compile(config.pattern) + if err != nil { + return config, err + } + config.re = re + + if lastModified := u.Query().Get("LastModified"); lastModified != "" { + t, err := time.Parse(time.RFC3339, lastModified) + if err != nil { + return config, err + } + config.lastModified = t + } else { + config.lastModified = time.Now() + } + + if lastKey := u.Query().Get("LastKey"); lastKey != "" { + config.lastKey = lastKey + } + + config.config, _ = cfg.LoadDefaultConfig(context.TODO()) + + if err := parseS3Config(u, &config.config); err != nil { + return config, err + } + + return config, nil +} + +func newAWSS3Stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, u *url.URL, oneShot OneShotMode) (LogStream, error) { + glog.V(2).Infof("newAWSS3Stream(%s): config", u.String()) + config, err := parseS3URL(u) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(ctx) + fs := &s3Stream{ + s3BucketConfig: config, + cancel: cancel, + streamBase: streamBase{ + sourcename: u.Host, + lines: make(chan *logline.LogLine), + }, + } + + if err := fs.stream(ctx, wg, waker, oneShot); err != nil { + return nil, err + } + + glog.V(2).Infof("newKafkaStream(%s): started stream", fs.sourcename) + + return fs, nil +} + +func (fs *s3Stream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, oneShot OneShotMode) error { + client := s3.NewFromConfig(fs.config) + + var total int + wg.Add(1) + go func() { + defer wg.Done() + defer func() { + glog.V(2).Infof("stream(%s): read total %d bytes", fs.sourcename, total) + glog.Info("stream(%s): last key %s", fs.lastKey) + close(fs.lines) + fs.cancel() + }() + + for { + input := &s3.ListObjectsV2Input{ + Bucket: &fs.bucket, + Prefix: &fs.prefix, + } + + if fs.lastKey != "" { + input.StartAfter = &fs.lastKey + } + + paginator := s3.NewListObjectsV2Paginator(client, input) + + for paginator.HasMorePages() { + page, err := paginator.NextPage(ctx) + if err != nil { + if IsAWSExitableError(err) { + glog.V(2).Infof("stream(%s): exiting, conn has AWS error %s", fs.sourcename, err) + return + } + + logErrors.Add(fs.sourcename, 1) + glog.Infof("stream(%s): error listing objects: %v", fs.sourcename, err) + break + } + for _, obj := range page.Contents { + fs.lastKey = *obj.Key + + // Skip files that are older than the time or do not match the pattern. + if obj.LastModified.Before(fs.lastModified) || !fs.re.MatchString(*obj.Key) { + continue + } + + out, err := client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: &fs.bucket, + Key: obj.Key, + }) + + if err != nil { + if IsAWSExitableError(err) { + glog.V(2).Infof("stream(%s): exiting, conn has AWS error %s", fs.sourcename, err) + return + } + + if ctx.Err() != nil { + // The context has been cancelled, so exit directly. + glog.V(2).Infof("stream(%s): context cancelled, exiting directly", fs.sourcename) + return + } + + logErrors.Add(fs.sourcename, 1) + glog.Infof("stream(%s): error getting object: %v", fs.sourcename, err) + continue + } + + logOpens.Add(fs.sourcename, 1) + glog.V(2).Infof("stream(%s): opened new file (%s)", fs.sourcename, *obj.Key) + + defer out.Body.Close() + + lr := NewLineReader(fs.sourcename, fs.lines, out.Body, defaultReadBufferSize, fs.cancel) + + n, err := lr.ReadAndSend(ctx) + + if n > 0 { + total += n + + // No error implies there is more to read so restart the loop. + if err == nil && ctx.Err() == nil { + continue + } + } + + if oneShot == OneShotEnabled { + glog.Infof("stream(%s): oneshot mode, exiting", fs.sourcename) + return + } + } + } + + // Wait for wakeup or termination. + glog.V(2).Infof("stream(%s): waiting", fs.sourcename) + select { + case <-ctx.Done(): + // Exit directly. + glog.V(2).Infof("stream(%s): context cancelled, exiting directly", fs.sourcename) + return + case <-waker.Wake(): + // sleep until next Wake() + glog.V(2).Infof("stream(%s): Wake received", fs.sourcename) + } + } + }() + + return nil +} diff --git a/internal/tailer/logstream/logstream.go b/internal/tailer/logstream/logstream.go index aadbcbf9b..3036b2b34 100644 --- a/internal/tailer/logstream/logstream.go +++ b/internal/tailer/logstream/logstream.go @@ -85,6 +85,8 @@ func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname st return newDgramStream(ctx, wg, waker, u.Scheme, u.Host, oneShot) case KafkaScheme: return newKafkaStream(ctx, wg, u, oneShot) + case AWS3Scheme: + return newAWSS3Stream(ctx, wg, waker, u, oneShot) case "", "file": path = u.Path } diff --git a/internal/tailer/logstream/s3_config_generated.go b/internal/tailer/logstream/s3_config_generated.go new file mode 100644 index 000000000..bb72965f4 --- /dev/null +++ b/internal/tailer/logstream/s3_config_generated.go @@ -0,0 +1,77 @@ +// Code generated by github.com/wanmail/mtail/cmd/config-gen; DO NOT EDIT. +package logstream + +import ( + "net/url" + "github.com/aws/aws-sdk-go-v2/aws" + "strconv" +) + +func parseS3Config(u *url.URL, config *aws.Config) error { + + if Region := u.Query().Get("Region"); Region != "" { + config.Region = Region + } + + // Credentials is not supported + + // BearerAuthTokenProvider is not supported + + // HTTPClient is not supported + + // EndpointResolver is not supported + + // EndpointResolverWithOptions is not supported + + if RetryMaxAttempts := u.Query().Get("RetryMaxAttempts"); RetryMaxAttempts != "" { + i, err := strconv.Atoi(RetryMaxAttempts) + if err != nil { + return err + } + config.RetryMaxAttempts = i + } + + if RetryMode := u.Query().Get("RetryMode"); RetryMode != "" { + config.RetryMode = aws.RetryMode(RetryMode) + } + + // Retryer is not supported + + // Logger is not supported + + // ClientLogMode is not supported + + if DefaultsMode := u.Query().Get("DefaultsMode"); DefaultsMode != "" { + config.DefaultsMode = aws.DefaultsMode(DefaultsMode) + } + + // RuntimeEnvironment is not supported + + if AppID := u.Query().Get("AppID"); AppID != "" { + config.AppID = AppID + } + + // BaseEndpoint is not supported + + if DisableRequestCompression := u.Query().Get("DisableRequestCompression"); DisableRequestCompression != "" { + b, err := strconv.ParseBool(DisableRequestCompression) + if err != nil { + return err + } + config.DisableRequestCompression = b + } + + if RequestMinCompressSizeBytes := u.Query().Get("RequestMinCompressSizeBytes"); RequestMinCompressSizeBytes != "" { + i, err := strconv.Atoi(RequestMinCompressSizeBytes) + if err != nil { + return err + } + config.RequestMinCompressSizeBytes = int64(i) + } + + if AccountIDEndpointMode := u.Query().Get("AccountIDEndpointMode"); AccountIDEndpointMode != "" { + config.AccountIDEndpointMode = aws.AccountIDEndpointMode(AccountIDEndpointMode) + } + + return nil +} diff --git a/internal/tailer/tail.go b/internal/tailer/tail.go index 8a2dfad97..b2e33ad05 100644 --- a/internal/tailer/tail.go +++ b/internal/tailer/tail.go @@ -194,7 +194,7 @@ func (t *Tailer) AddPattern(pattern string) error { default: glog.V(2).Infof("AddPattern(%v): %v in path pattern %q, treating as path", pattern, ErrUnsupportedURLScheme, u.Scheme) // Leave path alone per log message - case "unix", "unixgram", "tcp", "udp", logstream.KafkaScheme: + case "unix", "unixgram", "tcp", "udp", logstream.KafkaScheme, logstream.AWS3Scheme: // Keep the scheme. glog.V(2).Infof("AddPattern(%v): is a socket", path) return t.TailPath(path) From a19e738c48be648fde21eccfe15c5978b58f8150 Mon Sep 17 00:00:00 2001 From: wanmail Date: Thu, 9 Jan 2025 17:15:47 +0800 Subject: [PATCH 07/13] AWS S3: set filename as S3 key --- internal/tailer/logstream/awss3.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/tailer/logstream/awss3.go b/internal/tailer/logstream/awss3.go index bbe9f529c..2732b1666 100644 --- a/internal/tailer/logstream/awss3.go +++ b/internal/tailer/logstream/awss3.go @@ -209,7 +209,7 @@ func (fs *s3Stream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker. defer out.Body.Close() - lr := NewLineReader(fs.sourcename, fs.lines, out.Body, defaultReadBufferSize, fs.cancel) + lr := NewLineReader(*obj.Key, fs.lines, out.Body, defaultReadBufferSize, fs.cancel) n, err := lr.ReadAndSend(ctx) From d7e5deeab89c50e99cafe6c372fbdeda490f3bad Mon Sep 17 00:00:00 2001 From: wanmail Date: Thu, 9 Jan 2025 17:43:57 +0800 Subject: [PATCH 08/13] AWS S3: support gzip decompressed --- internal/tailer/logstream/awss3.go | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/internal/tailer/logstream/awss3.go b/internal/tailer/logstream/awss3.go index 2732b1666..6867b306d 100644 --- a/internal/tailer/logstream/awss3.go +++ b/internal/tailer/logstream/awss3.go @@ -1,6 +1,7 @@ package logstream import ( + "compress/gzip" "context" "errors" "net/url" @@ -41,6 +42,11 @@ func IsAWSExitableError(err error) bool { return false } +const ( + PLAIN = "plain" + GZIP = "gzip" +) + type s3BucketConfig struct { config aws.Config @@ -55,6 +61,8 @@ type s3BucketConfig struct { lastModified time.Time lastKey string + + format string } type s3Stream struct { @@ -101,6 +109,12 @@ func parseS3URL(u *url.URL) (s3BucketConfig, error) { config.lastKey = lastKey } + if format := u.Query().Get("Format"); format != "" { + config.format = format + } else { + config.format = PLAIN + } + config.config, _ = cfg.LoadDefaultConfig(context.TODO()) if err := parseS3Config(u, &config.config); err != nil { @@ -209,7 +223,21 @@ func (fs *s3Stream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker. defer out.Body.Close() - lr := NewLineReader(*obj.Key, fs.lines, out.Body, defaultReadBufferSize, fs.cancel) + var lr *LineReader + + switch fs.format { + case GZIP: + gr, err := gzip.NewReader(out.Body) + if err != nil { + logErrors.Add(fs.sourcename, 1) + glog.Infof("stream(%s): error creating gzip reader: %v", fs.sourcename, err) + continue + } + + lr = NewLineReader(*obj.Key, fs.lines, gr, defaultReadBufferSize, fs.cancel) + default: + lr = NewLineReader(*obj.Key, fs.lines, out.Body, defaultReadBufferSize, fs.cancel) + } n, err := lr.ReadAndSend(ctx) From 23195b7b0ec0d9ea7b8f69b58aff22d31d2cc794 Mon Sep 17 00:00:00 2001 From: wanmail Date: Thu, 9 Jan 2025 17:55:12 +0800 Subject: [PATCH 09/13] AWS S3: Add UT --- internal/tailer/logstream/awss3_test.go | 143 ++++++++++++++++++++++++ internal/tailer/logstream/kafka_test.go | 5 +- 2 files changed, 146 insertions(+), 2 deletions(-) create mode 100644 internal/tailer/logstream/awss3_test.go diff --git a/internal/tailer/logstream/awss3_test.go b/internal/tailer/logstream/awss3_test.go new file mode 100644 index 000000000..52018677c --- /dev/null +++ b/internal/tailer/logstream/awss3_test.go @@ -0,0 +1,143 @@ +package logstream_test + +import ( + "bytes" + "compress/gzip" + "context" + "fmt" + "os" + "strings" + "sync" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/google/mtail/internal/logline" + "github.com/google/mtail/internal/tailer/logstream" + "github.com/google/mtail/internal/testutil" + "github.com/google/mtail/internal/waker" +) + +func TestS3StreamRead(t *testing.T) { + var wg sync.WaitGroup + + bucket := os.Getenv("MTAIL_AWSS3_TEST_BUCKET") + if bucket == "" { + t.Skip("MTAIL_AWSS3_TEST_BUCKET not set") + } + + region := "us-east-1" + + key := "testdata/yo.txt" + + sourcename := fmt.Sprintf("s3://%s/%s/?Region=%s", bucket, "testdata", region) + + ctx, cancel := context.WithCancel(context.Background()) + waker, awaken := waker.NewTest(ctx, 1, "stream") + fs, err := logstream.New(ctx, &wg, waker, sourcename, logstream.OneShotDisabled) + testutil.FatalIfErr(t, err) + + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: key, Line: "yo"}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, fs.Lines()) + + time.Sleep(time.Second * 3) + + awaken(1, 1) // synchronise past first read + + cfg, _ := config.LoadDefaultConfig(context.Background()) + client := s3.NewFromConfig(cfg) + _, err = client.PutObject(context.Background(), &s3.PutObjectInput{ + Bucket: &bucket, + Key: aws.String(key), + Body: strings.NewReader("yo\n"), + }) + testutil.FatalIfErr(t, err) + + defer func() { + client.DeleteObject(context.Background(), &s3.DeleteObjectInput{ + Bucket: &bucket, + Key: aws.String(key), + }) + }() + + awaken(1, 1) + + time.Sleep(time.Second * 1) + + cancel() + wg.Wait() + + checkLineDiff() + + if v := <-fs.Lines(); v != nil { + t.Errorf("expecting filestream to be complete because stopped") + } +} + +func TestS3StreamReadGzip(t *testing.T) { + var wg sync.WaitGroup + + bucket := os.Getenv("MTAIL_AWSS3_TEST_BUCKET") + if bucket == "" { + t.Skip("MTAIL_AWSS3_TEST_BUCKET not set") + } + + region := "us-east-1" + + key := "testdata/yo.txt.gz" + + sourcename := fmt.Sprintf("s3://%s/%s/?Region=%s&Format=gzip", bucket, "testdata", region) + + ctx, cancel := context.WithCancel(context.Background()) + waker, awaken := waker.NewTest(ctx, 1, "stream") + fs, err := logstream.New(ctx, &wg, waker, sourcename, logstream.OneShotDisabled) + testutil.FatalIfErr(t, err) + + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: key, Line: "yo"}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, fs.Lines()) + + time.Sleep(time.Second * 3) + + awaken(1, 1) // synchronise past first read + + cfg, _ := config.LoadDefaultConfig(context.Background()) + client := s3.NewFromConfig(cfg) + var buf bytes.Buffer + gz := gzip.NewWriter(&buf) + gz.Write([]byte("yo\n")) + gz.Close() + + _, err = client.PutObject(context.Background(), &s3.PutObjectInput{ + Bucket: &bucket, + Key: aws.String(key), + Body: bytes.NewReader(buf.Bytes()), + }) + + testutil.FatalIfErr(t, err) + + defer func() { + client.DeleteObject(context.Background(), &s3.DeleteObjectInput{ + Bucket: &bucket, + Key: aws.String(key), + }) + }() + + awaken(1, 1) + + time.Sleep(time.Second * 1) + + cancel() + wg.Wait() + + checkLineDiff() + + if v := <-fs.Lines(); v != nil { + t.Errorf("expecting filestream to be complete because stopped") + } +} diff --git a/internal/tailer/logstream/kafka_test.go b/internal/tailer/logstream/kafka_test.go index b5275d101..ae4c7e7bf 100644 --- a/internal/tailer/logstream/kafka_test.go +++ b/internal/tailer/logstream/kafka_test.go @@ -26,8 +26,9 @@ func TestKafkaStreamRead(t *testing.T) { // refer to https://hub.docker.com/r/apache/kafka host := os.Getenv("MTAIL_KAFKA_TEST_HOST") if host == "" { - t.Log("use default kafka host") - host = "localhost:49092" + // t.Log("use default kafka host") + // host = "localhost:9092" + t.Skip("MTAIL_KAFKA_TEST_HOST not set") } topic := fmt.Sprintf("test-%d", rand.Intn(100)) From 6c44a37df7aa95550955490b92d7dc15d98fbffa Mon Sep 17 00:00:00 2001 From: wanmail Date: Mon, 13 Jan 2025 15:36:21 +0800 Subject: [PATCH 10/13] AWS S3: Add skipped object debug(v2) log --- internal/tailer/logstream/awss3.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/tailer/logstream/awss3.go b/internal/tailer/logstream/awss3.go index 6867b306d..76a7774fe 100644 --- a/internal/tailer/logstream/awss3.go +++ b/internal/tailer/logstream/awss3.go @@ -193,6 +193,7 @@ func (fs *s3Stream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker. // Skip files that are older than the time or do not match the pattern. if obj.LastModified.Before(fs.lastModified) || !fs.re.MatchString(*obj.Key) { + glog.V(2).Infof("stream(%s): skipping file (%s) (%v)", fs.sourcename, *obj.Key, obj.LastModified) continue } From 33a0473607964d280f170db52fbc078d70a96706 Mon Sep 17 00:00:00 2001 From: wanmail Date: Mon, 13 Jan 2025 15:40:42 +0800 Subject: [PATCH 11/13] AWS S3: docker build add ca update --- Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 2798d5e9c..8fc15bb26 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,13 +3,13 @@ RUN apk add --update git make WORKDIR /go/src/github.com/google/mtail COPY . /go/src/github.com/google/mtail RUN make depclean && make install_deps && PREFIX=/go make STATIC=y -B install - +RUN apk add -U --no-cache ca-certificates FROM scratch COPY --from=builder /go/bin/mtail /usr/bin/mtail +COPY --from=alpine /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ ENTRYPOINT ["/usr/bin/mtail"] EXPOSE 3903 -WORKDIR /tmp ARG version=0.0.0-local From f610b64a7d7001165ca7f19340fa3f7c77f0ec12 Mon Sep 17 00:00:00 2001 From: wanmail Date: Mon, 13 Jan 2025 16:48:16 +0800 Subject: [PATCH 12/13] AWS S3: fix line reader loop --- internal/tailer/logstream/awss3.go | 46 ++++++++++++++++++++++++------ 1 file changed, 38 insertions(+), 8 deletions(-) diff --git a/internal/tailer/logstream/awss3.go b/internal/tailer/logstream/awss3.go index 76a7774fe..6d2dfa0ed 100644 --- a/internal/tailer/logstream/awss3.go +++ b/internal/tailer/logstream/awss3.go @@ -4,6 +4,7 @@ import ( "compress/gzip" "context" "errors" + "io" "net/url" "regexp" "strings" @@ -145,7 +146,7 @@ func newAWSS3Stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, return nil, err } - glog.V(2).Infof("newKafkaStream(%s): started stream", fs.sourcename) + glog.V(2).Infof("newAWSStream(%s): started stream", fs.sourcename) return fs, nil } @@ -235,22 +236,51 @@ func (fs *s3Stream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker. continue } + defer gr.Close() + lr = NewLineReader(*obj.Key, fs.lines, gr, defaultReadBufferSize, fs.cancel) default: lr = NewLineReader(*obj.Key, fs.lines, out.Body, defaultReadBufferSize, fs.cancel) } - n, err := lr.ReadAndSend(ctx) - - if n > 0 { - total += n + for { + n, err := lr.ReadAndSend(ctx) + + glog.V(2).Infof("stream(%s): read %d bytes, err is %v", fs.sourcename, n, err) + + if n > 0 { + total += n + + // No error implies there is more to read so restart the loop. + if err == nil && ctx.Err() == nil { + continue + } + } else if n == 0 && total > 0 { + // `pipe(7)` tells us "If all file descriptors referring to the + // write end of a fifo have been closed, then an attempt to + // read(2) from the fifo will see end-of-file (read(2) will + // return 0)." To avoid shutting down the stream at startup + // before any writer has connected to the fifo, condition on + // having read any bytes previously. + glog.V(2).Infof("stream(%s): exiting, 0 bytes read", fs.sourcename) + break + } - // No error implies there is more to read so restart the loop. - if err == nil && ctx.Err() == nil { - continue + // Test to see if we should exit. + if IsExitableError(err) { + // Because we've opened in nonblocking mode, this Read can return + // straight away. If there are no writers, it'll return EOF (per + // `pipe(7)` and `read(2)`.) This is expected when `mtail` is + // starting at system init as the writer may not be ready yet. + if !(errors.Is(err, io.EOF) && total == 0) { + glog.V(2).Infof("stream(%s): exiting, stream has error %s", fs.sourcename, err) + break + } } } + lr.Finish(ctx) + if oneShot == OneShotEnabled { glog.Infof("stream(%s): oneshot mode, exiting", fs.sourcename) return From c41dce2922e35ccd8656aee2e3bff68869e0e5aa Mon Sep 17 00:00:00 2001 From: wanmail Date: Mon, 13 Jan 2025 19:00:55 +0800 Subject: [PATCH 13/13] AWS S3: fix invalid prefix --- internal/tailer/logstream/awss3.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/tailer/logstream/awss3.go b/internal/tailer/logstream/awss3.go index 6d2dfa0ed..0d89836eb 100644 --- a/internal/tailer/logstream/awss3.go +++ b/internal/tailer/logstream/awss3.go @@ -168,7 +168,10 @@ func (fs *s3Stream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker. for { input := &s3.ListObjectsV2Input{ Bucket: &fs.bucket, - Prefix: &fs.prefix, + } + + if fs.prefix != "" { + input.Prefix = &fs.prefix } if fs.lastKey != "" {