Merge commit '10025f9fa5503865918cbae2af5366afe7fd7c54' as 'components/mycelium'

This commit is contained in:
2025-08-16 21:12:34 +02:00
132 changed files with 50951 additions and 0 deletions

View File

@@ -0,0 +1,30 @@
use std::net::IpAddr;
use mycelium::crypto::PublicKey;
use serde::Serialize;
#[derive(Debug, Serialize)]
struct InspectOutput {
#[serde(rename = "publicKey")]
public_key: PublicKey,
address: IpAddr,
}
/// Inspect the given pubkey, or the local key if no pubkey is given
pub fn inspect(pubkey: PublicKey, json: bool) -> Result<(), Box<dyn std::error::Error>> {
let address = pubkey.address().into();
if json {
let out = InspectOutput {
public_key: pubkey,
address,
};
let out_string = serde_json::to_string_pretty(&out)?;
println!("{out_string}");
} else {
println!("Public key: {pubkey}");
println!("Address: {address}");
}
Ok(())
}

View File

@@ -0,0 +1,13 @@
mod inspect;
#[cfg(feature = "message")]
mod message;
mod peer;
mod routes;
pub use inspect::inspect;
#[cfg(feature = "message")]
pub use message::{recv_msg, send_msg};
pub use peer::{add_peers, list_peers, remove_peers};
pub use routes::{
list_fallback_routes, list_no_route_entries, list_queried_subnets, list_selected_routes,
};

View File

