Yandex Message Queue
Для работы с Yandex Message Queue в rust (и не только) используется SDK от Amazon AWS: aws-sdk-rust.
Инициализация
Подключаем необходимые библиотеки
[dependencies]
aws-config = { version = "=0.56.0" }
aws-sdk-sqs = "0.29"
aws-credential-types="=0.56.0"
Последние версии библиотек не подходят для работы с Yandex Message Queue по причине того, что Amazon перешел на использование Json протокола сообщений вместо XML. При использованиие свежих версий библиотек возникает ошибка:
<Message>Action param was not found.</Message>
Пример кода для инициализации клиента:
use aws_config::environment::EnvironmentVariableCredentialsProvider;
use aws_sdk_sqs as sqs;
use sqs::{config::Region, Client};
async fn init_ymq() -> aws_sdk_sqs::Client {
let sdk_config = ::aws_config::load_from_env().await;
let config = sqs::config::Builder::from(&sdk_config)
.credentials_provider(EnvironmentVariableCredentialsProvider::new())
.region(Region::new("ru-central1"))
.endpoint_url("https://message-queue.api.cloud.yandex.net/")
.build();
let client = Client::from_conf(config);
client
}
Ключи для доступа к облаку
Данный код предполагает наличие переменных среды окружения
AWS_ACCESS_KEY_ID=XXXXXXXXXXXXXXXXX
AWS_SECRET_ACCESS_KEY=YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY
Для получения этих ключей:
- Создайте сервисный аккаунт с ролью editor
- Создайте статические ключи доступа (документация)
yc iam access-key create --service-account-name my-robot --description "ключ для работы с очередью"
после выполнения команды, вы получаете:
access_key:
id: ajeml05rc31dkj0apn6l
service_account_id: ajegupruud35jpdug83d
created_at: "2024-04-07T12:07:33.209621763Z"
description: ключ для работы с очередью
key_id: XXXXXXXXXXXXXXXXX
secret: YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY
Значение key_id помещаем в AWS_ACCESS_KEY_ID, а secret - в AWS_SECRET_ACCESS_KEY.
Публикация сообщений в очередь
Для работы с очередью, нужен url очереди вида
https://message-queue.api.cloud.yandex.net/xxxxxxxxxxxx/xxxxxxxxxxxxxxxxxxx/<queue_name>
узнать который вы можете в консоли Яндекс Облака
use aws_sdk_sqs as sqs;
use serde::Serialize;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
// ...
#[error("Ошибка при работе с очередью: {0}")]
YmqError(String),
}
async fn publish_message<T: Serialize>(client: &sqs::Client, queue_url: &str, message: T)->Result<(),AppError>{
let body = serde_json::to_string(&message)?;
client
.send_message()
.queue_url(queue_url)
.message_body(&body)
.send()
.await
.map_err(|e|
AppError::YmqError(e.to_string())
)?;
Ok(())
}
Здесь мы используем крейт thiserror для унификации ошибок.
Чтение сообщений из очереди
Для чтения сообщений используется метод receive_message
, который помогает получить сообщения из очереди (до 10 шт за раз):
use aws_sdk_sqs as sqs;
use serde::de::DeserializeOwned;
use error::AppError;
async fn read_messages<T: DeserializeOwned>(client: &sqs::Client, queue_url: &str)->Result<Vec<T>,AppError>{
match client
.receive_message()
.max_number_of_messages(10)
.queue_url(queue_url)
.send()
.await
.map_err(|e|
AppError::YmqError(e.to_string())
)?.messages{
Some(messages) => {
let items: Vec<_>= messages
.into_iter()
.filter_map(|m|
m.body.map(|b|
serde_json::from_str(&b).ok()
).flatten()
).collect();
Ok(items)
}
None => {
Ok(vec![])
}
}
}