sal/redisclient/src/redisclient.rs
Mahmoud-Emad 3e617c2489
Some checks are pending
Rhai Tests / Run Rhai Tests (push) Waiting to run
feat: Add redisclient package to the monorepo
- Integrate the redisclient package into the workspace.
- Update the MONOREPO_CONVERSION_PLAN.md to reflect the
  completion of the redisclient package conversion.
  This includes marking its conversion as complete and
  updating the success metrics.
- Add the redisclient package's Cargo.toml file.
- Add the redisclient package's source code files.
- Add tests for the redisclient package.
- Add README file for the redisclient package.
2025-06-18 17:53:03 +03:00

362 lines
10 KiB
Rust

use lazy_static::lazy_static;
use redis::{Client, Cmd, Connection, RedisError, RedisResult};
use std::env;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, Once};
/// Redis connection configuration builder
///
/// This struct is used to build a Redis connection configuration.
/// It follows the builder pattern to allow for flexible configuration.
///
/// All fields are public, so a configuration can be adjusted either through
/// the chained setter methods or by assigning the fields directly.
#[derive(Clone)]
pub struct RedisConfigBuilder {
/// Host name or IP address placed in TCP connection URLs.
pub host: String,
/// TCP port placed in TCP connection URLs.
pub port: u16,
/// Database index; appended as the path of TCP URLs and returned by `build`.
pub db: i64,
/// Optional username placed in the credentials part of TCP URLs.
pub username: Option<String>,
/// Optional password placed in the credentials part of TCP URLs.
pub password: Option<String>,
/// Whether TLS is requested for TCP connections.
pub use_tls: bool,
/// When `true`, `build_connection_url` produces a `unix://` URL and the
/// host, port, credentials and database fields are not part of that URL.
pub use_unix_socket: bool,
/// Path of the Unix socket. When `None` while `use_unix_socket` is set,
/// `build_connection_url` falls back to `$HOME/hero/var/myredis.sock`.
pub socket_path: Option<String>,
/// Connection timeout in seconds.
// NOTE(review): no code visible in this file reads this field — confirm
// whether it is applied elsewhere or is still to be wired in.
pub connection_timeout: Option<u64>,
}
impl Default for RedisConfigBuilder {
fn default() -> Self {
Self {
host: "127.0.0.1".to_string(),
port: 6379,
db: 0,
username: None,
password: None,
use_tls: false,
use_unix_socket: false,
socket_path: None,
connection_timeout: None,
}
}
}
impl RedisConfigBuilder {
    /// Create a new Redis connection configuration builder with default values
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the host for the Redis connection
    pub fn host(mut self, host: &str) -> Self {
        self.host = host.to_string();
        self
    }

    /// Set the port for the Redis connection
    pub fn port(mut self, port: u16) -> Self {
        self.port = port;
        self
    }

    /// Set the database for the Redis connection
    pub fn db(mut self, db: i64) -> Self {
        self.db = db;
        self
    }

    /// Set the username for the Redis connection (Redis 6.0+)
    pub fn username(mut self, username: &str) -> Self {
        self.username = Some(username.to_string());
        self
    }

    /// Set the password for the Redis connection
    pub fn password(mut self, password: &str) -> Self {
        self.password = Some(password.to_string());
        self
    }

    /// Enable TLS for the Redis connection
    pub fn use_tls(mut self, use_tls: bool) -> Self {
        self.use_tls = use_tls;
        self
    }

    /// Use Unix socket for the Redis connection
    pub fn use_unix_socket(mut self, use_unix_socket: bool) -> Self {
        self.use_unix_socket = use_unix_socket;
        self
    }

    /// Set the Unix socket path for the Redis connection
    ///
    /// Setting a path also switches the configuration to Unix-socket mode.
    pub fn socket_path(mut self, socket_path: &str) -> Self {
        self.socket_path = Some(socket_path.to_string());
        self.use_unix_socket = true;
        self
    }

    /// Set the connection timeout in seconds
    ///
    /// NOTE(review): the value is stored but neither `build_connection_url`
    /// nor `build` reads it — confirm where it is meant to be applied.
    pub fn connection_timeout(mut self, seconds: u64) -> Self {
        self.connection_timeout = Some(seconds);
        self
    }

