summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--filter/http/http.go4
-rw-r--r--filter/str/string.go4
-rw-r--r--input/http/http.go5
-rw-r--r--input/stdin/stdin.go7
-rw-r--r--manager/manager.go4
-rw-r--r--output/http/http.go2
-rw-r--r--work/work.go8
7 files changed, 23 insertions, 11 deletions
diff --git a/filter/http/http.go b/filter/http/http.go
index e3d447b..3143718 100644
--- a/filter/http/http.go
+++ b/filter/http/http.go
@@ -26,14 +26,14 @@ func New(kv map[string]string) filter.Filter {
func (hf *HTTPFilter) Filter(w *work.Work) *work.Work {
resp, err := http.Post(hf.url, "application/json", bytes.NewReader(w.Data))
if err != nil {
- w.Err = err
+ w.SetError(err)
return w
}
defer resp.Body.Close()
filtered, err := ioutil.ReadAll(resp.Body)
if err != nil {
- w.Err = err
+ w.SetError(err)
return w
}
diff --git a/filter/str/string.go b/filter/str/string.go
index 37f49f6..f6087be 100644
--- a/filter/str/string.go
+++ b/filter/str/string.go
@@ -46,7 +46,7 @@ func (f *StringFilter) Filter(w *work.Work) *work.Work {
}
if err != nil {
fmt.Fprintf(os.Stderr, "Error when decoding JSON: %q\n", err)
- w.Err = err
+ w.SetError(err)
return w
}
@@ -64,7 +64,7 @@ func (f *StringFilter) Filter(w *work.Work) *work.Work {
bs, err := json.Marshal(jm)
if err != nil {
fmt.Fprintf(os.Stderr, "Error when marshaling JSON: %q\n", err)
- w.Err = err
+ w.SetError(err)
} else {
w.Data = bs
}
diff --git a/input/http/http.go b/input/http/http.go
index 87ec0f9..61e2c39 100644
--- a/input/http/http.go
+++ b/input/http/http.go
@@ -50,10 +50,13 @@ func (hi *HTTPInput) httphandler(w http.ResponseWriter, r *http.Request) {
return
}
all, err := ioutil.ReadAll(r.Body)
+ wrk := &work.Work{Data: all}
if err != nil {
fmt.Printf("Error when reading HTTP request body: %q\n", err)
+ wrk.SetError(err)
}
- hi.retchan <- &work.Work{Data: all, Err: err}
+
+ hi.retchan <- wrk
}
func (hi *HTTPInput) Start() chan *work.Work {
diff --git a/input/stdin/stdin.go b/input/stdin/stdin.go
index 394b952..4421741 100644
--- a/input/stdin/stdin.go
+++ b/input/stdin/stdin.go
@@ -33,10 +33,15 @@ func (i *StdinInput) Start() chan *work.Work {
if err == io.EOF {
break
}
+
+ w := &work.Work{Data: bs}
+
if err != nil {
fmt.Printf("Error when reading input from Stdin: %q", err)
+ w.SetError(err)
}
- i.retchan <- &work.Work{Data: bs, Err: err}
+
+ i.retchan <- w
}
close(i.retchan)
}()
diff --git a/manager/manager.go b/manager/manager.go
index 5bbd49f..2a58aab 100644
--- a/manager/manager.go
+++ b/manager/manager.go
@@ -21,8 +21,8 @@ func (m *Manager) Run() {
ic := m.Input.Start()
for w := range ic {
- if w.Err != nil {
- fmt.Printf("Got an error when getting Work input: %q\n", w.Err)
+ if w.Error() != nil {
+ fmt.Printf("Got an error when getting Work input: %q\n", w.Error())
continue
}
wg.Add(1)
diff --git a/output/http/http.go b/output/http/http.go
index 158f091..8d17622 100644
--- a/output/http/http.go
+++ b/output/http/http.go
@@ -25,7 +25,7 @@ func New(kv map[string]string) output.Output {
func (h *HttpOutput) Output(w *work.Work) error {
_, err := http.Post(h.url, "application/json", bytes.NewReader(w.Data))
if err != nil {
- w.Err = err
+ w.SetError(err)
}
return err
diff --git a/work/work.go b/work/work.go
index 07461ea..8d53153 100644
--- a/work/work.go
+++ b/work/work.go
@@ -2,9 +2,13 @@ package work
type Work struct {
Data []byte
- Err error
+ err error
}
func (w *Work) Error() error {
- return w.Err
+ return w.err
+}
+
+func (w *Work) SetError(err error) {
+ w.err = err
}