Commit 8b12cf44 authored by Guillaume Perréal's avatar Guillaume Perréal
Browse files

Working !

No related merge requests found
Showing with 368 additions and 127 deletions
+368 -127
......@@ -2,18 +2,9 @@ package main
import (
"github.com/mkideal/cli"
"net"
)
type (
TCPAddr struct {
*net.TCPAddr
}
UnixAddr struct {
*net.UnixAddr
}
LoggerConfig struct {
Verbose []bool `cli:"v,verbose" usage:"increase the log verbosity"`
Quiet bool `cli:"q,quiet" usage:"decrease the log verbosity"`
......@@ -21,19 +12,6 @@ type (
ServerConfig struct {
cli.Helper
TCP []TCPAddr `cli:"t,tcp" usage:"serve on TCP port"`
Unix []UnixAddr `cli:"u,unix" usage:"serve on unix socket"`
Console bool `cli:"C,console" usage:"open a connection to the console"`
LoggerConfig
}
)
func (t *TCPAddr) Decode(s string) (err error) {
t.TCPAddr, err = net.ResolveTCPAddr("tcp", s)
return
}
func (t *UnixAddr) Decode(s string) (err error) {
t.UnixAddr, err = net.ResolveUnixAddr("unix", s)
return
}
......@@ -14,55 +14,67 @@ import (
type (
Connection struct {
conn io.ReadWriteCloser
conn io.ReadWriteCloser
stream *Stream
ctl chan interface{}
ctl chan interface{}
done sync.WaitGroup
stop sync.Once
*log.Logger
done sync.WaitGroup
stop sync.Once
closed bool
}
)
func StartConnection(conn io.ReadWriteCloser, stream *Stream, logger *log.Logger) *Connection {
c := &Connection{conn: conn, stream: stream, ctl: make(chan interface{}), Logger: logger}
c := &Connection{conn: conn, stream: stream, ctl: make(chan interface{}, 5), Logger: logger}
c.done.Add(2)
go c.serve()
return c
}
func (c *Connection) serve() {
closed := false
serial := uint64(0)
subscriptions := make(map[protocol.SubscriptionID]*Subscription)
c.done.Add(2)
defer func() {
c.done.Done()
if err := c.conn.Close(); err != nil {
c.Infof("error while closing the connection: %s", err)
var errs Errors
for _, sub := range subscriptions {
errs.Append(sub.Close())
}
if errs.HasError() {
c.Warningf("error while closing subscriptions: %s", errs)
}
c.done.Done()
c.Debug("connection closed")
}()
go func() {
scanner := bufio.NewScanner(c.conn)
defer func() {
closed = true
if !c.closed {
if err := scanner.Err(); err != nil {
c.Errorf("error reading from socket: %s", err)
} else {
c.Debug("client closed the socket")
}
}
close(c.ctl)
c.done.Done()
}()
scanner := bufio.NewScanner(c.conn)
for !closed && scanner.Scan() {
c.Debug("waiting for requests")
for scanner.Scan() {
var request protocol.Request
if err := json.Unmarshal(scanner.Bytes(), &request); err != nil {
c.ctl <- err
bytes := scanner.Bytes()
if err := json.Unmarshal(bytes, &request); err != nil {
c.ctl <- fmt.Errorf("cannot parse `%s`: %s", bytes, err)
} else {
c.Debugf("received request %v", request)
c.ctl <- request
}
}
if err := scanner.Err(); err != nil {
c.Errorf("error reading request: %s", err)
}
}()
for untypedCmd := range c.ctl {
......@@ -71,7 +83,11 @@ func (c *Connection) serve() {
case protocol.Request:
if cmd.Record != nil {
c.ctl <- c.stream.Append(*cmd.Record)
if err := c.stream.Append(*cmd.Record); err == nil {
c.ctl <- protocol.Recorded()
} else {
c.ctl <- err
}
} else if cmd.Subscribe != nil {
if sub, err := c.stream.Subscribe(cmd.Subscribe.String(), func(event protocol.Event) {
......@@ -81,13 +97,14 @@ func (c *Connection) serve() {
id := protocol.SubscriptionID(strconv.FormatUint(serial, 10))
subscriptions[id] = sub
c.ctl <- protocol.Subscribed(id)
c.Debugf("new subscribed: %s", id)
} else {
c.ctl <- err
}
} else if cmd.Unsubscribe != nil {
if sub, exists := subscriptions[*cmd.Unsubscribe]; exists {
if err := sub.Close; err == nil {
if err := sub.Close(); err == nil {
c.ctl <- protocol.Unsubscribed(*cmd.Unsubscribe)
} else {
c.ctl <- err
......@@ -95,14 +112,9 @@ func (c *Connection) serve() {
} else {
c.ctl <- fmt.Errorf("unknown subscription: %s", sub)
}
} else if cmd.Quit != nil {
c.ctl <- protocol.GoodBye("received Quit request")
c.stop.Do(func() {
closed = true
close(c.ctl)
})
} else {
c.ctl <- fmt.Errorf("cannot handle request: %#v", cmd)
c.ctl <- fmt.Errorf("cannot handle request: %v", cmd)
}
case error:
......@@ -113,7 +125,7 @@ func (c *Connection) serve() {
if bytes, err := json.Marshal(cmd); err != nil {
c.Errorf("could not marshal response %#v: %s", cmd, err)
} else if _, err := c.conn.Write(append(bytes, '\n')); err != nil {
c.Errorf("could not send response: %s", cmd, err)
c.Errorf("could not send response `%s`: %s", bytes, err)
} else {
c.Debugf("sent response %v", cmd)
}
......@@ -126,3 +138,12 @@ func (c *Connection) serve() {
}
}
}
func (c *Connection) Close() (err error) {
c.stop.Do(func() {
c.closed = true
err = c.conn.Close()
})
c.done.Wait()
return
}
package main
import (
"net"
"time"
"gitlab.irstea.fr/guillaume.perreal/coord/lib/log"
"net"
"sync"
)
type (
Listener struct {
}
Deadliner interface {
SetDeadline(t time.Time) error
stream *Stream
listener net.Listener
done sync.WaitGroup
closed bool
*log.Logger
}
)
func RunListener(stream *Stream, listener net.Listener, logger *log.Logger) {
func Listen(stream *Stream, listener net.Listener, logger *log.Logger) *Listener {
l := &Listener{stream: stream, listener: listener, Logger: logger}
l.done.Add(1)
go l.serve()
return l
}
go func() {
defer func() {
}()
func (l *Listener) serve() {
var closers Closers
for {
conn, err := listener.Accept()
if err != nil {
continue
}
_ = conn
defer func() {
l.Info("stopped accepting connections")
if err := closers.Close(); err != nil {
l.Warningf("error while closing connections: %s", err)
}
l.done.Done()
}()
l.Info("accepting connections")
for {
netConn, err := l.listener.Accept()
if err != nil {
if netErr, ok := err.(net.Error); ok && !netErr.Temporary() {
if !l.closed {
l.Errorf("error while listening for new connections: %s", err)
}
return
}
l.Warningf("while listening for new connections: %s", err)
continue
}
name := netConn.RemoteAddr().Network() + "://" + netConn.RemoteAddr().String()
l.Infof("accepted connection from %s", name)
connLogger := l.Logger.WithField("client", name)
conn := StartConnection(netConn, l.stream, connLogger)
closers.Append(conn)
}
}
func (l *Listener) Close() error {
l.closed = true
if err := l.listener.Close(); err != nil {
return err
}
l.done.Wait()
return nil
}
package main
import (
"github.com/mkideal/cli"
"io"
"net"
"os"
"os/signal"
"syscall"
"github.com/mkideal/cli"
"github.com/thejerf/suture"
"gitlab.irstea.fr/guillaume.perreal/coord/lib/log"
)
......@@ -17,11 +14,6 @@ type (
io.ReadCloser
io.WriteCloser
}
SignalHandler struct {
signals chan os.Signal
target suture.Service
}
)
func main() {
......@@ -32,29 +24,34 @@ func main() {
logger.Debugf("configuration: %+v", conf)
stream := NewStream()
//
// for _, unixAddr := range conf.Unix {
// addr := unixAddr.UnixAddr
// spv.Add(CreateListener(addr.String(), stream, func() (net.Listener, error) {
// listener, err := net.ListenUnix("unix", addr)
// if err == nil {
// listener.SetUnlinkOnClose(true)
// }
// return listener, err
// }, logger))
// }
//
// for _, tcpAddr := range conf.TCP {
// addr := tcpAddr.TCPAddr
// spv.Add(CreateListener(addr.String(), stream, func() (net.Listener, error) {
// return net.ListenTCP("tcp", addr)
// }, logger))
// }
//
// spv.Add(&SignalHandler{make(chan os.Signal, 1), spv})
//
// spv.Serve()
var services Closers
sh := StartSignalHandler(&services)
services.Append(sh)
stream := StartStream()
services.Append(stream)
for _, arg := range ctx.Args() {
var addr Addr
if err := addr.Decode(arg); err == nil {
name := addr.String()
listener, err := net.Listen(addr.Network(), addr.Addr.String())
if err == nil {
if unixListener, isUnix := listener.(*net.UnixListener); isUnix {
unixListener.SetUnlinkOnClose(true)
}
l := Listen(stream, listener, logger.WithField("server", name))
services.Append(l)
} else {
logger.Errorf("could not listen on %s: %s", name, err.Error())
}
} else {
logger.Error(err.Error())
}
}
services.Wait()
return nil
}))
......@@ -79,25 +76,6 @@ func setupLogger(conf LoggerConfig) *log.Logger {
return log.DefaultLogger
}
func (w *SignalHandler) Serve() {
signal.Notify(w.signals, syscall.SIGTERM, syscall.SIGINT)
sig, ok := <-w.signals
if ok {
log.Criticalf("received signal '%s', shutting down", sig)
w.target.Stop()
}
}
func (w *SignalHandler) Stop() {
signal.Stop(w.signals)
close(w.signals)
}
func (w *SignalHandler) Complete() bool {
return true
}
func (c PseudoReadWriteCloser) Close() error {
if err := c.ReadCloser.Close(); err != nil {
return err
......
package main
import (
"gitlab.irstea.fr/guillaume.perreal/coord/lib/log"
"io"
"os"
"os/signal"
"sync"
"syscall"
)
type (
SignalHandler struct {
signals chan os.Signal
target io.Closer
stop sync.Once
done sync.WaitGroup
}
)
func StartSignalHandler(target io.Closer) *SignalHandler {
h := &SignalHandler{signals: make(chan os.Signal), target: target}
h.done.Add(1)
go h.serve()
return h
}
func (h *SignalHandler) serve() {
defer func() {
h.done.Done()
_ = h.target.Close()
}()
signal.Notify(h.signals, syscall.SIGTERM, syscall.SIGINT)
sig, ok := <- h.signals
if ok {
log.Criticalf("received signal '%s', shutting down", sig)
}
}
func (h *SignalHandler) Close() (err error) {
h.stop.Do(func() {
signal.Stop(h.signals)
close(h.signals)
})
h.done.Wait()
return
}
package main
import (
"fmt"
"io"
"net"
"strings"
"sync"
)
type (
Addr struct {
net.Addr
}
Errors []error
Closers struct {
closers []io.Closer
done sync.WaitGroup
stop sync.Once
}
)
func (e *Errors) Append(err error) {
if err != nil {
*e = append(*e, err)
}
}
func (e Errors) HasError() bool {
return e != nil
}
func (e Errors) Error() string {
if !e.HasError() {
return "no errors"
}
var buf strings.Builder
for i, err := range e {
if i > 0 {
_, _ = buf.WriteString(", ")
}
_, _ = buf.WriteString(err.Error())
}
return buf.String()
}
func (e Errors) String() string {
return e.Error()
}
func (c *Closers) Append(closer io.Closer) {
c.done.Add(1)
c.closers = append(c.closers, closer)
}
func (c *Closers) Wait() {
c.done.Wait()
}
func (c *Closers) Close() (err error) {
c.stop.Do(func() {
var errs Errors
for _, closer := range c.closers {
cl := closer
go func() {
defer c.done.Done()
errs.Append(cl.Close())
}()
}
if errs.HasError() {
err = errs
}
})
c.done.Wait()
return
}
func (a Addr) String() string {
return a.Addr.Network() + "://" + a.Addr.String()
}
func (a *Addr) Decode(s string) (err error) {
if strings.HasPrefix(s, "tcp://") {
a.Addr, err = net.ResolveTCPAddr("tcp", s[6:])
return
}
if strings.HasPrefix(s, "unix://") {
a.Addr, err = net.ResolveUnixAddr("unix", s[7:])
return
}
return fmt.Errorf("cannot parse address: %s", s)
}
package main
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
)
func TestErrors_Zero(t *testing.T) {
var err Errors
assert.Nil(t, err)
assert.False(t, err.HasError())
assert.Equal(t,"no errors", err.Error())
}
func TestErrors_AppendNil(t *testing.T) {
var err Errors
err.Append(nil)
assert.Nil(t, err)
assert.False(t, err.HasError())
assert.Equal(t,"no errors", err.Error())
}
func TestErrors_OneError(t *testing.T) {
var err Errors
err.Append(errors.New("test"))
assert.NotNil(t, err)
assert.True(t, err.HasError())
assert.Equal(t,"test", err.Error())
}
func TestErrors_TwoErrors(t *testing.T) {
var err Errors
err.Append(errors.New("a"))
err.Append(errors.New("b"))
assert.NotNil(t, err)
assert.True(t, err.HasError())
assert.Equal(t,"a, b", err.Error())
}
......@@ -2,12 +2,13 @@ package protocol
import (
"encoding/json"
"fmt"
)
type (
Event struct {
Topic TopicID `json:"topic"`
Payload *json.RawMessage `json:"payload"`
Topic TopicID `json:"topic"`
Payload json.RawMessage `json:"payload"`
}
SubscriptionID string
......@@ -16,7 +17,6 @@ type (
Record *Event `json:"record,omitempty"`
Subscribe *TopicID `json:"subscribe,omitempty"`
Unsubscribe *SubscriptionID `json:"unsubscribe,omitempty"`
Quit *string `json:"quit,omitempty"`
}
Response struct {
......@@ -24,25 +24,70 @@ type (
Subscribed *SubscriptionID `json:"subscribed,omitempty"`
Unsubscribed *SubscriptionID `json:"unsubscribed,omitempty"`
Notification *Event `json:"notification,omitempty"`
GoodBye *string `json:"goodBye,omitempty"`
Error *string `json:"error,omitempty"`
}
)
func (e Event) String() string {
return fmt.Sprintf("{topic=%q, payload=%q}", string(e.Topic), string(e.Payload))
}
func (s SubscriptionID) String() string {
return string(s)
}
func (r Request) String() string {
if bytes, err := json.Marshal(r); err == nil {
return string(bytes)
} else {
return err.Error()
}
}
func (r Request) GoString() string {
if r.Record != nil {
return fmt.Sprintf("protocol.Record(%v)", *r.Record)
} else if r.Subscribe != nil {
return fmt.Sprintf("protocol.Subscribe(%q)", *r.Subscribe)
} else if r.Unsubscribe != nil {
return fmt.Sprintf("protocol.Unsubscribe(%q)", *r.Unsubscribe)
} else {
return `errors.New("invalid request")`
}
}
func Record(ev Event) Request { return Request{Record: &ev} }
func Subscribe(topic TopicID) Request { return Request{Subscribe: &topic} }
func Unsubscribe(id SubscriptionID) Request { return Request{Unsubscribe: &id} }
func Quit(reason string) Request { return Request{Quit: &reason} }
func (r Response) String() string {
if bytes, err := json.Marshal(r); err == nil {
return string(bytes)
} else {
return err.Error()
}
}
func (r Response) GoString() string {
if r.Recorded {
return "protocol.Recorded()"
} else if r.Subscribed != nil {
return fmt.Sprintf("protocol.Subscribed(%q)", *r.Subscribed)
} else if r.Unsubscribed != nil {
return fmt.Sprintf("protocol.Unsubscribed(%q)", *r.Unsubscribed)
} else if r.Notification != nil {
return fmt.Sprintf("protocol.Notification(%v)", *r.Notification)
} else if r.Error != nil {
return fmt.Sprintf("protocol.Error(%v)", *r.Error)
} else {
return `errors.New("invalid response")`
}
}
func Recorded() Response { return Response{Recorded: true} }
func Subscribed(id SubscriptionID) Response { return Response{Subscribed: &id} }
func Unsubscribed(id SubscriptionID) Response { return Response{Unsubscribed: &id} }
func Notification(ev Event) Response { return Response{Notification: &ev} }
func GoodBye(reason string) Response { return Response{GoodBye: &reason} }
func Error(err error) Response {
message := err.Error()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment