via

Simple pubsub server inspired by https://patchbay.pub/
git clone https://git.ce9e.org/via.git

commit
324d95575c6a54d4aebd07bb4409158be08d5ba5
parent
5e9a49518602130a829eb5ad0dc30d3081c763fa
Author
Tobias Bengfort <tobias.bengfort@posteo.de>
Date
2020-03-25 05:40
allow to set a password on topics

Diffstat

M via.go 51 ++++++++++++++++++++++++++++++++++++++++++---------

1 files changed, 42 insertions, 9 deletions


diff --git a/via.go b/via.go

@@ -1,9 +1,13 @@
    1     1 // Simple pubsub server inspired by https://patchbay.pub
    2     2 //
    3     3 // Usage: via [-v] [[host]:port]
    4    -1 // curl http://localhost:8001/someid/  # block
    5    -1 // curl http://localhost:8001/someid/?sse  # server sent event stream
    6    -1 // curl http://localhost:8001/someid/ -d somedata
   -1     4 // curl http://localhost:8001/someid  # block
   -1     5 // curl http://localhost:8001/someid?sse  # server sent event stream
   -1     6 // curl http://localhost:8001/someid -d somedata
   -1     7 //
   -1     8 // curl http://localhost:8001/someid:somepassword?sse
   -1     9 // curl http://localhost:8001/someid  # 403
   -1    10 // curl http://localhost:8001/someid -d somedata  # 200
    7    11 package main
    8    12 
    9    13 import (
@@ -13,6 +17,7 @@ import (
   13    17 	"log"
   14    18 	"net/http"
   15    19 	"os"
   -1    20 	"strings"
   16    21 	"sync"
   17    22 	"time"
   18    23 )
@@ -20,13 +25,23 @@ import (
   20    25 type Topic struct {
   21    26 	sync.RWMutex
   22    27 	channels map[chan []byte]bool
   -1    28 	password string
   23    29 }
   24    30 
   25    31 var mux = &sync.RWMutex{}
   26    32 var topics = make(map[string]Topic)
   27    33 var verbose = false
   28    34 
   29    -1 func pushChannel(key string, ch chan []byte) {
   -1    35 func splitPassword(combined string) (string, string) {
   -1    36 	split := strings.SplitN(combined, ":", 2)
   -1    37 	if len(split) == 2 {
   -1    38 		return split[0], split[1]
   -1    39 	} else {
   -1    40 		return combined, ""
   -1    41 	}
   -1    42 }
   -1    43 
   -1    44 func pushChannel(key string, password string, ch chan []byte) bool {
   30    45 	mux.RLock()
   31    46 	topic, ok := topics[key]
   32    47 	mux.RUnlock()
@@ -34,15 +49,20 @@ func pushChannel(key string, ch chan []byte) {
   34    49 	if !ok {
   35    50 		topic = Topic{
   36    51 			channels: make(map[chan []byte]bool, 0),
   -1    52 			password: password,
   37    53 		}
   38    54 		mux.Lock()
   39    55 		topics[key] = topic
   40    56 		mux.Unlock()
   -1    57 	} else if topic.password != password {
   -1    58 		return false
   41    59 	}
   42    60 
   43    61 	topic.Lock()
   44    62 	topic.channels[ch] = true
   45    63 	topic.Unlock()
   -1    64 
   -1    65 	return true
   46    66 }
   47    67 
   48    68 func popChannel(key string, ch chan []byte) {
@@ -65,6 +85,8 @@ func popChannel(key string, ch chan []byte) {
   65    85 }
   66    86 
   67    87 func post(w http.ResponseWriter, r *http.Request) {
   -1    88 	key, _ := splitPassword(r.URL.Path)
   -1    89 
   68    90 	body, err := ioutil.ReadAll(r.Body)
   69    91 	if err != nil {
   70    92 		log.Println("error reading request body:", err)
@@ -91,14 +113,21 @@ func post(w http.ResponseWriter, r *http.Request) {
   91   113 }
   92   114 
   93   115 func getBlocking(w http.ResponseWriter, r *http.Request) {
   94    -1 	ch := make(chan []byte)
   95    -1 	pushChannel(r.URL.Path, ch)
   96    -1 	defer popChannel(r.URL.Path, ch)
   -1   116 	key, password := splitPassword(r.URL.Path)
   97   117 
   -1   118 	ch := make(chan []byte)
   -1   119 	allowed := pushChannel(key, password, ch)
   -1   120 	if !allowed {
   -1   121 		http.Error(w, "Forbidden", http.StatusForbidden)
   -1   122 		return
   -1   123 	}
   -1   124 	defer popChannel(key, ch)
   98   125 	w.Write(<-ch)
   99   126 }
  100   127 
  101   128 func getSse(w http.ResponseWriter, r *http.Request) {
   -1   129 	key, password := splitPassword(r.URL.Path)
   -1   130 
  102   131 	ticker := time.NewTicker(time.Second)
  103   132 	defer ticker.Stop()
  104   133 
@@ -113,8 +142,12 @@ func getSse(w http.ResponseWriter, r *http.Request) {
  113   142 	flusher.Flush()
  114   143 
  115   144 	ch := make(chan []byte)
  116    -1 	pushChannel(r.URL.Path, ch)
  117    -1 	defer popChannel(r.URL.Path, ch)
   -1   145 	allowed := pushChannel(key, password, ch)
   -1   146 	if !allowed {
   -1   147 		http.Error(w, "Forbidden", http.StatusForbidden)
   -1   148 		return
   -1   149 	}
   -1   150 	defer popChannel(key, ch)
  118   151 
  119   152 	ctx := r.Context()
  120   153