diff options
author | sommerfeld <sommerfeld@sommerfeld.dev> | 2024-06-30 23:22:10 +0100 |
---|---|---|
committer | sommerfeld <sommerfeld@sommerfeld.dev> | 2024-06-30 23:22:10 +0100 |
commit | d2cc03d3bb969c6d2ed57a443e17218a26cf7292 (patch) | |
tree | 5e00a18455d51b19a3e90f14ccc89c69f9399162 | |
parent | 50185514c9063ba064920b35218a0f819c5d217c (diff) | |
download | viewercount-d2cc03d3bb969c6d2ed57a443e17218a26cf7292.tar.gz viewercount-d2cc03d3bb969c6d2ed57a443e17218a26cf7292.tar.bz2 viewercount-d2cc03d3bb969c6d2ed57a443e17218a26cf7292.zip |
Fix updating outdated events
Since we updated anything matching the filters, we were sometimes
bringing old versions of replaceable events back to life. The issue is
that those events should be discarded because they are outdated, but we
still pick them up from some relays.
Thus we need to index them by their naddr coordinate and keep only the
latest event per naddr.
I also improved logging along the way.
-rw-r--r-- | src/main.rs | 63 |
1 files changed, 36 insertions, 27 deletions
diff --git a/src/main.rs b/src/main.rs index 01657a6..9984791 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, fs::{remove_file, File}, io::{stdin, BufReader, Write}, net::IpAddr, @@ -307,33 +308,42 @@ fn get_https_connected_ips() -> Result<Vec<IpAddr>> { async fn update_count(client: &Client, filters: &[Filter], count: u64) -> Result<()> { let events = get_relevant_events(client, filters).await?; - if events.is_empty() { - warn!("no live events") - } else { - let new_events = get_updated_events(events, client, count).await; + warn!("no live events"); + return Ok(()); + } - if new_events.is_empty() { - warn!("no events to update") - } else { - info!("broadcasting {} updated events", new_events.len()); - client - .batch_event(new_events, RelaySendOptions::new()) - .await?; - info!("updated events broadcasted"); - } + let new_events = get_updated_events(events, client, count).await; + if new_events.is_empty() { + return Ok(()); } + + info!("broadcasting {} updated events", new_events.len()); + client + .batch_event(new_events, RelaySendOptions::new()) + .await?; Ok(()) } async fn get_relevant_events(client: &Client, filters: &[Filter]) -> Result<Vec<Event>> { - let events = client - .get_events_of(filters.to_vec(), None) - .await? 
- .into_iter() + let events = client.get_events_of(filters.to_vec(), None).await?; + + let mut events_by_naddr: HashMap<String, Event> = Default::default(); + for event in events { + let naddr = get_naddr(&event).to_string(); + if let Some(dup_event) = events_by_naddr.get_mut(&naddr) { + if event.created_at() > dup_event.created_at() { + *dup_event = event; + } + } else { + events_by_naddr.insert(naddr, event); + } + } + + Ok(events_by_naddr + .into_values() .filter(|e| e.tags().iter().any(is_live_event_live)) - .collect::<Vec<_>>(); - Ok(events) + .collect()) } fn is_live_event_live(t: &Tag) -> bool { @@ -362,20 +372,15 @@ async fn get_updated_events(events: Vec<Event>, client: &Client, count: u64) -> } async fn get_event_with_updated_count(event: &Event, client: &Client, count: u64) -> Result<Event> { + let naddr = get_naddr(event).to_bech32()?; let new_event = client .sign_event_builder(get_event_builder_with_updated_count(event, count)) .await - .context(format!( - "cannot sign {}", - get_ellipsed(&event.id().to_bech32()?) - ))?; + .context(format!("cannot sign {naddr}"))?; + info!("updating {naddr}"); Ok(new_event) } -fn get_ellipsed(input: &str) -> String { - format!("{}...{}", &input[..6], &input[input.len() - 6..]) -} - fn get_event_builder_with_updated_count(event: &Event, count: u64) -> EventBuilder { let mut tags: Vec<Tag> = event .clone() @@ -386,3 +391,7 @@ fn get_event_builder_with_updated_count(event: &Event, count: u64) -> EventBuild let event_builder = EventBuilder::new(event.kind(), event.content(), tags); event_builder } + +fn get_naddr(event: &Event) -> Coordinate { + Coordinate::new(event.kind(), event.pubkey).identifier(event.identifier().unwrap_or_default()) +} |