via

Simple pubsub server inspired by https://patchbay.pub/
git clone https://git.ce9e.org/via.git

commit
5a2f503637a8702b3c7a24ab8162609cb306ac06
parent
b864754c1f71fc501d8b105a4cb61b7957100491
Author
Tobias Bengfort <tobias.bengfort@posteo.de>
Date
2020-10-15 07:52
add message IDs

Diffstat

M via.go 32 +++++++++++++++++++++-----------

1 files changed, 21 insertions, 11 deletions


diff --git a/via.go b/via.go

@@ -14,14 +14,20 @@ import (
   14    14 	"time"
   15    15 )
   16    16 
   -1    17 type Msg struct {
   -1    18 	Id int
   -1    19 	Data []byte
   -1    20 }
   -1    21 
   17    22 type Topic struct {
   18    23 	sync.RWMutex
   19    -1 	channels map[chan []byte]bool
   -1    24 	channels map[chan Msg]bool
   20    25 	password string
   -1    26 	lastId int
   21    27 }
   22    28 
   23    29 var mux = &sync.RWMutex{}
   24    -1 var topics = make(map[string]Topic)
   -1    30 var topics = make(map[string]*Topic)
   25    31 var verbose = false
   26    32 var dir = ""
   27    33 
@@ -34,15 +40,16 @@ func splitPassword(combined string) (string, string) {
   34    40 	}
   35    41 }
   36    42 
   37    -1 func pushChannel(key string, password string, ch chan []byte) bool {
   -1    43 func pushChannel(key string, password string, ch chan Msg) bool {
   38    44 	mux.RLock()
   39    45 	topic, ok := topics[key]
   40    46 	mux.RUnlock()
   41    47 
   42    48 	if !ok {
   43    -1 		topic = Topic{
   44    -1 			channels: make(map[chan []byte]bool, 0),
   -1    49 		topic = &Topic{
   -1    50 			channels: make(map[chan Msg]bool, 0),
   45    51 			password: password,
   -1    52 			lastId: 0,
   46    53 		}
   47    54 		mux.Lock()
   48    55 		topics[key] = topic
@@ -58,7 +65,7 @@ func pushChannel(key string, password string, ch chan []byte) bool {
   58    65 	return true
   59    66 }
   60    67 
   61    -1 func popChannel(key string, ch chan []byte) {
   -1    68 func popChannel(key string, ch chan Msg) {
   62    69 	mux.RLock()
   63    70 	topic := topics[key]
   64    71 	mux.RUnlock()
@@ -109,9 +116,12 @@ func postMsg(w http.ResponseWriter, r *http.Request) {
  109   116 	topic.RLock()
  110   117 	defer topic.RUnlock()
  111   118 
   -1   119 	topic.lastId += 1
   -1   120 	msg := Msg{topic.lastId, body}
   -1   121 
  112   122 	for channel := range topic.channels {
  113    -1 		go func(ch chan []byte) {
  114    -1 			ch <- body
   -1   123 		go func(ch chan Msg) {
   -1   124 			ch <- msg
  115   125 		}(channel)
  116   126 	}
  117   127 }
@@ -119,7 +129,7 @@ func postMsg(w http.ResponseWriter, r *http.Request) {
  119   129 func getMsg(w http.ResponseWriter, r *http.Request) {
  120   130 	key, password := splitPassword(r.URL.Path)
  121   131 
  122    -1 	ch := make(chan []byte)
   -1   132 	ch := make(chan Msg)
  123   133 	allowed := pushChannel(key, password, ch)
  124   134 	if !allowed {
  125   135 		http.Error(w, "Forbidden", http.StatusForbidden)
@@ -151,8 +161,8 @@ func getMsg(w http.ResponseWriter, r *http.Request) {
  151   161 		case <-ticker.C:
  152   162 			fmt.Fprintf(w, ": ping\n\n")
  153   163 			flusher.Flush()
  154    -1 		case s := <-ch:
  155    -1 			fmt.Fprintf(w, "data: %s\n\n", s)
   -1   164 		case msg := <-ch:
   -1   165 			fmt.Fprintf(w, "id: %d\ndata: %s\n\n", msg.Id, msg.Data)
  156   166 			flusher.Flush()
  157   167 		}
  158   168 	}