@@ -0,0 +1,306 @@
use std::{
io::Write,
mem,
net::{IpAddr, SocketAddr},
path::PathBuf,
};
use base64::{
alphabet,
engine::{GeneralPurpose, GeneralPurposeConfig},
Engine,
};
use mycelium::{crypto::PublicKey, message::MessageId, subnet::Subnet};
use serde::{Serialize, Serializer};
use tracing::{debug, error};
use mycelium_api::{MessageDestination, MessageReceiveInfo, MessageSendInfo, PushMessageResponse};
enum Payload {
Readable(String),
NotReadable(Vec<u8>),
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct CliMessage {
id: MessageId,
src_ip: IpAddr,
src_pk: PublicKey,
dst_ip: IpAddr,
dst_pk: PublicKey,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(serialize_with = "serialize_payload")]
topic: Option<Payload>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(serialize_with = "serialize_payload")]
payload: Option<Payload>,
}
const B64ENGINE: GeneralPurpose = base64::engine::general_purpose::GeneralPurpose::new(
&alphabet::STANDARD,
GeneralPurposeConfig::new(),
);
fn serialize_payload<S: Serializer>(p: &Option<Payload>, s: S) -> Result<S::Ok, S::Error> {
let base64 = match p {
None => None,
Some(Payload::Readable(data)) => Some(data.clone()),
Some(Payload::NotReadable(data)) => Some(B64ENGINE.encode(data)),
};
<Option<String>>::serialize(&base64, s)
}
/// Encode arbitrary data in standard base64.
pub fn encode_base64(input: &[u8]) -> String {
B64ENGINE.encode(input)
}
/// Send a message to a receiver.
#[allow(clippy::too_many_arguments)]
pub async fn send_msg(
destination: String,
msg: Option<String>,
wait: bool,
timeout: Option<u64>,
reply_to: Option<String>,
topic: Option<String>,
msg_path: Option<PathBuf>,
server_addr: SocketAddr,
) -> Result<(), Box<dyn std::error::Error>> {
if reply_to.is_some() && wait {
error!("Can't wait on a reply for a reply, either use --reply-to or --wait");
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"Only one of --reply-to or --wait is allowed",
)
.into());
}
let destination = if destination.len() == 64 {
// Public key in hex format
match PublicKey::try_from(&*destination) {
Err(_) => {
error!("{destination} is not a valid hex encoded public key");
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"Invalid hex encoded public key",
)
.into());
}
Ok(pk) => MessageDestination::Pk(pk),
}
} else {
match destination.parse() {
Err(e) => {
error!("{destination} is not a valid IPv6 address: {e}");
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"Invalid IPv6 address",
)
.into());
}
Ok(ip) => {
let global_subnet = Subnet::new(
mycelium::GLOBAL_SUBNET_ADDRESS,
mycelium::GLOBAL_SUBNET_PREFIX_LEN,
)
.unwrap();
if !global_subnet.contains_ip(ip) {
error!("{destination} is not a part of {global_subnet}");
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"IPv6 address is not part of the mycelium subnet",
)
.into());
}
MessageDestination::Ip(ip)
}
}
};
// Load msg, files have prio.
let msg = if let Some(path) = msg_path {
match tokio::fs::read(&path).await {
Err(e) => {
error!("Could not read file at {:?}: {e}", path);
return Err(e.into());
}
Ok(data) => data,
}
} else if let Some(msg) = msg {
msg.into_bytes()
} else {
error!("Message is a required argument if `--msg-path` is not provided");
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"Message is a required argument if `--msg-path` is not provided",
)
.into());
};
let mut url = format!("http://{server_addr}/api/v1/messages");
if let Some(reply_to) = reply_to {
url.push_str(&format!("/reply/{reply_to}"));
}
if wait {
// A year should be sufficient to wait
let reply_timeout = timeout.unwrap_or(60 * 60 * 24 * 365);
url.push_str(&format!("?reply_timeout={reply_timeout}"));
}
match reqwest::Client::new()
.post(url)
.json(&MessageSendInfo {
dst: destination,
topic: topic.map(String::into_bytes),
payload: msg,
})
.send()
.await
{
Err(e) => {
error!("Failed to send request: {e}");
return Err(e.into());
}
Ok(res) => {
if res.status() == STATUSCODE_NO_CONTENT {
return Ok(());
}
match res.json::<PushMessageResponse>().await {
Err(e) => {
error!("Failed to load response body {e}");
return Err(e.into());
}
Ok(resp) => {
match resp {
PushMessageResponse::Id(id) => {
let _ = serde_json::to_writer(std::io::stdout(), &id);
}
PushMessageResponse::Reply(mri) => {
let cm = CliMessage {
id: mri.id,
topic: mri.topic.map(|topic| {
if let Ok(s) = String::from_utf8(topic.clone()) {
Payload::Readable(s)
} else {
Payload::NotReadable(topic)
}
}),
src_ip: mri.src_ip,
src_pk: mri.src_pk,
dst_ip: mri.dst_ip,
dst_pk: mri.dst_pk,
payload: Some({
if let Ok(s) = String::from_utf8(mri.payload.clone()) {
Payload::Readable(s)
} else {
Payload::NotReadable(mri.payload)
}
}),
};
let _ = serde_json::to_writer(std::io::stdout(), &cm);
}
}
println!();
}
}
}
}
Ok(())
}
const STATUSCODE_NO_CONTENT: u16 = 204;
pub async fn recv_msg(
timeout: Option<u64>,
topic: Option<String>,
msg_path: Option<PathBuf>,
raw: bool,
server_addr: SocketAddr,
) -> Result<(), Box<dyn std::error::Error>> {
// One year timeout should be sufficient
let timeout = timeout.unwrap_or(60 * 60 * 24 * 365);
let mut url = format!("http://{server_addr}/api/v1/messages?timeout={timeout}");
if let Some(ref topic) = topic {
if topic.len() > 255 {
error!("{topic} is longer than the maximum allowed topic length of 255");
return Err(
std::io::Error::new(std::io::ErrorKind::InvalidInput, "Topic too long").into(),
);
}
url.push_str(&format!("&topic={}", encode_base64(topic.as_bytes())));
}
let mut cm = match reqwest::get(url).await {
Err(e) => {
error!("Failed to wait for message: {e}");
return Err(e.into());
}
Ok(resp) => {
if resp.status() == STATUSCODE_NO_CONTENT {
debug!("No message ready yet");
return Ok(());
}
debug!("Received message response");
match resp.json::<MessageReceiveInfo>().await {
Err(e) => {
error!("Failed to load response json: {e}");
return Err(e.into());
}
Ok(mri) => CliMessage {
id: mri.id,
topic: mri.topic.map(|topic| {
if let Ok(s) = String::from_utf8(topic.clone()) {
Payload::Readable(s)
} else {
Payload::NotReadable(topic)
}
}),
src_ip: mri.src_ip,
src_pk: mri.src_pk,
dst_ip: mri.dst_ip,
dst_pk: mri.dst_pk,
payload: Some({
if let Ok(s) = String::from_utf8(mri.payload.clone()) {
Payload::Readable(s)
} else {
Payload::NotReadable(mri.payload)
}
}),
},
}
}
};
if let Some(ref file_path) = msg_path {
if let Err(e) = tokio::fs::write(
&file_path,
match mem::take(&mut cm.payload).unwrap() {
Payload::Readable(ref s) => s as &dyn AsRef<[u8]>,
Payload::NotReadable(ref v) => v,
},
)
.await
{
error!("Failed to write response payload to file: {e}");
return Err(e.into());
}
}
if raw {
// only print payload if not already written
if msg_path.is_none() {
let _ = std::io::stdout().write_all(match cm.payload.unwrap() {
Payload::Readable(ref s) => s.as_bytes(),
Payload::NotReadable(ref v) => v,
});
println!();
}
} else {
let _ = serde_json::to_writer(std::io::stdout(), &cm);
println!();
}
Ok(())
}

