via

Simple pubsub server inspired by https://patchbay.pub/
git clone https://git.ce9e.org/via.git

commit
da546827545672c2fb575e0f65cc08d1710ad53e
parent
d55570c2d4468d6087d1bf4cef61512d3f7a94ca
Author
Tobias Bengfort <tobias.bengfort@posteo.de>
Date
2025-06-13 14:48
concurrency: use channels instead of mutex

"Share memory by communicating; don't communicate by sharing memory."
-- https://pkg.go.dev/sync/atomic

This is more readable, more idiomatic and probably fixes a bunch of
concurrency issues.

Diffstat

M via.go 197 ++++++++++++++++++++++++++++++++-----------------------------

1 files changed, 102 insertions, 95 deletions


diff --git a/via.go b/via.go

@@ -26,13 +26,27 @@ type Msg struct {
   26    26 	Data []byte
   27    27 }
   28    28 
   -1    29 type Sub struct {
   -1    30 	ch     chan Msg
   -1    31 	lastId int
   -1    32 }
   -1    33 
   -1    34 type Post struct {
   -1    35 	data []byte
   -1    36 	ch   chan int
   -1    37 }
   -1    38 
   29    39 type Topic struct {
   30    -1 	sync.Mutex
   31    40 	channels   map[chan Msg]bool
   32    41 	hasHistory bool
   33    42 	history    []Msg
   34    43 	path       string
   35    44 	lastId     int
   -1    45 	subChan    chan Sub
   -1    46 	unsubChan  chan chan Msg
   -1    47 	postChan   chan Post
   -1    48 	putChan    chan Msg
   -1    49 	delChan    chan struct{}
   36    50 }
   37    51 
   38    52 var mux = &sync.Mutex{}
