Squashed 'components/rfs/' content from commit 9808a5e
git-subtree-dir: components/rfs git-subtree-split: 9808a5e9fc768edc7d8b1dfa5b91b3f018dff0cb
This commit is contained in:
133
rfs/src/store/bs.rs
Normal file
133
rfs/src/store/bs.rs
Normal file
@@ -0,0 +1,133 @@
|
||||
use super::{Error, Result, Store};
|
||||
use crate::fungi::meta::Block;
|
||||
use aes_gcm::{
|
||||
aead::{
|
||||
generic_array::{self, GenericArray},
|
||||
Aead, KeyInit,
|
||||
},
|
||||
Aes256Gcm, Nonce,
|
||||
};
|
||||
|
||||
fn hash(input: &[u8]) -> GenericArray<u8, generic_array::typenum::U32> {
|
||||
let hash = blake2b_simd::Params::new().hash_length(32).hash(input);
|
||||
GenericArray::from_slice(hash.as_bytes()).to_owned()
|
||||
}
|
||||
|
||||
/// The block store builds on top of a store and adds encryption and compression
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct BlockStore<S: Store> {
|
||||
store: S,
|
||||
}
|
||||
|
||||
impl<S> From<S> for BlockStore<S>
|
||||
where
|
||||
S: Store,
|
||||
{
|
||||
fn from(store: S) -> Self {
|
||||
Self { store }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> BlockStore<S>
|
||||
where
|
||||
S: Store,
|
||||
{
|
||||
pub fn inner(self) -> S {
|
||||
self.store
|
||||
}
|
||||
|
||||
pub async fn get(&self, block: &Block) -> Result<Vec<u8>> {
|
||||
let encrypted = self.store.get(&block.id).await?;
|
||||
|
||||
let cipher = Aes256Gcm::new_from_slice(&block.key).map_err(|_| Error::InvalidKey)?;
|
||||
let nonce = Nonce::from_slice(&block.key[..12]);
|
||||
|
||||
let compressed = cipher
|
||||
.decrypt(nonce, encrypted.as_slice())
|
||||
.map_err(|_| Error::EncryptionError)?;
|
||||
|
||||
let mut decoder = snap::raw::Decoder::new();
|
||||
let plain = decoder.decompress_vec(&compressed)?;
|
||||
|
||||
Ok(plain)
|
||||
}
|
||||
|
||||
pub async fn set(&self, blob: &[u8]) -> Result<Block> {
|
||||
// we first calculate the hash of the plain-text data
|
||||
|
||||
let key = hash(blob);
|
||||
let mut encoder = snap::raw::Encoder::new();
|
||||
// data is then compressed
|
||||
let compressed = encoder.compress_vec(blob)?;
|
||||
|
||||
// we then encrypt it using the hash of the plain-text as a key
|
||||
let cipher = Aes256Gcm::new(&key);
|
||||
// the nonce is still driven from the key, a nonce is 12 bytes for aes
|
||||
// it's done like this so a store can still dedup the data
|
||||
let nonce = Nonce::from_slice(&key[..12]);
|
||||
|
||||
// we encrypt the data
|
||||
let encrypted = cipher
|
||||
.encrypt(nonce, compressed.as_slice())
|
||||
.map_err(|_| Error::EncryptionError)?;
|
||||
|
||||
// we hash it again, and use that as the store key
|
||||
let id = hash(&encrypted);
|
||||
|
||||
let block = Block {
|
||||
id: id.into(),
|
||||
key: key.into(),
|
||||
};
|
||||
|
||||
self.store.set(&block.id, &encrypted).await?;
|
||||
|
||||
Ok(block)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::super::Route;
|
||||
|
||||
use super::*;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
#[derive(Default)]
|
||||
struct InMemoryStore {
|
||||
map: Arc<Mutex<HashMap<Vec<u8>, Vec<u8>>>>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Store for InMemoryStore {
|
||||
async fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
|
||||
let map = self.map.lock().await;
|
||||
let v = map.get(key).ok_or(Error::KeyNotFound)?;
|
||||
Ok(v.clone())
|
||||
}
|
||||
async fn set(&self, key: &[u8], blob: &[u8]) -> Result<()> {
|
||||
let mut map = self.map.lock().await;
|
||||
map.insert(key.into(), blob.into());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn routes(&self) -> Vec<Route> {
|
||||
vec![Route::url("mem://")]
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_block_store() {
|
||||
let store = InMemoryStore::default();
|
||||
let block_store = BlockStore::from(store);
|
||||
|
||||
let blob = "some random data to store";
|
||||
let block = block_store.set(blob.as_bytes()).await.unwrap();
|
||||
|
||||
let received = block_store.get(&block).await.unwrap();
|
||||
|
||||
assert_eq!(blob.as_bytes(), received.as_slice());
|
||||
}
|
||||
}
|
||||
83
rfs/src/store/dir.rs
Normal file
83
rfs/src/store/dir.rs
Normal file
@@ -0,0 +1,83 @@
|
||||
use super::{Error, Result, Route, Store};
|
||||
use std::io::ErrorKind;
|
||||
use std::os::unix::prelude::OsStrExt;
|
||||
use std::path::PathBuf;
|
||||
use tokio::fs;
|
||||
use url;
|
||||
|
||||
pub const SCHEME: &str = "dir";
|
||||
|
||||
/// DirStore is a simple store that store blobs on the filesystem
|
||||
/// and is mainly used for testing
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DirStore {
|
||||
root: PathBuf,
|
||||
}
|
||||
|
||||
impl DirStore {
|
||||
pub async fn make<U: AsRef<str>>(url: &U) -> Result<DirStore> {
|
||||
let u = url::Url::parse(url.as_ref())?;
|
||||
if u.scheme() != SCHEME {
|
||||
return Err(Error::InvalidScheme(u.scheme().into(), SCHEME.into()));
|
||||
}
|
||||
|
||||
Ok(DirStore::new(u.path()).await?)
|
||||
}
|
||||
pub async fn new<P: Into<PathBuf>>(root: P) -> Result<Self> {
|
||||
let root = root.into();
|
||||
fs::create_dir_all(&root).await?;
|
||||
Ok(Self { root })
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Store for DirStore {
|
||||
async fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
|
||||
let file_name = hex::encode(key);
|
||||
let dir_path = self.root.join(&file_name[0..2]);
|
||||
|
||||
let mut path = dir_path.join(&file_name);
|
||||
let data = match fs::read(&path).await {
|
||||
Ok(data) => data,
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => {
|
||||
path = self.root.join(file_name);
|
||||
let data = match fs::read(&path).await {
|
||||
Ok(data) => data,
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => {
|
||||
return Err(Error::KeyNotFound);
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(Error::IO(err));
|
||||
}
|
||||
};
|
||||
data
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(Error::IO(err));
|
||||
}
|
||||
};
|
||||
|
||||
Ok(data)
|
||||
}
|
||||
|
||||
async fn set(&self, key: &[u8], blob: &[u8]) -> Result<()> {
|
||||
let file_name = hex::encode(key);
|
||||
let dir_path = self.root.join(&file_name[0..2]);
|
||||
|
||||
fs::create_dir_all(&dir_path).await?;
|
||||
|
||||
let file_path = dir_path.join(file_name);
|
||||
fs::write(file_path, blob).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn routes(&self) -> Vec<Route> {
|
||||
let r = Route::url(format!(
|
||||
"dir://{}",
|
||||
String::from_utf8_lossy(self.root.as_os_str().as_bytes())
|
||||
));
|
||||
|
||||
vec![r]
|
||||
}
|
||||
}
|
||||
73
rfs/src/store/http.rs
Normal file
73
rfs/src/store/http.rs
Normal file
@@ -0,0 +1,73 @@
|
||||
use super::{Error, Result, Route, Store};
|
||||
use reqwest::{self, StatusCode};
|
||||
use url::Url;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct HTTPStore {
|
||||
url: Url,
|
||||
}
|
||||
|
||||
impl HTTPStore {
|
||||
pub async fn make<U: AsRef<str>>(url: &U) -> Result<HTTPStore> {
|
||||
let u = Url::parse(url.as_ref())?;
|
||||
if u.scheme() != "http" && u.scheme() != "https" {
|
||||
return Err(Error::Other(anyhow::Error::msg("invalid scheme")));
|
||||
}
|
||||
|
||||
Ok(HTTPStore::new(u).await?)
|
||||
}
|
||||
pub async fn new<U: Into<Url>>(url: U) -> Result<Self> {
|
||||
let url = url.into();
|
||||
Ok(Self { url })
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Store for HTTPStore {
|
||||
async fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
|
||||
let file = hex::encode(key);
|
||||
let mut file_path = self.url.clone();
|
||||
file_path
|
||||
.path_segments_mut()
|
||||
.map_err(|_| Error::Other(anyhow::Error::msg("cannot be base")))?
|
||||
.push(&file[0..2])
|
||||
.push(&file);
|
||||
let mut legacy_path = self.url.clone();
|
||||
|
||||
legacy_path
|
||||
.path_segments_mut()
|
||||
.map_err(|_| Error::Other(anyhow::Error::msg("cannot be base")))?
|
||||
.push(&file);
|
||||
|
||||
let data = match reqwest::get(file_path).await {
|
||||
Ok(mut response) => {
|
||||
if response.status() == StatusCode::NOT_FOUND {
|
||||
response = reqwest::get(legacy_path)
|
||||
.await
|
||||
.map_err(|_| Error::KeyNotFound)?;
|
||||
if response.status() != StatusCode::OK {
|
||||
return Err(Error::KeyNotFound);
|
||||
}
|
||||
}
|
||||
if response.status() != StatusCode::OK {
|
||||
return Err(Error::Unavailable);
|
||||
}
|
||||
response.bytes().await.map_err(|e| Error::Other(e.into()))?
|
||||
}
|
||||
Err(err) => return Err(Error::Other(err.into())),
|
||||
};
|
||||
Ok(data.into())
|
||||
}
|
||||
|
||||
async fn set(&self, _key: &[u8], _blob: &[u8]) -> Result<()> {
|
||||
Err(Error::Other(anyhow::Error::msg(
|
||||
"http store doesn't support uploading",
|
||||
)))
|
||||
}
|
||||
|
||||
fn routes(&self) -> Vec<Route> {
|
||||
let r = Route::url(self.url.clone());
|
||||
|
||||
vec![r]
|
||||
}
|
||||
}
|
||||
240
rfs/src/store/mod.rs
Normal file
240
rfs/src/store/mod.rs
Normal file
@@ -0,0 +1,240 @@
|
||||
mod bs;
|
||||
pub mod dir;
|
||||
pub mod http;
|
||||
mod router;
|
||||
pub mod s3store;
|
||||
pub mod zdb;
|
||||
|
||||
use anyhow::Context;
|
||||
use rand::seq::SliceRandom;
|
||||
|
||||
pub use bs::BlockStore;
|
||||
use regex::Regex;
|
||||
|
||||
use crate::fungi;
|
||||
|
||||
pub use self::router::Router;
|
||||
|
||||
pub async fn make<U: AsRef<str>>(u: U) -> Result<Stores> {
|
||||
let parsed = url::Url::parse(u.as_ref())?;
|
||||
|
||||
match parsed.scheme() {
|
||||
dir::SCHEME => return Ok(Stores::Dir(dir::DirStore::make(&u).await?)),
|
||||
"s3" | "s3s" | "s3s+tls" => return Ok(Stores::S3(s3store::S3Store::make(&u).await?)),
|
||||
"zdb" => return Ok(Stores::ZDB(zdb::ZdbStore::make(&u).await?)),
|
||||
"http" | "https" => return Ok(Stores::HTTP(http::HTTPStore::make(&u).await?)),
|
||||
_ => return Err(Error::UnknownStore(parsed.scheme().into())),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("key not found")]
|
||||
KeyNotFound,
|
||||
#[error("invalid key")]
|
||||
InvalidKey,
|
||||
#[error("invalid blob")]
|
||||
InvalidBlob,
|
||||
#[error("key is not routable")]
|
||||
KeyNotRoutable,
|
||||
#[error("store is not available")]
|
||||
Unavailable,
|
||||
|
||||
#[error("compression error: {0}")]
|
||||
Compression(#[from] snap::Error),
|
||||
|
||||
#[error("encryption error")]
|
||||
EncryptionError,
|
||||
|
||||
// TODO: better display for the Box<Vec<Self>>
|
||||
#[error("multiple error: {0:?}")]
|
||||
Multiple(Box<Vec<Self>>),
|
||||
|
||||
#[error("io error: {0}")]
|
||||
IO(#[from] std::io::Error),
|
||||
|
||||
#[error("url parse error: {0}")]
|
||||
Url(#[from] url::ParseError),
|
||||
#[error("unknown store type '{0}'")]
|
||||
UnknownStore(String),
|
||||
#[error("invalid schema '{0}' expected '{1}'")]
|
||||
InvalidScheme(String, String),
|
||||
|
||||
#[error("unknown store error {0:#}")]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
pub struct Route {
|
||||
pub start: Option<u8>,
|
||||
pub end: Option<u8>,
|
||||
pub url: String,
|
||||
}
|
||||
|
||||
impl Route {
|
||||
pub fn url<S: Into<String>>(s: S) -> Self {
|
||||
Self {
|
||||
start: None,
|
||||
end: None,
|
||||
url: s.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
/// The store trait defines a simple (low level) key/value store interface to set/get blobs
|
||||
/// the concern of the store is to only store given data with given key and implement
|
||||
/// the means to retrieve it again once a get is called.
|
||||
#[async_trait::async_trait]
|
||||
pub trait Store: Send + Sync + 'static {
|
||||
async fn get(&self, key: &[u8]) -> Result<Vec<u8>>;
|
||||
async fn set(&self, key: &[u8], blob: &[u8]) -> Result<()>;
|
||||
fn routes(&self) -> Vec<Route>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<S> Store for Router<S>
|
||||
where
|
||||
S: Store,
|
||||
{
|
||||
async fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
|
||||
if key.is_empty() {
|
||||
return Err(Error::InvalidKey);
|
||||
}
|
||||
let mut errors = Vec::default();
|
||||
|
||||
// to make it fare we shuffle the list of matching routers randomly everytime
|
||||
// before we do a get
|
||||
let mut routers: Vec<&S> = self.route(key[0]).collect();
|
||||
routers.shuffle(&mut rand::thread_rng());
|
||||
for store in routers {
|
||||
match store.get(key).await {
|
||||
Ok(object) => return Ok(object),
|
||||
Err(err) => errors.push(err),
|
||||
};
|
||||
}
|
||||
|
||||
if errors.is_empty() {
|
||||
return Err(Error::KeyNotRoutable);
|
||||
}
|
||||
|
||||
// return aggregated errors
|
||||
return Err(Error::Multiple(Box::new(errors)));
|
||||
}
|
||||
|
||||
async fn set(&self, key: &[u8], blob: &[u8]) -> Result<()> {
|
||||
if key.is_empty() {
|
||||
return Err(Error::InvalidKey);
|
||||
}
|
||||
|
||||
let mut b = false;
|
||||
for store in self.route(key[0]) {
|
||||
b = true;
|
||||
store.set(key, blob).await?;
|
||||
}
|
||||
|
||||
if !b {
|
||||
return Err(Error::KeyNotRoutable);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn routes(&self) -> Vec<Route> {
|
||||
let mut routes = Vec::default();
|
||||
for (key, value) in self.routes.iter() {
|
||||
for sub in value.routes() {
|
||||
let r = Route {
|
||||
start: Some(sub.start.unwrap_or(*key.start())),
|
||||
end: Some(sub.end.unwrap_or(*key.end())),
|
||||
url: sub.url,
|
||||
};
|
||||
routes.push(r);
|
||||
}
|
||||
}
|
||||
|
||||
routes
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_router(meta: &fungi::Reader) -> Result<Router<Stores>> {
|
||||
let mut router = Router::new();
|
||||
|
||||
for route in meta.routes().await.context("failed to get store routes")? {
|
||||
let store = make(&route.url)
|
||||
.await
|
||||
.with_context(|| format!("failed to initialize store '{}'", route.url))?;
|
||||
router.add(route.start, route.end, store);
|
||||
}
|
||||
|
||||
Ok(router)
|
||||
}
|
||||
|
||||
pub async fn parse_router(urls: &[String]) -> anyhow::Result<Router<Stores>> {
|
||||
let mut router = Router::new();
|
||||
let pattern = r"^(?P<range>[0-9a-f]{2}-[0-9a-f]{2})=(?P<url>.+)$";
|
||||
let re = Regex::new(pattern)?;
|
||||
|
||||
for u in urls {
|
||||
let ((start, end), store) = match re.captures(u) {
|
||||
None => ((0x00, 0xff), make(u).await?),
|
||||
Some(captures) => {
|
||||
let url = captures.name("url").context("missing url group")?.as_str();
|
||||
let rng = captures
|
||||
.name("range")
|
||||
.context("missing range group")?
|
||||
.as_str();
|
||||
|
||||
let store = make(url).await?;
|
||||
let range = match rng.split_once('-') {
|
||||
None => anyhow::bail!("invalid range format"),
|
||||
Some((low, high)) => (
|
||||
u8::from_str_radix(low, 16)
|
||||
.with_context(|| format!("failed to parse low range '{}'", low))?,
|
||||
u8::from_str_radix(high, 16)
|
||||
.with_context(|| format!("failed to parse high range '{}'", high))?,
|
||||
),
|
||||
};
|
||||
(range, store)
|
||||
}
|
||||
};
|
||||
|
||||
router.add(start, end, store);
|
||||
}
|
||||
|
||||
Ok(router)
|
||||
}
|
||||
|
||||
pub enum Stores {
|
||||
S3(s3store::S3Store),
|
||||
Dir(dir::DirStore),
|
||||
ZDB(zdb::ZdbStore),
|
||||
HTTP(http::HTTPStore),
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Store for Stores {
|
||||
async fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
|
||||
match self {
|
||||
self::Stores::S3(s3_store) => s3_store.get(key).await,
|
||||
self::Stores::Dir(dir_store) => dir_store.get(key).await,
|
||||
self::Stores::ZDB(zdb_store) => zdb_store.get(key).await,
|
||||
self::Stores::HTTP(http_store) => http_store.get(key).await,
|
||||
}
|
||||
}
|
||||
async fn set(&self, key: &[u8], blob: &[u8]) -> Result<()> {
|
||||
match self {
|
||||
self::Stores::S3(s3_store) => s3_store.set(key, blob).await,
|
||||
self::Stores::Dir(dir_store) => dir_store.set(key, blob).await,
|
||||
self::Stores::ZDB(zdb_store) => zdb_store.set(key, blob).await,
|
||||
self::Stores::HTTP(http_store) => http_store.set(key, blob).await,
|
||||
}
|
||||
}
|
||||
fn routes(&self) -> Vec<Route> {
|
||||
match self {
|
||||
self::Stores::S3(s3_store) => s3_store.routes(),
|
||||
self::Stores::Dir(dir_store) => dir_store.routes(),
|
||||
self::Stores::ZDB(zdb_store) => zdb_store.routes(),
|
||||
self::Stores::HTTP(http_store) => http_store.routes(),
|
||||
}
|
||||
}
|
||||
}
|
||||
56
rfs/src/store/router.rs
Normal file
56
rfs/src/store/router.rs
Normal file
@@ -0,0 +1,56 @@
|
||||
use std::ops::RangeInclusive;
|
||||
|
||||
/// route implements a naive prefix router by going through the complete set of
|
||||
/// available routers and find that ones that matches this given prefix
|
||||
#[derive(Default, Clone)]
|
||||
pub struct Router<T> {
|
||||
pub(crate) routes: Vec<(RangeInclusive<u8>, T)>,
|
||||
}
|
||||
|
||||
impl<T> Router<T> {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
routes: Vec::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// add a range
|
||||
pub fn add(&mut self, start: u8, end: u8, route: T) {
|
||||
self.routes.push((start..=end, route));
|
||||
}
|
||||
|
||||
/// return all stores that matches a certain key
|
||||
///
|
||||
/// TODO: may be they need to be randomized
|
||||
pub fn route(&self, i: u8) -> impl Iterator<Item = &T> {
|
||||
self.routes
|
||||
.iter()
|
||||
.filter(move |f| f.0.contains(&i))
|
||||
.map(|v| &v.1)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test() {
|
||||
let mut router = Router::default();
|
||||
|
||||
router.add(0, 255, "a");
|
||||
router.add(0, 255, "b");
|
||||
router.add(0, 128, "c");
|
||||
|
||||
let paths: Vec<&str> = router.route(200).map(|v| *v).collect();
|
||||
assert_eq!(paths.len(), 2);
|
||||
assert_eq!(paths[0], "a");
|
||||
assert_eq!(paths[1], "b");
|
||||
|
||||
let paths: Vec<&str> = router.route(0).map(|v| *v).collect();
|
||||
assert_eq!(paths.len(), 3);
|
||||
assert_eq!(paths[0], "a");
|
||||
assert_eq!(paths[1], "b");
|
||||
assert_eq!(paths[2], "c");
|
||||
}
|
||||
}
|
||||
191
rfs/src/store/s3store.rs
Normal file
191
rfs/src/store/s3store.rs
Normal file
@@ -0,0 +1,191 @@
|
||||
use super::{Error, Result, Route, Store};
|
||||
|
||||
use anyhow::Context;
|
||||
use s3::{creds::Credentials, error::S3Error, Bucket, Region};
|
||||
use url::Url;
|
||||
|
||||
fn get_config<U: AsRef<str>>(u: U) -> Result<(Credentials, Region, String)> {
|
||||
let url = Url::parse(u.as_ref())?;
|
||||
|
||||
let access_key = url.username().to_string();
|
||||
let access_secret = url.password().map(|s| s.to_owned());
|
||||
|
||||
let host = url.host_str().context("host not found")?;
|
||||
let port = url.port().context("port not found")?;
|
||||
let scheme = match url.scheme() {
|
||||
"s3" => "http://",
|
||||
"s3+tls" | "s3s" => "https://",
|
||||
_ => return Err(Error::Other(anyhow::Error::msg("invalid scheme"))),
|
||||
};
|
||||
|
||||
let endpoint = format!("{}{}:{}", scheme, host, port);
|
||||
|
||||
let bucket_name = url.path().trim_start_matches('/').to_string();
|
||||
|
||||
let region_name = url
|
||||
.query_pairs()
|
||||
.find(|(key, _)| key == "region")
|
||||
.map(|(_, value)| value.to_string())
|
||||
.unwrap_or_default();
|
||||
|
||||
Ok((
|
||||
Credentials {
|
||||
access_key: Some(access_key),
|
||||
secret_key: access_secret,
|
||||
security_token: None,
|
||||
session_token: None,
|
||||
expiration: None,
|
||||
},
|
||||
Region::Custom {
|
||||
region: region_name,
|
||||
endpoint,
|
||||
},
|
||||
bucket_name,
|
||||
))
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct S3Store {
|
||||
bucket: Bucket,
|
||||
url: String,
|
||||
// this is only here as a work around for this bug https://github.com/durch/rust-s3/issues/337
|
||||
// because rfs uses the store in async (and parallel) matter to upload/download blobs
|
||||
// we need to synchronize this locally in that store which will hurt performance
|
||||
// the 2 solutions now is to either wait until this bug is fixed, or switch to another client
|
||||
// but for now we keep this work around
|
||||
}
|
||||
|
||||
impl S3Store {
|
||||
pub async fn make<U: AsRef<str>>(url: &U) -> Result<S3Store> {
|
||||
let (cred, region, bucket_name) = get_config(url.as_ref())?;
|
||||
Ok(S3Store::new(url.as_ref(), &bucket_name, region, cred)?)
|
||||
}
|
||||
pub fn new(url: &str, bucket_name: &str, region: Region, cred: Credentials) -> Result<Self> {
|
||||
let bucket = Bucket::new(bucket_name, region, cred)
|
||||
.context("failed instantiate bucket")?
|
||||
.with_path_style();
|
||||
|
||||
Ok(Self {
|
||||
bucket,
|
||||
url: url.to_owned(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Store for S3Store {
|
||||
async fn get(&self, key: &[u8]) -> super::Result<Vec<u8>> {
|
||||
match self.bucket.get_object(hex::encode(key)).await {
|
||||
Ok(res) => Ok(res.to_vec()),
|
||||
Err(S3Error::HttpFailWithBody(404, _)) => Err(Error::KeyNotFound),
|
||||
Err(S3Error::Io(err)) => Err(Error::IO(err)),
|
||||
Err(err) => Err(anyhow::Error::from(err).into()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn set(&self, key: &[u8], blob: &[u8]) -> Result<()> {
|
||||
self.bucket
|
||||
.put_object(hex::encode(key), blob)
|
||||
.await
|
||||
.context("put object over s3 storage")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn routes(&self) -> Vec<Route> {
|
||||
vec![Route::url(self.url.clone())]
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_get_config() {
|
||||
let (cred, region, bucket_name) =
|
||||
get_config("s3s://minioadmin:minioadmin@127.0.0.1:9000/mybucket?region=minio").unwrap();
|
||||
assert_eq!(
|
||||
cred,
|
||||
Credentials {
|
||||
access_key: Some("minioadmin".to_string()),
|
||||
secret_key: Some("minioadmin".to_string()),
|
||||
security_token: None,
|
||||
session_token: None,
|
||||
expiration: None,
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
region,
|
||||
Region::Custom {
|
||||
region: "minio".to_string(),
|
||||
endpoint: "https://127.0.0.1:9000".to_string()
|
||||
}
|
||||
);
|
||||
assert_eq!(bucket_name, "mybucket".to_string())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_config_without_tls() {
|
||||
let (cred, region, bucket_name) =
|
||||
get_config("s3://minioadmin:minioadmin@127.0.0.1:9000/mybucket?region=minio").unwrap();
|
||||
assert_eq!(
|
||||
cred,
|
||||
Credentials {
|
||||
access_key: Some("minioadmin".to_string()),
|
||||
secret_key: Some("minioadmin".to_string()),
|
||||
security_token: None,
|
||||
session_token: None,
|
||||
expiration: None,
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
region,
|
||||
Region::Custom {
|
||||
region: "minio".to_string(),
|
||||
endpoint: "http://127.0.0.1:9000".to_string()
|
||||
}
|
||||
);
|
||||
assert_eq!(bucket_name, "mybucket".to_string())
|
||||
}
|
||||
|
||||
#[ignore]
|
||||
#[tokio::test]
|
||||
async fn test_set_get() {
|
||||
let url = "s3://minioadmin:minioadmin@127.0.0.1:9000/mybucket?region=minio";
|
||||
let (cred, region, bucket_name) = get_config(url).unwrap();
|
||||
|
||||
let store = S3Store::new(url, &bucket_name, region, cred);
|
||||
let store = store.unwrap();
|
||||
|
||||
let key = b"test.txt";
|
||||
let blob = b"# Hello, World!";
|
||||
|
||||
_ = store.set(key, blob).await;
|
||||
|
||||
let get_res = store.get(key).await;
|
||||
let get_res = get_res.unwrap();
|
||||
|
||||
assert_eq!(get_res, blob)
|
||||
}
|
||||
|
||||
#[ignore]
|
||||
#[tokio::test]
|
||||
async fn test_set_get_without_region() {
|
||||
let url = "s3://minioadmin:minioadmin@127.0.0.1:9000/mybucket";
|
||||
let (cred, region, bucket_name) = get_config(url).unwrap();
|
||||
|
||||
let store = S3Store::new(url, &bucket_name, region, cred);
|
||||
let store = store.unwrap();
|
||||
|
||||
let key = b"test2.txt";
|
||||
let blob = b"# Hello, World!";
|
||||
|
||||
_ = store.set(key, blob).await;
|
||||
|
||||
let get_res = store.get(key).await;
|
||||
let get_res = get_res.unwrap();
|
||||
|
||||
assert_eq!(get_res, blob)
|
||||
}
|
||||
}
|
||||
176
rfs/src/store/zdb.rs
Normal file
176
rfs/src/store/zdb.rs
Normal file
@@ -0,0 +1,176 @@
|
||||
use super::{Error, Result, Route, Store};
|
||||
use anyhow::Context;
|
||||
|
||||
use bb8_redis::{
|
||||
bb8::{CustomizeConnection, Pool},
|
||||
redis::{
|
||||
aio::Connection, cmd, AsyncCommands, ConnectionAddr, ConnectionInfo, RedisConnectionInfo,
|
||||
RedisError,
|
||||
},
|
||||
RedisConnectionManager,
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
struct WithNamespace {
|
||||
namespace: Option<String>,
|
||||
password: Option<String>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl CustomizeConnection<Connection, RedisError> for WithNamespace {
|
||||
async fn on_acquire(&self, connection: &mut Connection) -> anyhow::Result<(), RedisError> {
|
||||
match self.namespace {
|
||||
Some(ref ns) if ns != "default" => {
|
||||
let mut c = cmd("SELECT");
|
||||
let c = c.arg(ns);
|
||||
if let Some(ref password) = self.password {
|
||||
c.arg(password);
|
||||
}
|
||||
|
||||
let result = c.query_async(connection).await;
|
||||
if let Err(ref err) = result {
|
||||
error!("failed to switch namespace to {}: {}", ns, err);
|
||||
}
|
||||
result
|
||||
}
|
||||
_ => Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ZdbStoreFactory;
|
||||
|
||||
fn get_connection_info<U: AsRef<str>>(u: U) -> Result<(ConnectionInfo, Option<String>)> {
|
||||
let u = url::Url::parse(u.as_ref())?;
|
||||
|
||||
let (address, namespace) = match u.host() {
|
||||
Some(host) => {
|
||||
let addr = match host {
|
||||
url::Host::Domain(domain) => domain.to_owned(),
|
||||
url::Host::Ipv4(ipv4) => ipv4.to_string(),
|
||||
url::Host::Ipv6(ipv6) => ipv6.to_string(),
|
||||
};
|
||||
|
||||
let addr = ConnectionAddr::Tcp(addr, u.port().unwrap_or(9900));
|
||||
let ns: Option<String> = u
|
||||
.path_segments()
|
||||
.and_then(|s| s.last().map(|s| s.to_owned()));
|
||||
(addr, ns)
|
||||
}
|
||||
None => (ConnectionAddr::Unix(u.path().into()), None),
|
||||
};
|
||||
|
||||
Ok((
|
||||
ConnectionInfo {
|
||||
addr: address,
|
||||
redis: RedisConnectionInfo {
|
||||
db: 0,
|
||||
username: if u.username().is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(u.username().into())
|
||||
},
|
||||
password: u.password().map(|s| s.into()),
|
||||
},
|
||||
},
|
||||
namespace,
|
||||
))
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ZdbStore {
|
||||
url: String,
|
||||
pool: Pool<RedisConnectionManager>,
|
||||
}
|
||||
|
||||
impl ZdbStore {
|
||||
pub async fn make<U: AsRef<str>>(url: &U) -> Result<ZdbStore> {
|
||||
let (mut info, namespace) = get_connection_info(url.as_ref())?;
|
||||
|
||||
let namespace = WithNamespace {
|
||||
namespace,
|
||||
password: info.redis.password.take(),
|
||||
};
|
||||
|
||||
log::debug!("connection {:#?}", info);
|
||||
log::debug!("switching namespace to: {:?}", namespace.namespace);
|
||||
|
||||
let mgr = RedisConnectionManager::new(info)
|
||||
.context("failed to create redis connection manager")?;
|
||||
|
||||
let pool = Pool::builder()
|
||||
.max_size(20)
|
||||
.connection_customizer(Box::new(namespace))
|
||||
.build(mgr)
|
||||
.await
|
||||
.context("failed to create connection pool")?;
|
||||
|
||||
Ok(ZdbStore {
|
||||
url: url.as_ref().to_string(),
|
||||
pool,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Store for ZdbStore {
|
||||
async fn get(&self, key: &[u8]) -> super::Result<Vec<u8>> {
|
||||
let mut con = self.pool.get().await.context("failed to get connection")?;
|
||||
|
||||
let result: Option<Vec<u8>> = con.get(key).await.context("failed to get blob")?;
|
||||
let result = result.ok_or(Error::KeyNotFound)?;
|
||||
|
||||
if result.is_empty() {
|
||||
return Err(Error::InvalidBlob);
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn set(&self, key: &[u8], blob: &[u8]) -> Result<()> {
|
||||
let mut con = self.pool.get().await.context("failed to get connection")?;
|
||||
|
||||
if con
|
||||
.exists(key)
|
||||
.await
|
||||
.context("failed to check if blob exists")?
|
||||
{
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
con.set(key, blob).await.context("failed to set blob")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn routes(&self) -> Vec<Route> {
|
||||
vec![Route::url(self.url.clone())]
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_connection_info_simple() {
|
||||
let (info, ns) = get_connection_info("zdb://hub.grid.tf:9900").unwrap();
|
||||
assert_eq!(ns, None);
|
||||
assert_eq!(info.addr, ConnectionAddr::Tcp("hub.grid.tf".into(), 9900));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_connection_info_ns() {
|
||||
let (info, ns) = get_connection_info("zdb://username@hub.grid.tf/custom").unwrap();
|
||||
assert_eq!(ns, Some("custom".into()));
|
||||
assert_eq!(info.addr, ConnectionAddr::Tcp("hub.grid.tf".into(), 9900));
|
||||
assert_eq!(info.redis.username, Some("username".into()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_connection_info_unix() {
|
||||
let (info, ns) = get_connection_info("zdb:///path/to/socket").unwrap();
|
||||
assert_eq!(ns, None);
|
||||
assert_eq!(info.addr, ConnectionAddr::Unix("/path/to/socket".into()));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user