From 26412f3077698dabcdbdf6a6e40276c56a6f74b5 Mon Sep 17 00:00:00 2001 From: Silvan Jegen Date: Fri, 9 Dec 2016 21:12:18 +0100 Subject: Reorganize files --- filter/string.go | 56 ++++++++++++++++ input/http.go | 45 +++++++++++++ input/stdin.go | 36 +++++++++++ main.go | 22 +++++++ manager.go | 191 ------------------------------------------------------- output/stdout.go | 14 ++++ types.go | 48 ++++++++++++++ work/work.go | 10 +++ 8 files changed, 231 insertions(+), 191 deletions(-) create mode 100644 filter/string.go create mode 100644 input/http.go create mode 100644 input/stdin.go create mode 100644 main.go delete mode 100644 manager.go create mode 100644 output/stdout.go create mode 100644 types.go create mode 100644 work/work.go diff --git a/filter/string.go b/filter/string.go new file mode 100644 index 0000000..cb44d75 --- /dev/null +++ b/filter/string.go @@ -0,0 +1,56 @@ +package filter + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + + "github.com/Shugyousha/stasher/work" +) + +type StringFilter struct { + FilterFuncMap map[string]func(string) string +} + +func NewStringFilter(ffmap map[string]func(string) string) *StringFilter { + return &StringFilter{FilterFuncMap: ffmap} +} + +func (f *StringFilter) Filter(w *work.Work) *work.Work { + dec := json.NewDecoder(bytes.NewReader(w.Data)) + jm := make(map[string]string, 10) + + err := dec.Decode(&jm) + if err == io.EOF { + fmt.Printf("EOF jm: %v\n", jm) + return w + } + if err != nil { + fmt.Printf("Error when decoding JSON: %q\n", err) + w.Err = err + return w + } + + changed := false + for field, ff := range f.FilterFuncMap { + str, ok := jm[field] + if !ok { + continue + } + + jm[field] = ff(str) + changed = true + } + if changed { + bs, err := json.Marshal(jm) + if err != nil { + fmt.Printf("Error when marshalling JSON: %q\n", err) + w.Err = err + } else { + w.Data = bs + } + } + + return w +} diff --git a/input/http.go b/input/http.go new file mode 100644 index 0000000..285f1f6 --- /dev/null +++ b/input/http.go @@ -0,0 +1,45 @@ +package input + +import ( + "fmt" + "io/ioutil" + "net/http" + + "github.com/Shugyousha/stasher/work" +) + +type HTTPInput struct { + retchan chan *work.Work + prefix string + port string +} + +func NewHTTPInput(prefix, port string) *HTTPInput { + return &HTTPInput{prefix: prefix, port: port} +} + +func (hi *HTTPInput) httphandler(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + fmt.Printf("Expected POST method was: %q\n", r.Method) + return + } + all, err := ioutil.ReadAll(r.Body) + if err != nil { + fmt.Printf("Error when reading HTTP request body: %q\n", err) + return + } + hi.retchan <- &work.Work{Data: all} +} + +func (hi *HTTPInput) Start() chan *work.Work { + hi.retchan = make(chan *work.Work, 100) + + go func() { + http.HandleFunc("/"+hi.prefix, hi.httphandler) + err := http.ListenAndServe(hi.port, nil) + fmt.Printf("Error when serving HTTP: %q\n", err) + close(hi.retchan) + }() + + return hi.retchan +} diff --git a/input/stdin.go b/input/stdin.go new file mode 100644 index 0000000..5acfa5e --- /dev/null +++ b/input/stdin.go @@ -0,0 +1,36 @@ +package input + +import ( + "bufio" + "fmt" + "io" + "os" + + "github.com/Shugyousha/stasher/work" +) + +type StdinInput struct { + retchan chan *work.Work +} + +func (i *StdinInput) Start() chan *work.Work { + i.retchan = make(chan *work.Work, 100) + r := bufio.NewReader(os.Stdin) + + go func() { + for { + bs, err := r.ReadBytes(byte('\n')) + if err == io.EOF { + break + } + if err != nil { + fmt.Printf("Error when reading input from Stdin: %q", err) + os.Exit(1) + } + i.retchan <- &work.Work{Data: bs} + } + close(i.retchan) + }() + + return i.retchan +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..7d60fb5 --- /dev/null +++ b/main.go @@ -0,0 +1,22 @@ +package main + +import ( + "strings" + + "github.com/Shugyousha/stasher/filter" + "github.com/Shugyousha/stasher/input" + "github.com/Shugyousha/stasher/output" +) + +func main() { + ffmap := make(map[string]func(string) string, 10) + ffmap["F"] = func(s string) string { return strings.ToUpper(s) } + + m := Manager{ + Input: input.NewHTTPInput("", ":8080"), + Filter: filter.NewStringFilter(ffmap), + Output: &output.StdoutOutput{}, + } + + m.Run() +} diff --git a/manager.go b/manager.go deleted file mode 100644 index 2029552..0000000 --- a/manager.go +++ /dev/null @@ -1,191 +0,0 @@ -package main - -import ( - "bufio" - "bytes" - "encoding/json" - "fmt" - "io" - "io/ioutil" - "net/http" - "os" - "strings" - "sync" -) - -type Manager struct { - Input Input - Filter Filter - Output Output -} - -type Input interface { - Start() chan *Work -} - -type StdinInput struct { - retchan chan *Work -} - -type HTTPInput struct { - retchan chan *Work - prefix string - port string -} - -type Filter interface { - Filter(*Work) *Work -} - -type StringFilter struct { - FilterFuncMap map[string]func(string) string -} - -type Output interface { - Output(*Work) -} - -type StdoutOutput struct { -} - -type Work struct { - data []byte - err error -} - -func (w *Work) Error() error { - return w.err -} - -func (i *StdinInput) Start() chan *Work { - i.retchan = make(chan *Work, 100) - r := bufio.NewReader(os.Stdin) - - go func() { - for { - bs, err := r.ReadBytes(byte('\n')) - if err == io.EOF { - break - } - if err != nil { - fmt.Printf("Error when reading input from Stdin: %q", err) - os.Exit(1) - } - i.retchan <- &Work{data: bs} - } - close(i.retchan) - }() - - return i.retchan -} - -func (hi *HTTPInput) httphandler(w http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { - fmt.Printf("Expected POST method was: %q\n", r.Method) - return - } - all, err := ioutil.ReadAll(r.Body) - if err != nil { - fmt.Printf("Error when reading HTTP request body: %q\n", err) - return - } - hi.retchan <- &Work{data: all} -} - -func (hi *HTTPInput) Start() chan *Work { - hi.retchan = make(chan *Work, 100) - - go func() { - http.HandleFunc("/"+hi.prefix, hi.httphandler) - err := http.ListenAndServe(hi.port, nil) - fmt.Printf("Error when serving HTTP: %q\n", err) - close(hi.retchan) - }() - - return hi.retchan -} - -func NewStringFilter(ffmap map[string]func(string) string) *StringFilter { - return &StringFilter{FilterFuncMap: ffmap} -} - -func NewHTTPInput(prefix, port string) *HTTPInput { - return &HTTPInput{prefix: prefix, port: port} -} - -func (f *StringFilter) Filter(w *Work) *Work { - dec := json.NewDecoder(bytes.NewReader(w.data)) - jm := make(map[string]string, 10) - - err := dec.Decode(&jm) - if err == io.EOF { - fmt.Printf("EOF jm: %v\n", jm) - return w - } - if err != nil { - fmt.Printf("Error when decoding JSON: %q\n", err) - w.err = err - return w - } - - changed := false - for field, ff := range f.FilterFuncMap { - str, ok := jm[field] - if !ok { - continue - } - - jm[field] = ff(str) - changed = true - } - if changed { - bs, err := json.Marshal(jm) - if err != nil { - fmt.Printf("Error when marshalling JSON: %q\n", err) - w.err = err - } else { - w.data = bs - } - } - - return w -} - -func (o *StdoutOutput) Output(w *Work) { - fmt.Printf("%s\n", w.data) -} - -func (m *Manager) Run() { - var wg sync.WaitGroup - - ic := m.Input.Start() - for w := range ic { - wg.Add(1) - - go func(w *Work) { - nw := m.Filter.Filter(w) - err := nw.Error() - if err != nil { - fmt.Printf("Got an error when processing Work: %q\n", err) - } - m.Output.Output(nw) - - wg.Done() - }(w) - } - - wg.Wait() -} - -func main() { - ffmap := make(map[string]func(string) string, 10) - ffmap["F"] = func(s string) string { return strings.ToUpper(s) } - - m := Manager{ - Input: NewHTTPInput("", ":8080"), - Filter: NewStringFilter(ffmap), - Output: &StdoutOutput{}, - } - - m.Run() -} diff --git a/output/stdout.go b/output/stdout.go new file mode 100644 index 0000000..5cb753e --- /dev/null +++ b/output/stdout.go @@ -0,0 +1,14 @@ +package output + +import ( + "fmt" + + "github.com/Shugyousha/stasher/work" +) + +type StdoutOutput struct { +} + +func (o *StdoutOutput) Output(w *work.Work) { + fmt.Printf("%s\n", w.Data) +} diff --git a/types.go b/types.go new file mode 100644 index 0000000..abd2658 --- /dev/null +++ b/types.go @@ -0,0 +1,48 @@ +package main + +import ( + "fmt" + "sync" + + "github.com/Shugyousha/stasher/work" +) + +type Input interface { + Start() chan *work.Work +} + +type Filter interface { + Filter(*work.Work) *work.Work +} + +type Output interface { + Output(*work.Work) +} + +type Manager struct { + Input Input + Filter Filter + Output Output +} + +func (m *Manager) Run() { + var wg sync.WaitGroup + + ic := m.Input.Start() + for w := range ic { + wg.Add(1) + + go func(w *work.Work) { + nw := m.Filter.Filter(w) + err := nw.Error() + if err != nil { + fmt.Printf("Got an error when processing Work: %q\n", err) + } + m.Output.Output(nw) + + wg.Done() + }(w) + } + + wg.Wait() +} diff --git a/work/work.go b/work/work.go new file mode 100644 index 0000000..07461ea --- /dev/null +++ b/work/work.go @@ -0,0 +1,10 @@ +package work + +type Work struct { + Data []byte + Err error +} + +func (w *Work) Error() error { + return w.Err +} -- cgit v1.2.1-18-gbd029