View File

@@ -0,0 +1,141 @@
use mycelium::peer_manager::PeerStats;
use mycelium_api::AddPeer;
use prettytable::{row, Table};
use std::net::SocketAddr;
use tracing::{debug, error};
/// List the peers the current node is connected to
pub async fn list_peers(
server_addr: SocketAddr,
json_print: bool,
) -> Result<(), Box<dyn std::error::Error>> {
// Make API call
let request_url = format!("http://{server_addr}/api/v1/admin/peers");
match reqwest::get(&request_url).await {
Err(e) => {
error!("Failed to retrieve peers");
return Err(e.into());
}
Ok(resp) => {
debug!("Listing connected peers");
match resp.json::<Vec<PeerStats>>().await {
Err(e) => {
error!("Failed to load response json: {e}");
return Err(e.into());
}
Ok(peers) => {
if json_print {
// Print peers in JSON format
let json_output = serde_json::to_string_pretty(&peers)?;
println!("{json_output}");
} else {
// Print peers in table format
let mut table = Table::new();
table.add_row(row![
"Protocol",
"Socket",
"Type",
"Connection",
"Rx total",
"Tx total",
"Discovered",
"Last connection"
]);
for peer in peers.iter() {
table.add_row(row![
peer.endpoint.proto(),
peer.endpoint.address(),
peer.pt,
peer.connection_state,
format_bytes(peer.rx_bytes),
format_bytes(peer.tx_bytes),
format_seconds(peer.discovered),
peer.last_connected
.map(format_seconds)
.unwrap_or("Never connected".to_string()),
]);
}
table.printstd();
}
}
}
}
};
Ok(())
}
fn format_bytes(bytes: u64) -> String {
let byte = byte_unit::Byte::from_u64(bytes);
let adjusted_byte = byte.get_appropriate_unit(byte_unit::UnitType::Binary);
format!(
"{:.2} {}",
adjusted_byte.get_value(),
adjusted_byte.get_unit()
)
}
/// Convert an amount of seconds into a human readable string.
fn format_seconds(total_seconds: u64) -> String {
let seconds = total_seconds % 60;
let minutes = (total_seconds / 60) % 60;
let hours = (total_seconds / 3600) % 60;
let days = (total_seconds / 86400) % 60;
if days > 0 {
format!("{days}d {hours}h {minutes}m {seconds}s")
} else if hours > 0 {
format!("{hours}h {minutes}m {seconds}s")
} else if minutes > 0 {
format!("{minutes}m {seconds}s")
} else {
format!("{seconds}s")
}
}
/// Remove peer(s) by (underlay) IP
pub async fn remove_peers(
server_addr: SocketAddr,
peers: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
for peer in peers.iter() {
// encode to pass in URL
let peer_encoded = urlencoding::encode(peer);
let request_url = format!("http://{server_addr}/api/v1/admin/peers/{peer_encoded}");
if let Err(e) = client
.delete(&request_url)
.send()
.await
.and_then(|res| res.error_for_status())
{
error!("Failed to delete peer: {e}");
return Err(e.into());
}
}
Ok(())
}
/// Add peer(s) by (underlay) IP
pub async fn add_peers(
server_addr: SocketAddr,
peers: Vec<String>,
) -> Result<(), Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
for peer in peers.into_iter() {
let request_url = format!("http://{server_addr}/api/v1/admin/peers");
if let Err(e) = client
.post(&request_url)
.json(&AddPeer { endpoint: peer })
.send()
.await
.and_then(|res| res.error_for_status())
{
error!("Failed to add peer: {e}");
return Err(e.into());
}
}
Ok(())
}

