via

Simple pubsub server inspired by https://patchbay.pub/
git clone https://git.ce9e.org/via.git

commit
d980c035990d1d9ee8113d0521f0d2ddbbce5072
parent
1a9a661732c30925d383d1b51f8d93f0ebb3effe
Author
Tobias Bengfort <tobias.bengfort@posteo.de>
Date
2025-06-13 15:01
concurrency: use explicit refCount instead of len(channels) in cleanup()

Diffstat

M via.go 18 ++++++++++++++----

1 files changed, 14 insertions, 4 deletions


diff --git a/via.go b/via.go

@@ -42,6 +42,7 @@ type Topic struct {
   42    42 	history    []Msg
   43    43 	path       string
   44    44 	lastId     int
   -1    45 	refCount   int
   45    46 	subChan    chan Sub
   46    47 	unsubChan  chan chan Msg
   47    48 	postChan   chan Post
@@ -103,16 +104,19 @@ func (topic *Topic) deleteHistory() {
  103   104 }
  104   105 
  105   106 func (topic *Topic) cleanup(key string) bool {
  106    -1 	if len(topic.channels) > 0 {
   -1   107 	mux.Lock()
   -1   108 	defer mux.Unlock()
   -1   109 
   -1   110 	topic.refCount -= 1
   -1   111 
   -1   112 	if topic.refCount > 0 {
  107   113 		return false
  108   114 	}
  109   115 
  110   116 	if verbose {
  111   117 		log.Println("clearing topic", key)
  112   118 	}
  113    -1 	mux.Lock()
  114   119 	delete(topics, key)
  115    -1 	mux.Unlock()
  116   120 	return true
  117   121 }
  118   122 
@@ -122,6 +126,8 @@ func (topic *Topic) run(key string) {
  122   126 	}
  123   127 
  124   128 	for {
   -1   129 		doCleanup := true
   -1   130 
  125   131 		select {
  126   132 		case sub := <-topic.subChan:
  127   133 			for _, msg := range topic.history {
@@ -131,6 +137,7 @@ func (topic *Topic) run(key string) {
  131   137 			}
  132   138 
  133   139 			topic.channels[sub.ch] = true
   -1   140 			doCleanup = false
  134   141 		case ch := <-topic.unsubChan:
  135   142 			close(ch)
  136   143 			delete(topic.channels, ch)
@@ -177,7 +184,7 @@ func (topic *Topic) run(key string) {
  177   184 			topic.deleteHistory()
  178   185 		}
  179   186 
  180    -1 		if topic.cleanup(key) {
   -1   187 		if doCleanup && topic.cleanup(key) {
  181   188 			break
  182   189 		}
  183   190 	}
@@ -196,6 +203,7 @@ func getTopic(key string) *Topic {
  196   203 			history:    make([]Msg, 0),
  197   204 			path:       path.Join(dir, filename),
  198   205 			lastId:     0,
   -1   206 			refCount:   0,
  199   207 			subChan:    make(chan Sub),
  200   208 			unsubChan:  make(chan chan Msg),
  201   209 			postChan:   make(chan Post),
@@ -206,6 +214,8 @@ func getTopic(key string) *Topic {
  206   214 		go topic.run(key)
  207   215 	}
  208   216 
   -1   217 	topic.refCount += 1
   -1   218 
  209   219 	return topic
  210   220 }
  211   221