diff options
-rw-r--r-- | filter/http/http.go | 4 | ||||
-rw-r--r-- | filter/str/string.go | 4 | ||||
-rw-r--r-- | input/http/http.go | 5 | ||||
-rw-r--r-- | input/stdin/stdin.go | 7 | ||||
-rw-r--r-- | manager/manager.go | 4 | ||||
-rw-r--r-- | output/http/http.go | 2 | ||||
-rw-r--r-- | work/work.go | 8 |
7 files changed, 23 insertions, 11 deletions
diff --git a/filter/http/http.go b/filter/http/http.go index e3d447b..3143718 100644 --- a/filter/http/http.go +++ b/filter/http/http.go @@ -26,14 +26,14 @@ func New(kv map[string]string) filter.Filter { func (hf *HTTPFilter) Filter(w *work.Work) *work.Work { resp, err := http.Post(hf.url, "application/json", bytes.NewReader(w.Data)) if err != nil { - w.Err = err + w.SetError(err) return w } defer resp.Body.Close() filtered, err := ioutil.ReadAll(resp.Body) if err != nil { - w.Err = err + w.SetError(err) return w } diff --git a/filter/str/string.go b/filter/str/string.go index 37f49f6..f6087be 100644 --- a/filter/str/string.go +++ b/filter/str/string.go @@ -46,7 +46,7 @@ func (f *StringFilter) Filter(w *work.Work) *work.Work { } if err != nil { fmt.Fprintf(os.Stderr, "Error when decoding JSON: %q\n", err) - w.Err = err + w.SetError(err) return w } @@ -64,7 +64,7 @@ func (f *StringFilter) Filter(w *work.Work) *work.Work { bs, err := json.Marshal(jm) if err != nil { fmt.Fprintf(os.Stderr, "Error when marshaling JSON: %q\n", err) - w.Err = err + w.SetError(err) } else { w.Data = bs } diff --git a/input/http/http.go b/input/http/http.go index 87ec0f9..61e2c39 100644 --- a/input/http/http.go +++ b/input/http/http.go @@ -50,10 +50,13 @@ func (hi *HTTPInput) httphandler(w http.ResponseWriter, r *http.Request) { return } all, err := ioutil.ReadAll(r.Body) + wrk := &work.Work{Data: all} if err != nil { fmt.Printf("Error when reading HTTP request body: %q\n", err) + wrk.SetError(err) } - hi.retchan <- &work.Work{Data: all, Err: err} + + hi.retchan <- wrk } func (hi *HTTPInput) Start() chan *work.Work { diff --git a/input/stdin/stdin.go b/input/stdin/stdin.go index 394b952..4421741 100644 --- a/input/stdin/stdin.go +++ b/input/stdin/stdin.go @@ -33,10 +33,15 @@ func (i *StdinInput) Start() chan *work.Work { if err == io.EOF { break } + + w := &work.Work{Data: bs} + if err != nil { fmt.Printf("Error when reading input from Stdin: %q", err) + w.SetError(err) } - i.retchan <- &work.Work{Data: bs, Err: err} + + i.retchan <- w } close(i.retchan) }() diff --git a/manager/manager.go b/manager/manager.go index 5bbd49f..2a58aab 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -21,8 +21,8 @@ func (m *Manager) Run() { ic := m.Input.Start() for w := range ic { - if w.Err != nil { - fmt.Printf("Got an error when getting Work input: %q\n", w.Err) + if w.Error() != nil { + fmt.Printf("Got an error when getting Work input: %q\n", w.Error()) continue } wg.Add(1) diff --git a/output/http/http.go b/output/http/http.go index 158f091..8d17622 100644 --- a/output/http/http.go +++ b/output/http/http.go @@ -25,7 +25,7 @@ func New(kv map[string]string) output.Output { func (h *HttpOutput) Output(w *work.Work) error { _, err := http.Post(h.url, "application/json", bytes.NewReader(w.Data)) if err != nil { - w.Err = err + w.SetError(err) } return err diff --git a/work/work.go b/work/work.go index 07461ea..8d53153 100644 --- a/work/work.go +++ b/work/work.go @@ -2,9 +2,13 @@ package work type Work struct { Data []byte - Err error + err error } func (w *Work) Error() error { - return w.Err + return w.err +} + +func (w *Work) SetError(err error) { + w.err = err } |