View File

@@ -0,0 +1,152 @@
use mycelium_api::{NoRouteSubnet, QueriedSubnet, Route};
use prettytable::{row, Table};
use std::net::SocketAddr;
use tracing::{debug, error};
pub async fn list_selected_routes(
server_addr: SocketAddr,
json_print: bool,
) -> Result<(), Box<dyn std::error::Error>> {
let request_url = format!("http://{server_addr}/api/v1/admin/routes/selected");
match reqwest::get(&request_url).await {
Err(e) => {
error!("Failed to retrieve selected routes");
return Err(e.into());
}
Ok(resp) => {
debug!("Listing selected routes");
if json_print {
// API call returns routes in JSON format by default
let selected_routes = resp.text().await?;
println!("{selected_routes}");
} else {
// Print routes in table format
let routes: Vec<Route> = resp.json().await?;
let mut table = Table::new();
table.add_row(row!["Subnet", "Next Hop", "Metric", "Seq No"]);
for route in routes.iter() {
table.add_row(row![
&route.subnet,
&route.next_hop,
route.metric,
route.seqno,
]);
}
table.printstd();
}
}
}
Ok(())
}
pub async fn list_fallback_routes(
server_addr: SocketAddr,
json_print: bool,
) -> Result<(), Box<dyn std::error::Error>> {
let request_url = format!("http://{server_addr}/api/v1/admin/routes/fallback");
match reqwest::get(&request_url).await {
Err(e) => {
error!("Failed to retrieve fallback routes");
return Err(e.into());
}
Ok(resp) => {
debug!("Listing fallback routes");
if json_print {
// API call returns routes in JSON format by default
let fallback_routes = resp.text().await?;
println!("{fallback_routes}");
} else {
// Print routes in table format
let routes: Vec<Route> = resp.json().await?;
let mut table = Table::new();
table.add_row(row!["Subnet", "Next Hop", "Metric", "Seq No"]);
for route in routes.iter() {
table.add_row(row![
&route.subnet,
&route.next_hop,
route.metric,
route.seqno,
]);
}
table.printstd();
}
}
}
Ok(())
}
pub async fn list_queried_subnets(
server_addr: SocketAddr,
json_print: bool,
) -> Result<(), Box<dyn std::error::Error>> {
let request_url = format!("http://{server_addr}/api/v1/admin/routes/queried");
match reqwest::get(&request_url).await {
Err(e) => {
error!("Failed to retrieve queried subnets");
return Err(e.into());
}
Ok(resp) => {
debug!("Listing queried routes");
if json_print {
// API call returns routes in JSON format by default
let queried_routes = resp.text().await?;
println!("{queried_routes}");
} else {
// Print routes in table format
let queries: Vec<QueriedSubnet> = resp.json().await?;
let mut table = Table::new();
table.add_row(row!["Subnet", "Query expiration"]);
for query in queries.iter() {
table.add_row(row![query.subnet, query.expiration,]);
}
table.printstd();
}
}
}
Ok(())
}
pub async fn list_no_route_entries(
server_addr: SocketAddr,
json_print: bool,
) -> Result<(), Box<dyn std::error::Error>> {
let request_url = format!("http://{server_addr}/api/v1/admin/routes/no_route");
match reqwest::get(&request_url).await {
Err(e) => {
error!("Failed to retrieve subnets with no route entries");
return Err(e.into());
}
Ok(resp) => {
debug!("Listing no route entries");
if json_print {
// API call returns routes in JSON format by default
let nrs = resp.text().await?;
println!("{nrs}");
} else {
// Print routes in table format
let no_routes: Vec<NoRouteSubnet> = resp.json().await?;
let mut table = Table::new();
table.add_row(row!["Subnet", "Entry expiration"]);
for nrs in no_routes.iter() {
table.add_row(row![nrs.subnet, nrs.expiration,]);
}
table.printstd();
}
}
}
Ok(())
}