refactor: add tracing and logging

This commit is contained in:
Moritz Böhme 2025-10-03 17:04:24 +02:00
parent 80a6cefbeb
commit d2fa95d1e4
No known key found for this signature in database
GPG key ID: 970C6E89EB0547A9
3 changed files with 124 additions and 19 deletions

View file

@ -6,10 +6,12 @@ use readability_rust::{ReadabilityFlags, ReadabilityOptions};
use reqwest::{Client, Url};
use rss::Channel;
use tokio::{task::JoinSet, time::sleep};
use tracing::{Instrument, debug, info, info_span, instrument};
use warp::Filter;
const REQUEST_DELAY: Duration = Duration::from_secs(1);
#[instrument(skip(client), err)]
async fn get_feed(url: String, client: &Client) -> Result<Channel> {
let url = urlencoding::decode(&url)?.into_owned();
let content = client.get(url).send().await?.bytes().await?;
@ -23,33 +25,35 @@ fn get_domain(item: &rss::Item) -> Option<String> {
.and_then(|parsed| parsed.domain().map(|domain| domain.to_string()))
}
async fn complete(mut channel: Channel, client: &Client) -> Result<Box<Channel>> {
let grouped: Vec<Vec<rss::Item>> = channel
#[instrument(skip_all)]
async fn complete_channel(mut channel: Channel, client: &Client) -> Result<Box<Channel>> {
let grouped: Vec<(Option<String>, Vec<rss::Item>)> = channel
.items()
.iter()
.sorted_by(|a, b| get_domain(a).cmp(&get_domain(b)))
.chunk_by(|&item| get_domain(item))
.into_iter()
.map(|(_k, v)| v.cloned().collect())
.map(|(k, v)| (k, v.cloned().collect()))
.collect();
let mut set = JoinSet::new();
for mut items in grouped.into_iter() {
let client = client.clone();
set.spawn(async move {
for (index, item) in &mut items.iter_mut().enumerate() {
if index > 0 {
sleep(REQUEST_DELAY).await;
}
info!(
num_items = channel.items().len(),
num_groups = grouped.len(),
);
if let Some(ref link) = item.link
&& let Ok(content) = get_content(link, &client.clone()).await
{
item.set_description(content);
let mut set = JoinSet::new();
for (website, mut items) in grouped.into_iter() {
let client = client.clone();
let num_items = items.len();
set.spawn(
async move {
for (index, item) in &mut items.iter_mut().enumerate() {
get_item(&client, index, item).await;
}
items
}
items
});
.instrument(info_span!("get_website", website, num_items)),
);
}
let items: Vec<rss::Item> = set.join_all().await.concat();
@ -58,6 +62,22 @@ async fn complete(mut channel: Channel, client: &Client) -> Result<Box<Channel>>
Ok(Box::new(channel))
}
/// Fills in the description of a single feed item with the content fetched from its link.
///
/// `index` is the item's position in the sequence the caller is iterating over. Every item
/// except the first (`index > 0`) sleeps for `REQUEST_DELAY` before its request is issued,
/// which spaces out consecutive requests.
///
/// This is best-effort: when the item has no link, or `get_content` returns an error, the
/// item is left untouched and no error is propagated to the caller.
#[instrument(skip(client, item))]
async fn get_item(client: &Client, index: usize, item: &mut rss::Item) {
    if index > 0 {
        debug!("delaying_next_request");
        sleep(REQUEST_DELAY).await;
    }

    // `client` is already a shared reference, so it is handed straight to `get_content`
    // instead of being cloned into a temporary that is only borrowed and then dropped.
    if let Some(ref link) = item.link
        && let Ok(content) = get_content(link, client).await
    {
        debug!(content);
        item.set_description(content);
    }
}
#[instrument(skip(client), err)]
async fn get_content(link: &str, client: &Client) -> Result<String> {
let response = client.get(link).send().await?;
let mut readablity = readability_rust::Readability::new(
@ -95,13 +115,17 @@ pub(crate) fn custom_reject(error: impl Into<anyhow::Error>) -> warp::Rejection
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let client = Client::new();
let path = warp::path!(String)
.and_then(move |url| {
let client = client.clone();
async move {
let feed = get_feed(url, &client).await.map_err(custom_reject)?;
let updated = complete(feed, &client).await.map_err(custom_reject)?;
let updated = complete_channel(feed, &client)
.await
.map_err(custom_reject)?;
Ok::<String, warp::Rejection>(format!("{}", updated))
}
})