diff --git a/cmd/coord-server/connection.go b/cmd/coord-server/connection.go index 6712477e5f618d48165703b91998515098a9a26a..54d1954b92aaf6f42631df4cd312cbc6e20c32a2 100644 --- a/cmd/coord-server/connection.go +++ b/cmd/coord-server/connection.go @@ -3,7 +3,6 @@ package main import ( "bufio" "encoding/json" - "fmt" "io" "strconv" "sync" @@ -69,7 +68,7 @@ func (c *Connection) serve() { var request protocol.Request bytes := scanner.Bytes() if err := json.Unmarshal(bytes, &request); err != nil { - c.ctl <- fmt.Errorf("cannot parse `%s`: %s", bytes, err) + c.ctl <- protocol.Errorf(0,"cannot parse `%s`: %s", bytes, err) } else { c.Debugf("received request %v", request) c.ctl <- request @@ -84,9 +83,9 @@ func (c *Connection) serve() { if cmd.Record != nil { if err := c.stream.Append(*cmd.Record); err == nil { - c.ctl <- protocol.Recorded() + c.ctl <- protocol.Recorded(cmd.ID) } else { - c.ctl <- err + c.ctl <- protocol.Error(cmd.ID, err) } } else if cmd.Subscribe != nil { @@ -96,43 +95,40 @@ func (c *Connection) serve() { serial += 1 id := protocol.SubscriptionID(strconv.FormatUint(serial, 10)) subscriptions[id] = sub - c.ctl <- protocol.Subscribed(id) + c.ctl <- protocol.Subscribed(cmd.ID, id) c.Debugf("new subscribed: %s", id) } else { - c.ctl <- err + c.ctl <- protocol.Error(cmd.ID, err) } } else if cmd.Unsubscribe != nil { if sub, exists := subscriptions[*cmd.Unsubscribe]; exists { if err := sub.Close(); err == nil { - c.ctl <- protocol.Unsubscribed(*cmd.Unsubscribe) + c.ctl <- protocol.Unsubscribed(cmd.ID, *cmd.Unsubscribe) } else { - c.ctl <- err + c.ctl <- protocol.Error(cmd.ID, err) } } else { - c.ctl <- fmt.Errorf("unknown subscription: %s", sub) + c.ctl <- protocol.Errorf(cmd.ID, "unknown subscription: %s", sub) } } else { - c.ctl <- fmt.Errorf("cannot handle request: %v", cmd) + c.ctl <- protocol.Errorf(cmd.ID, "cannot handle request: %v", cmd) } - case error: - c.Warning(cmd.Error()) - c.ctl <- protocol.Error(cmd) - case protocol.Response: if bytes, err := json.Marshal(cmd); err != nil { c.Errorf("could not marshal response %#v: %s", cmd, err) } else if _, err := c.conn.Write(append(bytes, '\n')); err != nil { c.Errorf("could not send response `%s`: %s", bytes, err) } else { - c.Debugf("sent response %v", cmd) + if cmd.Error != nil { + c.Noticef("sent error response %v", cmd) + } else { + c.Debugf("sent response %v", cmd) + } } - case nil: - // Pass - default: c.Errorf("unknown connection command: %#v", cmd) } diff --git a/cmd/coord-server/main.go b/cmd/coord-server/main.go index d4b9d24e6c0e0bfade7cb6c9a55190f3bccecc94..9c025e46fa87bb708739e660611230e45ae7f7fb 100644 --- a/cmd/coord-server/main.go +++ b/cmd/coord-server/main.go @@ -7,6 +7,7 @@ import ( "os" "gitlab.irstea.fr/guillaume.perreal/coord/lib/log" + "gitlab.irstea.fr/guillaume.perreal/coord/lib/protocol" ) type ( @@ -33,7 +34,7 @@ func main() { services.Append(stream) for _, arg := range ctx.Args() { - var addr Addr + var addr protocol.Addr if err := addr.Decode(arg); err == nil { name := addr.String() listener, err := net.Listen(addr.Network(), addr.Addr.String()) diff --git a/cmd/coord-server/utils.go b/cmd/coord-server/utils.go index e9e0d3552ac0145fcca673df8b899d7c72f320a7..aee441a3b733a7973c8b6a33926024aa8915b9fa 100644 --- a/cmd/coord-server/utils.go +++ b/cmd/coord-server/utils.go @@ -9,10 +9,6 @@ import ( ) type ( - Addr struct { - net.Addr - } - Errors []error Closers struct { @@ -76,19 +72,3 @@ func (c *Closers) Close() (err error) { c.done.Wait() return } - -func (a Addr) String() string { - return a.Addr.Network() + "://" + a.Addr.String() -} - -func (a *Addr) Decode(s string) (err error) { - if strings.HasPrefix(s, "tcp://") { - a.Addr, err = net.ResolveTCPAddr("tcp", s[6:]) - return - } - if strings.HasPrefix(s, "unix://") { - a.Addr, err = net.ResolveUnixAddr("unix", s[7:]) - return - } - return fmt.Errorf("cannot parse address: %s", s) -} diff --git a/lib/protocol/addr.go b/lib/protocol/addr.go new file mode 100644 index 0000000000000000000000000000000000000000..bdd0cd62a78dd57792a1825f2f7166bc0ea38e12 --- /dev/null +++ b/lib/protocol/addr.go @@ -0,0 +1,29 @@ +package protocol + +import ( + "fmt" + "net" + "strings" +) + +type ( + Addr struct { + net.Addr + } +) + +func (a Addr) String() string { + return a.Addr.Network() + "://" + a.Addr.String() +} + +func (a *Addr) Decode(s string) (err error) { + if strings.HasPrefix(s, "tcp://") { + a.Addr, err = net.ResolveTCPAddr("tcp", s[6:]) + return + } + if strings.HasPrefix(s, "unix://") { + a.Addr, err = net.ResolveUnixAddr("unix", s[7:]) + return + } + return fmt.Errorf("cannot parse address: %s", s) +} diff --git a/lib/protocol/client/client.go b/lib/protocol/client/client.go new file mode 100644 index 0000000000000000000000000000000000000000..d3782a1aefb5ca69c6bff899ff29d2d77a5b3cc5 --- /dev/null +++ b/lib/protocol/client/client.go @@ -0,0 +1,110 @@ +package client + +import ( + "bufio" + "encoding/json" + "io" + "net" + "sync" + + "gitlab.irstea.fr/guillaume.perreal/coord/lib/protocol" +) + +type ( + Client struct { + conn io.ReadWriteCloser + ctl chan interface{} + stop sync.Once + done sync.WaitGroup + } + + requestCmd struct { + protocol.Request + resultCh chan<- requestResult + } + + requestResult struct { + protocol.Response + error + } +) + +func Dial(addr net.Addr) (*Client, error) { + conn, err := net.Dial(addr.Network(), addr.String()) + if err != nil { + return nil, err + } + return NewClient(conn), nil +} + +func NewClient(conn io.ReadWriteCloser) *Client { + c := &Client{conn: conn, ctl: make(chan interface{}, 5)} + c.done.Add(2) + go c.receive() + go c.loop() + return c +} + +func (c *Client) receive() { + defer func() { + c.done.Done() + }() + + scanner := bufio.NewScanner(c.conn) + + for scanner.Scan() { + var response protocol.Response + if err := json.Unmarshal(scanner.Bytes(), &response); err == nil { + c.ctl <- response + } else { + + } + } +} + +func (c *Client) loop() { + defer func() { + c.done.Done() + }() + + pending := make(map[protocol.RequestID]chan<- requestResult) + generator := protocol.RequestIDGenerator(0) + + for untypedCmd := range c.ctl { + switch cmd := untypedCmd.(type) { + case requestCmd: + cmd.ID = generator.Next() + bytes, err := json.Marshal(cmd.Request) + if err == nil { + _, err = c.conn.Write(append(bytes, '\n')) + } + if err == nil { + pending[cmd.ID] = cmd.resultCh + } else { + cmd.resultCh <- requestResult{error: err} + close(cmd.resultCh) + } + + case protocol.Response: + if cmd.Notification != nil { + } else if ch, found := pending[cmd.ID]; found { + ch <- requestResult{Response: cmd} + close(ch) + delete(pending, cmd.ID) + } + } + } +} + +func (c *Client) Close() error { + return nil +} + +func (c *Client) Request(request protocol.Request) (protocol.Response, error) { + ch := make(chan requestResult, 1) + c.ctl <- requestCmd{request, ch} + result := <-ch + return result.Response, result.error +} + + diff --git a/lib/protocol/types.go b/lib/protocol/types.go index b1f4f9d64af686827e8d814d9f4f6615abd8804b..3283eab53f1ebdd933371e198552a05671132516 100644 --- a/lib/protocol/types.go +++ b/lib/protocol/types.go @@ -13,13 +13,19 @@ type ( SubscriptionID string + RequestID uint64 + + RequestIDGenerator uint64 + Request struct { + ID RequestID `json:"id,omitempty"` Record *Event `json:"record,omitempty"` Subscribe *TopicID `json:"subscribe,omitempty"` Unsubscribe *SubscriptionID `json:"unsubscribe,omitempty"` } Response struct { + ID RequestID `json:"id,omitempty"` Recorded bool `json:"recorded,omitempty"` Subscribed *SubscriptionID `json:"subscribed,omitempty"` Unsubscribed *SubscriptionID `json:"unsubscribed,omitempty"` @@ -32,6 +38,13 @@ func (e Event) String() string { return fmt.Sprintf("{topic=%q, payload=%q}", string(e.Topic), string(e.Payload)) } +var DefaultRequestIDGenerator = RequestIDGenerator(0) + +func (g *RequestIDGenerator) Next() RequestID { + *g += 1 + return RequestID(*g) +} + func (s SubscriptionID) String() string { return string(s) } @@ -56,9 +69,13 @@ func (r Request) GoString() string { } } -func Record(ev Event) Request { return Request{Record: &ev} } -func Subscribe(topic TopicID) Request { return Request{Subscribe: &topic} } -func Unsubscribe(id SubscriptionID) Request { return Request{Unsubscribe: &id} } +func Record(ev Event) Request { return Request{ID: DefaultRequestIDGenerator.Next(), Record: &ev} } +func Subscribe(topic TopicID) Request { + return Request{ID: DefaultRequestIDGenerator.Next(), Subscribe: &topic} +} +func Unsubscribe(id SubscriptionID) Request { + return Request{ID: DefaultRequestIDGenerator.Next(), Unsubscribe: &id} +} func (r Response) String() string { if bytes, err := json.Marshal(r); err == nil { @@ -84,12 +101,28 @@ func (r Response) GoString() string { } } -func Recorded() Response { return Response{Recorded: true} } -func Subscribed(id SubscriptionID) Response { return Response{Subscribed: &id} } -func Unsubscribed(id SubscriptionID) Response { return Response{Unsubscribed: &id} } -func Notification(ev Event) Response { return Response{Notification: &ev} } +func Recorded(id RequestID) Response { + return Response{ID: id, Recorded: true} +} + +func Subscribed(id RequestID, sub SubscriptionID) Response { + return Response{ID: id, Subscribed: &sub} +} + +func Unsubscribed(id RequestID, sub SubscriptionID) Response { + return Response{ID: id, Unsubscribed: &sub} +} + +func Notification(ev Event) Response { + return Response{Notification: &ev} +} -func Error(err error) Response { +func Error(id RequestID, err error) Response { message := err.Error() - return Response{Error: &message} + return Response{ID: id, Error: &message} +} + +func Errorf(id RequestID, template string, values ...interface{}) Response { + message := fmt.Sprintf(template, values...) + return Response{ID: id, Error: &message} }