feat: Add PostgreSQL connection pooling support
Some checks failed
Rhai Tests / Run Rhai Tests (pull_request) Has been cancelled

- Implement connection pooling using `r2d2` and `r2d2_postgres`
- Add connection pool configuration options to `PostgresConfigBuilder`
- Introduce transaction functions with automatic commit/rollback
- Add functions for executing queries using the connection pool
- Add `QueryParams` struct for building parameterized queries
- Add tests for connection pooling and transaction functions
This commit is contained in:
Mahmoud Emad 2025-05-09 10:45:53 +03:00
parent 22f87b320e
commit 114d63e590
3 changed files with 851 additions and 43 deletions

View File

@ -32,6 +32,8 @@ log = "0.4" # Logging facade
rhai = { version = "1.12.0", features = ["sync"] } # Embedded scripting language
rand = "0.8.5" # Random number generation
clap = "2.33" # Command-line argument parsing
r2d2 = "0.8.10"
r2d2_postgres = "0.18.2"
# Optional features for specific OS functionality
[target.'cfg(unix)'.dependencies]

View File

@ -1,7 +1,11 @@
use lazy_static::lazy_static;
use postgres::types::ToSql;
use postgres::{Client, Error as PostgresError, NoTls, Row};
use r2d2::Pool;
use r2d2_postgres::PostgresConnectionManager;
use std::env;
use std::sync::{Arc, Mutex, Once};
use std::time::Duration;
// Helper function to create a PostgreSQL error
fn create_postgres_error(_message: &str) -> PostgresError {
@ -22,6 +26,8 @@ fn create_postgres_error(_message: &str) -> PostgresError {
// Global PostgreSQL client instance using lazy_static
lazy_static! {
static ref POSTGRES_CLIENT: Mutex<Option<Arc<PostgresClientWrapper>>> = Mutex::new(None);
static ref POSTGRES_POOL: Mutex<Option<Arc<Pool<PostgresConnectionManager<NoTls>>>>> =
Mutex::new(None);
static ref INIT: Once = Once::new();
}
@ -39,6 +45,13 @@ pub struct PostgresConfigBuilder {
pub application_name: Option<String>,
pub connect_timeout: Option<u64>,
pub ssl_mode: Option<String>,
// Connection pool settings
pub pool_max_size: Option<u32>,
pub pool_min_idle: Option<u32>,
pub pool_idle_timeout: Option<Duration>,
pub pool_connection_timeout: Option<Duration>,
pub pool_max_lifetime: Option<Duration>,
pub use_pool: bool,
}
impl Default for PostgresConfigBuilder {
@ -52,6 +65,13 @@ impl Default for PostgresConfigBuilder {
application_name: None,
connect_timeout: None,
ssl_mode: None,
// Default pool settings
pool_max_size: Some(10),
pool_min_idle: Some(1),
pool_idle_timeout: Some(Duration::from_secs(300)),
pool_connection_timeout: Some(Duration::from_secs(30)),
pool_max_lifetime: Some(Duration::from_secs(1800)),
use_pool: false,
}
}
}
@ -110,6 +130,42 @@ impl PostgresConfigBuilder {
self
}
/// Enable connection pooling
pub fn use_pool(mut self, use_pool: bool) -> Self {
self.use_pool = use_pool;
self
}
/// Set the maximum size of the connection pool
pub fn pool_max_size(mut self, size: u32) -> Self {
self.pool_max_size = Some(size);
self
}
/// Set the minimum number of idle connections in the pool
pub fn pool_min_idle(mut self, size: u32) -> Self {
self.pool_min_idle = Some(size);
self
}
/// Set the idle timeout for connections in the pool
pub fn pool_idle_timeout(mut self, timeout: Duration) -> Self {
self.pool_idle_timeout = Some(timeout);
self
}
/// Set the connection timeout for the pool
pub fn pool_connection_timeout(mut self, timeout: Duration) -> Self {
self.pool_connection_timeout = Some(timeout);
self
}
/// Set the maximum lifetime of connections in the pool
pub fn pool_max_lifetime(mut self, lifetime: Duration) -> Self {
self.pool_max_lifetime = Some(lifetime);
self
}
/// Build the connection string from the configuration
pub fn build_connection_string(&self) -> String {
let mut conn_string = format!(
@ -141,6 +197,36 @@ impl PostgresConfigBuilder {
let conn_string = self.build_connection_string();
Client::connect(&conn_string, NoTls)
}
/// Build a PostgreSQL connection pool from the configuration
pub fn build_pool(&self) -> Result<Pool<PostgresConnectionManager<NoTls>>, r2d2::Error> {
let conn_string = self.build_connection_string();
let manager = PostgresConnectionManager::new(conn_string.parse().unwrap(), NoTls);
let mut pool_builder = r2d2::Pool::builder();
if let Some(max_size) = self.pool_max_size {
pool_builder = pool_builder.max_size(max_size);
}
if let Some(min_idle) = self.pool_min_idle {
pool_builder = pool_builder.min_idle(Some(min_idle));
}
if let Some(idle_timeout) = self.pool_idle_timeout {
pool_builder = pool_builder.idle_timeout(Some(idle_timeout));
}
if let Some(connection_timeout) = self.pool_connection_timeout {
pool_builder = pool_builder.connection_timeout(connection_timeout);
}
if let Some(max_lifetime) = self.pool_max_lifetime {
pool_builder = pool_builder.max_lifetime(Some(max_lifetime));
}
pool_builder.build(manager)
}
}
/// Wrapper for PostgreSQL client to handle connection
@ -149,6 +235,101 @@ pub struct PostgresClientWrapper {
client: Mutex<Option<Client>>,
}
/// Transaction functions for PostgreSQL
///
/// These functions provide a way to execute queries within a transaction.
/// The transaction is automatically committed when the function returns successfully,
/// or rolled back if an error occurs.
///
/// Example:
/// ```
/// use sal::postgresclient::{transaction, QueryParams};
///
/// let result = transaction(|client| {
/// // Execute queries within the transaction
/// client.execute("INSERT INTO users (name) VALUES ($1)", &[&"John"])?;
/// client.execute("UPDATE users SET active = true WHERE name = $1", &[&"John"])?;
///
/// // Return a result from the transaction
/// Ok(())
/// });
/// ```
pub fn transaction<F, T>(operations: F) -> Result<T, PostgresError>
where
F: FnOnce(&mut Client) -> Result<T, PostgresError>,
{
let client = get_postgres_client()?;
let client_mutex = client.get_client()?;
let mut client_guard = client_mutex.lock().unwrap();
if let Some(client) = client_guard.as_mut() {
// Begin transaction
client.execute("BEGIN", &[])?;
// Execute operations
match operations(client) {
Ok(result) => {
// Commit transaction
client.execute("COMMIT", &[])?;
Ok(result)
}
Err(e) => {
// Rollback transaction
let _ = client.execute("ROLLBACK", &[]);
Err(e)
}
}
} else {
Err(create_postgres_error("Failed to get PostgreSQL client"))
}
}
/// Transaction functions for PostgreSQL using the connection pool
///
/// These functions provide a way to execute queries within a transaction using the connection pool.
/// The transaction is automatically committed when the function returns successfully,
/// or rolled back if an error occurs.
///
/// Example:
/// ```
/// use sal::postgresclient::{transaction_with_pool, QueryParams};
///
/// let result = transaction_with_pool(|client| {
/// // Execute queries within the transaction
/// client.execute("INSERT INTO users (name) VALUES ($1)", &[&"John"])?;
/// client.execute("UPDATE users SET active = true WHERE name = $1", &[&"John"])?;
///
/// // Return a result from the transaction
/// Ok(())
/// });
/// ```
pub fn transaction_with_pool<F, T>(operations: F) -> Result<T, PostgresError>
where
F: FnOnce(&mut Client) -> Result<T, PostgresError>,
{
let pool = get_postgres_pool()?;
let mut client = pool.get().map_err(|e| {
create_postgres_error(&format!("Failed to get connection from pool: {}", e))
})?;
// Begin transaction
client.execute("BEGIN", &[])?;
// Execute operations
match operations(&mut client) {
Ok(result) => {
// Commit transaction
client.execute("COMMIT", &[])?;
Ok(result)
}
Err(e) => {
// Rollback transaction
let _ = client.execute("ROLLBACK", &[]);
Err(e)
}
}
}
impl PostgresClientWrapper {
/// Create a new PostgreSQL client wrapper
fn new(connection_string: String) -> Self {
@ -354,3 +535,291 @@ pub fn query_opt(
pub fn with_config(config: PostgresConfigBuilder) -> Result<Client, PostgresError> {
config.build()
}
/// Create a new PostgreSQL connection pool with custom configuration
pub fn with_pool_config(
config: PostgresConfigBuilder,
) -> Result<Pool<PostgresConnectionManager<NoTls>>, r2d2::Error> {
config.build_pool()
}
/// Get the PostgreSQL connection pool instance
pub fn get_postgres_pool() -> Result<Arc<Pool<PostgresConnectionManager<NoTls>>>, PostgresError> {
// Check if we already have a pool
{
let guard = POSTGRES_POOL.lock().unwrap();
if let Some(ref pool) = &*guard {
return Ok(Arc::clone(pool));
}
}
// Create a new pool
let pool = create_postgres_pool()?;
// Store the pool globally
{
let mut guard = POSTGRES_POOL.lock().unwrap();
*guard = Some(Arc::clone(&pool));
}
Ok(pool)
}
/// Create a new PostgreSQL connection pool
fn create_postgres_pool() -> Result<Arc<Pool<PostgresConnectionManager<NoTls>>>, PostgresError> {
// Try to get connection details from environment variables
let host = env::var("POSTGRES_HOST").unwrap_or_else(|_| String::from("localhost"));
let port = env::var("POSTGRES_PORT")
.ok()
.and_then(|p| p.parse::<u16>().ok())
.unwrap_or(5432);
let user = env::var("POSTGRES_USER").unwrap_or_else(|_| String::from("postgres"));
let password = env::var("POSTGRES_PASSWORD").ok();
let database = env::var("POSTGRES_DB").unwrap_or_else(|_| String::from("postgres"));
// Build the configuration
let mut builder = PostgresConfigBuilder::new()
.host(&host)
.port(port)
.user(&user)
.database(&database)
.use_pool(true);
if let Some(pass) = password {
builder = builder.password(&pass);
}
// Create the pool
match builder.build_pool() {
Ok(pool) => {
// Test the connection
match pool.get() {
Ok(_) => Ok(Arc::new(pool)),
Err(e) => Err(create_postgres_error(&format!(
"Failed to connect to PostgreSQL: {}",
e
))),
}
}
Err(e) => Err(create_postgres_error(&format!(
"Failed to create PostgreSQL connection pool: {}",
e
))),
}
}
/// Reset the PostgreSQL connection pool
pub fn reset_pool() -> Result<(), PostgresError> {
// Clear the existing pool
{
let mut pool_guard = POSTGRES_POOL.lock().unwrap();
*pool_guard = None;
}
// Create a new pool, only return error if it fails
get_postgres_pool()?;
Ok(())
}
/// Execute a query using the connection pool
pub fn execute_with_pool(
query: &str,
params: &[&(dyn postgres::types::ToSql + Sync)],
) -> Result<u64, PostgresError> {
let pool = get_postgres_pool()?;
let mut client = pool.get().map_err(|e| {
create_postgres_error(&format!("Failed to get connection from pool: {}", e))
})?;
client.execute(query, params)
}
/// Execute a query using the connection pool and return the rows
pub fn query_with_pool(
query: &str,
params: &[&(dyn postgres::types::ToSql + Sync)],
) -> Result<Vec<Row>, PostgresError> {
let pool = get_postgres_pool()?;
let mut client = pool.get().map_err(|e| {
create_postgres_error(&format!("Failed to get connection from pool: {}", e))
})?;
client.query(query, params)
}
/// Execute a query using the connection pool and return a single row
pub fn query_one_with_pool(
query: &str,
params: &[&(dyn postgres::types::ToSql + Sync)],
) -> Result<Row, PostgresError> {
let pool = get_postgres_pool()?;
let mut client = pool.get().map_err(|e| {
create_postgres_error(&format!("Failed to get connection from pool: {}", e))
})?;
client.query_one(query, params)
}
/// Execute a query using the connection pool and return an optional row
pub fn query_opt_with_pool(
query: &str,
params: &[&(dyn postgres::types::ToSql + Sync)],
) -> Result<Option<Row>, PostgresError> {
let pool = get_postgres_pool()?;
let mut client = pool.get().map_err(|e| {
create_postgres_error(&format!("Failed to get connection from pool: {}", e))
})?;
client.query_opt(query, params)
}
/// Parameter builder for PostgreSQL queries
///
/// This struct helps build parameterized queries for PostgreSQL.
/// It provides a type-safe way to build query parameters.
#[derive(Default)]
pub struct QueryParams {
params: Vec<Box<dyn ToSql + Sync>>,
}
impl QueryParams {
/// Create a new empty parameter builder
pub fn new() -> Self {
Self { params: Vec::new() }
}
/// Add a parameter to the builder
pub fn add<T: 'static + ToSql + Sync>(&mut self, value: T) -> &mut Self {
self.params.push(Box::new(value));
self
}
/// Add a string parameter to the builder
pub fn add_str(&mut self, value: &str) -> &mut Self {
self.add(value.to_string())
}
/// Add an integer parameter to the builder
pub fn add_int(&mut self, value: i32) -> &mut Self {
self.add(value)
}
/// Add a float parameter to the builder
pub fn add_float(&mut self, value: f64) -> &mut Self {
self.add(value)
}
/// Add a boolean parameter to the builder
pub fn add_bool(&mut self, value: bool) -> &mut Self {
self.add(value)
}
/// Add an optional parameter to the builder
pub fn add_opt<T: 'static + ToSql + Sync>(&mut self, value: Option<T>) -> &mut Self {
if let Some(v) = value {
self.add(v);
} else {
// Add NULL value
self.params.push(Box::new(None::<String>));
}
self
}
/// Get the parameters as a slice of references
pub fn as_slice(&self) -> Vec<&(dyn ToSql + Sync)> {
self.params
.iter()
.map(|p| p.as_ref() as &(dyn ToSql + Sync))
.collect()
}
}
/// Execute a query with the parameter builder
pub fn execute_with_params(query_str: &str, params: &QueryParams) -> Result<u64, PostgresError> {
let client = get_postgres_client()?;
client.execute(query_str, &params.as_slice())
}
/// Execute a query with the parameter builder and return the rows
pub fn query_with_params(query_str: &str, params: &QueryParams) -> Result<Vec<Row>, PostgresError> {
let client = get_postgres_client()?;
client.query(query_str, &params.as_slice())
}
/// Execute a query with the parameter builder and return a single row
pub fn query_one_with_params(query_str: &str, params: &QueryParams) -> Result<Row, PostgresError> {
let client = get_postgres_client()?;
client.query_one(query_str, &params.as_slice())
}
/// Execute a query with the parameter builder and return an optional row
pub fn query_opt_with_params(
query_str: &str,
params: &QueryParams,
) -> Result<Option<Row>, PostgresError> {
let client = get_postgres_client()?;
client.query_opt(query_str, &params.as_slice())
}
/// Execute a query with the parameter builder using the connection pool
pub fn execute_with_pool_params(
query_str: &str,
params: &QueryParams,
) -> Result<u64, PostgresError> {
execute_with_pool(query_str, &params.as_slice())
}
/// Execute a query with the parameter builder using the connection pool and return the rows
pub fn query_with_pool_params(
query_str: &str,
params: &QueryParams,
) -> Result<Vec<Row>, PostgresError> {
query_with_pool(query_str, &params.as_slice())
}
/// Execute a query with the parameter builder using the connection pool and return a single row
pub fn query_one_with_pool_params(
query_str: &str,
params: &QueryParams,
) -> Result<Row, PostgresError> {
query_one_with_pool(query_str, &params.as_slice())
}
/// Execute a query with the parameter builder using the connection pool and return an optional row
pub fn query_opt_with_pool_params(
query_str: &str,
params: &QueryParams,
) -> Result<Option<Row>, PostgresError> {
query_opt_with_pool(query_str, &params.as_slice())
}
/// Send a notification on a channel
///
/// This function sends a notification on the specified channel with the specified payload.
///
/// Example:
/// ```
/// use sal::postgresclient::notify;
///
/// notify("my_channel", "Hello, world!").expect("Failed to send notification");
/// ```
pub fn notify(channel: &str, payload: &str) -> Result<(), PostgresError> {
let client = get_postgres_client()?;
client.execute(&format!("NOTIFY {}, '{}'", channel, payload), &[])?;
Ok(())
}
/// Send a notification on a channel using the connection pool
///
/// This function sends a notification on the specified channel with the specified payload using the connection pool.
///
/// Example:
/// ```
/// use sal::postgresclient::notify_with_pool;
///
/// notify_with_pool("my_channel", "Hello, world!").expect("Failed to send notification");
/// ```
pub fn notify_with_pool(channel: &str, payload: &str) -> Result<(), PostgresError> {
let pool = get_postgres_pool()?;
let mut client = pool.get().map_err(|e| {
create_postgres_error(&format!("Failed to get connection from pool: {}", e))
})?;
client.execute(&format!("NOTIFY {}, '{}'", channel, payload), &[])?;
Ok(())
}

View File

@ -4,7 +4,7 @@ use std::env;
#[cfg(test)]
mod postgres_client_tests {
use super::*;
#[test]
fn test_env_vars() {
// Save original environment variables to restore later
@ -13,24 +13,24 @@ mod postgres_client_tests {
let original_user = env::var("POSTGRES_USER").ok();
let original_password = env::var("POSTGRES_PASSWORD").ok();
let original_db = env::var("POSTGRES_DB").ok();
// Set test environment variables
env::set_var("POSTGRES_HOST", "test-host");
env::set_var("POSTGRES_PORT", "5433");
env::set_var("POSTGRES_USER", "test-user");
env::set_var("POSTGRES_PASSWORD", "test-password");
env::set_var("POSTGRES_DB", "test-db");
// Test with invalid port
env::set_var("POSTGRES_PORT", "invalid");
// Test with unset values
env::remove_var("POSTGRES_HOST");
env::remove_var("POSTGRES_PORT");
env::remove_var("POSTGRES_USER");
env::remove_var("POSTGRES_PASSWORD");
env::remove_var("POSTGRES_DB");
// Restore original environment variables
if let Some(host) = original_host {
env::set_var("POSTGRES_HOST", host);
@ -48,11 +48,11 @@ mod postgres_client_tests {
env::set_var("POSTGRES_DB", db);
}
}
#[test]
fn test_postgres_config_builder() {
// Test the PostgreSQL configuration builder
// Test default values
let config = PostgresConfigBuilder::new();
assert_eq!(config.host, "localhost");
@ -63,7 +63,7 @@ mod postgres_client_tests {
assert_eq!(config.application_name, None);
assert_eq!(config.connect_timeout, None);
assert_eq!(config.ssl_mode, None);
// Test setting values
let config = PostgresConfigBuilder::new()
.host("pg.example.com")
@ -74,7 +74,7 @@ mod postgres_client_tests {
.application_name("test-app")
.connect_timeout(30)
.ssl_mode("require");
assert_eq!(config.host, "pg.example.com");
assert_eq!(config.port, 5433);
assert_eq!(config.user, "test-user");
@ -84,11 +84,11 @@ mod postgres_client_tests {
assert_eq!(config.connect_timeout, Some(30));
assert_eq!(config.ssl_mode, Some("require".to_string()));
}
#[test]
fn test_connection_string_building() {
// Test building connection strings
// Test default connection string
let config = PostgresConfigBuilder::new();
let conn_string = config.build_connection_string();
@ -97,7 +97,7 @@ mod postgres_client_tests {
assert!(conn_string.contains("user=postgres"));
assert!(conn_string.contains("dbname=postgres"));
assert!(!conn_string.contains("password="));
// Test with all options
let config = PostgresConfigBuilder::new()
.host("pg.example.com")
@ -108,7 +108,7 @@ mod postgres_client_tests {
.application_name("test-app")
.connect_timeout(30)
.ssl_mode("require");
let conn_string = config.build_connection_string();
assert!(conn_string.contains("host=pg.example.com"));
assert!(conn_string.contains("port=5433"));
@ -119,11 +119,11 @@ mod postgres_client_tests {
assert!(conn_string.contains("connect_timeout=30"));
assert!(conn_string.contains("sslmode=require"));
}
#[test]
fn test_reset_mock() {
// This is a simplified test that doesn't require an actual PostgreSQL server
// Just verify that the reset function doesn't panic
if let Err(_) = reset() {
// If PostgreSQL is not available, this is expected to fail
@ -137,7 +137,8 @@ mod postgres_client_tests {
#[cfg(test)]
mod postgres_integration_tests {
use super::*;
use std::time::Duration;
// Helper function to check if PostgreSQL is available
fn is_postgres_available() -> bool {
match get_postgres_client() {
@ -145,28 +146,120 @@ mod postgres_integration_tests {
Err(_) => false,
}
}
#[test]
fn test_postgres_client_integration() {
if !is_postgres_available() {
println!("Skipping PostgreSQL integration tests - PostgreSQL server not available");
return;
}
println!("Running PostgreSQL integration tests...");
// Test basic operations
test_basic_postgres_operations();
// Test error handling
test_error_handling();
}
#[test]
fn test_connection_pool() {
if !is_postgres_available() {
println!("Skipping PostgreSQL connection pool tests - PostgreSQL server not available");
return;
}
run_connection_pool_test();
}
fn run_connection_pool_test() {
println!("Running PostgreSQL connection pool tests...");
// Test creating a connection pool
let config = PostgresConfigBuilder::new()
.use_pool(true)
.pool_max_size(5)
.pool_min_idle(1)
.pool_connection_timeout(Duration::from_secs(5));
let pool_result = config.build_pool();
assert!(pool_result.is_ok());
let pool = pool_result.unwrap();
// Test getting a connection from the pool
let conn_result = pool.get();
assert!(conn_result.is_ok());
// Test executing a query with the connection
let mut conn = conn_result.unwrap();
let query_result = conn.query("SELECT 1", &[]);
assert!(query_result.is_ok());
// Test the global pool
let global_pool_result = get_postgres_pool();
assert!(global_pool_result.is_ok());
// Test executing queries with the pool
let create_table_query = "
CREATE TEMPORARY TABLE pool_test (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL
)
";
let create_result = execute_with_pool(create_table_query, &[]);
assert!(create_result.is_ok());
// Test with parameters
let insert_result = execute_with_pool(
"INSERT INTO pool_test (name) VALUES ($1) RETURNING id",
&[&"test_pool"],
);
assert!(insert_result.is_ok());
// Test with QueryParams
let mut params = QueryParams::new();
params.add_str("test_pool_params");
let insert_params_result = execute_with_pool_params(
"INSERT INTO pool_test (name) VALUES ($1) RETURNING id",
&params,
);
assert!(insert_params_result.is_ok());
// Test query functions
let query_result = query_with_pool("SELECT * FROM pool_test", &[]);
assert!(query_result.is_ok());
let rows = query_result.unwrap();
assert_eq!(rows.len(), 2);
// Test query_one
let query_one_result =
query_one_with_pool("SELECT * FROM pool_test WHERE name = $1", &[&"test_pool"]);
assert!(query_one_result.is_ok());
// Test query_opt
let query_opt_result =
query_opt_with_pool("SELECT * FROM pool_test WHERE name = $1", &[&"nonexistent"]);
assert!(query_opt_result.is_ok());
assert!(query_opt_result.unwrap().is_none());
// Test resetting the pool
let reset_result = reset_pool();
assert!(reset_result.is_ok());
// Test getting the pool again after reset
let pool_after_reset = get_postgres_pool();
assert!(pool_after_reset.is_ok());
}
fn test_basic_postgres_operations() {
if !is_postgres_available() {
return;
}
// Create a test table
let create_table_query = "
CREATE TEMPORARY TABLE test_table (
@ -175,103 +268,347 @@ mod postgres_integration_tests {
value INTEGER
)
";
let create_result = execute(create_table_query, &[]);
assert!(create_result.is_ok());
// Insert data
let insert_query = "
INSERT INTO test_table (name, value)
VALUES ($1, $2)
RETURNING id
";
let insert_result = query(insert_query, &[&"test_name", &42]);
assert!(insert_result.is_ok());
let rows = insert_result.unwrap();
assert_eq!(rows.len(), 1);
let id: i32 = rows[0].get(0);
assert!(id > 0);
// Query data
let select_query = "
SELECT id, name, value
FROM test_table
WHERE id = $1
";
let select_result = query_one(select_query, &[&id]);
assert!(select_result.is_ok());
let row = select_result.unwrap();
let name: String = row.get(1);
let value: i32 = row.get(2);
assert_eq!(name, "test_name");
assert_eq!(value, 42);
// Update data
let update_query = "
UPDATE test_table
SET value = $1
WHERE id = $2
";
let update_result = execute(update_query, &[&100, &id]);
assert!(update_result.is_ok());
assert_eq!(update_result.unwrap(), 1); // 1 row affected
// Verify update
let verify_query = "
SELECT value
FROM test_table
WHERE id = $1
";
let verify_result = query_one(verify_query, &[&id]);
assert!(verify_result.is_ok());
let row = verify_result.unwrap();
let updated_value: i32 = row.get(0);
assert_eq!(updated_value, 100);
// Delete data
let delete_query = "
DELETE FROM test_table
WHERE id = $1
";
let delete_result = execute(delete_query, &[&id]);
assert!(delete_result.is_ok());
assert_eq!(delete_result.unwrap(), 1); // 1 row affected
}
#[test]
fn test_query_params() {
if !is_postgres_available() {
println!("Skipping PostgreSQL parameter tests - PostgreSQL server not available");
return;
}
run_query_params_test();
}
#[test]
fn test_transactions() {
if !is_postgres_available() {
println!("Skipping PostgreSQL transaction tests - PostgreSQL server not available");
return;
}
println!("Running PostgreSQL transaction tests...");
// Test successful transaction
let result = transaction(|client| {
// Create a temporary table
client.execute(
"CREATE TEMPORARY TABLE transaction_test (id SERIAL PRIMARY KEY, name TEXT NOT NULL)",
&[],
)?;
// Insert data
client.execute(
"INSERT INTO transaction_test (name) VALUES ($1)",
&[&"test_transaction"],
)?;
// Query data
let rows = client.query(
"SELECT * FROM transaction_test WHERE name = $1",
&[&"test_transaction"],
)?;
assert_eq!(rows.len(), 1);
let name: String = rows[0].get(1);
assert_eq!(name, "test_transaction");
// Return success
Ok(true)
});
assert!(result.is_ok());
assert_eq!(result.unwrap(), true);
// Test failed transaction
let result = transaction(|client| {
// Create a temporary table
client.execute(
"CREATE TEMPORARY TABLE transaction_test_fail (id SERIAL PRIMARY KEY, name TEXT NOT NULL)",
&[],
)?;
// Insert data
client.execute(
"INSERT INTO transaction_test_fail (name) VALUES ($1)",
&[&"test_transaction_fail"],
)?;
// Cause an error with invalid SQL
client.execute("THIS IS INVALID SQL", &[])?;
// This should not be reached
Ok(false)
});
assert!(result.is_err());
// Verify that the table was not created (transaction was rolled back)
let verify_result = query("SELECT * FROM transaction_test_fail", &[]);
assert!(verify_result.is_err());
// Test transaction with pool
let result = transaction_with_pool(|client| {
// Create a temporary table
client.execute(
"CREATE TEMPORARY TABLE transaction_pool_test (id SERIAL PRIMARY KEY, name TEXT NOT NULL)",
&[],
)?;
// Insert data
client.execute(
"INSERT INTO transaction_pool_test (name) VALUES ($1)",
&[&"test_transaction_pool"],
)?;
// Query data
let rows = client.query(
"SELECT * FROM transaction_pool_test WHERE name = $1",
&[&"test_transaction_pool"],
)?;
assert_eq!(rows.len(), 1);
let name: String = rows[0].get(1);
assert_eq!(name, "test_transaction_pool");
// Return success
Ok(true)
});
assert!(result.is_ok());
assert_eq!(result.unwrap(), true);
}
fn run_query_params_test() {
println!("Running PostgreSQL parameter tests...");
// Create a test table
let create_table_query = "
CREATE TEMPORARY TABLE param_test (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
value INTEGER,
active BOOLEAN,
score REAL
)
";
let create_result = execute(create_table_query, &[]);
assert!(create_result.is_ok());
// Test QueryParams builder
let mut params = QueryParams::new();
params.add_str("test_name");
params.add_int(42);
params.add_bool(true);
params.add_float(3.14);
// Insert data using QueryParams
let insert_query = "
INSERT INTO param_test (name, value, active, score)
VALUES ($1, $2, $3, $4)
RETURNING id
";
let insert_result = query_with_params(insert_query, &params);
assert!(insert_result.is_ok());
let rows = insert_result.unwrap();
assert_eq!(rows.len(), 1);
let id: i32 = rows[0].get(0);
assert!(id > 0);
// Query data using QueryParams
let mut query_params = QueryParams::new();
query_params.add_int(id);
let select_query = "
SELECT id, name, value, active, score
FROM param_test
WHERE id = $1
";
let select_result = query_one_with_params(select_query, &query_params);
assert!(select_result.is_ok());
let row = select_result.unwrap();
let name: String = row.get(1);
let value: i32 = row.get(2);
let active: bool = row.get(3);
let score: f64 = row.get(4);
assert_eq!(name, "test_name");
assert_eq!(value, 42);
assert_eq!(active, true);
assert_eq!(score, 3.14);
// Test optional parameters
let mut update_params = QueryParams::new();
update_params.add_int(100);
update_params.add_opt::<String>(None);
update_params.add_int(id);
let update_query = "
UPDATE param_test
SET value = $1, name = COALESCE($2, name)
WHERE id = $3
";
let update_result = execute_with_params(update_query, &update_params);
assert!(update_result.is_ok());
assert_eq!(update_result.unwrap(), 1); // 1 row affected
// Verify update
let verify_result = query_one_with_params(select_query, &query_params);
assert!(verify_result.is_ok());
let row = verify_result.unwrap();
let name: String = row.get(1);
let value: i32 = row.get(2);
assert_eq!(name, "test_name"); // Name should be unchanged
assert_eq!(value, 100); // Value should be updated
// Test query_opt_with_params
let mut nonexistent_params = QueryParams::new();
nonexistent_params.add_int(9999); // ID that doesn't exist
let opt_query = "
SELECT id, name
FROM param_test
WHERE id = $1
";
let opt_result = query_opt_with_params(opt_query, &nonexistent_params);
assert!(opt_result.is_ok());
assert!(opt_result.unwrap().is_none());
// Clean up
let delete_query = "
DELETE FROM param_test
WHERE id = $1
";
let delete_result = execute_with_params(delete_query, &query_params);
assert!(delete_result.is_ok());
assert_eq!(delete_result.unwrap(), 1); // 1 row affected
}
fn test_error_handling() {
if !is_postgres_available() {
return;
}
// Test invalid SQL
let invalid_query = "SELECT * FROM nonexistent_table";
let invalid_result = query(invalid_query, &[]);
assert!(invalid_result.is_err());
// Test parameter type mismatch
let mismatch_query = "SELECT $1::integer";
let mismatch_result = query(mismatch_query, &[&"not_an_integer"]);
assert!(mismatch_result.is_err());
// Test query_one with no results
let empty_query = "SELECT * FROM pg_tables WHERE tablename = 'nonexistent_table'";
let empty_result = query_one(empty_query, &[]);
assert!(empty_result.is_err());
// Test query_opt with no results
let opt_query = "SELECT * FROM pg_tables WHERE tablename = 'nonexistent_table'";
let opt_result = query_opt(opt_query, &[]);
assert!(opt_result.is_ok());
assert!(opt_result.unwrap().is_none());
}
#[test]
fn test_notify() {
if !is_postgres_available() {
println!("Skipping PostgreSQL notification tests - PostgreSQL server not available");
return;
}
println!("Running PostgreSQL notification tests...");
// Test sending a notification
let result = notify("test_channel", "test_payload");
assert!(result.is_ok());
// Test sending a notification with the pool
let result = notify_with_pool("test_channel_pool", "test_payload_pool");
assert!(result.is_ok());
}
}