via

Simple pubsub server inspired by https://patchbay.pub/
git clone https://git.ce9e.org/via.git

commit
dc5a3f856c40fb57dfe147e8f9b6999a65ed61d0
parent
5a2f503637a8702b3c7a24ab8162609cb306ac06
Author
Tobias Bengfort <tobias.bengfort@posteo.de>
Date
2020-10-15 11:06
Merge branch 'feature-history'

Diffstat

M README.md 37 ++++++++++++++++++++++++++-----------
M via.go 231 +++++++++++++++++++++++++++++++++++++++++--------------------

2 files changed, 183 insertions, 85 deletions


diff --git a/README.md b/README.md

@@ -1,9 +1,8 @@
    1    -1 # Via - Simple generic HTTP server for messages and storage
   -1     1 # Via - Simple pubsub server
    2     2 
    3    -1 This is a minimal but generic server that does two things:
    4    -1 
    5    -1 -	pass messages between clients
    6    -1 -	store data
   -1     3 This is very much inspired by <https://patchbay.pub/> and its clones
   -1     4 [conduit](https://github.com/prologic/conduit) and
   -1     5 [duct](https://github.com/schollz/duct).
    7     6 
    8     7 ## Usage
    9     8 
@@ -19,13 +18,20 @@ Then start sending requests on the client:
   19    18 	# POST a message
   20    19 	curl http://localhost:8001/msg/someid -d somedata
   21    20 
   22    -1 	# Store, get, and delete a document
   23    -1 	curl http://localhost:8001/store/someid -d someid
   24    -1 	curl http://localhost:8001/store/someid
   25    -1 	curl http://localhost:8001/store/someid -X DELETE
   -1    21 Use the `hmsg` prefix if you want to keep a history:
   -1    22 
   -1    23 	# start listening and request any messages you may have missed
   -1    24 	curl http://localhost:8001/hmsg/someid -H 'Last-Event-Id: 3'
   -1    25 
   -1    26 	# POST works just as before
   -1    27 	curl http://localhost:8001/hmsg/someid -d somedata
   26    28 
   27    -1 You can also protect your message ID with a password so no one else can listen
   28    -1 to it at the same time:
   -1    29 	# the history only keeps up to 100 entries.
   -1    30 	# you can optimize it by replacing all entries by a single message
   -1    31 	curl http://localhost:8001/hmsg/someid -d combined -H 'Last-Event-Id: 3' -X PUT
   -1    32 
   -1    33 You can also protect your ID with a password so no one else can listen to
   -1    34 it at the same time:
   29    35 
   30    36 	curl http://localhost:8001/msg/someid:somepassword
   31    37 	curl http://localhost:8001/msg/someid  # 403
@@ -34,3 +40,12 @@ to it at the same time:
   34    40 You should regularly clean up old files:
   35    41 
   36    42 	find {storage_dir} -type f -mtime +7 -delete
   -1    43 
   -1    44 ## Differences to patchbay
   -1    45 
   -1    46 -	no support for MPMC (blocking POST)
   -1    47 -	no support for req/res
   -1    48 -	no support for blocking GET
   -1    49 -	support for [server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events)
   -1    50 -	support for passwords
   -1    51 -	support for message history

diff --git a/via.go b/via.go

@@ -2,6 +2,7 @@ package main
    2     2 
    3     3 import (
    4     4 	"encoding/base64"
   -1     5 	"encoding/json"
    5     6 	"flag"
    6     7 	"fmt"
    7     8 	"io/ioutil"
@@ -9,6 +10,7 @@ import (
    9    10 	"net/http"
   10    11 	"os"
   11    12 	"path"
   -1    13 	"strconv"
   12    14 	"strings"
   13    15 	"sync"
   14    16 	"time"
@@ -23,12 +25,15 @@ type Topic struct {
   23    25 	sync.RWMutex
   24    26 	channels map[chan Msg]bool
   25    27 	password string
   -1    28 	hasHistory bool
   -1    29 	history []Msg
   26    30 	lastId int
   27    31 }
   28    32 
   29    33 var mux = &sync.RWMutex{}
   30    34 var topics = make(map[string]*Topic)
   31    35 var verbose = false
   -1    36 var maxHistorySize = 100
   32    37 var dir = ""
   33    38 
   34    39 func splitPassword(combined string) (string, string) {
@@ -40,7 +45,100 @@ func splitPassword(combined string) (string, string) {
   40    45 	}
   41    46 }
   42    47 
   43    -1 func pushChannel(key string, password string, ch chan Msg) bool {
   -1    48 func getStorePath(key string) string {
   -1    49 	hash := base64.URLEncoding.EncodeToString([]byte(key))
   -1    50 	return path.Join(dir, hash)
   -1    51 }
   -1    52 
   -1    53 func (topic Topic) storeHistory(key string) {
   -1    54 	topic.Lock()
   -1    55 	defer topic.Unlock()
   -1    56 
   -1    57 	path := getStorePath(fmt.Sprintf("%s:%s", key, topic.password))
   -1    58 
   -1    59 	content, err := json.Marshal(topic.history)
   -1    60 	if err != nil {
   -1    61 		log.Println("error storing history:", err)
   -1    62 		return
   -1    63 	}
   -1    64 
   -1    65 	err = ioutil.WriteFile(path, content, 0644)
   -1    66 	if err != nil {
   -1    67 		log.Println("error storing history:", err)
   -1    68 		return
   -1    69 	}
   -1    70 }
   -1    71 
   -1    72 func (topic *Topic) restoreHistory(key string) {
   -1    73 	topic.Lock()
   -1    74 	defer topic.Unlock()
   -1    75 
   -1    76 	path := getStorePath(fmt.Sprintf("%s:%s", key, topic.password))
   -1    77 
   -1    78 	content, err := ioutil.ReadFile(path)
   -1    79 	if err != nil {
   -1    80 		log.Println("error restoring history:", err)
   -1    81 		return
   -1    82 	}
   -1    83 
   -1    84 	var history []Msg
   -1    85 	err = json.Unmarshal(content, &history)
   -1    86 	if err != nil {
   -1    87 		log.Println("error restoring history:", err)
   -1    88 		return
   -1    89 	}
   -1    90 
   -1    91 	topic.history = history
   -1    92 	if len(history) > 0 {
   -1    93 		topic.lastId = history[len(history)-1].Id
   -1    94 	}
   -1    95 }
   -1    96 
   -1    97 func (topic *Topic) post(data []byte) {
   -1    98 	topic.Lock()
   -1    99 	defer topic.Unlock()
   -1   100 
   -1   101 	topic.lastId += 1
   -1   102 	msg := Msg{topic.lastId, data}
   -1   103 
   -1   104 	if topic.hasHistory {
   -1   105 		topic.history = append(topic.history, msg)
   -1   106 
   -1   107 		for len(topic.history) > maxHistorySize {
   -1   108 			topic.history = topic.history[1:]
   -1   109 		}
   -1   110 	}
   -1   111 
   -1   112 	for channel := range topic.channels {
   -1   113 		go func(ch chan Msg) {
   -1   114 			ch <- msg
   -1   115 		}(channel)
   -1   116 	}
   -1   117 }
   -1   118 
   -1   119 func (topic *Topic) put(data []byte, lastId int) {
   -1   120 	topic.Lock()
   -1   121 	defer topic.Unlock()
   -1   122 
   -1   123 	if len(topic.history) > 0 && lastId < topic.history[0].Id {
   -1   124 		return
   -1   125 	}
   -1   126 
   -1   127 	history := make([]Msg, 0)
   -1   128 	history = append(history, Msg{lastId, data})
   -1   129 	for _, msg := range topic.history {
   -1   130 		if msg.Id > lastId {
   -1   131 			history = append(history, msg)
   -1   132 		}
   -1   133 	}
   -1   134 	topic.history = history
   -1   135 
   -1   136 	if lastId > topic.lastId {
   -1   137 		topic.lastId = lastId
   -1   138 	}
   -1   139 }
   -1   140 
   -1   141 func getTopic(key string, password string) (*Topic, bool) {
   44   142 	mux.RLock()
   45   143 	topic, ok := topics[key]
   46   144 	mux.RUnlock()
@@ -49,12 +147,26 @@ func pushChannel(key string, password string, ch chan Msg) bool {
   49   147 		topic = &Topic{
   50   148 			channels: make(map[chan Msg]bool, 0),
   51   149 			password: password,
   -1   150 			hasHistory: strings.HasPrefix(key, "/hmsg/"),
   -1   151 			history: make([]Msg, 0),
   52   152 			lastId: 0,
   53   153 		}
   -1   154 		if topic.hasHistory {
   -1   155 			topic.restoreHistory(key)
   -1   156 		}
   54   157 		mux.Lock()
   55   158 		topics[key] = topic
   56   159 		mux.Unlock()
   57   160 	} else if topic.password != password {
   -1   161 		return nil, false
   -1   162 	}
   -1   163 
   -1   164 	return topic, true
   -1   165 }
   -1   166 
   -1   167 func pushChannel(key string, password string, ch chan Msg, lastId int) bool {
   -1   168 	topic, allowed := getTopic(key, password)
   -1   169 	if !allowed {
   58   170 		return false
   59   171 	}
   60   172 
@@ -62,6 +174,16 @@ func pushChannel(key string, password string, ch chan Msg) bool {
   62   174 	topic.channels[ch] = true
   63   175 	topic.Unlock()
   64   176 
   -1   177 	if topic.hasHistory {
   -1   178 		go func(t Topic) {
   -1   179 			for _, msg := range t.history {
   -1   180 				if msg.Id > lastId {
   -1   181 					ch <- msg
   -1   182 				}
   -1   183 			}
   -1   184 		}(*topic)
   -1   185 	}
   -1   186 
   65   187 	return true
   66   188 }
   67   189 
@@ -84,13 +206,7 @@ func popChannel(key string, ch chan Msg) {
   84   206 	}
   85   207 }
   86   208 
   87    -1 func getStorePath(key string) string {
   88    -1 	rel := strings.TrimPrefix(key, "/store/")
   89    -1 	hash := base64.URLEncoding.EncodeToString([]byte(rel))
   90    -1 	return path.Join(dir, hash)
   91    -1 }
   92    -1 
   93    -1 func postMsg(w http.ResponseWriter, r *http.Request) {
   -1   209 func post(w http.ResponseWriter, r *http.Request) {
   94   210 	key, password := splitPassword(r.URL.Path)
   95   211 
   96   212 	if password != "" {
@@ -113,24 +229,23 @@ func postMsg(w http.ResponseWriter, r *http.Request) {
  113   229 		return
  114   230 	}
  115   231 
  116    -1 	topic.RLock()
  117    -1 	defer topic.RUnlock()
   -1   232 	topic.post(body)
  118   233 
  119    -1 	topic.lastId += 1
  120    -1 	msg := Msg{topic.lastId, body}
  121    -1 
  122    -1 	for channel := range topic.channels {
  123    -1 		go func(ch chan Msg) {
  124    -1 			ch <- msg
  125    -1 		}(channel)
   -1   234 	if topic.hasHistory {
   -1   235 		topic.storeHistory(key)
  126   236 	}
  127   237 }
  128   238 
  129    -1 func getMsg(w http.ResponseWriter, r *http.Request) {
   -1   239 func get(w http.ResponseWriter, r *http.Request) {
  130   240 	key, password := splitPassword(r.URL.Path)
  131   241 
   -1   242 	lastId, err := strconv.Atoi(r.Header.Get("Last-Event-ID"))
   -1   243 	if err != nil {
   -1   244 		lastId = 0
   -1   245 	}
   -1   246 
  132   247 	ch := make(chan Msg)
  133    -1 	allowed := pushChannel(key, password, ch)
   -1   248 	allowed := pushChannel(key, password, ch, lastId)
  134   249 	if !allowed {
  135   250 		http.Error(w, "Forbidden", http.StatusForbidden)
  136   251 		return
@@ -168,79 +283,47 @@ func getMsg(w http.ResponseWriter, r *http.Request) {
  168   283 	}
  169   284 }
  170   285 
  171    -1 func putStore(w http.ResponseWriter, r *http.Request) {
  172    -1 	path := getStorePath(r.URL.Path)
   -1   286 func put(w http.ResponseWriter, r *http.Request) {
   -1   287 	key, password := splitPassword(r.URL.Path)
  173   288 
  174    -1 	content, err := ioutil.ReadAll(r.Body)
  175    -1 	if err != nil {
  176    -1 		log.Println("error reading request body:", err)
  177    -1 		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
   -1   289 	topic, allowed := getTopic(key, password)
   -1   290 
   -1   291 	if !allowed {
   -1   292 		http.Error(w, "Forbidden", http.StatusForbidden)
   -1   293 		return
   -1   294 	} else if !topic.hasHistory {
   -1   295 		http.Error(w, "No history", http.StatusBadRequest)
  178   296 		return
  179   297 	}
  180   298 
  181    -1 	err = ioutil.WriteFile(path, content, 0644)
   -1   299 	lastId, err := strconv.Atoi(r.Header.Get("Last-Event-ID"))
  182   300 	if err != nil {
  183    -1 		log.Println("error writing to file:", err)
  184    -1 		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
   -1   301 		http.Error(w, "Missing Last-Event-ID", http.StatusBadRequest)
  185   302 		return
  186   303 	}
  187    -1 }
  188    -1 
  189    -1 func getStore(w http.ResponseWriter, r *http.Request) {
  190    -1 	path := getStorePath(r.URL.Path)
  191   304 
  192    -1 	content, err := ioutil.ReadFile(path)
  193    -1 	if os.IsNotExist(err) {
  194    -1 		http.Error(w, "Not Found", http.StatusNotFound)
  195    -1 		return
  196    -1 	} else if err != nil {
  197    -1 		log.Println("error reading from file:", err)
   -1   305 	body, err := ioutil.ReadAll(r.Body)
   -1   306 	if err != nil {
   -1   307 		log.Println("error reading request body:", err)
  198   308 		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  199   309 		return
  200   310 	}
  201   311 
  202    -1 	w.Write(content)
  203    -1 }
  204    -1 
  205    -1 func deleteStore(w http.ResponseWriter, r *http.Request) {
  206    -1 	path := getStorePath(r.URL.Path)
  207    -1 
  208    -1 	err := os.Remove(path)
  209    -1 	if os.IsNotExist(err) {
  210    -1 		http.Error(w, "Not Found", http.StatusNotFound)
  211    -1 		return
  212    -1 	} else if err != nil {
  213    -1 		log.Println("error removing file:", err)
  214    -1 		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  215    -1 		return
  216    -1 	}
   -1   312 	topic.put(body, lastId)
   -1   313 	topic.storeHistory(key)
  217   314 }
  218   315 
  219    -1 func handleMsg(w http.ResponseWriter, r *http.Request) {
   -1   316 func handler(w http.ResponseWriter, r *http.Request) {
  220   317 	if verbose {
  221   318 		log.Println(r.Method, r.URL)
  222   319 	}
  223   320 
  224   321 	if r.Method == http.MethodGet {
  225    -1 		getMsg(w, r)
   -1   322 		get(w, r)
  226   323 	} else if r.Method == http.MethodPost {
  227    -1 		postMsg(w, r)
  228    -1 	} else {
  229    -1 		http.Error(w, "Unsupported Method", http.StatusMethodNotAllowed)
  230    -1 	}
  231    -1 }
  232    -1 
  233    -1 func handleStore(w http.ResponseWriter, r *http.Request) {
  234    -1 	if verbose {
  235    -1 		log.Println(r.Method, r.URL)
  236    -1 	}
  237    -1 
  238    -1 	if r.Method == http.MethodPut || r.Method == http.MethodPost {
  239    -1 		putStore(w, r)
  240    -1 	} else if r.Method == http.MethodGet || r.Method == http.MethodHead {
  241    -1 		getStore(w, r)
  242    -1 	} else if r.Method == http.MethodDelete {
  243    -1 		deleteStore(w, r)
   -1   324 		post(w, r)
   -1   325 	} else if r.Method == http.MethodPut {
   -1   326 		put(w, r)
  244   327 	} else {
  245   328 		http.Error(w, "Unsupported Method", http.StatusMethodNotAllowed)
  246   329 	}
@@ -261,8 +344,8 @@ func main() {
  261   344 		addr = fmt.Sprintf("localhost:%s", flag.Args()[0])
  262   345 	}
  263   346 
  264    -1 	http.HandleFunc("/msg/", handleMsg)
  265    -1 	http.HandleFunc("/store/", handleStore)
   -1   347 	http.HandleFunc("/msg/", handler)
   -1   348 	http.HandleFunc("/hmsg/", handler)
  266   349 
  267   350 	log.Printf("Serving on http://%s", addr)
  268   351 	log.Fatal(http.ListenAndServe(addr, nil))