Stasher
Prototyping a Logstash alternative
23 February 2017

Silvan Jegen
Computational Linguist
me@sillymon.ch
https://sillymon.ch

* Logstash

.image img/icon-logstash-bb.png
.caption Logstash logo from the [[https://www.elastic.co/products/logstash][official Logstash site]]

* What is it?

- Now a project of Elastic
- Used to be log-focused

.image img/logstash-img1.png

* What does it do?

"Centralize, Transform & Stash your data"

.image img/logstash-overview.png

* "Centralize, Transform & Stash"

- "Inputs" from log files, DBs, HTTP
- "Filters" for cleaning and transforming
- "Outputs" for archiving, alerting, monitoring, etc.

* "Centralize, Transform & Stash" Plugins

- Inputs: file, syslog, redis, ...
- Filters: grok, mutate, drop, ...
- Outputs: elasticsearch, file, graphite, email, ...

* How does it work?

Custom configuration language

	input { stdin {} }
	filter {
		anonymize {
			algorithm => "SHA256"
			fields => ["field1", "field2"]
			key => "something"
		}
	}
	output {
		elasticsearch { hosts => ["localhost:9200"] }
		csv {
			fields => ["field1", "[nested][field]"]
			path => "./test-%{+YYYY-MM-dd}.txt"
		}
	}

* Plugins

	filter {
		anonymize {
			algorithm => "SHA256"
			fields => ["field1", "field2"]
			key => "something"
		}
	}

* Some statistics

- Written in Ruby
- ~25K LOC in 377 files
- 360+ contributors
- 7'600+ commits
- 7'000+ stars

* Stasher

Why?
- Apparently Logstash is very slow
- Generality of the workflow
- I like Go

* Implementation

* Interfaces

	type Input interface {
		Start() chan *work.Work
	}

	type Filter interface {
		Filter(*work.Work) *work.Work
	}

	type Output interface {
		Output(*work.Work) error
	}

* Manager

	type Manager struct {
		Input  input.Input
		Filter filter.Filter
		Output output.Output
	}

* Manager

	func (m *Manager) Run() {
		var wg sync.WaitGroup

		ic := m.Input.Start()
		for w := range ic {
			if w.Error() != nil {
				fmt.Printf("Got an error when getting Work input: %q\n", w.Error())
				continue
			}

			wg.Add(1)
			go func(w *work.Work) {
				defer wg.Done()

				nw := m.Filter.Filter(w)
				err := nw.Error()
				if err != nil {
					fmt.Printf("Got an error when filtering Work: %q\n", err)
				}
				err = m.Output.Output(nw)
				if err != nil {
					fmt.Printf("Got an error when outputting Work: %q\n", err)
				}
			}(w)
		}
		wg.Wait()
	}

* Main advantages over shell scripts

- Error handling
- Declarative config

* Error handling

	for w := range ic {
		if w.Error() != nil {
			fmt.Printf("Got an error when getting Work input: %q\n", w.Error())
			continue
		}

		wg.Add(1)
		go func(w *work.Work) {
			defer wg.Done()

			nw := m.Filter.Filter(w)
			err := nw.Error()
			if err != nil {
				fmt.Printf("Got an error when filtering Work: %q\n", err)
			}
			err = m.Output.Output(nw)
			if err != nil {
				fmt.Printf("Got an error when outputting Work: %q\n", err)
			}
		}(w)
	}

* Config parser

- Currently supports only string literals (no arrays)
- Hand-written parser
- Uses the Registry to look up the modules

* Registry

registry/registry.go

	var (
		// Created with make: modules write to these maps from their
		// init() functions, and writing to a nil map panics.
		Inputregistry  = make(map[string]func(map[string]string) input.Input)
		Filterregistry = make(map[string]func(map[string]string) filter.Filter)
		Outputregistry = make(map[string]func(map[string]string) output.Output)
	)

* Registry

input/http/http.go

	func init() {
		registry.Inputregistry["http"] = New
	}

* Registry

conf/init.go

	import (
		// Initialize the different modules. By importing them in this
		// way, their constructors are registered in the registry.
		_ "github.com/Shugyousha/stasher/input/http"
		_ "github.com/Shugyousha/stasher/input/stdin"
		_ "github.com/Shugyousha/stasher/filter/http"
		_ "github.com/Shugyousha/stasher/filter/str"
		_ "github.com/Shugyousha/stasher/output/http"
		_ "github.com/Shugyousha/stasher/output/stdout"
	)

* Demo

* High-level TODOs

- Watch input directories
- Multiple modules per stage (input, filter, output)
- Proper (configurable?) error handling
- Conditionals (if/else) in the config?

* Considerations

- "Dynamic" plugins (Go 1.8 plugin package?)
- Use HTTP for everything?
- Better off with shell scripts?
- Generality and error handling
- Balance between a declarative DSL and a programming language?
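
* Sketch: the pipeline in one file

A minimal, self-contained sketch of how the Interfaces and Manager slides fit together. It assumes a hypothetical `Work` struct, because the slides do not show `work.Work`. `sliceInput`, `upperFilter`, `collectOutput` and `runPipeline` are made-up toy modules, not Stasher code. One deliberate difference from the Manager slide: this version skips the Output call when filtering fails.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strings"
	"sync"
)

// Work is a hypothetical stand-in for work.Work: a payload plus an optional error.
type Work struct {
	Data string
	Err  error
}

func (w *Work) Error() error { return w.Err }

type Input interface{ Start() chan *Work }
type Filter interface{ Filter(*Work) *Work }
type Output interface{ Output(*Work) error }

// sliceInput sends a fixed list of Works, then closes the channel.
type sliceInput []*Work

func (s sliceInput) Start() chan *Work {
	c := make(chan *Work)
	go func() {
		defer close(c)
		for _, w := range s {
			c <- w
		}
	}()
	return c
}

// upperFilter upper-cases the payload.
type upperFilter struct{}

func (upperFilter) Filter(w *Work) *Work { return &Work{Data: strings.ToUpper(w.Data)} }

// collectOutput records payloads; Run calls it from several goroutines, hence the mutex.
type collectOutput struct {
	mu  sync.Mutex
	got []string
}

func (c *collectOutput) Output(w *Work) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.got = append(c.got, w.Data)
	return nil
}

type Manager struct {
	Input  Input
	Filter Filter
	Output Output
}

// Run handles every Work in its own goroutine and waits for all of them.
func (m *Manager) Run() {
	var wg sync.WaitGroup
	for w := range m.Input.Start() {
		if w.Error() != nil {
			fmt.Printf("input error: %q\n", w.Error())
			continue
		}
		wg.Add(1)
		go func(w *Work) {
			defer wg.Done()
			nw := m.Filter.Filter(w)
			if err := nw.Error(); err != nil {
				fmt.Printf("filter error: %q\n", err)
				return // unlike the slide, skip Output for failed Works
			}
			if err := m.Output.Output(nw); err != nil {
				fmt.Printf("output error: %q\n", err)
			}
		}(w)
	}
	wg.Wait()
}

// runPipeline (hypothetical) runs one failed input plus payloads; returns sorted outputs.
func runPipeline(payloads []string) []string {
	in := sliceInput{{Err: errors.New("unreadable input")}}
	for _, p := range payloads {
		in = append(in, &Work{Data: p})
	}
	out := &collectOutput{}
	(&Manager{Input: in, Filter: upperFilter{}, Output: out}).Run()
	sort.Strings(out.got)
	return out.got
}

func main() {
	fmt.Println(runPipeline([]string{"b", "a"}))
}
```

Each Work is filtered and output in its own goroutine, so results arrive in no fixed order; that is why collectOutput needs a mutex and runPipeline sorts before returning.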
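
* Sketch: self-registering modules

The Registry slides rely on package-level maps that module packages fill from their init() functions. This sketch squeezes the pattern into a single file: two init() functions stand in for the separate module packages, and `Input` is reduced to a `Name` method. `httpInput`, `stdinInput`, `newInput`, `registered` and the `addr` option are hypothetical. The map is created with make because writing to a nil map panics.

```go
package main

import (
	"fmt"
	"sort"
)

// Input is a trimmed-down, hypothetical stand-in for input.Input.
type Input interface{ Name() string }

// Inputregistry maps a module name from the config to its constructor.
// Package-level variables are initialized before any init() runs, so the
// registrations below write into a real (non-nil) map.
var Inputregistry = make(map[string]func(map[string]string) Input)

// Two hypothetical modules. In Stasher each would live in its own package
// (e.g. input/http) and register itself from that package's init().
type httpInput struct{ addr string }

func (h httpInput) Name() string { return "http " + h.addr }

type stdinInput struct{}

func (stdinInput) Name() string { return "stdin" }

func init() {
	Inputregistry["http"] = func(opts map[string]string) Input {
		return httpInput{addr: opts["addr"]}
	}
}

func init() {
	Inputregistry["stdin"] = func(map[string]string) Input { return stdinInput{} }
}

// newInput looks a module up by name and builds it from its config options.
func newInput(name string, opts map[string]string) (Input, error) {
	ctor, ok := Inputregistry[name]
	if !ok {
		return nil, fmt.Errorf("unknown input module %q", name)
	}
	return ctor(opts), nil
}

// registered returns the sorted names of all registered input modules.
func registered() []string {
	names := make([]string, 0, len(Inputregistry))
	for n := range Inputregistry {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

func main() {
	fmt.Println(registered()) // [http stdin]
	in, err := newInput("http", map[string]string{"addr": ":8080"})
	fmt.Println(in.Name(), err) // http :8080 <nil>
	_, err = newInput("kafka", nil)
	fmt.Println(err) // unknown input module "kafka"
}
```

In Stasher, the blank imports in conf/init.go are what pull in the module packages and thereby run their init() functions; here everything lives in package main, so Go runs both init() functions before main, after the map has been created.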
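
* Sketch: a string-literal-only config parser

The slides do not show Stasher's config syntax, so this hand-written tokenizer and parser assume a Logstash-like `section { module { key => "value" } }` form with one module per section. Only quoted string values are accepted; anything else, such as the `[` that starts an array, is rejected. `Module`, `Parse`, `tokenize` and the `addr` option are hypothetical; the module names `http` and `stdout` are taken from the import list in conf/init.go.

```go
package main

import (
	"fmt"
	"strings"
)

// Module is one configured stage: a module name plus its string options.
type Module struct {
	Name string
	Opts map[string]string
}

func isNameByte(c byte) bool {
	return c == '_' || c >= 'a' && c <= 'z' || c >= 'A' && c <= 'Z' || c >= '0' && c <= '9'
}

// tokenize splits src into names, quoted strings (quotes kept), "{", "}" and "=>".
// Anything else, e.g. the "[" of an array, is rejected.
func tokenize(src string) ([]string, error) {
	var toks []string
	for i := 0; i < len(src); {
		c := src[i]
		switch {
		case c == ' ' || c == '\t' || c == '\n' || c == '\r':
			i++
		case c == '{' || c == '}':
			toks, i = append(toks, string(c)), i+1
		case strings.HasPrefix(src[i:], "=>"):
			toks, i = append(toks, "=>"), i+2
		case c == '"': // no escape sequences in this sketch
			end := strings.IndexByte(src[i+1:], '"')
			if end < 0 {
				return nil, fmt.Errorf("unterminated string")
			}
			toks, i = append(toks, src[i:i+end+2]), i+end+2
		case isNameByte(c):
			j := i
			for j < len(src) && isNameByte(src[j]) {
				j++
			}
			toks, i = append(toks, src[i:j]), j
		default:
			return nil, fmt.Errorf("unexpected %q: only string literals are supported", c)
		}
	}
	return toks, nil
}

// Parse reads `section { module { key => "value" ... } }` blocks and
// returns the module configured for each section.
func Parse(src string) (map[string]Module, error) {
	toks, err := tokenize(src)
	if err != nil {
		return nil, err
	}
	pos := 0
	next := func() string { // returns "" at the end of the input
		if pos == len(toks) {
			return ""
		}
		pos++
		return toks[pos-1]
	}
	isName := func(t string) bool { return t != "" && isNameByte(t[0]) }
	conf := make(map[string]Module)
	for pos < len(toks) {
		// Expect: section { module {
		section, open1, mod, open2 := next(), next(), next(), next()
		if !isName(section) || open1 != "{" || !isName(mod) || open2 != "{" {
			return nil, fmt.Errorf("expected `section { module {` near %q", section)
		}
		opts := make(map[string]string)
		for pos < len(toks) && toks[pos] != "}" {
			// Expect: key => "value"
			key, arrow, val := next(), next(), next()
			if !isName(key) || arrow != "=>" || val == "" || val[0] != '"' {
				return nil, fmt.Errorf("option %q: expected `key => \"value\"`", key)
			}
			opts[key] = val[1 : len(val)-1] // strip the quotes
		}
		if next() != "}" || next() != "}" { // close the module and section blocks
			return nil, fmt.Errorf("section %q: missing closing brace", section)
		}
		conf[section] = Module{Name: mod, Opts: opts}
	}
	return conf, nil
}

func main() {
	conf, err := Parse(`input { http { addr => ":8080" } } output { stdout {} }`)
	fmt.Println(conf, err)
}
```

Each parsed Module carries a name to look up in the registry and a map[string]string of options, which is the argument type of the registry constructors. Supporting the "multiple modules per stage" TODO would mean turning the map values into slices.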