From 9f5d105f5457104f9624cff08fa3befeddbb0bc0 Mon Sep 17 00:00:00 2001 From: Nikolai Rodionov Date: Wed, 11 Mar 2026 13:45:06 +0100 Subject: [PATCH] WIP: Adding first controller Signed-off-by: Nikolai Rodionov --- operator/manifests/s3_bucket.yaml | 10 + operator/manifests/s3_instance.yaml | 5 +- operator/src/api/v1beta1/mod.rs | 1 + operator/src/api/v1beta1/s3_bucket.rs | 49 +++ operator/src/api/v1beta1/s3_instance.rs | 6 +- operator/src/controller.rs | 22 +- operator/src/controllers/mod.rs | 1 + operator/src/controllers/s3_bucket.rs | 422 ++++++++++++++++++++++++ operator/src/controllers/s3_instance.rs | 108 ++++-- operator/src/crdgen.rs | 3 +- operator/src/s3/mod.rs | 1 + operator/src/s3/s3.rs | 12 +- 12 files changed, 587 insertions(+), 53 deletions(-) create mode 100644 operator/manifests/s3_bucket.yaml create mode 100644 operator/src/api/v1beta1/s3_bucket.rs create mode 100644 operator/src/controllers/s3_bucket.rs diff --git a/operator/manifests/s3_bucket.yaml b/operator/manifests/s3_bucket.yaml new file mode 100644 index 0000000..e688b6a --- /dev/null +++ b/operator/manifests/s3_bucket.yaml @@ -0,0 +1,10 @@ +--- +apiVersion: s3.badhouseplants.net/v1beta1 +kind: S3Bucket +metadata: + name: test + namespace: default +spec: + instance: test + cleanup: false + ownConfigmap: false diff --git a/operator/manifests/s3_instance.yaml b/operator/manifests/s3_instance.yaml index c8ae2a1..1dc7e74 100644 --- a/operator/manifests/s3_instance.yaml +++ b/operator/manifests/s3_instance.yaml @@ -13,8 +13,9 @@ kind: S3Instance metadata: name: test spec: - endpoint: rustfs.badhouseplants.net:443 - region: us-east1 + endpoint: https://rustfs.badhouseplants.net + forcePathStyle: true + region: us-east-1 credentialsSecret: namespace: default name: test diff --git a/operator/src/api/v1beta1/mod.rs b/operator/src/api/v1beta1/mod.rs index bc18982..0a4836d 100644 --- a/operator/src/api/v1beta1/mod.rs +++ b/operator/src/api/v1beta1/mod.rs @@ -1 +1,2 @@ +pub mod s3_bucket; pub mod 
/// The status object of `S3Bucket`
+ #[serde(default)] + pub ready: bool, + pub conditions: Vec, + #[serde(default)] + pub size: Option, + #[serde(default)] + pub objects_buckets: Option, + #[serde(default)] + pub endpoint: Option, + #[serde(default)] + pub region: Option, +} diff --git a/operator/src/api/v1beta1/s3_instance.rs b/operator/src/api/v1beta1/s3_instance.rs index 97d33f9..25a8bc8 100644 --- a/operator/src/api/v1beta1/s3_instance.rs +++ b/operator/src/api/v1beta1/s3_instance.rs @@ -14,9 +14,9 @@ use schemars::JsonSchema; status = "S3InstanceStatus", printcolumn = r#"{"name":"Endpoint","type":"string","description":"The URL of the instance","jsonPath":".spec.endpoint"}"#, printcolumn = r#"{"name":"Region","type":"string","description":"The region of the instance","jsonPath":".spec.region"}"#, - printcolumn = r#"{"name":"Force Path Style","type":"boolean","description":"Is forcing path style","jsonPath":".spec.forcePathStyle"}"#, - printcolumn = r#"{"name":"Status","type":"boolean","description":"Is the S3Instance ready","jsonPath":".status.ready"}"#, - printcolumn = r#"{"name":"Total Buckets","type":"number","description":"How many buckets are there on the instance","jsonPath":".status.total_buckets"}"# + printcolumn = r#"{"name":"Path Style","type":"boolean","description":"Is forcing path style","jsonPath":".spec.forcePathStyle"}"#, + printcolumn = r#"{"name":"Total Buckets","type":"number","description":"How many buckets are there on the instance","jsonPath":".status.total_buckets"}"#, + printcolumn = r#"{"name":"Status","type":"boolean","description":"Is the S3Instance ready","jsonPath":".status.ready"}"# )] #[serde(rename_all = "camelCase")] pub struct S3InstanceSpec { diff --git a/operator/src/controller.rs b/operator/src/controller.rs index d8cdfad..8270728 100644 --- a/operator/src/controller.rs +++ b/operator/src/controller.rs @@ -1,19 +1,12 @@ mod conditions; mod controllers; mod s3; -use std::sync::Arc; -use self::controllers::s3_instance::{State, error_policy, reconcile, run}; 
-use actix_web::web::Data; +use crate::controllers::{s3_bucket, s3_instance}; + use actix_web::{App, HttpRequest, HttpResponse, HttpServer, Responder, get, middleware}; -use api::api::v1beta1::s3_instance::S3Instance; use clap::Parser; -use futures::StreamExt; -use kube::api::ListParams; -use kube::runtime::Controller; -use kube::runtime::events::{Recorder, Reporter}; -use kube::runtime::watcher::Config; -use kube::{Api, Client}; +use kube::Client; use tracing_subscriber::EnvFilter; /// Simple program to greet a person @@ -42,18 +35,15 @@ async fn health(_: HttpRequest) -> impl Responder { #[tokio::main] async fn main() -> anyhow::Result<()> { - // - // Initiatilize Kubernetes controller state - // tracing_subscriber::fmt() .json() .with_env_filter(EnvFilter::from_default_env()) .init(); - let state = State::default(); let client = Client::try_default() .await .expect("failed to create kube Client"); - let dbin_controller = run(client, state); + let s3in_controller = s3_instance::run(client.clone()); + let s3bucket_controller = s3_bucket::run(client.clone()); // Start web server let server = HttpServer::new(move || { App::new() @@ -64,6 +54,6 @@ async fn main() -> anyhow::Result<()> { .shutdown_timeout(5); // Both runtimes implements graceful shutdown, so poll until both are done - tokio::join!(dbin_controller, server.run()).1?; + tokio::join!(s3in_controller, s3bucket_controller, server.run()).2?; Ok(()) } diff --git a/operator/src/controllers/mod.rs b/operator/src/controllers/mod.rs index 3941471..910ac74 100644 --- a/operator/src/controllers/mod.rs +++ b/operator/src/controllers/mod.rs @@ -1 +1,2 @@ +pub(crate) mod s3_bucket; pub(crate) mod s3_instance; diff --git a/operator/src/controllers/s3_bucket.rs b/operator/src/controllers/s3_bucket.rs new file mode 100644 index 0000000..dada8cd --- /dev/null +++ b/operator/src/controllers/s3_bucket.rs @@ -0,0 +1,422 @@ +use crate::conditions::set_condition; +use crate::controllers::s3_instance; +use crate::s3::S3Client; 
    info!("Starting reconciling");
    info!("Getting the S3Instance");
    // Getting data from the secret to initialize the client
