1 Commits

Author SHA1 Message Date
33c18ff340 Trying something that will probably never happen
Signed-off-by: Nikolai Rodionov <allanger@badhouseplants.net>
2025-05-31 23:38:09 +02:00
5 changed files with 193 additions and 12 deletions

View File

@ -24,6 +24,7 @@ use thiserror::Error;
version = "v1alpha1",
namespaced
)]
#[kube(status = "ConfigSetStatus", shortname = "confset")]
pub struct ConfigSetSpec {
pub targets: Vec<TargetWithName>,

View File

@ -2,19 +2,23 @@ use crate::api::v1alpha1::configsets_api::{
ConfigSet, Input, InputWithName, TargetWithName, Templates,
};
use core::fmt;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt, stream};
use handlebars::Handlebars;
use k8s_openapi::api::core::v1::{ConfigMap, Secret};
use k8s_openapi::api::core::v1::{ConfigMap, Namespace, Secret};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
use k8s_openapi::{ByteString, NamespaceResourceScope};
use kube::api::{ListParams, PostParams};
use kube::core::{Object, ObjectMeta};
use kube::api::{GroupVersionKind, ListParams, PostParams, WatchEvent};
use kube::core::{DynamicResourceScope, Object, ObjectMeta};
use kube::error::ErrorResponse;
use kube::runtime::controller::Action;
use kube::runtime::finalizer::Event as Finalizer;
use kube::runtime::watcher::Config;
use kube::runtime::{finalizer, Controller};
use kube::runtime::{
watcher::{self, Config},
WatchStreamExt,
};
use kube::{Api, Client, CustomResource};
use kube_client::api::WatchParams;
use kube_client::core::DynamicObject;
use kube_client::{Resource, ResourceExt};
use log::*;
@ -94,13 +98,18 @@ pub async fn setup() {
let client = Client::try_default()
.await
.expect("failed to create kube Client");
let docs = Api::<ConfigSet>::all(client.clone());
if let Err(e) = docs.list(&ListParams::default().limit(1)).await {
let confsets = Api::<ConfigSet>::all(client.clone());
if let Err(e) = confsets.list(&ListParams::default().limit(1)).await {
error!("{}", e);
std::process::exit(1);
}
let ctx = Arc::new(Context { client });
Controller::new(docs, Config::default().any_semantic())
tokio::spawn(setup_secret_watcher(client.clone()));
let ctx = Arc::new(Context {
client: client.clone(),
});
Controller::new(confsets, Config::default().any_semantic())
.shutdown_on_signal()
.run(reconcile, error_policy, ctx)
.filter_map(|x| async move { std::result::Result::ok(x) })
@ -108,6 +117,107 @@ pub async fn setup() {
.await;
}
async fn setup_secret_watcher(client: Client) {
let namespace_api: Api<Namespace> = Api::all(client.clone());
let stream = match namespace_api.watch(&WatchParams::default(), "0").await {
Ok(status) => {
let mut status_box = status.boxed();
while let Some(status) = status_box.try_next().await.unwrap() {
match status {
WatchEvent::Added(ns) => {
info!("namespace is being watched {}", ns.name_any());
let sec_api = get_secret_api(client.clone(), ns.name_any());
let cm_api = get_configmap_api(client.clone(), ns.name_any());
tokio::spawn( async move {
let stream = match sec_api.watch(&WatchParams::default(), "0").await {
Ok(status) => {
let mut status_box = status.boxed();
while let Some(status) = status_box.try_next().await.unwrap() {
match status {
WatchEvent::Modified(secret) => {
if is_shu_object(secret.metadata.clone()) {
let config_set_name = secret.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap();
info!(
"secret {} was updated, triggering configset reconciliation",
secret.metadata.clone().name.unwrap(),
);
watcher_trigger_reconcile(secret.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap().to_string(), secret.metadata.namespace.unwrap()).await;
}
}
_ => {},
}
}
}
Err(err) => {
error!("couldn't start secrets watcher");
},
};
});
tokio::spawn( async move {
let stream = match cm_api.watch(&WatchParams::default(), "0").await {
Ok(status) => {
let mut status_box = status.boxed();
while let Some(status) = status_box.try_next().await.unwrap() {
match status {
WatchEvent::Modified(configmap) => {
if is_shu_object(configmap.metadata.clone()) {
let config_set_name = configmap.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap();
info!(
"configmap {} was updated, triggering configset reconciliation",
configmap.metadata.clone().name.unwrap(),
);
watcher_trigger_reconcile(configmap.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap().to_string(), configmap.metadata.namespace.unwrap()).await;
}
},
_ => {},
}
}
}
Err(err) => {
error!("couldn't start secrets watcher");
},
};
});
}
_ => {},
// WatchEvent::Deleted(_) => todo!(),
// WatchEvent::Bookmark(_) => todo!(),
// WatchEvent::Error(_) => todo!(),
}
};
},
Err(err) => {
error!("couldn't watch namespaces, watcher is not enabled");
},
};
}
fn is_shu_object(metadata: ObjectMeta) -> bool {
match metadata.annotations {
Some(annotations) => match annotations.get(WATCHED_BY_SHU) {
Some(_) => true,
None => false,
},
None => false,
}
}
async fn watcher_trigger_reconcile(config_set_name: String, namespace: String) {
let client = Client::try_default().await.expect("failed to create kube Client");
let api: Api<ConfigSet> = Api::namespaced(client.clone(), &namespace);
let config_set = match api.get(&config_set_name).await {
Ok(res) => {
if let Err(err) = reconcile(Arc::new(res), Arc::new(Context {client })).await {
error!("an error occured during reconciliation: {}", err);
}
},
Err(err) => {
error!("couldn't get configset: {}", err);
},
};
}
fn error_policy(doc: Arc<ConfigSet>, error: &Error, ctx: Arc<Context>) -> Action {
Action::requeue(Duration::from_secs(5 * 60))
}
@ -122,8 +232,10 @@ fn get_configmap_api(client: Client, namespace: String) -> Api<ConfigMap> {
async fn gather_inputs(
client: Client,
config_set_name: String,
namespace: String,
inputs: Vec<InputWithName>,
is_removed: bool,
) -> Result<HashMap<String, String>> {
let mut result: HashMap<String, String> = HashMap::new();
for i in inputs {
@ -148,8 +260,19 @@ async fn gather_inputs(
))))
}
};
// Take dynamic resource identifiers:
// Use the full resource info to create an Api with the ApiResource as its DynamicType
let api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
let a: Api<DynamicObject> = api.try_into()?;
match is_removed {
true => unset_annotations(, i.from.key, config_set_name).await?,
false => set_annotations(api, i.from.key, config_set_name).await?,
};
value.to_string()
}
Err(err) => {
error!("{err}");
return Err(Error::KubeError(err));
@ -187,6 +310,49 @@ async fn gather_inputs(
Ok(result)
}
async fn set_annotations(api: Api<DynamicObject>, object_name: String, config_set_name: String) -> Result<()> {
let object = match api.get_opt(&object_name).await {
Ok(object) => match object {
Some(object) => object,
None => return Ok(()),
},
Err(err) => return Err(Error::KubeError(err)),
};
let mut annotations = match object.clone().metadata.annotations {
Some(annotations) => annotations,
None => BTreeMap::new(),
};
annotations.insert(WATCHED_BY_SHU.to_string(), config_set_name).unwrap();
if let Err(err) = api.replace(&object_name, &PostParams::default(), &object).await {
return Err(Error::KubeError(err));
}
Ok(())
}
async fn unset_annotations(api: Api<DynamicObject>, object_name: String, config_set_name: String) -> Result<()> {
let object = match api.get_opt(&object_name).await {
Ok(object) => match object {
Some(object) => object,
None => return Ok(()),
},
Err(err) => return Err(Error::KubeError(err)),
};
let mut annotations = match object.clone().metadata.annotations {
Some(annotations) => annotations,
None => BTreeMap::new(),
};
annotations.insert(WATCHED_BY_SHU.to_string(), config_set_name).unwrap();
if let Err(err) = api.replace(&object_name, &PostParams::default(), &object).await {
return Err(Error::KubeError(err));
}
Ok(())
}
async fn gather_targets(
client: Client,
namespace: String,
@ -399,8 +565,10 @@ impl ConfigSet {
let inputs: HashMap<String, String> = gather_inputs(
ctx.client.clone(),
self.name_any(),
self.metadata.namespace.clone().unwrap(),
self.spec.inputs.clone(),
false,
)
.await?;
@ -470,8 +638,10 @@ impl ConfigSet {
async fn cleanup(&self, ctx: Arc<Context>) -> Result<Action> {
let inputs: HashMap<String, String> = gather_inputs(
ctx.client.clone(),
self.name_any(),
self.metadata.namespace.clone().unwrap(),
self.spec.inputs.clone(),
true,
)
.await?;

View File

@ -79,6 +79,16 @@ fn prepare_cluster_role(namespace: String) -> ClusterRole {
],
..Default::default()
},
PolicyRule {
api_groups: Some(vec!["".to_string()]),
resources: Some(vec!["namespaces".to_string()]),
verbs: vec![
"get".to_string(),
"list".to_string(),
"watch".to_string()
],
..Default::default()
},
];
ClusterRole {

View File

@ -39,8 +39,8 @@ spec:
template: "{{ PROTO }}:{{ USERNAME }}:{{ PASSWORD }}/{{ DATABASE }}"
target: app-connection-string
- name: EXISTING
template: TEST
target: existing-target
template: "{{ PROTO }}"
target: app-connection-string
- name: IS_POSTGRES
template: |
{{#if (eq PROTO "postgresql") }}