via

Simple pubsub server inspired by https://patchbay.pub/
git clone https://git.ce9e.org/via.git

commit
9831d3f9b359090bfe809d7abcecc81a50104f45
parent
967e357eb133873b68e9a83d6b1ad610b426080d
Author
Tobias Bengfort <tobias.bengfort@posteo.de>
Date
2020-10-17 05:37
refactor topic concurrency

-	do all the locking in the views
-	use Mutex instead of RWMutex for simplicity
-	avoid goroutines or do the locking inside
-	add channel to topic after sending history

Diffstat

M via.go 47 +++++++++++++++++++----------------------------

1 files changed, 19 insertions, 28 deletions


diff --git a/via.go b/via.go

@@ -22,7 +22,7 @@ type Msg struct {
   22    22 }
   23    23 
   24    24 type Topic struct {
   25    -1 	sync.RWMutex
   -1    25 	sync.Mutex
   26    26 	channels map[chan Msg]bool
   27    27 	password string
   28    28 	hasHistory bool
@@ -51,9 +51,6 @@ func getStorePath(key string) string {
   51    51 }
   52    52 
   53    53 func (topic Topic) storeHistory(key string) {
   54    -1 	topic.Lock()
   55    -1 	defer topic.Unlock()
   56    -1 
   57    54 	path := getStorePath(fmt.Sprintf("%s:%s", key, topic.password))
   58    55 
   59    56 	content, err := json.Marshal(topic.history)
@@ -70,9 +67,6 @@ func (topic Topic) storeHistory(key string) {
   70    67 }
   71    68 
   72    69 func (topic *Topic) restoreHistory(key string) {
   73    -1 	topic.Lock()
   74    -1 	defer topic.Unlock()
   75    -1 
   76    70 	path := getStorePath(fmt.Sprintf("%s:%s", key, topic.password))
   77    71 
   78    72 	content, err := ioutil.ReadFile(path)
@@ -97,9 +91,6 @@ func (topic *Topic) restoreHistory(key string) {
   97    91 }
   98    92 
   99    93 func (topic *Topic) post(data []byte) {
  100    -1 	topic.Lock()
  101    -1 	defer topic.Unlock()
  102    -1 
  103    94 	topic.lastId += 1
  104    95 	msg := Msg{topic.lastId, data}
  105    96 
@@ -111,17 +102,12 @@ func (topic *Topic) post(data []byte) {
  111   102 		}
  112   103 	}
  113   104 
  114    -1 	for channel := range topic.channels {
  115    -1 		go func(ch chan Msg) {
  116    -1 			ch <- msg
  117    -1 		}(channel)
   -1   105 	for ch := range topic.channels {
   -1   106 		ch <- msg
  118   107 	}
  119   108 }
  120   109 
  121   110 func (topic *Topic) put(data []byte, lastId int) {
  122    -1 	topic.Lock()
  123    -1 	defer topic.Unlock()
  124    -1 
  125   111 	if len(topic.history) > 0 && lastId < topic.history[0].Id {
  126   112 		return
  127   113 	}
@@ -172,19 +158,18 @@ func pushChannel(key string, password string, ch chan Msg, lastId int) bool {
  172   158 		return false
  173   159 	}
  174   160 
  175    -1 	topic.Lock()
  176    -1 	topic.channels[ch] = true
  177    -1 	topic.Unlock()
   -1   161 	go func() {
   -1   162 		topic.Lock()
   -1   163 		defer topic.Unlock()
  178   164 
  179    -1 	if topic.hasHistory {
  180    -1 		go func(t Topic) {
  181    -1 			for _, msg := range t.history {
  182    -1 				if msg.Id > lastId {
  183    -1 					ch <- msg
  184    -1 				}
   -1   165 		for _, msg := range topic.history {
   -1   166 			if msg.Id > lastId {
   -1   167 				ch <- msg
  185   168 			}
  186    -1 		}(*topic)
  187    -1 	}
   -1   169 		}
   -1   170 
   -1   171 		topic.channels[ch] = true
   -1   172 	}()
  188   173 
  189   174 	return true
  190   175 }
@@ -231,6 +216,9 @@ func post(w http.ResponseWriter, r *http.Request) {
  231   216 		return
  232   217 	}
  233   218 
   -1   219 	topic.Lock()
   -1   220 	defer topic.Unlock()
   -1   221 
  234   222 	topic.post(body)
  235   223 
  236   224 	if topic.hasHistory {
@@ -311,6 +299,9 @@ func put(w http.ResponseWriter, r *http.Request) {
  311   299 		return
  312   300 	}
  313   301 
   -1   302 	topic.Lock()
   -1   303 	defer topic.Unlock()
   -1   304 
  314   305 	topic.put(body, lastId)
  315   306 	topic.storeHistory(key)
  316   307 }