summaryrefslogtreecommitdiff
path: root/manager.go
diff options
context:
space:
mode:
authorSilvan Jegen <s.jegen@gmail.com>2016-11-29 21:32:22 +0100
committerSilvan Jegen <s.jegen@gmail.com>2016-11-29 21:32:22 +0100
commit2e420b099775ca52d261b72092ce7e615a7a5847 (patch)
tree1c6d274c2fc0c1d5c983c1e22d32f6137f89d50b /manager.go
parentdaa631dbb2891b6fb88357c4e6e5750bf4ed6e70 (diff)
Run one Work load in one Goroutine
Diffstat (limited to 'manager.go')
-rw-r--r--manager.go101
1 files changed, 52 insertions, 49 deletions
diff --git a/manager.go b/manager.go
index b6b41fb..3253129 100644
--- a/manager.go
+++ b/manager.go
@@ -8,6 +8,7 @@ import (
"io"
"os"
"strings"
+ "sync"
)
type Manager struct {
@@ -25,7 +26,7 @@ type StdinInput struct {
}
type Filter interface {
- Filter(chan *Work) chan *Work
+ Filter(*Work) *Work
}
type StringFilter struct {
@@ -34,7 +35,7 @@ type StringFilter struct {
}
type Output interface {
- Output(chan *Work)
+ Output(*Work)
}
type StdoutOutput struct {
@@ -67,67 +68,69 @@ func (i *StdinInput) Start() chan *Work {
return i.retchan
}
-func (f *StringFilter) Filter(ic chan *Work) chan *Work {
- f.retchan = make(chan *Work, 100)
+func (f *StringFilter) Filter(w *Work) *Work {
if f.FilterFuncMap == nil {
f.FilterFuncMap = make(map[string]func(string) string, 10)
f.FilterFuncMap["F"] = func(s string) string { return strings.ToUpper(s) }
}
- go func() {
- for w := range ic {
- dec := json.NewDecoder(bytes.NewReader(w.data))
- jm := make(map[string]string, 10)
-
- err := dec.Decode(&jm)
- if err == io.EOF {
- fmt.Printf("EOF jm: %v\n", jm)
- f.retchan <- w
- continue
- }
- if err != nil {
- fmt.Printf("Error when decoding JSON: %q\n", err)
- w.err = err
- }
+ dec := json.NewDecoder(bytes.NewReader(w.data))
+ jm := make(map[string]string, 10)
- changed := false
- for k, v := range jm {
- ff, ok := f.FilterFuncMap[k]
- if !ok {
- continue
- }
+ err := dec.Decode(&jm)
+ if err == io.EOF {
+ fmt.Printf("EOF jm: %v\n", jm)
+ return w
+ }
+ if err != nil {
+ fmt.Printf("Error when decoding JSON: %q\n", err)
+ w.err = err
+ return w
+ }
- jm[k] = ff(v)
- changed = true
- }
- if changed {
- bs, err := json.Marshal(jm)
- if err != nil {
- fmt.Printf("Error when marshalling JSON: %q\n", err)
- w.err = err
- } else {
- w.data = bs
- }
- }
+ changed := false
+ for k, v := range jm {
+ ff, ok := f.FilterFuncMap[k]
+ if !ok {
+ continue
+ }
- f.retchan <- w
+ jm[k] = ff(v)
+ changed = true
+ }
+ if changed {
+ bs, err := json.Marshal(jm)
+ if err != nil {
+ fmt.Printf("Error when marshalling JSON: %q\n", err)
+ w.err = err
+ } else {
+ w.data = bs
}
- close(f.retchan)
- }()
+ }
- return f.retchan
+ return w
}
-func (o *StdoutOutput) Output(wc chan *Work) {
- for w := range wc {
- fmt.Printf("%s\n", w.data)
- }
+func (o *StdoutOutput) Output(w *Work) {
+ fmt.Printf("%s\n", w.data)
}
-func (m *Manager) Go() {
+func (m *Manager) Run() {
+ var wg sync.WaitGroup
+
ic := m.Input.Start()
- fc := m.Filter.Filter(ic)
- m.Output.Output(fc)
+ for w := range ic {
+ wg.Add(1)
+
+ go func(w *Work) {
+ nw := m.Filter.Filter(w)
+ m.Output.Output(nw)
+
+ wg.Done()
+ }(w)
+ }
+
+ wg.Wait()
}
func main() {
@@ -137,5 +140,5 @@ func main() {
Output: &StdoutOutput{},
}
- m.Go()
+ m.Run()
}