@@ -88,48 +102,83 @@ func (topic *Topic) deleteHistory() {
   88   102 	}
   89   103 }
   90   104 
   91    -1 func (topic *Topic) post(data []byte) {
   92    -1 	topic.lastId += 1
   93    -1 	msg := Msg{topic.lastId, data}
   94    -1 
   95    -1 	if topic.hasHistory {
   96    -1 		topic.history = append(topic.history, msg)
   97    -1 
   98    -1 		for len(topic.history) > maxHistorySize {
   99    -1 			topic.history = topic.history[1:]
  100    -1 		}
   -1   105 func (topic *Topic) cleanup(key string) bool {
   -1   106 	if len(topic.channels) > 0 {
   -1   107 		return false
  101   108 	}
  102   109 
  103    -1 	for ch := range topic.channels {
  104    -1 		ch <- msg
   -1   110 	if verbose {
   -1   111 		log.Println("clearing topic", key)
  105   112 	}
   -1   113 	mux.Lock()
   -1   114 	delete(topics, key)
   -1   115 	mux.Unlock()
   -1   116 	return true
  106   117 }
  107   118 
  108    -1 func (topic *Topic) put(data []byte, lastId int) {
  109    -1 	if len(topic.history) > 0 && lastId < topic.history[0].Id {
  110    -1 		return
   -1   119 func (topic *Topic) run(key string) {
   -1   120 	if topic.hasHistory {
   -1   121 		topic.restoreHistory()
  111   122 	}
  112   123 
  113    -1 	history := make([]Msg, 0)
  114    -1 	history = append(history, Msg{lastId, data})
  115    -1 	for _, msg := range topic.history {
  116    -1 		if msg.Id > lastId {
   -1   124 	for {
   -1   125 		select {
   -1   126 		case sub := <-topic.subChan:
   -1   127 			for _, msg := range topic.history {
   -1   128 				if msg.Id > sub.lastId {
   -1   129 					sub.ch <- msg
   -1   130 				}
   -1   131 			}
   -1   132 
   -1   133 			topic.channels[sub.ch] = true
   -1   134 		case ch := <-topic.unsubChan:
   -1   135 			delete(topic.channels, ch)
   -1   136 		case post := <-topic.postChan:
   -1   137 			topic.lastId += 1
   -1   138 			msg := Msg{topic.lastId, post.data}
   -1   139 
   -1   140 			if topic.hasHistory {
   -1   141 				topic.history = append(topic.history, msg)
   -1   142 				for len(topic.history) > maxHistorySize {
   -1   143 					topic.history = topic.history[1:]
   -1   144 				}
   -1   145 				topic.storeHistory()
   -1   146 
   -1   147 				post.ch <- maxHistorySize - len(topic.history)
   -1   148 			}
   -1   149 
   -1   150 			close(post.ch)
   -1   151 
   -1   152 			for ch := range topic.channels {
   -1   153 				ch <- msg
   -1   154 			}
   -1   155 		case msg := <-topic.putChan:
   -1   156 			if len(topic.history) > 0 && msg.Id < topic.history[0].Id {
   -1   157 				continue
   -1   158 			}
   -1   159 
   -1   160 			history := make([]Msg, 0)
  117   161 			history = append(history, msg)
  118    -1 		}
  119    -1 	}
  120    -1 	topic.history = history
   -1   162 			for _, m := range topic.history {
   -1   163 				if m.Id > msg.Id {
   -1   164 					history = append(history, m)
   -1   165 				}
   -1   166 			}
   -1   167 			topic.history = history
   -1   168 			topic.storeHistory()
  121   169 
  122    -1 	if lastId > topic.lastId {
  123    -1 		topic.lastId = lastId
  124    -1 	}
  125    -1 }
   -1   170 			if msg.Id > topic.lastId {
   -1   171 				topic.lastId = msg.Id
   -1   172 			}
   -1   173 		case _ = <-topic.delChan:
   -1   174 			topic.history = make([]Msg, 0)
   -1   175 			topic.lastId = 0
   -1   176 			topic.deleteHistory()
   -1   177 		}
  126   178 
  127    -1 func (topic *Topic) cleanup(key string) {
  128    -1 	if len(topic.channels) == 0 {
  129    -1 		if verbose {
  130    -1 			log.Println("clearing topic", key)
   -1   179 		if topic.cleanup(key) {
   -1   180 			break
  131   181 		}
  132    -1 		delete(topics, key)
  133   182 	}
  134   183 }
  135   184 
@@ -146,54 +195,29 @@ func getTopic(key string) *Topic {
  146   195 			history:    make([]Msg, 0),
  147   196 			path:       path.Join(dir, filename),
  148   197 			lastId:     0,
  149    -1 		}
  150    -1 		if topic.hasHistory {
  151    -1 			topic.restoreHistory()
   -1   198 			subChan:    make(chan Sub),
   -1   199 			unsubChan:  make(chan chan Msg),
   -1   200 			postChan:   make(chan Post),
   -1   201 			putChan:    make(chan Msg),
   -1   202 			delChan:    make(chan struct{}),
  152   203 		}
  153   204 		topics[key] = topic
   -1   205 		go topic.run(key)
  154   206 	}
  155   207 
  156   208 	return topic
  157   209 }
  158   210 
  159    -1 func pushChannel(key string, ch chan Msg, lastId int) {
  160    -1 	topic := getTopic(key)
  161    -1 
  162    -1 	go func() {
  163    -1 		topic.Lock()
  164    -1 		defer topic.Unlock()
  165    -1 
  166    -1 		for _, msg := range topic.history {
  167    -1 			if msg.Id > lastId {
  168    -1 				ch <- msg
  169    -1 			}
  170    -1 		}
  171    -1 
  172    -1 		topic.channels[ch] = true
  173    -1 	}()
  174    -1 }
  175    -1 
  176    -1 func popChannel(key string, ch chan Msg) {
  177    -1 	mux.Lock()
  178    -1 	defer mux.Unlock()
  179    -1 	topic := topics[key]
  180    -1 
  181    -1 	topic.Lock()
  182    -1 	delete(topic.channels, ch)
  183    -1 	topic.Unlock()
  184    -1 
  185    -1 	topic.cleanup(key)
  186    -1 }
  187    -1 
  188   211 func get(w http.ResponseWriter, r *http.Request) {
  189   212 	lastId, err := strconv.Atoi(r.Header.Get("Last-Event-ID"))
  190   213 	if err != nil {
  191   214 		lastId = 0
  192   215 	}
  193   216 
   -1   217 	topic := getTopic(r.URL.Path)
   -1   218 
  194   219 	ch := make(chan Msg)
  195    -1 	pushChannel(r.URL.Path, ch, lastId)
  196    -1 	defer popChannel(r.URL.Path, ch)
   -1   220 	topic.subChan <- Sub{ch, lastId}
  197   221 
  198   222 	ctx := r.Context()
  199   223 
@@ -215,6 +239,8 @@ func get(w http.ResponseWriter, r *http.Request) {
  215   239 	for {
  216   240 		select {
  217   241 		case <-ctx.Done():
   -1   242 			log.Println("lost a connection on", r.URL.Path)
   -1   243 			topic.unsubChan <- ch
  218   244 			return
  219   245 		case <-ticker.C:
  220   246 			fmt.Fprintf(w, ": ping\n\n")
@@ -234,24 +260,18 @@ func post(w http.ResponseWriter, r *http.Request) {
  234   260 		return
  235   261 	}
  236   262 
   -1   263 	ch := make(chan int)
  237   264 	topic := getTopic(r.URL.Path)
  238    -1 	defer topic.cleanup(r.URL.Path)
   -1   265 	topic.postChan <- Post{body, ch}
  239   266 
  240   267 	response := make(map[string]int)
  241    -1 	defer func() {
  242    -1 		w.Header().Set("Content-Type", "application/json")
  243    -1 		json.NewEncoder(w).Encode(response)
  244    -1 	}()
  245    -1 
  246    -1 	topic.Lock()
  247    -1 	defer topic.Unlock()
  248    -1 
  249    -1 	topic.post(body)
  250    -1 
  251    -1 	if topic.hasHistory {
  252    -1 		topic.storeHistory()
  253    -1 		response["historyRemaining"] = maxHistorySize - len(topic.history)
   -1   268 	remaining, ok := <-ch
   -1   269 	if ok {
   -1   270 		response["historyRemaining"] = remaining
  254   271 	}
   -1   272 
   -1   273 	w.Header().Set("Content-Type", "application/json")
   -1   274 	json.NewEncoder(w).Encode(response)
  255   275 }
  256   276 
  257   277 func put(w http.ResponseWriter, r *http.Request) {
@@ -274,13 +294,7 @@ func put(w http.ResponseWriter, r *http.Request) {
  274   294 	}
  275   295 
  276   296 	topic := getTopic(r.URL.Path)
  277    -1 	defer topic.cleanup(r.URL.Path)
  278    -1 
  279    -1 	topic.Lock()
  280    -1 	defer topic.Unlock()
  281    -1 
  282    -1 	topic.put(body, lastId)
  283    -1 	topic.storeHistory()
   -1   297 	topic.putChan <- Msg{lastId, body}
  284   298 }
  285   299 
  286   300 func del(w http.ResponseWriter, r *http.Request) {
@@ -290,14 +304,7 @@ func del(w http.ResponseWriter, r *http.Request) {
  290   304 	}
  291   305 
  292   306 	topic := getTopic(r.URL.Path)
  293    -1 	defer topic.cleanup(r.URL.Path)
  294    -1 
  295    -1 	topic.Lock()
  296    -1 	defer topic.Unlock()
  297    -1 
  298    -1 	topic.history = make([]Msg, 0)
  299    -1 	topic.lastId = 0
  300    -1 	topic.deleteHistory()
   -1   307 	topic.delChan <- struct{}{}
  301   308 }
  302   309 
  303   310 func handler(w http.ResponseWriter, r *http.Request) {