Stasher
Prototyping a Logstash alternative
23 February 2017

Silvan Jegen
me@sillymon.ch
https://sillymon.ch

* Logstash

.image img/icon-logstash-bb.png
.caption Logstash logo from the [[https://www.elastic.co/products/logstash][official Logstash site]]

* What is it and what does it do?

"Centralize, Transform & Stash your data"

.image img/logstash-overview.png

* "Centralize, Transform & Stash"

- "Inputs" from log files, DBs, HTTP
- "Filters" for cleaning and transforming
- "Outputs" for archiving, alerting, monitoring, etc.

Uses text-based formats as the data representation

* "Centralize, Transform & Stash"

Plugins

- Inputs: file, syslog, redis, ...
- Filters: grok, mutate, drop, ...
- Outputs: elasticsearch, file, graphite, email, ...

* How does it work?

Custom configuration language

	input {
		stdin {}
	}
	filter {
		anonymize {
			algorithm => "SHA256"
			fields => ["field1", "field2"]
			key => "something"
		}
	}
	output {
		elasticsearch {
			hosts => ["localhost:9200"]
		}
		csv {
			fields => ["field1", "[nested][field]"]
			path => "./test-%{+YYYY-MM-dd}.txt"
		}
	}

* Plugins

	filter {
		anonymize {
			algorithm => "SHA256"
			fields => ["field1", "field2"]
			key => "something"
		}
	}

* Stasher

* Why Stasher?
- Logstash is reportedly very slow
- The input, filter, output workflow is very general
- I like Go

Code available at

.link https://sillymon.ch/cgit/stasher/ https://sillymon.ch/cgit/stasher/

* Implementation

* Implementation

- Interfaces
- "Manager"
- Error handling
- Config parser
- "Registry"

* Interfaces

{input,filter,output}/interface.go

	type Input interface {
		Start() chan *work.Work
	}

	type Filter interface {
		Filter(*work.Work) *work.Work
	}

	type Output interface {
		Output(*work.Work) error
	}

* Manager

manager/manager.go

	type Manager struct {
		Input  input.Input
		Filter filter.Filter
		Output output.Output
	}

* Manager

	func (m *Manager) Run() {
		var wg sync.WaitGroup
		ic := m.Input.Start()
		for w := range ic {
			if w.Error() != nil {
				fmt.Printf("Got an error when getting Work input: %q\n", w.Error())
				continue
			}
			// Filter and output each Work item in its own goroutine.
			wg.Add(1)
			go func(w *work.Work) {
				defer wg.Done()
				nw := m.Filter.Filter(w)
				err := nw.Error()
				if err != nil {
					fmt.Printf("Got an error when filtering Work: %q\n", err)
					return
				}
				err = m.Output.Output(nw)
				if err != nil {
					fmt.Printf("Got an error when outputting Work: %q\n", err)
				}
			}(w)
		}
		// The input channel is closed: wait for all in-flight items.
		wg.Wait()
	}

* Error handling

	for w := range ic {
		// Input error: log it and skip this item.
		if w.Error() != nil {
			fmt.Printf("Got an error when getting Work input: %q\n", w.Error())
			continue
		}
		wg.Add(1)
		go func(w *work.Work) {
			defer wg.Done()
			nw := m.Filter.Filter(w)
			// Filter error: log it and drop the item.
			err := nw.Error()
			if err != nil {
				fmt.Printf("Got an error when filtering Work: %q\n", err)
				return
			}
			// Output error: only logged.
			err = m.Output.Output(nw)
			if err != nil {
				fmt.Printf("Got an error when outputting Work: %q\n", err)
			}
		}(w)
	}

* Config parser

- Hand-written parser
- Currently only supports string literals (no arrays)
- Uses the "Registry" to look up modules by name

* Registry

registry/registry.go

	// The maps must be initialized here: the modules' init() functions
	// write to them, and writing to a nil map panics.
	var (
		Inputregistry  = map[string]func(map[string]string) input.Input{}
		Filterregistry = map[string]func(map[string]string) filter.Filter{}
		Outputregistry = map[string]func(map[string]string) output.Output{}
	)

* Registry

input/http/http.go

	func init() {
		registry.Inputregistry["http"] = New
	}

* Implemented modules

- input: stdin, http
- filter: str(ing), http
- output: stdout, http

* Demo (a.k.a. "It totally works, I swear!")

* TODOs

- Watch input directories
- Multiple modules per stage (input, filter, output)
- Proper (configurable?) error handling

* Considerations & Conclusions

- Error handling and logging are important
- Use HTTP for everything?
- Level of "declarativeness" in the configuration
- Still better off with shell scripts and pipes?