diff --git a/Dockerfile b/Dockerfile index 2798d5e9c..8fc15bb26 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,13 +3,13 @@ RUN apk add --update git make WORKDIR /go/src/github.com/google/mtail COPY . /go/src/github.com/google/mtail RUN make depclean && make install_deps && PREFIX=/go make STATIC=y -B install - +RUN apk add -U --no-cache ca-certificates FROM scratch COPY --from=builder /go/bin/mtail /usr/bin/mtail +COPY --from=alpine /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ ENTRYPOINT ["/usr/bin/mtail"] EXPOSE 3903 -WORKDIR /tmp ARG version=0.0.0-local diff --git a/cmd/config-gen/main.go b/cmd/config-gen/main.go new file mode 100644 index 000000000..bc204e7ea --- /dev/null +++ b/cmd/config-gen/main.go @@ -0,0 +1,135 @@ +package main + +import ( + "bytes" + "flag" + "fmt" + "os" + "reflect" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/segmentio/kafka-go" +) + +const ( + KafkaConfig = "KafkaConfig" + S3Config = "S3Config" +) + +var ( + configType string + targetFile string + targetModule string +) + +func init() { + flag.StringVar(&configType, "type", KafkaConfig, "The type of config to generate") + flag.StringVar(&targetFile, "file", "config_generated.go", "The target file to generate the config from") + flag.StringVar(&targetModule, "module", "main", "The target module name") +} + +func main() { + flag.Parse() + + ims := make(map[string]bool) + + ims["net/url"] = true + + var p interface{} + + // file header buffer + hb := new(bytes.Buffer) + + // function buffer + fb := new(bytes.Buffer) + + fmt.Fprintf(hb, "// Code generated by github.com/wanmail/mtail/cmd/config-gen; DO NOT EDIT.\n") + fmt.Fprintf(hb, "package %s\n\n", targetModule) + + switch configType { + case KafkaConfig: + ims["github.com/segmentio/kafka-go"] = true + p = kafka.ReaderConfig{} + fmt.Fprintf(fb, "func parse%s(u *url.URL, config *kafka.ReaderConfig) error {\n\n", configType) + case S3Config: + ims["github.com/aws/aws-sdk-go-v2/aws"] = true + p = aws.Config{} + fmt.Fprintf(fb, "func parse%s(u *url.URL, config *aws.Config) error {\n\n", configType) + } + + v := reflect.ValueOf(p) + t := v.Type() + + for i := 0; i < t.NumField(); i++ { + field := t.Field(i) + + switch field.Type.Kind() { + case reflect.String: + switch field.Type.String() { + case "string": + fmt.Fprintf(fb, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name) + fmt.Fprintf(fb, " config.%s = %s\n", field.Name, field.Name) + fmt.Fprintf(fb, " }\n\n") + default: + fmt.Fprintf(fb, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name) + fmt.Fprintf(fb, " config.%s = %s(%s)\n", field.Name, field.Type.String(), field.Name) + fmt.Fprintf(fb, " }\n\n") + } + + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + ims["strconv"] = true + fmt.Fprintf(fb, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name) + fmt.Fprintf(fb, " i, err := strconv.Atoi(%s)\n", field.Name) + fmt.Fprintf(fb, " if err != nil {\n") + fmt.Fprintf(fb, " return err\n") + fmt.Fprintf(fb, " }\n") + switch field.Type.String() { + case "int": + fmt.Fprintf(fb, " config.%s = i\n", field.Name) + case "time.Duration": + ims["time"] = true + fmt.Fprintf(fb, " config.%s = time.Second * time.Duration(i)\n", field.Name) + default: + fmt.Fprintf(fb, " config.%s = %s(i)\n", field.Name, field.Type.String()) + } + fmt.Fprintf(fb, " }\n\n") + + case reflect.Bool: + fmt.Fprintf(fb, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name) + fmt.Fprintf(fb, " b, err := strconv.ParseBool(%s)\n", field.Name) + fmt.Fprintf(fb, " if err != nil {\n") + fmt.Fprintf(fb, " return err\n") + fmt.Fprintf(fb, " }\n") + fmt.Fprintf(fb, " config.%s = b\n", field.Name) + fmt.Fprintf(fb, " }\n\n") + + case reflect.Slice: + if field.Type.Elem().Kind() == reflect.String { + ims["strings"] = true + fmt.Fprintf(fb, " if %s := u.Query().Get(\"%s\"); %s != \"\" {\n", field.Name, field.Name, field.Name) + fmt.Fprintf(fb, " config.%s = strings.Split(%s, \",\")\n", field.Name, field.Name) + fmt.Fprintf(fb, " }\n\n") + } + default: + fmt.Fprintf(fb, " // %s is not supported\n\n", field.Name) + + } + } + + fmt.Fprintf(hb, "import (\n") + for im := range ims { + fmt.Fprintf(hb, " \"%s\"\n", im) + } + fmt.Fprintf(hb, ")\n\n") + + fmt.Fprintf(fb, " return nil\n") + fmt.Fprintf(fb, "}\n") + + hb.Write(fb.Bytes()) + + err := os.WriteFile(targetFile, hb.Bytes(), 0644) + if err != nil { + fmt.Println("Failed to write file:", err) + os.Exit(1) + } +} diff --git a/docs/Deploying.md b/docs/Deploying.md index ae8773e2e..1277e1f99 100644 --- a/docs/Deploying.md +++ b/docs/Deploying.md @@ -67,6 +67,16 @@ default. Example: `mtail --progs /etc/mtail --logs /var/log/syslog --poll_interval 250ms --poll_log_interval 250ms` +### Consume data in Apache Kafka + +Use `--logs` flag to read data from Apache Kafka. + +You need to convert kafka configuration into URL format. + +You can refer to [parseKafkaURL](../internal//tailer/logstream/kafka.go#L26) function to write URL. + +Example: `mtail --progs /etc/mtail --logs "kafka://test-group@localhost:9092/test-topic"` + ### Setting garbage collection intervals `mtail` accumulates metrics and log files during its operation. By default, diff --git a/go.mod b/go.mod index 3ffe06257..636dddbfc 100644 --- a/go.mod +++ b/go.mod @@ -4,27 +4,47 @@ go 1.21.1 require ( contrib.go.opencensus.io/exporter/jaeger v0.2.1 + github.com/aws/aws-sdk-go-v2 v1.32.7 + github.com/aws/aws-sdk-go-v2/config v1.28.7 + github.com/aws/aws-sdk-go-v2/service/s3 v1.71.1 + github.com/aws/smithy-go v1.22.1 github.com/golang/glog v1.2.2 github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da github.com/google/go-cmp v0.6.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.20.4 github.com/prometheus/common v0.60.0 + github.com/segmentio/kafka-go v0.4.47 go.opencensus.io v0.24.0 golang.org/x/sys v0.26.0 ) require ( + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.48 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.22 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.26 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.26 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.26 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.7 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.7 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.7 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.24.8 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.7 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.33.3 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/uber/jaeger-client-go v2.25.0+incompatible // indirect - golang.org/x/sync v0.7.0 // indirect + golang.org/x/sync v0.8.0 // indirect google.golang.org/api v0.105.0 // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect google.golang.org/grpc v1.56.3 // indirect diff --git a/go.sum b/go.sum index ef3523c74..02f653344 100644 --- a/go.sum +++ b/go.sum @@ -25,6 +25,42 @@ contrib.go.opencensus.io/exporter/jaeger v0.2.1/go.mod h1:Y8IsLgdxqh1QxYxPC5IgXV dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/aws/aws-sdk-go-v2 v1.32.7 h1:ky5o35oENWi0JYWUZkB7WYvVPP+bcRF5/Iq7JWSb5Rw= +github.com/aws/aws-sdk-go-v2 v1.32.7/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 h1:lL7IfaFzngfx0ZwUGOZdsFFnQ5uLvR0hWqqhyE7Q9M8= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7/go.mod h1:QraP0UcVlQJsmHfioCrveWOC1nbiWUl3ej08h4mXWoc= +github.com/aws/aws-sdk-go-v2/config v1.28.7 h1:GduUnoTXlhkgnxTD93g1nv4tVPILbdNQOzav+Wpg7AE= +github.com/aws/aws-sdk-go-v2/config v1.28.7/go.mod h1:vZGX6GVkIE8uECSUHB6MWAUsd4ZcG2Yq/dMa4refR3M= +github.com/aws/aws-sdk-go-v2/credentials v1.17.48 h1:IYdLD1qTJ0zanRavulofmqut4afs45mOWEI+MzZtTfQ= +github.com/aws/aws-sdk-go-v2/credentials v1.17.48/go.mod h1:tOscxHN3CGmuX9idQ3+qbkzrjVIx32lqDSU1/0d/qXs= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.22 h1:kqOrpojG71DxJm/KDPO+Z/y1phm1JlC8/iT+5XRmAn8= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.22/go.mod h1:NtSFajXVVL8TA2QNngagVZmUtXciyrHOt7xgz4faS/M= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.26 h1:I/5wmGMffY4happ8NOCuIUEWGUvvFp5NSeQcXl9RHcI= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.26/go.mod h1:FR8f4turZtNy6baO0KJ5FJUmXH/cSkI9fOngs0yl6mA= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.26 h1:zXFLuEuMMUOvEARXFUVJdfqZ4bvvSgdGRq/ATcrQxzM= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.26/go.mod h1:3o2Wpy0bogG1kyOPrgkXA8pgIfEEv0+m19O9D5+W8y8= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.26 h1:GeNJsIFHB+WW5ap2Tec4K6dzcVTsRbsT1Lra46Hv9ME= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.26/go.mod h1:zfgMpwHDXX2WGoG84xG2H+ZlPTkJUU4YUvx2svLQYWo= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 h1:iXtILhvDxB6kPvEXgsDhGaZCSC6LQET5ZHSdJozeI0Y= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1/go.mod h1:9nu0fVANtYiAePIBh2/pFUSwtJ402hLnp854CNoDOeE= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.7 h1:tB4tNw83KcajNAzaIMhkhVI2Nt8fAZd5A5ro113FEMY= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.7/go.mod h1:lvpyBGkZ3tZ9iSsUIcC2EWp+0ywa7aK3BLT+FwZi+mQ= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.7 h1:8eUsivBQzZHqe/3FE+cqwfH+0p5Jo8PFM/QYQSmeZ+M= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.7/go.mod h1:kLPQvGUmxn/fqiCrDeohwG33bq2pQpGeY62yRO6Nrh0= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.7 h1:Hi0KGbrnr57bEHWM0bJ1QcBzxLrL/k2DHvGYhb8+W1w= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.7/go.mod h1:wKNgWgExdjjrm4qvfbTorkvocEstaoDl4WCvGfeCy9c= +github.com/aws/aws-sdk-go-v2/service/s3 v1.71.1 h1:aOVVZJgWbaH+EJYPvEgkNhCEbXXvH7+oML36oaPK3zE= +github.com/aws/aws-sdk-go-v2/service/s3 v1.71.1/go.mod h1:r+xl5yzMk9083rMR+sJ5TYj9Tihvf/l1oxzZXDgGj2Q= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.8 h1:CvuUmnXI7ebaUAhbJcDy9YQx8wHR69eZ9I7q5hszt/g= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.8/go.mod h1:XDeGv1opzwm8ubxddF0cgqkZWsyOtw4lr6dxwmb6YQg= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.7 h1:F2rBfNAL5UyswqoeWv9zs74N/NanhK16ydHW1pahX6E= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.7/go.mod h1:JfyQ0g2JG8+Krq0EuZNnRwX0mU0HrwY/tG6JNfcqh4k= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.3 h1:Xgv/hyNgvLda/M9l9qxXc4UFSgppnRczLxlMs5Ae/QY= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.3/go.mod h1:5Gn+d+VaaRgsjewpMvGazt0WfcFO+Md4wLOuBfGR9Bc= +github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro= +github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -100,6 +136,7 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -109,6 +146,9 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -123,6 +163,8 @@ github.com/prometheus/common v0.60.0/go.mod h1:h0LYf1R1deLSKtD4Vdg8gy4RuOvENW2J/ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0= +github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -134,7 +176,14 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/uber/jaeger-client-go v2.25.0+incompatible h1:IxcNZ7WRY1Y3G4poYlx24szfsn/3LvK9QHCq9oQw8+U= github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -147,6 +196,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -176,6 +227,8 @@ golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -194,6 +247,11 @@ golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -209,8 +267,10 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -231,13 +291,30 @@ golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200331124033-c3d80250170d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -273,6 +350,8 @@ golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapK golang.org/x/tools v0.0.0-20200212150539-ea181f53ac56/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200224181240-023911ca70b2/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200331025713-a30bf2db82d4/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/tailer/logstream/awss3.go b/internal/tailer/logstream/awss3.go new file mode 100644 index 000000000..0d89836eb --- /dev/null +++ b/internal/tailer/logstream/awss3.go @@ -0,0 +1,309 @@ +package logstream + +import ( + "compress/gzip" + "context" + "errors" + "io" + "net/url" + "regexp" + "strings" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + cfg "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/smithy-go" + "github.com/golang/glog" + "github.com/google/mtail/internal/logline" + "github.com/google/mtail/internal/waker" +) + +const ( + AWS3Scheme = "s3" +) + +func IsAWSExitableError(err error) bool { + if err == nil { + return false + } + + var ae smithy.APIError + + if errors.As(err, &ae) { + switch ae.ErrorCode() { + case "AccessDenied", "UnauthorizedOperation": + return true + default: + return false + } + } + + return false +} + +const ( + PLAIN = "plain" + GZIP = "gzip" +) + +type s3BucketConfig struct { + config aws.Config + + bucket string + + prefix string + + pattern string + re *regexp.Regexp + + // init time + lastModified time.Time + + lastKey string + + format string +} + +type s3Stream struct { + streamBase + s3BucketConfig + + cancel context.CancelFunc +} + +//go:generate go run ./../../../cmd/config-gen/main.go -type S3Config -file s3_config_generated.go -module logstream + +func parseS3URL(u *url.URL) (s3BucketConfig, error) { + config := s3BucketConfig{} + + if u.Host == "" { + return config, errors.New("S3 URL must contain a host as bucket") + } + config.bucket = u.Host + config.prefix = strings.TrimPrefix(u.Path, "/") + + if pattern := u.Query().Get("Pattern"); pattern != "" { + config.pattern = pattern + } else { + config.pattern = ".*" + } + + re, err := regexp.Compile(config.pattern) + if err != nil { + return config, err + } + config.re = re + + if lastModified := u.Query().Get("LastModified"); lastModified != "" { + t, err := time.Parse(time.RFC3339, lastModified) + if err != nil { + return config, err + } + config.lastModified = t + } else { + config.lastModified = time.Now() + } + + if lastKey := u.Query().Get("LastKey"); lastKey != "" { + config.lastKey = lastKey + } + + if format := u.Query().Get("Format"); format != "" { + config.format = format + } else { + config.format = PLAIN + } + + config.config, _ = cfg.LoadDefaultConfig(context.TODO()) + + if err := parseS3Config(u, &config.config); err != nil { + return config, err + } + + return config, nil +} + +func newAWSS3Stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, u *url.URL, oneShot OneShotMode) (LogStream, error) { + glog.V(2).Infof("newAWSS3Stream(%s): config", u.String()) + config, err := parseS3URL(u) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(ctx) + fs := &s3Stream{ + s3BucketConfig: config, + cancel: cancel, + streamBase: streamBase{ + sourcename: u.Host, + lines: make(chan *logline.LogLine), + }, + } + + if err := fs.stream(ctx, wg, waker, oneShot); err != nil { + return nil, err + } + + glog.V(2).Infof("newAWSStream(%s): started stream", fs.sourcename) + + return fs, nil +} + +func (fs *s3Stream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, oneShot OneShotMode) error { + client := s3.NewFromConfig(fs.config) + + var total int + wg.Add(1) + go func() { + defer wg.Done() + defer func() { + glog.V(2).Infof("stream(%s): read total %d bytes", fs.sourcename, total) + glog.Info("stream(%s): last key %s", fs.lastKey) + close(fs.lines) + fs.cancel() + }() + + for { + input := &s3.ListObjectsV2Input{ + Bucket: &fs.bucket, + } + + if fs.prefix != "" { + input.Prefix = &fs.prefix + } + + if fs.lastKey != "" { + input.StartAfter = &fs.lastKey + } + + paginator := s3.NewListObjectsV2Paginator(client, input) + + for paginator.HasMorePages() { + page, err := paginator.NextPage(ctx) + if err != nil { + if IsAWSExitableError(err) { + glog.V(2).Infof("stream(%s): exiting, conn has AWS error %s", fs.sourcename, err) + return + } + + logErrors.Add(fs.sourcename, 1) + glog.Infof("stream(%s): error listing objects: %v", fs.sourcename, err) + break + } + for _, obj := range page.Contents { + fs.lastKey = *obj.Key + + // Skip files that are older than the time or do not match the pattern. + if obj.LastModified.Before(fs.lastModified) || !fs.re.MatchString(*obj.Key) { + glog.V(2).Infof("stream(%s): skipping file (%s) (%v)", fs.sourcename, *obj.Key, obj.LastModified) + continue + } + + out, err := client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: &fs.bucket, + Key: obj.Key, + }) + + if err != nil { + if IsAWSExitableError(err) { + glog.V(2).Infof("stream(%s): exiting, conn has AWS error %s", fs.sourcename, err) + return + } + + if ctx.Err() != nil { + // The context has been cancelled, so exit directly. + glog.V(2).Infof("stream(%s): context cancelled, exiting directly", fs.sourcename) + return + } + + logErrors.Add(fs.sourcename, 1) + glog.Infof("stream(%s): error getting object: %v", fs.sourcename, err) + continue + } + + logOpens.Add(fs.sourcename, 1) + glog.V(2).Infof("stream(%s): opened new file (%s)", fs.sourcename, *obj.Key) + + defer out.Body.Close() + + var lr *LineReader + + switch fs.format { + case GZIP: + gr, err := gzip.NewReader(out.Body) + if err != nil { + logErrors.Add(fs.sourcename, 1) + glog.Infof("stream(%s): error creating gzip reader: %v", fs.sourcename, err) + continue + } + + defer gr.Close() + + lr = NewLineReader(*obj.Key, fs.lines, gr, defaultReadBufferSize, fs.cancel) + default: + lr = NewLineReader(*obj.Key, fs.lines, out.Body, defaultReadBufferSize, fs.cancel) + } + + for { + n, err := lr.ReadAndSend(ctx) + + glog.V(2).Infof("stream(%s): read %d bytes, err is %v", fs.sourcename, n, err) + + if n > 0 { + total += n + + // No error implies there is more to read so restart the loop. + if err == nil && ctx.Err() == nil { + continue + } + } else if n == 0 && total > 0 { + // `pipe(7)` tells us "If all file descriptors referring to the + // write end of a fifo have been closed, then an attempt to + // read(2) from the fifo will see end-of-file (read(2) will + // return 0)." To avoid shutting down the stream at startup + // before any writer has connected to the fifo, condition on + // having read any bytes previously. + glog.V(2).Infof("stream(%s): exiting, 0 bytes read", fs.sourcename) + break + } + + // Test to see if we should exit. + if IsExitableError(err) { + // Because we've opened in nonblocking mode, this Read can return + // straight away. If there are no writers, it'll return EOF (per + // `pipe(7)` and `read(2)`.) This is expected when `mtail` is + // starting at system init as the writer may not be ready yet. + if !(errors.Is(err, io.EOF) && total == 0) { + glog.V(2).Infof("stream(%s): exiting, stream has error %s", fs.sourcename, err) + break + } + } + } + + lr.Finish(ctx) + + if oneShot == OneShotEnabled { + glog.Infof("stream(%s): oneshot mode, exiting", fs.sourcename) + return + } + } + } + + // Wait for wakeup or termination. + glog.V(2).Infof("stream(%s): waiting", fs.sourcename) + select { + case <-ctx.Done(): + // Exit directly. + glog.V(2).Infof("stream(%s): context cancelled, exiting directly", fs.sourcename) + return + case <-waker.Wake(): + // sleep until next Wake() + glog.V(2).Infof("stream(%s): Wake received", fs.sourcename) + } + } + }() + + return nil +} diff --git a/internal/tailer/logstream/awss3_test.go b/internal/tailer/logstream/awss3_test.go new file mode 100644 index 000000000..52018677c --- /dev/null +++ b/internal/tailer/logstream/awss3_test.go @@ -0,0 +1,143 @@ +package logstream_test + +import ( + "bytes" + "compress/gzip" + "context" + "fmt" + "os" + "strings" + "sync" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/google/mtail/internal/logline" + "github.com/google/mtail/internal/tailer/logstream" + "github.com/google/mtail/internal/testutil" + "github.com/google/mtail/internal/waker" +) + +func TestS3StreamRead(t *testing.T) { + var wg sync.WaitGroup + + bucket := os.Getenv("MTAIL_AWSS3_TEST_BUCKET") + if bucket == "" { + t.Skip("MTAIL_AWSS3_TEST_BUCKET not set") + } + + region := "us-east-1" + + key := "testdata/yo.txt" + + sourcename := fmt.Sprintf("s3://%s/%s/?Region=%s", bucket, "testdata", region) + + ctx, cancel := context.WithCancel(context.Background()) + waker, awaken := waker.NewTest(ctx, 1, "stream") + fs, err := logstream.New(ctx, &wg, waker, sourcename, logstream.OneShotDisabled) + testutil.FatalIfErr(t, err) + + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: key, Line: "yo"}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, fs.Lines()) + + time.Sleep(time.Second * 3) + + awaken(1, 1) // synchronise past first read + + cfg, _ := config.LoadDefaultConfig(context.Background()) + client := s3.NewFromConfig(cfg) + _, err = client.PutObject(context.Background(), &s3.PutObjectInput{ + Bucket: &bucket, + Key: aws.String(key), + Body: strings.NewReader("yo\n"), + }) + testutil.FatalIfErr(t, err) + + defer func() { + client.DeleteObject(context.Background(), &s3.DeleteObjectInput{ + Bucket: &bucket, + Key: aws.String(key), + }) + }() + + awaken(1, 1) + + time.Sleep(time.Second * 1) + + cancel() + wg.Wait() + + checkLineDiff() + + if v := <-fs.Lines(); v != nil { + t.Errorf("expecting filestream to be complete because stopped") + } +} + +func TestS3StreamReadGzip(t *testing.T) { + var wg sync.WaitGroup + + bucket := os.Getenv("MTAIL_AWSS3_TEST_BUCKET") + if bucket == "" { + t.Skip("MTAIL_AWSS3_TEST_BUCKET not set") + } + + region := "us-east-1" + + key := "testdata/yo.txt.gz" + + sourcename := fmt.Sprintf("s3://%s/%s/?Region=%s&Format=gzip", bucket, "testdata", region) + + ctx, cancel := context.WithCancel(context.Background()) + waker, awaken := waker.NewTest(ctx, 1, "stream") + fs, err := logstream.New(ctx, &wg, waker, sourcename, logstream.OneShotDisabled) + testutil.FatalIfErr(t, err) + + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: key, Line: "yo"}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, fs.Lines()) + + time.Sleep(time.Second * 3) + + awaken(1, 1) // synchronise past first read + + cfg, _ := config.LoadDefaultConfig(context.Background()) + client := s3.NewFromConfig(cfg) + var buf bytes.Buffer + gz := gzip.NewWriter(&buf) + gz.Write([]byte("yo\n")) + gz.Close() + + _, err = client.PutObject(context.Background(), &s3.PutObjectInput{ + Bucket: &bucket, + Key: aws.String(key), + Body: bytes.NewReader(buf.Bytes()), + }) + + testutil.FatalIfErr(t, err) + + defer func() { + client.DeleteObject(context.Background(), &s3.DeleteObjectInput{ + Bucket: &bucket, + Key: aws.String(key), + }) + }() + + awaken(1, 1) + + time.Sleep(time.Second * 1) + + cancel() + wg.Wait() + + checkLineDiff() + + if v := <-fs.Lines(); v != nil { + t.Errorf("expecting filestream to be complete because stopped") + } +} diff --git a/internal/tailer/logstream/kafka.go b/internal/tailer/logstream/kafka.go new file mode 100644 index 000000000..5c4790c96 --- /dev/null +++ b/internal/tailer/logstream/kafka.go @@ -0,0 +1,121 @@ +package logstream + +import ( + "context" + "errors" + "net/url" + "strings" + "sync" + + "github.com/golang/glog" + "github.com/google/mtail/internal/logline" + "github.com/segmentio/kafka-go" +) + +const ( + KafkaScheme = "kafka" +) + +type kafkaStream struct { + streamBase + config kafka.ReaderConfig + + cancel context.CancelFunc +} + +//go:generate go run ./../../../cmd/config-gen/main.go -type KafkaConfig -file kafka_config_generated.go -module logstream + +func parseKafkaURL(u *url.URL) (kafka.ReaderConfig, error) { + config := kafka.ReaderConfig{ + Brokers: []string{u.Host}, + Topic: strings.TrimPrefix(u.Path, "/"), + } + + if u.User.Username() != "" { + config.GroupID = u.User.Username() + } + + if err := parseKafkaConfig(u, &config); err != nil { + return config, err + } + + return config, config.Validate() +} + +func newKafkaStream(ctx context.Context, wg *sync.WaitGroup, u *url.URL, oneShot OneShotMode) (LogStream, error) { + glog.V(2).Infof("newKafkaStream(%s): config", u.String()) + config, err := parseKafkaURL(u) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(ctx) + ks := &kafkaStream{ + cancel: cancel, + config: config, + streamBase: streamBase{ + sourcename: u.String(), + lines: make(chan *logline.LogLine), + }, + } + + if err := ks.stream(ctx, wg, oneShot); err != nil { + return nil, err + } + + glog.V(2).Infof("newKafkaStream(%s): started stream", ks.sourcename) + + return ks, nil +} + +func (ks *kafkaStream) stream(ctx context.Context, wg *sync.WaitGroup, oneShot OneShotMode) error { + r := kafka.NewReader(ks.config) + + glog.V(2).Infof("stream(%s): opened new reader", ks.sourcename) + + var total int + wg.Add(1) + go func() { + defer wg.Done() + defer func() { + glog.V(2).Infof("stream(%s): read total %d bytes", ks.sourcename, total) + glog.V(2).Infof("stream(%s): closing kafka connection", ks.sourcename) + if err := r.Close(); err != nil { + logErrors.Add(ks.sourcename, 1) + glog.Infof("stream(%s): closing connection: %v", ks.sourcename, err) + } + logCloses.Add(ks.sourcename, 1) + }() + + for { + + m, err := r.ReadMessage(ctx) + + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + glog.V(2).Infof("stream(%s): context cancelled or deadline exceeded", ks.sourcename) + break + } + + if IsExitableError(err) { + glog.V(2).Infof("stream(%s): exiting, conn has error %s", ks.sourcename, err) + break + } + + if err != nil { + logErrors.Add(ks.sourcename, 1) + glog.V(2).Infof("stream(%s): read error: %v", ks.sourcename, err) + } + + logLines.Add(ks.sourcename, 1) + ks.lines <- logline.New(ctx, ks.sourcename, string(m.Value)) + + if oneShot == OneShotEnabled { + glog.Infof("stream(%s): read one in one shot mode, exiting. Sample data: %s = %s", ks.sourcename, string(m.Key), string(m.Value)) + break + } + } + close(ks.lines) + }() + + return nil +} diff --git a/internal/tailer/logstream/kafka_config_generated.go b/internal/tailer/logstream/kafka_config_generated.go new file mode 100644 index 000000000..05fed1be3 --- /dev/null +++ b/internal/tailer/logstream/kafka_config_generated.go @@ -0,0 +1,206 @@ +// Code generated by github.com/wanqian/cspm-go/codegen; DO NOT EDIT. +package logstream + +import ( + "strings" + "time" + "strconv" + "net/url" + "github.com/segmentio/kafka-go" +) + + +func parseKafkaConfig(u *url.URL, config *kafka.ReaderConfig) error { + + if Brokers := u.Query().Get("Brokers"); Brokers != "" { + config.Brokers = strings.Split(Brokers, ",") + } + + if GroupID := u.Query().Get("GroupID"); GroupID != "" { + config.GroupID = GroupID + } + + if GroupTopics := u.Query().Get("GroupTopics"); GroupTopics != "" { + config.GroupTopics = strings.Split(GroupTopics, ",") + } + + if Topic := u.Query().Get("Topic"); Topic != "" { + config.Topic = Topic + } + + if Partition := u.Query().Get("Partition"); Partition != "" { + i, err := strconv.Atoi(Partition) + if err != nil { + return err + } + config.Partition = i + } + + // Dialer is not supported + + if QueueCapacity := u.Query().Get("QueueCapacity"); QueueCapacity != "" { + i, err := strconv.Atoi(QueueCapacity) + if err != nil { + return err + } + config.QueueCapacity = i + } + + if MinBytes := u.Query().Get("MinBytes"); MinBytes != "" { + i, err := strconv.Atoi(MinBytes) + if err != nil { + return err + } + config.MinBytes = i + } + + if MaxBytes := u.Query().Get("MaxBytes"); MaxBytes != "" { + i, err := strconv.Atoi(MaxBytes) + if err != nil { + return err + } + config.MaxBytes = i + } + + if MaxWait := u.Query().Get("MaxWait"); MaxWait != "" { + i, err := strconv.Atoi(MaxWait) + if err != nil { + return err + } + config.MaxWait = time.Second * time.Duration(i) + } + + if ReadBatchTimeout := u.Query().Get("ReadBatchTimeout"); ReadBatchTimeout != "" { + i, err := strconv.Atoi(ReadBatchTimeout) + if err != nil { + return err + } + config.ReadBatchTimeout = time.Second * time.Duration(i) + } + + if ReadLagInterval := u.Query().Get("ReadLagInterval"); ReadLagInterval != "" { + i, err := strconv.Atoi(ReadLagInterval) + if err != nil { + return err + } + config.ReadLagInterval = time.Second * time.Duration(i) + } + + if HeartbeatInterval := u.Query().Get("HeartbeatInterval"); HeartbeatInterval != "" { + i, err := strconv.Atoi(HeartbeatInterval) + if err != nil { + return err + } + config.HeartbeatInterval = time.Second * time.Duration(i) + } + + if CommitInterval := u.Query().Get("CommitInterval"); CommitInterval != "" { + i, err := strconv.Atoi(CommitInterval) + if err != nil { + return err + } + config.CommitInterval = time.Second * time.Duration(i) + } + + if PartitionWatchInterval := u.Query().Get("PartitionWatchInterval"); PartitionWatchInterval != "" { + i, err := strconv.Atoi(PartitionWatchInterval) + if err != nil { + return err + } + config.PartitionWatchInterval = time.Second * time.Duration(i) + } + + if WatchPartitionChanges := u.Query().Get("WatchPartitionChanges"); WatchPartitionChanges != "" { + b, err := strconv.ParseBool(WatchPartitionChanges) + if err != nil { + return err + } + config.WatchPartitionChanges = b + } + + if SessionTimeout := u.Query().Get("SessionTimeout"); SessionTimeout != "" { + i, err := strconv.Atoi(SessionTimeout) + if err != nil { + return err + } + config.SessionTimeout = time.Second * time.Duration(i) + } + + if RebalanceTimeout := u.Query().Get("RebalanceTimeout"); RebalanceTimeout != "" { + i, err := strconv.Atoi(RebalanceTimeout) + if err != nil { + return err + } + config.RebalanceTimeout = time.Second * time.Duration(i) + } + + if JoinGroupBackoff := u.Query().Get("JoinGroupBackoff"); JoinGroupBackoff != "" { + i, err := strconv.Atoi(JoinGroupBackoff) + if err != nil { + return err + } + config.JoinGroupBackoff = time.Second * time.Duration(i) + } + + if RetentionTime := u.Query().Get("RetentionTime"); RetentionTime != "" { + i, err := strconv.Atoi(RetentionTime) + if err != nil { + return err + } + config.RetentionTime = time.Second * time.Duration(i) + } + + if StartOffset := u.Query().Get("StartOffset"); StartOffset != "" { + i, err := strconv.Atoi(StartOffset) + if err != nil { + return err + } + config.StartOffset = int64(i) + } + + if ReadBackoffMin := u.Query().Get("ReadBackoffMin"); ReadBackoffMin != "" { + i, err := strconv.Atoi(ReadBackoffMin) + if err != nil { + return err + } + config.ReadBackoffMin = time.Second * time.Duration(i) + } + + if ReadBackoffMax := u.Query().Get("ReadBackoffMax"); ReadBackoffMax != "" { + i, err := strconv.Atoi(ReadBackoffMax) + if err != nil { + return err + } + config.ReadBackoffMax = time.Second * time.Duration(i) + } + + // Logger is not supported + + // ErrorLogger is not supported + + if IsolationLevel := u.Query().Get("IsolationLevel"); IsolationLevel != "" { + i, err := strconv.Atoi(IsolationLevel) + if err != nil { + return err + } + config.IsolationLevel = kafka.IsolationLevel(i) + } + + if MaxAttempts := u.Query().Get("MaxAttempts"); MaxAttempts != "" { + i, err := strconv.Atoi(MaxAttempts) + if err != nil { + return err + } + config.MaxAttempts = i + } + + if OffsetOutOfRangeError := u.Query().Get("OffsetOutOfRangeError"); OffsetOutOfRangeError != "" { + b, err := strconv.ParseBool(OffsetOutOfRangeError) + if err != nil { + return err + } + config.OffsetOutOfRangeError = b + } + + return nil +} diff --git a/internal/tailer/logstream/kafka_test.go b/internal/tailer/logstream/kafka_test.go new file mode 100644 index 000000000..ae4c7e7bf --- /dev/null +++ b/internal/tailer/logstream/kafka_test.go @@ -0,0 +1,77 @@ +package logstream_test + +import ( + "context" + "fmt" + "math/rand" + "os" + "sync" + "testing" + "time" + + "github.com/google/mtail/internal/logline" + "github.com/google/mtail/internal/tailer/logstream" + "github.com/google/mtail/internal/testutil" + "github.com/google/mtail/internal/waker" + "github.com/segmentio/kafka-go" +) + +func TestKafkaStreamRead(t *testing.T) { + var wg sync.WaitGroup + + ctx, cancel := context.WithCancel(context.Background()) + waker, _ := waker.NewTest(ctx, 1, "stream") + + // start kafka test server with docker + // refer to https://hub.docker.com/r/apache/kafka + host := os.Getenv("MTAIL_KAFKA_TEST_HOST") + if host == "" { + // t.Log("use default kafka host") + // host = "localhost:9092" + t.Skip("MTAIL_KAFKA_TEST_HOST not set") + } + + topic := fmt.Sprintf("test-%d", rand.Intn(100)) + + conn, err := kafka.DialLeader(ctx, "tcp", host, topic, 0) + if err != nil { + testutil.FatalIfErr(t, err) + } + defer conn.Close() + + // err = testutil.CreateTopic(conn, topic) + // if err != nil { + // testutil.FatalIfErr(t, err) + // } + + consumerGroup := fmt.Sprintf("mtail-test-%d", rand.Intn(100)) + + msg := "yo" + + sourcename := fmt.Sprintf("%s://%s@%s/%s", logstream.KafkaScheme, consumerGroup, host, topic) + + t.Log("sourcename", sourcename) + + ks, err := logstream.New(ctx, &wg, waker, sourcename, logstream.OneShotDisabled) + testutil.FatalIfErr(t, err) + + expected := []*logline.LogLine{ + {Context: context.Background(), Filename: sourcename, Line: msg}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ks.Lines()) + + // write to kafka + n, err := conn.WriteMessages(kafka.Message{Topic: topic, Value: []byte(msg)}) + testutil.FatalIfErr(t, err) + t.Log(n) + + time.Sleep(time.Second * 1) + cancel() + wg.Wait() + + checkLineDiff() + + if v := <-ks.Lines(); v != nil { + t.Errorf("expecting filestream to be complete because stopped") + } +} diff --git a/internal/tailer/logstream/logstream.go b/internal/tailer/logstream/logstream.go index 7dbd5314a..3036b2b34 100644 --- a/internal/tailer/logstream/logstream.go +++ b/internal/tailer/logstream/logstream.go @@ -83,6 +83,10 @@ func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname st return newSocketStream(ctx, wg, waker, u.Scheme, u.Host, oneShot) case "udp": return newDgramStream(ctx, wg, waker, u.Scheme, u.Host, oneShot) + case KafkaScheme: + return newKafkaStream(ctx, wg, u, oneShot) + case AWS3Scheme: + return newAWSS3Stream(ctx, wg, waker, u, oneShot) case "", "file": path = u.Path } diff --git a/internal/tailer/logstream/s3_config_generated.go b/internal/tailer/logstream/s3_config_generated.go new file mode 100644 index 000000000..bb72965f4 --- /dev/null +++ b/internal/tailer/logstream/s3_config_generated.go @@ -0,0 +1,77 @@ +// Code generated by github.com/wanmail/mtail/cmd/config-gen; DO NOT EDIT. +package logstream + +import ( + "net/url" + "github.com/aws/aws-sdk-go-v2/aws" + "strconv" +) + +func parseS3Config(u *url.URL, config *aws.Config) error { + + if Region := u.Query().Get("Region"); Region != "" { + config.Region = Region + } + + // Credentials is not supported + + // BearerAuthTokenProvider is not supported + + // HTTPClient is not supported + + // EndpointResolver is not supported + + // EndpointResolverWithOptions is not supported + + if RetryMaxAttempts := u.Query().Get("RetryMaxAttempts"); RetryMaxAttempts != "" { + i, err := strconv.Atoi(RetryMaxAttempts) + if err != nil { + return err + } + config.RetryMaxAttempts = i + } + + if RetryMode := u.Query().Get("RetryMode"); RetryMode != "" { + config.RetryMode = aws.RetryMode(RetryMode) + } + + // Retryer is not supported + + // Logger is not supported + + // ClientLogMode is not supported + + if DefaultsMode := u.Query().Get("DefaultsMode"); DefaultsMode != "" { + config.DefaultsMode = aws.DefaultsMode(DefaultsMode) + } + + // RuntimeEnvironment is not supported + + if AppID := u.Query().Get("AppID"); AppID != "" { + config.AppID = AppID + } + + // BaseEndpoint is not supported + + if DisableRequestCompression := u.Query().Get("DisableRequestCompression"); DisableRequestCompression != "" { + b, err := strconv.ParseBool(DisableRequestCompression) + if err != nil { + return err + } + config.DisableRequestCompression = b + } + + if RequestMinCompressSizeBytes := u.Query().Get("RequestMinCompressSizeBytes"); RequestMinCompressSizeBytes != "" { + i, err := strconv.Atoi(RequestMinCompressSizeBytes) + if err != nil { + return err + } + config.RequestMinCompressSizeBytes = int64(i) + } + + if AccountIDEndpointMode := u.Query().Get("AccountIDEndpointMode"); AccountIDEndpointMode != "" { + config.AccountIDEndpointMode = aws.AccountIDEndpointMode(AccountIDEndpointMode) + } + + return nil +} diff --git a/internal/tailer/tail.go b/internal/tailer/tail.go index cba63ffa9..b2e33ad05 100644 --- a/internal/tailer/tail.go +++ b/internal/tailer/tail.go @@ -194,7 +194,7 @@ func (t *Tailer) AddPattern(pattern string) error { default: glog.V(2).Infof("AddPattern(%v): %v in path pattern %q, treating as path", pattern, ErrUnsupportedURLScheme, u.Scheme) // Leave path alone per log message - case "unix", "unixgram", "tcp", "udp": + case "unix", "unixgram", "tcp", "udp", logstream.KafkaScheme, logstream.AWS3Scheme: // Keep the scheme. glog.V(2).Infof("AddPattern(%v): is a socket", path) return t.TailPath(path)