From ef34f0bb4798a0d1410fffc974f084e4c6cfb920 Mon Sep 17 00:00:00 2001 From: sommerfeld Date: Fri, 28 Jun 2024 14:53:43 +0100 Subject: Initial commit --- src/main.rs | 384 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 384 insertions(+) create mode 100644 src/main.rs (limited to 'src') diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..e82cfee --- /dev/null +++ b/src/main.rs @@ -0,0 +1,384 @@ +use std::{ + fs::{remove_file, File}, + io::{stdin, BufReader, Write}, + net::IpAddr, + path::PathBuf, + process::exit, + time::Duration, +}; + +use anyhow::{anyhow, Context, Result}; +use clap::{command, Parser}; +use futures::future::join_all; +use human_panic::setup_panic; +use log::{error, info, warn}; +use nostr_relay_pool::RelaySendOptions; +use nostr_sdk::{ + nips::{ + nip01::Coordinate, + nip46::NostrConnectURI, + nip53::LiveEventStatus, + nip65::{self, RelayMetadata}, + }, + Client, Event, EventBuilder, Filter, Keys, Kind, Tag, TagKind, TagStandard, ToBech32, +}; +use nostr_signer::{Nip46Signer, NostrSigner}; +use procfs::net::TcpState; +use serde::{Deserialize, Serialize}; +use serde_json::{from_reader, to_string}; +use tokio::{ + signal::unix::{signal, SignalKind}, + spawn, + time::sleep, +}; + +const DEFAULT_WATCH_INTERVAL_SEC: u64 = 60; +const NIP46_TIMEOUT_SEC: u64 = 60; +const USERKIND_RELAY: &str = "wss://purplepag.es"; +// Ref: https://github.com/v0l/zap.stream/blob/f369faf9c0242f0dd7f6cfff52547f86e20127fc/src/const.ts#L27-L32 +const ZAP_STREAM_RELAYS: &[&str] = &[ + "wss://relay.snort.social", + "wss://nos.lol", + "wss://relay.damus.io", + // This one is a paid relay so it should not be in the default list + //"wss://nostr.wine", + // This last one is not in zap.stream's default list, but it's still useful to make sure the + // updates get spread out as much as possible + "wss://nostr.mutinywallet.com", +]; +const TCP_CHECK_DELAY_SEC: u64 = 1; + +#[derive(Parser, Debug)] +#[command(version, about)] +struct Args { + /// watch interval in seconds + #[arg(short, long, default_value_t = DEFAULT_WATCH_INTERVAL_SEC)] + interval: u64, + /// remove previously cached NIP46 signer credentials and ask for new ones + #[arg(long)] + reset_nip46: bool, + /// specific naddrs of Live Events to update, if none, all user authored Live Events that are + /// 'live' will be updated + naddrs: Vec, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +struct ClientData { + client_nsec: String, + nip46_uri: String, +} + +impl ClientData { + async fn generate() -> Result { + let keys = Keys::generate(); + println!("Paste NSECBUNKER URI (this only needs to be done once):"); + let mut line = String::new(); + stdin().read_line(&mut line)?; + let nip46_uri = line.trim_end().to_string(); + + Ok(ClientData { + client_nsec: keys.secret_key()?.to_bech32()?, + nip46_uri, + }) + } +} + +#[tokio::main] +async fn main() { + if let Err(e) = do_main().await { + error!("{:#}", e); + exit(1); + } +} + +async fn do_main() -> Result<()> { + setup_panic!(); + let args = Args::parse(); + set_logger(); + set_signal_handlers().context("failed to setup a signal termination handler")?; + + let client = setup_nostr_client( + args.reset_nip46, + Duration::from_secs(NIP46_TIMEOUT_SEC), + ZAP_STREAM_RELAYS, + ) + .await?; + info!("nostr client connected"); + + watch_count(client, &args.naddrs, Duration::from_secs(args.interval)).await?; + Ok(()) +} + +fn set_logger() { + let mut builder = pretty_env_logger::formatted_timed_builder(); + match std::env::var("RUST_LOG") { + Ok(_) => builder.parse_default_env(), + Err(_) => builder.filter_module(env!("CARGO_PKG_NAME"), log::LevelFilter::Info), + }; + builder.init(); +} + +fn set_signal_handlers() -> Result<()> { + tokio::spawn(async move { + if let Err(e) = tokio::signal::ctrl_c().await { + return e; + } + warn!("received ctrl-c signal. Exiting..."); + exit(0) + }); + #[cfg(unix)] + tokio::spawn(async move { + let mut stream = match signal(SignalKind::terminate()) { + Err(e) => return e, + Ok(s) => s, + }; + stream.recv().await; + warn!("received process termination signal. Exiting..."); + exit(0) + }); + #[cfg(unix)] + tokio::spawn(async move { + let mut stream = match signal(SignalKind::hangup()) { + Err(e) => return e, + Ok(s) => s, + }; + stream.recv().await; + warn!("received process hangup signal. Exiting..."); + exit(0) + }); + Ok(()) +} + +async fn setup_nostr_client( + reset_nip46: bool, + nip46_timeout: Duration, + client_relays: &[&str], +) -> Result { + let signer = create_signer(reset_nip46, nip46_timeout).await?; + let signer_pubkey = signer.signer_public_key(); + let client = Client::new(NostrSigner::from(signer)); + + setup_client_relays(&client, signer_pubkey, client_relays).await?; + + info!("connecting to client"); + client.connect().await; + Ok(client) +} + +async fn create_signer(reset_nip46: bool, nip46_timeout: Duration) -> Result { + if reset_nip46 { + let _ = remove_file(get_client_data_path()); + } + let client_data = get_or_generate_client_data().await?; + let client_keys = Keys::parse(client_data.client_nsec)?; + info!("setting up NIP46 signer"); + let signer = Nip46Signer::new( + NostrConnectURI::parse(client_data.nip46_uri)?, + client_keys, + nip46_timeout, + None, + ) + .await?; + Ok(signer) +} + +async fn get_or_generate_client_data() -> Result { + let path = get_client_data_path(); + match File::open(&path) { + Ok(file) => { + let reader = BufReader::new(file); + from_reader(reader) + .with_context(|| format!("cannot read client data from '{}'", path.display())) + } + Err(_) => { + let nostr_data = ClientData::generate().await?; + let mut file = File::create(&path)?; + file.write_all(to_string(&nostr_data)?.as_bytes()) + .with_context(|| format!("could not write client data to '{}'", path.display()))?; + Ok(nostr_data) + } + } +} + +fn get_client_data_path() -> PathBuf { + dirs::data_local_dir() + .unwrap_or(PathBuf::from("data")) + .join(format!("{}.json", env!("CARGO_PKG_NAME"))) +} + +async fn setup_client_relays( + client: &Client, + signer_pubkey: nostr_sdk::PublicKey, + client_relays: &[&str], +) -> Result<(), anyhow::Error> { + client.add_relay(USERKIND_RELAY).await?; + client.connect().await; + let filter = Filter::new().author(signer_pubkey).kind(Kind::RelayList); + let events: Vec = client + .get_events_of(vec![filter], Some(Duration::from_secs(10))) + .await?; + let event = events.first().ok_or(anyhow!("no matching events"))?; + info!("using NIP65 user outbox relays"); + let nip65_relays = nip65::extract_relay_list(event) + .iter() + .filter_map(|x| { + let url = x.0; + let meta = x.1; + if meta.as_ref().is_some_and(|m| *m == RelayMetadata::Write) || meta.is_none() { + return Some(url.to_string()); + } + None + }) + .collect::>(); + let mut all_relays = nip65_relays + .iter() + .map(|r| r.trim_end_matches('/')) + .collect::>(); + all_relays.extend_from_slice(client_relays); + all_relays.sort_unstable(); + all_relays.dedup(); + info!("using relays {:?}", all_relays); + client.remove_relay(USERKIND_RELAY).await?; + client.add_relays(all_relays).await?; + Ok(()) +} + +async fn watch_count(client: Client, naddrs: &[String], interval: Duration) -> Result<()> { + let filters = create_filters(naddrs, &client).await?; + loop { + let handle = spawn(sleep(interval)); + let count = get_count().await.context("failed ot read count")?; + info!("count: {}", count); + + update_count(&client, &filters, count).await?; + + info!(" waiting for new cycle in {:#?}", interval); + // wait the rest of the watch interval + handle.await?; + } +} + +async fn create_filters(naddrs: &[String], client: &Client) -> Result> { + Ok(match naddrs.is_empty() { + true => vec![Filter::new() + .author(client.signer().await?.public_key().await?) + .kind(Kind::LiveEvent)], + false => naddrs + .iter() + .map(|naddr| { + Filter::from(Coordinate::parse(naddr).unwrap_or_else(|_| { + error!("{naddr} is not a valid replaceable event coordinate"); + exit(1); + })) + }) + .collect::>(), + }) +} + +async fn get_count() -> Result { + get_count_from_procfs().await +} + +async fn get_count_from_procfs() -> Result { + let mut tcp = get_https_connected_ips()?; + sleep(Duration::from_secs(TCP_CHECK_DELAY_SEC)).await; + tcp.append(&mut get_https_connected_ips()?); + tcp.sort_unstable(); + tcp.dedup(); + Ok(tcp.len() as u64) +} + +fn get_https_connected_ips() -> Result> { + Ok(procfs::net::tcp()? + .into_iter() + .chain(procfs::net::tcp6()?) + .filter_map(|t| { + if t.local_address.port() == 443 && t.state == TcpState::Established { + return Some(t.local_address.ip()); + } + None + }) + .collect::>()) +} + +async fn update_count(client: &Client, filters: &[Filter], count: u64) -> Result<()> { + let events = get_relevant_events(client, filters).await?; + + if events.is_empty() { + warn!("no live events") + } else { + let new_events = get_updated_events(events, client, count).await; + + if new_events.is_empty() { + warn!("no events to update") + } else { + info!("broadcasting {} updated events", new_events.len()); + client + .batch_event(new_events, RelaySendOptions::new()) + .await?; + info!("updated events broadcasted"); + } + } + Ok(()) +} + +async fn get_relevant_events(client: &Client, filters: &[Filter]) -> Result> { + let events = client + .get_events_of(filters.to_vec(), None) + .await? + .into_iter() + .filter(|e| e.tags().iter().any(is_live_event_live)) + .collect::>(); + Ok(events) +} + +fn is_live_event_live(t: &Tag) -> bool { + t.as_standardized().is_some_and(|ts| match ts { + TagStandard::LiveEventStatus(s) => *s == LiveEventStatus::Live, + _ => false, + }) +} + +async fn get_updated_events(events: Vec, client: &Client, count: u64) -> Vec { + join_all( + events + .iter() + .map(|e| get_event_with_updated_count(e, client, count)), + ) + .await + .into_iter() + .filter_map(|event| match event { + Ok(ev) => Some(ev), + Err(e) => { + error!("event could not be updated: {}", e); + None + } + }) + .collect::>() +} + +async fn get_event_with_updated_count(event: &Event, client: &Client, count: u64) -> Result { + let new_event = client + .sign_event_builder(get_event_builder_with_updated_count(event, count)) + .await + .context(format!( + "cannot sign {}", + get_ellipsed(&event.id().to_bech32()?) + ))?; + Ok(new_event) +} + +fn get_ellipsed(input: &str) -> String { + format!("{}...{}", &input[..6], &input[input.len() - 6..]) +} + +fn get_event_builder_with_updated_count(event: &Event, count: u64) -> EventBuilder { + let mut tags: Vec = event + .clone() + .into_iter_tags() + .filter(|t| t.kind() != TagKind::CurrentParticipants) + .collect(); + tags.push(TagStandard::CurrentParticipants(count).into()); + let event_builder = EventBuilder::new(event.kind(), event.content(), tags); + event_builder +} -- cgit v1.2.3-70-g09d2