    /// Build the connection URL from the configuration
    ///
    /// * Unix-socket mode yields `unix://<path>`, falling back to
    ///   `$HOME/hero/var/myredis.sock` (with `/root` as the home directory
    ///   when `HOME` is unset) if no explicit path was given.
    /// * TCP mode yields `<scheme>://[credentials@]host:port/db`, where the
    ///   scheme is `rediss` when TLS is enabled and `redis` otherwise.
    pub fn build_connection_url(&self) -> String {
        if self.use_unix_socket {
            return match self.socket_path {
                Some(ref socket_path) => format!("unix://{}", socket_path),
                None => {
                    // Default socket path
                    let home_dir = env::var("HOME").unwrap_or_else(|_| String::from("/root"));
                    format!("unix://{}/hero/var/myredis.sock", home_dir)
                }
            };
        }

        // Decide the scheme once so that enabling TLS is honoured regardless
        // of whether credentials are present. Previously the authenticated
        // variants always used `redis://`, silently dropping TLS.
        let scheme = if self.use_tls { "rediss" } else { "redis" };

        // NOTE(review): credentials are interpolated verbatim; characters
        // such as '@', ':' or '/' in them would change how the URL parses.
        // Consider percent-encoding if such values are expected.
        let credentials = match (self.username.as_deref(), self.password.as_deref()) {
            (Some(username), Some(password)) => format!("{}:{}@", username, password),
            (Some(username), None) => format!("{}@", username),
            (None, Some(password)) => format!(":{}@", password),
            (None, None) => String::new(),
        };

        format!(
            "{}://{}{}:{}/{}",
            scheme, credentials, self.host, self.port, self.db
        )
    }

