author    sommerfeld <sommerfeld@sommerfeld.dev>  2024-06-28 14:53:43 +0100
committer sommerfeld <sommerfeld@sommerfeld.dev>  2024-06-28 14:53:43 +0100
commit    ef34f0bb4798a0d1410fffc974f084e4c6cfb920 (patch)
tree      927965d3de187e2af5d594e6018b635a416ef6fb /src
Initial commit
Diffstat (limited to 'src')
-rw-r--r--  src/main.rs  384
1 file changed, 384 insertions, 0 deletions
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..e82cfee
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,384 @@
+use std::{
+ fs::{remove_file, File},
+ io::{stdin, BufReader, Write},
+ net::IpAddr,
+ path::PathBuf,
+ process::exit,
+ time::Duration,
+};
+
+use anyhow::{anyhow, Context, Result};
+use clap::{command, Parser};
+use futures::future::join_all;
+use human_panic::setup_panic;
+use log::{error, info, warn};
+use nostr_relay_pool::RelaySendOptions;
+use nostr_sdk::{
+ nips::{
+ nip01::Coordinate,
+ nip46::NostrConnectURI,
+ nip53::LiveEventStatus,
+ nip65::{self, RelayMetadata},
+ },
+ Client, Event, EventBuilder, Filter, Keys, Kind, Tag, TagKind, TagStandard, ToBech32,
+};
+use nostr_signer::{Nip46Signer, NostrSigner};
+use procfs::net::TcpState;
+use serde::{Deserialize, Serialize};
+use serde_json::{from_reader, to_string};
+use tokio::{
+ signal::unix::{signal, SignalKind},
+ spawn,
+ time::sleep,
+};
+
+const DEFAULT_WATCH_INTERVAL_SEC: u64 = 60;
+const NIP46_TIMEOUT_SEC: u64 = 60;
+const USERKIND_RELAY: &str = "wss://purplepag.es";
+// Ref: https://github.com/v0l/zap.stream/blob/f369faf9c0242f0dd7f6cfff52547f86e20127fc/src/const.ts#L27-L32
+const ZAP_STREAM_RELAYS: &[&str] = &[
+ "wss://relay.snort.social",
+ "wss://nos.lol",
+ "wss://relay.damus.io",
+ // This one is a paid relay so it should not be in the default list
+ //"wss://nostr.wine",
+ // This last one is not in zap.stream's default list, but it's still useful to make sure the
+ // updates get spread out as much as possible
+ "wss://nostr.mutinywallet.com",
+];
+const TCP_CHECK_DELAY_SEC: u64 = 1;
+
+#[derive(Parser, Debug)]
+#[command(version, about)]
+struct Args {
+ /// watch interval in seconds
+ #[arg(short, long, default_value_t = DEFAULT_WATCH_INTERVAL_SEC)]
+ interval: u64,
+ /// remove previously cached NIP46 signer credentials and ask for new ones
+ #[arg(long)]
+ reset_nip46: bool,
+ /// specific naddrs of Live Events to update; if none are given, all user-authored Live
+ /// Events that are currently 'live' will be updated
+ naddrs: Vec<String>,
+}
+
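+/// Cached NIP-46 credentials: the locally generated client key (nsec) and the bunker connect URI.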
+#[derive(Debug, Clone, Deserialize, Serialize)]
+struct ClientData {
+ client_nsec: String,
+ nip46_uri: String,
+}
+
+impl ClientData {
+ async fn generate() -> Result<Self> {
+ let keys = Keys::generate();
+ println!("Paste NSECBUNKER URI (this only needs to be done once):");
+ let mut line = String::new();
+ stdin().read_line(&mut line)?;
+ let nip46_uri = line.trim_end().to_string();
+
+ Ok(ClientData {
+ client_nsec: keys.secret_key()?.to_bech32()?,
+ nip46_uri,
+ })
+ }
+}
+
+#[tokio::main]
+async fn main() {
+ if let Err(e) = do_main().await {
+ error!("{:#}", e);
+ exit(1);
+ }
+}
+
+async fn do_main() -> Result<()> {
+ setup_panic!();
+ let args = Args::parse();
+ set_logger();
+ set_signal_handlers().context("failed to setup a signal termination handler")?;
+
+ let client = setup_nostr_client(
+ args.reset_nip46,
+ Duration::from_secs(NIP46_TIMEOUT_SEC),
+ ZAP_STREAM_RELAYS,
+ )
+ .await?;
+ info!("nostr client connected");
+
+ watch_count(client, &args.naddrs, Duration::from_secs(args.interval)).await?;
+ Ok(())
+}
+
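+/// Uses RUST_LOG when set; otherwise logs this crate at the info level.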
+fn set_logger() {
+ let mut builder = pretty_env_logger::formatted_timed_builder();
+ match std::env::var("RUST_LOG") {
+ Ok(_) => builder.parse_default_env(),
+ Err(_) => builder.filter_module(env!("CARGO_PKG_NAME"), log::LevelFilter::Info),
+ };
+ builder.init();
+}
+
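+/// Spawns tasks that exit the process on ctrl-c and, on Unix, on SIGTERM and SIGHUP.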
+fn set_signal_handlers() -> Result<()> {
+ tokio::spawn(async move {
+ if let Err(e) = tokio::signal::ctrl_c().await {
+ return e;
+ }
+ warn!("received ctrl-c signal. Exiting...");
+ exit(0)
+ });
+ #[cfg(unix)]
+ tokio::spawn(async move {
+ let mut stream = match signal(SignalKind::terminate()) {
+ Err(e) => return e,
+ Ok(s) => s,
+ };
+ stream.recv().await;
+ warn!("received process termination signal. Exiting...");
+ exit(0)
+ });
+ #[cfg(unix)]
+ tokio::spawn(async move {
+ let mut stream = match signal(SignalKind::hangup()) {
+ Err(e) => return e,
+ Ok(s) => s,
+ };
+ stream.recv().await;
+ warn!("received process hangup signal. Exiting...");
+ exit(0)
+ });
+ Ok(())
+}
+
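+/// Builds a client backed by the NIP-46 remote signer, configures its relays and connects it.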
+async fn setup_nostr_client(
+ reset_nip46: bool,
+ nip46_timeout: Duration,
+ client_relays: &[&str],
+) -> Result<Client> {
+ let signer = create_signer(reset_nip46, nip46_timeout).await?;
+ let signer_pubkey = signer.signer_public_key();
+ let client = Client::new(NostrSigner::from(signer));
+
+ setup_client_relays(&client, signer_pubkey, client_relays).await?;
+
+ info!("connecting to client");
+ client.connect().await;
+ Ok(client)
+}
+
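+/// Creates the NIP-46 signer from the cached client data, clearing the cache first when --reset-nip46 was passed.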
+async fn create_signer(reset_nip46: bool, nip46_timeout: Duration) -> Result<Nip46Signer> {
+ if reset_nip46 {
+ let _ = remove_file(get_client_data_path());
+ }
+ let client_data = get_or_generate_client_data().await?;
+ let client_keys = Keys::parse(client_data.client_nsec)?;
+ info!("setting up NIP46 signer");
+ let signer = Nip46Signer::new(
+ NostrConnectURI::parse(client_data.nip46_uri)?,
+ client_keys,
+ nip46_timeout,
+ None,
+ )
+ .await?;
+ Ok(signer)
+}
+
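+/// Reads the cached client data from disk, or generates a new set (prompting for the bunker URI) and persists it.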
+async fn get_or_generate_client_data() -> Result<ClientData> {
+ let path = get_client_data_path();
+ match File::open(&path) {
+ Ok(file) => {
+ let reader = BufReader::new(file);
+ from_reader(reader)
+ .with_context(|| format!("cannot read client data from '{}'", path.display()))
+ }
+ Err(_) => {
+ let nostr_data = ClientData::generate().await?;
+ let mut file = File::create(&path)?;
+ file.write_all(to_string(&nostr_data)?.as_bytes())
+ .with_context(|| format!("could not write client data to '{}'", path.display()))?;
+ Ok(nostr_data)
+ }
+ }
+}
+
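+/// Location of the cached client data file in the local data directory (falling back to a relative `data` dir).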
+fn get_client_data_path() -> PathBuf {
+ dirs::data_local_dir()
+ .unwrap_or(PathBuf::from("data"))
+ .join(format!("{}.json", env!("CARGO_PKG_NAME")))
+}
+
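+/// Fetches the signer's NIP-65 relay list, keeps the write relays, merges them with the
+/// zap.stream defaults and registers the resulting set on the client.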
+async fn setup_client_relays(
+ client: &Client,
+ signer_pubkey: nostr_sdk::PublicKey,
+ client_relays: &[&str],
+) -> Result<()> {
+ client.add_relay(USERKIND_RELAY).await?;
+ client.connect().await;
+ let filter = Filter::new().author(signer_pubkey).kind(Kind::RelayList);
+ let events: Vec<Event> = client
+ .get_events_of(vec![filter], Some(Duration::from_secs(10)))
+ .await?;
+ let event = events.first().ok_or(anyhow!("no matching events"))?;
+ info!("using NIP65 user outbox relays");
+ let nip65_relays = nip65::extract_relay_list(event)
+ .iter()
+ .filter_map(|x| {
+ let url = x.0;
+ let meta = x.1;
+ if meta.as_ref().is_some_and(|m| *m == RelayMetadata::Write) || meta.is_none() {
+ return Some(url.to_string());
+ }
+ None
+ })
+ .collect::<Vec<_>>();
+ let mut all_relays = nip65_relays
+ .iter()
+ .map(|r| r.trim_end_matches('/'))
+ .collect::<Vec<_>>();
+ all_relays.extend_from_slice(client_relays);
+ all_relays.sort_unstable();
+ all_relays.dedup();
+ info!("using relays {:?}", all_relays);
+ client.remove_relay(USERKIND_RELAY).await?;
+ client.add_relays(all_relays).await?;
+ Ok(())
+}
+
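+/// Main loop: on every interval, reads the current viewer count and publishes it to the matching live events.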
+async fn watch_count(client: Client, naddrs: &[String], interval: Duration) -> Result<()> {
+ let filters = create_filters(naddrs, &client).await?;
+ loop {
+ let handle = spawn(sleep(interval));
+ let count = get_count().await.context("failed to read count")?;
+ info!("count: {}", count);
+
+ update_count(&client, &filters, count).await?;
+
+ info!(" waiting for new cycle in {:#?}", interval);
+ // wait the rest of the watch interval
+ handle.await?;
+ }
+}
+
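+/// With no naddrs given, matches every live event authored by the signer; otherwise matches only the given coordinates.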
+async fn create_filters(naddrs: &[String], client: &Client) -> Result<Vec<Filter>> {
+ Ok(match naddrs.is_empty() {
+ true => vec![Filter::new()
+ .author(client.signer().await?.public_key().await?)
+ .kind(Kind::LiveEvent)],
+ false => naddrs
+ .iter()
+ .map(|naddr| {
+ Filter::from(Coordinate::parse(naddr).unwrap_or_else(|_| {
+ error!("{naddr} is not a valid replaceable event coordinate");
+ exit(1);
+ }))
+ })
+ .collect::<Vec<_>>(),
+ })
+}
+
+async fn get_count() -> Result<u64> {
+ get_count_from_procfs().await
+}
+
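+/// Samples the established HTTPS connections twice, TCP_CHECK_DELAY_SEC apart, and counts the distinct client IPs seen.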
+async fn get_count_from_procfs() -> Result<u64> {
+ let mut tcp = get_https_connected_ips()?;
+ sleep(Duration::from_secs(TCP_CHECK_DELAY_SEC)).await;
+ tcp.append(&mut get_https_connected_ips()?);
+ tcp.sort_unstable();
+ tcp.dedup();
+ Ok(tcp.len() as u64)
+}
+
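+/// Lists the remote IPs of established TCP connections to local port 443 (IPv4 and IPv6).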
+fn get_https_connected_ips() -> Result<Vec<IpAddr>> {
+ Ok(procfs::net::tcp()?
+ .into_iter()
+ .chain(procfs::net::tcp6()?)
+ .filter_map(|t| {
+ if t.local_address.port() == 443 && t.state == TcpState::Established {
+ return Some(t.remote_address.ip());
+ }
+ None
+ })
+ .collect::<Vec<_>>())
+}
+
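+/// Finds the live events matching the filters, rewrites their participant count and broadcasts the signed updates.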
+async fn update_count(client: &Client, filters: &[Filter], count: u64) -> Result<()> {
+ let events = get_relevant_events(client, filters).await?;
+
+ if events.is_empty() {
+ warn!("no live events")
+ } else {
+ let new_events = get_updated_events(events, client, count).await;
+
+ if new_events.is_empty() {
+ warn!("no events to update")
+ } else {
+ info!("broadcasting {} updated events", new_events.len());
+ client
+ .batch_event(new_events, RelaySendOptions::new())
+ .await?;
+ info!("updated events broadcasted");
+ }
+ }
+ Ok(())
+}
+
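+/// Returns the events matching the filters whose status tag marks them as currently live.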
+async fn get_relevant_events(client: &Client, filters: &[Filter]) -> Result<Vec<Event>> {
+ let events = client
+ .get_events_of(filters.to_vec(), None)
+ .await?
+ .into_iter()
+ .filter(|e| e.tags().iter().any(is_live_event_live))
+ .collect::<Vec<_>>();
+ Ok(events)
+}
+
+fn is_live_event_live(t: &Tag) -> bool {
+ t.as_standardized().is_some_and(|ts| match ts {
+ TagStandard::LiveEventStatus(s) => *s == LiveEventStatus::Live,
+ _ => false,
+ })
+}
+
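+/// Signs an updated copy of each event concurrently, logging and dropping the ones that fail.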
+async fn get_updated_events(events: Vec<Event>, client: &Client, count: u64) -> Vec<Event> {
+ join_all(
+ events
+ .iter()
+ .map(|e| get_event_with_updated_count(e, client, count)),
+ )
+ .await
+ .into_iter()
+ .filter_map(|event| match event {
+ Ok(ev) => Some(ev),
+ Err(e) => {
+ error!("event could not be updated: {}", e);
+ None
+ }
+ })
+ .collect::<Vec<_>>()
+}
+
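+/// Asks the signer for a signed copy of the event carrying the updated participant count.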
+async fn get_event_with_updated_count(event: &Event, client: &Client, count: u64) -> Result<Event> {
+ let new_event = client
+ .sign_event_builder(get_event_builder_with_updated_count(event, count))
+ .await
+ .context(format!(
+ "cannot sign {}",
+ get_ellipsed(&event.id().to_bech32()?)
+ ))?;
+ Ok(new_event)
+}
+
+fn get_ellipsed(input: &str) -> String {
+ format!("{}...{}", &input[..6], &input[input.len() - 6..])
+}
+
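+/// Rebuilds the event with its current_participants tag replaced by the new count.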
+fn get_event_builder_with_updated_count(event: &Event, count: u64) -> EventBuilder {
+ let mut tags: Vec<Tag> = event
+ .clone()
+ .into_iter_tags()
+ .filter(|t| t.kind() != TagKind::CurrentParticipants)
+ .collect();
+ tags.push(TagStandard::CurrentParticipants(count).into());
+ EventBuilder::new(event.kind(), event.content(), tags)
+}