start implementing ourdb sync
lib/data/ourdb/sync.v (new file)
@@ -0,0 +1,94 @@
module ourdb

import encoding.binary

// SyncRecord represents a single database update for synchronization
struct SyncRecord {
	id   u32
	data []u8
}

// get_last_index returns the highest ID currently in use in the database
pub fn (mut db OurDB) get_last_index() !u32 {
	return db.lookup.get_next_id()! - 1
}
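
// Note: get_last_index assumes the lookup table assigns IDs incrementally,
// so the highest ID in use is one less than the next free ID.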

// push_updates serializes all updates made after the given index into a byte
// payload: a u32 record count followed by (id, length, data) triples
pub fn (mut db OurDB) push_updates(index u32) ![]u8 {
	mut updates := []u8{}
	last_index := db.get_last_index()!

	// No updates if the requested index is at or beyond our last index
	if index >= last_index {
		return updates
	}

	// Serialize all records after the given index, counting only the IDs that
	// actually resolve to data so the count prefix matches the records that
	// follow even when some IDs in the range are missing
	mut records := []u8{}
	mut update_count := u32(0)
	for i := index + 1; i <= last_index; i++ {
		// Get data for this ID, skipping IDs that hold no data
		data := db.get(i) or { continue }
		update_count++

		// Write ID (u32)
		mut id_bytes := []u8{len: 4}
		binary.little_endian_put_u32(mut id_bytes, i)
		records << id_bytes

		// Write data length (u32)
		mut len_bytes := []u8{len: 4}
		binary.little_endian_put_u32(mut len_bytes, u32(data.len))
		records << len_bytes

		// Write data
		records << data
	}

	// Prefix the payload with the number of updates as u32
	mut count_bytes := []u8{len: 4}
	binary.little_endian_put_u32(mut count_bytes, update_count)
	updates << count_bytes
	updates << records

	return updates
}
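
// For reference, the payload layout produced by push_updates and consumed by
// sync_updates below is, with all integers little-endian:
//
//   [count u32] followed by count records of [id u32][data_len u32][data bytes]
//
// Worked example: a single record with id 2 and data 'hi' serializes to
// 01 00 00 00  02 00 00 00  02 00 00 00  68 69  (14 bytes in total).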

// sync_updates applies updates received from push_updates to the database
pub fn (mut db OurDB) sync_updates(bytes []u8) ! {
	// An empty payload means the peer had nothing newer, so there is nothing to apply
	if bytes.len == 0 {
		return
	}
	if bytes.len < 4 {
		return error('invalid update data: too short')
	}

	mut pos := 0

	// Read number of updates
	update_count := binary.little_endian_u32(bytes[pos..pos + 4])
	pos += 4

	// Process each update
	for _ in 0 .. update_count {
		if pos + 8 > bytes.len {
			return error('invalid update data: truncated header')
		}

		// Read ID
		id := binary.little_endian_u32(bytes[pos..pos + 4])
		pos += 4

		// Read data length
		data_len := binary.little_endian_u32(bytes[pos..pos + 4])
		pos += 4

		if pos + int(data_len) > bytes.len {
			return error('invalid update data: truncated content')
		}

		// Read data
		data := bytes[pos..pos + int(data_len)]
		pos += int(data_len)

		// Apply update
		db.set(OurDBSetArgs{
			id: id
			data: data.clone()
		})!
	}
}
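
Taken together, one sync round between two databases would look roughly like the sketch below; `leader` and `follower` are illustrative names for two already-open OurDB instances, not part of this commit.

	follower_index := follower.get_last_index()!
	payload := leader.push_updates(follower_index)!
	// an empty payload means the follower was already up to date and is a no-op
	follower.sync_updates(payload)!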

lib/data/ourdb/sync_test.v (new file)
@@ -0,0 +1,83 @@
module ourdb

fn test_db_sync() ! {
	// Create two database instances
	mut db1 := new_test_db('sync_test_db1')!
	mut db2 := new_test_db('sync_test_db2')!

	defer {
		db1.destroy() or { panic(err) }
		db2.destroy() or { panic(err) }
	}

	// Initial state - both DBs hold the same record
	db1.set(OurDBSetArgs{id: 1, data: 'initial data'.bytes()})!
	db2.set(OurDBSetArgs{id: 1, data: 'initial data'.bytes()})!

	assert db1.get(1)! == 'initial data'.bytes()
	assert db2.get(1)! == 'initial data'.bytes()

	// Make updates to db1
	db1.set(OurDBSetArgs{id: 2, data: 'second update'.bytes()})!
	db1.set(OurDBSetArgs{id: 3, data: 'third update'.bytes()})!

	// Verify db1 has the updates
	assert db1.get(2)! == 'second update'.bytes()
	assert db1.get(3)! == 'third update'.bytes()

	// Verify db2 is behind
	assert db1.get_last_index()! == 3
	assert db2.get_last_index()! == 1

	// Sync db2 with updates from db1
	last_synced_index := db2.get_last_index()!
	updates := db1.push_updates(last_synced_index)!
	db2.sync_updates(updates)!

	// Verify db2 is now synced
	assert db2.get_last_index()! == 3
	assert db2.get(2)! == 'second update'.bytes()
	assert db2.get(3)! == 'third update'.bytes()
}

fn test_db_sync_empty_updates() ! {
	mut db1 := new_test_db('sync_test_db1_empty')!
	mut db2 := new_test_db('sync_test_db2_empty')!

	defer {
		db1.destroy() or { panic(err) }
		db2.destroy() or { panic(err) }
	}

	// Both DBs are at the same index
	db1.set(OurDBSetArgs{id: 1, data: 'test'.bytes()})!
	db2.set(OurDBSetArgs{id: 1, data: 'test'.bytes()})!

	last_index := db2.get_last_index()!
	updates := db1.push_updates(last_index)!

	// Should get empty updates since the DBs are already in sync
	assert updates.len == 0

	// Applying an empty payload is a no-op
	db2.sync_updates(updates)!
	assert db2.get_last_index()! == 1
}

fn test_db_sync_invalid_data() ! {
	mut db := new_test_db('sync_test_db_invalid')!

	defer {
		db.destroy() or { panic(err) }
	}

	// Test with a count that announces an update but no record bytes following it
	truncated := [u8(1), 0, 0, 0]
	if _ := db.sync_updates(truncated) {
		assert false, 'should fail with truncated update data'
	}

	// Test with a payload too short to hold the update count
	invalid_data := []u8{len: 2, init: 0}
	if _ := db.sync_updates(invalid_data) {
		assert false, 'should fail with invalid data length'
	}
}