From 114d63e59042aaa3dbdacec92c8d3f2edb0377f8 Mon Sep 17 00:00:00 2001 From: Mahmoud Emad Date: Fri, 9 May 2025 10:45:53 +0300 Subject: [PATCH] feat: Add PostgreSQL connection pooling support - Implement connection pooling using `r2d2` and `r2d2_postgres` - Add connection pool configuration options to `PostgresConfigBuilder` - Introduce transaction functions with automatic commit/rollback - Add functions for executing queries using the connection pool - Add `QueryParams` struct for building parameterized queries - Add tests for connection pooling and transaction functions --- Cargo.toml | 2 + src/postgresclient/postgresclient.rs | 469 +++++++++++++++++++++++++++ src/postgresclient/tests.rs | 423 +++++++++++++++++++++--- 3 files changed, 851 insertions(+), 43 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9b4cfdc..327da7f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,8 @@ log = "0.4" # Logging facade rhai = { version = "1.12.0", features = ["sync"] } # Embedded scripting language rand = "0.8.5" # Random number generation clap = "2.33" # Command-line argument parsing +r2d2 = "0.8.10" +r2d2_postgres = "0.18.2" # Optional features for specific OS functionality [target.'cfg(unix)'.dependencies] diff --git a/src/postgresclient/postgresclient.rs b/src/postgresclient/postgresclient.rs index a9595c3..b2e4baa 100644 --- a/src/postgresclient/postgresclient.rs +++ b/src/postgresclient/postgresclient.rs @@ -1,7 +1,11 @@ use lazy_static::lazy_static; +use postgres::types::ToSql; use postgres::{Client, Error as PostgresError, NoTls, Row}; +use r2d2::Pool; +use r2d2_postgres::PostgresConnectionManager; use std::env; use std::sync::{Arc, Mutex, Once}; +use std::time::Duration; // Helper function to create a PostgreSQL error fn create_postgres_error(_message: &str) -> PostgresError { @@ -22,6 +26,8 @@ fn create_postgres_error(_message: &str) -> PostgresError { // Global PostgreSQL client instance using lazy_static lazy_static! { static ref POSTGRES_CLIENT: Mutex>> = Mutex::new(None); + static ref POSTGRES_POOL: Mutex>>>> = + Mutex::new(None); static ref INIT: Once = Once::new(); } @@ -39,6 +45,13 @@ pub struct PostgresConfigBuilder { pub application_name: Option, pub connect_timeout: Option, pub ssl_mode: Option, + // Connection pool settings + pub pool_max_size: Option, + pub pool_min_idle: Option, + pub pool_idle_timeout: Option, + pub pool_connection_timeout: Option, + pub pool_max_lifetime: Option, + pub use_pool: bool, } impl Default for PostgresConfigBuilder { @@ -52,6 +65,13 @@ impl Default for PostgresConfigBuilder { application_name: None, connect_timeout: None, ssl_mode: None, + // Default pool settings + pool_max_size: Some(10), + pool_min_idle: Some(1), + pool_idle_timeout: Some(Duration::from_secs(300)), + pool_connection_timeout: Some(Duration::from_secs(30)), + pool_max_lifetime: Some(Duration::from_secs(1800)), + use_pool: false, } } } @@ -110,6 +130,42 @@ impl PostgresConfigBuilder { self } + /// Enable connection pooling + pub fn use_pool(mut self, use_pool: bool) -> Self { + self.use_pool = use_pool; + self + } + + /// Set the maximum size of the connection pool + pub fn pool_max_size(mut self, size: u32) -> Self { + self.pool_max_size = Some(size); + self + } + + /// Set the minimum number of idle connections in the pool + pub fn pool_min_idle(mut self, size: u32) -> Self { + self.pool_min_idle = Some(size); + self + } + + /// Set the idle timeout for connections in the pool + pub fn pool_idle_timeout(mut self, timeout: Duration) -> Self { + self.pool_idle_timeout = Some(timeout); + self + } + + /// Set the connection timeout for the pool + pub fn pool_connection_timeout(mut self, timeout: Duration) -> Self { + self.pool_connection_timeout = Some(timeout); + self + } + + /// Set the maximum lifetime of connections in the pool + pub fn pool_max_lifetime(mut self, lifetime: Duration) -> Self { + self.pool_max_lifetime = Some(lifetime); + self + } + /// Build the connection string from the configuration pub fn build_connection_string(&self) -> String { let mut conn_string = format!( @@ -141,6 +197,36 @@ impl PostgresConfigBuilder { let conn_string = self.build_connection_string(); Client::connect(&conn_string, NoTls) } + + /// Build a PostgreSQL connection pool from the configuration + pub fn build_pool(&self) -> Result>, r2d2::Error> { + let conn_string = self.build_connection_string(); + let manager = PostgresConnectionManager::new(conn_string.parse().unwrap(), NoTls); + + let mut pool_builder = r2d2::Pool::builder(); + + if let Some(max_size) = self.pool_max_size { + pool_builder = pool_builder.max_size(max_size); + } + + if let Some(min_idle) = self.pool_min_idle { + pool_builder = pool_builder.min_idle(Some(min_idle)); + } + + if let Some(idle_timeout) = self.pool_idle_timeout { + pool_builder = pool_builder.idle_timeout(Some(idle_timeout)); + } + + if let Some(connection_timeout) = self.pool_connection_timeout { + pool_builder = pool_builder.connection_timeout(connection_timeout); + } + + if let Some(max_lifetime) = self.pool_max_lifetime { + pool_builder = pool_builder.max_lifetime(Some(max_lifetime)); + } + + pool_builder.build(manager) + } } /// Wrapper for PostgreSQL client to handle connection @@ -149,6 +235,101 @@ pub struct PostgresClientWrapper { client: Mutex>, } +/// Transaction functions for PostgreSQL +/// +/// These functions provide a way to execute queries within a transaction. +/// The transaction is automatically committed when the function returns successfully, +/// or rolled back if an error occurs. +/// +/// Example: +/// ``` +/// use sal::postgresclient::{transaction, QueryParams}; +/// +/// let result = transaction(|client| { +/// // Execute queries within the transaction +/// client.execute("INSERT INTO users (name) VALUES ($1)", &[&"John"])?; +/// client.execute("UPDATE users SET active = true WHERE name = $1", &[&"John"])?; +/// +/// // Return a result from the transaction +/// Ok(()) +/// }); +/// ``` +pub fn transaction(operations: F) -> Result +where + F: FnOnce(&mut Client) -> Result, +{ + let client = get_postgres_client()?; + let client_mutex = client.get_client()?; + let mut client_guard = client_mutex.lock().unwrap(); + + if let Some(client) = client_guard.as_mut() { + // Begin transaction + client.execute("BEGIN", &[])?; + + // Execute operations + match operations(client) { + Ok(result) => { + // Commit transaction + client.execute("COMMIT", &[])?; + Ok(result) + } + Err(e) => { + // Rollback transaction + let _ = client.execute("ROLLBACK", &[]); + Err(e) + } + } + } else { + Err(create_postgres_error("Failed to get PostgreSQL client")) + } +} + +/// Transaction functions for PostgreSQL using the connection pool +/// +/// These functions provide a way to execute queries within a transaction using the connection pool. +/// The transaction is automatically committed when the function returns successfully, +/// or rolled back if an error occurs. +/// +/// Example: +/// ``` +/// use sal::postgresclient::{transaction_with_pool, QueryParams}; +/// +/// let result = transaction_with_pool(|client| { +/// // Execute queries within the transaction +/// client.execute("INSERT INTO users (name) VALUES ($1)", &[&"John"])?; +/// client.execute("UPDATE users SET active = true WHERE name = $1", &[&"John"])?; +/// +/// // Return a result from the transaction +/// Ok(()) +/// }); +/// ``` +pub fn transaction_with_pool(operations: F) -> Result +where + F: FnOnce(&mut Client) -> Result, +{ + let pool = get_postgres_pool()?; + let mut client = pool.get().map_err(|e| { + create_postgres_error(&format!("Failed to get connection from pool: {}", e)) + })?; + + // Begin transaction + client.execute("BEGIN", &[])?; + + // Execute operations + match operations(&mut client) { + Ok(result) => { + // Commit transaction + client.execute("COMMIT", &[])?; + Ok(result) + } + Err(e) => { + // Rollback transaction + let _ = client.execute("ROLLBACK", &[]); + Err(e) + } + } +} + impl PostgresClientWrapper { /// Create a new PostgreSQL client wrapper fn new(connection_string: String) -> Self { @@ -354,3 +535,291 @@ pub fn query_opt( pub fn with_config(config: PostgresConfigBuilder) -> Result { config.build() } + +/// Create a new PostgreSQL connection pool with custom configuration +pub fn with_pool_config( + config: PostgresConfigBuilder, +) -> Result>, r2d2::Error> { + config.build_pool() +} + +/// Get the PostgreSQL connection pool instance +pub fn get_postgres_pool() -> Result>>, PostgresError> { + // Check if we already have a pool + { + let guard = POSTGRES_POOL.lock().unwrap(); + if let Some(ref pool) = &*guard { + return Ok(Arc::clone(pool)); + } + } + + // Create a new pool + let pool = create_postgres_pool()?; + + // Store the pool globally + { + let mut guard = POSTGRES_POOL.lock().unwrap(); + *guard = Some(Arc::clone(&pool)); + } + + Ok(pool) +} + +/// Create a new PostgreSQL connection pool +fn create_postgres_pool() -> Result>>, PostgresError> { + // Try to get connection details from environment variables + let host = env::var("POSTGRES_HOST").unwrap_or_else(|_| String::from("localhost")); + let port = env::var("POSTGRES_PORT") + .ok() + .and_then(|p| p.parse::().ok()) + .unwrap_or(5432); + let user = env::var("POSTGRES_USER").unwrap_or_else(|_| String::from("postgres")); + let password = env::var("POSTGRES_PASSWORD").ok(); + let database = env::var("POSTGRES_DB").unwrap_or_else(|_| String::from("postgres")); + + // Build the configuration + let mut builder = PostgresConfigBuilder::new() + .host(&host) + .port(port) + .user(&user) + .database(&database) + .use_pool(true); + + if let Some(pass) = password { + builder = builder.password(&pass); + } + + // Create the pool + match builder.build_pool() { + Ok(pool) => { + // Test the connection + match pool.get() { + Ok(_) => Ok(Arc::new(pool)), + Err(e) => Err(create_postgres_error(&format!( + "Failed to connect to PostgreSQL: {}", + e + ))), + } + } + Err(e) => Err(create_postgres_error(&format!( + "Failed to create PostgreSQL connection pool: {}", + e + ))), + } +} + +/// Reset the PostgreSQL connection pool +pub fn reset_pool() -> Result<(), PostgresError> { + // Clear the existing pool + { + let mut pool_guard = POSTGRES_POOL.lock().unwrap(); + *pool_guard = None; + } + + // Create a new pool, only return error if it fails + get_postgres_pool()?; + Ok(()) +} + +/// Execute a query using the connection pool +pub fn execute_with_pool( + query: &str, + params: &[&(dyn postgres::types::ToSql + Sync)], +) -> Result { + let pool = get_postgres_pool()?; + let mut client = pool.get().map_err(|e| { + create_postgres_error(&format!("Failed to get connection from pool: {}", e)) + })?; + client.execute(query, params) +} + +/// Execute a query using the connection pool and return the rows +pub fn query_with_pool( + query: &str, + params: &[&(dyn postgres::types::ToSql + Sync)], +) -> Result, PostgresError> { + let pool = get_postgres_pool()?; + let mut client = pool.get().map_err(|e| { + create_postgres_error(&format!("Failed to get connection from pool: {}", e)) + })?; + client.query(query, params) +} + +/// Execute a query using the connection pool and return a single row +pub fn query_one_with_pool( + query: &str, + params: &[&(dyn postgres::types::ToSql + Sync)], +) -> Result { + let pool = get_postgres_pool()?; + let mut client = pool.get().map_err(|e| { + create_postgres_error(&format!("Failed to get connection from pool: {}", e)) + })?; + client.query_one(query, params) +} + +/// Execute a query using the connection pool and return an optional row +pub fn query_opt_with_pool( + query: &str, + params: &[&(dyn postgres::types::ToSql + Sync)], +) -> Result, PostgresError> { + let pool = get_postgres_pool()?; + let mut client = pool.get().map_err(|e| { + create_postgres_error(&format!("Failed to get connection from pool: {}", e)) + })?; + client.query_opt(query, params) +} + +/// Parameter builder for PostgreSQL queries +/// +/// This struct helps build parameterized queries for PostgreSQL. +/// It provides a type-safe way to build query parameters. +#[derive(Default)] +pub struct QueryParams { + params: Vec>, +} + +impl QueryParams { + /// Create a new empty parameter builder + pub fn new() -> Self { + Self { params: Vec::new() } + } + + /// Add a parameter to the builder + pub fn add(&mut self, value: T) -> &mut Self { + self.params.push(Box::new(value)); + self + } + + /// Add a string parameter to the builder + pub fn add_str(&mut self, value: &str) -> &mut Self { + self.add(value.to_string()) + } + + /// Add an integer parameter to the builder + pub fn add_int(&mut self, value: i32) -> &mut Self { + self.add(value) + } + + /// Add a float parameter to the builder + pub fn add_float(&mut self, value: f64) -> &mut Self { + self.add(value) + } + + /// Add a boolean parameter to the builder + pub fn add_bool(&mut self, value: bool) -> &mut Self { + self.add(value) + } + + /// Add an optional parameter to the builder + pub fn add_opt(&mut self, value: Option) -> &mut Self { + if let Some(v) = value { + self.add(v); + } else { + // Add NULL value + self.params.push(Box::new(None::)); + } + self + } + + /// Get the parameters as a slice of references + pub fn as_slice(&self) -> Vec<&(dyn ToSql + Sync)> { + self.params + .iter() + .map(|p| p.as_ref() as &(dyn ToSql + Sync)) + .collect() + } +} + +/// Execute a query with the parameter builder +pub fn execute_with_params(query_str: &str, params: &QueryParams) -> Result { + let client = get_postgres_client()?; + client.execute(query_str, ¶ms.as_slice()) +} + +/// Execute a query with the parameter builder and return the rows +pub fn query_with_params(query_str: &str, params: &QueryParams) -> Result, PostgresError> { + let client = get_postgres_client()?; + client.query(query_str, ¶ms.as_slice()) +} + +/// Execute a query with the parameter builder and return a single row +pub fn query_one_with_params(query_str: &str, params: &QueryParams) -> Result { + let client = get_postgres_client()?; + client.query_one(query_str, ¶ms.as_slice()) +} + +/// Execute a query with the parameter builder and return an optional row +pub fn query_opt_with_params( + query_str: &str, + params: &QueryParams, +) -> Result, PostgresError> { + let client = get_postgres_client()?; + client.query_opt(query_str, ¶ms.as_slice()) +} + +/// Execute a query with the parameter builder using the connection pool +pub fn execute_with_pool_params( + query_str: &str, + params: &QueryParams, +) -> Result { + execute_with_pool(query_str, ¶ms.as_slice()) +} + +/// Execute a query with the parameter builder using the connection pool and return the rows +pub fn query_with_pool_params( + query_str: &str, + params: &QueryParams, +) -> Result, PostgresError> { + query_with_pool(query_str, ¶ms.as_slice()) +} + +/// Execute a query with the parameter builder using the connection pool and return a single row +pub fn query_one_with_pool_params( + query_str: &str, + params: &QueryParams, +) -> Result { + query_one_with_pool(query_str, ¶ms.as_slice()) +} + +/// Execute a query with the parameter builder using the connection pool and return an optional row +pub fn query_opt_with_pool_params( + query_str: &str, + params: &QueryParams, +) -> Result, PostgresError> { + query_opt_with_pool(query_str, ¶ms.as_slice()) +} + +/// Send a notification on a channel +/// +/// This function sends a notification on the specified channel with the specified payload. +/// +/// Example: +/// ``` +/// use sal::postgresclient::notify; +/// +/// notify("my_channel", "Hello, world!").expect("Failed to send notification"); +/// ``` +pub fn notify(channel: &str, payload: &str) -> Result<(), PostgresError> { + let client = get_postgres_client()?; + client.execute(&format!("NOTIFY {}, '{}'", channel, payload), &[])?; + Ok(()) +} + +/// Send a notification on a channel using the connection pool +/// +/// This function sends a notification on the specified channel with the specified payload using the connection pool. +/// +/// Example: +/// ``` +/// use sal::postgresclient::notify_with_pool; +/// +/// notify_with_pool("my_channel", "Hello, world!").expect("Failed to send notification"); +/// ``` +pub fn notify_with_pool(channel: &str, payload: &str) -> Result<(), PostgresError> { + let pool = get_postgres_pool()?; + let mut client = pool.get().map_err(|e| { + create_postgres_error(&format!("Failed to get connection from pool: {}", e)) + })?; + client.execute(&format!("NOTIFY {}, '{}'", channel, payload), &[])?; + Ok(()) +} diff --git a/src/postgresclient/tests.rs b/src/postgresclient/tests.rs index 894144d..5102617 100644 --- a/src/postgresclient/tests.rs +++ b/src/postgresclient/tests.rs @@ -4,7 +4,7 @@ use std::env; #[cfg(test)] mod postgres_client_tests { use super::*; - + #[test] fn test_env_vars() { // Save original environment variables to restore later @@ -13,24 +13,24 @@ mod postgres_client_tests { let original_user = env::var("POSTGRES_USER").ok(); let original_password = env::var("POSTGRES_PASSWORD").ok(); let original_db = env::var("POSTGRES_DB").ok(); - + // Set test environment variables env::set_var("POSTGRES_HOST", "test-host"); env::set_var("POSTGRES_PORT", "5433"); env::set_var("POSTGRES_USER", "test-user"); env::set_var("POSTGRES_PASSWORD", "test-password"); env::set_var("POSTGRES_DB", "test-db"); - + // Test with invalid port env::set_var("POSTGRES_PORT", "invalid"); - + // Test with unset values env::remove_var("POSTGRES_HOST"); env::remove_var("POSTGRES_PORT"); env::remove_var("POSTGRES_USER"); env::remove_var("POSTGRES_PASSWORD"); env::remove_var("POSTGRES_DB"); - + // Restore original environment variables if let Some(host) = original_host { env::set_var("POSTGRES_HOST", host); @@ -48,11 +48,11 @@ mod postgres_client_tests { env::set_var("POSTGRES_DB", db); } } - + #[test] fn test_postgres_config_builder() { // Test the PostgreSQL configuration builder - + // Test default values let config = PostgresConfigBuilder::new(); assert_eq!(config.host, "localhost"); @@ -63,7 +63,7 @@ mod postgres_client_tests { assert_eq!(config.application_name, None); assert_eq!(config.connect_timeout, None); assert_eq!(config.ssl_mode, None); - + // Test setting values let config = PostgresConfigBuilder::new() .host("pg.example.com") @@ -74,7 +74,7 @@ mod postgres_client_tests { .application_name("test-app") .connect_timeout(30) .ssl_mode("require"); - + assert_eq!(config.host, "pg.example.com"); assert_eq!(config.port, 5433); assert_eq!(config.user, "test-user"); @@ -84,11 +84,11 @@ mod postgres_client_tests { assert_eq!(config.connect_timeout, Some(30)); assert_eq!(config.ssl_mode, Some("require".to_string())); } - + #[test] fn test_connection_string_building() { // Test building connection strings - + // Test default connection string let config = PostgresConfigBuilder::new(); let conn_string = config.build_connection_string(); @@ -97,7 +97,7 @@ mod postgres_client_tests { assert!(conn_string.contains("user=postgres")); assert!(conn_string.contains("dbname=postgres")); assert!(!conn_string.contains("password=")); - + // Test with all options let config = PostgresConfigBuilder::new() .host("pg.example.com") @@ -108,7 +108,7 @@ mod postgres_client_tests { .application_name("test-app") .connect_timeout(30) .ssl_mode("require"); - + let conn_string = config.build_connection_string(); assert!(conn_string.contains("host=pg.example.com")); assert!(conn_string.contains("port=5433")); @@ -119,11 +119,11 @@ mod postgres_client_tests { assert!(conn_string.contains("connect_timeout=30")); assert!(conn_string.contains("sslmode=require")); } - + #[test] fn test_reset_mock() { // This is a simplified test that doesn't require an actual PostgreSQL server - + // Just verify that the reset function doesn't panic if let Err(_) = reset() { // If PostgreSQL is not available, this is expected to fail @@ -137,7 +137,8 @@ mod postgres_client_tests { #[cfg(test)] mod postgres_integration_tests { use super::*; - + use std::time::Duration; + // Helper function to check if PostgreSQL is available fn is_postgres_available() -> bool { match get_postgres_client() { @@ -145,28 +146,120 @@ mod postgres_integration_tests { Err(_) => false, } } - + #[test] fn test_postgres_client_integration() { if !is_postgres_available() { println!("Skipping PostgreSQL integration tests - PostgreSQL server not available"); return; } - + println!("Running PostgreSQL integration tests..."); - + // Test basic operations test_basic_postgres_operations(); - + // Test error handling test_error_handling(); } - + + #[test] + fn test_connection_pool() { + if !is_postgres_available() { + println!("Skipping PostgreSQL connection pool tests - PostgreSQL server not available"); + return; + } + + run_connection_pool_test(); + } + + fn run_connection_pool_test() { + println!("Running PostgreSQL connection pool tests..."); + + // Test creating a connection pool + let config = PostgresConfigBuilder::new() + .use_pool(true) + .pool_max_size(5) + .pool_min_idle(1) + .pool_connection_timeout(Duration::from_secs(5)); + + let pool_result = config.build_pool(); + assert!(pool_result.is_ok()); + + let pool = pool_result.unwrap(); + + // Test getting a connection from the pool + let conn_result = pool.get(); + assert!(conn_result.is_ok()); + + // Test executing a query with the connection + let mut conn = conn_result.unwrap(); + let query_result = conn.query("SELECT 1", &[]); + assert!(query_result.is_ok()); + + // Test the global pool + let global_pool_result = get_postgres_pool(); + assert!(global_pool_result.is_ok()); + + // Test executing queries with the pool + let create_table_query = " + CREATE TEMPORARY TABLE pool_test ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL + ) + "; + + let create_result = execute_with_pool(create_table_query, &[]); + assert!(create_result.is_ok()); + + // Test with parameters + let insert_result = execute_with_pool( + "INSERT INTO pool_test (name) VALUES ($1) RETURNING id", + &[&"test_pool"], + ); + assert!(insert_result.is_ok()); + + // Test with QueryParams + let mut params = QueryParams::new(); + params.add_str("test_pool_params"); + + let insert_params_result = execute_with_pool_params( + "INSERT INTO pool_test (name) VALUES ($1) RETURNING id", + ¶ms, + ); + assert!(insert_params_result.is_ok()); + + // Test query functions + let query_result = query_with_pool("SELECT * FROM pool_test", &[]); + assert!(query_result.is_ok()); + let rows = query_result.unwrap(); + assert_eq!(rows.len(), 2); + + // Test query_one + let query_one_result = + query_one_with_pool("SELECT * FROM pool_test WHERE name = $1", &[&"test_pool"]); + assert!(query_one_result.is_ok()); + + // Test query_opt + let query_opt_result = + query_opt_with_pool("SELECT * FROM pool_test WHERE name = $1", &[&"nonexistent"]); + assert!(query_opt_result.is_ok()); + assert!(query_opt_result.unwrap().is_none()); + + // Test resetting the pool + let reset_result = reset_pool(); + assert!(reset_result.is_ok()); + + // Test getting the pool again after reset + let pool_after_reset = get_postgres_pool(); + assert!(pool_after_reset.is_ok()); + } + fn test_basic_postgres_operations() { if !is_postgres_available() { return; } - + // Create a test table let create_table_query = " CREATE TEMPORARY TABLE test_table ( @@ -175,103 +268,347 @@ mod postgres_integration_tests { value INTEGER ) "; - + let create_result = execute(create_table_query, &[]); assert!(create_result.is_ok()); - + // Insert data let insert_query = " INSERT INTO test_table (name, value) VALUES ($1, $2) RETURNING id "; - + let insert_result = query(insert_query, &[&"test_name", &42]); assert!(insert_result.is_ok()); - + let rows = insert_result.unwrap(); assert_eq!(rows.len(), 1); - + let id: i32 = rows[0].get(0); assert!(id > 0); - + // Query data let select_query = " SELECT id, name, value FROM test_table WHERE id = $1 "; - + let select_result = query_one(select_query, &[&id]); assert!(select_result.is_ok()); - + let row = select_result.unwrap(); let name: String = row.get(1); let value: i32 = row.get(2); - + assert_eq!(name, "test_name"); assert_eq!(value, 42); - + // Update data let update_query = " UPDATE test_table SET value = $1 WHERE id = $2 "; - + let update_result = execute(update_query, &[&100, &id]); assert!(update_result.is_ok()); assert_eq!(update_result.unwrap(), 1); // 1 row affected - + // Verify update let verify_query = " SELECT value FROM test_table WHERE id = $1 "; - + let verify_result = query_one(verify_query, &[&id]); assert!(verify_result.is_ok()); - + let row = verify_result.unwrap(); let updated_value: i32 = row.get(0); assert_eq!(updated_value, 100); - + // Delete data let delete_query = " DELETE FROM test_table WHERE id = $1 "; - + let delete_result = execute(delete_query, &[&id]); assert!(delete_result.is_ok()); assert_eq!(delete_result.unwrap(), 1); // 1 row affected } - + + #[test] + fn test_query_params() { + if !is_postgres_available() { + println!("Skipping PostgreSQL parameter tests - PostgreSQL server not available"); + return; + } + + run_query_params_test(); + } + + #[test] + fn test_transactions() { + if !is_postgres_available() { + println!("Skipping PostgreSQL transaction tests - PostgreSQL server not available"); + return; + } + + println!("Running PostgreSQL transaction tests..."); + + // Test successful transaction + let result = transaction(|client| { + // Create a temporary table + client.execute( + "CREATE TEMPORARY TABLE transaction_test (id SERIAL PRIMARY KEY, name TEXT NOT NULL)", + &[], + )?; + + // Insert data + client.execute( + "INSERT INTO transaction_test (name) VALUES ($1)", + &[&"test_transaction"], + )?; + + // Query data + let rows = client.query( + "SELECT * FROM transaction_test WHERE name = $1", + &[&"test_transaction"], + )?; + + assert_eq!(rows.len(), 1); + let name: String = rows[0].get(1); + assert_eq!(name, "test_transaction"); + + // Return success + Ok(true) + }); + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), true); + + // Test failed transaction + let result = transaction(|client| { + // Create a temporary table + client.execute( + "CREATE TEMPORARY TABLE transaction_test_fail (id SERIAL PRIMARY KEY, name TEXT NOT NULL)", + &[], + )?; + + // Insert data + client.execute( + "INSERT INTO transaction_test_fail (name) VALUES ($1)", + &[&"test_transaction_fail"], + )?; + + // Cause an error with invalid SQL + client.execute("THIS IS INVALID SQL", &[])?; + + // This should not be reached + Ok(false) + }); + + assert!(result.is_err()); + + // Verify that the table was not created (transaction was rolled back) + let verify_result = query("SELECT * FROM transaction_test_fail", &[]); + + assert!(verify_result.is_err()); + + // Test transaction with pool + let result = transaction_with_pool(|client| { + // Create a temporary table + client.execute( + "CREATE TEMPORARY TABLE transaction_pool_test (id SERIAL PRIMARY KEY, name TEXT NOT NULL)", + &[], + )?; + + // Insert data + client.execute( + "INSERT INTO transaction_pool_test (name) VALUES ($1)", + &[&"test_transaction_pool"], + )?; + + // Query data + let rows = client.query( + "SELECT * FROM transaction_pool_test WHERE name = $1", + &[&"test_transaction_pool"], + )?; + + assert_eq!(rows.len(), 1); + let name: String = rows[0].get(1); + assert_eq!(name, "test_transaction_pool"); + + // Return success + Ok(true) + }); + + assert!(result.is_ok()); + assert_eq!(result.unwrap(), true); + } + + fn run_query_params_test() { + println!("Running PostgreSQL parameter tests..."); + + // Create a test table + let create_table_query = " + CREATE TEMPORARY TABLE param_test ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL, + value INTEGER, + active BOOLEAN, + score REAL + ) + "; + + let create_result = execute(create_table_query, &[]); + assert!(create_result.is_ok()); + + // Test QueryParams builder + let mut params = QueryParams::new(); + params.add_str("test_name"); + params.add_int(42); + params.add_bool(true); + params.add_float(3.14); + + // Insert data using QueryParams + let insert_query = " + INSERT INTO param_test (name, value, active, score) + VALUES ($1, $2, $3, $4) + RETURNING id + "; + + let insert_result = query_with_params(insert_query, ¶ms); + assert!(insert_result.is_ok()); + + let rows = insert_result.unwrap(); + assert_eq!(rows.len(), 1); + + let id: i32 = rows[0].get(0); + assert!(id > 0); + + // Query data using QueryParams + let mut query_params = QueryParams::new(); + query_params.add_int(id); + + let select_query = " + SELECT id, name, value, active, score + FROM param_test + WHERE id = $1 + "; + + let select_result = query_one_with_params(select_query, &query_params); + assert!(select_result.is_ok()); + + let row = select_result.unwrap(); + let name: String = row.get(1); + let value: i32 = row.get(2); + let active: bool = row.get(3); + let score: f64 = row.get(4); + + assert_eq!(name, "test_name"); + assert_eq!(value, 42); + assert_eq!(active, true); + assert_eq!(score, 3.14); + + // Test optional parameters + let mut update_params = QueryParams::new(); + update_params.add_int(100); + update_params.add_opt::(None); + update_params.add_int(id); + + let update_query = " + UPDATE param_test + SET value = $1, name = COALESCE($2, name) + WHERE id = $3 + "; + + let update_result = execute_with_params(update_query, &update_params); + assert!(update_result.is_ok()); + assert_eq!(update_result.unwrap(), 1); // 1 row affected + + // Verify update + let verify_result = query_one_with_params(select_query, &query_params); + assert!(verify_result.is_ok()); + + let row = verify_result.unwrap(); + let name: String = row.get(1); + let value: i32 = row.get(2); + + assert_eq!(name, "test_name"); // Name should be unchanged + assert_eq!(value, 100); // Value should be updated + + // Test query_opt_with_params + let mut nonexistent_params = QueryParams::new(); + nonexistent_params.add_int(9999); // ID that doesn't exist + + let opt_query = " + SELECT id, name + FROM param_test + WHERE id = $1 + "; + + let opt_result = query_opt_with_params(opt_query, &nonexistent_params); + assert!(opt_result.is_ok()); + assert!(opt_result.unwrap().is_none()); + + // Clean up + let delete_query = " + DELETE FROM param_test + WHERE id = $1 + "; + + let delete_result = execute_with_params(delete_query, &query_params); + assert!(delete_result.is_ok()); + assert_eq!(delete_result.unwrap(), 1); // 1 row affected + } + fn test_error_handling() { if !is_postgres_available() { return; } - + // Test invalid SQL let invalid_query = "SELECT * FROM nonexistent_table"; let invalid_result = query(invalid_query, &[]); assert!(invalid_result.is_err()); - + // Test parameter type mismatch let mismatch_query = "SELECT $1::integer"; let mismatch_result = query(mismatch_query, &[&"not_an_integer"]); assert!(mismatch_result.is_err()); - + // Test query_one with no results let empty_query = "SELECT * FROM pg_tables WHERE tablename = 'nonexistent_table'"; let empty_result = query_one(empty_query, &[]); assert!(empty_result.is_err()); - + // Test query_opt with no results let opt_query = "SELECT * FROM pg_tables WHERE tablename = 'nonexistent_table'"; let opt_result = query_opt(opt_query, &[]); assert!(opt_result.is_ok()); assert!(opt_result.unwrap().is_none()); } + + #[test] + fn test_notify() { + if !is_postgres_available() { + println!("Skipping PostgreSQL notification tests - PostgreSQL server not available"); + return; + } + + println!("Running PostgreSQL notification tests..."); + + // Test sending a notification + let result = notify("test_channel", "test_payload"); + assert!(result.is_ok()); + + // Test sending a notification with the pool + let result = notify_with_pool("test_channel_pool", "test_payload_pool"); + assert!(result.is_ok()); + } }