Wpis z mikrobloga

Mircy pomozcie mi z #python

Mam sobie pub/sub z ktorego przychodza jakies tam wiadomosci. Zadanie mam takie ze musze (po krotkim post-processingu) wyslac to do websocket'a. Moj pub/sub kiedy przychodzi widomosc operuje na 10 thread-worker'ach czyli costam przychodzi i sie thread odpala.

Dotychczas uzywalem bilbioteki websocket-clienti tego mechanizmu: https://github.com/websocket-client/websocket-client#short-lived-connection . Zauwazylem jednak ze tworzenie tych polaczen dla kazdej wiadomosci skutkowalo wiekszym lagiem niz uzywanie jednego trwalego polaczenia.

Wiec z racji tego ze chce miec jedno dlugo-zyjace i regenerujace sie polaczenie postanowilem uzyc WebSocketApp z w/w bibilioteki. Mam tam onopen, onmessage etc. I ruszam run_forever w oddzielnym 'deamon' thread'zie. Wtedy mam w glownym thread'ie dostepna ta aplikacje i moge wysylac wiadomosci uzywajac metody send. Z racji tego ze w/w biblioteka zapewnie "thread-safety" nie ma problemu zeby pozniej moj pub/sub workers uzywaly tego samego obiektu do wysylania payload'ow. W dodatku dochodzi jeszcze kwestia wysylania co 10-sec "keepAlive", to cos innego niz zwykly websocket'owy "ping" bo musi zawierac poprawna mutacje gql, z racji tego odpalam jeszcze jeden thread ktore te wiadomosci wysyla.

Problem w tym ze po tym jak wysle wiadomosc przez websocket, musze odczytac jeszcze status tej wiadomosci i sparsowac to co odeslano tak zebym pozniej mogl w moim pubsubie zrobic ack/nack zgodnie z tym czy tak wiadomosc zostala pomyslnie wyslana. Mam rowniez definicje jakie statusy sa "retryable" a jakie "non-retryable". Zgodnie z tym wtedy moge taka wiadomosci nack'owac i przychodzi ona jeszcze raz w pub-subie.

PYTANIE: nie wiem dokladnie jak z tym on_message "zapisac" odpowiedz na poszczegolne zapytania i je pozniej zdobyc, z tego co wiem to websocket nie wysyla nieczego "jak mu sie zachce" wszystko jest odpowiedzia na wczesniejsze payloady (aka mutacje gql). Taki payload ma oczywiscie swoje ID ktore ja kontroluje i odpowiedz przychodzi z takim samym ID.

W moim pubsubie jak przychodzi wiadomosc to musze zrobic tak:
- Robie jakis prosty post-processing
- Wysylam wiadomosc w websocket uzywajac tego single-tonu WebSocketApp ktory chodzi sobie w daemon-threadzie
- Musze poczekac wtedy na odpowiedz
- Zgodnie ze statusem ktory dostaje od websocke zrobic ack/nack dla wiadomosci z pubsub.

Tutaj przykladowy kod: https://pastebin.com/FrtGseeg
Oczywiscie nie wszystko tam jest, np ominalem fakt ze musze co 10 sekund wysylac to "keepAlive" etc.

Znalazlem wczesniej cos takiego: https://stackoverflow.com/questions/61532846/accessing-incoming-messages-with-a-python-websocket-client

To queue dziala niby w moim przypadku ale boje sie ze bez sprawdzania jakie ID ma odpowiedz moge "pomieszac" te wiadomosc z racji tego ze to queue dziala jako FIFO. Myslalem o jakims in-memory cache ktore zapisywalo by odpowiedzi bazujac na tym ID cos w stylu key/value tylko zeby bylo tam jakies expiry. Troche boje sie uzywac jakiegos globalnego dict.
Co mozecie mi poradzic ( ͡° ͜ʖ ͡°) Sorry za wysryw tak wgl XD.

#programowanie #python
  • 6
  • Odpowiedz
@chrabia_bober:

Dotychczas uzywalem bilbioteki websocket-clienti tego mechanizmu: https://github.com/websocket-client/websocket-client#short-lived-connection . Zauwazylem jednak ze tworzenie tych polaczen dla kazdej wiadomosci skutkowalo wiekszym lagiem niz uzywanie jednego trwalego polaczenia.


Z ciekawości, to dlaczego używasz short lived zamiast long lived, które tam są? Przecież ciągle czekasz na wiadomości, więc w Twoim interesie (zazwyczaj, nie mówię, że zawsze) powinno być trzymanie połączenia do pub/suba i odbieranie wiadomości za pomocą tego utrzymywanego połączenia. Jak ciągle odpalasz nowe
  • Odpowiedz
via Wykop Mobilny (Android)
  • 0
@devopsiarz: No właśnie tak jak mówię, chce użyć long-lived. Tak jak pisałem , w moim przypadku pubsub to oddzielny protokół, wysyłanie do websocketa to tylko część tego co robię z wiadomościami z pub-suba.
  • Odpowiedz
Moja propozycja, taka na kolanie, może być błędna.

Tak rozumiem Twój program:

Kolejność procesowania:

1) pubsub ----> funkcja1 ----> ws [request]
2) pubsub <----- funkcja2 <---- ws [response]

W sensie tak powinno Ci to chodzić, jeśli dobrze to zrozumiałem. Oczywiście w tym przypadku funkcja1 procesuje przychodzący request z pubsub i przesyła do websocketa. Funkcja2 procesuje odpowiedź z websocketa i zwraca z powrotem do pubsub

Do komunikacji pomiędzy tymi elementami w programie użył
  • Odpowiedz
@chrabia_bober: a teraz doczytałem końcówkę, którą napisałeś, więc jako uzupełnienie: w queue możesz dicty przesyłać lub nawet klasy, to apropo czy będziesz chciał mieć jakieś ciut bardziej skomplikowane wiadomości, aby pilnować kolejności. Ale możesz też mieć właśnie jakieś globalne cache z ttlem, nie widzę tu żadnego problemu. Reszta jak w moim przykładzie (z grubsza).
  • Odpowiedz
@devopsiarz: Dzieki wielkie za protipy.

Generalnie to nie ma funkcji2. Wszytko sie dzieje w funkcji1 (on_entry) czyli to co triggeruje jak przychodzi cos z pubsuba. Wtedy ja wysylam do cos do ws i czekam na odpowiedz i wtedy decyzja czy zrobic ack/nack bo mam dostepny objekt entry ktory ma te metody.

Chyba nieuniknione bedzie uzycie https://github.com/tkem/cachetools TTLCache po to zebym mogl zapisac w on_message w ws odpowiedz pod kluczem
  • Odpowiedz