Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
bin
*debug*
tmp
.idea
.idea
*.log
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1 +1 @@
go 1.23.5
golang 1.25.1
5 changes: 4 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}${pathSeparator}cmd/main.go"
"program": "${workspaceFolder}${pathSeparator}cmd/main.go",
"args": [
"-v"
]
}
]
}
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
build:
@go build -C cmd -o ..\bin\app.exe
build-nix:
@rm ../bin/app
@go build -C cmd -o ../bin/app
run:
@make build
@".\build\app.exe"
@".\build\app.exe -v"
test:
@go test ./...
dev:
@$$GOPATH/bin/air --build.cmd "make build" --build.bin ".\bin\app.exe" --build.args_bin "-v" --build.exclude_dir "templates,build,bin,.git,node_modules"
dev-nix:
@$$GOPATH/bin/air --build.cmd "make build" --build.bin ".\bin\app.exe" --build.exclude_dir "templates,build,bin,.git,node_modules"
80 changes: 54 additions & 26 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,55 +3,52 @@ package api
import (
"context"
"encoding/json"
"github.com/brinestone/dfs/fs"
"log/slog"
"net"
"net/http"
"net/url"
"time"

"github.com/brinestone/dfs/fs"
"github.com/brinestone/dfs/storage"
)

type Config struct {
Addr string
Ctx context.Context
Logger *slog.Logger
FileServer *fs.FileServer
Store *storage.Store
BasePath string
Verbose bool
}

