via

Simple pubsub server inspired by https://patchbay.pub/
git clone https://git.ce9e.org/via.git

commit
7a92a3cf56fc3ad6341ecd38e9ec91eb3f28f411
parent
78871f0b0173ad606cd970db1078060f0752107d
Author
Tobias Bengfort <tobias.bengfort@posteo.de>
Date
2025-06-08 03:59
concurrency: hold lock between topics check and modification

otherwise a topic could appear/vanish in between

Diffstat

M via.go 18 +++++++-----------

1 files changed, 7 insertions, 11 deletions


diff --git a/via.go b/via.go

@@ -34,7 +34,7 @@ type Topic struct {
   34    34 	lastId     int
   35    35 }
   36    36 
   37    -1 var mux = &sync.RWMutex{}
   -1    37 var mux = &sync.Mutex{}
   38    38 var topics = make(map[string]*Topic)
   39    39 var verbose = false
   40    40 var maxHistorySize = 100
@@ -133,9 +133,9 @@ func (topic *Topic) put(data []byte, lastId int) {
  133   133 }
  134   134 
  135   135 func getTopic(key string) *Topic {
  136    -1 	mux.RLock()
   -1   136 	mux.Lock()
   -1   137 	defer mux.Unlock()
  137   138 	topic, exists := topics[key]
  138    -1 	mux.RUnlock()
  139   139 
  140   140 	if !exists {
  141   141 		topic = &Topic{
@@ -147,9 +147,7 @@ func getTopic(key string) *Topic {
  147   147 		if topic.hasHistory {
  148   148 			topic.restoreHistory(key)
  149   149 		}
  150    -1 		mux.Lock()
  151   150 		topics[key] = topic
  152    -1 		mux.Unlock()
  153   151 	}
  154   152 
  155   153 	return topic
@@ -173,9 +171,9 @@ func pushChannel(key string, ch chan Msg, lastId int) {
  173   171 }
  174   172 
  175   173 func popChannel(key string, ch chan Msg) {
  176    -1 	mux.RLock()
   -1   174 	mux.Lock()
   -1   175 	defer mux.Unlock()
  177   176 	topic := topics[key]
  178    -1 	mux.RUnlock()
  179   177 
  180   178 	topic.Lock()
  181   179 	delete(topic.channels, ch)
@@ -185,9 +183,7 @@ func popChannel(key string, ch chan Msg) {
  185   183 		if verbose {
  186   184 			log.Println("clearing topic", key)
  187   185 		}
  188    -1 		mux.Lock()
  189   186 		delete(topics, key)
  190    -1 		mux.Unlock()
  191   187 	}
  192   188 }
  193   189 
@@ -199,9 +195,9 @@ func post(w http.ResponseWriter, r *http.Request) {
  199   195 		return
  200   196 	}
  201   197 
  202    -1 	mux.RLock()
   -1   198 	mux.Lock()
  203   199 	topic, exists := topics[r.URL.Path]
  204    -1 	mux.RUnlock()
   -1   200 	mux.Unlock()
  205   201 
  206   202 	response := make(map[string]int)
  207   203 	defer func() {