Working communication code using socket

Sockets are sooo much better, waow
I like easy code

Closes #11 (even if I didn't implement all the proposal)
Closes #16
This commit is contained in:
ppom 2023-05-04 01:01:22 +02:00
parent 4f3d3952f0
commit 1d95c7bef6
3 changed files with 56 additions and 118 deletions

View File

@ -2,13 +2,10 @@ package app
import ( import (
"encoding/gob" "encoding/gob"
"errors"
"fmt" "fmt"
"log" "log"
"math/rand" "net"
"os" "os"
"path"
"time"
) )
const ( const (
@ -18,7 +15,6 @@ const (
type Request struct { type Request struct {
Request int Request int
Id int
Pattern string Pattern string
} }
@ -27,65 +23,27 @@ type Response struct {
Actions ReadableMap Actions ReadableMap
} }
// Runtime files: func SocketPath() string {
// /run/user/<uid>/reaction/reaction.pipe return fmt.Sprintf("/run/user/%v/reaction.sock", os.Getuid())
// /run/user/<uid>/reaction/id.response
func RuntimeDirectory() string {
return fmt.Sprintf("/run/user/%v/reaction/", os.Getuid())
}
func PipePath() string {
return path.Join(RuntimeDirectory(), "reaction.pipe")
}
func (r Request) ResponsePath() string {
return path.Join(RuntimeDirectory(), string(r.Id))
}
func Send(data Request) {
pipePath := PipePath()
pipe, err := os.OpenFile(pipePath, os.O_RDWR, os.ModeNamedPipe)
if err != nil {
log.Println("Failed to open", pipePath, ":", err)
log.Fatalln("Is the reaction daemon running? Does the CLI run as the same user?")
}
log.Println("DEBUG opening ok, encoding...")
enc := gob.NewEncoder(pipe)
err = enc.Encode(data)
if err != nil {
log.Fatalf("Failed to write to %s: %s", pipePath, err)
}
} }
func SendAndRetrieve(data Request) Response { func SendAndRetrieve(data Request) Response {
if data.Id == 0 { conn, err := net.Dial("unix", SocketPath())
data.Id = rand.Int()
}
log.Println("DEBUG sending:", data)
Send(data)
responsePath := data.ResponsePath()
d, _ := time.ParseDuration("100ms")
for tries := 20; tries > 0; tries-- {
log.Println("DEBUG waiting for answer...")
file, err := os.Open(responsePath)
if errors.Is(err, os.ErrNotExist) {
time.Sleep(d)
continue
}
defer os.Remove(responsePath)
if err != nil { if err != nil {
log.Fatalf("Error opening daemon answer: %s", err) log.Fatalln("Error opening connection top daemon:", err)
} }
err = gob.NewEncoder(conn).Encode(data)
if err != nil {
log.Fatalln("Can't send message:", err)
}
var response Response var response Response
err = gob.NewDecoder(file).Decode(&response) err = gob.NewDecoder(conn).Decode(&response)
if err != nil { if err != nil {
log.Fatalf("Error parsing daemon answer: %s", err) log.Fatalln("Invalid answer from daemon:", err)
} }
return response return response
}
log.Fatalln("Timeout while waiting answer from the daemon")
return Response{errors.New("unreachable code"), nil}
} }
func usage(err string) { func usage(err string) {
@ -96,7 +54,7 @@ func usage(err string) {
func CLI() { func CLI() {
if len(os.Args) <= 1 { if len(os.Args) <= 1 {
response := SendAndRetrieve(Request{Query, 0, ""}) response := SendAndRetrieve(Request{Query, ""})
if response.Err != nil { if response.Err != nil {
log.Fatalln("Received error from daemon:", response.Err) log.Fatalln("Received error from daemon:", response.Err)
} }
@ -108,7 +66,7 @@ func CLI() {
if len(os.Args) != 3 { if len(os.Args) != 3 {
usage("flush takes one <PATTERN> argument") usage("flush takes one <PATTERN> argument")
} }
response := SendAndRetrieve(Request{Flush, 0, os.Args[2]}) response := SendAndRetrieve(Request{Flush, os.Args[2]})
if response.Err != nil { if response.Err != nil {
log.Fatalln("Received error from daemon:", response.Err) log.Fatalln("Received error from daemon:", response.Err)
} }

View File

@ -2,12 +2,10 @@ package app
import ( import (
"encoding/gob" "encoding/gob"
"errors"
"log" "log"
"net"
"os" "os"
"sync" "sync"
"syscall"
"time"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
) )
@ -109,78 +107,60 @@ func (r ReadableMap) ToString() string {
return string(text) return string(text)
} }
// Pipe-related, server-related functions // Socket-related, server-related functions
func createOpenPipe() *os.File { func createOpenSocket() net.Listener {
err := os.Mkdir(RuntimeDirectory(), 0755) socketPath := SocketPath()
if err != nil && !errors.Is(err, os.ErrExist) { _, err := os.Stat(socketPath)
log.Fatalln("FATAL Failed to create runtime directory", err)
}
pipePath := PipePath()
_, err = os.Stat(pipePath)
if err == nil { if err == nil {
log.Println("WARN Runtime file", pipePath, "already exists: Is the daemon already running? Deleting.") log.Println("WARN socket", socketPath, "already exists: Is the daemon already running? Deleting.")
err = os.Remove(pipePath) err = os.Remove(socketPath)
if err != nil { if err != nil {
log.Println("FATAL Failed to remove runtime file:", err) log.Println("FATAL Failed to remove socket:", err)
} }
} }
err = syscall.Mkfifo(pipePath, 0600) ln, err := net.Listen("unix", socketPath)
if err != nil { if err != nil {
log.Println("FATAL Failed to create runtime file:", err) log.Println("FATAL Failed to create socket:", err)
}
file, err := os.OpenFile(pipePath, os.O_RDONLY, os.ModeNamedPipe)
if err != nil {
log.Println("FATAL Failed to open runtime file:", err)
}
return file
}
func Respond(request Request, response Response) {
file, err := os.Create(request.ResponsePath())
if err != nil {
log.Println("WARN Can't respond to message:", err)
return
}
err = gob.NewEncoder(file).Encode(response)
if err != nil {
log.Println("WARN Can't respond to message:", err)
return
} }
return ln
} }
// Handle connections // Handle connections
func Serve() { func Serve() {
pipe := createOpenPipe() ln := createOpenSocket()
defer ln.Close()
for { for {
conn, err := ln.Accept()
if err != nil {
log.Println("ERROR Failed to open connection from cli:", err)
continue
}
go func(conn net.Conn) {
var request Request var request Request
err := gob.NewDecoder(pipe).Decode(&request)
if err != nil {
d, _ := time.ParseDuration("1s")
if err.Error() == "EOF" {
log.Println("DEBUG received EOF, seeking one byte")
_, err = pipe.Seek(1, 1)
if err != nil {
log.Println("DEBUG failed to seek:", err)
}
time.Sleep(d)
continue
}
log.Println("WARN Invalid Message received:", err)
time.Sleep(d)
continue
}
go func(request Request) {
var response Response var response Response
err := gob.NewDecoder(conn).Decode(&request)
if err != nil {
log.Println("ERROR Invalid Message from cli:", err)
return
}
switch request.Request { switch request.Request {
case Query: case Query:
response.Actions = actionStore.store.ToReadable() response.Actions = actionStore.store.ToReadable()
case Flush: case Flush:
actionStore.Flush(request.Pattern) actionStore.Flush(request.Pattern)
default: default:
log.Println("WARN Invalid Message: unrecognised Request type") log.Println("ERROR Invalid Message from cli: unrecognised Request type")
return
} }
Respond(request, response)
}(request) gob.NewEncoder(conn).Encode(response)
if err != nil {
log.Println("ERROR Can't respond to cli:", err)
return
}
}(conn)
} }
} }

View File

@ -24,10 +24,10 @@ func cmdStdout(commandline []string) chan *string {
cmd := exec.Command(commandline[0], commandline[1:]...) cmd := exec.Command(commandline[0], commandline[1:]...)
stdout, err := cmd.StdoutPipe() stdout, err := cmd.StdoutPipe()
if err != nil { if err != nil {
log.Fatalln("couldn't open stdout on command:", err) log.Fatalln("FATAL couldn't open stdout on command:", err)
} }
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
log.Fatalln("couldn't start command:", err) log.Fatalln("FATAL couldn't start command:", err)
} }
defer stdout.Close() defer stdout.Close()
@ -221,7 +221,7 @@ func Main() {
quit() quit()
} }
case <-sigs: case <-sigs:
log.Printf("Received SIGINT or SIGTERM, exiting") log.Printf("INFO Received SIGINT/SIGTERM, exiting")
quit() quit()
} }
} }
@ -235,7 +235,7 @@ func quit() {
// wait for them to complete // wait for them to complete
wgActions.Wait() wgActions.Wait()
// delete pipe // delete pipe
os.Remove(PipePath()) os.Remove(SocketPath())
os.Exit(3) os.Exit(3)
} }