status.objects_buckets = None; + status.endpoint = Some(s3in.clone().spec.endpoint); + status.size = None; + status.region = Some(s3in.spec.region); + + s3bucket.status = Some(status); + + + info!("Updating status of the s3bucket resource"); + match s3bucket_api + .replace_status(&s3bucket.name_any(), &PostParams::default(), &s3bucket) + .await + { + Ok(_) => { + return Ok(Action::requeue(Duration::from_secs(120))); + } + Err(err) => { + error!("{}", err); + return Err(S3BucketError::KubeError(err)); + } + }; +} + +// Bootstrap the object by adding a default status to it +async fn init_object(mut obj: S3Bucket, api: Api) -> Result { + let mut conditions = set_condition( + vec![], + obj.metadata.clone(), + TYPE_INSTANCE_CONNECTED, + "Unknown".to_string(), + "Reconciling".to_string(), + "Reconciliation started".to_string(), + ); + conditions = set_condition( + conditions, + obj.metadata.clone(), + TYPE_BUCKET_READY, + "Unknown".to_string(), + "Reconciling".to_string(), + "Reconciliation started".to_string(), + ); + conditions = set_condition( + conditions, + obj.metadata.clone(), + TYPE_CONFIGMAP_READY, + "Unknown".to_string(), + "Reconciling".to_string(), + "Reconciliation started".to_string(), + ); + obj.status = Some(S3BucketStatus { + conditions, + ..S3BucketStatus::default() + }); + match api + .replace_status(obj.clone().name_any().as_str(), &Default::default(), &obj) + .await + { + Ok(_) => Ok(Action::await_change()), + Err(err) => { + error!("{}", err); + Err(S3BucketError::KubeError(err)) + } + } +} + +// Get the configmap with the bucket data +async fn get_configmap(api: Api, name: &str) -> Result { + info!("Getting a configmap: {}", name); + match api.get(name).await { + Ok(cm) => Ok(cm), + Err(err) => Err(err), + } +} + +// Create ConfigMap +async fn create_configmap(api: Api, cm: ConfigMap) -> Result { + match api.create(&PostParams::default(), &cm).await { + Ok(cm) => get_configmap(api, &cm.name_any()).await, + Err(err) => Err(err), + } +} + +async fn 
label_configmap( + api: Api, + s3bucket_name: &str, + mut cm: ConfigMap, +) -> Result { + let mut labels = match &cm.clone().metadata.labels { + Some(labels) => labels.clone(), + None => { + let map: BTreeMap = BTreeMap::new(); + map + } + }; + labels.insert(CONFIGMAP_LABEL.to_string(), s3bucket_name.to_string()); + cm.metadata.labels = Some(labels); + api.replace(&cm.name_any(), &PostParams::default(), &cm) + .await?; + + let cm = match api.get(&cm.name_any()).await { + Ok(cm) => cm, + Err(err) => { + return Err(err); + } + }; + Ok(cm) +} + +async fn own_configmap( + api: Api, + s3bucket: S3Bucket, + mut cm: ConfigMap, +) -> Result { + let mut owner_references = match &cm.clone().metadata.owner_references { + Some(owner_references) => owner_references.clone(), + None => { + let owner_references: Vec = vec![]; + owner_references + } + }; + + if owner_references.iter().find(|or| or.uid == s3bucket.uid().unwrap()).is_some() { + return Ok(cm); + } + + let new_owner_reference = OwnerReference{ + api_version: S3Bucket::api_version(&()).into(), + kind: S3Bucket::kind(&()).into(), + name: s3bucket.name_any(), + uid: s3bucket.uid().unwrap(), + ..Default::default() + }; + + owner_references.push(new_owner_reference); + cm.metadata.owner_references = Some(owner_references); + api.replace(&cm.name_any(), &PostParams::default(), &cm) + .await?; + + let cm = match api.get(&cm.name_any()).await { + Ok(cm) => cm, + Err(err) => { + return Err(err); + } + }; + Ok(cm) +} + +async fn ensure_data_configmap( + api: Api, + s3in: S3Instance, + mut cm: ConfigMap, +) -> Result { + let mut data = match &cm.clone().data { + Some(data) => data.clone(), + None => { + let map: BTreeMap = BTreeMap::new(); + map + } + }; + + data.insert(AWS_REGION.to_string(), s3in.spec.region); + data.insert(AWS_ENDPOINT_URL.to_string(), s3in.spec.endpoint); + + + cm.data = Some(data); + api.replace(&cm.name_any(), &PostParams::default(), &cm) + .await?; + + match api.get(&cm.name_any()).await { + Ok(cm) => 
Ok(cm), + Err(err) => Err(err), + } +} + +pub(crate) fn error_policy(_: Arc, _: &S3BucketError, _: Arc) -> Action { + Action::requeue(Duration::from_secs(5 * 60)) +} + +#[instrument(skip(client), fields(trace_id))] +pub async fn run(client: Client) { + let s3buckets = Api::::all(client.clone()); + if let Err(err) = s3buckets.list(&ListParams::default().limit(1)).await { + error!("{}", err); + std::process::exit(1); + } + let recorder = Recorder::new(client.clone(), "s3bucket-controller".into()); + let context = Context { client, recorder }; + Controller::new(s3buckets, Config::default().any_semantic()) + .shutdown_on_signal() + .run(reconcile, error_policy, Arc::new(context)) + .filter_map(|x| async move { std::result::Result::ok(x) }) + .for_each(|_| futures::future::ready(())) + .await; +} +// Context for our reconciler +#[derive(Clone)] +pub(crate) struct Context { + /// Kubernetes client + pub client: Client, + /// Event recorder + pub recorder: Recorder, +} + +#[derive(Error, Debug)] +pub enum S3BucketError { + #[error("SerializationError: {0}")] + SerializationError(#[source] serde_json::Error), + + #[error("Kube Error: {0}")] + KubeError(#[source] kube::Error), + + #[error("Finalizer Error: {0}")] + // NB: awkward type because finalizer::Error embeds the reconciler error (which is this) + // so boxing this error to break cycles + FinalizerError(#[source] Box>), + + #[error("IllegalS3Bucket")] + IllegalS3Bucket, + + #[error("SecretIsAlreadyLabeled")] + SecretIsAlreadyLabeled, + + #[error("Invalid Secret: {0}")] + InvalidSecret(#[source] anyhow::Error), +} + +pub type S3BucketResult = std::result::Result; diff --git a/operator/src/controllers/s3_instance.rs b/operator/src/controllers/s3_instance.rs index bee86ca..99003af 100644 --- a/operator/src/controllers/s3_instance.rs +++ b/operator/src/controllers/s3_instance.rs @@ -1,4 +1,4 @@ -use crate::conditions::{is_condition_unknown, set_condition}; +use crate::conditions::{is_condition_true, is_condition_unknown, 
set_condition}; use crate::s3::S3Client; use crate::s3::s3::S3Api; use api::api::v1beta1::s3_instance::{S3Instance, S3InstanceStatus}; @@ -18,13 +18,15 @@ use tracing::*; const TYPE_CONNECTED: &str = "Connected"; const TYPE_SECRET_LABELED: &str = "SecretLabeled"; +const FIN_SECRET_LABEL: &str = "s3.badhouseplants.net/s3-label"; const SECRET_LABEL: &str = "s3.badhouseplants.net/s3-instance"; -const ACCESS_KEY: &str = "ACCESS_KEY"; -const SECRET_KEY: &str = "SECRET_KEY"; +pub(crate) const ACCESS_KEY: &str = "ACCESS_KEY"; +pub(crate) const SECRET_KEY: &str = "SECRET_KEY"; #[instrument(skip(ctx, obj), fields(trace_id))] pub(crate) async fn reconcile(obj: Arc, ctx: Arc) -> S3InstanceResult { info!("Staring reconciling"); + info!("Getting the S3Instance resource"); let s3_api: Api = Api::all(ctx.client.clone()); let mut s3in = match s3_api.get(obj.name_any().as_str()).await { Ok(res) => res, @@ -38,10 +40,8 @@ pub(crate) async fn reconcile(obj: Arc, ctx: Arc) -> S3Inst } }; - if s3in.metadata.deletion_timestamp.is_some() { - info!("Object is marked for deletion"); - return Ok(Action::await_change()); - } + let secret_ns = s3in.clone().spec.credentials_secret.namespace; + let secret_api: Api = Api::namespaced(ctx.client.clone(), &secret_ns); let mut status = match s3in.clone().status { None => { @@ -51,7 +51,7 @@ pub(crate) async fn reconcile(obj: Arc, ctx: Arc) -> S3Inst Some(status) => status, }; - let secret = match get_secret(ctx.clone(), s3in.clone()).await { + let secret = match get_secret(secret_api.clone(), s3in.clone()).await { Ok(secret) => secret, Err(err) => { error!("{}", err); @@ -71,7 +71,58 @@ pub(crate) async fn reconcile(obj: Arc, ctx: Arc) -> S3Inst return Err(S3InstanceError::KubeError(err)); } }; + + if s3in.metadata.deletion_timestamp.is_some() { + info!("Object is marked for deletion"); + if let Some(mut finalizers) = s3in.clone().metadata.finalizers { + if finalizers.contains(&FIN_SECRET_LABEL.to_string()) { + match unlabel_secret(ctx.clone(), 
s3in.clone(), secret).await { + Ok(_) => { + if let Some(index) = finalizers.iter().position(|x| *x == FIN_SECRET_LABEL.to_string()) { + finalizers.remove(index); + }; + }, + Err(err) => { + error!("{}", err); + return Err(S3InstanceError::KubeError(err)); + }, + }; + } + s3in.metadata.finalizers = Some(finalizers); + }; + match s3_api.replace(&s3in.name_any(), &PostParams::default(), &s3in).await { + Ok(_) => { + return Ok(Action::await_change()); + }, + Err(err) => { + error!("{}", err); + return Err(S3InstanceError::KubeError(err)) + }, + } + } + if is_condition_true(status.clone().conditions, TYPE_SECRET_LABELED) { + let mut current_finalizers = match s3in.clone().metadata.finalizers { + Some(finalizers) => finalizers, + None => vec![], + }; + + if !current_finalizers.contains(&FIN_SECRET_LABEL.to_string()) { + info!("Adding a finalizer"); + current_finalizers.push(FIN_SECRET_LABEL.to_string()); + + s3in.metadata.finalizers = Some(current_finalizers); + match s3_api.replace(&s3in.name_any(), &PostParams::default(), &s3in).await { + Ok(_) => { + return Ok(Action::await_change()); + }, + Err(err) => { + error!("{}", err); + return Err(S3InstanceError::KubeError(err)) + }, + } + } + } if is_condition_unknown(status.clone().conditions, TYPE_SECRET_LABELED) { if is_secret_labeled(secret.clone()) { if is_secret_labeled_by_another_obj(s3in.clone(), secret.clone()) { @@ -88,7 +139,6 @@ pub(crate) async fn reconcile(obj: Arc, ctx: Arc) -> S3Inst "Reconciled".to_string(), "Secret is already labeled".to_string(), ); - s3in.status = Some(status.clone()); } } else { info!("Labeling the secret"); @@ -105,6 +155,7 @@ pub(crate) async fn reconcile(obj: Arc, ctx: Arc) -> S3Inst "Secret is labeled".to_string(), ); }; + s3in.status = Some(status); match s3_api .replace_status(&s3in.name_any(), &PostParams::default(), &s3in) .await { @@ -190,7 +241,6 @@ pub(crate) async fn reconcile(obj: Arc, ctx: Arc) -> S3Inst } }; - info!("{:?}", buckets); status.ready = true; status.buckets = 
Some(buckets.clone()); status.total_buckets = Some(buckets.len()); @@ -251,9 +301,7 @@ async fn init_object(mut obj: S3Instance, api: Api) -> Result, obj: S3Instance) -> Result { - let secret_ns = obj.clone().spec.credentials_secret.namespace; - let api: Api = Api::namespaced(ctx.client.clone(), &secret_ns); +pub(crate) async fn get_secret(api: Api, obj: S3Instance) -> Result { let secret = match api.get(&obj.spec.credentials_secret.name).await { Ok(secret) => secret, @@ -265,6 +313,21 @@ async fn get_secret(ctx: Arc, obj: S3Instance) -> Result, + obj: S3Instance, + mut secret: Secret, +) -> Result<(), kube::Error> { + let secret_ns = obj.clone().spec.credentials_secret.namespace; + let api: Api = Api::namespaced(ctx.client.clone(), &secret_ns); + if let Some(mut labels) = secret.clone().metadata.labels { + labels.remove(&SECRET_LABEL.to_string()); + secret.metadata.labels = Some(labels); + api.replace(&secret.name_any(), &PostParams::default(), &secret) + .await?; + } + Ok(()) +} async fn label_secret( ctx: Arc, obj: S3Instance, @@ -333,32 +396,23 @@ pub(crate) fn error_policy(_: Arc, _: &S3InstanceError, _: Arc::all(client.clone()); if let Err(err) = s3instances.list(&ListParams::default().limit(1)).await { error!("{}", err); std::process::exit(1); } + let recorder = Recorder::new(client.clone(), "s3instance-controller".into()); + let context = Context{ client, recorder }; Controller::new(s3instances, Config::default().any_semantic()) .shutdown_on_signal() - .run(reconcile, error_policy, state.to_context(client).await) + .run(reconcile, error_policy, Arc::new(context)) .filter_map(|x| async move { std::result::Result::ok(x) }) .for_each(|_| futures::future::ready(())) .await; } -/// State shared between the controller and the web server -#[derive(Clone, Default)] -pub(crate) struct State {} -impl State { - pub async fn to_context(&self, client: Client) -> Arc { - Arc::new(Context { - client: client.clone(), - recorder: Recorder::new(client, 
    async fn create_buckets(self, bucket_name: String) -> Result<(), Error>;
self.client.create_bucket().bucket(bucket_name).send().await { + Ok(_) => Ok(()), + Err(err) => Err(err.into()), + } + } }