summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--filter/string.go56
-rw-r--r--input/http.go45
-rw-r--r--input/stdin.go36
-rw-r--r--main.go22
-rw-r--r--manager.go191
-rw-r--r--output/stdout.go14
-rw-r--r--types.go48
-rw-r--r--work/work.go10
8 files changed, 231 insertions, 191 deletions
diff --git a/filter/string.go b/filter/string.go
new file mode 100644
index 0000000..cb44d75
--- /dev/null
+++ b/filter/string.go
@@ -0,0 +1,56 @@
+package filter
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io"
+
+ "github.com/Shugyousha/stasher/work"
+)
+
+type StringFilter struct {
+ FilterFuncMap map[string]func(string) string
+}
+
+func NewStringFilter(ffmap map[string]func(string) string) *StringFilter {
+ return &StringFilter{FilterFuncMap: ffmap}
+}
+
+func (f *StringFilter) Filter(w *work.Work) *work.Work {
+ dec := json.NewDecoder(bytes.NewReader(w.Data))
+ jm := make(map[string]string, 10)
+
+ err := dec.Decode(&jm)
+ if err == io.EOF {
+ fmt.Printf("EOF jm: %v\n", jm)
+ return w
+ }
+ if err != nil {
+ fmt.Printf("Error when decoding JSON: %q\n", err)
+ w.Err = err
+ return w
+ }
+
+ changed := false
+ for field, ff := range f.FilterFuncMap {
+ str, ok := jm[field]
+ if !ok {
+ continue
+ }
+
+ jm[field] = ff(str)
+ changed = true
+ }
+ if changed {
+ bs, err := json.Marshal(jm)
+ if err != nil {
+ fmt.Printf("Error when marshalling JSON: %q\n", err)
+ w.Err = err
+ } else {
+ w.Data = bs
+ }
+ }
+
+ return w
+}
diff --git a/input/http.go b/input/http.go
new file mode 100644
index 0000000..285f1f6
--- /dev/null
+++ b/input/http.go
@@ -0,0 +1,45 @@
+package input
+
+import (
+ "fmt"
+ "io/ioutil"
+ "net/http"
+
+ "github.com/Shugyousha/stasher/work"
+)
+
+type HTTPInput struct {
+ retchan chan *work.Work
+ prefix string
+ port string
+}
+
+func NewHTTPInput(prefix, port string) *HTTPInput {
+ return &HTTPInput{prefix: prefix, port: port}
+}
+
+func (hi *HTTPInput) httphandler(w http.ResponseWriter, r *http.Request) {
+ if r.Method != "POST" {
+ fmt.Printf("Expected POST method was: %q\n", r.Method)
+ return
+ }
+ all, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ fmt.Printf("Error when reading HTTP request body: %q\n", err)
+ return
+ }
+ hi.retchan <- &work.Work{Data: all}
+}
+
+func (hi *HTTPInput) Start() chan *work.Work {
+ hi.retchan = make(chan *work.Work, 100)
+
+ go func() {
+ http.HandleFunc("/"+hi.prefix, hi.httphandler)
+ err := http.ListenAndServe(hi.port, nil)
+ fmt.Printf("Error when serving HTTP: %q\n", err)
+ close(hi.retchan)
+ }()
+
+ return hi.retchan
+}
diff --git a/input/stdin.go b/input/stdin.go
new file mode 100644
index 0000000..5acfa5e
--- /dev/null
+++ b/input/stdin.go
@@ -0,0 +1,36 @@
+package input
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "os"
+
+ "github.com/Shugyousha/stasher/work"
+)
+
+type StdinInput struct {
+ retchan chan *work.Work
+}
+
+func (i *StdinInput) Start() chan *work.Work {
+ i.retchan = make(chan *work.Work, 100)
+ r := bufio.NewReader(os.Stdin)
+
+ go func() {
+ for {
+ bs, err := r.ReadBytes(byte('\n'))
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ fmt.Printf("Error when reading input from Stdin: %q", err)
+ os.Exit(1)
+ }
+ i.retchan <- &work.Work{Data: bs}
+ }
+ close(i.retchan)
+ }()
+
+ return i.retchan
+}
diff --git a/main.go b/main.go
new file mode 100644
index 0000000..7d60fb5
--- /dev/null
+++ b/main.go
@@ -0,0 +1,22 @@
+package main
+
+import (
+ "strings"
+
+ "github.com/Shugyousha/stasher/filter"
+ "github.com/Shugyousha/stasher/input"
+ "github.com/Shugyousha/stasher/output"
+)
+
+func main() {
+ ffmap := make(map[string]func(string) string, 10)
+ ffmap["F"] = func(s string) string { return strings.ToUpper(s) }
+
+ m := Manager{
+ Input: input.NewHTTPInput("", ":8080"),
+ Filter: filter.NewStringFilter(ffmap),
+ Output: &output.StdoutOutput{},
+ }
+
+ m.Run()
+}
diff --git a/manager.go b/manager.go
deleted file mode 100644
index 2029552..0000000
--- a/manager.go
+++ /dev/null
@@ -1,191 +0,0 @@
-package main
-
-import (
- "bufio"
- "bytes"
- "encoding/json"
- "fmt"
- "io"
- "io/ioutil"
- "net/http"
- "os"
- "strings"
- "sync"
-)
-
-type Manager struct {
- Input Input
- Filter Filter
- Output Output
-}
-
-type Input interface {
- Start() chan *Work
-}
-
-type StdinInput struct {
- retchan chan *Work
-}
-
-type HTTPInput struct {
- retchan chan *Work
- prefix string
- port string
-}
-
-type Filter interface {
- Filter(*Work) *Work
-}
-
-type StringFilter struct {
- FilterFuncMap map[string]func(string) string
-}
-
-type Output interface {
- Output(*Work)
-}
-
-type StdoutOutput struct {
-}
-
-type Work struct {
- data []byte
- err error
-}
-
-func (w *Work) Error() error {
- return w.err
-}
-
-func (i *StdinInput) Start() chan *Work {
- i.retchan = make(chan *Work, 100)
- r := bufio.NewReader(os.Stdin)
-
- go func() {
- for {
- bs, err := r.ReadBytes(byte('\n'))
- if err == io.EOF {
- break
- }
- if err != nil {
- fmt.Printf("Error when reading input from Stdin: %q", err)
- os.Exit(1)
- }
- i.retchan <- &Work{data: bs}
- }
- close(i.retchan)
- }()
-
- return i.retchan
-}
-
-func (hi *HTTPInput) httphandler(w http.ResponseWriter, r *http.Request) {
- if r.Method != "POST" {
- fmt.Printf("Expected POST method was: %q\n", r.Method)
- return
- }
- all, err := ioutil.ReadAll(r.Body)
- if err != nil {
- fmt.Printf("Error when reading HTTP request body: %q\n", err)
- return
- }
- hi.retchan <- &Work{data: all}
-}
-
-func (hi *HTTPInput) Start() chan *Work {
- hi.retchan = make(chan *Work, 100)
-
- go func() {
- http.HandleFunc("/"+hi.prefix, hi.httphandler)
- err := http.ListenAndServe(hi.port, nil)
- fmt.Printf("Error when serving HTTP: %q\n", err)
- close(hi.retchan)
- }()
-
- return hi.retchan
-}
-
-func NewStringFilter(ffmap map[string]func(string) string) *StringFilter {
- return &StringFilter{FilterFuncMap: ffmap}
-}
-
-func NewHTTPInput(prefix, port string) *HTTPInput {
- return &HTTPInput{prefix: prefix, port: port}
-}
-
-func (f *StringFilter) Filter(w *Work) *Work {
- dec := json.NewDecoder(bytes.NewReader(w.data))
- jm := make(map[string]string, 10)
-
- err := dec.Decode(&jm)
- if err == io.EOF {
- fmt.Printf("EOF jm: %v\n", jm)
- return w
- }
- if err != nil {
- fmt.Printf("Error when decoding JSON: %q\n", err)
- w.err = err
- return w
- }
-
- changed := false
- for field, ff := range f.FilterFuncMap {
- str, ok := jm[field]
- if !ok {
- continue
- }
-
- jm[field] = ff(str)
- changed = true
- }
- if changed {
- bs, err := json.Marshal(jm)
- if err != nil {
- fmt.Printf("Error when marshalling JSON: %q\n", err)
- w.err = err
- } else {
- w.data = bs
- }
- }
-
- return w
-}
-
-func (o *StdoutOutput) Output(w *Work) {
- fmt.Printf("%s\n", w.data)
-}
-
-func (m *Manager) Run() {
- var wg sync.WaitGroup
-
- ic := m.Input.Start()
- for w := range ic {
- wg.Add(1)
-
- go func(w *Work) {
- nw := m.Filter.Filter(w)
- err := nw.Error()
- if err != nil {
- fmt.Printf("Got an error when processing Work: %q\n", err)
- }
- m.Output.Output(nw)
-
- wg.Done()
- }(w)
- }
-
- wg.Wait()
-}
-
-func main() {
- ffmap := make(map[string]func(string) string, 10)
- ffmap["F"] = func(s string) string { return strings.ToUpper(s) }
-
- m := Manager{
- Input: NewHTTPInput("", ":8080"),
- Filter: NewStringFilter(ffmap),
- Output: &StdoutOutput{},
- }
-
- m.Run()
-}
diff --git a/output/stdout.go b/output/stdout.go
new file mode 100644
index 0000000..5cb753e
--- /dev/null
+++ b/output/stdout.go
@@ -0,0 +1,14 @@
+package output
+
+import (
+ "fmt"
+
+ "github.com/Shugyousha/stasher/work"
+)
+
+type StdoutOutput struct {
+}
+
+func (o *StdoutOutput) Output(w *work.Work) {
+ fmt.Printf("%s\n", w.Data)
+}
diff --git a/types.go b/types.go
new file mode 100644
index 0000000..abd2658
--- /dev/null
+++ b/types.go
@@ -0,0 +1,48 @@
+package main
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/Shugyousha/stasher/work"
+)
+
+type Input interface {
+ Start() chan *work.Work
+}
+
+type Filter interface {
+ Filter(*work.Work) *work.Work
+}
+
+type Output interface {
+ Output(*work.Work)
+}
+
+type Manager struct {
+ Input Input
+ Filter Filter
+ Output Output
+}
+
+func (m *Manager) Run() {
+ var wg sync.WaitGroup
+
+ ic := m.Input.Start()
+ for w := range ic {
+ wg.Add(1)
+
+ go func(w *work.Work) {
+ nw := m.Filter.Filter(w)
+ err := nw.Error()
+ if err != nil {
+ fmt.Printf("Got an error when processing Work: %q\n", err)
+ }
+ m.Output.Output(nw)
+
+ wg.Done()
+ }(w)
+ }
+
+ wg.Wait()
+}
diff --git a/work/work.go b/work/work.go
new file mode 100644
index 0000000..07461ea
--- /dev/null
+++ b/work/work.go
@@ -0,0 +1,10 @@
+package work
+
+type Work struct {
+ Data []byte
+ Err error
+}
+
+func (w *Work) Error() error {
+ return w.Err
+}