Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ build:
rm -rf build
GOOS=linux GOARCH=${ARCH} go build -o build/extensions/firetail-extension-${ARCH}
chmod +x build/extensions/firetail-extension-${ARCH}
cp firetail-wrapper.sh build/firetail-wrapper.sh

.PHONY: package
package: build
cd build && zip -r ../build/firetail-extension-${ARCH}-${VERSION}.zip extensions/
cd build && zip -r ../build/firetail-extension-${ARCH}-${VERSION}.zip extensions/ firetail-wrapper.sh

.PHONY: publish
publish:
Expand All @@ -34,4 +35,4 @@ public:

.PHONY: add
add:
aws lambda update-function-configuration --region ${AWS_REGION} --function-name ${FUNCTION_NAME} --layers ${LAYER_ARN}
aws lambda update-function-configuration --region ${AWS_REGION} --function-name ${FUNCTION_NAME} --layers ${LAYER_ARN}
3 changes: 1 addition & 2 deletions examples/minimal-python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ This example demonstrates how to setup a simple HTTP GET endpoint. Once you fetc
## Deploy

```bash
pip3 install -t src/vendor -r aws_requirements.txt
npm install
serverless deploy
serverless deploy --param firetail-token=YOUR_API_TOKEN
```

The expected result should be similar to:
Expand Down
1 change: 0 additions & 1 deletion examples/minimal-python/aws_requirements.txt

This file was deleted.

14 changes: 3 additions & 11 deletions examples/minimal-python/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,13 @@
import sys

# Deps in src/vendor
sys.path.insert(0, 'src/vendor')
sys.path.insert(0, "src/vendor")

from firetail_lambda import firetail_handler, firetail_app # noqa: E402
app = firetail_app()


@firetail_handler(app)
def endpoint(event, context):
current_time = datetime.datetime.now().time()
return {
"statusCode": 200,
"body": json.dumps({
"message": "Hello, the current time is %s" % current_time
}),
"headers": {
"Current-Time": "%s" % current_time
}
"body": json.dumps({"message": "Hello, the current time is %s" % current_time}),
"headers": {"Current-Time": "%s" % current_time},
}
1 change: 1 addition & 0 deletions examples/minimal-python/serverless.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ provider:
environment:
FIRETAIL_API_TOKEN: ${param:firetail-token}
FIRETAIL_EXTENSION_DEBUG: TRUE
AWS_LAMBDA_EXEC_WRAPPER: /opt/firetail-wrapper.sh
tracing: true
iamRoleStatements:
- Effect: "Allow"
Expand Down
4 changes: 4 additions & 0 deletions firetail-wrapper.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash
args=("$@")
export AWS_LAMBDA_RUNTIME_API="127.0.0.1:${FIRETAIL_LAMBDA_EXTENSION_PORT:-9009}"
exec "${args[@]}"
55 changes: 55 additions & 0 deletions firetail/record_receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package firetail

import "log"

// recordReceiver receives records from the client into batches & passes them to the batch callback. If the batch callback
// returns an err, it does not remove the log entries from the batch.
func RecordReceiver(recordsChannel chan Record, maxBatchSize int, firetailApiUrl, firetailApiToken string) {
recordsBatch := []Record{}

for {
newRecords, recordsRemaining := receiveRecords(recordsChannel, maxBatchSize-len(recordsBatch))
recordsBatch = append(recordsBatch, newRecords...)

// If the batch is empty, but there's records remaining, then we continue; else we return.
if len(recordsBatch) == 0 {
if recordsRemaining {
continue
} else {
return
}

Check warning on line 20 in firetail/record_receiver.go

View check run for this annotation

Codecov / codecov/patch

firetail/record_receiver.go#L7-L20

Added lines #L7 - L20 were not covered by tests
}

// Give the batch to the batch callback. If it errs, we continue
recordsSent, err := SendRecordsToSaaS(recordsBatch, firetailApiUrl, firetailApiToken)
if err != nil {
log.Println("Error sending records to Firetail:", err.Error())
continue

Check warning on line 27 in firetail/record_receiver.go

View check run for this annotation

Codecov / codecov/patch

firetail/record_receiver.go#L24-L27

Added lines #L24 - L27 were not covered by tests
}
log.Println("Successfully sent", recordsSent, "record(s) to Firetail.")

// If the batch callback succeeded, we can clear the batch!
recordsBatch = []Record{}

Check warning on line 32 in firetail/record_receiver.go

View check run for this annotation

Codecov / codecov/patch

firetail/record_receiver.go#L29-L32

Added lines #L29 - L32 were not covered by tests
}
}