type Server struct {
type Api struct {
Config
mux *http.ServeMux
server *http.Server
listener net.Listener
}

func (s *Server) Start() (err error) {
l, err := net.Listen("tcp", s.Addr)
func (s *Api) Start() (err error) {
s.listener, err = net.Listen("tcp", s.Addr)
if err != nil {
return
}

s.listener = l
s.server = &http.Server{
Handler: s.mux,
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 15 * time.Second,
}

s.server.RegisterOnShutdown(func() {
s.Logger.Info("Server is shutting down", "addr", s.Addr)
})
s.Logger.Info("Server started successfully", "addr", s.Addr)
err = s.server.Serve(l)
s.server = &http.Server{Handler: s.mux}
s.Logger.Info("Api started successfully", "addr", s.Addr)
err = s.server.Serve(s.listener)
return
}

func (s *Server) Stop() error {
return s.server.Shutdown(s.Ctx)
func (s *Api) Stop() error {
s.server.Shutdown(s.Ctx)
return nil
}

func NewServer(c Config) (server *Server) {
server = &Server{
func NewApi(c Config) (server *Api) {
server = &Api{
Config: c,
mux: http.NewServeMux(),
}
Expand All @@ -62,19 +59,50 @@ func NewServer(c Config) (server *Server) {
return
}

func (s *Server) setupHandlers() {
func (s *Api) makeApiPath(path string) string {
pathname, err := url.JoinPath(s.BasePath, path)
if err != nil {
panic(err)
}
return pathname
}

func (s *Api) makeApiPathWithBase(path string, base string) string {
pathname, err := url.JoinPath(s.BasePath, base, path)
if err != nil {
panic(err)
}
return pathname
}

func (s *Api) registerHandler(path string, handler http.Handler) {
// pathname := s.makeApiPath(path)
s.mux.Handle(path, handler)
if s.Verbose {
s.Logger.Debug("registered handler", "path", path)
}
}

func (s *Api) registerLoggedHandler(path string, handler http.Handler) {
// Logger middleware
logMiddleware := func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
s.Logger.Info("Request received", "method", r.Method, "path", r.URL.Path, "remote", r.RemoteAddr)
start := time.Now()
next.ServeHTTP(w, r)
timeTaken := time.Since(start)
s.Logger.Info("Request received", "duration", timeTaken, "method", r.Method, "path", r.URL.Path, "remote", r.RemoteAddr)
})
}
// s.mux.Handle(path, logMiddleware(handler))
s.registerHandler(path, logMiddleware(handler))
}

s.mux.Handle(
"/",
logMiddleware(http.HandlerFunc(homeHandler)),
func (s *Api) setupHandlers() {
s.registerLoggedHandler(
s.makeApiPath("/health"),
http.HandlerFunc(homeHandler),
)
s.setupStorageEndpointHandlers()
}

func homeHandler(w http.ResponseWriter, r *http.Request) {
Expand Down
26 changes: 26 additions & 0 deletions api/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package api

import (
"encoding/json"
"net/http"

"github.com/brinestone/dfs/storage"
)

const storageUrlPath = "storage"

func getStorageStatsHandler(api *Api, store *storage.Store) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
s, err := json.Marshal(store.GetStats())
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
api.Logger.Error("error while handling request", "err", err)
return
}
w.Write(s)
})
}

func (api *Api) setupStorageEndpointHandlers() {
api.registerLoggedHandler(api.makeApiPathWithBase("stats", storageUrlPath), getStorageStatsHandler(api, api.Store))
}
110 changes: 76 additions & 34 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,103 +4,145 @@ import (
"context"
"crypto/md5"
"encoding/hex"
"flag"
"fmt"
"github.com/google/uuid"
"log/slog"
"os"
"os/signal"
"path"
"path/filepath"
"syscall"

"github.com/akamensky/argparse"

"github.com/google/uuid"

"github.com/brinestone/dfs/api"
"github.com/brinestone/dfs/fs"
"github.com/brinestone/dfs/p2p"
"github.com/brinestone/dfs/storage"
)

var (
storageRoot = flag.String("root", path.Join(homeDir, ".dfs"), "Filesystem path to be used as root.")
bufferSize = flag.Int64("bs", 4096, "Buffer Size")
storageRoot *string
verbose *bool
bufferSize *int
homeDir, _ = os.UserHomeDir()
logger = slog.Default().WithGroup("DFS")
logger *slog.Logger
ctx = context.Background()
)

func makeServer(ctx context.Context, listenAddr string, id string) (*fs.FileServer, context.CancelFunc) {
func makeStore(root string) (store *storage.Store) {
storageConfig := storage.StoreConfig{
TransformKey: storage.CASKeyTransformer(root),
Root: root,
Logger: logger.WithGroup("store"),
}

store = storage.NewStore(storageConfig)
return
}

func makeServer(ctx context.Context, listenAddr string, id string, store *storage.Store) (*fs.FileServer, context.CancelFunc) {
fsLogger := logger.WithGroup("fs")
tcpTransportConfig := p2p.TcpTransportConfig{
ListenAddr: listenAddr,
Handshake: p2p.NoopHandshake,
Decoder: p2p.DefaultDecoder{
EncodingConfig: p2p.EncodingConfig{
BufferSize: *bufferSize,
BufferSize: int64(*bufferSize),
},
},
Logger: logger.WithGroup("FS/TCP"),
Logger: fsLogger.WithGroup("tcp"),
}
tcpTransport := p2p.NewTcpTransport(tcpTransportConfig)
serverGroup := fmt.Sprintf("fs-%s", id)
hash := md5.Sum([]byte(serverGroup))
root := path.Join(*storageRoot, hex.EncodeToString(hash[:]))

c, cancel := context.WithCancel(ctx)

serverConfig := fs.FileServerConfig{
ListenAddr: listenAddr,
StorageRoot: root,
KeyTransformer: storage.CASKeyTransformer(root),
Transport: tcpTransport,
Id: id,
Context: c,
Logger: logger.WithGroup("FS/Server"),
StreamChunkSize: *bufferSize,
Logger: fsLogger.WithGroup("server"),
StreamChunkSize: int64(*bufferSize),
}

s := fs.NewFileServer(serverConfig)
s := fs.NewFileServer(serverConfig, store)
return s, cancel
}

func startFileServer() *fs.FileServer {
serverId, err := os.Hostname()
if err != nil {
serverId = uuid.NewString() // todo: rethink this
}
server, cancel := makeServer(ctx, ":5060", serverId)
func startFileServer(serverId string, store *storage.Store) *fs.FileServer {
server, cancel := makeServer(ctx, ":5060", serverId, store)

go func() {
defer cancel()
if err = server.Start(); err != nil {
if err := server.Start(); err != nil {
logger.Error("error while starting file server", "reason", err.Error())
}
}()

return server
}

func startApi(addr string) *api.Server {
apiServer := api.NewServer(api.Config{
Addr: addr,
Ctx: ctx,
Logger: logger.WithGroup("API"),
func startApi(addr string, store *storage.Store, f *fs.FileServer) *api.Api {
apiServer := api.NewApi(api.Config{
Addr: addr,
BasePath: "/api",
Verbose: *verbose,
Ctx: ctx,
Logger: logger.WithGroup("api"),
Store: store,
FileServer: f,
})

go func() {
err := apiServer.Start()
if err != nil {
defer apiServer.Stop()
logger.Error("error while starting API", err.Error())
logger.Error("error while starting API", "err", err.Error())
return
}
}()

return apiServer
}

func parseArgs() {
parser := argparse.NewParser("print", "Prints the provided string to stdout")
storageRoot = parser.String("r", "storage-root", &argparse.Options{Required: false, Default: path.Join(homeDir, ".dfs"), Help: "The file system path to be used to store data"})
bufferSize = parser.Int("b", "buffer-size", &argparse.Options{Help: "The buffer-size to use", Default: 4096})
verbose = parser.Flag("v", "verbose", &argparse.Options{Help: "Enable verbose logs", Default: true})
if err := parser.Parse(os.Args); err != nil {
fmt.Print(parser.Usage(err))
os.Exit(1)
}
}

func setupLogger() {
if *verbose {
slog.SetLogLoggerLevel(slog.LevelDebug)
}
logger = slog.Default().WithGroup("dfs")
}

func main() {
flag.Parsed()
ctx = context.Background()
apiServer := startApi(":8000")
fileServer := startFileServer()
parseArgs()
setupLogger()
serverId, err := os.Hostname()
if err != nil {
serverId = uuid.NewString() // todo: rethink this
}

serverGroup := fmt.Sprintf("fs-%s", serverId)
hash := md5.Sum([]byte(serverGroup))
root, err := filepath.Abs(path.Join(*storageRoot, hex.EncodeToString(hash[:])))
if err != nil {
panic(err)
}

store := makeStore(root)
fileServer := startFileServer(serverId, store)
apiServer := startApi(":8000", store, fileServer)

apiServer.Config.FileServer = fileServer
endCh := make(chan os.Signal, 1)
Expand All @@ -114,7 +156,7 @@ func main() {
logger.Info("Shutting down")
err := fileServer.Shutdown()
if err != nil {
logger.Error("error while stopping File Server", "cause", err.Error())
logger.Error("error while stopping File Api", "cause", err.Error())
}
err = apiServer.Stop()
if err != nil {
Expand Down
Loading