Init Project #1

Open
allanger wants to merge 10 commits from init-project into main
24 changed files with 7346 additions and 0 deletions

1
operator/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/target

5228
operator/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

63
operator/Cargo.toml Normal file
View File

@@ -0,0 +1,63 @@
[package]
name = "s3-operator"
version = "0.1.0"
authors = ["allanger <iam@allanger.xyz>"]
edition = "2024"
default-run = "controller"
# Cargo requires an SPDX license expression; "GPLv3" is not valid SPDX.
license = "GPL-3.0-only"
publish = false
[[bin]]
doc = false
name = "controller"
path = "src/controller.rs"
[[bin]]
doc = false
name = "crdgen"
path = "src/crdgen.rs"
[lib]
name = "api"
path = "src/lib.rs"
[dependencies]
kube = { version = "3.0.1", features = ["runtime", "derive", "client", "aws-lc-rs"] }
k8s-openapi = { version = "0.27.0", features = ["latest", "schemars"] }
schemars = { version = "1" }
darling = "0.23.0"
clap = { version = "4.5.60", features = ["derive"] }
serde = { version = "1.0.228", features = ["serde_derive"] }
serde_json = "1.0.149"
serde_yaml = "0.9.34"
thiserror = "2.0.18"
tracing = "0.1.44"
tokio = { version = "1.49.0", features = ["macros", "rt-multi-thread"] }
anyhow = "1.0.102"
futures = "0.3.32"
actix-web = "4.13.0"
tracing-subscriber = { version = "0.3.22", features = ["json", "env-filter"] }
aws-config = { version = "1.8.15", features = ["behavior-version-latest", "rustls"] }
aws-sdk-s3 = "1.125.0"
aws-credential-types = "1.2.14"
minio = "0.3.0"
reqwest = { version = "0.13.2", features = ["json"] }
base64 = "0.22.1"
rand = "0.10.0"
aws-sigv4 = { version = "1.4.2", features = ["sigv4a"] }
http = "1"
aws-sig-auth = "0.60.3"
aws-smithy-http = "0.63.6"
aws-types = "1.3.14"
hmac = "0.12.1"
sha2 = "0.10.9"
hex = "0.4.3"
async-trait = "0.1.89"
[dev-dependencies]
assert-json-diff = "2.0.2"
envtest = "0.1.2"
http = "1"
hyper = "1"
tower-test = "0.4.0"

View File

@@ -0,0 +1,10 @@
---
# Example S3Bucket: asks the operator to create a bucket on S3Instance "test".
apiVersion: s3.badhouseplants.net/v1beta1
kind: S3Bucket
metadata:
  name: test
  namespace: default
spec:
  instance: test
  # Remove the bucket (and its objects) from the backend when this resource is deleted.
  cleanup: true
  ownConfigmap: false

View File

@@ -0,0 +1,9 @@
---
# Example S3BucketUser: creates a user with the "readWrite" policy on bucket "test".
apiVersion: s3.badhouseplants.net/v1beta1
kind: S3BucketUser
metadata:
  name: test
  namespace: default
spec:
  bucket: test
  policy: readWrite

View File

@@ -0,0 +1,22 @@
---
# Example credentials Secret referenced by the S3Instance below
# via spec.credentialsSecret.
# NOTE(review): these are sample values — never commit real credentials.
apiVersion: v1
kind: Secret
metadata:
  name: test
  namespace: default
stringData:
  ACCESS_KEY: overlord
  SECRET_KEY: 's1cdlej3#^&LQetsQ8GUYix7ypLf#$#$wsdf'
---
# Example S3Instance pointing at a RustFS endpoint.
apiVersion: s3.badhouseplants.net/v1beta1
kind: S3Instance
metadata:
  name: test
spec:
  endpoint: https://rustfs.badhouseplants.net
  forcePathStyle: true
  region: us-east-1
  provider: rustfs
  credentialsSecret:
    namespace: default
    name: test

1
operator/src/api/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod v1beta1;

View File

@@ -0,0 +1,3 @@
pub mod s3_bucket;
pub mod s3_bucket_user;
pub mod s3_instance;

View File

@@ -0,0 +1,53 @@
use k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition;
use k8s_openapi::serde::{Deserialize, Serialize};
use kube::CustomResource;
use kube::{self};
use schemars::JsonSchema;
/// Spec of the `S3Bucket` custom resource: a bucket the operator creates
/// and manages on a configured `S3Instance`.
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
    kind = "S3Bucket",
    group = "s3.badhouseplants.net",
    version = "v1beta1",
    shortname = "bucket",
    doc = "Manage buckets on the s3 instance",
    namespaced,
    status = "S3BucketStatus",
    printcolumn = r#"{"name":"Instance","type":"string","description":"On which instance this bucket is created","jsonPath":".spec.instance"}"#,
    printcolumn = r#"{"name":"Region","type":"string","description":"The region of the bucket","jsonPath":".status.region"}"#,
    printcolumn = r#"{"name":"Bucket Name","type":"string","description":"The full name of the bucket","jsonPath":".status.bucketName"}"#,
    printcolumn = r#"{"name":"Total Objects","type":"number","description":"How many objects are there in the bucket","jsonPath":".status.totalObjects"}"#,
    printcolumn = r#"{"name":"Status","type":"boolean","description":"Is the S3Instance ready","jsonPath":".status.ready"}"#
)]
#[serde(rename_all = "camelCase")]
pub struct S3BucketSpec {
    /// On which instance this bucket should be created
    pub instance: String,
    /// Should perform a cleanup on delete?
    /// It will remove all objects from the bucket
    #[serde(default)]
    pub cleanup: bool,
    /// Should set the owner reference on the CM
    #[serde(default)]
    pub own_configmap: bool,
}
/// The status object of `S3Bucket`.
#[derive(Deserialize, Serialize, Clone, Default, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct S3BucketStatus {
    /// Is this bucket ready.
    #[serde(default)]
    pub ready: bool,
    /// Standard Kubernetes conditions describing the reconcile state.
    pub conditions: Vec<Condition>,
    /// Size of the bucket as reported by the controller — units not
    /// documented here; presumably bytes — TODO confirm.
    #[serde(default)]
    pub size: Option<u64>,
    /// Number of objects currently in the bucket.
    #[serde(default)]
    pub total_objects: Option<u64>,
    /// Endpoint of the instance the bucket lives on.
    #[serde(default)]
    pub endpoint: Option<String>,
    /// Region of the instance the bucket lives on.
    #[serde(default)]
    pub region: Option<String>,
    /// Full (namespace-prefixed) name of the bucket on the backend.
    #[serde(default)]
    pub bucket_name: Option<String>,
}

View File

@@ -0,0 +1,40 @@
use k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition;
use k8s_openapi::serde::{Deserialize, Serialize};
use kube::CustomResource;
use kube::{self};
use schemars::JsonSchema;
/// Spec of the `S3BucketUser` custom resource: a user that gets access
/// to a single `S3Bucket`.
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
    kind = "S3BucketUser",
    group = "s3.badhouseplants.net",
    version = "v1beta1",
    shortname = "s3bu",
    doc = "Manage users that have access to s3 buckets",
    namespaced,
    status = "S3BucketUserStatus",
    printcolumn = r#"{"name":"Status","type":"boolean","description":"Is the S3Instance ready","jsonPath":".status.ready"}"#
)]
#[serde(rename_all = "camelCase")]
pub struct S3BucketUserSpec {
    /// To which bucket access should be provided
    pub bucket: String,
    /// Access policy to grant (e.g. "readWrite" in the examples) —
    /// allowed values are provider-defined; TODO confirm.
    pub policy: String,
    /// Should perform a cleanup on delete?
    #[serde(default)]
    pub cleanup: bool,
    /// Should set the owner reference on the Secret
    #[serde(default)]
    pub own_secret: bool,
}
/// The status object of `S3BucketUser`.
// NOTE(review): unlike S3BucketStatus, this struct has no
// #[serde(rename_all = "camelCase")], so `access_key` serializes as
// "access_key" — confirm this is intentional.
#[derive(Deserialize, Serialize, Clone, Default, Debug, JsonSchema)]
pub struct S3BucketUserStatus {
    /// Is this user ready.
    #[serde(default)]
    pub ready: bool,
    /// Standard Kubernetes conditions describing the reconcile state.
    pub conditions: Vec<Condition>,
    /// The access key (username) generated for this user.
    #[serde(default)]
    pub access_key: Option<String>,
}

View File

