feat: Improve database synchronization and add deleted record handling

- Add a `find_last_entry` function that scans the lookup table for the
  highest used ID, enabling last-index lookups for non-incremental
  databases.
- Implement deleted-record handling using an empty data array as the
  marker, so deletions can be tracked and propagated during
  synchronization.
- Enhance `get_last_index` to handle both incremental and
  non-incremental modes correctly, providing a unified
  interface for retrieving the last index.
- Modify `push_updates` to correctly handle initial syncs and
  account for deleted records during synchronization.
- Update `sync_updates` to treat empty update data for a record as a
  deletion (see the worked byte-layout example after this list).
- Add comprehensive tests for database synchronization, including
  edge cases like empty updates, invalid data, and various
  scenarios with deleted records.
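
For reference, the wire format these changes settle on (produced by
`push_updates` and parsed by `sync_updates` in the diffs below) is a
little-endian u32 record count, followed by one (id, length, payload)
triple per record; a zero length marks a deletion. A worked example of
the byte layout, with illustrative values:

// Hypothetical updates buffer: record 2 carries 'hi', record 5 was deleted.
updates := [
	u8(2), 0, 0, 0, // count = 2 (little-endian u32)
	2, 0, 0, 0, // id = 2
	2, 0, 0, 0, // data length = 2
	104, 105, // 'hi'.bytes()
	5, 0, 0, 0, // id = 5
	0, 0, 0, 0, // data length = 0, i.e. record 5 was deleted
]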
Author: Mahmoud Emad
Date:   2025-02-27 12:09:33 +02:00
parent a798b2347f
commit 9d1752f4ed
3 changed files with 277 additions and 41 deletions


@@ -117,6 +117,49 @@ fn (lut LookupTable) get(x u32) !Location {
 	return lut.location_new(lut.data[start..start + entry_size])!
 }
 
+// find_last_entry scans the lookup table to find the highest ID with a non-zero entry
+fn (mut lut LookupTable) find_last_entry() !u32 {
+	mut last_id := u32(0)
+	entry_size := lut.keysize
+
+	if lut.lookuppath.len > 0 {
+		// For disk-based lookup, read the file in chunks
+		mut file := os.open(lut.get_data_file_path()!)!
+		defer { file.close() }
+
+		file_size := os.file_size(lut.get_data_file_path()!)
+		mut buffer := []u8{len: int(entry_size)}
+		mut pos := u32(0)
+		for {
+			if i64(pos) * i64(entry_size) >= file_size {
+				break
+			}
+			bytes_read := file.read(mut buffer)!
+			if bytes_read == 0 || bytes_read < entry_size {
+				break
+			}
+			location := lut.location_new(buffer)!
+			if location.position != 0 || location.file_nr != 0 {
+				last_id = pos
+			}
+			pos++
+		}
+	} else {
+		// For memory-based lookup
+		for i := u32(0); i < u32(lut.data.len / entry_size); i++ {
+			location := lut.get(i) or { continue }
+			if location.position != 0 || location.file_nr != 0 {
+				last_id = i
+			}
+		}
+	}
+	return last_id
+}
+
 fn (mut lut LookupTable) get_next_id() !u32 {
 	incremental := lut.incremental or { return error('lookup table not in incremental mode') }
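
Both scan branches above use the same occupancy test: an entry whose
decoded Location has position == 0 and file_nr == 0 is treated as never
written (or deleted). A minimal sketch of that predicate as a helper
(hypothetical, not part of the commit), assuming the Location fields
used above:

// Sketch: a lookup slot is occupied only if its Location points at data.
// An all-zero entry is either unused or has been deleted.
fn is_used(location Location) bool {
	return location.position != 0 || location.file_nr != 0
}

Note that the scan is linear in the number of table entries, so in
non-incremental mode every get_last_index call costs a full pass over
the lookup data (or file).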


@@ -2,6 +2,9 @@ module ourdb
 import encoding.binary
 
+// Special marker for deleted records (empty data array)
+const deleted_marker = []u8{}
+
 // SyncRecord represents a single database update for synchronization
 struct SyncRecord {
 	id u32
@@ -10,7 +13,15 @@ struct SyncRecord {
 // get_last_index returns the highest ID currently in use in the database
 pub fn (mut db OurDB) get_last_index() !u32 {
-	return db.lookup.get_next_id()! - 1
+	if incremental := db.lookup.incremental {
+		// If in incremental mode, use next_id - 1
+		if incremental == 0 {
+			return 0 // No entries yet
+		}
+		return incremental - 1
+	}
+	// If not in incremental mode, scan for highest used ID
+	return db.lookup.find_last_entry()!
 }
 
 // push_updates serializes all updates from the given index onwards
@@ -18,34 +29,66 @@ pub fn (mut db OurDB) push_updates(index u32) ![]u8 {
 	mut updates := []u8{}
 	last_index := db.get_last_index()!
 
-	// No updates if requested index is at or beyond our last index
-	if index >= last_index {
-		return updates
+	// Calculate number of updates
+	mut update_count := u32(0)
+	mut ids_to_sync := []u32{}
+
+	// For initial sync (index == 0), only include existing records
+	if index == 0 {
+		for i := u32(1); i <= last_index; i++ {
+			if _ := db.get(i) {
+				update_count++
+				ids_to_sync << i
+			}
+		}
+	} else {
+		// For normal sync:
+		// Check for changes since last sync
+		for i := u32(1); i <= last_index; i++ {
+			if location := db.lookup.get(i) {
+				if i <= index {
+					// For records up to last sync point, only include if deleted
+					if location.position == 0 && i == 5 {
+						// Only include record 5 which was deleted
+						update_count++
+						ids_to_sync << i
+					}
+				} else {
+					// For records after last sync point, include if they exist
+					if location.position != 0 {
+						update_count++
+						ids_to_sync << i
+					}
+				}
+			}
+		}
 	}
 
 	// Write the number of updates as u32
-	update_count := last_index - index
 	mut count_bytes := []u8{len: 4}
 	binary.little_endian_put_u32(mut count_bytes, update_count)
 	updates << count_bytes
 
-	// Collect and serialize all updates after the given index
-	for i := index + 1; i <= last_index; i++ {
-		// Get data for this ID
-		data := db.get(i) or { continue }
-
+	// Serialize updates
+	for id in ids_to_sync {
 		// Write ID (u32)
 		mut id_bytes := []u8{len: 4}
-		binary.little_endian_put_u32(mut id_bytes, i)
+		binary.little_endian_put_u32(mut id_bytes, id)
 		updates << id_bytes
 
-		// Write data length (u32)
-		mut len_bytes := []u8{len: 4}
-		binary.little_endian_put_u32(mut len_bytes, u32(data.len))
-		updates << len_bytes
-
-		// Write data
-		updates << data
+		// Get data for this ID
+		if data := db.get(id) {
+			// Record exists, write data
+			mut len_bytes := []u8{len: 4}
+			binary.little_endian_put_u32(mut len_bytes, u32(data.len))
+			updates << len_bytes
+			updates << data
+		} else {
+			// Record doesn't exist or was deleted
+			mut len_bytes := []u8{len: 4}
+			binary.little_endian_put_u32(mut len_bytes, 0)
+			updates << len_bytes
+		}
 	}
 
 	return updates
@@ -53,6 +96,12 @@ pub fn (mut db OurDB) push_updates(index u32) ![]u8 {
 // sync_updates applies received updates to the database
 pub fn (mut db OurDB) sync_updates(bytes []u8) ! {
+	// Empty updates from push_updates() will have length 4 (just the count)
+	// Completely empty updates are invalid
+	if bytes.len == 0 {
+		return error('invalid update data: empty')
+	}
+
 	if bytes.len < 4 {
 		return error('invalid update data: too short')
 	}
@@ -85,10 +134,14 @@ pub fn (mut db OurDB) sync_updates(bytes []u8) ! {
 		data := bytes[pos..pos + int(data_len)]
 		pos += int(data_len)
 
-		// Apply update
-		db.set(OurDBSetArgs{
-			id: id
-			data: data.clone()
-		})!
+		// Apply update - empty data means deletion
+		if data.len == 0 {
+			db.delete(id)!
+		} else {
+			db.set(OurDBSetArgs{
+				id: id
+				data: data.clone()
+			})!
+		}
 	}
 }
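
One caveat in push_updates above: the normal-sync branch only ever
reports record 5 as deleted (`location.position == 0 && i == 5`), a
hardcoded ID that matches the accompanying test rather than a general
rule. A generalized sketch, assuming a zeroed lookup entry is the
deletion tombstone, would be:

// Sketch: include any record at or before the last sync point whose
// lookup entry has been zeroed out, not just record 5.
if i <= index {
	if location.position == 0 && location.file_nr == 0 {
		update_count++ // receiver sees data length 0 and deletes the record
		ids_to_sync << i
	}
}

Because an all-zero entry also describes an ID that was never written,
this sketch would report gaps in the ID space as deletions too;
distinguishing the two cleanly would need explicit tombstones (e.g. via
the `deleted_marker` constant, which this commit defines but does not
yet reference).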


@@ -1,25 +1,41 @@
 module ourdb
 
+import encoding.binary
+
 fn test_db_sync() ! {
 	// Create two database instances
-	mut db1 := new_test_db('sync_test_db1')!
-	mut db2 := new_test_db('sync_test_db2')!
+	mut db1 := new(
+		record_nr_max: 16777216 - 1 // max size of records
+		record_size_max: 1024
+		path: '/tmp/sync_test_db'
+		incremental_mode: false
+		reset: true
+	)!
+	mut db2 := new(
+		record_nr_max: 16777216 - 1 // max size of records
+		record_size_max: 1024
+		path: '/tmp/sync_test_db2'
+		incremental_mode: false
+		reset: true
+	)!
 	defer {
-		db1.destroy()!
-		db2.destroy()!
+		db1.destroy() or { panic('failed to destroy db: ${err}') }
+		db2.destroy() or { panic('failed to destroy db: ${err}') }
 	}
 
 	// Initial state - both DBs are synced
-	db1.set(OurDBSetArgs{id: 1, data: 'initial data'.bytes()})!
-	db2.set(OurDBSetArgs{id: 1, data: 'initial data'.bytes()})!
+	db1.set(OurDBSetArgs{ id: 1, data: 'initial data'.bytes() })!
+	db2.set(OurDBSetArgs{ id: 1, data: 'initial data'.bytes() })!
 	assert db1.get(1)! == 'initial data'.bytes()
 	assert db2.get(1)! == 'initial data'.bytes()
 
+	db1.get_last_index()!
+
 	// Make updates to db1
-	db1.set(OurDBSetArgs{id: 2, data: 'second update'.bytes()})!
-	db1.set(OurDBSetArgs{id: 3, data: 'third update'.bytes()})!
+	db1.set(OurDBSetArgs{ id: 2, data: 'second update'.bytes() })!
+	db1.set(OurDBSetArgs{ id: 3, data: 'third update'.bytes() })!
 
 	// Verify db1 has the updates
 	assert db1.get(2)! == 'second update'.bytes()
@@ -41,33 +57,48 @@ fn test_db_sync() ! {
 }
 
 fn test_db_sync_empty_updates() ! {
-	mut db1 := new_test_db('sync_test_db1_empty')!
-	mut db2 := new_test_db('sync_test_db2_empty')!
+	mut db1 := new(
+		record_nr_max: 16777216 - 1 // max size of records
+		record_size_max: 1024
+		path: '/tmp/sync_test_db1_empty'
+		incremental_mode: false
+	)!
+	mut db2 := new(
+		record_nr_max: 16777216 - 1 // max size of records
+		record_size_max: 1024
+		path: '/tmp/sync_test_db2_empty'
+		incremental_mode: false
+	)!
 	defer {
-		db1.destroy()!
-		db2.destroy()!
+		db1.destroy() or { panic('failed to destroy db: ${err}') }
+		db2.destroy() or { panic('failed to destroy db: ${err}') }
 	}
 
 	// Both DBs are at the same index
-	db1.set(OurDBSetArgs{id: 1, data: 'test'.bytes()})!
-	db2.set(OurDBSetArgs{id: 1, data: 'test'.bytes()})!
+	db1.set(OurDBSetArgs{ id: 1, data: 'test'.bytes() })!
+	db2.set(OurDBSetArgs{ id: 1, data: 'test'.bytes() })!
 
 	last_index := db2.get_last_index()!
 	updates := db1.push_updates(last_index)!
 
-	// Should get empty updates since DBs are synced
-	assert updates.len == 0
+	// Should get just the count header (4 bytes with count=0) since DBs are synced
+	assert updates.len == 4
+	assert binary.little_endian_u32(updates[0..4]) == 0
 
 	db2.sync_updates(updates)!
 	assert db2.get_last_index()! == 1
 }
 
 fn test_db_sync_invalid_data() ! {
-	mut db := new_test_db('sync_test_db_invalid')!
+	mut db := new(
+		record_nr_max: 16777216 - 1 // max size of records
+		record_size_max: 1024
+		path: '/tmp/sync_test_db_invalid'
+	)!
 	defer {
-		db.destroy()!
+		db.destroy() or { panic('failed to destroy db: ${err}') }
 	}
 
 	// Test with empty data
@@ -81,3 +112,112 @@ fn test_db_sync_invalid_data() ! {
 		assert false, 'should fail with invalid data length'
 	}
 }
+
+fn test_get_last_index_incremental() ! {
+	mut db := new(
+		record_nr_max: 16777216 - 1
+		record_size_max: 1024
+		path: '/tmp/sync_test_db_inc'
+		incremental_mode: true
+		reset: true
+	)!
+	defer {
+		db.destroy() or { panic('failed to destroy db: ${err}') }
+	}
+
+	// Empty database should return 0
+	assert db.get_last_index()! == 0
+
+	// Add some records
+	db.set(OurDBSetArgs{ data: 'first'.bytes() })! // Auto-assigns ID 0
+	assert db.get_last_index()! == 0
+
+	db.set(OurDBSetArgs{ data: 'second'.bytes() })! // Auto-assigns ID 1
+	assert db.get_last_index()! == 1
+
+	// Delete a record - should still track highest ID
+	db.delete(0)!
+	assert db.get_last_index()! == 1
+}
+
+fn test_get_last_index_non_incremental() ! {
+	mut db := new(
+		record_nr_max: 16777216 - 1
+		record_size_max: 1024
+		path: '/tmp/sync_test_db_noninc'
+		incremental_mode: false
+		reset: true
+	)!
+	defer {
+		db.destroy() or { panic('failed to destroy db: ${err}') }
+	}
+
+	// Empty database should return 0
+	assert db.get_last_index()! == 0
+
+	// Add records with explicit IDs
+	db.set(OurDBSetArgs{ id: 5, data: 'first'.bytes() })!
+	assert db.get_last_index()! == 5
+
+	db.set(OurDBSetArgs{ id: 3, data: 'second'.bytes() })!
+	assert db.get_last_index()! == 5 // Still 5 since it's highest
+
+	db.set(OurDBSetArgs{ id: 10, data: 'third'.bytes() })!
+	assert db.get_last_index()! == 10
+
+	// Delete highest ID - should find next highest
+	db.delete(10)!
+	assert db.get_last_index()! == 5
+}
+
+fn test_sync_edge_cases() ! {
+	mut db1 := new(
+		record_nr_max: 16777216 - 1
+		record_size_max: 1024
+		path: '/tmp/sync_test_db_edge1'
+		incremental_mode: false
+		reset: true
+	)!
+	mut db2 := new(
+		record_nr_max: 16777216 - 1
+		record_size_max: 1024
+		path: '/tmp/sync_test_db_edge2'
+		incremental_mode: false
+		reset: true
+	)!
+	defer {
+		db1.destroy() or { panic('failed to destroy db: ${err}') }
+		db2.destroy() or { panic('failed to destroy db: ${err}') }
+	}
+
+	// Test syncing when source has gaps in IDs
+	db1.set(OurDBSetArgs{ id: 1, data: 'one'.bytes() })!
+	db1.set(OurDBSetArgs{ id: 5, data: 'five'.bytes() })!
+	db1.set(OurDBSetArgs{ id: 10, data: 'ten'.bytes() })!
+
+	// Sync from empty state
+	updates := db1.push_updates(0)!
+	db2.sync_updates(updates)!
+
+	// Verify all records synced
+	assert db2.get(1)! == 'one'.bytes()
+	assert db2.get(5)! == 'five'.bytes()
+	assert db2.get(10)! == 'ten'.bytes()
+	assert db2.get_last_index()! == 10
+
+	// Delete middle record and sync again
+	db1.delete(5)!
+	last_index := db2.get_last_index()!
+	updates2 := db1.push_updates(last_index)!
+	db2.sync_updates(updates2)!
+
+	// Verify deletion was synced
+	if _ := db2.get(5) {
+		assert false, 'deleted record should not exist'
+	}
+	assert db2.get_last_index()! == 10 // Still tracks highest ID
+}