Compare commits
1 Commits
renovate/c
...
add-watche
Author | SHA1 | Date | |
---|---|---|---|
33c18ff340
|
@ -24,6 +24,7 @@ use thiserror::Error;
|
||||
version = "v1alpha1",
|
||||
namespaced
|
||||
)]
|
||||
|
||||
#[kube(status = "ConfigSetStatus", shortname = "confset")]
|
||||
pub struct ConfigSetSpec {
|
||||
pub targets: Vec<TargetWithName>,
|
||||
|
@ -2,19 +2,23 @@ use crate::api::v1alpha1::configsets_api::{
|
||||
ConfigSet, Input, InputWithName, TargetWithName, Templates,
|
||||
};
|
||||
use core::fmt;
|
||||
use futures::StreamExt;
|
||||
use futures::{StreamExt, TryStreamExt, stream};
|
||||
use handlebars::Handlebars;
|
||||
use k8s_openapi::api::core::v1::{ConfigMap, Secret};
|
||||
use k8s_openapi::api::core::v1::{ConfigMap, Namespace, Secret};
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
|
||||
use k8s_openapi::{ByteString, NamespaceResourceScope};
|
||||
use kube::api::{ListParams, PostParams};
|
||||
use kube::core::{Object, ObjectMeta};
|
||||
use kube::api::{GroupVersionKind, ListParams, PostParams, WatchEvent};
|
||||
use kube::core::{DynamicResourceScope, Object, ObjectMeta};
|
||||
use kube::error::ErrorResponse;
|
||||
use kube::runtime::controller::Action;
|
||||
use kube::runtime::finalizer::Event as Finalizer;
|
||||
use kube::runtime::watcher::Config;
|
||||
use kube::runtime::{finalizer, Controller};
|
||||
use kube::runtime::{
|
||||
watcher::{self, Config},
|
||||
WatchStreamExt,
|
||||
};
|
||||
use kube::{Api, Client, CustomResource};
|
||||
use kube_client::api::WatchParams;
|
||||
use kube_client::core::DynamicObject;
|
||||
use kube_client::{Resource, ResourceExt};
|
||||
use log::*;
|
||||
@ -94,13 +98,18 @@ pub async fn setup() {
|
||||
let client = Client::try_default()
|
||||
.await
|
||||
.expect("failed to create kube Client");
|
||||
let docs = Api::<ConfigSet>::all(client.clone());
|
||||
if let Err(e) = docs.list(&ListParams::default().limit(1)).await {
|
||||
let confsets = Api::<ConfigSet>::all(client.clone());
|
||||
if let Err(e) = confsets.list(&ListParams::default().limit(1)).await {
|
||||
error!("{}", e);
|
||||
std::process::exit(1);
|
||||
}
|
||||
let ctx = Arc::new(Context { client });
|
||||
Controller::new(docs, Config::default().any_semantic())
|
||||
|
||||
tokio::spawn(setup_secret_watcher(client.clone()));
|
||||
|
||||
let ctx = Arc::new(Context {
|
||||
client: client.clone(),
|
||||
});
|
||||
Controller::new(confsets, Config::default().any_semantic())
|
||||
.shutdown_on_signal()
|
||||
.run(reconcile, error_policy, ctx)
|
||||
.filter_map(|x| async move { std::result::Result::ok(x) })
|
||||
@ -108,6 +117,107 @@ pub async fn setup() {
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn setup_secret_watcher(client: Client) {
|
||||
let namespace_api: Api<Namespace> = Api::all(client.clone());
|
||||
let stream = match namespace_api.watch(&WatchParams::default(), "0").await {
|
||||
Ok(status) => {
|
||||
let mut status_box = status.boxed();
|
||||
while let Some(status) = status_box.try_next().await.unwrap() {
|
||||
match status {
|
||||
WatchEvent::Added(ns) => {
|
||||
info!("namespace is being watched {}", ns.name_any());
|
||||
let sec_api = get_secret_api(client.clone(), ns.name_any());
|
||||
let cm_api = get_configmap_api(client.clone(), ns.name_any());
|
||||
tokio::spawn( async move {
|
||||
let stream = match sec_api.watch(&WatchParams::default(), "0").await {
|
||||
Ok(status) => {
|
||||
let mut status_box = status.boxed();
|
||||
while let Some(status) = status_box.try_next().await.unwrap() {
|
||||
match status {
|
||||
WatchEvent::Modified(secret) => {
|
||||
if is_shu_object(secret.metadata.clone()) {
|
||||
let config_set_name = secret.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap();
|
||||
info!(
|
||||
"secret {} was updated, triggering configset reconciliation",
|
||||
secret.metadata.clone().name.unwrap(),
|
||||
);
|
||||
watcher_trigger_reconcile(secret.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap().to_string(), secret.metadata.namespace.unwrap()).await;
|
||||
}
|
||||
}
|
||||
_ => {},
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!("couldn't start secrets watcher");
|
||||
},
|
||||
};
|
||||
});
|
||||
tokio::spawn( async move {
|
||||
let stream = match cm_api.watch(&WatchParams::default(), "0").await {
|
||||
Ok(status) => {
|
||||
let mut status_box = status.boxed();
|
||||
while let Some(status) = status_box.try_next().await.unwrap() {
|
||||
match status {
|
||||
WatchEvent::Modified(configmap) => {
|
||||
if is_shu_object(configmap.metadata.clone()) {
|
||||
let config_set_name = configmap.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap();
|
||||
info!(
|
||||
"configmap {} was updated, triggering configset reconciliation",
|
||||
configmap.metadata.clone().name.unwrap(),
|
||||
);
|
||||
watcher_trigger_reconcile(configmap.metadata.clone().annotations.unwrap().get(WATCHED_BY_SHU).unwrap().to_string(), configmap.metadata.namespace.unwrap()).await;
|
||||
}
|
||||
},
|
||||
_ => {},
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!("couldn't start secrets watcher");
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
}
|
||||
_ => {},
|
||||
// WatchEvent::Deleted(_) => todo!(),
|
||||
// WatchEvent::Bookmark(_) => todo!(),
|
||||
// WatchEvent::Error(_) => todo!(),
|
||||
}
|
||||
};
|
||||
},
|
||||
Err(err) => {
|
||||
error!("couldn't watch namespaces, watcher is not enabled");
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
fn is_shu_object(metadata: ObjectMeta) -> bool {
|
||||
match metadata.annotations {
|
||||
Some(annotations) => match annotations.get(WATCHED_BY_SHU) {
|
||||
Some(_) => true,
|
||||
None => false,
|
||||
},
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
async fn watcher_trigger_reconcile(config_set_name: String, namespace: String) {
|
||||
let client = Client::try_default().await.expect("failed to create kube Client");
|
||||
let api: Api<ConfigSet> = Api::namespaced(client.clone(), &namespace);
|
||||
let config_set = match api.get(&config_set_name).await {
|
||||
Ok(res) => {
|
||||
if let Err(err) = reconcile(Arc::new(res), Arc::new(Context {client })).await {
|
||||
error!("an error occured during reconciliation: {}", err);
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
error!("couldn't get configset: {}", err);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
fn error_policy(doc: Arc<ConfigSet>, error: &Error, ctx: Arc<Context>) -> Action {
|
||||
Action::requeue(Duration::from_secs(5 * 60))
|
||||
}
|
||||
@ -122,8 +232,10 @@ fn get_configmap_api(client: Client, namespace: String) -> Api<ConfigMap> {
|
||||
|
||||
async fn gather_inputs(
|
||||
client: Client,
|
||||
config_set_name: String,
|
||||
namespace: String,
|
||||
inputs: Vec<InputWithName>,
|
||||
is_removed: bool,
|
||||
) -> Result<HashMap<String, String>> {
|
||||
let mut result: HashMap<String, String> = HashMap::new();
|
||||
for i in inputs {
|
||||
@ -148,8 +260,19 @@ async fn gather_inputs(
|
||||
))))
|
||||
}
|
||||
};
|
||||
// Take dynamic resource identifiers:
|
||||
|
||||
// Use the full resource info to create an Api with the ApiResource as its DynamicType
|
||||
let api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
|
||||
let a: Api<DynamicObject> = api.try_into()?;
|
||||
|
||||
match is_removed {
|
||||
true => unset_annotations(, i.from.key, config_set_name).await?,
|
||||
false => set_annotations(api, i.from.key, config_set_name).await?,
|
||||
};
|
||||
value.to_string()
|
||||
}
|
||||
|
||||
Err(err) => {
|
||||
error!("{err}");
|
||||
return Err(Error::KubeError(err));
|
||||
@ -187,6 +310,49 @@ async fn gather_inputs(
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn set_annotations(api: Api<DynamicObject>, object_name: String, config_set_name: String) -> Result<()> {
|
||||
let object = match api.get_opt(&object_name).await {
|
||||
Ok(object) => match object {
|
||||
Some(object) => object,
|
||||
None => return Ok(()),
|
||||
},
|
||||
Err(err) => return Err(Error::KubeError(err)),
|
||||
};
|
||||
|
||||
let mut annotations = match object.clone().metadata.annotations {
|
||||
Some(annotations) => annotations,
|
||||
None => BTreeMap::new(),
|
||||
};
|
||||
|
||||
annotations.insert(WATCHED_BY_SHU.to_string(), config_set_name).unwrap();
|
||||
if let Err(err) = api.replace(&object_name, &PostParams::default(), &object).await {
|
||||
return Err(Error::KubeError(err));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn unset_annotations(api: Api<DynamicObject>, object_name: String, config_set_name: String) -> Result<()> {
|
||||
let object = match api.get_opt(&object_name).await {
|
||||
Ok(object) => match object {
|
||||
Some(object) => object,
|
||||
None => return Ok(()),
|
||||
},
|
||||
Err(err) => return Err(Error::KubeError(err)),
|
||||
};
|
||||
|
||||
let mut annotations = match object.clone().metadata.annotations {
|
||||
Some(annotations) => annotations,
|
||||
None => BTreeMap::new(),
|
||||
};
|
||||
|
||||
annotations.insert(WATCHED_BY_SHU.to_string(), config_set_name).unwrap();
|
||||
if let Err(err) = api.replace(&object_name, &PostParams::default(), &object).await {
|
||||
return Err(Error::KubeError(err));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
async fn gather_targets(
|
||||
client: Client,
|
||||
namespace: String,
|
||||
@ -399,8 +565,10 @@ impl ConfigSet {
|
||||
|
||||
let inputs: HashMap<String, String> = gather_inputs(
|
||||
ctx.client.clone(),
|
||||
self.name_any(),
|
||||
self.metadata.namespace.clone().unwrap(),
|
||||
self.spec.inputs.clone(),
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@ -470,8 +638,10 @@ impl ConfigSet {
|
||||
async fn cleanup(&self, ctx: Arc<Context>) -> Result<Action> {
|
||||
let inputs: HashMap<String, String> = gather_inputs(
|
||||
ctx.client.clone(),
|
||||
self.name_any(),
|
||||
self.metadata.namespace.clone().unwrap(),
|
||||
self.spec.inputs.clone(),
|
||||
true,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
@ -79,6 +79,16 @@ fn prepare_cluster_role(namespace: String) -> ClusterRole {
|
||||
],
|
||||
..Default::default()
|
||||
},
|
||||
PolicyRule {
|
||||
api_groups: Some(vec!["".to_string()]),
|
||||
resources: Some(vec!["namespaces".to_string()]),
|
||||
verbs: vec![
|
||||
"get".to_string(),
|
||||
"list".to_string(),
|
||||
"watch".to_string()
|
||||
],
|
||||
..Default::default()
|
||||
},
|
||||
];
|
||||
|
||||
ClusterRole {
|
||||
|
@ -52,6 +52,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -39,8 +39,8 @@ spec:
|
||||
template: "{{ PROTO }}:{{ USERNAME }}:{{ PASSWORD }}/{{ DATABASE }}"
|
||||
target: app-connection-string
|
||||
- name: EXISTING
|
||||
template: TEST
|
||||
target: existing-target
|
||||
template: "{{ PROTO }}"
|
||||
target: app-connection-string
|
||||
- name: IS_POSTGRES
|
||||
template: |
|
||||
{{#if (eq PROTO "postgresql") }}
|
||||
|
Reference in New Issue
Block a user