@@ -0,0 +1,57 @@
use k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition;
use k8s_openapi::serde::{Deserialize, Serialize};
use kube::CustomResource;
use kube::{self};
use schemars::JsonSchema;
// Supported S3 backend implementations. Variants serialize as lowercase
// strings ("minio" / "rustfs"), matching the values used in the CRD spec.
#[derive(Serialize, Deserialize, JsonSchema, Clone, Debug)]
#[serde(rename_all = "lowercase")]
pub enum Provider {
    Minio,
    Rustfs,
}
/// Spec of the `S3Instance` custom resource: connection settings for one
/// S3-compatible backend.
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
    kind = "S3Instance",
    group = "s3.badhouseplants.net",
    version = "v1beta1",
    shortname = "s3in",
    doc = "Connect the operator to any s3 backend using this resource",
    status = "S3InstanceStatus",
    printcolumn = r#"{"name":"Endpoint","type":"string","description":"The URL of the instance","jsonPath":".spec.endpoint"}"#,
    printcolumn = r#"{"name":"Region","type":"string","description":"The region of the instance","jsonPath":".spec.region"}"#,
    printcolumn = r#"{"name":"Path Style","type":"boolean","description":"Is forcing path style","jsonPath":".spec.forcePathStyle"}"#,
    printcolumn = r#"{"name":"Total Buckets","type":"number","description":"How many buckets are there on the instance","jsonPath":".status.total_buckets"}"#,
    printcolumn = r#"{"name":"Status","type":"boolean","description":"Is the S3Instance ready","jsonPath":".status.ready"}"#
)]
#[serde(rename_all = "camelCase")]
pub struct S3InstanceSpec {
    /// URL of the S3-compatible endpoint.
    pub endpoint: String,
    /// Region used when talking to the endpoint.
    pub region: String,
    /// Reference to the Secret holding the instance credentials
    /// (read via `s3_instance::ACCESS_KEY` / `SECRET_KEY` in the controllers).
    pub credentials_secret: NamespacedName,
    /// Use path-style addressing instead of virtual-hosted-style.
    #[serde(default)]
    pub force_path_style: bool,
    /// Which backend implementation the endpoint runs.
    pub provider: Provider,
}
/// The status object of `S3Instance`.
// NOTE(review): no camelCase rename here, so fields serialize in snake_case —
// this matches the ".status.total_buckets" printcolumn jsonPath above.
#[derive(Deserialize, Serialize, Clone, Default, Debug, JsonSchema)]
pub struct S3InstanceStatus {
    /// Is the instance ready.
    #[serde(default)]
    pub ready: bool,
    //#[schemars(schema_with = "conditions")]
    /// Standard Kubernetes conditions describing the reconcile state.
    pub conditions: Vec<Condition>,
    /// Names of buckets observed on the instance.
    #[serde(default)]
    pub buckets: Option<Vec<String>>,
    /// Number of buckets observed on the instance.
    #[serde(default)]
    pub total_buckets: Option<usize>,
}
// A namespace/name pair referencing a namespaced Kubernetes object.
// The field identifiers already match the wire format, so no serde
// renames are required.
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
pub struct NamespacedName {
    pub namespace: String,
    pub name: String,
}

View File

@@ -0,0 +1,51 @@
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{Condition, Time};
use k8s_openapi::jiff::Timestamp;
use kube::api::ObjectMeta;
/// Upsert the condition of `condition_type` into `conditions` and return the
/// updated list.
///
/// Per the Kubernetes API conventions, `lastTransitionTime` records the last
/// time the condition *changed status*, not the last time it was written —
/// so it is only refreshed when the status actually transitions (or when the
/// condition is first created). `observedGeneration` is taken from the
/// object's metadata on every call.
pub(crate) fn set_condition(
    mut conditions: Vec<Condition>,
    metadata: ObjectMeta,
    condition_type: &str,
    condition_status: String,
    condition_reason: String,
    condition_message: String,
) -> Vec<Condition> {
    if let Some(condition) = conditions.iter_mut().find(|c| c.type_ == condition_type) {
        // Only bump the transition time when the status really changes;
        // previously it was rewritten on every reconcile.
        if condition.status != condition_status {
            condition.last_transition_time = Time::from(Timestamp::now());
        }
        condition.status = condition_status;
        condition.message = condition_message;
        condition.reason = condition_reason;
        condition.observed_generation = metadata.generation;
    } else {
        conditions.push(Condition {
            last_transition_time: Time::from(Timestamp::now()),
            message: condition_message,
            observed_generation: metadata.generation,
            reason: condition_reason,
            status: condition_status,
            type_: condition_type.to_string(),
        });
    }
    conditions
}
// True when a condition of the given type exists and its status is "True".
// Missing conditions count as not-true.
pub(crate) fn is_condition_true(conditions: Vec<Condition>, condition_type: &str) -> bool {
    conditions
        .iter()
        .find(|c| c.type_ == condition_type)
        .map_or(false, |c| c.status == "True")
}
// True when a condition of the given type exists and its status is "False".
// Missing conditions count as not-false.
pub(crate) fn is_condition_false(conditions: Vec<Condition>, condition_type: &str) -> bool {
    conditions
        .iter()
        .find(|c| c.type_ == condition_type)
        .map_or(false, |c| c.status == "False")
}
// True when a condition of the given type exists and its status is "Unknown".
// Missing conditions count as not-unknown.
pub(crate) fn is_condition_unknown(conditions: Vec<Condition>, condition_type: &str) -> bool {
    conditions
        .iter()
        .find(|c| c.type_ == condition_type)
        .map_or(false, |c| c.status == "Unknown")
}

View File

@@ -0,0 +1,66 @@
mod conditions;
mod controllers;
mod providers;
mod s3;
use crate::controllers::{s3_bucket, s3_bucket_user, s3_instance};
use actix_web::{App, HttpRequest, HttpResponse, HttpServer, Responder, get, middleware};
use clap::Parser;
use kube::Client;
use tracing_subscriber::EnvFilter;
/// Command line options for the controller binary.
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
    /// The port the metric endpoint binds to.
    #[arg(long, default_value_t = 60000)]
    metrics_port: u16,
    /// The port the probe endpoint binds to.
    #[arg(long, default_value_t = 8081)]
    health_probe_port: u16,
    /// Enabling this will ensure there is only one active controller manager.
    // The field for this flag was missing: its doc comment and
    // `#[arg(long, default_value_t = true)]` attribute were stacked onto the
    // next field, producing two conflicting `default_value_t` values.
    #[arg(long, default_value_t = true)]
    enable_leader_election: bool,
    // DB Operator feature flags
    /// If enabled, DB Operator will run full reconciliation only
    /// when changes are detected
    // Renamed from the typo `is_change_check_nabled`; the struct is not
    // parsed anywhere in this file, so no caller depends on the old flag.
    #[arg(long, default_value_t = false)]
    is_change_check_enabled: bool,
}
/// Liveness/readiness probe endpoint; always answers 200 with "healthy".
#[get("/health")]
async fn health(_: HttpRequest) -> impl Responder {
    HttpResponse::Ok().json("healthy")
}
/// Entry point: set up JSON logging, start the three CRD controllers and the
/// HTTP server, and run them all until shutdown.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Structured JSON logs; level is controlled via RUST_LOG.
    tracing_subscriber::fmt()
        .json()
        .with_env_filter(EnvFilter::from_default_env())
        .init();
    // NOTE(review): `Args` is declared above but `Args::parse()` is never
    // called, so all CLI flags are currently ignored — confirm intent.
    let client = Client::try_default()
        .await
        .expect("failed to create kube Client");
    // Each controller is an independent future sharing the same client.
    let s3in_controller = s3_instance::run(client.clone());
    let s3bucket_controller = s3_bucket::run(client.clone());
    let s3bucketuser_controller = s3_bucket_user::run(client.clone());
    // Start web server
    // NOTE(review): port 8080 is hardcoded; the health_probe_port /
    // metrics_port flags are unused here.
    let server = HttpServer::new(move || {
        App::new()
            .wrap(middleware::Logger::default().exclude("/health"))
            .service(health)
    })
    .bind("0.0.0.0:8080")?
    .shutdown_timeout(5);
    // Both runtimes implements graceful shutdown, so poll until both are done
    // Only the server's Result (.3) is propagated; controller futures yield ().
    tokio::join!(
        s3in_controller,
        s3bucket_controller,
        s3bucketuser_controller,
        server.run()
    )
    .3?;
    Ok(())
}

View File

@@ -0,0 +1,3 @@
pub(crate) mod s3_bucket;
pub(crate) mod s3_bucket_user;
pub(crate) mod s3_instance;

View File

