Trying something that will probably never happen
Signed-off-by: Nikolai Rodionov <allanger@badhouseplants.net>
This commit is contained in:
		@@ -24,6 +24,7 @@ use thiserror::Error;
 | 
			
		||||
    version = "v1alpha1",
 | 
			
		||||
    namespaced
 | 
			
		||||
)]
 | 
			
		||||
 | 
			
		||||
#[kube(status = "ConfigSetStatus", shortname = "confset")]
 | 
			
		||||
pub struct ConfigSetSpec {
 | 
			
		||||
    pub targets: Vec<TargetWithName>,
 | 
			
		||||
 
 | 
			
		||||
@@ -2,19 +2,23 @@ use crate::api::v1alpha1::configsets_api::{
 | 
			
		||||
    ConfigSet, Input, InputWithName, TargetWithName, Templates,
 | 
			
		||||
};
 | 
			
		||||
use core::fmt;
 | 
			
		||||
use futures::StreamExt;
 | 
			
		||||
use futures::{StreamExt, TryStreamExt, stream};
 | 
			
		||||
use handlebars::Handlebars;
 | 
			
		||||
use k8s_openapi::api::core::v1::{ConfigMap, Secret};
 | 
			
		||||
use k8s_openapi::api::core::v1::{ConfigMap, Namespace, Secret};
 | 
			
		||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
 | 
			
		||||
use k8s_openapi::{ByteString, NamespaceResourceScope};
 | 
			
		||||
use kube::api::{ListParams, PostParams};
 | 
			
		||||
use kube::core::{Object, ObjectMeta};
 | 
			
		||||
use kube::api::{GroupVersionKind, ListParams, PostParams, WatchEvent};
 | 
			
		||||
use kube::core::{DynamicResourceScope, Object, ObjectMeta};
 | 
			
		||||
use kube::error::ErrorResponse;
 | 
			
		||||
use kube::runtime::controller::Action;
 | 
			
		||||
use kube::runtime::finalizer::Event as Finalizer;
 | 
			
		||||
use kube::runtime::watcher::Config;
 | 
			
		||||
use kube::runtime::{finalizer, Controller};
 | 
			
		||||
use kube::runtime::{
 | 
			
		||||
    watcher::{self, Config},
 | 
			
		||||
    WatchStreamExt,
 | 
			
		||||
};
 | 
			
		||||
use kube::{Api, Client, CustomResource};
 | 
			
		||||
use kube_client::api::WatchParams;
 | 
			
		||||
use kube_client::core::DynamicObject;
 | 
			
		||||
use kube_client::{Resource, ResourceExt};
 | 
			
		||||
use log::*;
 | 
			
