// This program reads newline-delimited JSON objects from standard input,
// applies per-field string filters to them, and writes the resulting records
// to standard output, one per line.
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"strings"
	"sync"
)

// Manager wires an Input, a Filter and an Output together into one pipeline.
type Manager struct {
	Input  Input
	Filter Filter
	Output Output
}

// Input is a source of work items.
type Input interface {
	// Start begins producing work and returns the channel it is delivered on.
	Start() chan *Work
}

// StdinInput is an Input that reads newline-delimited records from os.Stdin.
type StdinInput struct {
	retchan chan *Work
}

// Filter transforms a single work item.
type Filter interface {
	// Filter processes w and returns the work item to hand to the Output.
	Filter(*Work) *Work
}

// StringFilter is a Filter that treats each work item as a JSON object and
// rewrites selected string fields.
type StringFilter struct {
	retchan chan *Work // not referenced by any code in this file

	// FilterFuncMap maps a JSON field name to the function applied to that
	// field's string value.
	FilterFuncMap map[string]func(string) string
}

// Output is a sink for processed work items.
type Output interface {
	// Output consumes one processed work item.
	Output(*Work)
}

// StdoutOutput is an Output that prints each work item to standard output.
type StdoutOutput struct {
}

// Work is one record travelling through the pipeline.
type Work struct {
	data []byte // raw record bytes
	err  error  // set by a stage that failed to process the record
}

// Start launches a goroutine that reads os.Stdin line by line and sends each
// line (including its trailing '\n', when present) on the returned channel.
// The channel is buffered (capacity 100) and is closed once end of input is
// reached. Any read error other than io.EOF is reported and terminates the
// process with exit status 1.
func (i *StdinInput) Start() chan *Work {
	i.retchan = make(chan *Work, 100)
	r := bufio.NewReader(os.Stdin)
	go func() {
		defer close(i.retchan)
		for {
			line, err := r.ReadBytes('\n')
			if err != nil && err != io.EOF {
				fmt.Fprintf(os.Stderr, "Error when reading input from Stdin: %q", err)
				os.Exit(1)
			}
			// ReadBytes returns the bytes read before the error alongside
			// io.EOF, so a final line without a trailing newline must still
			// be forwarded rather than dropped.
			if len(line) > 0 {
				i.retchan <- &Work{data: line}
			}
			if err == io.EOF {
				return
			}
		}
	}()
	return i.retchan
}

// Filter decodes w.data as a JSON object and, for every field named in
// FilterFuncMap that is present with a string (or null) value, replaces the
// value with the result of the mapped function. Fields holding other JSON
// types (numbers, booleans, arrays, objects) are passed through unmodified.
//
// When at least one field was rewritten, w.data is replaced by the
// re-marshalled object (without a trailing newline); otherwise w.data is left
// as it was. On a decoding or marshalling failure the error is stored in
// w.err, a diagnostic is written to standard error, and w.data is left
// untouched. The same *Work that was passed in is always returned.
func (f *StringFilter) Filter(w *Work) *Work {
	// RawMessage values keep non-string fields intact instead of making the
	// whole record fail to decode.
	fields := make(map[string]json.RawMessage, 10)
	err := json.NewDecoder(bytes.NewReader(w.data)).Decode(&fields)
	if err == io.EOF {
		// The record contained no JSON value at all (for example a blank line).
		fmt.Fprintf(os.Stderr, "EOF jm: %v\n", fields)
		return w
	}
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error when decoding JSON: %q\n", err)
		w.err = err
		return w
	}

	changed := false
	for name, fn := range f.FilterFuncMap {
		raw, ok := fields[name]
		if !ok {
			continue
		}
		var s string
		if json.Unmarshal(raw, &s) != nil {
			// Not a JSON string: a string filter does not apply to it.
			continue
		}
		enc, err := json.Marshal(fn(s))
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error when marshalling JSON: %q\n", err)
			w.err = err
			return w
		}
		fields[name] = enc
		changed = true
	}
	if !changed {
		return w
	}

	out, err := json.Marshal(fields)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error when marshalling JSON: %q\n", err)
		w.err = err
		return w
	}
	w.data = out
	return w
}

// Output prints w.data to standard output followed by exactly one newline.
// Any line terminator already present at the end of w.data is removed first,
// so records that passed through the filter unmodified (and still carry the
// '\n' read from the input) are not followed by an extra blank line.
func (o *StdoutOutput) Output(w *Work) {
	fmt.Printf("%s\n", bytes.TrimRight(w.data, "\r\n"))
}

// Run starts the Input, then filters and outputs every work item it yields,
// each in its own goroutine, and returns once the input channel is closed and
// all items have been processed. Because items are handled concurrently,
// output order is not guaranteed to match input order.
func (m *Manager) Run() {
	var wg sync.WaitGroup
	for w := range m.Input.Start() {
		wg.Add(1)
		go func(w *Work) {
			defer wg.Done()
			m.Output.Output(m.Filter.Filter(w))
		}(w)
	}
	wg.Wait()
}

// main builds a pipeline that upper-cases the "F" field of every JSON object
// read from standard input and prints the result to standard output.
func main() {
	ffmap := make(map[string]func(string) string, 10)
	ffmap["F"] = strings.ToUpper
	m := Manager{
		Input:  &StdinInput{},
		Filter: &StringFilter{FilterFuncMap: ffmap},
		Output: &StdoutOutput{},
	}
	m.Run()
}