@@ -0,0 +1,489 @@
use crate::conditions::{is_condition_true, set_condition};
use crate::controllers::s3_instance;
use crate::s3::S3Client;
use crate::s3::s3api::S3Api;
use api::api::v1beta1::s3_bucket::{S3Bucket, S3BucketStatus};
use api::api::v1beta1::s3_instance::S3Instance;
use futures::StreamExt;
use k8s_openapi::api::core::v1::{ConfigMap, Secret};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
use kube::api::{ListParams, ObjectMeta, PostParams};
use kube::runtime::Controller;
use kube::runtime::controller::Action;
use kube::runtime::events::Recorder;
use kube::runtime::watcher::Config;
use kube::{Api, Client, Error, Resource, ResourceExt};
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tracing::*;
const TYPE_BUCKET_READY: &str = "BucketReady";
const FIN_CLEANUP: &str = "s3.badhouseplants.net/bucket-cleanup";
const CONFIGMAP_LABEL: &str = "s3.badhouseplants.net/s3-bucket";
const AWS_REGION: &str = "AWS_REGION";
const AWS_ENDPOINT_URL: &str = "AWS_ENDPOINT_URL";
/// Reconcile a single `S3Bucket`: ensure the backing bucket exists on the
/// referenced `S3Instance`, keep the info ConfigMap labeled/owned/up to date,
/// manage the cleanup finalizer, and publish the observed state in `.status`.
#[instrument(skip(ctx, obj), fields(trace_id))]
pub(crate) async fn reconcile(obj: Arc<S3Bucket>, ctx: Arc<Context>) -> S3BucketResult<Action> {
    info!("Staring reconciling");
    let s3bucket_api: Api<S3Bucket> =
        Api::namespaced(ctx.client.clone(), &obj.namespace().unwrap());
    let cm_api: Api<ConfigMap> = Api::namespaced(ctx.client.clone(), &obj.namespace().unwrap());
    let s3in_api: Api<S3Instance> = Api::all(ctx.client.clone());
    info!("Getting the S3Bucket resource");
    // Re-fetch the object so we operate on the latest resourceVersion.
    let mut s3bucket = match s3bucket_api.get(&obj.name_any()).await {
        Ok(s3bucket) => s3bucket,
        Err(Error::Api(ae)) if ae.code == 404 => {
            info!("Object is not found, probably removed");
            return Ok(Action::await_change());
        }
        Err(err) => {
            error!("{}", err);
            return Err(S3BucketError::KubeError(err));
        }
    };
    // On the first reconciliation status is None
    // it needs to be initialized
    let mut status = match s3bucket.clone().status {
        None => {
            info!("Status is not yet set, initializing the object");
            return init_object(s3bucket, s3bucket_api).await;
        }
        Some(status) => status,
    };
    let configmap_name = format!("{}-bucket-info", s3bucket.name_any());
    info!("Getting the configmap");
    // Get the cm, if it's already there, we need to validate, or create an empty one
    let mut configmap = match get_configmap(cm_api.clone(), &configmap_name).await {
        Ok(configmap) => configmap,
        Err(Error::Api(ae)) if ae.code == 404 => {
            info!("ConfigMap is not found, creating a new one");
            let cm = ConfigMap {
                metadata: ObjectMeta {
                    name: Some(configmap_name),
                    namespace: Some(s3bucket.clone().namespace().unwrap()),
                    ..Default::default()
                },
                ..Default::default()
            };
            match create_configmap(cm_api.clone(), cm).await {
                Ok(cm) => cm,
                Err(err) => {
                    error!("{}", err);
                    return Err(S3BucketError::KubeError(err));
                }
            }
        }
        Err(err) => {
            error!("{}", err);
            return Err(S3BucketError::KubeError(err));
        }
    };
    info!("Labeling the configmap");
    configmap = match label_configmap(cm_api.clone(), &s3bucket.name_any(), configmap).await {
        Ok(configmap) => configmap,
        Err(err) => {
            error!("{}", err);
            return Err(S3BucketError::KubeError(err));
        }
    };
    info!("Setting owner references to the configmap");
    if s3bucket.spec.own_configmap {
        configmap = match own_configmap(cm_api.clone(), s3bucket.clone(), configmap).await {
            Ok(configmap) => configmap,
            Err(err) => {
                error!("{}", err);
                return Err(S3BucketError::KubeError(err));
            }
        };
    };
    // Once the bucket is Ready, only sync the cleanup finalizer and wait for
    // the next change.
    // NOTE(review): this early-return path runs BEFORE the deletion_timestamp
    // check further down, so a bucket that is already Ready and then deleted
    // may never reach the cleanup branch below — confirm intended ordering.
    if is_condition_true(status.clone().conditions, TYPE_BUCKET_READY) {
        let mut current_finalizers = match s3bucket.clone().metadata.finalizers {
            Some(finalizers) => finalizers,
            None => vec![],
        };
        if s3bucket.spec.cleanup {
            if !current_finalizers.contains(&FIN_CLEANUP.to_string()) {
                info!("Adding a finalizer");
                current_finalizers.push(FIN_CLEANUP.to_string());
            }
        } else {
            if current_finalizers.contains(&FIN_CLEANUP.to_string()) {
                if let Some(index) = current_finalizers
                    .iter()
                    .position(|x| *x == FIN_CLEANUP.to_string())
                {
                    current_finalizers.remove(index);
                };
            }
        };
        s3bucket.metadata.finalizers = Some(current_finalizers);
        match s3bucket_api
            .replace(&s3bucket.name_any(), &PostParams::default(), &s3bucket)
            .await
        {
            Ok(_) => {
                return Ok(Action::await_change());
            }
            Err(err) => {
                error!("{}", err);
                return Err(S3BucketError::KubeError(err));
            }
        }
    };
    info!("Getting the S3Intsance");
    let s3in = match s3in_api.get(&s3bucket.spec.instance).await {
        Ok(s3in) => s3in,
        Err(err) => {
            error!("{}", err);
            return Err(S3BucketError::KubeError(err));
        }
    };
    info!("Updating the ConfigMap");
    // Publish the instance endpoint/region into the info ConfigMap.
    if let Err(err) = ensure_data_configmap(cm_api.clone(), s3in.clone(), configmap.clone()).await {
        error!("{}", err);
        return Err(S3BucketError::KubeError(err));
    };
    info!("Getting the s3instance secret");
    let secret_ns = s3in.clone().spec.credentials_secret.namespace;
    let secret_api: Api<Secret> = Api::namespaced(ctx.client.clone(), &secret_ns);
    let secret = match s3_instance::get_secret(secret_api.clone(), s3in.clone()).await {
        Ok(secret) => secret,
        Err(err) => {
            error!("{}", err);
            return Err(S3BucketError::KubeError(err));
        }
    };
    info!("Getting data from the secret");
    // Getting data from the secret to initialize the client
    let data = match secret.data {
        Some(data) => data,
        None => {
            let err = anyhow::Error::msg("empty data");
            error!("{}", err);
            return Err(S3BucketError::InvalidSecret(err));
        }
    };
    let access_key = match data.get(s3_instance::ACCESS_KEY) {
        Some(access_key) => String::from_utf8(access_key.0.clone()).unwrap(),
        None => {
            let err = anyhow::Error::msg("empty access key");
            error!("{}", err);
            return Err(S3BucketError::InvalidSecret(err));
        }
    };
    let secret_key = match data.get(s3_instance::SECRET_KEY) {
        Some(secret_key) => String::from_utf8(secret_key.0.clone()).unwrap(),
        None => {
            let err = anyhow::Error::msg("empty secret key");
            error!("{}", err);
            return Err(S3BucketError::InvalidSecret(err));
        }
    };
    info!("Creating an s3 client");
    let s3_client = S3Api::new(
        access_key,
        secret_key,
        s3in.clone().spec.endpoint.to_string(),
        s3in.clone().spec.region.to_string(),
        s3in.clone().spec.force_path_style,
    )
    .await;
    // Bucket names are prefixed with the namespace to avoid cross-namespace
    // collisions on the shared instance.
    let bucket_name = format!("{}-{}", s3bucket.namespace().unwrap(), s3bucket.name_any());
    if s3bucket.metadata.deletion_timestamp.is_some() {
        info!("Object is marked for deletion");
        // Run remote cleanup, then drop our finalizer so deletion can proceed.
        if let Some(mut finalizers) = s3bucket.clone().metadata.finalizers {
            if finalizers.contains(&FIN_CLEANUP.to_string()) {
                match s3_client.clone().delete_bucket(bucket_name.clone()).await {
                    Ok(_) => {
                        if let Some(index) = finalizers
                            .iter()
                            .position(|x| *x == FIN_CLEANUP.to_string())
                        {
                            finalizers.remove(index);
                        };
                    }
                    Err(err) => {
                        error!("{}", err);
                        return Err(S3BucketError::IllegalS3Bucket);
                    }
                }
            }
            s3bucket.metadata.finalizers = Some(finalizers);
        };
        match s3bucket_api
            .replace(&s3bucket.name_any(), &PostParams::default(), &s3bucket)
            .await
        {
            Ok(_) => {
                return Ok(Action::await_change());
            }
            Err(err) => {
                error!("{}", err);
                return Err(S3BucketError::KubeError(err));
            }
        }
    }
    info!("Getting buckets");
    let buckets = match s3_client.clone().list_buckets().await {
        Ok(buckets) => buckets,
        Err(err) => {
            error!("{}", err);
            return Err(S3BucketError::IllegalS3Bucket);
        }
    };
    if buckets.contains(&bucket_name) {
        info!("Bucket already exists");
    } else {
        if let Err(err) = s3_client.clone().create_bucket(bucket_name.clone()).await {
            error!("{}", err);
            return Err(S3BucketError::IllegalS3Bucket);
        }
    }
    // Publish the observed state through the status subresource.
    status.ready = true;
    status.conditions = set_condition(
        status.conditions,
        s3bucket.metadata.clone(),
        TYPE_BUCKET_READY,
        "True".to_string(),
        "Reconciled".to_string(),
        "Bucket is ready".to_string(),
    );
    status.endpoint = Some(s3in.clone().spec.endpoint);
    status.size = s3_client.clone().count_size(bucket_name.clone()).await.ok();
    status.total_objects = s3_client
        .clone()
        .count_objects(bucket_name.clone())
        .await
        .ok();
    status.region = Some(s3in.spec.region);
    status.bucket_name = Some(bucket_name.clone());
    s3bucket.status = Some(status);
    info!("Updating status of the s3bucket resource");
    match s3bucket_api
        .replace_status(&s3bucket.name_any(), &PostParams::default(), &s3bucket)
        .await
    {
        Ok(_) => {
            return Ok(Action::requeue(Duration::from_secs(120)));
        }
        Err(err) => {
            error!("{}", err);
            return Err(S3BucketError::KubeError(err));
        }
    };
}
// Bootstrap a freshly-created object: attach an initial "Unknown" status so
// subsequent reconciles have conditions to work with, then wait for the
// status update to trigger the next reconcile.
async fn init_object(mut obj: S3Bucket, api: Api<S3Bucket>) -> Result<Action, S3BucketError> {
    obj.status = Some(S3BucketStatus {
        conditions: set_condition(
            vec![],
            obj.metadata.clone(),
            TYPE_BUCKET_READY,
            "Unknown".to_string(),
            "Reconciling".to_string(),
            "Reconciliation started".to_string(),
        ),
        ..S3BucketStatus::default()
    });
    api.replace_status(obj.clone().name_any().as_str(), &Default::default(), &obj)
        .await
        .map(|_| Action::await_change())
        .map_err(|err| {
            error!("{}", err);
            S3BucketError::KubeError(err)
        })
}
// Fetch the ConfigMap that stores the bucket connection data.
async fn get_configmap(api: Api<ConfigMap>, name: &str) -> Result<ConfigMap, kube::Error> {
    info!("Getting a configmap: {}", name);
    api.get(name).await
}
// Create the ConfigMap, then read it back so the caller gets the
// server-side representation.
async fn create_configmap(api: Api<ConfigMap>, cm: ConfigMap) -> Result<ConfigMap, kube::Error> {
    let created = api.create(&PostParams::default(), &cm).await?;
    get_configmap(api, &created.name_any()).await
}
// Attach the owning-bucket label to the ConfigMap, preserving any labels
// already present, and return the refreshed object.
async fn label_configmap(
    api: Api<ConfigMap>,
    s3bucket_name: &str,
    mut cm: ConfigMap,
) -> Result<ConfigMap, kube::Error> {
    cm.metadata
        .labels
        .get_or_insert_with(BTreeMap::new)
        .insert(CONFIGMAP_LABEL.to_string(), s3bucket_name.to_string());
    api.replace(&cm.name_any(), &PostParams::default(), &cm)
        .await?;
    // Re-read so the caller observes the stored state (resourceVersion etc.).
    api.get(&cm.name_any()).await
}
// Add an ownerReference pointing at the S3Bucket to the ConfigMap (no-op if
// the reference is already present) and return the refreshed object.
async fn own_configmap(
    api: Api<ConfigMap>,
    s3bucket: S3Bucket,
    mut cm: ConfigMap,
) -> Result<ConfigMap, kube::Error> {
    let refs = cm.metadata.owner_references.get_or_insert_with(Vec::new);
    // Already owned by this S3Bucket: nothing to do.
    if refs.iter().any(|or| or.uid == s3bucket.uid().unwrap()) {
        return Ok(cm);
    }
    refs.push(OwnerReference {
        api_version: S3Bucket::api_version(&()).into(),
        kind: S3Bucket::kind(&()).into(),
        name: s3bucket.name_any(),
        uid: s3bucket.uid().unwrap(),
        ..Default::default()
    });
    api.replace(&cm.name_any(), &PostParams::default(), &cm)
        .await?;
    api.get(&cm.name_any()).await
}
// Publish the instance connection settings (region + endpoint URL) into the
// ConfigMap's data, keeping any other keys, and return the refreshed object.
async fn ensure_data_configmap(
    api: Api<ConfigMap>,
    s3in: S3Instance,
    mut cm: ConfigMap,
) -> Result<ConfigMap, kube::Error> {
    let data = cm.data.get_or_insert_with(BTreeMap::new);
    data.insert(AWS_REGION.to_string(), s3in.spec.region);
    data.insert(AWS_ENDPOINT_URL.to_string(), s3in.spec.endpoint);
    api.replace(&cm.name_any(), &PostParams::default(), &cm)
        .await?;
    api.get(&cm.name_any()).await
}
/// On any reconcile failure, retry after a fixed 5-minute backoff.
pub(crate) fn error_policy(_: Arc<S3Bucket>, _: &S3BucketError, _: Arc<Context>) -> Action {
    Action::requeue(Duration::from_secs(5 * 60))
}
/// Start the S3Bucket controller loop; exits the process if the CRD cannot
/// be listed (not installed / API unreachable).
#[instrument(skip(client), fields(trace_id))]
pub async fn run(client: Client) {
    let s3buckets = Api::<S3Bucket>::all(client.clone());
    // Fail fast when the S3Bucket API is not available.
    if let Err(err) = s3buckets.list(&ListParams::default().limit(1)).await {
        error!("{}", err);
        std::process::exit(1);
    }
    let recorder = Recorder::new(client.clone(), "s3bucket-controller".into());
    Controller::new(s3buckets, Config::default().any_semantic())
        .shutdown_on_signal()
        .run(reconcile, error_policy, Arc::new(Context { client, recorder }))
        .filter_map(|x| async move { x.ok() })
        .for_each(|_| futures::future::ready(()))
        .await;
}
// Context for our reconciler, shared with every reconcile call via Arc.
#[derive(Clone)]
pub(crate) struct Context {
    /// Kubernetes client
    pub client: Client,
    /// Event recorder
    // NOTE(review): the recorder is stored but not used by the visible
    // reconcile path — presumably for future event publishing; verify.
    pub recorder: Recorder,
}
/// Errors the S3Bucket reconciler can produce.
#[derive(Error, Debug)]
pub enum S3BucketError {
    /// Failed to (de)serialize a resource.
    #[error("SerializationError: {0}")]
    SerializationError(#[source] serde_json::Error),
    /// Any error returned by the Kubernetes API.
    #[error("Kube Error: {0}")]
    KubeError(#[source] kube::Error),
    #[error("Finalizer Error: {0}")]
    // NB: awkward type because finalizer::Error embeds the reconciler error (which is this)
    // so boxing this error to break cycles
    FinalizerError(#[source] Box<kube::runtime::finalizer::Error<S3BucketError>>),
    /// The S3 backend rejected a bucket operation.
    #[error("IllegalS3Bucket")]
    IllegalS3Bucket,
    // NOTE(review): not constructed anywhere in this file — confirm it is
    // used elsewhere or remove.
    #[error("SecretIsAlreadyLabeled")]
    SecretIsAlreadyLabeled,
    /// The credentials secret is missing or has incomplete data.
    #[error("Invalid Secret: {0}")]
    InvalidSecret(#[source] anyhow::Error),
}
/// Result alias used throughout this reconciler.
pub type S3BucketResult<T, E = S3BucketError> = std::result::Result<T, E>;

View File

@@ -0,0 +1,496 @@
use crate::conditions::{is_condition_true, set_condition};
use crate::controllers::s3_instance;
use crate::providers::{ProviderAPI, SupportedProvider};
use crate::providers::rustfs::RustFS;
use crate::s3::s3api::S3Api;
use api::api::v1beta1::s3_bucket::S3Bucket;
use api::api::v1beta1::s3_bucket_user::{S3BucketUser, S3BucketUserStatus};
use api::api::v1beta1::s3_instance::S3Instance;
use futures::StreamExt;
use k8s_openapi::ByteString;
use k8s_openapi::api::core::v1::{Pod, Secret};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
use kube::api::{ListParams, ObjectMeta, PostParams};
use kube::runtime::Controller;
use kube::runtime::controller::Action;
use kube::runtime::events::Recorder;
use kube::runtime::watcher::Config;
use kube::{Api, Client, Error, Resource, ResourceExt};
use rand::RngExt;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tracing::*;
const TYPE_USER_READY: &str = "UserReady";
const FIN_CLEANUP: &str = "s3.badhouseplants.net/user-cleanup";
const SECRET_LABEL: &str = "s3.badhouseplants.net/s3-bucket";
const AWS_ACCESS_KEY_ID: &str = "AWS_ACCESS_KEY_ID";
const AWS_SECCRET_ACCESS_KEY: &str = "AWS_SECRET_ACCESS_KEY";
const CHARSET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ\
abcdefghijklmnopqrstuvwxyz\
0123456789)(*&^%$#@!~";
const PASSWORD_LEN: usize = 40;
/// Reconcile a single `S3BucketUser`: create the user on the provider backing
/// the referenced bucket's `S3Instance`, store its generated credentials in a
/// Secret, manage the cleanup finalizer, and publish the result in `.status`.
#[instrument(skip(ctx, obj), fields(trace_id))]
pub(crate) async fn reconcile(
    obj: Arc<S3BucketUser>,
    ctx: Arc<Context>,
) -> S3BucketUserResult<Action> {
    info!("Staring reconciling");
    let s3bucketuser_api: Api<S3BucketUser> =
        Api::namespaced(ctx.client.clone(), &obj.namespace().unwrap());
    let s3bucket_api: Api<S3Bucket> =
        Api::namespaced(ctx.client.clone(), &obj.namespace().unwrap());
    let secret_api: Api<Secret> = Api::namespaced(ctx.client.clone(), &obj.namespace().unwrap());
    let s3in_api: Api<S3Instance> = Api::all(ctx.client.clone());
    info!("Getting the S3BucketUser resource");
    // Re-fetch the object so we operate on the latest resourceVersion.
    let mut s3bucketuser = match s3bucketuser_api.get(&obj.name_any()).await {
        Ok(s3bucketuser) => s3bucketuser,
        Err(Error::Api(ae)) if ae.code == 404 => {
            info!("Object is not found, probably removed");
            return Ok(Action::await_change());
        }
        Err(err) => {
            error!("{}", err);
            return Err(S3BucketUserError::KubeError(err));
        }
    };
    // On the first reconciliation status is None
    // it needs to be initialized
    let mut status = match s3bucketuser.clone().status {
        None => {
            info!("Status is not yet set, initializing the object");
            return init_object(s3bucketuser, s3bucketuser_api).await;
        }
        Some(status) => status,
    };
    let secret_name = format!("{}-bucket-creds", s3bucketuser.name_any());
    info!("Getting the secret");
    // Get the secret, if it's already there, we need to validate, or create an empty one
    let mut secret = match get_secret(secret_api.clone(), &secret_name).await {
        Ok(secret) => secret,
        Err(Error::Api(ae)) if ae.code == 404 => {
            info!("Secret is not found, creating a new one");
            let secret = Secret {
                metadata: ObjectMeta {
                    name: Some(secret_name),
                    namespace: Some(s3bucketuser.clone().namespace().unwrap()),
                    ..Default::default()
                },
                ..Default::default()
            };
            match create_secret(secret_api.clone(), secret).await {
                Ok(cm) => cm,
                Err(err) => {
                    error!("{}", err);
                    return Err(S3BucketUserError::KubeError(err));
                }
            }
        }
        Err(err) => {
            error!("{}", err);
            return Err(S3BucketUserError::KubeError(err));
        }
    };
    info!("Labeling the secret");
    secret = match label_secret(secret_api.clone(), &s3bucketuser.name_any(), secret).await {
        Ok(configmap) => configmap,
        Err(err) => {
            error!("{}", err);
            return Err(S3BucketUserError::KubeError(err));
        }
    };
    info!("Setting owner references to the secret");
    if s3bucketuser.spec.own_secret {
        secret = match own_secret(secret_api.clone(), s3bucketuser.clone(), secret).await {
            Ok(secret) => secret,
            Err(err) => {
                error!("{}", err);
                return Err(S3BucketUserError::KubeError(err));
            }
        };
    };
    // Once the user is Ready, sync the cleanup finalizer.
    // NOTE(review): unlike the S3Bucket controller, this branch does NOT
    // return after the replace — reconciliation continues below; confirm
    // that is intentional.
    if is_condition_true(status.clone().conditions, TYPE_USER_READY) {
        let mut current_finalizers = match s3bucketuser.clone().metadata.finalizers {
            Some(finalizers) => finalizers,
            None => vec![],
        };
        if s3bucketuser.spec.cleanup {
            if !current_finalizers.contains(&FIN_CLEANUP.to_string()) {
                info!("Adding a finalizer");
                current_finalizers.push(FIN_CLEANUP.to_string());
            }
        } else {
            if current_finalizers.contains(&FIN_CLEANUP.to_string()) {
                if let Some(index) = current_finalizers
                    .iter()
                    .position(|x| *x == FIN_CLEANUP.to_string())
                {
                    current_finalizers.remove(index);
                };
            }
        };
        s3bucketuser.metadata.finalizers = Some(current_finalizers);
        if let Err(err) = s3bucketuser_api
            .replace(
                &s3bucketuser.name_any(),
                &PostParams::default(),
                &s3bucketuser,
            )
            .await
        {
            error!("{}", err);
            return Err(S3BucketUserError::KubeError(err));
        }
    };
    info!("Getting the S3Bucket");
    let s3bucket = match s3bucket_api.get(&s3bucketuser.spec.bucket).await {
        Ok(s3bucket) => s3bucket,
        Err(err) => {
            error!("{}", err);
            return Err(S3BucketUserError::KubeError(err));
        }
    };
    info!("Getting the S3Intsance");
    let s3in = match s3in_api.get(&s3bucket.spec.instance).await {
        Ok(s3in) => s3in,
        Err(err) => {
            error!("{}", err);
            return Err(S3BucketUserError::KubeError(err));
        }
    };
    info!("Getting the s3instance secret");
    let secret_ns = s3in.clone().spec.credentials_secret.namespace;
    let s3in_secret_api: Api<Secret> = Api::namespaced(ctx.client.clone(), &secret_ns);
    // NOTE(review): pod_api is created but never used in this function.
    let pod_api: Api<Pod> = Api::namespaced(ctx.client.clone(), &secret_ns);
    let s3in_secret = match s3_instance::get_secret(s3in_secret_api.clone(), s3in.clone()).await {
        Ok(secret) => secret,
        Err(err) => {
            error!("{}", err);
            return Err(S3BucketUserError::KubeError(err));
        }
    };
    info!("Getting data from the secret");
    // Getting data from the secret to initialize the client
    let data = match s3in_secret.data {
        Some(data) => data,
        None => {
            let err = anyhow::Error::msg("empty data");
            error!("{}", err);
            return Err(S3BucketUserError::InvalidSecret(err));
        }
    };
    let access_key = match data.get(s3_instance::ACCESS_KEY) {
        Some(access_key) => String::from_utf8(access_key.0.clone()).unwrap(),
        None => {
            let err = anyhow::Error::msg("empty access key");
            error!("{}", err);
            return Err(S3BucketUserError::InvalidSecret(err));
        }
    };
    let secret_key = match data.get(s3_instance::SECRET_KEY) {
        Some(secret_key) => String::from_utf8(secret_key.0.clone()).unwrap(),
        None => {
            let err = anyhow::Error::msg("empty secret key");
            error!("{}", err);
            return Err(S3BucketUserError::InvalidSecret(err));
        }
    };
    // The username follows the same namespace-prefixed scheme as bucket names.
    let username = format!(
        "{}-{}",
        s3bucketuser.namespace().unwrap(),
        s3bucketuser.name_any()
    );
    let password = generate_password();
    let provider: SupportedProvider = match s3in.clone().spec.provider {
        api::api::v1beta1::s3_instance::Provider::Minio => todo!(),
        api::api::v1beta1::s3_instance::Provider::Rustfs => SupportedProvider::RustFS(RustFS::new(access_key.clone(), secret_key.clone(), s3in.spec.endpoint.clone(), s3in.spec.region.clone())),
    };
    info!("Creating a user");
    if let Err(err) = provider.create_user(username.clone(), password.clone()) {
        error!("{}", err);
        return Err(S3BucketUserError::IllegalS3BucketUser);
    }
    info!("Creating an s3 client");
    // NOTE(review): s3_client is constructed but not used in the visible code
    // below — presumably intended for policy attachment; verify.
    let s3_client = S3Api::new(
        access_key,
        secret_key,
        s3in.clone().spec.endpoint.to_string(),
        s3in.clone().spec.region.to_string(),
        s3in.clone().spec.force_path_style,
    )
    .await;
    secret = match ensure_data_secret(secret_api, secret, username.clone(), password.clone()).await
    {
        Ok(secret) => secret,
        Err(err) => {
            error!("{}", err);
            return Err(S3BucketUserError::KubeError(err));
        }
    };
    if s3bucketuser.metadata.deletion_timestamp.is_some() {
        // Deletion/cleanup path is not implemented yet.
        todo!();
    }
    status.ready = true;
    // NOTE(review): this passes the S3Bucket's metadata (not the
    // S3BucketUser's) into set_condition, so observedGeneration comes from
    // the bucket — looks unintended; verify.
    status.conditions = set_condition(
        status.conditions,
        s3bucket.metadata.clone(),
        TYPE_USER_READY,
        "True".to_string(),
        "Reconciled".to_string(),
        "User is ready".to_string(),
    );
    status.access_key = Some(username.clone());
    s3bucketuser.status = Some(status);
    info!("Updating status of the s3bucket user resource");
    match s3bucketuser_api
        .replace_status(
            &s3bucketuser.name_any(),
            &PostParams::default(),
            &s3bucketuser,
        )
        .await
    {
        Ok(_) => {
            return Ok(Action::requeue(Duration::from_secs(120)));
        }
        Err(err) => {
            error!("{}", err);
            return Err(S3BucketUserError::KubeError(err));
        }
    };
}
/// Bootstrap a freshly created S3BucketUser by writing an initial status
/// (a single "Unknown" readiness condition) back to the cluster.
async fn init_object(
    mut obj: S3BucketUser,
    api: Api<S3BucketUser>,
) -> Result<Action, S3BucketUserError> {
    // Seed the status with one condition marking reconciliation as started.
    obj.status = Some(S3BucketUserStatus {
        conditions: set_condition(
            vec![],
            obj.metadata.clone(),
            TYPE_USER_READY,
            "Unknown".to_string(),
            "Reconciling".to_string(),
            "Reconciliation started".to_string(),
        ),
        ..S3BucketUserStatus::default()
    });
    let name = obj.name_any();
    if let Err(err) = api.replace_status(&name, &Default::default(), &obj).await {
        error!("{}", err);
        return Err(S3BucketUserError::KubeError(err));
    }
    Ok(Action::await_change())
}
// Fetch the Secret that holds the bucket-user data (the old comment said
// "configmap", but this API works on Secrets).
async fn get_secret(api: Api<Secret>, name: &str) -> Result<Secret, kube::Error> {
    info!("Getting a secret: {}", name);
    api.get(name).await
}
// Create the Secret, then read it back so the caller gets the server-side view.
async fn create_secret(api: Api<Secret>, secret: Secret) -> Result<Secret, kube::Error> {
    let created = api.create(&PostParams::default(), &secret).await?;
    get_secret(api, &created.name_any()).await
}
/// Attach the controller's tracking label (SECRET_LABEL -> s3bucket_name) to
/// the Secret, persist it, and return the freshly fetched object.
async fn label_secret(
    api: Api<Secret>,
    s3bucket_name: &str,
    mut secret: Secret,
) -> Result<Secret, kube::Error> {
    secret
        .metadata
        .labels
        .get_or_insert_with(BTreeMap::new)
        .insert(SECRET_LABEL.to_string(), s3bucket_name.to_string());
    api.replace(&secret.name_any(), &PostParams::default(), &secret)
        .await?;
    api.get(&secret.name_any()).await
}
async fn own_secret(
api: Api<Secret>,
s3bucketuser: S3BucketUser,
mut secret: Secret,
) -> Result<Secret, kube::Error> {
let mut owner_references = match &secret.clone().metadata.owner_references {
Some(owner_references) => owner_references.clone(),
None => {
let owner_references: Vec<OwnerReference> = vec![];
owner_references
}
};
if owner_references
.iter()
.find(|or| or.uid == s3bucketuser.uid().unwrap())
.is_some()
{
return Ok(secret);
}
let new_owner_reference = OwnerReference {
api_version: S3Bucket::api_version(&()).into(),
kind: S3Bucket::kind(&()).into(),
name: s3bucketuser.name_any(),
uid: s3bucketuser.uid().unwrap(),
..Default::default()
};
owner_references.push(new_owner_reference);
secret.metadata.owner_references = Some(owner_references);
api.replace(&secret.name_any(), &PostParams::default(), &secret)
.await?;
let secret = match api.get(&secret.name_any()).await {
Ok(secret) => secret,
Err(err) => {
return Err(err);
}
};
Ok(secret)
}
/// Write the generated credentials into the Secret's data map (creating the
/// map if absent), persist the Secret, and return the server-side object.
async fn ensure_data_secret(
    api: Api<Secret>,
    mut secret: Secret,
    username: String,
    password: String,
) -> Result<Secret, kube::Error> {
    let data = secret.data.get_or_insert_with(BTreeMap::new);
    data.insert(
        AWS_ACCESS_KEY_ID.to_string(),
        ByteString(username.into_bytes()),
    );
    data.insert(
        AWS_SECCRET_ACCESS_KEY.to_string(),
        ByteString(password.into_bytes()),
    );
    api.replace(&secret.name_any(), &PostParams::default(), &secret)
        .await?;
    api.get(&secret.name_any()).await
}
/// Build a random password of PASSWORD_LEN characters drawn from CHARSET.
fn generate_password() -> String {
    let mut rng = rand::rng();
    let mut password = String::new();
    for _ in 0..PASSWORD_LEN {
        let idx = rng.random_range(0..CHARSET.len());
        password.push(char::from(CHARSET[idx]));
    }
    password
}
/// Backoff for failed reconciliations: retry after five minutes.
pub(crate) fn error_policy(_: Arc<S3BucketUser>, _: &S3BucketUserError, _: Arc<Context>) -> Action {
    const RETRY_AFTER: Duration = Duration::from_secs(5 * 60);
    Action::requeue(RETRY_AFTER)
}
/// Start the S3BucketUser controller loop; exits the process when the CRD
/// cannot be listed (e.g. it is not installed).
#[instrument(skip(client), fields(trace_id))]
pub async fn run(client: Client) {
    let s3bucketusers = Api::<S3BucketUser>::all(client.clone());
    // Fail fast when the API is unreachable or the CRD is missing.
    if let Err(err) = s3bucketusers.list(&ListParams::default().limit(1)).await {
        error!("{}", err);
        std::process::exit(1);
    }
    let recorder = Recorder::new(client.clone(), "s3bucketuser-controller".into());
    let context = Arc::new(Context { client, recorder });
    Controller::new(s3bucketusers, Config::default().any_semantic())
        .shutdown_on_signal()
        .run(reconcile, error_policy, context)
        .filter_map(|x| async move { x.ok() })
        .for_each(|_| futures::future::ready(()))
        .await;
}
/// Shared state handed to every reconcile / error-policy invocation of the
/// S3BucketUser controller.
#[derive(Clone)]
pub(crate) struct Context {
    /// Kubernetes client
    pub client: Client,
    /// Event recorder
    pub recorder: Recorder,
}
/// Errors the S3BucketUser reconciler can produce.
#[derive(Error, Debug)]
pub enum S3BucketUserError {
    /// Failed to (de)serialize a resource.
    #[error("SerializationError: {0}")]
    SerializationError(#[source] serde_json::Error),
    /// Any error returned by the Kubernetes API.
    #[error("Kube Error: {0}")]
    KubeError(#[source] kube::Error),
    #[error("Finalizer Error: {0}")]
    // NB: awkward type because finalizer::Error embeds the reconciler error (which is this)
    // so boxing this error to break cycles
    FinalizerError(#[source] Box<kube::runtime::finalizer::Error<S3BucketUserError>>),
    /// The resource (or its backing S3 state) is in an unusable shape.
    #[error("IllegalS3BucketUser")]
    IllegalS3BucketUser,
    /// The credentials Secret is already claimed by another resource.
    #[error("SecretIsAlreadyLabeled")]
    SecretIsAlreadyLabeled,
    /// The credentials Secret exists but is missing required data keys.
    #[error("Invalid Secret: {0}")]
    InvalidSecret(#[source] anyhow::Error),
}
/// Convenience alias defaulting the error type to [`S3BucketUserError`].
pub type S3BucketUserResult<T, E = S3BucketUserError> = std::result::Result<T, E>;

View File

@@ -0,0 +1,456 @@
use crate::conditions::{is_condition_true, is_condition_unknown, set_condition};
use crate::s3::S3Client;
use crate::s3::s3api::S3Api;
use api::api::v1beta1::s3_instance::{S3Instance, S3InstanceStatus};
use futures::StreamExt;
use k8s_openapi::api::core::v1::Secret;
use kube::api::{ListParams, PostParams};
use kube::runtime::Controller;
use kube::runtime::controller::Action;
use kube::runtime::events::{Event, Recorder};
use kube::runtime::watcher::Config;
use kube::{Api, Client, Error, Resource, ResourceExt};
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tracing::*;
/// Status condition type for connectivity to the S3 endpoint.
const TYPE_CONNECTED: &str = "Connected";
/// Status condition type tracking whether the credentials Secret is labeled.
const TYPE_SECRET_LABELED: &str = "SecretLabeled";
/// Finalizer ensuring the Secret label is removed before the object is deleted.
const FIN_SECRET_LABEL: &str = "s3.badhouseplants.net/s3-label";
/// Label key linking a Secret to the S3Instance that claimed it.
const SECRET_LABEL: &str = "s3.badhouseplants.net/s3-instance";
/// Data key in the credentials Secret holding the S3 access key.
pub(crate) const ACCESS_KEY: &str = "ACCESS_KEY";
/// Data key in the credentials Secret holding the S3 secret key.
pub(crate) const SECRET_KEY: &str = "SECRET_KEY";
#[instrument(skip(ctx, obj), fields(trace_id))]
pub(crate) async fn reconcile(obj: Arc<S3Instance>, ctx: Arc<Context>) -> S3InstanceResult<Action> {
info!("Staring reconciling");
info!("Getting the S3Instance resource");
let s3_api: Api<S3Instance> = Api::all(ctx.client.clone());
let mut s3in = match s3_api.get(obj.name_any().as_str()).await {
Ok(res) => res,
Err(Error::Api(ae)) if ae.code == 404 => {
info!("Object is not found, probably removed");
return Ok(Action::await_change());
}
Err(err) => {
error!("{}", err);
return Err(S3InstanceError::KubeError(err));
}
};
let secret_ns = s3in.clone().spec.credentials_secret.namespace;
let secret_api: Api<Secret> = Api::namespaced(ctx.client.clone(), &secret_ns);
let mut status = match s3in.clone().status {
None => {
info!("Status is not yet set, initializing the object");
return init_object(s3in, s3_api).await;
}
Some(status) => status,
};
let secret = match get_secret(secret_api.clone(), s3in.clone()).await {
Ok(secret) => secret,
Err(err) => {
error!("{}", err);
ctx.recorder
.publish(
&Event {
type_: kube::runtime::events::EventType::Warning,
reason: "S3InstanceReconciliation".to_string(),
note: Some("Secret wasn't found".to_string()),
action: "SecretLookUp".to_string(),
secondary: None,
},
&s3in.clone().object_ref(&()),
)
.await
.unwrap();
return Err(S3InstanceError::KubeError(err));
}
};
if s3in.metadata.deletion_timestamp.is_some() {
info!("Object is marked for deletion");
if let Some(mut finalizers) = s3in.clone().metadata.finalizers {
if finalizers.contains(&FIN_SECRET_LABEL.to_string()) {
match unlabel_secret(ctx.clone(), s3in.clone(), secret).await {
Ok(_) => {
if let Some(index) = finalizers
.iter()
.position(|x| *x == FIN_SECRET_LABEL.to_string())
{
finalizers.remove(index);
};
}
Err(err) => {
error!("{}", err);
return Err(S3InstanceError::KubeError(err));
}
};
}
s3in.metadata.finalizers = Some(finalizers);
};
match s3_api
.replace(&s3in.name_any(), &PostParams::default(), &s3in)
.await
{
Ok(_) => {
return Ok(Action::await_change());
}
Err(err) => {
error!("{}", err);
return Err(S3InstanceError::KubeError(err));
}
}
}
if is_condition_true(status.clone().conditions, TYPE_SECRET_LABELED) {
let mut current_finalizers = match s3in.clone().metadata.finalizers {
Some(finalizers) => finalizers,
None => vec![],
};
if !current_finalizers.contains(&FIN_SECRET_LABEL.to_string()) {
info!("Adding a finalizer");
current_finalizers.push(FIN_SECRET_LABEL.to_string());
s3in.metadata.finalizers = Some(current_finalizers);
match s3_api
.replace(&s3in.name_any(), &PostParams::default(), &s3in)
.await
{
Ok(_) => {
return Ok(Action::await_change());
}
Err(err) => {
error!("{}", err);
return Err(S3InstanceError::KubeError(err));
}
}
}
}
if is_condition_unknown(status.clone().conditions, TYPE_SECRET_LABELED) {
if is_secret_labeled(secret.clone()) {
if is_secret_labeled_by_another_obj(s3in.clone(), secret.clone()) {
error!("{}", S3InstanceError::SecretIsAlreadyLabeled);
return Err(S3InstanceError::SecretIsAlreadyLabeled);
}
if is_secret_labeled_by_obj(s3in.clone(), secret.clone()) {
info!("Secret is already labeled");
status.conditions = set_condition(
status.clone().conditions,
obj.metadata.clone(),
TYPE_SECRET_LABELED,
"True".to_string(),
"Reconciled".to_string(),
"Secret is already labeled".to_string(),
);
}
} else {
info!("Labeling the secret");
if let Err(err) = label_secret(ctx.clone(), s3in.clone(), secret).await {
error!("{}", err);
return Err(S3InstanceError::KubeError(err));
};
status.conditions = set_condition(
status.clone().conditions,
obj.metadata.clone(),
TYPE_SECRET_LABELED,
"True".to_string(),
"Reconciled".to_string(),
"Secret is labeled".to_string(),
);
};
s3in.status = Some(status);
match s3_api
.replace_status(&s3in.name_any(), &PostParams::default(), &s3in)
.await
{
Ok(_) => {
return Ok(Action::await_change());
}
Err(err) => {
error!("{}", err);
return Err(S3InstanceError::KubeError(err));
}
}
};
info!("Checking if the secret is labeled by another object");
if !is_secret_labeled_by_obj(s3in.clone(), secret.clone()) {
status.conditions = set_condition(
status.conditions,
s3in.clone().metadata,
TYPE_SECRET_LABELED,
"Unknown".to_string(),
"S3InstanceReconciliation".to_string(),
"Secret is not labeled".to_string(),
);
s3in.status = Some(status);
match s3_api
.replace_status(&s3in.clone().name_any(), &PostParams::default(), &s3in)
.await
{
Ok(_) => {
return Ok(Action::await_change());
}
Err(err) => {
error!("{}", err);
return Err(S3InstanceError::KubeError(err));
}
}
};
info!("Getting data from the secret");
// Getting data from the secret to initialize the clinet
let data = match secret.data {
Some(data) => data,
None => {
let err = anyhow::Error::msg("empty data");
error!("{}", err);
return Err(S3InstanceError::InvalidSecret(err));
}
};
let access_key = match data.get(ACCESS_KEY) {
Some(access_key) => String::from_utf8(access_key.0.clone()).unwrap(),
None => {
let err = anyhow::Error::msg("empty access key");
error!("{}", err);
return Err(S3InstanceError::InvalidSecret(err));
}
};
let secret_key = match data.get(SECRET_KEY) {
Some(secret_key) => String::from_utf8(secret_key.0.clone()).unwrap(),
None => {
let err = anyhow::Error::msg("empty secret key");
error!("{}", err);
return Err(S3InstanceError::InvalidSecret(err));
}
};
info!("Creating an s3 client");
let s3_client = S3Api::new(
access_key,
secret_key,
s3in.clone().spec.endpoint.to_string(),
s3in.clone().spec.region.to_string(),
s3in.clone().spec.force_path_style,
)
.await;
info!("Getting buckets");
let buckets = match s3_client.list_buckets().await {
Ok(buckets) => buckets,
Err(err) => {
error!("{}", err);
return Err(S3InstanceError::IllegalS3Instance);
}
};
status.ready = true;
status.buckets = Some(buckets.clone());
status.total_buckets = Some(buckets.len());
s3in.status = Some(status);
info!("Updating status of the s3in resource");
match s3_api
.replace_status(&s3in.name_any(), &PostParams::default(), &s3in)
.await
{
Ok(_) => {
return Ok(Action::requeue(Duration::from_secs(120)));
}
Err(err) => {
error!("{}", err);
return Err(S3InstanceError::IllegalS3Instance);
}
};
}
/// Bootstrap a freshly created S3Instance by writing an initial status:
/// both conditions start as "Unknown" and no bucket data is recorded yet.
async fn init_object(mut obj: S3Instance, api: Api<S3Instance>) -> Result<Action, S3InstanceError> {
    let mut conditions = vec![];
    for condition_type in [TYPE_CONNECTED, TYPE_SECRET_LABELED] {
        conditions = set_condition(
            conditions,
            obj.metadata.clone(),
            condition_type,
            "Unknown".to_string(),
            "Reconciling".to_string(),
            "Reconciliation started".to_string(),
        );
    }
    obj.status = Some(S3InstanceStatus {
        ready: false,
        conditions,
        buckets: None,
        total_buckets: None,
    });
    let name = obj.name_any();
    if let Err(err) = api.replace_status(&name, &Default::default(), &obj).await {
        error!("{}", err);
        return Err(S3InstanceError::KubeError(err));
    }
    Ok(Action::await_change())
}
/// Fetch the credentials Secret referenced by the S3Instance spec.
pub(crate) async fn get_secret(api: Api<Secret>, obj: S3Instance) -> Result<Secret, kube::Error> {
    api.get(&obj.spec.credentials_secret.name).await
}
/// Remove the controller's tracking label from the credentials Secret, if the
/// Secret has any labels at all.
async fn unlabel_secret(
    ctx: Arc<Context>,
    obj: S3Instance,
    mut secret: Secret,
) -> Result<(), kube::Error> {
    let api: Api<Secret> =
        Api::namespaced(ctx.client.clone(), &obj.spec.credentials_secret.namespace);
    let Some(labels) = secret.metadata.labels.as_mut() else {
        // No labels at all — nothing to clean up.
        return Ok(());
    };
    labels.remove(SECRET_LABEL);
    api.replace(&secret.name_any(), &PostParams::default(), &secret)
        .await?;
    Ok(())
}
/// Label the credentials Secret with this S3Instance's name, persist it, and
/// return the freshly fetched object.
async fn label_secret(
    ctx: Arc<Context>,
    obj: S3Instance,
    mut secret: Secret,
) -> Result<Secret, kube::Error> {
    let secret_ns = obj.clone().spec.credentials_secret.namespace;
    let api: Api<Secret> = Api::namespaced(ctx.client.clone(), &secret_ns);
    // Insert (or overwrite) the tracking label in place. The previous revision
    // first mutated `secret.clone()` — a no-op on the real object — before
    // redoing the same work; that dead code is removed here.
    secret
        .metadata
        .labels
        .get_or_insert_with(BTreeMap::new)
        .insert(SECRET_LABEL.to_string(), obj.name_any());
    api.replace(&secret.name_any(), &PostParams::default(), &secret)
        .await?;
    api.get(&obj.spec.credentials_secret.name).await
}
// Returns true when the secret already carries the operator's tracking label.
fn is_secret_labeled(secret: Secret) -> bool {
    secret
        .metadata
        .labels
        .as_ref()
        .is_some_and(|labels| labels.contains_key(SECRET_LABEL))
}
// Returns true when the secret is labeled, but by a different S3Instance.
fn is_secret_labeled_by_another_obj(obj: S3Instance, secret: Secret) -> bool {
    let name = obj.name_any();
    secret
        .metadata
        .labels
        .and_then(|labels| labels.get(SECRET_LABEL).cloned())
        .is_some_and(|label| label != name)
}
// Returns true when the secret is labeled by exactly this S3Instance.
fn is_secret_labeled_by_obj(obj: S3Instance, secret: Secret) -> bool {
    let name = obj.name_any();
    secret
        .metadata
        .labels
        .and_then(|labels| labels.get(SECRET_LABEL).cloned())
        .is_some_and(|label| label == name)
}
/// Backoff for failed reconciliations: retry after five minutes.
pub(crate) fn error_policy(_: Arc<S3Instance>, _: &S3InstanceError, _: Arc<Context>) -> Action {
    const RETRY_AFTER: Duration = Duration::from_secs(5 * 60);
    Action::requeue(RETRY_AFTER)
}
/// Start the S3Instance controller loop; exits the process when the CRD
/// cannot be listed (e.g. it is not installed).
#[instrument(skip(client), fields(trace_id))]
pub async fn run(client: Client) {
    let s3instances = Api::<S3Instance>::all(client.clone());
    // Fail fast when the API is unreachable or the CRD is missing.
    if let Err(err) = s3instances.list(&ListParams::default().limit(1)).await {
        error!("{}", err);
        std::process::exit(1);
    }
    let recorder = Recorder::new(client.clone(), "s3instance-controller".into());
    let context = Arc::new(Context { client, recorder });
    Controller::new(s3instances, Config::default().any_semantic())
        .shutdown_on_signal()
        .run(reconcile, error_policy, context)
        .filter_map(|x| async move { x.ok() })
        .for_each(|_| futures::future::ready(()))
        .await;
}
/// Shared state handed to every reconcile / error-policy invocation of the
/// S3Instance controller.
#[derive(Clone)]
pub(crate) struct Context {
    /// Kubernetes client
    pub client: Client,
    /// Event recorder
    pub recorder: Recorder,
}
/// Errors the S3Instance reconciler can produce.
#[derive(Error, Debug)]
pub enum S3InstanceError {
    /// Failed to (de)serialize a resource.
    #[error("SerializationError: {0}")]
    SerializationError(#[source] serde_json::Error),
    /// Any error returned by the Kubernetes API.
    #[error("Kube Error: {0}")]
    KubeError(#[source] kube::Error),
    #[error("Finalizer Error: {0}")]
    // NB: awkward type because finalizer::Error embeds the reconciler error (which is this)
    // so boxing this error to break cycles
    FinalizerError(#[source] Box<kube::runtime::finalizer::Error<S3InstanceError>>),
    /// The instance could not be used (e.g. the S3 endpoint is unreachable).
    #[error("IllegalS3Instance")]
    IllegalS3Instance,
    /// The credentials Secret is already claimed by another S3Instance.
    #[error("SecretIsAlreadyLabeled")]
    SecretIsAlreadyLabeled,
    /// The credentials Secret exists but is missing required data keys.
    #[error("Invalid Secret: {0}")]
    InvalidSecret(#[source] anyhow::Error),
}
/// Convenience alias defaulting the error type to [`S3InstanceError`].
pub type S3InstanceResult<T, E = S3InstanceError> = std::result::Result<T, E>;

14
operator/src/crdgen.rs Normal file
View File

@@ -0,0 +1,14 @@
use api::api::v1beta1::s3_bucket_user::S3BucketUser;
use api::api::v1beta1::{s3_bucket::S3Bucket, s3_instance::S3Instance};
use kube::CustomResourceExt;
/// Print the CRD manifests for all operator-managed resources as a single
/// multi-document YAML stream on stdout (S3Instance, S3Bucket, S3BucketUser).
fn main() {
    let crds = [
        serde_yaml::to_string(&S3Instance::crd()).unwrap(),
        serde_yaml::to_string(&S3Bucket::crd()).unwrap(),
        serde_yaml::to_string(&S3BucketUser::crd()).unwrap(),
    ];
    for crd in crds {
        println!("---\n{}", crd);
    }
}

1
operator/src/lib.rs Normal file
View File

@@ -0,0 +1 @@
pub mod api;

View File

@@ -0,0 +1,29 @@
use async_trait::async_trait;
use crate::providers::rustfs::RustFS;
pub(crate) mod rustfs;
/// The set of S3 backends the operator knows how to manage users on.
pub(crate) enum SupportedProvider {
    /// RustFS backend (see `providers::rustfs`).
    RustFS(RustFS),
}
/// Provider-specific user-management operations.
pub(crate) trait ProviderAPI {
    /// Create a user identified by `access_key` authenticating with `secret_key`.
    fn create_user(&self, access_key: String, secret_key: String) -> Result<(), anyhow::Error>;
    /// Update an existing user. NOTE(review): no implementation exists yet (todo!).
    fn update_user(&self) -> Result<(), anyhow::Error>;
    /// Delete a user. NOTE(review): no implementation exists yet (todo!).
    fn delete_user(&self) -> Result<(), anyhow::Error>;
}
impl ProviderAPI for SupportedProvider {
    /// Dispatch user creation to the concrete provider implementation.
    fn create_user(&self, access_key: String, secret_key: String) -> Result<(), anyhow::Error> {
        match self {
            SupportedProvider::RustFS(rust_fs) => rust_fs.create_user(access_key, secret_key),
        }
    }
    /// Not implemented yet: panics via todo!() when called.
    fn update_user(&self) -> Result<(), anyhow::Error> {
        todo!()
    }
    /// Not implemented yet: panics via todo!() when called.
    fn delete_user(&self) -> Result<(), anyhow::Error> {
        todo!()
    }
}

View File

@@ -0,0 +1,73 @@
use std::process::Command;
use super::ProviderAPI;
use async_trait::async_trait;
use reqwest::Client;
use tracing::info;
/// Handle for managing users on a RustFS deployment via an external CLI.
pub(crate) struct RustFS {
    // Admin access key used to authenticate against the endpoint.
    username: String,
    // Admin secret key.
    password: String,
    // Base URL of the S3 endpoint the alias is created for.
    endpoint: String,
    // NOTE(review): stored but unused by create_user — confirm it is needed.
    region: String,
}
impl RustFS {
    /// Construct a RustFS provider handle from admin credentials plus the
    /// endpoint and region of the target deployment.
    pub(crate) fn new(
        username: String,
        password: String,
        endpoint: String,
        region: String,
    ) -> Self {
        Self {
            username,
            password,
            endpoint,
            region,
        }
    }
}
impl ProviderAPI for RustFS {
    /// Create a user on the RustFS deployment by shelling out to the `rc` CLI:
    /// set up a temporary alias with the admin credentials, add the user,
    /// then remove the alias.
    ///
    /// NOTE(review): the CLI's exit status is never inspected — a failed
    /// `rc` invocation still returns Ok(()). Confirm whether that is intended.
    /// NOTE(review): if a later command fails with `?`, the alias created in
    /// the first step is never removed.
    /// NOTE(review): credentials are passed as argv, which is visible in the
    /// process list on the node running the controller.
    fn create_user(&self, access_key: String, secret_key: String) -> Result<(), anyhow::Error> {
        info!("Preparing an alias");
        // Alias name is derived from the new user's access key to avoid clashes.
        let name = format!("{}-alias", access_key.clone());
        let output = Command::new("rc")
            .args([
                "alias",
                "set",
                &name,
                self.endpoint.as_str(),
                self.username.as_str(),
                self.password.as_str(),
            ])
            .output()?;
        info!("{:?}", output);
        info!("Creating a user");
        let output = Command::new("rc")
            .args([
                "admin",
                "user",
                "add",
                &name,
                access_key.as_str(),
                secret_key.as_str(),
            ])
            .output()?;
        info!("{:?}", output);
        info!("Removing the alias");
        let output = Command::new("rc")
            .args(["alias", "remove", &name])
            .output()?;
        info!("{:?}", output);
        Ok(())
    }
    /// Not implemented yet: panics via todo!() when called.
    fn update_user(&self) -> Result<(), anyhow::Error> {
        todo!()
    }
    /// Not implemented yet: panics via todo!() when called.
    fn delete_user(&self) -> Result<(), anyhow::Error> {
        todo!()
    }
}

1
operator/src/s3/dummy.rs Normal file
View File

@@ -0,0 +1 @@

12
operator/src/s3/mod.rs Normal file
View File

@@ -0,0 +1,12 @@
use anyhow::Error;
pub(crate) mod dummy;
pub(crate) mod s3api;
/// Minimal set of S3 operations the operator needs, abstracted over backends.
pub(crate) trait S3Client {
    /// List all bucket names visible to the configured credentials.
    async fn list_buckets(self) -> Result<Vec<String>, Error>;
    /// Create a bucket with the given name.
    async fn create_bucket(self, bucket_name: String) -> Result<(), Error>;
    /// Delete a bucket, removing any objects it still contains first.
    async fn delete_bucket(self, bucket_name: String) -> Result<(), Error>;
    /// Count the number of objects in a bucket.
    async fn count_objects(self, bucket_name: String) -> Result<u64, Error>;
    /// Sum the sizes (bytes) of all objects in a bucket.
    async fn count_size(self, bucket_name: String) -> Result<u64, Error>;
}

168
operator/src/s3/s3api.rs Normal file
View File

@@ -0,0 +1,168 @@
use aws_config::{BehaviorVersion, Region};
use aws_credential_types::Credentials;
use aws_sdk_s3::config::Builder;
use aws_sdk_s3::types::{Delete, ObjectIdentifier};
use aws_sdk_s3::{Client, config::SharedCredentialsProvider};
use crate::s3::S3Client;
/// AWS-SDK-backed implementation of the [`S3Client`] trait.
#[derive(Clone)]
pub(crate) struct S3Api {
    // Underlying AWS SDK S3 client.
    client: aws_sdk_s3::Client,
}
impl S3Api {
    /// Build an S3 client from static credentials and an explicit endpoint.
    /// `force_path_style` toggles path-style bucket addressing.
    pub(crate) async fn new(
        access_key: String,
        secret_key: String,
        endpoint: String,
        region: String,
        force_path_style: bool,
    ) -> Self {
        let credentials = Credentials::new(access_key, secret_key, None, None, "static");
        let shared_config = aws_config::defaults(BehaviorVersion::latest())
            .credentials_provider(SharedCredentialsProvider::new(credentials))
            .region(Region::new(region))
            .endpoint_url(endpoint)
            .load()
            .await;
        let s3_config = Builder::from(&shared_config)
            .force_path_style(force_path_style)
            .build();
        Self {
            client: Client::from_conf(s3_config),
        }
    }
}
impl S3Client for S3Api {
    /// List the names of all buckets visible to the configured credentials.
    ///
    /// BUGFIX: drains the entire paginator; the previous revision only read
    /// the first page and silently dropped the rest.
    async fn list_buckets(self) -> Result<Vec<String>, anyhow::Error> {
        let mut pages = self.client.list_buckets().into_paginator().send();
        let mut result: Vec<String> = vec![];
        while let Some(page) = pages.next().await {
            let page = page?;
            result.extend(
                page.buckets()
                    .iter()
                    .filter_map(|bucket| bucket.name().map(str::to_string)),
            );
        }
        Ok(result)
    }
    /// Create a bucket with the given name.
    async fn create_bucket(self, bucket_name: String) -> Result<(), anyhow::Error> {
        self.client.create_bucket().bucket(bucket_name).send().await?;
        Ok(())
    }
    /// Count the objects in a bucket by paging through ListObjectsV2.
    async fn count_objects(self, bucket_name: String) -> Result<u64, anyhow::Error> {
        let mut total_count = 0u64;
        let mut continuation_token = None;
        loop {
            let resp = self
                .client
                .list_objects_v2()
                .bucket(bucket_name.clone())
                .set_continuation_token(continuation_token.clone())
                .send()
                .await?;
            if let Some(contents) = resp.contents {
                total_count += contents.len() as u64;
            }
            // BUGFIX: is_truncated may be absent on some S3-compatible servers;
            // treat a missing flag as "no more pages" instead of panicking.
            if resp.is_truncated.unwrap_or(false) {
                continuation_token = resp.next_continuation_token;
            } else {
                break;
            }
        }
        Ok(total_count)
    }
    /// Sum the sizes (bytes) of all objects in a bucket.
    async fn count_size(self, bucket_name: String) -> Result<u64, anyhow::Error> {
        let mut total_size = 0u64;
        let mut continuation_token = None;
        loop {
            let resp = self
                .client
                .list_objects_v2()
                .bucket(bucket_name.clone())
                .set_continuation_token(continuation_token.clone())
                .send()
                .await?;
            if let Some(contents) = resp.contents {
                for obj in contents {
                    // BUGFIX: size is optional in the response; count missing
                    // sizes as 0 instead of panicking. Clamp before the cast
                    // since size is an i64 in the SDK model.
                    total_size += obj.size.unwrap_or(0).max(0) as u64;
                }
            }
            if resp.is_truncated.unwrap_or(false) {
                continuation_token = resp.next_continuation_token;
            } else {
                break;
            }
        }
        Ok(total_size)
    }
    /// Delete a bucket, first deleting its contents page by page.
    async fn delete_bucket(self, bucket_name: String) -> Result<(), anyhow::Error> {
        let mut continuation_token = None;
        loop {
            // List a page of objects still in the bucket.
            let resp = self
                .client
                .list_objects_v2()
                .bucket(bucket_name.clone())
                .set_continuation_token(continuation_token.clone())
                .send()
                .await?;
            let objects: Vec<ObjectIdentifier> = resp
                .contents
                .unwrap_or_default()
                .into_iter()
                // BUGFIX: skip objects without a key instead of panicking.
                .filter_map(|obj| obj.key)
                .map(|key| ObjectIdentifier::builder().key(key).build())
                .collect::<Result<Vec<ObjectIdentifier>, _>>()?;
            if !objects.is_empty() {
                // Delete the listed objects in one batch request.
                self.client
                    .delete_objects()
                    .bucket(bucket_name.clone())
                    .delete(Delete::builder().set_objects(Some(objects)).build()?)
                    .send()
                    .await?;
            }
            if resp.is_truncated.unwrap_or(false) {
                continuation_token = resp.next_continuation_token;
            } else {
                break;
            }
        }
        // The bucket is empty now; remove the bucket itself.
        self.client
            .delete_bucket()
            .bucket(bucket_name.clone())
            .send()
            .await?;
        Ok(())
    }
}