Как лесно да напишете разпределена уеб услуга в Python AMQP

Хей Хабр. От доста време пиша на Python. Наскоро трябваше да се справя с RabbitMQ. Харесва ми. Тъй като се сглобява в клъстер без никакви проблеми (ясно е, че с някои тънкости). Тогава си помислих: би било хубаво да го използвам като опашка за съобщения в част от API на проекта, върху който работя. Самият API е написан на торнадо, основната идея беше да се изключи блокиращият код от API. Всички синхронни операции бяха извършени в пула от нишки.

Първото нещо, което реших, беше да направя отделен „работещ“ процес(и), който да се грижи за цялата синхронна работа. Мислех, че "работникът" е възможно най-прост и изпълнява задачи от опашката една след друга. Да кажем, че е избрал нещо от базата данни, отговорил е, поел е следващата задача и т.н. Можете сами да управлявате много „работници“ и тогава AMQP вече действа като вид IPC.

След известно време от това израсна модул, който се грижи за цялата рутина, свързана с AMQP и прехвърлянето на съобщения напред и назад, и също така ги компресира с gzip, ако има твърде много данни. Така се роди екипажът. Всъщност, използвайки го, ще напишем прост API, който ще се състои от сървър на торнадо и прости и неусложнени „работнически“ процеси. Гледайки напред, ще кажа, че целият код е наличен в github и това, за което ще говоря след това, е събрано в примерната папка.

Подготовка

Така че, нека го вземем по ред. Първото нещо, което трябва да направим, е да инсталираме RabbitMQ. Как да направя това, няма да описвам. Мога само да кажа, че на същия ubunt е инсталиран и работи от кутията. На моя Mac единственото нещо, което трябваше да направя, беше да инсталирам LaunchRocket, който събираше всички услуги, които бяха инсталирани чрез homebrew и изходв GUI:

разпределена

След това нека създадем нашия проект virtualenv и да инсталираме самия модул чрез pip:

Торнадото умишлено не е посочено в зависимостите на модула, тъй като може да не е на хоста с работника. И в уеб частта те обикновено създават requirements.txt, където са посочени всички други зависимости.

Ще напиша кода на части, за да не наруша реда на историята. Какво получаваме в крайна сметка може да видите тук.

Самият торнадо сървър се състои от две части. В първата част дефинираме манипулаторите за заявката, а във втората част се задейства цикълът на събитието. Нека да напишем сървър и да създадем нашия първи api метод.

Благодарение на съпрограмата в tornado, кодът изглежда прост. Можете да напишете същото без съпрограмма.

Нашият сървър е готов. Но ако го стартираме и отидем на /, тогава няма да чакаме отговор, няма кой да го обработи.

Сега нека напишем прост работник:

И така, както можете да видите в кода, има проста функция, обвита с Task("test") декоратор, където test е уникален идентификатор на задача. Вашият работник не може да има две задачи с един и същ идентификатор. Разбира се, би било правилно задачата да се нарече „crew.example.test“ (както обикновено го наричам в производствена среда), но за нашия пример просто „test“ е достатъчно.

Context.settings.counter веднага хваща окото. Това е контекст, който се инициализира в работния процес, когато се извика функцията за изпълнение. Освен това контекстът вече има context.headers - това са заглавките на отговора за разделяне на метаданните от отговора. В примера с функцията за обратно извикване, именно този речник се предава на _on_response.

Заглавките се нулират след всеки отговор, но context.settings не. Използвам context.settings, за да предам връзка с база данни или друг обект към работните(ите) функции.

Също работникобработва ключове за стартиране, няма много от тях:

URL адресът на връзката с базата данни и други променливи могат да бъдат взети от променливите на средата. Следователно работникът в параметрите чака само как да се свърже с AMQP (хост и порт) и нивото на регистриране.

Така че, нека стартираме всичко и да проверим:

разпределена

Работи, но какво се случи зад екрана?

При стартиране на tornado сървъра, tornado се свърза с RabbitMQ, създаде Exchange DLX и започна да слуша DLX опашката. Това е Dead-Letter-Exchange - специална опашка, която съдържа задачи, които никой работник не е поел за определен таймаут. Той също така създаде опашка с уникален идентификатор, където ще идват отговорите от работниците.

След като стартира, работникът създаде опашка за всяка задача, обвита от декоратора, и се абонира за тях. Когато пристигне задача, работникът на главния цикъл създава една нишка, контролира времето за изпълнение на задачата в основната нишка и изпълнява обвитата функция. След връщане от обвитата функция я сериализира и я поставя в опашката за отговор на сървъра.

След получаване на заявка, tornado сървърът поставя задачата в съответната опашка, като посочва идентификатора на уникалната си опашка, в която трябва да пристигне отговорът. Ако нито един работник не е поел задачата, тогава RabbitMQ пренасочва задачата към обменния DLX и торнадо сървърът получава съобщение, че времето за изчакване на опашката е изтекло, хвърляйки изключение.

Висяща задача

За да демонстрираме как работи механизмът за изпълнение на задачи, които са блокирани в ход, нека напишем друг уеб метод и задача в worker.

Във файла master.py добавете:

И го добавете към списъка с манипулатори:

Както можете да видите от горния пример, задачата ще премине в безкраен цикъл. Ако обаче задачата не бъде изпълнена в рамките на 3 секунди (отчитайки времетоdequeued), главният цикъл в worker-а ще хвърли SystemExit изключение към нишката. И да, ще трябва да го обработите.

Както бе споменато по-горе, контекстът е такъв специален обект, който се импортира и има няколко вградени променливи.

Нека направим малко проста статистика за отговорите на нашия работник.

Добавете следния манипулатор към файла master.py:

Също така ще се регистрираме в списъка с оператори на заявки:

Този манипулатор не се различава много от предишните, той просто връща стойността, която работникът му е подал.

Сега самата задача.

Добавете към файла worker.py:

Функцията връща низ, съдържащ информация за броя на задачите, обработени от работника.

PubSub и дълго анкетиране

Сега нека внедрим няколко манипулатора. Една от заявките просто ще виси и ще чака, а втората ще получи POST данни. След прехвърлянето на последния, първият ще ги подари.

Нека напишем задачата за публикуване.

Ако не е необходимо да прехвърляте контрола на работника, можете просто да публикувате директно от торнадо сървъра

Паралелно изпълнение на работа

Често има ситуация, в която можем да изпълняваме няколко задачи паралелно. crew има малко синтактична захар за това:

В този случай на задачата ще бъдат присвоени две задачи паралелно и излизането от с ще се извърши, когато последната бъде изпълнена.

Но трябва да внимавате, тъй като някои задачи може да предизвикат изключение. Тя ще бъде приравнена директно към променливата. Така че трябва да проверите дали test_result и stat_result не са екземпляри на класа Exception.

Бъдещи планове

Когато eigrad предложи да напиша слой, който може да изпълнява всяко wsgi приложение с помощта на crew, веднага ми хареса тази идея. Само си представете молбиняма да нахлуе във вашето wsgi приложение, а равномерно ще премине през опашката към wsgi-worker.

Никога не съм писал wsgi сървър и дори не знам откъде да започна. Но можете да ми помогнете, приемам заявки за изтегляне.

Също така мисля да добавя клиент за друга популярна асинхронна рамка, за twisted. Но докато го разбирам, и няма достатъчно свободно време.

Благодаря

Благодарение на разработчиците на RabbitMQ и AMQP. Страхотни идеи.

Също така благодаря на читателите. Дано не сте си загубили времето.

И тук можете да получите грант за тестов период на Yandex.Cloud. Необходимо е само да въведете "Habr" в полето "секретна парола".