From 2e420b099775ca52d261b72092ce7e615a7a5847 Mon Sep 17 00:00:00 2001 From: Silvan Jegen Date: Tue, 29 Nov 2016 21:32:22 +0100 Subject: Run one Work load in one Goroutine --- manager.go | 101 +++++++++++++++++++++++++++++++------------------------------ 1 file changed, 52 insertions(+), 49 deletions(-) diff --git a/manager.go b/manager.go index b6b41fb..3253129 100644 --- a/manager.go +++ b/manager.go @@ -8,6 +8,7 @@ import ( "io" "os" "strings" + "sync" ) type Manager struct { @@ -25,7 +26,7 @@ type StdinInput struct { } type Filter interface { - Filter(chan *Work) chan *Work + Filter(*Work) *Work } type StringFilter struct { @@ -34,7 +35,7 @@ type StringFilter struct { } type Output interface { - Output(chan *Work) + Output(*Work) } type StdoutOutput struct { @@ -67,67 +68,69 @@ func (i *StdinInput) Start() chan *Work { return i.retchan } -func (f *StringFilter) Filter(ic chan *Work) chan *Work { - f.retchan = make(chan *Work, 100) +func (f *StringFilter) Filter(w *Work) *Work { if f.FilterFuncMap == nil { f.FilterFuncMap = make(map[string]func(string) string, 10) f.FilterFuncMap["F"] = func(s string) string { return strings.ToUpper(s) } } - go func() { - for w := range ic { - dec := json.NewDecoder(bytes.NewReader(w.data)) - jm := make(map[string]string, 10) - - err := dec.Decode(&jm) - if err == io.EOF { - fmt.Printf("EOF jm: %v\n", jm) - f.retchan <- w - continue - } - if err != nil { - fmt.Printf("Error when decoding JSON: %q\n", err) - w.err = err - } + dec := json.NewDecoder(bytes.NewReader(w.data)) + jm := make(map[string]string, 10) - changed := false - for k, v := range jm { - ff, ok := f.FilterFuncMap[k] - if !ok { - continue - } + err := dec.Decode(&jm) + if err == io.EOF { + fmt.Printf("EOF jm: %v\n", jm) + return w + } + if err != nil { + fmt.Printf("Error when decoding JSON: %q\n", err) + w.err = err + return w + } - jm[k] = ff(v) - changed = true - } - if changed { - bs, err := json.Marshal(jm) - if err != nil { - fmt.Printf("Error when marshalling JSON: %q\n", err) - w.err = err - } else { - w.data = bs - } - } + changed := false + for k, v := range jm { + ff, ok := f.FilterFuncMap[k] + if !ok { + continue + } - f.retchan <- w + jm[k] = ff(v) + changed = true + } + if changed { + bs, err := json.Marshal(jm) + if err != nil { + fmt.Printf("Error when marshalling JSON: %q\n", err) + w.err = err + } else { + w.data = bs } - close(f.retchan) - }() + } - return f.retchan + return w } -func (o *StdoutOutput) Output(wc chan *Work) { - for w := range wc { - fmt.Printf("%s\n", w.data) - } +func (o *StdoutOutput) Output(w *Work) { + fmt.Printf("%s\n", w.data) } -func (m *Manager) Go() { +func (m *Manager) Run() { + var wg sync.WaitGroup + ic := m.Input.Start() - fc := m.Filter.Filter(ic) - m.Output.Output(fc) + for w := range ic { + wg.Add(1) + + go func(w *Work) { + nw := m.Filter.Filter(w) + m.Output.Output(nw) + + wg.Done() + }(w) + } + + wg.Wait() } func main() { @@ -137,5 +140,5 @@ func main() { Output: &StdoutOutput{}, } - m.Go() + m.Run() } -- cgit v1.2.3