    /// Build a Redis client from the configuration
    ///
    /// Returns the client together with the configured database index.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `Client::open` when it rejects the URL.
    pub fn build(&self) -> RedisResult<(Client, i64)> {
        let url = self.build_connection_url();
        let client = Client::open(url)?;
        Ok((client, self.db))
    }
}
// Global Redis client instance using lazy_static
//
// `REDIS_CLIENT` caches the shared client: `get_redis_client` fills it on
// first use and `reset` clears it. `None` means no client is cached.
lazy_static! {
static ref REDIS_CLIENT: Mutex<Option<Arc<RedisClientWrapper>>> = Mutex::new(None);
// NOTE(review): `INIT` is not referenced by any code visible in this file —
// confirm whether it is still needed.
static ref INIT: Once = Once::new();
}
// Wrapper for Redis client to handle connection and DB selection
/// Pairs a `redis::Client` with one cached connection and the database index
/// the wrapper was created for.
pub struct RedisClientWrapper {
/// Client used to open connections.
client: Client,
/// Cached connection, guarded by a mutex; `None` until one has been opened.
connection: Mutex<Option<Connection>>,
/// Database index passed to `SELECT` during initialization.
db: i64,
/// Set to `true` once `initialize` has completed, so later calls return early.
initialized: AtomicBool,
}
impl RedisClientWrapper {
// Create a new Redis client wrapper
fn new(client: Client, db: i64) -> Self {
RedisClientWrapper {
client,
connection: Mutex::new(None),
db,
initialized: AtomicBool::new(false),
}
}
// Execute a command on the Redis connection
pub fn execute<T: redis::FromRedisValue>(&self, cmd: &mut Cmd) -> RedisResult<T> {
let mut conn_guard = self.connection.lock().unwrap();
// If we don't have a connection or it's not working, create a new one
if conn_guard.is_none() || {
if let Some(ref mut conn) = *conn_guard {
let ping_result: RedisResult<String> = redis::cmd("PING").query(conn);
ping_result.is_err()
} else {
true
}
} {
*conn_guard = Some(self.client.get_connection()?);
}
cmd.query(&mut conn_guard.as_mut().unwrap())
}
// Initialize the client (ping and select DB)
fn initialize(&self) -> RedisResult<()> {
if self.initialized.load(Ordering::Relaxed) {
return Ok(());
}
let mut conn = self.client.get_connection()?;
// Ping Redis to ensure it works
let ping_result: String = redis::cmd("PING").query(&mut conn)?;
if ping_result != "PONG" {
return Err(RedisError::from((
redis::ErrorKind::ResponseError,
"Failed to ping Redis server",
)));
}
// Select the database
let _ = redis::cmd("SELECT").arg(self.db).exec(&mut conn);
self.initialized.store(true, Ordering::Relaxed);
// Store the connection
let mut conn_guard = self.connection.lock().unwrap();
*conn_guard = Some(conn);
Ok(())
}
}
/// Returns the shared Redis client, creating and caching it on first use.
///
/// # Errors
///
/// Returns the error from `create_redis_client` when no client is cached and
/// a new one cannot be created.
///
/// # Panics
///
/// Panics if the global client mutex is poisoned.
pub fn get_redis_client() -> RedisResult<Arc<RedisClientWrapper>> {
    // Take a snapshot of the cache; the lock is released at the end of this
    // statement so client creation below runs without holding it.
    let cached: Option<Arc<RedisClientWrapper>> = REDIS_CLIENT.lock().unwrap().clone();
    if let Some(existing) = cached {
        return Ok(existing);
    }

    // Nothing cached yet: build a client and publish it globally.
    let fresh = create_redis_client()?;
    *REDIS_CLIENT.lock().unwrap() = Some(Arc::clone(&fresh));
    Ok(fresh)
}
/// Builds and initializes a client from environment configuration.
///
/// Reads `REDIS_HOST`, `REDIS_PORT`, `REDIS_USERNAME`, `REDIS_PASSWORD` and
/// the database index from `get_redis_db`. If the socket file
/// `$HOME/hero/var/myredis.sock` exists it is tried first; on failure a
/// message is printed to stderr and the TCP configuration is used instead.
///
/// # Errors
///
/// Returns an error if the TCP client cannot be built or initialized.
fn create_redis_client() -> RedisResult<Arc<RedisClientWrapper>> {
    // Connection target, with local defaults when the variables are absent
    // or the port is not a valid `u16`.
    let host = env::var("REDIS_HOST").unwrap_or_else(|_| String::from("127.0.0.1"));
    let port = match env::var("REDIS_PORT") {
        Ok(raw) => raw.parse::<u16>().unwrap_or(6379),
        Err(_) => 6379,
    };

    let mut builder = RedisConfigBuilder::new()
        .host(&host)
        .port(port)
        .db(get_redis_db());
    if let Ok(user) = env::var("REDIS_USERNAME") {
        builder = builder.username(&user);
    }
    if let Ok(pass) = env::var("REDIS_PASSWORD") {
        builder = builder.password(&pass);
    }

    // First try: the well-known Unix socket, but only if the file is there.
    let home_dir = env::var("HOME").unwrap_or_else(|_| String::from("/root"));
    let socket_path = format!("{}/hero/var/myredis.sock", home_dir);
    if Path::new(&socket_path).exists() {
        let attempt = builder
            .clone()
            .socket_path(&socket_path)
            .build()
            .and_then(|(client, db)| {
                let wrapper = Arc::new(RedisClientWrapper::new(client, db));
                wrapper.initialize()?;
                Ok(wrapper)
            });
        match attempt {
            Ok(wrapper) => return Ok(wrapper),
            // Building and initializing failures are reported identically,
            // then we fall through to TCP.
            Err(err) => eprintln!(
                "Socket exists at {} but connection failed: {}",
                socket_path, err
            ),
        }
    }

    // Second try: plain TCP using the environment-derived configuration.
    let (client, db) = builder.build().map_err(|err| {
        RedisError::from((
            redis::ErrorKind::IoError,
            "Failed to connect to Redis",
            format!(
                "Could not connect via socket at {} or via TCP to {}:{}: {}",
                socket_path, host, port, err
            ),
        ))
    })?;
    let wrapper = Arc::new(RedisClientWrapper::new(client, db));
    wrapper.initialize()?;
    Ok(wrapper)
}
/// Reads the Redis database index from the `REDISDB` environment variable.
///
/// Returns `0` when the variable is unset, not valid Unicode, or does not
/// parse as an `i64`.
fn get_redis_db() -> i64 {
    match env::var("REDISDB") {
        Ok(raw) => raw.parse::<i64>().unwrap_or(0),
        Err(_) => 0,
    }
}
/// Discards the cached global client and immediately creates a replacement.
///
/// # Errors
///
/// Returns an error if the replacement client cannot be created; the cache
/// is left empty in that case.
///
/// # Panics
///
/// Panics if the global client mutex is poisoned.
pub fn reset() -> RedisResult<()> {
    // The guard is a temporary, so the lock is released at the end of this
    // statement and `get_redis_client` can take it again.
    *REDIS_CLIENT.lock().unwrap() = None;

    // Only success or failure matters here; the new client itself is dropped.
    get_redis_client().map(|_| ())
}
/// Runs `cmd` through the shared global client and converts the reply to `T`.
///
/// # Errors
///
/// Returns an error if the global client cannot be obtained or if the
/// command itself fails.
pub fn execute<T>(cmd: &mut Cmd) -> RedisResult<T>
where
    T: redis::FromRedisValue,
{
    get_redis_client()?.execute(cmd)
}
/// Builds a standalone Redis client from a custom configuration.
///
/// The returned client is independent of the shared global client.
///
/// # Arguments
///
/// * `config` - The Redis connection configuration builder
///
/// # Returns
///
/// * `RedisResult<Client>` - The Redis client if successful, error otherwise
pub fn with_config(config: RedisConfigBuilder) -> RedisResult<Client> {
    // `build` also reports the database index, which is not needed here.
    config.build().map(|(client, _db)| client)
}