1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
package main
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"os"
)
type Manager struct {
Input Input
Filter Filter
Output Output
}
type Input interface {
Start() chan *Work
}
type StdinInput struct {
retchan chan *Work
}
type Filter interface {
Filter(chan *Work) chan *Work
}
type StringFilter struct {
retchan chan *Work
}
type Output interface {
Output(chan *Work)
}
type StdoutOutput struct {
}
type Work struct {
data []byte
err error
}
func (i *StdinInput) Start() chan *Work {
i.retchan = make(chan *Work, 100)
r := bufio.NewReader(os.Stdin)
go func() {
for {
bs, err := r.ReadBytes(byte('\n'))
if err == io.EOF {
break
}
if err != nil {
fmt.Printf("Error when reading input from Stdin: %q", err)
os.Exit(1)
}
i.retchan <- &Work{data: bs}
}
close(i.retchan)
}()
return i.retchan
}
func (f *StringFilter) Filter(ic chan *Work) chan *Work {
f.retchan = make(chan *Work, 100)
go func() {
for w := range ic {
dec := json.NewDecoder(bytes.NewReader(w.data))
jm := make(map[string]string, 10)
err := dec.Decode(&jm)
if err == io.EOF {
fmt.Printf("EOF jm: %v\n", jm)
f.retchan <- w
continue
}
if err != nil {
fmt.Printf("Error when decoding JSON: %q\n", err)
w.err = err
}
fmt.Printf("jm: %v\n", jm)
f.retchan <- w
}
close(f.retchan)
}()
return f.retchan
}
func (o *StdoutOutput) Output(wc chan *Work) {
for w := range wc {
fmt.Printf("%s\n", w.data)
}
}
func (m *Manager) Go() {
ic := m.Input.Start()
fc := m.Filter.Filter(ic)
m.Output.Output(fc)
}
func main() {
m := Manager{
Input: &StdinInput{},
Filter: &StringFilter{},
Output: &StdoutOutput{},
}
m.Go()
}
|