		||||
@@ -94,13 +98,18 @@ pub async fn setup() {
 | 
			
		||||
    let client = Client::try_default()
 | 
			
		||||
        .await
 | 
			
		||||
        .expect("failed to create kube Client");
 | 
			
		||||
    let docs = Api::<ConfigSet>::all(client.clone());
 | 
			
		||||
    if let Err(e) = docs.list(&ListParams::default().limit(1)).await {
 | 
			
		||||
    let confsets = Api::<ConfigSet>::all(client.clone());
 | 
			
		||||
    if let Err(e) = confsets.list(&ListParams::default().limit(1)).await {
 | 
			
		||||
        error!("{}", e);
 | 
			
		||||
        std::process::exit(1);
 | 
			
		||||
    }
 | 
			
		||||
    let ctx = Arc::new(Context { client });
 | 
			
		||||
    Controller::new(docs, Config::default().any_semantic())
 | 
			
		||||
 | 
			
		||||
    tokio::spawn(setup_secret_watcher(client.clone()));
 | 
			
		||||
 | 
			
		||||
    let ctx = Arc::new(Context {
 | 
			
		||||
        client: client.clone(),
 | 
			
		||||
    });
 | 
			
		||||
    Controller::new(confsets, Config::default().any_semantic())
 | 
			
		||||
        .shutdown_on_signal()
 | 
			
		||||
        .run(reconcile, error_policy, ctx)
 | 
			
		||||
        .filter_map(|x| async move { std::result::Result::ok(x) })
 | 
			
		||||
@@ -108,6 +117,107 @@ pub async fn setup() {
 | 
			
		||||
        .await;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn setup_secret_watcher(client: Client) {
 | 
			
		||||
    let namespace_api: Api<Namespace> = Api::all(client.clone());
 | 
			
		||||
    let stream = match namespace_api.watch(&WatchParams::default(), "0").await {
 | 
			
		||||
        Ok(status) => {
 | 
			
		||||
            let mut status_box = status.boxed();
 | 
			
		||||
            while let Some(status) = status_box.try_next().await.unwrap() {
 | 
			
		||||
                match status {
 | 
			
		||||
                    WatchEvent::Added(ns) => {
 | 
			
		||||
                        info!("namespace is being watched {}", ns.name_any());
 | 
			
		||||
                        let sec_api = get_secret_api(client.clone(), ns.name_any());
 | 
			
		||||
                        let cm_api = get_configmap_api(client.clone(), ns.name_any());
 | 
			
		||||
                        tokio::spawn( async move {
 | 
			
		||||
                            let stream = match sec_api.watch(&WatchParams::default(), "0").await {
 | 
			
		||||
                                Ok(status) => {
 | 
			
		||||
                                    let mut status_box = status.boxed();
 | 
			
		||||
                                    while let Some(status) = status_box.try_next().await.unwrap() {
 | 
			
		||||
                                        match status {
 | 
			
		||||
                                            WatchEvent::Modified(secret) => {
 | 
			
		||||
                                                if is_shu_object(secret.metadata.clone()) {
 | 
			
		||||
                                                    let config_set_name = secret.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap();
 | 
			
		||||
                                                    info!(
 | 
			
		||||
                                                        "secret {} was updated, triggering configset reconciliation",
 | 
			
		||||
                                                        secret.metadata.clone().name.unwrap(),
 | 
			
		||||
                                                    );
 | 
			
		||||
                                                    watcher_trigger_reconcile(secret.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap().to_string(), secret.metadata.namespace.unwrap()).await;
 | 
			
		||||
                                                }
 | 
			
		||||
                                            }
 | 
			
		||||
                                            _ => {},
 | 
			
		||||
                                        }
 | 
			
		||||
                                    }
 | 
			
		||||
                                }
 | 
			
		||||
                                Err(err) => {
 | 
			
		||||
                                    error!("couldn't start secrets watcher");
 | 
			
		||||
                                },
 | 
			
		||||
                            };
 | 
			
		||||
                        });
 | 
			
		||||
                        tokio::spawn( async move {
 | 
			
		||||
                            let stream = match cm_api.watch(&WatchParams::default(), "0").await {
 | 
			
		||||
                                Ok(status) => {
 | 
			
		||||
                                    let mut status_box = status.boxed();
 | 
			
		||||
                                    while let Some(status) = status_box.try_next().await.unwrap() {
 | 
			
		||||
                                        match status {
 | 
			
		||||
                                            WatchEvent::Modified(configmap) => {
 | 
			
		||||
                                                if is_shu_object(configmap.metadata.clone()) {
 | 
			
		||||
                                                    let config_set_name = configmap.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap();
 | 
			
		||||
                                                    info!(
 | 
			
		||||
                                                        "configmap {} was updated, triggering configset reconciliation",
 | 
			
		||||
                                                        configmap.metadata.clone().name.unwrap(),
 | 
			
		||||
                                                    );
 | 
			
		||||
                                                    watcher_trigger_reconcile(configmap.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap().to_string(), configmap.metadata.namespace.unwrap()).await;
 | 
			
		||||
                                                }
 | 
			
		||||
                                            },
 | 
			
		||||
                                            _ => {},
 | 
			
		||||
                                        }
 | 
			
		||||
                                    }
 | 
			
		||||
                                }
 | 
			
		||||
                                Err(err) => {
 | 
			
		||||
                                    error!("couldn't start secrets watcher");
 | 
			
		||||
                                },
 | 
			
		||||
                             };
 | 
			
		||||
                        });
 | 
			
		||||
                        
 | 
			
		||||
                    }
 | 
			
		||||
                    _ => {},
 | 
			
		||||
//                    WatchEvent::Deleted(_) => todo!(),
 | 
			
		||||
//                    WatchEvent::Bookmark(_) => todo!(),
 | 
			
		||||
//                    WatchEvent::Error(_) => todo!(),
 | 
			
		||||
                }
 | 
			
		||||
            };
 | 
			
		||||
        },
 | 
			
		||||
        Err(err) => {
 | 
			
		||||
            error!("couldn't watch namespaces, watcher is not enabled");
 | 
			
		||||
        },
 | 
			
		||||
    };
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn is_shu_object(metadata: ObjectMeta) -> bool {
 | 
			
		||||
    match metadata.annotations {
 | 
			
		||||
        Some(annotations) => match annotations.get(WATCHED_BY_SHU) {
 | 
			
		||||
            Some(_) => true,
 | 
			
		||||
            None => false,
 | 
			
		||||
        },
 | 
			
		||||
        None => false,
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn watcher_trigger_reconcile(config_set_name: String, namespace: String) {
 | 
			
		||||
    let client = Client::try_default().await.expect("failed to create kube Client");
 | 
			
		||||
    let api: Api<ConfigSet> = Api::namespaced(client.clone(), &namespace);
 | 
			
		||||
    let config_set = match api.get(&config_set_name).await {
 | 
			
		||||
        Ok(res) => {
 | 
			
		||||
            if let Err(err) =  reconcile(Arc::new(res), Arc::new(Context {client })).await {
 | 
			
		||||
                error!("an error occured during reconciliation: {}", err);      
 | 
			
		||||
            }
 | 
			
		||||
        },
 | 
			
		||||
        Err(err) => {
 | 
			
		||||
            error!("couldn't get configset: {}", err);
 | 
			
		||||
        },
 | 
			
		||||
    };
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn error_policy(doc: Arc<ConfigSet>, error: &Error, ctx: Arc<Context>) -> Action {
 | 
			
		||||
    Action::requeue(Duration::from_secs(5 * 60))
 | 
			
		||||
}
 | 
			
		||||
@@ -122,8 +232,10 @@ fn get_configmap_api(client: Client, namespace: String) -> Api<ConfigMap> {
 | 
			
		||||
 | 
			
		||||
async fn gather_inputs(
 | 
			
		||||
    client: Client,
 | 
			
		||||
    config_set_name: String,
 | 
			
		||||
    namespace: String,
 | 
			
		||||
    inputs: Vec<InputWithName>,
 | 
			
		||||
    is_removed: bool,
 | 
			
		||||
) -> Result<HashMap<String, String>> {
 | 
			
		||||
    let mut result: HashMap<String, String> = HashMap::new();
 | 
			
		||||
    for i in inputs {
 | 
			
		||||
@@ -148,8 +260,19 @@ async fn gather_inputs(
 | 
			
		||||
                                ))))
 | 
			
		||||
                            }
 | 
			
		||||
                        };
 | 
			
		||||
    // Take dynamic resource identifiers:
 | 
			
		||||
 | 
			
		||||
                        // Use the full resource info to create an Api with the ApiResource as its DynamicType
 | 
			
		||||
                        let api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
 | 
			
		||||
                        let a: Api<DynamicObject> = api.try_into()?;
 | 
			
		||||
 | 
			
		||||
                        match is_removed {
 | 
			
		||||
                            true => unset_annotations(, i.from.key, config_set_name).await?,
 | 
			
		||||
                            false => set_annotations(api, i.from.key, config_set_name).await?,
 | 
			
		||||
                        };
 | 
			
		||||
                        value.to_string()
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    Err(err) => {
 | 
			
		||||
                        error!("{err}");
 | 
			
		||||
                        return Err(Error::KubeError(err));
 | 
			
		||||
@@ -187,6 +310,49 @@ async fn gather_inputs(
 | 
			
		||||
    Ok(result)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn set_annotations(api: Api<DynamicObject>, object_name: String, config_set_name: String) -> Result<()> {
 | 
			
		||||
    let object = match api.get_opt(&object_name).await {
 | 
			
		||||
        Ok(object) => match object {
 | 
			
		||||
            Some(object) => object,
 | 
			
		||||
            None => return Ok(()),
 | 
			
		||||
        },
 | 
			
		||||
        Err(err) => return Err(Error::KubeError(err)),
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    let mut annotations = match object.clone().metadata.annotations {
 | 
			
		||||
        Some(annotations) => annotations,
 | 
			
		||||
        None => BTreeMap::new(),
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    annotations.insert(WATCHED_BY_SHU.to_string(), config_set_name).unwrap();
 | 
			
		||||
    if let Err(err) =  api.replace(&object_name, &PostParams::default(), &object).await {
 | 
			
		||||
        return Err(Error::KubeError(err));
 | 
			
		||||
    }
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn unset_annotations(api: Api<DynamicObject>, object_name: String, config_set_name: String) -> Result<()> {
 | 
			
		||||
    let object = match api.get_opt(&object_name).await {
 | 
			
		||||
        Ok(object) => match object {
 | 
			
		||||
            Some(object) => object,
 | 
			
		||||
            None => return Ok(()),
 | 
			
		||||
        },
 | 
			
		||||
        Err(err) => return Err(Error::KubeError(err)),
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    let mut annotations = match object.clone().metadata.annotations {
 | 
			
		||||
        Some(annotations) => annotations,
 | 
			
		||||
        None => BTreeMap::new(),
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    annotations.insert(WATCHED_BY_SHU.to_string(), config_set_name).unwrap();
 | 
			
		||||
    if let Err(err) =  api.replace(&object_name, &PostParams::default(), &object).await {
 | 
			
		||||
        return Err(Error::KubeError(err));
 | 
			
		||||
    }
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async fn gather_targets(
 | 
			
		||||
    client: Client,
 | 
			
		||||
    namespace: String,
 | 
			
		||||
@@ -399,8 +565,10 @@ impl ConfigSet {
 | 
			
		||||
 | 
			
		||||
        let inputs: HashMap<String, String> = gather_inputs(
 | 
			
		||||
            ctx.client.clone(),
 | 
			
		||||
            self.name_any(),
 | 
			
		||||
            self.metadata.namespace.clone().unwrap(),
 | 
			
		||||
            self.spec.inputs.clone(),
 | 
			
		||||
            false,
 | 
			
		||||
        )
 | 
			
		||||
        .await?;
 | 
			
		||||
 | 
			
		||||
@@ -470,8 +638,10 @@ impl ConfigSet {
 | 
			
		||||
    async fn cleanup(&self, ctx: Arc<Context>) -> Result<Action> {
 | 
			
		||||
        let inputs: HashMap<String, String> = gather_inputs(
 | 
			
		||||
            ctx.client.clone(),
 | 
			
		||||
            self.name_any(),
 | 
			
		||||
            self.metadata.namespace.clone().unwrap(),
 | 
			
		||||
            self.spec.inputs.clone(),
 | 
			
		||||
            true,
 | 
			
		||||
        )
 | 
			
		||||
        .await?;
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -79,6 +79,16 @@ fn prepare_cluster_role(namespace: String) -> ClusterRole {
 | 
			
		||||
            ],
 | 
			
		||||
            ..Default::default()
 | 
			
		||||
        },
 | 
			
		||||
        PolicyRule {
 | 
			
		||||
            api_groups: Some(vec!["".to_string()]),
 | 
			
		||||
            resources: Some(vec!["namespaces".to_string()]),
 | 
			
		||||
            verbs: vec![
 | 
			
		||||
                "get".to_string(),
 | 
			
		||||
                "list".to_string(),
 | 
			
		||||
                "watch".to_string()
 | 
			
		||||
            ],
 | 
			
		||||
            ..Default::default()
 | 
			
		||||
        },
 | 
			
		||||
    ];
 | 
			
		||||
 | 
			
		||||
    ClusterRole {
 | 
			
		||||
 
 | 
			
		||||
@@ -52,6 +52,6 @@ async fn main() -> anyhow::Result<()> {
 | 
			
		||||
            };
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -39,8 +39,8 @@ spec:
 | 
			
		||||
      template: "{{ PROTO }}:{{ USERNAME }}:{{ PASSWORD }}/{{ DATABASE }}"
 | 
			
		||||
      target: app-connection-string
 | 
			
		||||
    - name: EXISTING
 | 
			
		||||
      template: TEST
 | 
			
		||||
      target: existing-target
 | 
			
		||||
      template: "{{ PROTO }}"
 | 
			
		||||
      target: app-connection-string
 | 
			
		||||
    - name: IS_POSTGRES
 | 
			
		||||
      template: |
 | 
			
		||||
        {{#if (eq PROTO "postgresql") }}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user