Уважаемые пользователи Голос!
Сайт доступен в режиме «чтение» до сентября 2020 года. Операции с токенами Golos, Cyber можно проводить, используя альтернативные клиенты или через эксплорер Cyberway. Подробности здесь: https://golos.io/@goloscore/operacii-s-tokenami-golos-cyber-1594822432061
С уважением, команда “Голос”
GOLOS
RU
EN
UA
ropox
5 лет назад
cyberway

Cyberway -> Event Manager -> Mqtt

Привет,

хотел поделиться относительно простым способом получения событий от Event Manager-а сайбервея.

Мотивация

Я пока не нашел другого способа, как отслеживать вторичные экшены, экшены которые вызваются, при обработке блоков. В блокчейне golos это были виртуальные операции. К примеру вы вызваете экшен gls.publish.payoutmssg, а этот экшен в свою очередь вызывает экшен transfer контракта cyberway.token. Так вот если просто перебирать блоки, то вторичные экшены и генерируемые события (events) не получится забрать.

Для этого в cyberway имеется плагин, event engine, который все события и все исполняемые экшены отправляет по unix сокету. Проблема только в том, что не так просто можно читать эти данные. К примеру java без костылей не может читать из unix сокетов. К тому же только одно приложение за раз может читать из сокета, а хочется подключить несколько приложений/ботов к одной ноде cyberway.

Message Broker

В качестве посредника можно использовать message broker. Это очередь сообщений, к которой можно подключить один или несколько так называемых message producer (отправителей сообщений) и один или несколько message consumer (получателей сообщений). Есть разные message brocker, которые могут хранить сообщения в очереди и отдавать по требованию, есть сложные вроде kafka и есть попроще, вроде mosquitto. Я попробовал на свом домашнем компьютере запутить kafka, но мне просто не хватило памяти на компьютере, потому я отказался от этого. Есть еще другие вроде RabbitMQ и тп, но я сразу обратился к тому, что я уже использовал дома, для home automation - mosquitto.

Mosquitto

Mosquitto - брокер сообщений работающий по протоколу MQTT. Протокол MQTT был создан для Интернета вещей, что бы различные устройства могли общаться друг с другом. Брокер очень легковесный, практически не потребляет памяти и процессороного времени. Устроен он относительно просто. Отправители сообщений могут отправлять сообщения в так называемые топики, устроенные как директории. К примеру cyberway/actions/gls.publish/payoutmssg. Другие приложения могут подключиться к этим топикам и получать сообщения. Причем можно подписаться к примеру на "cyberway/#" и получать все сообщения. Или подписаться на cyberway/actions/gls.publish/# и получать сообщения, вроде upvote, payoutmssg, createmssg и так далее смартконтракта gls.publish.

Я написал простенький адаптер для mqtt и выложил на гитхабе. Он подключается к unix сокету и все полученные сообщения отправляет брокеру. Потом можно в любом приложении подключиться к брокеру и подписаться только на нужные сообщения.

Настройка

Сперва надо собрать адаптер для mqtt. Для этого надо склонировать репозиторий с адаптером

git clone https://github.com/gropox/cyberway2mqtt.git

Зайти в папку cyberway2mqtt и собрать образ докера с адаптером

cd cyberway2mqtt
docker build . -t c2m

Далее надо включить плагин event engine в config.ini ноды

plugin = eosio::event_engine_plugin
event-engine-unix-socket = /dev/cyberevents/action

/dev/cyberevents/action - это файл (unix socket), будет создан в директории /dev/cyberevents, которая будет примонтирована в контейнер с nodeos -димоном.

Далее дополним существующий docker-compose.yml

Во первых добавим mqtt сервер как сревис

.......
services:

  mqtt:
    container_name: mqtt
    image: eclipse-mosquitto
    ports:
      - 127.0.0.1:1884:1883
    networks:
      - cyberway-net

  mongo:
    container_name: mongo
    image: mongo:4.0.6-xenial
        ......

Добавим volume c папкой, где будет создаваться unix socket

      - ${PWD}/cyberevents:/dev/cyberevents

Папка cyberevents лежит в той же директории где и docker-compose.xml у меня. Можно было бы создать отдельный volume по аналогии с дисками для монги и ноды. Но у меня после экспериментов осталось так.