// ReceiveRecords returns a slice of firetail Records up to the size of `limit`, and a boolean indicating that the channel
// still has items to be read - it will only be `false` when the channel is closed & empty. It achieves this by continuously
// reading from the log server's recordsChannel until it's empty, or the size limit has been reached.
func receiveRecords(recordsChannel chan Record, limit int) ([]Record, bool) {
records := []Record{}
for {
select {
case record, open := <-recordsChannel:
if !open {
return records, false
}
records = append(records, record)
if len(records) == limit {
return records, true
}
default:
return records, true

Check warning on line 52 in firetail/record_receiver.go

View check run for this annotation

Codecov / codecov/patch

firetail/record_receiver.go#L39-L52

Added lines #L39 - L52 were not covered by tests
}
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-chi/chi/v5 v5.2.1
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ github.com/aws/aws-lambda-go v1.34.1/go.mod h1:jwFe2KmMsHmffA1X2R09hH6lFzJQxzI8q
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-chi/chi/v5 v5.2.1 h1:KOIHODQj58PmL80G2Eak4WdvUzjSJSm0vG72crDCqb8=
github.com/go-chi/chi/v5 v5.2.1/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
Expand Down
40 changes: 31 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import (
"firetail-lambda-extension/extensionsapi"
"firetail-lambda-extension/firetail"
"firetail-lambda-extension/logsapi"
"firetail-lambda-extension/proxy"
"fmt"
"io/ioutil"
"log"
Expand Down Expand Up @@ -35,16 +37,36 @@
}
log.Println("Registered extension, ID:", extensionClient.ExtensionID)

// Create a logsApiClient, start it & remember to shut it down when we're done
logsApiClient, err := logsapi.NewClient(logsapi.Options{
ExtensionID: extensionClient.ExtensionID,
LogServerAddress: "sandbox:1234",
})
if err != nil {
panic(err)
// In legacy mode, we use the logs API. Otherwise, we use the new proxy client.
if isLegacy, err := strconv.ParseBool(os.Getenv("FIRETAIL_EXTENSION_LEGACY")); err == nil && isLegacy {
// Create a logsApiClient, start it & remember to shut it down when we're done
logsApiClient, err := logsapi.NewClient(logsapi.Options{
ExtensionID: extensionClient.ExtensionID,
LogServerAddress: "sandbox:1234",
})
if err != nil {
panic(err)

Check warning on line 48 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L42-L48

Added lines #L42 - L48 were not covered by tests
}
go logsApiClient.Start(ctx)
defer logsApiClient.Shutdown(ctx)

Check warning on line 51 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L50-L51

Added lines #L50 - L51 were not covered by tests
} else {
firetailApiUrl, firetailApiUrlSet := os.LookupEnv("FIRETAIL_API_URL")
if !firetailApiUrlSet {
firetailApiUrl = logsapi.DefaultFiretailApiUrl
}
proxyServer, err := proxy.NewProxyServer()
if err != nil {
panic(err)

Check warning on line 59 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L59

Added line #L59 was not covered by tests
}
go proxyServer.ListenAndServe()
defer proxyServer.Shutdown(ctx)
go firetail.RecordReceiver(
proxyServer.RecordsChannel,
logsapi.DefaultMaxBatchSize,
firetailApiUrl,
os.Getenv("FIRETAIL_API_TOKEN"),
)
}
go logsApiClient.Start(ctx)
defer logsApiClient.Shutdown(ctx)

// awaitShutdown will block until a shutdown event is received, or the context is cancelled
reason, err := awaitShutdown(extensionClient, ctx)
Expand Down
184 changes: 184 additions & 0 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package proxy

import (
"context"
"encoding/json"
"firetail-lambda-extension/firetail"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"strconv"
"time"

"github.com/go-chi/chi/v5"
)

type ProxyServer struct {
runtimeEndpoint string
port int
server *http.Server
eventsChannel chan *http.Response
lambdaResponseChannel chan *http.Request
RecordsChannel chan firetail.Record
}

func NewProxyServer() (*ProxyServer, error) {
portStr, portSet := os.LookupEnv("FIRETAIL_LAMBDA_EXTENSION_PORT")
var port int
var err error
if port, err = strconv.Atoi(portStr); err != nil || !portSet {
port = 9009
}

ps := &ProxyServer{
runtimeEndpoint: os.Getenv("AWS_LAMBDA_RUNTIME_API"),
port: port,
eventsChannel: make(chan *http.Response),
lambdaResponseChannel: make(chan *http.Request),
RecordsChannel: make(chan firetail.Record),
}

r := chi.NewRouter()

handleError := func(w http.ResponseWriter, r *http.Request) {
http.Error(w, http.StatusText(404), 404)
}
r.NotFound(handleError)
r.MethodNotAllowed(handleError)

initEndpoint, err := url.Parse(
fmt.Sprintf(
"http://%s/2018-06-01/runtime/init/error",
ps.runtimeEndpoint,
),
)
if err != nil {
return nil, err
}
initErrorHandler := getProxyHandler(
func(r *http.Request) (*url.URL, error) {
return initEndpoint, nil
},
nil,
nil,
)
r.Post("/2018-06-01/runtime/init/error", initErrorHandler)

invokeErrorHandler := getProxyHandler(
func(r *http.Request) (*url.URL, error) {
return url.Parse(
fmt.Sprintf(
"http://%s/2018-06-01/runtime/invocation/%s/error",
ps.runtimeEndpoint,
chi.URLParam(r, "requestId"),
),
)
},
nil,
nil,
)
r.Post("/2018-06-01/runtime/invocation/{requestId}/error", invokeErrorHandler)

nextEndpoint, err := url.Parse(
fmt.Sprintf(
"http://%s/2018-06-01/runtime/invocation/next",
ps.runtimeEndpoint,
),
)
if err != nil {
return nil, err
}
nextHandler := getProxyHandler(
func(r *http.Request) (*url.URL, error) {
return nextEndpoint, nil
},
nil,
&ps.eventsChannel,
)
r.Get("/2018-06-01/runtime/invocation/next", nextHandler)

responseHandler := getProxyHandler(
func(r *http.Request) (*url.URL, error) {
return url.Parse(
fmt.Sprintf(
"http://%s/2018-06-01/runtime/invocation/%s/response",
ps.runtimeEndpoint,
chi.URLParam(r, "requestId"),
),
)
},
&ps.lambdaResponseChannel,
nil,
)
r.Post("/2018-06-01/runtime/invocation/{requestId}/response", responseHandler)

ps.server = &http.Server{
Addr: fmt.Sprintf(":%d", ps.port),
Handler: r,
}

return ps, nil
}

func (p *ProxyServer) recordAssembler() {
for {
// Events and lambda responses should come in pairs, event first and response second.
event, ok := <-p.eventsChannel
if !ok {
log.Println("Events channel closed, stopping record assembler.")
return
}

// We can record the time between receiving the event and the response
// to calculate the execution time of the lambda function.
eventReceivedAt := time.Now()

lambdaResponse, ok := <-p.lambdaResponseChannel
if !ok {
log.Println("Lambda response channel closed, stopping record assembler.")
return
}

executionTime := time.Since(eventReceivedAt)

eventBody, err := io.ReadAll(event.Body)
if err != nil {
log.Println("Error reading event body:", err)
continue
}
responseBody, err := io.ReadAll(lambdaResponse.Body)
if err != nil {
log.Println("Error reading response body:", err)
continue
}

var recordResponse firetail.RecordResponse
if err := json.Unmarshal(responseBody, &recordResponse); err != nil {
log.Println("Error unmarshalling response body:", err)
continue
}

p.RecordsChannel <- firetail.Record{
Event: eventBody,
Response: recordResponse,
ExecutionTime: executionTime.Seconds(),
}
}
}

func (p *ProxyServer) ListenAndServe() error {
go p.recordAssembler()
return p.server.ListenAndServe()
}

func (p *ProxyServer) Shutdown(ctx context.Context) error {
if err := p.server.Shutdown(ctx); err != nil {
return err
}
close(p.eventsChannel)
close(p.lambdaResponseChannel)
return nil
}
Loading
Loading