Yandex Message Queue

Для работы с Yandex Message Queue в Rust используется SDK от Amazon AWS: aws-sdk-rust.

Инициализация

Подключаем необходимые библиотеки:

toml
[dependencies]
aws-config = { version = "=0.56.0" }
aws-sdk-sqs = "0.29"
aws-credential-types = "=0.56.0"

Последние версии библиотек не подходят для работы с Yandex Message Queue по причине того, что Amazon перешел на использование Json протокола сообщений вместо XML. При использовании свежих версий библиотек возникает ошибка:

text
<Message>Action param was not found.</Message>

Пример кода для инициализации клиента:

rust
use aws_config::environment::EnvironmentVariableCredentialsProvider;
use aws_sdk_sqs as sqs;
use sqs::{config::Region, Client};

async fn init_ymq() -> aws_sdk_sqs::Client {
    let sdk_config = ::aws_config::load_from_env().await;
    let config = sqs::config::Builder::from(&sdk_config)
        .credentials_provider(EnvironmentVariableCredentialsProvider::new())
        .region(Region::new("ru-central1"))
        .endpoint_url("https://message-queue.api.cloud.yandex.net/")
        .build();

    let client = Client::from_conf(config);
    client
}

Ключи для доступа к облаку

Данный код предполагает наличие переменных среды окружения:

bash
AWS_ACCESS_KEY_ID=XXXXXXXXXXXXXXXXX
AWS_SECRET_ACCESS_KEY=YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY

Для получения этих ключей:

  • Создайте сервисный аккаунт с ролью editor
  • Создайте статические ключи доступа (документация)
bash
yc iam access-key create --service-account-name my-robot --description "ключ для работы с очередью"

После выполнения команды вы получаете:

text
access_key:
  id: ajeml05rc31dkj0apn6l
  service_account_id: ajegupruud35jpdug83d
  created_at: "2024-04-07T12:07:33.209621763Z"
  description: ключ для работы с очередью
  key_id: XXXXXXXXXXXXXXXXX
secret: YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY

Значение key_id помещаем в AWS_ACCESS_KEY_ID, а secret — в AWS_SECRET_ACCESS_KEY.

Публикация сообщений в очередь

Для работы с очередью нужен URL очереди вида https://message-queue.api.cloud.yandex.net/xxxxxxxxxxxx/xxxxxxxxxxxxxxxxxxx/<queue_name>, узнать который можно в консоли Яндекс Облака.

rust
use aws_sdk_sqs as sqs;
use serde::Serialize;
use thiserror::Error;

#[derive(Error, Debug)]
pub enum AppError {
    // ...

    #[error("Ошибка при работе с очередью: {0}")]
    YmqError(String),
}

async fn publish_message<T: Serialize>(
    client: &sqs::Client,
    queue_url: &str,
    message: T,
) -> Result<(), AppError> {
    let body = serde_json::to_string(&message)?;
    client
        .send_message()
        .queue_url(queue_url)
        .message_body(&body)
        .send()
        .await
        .map_err(|e| AppError::YmqError(e.to_string()))?;
    Ok(())
}

Здесь мы используем крейт thiserror для унификации ошибок.

Чтение сообщений из очереди

Для чтения сообщений используется метод receive_message, который помогает получить сообщения из очереди (до 10 шт за раз):

rust
use aws_sdk_sqs as sqs;
use serde::de::DeserializeOwned;

async fn read_messages<T: DeserializeOwned>(
    client: &sqs::Client,
    queue_url: &str,
) -> Result<Vec<T>, AppError> {
    match client
        .receive_message()
        .max_number_of_messages(10)
        .queue_url(queue_url)
        .send()
        .await
        .map_err(|e| AppError::YmqError(e.to_string()))?
        .messages
    {
        Some(messages) => {
            let items: Vec<_> = messages
                .into_iter()
                .filter_map(|m| {
                    m.body
                        .map(|b| serde_json::from_str(&b).ok())
                        .flatten()
                })
                .collect();

            Ok(items)
        }
        None => Ok(vec![]),
    }
}