via

Simple pubsub server inspired by https://patchbay.pub/
git clone https://git.ce9e.org/via.git

commit
b4a21a15d3c9c3d7c5fd98a8e8802c0d900211b7
parent
dc3496d3be293eaa6c38e448141950c457cb9b06
Author
Tobias Bengfort <tobias.bengfort@posteo.de>
Date
2020-10-15 07:56
allow to replace history

Diffstat

M via.go 65 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-

1 files changed, 64 insertions, 1 deletions


diff --git a/via.go b/via.go

@@ -116,7 +116,29 @@ func (topic *Topic) post(data []byte) {
  116   116 	}
  117   117 }
  118   118 
  119    -1 func pushChannel(key string, password string, ch chan Msg, lastId int) bool {
   -1   119 func (topic *Topic) put(data []byte, lastId int) {
   -1   120 	topic.Lock()
   -1   121 	defer topic.Unlock()
   -1   122 
   -1   123 	if len(topic.history) > 0 && lastId < topic.history[0].Id {
   -1   124 		return
   -1   125 	}
   -1   126 
   -1   127 	history := make([]Msg, 0)
   -1   128 	history = append(history, Msg{lastId, data})
   -1   129 	for _, msg := range topic.history {
   -1   130 		if msg.Id > lastId {
   -1   131 			history = append(history, msg)
   -1   132 		}
   -1   133 	}
   -1   134 	topic.history = history
   -1   135 
   -1   136 	if lastId > topic.lastId {
   -1   137 		topic.lastId = lastId
   -1   138 	}
   -1   139 }
   -1   140 
   -1   141 func getTopic(key string, password string) (*Topic, bool) {
  120   142 	mux.RLock()
  121   143 	topic, ok := topics[key]
  122   144 	mux.RUnlock()
@@ -136,6 +158,15 @@ func pushChannel(key string, password string, ch chan Msg, lastId int) bool {
  136   158 		topics[key] = topic
  137   159 		mux.Unlock()
  138   160 	} else if topic.password != password {
   -1   161 		return nil, false
   -1   162 	}
   -1   163 
   -1   164 	return topic, true
   -1   165 }
   -1   166 
   -1   167 func pushChannel(key string, password string, ch chan Msg, lastId int) bool {
   -1   168 	topic, allowed := getTopic(key, password)
   -1   169 	if !allowed {
  139   170 		return false
  140   171 	}
  141   172 
@@ -252,6 +283,36 @@ func get(w http.ResponseWriter, r *http.Request) {
  252   283 	}
  253   284 }
  254   285 
   -1   286 func put(w http.ResponseWriter, r *http.Request) {
   -1   287 	key, password := splitPassword(r.URL.Path)
   -1   288 
   -1   289 	topic, allowed := getTopic(key, password)
   -1   290 
   -1   291 	if !allowed {
   -1   292 		http.Error(w, "Forbidden", http.StatusForbidden)
   -1   293 		return
   -1   294 	} else if !topic.hasHistory {
   -1   295 		http.Error(w, "No history", http.StatusBadRequest)
   -1   296 		return
   -1   297 	}
   -1   298 
   -1   299 	lastId, err := strconv.Atoi(r.Header.Get("Last-Event-ID"))
   -1   300 	if err != nil {
   -1   301 		http.Error(w, "Missing Last-Event-ID", http.StatusBadRequest)
   -1   302 		return
   -1   303 	}
   -1   304 
   -1   305 	body, err := ioutil.ReadAll(r.Body)
   -1   306 	if err != nil {
   -1   307 		log.Println("error reading request body:", err)
   -1   308 		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
   -1   309 		return
   -1   310 	}
   -1   311 
   -1   312 	topic.put(body, lastId)
   -1   313 	topic.storeHistory(key)
   -1   314 }
   -1   315 
  255   316 func handler(w http.ResponseWriter, r *http.Request) {
  256   317 	if verbose {
  257   318 		log.Println(r.Method, r.URL)
@@ -261,6 +322,8 @@ func handler(w http.ResponseWriter, r *http.Request) {
  261   322 		get(w, r)
  262   323 	} else if r.Method == http.MethodPost {
  263   324 		post(w, r)
   -1   325 	} else if r.Method == http.MethodPut {
   -1   326 		put(w, r)
  264   327 	} else {
  265   328 		http.Error(w, "Unsupported Method", http.StatusMethodNotAllowed)
  266   329 	}