Yandex Message Queue
Для работы с Yandex Message Queue в Rust используется SDK от Amazon AWS: aws-sdk-rust.
Инициализация
Подключаем необходимые библиотеки:
[dependencies] aws-config = { version = "=0.56.0" } aws-sdk-sqs = "0.29" aws-credential-types = "=0.56.0"
Последние версии библиотек не подходят для работы с Yandex Message Queue по причине того, что Amazon перешел на использование Json протокола сообщений вместо XML. При использовании свежих версий библиотек возникает ошибка:
<Message>Action param was not found.</Message>
Пример кода для инициализации клиента:
use aws_config::environment::EnvironmentVariableCredentialsProvider; use aws_sdk_sqs as sqs; use sqs::{config::Region, Client}; async fn init_ymq() -> aws_sdk_sqs::Client { let sdk_config = ::aws_config::load_from_env().await; let config = sqs::config::Builder::from(&sdk_config) .credentials_provider(EnvironmentVariableCredentialsProvider::new()) .region(Region::new("ru-central1")) .endpoint_url("https://message-queue.api.cloud.yandex.net/") .build(); let client = Client::from_conf(config); client }
Ключи для доступа к облаку
Данный код предполагает наличие переменных среды окружения:
AWS_ACCESS_KEY_ID=XXXXXXXXXXXXXXXXX AWS_SECRET_ACCESS_KEY=YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY
Для получения этих ключей:
- Создайте сервисный аккаунт с ролью editor
- Создайте статические ключи доступа (документация)
yc iam access-key create --service-account-name my-robot --description "ключ для работы с очередью"
После выполнения команды вы получаете:
access_key: id: ajeml05rc31dkj0apn6l service_account_id: ajegupruud35jpdug83d created_at: "2024-04-07T12:07:33.209621763Z" description: ключ для работы с очередью key_id: XXXXXXXXXXXXXXXXX secret: YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY
Значение key_id помещаем в AWS_ACCESS_KEY_ID, а secret — в AWS_SECRET_ACCESS_KEY.
Публикация сообщений в очередь
Для работы с очередью нужен URL очереди вида https://message-queue.api.cloud.yandex.net/xxxxxxxxxxxx/xxxxxxxxxxxxxxxxxxx/<queue_name>, узнать который можно в консоли Яндекс Облака.
use aws_sdk_sqs as sqs; use serde::Serialize; use thiserror::Error; #[derive(Error, Debug)] pub enum AppError { // ... #[error("Ошибка при работе с очередью: {0}")] YmqError(String), } async fn publish_message<T: Serialize>( client: &sqs::Client, queue_url: &str, message: T, ) -> Result<(), AppError> { let body = serde_json::to_string(&message)?; client .send_message() .queue_url(queue_url) .message_body(&body) .send() .await .map_err(|e| AppError::YmqError(e.to_string()))?; Ok(()) }
Здесь мы используем крейт thiserror для унификации ошибок.
Чтение сообщений из очереди
Для чтения сообщений используется метод receive_message, который помогает получить сообщения из очереди (до 10 шт за раз):
use aws_sdk_sqs as sqs; use serde::de::DeserializeOwned; async fn read_messages<T: DeserializeOwned>( client: &sqs::Client, queue_url: &str, ) -> Result<Vec<T>, AppError> { match client .receive_message() .max_number_of_messages(10) .queue_url(queue_url) .send() .await .map_err(|e| AppError::YmqError(e.to_string()))? .messages { Some(messages) => { let items: Vec<_> = messages .into_iter() .filter_map(|m| { m.body .map(|b| serde_json::from_str(&b).ok()) .flatten() }) .collect(); Ok(items) } None => Ok(vec![]), } }