Так выглядит у меня секция volumes у nodeosd

      - "8888"
    volumes:
      - cyberway-nodeos-data:/opt/cyberway/bin/data-dir
      - ${PWD}/config.ini:/opt/cyberway/bin/data-dir/config.ini
      - ${PWD}/genesis-data:/opt/cyberway/bin/genesis-data
      - ${PWD}/cyberevents:/dev/cyberevents

И надо добавить теперь адаптер, как сервис, после сервиса nodeosd

  c2m:
    container_name: c2m
    image: c2m
    volumes_from:
      - nodeosd
    networks:
      - cyberway-net

Запуск

Теперь собственно остается только запустить все командой

docker-composer up -d

При запуске nodeosd создаст сокет файл и остановится. Будет ждать подключения адаптера. В адаптере я установил задержку несколько секунд, что бы дать успеть разогреться mongodb и nodeosd и потребителям сообщений, которые потом будут подключаться к брокеру сообщений.

Как только адаптер начнет работать, в логе nodeosd станут появляться следующие сообщения

image.png

Значит все впорядке.

Адаптер

Немного про адаптер и структуру топиков.

event manager выдает через сокет сообщения в виде json. Сообщения бывают нескольких видов. См. аттрибут msg_type.

  • AcceptBlock - новые блоки
  • CommitBlock - irreversible блоки
  • AcceptTrx - новые транзакции в пуле транзакций
  • ApplyTrx - транзакции принятые в блок

Адаптер отправляет эти сообщения в соотвествующие топики "AcceptBlock", "CommitBlock" и так далее.

Так же адаптер дополнительно разбирает ApplyTrx и извлекает экшены из сообшения. Экшены потом потправляются в топики вида actions/{receiver}/{code}/{action}.

К примеру экшен с переводом будет записан в топик actions/cyber.token/cyber.token/transfer.
Экшен публикации сообщения в топик actions/gls.publish/gls.publish/createmssg.

Тестирование

Что бы проверить работу, можно установить клиент mosquitto. Под ubuntu это пакет mosquitto-clients

apt install mosquitto-clients

К примеру мы хотим отслеживать все переводы в системе. Для этого надо подписаться на соотвествующий топик "actions/cyber.token/cyber.token/transfer".

mosquitto_sub -p 1884 -t actions/cyber.token/cyber.token/transfer
  • mosquitto_sub тул подписки на топики, выдает получаемые сообщения в консоль
  • -p 1884 порт, который указан в docker-compose.yml
  • -t actions/cyber.token/cyber.token/transfer, топик на который мы подписываемся

image.png

node-js

В основном я программирую под node-js. Для подключения скриптом, надо установить пакет mqtt.

Примерная схема подключения. Так я подключаюсь в болталкине к потоку событий, для отслеживания апвоутов и создания сообщений.

var mqtt = require('mqtt')
const TOPIC_CREATEMSSG = "actions/gls.publish/gls.publish/createmssg";
const TOPIC_UPVOTE = "actions/gls.publish/gls.publish/upvote";

async function init() {
    log.info("initialize boltalkin");
    await db.init();
    await telegram.init();

    log.info("connect to mqtt", CONFIG.mqtt);
      // Подключаемся к брокеру. CONFIG.mqtt = "mqtt://localhost:1884"
    const mqtt_client  = mqtt.connect(CONFIG.mqtt);

      //Создаем обработчик сообщений
    mqtt_client.on('message', async function (topic, message) {
        log.trace("got message from ", topic, message.toString());
        const action = JSON.parse(message.toString());
        switch(topic) {
        case TOPIC_CREATEMSSG:
            log.trace("got createmssg action ", action);
            await processCreateMessage(action.block_num, action.block_time, action.id, action.args);
            break;
        case TOPIC_UPVOTE:
            log.trace("got upvote action", action);
            await processUpvoute(action.args);
            break;
        }
    })

      //подписываемся на нужные топики
    mqtt_client.on('connect', function () {
        mqtt_client.subscribe(TOPIC_CREATEMSSG, function (err) {});
        mqtt_client.subscribe(TOPIC_UPVOTE, function (err) {});
    })
}
cyberwayпрограммированиеботeventengine
92
1153.792 GOLOS
На Golos с February 2017
Комментарии (3)
Сортировать по:
Сначала старые