diff --git a/README.md b/README.md index 120f9a9b..4d7d3568 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ Where supported Execbeat can be started also using the respetive service scripts * [Golang](https://golang.org/dl/) 1.7.4 * [Glide](https://github.com/Masterminds/glide) >= 0.11.0 - +* [yaml.v2](https://github.com/go-yaml/yaml) ### Build To build the binary for execbeat run the command below. This will generate a binary @@ -51,6 +51,11 @@ in the same directory with the name execbeat. make clean && make ``` +To build execbeat client which dynamically inserts commands to execbeat + +``` +cd beater/client && go build +``` ### Run To run execbeat with debugging output enabled, run: @@ -59,6 +64,9 @@ To run execbeat with debugging output enabled, run: ./execbeat -c execbeat.yml -e -d "*" ``` +To run execbeat's client +cd beater/client && ./client echo how are you + ### Test To test execbeat, run the following command: diff --git a/beater/client/execbeat_client.go b/beater/client/execbeat_client.go new file mode 100644 index 00000000..bae3dbd2 --- /dev/null +++ b/beater/client/execbeat_client.go @@ -0,0 +1,65 @@ +package main + +import ( + "fmt" + "net" +// "time" +// "strconv" +// "encoding/json" + "os" + "github.com/christiangalsterer/execbeat/config" + "github.com/elastic/beats/libbeat/logp" + "gopkg.in/yaml.v2" +) + +func CheckError(err error) { + if err != nil { + logp.Err("execbeat", "Error: " , err) + } +} + +func main() { + if (len(os.Args) == 1) { + logp.Err("execbeat", "Not enough arguments") + logp.Err("execbeat", os.Args) + return + } + ServerAddr,err := net.ResolveUDPAddr("udp","127.0.0.1:10001") + CheckError(err) + + LocalAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") + CheckError(err) + + Conn, err := net.DialUDP("udp", LocalAddr, ServerAddr) + CheckError(err) + + argsWithProg := os.Args + + req := &config.ExecConfig{ + Schedule: "", + Command: "", + Args: "", + DocumentType: "", + Fields: nil} + + + if (len(os.Args[1:]) > 1){ + req.Command = argsWithProg[1] + + out := "" + for _, str := range os.Args[2:] { + out = fmt.Sprintf("%s%s%s", out, str, " ") + } + req.Args = out + }else{ + req.Command = argsWithProg[1] + } + + reqMar, _ := yaml.Marshal(req) + logp.Debug("execbeat", "client %s", string(reqMar)) + + _,err = Conn.Write(reqMar) + CheckError(err) + + defer Conn.Close() +} diff --git a/beater/execbeat.go b/beater/execbeat.go index 15f8e8a7..d2d35dba 100644 --- a/beater/execbeat.go +++ b/beater/execbeat.go @@ -41,6 +41,9 @@ func (exexBeat *Execbeat) Run(b *beat.Beat) error { go poller.Run() } + /* Watch for commands from execbeat_cli to dynamically cron-job those commands */ + go CmdMonitorServerLoop(exexBeat) + for { select { case <-exexBeat.done: diff --git a/beater/execbeat_server.go b/beater/execbeat_server.go new file mode 100644 index 00000000..af19ec87 --- /dev/null +++ b/beater/execbeat_server.go @@ -0,0 +1,56 @@ +package beat + +import ( + //"fmt" + "net" + "github.com/christiangalsterer/execbeat/config" + "github.com/elastic/beats/libbeat/logp" + "gopkg.in/yaml.v2" +) + +func CheckError(err error) { + if err != nil { + logp.Err("Error: " , err) + } +} + +func CmdMonitorServerLoop(exexBeat *Execbeat) { + /* Lets prepare a address at any address at port 10001*/ + ServerAddr,err := net.ResolveUDPAddr("udp",":10001") + CheckError(err) + + /* Now listen at selected port */ + ServerConn, err := net.ListenUDP("udp", ServerAddr) + CheckError(err) + defer ServerConn.Close() + + buf := make([]byte, 1024) + + for { + var poller *Executor + + n,addr,err := ServerConn.ReadFromUDP(buf) + if err != nil { + //fmt.Println(err); + logp.Err("execbeat", err) + } + logp.Debug("execbeat", "Received from %s", addr) + logp.Debug("execbeat", "%s", string(buf[0:n])) + //fmt.Println(string(buf[0:n])) + + //fmt.Println(buf) + data := config.ExecConfig{} + //FIXME: Unmarshalling will fail if stream/data is segmented + if err := yaml.Unmarshal([]byte(string(buf[0:n])), &data); err != nil { + logp.Err("execbeat", err) + continue + } + + logp.Debug("execbeat", "Creating poller with command: %v", data.Command) + + //Insert the command received from client in ExecConfig slice + exexBeat.ExecConfig.Execbeat.Commands = append(exexBeat.ExecConfig.Execbeat.Commands, data) + poller = NewExecutor(exexBeat, data) + go poller.Run() + } +} diff --git a/config/config.go b/config/config.go index 651b39e4..b35e6214 100644 --- a/config/config.go +++ b/config/config.go @@ -11,11 +11,11 @@ type ExecbeatConfig struct { } type ExecConfig struct { - Schedule string - Command string - Args string - DocumentType string `config:"document_type"` - Fields map[string]string `config:"fields"` + Schedule string `yaml:"schedule"` + Command string `yaml:"command"` + Args string `yaml:"args"` + DocumentType string `config:"document_type" yaml:"document_type` + Fields map[string]string `config:"fields" yaml:"fields"` } type ConfigSettings struct {