db/ourdb/src/lookup.rs
2025-04-20 06:52:35 +02:00

525 lines
18 KiB
Rust

use std::fs::{self, File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::Path;
use crate::error::Error;
use crate::location::Location;
/// Name of the file, inside the `lookuppath` directory, that holds the table entries.
const DATA_FILE_NAME: &str = "data";
/// Name of the file, inside the `lookuppath` directory, that holds the incremental-mode counter as decimal text.
const INCREMENTAL_FILE_NAME: &str = ".inc";
/// Configuration for creating a new lookup table
///
/// An empty `lookuppath` selects the in-memory table; a non-empty one selects
/// the disk-based table stored in that directory.
pub struct LookupConfig {
/// Number of entries in the table; a freshly created table occupies
/// `size * keysize` bytes, all zero
pub size: u32,
/// Size of each entry in bytes; must be 2, 3, 4 or 6 (`LookupTable::new`
/// rejects every other value, including 5)
/// - 2: position only, at most 65,535; `file_nr` must be 0 (single file)
/// - 3: position only, at most 16,777,215; `file_nr` must be 0 (single file)
/// - 4: position only; `file_nr` must be 0 (single file)
/// - 6: full location (`file_nr` and position), for large databases requiring multiple files
pub keysize: u8,
/// Directory for the disk-based lookup files; empty for an in-memory table
pub lookuppath: String,
/// Whether to use incremental mode (the table tracks the next ID to hand out)
pub incremental_mode: bool,
}
/// Lookup table maps keys to physical locations in the backend storage
///
/// Entry `id` occupies the `keysize` bytes starting at offset `id * keysize`,
/// either in `data` (memory mode) or in the data file under `lookuppath`
/// (disk mode).
pub struct LookupTable {
/// Size of each entry in bytes (2, 3, 4 or 6)
keysize: u8,
/// Directory for disk-based lookup; an empty string means the table lives in `data`
lookuppath: String,
/// In-memory data for memory-based lookup; left empty in disk mode
data: Vec<u8>,
/// Next empty slot if incremental mode is enabled, `None` otherwise
incremental: Option<u32>,
}
impl LookupTable {
/// Returns the keysize of this lookup table, i.e. the number of bytes each
/// entry occupies (2, 3, 4 or 6)
pub fn keysize(&self) -> u8 {
self.keysize
}
/// Creates a new lookup table with the given configuration
pub fn new(config: LookupConfig) -> Result<Self, Error> {
// Verify keysize is valid
if ![2, 3, 4, 6].contains(&config.keysize) {
return Err(Error::InvalidOperation(format!("Invalid keysize: {}", config.keysize)));
}
let incremental = if config.incremental_mode {
Some(get_incremental_info(&config)?)
} else {
None
};
if !config.lookuppath.is_empty() {
// Create directory if it doesn't exist
fs::create_dir_all(&config.lookuppath)?;
// For disk-based lookup, create empty file if it doesn't exist
let data_path = Path::new(&config.lookuppath).join(DATA_FILE_NAME);
if !data_path.exists() {
let data = vec![0u8; config.size as usize * config.keysize as usize];
fs::write(&data_path, &data)?;
}
Ok(LookupTable {
data: Vec::new(),
keysize: config.keysize,
lookuppath: config.lookuppath,
incremental,
})
} else {
// For memory-based lookup
Ok(LookupTable {
data: vec![0u8; config.size as usize * config.keysize as usize],
keysize: config.keysize,
lookuppath: String::new(),
incremental,
})
}
}
/// Gets a location for the given ID
///
/// Reads the `keysize` bytes stored at offset `id * keysize` (from the data
/// file in disk mode, from the in-memory buffer otherwise) and decodes them
/// with `Location::from_bytes`.
///
/// # Errors
///
/// Returns `Error::LookupError` when the entry lies outside the table or
/// the file ends before the entry is complete. I/O and decoding errors are
/// propagated.
pub fn get(&self, id: u32) -> Result<Location, Error> {
    let entry_size = self.keysize as usize;

    // Offsets are computed in u64: `id * keysize` can exceed u32::MAX.
    let start_pos = u64::from(id) * entry_size as u64;
    let end_pos = start_pos + entry_size as u64;

    if !self.lookuppath.is_empty() {
        // Disk-based lookup
        let data_path = Path::new(&self.lookuppath).join(DATA_FILE_NAME);

        // Check file size first
        let file_size = fs::metadata(&data_path)?.len();
        if end_pos > file_size {
            return Err(Error::LookupError(format!(
                "Invalid read for get in lut: {}: {} would exceed file size {}",
                self.lookuppath, end_pos, file_size
            )));
        }

        // Read directly from file
        let mut file = File::open(&data_path)?;
        file.seek(SeekFrom::Start(start_pos))?;

        // A single `read` may legally return fewer bytes than requested, so
        // keep reading until the entry is complete or the file ends.
        let mut data = vec![0u8; entry_size];
        let mut bytes_read = 0;
        while bytes_read < entry_size {
            let n = file.read(&mut data[bytes_read..])?;
            if n == 0 {
                break;
            }
            bytes_read += n;
        }
        if bytes_read < entry_size {
            return Err(Error::LookupError(format!(
                "Incomplete read: expected {} bytes but got {}",
                entry_size, bytes_read
            )));
        }

        return Location::from_bytes(&data, self.keysize);
    }

    // Memory-based lookup: the whole entry must fit, not just its first byte.
    if end_pos > self.data.len() as u64 {
        return Err(Error::LookupError("Index out of bounds".to_string()));
    }
    let start = start_pos as usize;
    Location::from_bytes(&self.data[start..start + entry_size], self.keysize)
}
/// Sets a location for the given ID
pub fn set(&mut self, id: u32, location: Location) -> Result<(), Error> {
let entry_size = self.keysize as usize;
// Handle incremental mode
if let Some(incremental) = self.incremental {
if id == incremental {
self.increment_index()?;
}
if id > incremental {
return Err(Error::InvalidOperation(
"Cannot set ID for insertions when incremental mode is enabled".to_string()
));
}
}
// Convert location to bytes based on keysize
let location_bytes = match self.keysize {
2 => {
if location.file_nr != 0 {
return Err(Error::InvalidOperation("file_nr must be 0 for keysize=2".to_string()));
}
if location.position > 0xFFFF {
return Err(Error::InvalidOperation(
"position exceeds max value for keysize=2 (max 65535)".to_string()
));
}
vec![(location.position >> 8) as u8, location.position as u8]
},
3 => {
if location.file_nr != 0 {
return Err(Error::InvalidOperation("file_nr must be 0 for keysize=3".to_string()));
}
if location.position > 0xFFFFFF {
return Err(Error::InvalidOperation(
"position exceeds max value for keysize=3 (max 16777215)".to_string()
));
}
vec![
(location.position >> 16) as u8,
(location.position >> 8) as u8,
location.position as u8
]
},
4 => {
if location.file_nr != 0 {
return Err(Error::InvalidOperation("file_nr must be 0 for keysize=4".to_string()));
}
vec![
(location.position >> 24) as u8,
(location.position >> 16) as u8,
(location.position >> 8) as u8,
location.position as u8
]
},
6 => {
// Full location with file_nr and position
location.to_bytes()
},
_ => return Err(Error::InvalidOperation(format!("Invalid keysize: {}", self.keysize))),
};
if !self.lookuppath.is_empty() {
// Disk-based lookup
let data_path = Path::new(&self.lookuppath).join(DATA_FILE_NAME);
let mut file = OpenOptions::new().write(true).open(data_path)?;
let start_pos = id as u64 * entry_size as u64;
file.seek(SeekFrom::Start(start_pos))?;
file.write_all(&location_bytes)?;
} else {
// Memory-based lookup
let start = (id * self.keysize as u32) as usize;
if start + entry_size > self.data.len() {
return Err(Error::LookupError("Index out of bounds".to_string()));
}
for (i, &byte) in location_bytes.iter().enumerate() {
self.data[start + i] = byte;
}
}
Ok(())
}
/// Deletes an entry for the given ID
///
/// Implemented as `set(id, Location::default())`, so every check and side
/// effect of `set` applies here as well, including the incremental-mode
/// handling. Any error from `set` is returned unchanged.
pub fn delete(&mut self, id: u32) -> Result<(), Error> {
// Overwrite the entry with the default location (presumably all zeros — confirm in `Location`)
self.set(id, Location::default())
}
/// Gets the next available ID in incremental mode
///
/// The counter itself is not advanced; that happens in `increment_index`.
///
/// # Errors
///
/// Returns `Error::InvalidOperation` when the table is not in incremental
/// mode and `Error::LookupError` when the next ID would fall outside the
/// table. A failure to stat the data file is propagated.
pub fn get_next_id(&self) -> Result<u32, Error> {
    let incremental = self.incremental.ok_or_else(||
        Error::InvalidOperation("Lookup table not in incremental mode".to_string())
    )?;

    // Sizes and offsets stay in u64: a data file can be larger than 4 GiB
    // and `incremental * keysize` can exceed u32::MAX.
    let table_size: u64 = if !self.lookuppath.is_empty() {
        let data_path = Path::new(&self.lookuppath).join(DATA_FILE_NAME);
        fs::metadata(data_path)?.len()
    } else {
        self.data.len() as u64
    };

    if u64::from(incremental) * u64::from(self.keysize) >= table_size {
        return Err(Error::LookupError("Lookup table is full".to_string()));
    }

    Ok(incremental)
}
/// Increments the index in incremental mode
pub fn increment_index(&mut self) -> Result<(), Error> {
let mut incremental = self.incremental.ok_or_else(||
Error::InvalidOperation("Lookup table not in incremental mode".to_string())
)?;
incremental += 1;
self.incremental = Some(incremental);
if !self.lookuppath.is_empty() {
let inc_path = Path::new(&self.lookuppath).join(INCREMENTAL_FILE_NAME);
fs::write(inc_path, incremental.to_string())?;
}
Ok(())
}
/// Writes a full copy of the lookup table to `path`
///
/// A memory-based table dumps its buffer; a disk-based table copies its
/// data file. I/O errors are propagated.
pub fn export_data(&self, path: &str) -> Result<(), Error> {
    if self.lookuppath.is_empty() {
        // Memory-based table: the buffer is the table.
        fs::write(path, &self.data)?;
        return Ok(());
    }

    // Disk-based table: duplicate the data file.
    let source = Path::new(&self.lookuppath).join(DATA_FILE_NAME);
    fs::copy(source, path)?;
    Ok(())
}
/// Replaces the lookup table with the contents of the file at `path`
///
/// A memory-based table loads the file into its buffer; a disk-based table
/// copies the file over its data file. The contents are taken as-is, with
/// no size or format validation. I/O errors are propagated.
pub fn import_data(&mut self, path: &str) -> Result<(), Error> {
    if self.lookuppath.is_empty() {
        // Memory-based table: the file contents become the buffer.
        self.data = fs::read(path)?;
        return Ok(());
    }

    // Disk-based table: overwrite the data file.
    let target = Path::new(&self.lookuppath).join(DATA_FILE_NAME);
    fs::copy(path, target)?;
    Ok(())
}
/// Exports only non-zero entries to save space
///
/// The output file is a sequence of records, each made of the entry ID as
/// 4 big-endian bytes followed by the `keysize` raw entry bytes. Entries
/// whose bytes are all zero are skipped.
///
/// The disk-based table is streamed sequentially through a buffered
/// reader, so a short read from the operating system cannot cut the export
/// short. Entries whose index does not fit in a `u32` are not exported,
/// since they could not be addressed by ID anyway.
///
/// # Errors
///
/// I/O errors while reading the table or writing `path` are propagated.
pub fn export_sparse(&self, path: &str) -> Result<(), Error> {
    let mut output = Vec::new();
    let entry_size = self.keysize as usize;

    if !self.lookuppath.is_empty() {
        // For disk-based lookup
        let data_path = Path::new(&self.lookuppath).join(DATA_FILE_NAME);
        let file = File::open(&data_path)?;
        let file_size = fs::metadata(&data_path)?.len();

        // IDs are u32; never let an index wrap around into a duplicate ID.
        let max_entries = (file_size / entry_size as u64).min(u64::from(u32::MAX) + 1);

        let mut reader = std::io::BufReader::new(file);
        let mut buffer = vec![0u8; entry_size];

        for id in 0..max_entries {
            match reader.read_exact(&mut buffer) {
                Ok(()) => {}
                // The file ended early (e.g. it shrank meanwhile): stop here.
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
                Err(e) => return Err(e.into()),
            }

            // Check if entry is non-zero
            if buffer.iter().any(|&b| b != 0) {
                // Write ID (4 bytes) + entry
                output.extend_from_slice(&(id as u32).to_be_bytes());
                output.extend_from_slice(&buffer);
            }
        }
    } else {
        // For memory-based lookup
        for (id, entry) in self.data.chunks_exact(entry_size).enumerate() {
            if id > u32::MAX as usize {
                break;
            }

            // Check if entry is non-zero
            if entry.iter().any(|&b| b != 0) {
                // Write ID (4 bytes) + entry
                output.extend_from_slice(&(id as u32).to_be_bytes());
                output.extend_from_slice(entry);
            }
        }
    }

    // Write the output to file
    fs::write(path, &output)?;
    Ok(())
}
/// Loads entries from a sparse export (non-zero entries only)
///
/// The file must be a whole number of records, each made of a 4-byte
/// big-endian ID followed by `keysize` entry bytes. Every record is decoded
/// with `Location::from_bytes` and stored through `set`, in file order.
///
/// # Errors
///
/// Returns `Error::DataCorruption` when the file length is not a multiple
/// of the record length. Errors from reading the file, decoding an entry
/// or `set` are propagated; records applied before the failure stay applied.
pub fn import_sparse(&mut self, path: &str) -> Result<(), Error> {
    let raw = fs::read(path)?;
    let keysize = self.keysize;
    // ID (4 bytes) + entry
    let record_len = 4 + keysize as usize;

    if raw.len() % record_len != 0 {
        return Err(Error::DataCorruption(
            "Invalid sparse data format: size mismatch".to_string()
        ));
    }

    for record in raw.chunks_exact(record_len) {
        let (id_part, entry) = record.split_at(4);
        let id = u32::from_be_bytes([id_part[0], id_part[1], id_part[2], id_part[3]]);

        let location = Location::from_bytes(entry, keysize)?;
        self.set(id, location)?;
    }

    Ok(())
}
/// Finds the highest ID with a non-zero entry
///
/// An entry counts as non-zero when its decoded location has a non-zero
/// `position` or `file_nr`. The result is 0 both when no entry is set and
/// when entry 0 is the only one set; callers cannot tell these apart.
///
/// The disk-based table is streamed sequentially through a buffered
/// reader, so a short read from the operating system cannot end the scan
/// early and under-report the last ID. A trailing partial entry is ignored.
///
/// # Errors
///
/// In disk mode, I/O and decoding errors are propagated. In memory mode,
/// entries that fail to decode are skipped.
pub fn find_last_entry(&mut self) -> Result<u32, Error> {
    let mut last_id = 0u32;
    let entry_size = self.keysize as usize;

    if !self.lookuppath.is_empty() {
        // For disk-based lookup
        let data_path = Path::new(&self.lookuppath).join(DATA_FILE_NAME);
        let file = File::open(&data_path)?;
        let file_size = fs::metadata(&data_path)?.len();

        // IDs are u32; indexes beyond that range cannot be reported.
        let max_entries = (file_size / entry_size as u64).min(u64::from(u32::MAX) + 1);

        let mut reader = std::io::BufReader::new(file);
        let mut buffer = vec![0u8; entry_size];

        for pos in 0..max_entries {
            match reader.read_exact(&mut buffer) {
                Ok(()) => {}
                // The file ended early (e.g. it shrank meanwhile): stop here.
                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
                Err(e) => return Err(e.into()),
            }

            let location = Location::from_bytes(&buffer, self.keysize)?;
            if location.position != 0 || location.file_nr != 0 {
                last_id = pos as u32;
            }
        }
    } else {
        // For memory-based lookup: walk the buffer directly so the index
        // arithmetic cannot overflow.
        for (idx, chunk) in self.data.chunks_exact(entry_size).enumerate() {
            if idx > u32::MAX as usize {
                break;
            }

            if let Ok(location) = Location::from_bytes(chunk, self.keysize) {
                if location.position != 0 || location.file_nr != 0 {
                    last_id = idx as u32;
                }
            }
        }
    }

    Ok(last_id)
}
}
/// Determines the starting value of the incremental counter
///
/// Returns 0 when incremental mode is off and 1 for a memory-based table.
/// For a disk-based table the value is read from the counter file in
/// `lookuppath`; the file is created with "1" when missing and reset to
/// "1" when its contents do not parse as a `u32`. I/O errors are propagated.
fn get_incremental_info(config: &LookupConfig) -> Result<u32, Error> {
    if !config.incremental_mode {
        return Ok(0);
    }

    // For memory-based lookup, start with 1
    if config.lookuppath.is_empty() {
        return Ok(1);
    }

    let counter_file = Path::new(&config.lookuppath).join(INCREMENTAL_FILE_NAME);

    // The counter lives in its own file; seed it on first use.
    if !counter_file.exists() {
        fs::write(&counter_file, "1")?;
    }

    let contents = fs::read_to_string(&counter_file)?;
    if let Ok(value) = contents.trim().parse::<u32>() {
        return Ok(value);
    }

    // If the value is invalid, reset it to 1
    fs::write(&counter_file, "1")?;
    Ok(1)
}
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use super::*;
    use std::env::temp_dir;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Builds a scratch directory path for a disk-based test.
    ///
    /// The name combines the process id with a nanosecond timestamp: a
    /// second-resolution timestamp alone collides when two test runs start
    /// within the same second, and one run's cleanup then removes the
    /// other's directory.
    fn get_temp_dir() -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        temp_dir().join(format!(
            "ourdb_lookup_test_{}_{}",
            std::process::id(),
            nanos
        ))
    }

    /// Round-trips a location through a memory-based table and checks that
    /// the incremental counter advances.
    #[test]
    fn test_memory_lookup() {
        let config = LookupConfig {
            size: 1000,
            keysize: 4,
            lookuppath: String::new(),
            incremental_mode: true,
        };
        let mut lookup = LookupTable::new(config).unwrap();

        // Test set and get
        let location = Location {
            file_nr: 0,
            position: 12345,
        };
        lookup.set(1, location).unwrap();
        let retrieved = lookup.get(1).unwrap();
        assert_eq!(retrieved.file_nr, location.file_nr);
        assert_eq!(retrieved.position, location.position);

        // Test incremental mode: writing the next free ID (1) moved the counter to 2
        let next_id = lookup.get_next_id().unwrap();
        assert_eq!(next_id, 2);
        lookup.increment_index().unwrap();
        let next_id = lookup.get_next_id().unwrap();
        assert_eq!(next_id, 3);
    }

    /// Round-trips a location through a disk-based table.
    #[test]
    fn test_disk_lookup() {
        let temp_dir = get_temp_dir();
        fs::create_dir_all(&temp_dir).unwrap();
        let config = LookupConfig {
            size: 1000,
            keysize: 4,
            lookuppath: temp_dir.to_string_lossy().to_string(),
            incremental_mode: true,
        };
        let mut lookup = LookupTable::new(config).unwrap();

        // Test set and get
        let location = Location {
            file_nr: 0,
            position: 12345,
        };
        lookup.set(1, location).unwrap();
        let retrieved = lookup.get(1).unwrap();
        assert_eq!(retrieved.file_nr, location.file_nr);
        assert_eq!(retrieved.position, location.position);

        // Clean up
        fs::remove_dir_all(temp_dir).unwrap();
    }
}