// Command main wires an Input, a Filter and an Output together: Work items
// produced by the input are passed through the filter and then handed to the
// output.
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"strings"
	"sync"
)

// Manager connects one Input, one Filter and one Output into a pipeline.
type Manager struct {
	Input  Input
	Filter Filter
	Output Output
}

// Input is a source of Work items. Start returns the channel the items are
// delivered on; Run consumes it until it is closed.
type Input interface {
	Start() chan *Work
}

// StdinInput produces one Work item per line read from standard input.
type StdinInput struct {
	retchan chan *Work
}

// HTTPInput produces one Work item per accepted HTTP POST request.
type HTTPInput struct {
	retchan chan *Work
	prefix  string
	port    string
}

// Filter transforms a Work item and returns the (possibly modified) item.
type Filter interface {
	Filter(*Work) *Work
}

// StringFilter applies per-field string functions to a flat JSON object.
type StringFilter struct {
	// FilterFuncMap maps a JSON field name to the function applied to the
	// value of that field.
	FilterFuncMap map[string]func(string) string
}

// Output consumes a processed Work item.
type Output interface {
	Output(*Work)
}

// StdoutOutput prints Work data to standard output.
type StdoutOutput struct {
}

// Work is a unit of data travelling through the pipeline, together with the
// error (if any) recorded while processing it.
type Work struct {
	data []byte
	err  error
}

// Error returns the error recorded on the Work item, or nil.
func (w *Work) Error() error {
	return w.err
}

// Start begins reading standard input in a background goroutine and returns
// the channel on which each line is delivered as a Work item. Lines keep
// their trailing newline, exactly as returned by ReadBytes. The channel is
// closed at end of input. Any read error other than io.EOF is printed and
// terminates the process with exit status 1.
func (i *StdinInput) Start() chan *Work {
	i.retchan = make(chan *Work, 100)
	out := i.retchan
	r := bufio.NewReader(os.Stdin)
	go func() {
		defer close(out)
		for {
			bs, err := r.ReadBytes('\n')
			if err != nil && err != io.EOF {
				fmt.Printf("Error when reading input from Stdin: %q\n", err)
				os.Exit(1)
			}
			// ReadBytes returns the data read so far together with io.EOF
			// when the input does not end in a newline; forward that final
			// line instead of dropping it.
			if len(bs) > 0 {
				out <- &Work{data: bs}
			}
			if err == io.EOF {
				return
			}
		}
	}()
	return out
}

// httphandler accepts POST requests carrying a "work" form field and queues
// the first value of that field as a Work item. Requests that cannot be
// accepted are logged and answered with an HTTP error status.
func (hi *HTTPInput) httphandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		fmt.Printf("Expected POST method was: %q\n", r.Method)
		w.Header().Set("Allow", http.MethodPost)
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	if err := r.ParseForm(); err != nil {
		fmt.Printf("Error when parsing HTTP request form values: %q\n", err)
		http.Error(w, "malformed form data", http.StatusBadRequest)
		return
	}
	workdata := r.PostForm["work"]
	fmt.Printf("Form %#v\n", r.Form)
	if len(workdata) == 0 {
		fmt.Printf("Work form field was empty.\n")
		http.Error(w, "missing work form field", http.StatusBadRequest)
		return
	}
	fmt.Printf("Work form field was: %q\n", workdata)
	hi.retchan <- &Work{data: []byte(workdata[0])}
}

// Start launches an HTTP server on hi.port in a background goroutine and
// returns the channel on which posted Work items are delivered. The handler
// is mounted at "/"+hi.prefix. When the server stops, the error is printed
// and the channel is closed.
func (hi *HTTPInput) Start() chan *Work {
	hi.retchan = make(chan *Work, 100)
	out := hi.retchan
	// A private mux is used rather than http.DefaultServeMux: registering the
	// same pattern twice on a mux panics, which would make a second call to
	// Start crash the program.
	mux := http.NewServeMux()
	mux.HandleFunc("/"+hi.prefix, hi.httphandler)
	go func() {
		err := http.ListenAndServe(hi.port, mux)
		fmt.Printf("Error when serving HTTP: %q\n", err)
		close(out)
	}()
	return out
}

// NewStringFilter returns a StringFilter that applies ffmap's functions to
// the matching fields.
func NewStringFilter(ffmap map[string]func(string) string) *StringFilter {
	return &StringFilter{FilterFuncMap: ffmap}
}

// NewHTTPInput returns an HTTPInput serving the path "/"+prefix on the
// address given by port (for example ":8080").
func NewHTTPInput(prefix, port string) *HTTPInput {
	return &HTTPInput{prefix: prefix, port: port}
}

// Filter decodes w.data as a JSON object with string values, applies every
// function in FilterFuncMap whose field is present, and re-encodes the
// object into w.data if at least one field was processed. Empty input is
// returned untouched. A decoding or encoding failure is logged and recorded
// in w.err; w.data is left unchanged in that case. The same w is always
// returned.
func (f *StringFilter) Filter(w *Work) *Work {
	dec := json.NewDecoder(bytes.NewReader(w.data))
	jm := make(map[string]string, 10)
	err := dec.Decode(&jm)
	if err == io.EOF {
		fmt.Printf("EOF jm: %v\n", jm)
		return w
	}
	if err != nil {
		fmt.Printf("Error when decoding JSON: %q\n", err)
		w.err = err
		return w
	}

	changed := false
	for field, ff := range f.FilterFuncMap {
		str, ok := jm[field]
		if !ok {
			continue
		}
		jm[field] = ff(str)
		changed = true
	}
	if !changed {
		return w
	}

	bs, err := json.Marshal(jm)
	if err != nil {
		fmt.Printf("Error when marshalling JSON: %q\n", err)
		w.err = err
		return w
	}
	w.data = bs
	return w
}

// Output prints the Work data followed by a newline to standard output.
func (o *StdoutOutput) Output(w *Work) {
	fmt.Printf("%s\n", w.data)
}

// Run starts the input and processes every Work item in its own goroutine:
// the item is filtered, any recorded error is printed, and the result is
// passed to the output. Run returns once the input channel has been closed
// and all in-flight items have been processed.
func (m *Manager) Run() {
	var wg sync.WaitGroup
	ic := m.Input.Start()
	for w := range ic {
		wg.Add(1)
		go func(w *Work) {
			defer wg.Done()
			nw := m.Filter.Filter(w)
			if err := nw.Error(); err != nil {
				fmt.Printf("Got an error when processing Work: %q\n", err)
			}
			m.Output.Output(nw)
		}(w)
	}
	wg.Wait()
}

func main() {
	ffmap := make(map[string]func(string) string, 10)
	ffmap["F"] = func(s string) string {
		return strings.ToUpper(s)
	}
	m := Manager{
		Input:  NewHTTPInput("", ":8080"),
		Filter: NewStringFilter(ffmap),
		Output: &StdoutOutput{},
	}
	m.Run()
}