From 33c18ff340597e90342d68a6778081f79280eb1b Mon Sep 17 00:00:00 2001 From: Nikolai Rodionov Date: Sat, 31 May 2025 23:38:09 +0200 Subject: [PATCH] Trying something that will probably never happen Signed-off-by: Nikolai Rodionov --- src/api/v1alpha1/configsets_api.rs | 1 + src/controllers/configsets_controller.rs | 188 +++++++++++++++++++++-- src/helpers/manifests.rs | 10 ++ src/main.rs | 2 +- tests/manifests/example.yaml | 4 +- 5 files changed, 193 insertions(+), 12 deletions(-) diff --git a/src/api/v1alpha1/configsets_api.rs b/src/api/v1alpha1/configsets_api.rs index b414c13..94bd639 100644 --- a/src/api/v1alpha1/configsets_api.rs +++ b/src/api/v1alpha1/configsets_api.rs @@ -24,6 +24,7 @@ use thiserror::Error; version = "v1alpha1", namespaced )] + #[kube(status = "ConfigSetStatus", shortname = "confset")] pub struct ConfigSetSpec { pub targets: Vec, diff --git a/src/controllers/configsets_controller.rs b/src/controllers/configsets_controller.rs index db76274..5c94938 100644 --- a/src/controllers/configsets_controller.rs +++ b/src/controllers/configsets_controller.rs @@ -2,19 +2,23 @@ use crate::api::v1alpha1::configsets_api::{ ConfigSet, Input, InputWithName, TargetWithName, Templates, }; use core::fmt; -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt, stream}; use handlebars::Handlebars; -use k8s_openapi::api::core::v1::{ConfigMap, Secret}; +use k8s_openapi::api::core::v1::{ConfigMap, Namespace, Secret}; use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference; use k8s_openapi::{ByteString, NamespaceResourceScope}; -use kube::api::{ListParams, PostParams}; -use kube::core::{Object, ObjectMeta}; +use kube::api::{GroupVersionKind, ListParams, PostParams, WatchEvent}; +use kube::core::{DynamicResourceScope, Object, ObjectMeta}; use kube::error::ErrorResponse; use kube::runtime::controller::Action; use kube::runtime::finalizer::Event as Finalizer; -use kube::runtime::watcher::Config; use kube::runtime::{finalizer, Controller}; +use kube::runtime::{ + watcher::{self, Config}, + WatchStreamExt, +}; use kube::{Api, Client, CustomResource}; +use kube_client::api::WatchParams; use kube_client::core::DynamicObject; use kube_client::{Resource, ResourceExt}; use log::*; @@ -94,13 +98,18 @@ pub async fn setup() { let client = Client::try_default() .await .expect("failed to create kube Client"); - let docs = Api::::all(client.clone()); - if let Err(e) = docs.list(&ListParams::default().limit(1)).await { + let confsets = Api::::all(client.clone()); + if let Err(e) = confsets.list(&ListParams::default().limit(1)).await { error!("{}", e); std::process::exit(1); } - let ctx = Arc::new(Context { client }); - Controller::new(docs, Config::default().any_semantic()) + + tokio::spawn(setup_secret_watcher(client.clone())); + + let ctx = Arc::new(Context { + client: client.clone(), + }); + Controller::new(confsets, Config::default().any_semantic()) .shutdown_on_signal() .run(reconcile, error_policy, ctx) .filter_map(|x| async move { std::result::Result::ok(x) }) @@ -108,6 +117,107 @@ pub async fn setup() { .await; } +async fn setup_secret_watcher(client: Client) { + let namespace_api: Api = Api::all(client.clone()); + let stream = match namespace_api.watch(&WatchParams::default(), "0").await { + Ok(status) => { + let mut status_box = status.boxed(); + while let Some(status) = status_box.try_next().await.unwrap() { + match status { + WatchEvent::Added(ns) => { + info!("namespace is being watched {}", ns.name_any()); + let sec_api = get_secret_api(client.clone(), ns.name_any()); + let cm_api = get_configmap_api(client.clone(), ns.name_any()); + tokio::spawn( async move { + let stream = match sec_api.watch(&WatchParams::default(), "0").await { + Ok(status) => { + let mut status_box = status.boxed(); + while let Some(status) = status_box.try_next().await.unwrap() { + match status { + WatchEvent::Modified(secret) => { + if is_shu_object(secret.metadata.clone()) { + let config_set_name = secret.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap(); + info!( + "secret {} was updated, triggering configset reconciliation", + secret.metadata.clone().name.unwrap(), + ); + watcher_trigger_reconcile(secret.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap().to_string(), secret.metadata.namespace.unwrap()).await; + } + } + _ => {}, + } + } + } + Err(err) => { + error!("couldn't start secrets watcher"); + }, + }; + }); + tokio::spawn( async move { + let stream = match cm_api.watch(&WatchParams::default(), "0").await { + Ok(status) => { + let mut status_box = status.boxed(); + while let Some(status) = status_box.try_next().await.unwrap() { + match status { + WatchEvent::Modified(configmap) => { + if is_shu_object(configmap.metadata.clone()) { + let config_set_name = configmap.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap(); + info!( + "configmap {} was updated, triggering configset reconciliation", + configmap.metadata.clone().name.unwrap(), + ); + watcher_trigger_reconcile(configmap.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap().to_string(), configmap.metadata.namespace.unwrap()).await; + } + }, + _ => {}, + } + } + } + Err(err) => { + error!("couldn't start secrets watcher"); + }, + }; + }); + + } + _ => {}, +// WatchEvent::Deleted(_) => todo!(), +// WatchEvent::Bookmark(_) => todo!(), +// WatchEvent::Error(_) => todo!(), + } + }; + }, + Err(err) => { + error!("couldn't watch namespaces, watcher is not enabled"); + }, + }; +} + +fn is_shu_object(metadata: ObjectMeta) -> bool { + match metadata.annotations { + Some(annotations) => match annotations.get(WATCHED_BY_SHU) { + Some(_) => true, + None => false, + }, + None => false, + } +} + +async fn watcher_trigger_reconcile(config_set_name: String, namespace: String) { + let client = Client::try_default().await.expect("failed to create kube Client"); + let api: Api = Api::namespaced(client.clone(), &namespace); + let config_set = match api.get(&config_set_name).await { + Ok(res) => { + if let Err(err) = reconcile(Arc::new(res), Arc::new(Context {client })).await { + error!("an error occured during reconciliation: {}", err); + } + }, + Err(err) => { + error!("couldn't get configset: {}", err); + }, + }; +} + fn error_policy(doc: Arc, error: &Error, ctx: Arc) -> Action { Action::requeue(Duration::from_secs(5 * 60)) } @@ -122,8 +232,10 @@ fn get_configmap_api(client: Client, namespace: String) -> Api { async fn gather_inputs( client: Client, + config_set_name: String, namespace: String, inputs: Vec, + is_removed: bool, ) -> Result> { let mut result: HashMap = HashMap::new(); for i in inputs { @@ -148,8 +260,19 @@ async fn gather_inputs( )))) } }; + // Take dynamic resource identifiers: + + // Use the full resource info to create an Api with the ApiResource as its DynamicType + let api: Api = Api::namespaced(client.clone(), &namespace); + let a: Api = api.try_into()?; + + match is_removed { + true => unset_annotations(, i.from.key, config_set_name).await?, + false => set_annotations(api, i.from.key, config_set_name).await?, + }; value.to_string() } + Err(err) => { error!("{err}"); return Err(Error::KubeError(err)); @@ -187,6 +310,49 @@ async fn gather_inputs( Ok(result) } +async fn set_annotations(api: Api, object_name: String, config_set_name: String) -> Result<()> { + let object = match api.get_opt(&object_name).await { + Ok(object) => match object { + Some(object) => object, + None => return Ok(()), + }, + Err(err) => return Err(Error::KubeError(err)), + }; + + let mut annotations = match object.clone().metadata.annotations { + Some(annotations) => annotations, + None => BTreeMap::new(), + }; + + annotations.insert(WATCHED_BY_SHU.to_string(), config_set_name).unwrap(); + if let Err(err) = api.replace(&object_name, &PostParams::default(), &object).await { + return Err(Error::KubeError(err)); + } + Ok(()) +} + +async fn unset_annotations(api: Api, object_name: String, config_set_name: String) -> Result<()> { + let object = match api.get_opt(&object_name).await { + Ok(object) => match object { + Some(object) => object, + None => return Ok(()), + }, + Err(err) => return Err(Error::KubeError(err)), + }; + + let mut annotations = match object.clone().metadata.annotations { + Some(annotations) => annotations, + None => BTreeMap::new(), + }; + + annotations.insert(WATCHED_BY_SHU.to_string(), config_set_name).unwrap(); + if let Err(err) = api.replace(&object_name, &PostParams::default(), &object).await { + return Err(Error::KubeError(err)); + } + Ok(()) +} + + async fn gather_targets( client: Client, namespace: String, @@ -399,8 +565,10 @@ impl ConfigSet { let inputs: HashMap = gather_inputs( ctx.client.clone(), + self.name_any(), self.metadata.namespace.clone().unwrap(), self.spec.inputs.clone(), + false, ) .await?; @@ -470,8 +638,10 @@ impl ConfigSet { async fn cleanup(&self, ctx: Arc) -> Result { let inputs: HashMap = gather_inputs( ctx.client.clone(), + self.name_any(), self.metadata.namespace.clone().unwrap(), self.spec.inputs.clone(), + true, ) .await?; diff --git a/src/helpers/manifests.rs b/src/helpers/manifests.rs index d8644be..1c97953 100644 --- a/src/helpers/manifests.rs +++ b/src/helpers/manifests.rs @@ -79,6 +79,16 @@ fn prepare_cluster_role(namespace: String) -> ClusterRole { ], ..Default::default() }, + PolicyRule { + api_groups: Some(vec!["".to_string()]), + resources: Some(vec!["namespaces".to_string()]), + verbs: vec![ + "get".to_string(), + "list".to_string(), + "watch".to_string() + ], + ..Default::default() + }, ]; ClusterRole { diff --git a/src/main.rs b/src/main.rs index a8cf5d9..b547e14 100644 --- a/src/main.rs +++ b/src/main.rs @@ -52,6 +52,6 @@ async fn main() -> anyhow::Result<()> { }; } } - + Ok(()) } diff --git a/tests/manifests/example.yaml b/tests/manifests/example.yaml index d347abe..bf04a94 100644 --- a/tests/manifests/example.yaml +++ b/tests/manifests/example.yaml @@ -39,8 +39,8 @@ spec: template: "{{ PROTO }}:{{ USERNAME }}:{{ PASSWORD }}/{{ DATABASE }}" target: app-connection-string - name: EXISTING - template: TEST - target: existing-target + template: "{{ PROTO }}" + target: app-connection-string - name: IS_POSTGRES template: | {{#if (eq PROTO "postgresql") }}