Cyberway -> Event Manager -> Mqtt
хотел поделиться относительно простым способом получения событий от Event Manager-а сайбервея.
Мотивация
Я пока не нашел другого способа, как отслеживать вторичные экшены, экшены которые вызваются, при обработке блоков. В блокчейне golos это были виртуальные операции. К примеру вы вызваете экшен gls.publish.payoutmssg, а этот экшен в свою очередь вызывает экшен transfer контракта cyberway.token. Так вот если просто перебирать блоки, то вторичные экшены и генерируемые события (events) не получится забрать.
Для этого в cyberway имеется плагин, event engine, который все события и все исполняемые экшены отправляет по unix сокету. Проблема только в том, что не так просто можно читать эти данные. К примеру java без костылей не может читать из unix сокетов. К тому же только одно приложение за раз может читать из сокета, а хочется подключить несколько приложений/ботов к одной ноде cyberway.
Message Broker
В качестве посредника можно использовать message broker. Это очередь сообщений, к которой можно подключить один или несколько так называемых message producer (отправителей сообщений) и один или несколько message consumer (получателей сообщений). Есть разные message brocker, которые могут хранить сообщения в очереди и отдавать по требованию, есть сложные вроде kafka и есть попроще, вроде mosquitto. Я попробовал на свом домашнем компьютере запутить kafka, но мне просто не хватило памяти на компьютере, потому я отказался от этого. Есть еще другие вроде RabbitMQ и тп, но я сразу обратился к тому, что я уже использовал дома, для home automation - mosquitto.
Mosquitto
Mosquitto - брокер сообщений работающий по протоколу MQTT. Протокол MQTT был создан для Интернета вещей, что бы различные устройства могли общаться друг с другом. Брокер очень легковесный, практически не потребляет памяти и процессороного времени. Устроен он относительно просто. Отправители сообщений могут отправлять сообщения в так называемые топики, устроенные как директории. К примеру cyberway/actions/gls.publish/payoutmssg. Другие приложения могут подключиться к этим топикам и получать сообщения. Причем можно подписаться к примеру на "cyberway/#" и получать все сообщения. Или подписаться на cyberway/actions/gls.publish/# и получать сообщения, вроде upvote, payoutmssg, createmssg и так далее смартконтракта gls.publish.
Я написал простенький адаптер для mqtt и выложил на гитхабе. Он подключается к unix сокету и все полученные сообщения отправляет брокеру. Потом можно в любом приложении подключиться к брокеру и подписаться только на нужные сообщения.
Настройка
Сперва надо собрать адаптер для mqtt. Для этого надо склонировать репозиторий с адаптером
git clone https://github.com/gropox/cyberway2mqtt.git
Зайти в папку cyberway2mqtt и собрать образ докера с адаптером
cd cyberway2mqtt
docker build . -t c2m
Далее надо включить плагин event engine в config.ini ноды
plugin = eosio::event_engine_plugin
event-engine-unix-socket = /dev/cyberevents/action
/dev/cyberevents/action - это файл (unix socket), будет создан в директории /dev/cyberevents, которая будет примонтирована в контейнер с nodeos -димоном.
Далее дополним существующий docker-compose.yml
Во первых добавим mqtt сервер как сревис
.......
services:
mqtt:
container_name: mqtt
image: eclipse-mosquitto
ports:
- 127.0.0.1:1884:1883
networks:
- cyberway-net
mongo:
container_name: mongo
image: mongo:4.0.6-xenial
......
Добавим volume c папкой, где будет создаваться unix socket
- ${PWD}/cyberevents:/dev/cyberevents
Папка cyberevents лежит в той же директории где и docker-compose.xml у меня. Можно было бы создать отдельный volume по аналогии с дисками для монги и ноды. Но у меня после экспериментов осталось так.
Так выглядит у меня секция volumes у nodeosd
- "8888"
volumes:
- cyberway-nodeos-data:/opt/cyberway/bin/data-dir
- ${PWD}/config.ini:/opt/cyberway/bin/data-dir/config.ini
- ${PWD}/genesis-data:/opt/cyberway/bin/genesis-data
- ${PWD}/cyberevents:/dev/cyberevents
И надо добавить теперь адаптер, как сервис, после сервиса nodeosd
c2m:
container_name: c2m
image: c2m
volumes_from:
- nodeosd
networks:
- cyberway-net
Запуск
Теперь собственно остается только запустить все командой
docker-composer up -d
При запуске nodeosd создаст сокет файл и остановится. Будет ждать подключения адаптера. В адаптере я установил задержку несколько секунд, что бы дать успеть разогреться mongodb и nodeosd и потребителям сообщений, которые потом будут подключаться к брокеру сообщений.
Как только адаптер начнет работать, в логе nodeosd станут появляться следующие сообщения
Значит все впорядке.
Адаптер
Немного про адаптер и структуру топиков.
event manager выдает через сокет сообщения в виде json. Сообщения бывают нескольких видов. См. аттрибут msg_type.
- AcceptBlock - новые блоки
- CommitBlock - irreversible блоки
- AcceptTrx - новые транзакции в пуле транзакций
- ApplyTrx - транзакции принятые в блок
Адаптер отправляет эти сообщения в соотвествующие топики "AcceptBlock", "CommitBlock" и так далее.
Так же адаптер дополнительно разбирает ApplyTrx и извлекает экшены из сообшения. Экшены потом потправляются в топики вида actions/{receiver}/{code}/{action}.
К примеру экшен с переводом будет записан в топик actions/cyber.token/cyber.token/transfer.
Экшен публикации сообщения в топик actions/gls.publish/gls.publish/createmssg.
Тестирование
Что бы проверить работу, можно установить клиент mosquitto. Под ubuntu это пакет mosquitto-clients
apt install mosquitto-clients
К примеру мы хотим отслеживать все переводы в системе. Для этого надо подписаться на соотвествующий топик "actions/cyber.token/cyber.token/transfer".
mosquitto_sub -p 1884 -t actions/cyber.token/cyber.token/transfer
- mosquitto_sub тул подписки на топики, выдает получаемые сообщения в консоль
- -p 1884 порт, который указан в docker-compose.yml
- -t actions/cyber.token/cyber.token/transfer, топик на который мы подписываемся
node-js
В основном я программирую под node-js. Для подключения скриптом, надо установить пакет mqtt.
Примерная схема подключения. Так я подключаюсь в болталкине к потоку событий, для отслеживания апвоутов и создания сообщений.
var mqtt = require('mqtt')
const TOPIC_CREATEMSSG = "actions/gls.publish/gls.publish/createmssg";
const TOPIC_UPVOTE = "actions/gls.publish/gls.publish/upvote";
async function init() {
log.info("initialize boltalkin");
await db.init();
await telegram.init();
log.info("connect to mqtt", CONFIG.mqtt);
// Подключаемся к брокеру. CONFIG.mqtt = "mqtt://localhost:1884"
const mqtt_client = mqtt.connect(CONFIG.mqtt);
//Создаем обработчик сообщений
mqtt_client.on('message', async function (topic, message) {
log.trace("got message from ", topic, message.toString());
const action = JSON.parse(message.toString());
switch(topic) {
case TOPIC_CREATEMSSG:
log.trace("got createmssg action ", action);
await processCreateMessage(action.block_num, action.block_time, action.id, action.args);
break;
case TOPIC_UPVOTE:
log.trace("got upvote action", action);
await processUpvoute(action.args);
break;
}
})
//подписываемся на нужные топики
mqtt_client.on('connect', function () {
mqtt_client.subscribe(TOPIC_CREATEMSSG, function (err) {});
mqtt_client.subscribe(TOPIC_UPVOTE, function (err) {});
})
}